In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [3]:
# Create (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [6]:
# Explicit schema for the trip CSV so column types are declared up front
# rather than inferred. Every field is nullable.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.DoubleType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

dispatching_base_num	pickup_datetime	dropoff_datetime	PULocationID	DOLocationID	SR_Flag	 Affiliated_base_number


In [7]:
# Load the gzipped trip CSV, treating the first line as a header and applying
# the explicit schema defined above.
df = (
    spark.read
    .option("header", 'true')
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv.gz')
)

In [8]:
# Show the first row as a sanity check that the CSV parsed with the expected types.
df.head()

Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag=None, Affiliated_base_number='B02764')

In [9]:
# Print each column's name, type and nullability.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [10]:
# Redistribute the rows across 12 partitions; repartition returns a new
# DataFrame, which is rebound to the same name here.
df = df.repartition(12)

In [11]:
# Persist the trips as parquet. mode='overwrite' replaces output left by a
# previous run; the default mode ('errorifexists') raises as soon as the target
# directory exists, which breaks re-running the notebook from the top.
df.write.parquet('data/processed/fhvhv/2021/06/', mode='overwrite')

In [4]:
# Load the trips back from the parquet directory. Note: this rebinds `df`, so
# from here on it refers to the parquet-backed DataFrame.
df = spark.read.parquet('data/processed/fhvhv/2021/06/')

In [5]:
# Expose the trips DataFrame to Spark SQL under the name fhvhv202106.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
df.createOrReplaceTempView('fhvhv202106')



In [6]:
# Preview two rows of the registered trips table.
preview_query = '''
SELECT * 
FROM fhvhv202106
LIMIT 2
'''
spark.sql(preview_query).show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02869|2021-06-24 14:32:51|2021-06-24 15:15:57|         230|          13|   null|                B02869|
|              B02875|2021-06-20 09:14:56|2021-06-20 09:25:04|          56|          82|   null|                B02875|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+



In [9]:
# Count the trips whose pickup timestamp falls on 2021-06-15.
trips_on_june_15_query = '''
SELECT COUNT(*) as trips
FROM fhvhv202106
WHERE DATE(pickup_datetime) = '2021-06-15' 
'''
spark.sql(trips_on_june_15_query).show()

+------+
| trips|
+------+
|452470|
+------+



In [20]:
# Longest trip in hours: the gap between dropoff and pickup unix timestamps
# (seconds), divided by 60 twice, sorted descending and cut to the top row.
longest_trip_query = '''
SELECT (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60/60 as hours
FROM fhvhv202106
ORDER BY 1 DESC
LIMIT 1
'''
spark.sql(longest_trip_query).show()

+-----------------+
|            hours|
+-----------------+
|66.87888888888888|
+-----------------+



In [22]:
# Load the taxi zone lookup CSV, using the first line as a header and letting
# Spark infer the column types.
lookup = (
    spark.read
    .option("header", 'true')
    .option("inferSchema", 'true')
    .csv('taxi_zone_lookup.csv')
)

In [24]:
# Inspect the schema Spark inferred for the lookup table.
lookup.schema

StructType(List(StructField(LocationID,IntegerType,true),StructField(Borough,StringType,true),StructField(Zone,StringType,true),StructField(service_zone,StringType,true)))

In [25]:
# Register the lookup DataFrame as the temp view 'lookup' so SQL queries can
# refer to it by name.
lookup.createOrReplaceTempView('lookup')

In [31]:
# Most frequent pickup zone: join trips to the zone lookup on the pickup
# location id, count trips per (location id, zone) and keep the largest group.
top_pickup_zone_query = """
SELECT fhv.PULocationID, Zone, COUNT(*)
FROM fhvhv202106 fhv
JOIN lookup lkp
ON fhv.PULocationID = lkp.LocationID
GROUP BY 1, 2
ORDER BY 3 DESC
LIMIT 1
"""
spark.sql(top_pickup_zone_query).show()

+------------+-------------------+--------+
|PULocationID|               Zone|count(1)|
+------------+-------------------+--------+
|          61|Crown Heights North|  231279|
+------------+-------------------+--------+

