In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [3]:
# Download the gzipped FHVHV trip-data CSV for 2021-01 from the DataTalksClub release assets.
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

--2025-03-03 12:14:24--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8-4e24-47e8-a3ce-edcf6d1b11c7?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250303T121424Z&X-Amz-Expires=300&X-Amz-Signature=b9b510e352e6b2c703d5b5314ac3a90378bca0607577147df0c5027e70b9e023&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-01.csv.gz&response-content-type=application%2Foctet-stream [following]
--2025-03-03 12:14:24--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8-4e24-47e8-a3ce-edcf6d1b11c7?X-Amz-A

In [5]:
# Decompress the archive; gzip replaces the .csv.gz with the plain .csv file.
!gzip -d fhvhv_tripdata_2021-01.csv.gz

In [7]:
# Count the lines in the CSV (the count includes the header line).
!wc -l fhvhv_tripdata_2021-01.csv

11908469 fhvhv_tripdata_2021-01.csv


In [10]:
# First pass: read the CSV using only the header row, with no explicit schema.
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-01.csv')
)

In [14]:
# Keep the header plus the first 1000 data rows as a small sample for type inspection.
!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [16]:
import pandas as pd

In [17]:
# Load the small sample with pandas so its inferred dtypes can be inspected.
df_pandas = pd.read_csv('head.csv')

In [18]:
# Show the per-column dtypes pandas inferred from the sample.
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [20]:
# Convert the pandas sample to a Spark DataFrame and show the schema Spark derives from it.
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

In [21]:
from pyspark.sql import types

In [24]:
# Explicit schema for the FHVHV CSV. Compared with the schema derived from the
# pandas sample: the two datetime columns become timestamps instead of strings,
# the location ids become 32-bit integers instead of longs, and SR_Flag is kept
# as a string instead of a double. Every column is nullable.
schema = (
    types.StructType()
    .add('hvfhs_license_num', types.StringType(), True)
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
)

In [25]:
# Re-read the full CSV, this time applying the explicit schema defined above.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-01.csv')
)

In [26]:
# Fetch the first 10 rows as Row objects to check that the typed columns parsed as expected.
df.head(10)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

In [36]:
# Redistribute the rows across 24 partitions before writing the data out.
df = df.repartition(24)

In [37]:
# Confirm the partition count after the repartition.
print(df.rdd.getNumPartitions())

24


In [38]:
# Write the repartitioned data as parquet. The default save mode raises if the
# target directory already exists, which breaks any re-run of this cell
# (e.g. Restart & Run All); 'overwrite' replaces the previous output instead.
df.write.mode('overwrite').parquet('fhvhv/2021/01')

In [None]:
# List the files written to the parquet output directory. Uses the explicit `!`
# shell escape, like the other shell calls in this notebook, rather than
# depending on IPython automagic.
!ls -l 'fhvhv/2021/01'

In [53]:
# Reload the dataset from the parquet output; `df` now refers to the parquet-backed frame.
df = spark.read.parquet('fhvhv/2021/01/')

In [54]:
# Display the DataFrame repr (column names and types only; no rows are fetched).
df

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string]

In [55]:
# Print the schema as a tree to check the column types read back from parquet.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [61]:
# Show pickup/dropoff times and zones for trips with license number HV0003.
# Filter first, then project: filtering after the select would reference a
# column that is no longer in the projection and only works because Spark
# re-resolves it from the parent plan.
df.filter(df['hvfhs_license_num'] == 'HV0003') \
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .show()

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-01 00:23:13|2021-01-01 00:30:35|         147|         159|
|2021-01-01 05:15:44|2021-01-01 05:34:17|         160|         191|
|2021-01-03 13:23:58|2021-01-03 13:31:10|          36|         157|
|2021-01-02 23:50:42|2021-01-02 23:54:53|         188|          72|
|2021-01-02 23:09:05|2021-01-03 00:05:24|         256|         256|
|2021-01-03 16:04:32|2021-01-03 16:15:24|         127|         241|
|2021-01-01 17:21:00|2021-01-01 17:32:58|          18|          20|
|2021-01-02 02:29:29|2021-01-02 02:39:30|         130|         134|
|2021-01-03 19:37:03|2021-01-03 20:15:18|         126|         265|
|2021-01-02 23:20:48|2021-01-02 23:54:31|         132|         211|
|2021-01-03 17:44:06|2021-01-03 17:48:43|          68|         158|
|2021-01-01 20:57:53|2021-01-01 21:26:54|       

In [62]:
from pyspark.sql import functions as F

In [68]:
# Truncate both timestamps to calendar dates and preview them next to the zone ids.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-01|  2021-01-01|         147|         159|
| 2021-01-01|  2021-01-01|         160|         191|
| 2021-01-03|  2021-01-03|         247|         244|
| 2021-01-03|  2021-01-03|          36|         157|
| 2021-01-02|  2021-01-02|         188|          72|
| 2021-01-03|  2021-01-03|         170|          90|
| 2021-01-02|  2021-01-03|         256|         256|
| 2021-01-03|  2021-01-03|         127|         241|
| 2021-01-01|  2021-01-01|          18|          20|
| 2021-01-01|  2021-01-01|          74|         249|
| 2021-01-02|  2021-01-02|         130|         134|
| 2021-01-03|  2021-01-03|         126|         265|
| 2021-01-02|  2021-01-02|         132|         211|
| 2021-01-03|  2021-01-03|          68|         158|
| 2021-01-01|  2021-01-01|          78|          94|
| 2021-01-01|  2021-01-01|          42|       

In [70]:
def crazy_stuff(base_num):
    """Derive a synthetic base id from a dispatching base number.

    The first character of ``base_num`` is dropped and the remainder is parsed
    as a decimal integer. The result is that integer in lowercase hex
    (zero-padded to at least three digits), prefixed by:

    * ``'s/'`` when the number is divisible by 7,
    * ``'a/'`` when it is divisible by 3 (but not 7),
    * ``'e/'`` otherwise.

    Args:
        base_num: String such as ``'B02764'``, or ``None``.

    Returns:
        The derived id (e.g. ``'e/acc'``), or ``None`` when ``base_num`` is
        ``None``.

    Raises:
        ValueError: If the text after the first character is not an integer.
    """
    # Spark hands SQL NULLs to Python UDFs as None; propagate the null instead
    # of raising TypeError on the slice below and failing the whole job.
    if base_num is None:
        return None

    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

In [76]:
# Quick manual check: the leading character is dropped, so '3333' is parsed as 333 (0x14d),
# which is divisible by 3 but not by 7, hence the 'a/' prefix.
crazy_stuff('3333')

'a/14d'

In [77]:
# Wrap the plain Python function as a Spark UDF that returns a string column.
crazy_stuff_udf = F.udf(crazy_stuff, types.StringType())

In [79]:
# Add the date columns and the UDF-derived base_id, then preview the result.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/acc| 2021-01-01|  2021-01-01|         147|         159|
|  e/acc| 2021-01-01|  2021-01-01|         160|         191|
|  e/9ce| 2021-01-03|  2021-01-03|         247|         244|
|  e/b38| 2021-01-03|  2021-01-03|          36|         157|
|  e/b3b| 2021-01-02|  2021-01-02|         188|          72|
|  e/9ce| 2021-01-03|  2021-01-03|         170|          90|
|  e/acc| 2021-01-02|  2021-01-03|         256|         256|
|  e/acc| 2021-01-03|  2021-01-03|         127|         241|
|  e/b3e| 2021-01-01|  2021-01-01|          18|          20|
|  e/9ce| 2021-01-01|  2021-01-01|          74|         249|
|  e/b3f| 2021-01-02|  2021-01-02|         130|         134|
|  a/b31| 2021-01-03|  2021-01-03|         126|         265|
|  e/b3b| 2021-01-02|  2021-01-02|         132|         211|
|  e/acc| 2021-01-03|  2