In [24]:
import os

import pandas as pd
from pyspark.sql import SparkSession, types

In [25]:
# Local Spark session ("local[*]" = one worker thread per logical core).
# getOrCreate() returns the already-running session if the kernel has one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [26]:
# Explicit schema for the FHVHV trip CSV. Without it the CSV reader would load
# every column as a string; here the two datetime columns become timestamps
# and the location ids become integers. Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [27]:
# Raw January-2021 FHVHV trip CSV. The path is machine-specific, so it can be
# overridden with the FHVHV_CSV environment variable instead of editing the
# notebook; the default is the original author's location.
FHVHV_CSV_PATH = os.environ.get(
    'FHVHV_CSV',
    '/home/myothet/repos/data-engineering/batch/fhvhv_tripdata_2021-01.csv',
)

# First line of the file is a header; column types come from `schema`
# rather than being read as strings.
df = spark.read \
     .option("header", "true") \
     .schema(schema) \
     .csv(FHVHV_CSV_PATH)

In [28]:
# Redistribute the rows into 4 partitions so the parquet write that follows
# is split into that many part files.
df = df.repartition(numPartitions=4)

In [29]:
# Persist as parquet, replacing any output left by a previous run.
df.write.mode("overwrite").parquet('fhvhv/2021/01')

                                                                                

In [30]:
# Reload the data from the parquet copy just written, instead of the CSV.
df = spark.read.format('parquet').load('fhvhv/2021/01')

In [43]:
# Sanity check: the schema read back from parquet should match the explicit
# schema applied when reading the CSV (parquet stores column types with the data).
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [32]:
# Trip columns shown in the exploration cells below.
columns = "pickup_datetime dropoff_datetime PULocationID DOLocationID".split()

In [33]:
# Show HV0003 trips. Filter on the license column *before* projecting it away:
# filtering after select(columns) only works because Spark's analyzer
# re-resolves the dropped attribute behind the scenes.
df.filter(df.hvfhs_license_num == 'HV0003') \
  .select(columns) \
  .show()

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-01 03:25:05|2021-01-01 03:33:04|          69|         247|
|2021-01-02 10:18:43|2021-01-02 10:36:45|          61|          76|
|2021-01-01 17:37:36|2021-01-01 17:44:07|         147|         147|
|2021-01-01 01:15:03|2021-01-01 01:28:24|          95|          28|
|2021-01-01 10:04:05|2021-01-01 10:08:19|         216|         216|
|2021-01-01 14:12:07|2021-01-01 14:19:55|          69|         169|
|2021-01-01 21:12:36|2021-01-01 21:21:01|          47|          78|
|2021-01-01 00:46:51|2021-01-01 01:02:46|          51|         159|
|2021-01-02 08:51:20|2021-01-02 09:23:34|          35|         205|
|2021-01-01 02:27:23|2021-01-01 02:40:18|          39|         219|
|2021-01-02 09:10:13|2021-01-02 09:28:27|           7|          17|
|2021-01-01 04:04:32|2021-01-01 04:15:29|       

In [34]:
# First 5 HV0003 trips as a list of Row objects. Filter before select so the
# filter column is still present in the DataFrame it is applied to.
df.filter(df.hvfhs_license_num == 'HV0003') \
  .select(columns) \
  .take(5)

[Row(pickup_datetime=datetime.datetime(2021, 1, 1, 3, 25, 5), dropoff_datetime=datetime.datetime(2021, 1, 1, 3, 33, 4), PULocationID=69, DOLocationID=247),
 Row(pickup_datetime=datetime.datetime(2021, 1, 2, 10, 18, 43), dropoff_datetime=datetime.datetime(2021, 1, 2, 10, 36, 45), PULocationID=61, DOLocationID=76),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 17, 37, 36), dropoff_datetime=datetime.datetime(2021, 1, 1, 17, 44, 7), PULocationID=147, DOLocationID=147),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 1, 15, 3), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 28, 24), PULocationID=95, DOLocationID=28),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 10, 4, 5), dropoff_datetime=datetime.datetime(2021, 1, 1, 10, 8, 19), PULocationID=216, DOLocationID=216)]

In [35]:
# Same query via head(5), which returns the same list of Rows as take(5).
# Filter before select so the filter column is still present.
df.filter(df.hvfhs_license_num == 'HV0003') \
  .select(columns) \
  .head(5)

[Row(pickup_datetime=datetime.datetime(2021, 1, 1, 3, 25, 5), dropoff_datetime=datetime.datetime(2021, 1, 1, 3, 33, 4), PULocationID=69, DOLocationID=247),
 Row(pickup_datetime=datetime.datetime(2021, 1, 2, 10, 18, 43), dropoff_datetime=datetime.datetime(2021, 1, 2, 10, 36, 45), PULocationID=61, DOLocationID=76),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 17, 37, 36), dropoff_datetime=datetime.datetime(2021, 1, 1, 17, 44, 7), PULocationID=147, DOLocationID=147),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 1, 15, 3), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 28, 24), PULocationID=95, DOLocationID=28),
 Row(pickup_datetime=datetime.datetime(2021, 1, 1, 10, 4, 5), dropoff_datetime=datetime.datetime(2021, 1, 1, 10, 8, 19), PULocationID=216, DOLocationID=216)]

In [36]:
from pyspark.sql import functions as F

In [37]:
# Truncate the pickup/dropoff timestamps to calendar dates and show them
# next to the location ids.
(
    df
    .select(
        F.to_date(df.pickup_datetime).alias('pickup_date'),
        F.to_date(df.dropoff_datetime).alias('dropoff_date'),
        'PULocationID',
        'DOLocationID',
    )
    .show()
)

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-01|  2021-01-01|         225|         177|
| 2021-01-01|  2021-01-01|          69|         247|
| 2021-01-01|  2021-01-01|         225|         218|
| 2021-01-02|  2021-01-02|          61|          76|
| 2021-01-01|  2021-01-01|         113|         138|
| 2021-01-01|  2021-01-01|          36|         255|
| 2021-01-01|  2021-01-01|         238|         137|
| 2021-01-01|  2021-01-01|          28|         130|
| 2021-01-01|  2021-01-01|         147|         147|
| 2021-01-02|  2021-01-02|          21|         258|
| 2021-01-01|  2021-01-01|         249|         238|
| 2021-01-02|  2021-01-02|          81|         265|
| 2021-01-01|  2021-01-01|         241|         167|
| 2021-01-01|  2021-01-01|          95|          28|
| 2021-01-01|  2021-01-01|         216|         216|
| 2021-01-01|  2021-01-01|          10|       

In [47]:
def crazy_stuff(base_num):
    """Derive a short, obfuscated id from a dispatching base number.

    The first character of ``base_num`` (e.g. the ``B`` in ``B02510``) is
    dropped and the rest is parsed as an integer ``num``. The result is a
    prefix chosen by divisibility, followed by ``num`` as lowercase hex
    zero-padded to at least three digits:

    * ``"s/"`` if ``num`` is divisible by 7,
    * ``"a/"`` if ``num`` is divisible by 3 (but not by 7),
    * ``"e/"`` otherwise.

    Args:
        base_num: Dispatching base number string, or ``None``.

    Returns:
        The derived id, e.g. ``"e/9ce"`` for ``"B02510"``, or ``None`` when
        ``base_num`` is ``None`` or empty.

    Raises:
        ValueError: if the characters after the first do not form an integer.
    """
    # dispatching_base_num is declared nullable in the schema, and Spark hands
    # SQL NULL to a Python UDF as None. Return None (NULL) instead of raising
    # TypeError on None[1:], which would fail the whole Spark task.
    if not base_num:
        return None
    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = "s"
    elif num % 3 == 0:
        prefix = "a"
    else:
        prefix = "e"
    return f"{prefix}/{num:03x}"

In [48]:
# Register the plain Python function as a Spark UDF producing strings
# (second positional argument is the return type).
crazy_stuff_udf = F.udf(crazy_stuff, types.StringType())

In [49]:
# Show the UDF-derived base id next to the trip dates and location ids.
(
    df
    .select(
        crazy_stuff_udf(df.dispatching_base_num).alias('base_id'),
        F.to_date(df.pickup_datetime).alias('pickup_date'),
        F.to_date(df.dropoff_datetime).alias('dropoff_date'),
        'PULocationID',
        'DOLocationID',
    )
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/9ce| 2021-01-01|  2021-01-01|         225|         177|
|  e/b30| 2021-01-01|  2021-01-01|          69|         247|
|  e/9ce| 2021-01-01|  2021-01-01|         225|         218|
|  e/b42| 2021-01-02|  2021-01-02|          61|          76|
|  e/9ce| 2021-01-01|  2021-01-01|         113|         138|
|  e/9ce| 2021-01-01|  2021-01-01|          36|         255|
|  e/9ce| 2021-01-01|  2021-01-01|         238|         137|
|  e/9ce| 2021-01-01|  2021-01-01|          28|         130|
|  e/b47| 2021-01-01|  2021-01-01|         147|         147|
|  e/9ce| 2021-01-02|  2021-01-02|          21|         258|
|  e/9ce| 2021-01-01|  2021-01-01|         249|         238|
|  e/9ce| 2021-01-02|  2021-01-02|          81|         265|
|  e/9ce| 2021-01-01|  2021-01-01|         241|         167|
|  e/b48| 2021-01-01|  2