In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()



In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

In [1]:
!wc -l fhvhv_tripdata_2021-01.csv

 11908469 fhvhv_tripdata_2021-01.csv


In [12]:
#Trying inferSchema from the YT video comments instead of pandas hack.
df = spark.read.option("header", "true").option("inferSchema", "true").csv('fhvhv_tripdata_2021-01.csv')

                                                                                

In [14]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', IntegerType(), True)])

In [17]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [18]:
df = spark.read.option("header", "true").option("schema", schema).csv('fhvhv_tripdata_2021-01.csv')

In [None]:
df = df.repartition(24)
df.write.parquet('fhvhv/2021/01')

In [20]:
df = spark.read.parquet('fhvhv/2021/01/')

In [22]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [25]:
df.select('pickup_datetime','dropoff_datetime','PULocationID','DOLocationID').filter(df.hvfhs_license_num == 'HV0003').show()

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-01 15:14:32|2021-01-01 15:21:59|          66|          97|
|2021-01-01 22:56:30|2021-01-01 23:02:40|         170|         100|
|2021-01-01 00:47:14|2021-01-01 00:57:43|         225|         189|
|2021-01-02 19:40:27|2021-01-02 19:48:31|          94|          18|
|2021-01-01 17:52:38|2021-01-01 17:55:34|         215|         215|
|2021-01-02 23:15:31|2021-01-02 23:22:44|          89|          89|
|2021-01-02 10:03:21|2021-01-02 10:10:20|         241|         220|
|2021-01-01 16:41:22|2021-01-01 16:43:45|         123|         123|
|2021-01-01 01:06:43|2021-01-01 01:15:39|         236|         170|
|2021-01-01 02:24:07|2021-01-01 02:38:18|          74|          47|
|2021-01-02 09:33:58|2021-01-02 09:53:41|         171|         130|
|2021-01-02 12:01:48|2021-01-02 12:21:45|       

In [35]:
from pyspark.sql import functions as F

In [36]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date','dropoff_date','PULocationID','DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-01|  2021-01-01|         101|         101|
| 2021-01-01|  2021-01-01|          66|          97|
| 2021-01-01|  2021-01-01|         170|         100|
| 2021-01-01|  2021-01-01|         225|         189|
| 2021-01-02|  2021-01-02|          42|         244|
| 2021-01-02|  2021-01-02|          94|          18|
| 2021-01-02|  2021-01-02|         221|         206|
| 2021-01-01|  2021-01-01|         215|         215|
| 2021-01-02|  2021-01-02|          89|          89|
| 2021-01-02|  2021-01-02|          22|          55|
| 2021-01-02|  2021-01-02|         241|         220|
| 2021-01-01|  2021-01-01|         123|         123|
| 2021-01-02|  2021-01-02|         223|          24|
| 2021-01-01|  2021-01-01|         236|         170|
| 2021-01-01|  2021-01-01|          74|          47|
| 2021-01-02|  2021-01-02|         171|       

In [5]:
### Homework
!wc -l fhv_tripdata_2019-10.csv
spark.version

 1897494 fhv_tripdata_2019-10.csv


'3.5.1'

In [8]:
df = spark.read.option("header", "true").option("inferSchema", "true").csv('fhv_tripdata_2019-10.csv')

                                                                                

In [9]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [10]:
df = df.repartition(6)
df.write.parquet('fhv/2019/10')

                                                                                

In [13]:
df = spark.read.parquet('fhv/2019/10/')

In [18]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B01239|2019-10-01 10:53:00|2019-10-01 11:08:27|         264|          20|   NULL|                B01239|
|              B02735|2019-10-02 14:11:37|2019-10-02 14:36:15|         264|         259|   NULL|                B02969|
|              B01339|2019-10-01 11:47:14|2019-10-01 12:19:20|         264|         239|   NULL|                B01339|
|              B02546|2019-10-01 14:04:54|2019-10-01 14:05:55|         264|         159|   NULL|                B02546|
|              B01087|2019-10-01 21:04:29|2019-10-01 21:31:54|         162|         265|   NULL|                B01087|
|              B02103|2019-10-02 16:14:5

In [28]:
df.createOrReplaceTempView('2019_10_fhv_trips')

In [48]:
spark.sql("""
    SELECT 
        *,
        (UNIX_TIMESTAMP(dropOff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600 AS trip_duration_hours
    FROM `2019_10_fhv_trips`
    ORDER BY trip_duration_hours DESC
    LIMIT 25
""").show()


+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|trip_duration_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   NULL|                B02832|           631152.5|
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   NULL|                B02832|           631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        NULL|        NULL|   NULL|                B02416|  87672.44083333333|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   NULL|       B00746         |  70128.02805555555|
|              B0292

In [50]:
spark.sql("""
    SELECT 
        PUlocationID,
        COUNT(*) as pickup_count
    FROM `2019_10_fhv_trips`
    GROUP BY PUlocationID
    ORDER BY pickup_count
    LIMIT 10
""").show()


+------------+------------+
|PUlocationID|pickup_count|
+------------+------------+
|           2|           1|
|         105|           2|
|         111|           5|
|          30|           8|
|         120|          14|
|          12|          15|
|         207|          23|
|          27|          25|
|         154|          26|
|           8|          29|
+------------+------------+

