#  PySpark for ETL


The CSV files are downloaded into the Databricks File System (DBFS) and then converted into Parquet files with PySpark for better efficiency.

Download URL: <https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page>

Data dictionary: <https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf>

## Download CSV files to DBFS

In [0]:
%sh
# Remove Parquet output left by earlier runs so the write step starts clean.
# The notebook writes its result to /FileStore/yellow_taxi_parquet, so that
# directory is cleared here; the legacy taxi_parquet path is still removed as
# before. -f makes a missing directory a no-op instead of an error.
rm -rf /dbfs/FileStore/taxi_parquet/ /dbfs/FileStore/yellow_taxi_parquet/

In [0]:
%sh
# Make sure the CSV landing directory exists; succeeds silently if it is
# already there and creates any missing parent directories.
mkdir --parents /dbfs/FileStore/taxi_csv/

In [0]:
# Download plan for one year of yellow-taxi data: maps each monthly CSV's
# download URL to the DBFS path where that file is stored.
year = 2020

# One file name per month, with the month zero-padded to two digits.
csv_names = [
    'yellow_tripdata_%s-%s.csv' % (year, "{:02d}".format(month_no))
    for month_no in range(1, 13)
]

url_loc = {
    'https://s3.amazonaws.com/nyc-tlc/trip+data/%s' % csv_name:
        '/dbfs/FileStore/taxi_csv/%s' % csv_name
    for csv_name in csv_names
}

In [0]:
import urllib.request

# Fetch each monthly CSV and save it at its mapped DBFS location,
# one file at a time.
for source_url in url_loc:
    urllib.request.urlretrieve(source_url, url_loc[source_url])

In [0]:
# Column layout applied when reading the yellow-taxi CSV files.
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# (column name, Spark type) pairs in the order the schema declares them.
taxi_columns = [
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("passenger_count", IntegerType()),
    ("trip_distance", DoubleType()),
    ("RatecodeID", IntegerType()),
    ("store_and_fwd_flag", StringType()),
    ("PULocationID", IntegerType()),
    ("DOLocationID", IntegerType()),
    ("payment_type", IntegerType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("congestion_surcharge", DoubleType()),
]

# Every column is declared nullable.
schema_df = StructType(
    [StructField(column_name, spark_type, True) for column_name, spark_type in taxi_columns]
)

In [0]:
# Load all CSV files in the download folder into a single DataFrame, applying
# the explicit schema and treating the first line of each file as a header.
csv_reader = spark.read.schema(schema_df).option("header", 'true')
csv_Df = csv_reader.csv(path='file:/dbfs/FileStore/taxi_csv')

In [0]:
# Quick look at the data: print the first two drop-off timestamps.
dropoff_sample = csv_Df.select("tpep_dropoff_datetime")
dropoff_sample.show(2)

In [0]:
# Data cleaning and transformations
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col,quarter,to_timestamp,hour
from pyspark.sql.types import LongType

# Drop rows that are exact duplicates across all columns.
csv_Df = csv_Df.drop_duplicates()

# Trip length in minutes: difference of the two timestamps as epoch seconds,
# divided by 60.
trip_minutes = (
    col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")
) / 60

# Derived columns:
#   tip_percent   - tip_amount as a percentage of total_amount
#   quarter       - calendar quarter of the drop-off timestamp
#   trip_duration - trip length in minutes
#   trip_hour     - hour of day of the drop-off timestamp
#   speed         - trip_distance covered per minute of trip_duration
csv_Df = (
    csv_Df
    .withColumn("tip_percent", (col("tip_amount") / col("total_amount")) * 100)
    .withColumn("quarter", quarter(col("tpep_dropoff_datetime")))
    .withColumn("trip_duration", trip_minutes)
    .withColumn("trip_hour", hour(col("tpep_dropoff_datetime")))
    # Divide the raw double values. Casting both operands to long first
    # truncated them to whole numbers, so e.g. a 0.9-distance trip got speed 0
    # and any trip shorter than a minute divided by 0.
    # NOTE(review): a zero trip_duration still yields NULL under Spark's
    # default (non-ANSI) division semantics - confirm the cluster setting.
    .withColumn("speed", col("trip_distance") / col("trip_duration"))
)

In [0]:
# Writing the final data in parquet format
# Disable the Databricks Delta format check for this session before writing
# plain Parquet files.
sql("SET spark.databricks.delta.formatCheck.enabled=false")
# coalesce(1) collapses the data to a single partition before the write.
# mode("overwrite") replaces output from an earlier run; without it the
# writer's default save mode raises an error once the target path exists,
# so the notebook could not be re-run.
csv_Df.coalesce(1).write.mode("overwrite").parquet(path='/FileStore/yellow_taxi_parquet')

In [0]:
# Writing the DataFrame to the SQLite table yellow_taxi_data
#df_parquet.write.format("jdbc").mode("overwrite").options(url ="jdbc:sqlite:file:/dbfs/FileStore/DWH_task.db", driver="org.sqlite.JDBC", dbtable="yellow_taxi_data").save()

