In [12]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

# Relative path of the input CSV; the next cell passes it to spark.read...csv(file).
file = "fhvhv_tripdata_2021-02.csv"

In [2]:
# Spark settings that are handed to the session builder below.
config = pyspark.SparkConf().setAll([
    ('spark.executor.memory', '6g'),
    ('spark.executor.cores', '3'),
    ('spark.cores.max', '3'),
    ('spark.driver.memory', '6g'),
])

# Session with a "local[*]" master, named 'test', configured from `config`.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config(conf=config)
    .getOrCreate()
)

# Explicit schema for the CSV, listed as (column name, Spark type);
# every field is declared nullable.
schema = types.StructType([
    types.StructField(column, dtype, True)
    for column, dtype in [
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    ]
])

# Load the CSV with the header option enabled and the schema above applied.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv(file)
)

In [3]:
# Preview the first rows of the CSV-backed DataFrame.
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [4]:
# Redistribute the rows across 24 partitions; `df` is rebound to the repartitioned frame.
df = df.repartition(24)

In [5]:
# Write the repartitioned data as Parquet under fhvh/2021/02/;
# mode "overwrite" replaces whatever already exists at that path.
df.write.mode("overwrite").parquet("fhvh/2021/02/")

In [6]:
# Rebind `df` to the Parquet files just written, so the cells below work on
# the Parquet copy rather than on the CSV-backed frame.
df = spark.read.parquet("fhvh/2021/02/*")

In [7]:
# Preview the first rows of the Parquet-backed DataFrame.
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02765|2021-02-03 19:55:14|2021-02-03 20:05:58|         143|         164|   null|
|           HV0003|              B02764|2021-02-04 12:57:01|2021-02-04 12:59:18|         205|         205|   null|
|           HV0003|              B02880|2021-02-04 20:27:39|2021-02-04 20:37:28|         258|         134|   null|
|           HV0003|              B02887|2021-02-05 06:21:51|2021-02-05 06:38:33|          57|         242|   null|
|           HV0003|              B02865|2021-02-01 07:17:28|2021-02-01 07:25:20|          76|          35|   null|
|           HV0003|              B02878|2021-02-03 07:35:03|2021-02-03 07:50:28|

In [17]:
# Print the column names, types and nullability of the Parquet-backed DataFrame.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [8]:
# Expose `df` to Spark SQL under the name 'data'.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; the effect is the same.
df.createOrReplaceTempView('data')

In [11]:
# Question 3 (SQL): count the rows of `data` whose pickup date is 2021-02-15.
spark.sql("""
    SELECT
        COUNT(1) AS records
    FROM
        data
    WHERE
        DATE(pickup_datetime) = '2021-02-15'
""").show()

+-------+
|records|
+-------+
| 367170|
+-------+



In [15]:
# Question 3 (DataFrame API): number of rows whose pickup date is 2021-02-15.
(
    df
    .where(F.to_date(F.col("pickup_datetime")) == F.lit('2021-02-15'))
    .count()
)

367170

In [23]:
# Question 4 (SQL): trip duration in minutes (difference of the two unix
# timestamps in seconds, divided by 60), longest first.
# ORDER BY 4 refers to the fourth selected column, i.e. `duration`.
spark.sql("""
    SELECT
        DATE(pickup_datetime) AS day,
        pickup_datetime,
        dropoff_datetime,
        (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60 AS duration
    FROM
        data
    ORDER BY 4 DESC
""").show()

+----------+-------------------+-------------------+------------------+
|       day|    pickup_datetime|   dropoff_datetime|          duration|
+----------+-------------------+-------------------+------------------+
|2021-02-11|2021-02-11 13:40:44|2021-02-12 10:39:44|            1259.0|
|2021-02-17|2021-02-17 15:54:53|2021-02-18 07:48:34| 953.6833333333333|
|2021-02-20|2021-02-20 12:08:15|2021-02-21 00:22:14| 733.9833333333333|
|2021-02-03|2021-02-03 20:24:25|2021-02-04 07:41:58|            677.55|
|2021-02-19|2021-02-19 23:17:44|2021-02-20 09:44:01| 626.2833333333333|
|2021-02-25|2021-02-25 17:13:35|2021-02-26 02:57:05|             583.5|
|2021-02-20|2021-02-20 01:36:13|2021-02-20 11:16:19|             580.1|
|2021-02-18|2021-02-18 15:24:19|2021-02-19 01:01:11| 576.8666666666667|
|2021-02-18|2021-02-18 01:31:20|2021-02-18 11:07:15| 575.9166666666666|
|2021-02-10|2021-02-10 20:51:39|2021-02-11 06:21:08| 569.4833333333333|
|2021-02-10|2021-02-10 01:56:17|2021-02-10 10:57:33| 541.2666666

In [33]:
# Question 4 (DataFrame API): longest trips first.
# `duration` is the raw difference of the unix timestamps, i.e. in seconds
# (it is not divided by 60 here).
(
    df
    .withColumn(
        "duration",
        F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime"),
    )
    .orderBy(F.col("duration").desc())
    .show()
)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+--------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|duration|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+--------+
|           HV0005|              B02510|2021-02-11 13:40:44|2021-02-12 10:39:44|         247|          41|   null|   75540|
|           HV0004|              B02800|2021-02-17 15:54:53|2021-02-18 07:48:34|         242|         254|   null|   57221|
|           HV0004|              B02800|2021-02-20 12:08:15|2021-02-21 00:22:14|         188|          55|   null|   44039|
|           HV0003|              B02864|2021-02-03 20:24:25|2021-02-04 07:41:58|          51|         147|   null|   40653|
|           HV0003|              B02887|2021-02-19 23:17:44|2021-02-20 09:44:01|         210|         149|   null|   37577|
|       

In [37]:
# Question 5 (SQL): number of rows per dispatching_base_num, most frequent first.
spark.sql("""
    SELECT
        dispatching_base_num,
        COUNT(1) AS frequency
    FROM
        data
    GROUP BY
        dispatching_base_num
    ORDER BY
        frequency DESC
""").show()

+--------------------+---------+
|dispatching_base_num|frequency|
+--------------------+---------+
|              B02510|  3233664|
|              B02764|   965568|
|              B02872|   882689|
|              B02875|   685390|
|              B02765|   559768|
|              B02869|   429720|
|              B02887|   322331|
|              B02871|   312364|
|              B02864|   311603|
|              B02866|   311089|
|              B02878|   305185|
|              B02682|   303255|
|              B02617|   274510|
|              B02883|   251617|
|              B02884|   244963|
|              B02882|   232173|
|              B02876|   215693|
|              B02879|   210137|
|              B02867|   200530|
|              B02877|   198938|
+--------------------+---------+
only showing top 20 rows



In [43]:
# Question 5 (DataFrame API): row count per dispatching base, largest count first.
(
    df
    .groupBy("dispatching_base_num")
    .count()
    .orderBy(F.desc("count"))
    .show()
)

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



In [49]:
# Zone lookup: keep only the location id and the zone name, exposing the
# latter under the lowercase column name "zone".
zone = spark.read.parquet("zones").select(
    "LocationID",
    F.col("Zone").alias("zone"),
)

In [55]:
# Replace the pickup and drop-off location IDs with zone names by joining
# `zone` twice:
#   1. join on PULocationID, rename the joined "zone" column to "pu_location",
#      then drop both ID columns from that join;
#   2. join on DOLocationID, rename "zone" to "do_location", drop the IDs again.
# The rename and drop after the first join happen before the second join, so
# the second join adds the only "zone" and "LocationID" columns present.
# No join type is passed, so Spark's default applies (inner per the PySpark
# docs); rows whose ID has no match in `zone` would then be left out.
data = df.join(zone, df.PULocationID == zone.LocationID) \
    .withColumnRenamed("zone", "pu_location") \
    .drop("PULocationID", "LocationID") \
    .join(zone, df.DOLocationID == zone.LocationID) \
    .withColumnRenamed("zone", "do_location") \
    .drop("DOLocationID", "LocationID")

In [56]:
# Preview the joined frame, which carries pu_location / do_location instead of the ID columns.
data.show()

+-----------------+--------------------+-------------------+-------------------+-------+--------------------+--------------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|SR_Flag|         pu_location|         do_location|
+-----------------+--------------------+-------------------+-------------------+-------+--------------------+--------------------+
|           HV0003|              B02765|2021-02-03 19:55:14|2021-02-03 20:05:58|   null| Lincoln Square West|       Midtown South|
|           HV0003|              B02764|2021-02-04 12:57:01|2021-02-04 12:59:18|   null|        Saint Albans|        Saint Albans|
|           HV0003|              B02880|2021-02-04 20:27:39|2021-02-04 20:37:28|   null|           Woodhaven|         Kew Gardens|
|           HV0003|              B02887|2021-02-05 06:21:51|2021-02-05 06:38:33|   null|              Corona|Van Nest/Morris Park|
|           HV0003|              B02865|2021-02-01 07:17:28|2021-02-01 07:25:20|   

In [57]:
# Register the zone-enriched DataFrame as the temp view 'data'. This replaces
# the view of the same name that was registered earlier from `df`, so SQL
# cells run after this point see pu_location / do_location.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; the effect is the same.
data.createOrReplaceTempView('data')

In [61]:
# Question 6 (SQL): count rows per "pickup zone:drop-off zone" pair, most frequent first.
# GROUP BY 1 / ORDER BY 2 refer to select-list positions: the pair label and the count.
# show(truncate=False) prints the full label instead of shortening long strings.
spark.sql("""
    SELECT
        pu_location || ':' || do_location AS location,
        COUNT(1)
    FROM
        data
    GROUP BY
        1
    ORDER BY
        2 DESC
""").show(truncate=False)

+---------------------------------------------------+--------+
|location                                           |count(1)|
+---------------------------------------------------+--------+
|East New York:East New York                        |45041   |
|Borough Park:Borough Park                          |37329   |
|Canarsie:Canarsie                                  |28026   |
|Crown Heights North:Crown Heights North            |25976   |
|Bay Ridge:Bay Ridge                                |17934   |
|Jackson Heights:Jackson Heights                    |14688   |
|Astoria:Astoria                                    |14688   |
|Central Harlem North:Central Harlem North          |14481   |
|Bushwick South:Bushwick South                      |14424   |
|Flatbush/Ditmas Park:Flatbush/Ditmas Park          |13976   |
|South Ozone Park:South Ozone Park                  |13716   |
|Brownsville:Brownsville                            |12829   |
|JFK Airport:NA                                     |12