In [1]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/01/30 19:37:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Glob matching everything two directory levels below data/pq/green
# (presumably year/month partitions -- confirm against the data layout).
green_folderpath = 'data/pq/green/*/*'

# Load every matched parquet file into a single DataFrame.
df_green = spark.read.parquet(green_folderpath)

                                                                                

```sql
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS records_cnt
FROM
    green
WHERE
    lpep_pickup_datetime >= '2023-01-01 00:00:00'
GROUP BY
    1, 2
```

In [5]:
# Show column names, types and nullability of the loaded green dataset.
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [7]:
# Keep only the three columns the aggregation needs, then drop down to the
# underlying RDD of Row objects.
rdd_green = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [10]:
# Peek at five Row objects to check the fields carried by the RDD.
rdd_green.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2023, 3, 16, 21, 7, 37), PULocationID=7, total_amount=13.32),
 Row(lpep_pickup_datetime=datetime.datetime(2023, 3, 16, 17, 23, 11), PULocationID=75, total_amount=15.75),
 Row(lpep_pickup_datetime=datetime.datetime(2023, 3, 18, 22, 46, 26), PULocationID=66, total_amount=37.62),
 Row(lpep_pickup_datetime=datetime.datetime(2023, 3, 30, 16, 59, 44), PULocationID=43, total_amount=16.8),
 Row(lpep_pickup_datetime=datetime.datetime(2023, 3, 28, 11, 28, 46), PULocationID=42, total_amount=8.0)]

In [11]:
from datetime import datetime

# Filter
start_date = datetime(year=2023, month=1, day=1)
def filter_since_2023(row):
    """Return True when the row's pickup timestamp is on or after ``start_date``.

    Args:
        row: Object exposing a ``lpep_pickup_datetime`` attribute holding a
            ``datetime`` or ``None``.

    Returns:
        bool: ``True`` if the pickup timestamp is present and
        ``>= start_date``; ``False`` otherwise.
    """
    pickup = row.lpep_pickup_datetime
    # The column is nullable, and comparing None with a datetime raises
    # TypeError. Rows without a timestamp are dropped instead, which mirrors
    # how the SQL WHERE clause treats NULL.
    return pickup is not None and pickup >= start_date

# Map
def prepare_for_grouping(row):
    """Turn a trip row into a ``(key, value)`` pair ready for ``reduceByKey``.

    The key is ``(pickup timestamp truncated to the hour, PULocationID)`` and
    the value is ``(total_amount, 1)``; the trailing 1 lets the reducer count
    records by summing.
    """
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return ((pickup_hour, row.PULocationID), (row.total_amount, 1))

# Reduce
def aggregate(left_v, right_v):
    """Combine two ``(amount, records_cnt)`` pairs by element-wise addition.

    Returns a new ``(amount, records_cnt)`` tuple holding the summed amount
    and the summed record count.
    """
    (amount_l, cnt_l), (amount_r, cnt_r) = left_v, right_v
    return (amount_l + amount_r, cnt_l + cnt_r)

In [12]:
from pyspark.sql import types

# Schema of the aggregated result; every column is nullable.
result_schema = (
    types.StructType()
    .add('hour', types.TimestampType(), True)
    .add('zone', types.IntegerType(), True)
    .add('amount', types.DoubleType(), True)
    .add('records_cnt', types.IntegerType(), True)
)

In [17]:
from collections import namedtuple

# Flat record type whose field names match the columns of the result schema.
result_row = namedtuple('result_row', ['hour', 'zone', 'amount', 'records_cnt'])

def unwrap(row):
    """Flatten a ``((hour, zone), (amount, records_cnt))`` pair into a ``result_row``."""
    key = row[0]
    value = row[1]
    return result_row(key[0], key[1], value[0], value[1])

In [20]:
# RDD version of the SQL query: filter -> key by (hour, zone) -> sum per key
# -> flatten back to rows -> convert to a DataFrame with an explicit schema.
df_result = (
    rdd_green
    .filter(filter_since_2023)
    .map(prepare_for_grouping)
    .reduceByKey(aggregate)
    .map(unwrap)
    .toDF(result_schema)
)

In [21]:
# Preview five rows of the aggregated result.
df_result.show(5)



+-------------------+----+-----------------+-----------+
|               hour|zone|           amount|records_cnt|
+-------------------+----+-----------------+-----------+
|2023-03-16 17:00:00|  75|           1109.5|         55|
|2023-03-04 21:00:00|  41|77.53999999999999|          6|
|2023-03-24 22:00:00|   7|             82.3|          4|
|2023-03-20 07:00:00|  74|846.2100000000003|         54|
|2023-03-03 19:00:00|  74|           447.23|         22|
+-------------------+----+-----------------+-----------+
only showing top 5 rows



                                                                                

In [22]:
# Output directory for the aggregated revenue dataset.
result_folderpath = 'tmp/green-revenue'

# The default save mode raises if the path already exists, which breaks every
# re-run of this cell. Overwrite so the notebook survives Restart & Run All.
df_result.write.parquet(result_folderpath, mode='overwrite')

                                                                                