In [70]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

# We now need to instantiate a Spark session, an object that we use to interact with Spark.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [71]:
schema = types.StructType(
    [
        types.StructField('dispatching_base_num', types.StringType(), True),
        types.StructField('pickup_datetime', types.TimestampType(), True),
        types.StructField('dropoff_datetime', types.TimestampType(), True),
        types.StructField('PULocationID', types.IntegerType(), True),
        types.StructField('DOLocationID', types.IntegerType(), True),
        types.StructField('SR_Flag', types.StringType(), True),
        types.StructField('Affiliated_base_number', types.StringType(), True)
    ]
)

In [72]:



df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/fhvhv_tripdata_2021-06.csv.gz')

df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [36]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [37]:
df = df.repartition(12)

In [38]:
df.write.parquet('fhvhv/2021/06/')

[Stage 20:>                                                         (0 + 1) / 1]

23/03/10 18:13:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


[Stage 22:>                                                        (0 + 8) / 12]

23/03/10 18:14:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [74]:
from pyspark.sql import functions as F

In [75]:
df = spark.read.parquet('fhvhv/2021/02/*')

In [76]:
df.registerTempTable('fhvhv')

In [80]:
df_zones = spark.read.parquet('zones')

df_zones.columns

df.columns

df_zones.registerTempTable('zones')

spark.sql("""
SELECT
    pul.Zone AS pu_do_pair,
    COUNT(1)
FROM 
    fhvhv fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
GROUP BY 
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

[Stage 62:>                                                         (0 + 8) / 8]

+-------------------+--------+
|         pu_do_pair|count(1)|
+-------------------+--------+
|Crown Heights North|  231279|
|       East Village|  221244|
|        JFK Airport|  188867|
|     Bushwick South|  187929|
|      East New York|  186780|
+-------------------+--------+



                                                                                

In [50]:
spark.sql("""
SELECT
    COUNT(*)
FROM
    fhvhv
WHERE
    DATE(pickup_datetime) = '2021-06-15'
""").show()

[Stage 31:>                                                         (0 + 8) / 8]

+--------+
|count(1)|
+--------+
|  452470|
+--------+





In [61]:
from pyspark.sql.functions import col, asc,desc
df \
    .withColumn('duration_seconds', (df.dropoff_datetime.cast('long')-df.pickup_datetime.cast('long'))) \
    .orderBy(col('duration_seconds').desc()) \
    .show()

[Stage 39:>                                                         (0 + 8) / 8]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|duration_seconds|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|                B02872|          240764|
|              B02765|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|      N|                B02765|           91979|
|              B02879|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|      N|                B02879|           71931|
|              B02800|2021-06-26 22:37:11|2021-06-27 16:49:01|         263|          36|      N|                  null|           65510|
|              B02682|2021-06-23 20:40:43



In [62]:
df_zones = spark.read.parquet('zones/')

In [63]:
zpu = df_zones \
    .withColumnRenamed('Zone', 'PUzone') \
    .withColumnRenamed('LocationID', 'zPULocationID') \
    .withColumnRenamed('Borough', 'PUBorough') \
    .drop('service_zone')
zdo = df_zones \
    .withColumnRenamed('Zone', 'DOzone') \
    .withColumnRenamed('LocationID', 'zDOLocationID') \
    .withColumnRenamed('Borough', 'DOBorough') \
    .drop('service_zone')

In [64]:
df_join_temp = df.join(zpu, df.PULocationID == zpu.zPULocationID)
df_join = df_join_temp.join(zdo, df_join_temp.DOLocationID == zdo.zDOLocationID)

In [65]:
df_join.drop('PULocationID', 'DOLocationID', 'zPULocationID', 'zDOLocationID').write.parquet('tmp/homework/6')

[Stage 42:>                                                         (0 + 8) / 8]

23/03/10 19:06:28 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [67]:
df_join = spark.read.parquet('tmp/homework/6')
df_join

AttributeError: module 'numpy' has no attribute 'float64'

In [68]:
df_join.registerTempTable('join_table')

In [69]:
spark.sql("""
SELECT
    CONCAT(coalesce(PUzone, 'Unknown'), '/', coalesce(DOzone, 'Unknown')) AS zone_pair,
    COUNT(1)
FROM
    join_table
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT
    1
;
""").show()

[Stage 45:>                                                         (0 + 8) / 8]

+--------------------+--------+
|           zone_pair|count(1)|
+--------------------+--------+
|East New York/Eas...|   47926|
+--------------------+--------+



                                                                                