In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
spark = SparkSession.builder \
.master("local[*]") \
.appName('test') \
.getOrCreate()

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [4]:
import gzip

gzip_filename = 'fhv_tripdata_2019-10.csv.gz'

with gzip.open(gzip_filename, 'rt', newline='') as gz_file:
    csv_data = gz_file.read()

with open(gzip_filename[:-3], 'wt') as csv_file:
    csv_file.write(csv_data)

In [3]:
df = spark.read \
.option("header", "true") \
.csv('fhv_tripdata_2019-10.csv')

In [4]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [7]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [8]:
!head -n 1001 fhv_tripdata_2019-10.csv > head.csv

In [9]:
!wc -l head.csv

1001 head.csv


In [10]:
df_pandas = pd.read_csv('head.csv')

In [12]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [13]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', DoubleType(), True), StructField('DOlocationID', DoubleType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [5]:
from pyspark.sql import types

In [6]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [7]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [8]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [22]:
df = df.repartition(6)

df.write.parquet('fhv/2019/10/')

In [9]:
from pyspark.sql import functions as F

In [16]:
df.createOrReplaceTempView ('fhv_data')

In [17]:
def udffer(pickup, dropoff):
    return dropoff - pickup

date_finder_udf = F.udf(udffer, returnType=types.IntegerType())

In [28]:
spark.sql("""
SELECT 
    count(1)
FROM 
    fhv_data
WHERE
    pickup_datetime >= '2019-10-15 00:00:00' AND
    pickup_datetime < '2019-10-16 00:00:00'
ORDER BY 
    1
    ASC
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



In [30]:
spark.sql("""
SELECT 
    DATEDIFF(day, pickup_datetime, dropOff_datetime),
    pickup_datetime, 
    dropOff_datetime
FROM 
    fhv_data
WHERE
    pickup_datetime > '2019-10-01 00:00:00' AND
    pickup_datetime < '2019-10-31 00:00:00'
ORDER BY 
    1
    DESC
""").show()

+-----------------------------------------------------+-------------------+-------------------+
|timestampdiff(day, pickup_datetime, dropOff_datetime)|    pickup_datetime|   dropOff_datetime|
+-----------------------------------------------------+-------------------+-------------------+
|                                                26298|2019-10-28 09:00:00|2091-10-28 09:30:00|
|                                                26298|2019-10-11 18:00:00|2091-10-11 18:30:00|
|                                                 2922|2019-10-01 21:43:42|2027-10-01 21:45:23|
|                                                  366|2019-10-17 14:00:00|2020-10-18 00:00:00|
|                                                  366|2019-10-26 21:26:00|2020-10-26 21:36:00|
|                                                   61|2019-10-30 12:30:04|2019-12-30 13:02:08|
|                                                   44|2019-10-25 07:04:57|2019-12-08 07:21:11|
|                                       

In [34]:
df_zones = spark.read.parquet('zones')

In [36]:
df_result = df.join(df_zones, df.PUlocationID == df_zones.LocationID)

In [40]:
df_result.drop('LocationID').show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------+---------------+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|Borough|           Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------+---------------+------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|Unknown|             NV|         N/A|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|Unknown|             NV|         N/A|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|Unknown|             NV|         N/A|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57

In [39]:
df_result.createOrReplaceTempView ('joined')

In [58]:
spark.sql("""
SELECT 
    Zone,
    count(*)
FROM 
    joined
GROUP BY
    zone
ORDER BY 
    2
    ASC
""").show()

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows

