In [92]:
from pyspark.sql import SparkSession, types
import pyspark.sql.functions as F
from datetime import datetime, timedelta

#### **Question 1: Install Spark and PySpark**

In [5]:
# Get (or create) a SparkSession with master 'local[*]' and app name 'test'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 14:14:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Display the version string reported by the active Spark session.
spark.version

'3.5.5'

#### **Question 2: Yellow October 2024**

In [7]:
# Load every file matching data/*.parquet into a single DataFrame.
df = spark.read.format('parquet').load('data/*.parquet')

In [9]:
# Rewrite the data as 4 partitions under data/2024-10.
# mode('overwrite') replaces any previous output, so this cell can be re-run
# (e.g. Restart & Run All); with the default save mode the write raises an
# error as soon as 'data/2024-10' already exists.
df.repartition(4).write.mode('overwrite').parquet('data/2024-10')

                                                                                

In [11]:
# Answer: ~25MB — presumably the size of each parquet file written above (TODO confirm)

#### **Question 3: Count records**

In [13]:
# Inspect the column names and types of the loaded DataFrame.
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [28]:
# Count trips whose pickup timestamp falls in the half-open window
# [dt, dt + 1 day), i.e. trips picked up on 2024-10-15.
dt = datetime(2024, 10, 15)
next_day = dt + timedelta(days=1)
pickup_time = df.tpep_pickup_datetime

picked_up_on_day = (pickup_time >= dt) & (pickup_time < next_day)
df.filter(picked_up_on_day).count()

128893

In [42]:
# Derive a 'date' column from the pickup timestamp and count the trips on the
# target day. The column is compared with dt.date() (a datetime.date) rather
# than with the datetime itself, so the filter is a date-to-date comparison:
# it no longer relies on an implicit date/timestamp cast, nor on dt being
# exactly midnight.
# Re-assigning df is safe to re-run: withColumn replaces an existing 'date'.
df = df.withColumn('date', F.to_date(df.tpep_pickup_datetime))
df.filter(df.date == dt.date()).count()

128893

In [43]:
# Register df as the temporary view 'yellow' so it can be queried via spark.sql.
df.createOrReplaceTempView('yellow')

In [55]:
# SQL count of trips picked up on 2024-10-15, using a timestamp range.
pickup_window_count_sql = """
    SELECT count(*)
    FROM yellow
    WHERE tpep_pickup_datetime >= '2024-10-15' AND tpep_pickup_datetime < '2024-10-16';
"""
spark.sql(pickup_window_count_sql).show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [56]:
# SQL count of trips on 2024-10-15, using the 'date' column of the 'yellow' view.
pickup_date_count_sql = """
    SELECT count(*)
    FROM yellow
    WHERE date = '2024-10-15';
"""
spark.sql(pickup_date_count_sql).show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



#### **Question 4: Longest trip**

In [78]:
# Longest trip: the maximum (dropoff - pickup) difference in seconds,
# divided by 60 twice to express it in hours.
dropoff_seconds = F.unix_timestamp(df.tpep_dropoff_datetime)
pickup_seconds = F.unix_timestamp(df.tpep_pickup_datetime)
longest_trip_hours = F.max(dropoff_seconds - pickup_seconds) / 60 / 60

df.select(longest_trip_hours).show()

+-------------------------------------------------------------------------------------------------------------------------------------------+
|((max((unix_timestamp(tpep_dropoff_datetime, yyyy-MM-dd HH:mm:ss) - unix_timestamp(tpep_pickup_datetime, yyyy-MM-dd HH:mm:ss))) / 60) / 60)|
+-------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                          162.6177777777778|
+-------------------------------------------------------------------------------------------------------------------------------------------+



#### **Question 5: User Interface**

In [80]:
# Answer to Question 5 (User Interface) — presumably the local Spark UI port; TODO confirm.
4040

4040

#### **Question 6: Least frequent pickup location zone**

In [93]:
# Explicit schema for the zone lookup CSV: an integer LocationID followed by
# three nullable string columns.
schema = types.StructType(
    [types.StructField('LocationID', types.IntegerType(), True)]
    + [
        types.StructField(column_name, types.StringType(), True)
        for column_name in ('Borough', 'Zone', 'service_zone')
    ]
)

# Read the lookup file with that schema, with the 'header' option enabled.
df_zone = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('data/taxi_zone_lookup.csv')
)

In [101]:
# Left-join trips to the zone lookup on the pickup location id, so every trip
# row is kept even when its PULocationID has no match in df_zone.
pickup_zone_match = df.PULocationID == df_zone.LocationID
df_join = df.join(
    df_zone,
    on=pickup_zone_match,
    how='left',
)

In [110]:
# Trips per pickup zone, least frequent first; show the first 10 rows untruncated.
(
    df_join
    .groupBy(df_join.Zone)
    .count()
    .orderBy(F.col('count').asc())
    .show(10, truncate=False)
)

+---------------------------------------------+-----+
|Zone                                         |count|
+---------------------------------------------+-----+
|Governor's Island/Ellis Island/Liberty Island|1    |
|Rikers Island                                |2    |
|Arden Heights                                |2    |
|Jamaica Bay                                  |3    |
|Green-Wood Cemetery                          |3    |
|Eltingville/Annadale/Prince's Bay            |4    |
|Charleston/Tottenville                       |4    |
|West Brighton                                |4    |
|Port Richmond                                |4    |
|Rossville/Woodrow                            |4    |
+---------------------------------------------+-----+
only showing top 10 rows



In [112]:
# Register df_zone as the temporary view 'zone' so it can be joined in spark.sql queries.
df_zone.createOrReplaceTempView('zone')

In [114]:
# SQL version of the per-zone pickup count: left-join 'yellow' to 'zone' on the
# pickup location id, count rows per Zone, and list the smallest counts first.
least_frequent_zones_sql = """
    SELECT
        Zone,
        count(1) as count
    FROM
        yellow
    LEFT JOIN
        zone ON yellow.PULocationID = zone.LocationID
    GROUP BY
        Zone
    ORDER BY
        count ASC        
"""
spark.sql(least_frequent_zones_sql).show(10, truncate=False)

+---------------------------------------------+-----+
|Zone                                         |count|
+---------------------------------------------+-----+
|Governor's Island/Ellis Island/Liberty Island|1    |
|Rikers Island                                |2    |
|Arden Heights                                |2    |
|Jamaica Bay                                  |3    |
|Green-Wood Cemetery                          |3    |
|Eltingville/Annadale/Prince's Bay            |4    |
|Charleston/Tottenville                       |4    |
|West Brighton                                |4    |
|Port Richmond                                |4    |
|Rossville/Woodrow                            |4    |
+---------------------------------------------+-----+
only showing top 10 rows

