In [1]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("rdd")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/05 15:36:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/05 15:36:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Read every parquet file matched by the glob under data/pq/green into one DataFrame.
df_green = spark.read.parquet("data/pq/green/*/*")

                                                                                

# RDD Part 1: Achieve the same data processing with RDD functions

I would like to reproduce the following SQL query using only RDD operations:

```
SELECT 
    -- Revenue grouping 
    PULocationID AS zone,
    date_trunc("hour", lpep_pickup_datetime) AS hour, 

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM 
    green
WHERE
    lpep_pickup_datetime >= "2020-01-01 00:00:00"
GROUP BY 
    zone, hour
```

In [20]:
# Keep only the columns the aggregation needs, then drop down to the RDD API.
rdd = (
    df_green
    .select("lpep_pickup_datetime", "PULocationID", "total_amount")
    .rdd
)

## step 1 keep only trips picked up on or after 2020-01-01
using `rdd.filter()`

In [11]:
from datetime import datetime

In [16]:
# Inclusive lower bound for the pickup timestamps kept by the filter.
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    """Return True when the row's pickup time is on or after `start`.

    Args:
        row: an object exposing a `lpep_pickup_datetime` attribute
            (a datetime, or None when the value is missing).

    Returns:
        bool: False for rows with a missing pickup time or a pickup time
        before `start`, True otherwise.
    """
    pickup = row.lpep_pickup_datetime
    # Comparing None with a datetime raises TypeError; reject such rows instead,
    # which matches how the SQL WHERE clause discards NULL comparisons.
    return pickup is not None and pickup >= start

In [17]:
# Fetch a single row that passes the filter as a quick sanity check.
rdd.filter(filter_outliers).take(1)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97)]

## step 2 build the grouping key
using `map()` to turn each row into a `(key, value)` pair

In [21]:
# Pull a small sample to the driver and keep its first row for interactive exploration.
rows = rdd.take(10)
row = rows[0]

In [24]:
# Inspect the pickup timestamp of the sample row.
row.lpep_pickup_datetime

datetime.datetime(2020, 1, 23, 13, 10, 15)

In [23]:
# Zero out minutes, seconds and microseconds to truncate the timestamp to its hour.
row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)

datetime.datetime(2020, 1, 23, 13, 0)

In [29]:
def prepare_for_grouping(row):
    """Map a trip row to a ((hour, zone), (amount, 1)) pair.

    The key is the pickup timestamp truncated to the hour together with the
    pickup location id; the value carries the trip amount and a count of one.
    """
    # Truncate the pickup timestamp to the start of its hour.
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return (pickup_hour, row.PULocationID), (row.total_amount, 1)

In [30]:
# Preview the (key, value) pairs produced for the filtered rows.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .take(5)
)

[((datetime.datetime(2020, 1, 23, 13, 0), 74), (44.97, 1)),
 ((datetime.datetime(2020, 1, 20, 15, 0), 67), (33.45, 1)),
 ((datetime.datetime(2020, 1, 15, 20, 0), 260), (8.3, 1)),
 ((datetime.datetime(2020, 1, 5, 16, 0), 82), (8.3, 1)),
 ((datetime.datetime(2020, 1, 29, 19, 0), 166), (12.74, 1))]

## step 3 reduce records by key
values that share the same key are summed together

In [31]:
def calculate_revenue(left_value, right_value):
    """Merge two (amount, count) pairs by adding them component-wise."""
    amount_a, count_a = left_value
    amount_b, count_b = right_value
    return (amount_a + amount_b, count_a + count_b)

In [32]:
# Each result pairs a composite key (hour, zone_id) with a value (total_revenue, records).
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .take(5)
)

                                                                                

[((datetime.datetime(2020, 1, 23, 13, 0), 74), (1044.0499999999993, 60)),
 ((datetime.datetime(2020, 1, 20, 15, 0), 67), (79.5, 3)),
 ((datetime.datetime(2020, 1, 15, 20, 0), 260), (163.90000000000003, 14)),
 ((datetime.datetime(2020, 1, 5, 16, 0), 82), (500.4700000000002, 33)),
 ((datetime.datetime(2020, 1, 15, 11, 0), 179), (50.5, 5))]

## step 4 unnest the output

In [33]:
def unwrap(row):
    """Flatten a ((hour, zone), (amount, count)) pair into a 4-tuple."""
    key = row[0]
    value = row[1]
    return (key[0], key[1], value[0], value[1])

In [34]:
# Preview the flattened (hour, zone, revenue, count) tuples.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .take(5)
)

                                                                                

[(datetime.datetime(2020, 1, 23, 13, 0), 74, 1044.0499999999993, 60),
 (datetime.datetime(2020, 1, 20, 15, 0), 67, 79.5, 3),
 (datetime.datetime(2020, 1, 15, 20, 0), 260, 163.90000000000003, 14),
 (datetime.datetime(2020, 1, 5, 16, 0), 82, 500.4700000000002, 33),
 (datetime.datetime(2020, 1, 15, 11, 0), 179, 50.5, 5)]

In [35]:
# Convert the flattened tuples to a DataFrame; no column names are supplied here.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
    .take(5)
)

                                                                                

[Row(_1=datetime.datetime(2020, 1, 23, 13, 0), _2=74, _3=1044.0499999999993, _4=60),
 Row(_1=datetime.datetime(2020, 1, 20, 15, 0), _2=67, _3=79.5, _4=3),
 Row(_1=datetime.datetime(2020, 1, 15, 20, 0), _2=260, _3=163.90000000000003, _4=14),
 Row(_1=datetime.datetime(2020, 1, 5, 16, 0), _2=82, _3=500.4700000000002, _4=33),
 Row(_1=datetime.datetime(2020, 1, 15, 11, 0), _2=179, _3=50.5, _4=5)]

Add column names

In [36]:
from collections import namedtuple

In [39]:
# Named record for one aggregated row; the field names become the DataFrame column names.
RevenueRow = namedtuple("RevenueRow", "hour zone revenue count")

In [40]:
def unwrap_with_column_name(row):
    """Flatten a ((hour, zone), (revenue, count)) pair into a RevenueRow."""
    key = row[0]
    value = row[1]
    return RevenueRow(hour=key[0], zone=key[1], revenue=value[0], count=value[1])

In [41]:
# Same pipeline, but emitting RevenueRow records so the DataFrame gets named columns.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap_with_column_name)
    .toDF()
    .take(5)
)

                                                                                

[Row(hour=datetime.datetime(2020, 1, 23, 13, 0), zone=74, revenue=1044.0499999999993, count=60),
 Row(hour=datetime.datetime(2020, 1, 20, 15, 0), zone=67, revenue=79.5, count=3),
 Row(hour=datetime.datetime(2020, 1, 15, 20, 0), zone=260, revenue=163.90000000000003, count=14),
 Row(hour=datetime.datetime(2020, 1, 5, 16, 0), zone=82, revenue=500.4700000000002, count=33),
 Row(hour=datetime.datetime(2020, 1, 15, 11, 0), zone=179, revenue=50.5, count=5)]

In [42]:
# Keep the aggregated result as a DataFrame; toDF() is called without an explicit schema.
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap_with_column_name)
    .toDF()
)

                                                                                

## step 5 add schema

In [43]:
# Show the schema of df_result so it can be copied into an explicit definition.
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

In [93]:
from pyspark.sql import types

In [45]:
# Explicit schema for the aggregated result; every field is nullable.
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    )
])

In [46]:
# Rebuild the result, this time passing result_schema to toDF().
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap_with_column_name)
    .toDF(result_schema)
)

In [50]:
# Display the first five rows of the result.
df_result.show(5)

[Stage 37:>                                                         (0 + 1) / 1]

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-23 13:00:00|  74|1044.0499999999993|   60|
|2020-01-20 15:00:00|  67|              79.5|    3|
|2020-01-15 20:00:00| 260|163.90000000000003|   14|
|2020-01-05 16:00:00|  82| 500.4700000000002|   33|
|2020-01-15 11:00:00| 179|              50.5|    5|
+-------------------+----+------------------+-----+
only showing top 5 rows



                                                                                

In [49]:
# Persist the aggregated revenue as parquet. Overwrite mode keeps this cell
# re-runnable: without it, a second run fails because the output path exists.
df_result.write.parquet("data/tmp/green-revenue", mode="overwrite")

                                                                                

# RDD Part 2 mapPartition

`mapPartitions` applies a function to each partition as a whole (instead of row by row, as `map` does) and emits the transformed partitions.

The purpose of this section is to apply a machine-learning model in batches to predict the trip duration from the trip distance.

## step 1 count the number of records in each partition

In [59]:
# Columns kept for the batch-prediction example.
columns = [
    "VendorID",
    "lpep_pickup_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_distance",
]

# Project to those columns and switch to the RDD API.
duration_rdd = df_green.select(columns).rdd

In [64]:
def apply_model_in_batch(partition):
    """Count the rows of one partition and return the count in a one-element list."""
    return [sum(1 for _ in partition)]

In [65]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()
# The function returns one count per partition, so a list of three values means the RDD has 3 partitions.

                                                                                

[1317800, 597023, 389694]

## step 2 count the records with a pandas DataFrame (note: this loads the whole partition into memory)

In [66]:
import pandas as pd

In [67]:
# Take a small sample of rows to prototype the per-partition function locally.
rows = duration_rdd.take(10)

In [69]:
# Build a pandas DataFrame from the sampled rows, labelling columns with `columns`.
pd.DataFrame(rows, columns=columns)

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,DOLocationID,trip_distance
0,2.0,2020-01-23 13:10:15,74,130,12.77
1,,2020-01-20 15:09:00,67,39,8.0
2,2.0,2020-01-15 20:23:41,260,157,1.27
3,2.0,2020-01-05 16:32:26,82,83,1.25
4,2.0,2020-01-29 19:22:42,166,42,1.84
5,2.0,2020-01-15 11:07:42,179,223,0.76
6,2.0,2020-01-16 08:22:29,41,237,3.32
7,2.0,2020-01-28 17:05:28,75,161,2.21
8,1.0,2020-01-22 14:51:37,152,166,0.9
9,2.0,2020-01-31 10:25:04,75,234,6.1


In [73]:
def apply_model_in_batch_pd(rows):
    """Load one partition's rows into a pandas DataFrame and return [row count]."""
    batch = pd.DataFrame(rows, columns=columns)
    return [len(batch)]

# The function was prototyped on a 10-row sample; mapPartitions then applies it to every partition of the full dataset.

In [74]:
# Run the pandas-based counter defined above, not the plain-loop version from step 1.
duration_rdd.mapPartitions(apply_model_in_batch_pd).collect()

                                                                                

[1317800, 597023, 389694]

## step 3 do the machine learning

In [76]:
# Understand the iterator: itertuples() yields one named tuple per DataFrame row,
# which is the shape of record a mapPartitions function can emit.
df = pd.DataFrame(rows, columns=columns)
list(df.itertuples())

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-23 13:10:15'), PULocationID=74, DOLocationID=130, trip_distance=12.77),
 Pandas(Index=1, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-20 15:09:00'), PULocationID=67, DOLocationID=39, trip_distance=8.0),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 20:23:41'), PULocationID=260, DOLocationID=157, trip_distance=1.27),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-05 16:32:26'), PULocationID=82, DOLocationID=83, trip_distance=1.25),
 Pandas(Index=4, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-29 19:22:42'), PULocationID=166, DOLocationID=42, trip_distance=1.84),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 11:07:42'), PULocationID=179, DOLocationID=223, trip_distance=0.76),
 Pandas(Index=6, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-16 08:22:29'), PULocationID=41, DOLocationID=237, trip_distance=3.32),
 Panda

In [83]:
#model = ...

def model_predict(df):
    """Placeholder model: return the `trip_distance` column multiplied by 5."""
    return df["trip_distance"] * 5

In [84]:
def apply_model_in_batch_predict(rows):
    """Score one partition and yield its rows with a `predicted_duration` field.

    The rows are loaded into a pandas DataFrame, scored with model_predict,
    and emitted one by one as itertuples() records (the index is included).
    """
    batch = pd.DataFrame(rows, columns=columns)
    batch["predicted_duration"] = model_predict(batch)
    yield from batch.itertuples()

In [97]:
# Preview ten scored records produced by the per-partition function.
duration_rdd.mapPartitions(apply_model_in_batch_predict).take(10)

                                                                                

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-23 13:10:15'), PULocationID=74, DOLocationID=130, trip_distance=12.77, predicted_duration=63.849999999999994),
 Pandas(Index=1, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-20 15:09:00'), PULocationID=67, DOLocationID=39, trip_distance=8.0, predicted_duration=40.0),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 20:23:41'), PULocationID=260, DOLocationID=157, trip_distance=1.27, predicted_duration=6.35),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-05 16:32:26'), PULocationID=82, DOLocationID=83, trip_distance=1.25, predicted_duration=6.25),
 Pandas(Index=4, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-29 19:22:42'), PULocationID=166, DOLocationID=42, trip_distance=1.84, predicted_duration=9.200000000000001),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 11:07:42'), PULocationID=179, DOLocationID=223, trip_distance=0.7

In [98]:
# Turn the scored records into a DataFrame and discard the pandas index column.
df_predicts = (
    duration_rdd
    .mapPartitions(apply_model_in_batch_predict)
    .toDF()
    .drop("Index")
)

                                                                                

In [99]:
# Display the first five rows; in the output below, lpep_pickup_datetime renders as {}.
df_predicts.show(5)

[Stage 59:>                                                         (0 + 1) / 1]

+--------+--------------------+------------+------------+-------------+------------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+--------+--------------------+------------+------------+-------------+------------------+
|     2.0|                  {}|          74|         130|        12.77|63.849999999999994|
|     NaN|                  {}|          67|          39|          8.0|              40.0|
|     2.0|                  {}|         260|         157|         1.27|              6.35|
|     2.0|                  {}|          82|          83|         1.25|              6.25|
|     2.0|                  {}|         166|          42|         1.84| 9.200000000000001|
+--------+--------------------+------------+------------+-------------+------------------+
only showing top 5 rows



                                                                                

In [131]:
# Same batch prediction, but with the pickup timestamp converted to a string first
def apply_model_in_batch_predict_timestamp(rows):
    """Score one partition, emitting pickup timestamps as strings.

    Works like the plain batch-prediction function, except that the
    `lpep_pickup_datetime` column is cast to str before scoring and the
    pandas index is left out of the yielded records.
    """
    batch = pd.DataFrame(rows, columns=columns)
    # Cast the pandas Timestamp column to text so Spark can handle the value.
    batch['lpep_pickup_datetime'] = batch['lpep_pickup_datetime'].astype(str)
    batch["predicted_duration"] = model_predict(batch)
    yield from batch.itertuples(index=False)

In [132]:
# Build the predictions DataFrame from the string-timestamp variant of the batch function.
df_predicts_schema = (
    duration_rdd
    .mapPartitions(apply_model_in_batch_predict_timestamp)
    .toDF()
)

                                                                                

In [133]:
# Display the first five rows; the pickup timestamp now renders as readable text.
df_predicts_schema.show(5)

[Stage 71:>                                                         (0 + 1) / 1]

+--------+--------------------+------------+------------+-------------+------------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+--------+--------------------+------------+------------+-------------+------------------+
|     2.0| 2020-01-23 13:10:15|          74|         130|        12.77|63.849999999999994|
|     NaN| 2020-01-20 15:09:00|          67|          39|          8.0|              40.0|
|     2.0| 2020-01-15 20:23:41|         260|         157|         1.27|              6.35|
|     2.0| 2020-01-05 16:32:26|          82|          83|         1.25|              6.25|
|     2.0| 2020-01-29 19:22:42|         166|          42|         1.84| 9.200000000000001|
+--------+--------------------+------------+------------+-------------+------------------+
only showing top 5 rows



                                                                                