In [1]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('fhv_data') \
    .getOrCreate()

24/03/03 06:52:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
!wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


In [106]:
spark.version

'3.3.2'

In [5]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

In [6]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [14]:
!head -n 1001 fhv_tripdata_2019-10.csv > head.csv

In [15]:
import pandas as pd

In [16]:
df_pandas = pd.read_csv('head.csv')

In [70]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [71]:
from pyspark.sql import types

In [72]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [101]:
df_zonedata = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

In [None]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [74]:
df = df.repartition(6)

In [75]:
df.write.parquet('fhvhv/2019/10/')

                                                                                

In [76]:
!ls -lh fhvhv/2019/10

total 38M
-rw-r--r-- 1 codespace codespace    0 Mar  3 07:51 _SUCCESS
-rw-r--r-- 1 codespace codespace 6.3M Mar  3 07:51 part-00000-aebb5d16-2526-47c4-8256-212d4f0f4097-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  3 07:51 part-00001-aebb5d16-2526-47c4-8256-212d4f0f4097-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  3 07:51 part-00002-aebb5d16-2526-47c4-8256-212d4f0f4097-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  3 07:51 part-00003-aebb5d16-2526-47c4-8256-212d4f0f4097-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  3 07:51 part-00004-aebb5d16-2526-47c4-8256-212d4f0f4097-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  3 07:51 part-00005-aebb5d16-2526-47c4-8256-212d4f0f4097-c000.snappy.parquet


In [77]:
df = spark.read.parquet('fhvhv/2019/10/')

In [78]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [79]:
from pyspark.sql import functions as F

In [80]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .filter("pickup_date == '2019-10-15'") \
    .count()

62610

In [81]:
df.registerTempTable('trips_data')



In [83]:
spark.sql("""
SELECT
    count(*)
FROM
    trips_data
where 
    to_date(pickup_datetime) = '2019-10-15'
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



In [100]:
spark.sql("""
SELECT 
    DATEDIFF(hour, pickup_datetime, dropoff_datetime)
FROM
    trips_data
order by 1 desc
limit 5
""").show()



+------------------------------------------------------+
|timestampdiff(hour, pickup_datetime, dropoff_datetime)|
+------------------------------------------------------+
|                                                631152|
|                                                631152|
|                                                 87672|
|                                                 70128|
|                                                  8794|
+------------------------------------------------------+



                                                                                

In [102]:
df_zonedata.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [103]:
df_zonedata.registerTempTable('zone_data')



In [104]:
spark.sql("""
SELECT 
    Zone, count(*)
FROM
    trips_data
left join
    zone_data
on 1 = 1
and trips_data.PULocationID = zone_data.LocationID
group by Zone
order by count(*) asc
""").show()

[Stage 37:>                                                         (0 + 2) / 2]

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows



                                                                                

In [105]:
spark.sql("""
SELECT 
    PULocationID, count(*)
FROM
    trips_data
group by PULocationID
order by count(*) asc
Limit 5
""").show()

+------------+--------+
|PULocationID|count(1)|
+------------+--------+
|           2|       1|
|         105|       2|
|         111|       5|
|          30|       8|
|         120|      14|
+------------+--------+

