In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F

In [46]:
# Question 1: Spark version
spark.version

'3.3.2'

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/03 13:48:49 WARN Utils: Your hostname, hy-virtual-machine resolves to a loopback address: 127.0.1.1; using 172.16.167.128 instead (on interface ens160)
24/03/03 13:48:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/03 13:48:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-03 13:49:07--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T214908Z&X-Amz-Expires=300&X-Amz-Signature=9052e0f30351098ed4fa796f1914f7ab51ad3e2a8c09a111db1b944486e34deb&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-03 13:49:08--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6

In [4]:
!gunzip -c fhv_tripdata_2019-10.csv.gz > fhv_tripdata_2019-10.csv

In [3]:
!wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


In [5]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

In [6]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
!head -n 1001 fhv_tripdata_2019-10.csv > head.csv

In [8]:
pd.DataFrame.iteritems = pd.DataFrame.items

In [9]:
df_pandas = pd.read_csv('head.csv')

In [10]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [11]:
from pyspark.sql import types

In [12]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [13]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [14]:
# Question 2: Partition the dataframe into 6 paquet
df = df.repartition(6)

In [15]:
df.write.parquet('fhv/2019/10/')

                                                                                

In [16]:
# Start here - Read data from parquets
df = spark.read.parquet('fhv/2019/10/')

In [17]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [18]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|     B01215         |2019-10-07 08:36:04|2019-10-07 08:51:10|         223|         223|   null|       B01215         |
|              B01190|2019-10-04 12:43:20|2019-10-04 13:03:28|         264|          66|   null|                B01190|
|              B00937|2019-10-05 19:05:00|2019-10-05 19:07:16|         264|         243|   null|                B00937|
|              B02735|2019-10-04 15:29:27|2019-10-04 15:51:24|         264|         265|   null|                B02872|
|              B00856|2019-10-07 07:45:22|2019-10-07 07:52:47|         264|          76|   null|                B02871|
|              B00647|2019-10-06 04:46:3

In [19]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|     B01215         |2019-10-07 08:36:04|2019-10-07 08:51:10|         223|         223|   null|       B01215         | 2019-10-07|  2019-10-07|
|              B01190|2019-10-04 12:43:20|2019-10-04 13:03:28|         264|          66|   null|                B01190| 2019-10-04|  2019-10-04|
|              B00937|2019-10-05 19:05:00|2019-10-05 19:07:16|         264|         243|   null|                B00937| 2019-10-05|  2019-10-05|
|              B02735|2019-10-04 15:29:27|2019-10-04 15:51:24|         264|         265|   null|                B02872| 2019-10-04

In [21]:
# Convert pickup_datetime and dropoff_datetime from timestamp to date only
df = df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))

In [35]:
# Create a Table from dataframe
df.registerTempTable('fhv_2019_10')

In [35]:
# Question 3: Count the trips that pickup_date is 2019-10-15 
df.filter(df.pickup_date == '2019-10-15') \
    .count()

62610

In [24]:
df.orderBy('dropoff_datetime', ascending=False) \
    .limit(5) \
    .show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   null|                B02832| 2019-10-28|  2091-10-28|
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   null|                B02832| 2019-10-11|  2091-10-11|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        null|        null|   null|                B02416| 2019-10-31|  2029-11-01|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   null|       B00746         | 2019-10-01

In [33]:
# Question 4: Longest trip for each day 
df \
    .withColumn('duration', (df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long'))/3600) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

+-----------+-----------------+
|pickup_date|    max(duration)|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333333|
| 2019-10-01|70128.02805555555|
| 2019-10-17|           8794.0|
+-----------+-----------------+



In [32]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600) AS duration
FROM 
    fhv_2019_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;
""").show()

+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2019-10-28|          631152.5|
| 2019-10-11|          631152.5|
| 2019-10-31| 87672.44083333333|
| 2019-10-01| 70128.02805555555|
| 2019-10-17|            8794.0|
| 2019-10-26| 8784.166666666666|
| 2019-10-30|1465.5344444444445|
| 2019-10-25|1057.8266666666666|
| 2019-10-02| 770.2313888888889|
| 2019-10-23| 746.6166666666667|
+-----------+------------------+



In [36]:
# import taxi_zone_lookup.csv
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-03 14:35:00--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T223500Z&X-Amz-Expires=300&X-Amz-Signature=9ab9e7f2c8284b80872b58567435b20fa6783bb8bf48d471c641024a7ec61447&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-03 14:35:00--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62

In [37]:
!wc -l taxi_zone_lookup.csv

266 taxi_zone_lookup.csv


In [38]:
df_zone = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [39]:
df_zone.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [40]:
df_zone.registerTempTable('zones')



In [41]:
spark.sql("""SELECT * FROM zones LIMIT 30;""").show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [44]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number',
 'pickup_date',
 'dropoff_date']

In [45]:
# Question 6: The query to find least frequent pickup zone
spark.sql("""
SELECT
    pul.Zone AS pickup_zone,
    COUNT(1) AS count
FROM 
    fhv_2019_10 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
                      LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID
GROUP BY 
    pickup_zone
ORDER BY
    count ASC
LIMIT 5;
""").show()

+--------------------+-----+
|         pickup_zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
+--------------------+-----+

