# What's in this exercise?
We run the common functions notebook so we can reuse capability defined there, and then...<BR>
1) Load yellow taxi CSV data from the raw source bucket, homogenize the schema across years, and save it in Delta format<BR>
2) Create an external (unmanaged) Delta table over the saved data<BR>
3) Create statistics for tables

In [18]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType



In [19]:
# Root locations: where the monthly source CSV files are read from and
# where the homogenized output is written to.
srcDataDirRoot = "gs://nyctaxi-raw/transactional-data/" #Root dir for source data
destDataDirRoot = "gs://nyctaxi-silver/nyctaxi/transactions/yellow-taxi/" #Root dir for consumable data

#Canonical ordered column list for yellow taxi across years to homogenize schema.
#Every monthly dataframe is projected onto exactly these columns, in this order.
canonicalTripSchemaColList = [
    "taxi_type",
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "store_and_fwd_flag",
    "rate_code_id",
    "pickup_location_id",
    "dropoff_location_id",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "payment_type",
    "trip_year",
    "trip_month",
]



### 1.  Execute notebook with common/reusable functions

In [20]:
%run "../../../../01-General/2-CommonFunctions.ipynb"

Code from file 'file:///home/dinhnn/git/personal/Databricks-NYC-Taxi/Workspace/01-General/2-CommonFunctions.ipynb':
 import os
import math
import glob
import re
prqShrinkageFactor = 0.19 #We found a saving in space of 81% with Parquet
def analyzeTables(databaseAndTable):
  try:
    print("Table: " + databaseAndTable)
    print("....refresh table")
    sql("REFRESH TABLE " + databaseAndTable)
    print("....analyze table")
    sql("ANALYZE TABLE " + databaseAndTable + " COMPUTE STATISTICS")
    print("....done")
  except Exception as e:
    return e
def calcOutputFileCountTxtToPrq(srcDataFile, targetedFileSizeMB):
  try:
    estFileCount = int(math.floor((os.path.getsize(srcDataFile) * prqShrinkageFactor) / (targetedFileSizeMB * 1024 * 1024)))
    if(estFileCount == 0):
      return 1 
    else:
      return estFileCount
  except Exception as e:
    return e
#Delete residual files from job operation (_SUCCESS, _start*, _committed*)
#Should be called with '/dbfs/mnt/...'
def recursivelyD



#### 2. Define schema for source data
Different years have different schemas - fields added/removed

In [21]:
#Schema for data based on year and month

# 2017: trips carry pickup/dropoff location ids (no longitude/latitude columns)
yellowTripSchema2017H1 = (
    StructType()
    .add("vendor_id", StringType(), True)
    .add("pickup_datetime", TimestampType(), True)
    .add("dropoff_datetime", TimestampType(), True)
    .add("passenger_count", IntegerType(), True)
    .add("trip_distance", DoubleType(), True)
    .add("rate_code_id", IntegerType(), True)
    .add("store_and_fwd_flag", StringType(), True)
    .add("pickup_location_id", IntegerType(), True)
    .add("dropoff_location_id", IntegerType(), True)
    .add("payment_type", StringType(), True)
    .add("fare_amount", DoubleType(), True)
    .add("extra", DoubleType(), True)
    .add("mta_tax", DoubleType(), True)
    .add("tip_amount", DoubleType(), True)
    .add("tolls_amount", DoubleType(), True)
    .add("improvement_surcharge", DoubleType(), True)
    .add("total_amount", DoubleType(), True)
)

#Second half of 2016: same layout as 2017 plus two trailing columns
#that are read as strings under the names junk1/junk2
yellowTripSchema2016H2 = (
    StructType()
    .add("vendor_id", StringType(), True)
    .add("pickup_datetime", TimestampType(), True)
    .add("dropoff_datetime", TimestampType(), True)
    .add("passenger_count", IntegerType(), True)
    .add("trip_distance", DoubleType(), True)
    .add("rate_code_id", IntegerType(), True)
    .add("store_and_fwd_flag", StringType(), True)
    .add("pickup_location_id", IntegerType(), True)
    .add("dropoff_location_id", IntegerType(), True)
    .add("payment_type", StringType(), True)
    .add("fare_amount", DoubleType(), True)
    .add("extra", DoubleType(), True)
    .add("mta_tax", DoubleType(), True)
    .add("tip_amount", DoubleType(), True)
    .add("tolls_amount", DoubleType(), True)
    .add("improvement_surcharge", DoubleType(), True)
    .add("total_amount", DoubleType(), True)
    .add("junk1", StringType(), True)
    .add("junk2", StringType(), True)
)

# 2015 and 2016 first half of the year: trips carry pickup/dropoff
# longitude/latitude (no location id columns)
yellowTripSchema20152016H1 = (
    StructType()
    .add("vendor_id", StringType(), True)
    .add("pickup_datetime", TimestampType(), True)
    .add("dropoff_datetime", TimestampType(), True)
    .add("passenger_count", IntegerType(), True)
    .add("trip_distance", DoubleType(), True)
    .add("pickup_longitude", DoubleType(), True)
    .add("pickup_latitude", DoubleType(), True)
    .add("rate_code_id", IntegerType(), True)
    .add("store_and_fwd_flag", StringType(), True)
    .add("dropoff_longitude", DoubleType(), True)
    .add("dropoff_latitude", DoubleType(), True)
    .add("payment_type", StringType(), True)
    .add("fare_amount", DoubleType(), True)
    .add("extra", DoubleType(), True)
    .add("mta_tax", DoubleType(), True)
    .add("tip_amount", DoubleType(), True)
    .add("tolls_amount", DoubleType(), True)
    .add("improvement_surcharge", DoubleType(), True)
    .add("total_amount", DoubleType(), True)
)

# 2009 through 2014: longitude/latitude layout without improvement_surcharge
yellowTripSchemaPre2015 = (
    StructType()
    .add("vendor_id", StringType(), True)
    .add("pickup_datetime", TimestampType(), True)
    .add("dropoff_datetime", TimestampType(), True)
    .add("passenger_count", IntegerType(), True)
    .add("trip_distance", DoubleType(), True)
    .add("pickup_longitude", DoubleType(), True)
    .add("pickup_latitude", DoubleType(), True)
    .add("rate_code_id", IntegerType(), True)
    .add("store_and_fwd_flag", StringType(), True)
    .add("dropoff_longitude", DoubleType(), True)
    .add("dropoff_latitude", DoubleType(), True)
    .add("payment_type", StringType(), True)
    .add("fare_amount", DoubleType(), True)
    .add("extra", DoubleType(), True)
    .add("mta_tax", DoubleType(), True)
    .add("tip_amount", DoubleType(), True)
    .add("tolls_amount", DoubleType(), True)
    .add("total_amount", DoubleType(), True)
)



#### 3. Some functions

In [22]:
#1) Function to determine schema for a given year and month
#Input:  Year and month
#Output: StructType for applicable schema 
#Sample call: print(getTaxiSchema(2009,1))

def getTaxiSchema(tripYear, tripMonth):
  """Pick the source CSV schema that applies to a given trip year and month.

  Args:
    tripYear: Calendar year of the source file.
    tripMonth: Calendar month of the source file (only consulted for 2016 and 2017).

  Returns:
    One of the module-level yellowTripSchema* StructTypes, or None when the
    year/month falls outside the periods handled here (2009 through June 2017).
  """
  # 2009-2014
  if tripYear > 2008 and tripYear < 2015:
    return yellowTripSchemaPre2015

  # All of 2015 shares its layout with the first half of 2016
  if tripYear == 2015:
    return yellowTripSchema20152016H1

  # 2016 switches layout after June
  if tripYear == 2016:
    if tripMonth > 6:
      return yellowTripSchema2016H2
    if tripMonth < 7:
      return yellowTripSchema20152016H1
    return None

  # Only the first half of 2017 is covered
  if tripYear == 2017 and tripMonth < 7:
    return yellowTripSchema2017H1

  return None



In [23]:
#2) Function to add columns to dataframe as required to homogenize schema
#Input:  Dataframe, year and month
#Output: Dataframe with homogenized schema 
#Sample call: getSchemaHomogenizedDataframe(DF,2014,6)

from pyspark.sql.functions import *

def getSchemaHomogenizedDataframe(sourceDF,tripYear, tripMonth):
  """Add the columns a period's source data lacks so all periods share one column set.

  Depending on the period, this adds constant placeholder columns (zero
  location ids, empty-string coordinates, empty junk1/junk2, a zero
  improvement_surcharge), derives trip_year / trip_month as substrings of
  pickup_datetime, tags rows with taxi_type "yellow", and recasts selected
  columns to string.

  Args:
    sourceDF: Dataframe read with the schema matching tripYear/tripMonth.
    tripYear: Calendar year of the source file.
    tripMonth: Calendar month of the source file (only consulted for 2016 and 2017).

  Returns:
    The augmented dataframe, or sourceDF itself when the year/month is not one
    of the handled periods (2009 through June 2017).
  """

  def addColumns(df, namedColumns):
    # Apply withColumn for each (name, column expression) pair, in order
    for columnName, columnExpr in namedColumns:
      df = df.withColumn(columnName, columnExpr)
    return df

  def addZeroLocationIds(df):
    # Periods without location ids get integer 0 placeholders
    return addColumns(df, [
      ("pickup_location_id", lit(0).cast("integer")),
      ("dropoff_location_id", lit(0).cast("integer")),
    ])

  def addBlankColumns(df, columnNames):
    # Empty-string placeholder for each named column
    return addColumns(df, [(columnName, lit("")) for columnName in columnNames])

  def addTripKeys(df):
    # Year = first 4 characters, month = characters 6-7 of pickup_datetime
    return addColumns(df, [
      ("trip_year", substring(col("pickup_datetime"),0, 4)),
      ("trip_month", substring(col("pickup_datetime"),6,2)),
      ("taxi_type", lit("yellow")),
    ])

  def recastAsString(df, columnNames):
    # Cast through a temp_ column, drop the original and rename back; this
    # moves each recast column to the end of the dataframe, in the given order.
    for columnName in columnNames:
      tempName = "temp_" + columnName
      df = (df.withColumn(tempName, col(columnName).cast("string"))
              .drop(columnName)
              .withColumnRenamed(tempName, columnName))
    return df

  blankedCoordinates = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]
  recastCoordinates = ["pickup_longitude", "dropoff_longitude", "pickup_latitude", "dropoff_latitude"]

  # 2009-2014: no location ids, no improvement_surcharge
  if(tripYear > 2008 and tripYear < 2015):
    homogenizedDF = addZeroLocationIds(sourceDF)
    homogenizedDF = addColumns(homogenizedDF, [("improvement_surcharge", lit(0).cast("double"))])
    homogenizedDF = addBlankColumns(homogenizedDF, ["junk1", "junk2"])
    homogenizedDF = addTripKeys(homogenizedDF)
    return recastAsString(homogenizedDF, recastCoordinates + ["payment_type"])

  # 2015 and first half of 2016: no location ids
  if((tripYear == 2016 and tripMonth < 7) or (tripYear == 2015)):
    homogenizedDF = addZeroLocationIds(sourceDF)
    homogenizedDF = addBlankColumns(homogenizedDF, ["junk1", "junk2"])
    homogenizedDF = addTripKeys(homogenizedDF)
    return recastAsString(homogenizedDF, ["vendor_id"] + recastCoordinates + ["payment_type"])

  # Second half of 2016: no coordinates (junk1/junk2 already come from the source schema)
  if(tripYear == 2016 and tripMonth > 6):
    homogenizedDF = addBlankColumns(sourceDF, blankedCoordinates)
    homogenizedDF = addTripKeys(homogenizedDF)
    return recastAsString(homogenizedDF, ["vendor_id", "payment_type"])

  # First half of 2017: no coordinates, no junk columns
  if(tripYear == 2017 and tripMonth < 7):
    homogenizedDF = addBlankColumns(sourceDF, blankedCoordinates)
    homogenizedDF = addTripKeys(homogenizedDF)
    homogenizedDF = addBlankColumns(homogenizedDF, ["junk1", "junk2"])
    return recastAsString(homogenizedDF, ["vendor_id", "payment_type"])

  # Any other period: hand the input back untouched
  return sourceDF



#### 4. Read CSV, homogenize schema across years, save as Delta

In [None]:

# To make Hive Parquet format compatible with Spark Parquet format
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")

# Disable delta optimizeWrite for consistent number of partition (when write from 2nd time, spark with delta format can reduce number of partitions)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "false")

# Default Delta protocol versions applied to tables created in this session
spark.conf.set("spark.databricks.delta.properties.defaults.minReaderVersion", "3")

spark.conf.set("spark.databricks.delta.properties.defaults.minWriterVersion", "7")

#Process data month by month, save in Delta format partitioned by trip_year/trip_month
for j in range(2009,2018):  # j = trip year

  # Only January-June is processed for 2017; every other year is processed in full
  endMonth = 6 if (j==2017) else 12

  for i in range(1,endMonth+1):  # i = trip month

    srcDataFile= "{}year={}/month={:02d}/type=yellow/yellow_tripdata_{}-{:02d}.csv".format(srcDataDirRoot,j,i,j,i)
    print("Year={}; Month={}".format(j,i))
    print(srcDataFile)

    #Source schema
    taxiSchema = getTaxiSchema(j,i)

    #Read source data.
    #Not cached: each monthly dataframe is consumed exactly once (by the write
    #below), so caching would only hold executor storage across iterations.
    taxiDF = (spark.read.format("csv")
                    .option("header", True)
                    .schema(taxiSchema)
                    .option("delimiter",",")
                    .load(srcDataFile))

    #Add additional columns to homogenize schema across years
    taxiFormattedDF = getSchemaHomogenizedDataframe(taxiDF, j, i)

    #Order all columns to align with the canonical schema for yellow taxi
    taxiCanonicalDF = taxiFormattedDF.select(*canonicalTripSchemaColList)

    #Clear any previous output for this month so that a re-run does not append duplicates.
    #NOTE(review): this removes files underneath a Delta table without going through the
    #Delta transaction log, which is presumably why FSCK REPAIR TABLE is run later in this
    #notebook. Rows whose pickup_datetime falls outside this file's year/month land in
    #other partitions and are not cleared here -- confirm whether that matters.
    year_month_dir = "{}trip_year={}/trip_month={:02d}".format(destDataDirRoot,j,i)
    dbutils.fs.rm(year_month_dir,recurse=True)

    #Write 4 files per source month, zstd-compressed, appended to the Delta table
    taxiCanonicalDF.repartition(4).write.option("compression", "zstd").format("delta").mode("append").partitionBy("trip_year","trip_month").save(destDataDirRoot)

Year=2009; Month=1
gs://nyctaxi-raw/transactional-data/year=2009/month=01/type=yellow/yellow_tripdata_2009-01.csv
Year=2009; Month=2
gs://nyctaxi-raw/transactional-data/year=2009/month=02/type=yellow/yellow_tripdata_2009-02.csv
Year=2009; Month=3
gs://nyctaxi-raw/transactional-data/year=2009/month=03/type=yellow/yellow_tripdata_2009-03.csv
Year=2009; Month=4
gs://nyctaxi-raw/transactional-data/year=2009/month=04/type=yellow/yellow_tripdata_2009-04.csv
Year=2009; Month=5
gs://nyctaxi-raw/transactional-data/year=2009/month=05/type=yellow/yellow_tripdata_2009-05.csv
Year=2009; Month=6
gs://nyctaxi-raw/transactional-data/year=2009/month=06/type=yellow/yellow_tripdata_2009-06.csv
Year=2009; Month=7
gs://nyctaxi-raw/transactional-data/year=2009/month=07/type=yellow/yellow_tripdata_2009-07.csv
Year=2009; Month=8
gs://nyctaxi-raw/transactional-data/year=2009/month=08/type=yellow/yellow_tripdata_2009-08.csv
Year=2009; Month=9
gs://nyctaxi-raw/transactional-data/year=2009/month=09/type=yellow/ye

In [None]:
def create_table(schema: str, table_name: str, parquet_dir: str, location: str) -> None:
    """(Re)create a Delta table definition that points at existing data files.

    Switches to ``schema``, drops ``table_name`` if it exists, and creates it
    again with ``USING delta LOCATION`` set to ``location`` joined with
    ``parquet_dir``.

    Args:
        schema: Name passed to ``use`` (may be catalog-qualified).
        table_name: Table to drop and recreate inside ``schema``.
        parquet_dir: Sub-directory of ``location`` holding the table data;
            pass ``""`` when the data sits directly under ``location``.
        location: Root path of the data; a trailing ``/`` is tolerated.

    Note:
        The arguments are interpolated into SQL text as-is, so they must come
        from trusted notebook code rather than from user input.
    """
    # Join the two parts with exactly one separator. Plain "{location}/{parquet_dir}"
    # produced a doubled or trailing slash (e.g. ".../yellow-taxi//") whenever
    # location ended with "/" or parquet_dir was empty.
    base_path = location.rstrip("/") or location
    sub_dir = parquet_dir.strip("/")
    table_path = f"{base_path}/{sub_dir}" if sub_dir else base_path

    spark.sql(f"use {schema};")
    spark.sql(f"DROP TABLE IF EXISTS {table_name};")
    spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING delta LOCATION '{table_path}';")



In [None]:
# Register the Delta data written under destDataDirRoot as table yellow_taxi_trips_raw
create_table(
    schema="synapse_nyc_reference.nyctaxi",
    table_name="yellow_taxi_trips_raw",
    parquet_dir="",
    location=destDataDirRoot,
)



In [249]:
%sql
-- Reconcile the table's Delta transaction log with the files actually present in storage.
-- NOTE(review): presumably needed because the load cell deletes partition directories
-- with dbutils.fs.rm instead of through Delta -- confirm.
FSCK REPAIR TABLE nyctaxi.yellow_taxi_trips_raw;

<Empty result set>

#### 5. Statistics table

In [112]:
%sql
-- Preview rows of the loaded table to eyeball the homogenized columns.
-- There is no LIMIT here; how many rows are shown is left to the notebook's result display.
select * from nyctaxi.yellow_taxi_trips_raw;

taxi_type,vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_location_id,dropoff_location_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,payment_type,trip_year,trip_month
yellow,CMT,2009-01-13T18:05:39Z,2009-01-13T18:05:56Z,,,0,0,0.0,0.0,0.0,0.0,1,0.0,3.5,0.0,,0.0,0.0,0,3.5,No Charge,2009,1
yellow,CMT,2009-01-09T15:41:39Z,2009-01-09T15:50:08Z,,,0,0,-73.987789,40.738054,-73.996866,40.747556,1,1.1,6.5,0.0,,0.0,0.0,0,6.5,Cash,2009,1
yellow,VTS,2009-01-07T18:18:00Z,2009-01-07T18:28:00Z,,,0,0,-73.99966,40.718172,-73.981973,40.714512,2,1.08,6.9,1.0,,0.0,0.0,0,7.9,CASH,2009,1
yellow,VTS,2009-01-02T13:47:00Z,2009-01-02T14:09:00Z,,,0,0,-73.87292,40.773947,-74.006068,40.706818,1,14.58,32.1,0.0,,5.0,4.15,0,41.25,Credit,2009,1
yellow,VTS,2009-01-31T00:06:00Z,2009-01-31T00:30:00Z,,,0,0,-73.983933,40.766418,-73.974598,40.770735,3,3.94,16.1,0.5,,1.5,0.0,0,18.1,Credit,2009,1
yellow,CMT,2009-01-03T21:55:48Z,2009-01-03T22:13:02Z,,,0,0,-73.976963,40.743106,-73.97389,40.783972,1,3.9,13.4,0.0,,0.0,0.0,0,13.4,Cash,2009,1
yellow,CMT,2009-01-10T20:02:34Z,2009-01-10T20:06:36Z,,,0,0,-73.973516,40.74831,-73.988228,40.734625,1,1.4,5.3,0.0,,0.79,0.0,0,6.09,Credit,2009,1
yellow,VTS,2009-01-13T07:40:00Z,2009-01-13T07:39:00Z,,,0,0,0.0,0.0,0.0,0.0,1,1.34,6.9,0.0,,1.4,0.0,0,8.3,Credit,2009,1
yellow,DDS,2009-01-23T18:20:39Z,2009-01-23T18:31:24Z,,,0,0,-74.010084,40.711525,-73.994213,40.719848,1,1.1,6.5,1.0,,0.0,0.0,0,7.5,CASH,2009,1
yellow,VTS,2009-01-30T11:11:00Z,2009-01-30T11:22:00Z,,,0,0,-73.97097,40.758097,-73.984092,40.76495,1,1.05,7.3,0.0,,0.0,0.0,0,7.3,CASH,2009,1


In [27]:
%sql
-- Total number of rows loaded across all years and months.
select COUNT(1) from nyctaxi.yellow_taxi_trips_raw;

count(1)
1369889774
