In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/04 15:48:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
import pandas as pd

In [5]:
# Load only the first 100 rows with pandas: a small sample is enough to
# inspect the columns and their inferred dtypes.
df_fhv_pd = pd.read_csv(
    './data/raw/fhv/fhv_tripdata_2019-10.csv',
    nrows=100,
)

In [6]:
# Display the pandas sample (read with nrows=100) to eyeball the columns.
df_fhv_pd

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014
...,...,...,...,...,...,...,...
95,B00310,2019-10-01 00:06:02,2019-10-01 00:14:04,264,242,,B00310
96,B00310,2019-10-01 00:03:43,2019-10-01 00:07:26,264,213,,B02534
97,B00310,2019-10-01 00:37:14,2019-10-01 00:51:58,264,241,,B02879
98,B00310,2019-10-01 00:42:41,2019-10-01 00:54:42,264,213,,B02875


In [10]:
# Column dtypes pandas inferred for the sample.
df_fhv_pd.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID                int64
DOlocationID                int64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [11]:
# Schema Spark infers when the pandas sample is converted to a Spark DataFrame.
spark.createDataFrame(df_fhv_pd).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', LongType(), True), StructField('DOlocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [12]:
from pyspark.sql import types

In [14]:
# Explicit schema for the FHV trip CSVs. Every column is nullable; the two
# datetime columns are parsed as timestamps and the location ids as integers.
fhv_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.DoubleType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [15]:
# Read the raw FHV CSV data with the explicit schema; the first line of each
# file is treated as a header.
df_fhv = spark.read.csv(
    'data/raw/fhv/',
    schema=fhv_schema,
    header=True,
)

In [16]:
# Print the schema of the CSV-backed DataFrame to confirm the explicit
# column types were applied.
df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [17]:
# Persist the typed FHV data as parquet, shuffled into 6 partitions.
# mode='overwrite' replaces output left by a previous run: with the default
# save mode the write raises once 'data/raw/fhv/pq/' exists, which breaks
# Restart Kernel -> Run All on every execution after the first.
(
    df_fhv
    .repartition(6)
    .write
    .parquet('data/raw/fhv/pq/', mode='overwrite')
)

                                                                                

In [19]:
# Rebind df_fhv to the parquet copy so the following cells read the
# columnar files instead of re-parsing the CSVs.
df_fhv = (
    spark.read
    .format('parquet')
    .load('data/raw/fhv/pq/*')
)

In [20]:
from pyspark.sql import functions as F

In [39]:
# Add two derived columns:
#   pickup_date - day of month taken from the pickup timestamp
#   duration    - trip length in hours, rounded to the nearest whole hour
#                 (epoch-second difference divided by 3600)
df_fhv = (
    df_fhv
    .withColumn('pickup_date', F.dayofmonth('pickup_datetime'))
    .withColumn(
        'duration',
        F.round(
            (F.col('dropOff_datetime').cast("long") - F.col('pickup_datetime').cast("long")) / 3600
        ),
    )
)

In [40]:
# Preview the DataFrame, including the derived pickup_date and duration columns.
df_fhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+--------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|pickup_date|duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+--------+
|              B01231|2019-10-03 21:50:40|2019-10-03 21:55:35|         264|         217|   NULL|                B02849|          3|     0.0|
|              B00227|2019-10-02 16:00:00|2019-10-02 18:10:00|         264|         264|   NULL|                B00227|          2|     2.0|
|              B02103|2019-10-02 09:14:01|2019-10-02 09:23:00|         264|          92|   NULL|                B00095|          2|     0.0|
|              B00860|2019-10-02 17:59:14|2019-10-02 18:10:28|         264|         200|   NULL|                B00860|          2|     0.0|
|            

In [28]:
# Number of trips per day of month.
(
    df_fhv
    .groupBy('pickup_date')
    .count()
    .show()
)

+-----------+-----+
|pickup_date|count|
+-----------+-----+
|         31|63972|
|         28|64075|
|         26|52569|
|         27|47708|
|         12|51434|
|         22|62950|
|          1|59873|
|         13|45900|
|          6|45665|
|         16|68156|
|          3|71638|
|         20|48304|
|          5|52398|
|         19|52530|
|         15|62610|
|          9|60468|
|         17|67656|
|          4|68227|
|          8|64049|
|         23|66429|
+-----------+-----+
only showing top 20 rows



In [41]:
# Longest trip (in rounded hours) for each day of month.
(
    df_fhv
    .groupBy('pickup_date')
    .agg(F.max('duration'))
    .show()
)

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
|         31|      87672.0|
|         28|     631153.0|
|         26|       8785.0|
|         27|        169.0|
|         12|        530.0|
|         22|        290.0|
|          1|      70128.0|
|         13|        506.0|
|          6|        675.0|
|         16|        605.0|
|          3|        746.0|
|         20|        338.0|
|          5|        698.0|
|         19|        362.0|
|         15|        458.0|
|          9|        602.0|
|         17|       8794.0|
|          4|        746.0|
|          8|        626.0|
|         23|        747.0|
+-----------+-------------+
only showing top 20 rows



In [55]:
# Load the zone lookup CSV; no schema is given, so only the header row is
# used to name the columns.
df_zones = spark.read.csv(
    'data/raw/fhv/zones/',
    header=True,
)

In [56]:
# Preview the zone lookup table.
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [57]:
# Attach zone attributes to each trip by matching the pickup location id
# against the zone lookup (inner join: trips without a matching zone drop out).
df_result = df_fhv.join(
    df_zones,
    on=df_fhv['PUlocationID'] == df_zones['LocationID'],
    how='inner',
)

In [60]:
# Drop the join key that came from df_zones; after the equality join
# PUlocationID already carries the same value.
df_result = df_result.drop('LocationID')

In [67]:
# Trips per pickup zone, least frequent zones first.
(
    df_result
    .groupBy('Zone')
    .count()
    .orderBy('count')
    .show()
)

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows

