# Resilient Distributed Datasets in Spark

It turns out DataFrames are implemented on top of RDDs.

- dataframes have schema
- RDD is simply a collection of objects

## Map and reduce



In [1]:
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/24 14:14:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_green = spark.read.parquet('../data/taxi_ingest_data/parts/green/*/*')

                                                                                

`.rdd` returns the underlying RDD

In [4]:
df_green.rdd.take(3)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 12, 33, 7), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 12, 38, 42), store_and_fwd_flag='N', RatecodeID=1, PULocationID=24, DOLocationID=41, passenger_count=1, trip_distance=0.9300000071525574, fare_amount=6.0, extra=0.0, mta_tax=0.5, tip_amount=1.3600000143051147, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.30000001192092896, total_amount=8.15999984741211, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 0, 7, 7), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 0, 9, 47), store_and_fwd_flag='N', RatecodeID=1, PULocationID=193, DOLocationID=193, passenger_count=1, trip_distance=0.4699999988079071, fare_amount=4.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.30000001192092896, total_amount=5.300000190734863, payment_type=2, trip_type=1, congestion_surcharge=0.0),


The `Row` object is also returned if we `.take` from the DataFrame

In [5]:
df_green.take(3)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 12, 33, 7), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 12, 38, 42), store_and_fwd_flag='N', RatecodeID=1, PULocationID=24, DOLocationID=41, passenger_count=1, trip_distance=0.9300000071525574, fare_amount=6.0, extra=0.0, mta_tax=0.5, tip_amount=1.3600000143051147, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.30000001192092896, total_amount=8.15999984741211, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 0, 7, 7), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 0, 9, 47), store_and_fwd_flag='N', RatecodeID=1, PULocationID=193, DOLocationID=193, passenger_count=1, trip_distance=0.4699999988079071, fare_amount=4.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.30000001192092896, total_amount=5.300000190734863, payment_type=2, trip_type=1, congestion_surcharge=0.0),


In [6]:
cols = ['lpep_pickup_datetime', 'PULocationID', 'total_amount']
rdd_green = df_green \
                .select(*cols) \
                .rdd

In [7]:
from datetime import datetime

In [8]:
start = datetime(year=2020, month=1, day=1)
def filter_datetime(row):
    """
    pyspark.RDD.filter callable
    only accepts row as argument
    """
    return row.lpep_pickup_datetime >= start

In [9]:
def prepare_for_groupby(row):
    """
    Map of MapReduce
    pyspark.RDD.map callable
    only accepts row as arg
    """
    # assigning our keys
    # truncate to datetime to hour
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    # key becomes a tuple
    key = (hour, zone)

    # assigning values 
    amount = row.total_amount
    count = 1
    # value's also a tuple so that we have a 1:1 key: value relation
    value = (amount, count)
    return (key, value)

In [15]:
def calc_revenue(left_val, right_val):
    """
    Reduce of MapReduce
    pyspark.RDD.reduceByKey callable
    The left, right terminalogy is identical to itertools.reduce
    where the reduce() func iterates through an array from left to right,
    collecting the results starting from left

    Since this will be used in reduceByKey, our call do not need
    to specify which keys to use; all unique keys will be used
    """
    # unpack our value
    # left is the cumulative val, so far
    left_amt, left_cnt = left_val
    # right is the new val
    right_amt, right_cnt = right_val

    # add them
    output_amt = left_amt + right_amt
    output_cnt = left_cnt + right_cnt

    # return as tuple to match existing format
    return (output_amt, output_cnt)

In [16]:
rdd_green \
    .filter(filter_datetime) \
    .map(prepare_for_groupby) \
    .reduceByKey(calc_revenue) \
    .take(10)

                                                                                

[((datetime.datetime(2020, 1, 3, 16, 0), 75), (1449.6299991607666, 87)),
 ((datetime.datetime(2020, 1, 11, 11, 0), 193), (43.55000042915344, 6)),
 ((datetime.datetime(2020, 1, 2, 14, 0), 263), (76.92000198364258, 4)),
 ((datetime.datetime(2020, 1, 4, 17, 0), 129), (524.7999963760376, 28)),
 ((datetime.datetime(2020, 1, 9, 10, 0), 123), (60.64000129699707, 2)),
 ((datetime.datetime(2020, 1, 1, 10, 0), 127), (23.850000381469727, 2)),
 ((datetime.datetime(2020, 1, 10, 18, 0), 181), (357.79000091552734, 17)),
 ((datetime.datetime(2020, 1, 22, 20, 0), 74), (655.1300024986267, 48)),
 ((datetime.datetime(2020, 1, 26, 12, 0), 76), (106.87999725341797, 6)),
 ((datetime.datetime(2020, 1, 3, 19, 0), 97), (769.689998626709, 46))]

Now we have our `revenue_amount` and `trip_counts` grouped by `hour` and `pickup_zone`. We can add a cosmetic fix to ungroup the `key-val` tuples to flatten it, and return it to dataframe by adding the schema as a `namedtuple` 

In [19]:
from collections import namedtuple
RevenueRow = namedtuple('RevenueRow', ['hour', 'pickup_zone', 'revenue', 'count'])
def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        pickup_zone=row[0][1], 
        revenue=row[1][0], 
        count=row[1][1])

In [20]:
df_result = rdd_green \
    .filter(filter_datetime) \
    .map(prepare_for_groupby) \
    .reduceByKey(calc_revenue) \
    .map(unwrap) \
    .toDF()
df_result.show()

[Stage 11:>                                                         (0 + 1) / 1]

+-------------------+-----------+------------------+-----+
|               hour|pickup_zone|           revenue|count|
+-------------------+-----------+------------------+-----+
|2020-01-03 16:00:00|         75|1449.6299991607666|   87|
|2020-01-11 11:00:00|        193| 43.55000042915344|    6|
|2020-01-02 14:00:00|        263| 76.92000198364258|    4|
|2020-01-04 17:00:00|        129| 524.7999963760376|   28|
|2020-01-09 10:00:00|        123| 60.64000129699707|    2|
|2020-01-01 10:00:00|        127|23.850000381469727|    2|
|2020-01-10 18:00:00|        181|357.79000091552734|   17|
|2020-01-22 20:00:00|         74| 655.1300024986267|   48|
|2020-01-26 12:00:00|         76|106.87999725341797|    6|
|2020-01-03 19:00:00|         97|  769.689998626709|   46|
|2020-01-27 16:00:00|         65| 622.7200040817261|   31|
|2020-01-04 00:00:00|        166|145.34999990463257|   10|
|2020-01-09 07:00:00|         89|295.28000354766846|   11|
|2020-01-02 10:00:00|        213|119.89000129699707|    

                                                                                

In [21]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('pickup_zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

Recasting the schema during `toDF()` 

In [23]:
from pyspark.sql import types

res_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True), 
    types.StructField('pickup_zone', types.IntegerType(), True), 
    types.StructField('revenue', types.FloatType(), True), 
    types.StructField('count', types.IntegerType(), True)]
)

In [24]:
# all the operations
df_result = rdd_green \
    .filter(filter_datetime) \
    .map(prepare_for_groupby) \
    .reduceByKey(calc_revenue) \
    .map(unwrap) \
    .toDF(schema=res_schema)
df_result.show()



+-------------------+-----------+---------+-----+
|               hour|pickup_zone|  revenue|count|
+-------------------+-----------+---------+-----+
|2020-01-03 16:00:00|         75|  1449.63|   87|
|2020-01-11 11:00:00|        193|    43.55|    6|
|2020-01-02 14:00:00|        263|    76.92|    4|
|2020-01-04 17:00:00|        129|    524.8|   28|
|2020-01-09 10:00:00|        123|    60.64|    2|
|2020-01-01 10:00:00|        127|    23.85|    2|
|2020-01-10 18:00:00|        181|   357.79|   17|
|2020-01-22 20:00:00|         74|   655.13|   48|
|2020-01-26 12:00:00|         76|   106.88|    6|
|2020-01-03 19:00:00|         97|   769.69|   46|
|2020-01-27 16:00:00|         65|622.72003|   31|
|2020-01-04 00:00:00|        166|   145.35|   10|
|2020-01-09 07:00:00|         89|   295.28|   11|
|2020-01-02 10:00:00|        213|   119.89|    6|
|2020-01-23 16:00:00|        166|    975.0|   58|
|2020-01-26 22:00:00|         25|   190.99|   11|
|2020-01-07 12:00:00|        258|    51.85|    2|


                                                                                

The job detail will show two stages:

1. Map
2. Reduce

In [25]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('pickup_zone', IntegerType(), True), StructField('revenue', FloatType(), True), StructField('count', IntegerType(), True)])

In [26]:
df_result.write.parquet('../data/reports/revenue/green_zones/')

                                                                                

## RDD mapPartitions

Signature: mapPartitions(`func`)

Description:

> Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type `Iterator<T> => Iterator<U>` when running on an RDD of type T.


`map` takes a func, and applies it to consume one element in the dataset to produce another element

`mapPartitions` takes a *partition* to return *another partition*

> RDD partition -> `mapPartitions` -> RDD partition

So why use it over `map`? When the dataset cannot fit in memory. If we had 1TB to `map` a prediction model, there wouldn't be enough RAM. `mapPartitions` can divvy up the dataset to apply it partition by partition

In [27]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

# get RDD to make duration predictions on
duration_rdd = df_green \
    .select(columns) \
    .rdd

In [39]:
from functools import reduce
def apply_count_in_batch(part):
    """
    Applying to a partition
    Must return an iterator
    """
    # left is being accumulated as reduce iterates through part
    return [reduce(lambda left, _: left + 1, part, 0)]

In [42]:
duration_rdd.mapPartitions(apply_count_in_batch).collect()

                                                                                

[1314009, 573315, 525878, 389729]

The unevenness of the partitions will affect the execution times; the first partition is 3x larger and will take 3x longer. Could mitigate by repartitioning, but that is also an expensive operation...

instead of passing a spark partition, we could also pass a dataframe so that pandas operations can be leveraged for our predictions

In [43]:
import pandas as pd

In [44]:
some_rows = duration_rdd.take(10) # returns list[Rows]
# if columns were not specified, pd does not have the schema
df = pd.DataFrame(some_rows, columns=columns)
df

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,DOLocationID,trip_distance
0,2,2020-01-23 12:33:07,24,41,0.93
1,2,2020-01-23 00:07:07,193,193,0.47
2,2,2020-01-29 12:07:00,33,49,2.15
3,2,2020-01-03 15:22:00,224,45,2.35
4,2,2020-01-10 06:49:08,74,75,1.43
5,2,2020-01-27 18:02:49,226,129,3.65
6,2,2020-01-27 07:53:12,74,48,6.07
7,2,2020-01-03 16:03:57,75,75,0.82
8,2,2020-01-04 19:17:09,129,68,7.38
9,2,2020-01-06 15:55:47,61,225,0.48


In [45]:
def model_predict(df):
    """
    Some model that returns a series of predictions given a dataframe
    of features
    """
    # y_pred = model.predict(df)
    y_pred = df.trip_distance * 3
    return y_pred

In [75]:
def apply_model_in_batch(part):
    """
    Applying to a partition
    Must return an iterator
    """
    # columns is some var defined outside func scope
    df = pd.DataFrame(part, columns=columns)

    df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    df = df.drop(columns=['lpep_pickup_datetime'])
    preds = model_predict(df)
    
    # add col to df
    df['pred_duration'] = preds
    # returns iterator where each row becomes a namedtuple
    # namedtuples are iterables and attributes can be accessed by name
    for row in df.itertuples(index=False,name="duration_features"):
        yield row

In [72]:
foo = namedtuple('features', ['datetime'])
bar = foo(datetime='2020-01-01')
bar.datetime

'2020-01-01'

In [73]:
bar.datetime = pd.to_datetime(bar.datetime)
bar.datetime

AttributeError: can't set attribute

Iterating through `itertuples` and accessing fields by attribute

In [70]:
rdd_dts = [row.lpep_pickup_datetime 
           for row in df.itertuples(index=False, name="duration_features")]
pd.to_datetime(rdd_dts[0])

Timestamp('2020-01-23 12:33:07')

In [78]:
df_preds_schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('trip_distance', types.FloatType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('pred_duration', types.FloatType(), True)]
    )

In [65]:
duration_rdd.take(1)[0].lpep_pickup_datetime

                                                                                

datetime.datetime(2020, 1, 23, 12, 33, 7)

In [81]:
df_preds = duration_rdd \
    .mapPartitions(apply_model_in_batch) \
    .toDF() # can set schema here

                                                                                

In [82]:
df_preds.show()

[Stage 30:>                                                         (0 + 1) / 1]

+--------+------------+------------+-------------------+---------------+------------------+
|VendorID|PULocationID|DOLocationID|      trip_distance|pickup_datetime|     pred_duration|
+--------+------------+------------+-------------------+---------------+------------------+
|       2|          24|          41| 0.9300000071525574|             {}| 2.790000021457672|
|       2|         193|         193| 0.4699999988079071|             {}|1.4099999964237213|
|       2|          33|          49| 2.1500000953674316|             {}| 6.450000286102295|
|       2|         224|          45| 2.3499999046325684|             {}| 7.049999713897705|
|       2|          74|          75| 1.4299999475479126|             {}| 4.289999842643738|
|       2|         226|         129| 3.6500000953674316|             {}|10.950000286102295|
|       2|          74|          48|  6.070000171661377|             {}| 18.21000051498413|
|       2|          75|          75| 0.8199999928474426|             {}| 2.45999

                                                                                

In [53]:
df_preds.select('pred_duration').show()

[Stage 20:>                                                         (0 + 1) / 1]

+------------------+
|     pred_duration|
+------------------+
| 2.790000021457672|
|1.4099999964237213|
| 6.450000286102295|
| 7.049999713897705|
| 4.289999842643738|
|10.950000286102295|
| 18.21000051498413|
| 2.459999978542328|
|22.140000343322754|
|1.4399999678134918|
| 4.080000042915344|
| 43.19999885559082|
|6.2099997997283936|
| 1.559999942779541|
|3.6000001430511475|
| 9.600000143051147|
| 44.09999942779541|
| 7.950000286102295|
| 3.539999842643738|
|12.390000343322754|
+------------------+
only showing top 20 rows



                                                                                

In [54]:
df_preds.schema

StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StructType([]), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('pred_duration', DoubleType(), True)])

In [41]:
part = "abcde"
apply_model_in_batch(part)

[5]

In [30]:
len(list(part))

10