In [40]:
import pyspark
from pyspark.sql import SparkSession

In [41]:
# Start (or reuse) a Spark session named 'test' running in local mode
# with the "local[*]" master setting.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [42]:
# Echo the session object as the cell's output to confirm it was created.
spark

In [4]:
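# One-time download of the gzipped source CSV (uncomment to run).
# NOTE(review): the read cell below expects an uncompressed .csv under
# /home/jovyan/work/data/ -- presumably the archive was gunzipped there; verify.
#!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz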

--2025-03-07 16:42:40--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8-4e24-47e8-a3ce-edcf6d1b11c7?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250307T164240Z&X-Amz-Expires=300&X-Amz-Signature=ccd331acc525584a44f459cf766ac44d7f82ed88ed767c13034b5beff4212697&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-01.csv.gz&response-content-type=application%2Foctet-stream [following]
--2025-03-07 16:42:40--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8-4e24-47e8-a3ce-edcf6d1b11c7?X-A

In [43]:
# First pass over the CSV: treat the first line as the header and let Spark
# pick the column types (no schema is supplied here).
df = spark.read.csv(
    '/home/jovyan/work/data/fhvhv_tripdata_2021-01.csv',
    header="true",
)

In [44]:
# Inspect the schema Spark assigned: with no explicit schema, every column
# comes back as a nullable string.
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

In [45]:
from pyspark.sql import types

In [46]:
# Explicit schema for the FHVHV trips file: timestamps for the pickup/dropoff
# columns, integers for the location ids, strings for everything else.
# Every column is nullable.
schema = (
    types.StructType()
    .add('hvfhs_license_num', types.StringType(), True)
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
)

In [47]:
# Re-read the same CSV, this time applying the explicit schema so the
# datetime and location-id columns get proper types instead of strings.
df = spark.read.csv(
    '/home/jovyan/work/data/fhvhv_tripdata_2021-01.csv',
    schema=schema,
    header="true",
)

In [48]:
# Redistribute the data into 24 partitions. repartition() returns a new
# DataFrame, so this rebinds df (re-running the cell repartitions again).
# NOTE(review): 24 is a magic number -- presumably chosen so the parquet
# write below produces 24 similarly sized files; confirm.
df = df.repartition(24)

In [49]:
# Persist the repartitioned data as parquet, replacing any previous output
# in the target directory.
df.write.mode('overwrite').parquet('fhvhv/2021/01/')

In [52]:
# Reload the data from the parquet directory so the following cells work
# off the parquet files rather than the original CSV.
df = spark.read.parquet('fhvhv/2021/01/')

In [53]:
# Preview the reloaded data (show() prints the first 20 rows by default).
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02875|2021-01-04 09:54:16|2021-01-04 10:20:02|         152|         193|   NULL|
|           HV0003|              B02836|2021-01-03 16:45:51|2021-01-03 16:58:38|         208|         213|   NULL|
|           HV0003|              B02864|2021-01-04 15:51:39|2021-01-04 16:03:16|         145|         202|   NULL|
|           HV0003|              B02395|2021-01-02 08:15:37|2021-01-02 08:19:39|          78|         169|   NULL|
|           HV0005|              B02510|2021-01-02 06:11:21|2021-01-02 06:40:23|          17|          73|   NULL|
|           HV0003|              B02617|2021-01-01 17:46:05|2021-01-01 18:10:31|

In [54]:
# Confirm the parquet data carries the typed schema (timestamps and
# integers rather than all strings).
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [55]:
from pyspark.sql import functions as F

In [56]:
# Create the plain-Python function that will later be wrapped as a Spark UDF
def crazy_stuff(base_num):
    """Turn a dispatching base number such as 'B02884' into a tagged hex id.

    The first character is dropped and the remainder is parsed as a decimal
    integer. The result is '<tag>/<hex>' where <hex> is that integer in
    lowercase hexadecimal, zero-padded to at least 3 digits, and <tag> is:

    * 's' when the number is divisible by 7,
    * 'a' when it is (otherwise) divisible by 3,
    * 'e' in every other case.

    Args:
        base_num: Base number string (one leading character followed by
            decimal digits), or None.

    Returns:
        The tagged id string, or None when ``base_num`` is None. Spark hands
        SQL NULLs to Python UDFs as None, so passing them through keeps one
        null row from failing the whole job.

    Raises:
        ValueError: If the part after the first character is not a decimal
            integer.
    """
    if base_num is None:
        return None

    num = int(base_num[1:])

    # Divisibility by 7 takes precedence over divisibility by 3.
    if num % 7 == 0:
        tag = 's'
    elif num % 3 == 0:
        tag = 'a'
    else:
        tag = 'e'

    return f'{tag}/{num:03x}'

In [57]:
# Sanity-check the function on a single base number in plain Python.
crazy_stuff('B02884')

's/b44'

In [58]:
# Wrap the Python function as a Spark UDF whose result column is a string,
# so it can be applied to DataFrame columns.
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [59]:
# Derive pickup/dropoff calendar dates and a base_id (via the UDF) for each
# trip, then preview only the derived columns plus the location ids.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/b3b| 2021-01-04|  2021-01-04|         152|         193|
|  e/b14| 2021-01-03|  2021-01-03|         208|         213|
|  e/b30| 2021-01-04|  2021-01-04|         145|         202|
|  e/95b| 2021-01-02|  2021-01-02|          78|         169|
|  e/9ce| 2021-01-02|  2021-01-02|          17|          73|
|  e/a39| 2021-01-01|  2021-01-01|         181|         107|
|  e/b38| 2021-01-02|  2021-01-02|         188|          72|
|  s/b13| 2021-01-01|  2021-01-01|          61|           4|
|  a/b49| 2021-01-03|  2021-01-03|         257|          54|
|  e/b47| 2021-01-04|  2021-01-04|          75|          75|
|  e/b35| 2021-01-01|  2021-01-02|         147|         175|
|  e/b32| 2021-01-05|  2021-01-05|          81|          81|
|  e/b42| 2021-01-01|  2021-01-01|          60|          60|
|  e/b48| 2021-01-04|  2

In [60]:
# Keep only the trips of licensee HV0003, then project the pickup/dropoff
# time and location columns.
# The filter runs before the select because hvfhs_license_num is not one of
# the selected columns; filtering afterwards would depend on Spark resolving
# a column that the projection has already dropped.
(
    df
    .filter(df.hvfhs_license_num == 'HV0003')
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .show()
)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-04 09:54:16|2021-01-04 10:20:02|         152|         193|
|2021-01-03 16:45:51|2021-01-03 16:58:38|         208|         213|
|2021-01-04 15:51:39|2021-01-04 16:03:16|         145|         202|
|2021-01-02 08:15:37|2021-01-02 08:19:39|          78|         169|
|2021-01-01 17:46:05|2021-01-01 18:10:31|         181|         107|
|2021-01-02 19:36:41|2021-01-02 19:43:00|         188|          72|
|2021-01-01 16:05:01|2021-01-01 16:26:27|          61|           4|
|2021-01-03 00:05:40|2021-01-03 00:13:45|         257|          54|
|2021-01-04 16:37:58|2021-01-04 16:46:13|          75|          75|
|2021-01-01 23:42:33|2021-01-02 00:06:29|         147|         175|
|2021-01-05 08:31:45|2021-01-05 08:38:00|          81|          81|
|2021-01-01 18:57:24|2021-01-01 19:01:41|       

In [61]:
# Show only the first 5 rows.
df.show(5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02875|2021-01-04 09:54:16|2021-01-04 10:20:02|         152|         193|   NULL|
|           HV0003|              B02836|2021-01-03 16:45:51|2021-01-03 16:58:38|         208|         213|   NULL|
|           HV0003|              B02864|2021-01-04 15:51:39|2021-01-04 16:03:16|         145|         202|   NULL|
|           HV0003|              B02395|2021-01-02 08:15:37|2021-01-02 08:19:39|          78|         169|   NULL|
|           HV0005|              B02510|2021-01-02 06:11:21|2021-01-02 06:40:23|          17|          73|   NULL|
+-----------------+--------------------+-------------------+-------------------+

In [62]:
# Register the DataFrame as the temporary view 'df' so it can be queried
# with spark.sql. createOrReplaceTempView is the supported replacement for
# registerTempTable, which has been deprecated since Spark 2.0.
df.createOrReplaceTempView('df')

In [63]:
# 5-row preview again, this time through SQL against the temp view 'df'.
spark.sql('select * from df limit 5').show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02875|2021-01-04 09:54:16|2021-01-04 10:20:02|         152|         193|   NULL|
|           HV0003|              B02836|2021-01-03 16:45:51|2021-01-03 16:58:38|         208|         213|   NULL|
|           HV0003|              B02864|2021-01-04 15:51:39|2021-01-04 16:03:16|         145|         202|   NULL|
|           HV0003|              B02395|2021-01-02 08:15:37|2021-01-02 08:19:39|          78|         169|   NULL|
|           HV0005|              B02510|2021-01-02 06:11:21|2021-01-02 06:40:23|          17|          73|   NULL|
+-----------------+--------------------+-------------------+-------------------+

In [64]:
# Latest pickup and latest dropoff timestamp in the data.
spark.sql('select max(pickup_datetime), max(dropoff_datetime) from df').show()

+--------------------+---------------------+
|max(pickup_datetime)|max(dropoff_datetime)|
+--------------------+---------------------+
| 2021-01-31 23:59:59|  2021-02-01 03:23:22|
+--------------------+---------------------+



In [65]:
# Same query as above, but keep what spark.sql returns in a variable
# instead of showing it immediately.
result = spark.sql('select max(pickup_datetime), max(dropoff_datetime) from df')

In [66]:
# Display the stored query result.
result.show()

+--------------------+---------------------+
|max(pickup_datetime)|max(dropoff_datetime)|
+--------------------+---------------------+
| 2021-01-31 23:59:59|  2021-02-01 03:23:22|
+--------------------+---------------------+



In [67]:
# Another way to work with SQL: expose the DataFrame as the temporary view
# 'df2' (created, or replaced if a view with that name already exists).
df.createOrReplaceTempView('df2')

In [68]:
# Peek at 3 rows through the 'df2' view.
spark.sql('select * from df2 limit 3').show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02875|2021-01-04 09:54:16|2021-01-04 10:20:02|         152|         193|   NULL|
|           HV0003|              B02836|2021-01-03 16:45:51|2021-01-03 16:58:38|         208|         213|   NULL|
|           HV0003|              B02864|2021-01-04 15:51:39|2021-01-04 16:03:16|         145|         202|   NULL|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+



In [69]:
# Distinct dispatching base numbers (show() only prints the first 20 rows).
spark.sql('select distinct dispatching_base_num from df2').show()

+--------------------+
|dispatching_base_num|
+--------------------+
|              B02876|
|              B03136|
|              B02877|
|              B02869|
|              B02883|
|              B02835|
|              B02884|
|              B02880|
|              B02878|
|              B02836|
|              B02872|
|              B02512|
|              B02867|
|              B02866|
|              B02871|
|              B02889|
|              B02844|
|              B02510|
|              B02888|
|              B02682|
+--------------------+
only showing top 20 rows



In [70]:
# Distinct license numbers present in the data.
spark.sql('select distinct hvfhs_license_num from df2').show()

+-----------------+
|hvfhs_license_num|
+-----------------+
|           HV0004|
|           HV0005|
|           HV0003|
+-----------------+



In [71]:
# Total number of rows in the view.
spark.sql('select count(*) from df2').show()

+--------+
|count(1)|
+--------+
|11908468|
+--------+



In [72]:
# Row count per license number, largest first. "group by 1" and
# "order by 2" refer to select-list positions: 1 = hvfhs_license_num,
# 2 = count(*).
spark.sql('''
    select 
        hvfhs_license_num,
        count(*) 
    from 
        df2
    group by 1
    order by 2 desc''').show()

+-----------------+--------+
|hvfhs_license_num|count(1)|
+-----------------+--------+
|           HV0003| 8704128|
|           HV0005| 3094325|
|           HV0004|  110015|
+-----------------+--------+



In [73]:
# Stop the Spark session now that the analysis is finished.
spark.stop()