In [163]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
         .master('local[*]') \
         .appName('test') \
         .getOrCreate()

### Solution 2

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)?

In [164]:
from pyspark.sql import types

In [165]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PUlocationID', types.IntegerType(), True),
    types.StructField('DOlocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [166]:
df = spark.read \
     .option('header', 'true') \
     .schema(schema) \
     .csv('fhv_tripdata_2019-10.csv')

In [167]:
df = df.repartition(6)

In [168]:
df.write.parquet('fhv/2019/10', mode='overwrite')

                                                                                

In [178]:
%ls -lh fhv/2019/10/*.parquet | awk '{print $5, $9}'

6.0M fhv/2019/10/part-00000-211fe1ac-f980-4040-b535-a1d60f67d967-c000.snappy.parquet
6.0M fhv/2019/10/part-00001-211fe1ac-f980-4040-b535-a1d60f67d967-c000.snappy.parquet
6.0M fhv/2019/10/part-00002-211fe1ac-f980-4040-b535-a1d60f67d967-c000.snappy.parquet
6.0M fhv/2019/10/part-00003-211fe1ac-f980-4040-b535-a1d60f67d967-c000.snappy.parquet
6.0M fhv/2019/10/part-00004-211fe1ac-f980-4040-b535-a1d60f67d967-c000.snappy.parquet
6.0M fhv/2019/10/part-00005-211fe1ac-f980-4040-b535-a1d60f67d967-c000.snappy.parquet


### Solution 3

How many taxi trips were there on the 15th of October?

In [170]:
from pyspark.sql.functions import *

In [171]:
df = df.withColumn('pickup_date', to_date(df.pickup_datetime)) \
       .withColumn('dropOff_date', to_date(df.dropOff_datetime))

In [172]:
df.filter(df.pickup_date == lit('2019-10-15')) \
  .count()

                                                                                

62610

### Solution 4

What is the length of the longest trip in the dataset in hours?

In [173]:
df = df.withColumn(
    "trip_duration_hours",
    round(
        (to_unix_timestamp("dropOff_datetime") - to_unix_timestamp("pickup_datetime"))
        / 3600,
        2,
    ),
)

In [174]:
df.select(max(col("trip_duration_hours"))) \
  .show()



+------------------------+
|max(trip_duration_hours)|
+------------------------+
|                631152.5|
+------------------------+



                                                                                

### Solution 6

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

#### Spark SQL Solution

In [154]:
df.createOrReplaceTempView('fhv_tripdata_2019_10')

In [155]:
df_fhv_pu_location_count = spark.sql("""
SELECT
    PUlocationID AS LocationID,
    COUNT(*) AS count_per_location
FROM
    fhv_tripdata_2019_10
GROUP BY
    PUlocationID
ORDER BY
    count_per_location
""")

In [156]:
df_fhv_pu_location_count.createOrReplaceTempView('fhv_pu_location_count')

In [157]:
df_zones = spark.read \
                .option('header', 'true') \
                .csv('taxi+_zone_lookup.csv')

In [158]:
df_zones.createOrReplaceTempView('zones')

In [159]:
spark.sql("""
SELECT
  Zone,
  count_per_location
FROM
  zones
JOIN
  fhv_pu_location_count
USING (LocationID)
ORDER BY
    count_per_location
LIMIT
    1
""").show()

+-----------+------------------+
|       Zone|count_per_location|
+-----------+------------------+
|Jamaica Bay|                 1|
+-----------+------------------+



#### DataFrame API Solution

In [160]:
df_fhv_pu_location_count = df.groupBy('PUlocationID') \
                             .count() \
                             .withColumnRenamed('count', 'count_per_location') \
                             .sort("count") \
                             .limit(1)

In [161]:
df_zones = spark.read \
                .option('header', 'true') \
                .csv('taxi+_zone_lookup.csv')

In [162]:
df_fhv_pu_location_count.join(df_zones, df.PUlocationID == df_zones.LocationID) \
                        .select('Zone', 'count_per_location') \
                        .show()

+-----------+------------------+
|       Zone|count_per_location|
+-----------+------------------+
|Jamaica Bay|                 1|
+-----------+------------------+

