In [1]:
import pyspark
from pyspark.sql import SparkSession, types, functions as f

In [2]:
# Create (or reuse, via getOrCreate) the Spark session for this notebook,
# using the "local[*]" master and the application name "hw".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("hw")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/06 06:04:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Explicit schema for the raw FHV trip CSV files. Every column is nullable;
# the two datetime columns are timestamps and the location IDs are integers.
fhv_schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PUlocationID', types.IntegerType(), True)
    .add('DOlocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [4]:
# Load every CSV under data/raw/fhv/ with the explicit FHV schema; the first
# row of each file is treated as a header rather than data.
df_fhv = spark.read.csv(
    "data/raw/fhv/",
    schema=fhv_schema,
    header=True,
)

In [5]:
# Display the schema of the loaded frame to confirm the explicit schema was applied.
df_fhv.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [34]:
# Redistribute the FHV rows into 6 partitions; the result is kept under a new
# name so df_fhv itself is left untouched.
df_partitioned = df_fhv.repartition(numPartitions=6)

In [35]:
# Persist the repartitioned trips as Parquet under data/pq/.
# The save mode is set to "overwrite" explicitly: the writer's default mode
# raises an error when the target path already exists, which made this cell
# fail on every re-run of the notebook after the first.
df_partitioned.write.mode("overwrite").parquet("data/pq/")

                                                                                

In [38]:
# Preview the first 20 FHV rows, with long cell values truncated
# (these are show()'s defaults, written out explicitly).
df_fhv.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [42]:
# Count the trips whose pickup timestamp falls on the calendar date 2019-10-15.
picked_up_on_oct_15 = f.to_date(f.col('pickup_datetime')) == '2019-10-15'
df_fhv.where(picked_up_on_oct_15).count()

                                                                                

62610

In [74]:
# Find the single longest trip.
# Both timestamps are cast to long (seconds), subtracted, divided by 3600 to
# get hours, and then cast to integer, which discards the fractional part.
pickup_seconds = f.col("pickup_datetime").cast("long")
dropoff_seconds = f.col("dropOff_datetime").cast("long")
duration_hours = ((dropoff_seconds - pickup_seconds) / 3600.).cast("integer")

(
    df_fhv
    .withColumn('trip_duration', duration_hours)
    .sort('trip_duration', ascending=False)  # longest first
    .limit(1)
    .show()
)

[Stage 31:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   null|                B02832|       631152|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+



                                                                                

In [6]:
# Explicit schema for the taxi zone lookup CSV: an integer LocationID plus
# three string attributes. Every column is nullable.
zones_schema = (
    types.StructType()
    .add('LocationID', types.IntegerType(), True)
    .add('Borough', types.StringType(), True)
    .add('Zone', types.StringType(), True)
    .add('service_zone', types.StringType(), True)
)

In [7]:
# Load the zone lookup CSV(s) under data/raw/zones with the explicit schema;
# the first row is treated as a header rather than data.
df_zones = spark.read.csv(
    'data/raw/zones',
    schema=zones_schema,
    header=True,
)

In [8]:
# Display the schema of the zone lookup frame to confirm the explicit schema was applied.
df_zones.schema

StructType([StructField('LocationID', IntegerType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [9]:
# Preview the first 20 zone rows, with long cell values truncated
# (these are show()'s defaults, written out explicitly).
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [19]:
# Attach zone attributes to each trip by matching the trip's pickup location
# ID against the lookup's LocationID. This is an inner join, so trips whose
# PUlocationID has no match in df_zones (including nulls) are dropped.
pickup_matches_zone = df_fhv.PUlocationID == df_zones.LocationID
df_join = df_fhv.join(df_zones, on=pickup_matches_zone, how="inner")

In [20]:
# Register the joined frame as a temporary view so it can be queried with spark.sql.
# NOTE(review): the view name 'join' is also a SQL keyword; queries against it
# rely on Spark accepting it as an identifier — consider a more descriptive
# name (and update every query that references it) if this ever misparses.
df_join.createOrReplaceTempView('join')

In [25]:
# Least frequent pickup zone: count trips per Zone in the joined view and keep
# the row with the smallest count.
# Zone is added as a secondary sort key so the result is deterministic when
# several zones share the minimum count; with ORDER BY count(1) alone, LIMIT 1
# could return any one of the tied zones from run to run.
spark.sql("""

SELECT Zone, count(1)
FROM join
GROUP BY Zone
ORDER BY count(1), Zone
LIMIT 1
""").show()

[Stage 30:>                                                         (0 + 1) / 1]

+-----------+--------+
|       Zone|count(1)|
+-----------+--------+
|Jamaica Bay|       1|
+-----------+--------+



                                                                                