In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/02/26 18:03:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark.version

'3.0.3'

In [5]:
# !wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-26 12:25:00--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.77.164
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.77.164|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-26 12:25:24 (30.0 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [6]:
!wc -l fhvhv_tripdata_2021-02.csv

11613943 fhvhv_tripdata_2021-02.csv


In [10]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [11]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [12]:
df = df.repartition(24)

In [13]:
df.write.parquet('fhvhv/2021/02/')

                                                                                

# Load data

In [7]:
df = spark.read.parquet('fhvhv/2021/02/')

                                                                                

In [8]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [9]:
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02887|2021-02-06 01:18:35|2021-02-06 01:40:34|         163|         235|   null|
|           HV0005|              B02510|2021-02-05 07:13:06|2021-02-05 07:31:56|         225|         181|   null|
|           HV0003|              B02869|2021-02-04 16:56:52|2021-02-04 17:21:36|         260|          95|   null|
|           HV0003|              B02871|2021-02-03 18:34:17|2021-02-03 18:57:12|         235|          60|   null|
|           HV0003|              B02869|2021-02-04 07:25:09|2021-02-04 07:30:34|          55|          55|   null|
|           HV0003|              B02836|2021-02-04 23:15:27|2021-02-04 23:34:29|

                                                                                

In [19]:
df_zones = spark.read.parquet('zones/')

In [20]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [10]:
df.registerTempTable('fhvhv_2021_02')

In [23]:
df_zones.registerTempTable('zones')

In [11]:
spark.sql("""
SELECT
    count(1)
FROM
    fhvhv_2021_02
WHERE
    date(pickup_datetime) = '2021-02-15'
""").show()



+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

In [16]:
spark.sql("""
SELECT
    date(pickup_datetime),
    (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) as duration
FROM
    fhvhv_2021_02
ORDER BY 
    duration DESC
LIMIT 1
""").show()



+---------------+--------+
|pickup_datetime|duration|
+---------------+--------+
|     2021-02-11|   75540|
+---------------+--------+



                                                                                

In [17]:
spark.sql("""
SELECT
    dispatching_base_num,
    count(1) as count_trips
FROM
    fhvhv_2021_02
GROUP BY 
    dispatching_base_num
ORDER BY 
    count_trips DESC
LIMIT 1
""").show()



+--------------------+-----------+
|dispatching_base_num|count_trips|
+--------------------+-----------+
|              B02510|    3233664|
+--------------------+-----------+



                                                                                

In [30]:
spark.sql("""

SELECT
    concat(puzone.zone, " / ", dozone.zone),
    count(1) as count_trips
FROM
    fhvhv_2021_02
LEFT JOIN
    zones as puzone
ON 
    PULocationID = puzone.LocationID
LEFT JOIN
    zones as dozone
ON 
    DOLocationID = dozone.LocationID
GROUP BY 
    1
ORDER BY 
    count_trips DESC
LIMIT 1

""").show()



+-----------------------+-----------+
|concat(zone,  / , zone)|count_trips|
+-----------------------+-----------+
|   East New York / E...|      45041|
+-----------------------+-----------+



                                                                                

In [None]:
df_join = df.join(df_zones, on=['hour', 'zone'], how='inner')