In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import pandas as pd

In [5]:
# SparkSession is the entry point for all Spark work in this notebook.
# "local[*]" runs Spark on this machine using every available core;
# getOrCreate() reuses an already-running session instead of starting a new one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/09 20:43:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/09 20:43:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/09 20:43:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## Download data

As a first step, we download the FHVHV data for June 2021 from the link provided in the homework instructions.

In [None]:
# download data
# ! download_fhvhv-data.sh 2021 6

In [6]:
# Download the taxi zone lookup CSV into the raw data folder
# (-O sets the local output path).
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv -O "../data/raw/taxi_zone_lookup.csv"

--2023-03-09 20:43:33--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230309%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230309T204317Z&X-Amz-Expires=300&X-Amz-Signature=e2716af9d8c6a6b001733348ec82f7175cd56fd1ad5b673e5afa7eb014e18202&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-09 20:43:33--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

## Explore Data



### Trip Data

In [None]:
# How many lines does the file have? zcat streams the decompressed content,
# so nothing is unpacked to disk; the total includes the CSV header line.
!zcat "../data/raw/fhvhv/2021/06/fhvhv_tripdata_2021_06.csv.gz" | wc -l

In [None]:
# Peek at the first five lines of the compressed trip file.
!zcat "../data/raw/fhvhv/2021/06/fhvhv_tripdata_2021_06.csv.gz" | head -n 5

In [11]:
# Read the gzipped June 2021 FHVHV trip CSV into a Spark DataFrame:
# column names come from the header row, column types are inferred by Spark.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv('../data/raw/fhvhv/2021/06/fhvhv_tripdata_2021_06.csv.gz')
)

                                                                                

In [8]:
# Preview the first five rows of `df`.
# NOTE(review): the saved output below lists taxi-zone lookup columns
# (LocationID, Borough, Zone, service_zone), not trip columns, so this cell
# appears to have been last run while `df` held different data. Re-run it
# after the trip-data read above.
df.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [9]:
# Print the column names and types of `df`.
# NOTE(review): the saved output below shows the zone-lookup columns rather
# than trip columns, so it looks stale. Re-run after the trip-data read above.
df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



### Zones data

In [12]:
# Load the taxi zone lookup table; the header row supplies the column names
# and Spark infers the column types.
zones_df = spark.read.csv(
    '../data/raw/taxi_zone_lookup.csv',
    header="true",
    inferSchema="true",
)

In [15]:
# Preview the first five rows of the zone lookup table.
zones_df.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



## Construct Schema

- We use pandas to infer the column types from a small sample of the file, then use those types to define an explicit schema for Spark.

In [None]:
# Step 1: write the first 102 lines of the gzipped trip file to head.csv,
# a small sample that is cheap to inspect and load for type inference.
!zcat "../data/raw/fhvhv/2021/06/fhvhv_tripdata_2021_06.csv.gz" | head -n 102 > head.csv

In [None]:
# Take a look at the first ten lines of the sample file.
!head -n 10 head.csv

In [None]:
# Confirm how many lines ended up in the sample file.
!wc -l head.csv

In [None]:
# import as pandas df
# df_pandas = pd.read_csv('head.csv')
# df_pandas
# df_pandas.dtypes
# df_spark = spark.createDataFrame(df_pandas[df_pandas.notnull().all(1)])
# df_spark.schema

In [None]:
# Explicit schema for the FHVHV trip CSV, so Spark does not have to infer types.
# Each add() call is (column name, Spark type, nullable); every column is nullable.
schema_fhvhv = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.DoubleType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [None]:
# Re-read the trip CSV, this time applying the explicit schema instead of
# letting Spark infer the column types; the header row is still skipped as data.
df = spark.read.csv(
    '../data/raw/fhvhv/2021/06/fhvhv_tripdata_2021_06.csv.gz',
    schema=schema_fhvhv,
    header="true",
)

In [None]:
# Preview the rows read with the explicit schema
# (show() prints the first 20 rows by default).
df.show()

## Repartition to 12 partitions

In [None]:
# Shuffle the rows into 12 partitions ahead of the parquet export below.
df = df.repartition(12)

## Export to parquet

In [None]:
# Export the trips as parquet. mode='overwrite' replaces any output already
# at this path, so the cell can be re-run without failing on existing files.
df.write.parquet(
    path='../data/pq/fhvhv/2021/06',
    mode='overwrite',
)


In [None]:
# Load the parquet export back into `df`; from here on the analysis works on
# the parquet copy rather than the CSV.
df = spark.read.parquet('../data/pq/fhvhv/2021/06/*')

In [None]:
# Check the column names and types of the frame read from parquet.
df.printSchema()

## Insights

### How many taxi trips were there on June 15? 

Consider only trips that started on June 15.

In [18]:
# Register the trip DataFrame as the temp view `fhvhv` so it can be queried
# by name through spark.sql().
df.createOrReplaceTempView('fhvhv')

In [19]:
# Count the trips whose pickup timestamp falls on 2021-06-15.
# DataFrame.show() only prints and returns None, so it must not be chained
# onto the assignment: keep the DataFrame in `selected_pickups` and display
# it as a separate step.
selected_pickups = spark.sql(
    """
    SELECT COUNT(*) as number_of_taxi_trips
    FROM fhvhv
    WHERE DATE(pickup_datetime) = '2021-06-15'
    """
)
selected_pickups.show()

[Stage 11:>                                                         (0 + 1) / 1]

+--------------------+
|number_of_taxi_trips|
+--------------------+
|              452470|
+--------------------+



                                                                                

### Longest trip for each day: How long was the longest trip in Hours?

Calculate the duration for each trip. 

In [None]:
# Longest trip per pickup day, reported in HOURS as the question asks.
# Casting a timestamp to long gives epoch seconds, so the difference is the
# trip length in seconds; dividing by 3600 converts it to hours.
(
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn(
        'duration_hours',
        (df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) / 3600,
    )
    .groupBy('pickup_date')
    .max('duration_hours')
    # the aggregate column is auto-named 'max(duration_hours)'
    .orderBy('max(duration_hours)', ascending=False)
    .limit(5)
    .show()
)

### What is the name of the most frequent pickup location zone?

In [17]:
# Register the zone lookup table as the temp view `zones` so it can be joined
# by name in spark.sql() queries.
zones_df.createOrReplaceTempView('zones')

In [21]:
# Top five pickup zones by trip count: join each trip to the zone lookup on
# its pickup location id, count trips per zone name, and sort descending.
# DataFrame.show() only prints and returns None, so the result is assigned
# first and displayed as a separate step; otherwise the variable would be None.
pickup_location_counts = spark.sql(
        """
            SELECT
                pul.Zone AS pickup_zone,
                COUNT(1)
            FROM 
                fhvhv fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
            GROUP BY 
                1
            ORDER BY
                2 DESC
            LIMIT 5;
        """
)
pickup_location_counts.show()

[Stage 19:>                                                         (0 + 1) / 1]

+-------------------+--------+
|        pickup_zone|count(1)|
+-------------------+--------+
|Crown Heights North|  231279|
|       East Village|  221244|
|        JFK Airport|  188867|
|     Bushwick South|  187929|
|      East New York|  186780|
+-------------------+--------+



                                                                                