In [1]:
import pyspark
from pyspark.sql import SparkSession, types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('pyspark_test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/02 09:32:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Question 1:

In [14]:
spark.version

'3.4.2'

In [3]:
fhvh_schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.IntegerType(), True)
    ])

In [4]:
df_fhvh = spark.read \
    .option("header", "true") \
    .schema(fhvh_schema) \
    .csv('../data/raw/fhvh/fhv_tripdata_2019-10.csv.gz')

In [5]:
df_fhvh.show(1, vertical=True)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

-RECORD 0-------------------------------------
 dispatching_base_num   | B00009              
 pickup_datetime        | 2019-10-01 00:23:00 
 dropOff_datetime       | 2019-10-01 00:35:00 
 PUlocationID           | 264                 
 DOlocationID           | 264                 
 SR_Flag                | null                
 Affiliated_base_number | null                
only showing top 1 row



In [6]:
df_fhvh.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: integer (nullable = true)



In [15]:
df_fhvh \
    .repartition(6) \
    .write \
    .parquet('../data/pq/fhvh/2019/10/', mode='overwrite')

                                                                                

In [16]:
df_parquet = spark \
    .read.parquet('../data/pq/fhvh/2019/10/')

In [17]:
df_parquet.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: integer (nullable = true)



In [19]:
df_parquet.show(1, vertical=True)

-RECORD 0-------------------------------------
 dispatching_base_num   | B03162              
 pickup_datetime        | 2019-10-02 11:53:57 
 dropOff_datetime       | 2019-10-02 12:14:40 
 PUlocationID           | 192                 
 DOlocationID           | 16                  
 SR_Flag                | null                
 Affiliated_base_number | null                
only showing top 1 row



# Question 2:

Average size of the Parquet file = 6 MB

In [18]:
df_parquet.registerTempTable('fhvh_2019_10')



# Question 3:

In [28]:
spark.sql("""
    SELECT COUNT(date_trunc("day", pickup_datetime)) AS amount
    FROM fhvh_2019_10
    WHERE date_trunc("day", pickup_datetime) = '2019-10-15'
""").show()



+------+
|amount|
+------+
| 62610|
+------+



                                                                                

# Question 4:

In [37]:
# unix_timestamp = converts date to seconds

spark.sql("""
    SELECT MAX((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/3600) AS max_duration
    FROM fhvh_2019_10
""").show()

+------------+
|max_duration|
+------------+
|    631152.5|
+------------+



# Question 5:

Local port for Spark UI: 4040

# Question 6:

In [38]:
df_zones = spark.read.parquet('../data/pq/zones/part-00000-0ad78abf-3482-4c49-874c-4494d4c8105d-c000.snappy.parquet')

In [41]:
df_zones.show(1, vertical=True)

-RECORD 0----------------------
 LocationID   | 1              
 Borough      | EWR            
 Zone         | Newark Airport 
 service_zone | EWR            
only showing top 1 row



In [40]:
df_zones.registerTempTable("zones")



In [74]:
sql = """
    SELECT t2.Zone as zone_with_least_frequent_pick_ups
    FROM fhvh_2019_10 t1 
    LEFT JOIN zones t2 ON t1.PUlocationID = t2.LocationID
    WHERE t1.PUlocationID = (
        SELECT PUlocationID
        FROM fhvh_2019_10
        WHERE PUlocationID IS NOT NULL
        GROUP BY PUlocationID
        ORDER BY COUNT(*) ASC
        LIMIT 1
    )
    """

spark.sql(sql).show()

+---------------------------------+
|zone_with_least_frequent_pick_ups|
+---------------------------------+
|                      Jamaica Bay|
+---------------------------------+



In [65]:
spark.sql("""
    SELECT PUlocationID, COUNT(*) AS amount
    FROM fhvh_2019_10
    WHERE PUlocationID IS NOT NULL
    GROUP BY PUlocationID
    ORDER BY COUNT(*) ASC
    LIMIT 1
""").show()

+------------+------+
|PUlocationID|amount|
+------------+------+
|           2|     1|
+------------+------+

