In [1]:
import pyspark

In [2]:
# Display the installed PySpark version so the environment is recorded with the notebook.
pyspark.__version__

'3.5.1'

In [3]:
from pyspark.sql import SparkSession

# Create (or reuse, if one already exists) a Spark session on the
# "local[*]" master, registered under the application name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [4]:
# First pass over the FHV trip CSV: treat the first row as a header and
# supply no explicit schema.
fhv_header_reader = spark.read.option("header", "true")
df = fhv_header_reader.csv('/C:/Users/wongs/Downloads/fhv_tripdata_2019-10.csv/fhv_tripdata_2019-10.csv')

In [5]:
# Inspect the schema Spark assigned when no explicit schema was supplied.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [6]:
from pyspark.sql import types

In [7]:
# Explicit schema for the FHV trip file: timestamps for the pickup/dropoff
# columns, integers for the location ids, strings for everything else.
# Every field is nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PUlocationID', types.IntegerType(), True)
    .add('DOlocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

# Re-read the same CSV, this time with the typed schema applied. `df` is
# rebound to the typed DataFrame.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('/C:/Users/wongs/Downloads/fhv_tripdata_2019-10.csv/fhv_tripdata_2019-10.csv')
)

In [8]:
# Peek at the first five rows of the typed DataFrame.
df.head(5)

[Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 29), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 13, 22), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 43), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 37, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 56, 29), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 57, 47), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=

In [9]:
# Redistribute the trip rows across six partitions.
ddf = df.repartition(numPartitions=6)


In [72]:
# ddf.write.parquet('fhv/19/10', mode='overwrite')


In [16]:
import datetime

# Trips whose pickup falls on 2019-10-15.
# A half-open range [start of day, start of next day) is used because an
# equality test against midnight would only keep pickups at exactly 00:00:00.
pickup_day_start = datetime.datetime(2019, 10, 15)
pickup_day_end = pickup_day_start + datetime.timedelta(days=1)

new_df = (
    df
    .select('pickup_datetime', 'dropoff_datetime', 'PUlocationID', 'DOlocationID')
    .filter(
        (df.pickup_datetime >= pickup_day_start)
        & (df.pickup_datetime < pickup_day_end)
    )
)

In [25]:
from pyspark.sql import functions as F

# Count the trips picked up on 2019-10-15.
# The filter uses the derived `pickup_date` column, which is still present
# after the select(), rather than reaching back to `df.pickup_datetime`,
# which the select() has already projected away.
(
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
    .select('pickup_date', 'dropoff_date', 'PUlocationID', 'DOlocationID')
    .filter(F.col('pickup_date') == "2019-10-15")
    .count()
)

62610

In [69]:
from pyspark.sql.functions import col, asc, desc

# Show the trips with the longest duration first.
# Casting a timestamp to long gives whole seconds, so dividing the
# difference by 3600 yields HOURS. The column is named accordingly
# (it was previously labelled as seconds while holding hours).
trip_seconds = df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')
(
    df
    .withColumn('duration_hours', trip_seconds / 3600)
    .orderBy(col('duration_hours').desc())
    .show()
)


+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|  duration_seconds|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   NULL|                B02832|          631152.5|
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   NULL|                B02832|          631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        NULL|        NULL|   NULL|                B02416| 87672.44083333333|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   NULL|       B00746         | 70128.02805555555|
|              B02921|2019-

In [34]:
# For each pickup date, find the longest trip, then show only the date
# holding the overall maximum. `duration` is the dropoff/pickup difference
# (both cast to long) divided by 3600.
trip_duration = (df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) / 3600
longest_per_pickup_date = (
    df
    .withColumn('duration', trip_duration)
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .groupBy('pickup_date')
    .max('duration')
)
longest_per_pickup_date.orderBy('max(duration)', ascending=False).limit(1).show()

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2019-10-11|     631152.5|
+-----------+-------------+



In [54]:
# Load the taxi zone lookup table, taking column names from the header row.
zone_lookup_reader = spark.read.option("header", "true")
df_zones = zone_lookup_reader.csv('C:/Users/wongs/Downloads/taxi_zone_lookup.csv')

# Show which columns the lookup provides.
df_zones.columns


['LocationID', 'Borough', 'Zone', 'service_zone']

In [57]:
# Two projections of the zone lookup, one prefixed for the pickup side and
# one for the dropoff side, so both can be joined onto the trips without
# column-name clashes. `service_zone` is left out of both.
zpu = df_zones.select(
    df_zones['LocationID'].alias('zPUlocationID'),
    df_zones['Borough'].alias('PUBorough'),
    df_zones['Zone'].alias('PUzone'),
)
zdo = df_zones.select(
    df_zones['LocationID'].alias('zDOlocationID'),
    df_zones['Borough'].alias('DOBorough'),
    df_zones['Zone'].alias('DOzone'),
)

# Attach pickup zone details first, then dropoff zone details. Both joins
# are inner joins, so trips without a matching location id are not kept.
pickup_match = df.PUlocationID == zpu.zPUlocationID
df_join_temp = df.join(zpu, on=pickup_match, how='inner')

dropoff_match = df_join_temp.DOlocationID == zdo.zDOlocationID
df_join = df_join_temp.join(zdo, on=dropoff_match, how='inner')

In [59]:
# Remove the raw location-id columns now that zone names and boroughs are joined in.
# The names are spelled exactly as they appear in the schema: drop() ignores
# names it cannot resolve, so mis-cased names would silently drop nothing
# when the session is configured to be case sensitive.
dd = df_join.drop('PUlocationID', 'DOlocationID', 'zPUlocationID', 'zDOlocationID')

In [62]:
# Expose the joined trips to Spark SQL under the view name 'join_table'.
dd.createOrReplaceTempView(name='join_table')

In [65]:
# Print the first 20 rows of the joined table, truncating long cell values.
dd.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+-------+----------------------+---------+---------------+---------+--------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|SR_Flag|Affiliated_base_number|PUBorough|         PUzone|DOBorough|              DOzone|
+--------------------+-------------------+-------------------+-------+----------------------+---------+---------------+---------+--------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|   NULL|                B00009|  Unknown|             NV|  Unknown|                  NV|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|   NULL|                B00013|  Unknown|             NV|  Unknown|                  NV|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|   NULL|                B00014|  Unknown|             NV|  Unknown|                  NV|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|   NULL|                B00014| 

In [66]:
# Count trips per pickup zone and show the zone with the fewest trips.
# The query groups by the first select column (PUzone) and orders by the
# second (the count), ascending, keeping one row.
least_frequent_pickup_zone_sql = """
SELECT
    PUzone,
    COUNT(1)
FROM
    join_table
GROUP BY
    1
ORDER BY
    2 ASC
LIMIT
    1
;
"""
spark.sql(least_frequent_pickup_zone_sql).show()

+-----------+--------+
|     PUzone|count(1)|
+-----------+--------+
|Jamaica Bay|       1|
+-----------+--------+

