In [None]:
!pip install pyspark

### The ETL process:

1) Import the data into the Kaggle environment

2) Start the Spark Session and read in the data

3) Do basic analysis of the data to understand it

4) Initiate transformation on the data to make the data more useful

5) Write the data to Parquet file

6) Write the data to Postgres instance running in RDS

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import warnings
warnings.filterwarnings('ignore')

### The datasets in Notebook

We can get the data inside the environment by using the following linux command. 

We can also use the python OS module for listing the folders

In [None]:
!ls /kaggle/input/

In [None]:
#You will get list
print(os.listdir('/kaggle/input/'))
os.listdir('/kaggle/input/uber-nyc-forhire-vehicles-trip-data-2021/')

### We are going to use the parquet reader method that belongs to SparkSession inside Pyspark.

In [None]:
spark = SparkSession. \
    builder. \
    appName('ETL_Rider'). \
    getOrCreate()

In [None]:
uber_raw_pqt = spark.read.parquet('/kaggle/input/uber-nyc-forhire-vehicles-trip-data-2021/fhvhv_tripdata_2021-01.parquet')

In [None]:
uber_raw_pqt.show(2,truncate=False)

In [None]:
#We read only single file till now... and that contains 11 Million records
uber_raw_pqt.count()

### We are going to seperate folder for the parquet files first

### Then read the entire folder... Big Data way

In [None]:
!mkdir uber-nyc-forhire

In [None]:
!cp /kaggle/input/uber-nyc-forhire-vehicles-trip-data-2021/*.parquet \
    /kaggle/working/uber-nyc-forhire/

In [None]:
!ls /kaggle/working/uber-nyc-forhire/

In [None]:
uber_full_data = spark.read.parquet('/kaggle/working/uber-nyc-forhire/')

### Now we are talking... 174 million rows into the Spark Execution environment. 

#### We can learn couple of things with this massive dataset

1) Writing the data to spark metastore as table

2) Transforming it using Spark SQL

3) Creating temp views on Spark Metastore

4) We can also write the data to database using the Spark Drivers for Databases

In [None]:
uber_full_data.count()

In [None]:
#get the schema for understanding... and we see that all the column can have nulls
uber_full_data.printSchema()

In [None]:
uber_full_data.groupBy('hvfhs_license_num').count().show()

#### Its better touse SQL for the transformation activity since it is way more intuitive

For that we have to create the database and make the above dataframe into a table inside the spark metastore

In [None]:
#creating the database. Keep in mind the free working space allocated is only 20gb
spsql = spark.sql
spsql("CREATE DATABASE IF NOT EXISTS uber_nyc_db")
spsql("USE DATABASE uber_nyc_db")
spctg = spark.catalog

In [None]:
spctg.listDatabases()

In [None]:
#uber_hv0005_data.write.saveAsTable("uber_table")
spctg.listTables(dbName='uber_nyc_db')

In [None]:
uber_hv0004_data = uber_full_data.filter(uber_full_data['hvfhs_license_num'] == 'HV0004')

In [None]:
uber_hv0004_data.count()

In [None]:
uber_full_data.select(date_format('pickup_datetime','dd-MM-yyyy'). \
                        alias('pu_date'),
                     date_format('request_datetime','dd-MM-yyyy'). \
                        alias('req_date'),
                    date_format('pickup_datetime','EEEE'). \
                        alias('pu_day'),
                    date_format('pickup_datetime','MMM'). \
                        alias('pu_month'),
                     col('tolls'),col('trip_time'),
                     col('tips'),col('base_passenger_fare'),
                     col('originating_base_num'),col('dispatching_base_num'),
                     col("hvfhs_license_num")).show(2,truncate=False)

In [None]:
uber_transformed_data = uber_full_data.select(date_format('pickup_datetime','dd-MM-yyyy'). \
                        alias('pu_date'),
                     date_format('request_datetime','dd-MM-yyyy'). \
                        alias('req_date'),
                     date_format('pickup_datetime','EEEE'). \
                        alias('pu_day'),
                    date_format('pickup_datetime','MMM'). \
                        alias('pu_month'),
                     col('tolls'),col('trip_time'),
                     col('tips'),col('base_passenger_fare'),
                     col('originating_base_num'),col('dispatching_base_num'),
                     col("hvfhs_license_num"))

In [None]:
uber_transformed_data.filter((uber_transformed_data.pu_month == 'Jan') \
                             & (uber_transformed_data.originating_base_num == 'B02682')). \
                        show(2,truncate=False)

In [None]:
uber_jan_b02682 = uber_transformed_data.filter((uber_transformed_data.pu_month == 'Jan') \
                                                    & (uber_transformed_data.originating_base_num == 'B02682'))

In [None]:
uber_jan_b02682.count()

In [None]:
uber_jan_b02682.groupBy("pu_date").count().show()

In [None]:
uber_jan_b02682.write.saveAsTable("Jan_b02682",mode='ignore')

In [None]:
### Lets rumble with SQL now
spsql("""SELECT req_date, 
      ROUND(SUM(tips),1) AS tip_sums, ROUND(SUM(trip_time),1) AS trip_time,
      ROUND(SUM(base_passenger_fare),1) AS total_fare
      FROM Jan_b02682
      GROUP BY req_date
      ORDER BY req_date""").show(10)

In [None]:
uber_jan_b02682.write.parquet(path='/kaggle/working/jan_b02682',
                              partitionBy='req_date')

In [None]:
!ls /kaggle/working/jan_b02682

In [None]:
!ls /kaggle/working/jan_b02682/req_date\=01-01-2021

In [None]:
#Writing the dataframe as database will throw out of memory error... CRASSSSHHHHH
#In your machine, the OS will crash and hang. Requiring restart

#uber_hv0004_data.write.saveAsTable("uber_hv04_table")

In [None]:
#We will first consolidate the data using spark's groupby clause and then table it
uber_full_data.groupBy(["hvfhs_license_num","dispatching_base_num",
                        "originating_base_num"]).count().show()

In [None]:
uber_consolidate_count = uber_full_data.groupBy(["hvfhs_license_num","dispatching_base_num",
                        "originating_base_num"]).count()

In [None]:
uber_select_data = uber_full_data.select("hvfhs_license_num","dispatching_base_num",
                        "originating_base_num","tolls","trip_time","trip_miles",
                                         "driver_pay","base_passenger_fare")

In [None]:
uber_select_data.show(2,truncate=False)

In [None]:
hvfhs_license_aggregate = uber_select_data.groupby("hvfhs_license_num").sum()

In [None]:
hvfhs_license_aggregate.show(2, truncate=False)


In [None]:
disp_origin_aggregate = uber_select_data.groupby(["hvfhs_license_num",
                                                  "dispatching_base_num",
                                                  "originating_base_num"]).sum()

In [None]:
disp_origin_aggregate.show(2,truncate=False)

In [None]:
disp_origin_aggregate.count()

In [None]:
disp_origin_aggregate.write.saveAsTable('disp_orig_license',mode='ignore')

In [None]:
spctg.listTables(dbName='uber_nyc_db')

### Now we can fire on all cylinders using spark sql

In [None]:
spsql("""SELECT * FROM disp_orig_license LIMIT 10""").show()

In [None]:
spsql("""SELECT * FROM disp_orig_license 
            WHERE dispatching_base_num = 'B02875' 
            LIMIT 10""").show()

In [None]:
spsql("""SELECT dispatching_base_num, originating_base_num, `sum(tolls)`,
            `sum(trip_time)`
            FROM disp_orig_license 
            WHERE dispatching_base_num = 'B02875' 
            LIMIT 10""").show()

In [None]:
spsql("""SELECT dispatching_base_num, originating_base_num, 
                ROUND(`sum(tolls)`,1) AS toll_sum,
                ROUND(`sum(trip_time)`,1) AS trip_sum
            FROM disp_orig_license 
            WHERE `sum(tolls)` > 58 AND`sum(trip_time)` > 10000 
            LIMIT 10""").show()

In [None]:
spsql("""SELECT dispatching_base_num, originating_base_num, 
                ROUND(`sum(tolls)`,1) AS toll_sum,
                ROUND(`sum(trip_time)`,1) AS trip_sum
            FROM disp_orig_license 
            WHERE `sum(tolls)` > 58 AND`sum(trip_time)` > 10000 
            ORDER BY toll_sum ASC
            LIMIT 10""").show()

In [None]:
!ls /kaggle/working/spark-warehouse/uber_nyc_db.db/

### Complete different way to read the parquet files

In [None]:
spctg.createTable("jan_uber_data",path='/kaggle/working/jan_b02682/',
                 source='parquet')

In [None]:
spsql("SELECT * FROM jan_uber_data LIMIT 10").show(truncate=False)

In [None]:
spsql("SHOW PARTITIONS jan_uber_data").show()

In [None]:
spctg.recoverPartitions("jan_uber_data")

In [None]:
spsql("SHOW PARTITIONS jan_uber_data").show()

In [None]:
spsql("SELECT * FROM jan_uber_data LIMIT 10").show(truncate=False)

In [None]:
spsql("""SELECT req_date, COUNT(*) AS day_trips
      FROM jan_uber_data
      GROUP BY req_date
      ORDER BY day_trips DESC""").show(truncate=False)