In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local Spark session for this notebook (master "local[*]", app name "test").
# getOrCreate() returns the already-running session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

In [3]:
# Parquet input, relative to the notebook's working directory
# (the outputs below show January 2021 trips).
FHVHV_PATH = "fhvhv/2021/01/"
df = spark.read.parquet(FHVHV_PATH)

In [4]:
# Print the column names, types and nullability read from the Parquet files.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [5]:
# Trips with license HV0003 only: filter on the full frame first, then keep
# the four columns we want to look at.
(
    df.filter(df.hvfhs_license_num == "HV0003")
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .show()
)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-02 10:25:05|2021-01-02 10:43:18|         188|          35|
|2021-01-01 19:13:07|2021-01-01 19:36:13|          18|         208|
|2021-01-02 15:58:09|2021-01-02 16:08:09|         249|         144|
|2021-01-02 05:32:21|2021-01-02 05:43:24|          81|          32|
|2021-01-01 02:57:42|2021-01-01 03:14:24|         232|          48|
|2021-01-01 18:25:54|2021-01-01 18:35:59|         244|         244|
|2021-01-01 05:06:53|2021-01-01 05:38:58|         229|          38|
|2021-01-02 08:52:47|2021-01-02 08:57:52|         107|         162|
|2021-01-02 15:12:35|2021-01-02 15:22:56|          78|         242|
|2021-01-02 23:36:12|2021-01-03 00:19:14|         138|         265|
|2021-01-02 00:30:11|2021-01-02 00:34:32|          48|         239|
|2021-01-03 02:19:44|2021-01-03 02:44:44|       

# Spark: User-Defined Functions (UDFs)

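A UDF is an alternative to expressing a transformation in SQL, as we did before. Instead of writing the logic as a SQL expression, we write a plain Python function and let Spark apply it to every row. This is useful when the logic is awkward to express with built-in functions.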

In [6]:
from pyspark.sql import functions as F
from pyspark.sql import types

In [7]:
# Truncate pickup/dropoff timestamps to calendar dates and preview them
# alongside the location ids.
(
    df.select(
        F.to_date(df.pickup_datetime).alias('pickup_date'),
        F.to_date(df.dropoff_datetime).alias('dropoff_date'),
        'PULocationID',
        'DOLocationID',
    )
    .show()
)

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-02|  2021-01-02|         188|          35|
| 2021-01-01|  2021-01-01|          18|         208|
| 2021-01-02|  2021-01-02|         249|         144|
| 2021-01-02|  2021-01-02|          81|          32|
| 2021-01-01|  2021-01-01|         232|          48|
| 2021-01-01|  2021-01-01|         244|         244|
| 2021-01-01|  2021-01-01|         229|          38|
| 2021-01-02|  2021-01-02|         107|         162|
| 2021-01-01|  2021-01-01|          76|          71|
| 2021-01-02|  2021-01-02|          78|         242|
| 2021-01-02|  2021-01-03|         138|         265|
| 2021-01-02|  2021-01-02|          48|         239|
| 2021-01-03|  2021-01-03|         254|         224|
| 2021-01-02|  2021-01-02|         138|         170|
| 2021-01-01|  2021-01-01|          50|         145|
| 2021-01-01|  2021-01-01|          80|       

In [8]:
# creating our own function (wrapped as a Spark UDF in the next statement)

def crazy_stuff(base_num):
    """Encode a dispatching base number as a short tagged hex string.

    The first character of ``base_num`` is dropped and the rest is parsed as
    an integer (e.g. ``"B02510"`` -> ``2510``). That integer is rendered as
    lowercase hex, zero-padded to at least 3 digits, and prefixed with
    ``'s/'`` when it is divisible by 7, otherwise with ``'e/'``.

    Args:
        base_num: Base identifier string such as ``"B02510"``, or ``None``.

    Returns:
        The encoded string (e.g. ``'e/9ce'``), or ``None`` if ``base_num``
        is ``None``.

    Raises:
        ValueError: if the characters after the first are not an integer.
    """
    # dispatching_base_num is nullable in the schema; Spark passes SQL NULL
    # to a Python UDF as None, and slicing None would raise TypeError and
    # abort the whole job. Propagate the NULL instead.
    if base_num is None:
        return None
    num = int(base_num[1:])
    prefix = 's' if num % 7 == 0 else 'e'
    return f'{prefix}/{num:03x}'
    
# Wrap the Python function as a Spark UDF whose result column is a string.
crazy_stuff_udf = F.udf(crazy_stuff, types.StringType())

In [17]:
# Encoded base id (computed row by row by the Python UDF) plus the pickup and
# dropoff calendar dates and the location ids, in that column order.
new_df = df.select(
    crazy_stuff_udf(df.dispatching_base_num).alias('base_id'),
    F.to_date(df.pickup_datetime).alias('pickup_date'),
    F.to_date(df.dropoff_datetime).alias('dropoff_date'),
    'PULocationID',
    'DOLocationID',
)

In [20]:
# Preview the first 20 rows (the same count show() uses by default).
new_df.show(n=20)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/a39| 2021-01-02|  2021-01-02|         188|          35|
|  e/acc| 2021-01-01|  2021-01-01|          18|         208|
|  s/b13| 2021-01-02|  2021-01-02|         249|         144|
|  s/b36| 2021-01-02|  2021-01-02|          81|          32|
|  e/acc| 2021-01-01|  2021-01-01|         232|          48|
|  e/acc| 2021-01-01|  2021-01-01|         244|         244|
|  e/b3e| 2021-01-01|  2021-01-01|         229|          38|
|  e/b3b| 2021-01-02|  2021-01-02|         107|         162|
|  e/9ce| 2021-01-01|  2021-01-01|          76|          71|
|  e/b30| 2021-01-02|  2021-01-02|          78|         242|
|  e/b48| 2021-01-02|  2021-01-03|         138|         265|
|  e/b38| 2021-01-02|  2021-01-02|          48|         239|
|  e/b3f| 2021-01-03|  2021-01-03|         254|         224|
|  e/9ce| 2021-01-02|  2

# Spark Aggregations

In [19]:
# Number of trips per encoded base id.
(
    new_df
    .groupBy("base_id")
    .count()
    .show()
)

+-------+------+
|base_id| count|
+-------+------+
|  e/9d0| 44231|
|  e/b3b|735450|
|  s/b13|200129|
|  e/a7a|321599|
|  s/b3d|208986|
|  s/b44|257674|
|  e/b3e|312013|
|  e/b48|177542|
|  e/b38|924960|
|  e/95b|124107|
|  e/b42|241988|
|  e/b3f|216993|
|  e/b49|149398|
|  e/b1c|  3325|
|  e/b37|330085|
|  e/b40|119173|
|  s/af0|108146|
|  e/b30|316395|
|  e/b43|268391|
|  e/b35|452098|
+-------+------+
only showing top 20 rows



For multiple aggregations, pass several column expressions built with Spark Functions to `agg`. `agg` accepts these expressions as separate arguments. We can therefore collect them in a list and use the `*list` syntax to unpack it, so each element is passed as its own argument.

In [49]:
# Aggregate expressions collected in a list, then unpacked into agg().
latest_date_aggs = [
    F.max("pickup_date").alias("Latest_Pickup_Date"),
    F.max("dropoff_date").alias("Latest_Dropoff_Date"),
]

# Latest pickup/dropoff date per base, restricted to trips whose pickup and
# dropoff location ids are both above 200.
(
    new_df
    .where((F.col("PULocationID") > 200) & (F.col("DOLocationID") > 200))
    .groupBy("base_id")
    .agg(*latest_date_aggs)
    .show()
)

+-------+------------------+-------------------+
|base_id|Latest_Pickup_Date|Latest_Dropoff_Date|
+-------+------------------+-------------------+
|  e/9d0|        2021-01-31|         2021-02-01|
|  e/b3b|        2021-01-31|         2021-02-01|
|  s/b13|        2021-01-31|         2021-02-01|
|  e/a7a|        2021-01-31|         2021-02-01|
|  s/b3d|        2021-01-31|         2021-02-01|
|  s/b44|        2021-01-31|         2021-02-01|
|  e/b3e|        2021-01-31|         2021-02-01|
|  e/b48|        2021-01-31|         2021-02-01|
|  e/b38|        2021-01-31|         2021-02-01|
|  e/95b|        2021-01-31|         2021-02-01|
|  e/b42|        2021-01-31|         2021-02-01|
|  e/b3f|        2021-01-31|         2021-02-01|
|  e/b49|        2021-01-31|         2021-02-01|
|  e/b1c|        2021-01-31|         2021-01-31|
|  e/b37|        2021-01-31|         2021-02-01|
|  e/b40|        2021-01-31|         2021-02-01|
|  s/af0|        2021-01-31|         2021-01-31|
|  e/b30|        202