In [41]:
df_pandas = pd.read_csv('fhv_tripdata_2019-10.csv.gz', nrows=1) 
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID                int64
DOlocationID                int64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
spark.version

'3.5.0'

In [39]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('affiliated_base_number', types.StringType(), True)
])

**Q2**: FHV October 2019 partition size

**Answer**: 6.22 MB (6,523,903 bytes)

In [42]:
%%time

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv.gz')

df = df.repartition(6)

df.write.mode('overwrite').parquet('data/pq/fhvhv/2019/10/')

CPU times: total: 15.6 ms
Wall time: 8.3 s


In [43]:
df = spark.read.parquet('data/pq/fhvhv/2019/10/')

**Q3**: Count records on 15th of October

**Answer**: 62295

In [44]:
from pyspark.sql import functions as F

In [45]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2019-10-15'") \
    .count()

62610

In [47]:
df.registerTempTable('fhvhv_2019_10')



In [48]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2019_10
WHERE
    to_date(pickup_datetime) = '2019-10-15';
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



**Q4**: Longest trip for each day

In [58]:
df \
    .withColumn('duration', (df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) / 3600.0) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

+-----------+-----------------+
|pickup_date|    max(duration)|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333333|
| 2019-10-01|70128.02805555555|
| 2019-10-17|           8794.0|
+-----------+-----------------+



In [59]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600) AS duration
FROM 
    fhvhv_2019_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;
""").show()

+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2019-10-28|          631152.5|
| 2019-10-11|          631152.5|
| 2019-10-31| 87672.44083333333|
| 2019-10-01| 70128.02805555555|
| 2019-10-17|            8794.0|
| 2019-10-26| 8784.166666666666|
| 2019-10-30|1464.5344444444445|
| 2019-10-25|1056.8266666666666|
| 2019-10-02| 769.2313888888889|
| 2019-10-23| 745.6166666666667|
+-----------+------------------+



**Q6**: Most common locations pair

In [66]:
df_zones = spark.read.parquet('zones')

In [67]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [68]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'affiliated_base_number']

In [69]:
df_zones.registerTempTable('zones')

In [72]:
spark.sql("""
SELECT
    pul.Zone AS pu_do_pair,
    COUNT(1) AS cnt
FROM 
    fhvhv_2019_10 fhv 
INNER JOIN 
    zones pul 
ON 
    fhv.PULocationID = pul.LocationID
GROUP BY 
    pu_do_pair
ORDER BY
    cnt ASC
LIMIT 5;
""").show()

+--------------------+---+
|          pu_do_pair|cnt|
+--------------------+---+
|         Jamaica Bay|  1|
|Governor's Island...|  2|
| Green-Wood Cemetery|  5|
|       Broad Channel|  8|
|     Highbridge Park| 14|
+--------------------+---+

