# Resilient Distributed Datasets (RDDs)

## Simple RDD operations: map and reduce

Dataframes are implemented on top of another data structure called a **Resilient Distributed Dataset**, or **RDD**, which is a lower-level abstraction.

Since Spark added support for dataframes and SQL, there is generally no need to work directly with RDDs, which are more complex and time-consuming to use. Still, knowing how they work helps us understand how to make better use of Spark.

One difference is that dataframes have a well-defined structure described by a schema. RDDs, on the contrary, have **no schema**: they are just a distributed collection of objects partitioned across the nodes of the cluster.

### Reproduce query with RDDs

We want to reproduce the following query from the `04_groupby_and_joins.ipynb` notebook using RDDs.

```
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS revenue_zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green_data
WHERE lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
```

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("rdds") \
    .getOrCreate()

In [3]:
df_green = spark.read.parquet("../data/pq/green/*/*")

### From dataframe to RDD

Spark dataframes have an attribute called **rdd**, which exposes the underlying RDD of the dataframe. The elements of this RDD are **Row** objects.

Below we use several useful methods:
* **`filter()`** returns a new RDD containing only the elements that satisfy a predicate.
* **`map()`** applies a function to every element of an RDD and returns a new RDD with the results.
* **`reduceByKey()`** works on an RDD of `(key, value)` pairs: it merges the values of all records that share the same key with a reduce function, producing a single record per key.
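The semantics of these three methods can be mimicked on an ordinary Python list. The sketch below is only a local model for intuition (the `local_*` helpers are hypothetical names, and `local_reduce_by_key` returns a dict rather than an RDD of pairs); Spark evaluates the real methods lazily and in parallel across partitions.

```python
from collections import defaultdict
from functools import reduce

def local_filter(records, predicate):
    """Keep only the records for which predicate(record) is True."""
    return [r for r in records if predicate(r)]

def local_map(records, func):
    """Apply func to every record, producing exactly one output per input."""
    return [func(r) for r in records]

def local_reduce_by_key(pairs, func):
    """Group (key, value) pairs by key and fold each group's values with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

words = ["spark", "rdd", "spark", "map", "rdd", "spark"]
long_words = local_filter(words, lambda w: len(w) > 3)
pairs = local_map(words, lambda w: (w, 1))
counts = local_reduce_by_key(pairs, lambda a, b: a + b)

print(long_words)  # ['spark', 'spark', 'spark']
print(counts)      # {'spark': 3, 'rdd': 2, 'map': 1}

# Roughly equivalent PySpark chain, assuming a SparkContext `sc = spark.sparkContext`:
# sc.parallelize(words).map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).collect()
```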

In [4]:
df_green.rdd

MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0

In [5]:
df_green.rdd.take(3)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), lpep_dropoff_datetime=datetime.datetime(2020, 1, 12, 18, 19, 52), store_and_fwd_flag='N', RatecodeID=1, PULocationID=41, DOLocationID=41, passenger_count=1, trip_distance=0.78, fare_amount=5.5, extra=0.0, mta_tax=0.5, tip_amount=1.58, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=7.88, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 20, 24, 10), lpep_dropoff_datetime=datetime.datetime(2020, 1, 31, 20, 31, 51), store_and_fwd_flag='N', RatecodeID=1, PULocationID=173, DOLocationID=70, passenger_count=1, trip_distance=0.98, fare_amount=7.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=8.3, payment_type=2, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 8, 16, 53), lpep_dropoff_dat

### Filter

Let's select only the columns we need for our query, and then filter based on the WHERE clause.

In [6]:
rdd = df_green \
    .select("lpep_pickup_datetime", "PULocationID", "total_amount") \
    .rdd

rdd.take(3)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), PULocationID=41, total_amount=7.88),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 20, 24, 10), PULocationID=173, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 8, 16, 53), PULocationID=74, total_amount=23.46)]

In [7]:
from datetime import datetime

start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

rdd.filter(filter_outliers).take(3)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), PULocationID=41, total_amount=7.88),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 20, 24, 10), PULocationID=173, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 8, 16, 53), PULocationID=74, total_amount=23.46)]

### Map

To reproduce the `GROUP BY`, we turn each row into a key-value record that follows the structure of the original SQL query: the composite key `(hour, revenue_zone)` holds the grouping columns, and the composite value `(amount, count)` holds the quantities to aggregate. These are the two halves of each record that the executors will generate:
```
Each record: (key, value) = ((hour, revenue_zone), (amount, count))
```
Once we have a function that generates the record, we will use the **`map()`** method, which takes an RDD, transforms it with a function (our key-value function) and returns a new RDD.

In [8]:
row = rdd.take(1)[0]
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), PULocationID=41, total_amount=7.88)

In [9]:
row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)

datetime.datetime(2020, 1, 12, 18, 0)
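`datetime.replace()` leaves every field it is not given untouched, so `replace(minute=0, second=0)` alone only matches `date_trunc('hour', ...)` when the timestamps have no sub-second part. A quick standard-library check, using a made-up timestamp with microseconds:

```python
from datetime import datetime

# A made-up pickup time with a sub-second component (hypothetical value).
ts = datetime(2020, 1, 12, 18, 15, 4, 250000)

partial = ts.replace(minute=0, second=0)              # microseconds survive
full = ts.replace(minute=0, second=0, microsecond=0)  # truncated to the hour

print(partial)  # 2020-01-12 18:00:00.250000
print(full)     # 2020-01-12 18:00:00
```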

In [10]:
def prepare_for_grouping(row):
    # key = (hour, revenue_zone)
    # truncate to the hour, like date_trunc('hour', ...)
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    revenue_zone = row.PULocationID
    key = (hour, revenue_zone)

    # value = (amount, count)
    amount = row.total_amount
    count = 1
    value = (amount, count)
    
    return (key, value)

In [11]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .take(5)

[((datetime.datetime(2020, 1, 12, 18, 0), 41), (7.88, 1)),
 ((datetime.datetime(2020, 1, 31, 20, 0), 173), (8.3, 1)),
 ((datetime.datetime(2020, 1, 7, 8, 0), 74), (23.46, 1)),
 ((datetime.datetime(2020, 1, 15, 14, 0), 25), (7.3, 1)),
 ((datetime.datetime(2020, 1, 31, 10, 0), 259), (25.54, 1))]
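Because the key-value conversion is plain Python, its logic can be checked without Spark. The sketch below re-implements the same conversion as `prepare_for_grouping` and feeds it a `namedtuple` as a hypothetical stand-in for `pyspark.sql.Row` (both support attribute access); the input values are copied from the first row shown above.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for pyspark.sql.Row with the three selected columns.
FakeRow = namedtuple("FakeRow", ["lpep_pickup_datetime", "PULocationID", "total_amount"])

def to_key_value(row):
    """Same logic as prepare_for_grouping: ((hour, revenue_zone), (amount, count))."""
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    key = (hour, row.PULocationID)
    value = (row.total_amount, 1)
    return (key, value)

row = FakeRow(datetime(2020, 1, 12, 18, 15, 4), 41, 7.88)
print(to_key_value(row))
# ((datetime.datetime(2020, 1, 12, 18, 0), 41), (7.88, 1))
```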

### ReduceByKey

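We now use the **`reduceByKey()`** method, which reduces all the records with the same key to a single record. Since we want the total amount and the number of records for each key, the reduce function just adds the two values element-wise. Spark may apply this function in any order and grouping (for example, first inside each partition and then across partitions), so it must be associative and commutative, which addition is.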

In [12]:
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

In [13]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .take(5)

                                                                                

[((datetime.datetime(2020, 1, 13, 10, 0), 165), (55.43000000000001, 3)),
 ((datetime.datetime(2020, 1, 23, 10, 0), 43), (315.40000000000003, 19)),
 ((datetime.datetime(2020, 1, 2, 9, 0), 116), (296.54, 17)),
 ((datetime.datetime(2020, 1, 31, 21, 0), 41), (588.1599999999999, 40)),
 ((datetime.datetime(2020, 1, 26, 0, 0), 258), (56.92, 1))]
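`reduceByKey()` does not necessarily see all the values of a key in one place: it can combine values inside each partition first and merge the partial results afterwards. A minimal pure-Python simulation (not Spark's actual implementation), with two hypothetical partitions and amounts chosen to be exact in binary floating point, shows that `calculate_revenue` gives the same result either way:

```python
from functools import reduce

def calculate_revenue(left_value, right_value):
    # Same reduce function as above: add amounts and counts element-wise.
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    return (left_amount + right_amount, left_count + right_count)

# Hypothetical (amount, count) values for one key, split over two partitions.
partition_a = [(7.5, 1), (2.25, 1)]
partition_b = [(10.0, 1), (0.25, 1), (4.0, 1)]

# Step 1: combine inside each partition; step 2: merge the partial results.
partial_a = reduce(calculate_revenue, partition_a)
partial_b = reduce(calculate_revenue, partition_b)
merged = calculate_revenue(partial_a, partial_b)

# Reducing everything in a single pass gives the same answer.
single_pass = reduce(calculate_revenue, partition_a + partition_b)
print(merged, single_pass)  # (24.0, 5) (24.0, 5)
```

With arbitrary decimal amounts, floating-point addition is only approximately associative, so sums can differ in the last digits depending on the order; artifacts such as `55.43000000000001` in the output above come from the same binary floating-point representation.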

### From RDD to dataframe

The nested structure above is not very convenient to work with. To turn it back into a dataframe, we first apply another **`map()`** that flattens each `((hour, revenue_zone), (amount, count))` record into a flat tuple, and then call the **`toDF()`** method.


In [14]:
def unwrap(row):
    return (row[0][0], row[0][1], row[1][0], row[1][1])

In [15]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .take(5)

                                                                                

[(datetime.datetime(2020, 1, 13, 10, 0), 165, 55.43000000000001, 3),
 (datetime.datetime(2020, 1, 23, 10, 0), 43, 315.40000000000003, 19),
 (datetime.datetime(2020, 1, 2, 9, 0), 116, 296.54, 17),
 (datetime.datetime(2020, 1, 31, 21, 0), 41, 588.1599999999999, 40),
 (datetime.datetime(2020, 1, 26, 0, 0), 258, 56.92, 1)]

Finally, we turn the structure back to a dataframe.

In [16]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF() \
    .show(5)

                                                                                

+-------------------+---+------------------+---+
|                 _1| _2|                _3| _4|
+-------------------+---+------------------+---+
|2020-01-13 10:00:00|165| 55.43000000000001|  3|
|2020-01-23 10:00:00| 43|315.40000000000003| 19|
|2020-01-02 09:00:00|116|            296.54| 17|
|2020-01-31 21:00:00| 41| 588.1599999999999| 40|
|2020-01-26 00:00:00|258|             56.92|  1|
+-------------------+---+------------------+---+
only showing top 5 rows



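The column names were lost in the first `map()`: `prepare_for_grouping` returns plain tuples instead of `Row` objects, so `toDF()` falls back to the default names `_1`, `_2`, and so on. To keep meaningful names, we can make `unwrap` return a `namedtuple`, whose field names Spark uses as column names.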

In [17]:
from collections import namedtuple

RevenueRow = namedtuple("RevenueRow", ["hour", "revenue_zone", "revenue", "count"])

def unwrap(row):
    return RevenueRow(
        hour=row[0][0],
        revenue_zone=row[0][1],
        revenue=row[1][0], 
        count=row[1][1]
    )
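The flattening itself is plain Python and can be checked locally. This sketch writes `unwrap` with tuple unpacking instead of indexing (a stylistic alternative, same behavior) and uses an input record copied from the `reduceByKey()` output above; `_fields` holds the names that schema inference picks up.

```python
from collections import namedtuple
from datetime import datetime

RevenueRow = namedtuple("RevenueRow", ["hour", "revenue_zone", "revenue", "count"])

def unwrap(row):
    # Unpack ((hour, revenue_zone), (revenue, count)) into one flat record.
    (hour, revenue_zone), (revenue, count) = row
    return RevenueRow(hour=hour, revenue_zone=revenue_zone, revenue=revenue, count=count)

record = ((datetime(2020, 1, 2, 9, 0), 116), (296.54, 17))
flat = unwrap(record)
print(flat)          # RevenueRow(hour=datetime.datetime(2020, 1, 2, 9, 0), revenue_zone=116, revenue=296.54, count=17)
print(flat._fields)  # ('hour', 'revenue_zone', 'revenue', 'count')
```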

In [18]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF()

                                                                                

In [19]:
df_result.show(5)

+-------------------+------------+------------------+-----+
|               hour|revenue_zone|           revenue|count|
+-------------------+------------+------------------+-----+
|2020-01-13 10:00:00|         165| 55.43000000000001|    3|
|2020-01-23 10:00:00|          43|315.40000000000003|   19|
|2020-01-02 09:00:00|         116|            296.54|   17|
|2020-01-31 21:00:00|          41| 588.1599999999999|   40|
|2020-01-26 00:00:00|         258|             56.92|    1|
+-------------------+------------+------------------+-----+
only showing top 5 rows



In [20]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('revenue_zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

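Spark infers Python `int` values as `LongType` (64-bit integers). We want `revenue_zone` to match the `integer` type of the original `PULocationID` column, and 32 bits are also enough for `count`, so we change the schema to the one below: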

```
StructType([
    StructField('hour', TimestampType(), True),
    StructField('revenue_zone', IntegerType(), True),
    StructField('revenue', DoubleType(), True),
    StructField('count', IntegerType(), True)
])
```
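Switching from `LongType` to `IntegerType` is only safe if every value fits in a signed 32-bit integer (from -2,147,483,648 to 2,147,483,647). A small hypothetical helper for checking a batch of values in plain Python:

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

def fits_in_int32(values):
    """Return True if every value is within IntegerType's signed 32-bit range."""
    return all(INT32_MIN <= v <= INT32_MAX for v in values)

print(fits_in_int32([165, 43, 116, 41, 258]))  # True  (zone IDs from the output above)
print(fits_in_int32([3, 19, 2**31]))           # False (2**31 overflows IntegerType)
```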

In [21]:
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, DoubleType

In [22]:
result_schema = StructType([
    StructField('hour', TimestampType(), True),
    StructField('revenue_zone', IntegerType(), True),
    StructField('revenue', DoubleType(), True),
    StructField('count', IntegerType(), True)
])

In [23]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema)

In [24]:
df_result.show(5)



+-------------------+------------+------------------+-----+
|               hour|revenue_zone|           revenue|count|
+-------------------+------------+------------------+-----+
|2020-01-13 10:00:00|         165| 55.43000000000001|    3|
|2020-01-23 10:00:00|          43|315.40000000000003|   19|
|2020-01-02 09:00:00|         116|            296.54|   17|
|2020-01-31 21:00:00|          41| 588.1599999999999|   40|
|2020-01-26 00:00:00|         258|             56.92|    1|
+-------------------+------------+------------------+-----+
only showing top 5 rows



                                                                                

In [25]:
df_result.printSchema()

root
 |-- hour: timestamp (nullable = true)
 |-- revenue_zone: integer (nullable = true)
 |-- revenue: double (nullable = true)
 |-- count: integer (nullable = true)



In [26]:
df_result.write.parquet("../data/tmp/green-revenue", mode="overwrite")


## Spark RDD mapPartitions

Similar to **`map()`**, **`mapPartitions()`** transforms an RDD into a new RDD. However, they differ in how the function is applied:
* `map()` applies the function to every element of the RDD, producing exactly one output element for each input element.
* `mapPartitions()` calls the function once per partition: the function receives an iterator over all the elements of that partition and returns an iterable with any number of output elements. The outputs of all the partitions form the new RDD.

This makes `mapPartitions()` convenient for processing large datasets chunk by chunk, since each partition is handled as a single batch. This is very useful for applications such as Machine Learning: expensive setup work, like loading a model, runs once per partition instead of once per row, and the model can predict on a whole batch at once. Spark then combines the results of all the partitions.
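To make the difference concrete, here is a pure-Python simulation in which a list of lists plays the role of an RDD's partitions. It only models the semantics (Spark runs partitions in parallel on executors); `fake_map_partitions` and `load_model` are hypothetical names. The setup counter shows that the expensive step runs once per partition rather than once per row.

```python
from itertools import chain

# Three hypothetical partitions of trip distances.
partitions = [[0.78, 0.98], [2.7, 0.8, 2.33], [2.62]]

setup_calls = 0

def load_model():
    """Hypothetical expensive setup (e.g. loading a model from disk)."""
    global setup_calls
    setup_calls += 1
    return lambda distance: distance * 5  # toy "model", as in the notebook

def predict_partition(rows):
    # Called once per partition with an iterator over that partition's rows.
    model = load_model()  # setup happens once per partition
    for distance in rows:
        yield model(distance)

def fake_map_partitions(parts, func):
    """Apply func to an iterator over each partition and concatenate the outputs."""
    return list(chain.from_iterable(func(iter(part)) for part in parts))

predictions = fake_map_partitions(partitions, predict_partition)
print(len(predictions), setup_calls)  # 6 3
```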

Let's imagine we want to create an application or a service that predicts the duration of a trip.

In [27]:
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [28]:
columns = ["VendorID", "lpep_pickup_datetime", "PULocationID", "DOLocationID", "trip_distance"]

df_green \
    .select(columns) \
    .show(5)

+--------+--------------------+------------+------------+-------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|
+--------+--------------------+------------+------------+-------------+
|       2| 2020-01-12 18:15:04|          41|          41|         0.78|
|       2| 2020-01-31 20:24:10|         173|          70|         0.98|
|       2| 2020-01-07 08:16:53|          74|         236|          2.7|
|       1| 2020-01-15 14:47:15|          25|          66|          0.8|
|    null| 2020-01-31 10:08:00|         259|          51|         2.33|
+--------+--------------------+------------+------------+-------------+
only showing top 5 rows



To apply an ML model, we convert the dataframe to an RDD.

In [29]:
duration_rdd = df_green \
    .select(columns) \
    .rdd
duration_rdd.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), PULocationID=41, DOLocationID=41, trip_distance=0.78),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 20, 24, 10), PULocationID=173, DOLocationID=70, trip_distance=0.98),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 8, 16, 53), PULocationID=74, DOLocationID=236, trip_distance=2.7),
 Row(VendorID=1, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 14, 47, 15), PULocationID=25, DOLocationID=66, trip_distance=0.8),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 10, 8), PULocationID=259, DOLocationID=51, trip_distance=2.33)]

In [30]:
# simple example of how mapPartitions works
# count the lines in each partition
def apply_model_in_batch(partition):
    cnt = 0
    for row in partition:
        cnt += 1
    
    return [cnt]

duration_rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[746744, 418184, 219219, 215720, 212420, 199320, 183795, 109115]

In [31]:
import pandas as pd

def apply_model_in_batch(rows):
    # convert the partition to a pandas DataFrame
    df = pd.DataFrame(rows, columns=columns)
    cnt = len(df)

    return [cnt]

duration_rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[746744, 418184, 219219, 215720, 212420, 199320, 183795, 109115]

In [32]:
# model = ...
def model_predict(df):
    # y_pred = model.predict(df)
    y_pred = df.trip_distance * 5
    return y_pred

In [33]:
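def apply_model_in_batch(rows):
    # build a pandas DataFrame from all the rows of the partition
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    df["predicted_duration"] = predictions
    
    # yield the rows one at a time: a generator is a valid output for mapPartitions()
    for row in df.itertuples():
        yield row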

In [34]:
duration_rdd.mapPartitions(apply_model_in_batch).take(10)

                                                                                

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-12 18:15:04'), PULocationID=41, DOLocationID=41, trip_distance=0.78, predicted_duration=3.9000000000000004),
 Pandas(Index=1, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-31 20:24:10'), PULocationID=173, DOLocationID=70, trip_distance=0.98, predicted_duration=4.9),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-07 08:16:53'), PULocationID=74, DOLocationID=236, trip_distance=2.7, predicted_duration=13.5),
 Pandas(Index=3, VendorID=1.0, lpep_pickup_datetime=Timestamp('2020-01-15 14:47:15'), PULocationID=25, DOLocationID=66, trip_distance=0.8, predicted_duration=4.0),
 Pandas(Index=4, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-31 10:08:00'), PULocationID=259, DOLocationID=51, trip_distance=2.33, predicted_duration=11.65),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-18 17:46:45'), PULocationID=177, DOLocationID=188, trip_distance=2.62, predicted_dura
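`pd.DataFrame(rows, columns=columns)` materializes the whole partition in memory at once (over 700,000 rows for the largest partition above). A possible variation, sketched here in plain Python with a hypothetical `batch_size` parameter and the same toy `trip_distance * 5` model, consumes the partition iterator in fixed-size batches with `itertools.islice`, so only one batch is held at a time:

```python
from itertools import islice

def predict_batch(distances):
    # Toy stand-in for model.predict on one batch, as in model_predict above.
    return [d * 5 for d in distances]

def apply_model_in_batches(rows, batch_size=2):
    """Yield (row, prediction) pairs, holding at most batch_size rows in memory."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            return
        preds = predict_batch([r["trip_distance"] for r in batch])
        for row, pred in zip(batch, preds):
            yield row, pred

# Hypothetical partition content: dicts with a trip_distance field.
partition = [{"trip_distance": d} for d in [1.0, 2.0, 0.5, 4.0, 3.0]]
results = list(apply_model_in_batches(partition, batch_size=2))
print([pred for _, pred in results])  # [5.0, 10.0, 2.5, 20.0, 15.0]
```

In the notebook setting, each `batch` would be turned into its own small pandas DataFrame before calling the model; the batch size then trades memory use against per-call overhead.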

In [35]:
df_predicts = duration_rdd \
    .mapPartitions(apply_model_in_batch) \
    .toDF() \
    .drop("Index")

                                                                                

In [36]:
df_predicts.show()

+--------+--------------------+------------+------------+-------------+------------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+--------+--------------------+------------+------------+-------------+------------------+
|     2.0|                  {}|          41|          41|         0.78|3.9000000000000004|
|     2.0|                  {}|         173|          70|         0.98|               4.9|
|     2.0|                  {}|          74|         236|          2.7|              13.5|
|     1.0|                  {}|          25|          66|          0.8|               4.0|
|     NaN|                  {}|         259|          51|         2.33|             11.65|
|     2.0|                  {}|         177|         188|         2.62|13.100000000000001|
|     2.0|                  {}|          65|          97|         1.13|5.6499999999999995|
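|     NaN|                  {}|         165|         123|         1.36| 6.800000000000001|

Two problems are visible in this output. `lpep_pickup_datetime` is shown as `{}`, most likely because Spark's schema inference does not map the pandas `Timestamp` objects yielded by `itertuples()` to `TimestampType`. `VendorID` became a float column (`2.0`, `NaN`) because pandas stores an integer column that contains missing values as floats. Converting these values back to plain Python types before yielding them (for example `Timestamp.to_pydatetime()` for the pickup time, and `None` instead of `NaN`) should avoid both issues.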