In [1]:
import pyspark
from pyspark.sql import SparkSession

print("pyspark version:", pyspark.__version__)

pyspark version: 4.1.1


In [2]:
# Start (or reuse) a Spark session running in local mode on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/23 08:17:29 WARN Utils: Your hostname, Robs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.197 instead (on interface en0)
26/02/23 08:17:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/23 08:17:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load every parquet file under data/pq/green, descending into nested folders.
df_green = (
    spark.read
    .option("recursiveFileLookup", "true")
    .parquet("data/pq/green")
)

In [4]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-16 23:31:44|  2020-01-16 23:46:21|                 N|         1|          65|          56|              1|          8.6|       24.5|  0.5|    0.

In [5]:
df_green.rdd

MapPartitionsRDD[11] at javaToPython at NativeMethodAccessorImpl.java:0

In [6]:
df_green.rdd.take(5)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 16, 23, 31, 44), lpep_dropoff_datetime=datetime.datetime(2020, 1, 16, 23, 46, 21), store_and_fwd_flag='N', RatecodeID=1, PULocationID=65, DOLocationID=56, passenger_count=1, trip_distance=8.6, fare_amount=24.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=25.8, payment_type=2, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 14, 15, 1, 42), lpep_dropoff_datetime=datetime.datetime(2020, 1, 14, 15, 28, 14), store_and_fwd_flag='N', RatecodeID=1, PULocationID=129, DOLocationID=141, passenger_count=5, trip_distance=5.16, fare_amount=20.5, extra=0.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=24.05, payment_type=2, trip_type=1, congestion_surcharge=2.75),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 24, 5, 55), lpep_dropoff

```sql
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
```

In [7]:
# Keep only the three columns the hourly revenue aggregation needs,
# then switch from the DataFrame API to the underlying RDD of Row objects.
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [8]:
# rdd.filter(lambda row: False).take(1)

In [9]:
from datetime import datetime

In [10]:
# Lower bound (inclusive) for pickup timestamps kept by filter_outliers.
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    """Return True when the trip was picked up on or after ``start``.

    RDD counterpart of the SQL predicate
    ``lpep_pickup_datetime >= '2020-01-01 00:00:00'``.

    Parameters
    ----------
    row : object with a ``lpep_pickup_datetime`` attribute
        The attribute is either a ``datetime`` or ``None``.

    Returns
    -------
    bool
        False for rows whose pickup time is missing or earlier than ``start``.
    """
    pickup = row.lpep_pickup_datetime
    # A null timestamp cannot be compared with a datetime (TypeError), which
    # would abort the whole job. Drop such rows, as the SQL comparison would.
    return pickup is not None and pickup >= start

In [11]:
rows = rdd.take(10)

                                                                                

In [12]:
row = rows[0]

In [13]:
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 16, 23, 31, 44), PULocationID=65, total_amount=25.8)

In [14]:
def prepare_for_grouping(row):
    """Turn one trip row into a ``(key, value)`` pair for ``reduceByKey``.

    key   -- ``(hour, zone)``: the pickup time truncated to the hour and the
             pickup location id.
    value -- ``(amount, count)``: the trip's ``total_amount`` and a count of 1.
    """
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return ((pickup_hour, row.PULocationID), (row.total_amount, 1))

In [15]:
# rdd \
#     .filter(filter_outliers) \
#     .map(prepare_for_grouping) \
#     .take(10)

In [16]:
def calculate_revenue(left_value, right_value):
    """Combine two ``(amount, count)`` partial aggregates into a single one.

    Used as the reducer for ``reduceByKey``: amounts are added together and
    counts are added together.
    """
    (amount_a, count_a), (amount_b, count_b) = left_value, right_value
    return (amount_a + amount_b, count_a + count_b)

In [17]:
# rdd \
#     .filter(filter_outliers) \
#     .map(prepare_for_grouping) \
#     .reduceByKey(calculate_revenue) \
#     .take(10)

In [18]:
def unwrap(row):
    """Flatten ``((hour, zone), (amount, count))`` into a plain 4-tuple.

    Note: a later cell redefines ``unwrap`` to return a ``RevenueRow`` instead,
    and that later definition is the one the final pipeline uses.
    """
    key, value = row[0], row[1]
    return (key[0], key[1], value[0], value[1])

In [19]:
# rdd \
#     .filter(filter_outliers) \
#     .map(prepare_for_grouping) \
#     .reduceByKey(calculate_revenue) \
#     .map(unwrap) \
#     .take(10)

In [20]:
# rdd \
#     .filter(filter_outliers) \
#     .map(prepare_for_grouping) \
#     .reduceByKey(calculate_revenue) \
#     .map(unwrap) \
#     .toDF() \
#     .show()

In [21]:
from collections import namedtuple

In [22]:
# Named record for one aggregated result row: (hour, zone, revenue, count).
RevenueRow = namedtuple('RevenueRow', 'hour zone revenue count')

In [23]:
def unwrap(row):
    """Convert a reduced ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``."""
    key, value = row[0], row[1]
    return RevenueRow(hour=key[0], zone=key[1], revenue=value[0], count=value[1])

In [24]:
# df_result = rdd \
#     .filter(filter_outliers) \
#     .map(prepare_for_grouping) \
#     .reduceByKey(calculate_revenue) \
#     .map(unwrap) \
#     .toDF()

In [25]:
from pyspark.sql import types

In [26]:
# Explicit schema for the aggregated result, passed to toDF() so column types
# are declared up front. All four columns are declared nullable.
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    )
])

In [27]:
# RDD version of the hourly revenue query:
# filter -> key by (hour, zone) -> sum amounts and counts -> back to a DataFrame.
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [28]:
df_result.show()



+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-16 23:00:00|  65|160.46000000000004|   11|
|2020-01-24 05:00:00| 242|             62.87|    1|
|2020-01-03 08:00:00|  74|1130.3599999999985|   91|
|2020-01-16 21:00:00| 181| 198.0300000000001|   16|
|2020-01-31 11:00:00|  41| 712.6800000000002|   49|
|2020-01-11 07:00:00| 117|            197.79|    4|
|2020-01-16 06:00:00|  42|            337.77|   14|
|2020-01-23 19:00:00| 250|             22.37|    1|
|2020-01-22 12:00:00|  74| 1000.179999999999|   65|
|2020-01-16 09:00:00|  74|1392.6799999999987|   98|
|2020-01-10 18:00:00|  82| 839.0399999999995|   59|
|2020-01-13 21:00:00|  71|            110.32|    4|
|2020-01-28 20:00:00| 130|382.94000000000005|   25|
|2020-01-11 21:00:00|  33|208.21000000000004|   12|
|2020-01-15 19:00:00|  41| 725.9799999999996|   46|
|2020-01-15 22:00:00| 260|            154.49|   11|
|2020-01-27 

                                                                                

In [29]:
# Persist the aggregated revenue, replacing any output left by a previous run.
df_result.write.mode('overwrite').parquet('tmp/green-revenue')

26/02/23 08:20:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
Traceback (most recent call last):
  File "/Users/rob/Projects/GitHub/data-engineering-zoomcamp/07-batch-processing/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
    code = worker(sock, authenticated)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/rob/Projects/GitHub/data-engineering-zoomcamp/07-batch-processing/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
    outfile.flush()
BrokenPipeError: [Errno 32] Broken pipe
                                                                                

In [30]:
df_result.rdd.getNumPartitions()

8

In [31]:
# Input columns for the duration model. The same list is reused to label the
# pandas DataFrame built from each batch of rows.
columns = [
    'VendorID',
    'lpep_pickup_datetime',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
]

duration_rdd = df_green.select(columns).rdd

In [32]:
import pandas as pd

In [33]:
rows = duration_rdd.take(10)

                                                                                

In [34]:
df = pd.DataFrame(rows, columns = columns)

In [35]:
columns

['VendorID',
 'lpep_pickup_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_distance']

In [36]:
df

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,DOLocationID,trip_distance
0,2.0,2020-01-16 23:31:44,65,56,8.6
1,2.0,2020-01-14 15:01:42,129,141,5.16
2,,2020-01-24 05:55:00,242,261,16.27
3,2.0,2020-01-31 08:19:03,166,238,1.42
4,,2020-01-12 18:54:00,41,235,4.62
5,2.0,2020-01-11 17:13:41,66,49,2.67
6,2.0,2020-01-29 19:23:03,260,226,0.64
7,2.0,2020-01-26 11:45:13,116,116,0.43
8,2.0,2020-01-03 08:58:57,74,75,1.38
9,2.0,2020-01-27 21:56:19,41,74,1.0


In [37]:
# model = ...

def model_predict(df):
    """Return a predicted duration for every row of ``df``.

    Stand-in for a real model call (``model.predict(df)``): the prediction is
    simply the ``trip_distance`` column multiplied by 5.
    """
    # y_pred = model.predict(df)
    return df.trip_distance * 5

In [38]:
def apply_model_in_batch(rows):
    """Score one batch of rows with ``model_predict`` and yield the results.

    ``rows`` is loaded into a pandas DataFrame whose columns are named by the
    module-level ``columns`` list, a ``predicted_duration`` column is added,
    and every row is yielded as a pandas named tuple (including the ``Index``
    field that ``itertuples()`` adds).
    """
    batch = pd.DataFrame(rows, columns = columns)
    batch['predicted_duration'] = model_predict(batch)
    yield from batch.itertuples()

In [39]:
# Preview: run the model partition by partition and look at the first 10 results.
(
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .take(10)
)

                                                                                

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-16 23:31:44'), PULocationID=65, DOLocationID=56, trip_distance=8.6, predicted_duration=43.0),
 Pandas(Index=1, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-14 15:01:42'), PULocationID=129, DOLocationID=141, trip_distance=5.16, predicted_duration=25.8),
 Pandas(Index=2, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-24 05:55:00'), PULocationID=242, DOLocationID=261, trip_distance=16.27, predicted_duration=81.35),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-31 08:19:03'), PULocationID=166, DOLocationID=238, trip_distance=1.42, predicted_duration=7.1),
 Pandas(Index=4, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-12 18:54:00'), PULocationID=41, DOLocationID=235, trip_distance=4.62, predicted_duration=23.1),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-11 17:13:41'), PULocationID=66, DOLocationID=49, trip_distance=2.67, predicted_duration=13.3

In [40]:
# Score every partition, convert the resulting named tuples to a DataFrame,
# and discard the pandas 'Index' field that itertuples() added.
df_predicts = (
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF()
    .drop('Index')
)

                                                                                

In [41]:
# df_predicts.show()

In [42]:
# df_predicts.printSchema()

In [43]:
df_predicts.select('predicted_duration').show()

[Stage 11:>                                                         (0 + 1) / 1]

+------------------+
|predicted_duration|
+------------------+
|              43.0|
|              25.8|
|             81.35|
|               7.1|
|              23.1|
|             13.35|
|               3.2|
|              2.15|
|6.8999999999999995|
|               5.0|
|               7.1|
|41.150000000000006|
|             33.55|
|               0.0|
|             41.25|
|7.1499999999999995|
|              51.2|
|              3.05|
| 8.100000000000001|
|17.150000000000002|
+------------------+
only showing top 20 rows


                                                                                