In [None]:
from datetime import datetime, timezone

from dotenv import load_dotenv
from pyspark.sql import SparkSession, functions as f

# Populate os.environ from a local .env file. This must run before the
# SparkSession is created; presumably so the JVM that PySpark launches
# inherits any AWS variables defined there (confirm).
load_dotenv()

spark = (
    SparkSession.builder
    .appName('TlcTripsTrusted')
    .getOrCreate()
)

# S3A credential lookup order: EC2 instance profile first, then the default
# AWS provider chain.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set(
    "fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
)

# Partition date for this run: the current date in UTC. The landing zone is
# laid out as year=YYYY/month=MM/day=DD, so each component is zero-padded.
# datetime.utcnow() is deprecated since Python 3.12 and returns a naive value;
# an aware UTC timestamp yields the same year/month/day.
ingest_dt = datetime.now(timezone.utc)
y = f"{ingest_dt.year:04d}"
m = f"{ingest_dt.month:02d}"
d = f"{ingest_dt.day:02d}"

# Read only this run's day partition of the raw yellow taxi trips.
df_raw = spark.read.parquet(
    f"s3a://datalake-prd-tlc-trips/landing-zone/yellow_tripdata/year={y}/month={m}/day={d}/"
)

# Target Spark type for every trip column kept in the trusted zone, listed in
# output order. Columns of df_raw not named here are dropped.
# NOTE(review): amounts and distances are cast to 'float' (Spark FloatType,
# 32-bit). That can introduce rounding on monetary values. Consider 'double'
# or a decimal type if downstream totals must reconcile.
TRIP_COLUMN_TYPES = {
    'VendorID': 'integer',
    'passenger_count': 'integer',
    'total_amount': 'float',
    'tpep_pickup_datetime': 'timestamp',
    'tpep_dropoff_datetime': 'timestamp',
    'trip_distance': 'float',
    'store_and_fwd_flag': 'string',
    'RatecodeID': 'integer',
    'PULocationID': 'integer',
    'DOLocationID': 'integer',
    'payment_type': 'integer',
    'fare_amount': 'float',
    'extra': 'float',
    'mta_tax': 'float',
    'tip_amount': 'float',
    'tolls_amount': 'float',
    'improvement_surcharge': 'float',
    'congestion_surcharge': 'float',
    'airport_fee': 'float',
}

df = df_raw.select(
    *[f.col(name).cast(spark_type) for name, spark_type in TRIP_COLUMN_TYPES.items()]
)

# Human-readable labels for the coded trip fields. Keys use the column types
# produced by the earlier casts: VendorID, RatecodeID and payment_type are
# integers, and store_and_fwd_flag is a string. Keys are compared as typed
# literals instead of relying on implicit string-to-int coercion.
VENDOR_DESCRIPTIONS = {
    1: 'Creative Mobile Technologies, LLC',
    2: 'Curb Mobility, LLC',
    6: 'Myle Technologies Inc',
    7: 'Helix',
}
RATECODE_DESCRIPTIONS = {
    1: 'Standard rate',
    2: 'JFK',
    3: 'Newark',
    4: 'Nassau or Westchester',
    5: 'Negotiated fare',
    6: 'Group ride',
    99: 'Null/unknown',
}
STORE_AND_FWD_DESCRIPTIONS = {
    'Y': 'Store and forward trip',
    'N': 'Not a store and forward trip',
}
PAYMENT_DESCRIPTIONS = {
    0: 'Flex Fare trip',
    1: 'Credit card',
    2: 'Cash',
    3: 'No charge',
    4: 'Dispute',
    5: 'Unknown',
    6: 'Voided trip',
}


def _describe(column, labels, default='Blank'):
    """Return a Column that maps the codes in *column* to their labels.

    Values not present in *labels*, including NULL, map to *default*.
    """
    expr = None
    for code, label in labels.items():
        condition = f.col(column) == code
        expr = f.when(condition, label) if expr is None else expr.when(condition, label)
    if expr is None:
        # No codes to match: every row gets the default label.
        return f.lit(default)
    return expr.otherwise(default)


# Keep every typed column and append one description column per coded field.
df = df.select(
    '*',
    _describe('VendorID', VENDOR_DESCRIPTIONS).alias('VendorDesc'),
    _describe('RatecodeID', RATECODE_DESCRIPTIONS).alias('RatecodeDesc'),
    _describe('store_and_fwd_flag', STORE_AND_FWD_DESCRIPTIONS).alias('store_and_fwd_desc'),
    _describe('payment_type', PAYMENT_DESCRIPTIONS).alias('payment_desc'),
)

# Write this run's trips into their own day partition of the trusted zone.
# The input holds a single landing-zone day (year={y}/month={m}/day={d}).
# Overwriting the dataset root would therefore delete every previously loaded
# day. Scoping the overwrite to the day directory preserves history, and a
# rerun for the same day simply replaces that day.
# Reading the root path still works: Spark partition discovery exposes
# year/month/day as columns.
# NOTE(review): if the root already contains files written by the old
# un-partitioned layout, move or remove them first. Mixing flat files with
# partition directories makes partition discovery fail.
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .option("compression", "snappy")
    .save(
        "s3a://datalake-prd-tlc-trips/trusted-zone/yellow_tripdata/"
        f"year={y}/month={m}/day={d}/"
    )
)

# Spark type for each column of the taxi zone lookup, in output order.
ZONE_COLUMN_TYPES = {
    'LocationID': 'integer',
    'Borough': 'string',
    'Zone': 'string',
    'service_zone': 'string',
}

# Read the zone CSV with every field as a string (no schema inference), then
# cast explicitly to the types above.
df_taxi_zone_raw = spark.read.csv(
    "s3a://datalake-prd-tlc-trips/landing-zone/taxi_zone/",
    header=True,
    inferSchema=False,
)
df_taxi_zone = df_taxi_zone_raw.select(
    *(f.col(name).cast(kind) for name, kind in ZONE_COLUMN_TYPES.items())
)

# Persist the typed zone lookup as snappy-compressed Parquet. The whole
# landing taxi_zone directory is re-read each run, so the trusted copy is
# fully replaced.
taxi_zone_writer = (
    df_taxi_zone.write
    .format('parquet')
    .option("compression", "snappy")
    .mode('overwrite')
)
taxi_zone_writer.save("s3a://datalake-prd-tlc-trips/trusted-zone/taxi_zone/")