In [1]:
import pandas as pd

# Load the Spark output for "average duration/distance by day of week".
# Point at the dataset *directory* rather than a single part file: Spark
# regenerates the part-file name (it embeds a UUID) on every overwrite, so a
# hard-coded part-file path stops working as soon as the job is re-run.
df = pd.read_parquet('../output_files/trip_analysis_output/avg_duration_distance_by_day')

# Collect the column names as a plain Python list
column_names = df.columns.tolist()

# Show the column names to confirm the schema written by the Spark job
print(column_names)


['day_of_week', 'avg_duration', 'avg_distance']


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, hour, dayofweek, month, count, expr

def _avg_duration_distance_by(df, key):
    """Return mean trip duration and mean trip distance for each value of ``key``."""
    return df.groupBy(key).agg(
        avg("trip_duration").alias("avg_duration"),
        avg("trip_distance").alias("avg_distance"),
    )


def _top_locations(df, location_col, count_alias, n=10):
    """Return the ``n`` most frequent values of ``location_col`` with their row counts."""
    return (
        df.groupBy(location_col)
        .agg(count("*").alias(count_alias))
        .orderBy(col(count_alias).desc())
        .limit(n)
    )


def main(
    input_path="../../data/NYC/yellow_tripdata_2021-01.parquet",
    output_base_path="../../data/NYC/output/",
    write_output=False,
):
    """Run the trip analysis over one Parquet file of yellow-taxi trips.

    Prints the schema before and after the derived columns are added, then
    defines five result sets: average duration/distance by hour of day, by
    day of week and by month of year, plus the top 10 pickup and dropoff
    location IDs.

    Args:
        input_path: Parquet file (or directory) to read. Defaults to the
            local relative path used by this notebook.
        output_base_path: Directory under which one sub-directory per result
            set is written when ``write_output`` is true.
        write_output: When true, each result set is written as Parquet
            (mode ``overwrite``) under ``output_base_path``. Defaults to
            ``False``, which matches the previous behaviour where the write
            calls were commented out. Note that Spark is lazy, so with
            ``write_output=False`` the aggregations are only defined and
            never executed.

    Returns:
        None.
    """
    # Initialize Spark session
    spark = SparkSession.builder.appName("Trip Analysis").getOrCreate()

    # Everything after session creation runs under try/finally so the session
    # is stopped even when reading or transforming the data raises.
    try:
        df = spark.read.parquet(input_path)

        # Check the schema of the DataFrame before adding trip_duration column
        print("Schema before adding trip_duration column:")
        df.printSchema()

        # Derived columns used by the aggregations below.
        # trip_duration is dropoff minus pickup, in seconds (unix_timestamp).
        # day_of_week follows Spark's dayofweek(): 1 = Sunday ... 7 = Saturday.
        df = (
            df.withColumn(
                "trip_duration",
                expr("unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)"),
            )
            .withColumn("hour_of_day", hour(col("tpep_pickup_datetime")))
            .withColumn("day_of_week", dayofweek(col("tpep_pickup_datetime")))
            .withColumn("month_of_year", month(col("tpep_pickup_datetime")))
        )

        # Check the schema of the DataFrame after adding trip_duration column
        print("Schema after adding trip_duration column:")
        df.printSchema()

        # Result name -> lazily-defined DataFrame. The name doubles as the
        # output sub-directory when results are written.
        results = {
            "avg_duration_distance_by_hour": _avg_duration_distance_by(df, "hour_of_day"),
            "avg_duration_distance_by_day": _avg_duration_distance_by(df, "day_of_week"),
            "avg_duration_distance_by_month": _avg_duration_distance_by(df, "month_of_year"),
            "top_pickup_locations": _top_locations(df, "PULocationID", "pickup_count"),
            "top_dropoff_locations": _top_locations(df, "DOLocationID", "dropoff_count"),
        }

        if write_output:
            # Normalise the base path so it works with or without a trailing "/".
            base = output_base_path.rstrip("/") + "/"
            for name, result in results.items():
                result.write.mode("overwrite").parquet(base + name)
    finally:
        # Stop Spark session
        spark.stop()


if __name__ == "__main__":
    main()


Schema before adding trip_duration column:
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

Schema after adding trip_duration column:
root
 |-- VendorID: long (nullable = true)
 |

In [4]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     -------------------------------------- 0.0/317.0 MB 682.7 kB/s eta 0:07:45
     ---------------------------------------- 0.1/317.0 MB 1.1 MB/s eta 0:05:02
     ---------------------------------------- 0.2/317.0 MB 1.8 MB/s eta 0:02:58
     ---------------------------------------- 0.3/317.0 MB 1.8 MB/s eta 0:02:59
     ---------------------------------------- 0.4/317.0 MB 2.0 MB/s eta 0:02:41
     ---------------------------------------- 0.5/317.0 MB 2.2 MB/s eta 0:02:25
     ---------------------------------------- 0.6/317.0 MB 2.1 MB/s eta 0:02:30
     ---------------------------------------- 0.7/317.0 MB 2.2 MB/s eta 0:02:27
     ---------------------------------------- 0.8/317.0 MB 2.1 MB/s eta 0:02:31
     ---------------------------------------- 0.9/317.0 MB 2.1 MB/s eta 0:02:28
     ---------------------------------------- 1.0/317.0 MB 2.1

In [None]:
from google.cloud import storage

# Name of the Cloud Storage bucket to inspect; change this to target another bucket.
bucket_name = 'nyc-bucket-siva'

# Create the Cloud Storage client (no explicit project or credentials are passed here).
client = storage.Client()

# Look the bucket up by name.
bucket = client.get_bucket(bucket_name)

# Request an iterator over the objects stored in the bucket.
blobs = bucket.list_blobs()

# Print each object's name, one per line, as a quick check that the bucket is readable.
for blob in blobs:
    print(blob.name)