In [0]:
# Root locations of the bronze (input) and silver (output) layers.
# NOTE: bronze_path ends with a slash because the read cells append relative
# names to it directly; silver_path has no trailing slash.
silver_path = "/Volumes/nyc/default/silver"
bronze_path = "/Volumes/nyc/default/bronze/"

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce

Trip Type Data

In [0]:
# Read the trip-type lookup CSV(s) from the bronze layer; the first row is the
# header and column types are inferred from the data.
df_trip_type = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(bronze_path + 'trip_type/')
)

                  

Trip Zone Data

In [0]:
# Read the taxi zone lookup CSV from the bronze layer; the first row is the
# header and column types are inferred from the data.
df_trip_zone = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(bronze_path + 'trip_zone/taxi_zone_lookup.csv')
)

Trip Data

In [0]:
# Build one uniformly-typed trip DataFrame from every parquet file in the
# bronze trip-data folder.
#
# Each file is read on its own and projected onto `schema` before the union:
# columns a file lacks are filled with typed NULLs, and columns it has are cast
# to the declared type (presumably the files' physical schemas differ from one
# another -- TODO confirm).
#
# The folder is kept in its own variable: rebinding `bronze_path` here would
# make the earlier cells that build paths from it read from the wrong
# directory whenever they are re-run.
trip_data_path = "dbfs:/Volumes/nyc/default/bronze/trip-data/"

# Target schema every file is normalised to.
schema = StructType([
    StructField("VendorID", LongType(), True),
    StructField("lpep_pickup_datetime", TimestampType(), True),
    StructField("lpep_dropoff_datetime", TimestampType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("RatecodeID", DoubleType(), True),
    StructField("PULocationID", LongType(), True),
    StructField("DOLocationID", LongType(), True),
    StructField("passenger_count", DoubleType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("ehail_fee", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("payment_type", DoubleType(), True),
    StructField("trip_type", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True)
])
schema_cols = [field.name for field in schema]

files = [
    entry.path
    for entry in dbutils.fs.ls(trip_data_path)
    if entry.path.endswith(".parquet")
]
# Fail with a clear message instead of the opaque TypeError that reduce()
# raises on an empty list.
if not files:
    raise FileNotFoundError(f"No .parquet files found under {trip_data_path}")

dfs = []
for file_path in files:
    raw = spark.read.parquet(file_path)
    present = set(raw.columns)
    # Single projection: existing column or NULL literal, cast to the target
    # type, emitted in schema order.
    dfs.append(
        raw.select([
            (col(field.name) if field.name in present else lit(None))
            .cast(field.dataType)
            .alias(field.name)
            for field in schema
        ])
    )

df_trip = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)


## Data Transformations

In [0]:
# Preview the trip-type lookup before transforming it.
display(df_trip_type)

In [0]:
# Give the lookup's description column a name that is specific to trip types.
df_trip_type = df_trip_type.withColumnRenamed(
    existing='description',
    new='trip_description',
)

In [0]:
# Write the trip-type lookup to the silver layer as parquet, replacing any
# previous output at that location.
df_trip_type.write.parquet(
    "dbfs:/Volumes/nyc/default/silver/trip_type",
    mode="overwrite",
)

In [0]:
# Split the zone name on '/' and expose the first and second pieces as
# zone1 and zone2.
df_trip_zone = df_trip_zone.withColumns({
    "zone1": expr("get(split(zone,'/'),0)"),
    "zone2": expr("get(split(zone,'/'),1)"),
})

In [0]:
# Write the zone lookup to the silver layer as parquet, replacing any previous
# output at that location.
df_trip_zone.write.parquet(
    "dbfs:/Volumes/nyc/default/silver/trip_zone",
    mode="overwrite",
)

In [0]:
# Preview the unioned trip data before transforming it.
display(df_trip)

In [0]:
# Derive calendar columns (date, year, month) from the pickup timestamp.
df_trip = df_trip.withColumns({
    'trip_date': to_date(col('lpep_pickup_datetime')),
    'trip_year': year(col('lpep_pickup_datetime')),
    'trip_month': month(col('lpep_pickup_datetime')),
})

In [0]:
# Keep only the columns that are written out afterwards. The trip_date /
# trip_year / trip_month columns derived from the pickup timestamp are kept in
# the projection (appended after the original six so existing column positions
# are unchanged); leaving them out would discard them before the write.
df_trip = df_trip.select(
    "VendorID",
    "PULocationID",
    "DOLocationID",
    "trip_distance",
    "fare_amount",
    "total_amount",
    "trip_date",
    "trip_year",
    "trip_month",
)

In [0]:
# Write the trip data to the silver layer as parquet, replacing any previous
# output at that location.
df_trip.write.parquet(
    "dbfs:/Volumes/nyc/default/silver/trips2023data",
    mode="overwrite",
)