In [1]:
import datetime
import pathlib

import pyspark
import pyspark.sql.types as ps_types
import pyspark.sql.functions as ps_functions

In [2]:
spark = pyspark.sql.SparkSession.builder \
    .master('local[*]') \
    .appName('test') \
    .getOrCreate()

24/02/27 23:19:51 WARN Utils: Your hostname, Tsyrens-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface en0)
24/02/27 23:19:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/27 23:19:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Question 1

In [3]:
print(spark.version)

3.5.0


### Load FHV data

In [4]:
tripdate_file_path = 'data/raw/fhv/2019/10/fhv_tripdata_2019_10.csv.gz'

fhv_schema = ps_types.StructType([
    ps_types.StructField('dispatching_base_num', ps_types.StringType(), True), 
    ps_types.StructField('pickup_datetime', ps_types.TimestampType(), True), 
    ps_types.StructField('dropOff_datetime', ps_types.TimestampType(), True), 
    ps_types.StructField('PUlocationID', ps_types.IntegerType(), True), 
    ps_types.StructField('DOlocationID', ps_types.IntegerType(), True), 
    ps_types.StructField('SR_Flag', ps_types.StringType(), True), 
    ps_types.StructField('Affiliated_base_number', ps_types.StringType(), True)
])

df = spark.read \
    .option('header', True) \
    .schema(fhv_schema) \
    .csv(tripdate_file_path)

df = df.withColumnsRenamed({
    'dropOff_datetime': 'drop_off_datetime',
    'PUlocationID': 'pu_location_id',
    'DOlocationID': 'do_location_id',
    'SR_Flag': 'sr_flag',
    'Affiliated_base_number': 'affiliated_base_number'
})

df.show(10)

+--------------------+-------------------+-------------------+--------------+--------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|  drop_off_datetime|pu_location_id|do_location_id|sr_flag|affiliated_base_number|
+--------------------+-------------------+-------------------+--------------+--------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|           264|           264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|           264|           264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|           264|           264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|           264|           264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|           264|           264|   NULL|                B00014|
|     B0

### Question 2

In [5]:
partitioned_dir = pathlib.Path('data/transformed/fhv/2019/10')
partitioned_dir.parent.mkdir(parents=True, exist_ok=True)

df = df.repartition(6)
df.write.parquet(str(partitioned_dir), mode='overwrite')

sizes = []
for path in partitioned_dir.iterdir():
    if not path.name.startswith('.') and path.name != '_SUCCESS':
        print(path.name)
        sizes.append(path.stat().st_size)
avg_size_mb = sum(sizes) / len(sizes) / 2**20
print(f'Average partition size: {avg_size_mb:.2f}MB')

[Stage 3:>                                                          (0 + 6) / 6]

part-00001-59fc8522-4fb1-4708-997a-fa6c48a744d9-c000.snappy.parquet
part-00000-59fc8522-4fb1-4708-997a-fa6c48a744d9-c000.snappy.parquet
part-00002-59fc8522-4fb1-4708-997a-fa6c48a744d9-c000.snappy.parquet
part-00005-59fc8522-4fb1-4708-997a-fa6c48a744d9-c000.snappy.parquet
part-00004-59fc8522-4fb1-4708-997a-fa6c48a744d9-c000.snappy.parquet
part-00003-59fc8522-4fb1-4708-997a-fa6c48a744d9-c000.snappy.parquet
Average partition size: 6.35MB


                                                                                

### Question 3

In [6]:
df.withColumn('pickup_date', ps_functions.to_date('pickup_datetime')) \
    .filter(ps_functions.col('pickup_date') == datetime.date(2019, 10, 15))\
    .count()

                                                                                

62610

### Question 4

In [7]:
duration_in_hours_udf = ps_functions.udf(
    lambda duration: duration.total_seconds() / (60 * 60),
    ps_types.DoubleType()
)

df.withColumn('duration', df.drop_off_datetime - df.pickup_datetime) \
    .withColumn('duration', duration_in_hours_udf(ps_functions.col('duration'))) \
    .agg({'duration': 'max'}) \
    .show()

[Stage 12:>                                                         (0 + 6) / 6]

+-------------+
|max(duration)|
+-------------+
|     631152.5|
+-------------+



                                                                                

### Question 5

http://localhost:4040/

### Question 6

In [8]:
zones_file_path = 'data/raw/taxi_zone_lookup.csv'

zones_schema = ps_types.StructType([
    ps_types.StructField('LocationID', ps_types.IntegerType(), True), 
    ps_types.StructField('Borough', ps_types.StringType(), True), 
    ps_types.StructField('Zone', ps_types.StringType(), True), 
    ps_types.StructField('service_zone', ps_types.StringType(), True)
])

zones_df = spark.read \
    .option('header', True) \
    .schema(zones_schema) \
    .csv(zones_file_path)

zones_df = zones_df.withColumnsRenamed({
    'LocationID': 'location_id',
    'Borough': 'borough',
    'Zone': 'zone'
})

zones_df.show(10)

+-----------+-------------+--------------------+------------+
|location_id|      borough|                zone|service_zone|
+-----------+-------------+--------------------+------------+
|          1|          EWR|      Newark Airport|         EWR|
|          2|       Queens|         Jamaica Bay|   Boro Zone|
|          3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|          4|    Manhattan|       Alphabet City| Yellow Zone|
|          5|Staten Island|       Arden Heights|   Boro Zone|
|          6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|          7|       Queens|             Astoria|   Boro Zone|
|          8|       Queens|        Astoria Park|   Boro Zone|
|          9|       Queens|          Auburndale|   Boro Zone|
|         10|       Queens|        Baisley Park|   Boro Zone|
+-----------+-------------+--------------------+------------+
only showing top 10 rows



In [9]:
df.join(zones_df, df.pu_location_id == zones_df.location_id) \
    .groupby(ps_functions.col('zone')) \
    .count() \
    .agg(ps_functions.min_by('zone', 'count')) \
    .show()

[Stage 17:>                                                         (0 + 1) / 1]

+-------------------+
|min_by(zone, count)|
+-------------------+
|        Jamaica Bay|
+-------------------+



                                                                                