What is an RDD and how is it related to a DataFrame
From DataFrame to RDD
Operators on RDDs: map, filter, reduceByKey
From RDD to DataFrame

Resilient Distributed Datasets (RDDs) are the main abstraction provided by Spark: a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

DataFrames are built on top of RDDs and additionally carry a schema (column names and types), which plain RDDs do not.
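To make "a collection of elements partitioned across the nodes" concrete, here is a toy, single-process model of an RDD as a list of partitions. The class `ToyRDD` and its methods are hypothetical. They only borrow the names of Spark's `parallelize`, `map`, `filter` and `collect`, and there is no cluster, lazy evaluation or fault recovery involved.

```python
from typing import Callable, Iterable, List


class ToyRDD:
    """Hypothetical single-process model of an RDD: a list of partitions.

    It only mimics the names of Spark's API; there is no cluster, no lazy
    evaluation and no fault recovery here.
    """

    def __init__(self, partitions: List[list]):
        self.partitions = partitions

    @classmethod
    def parallelize(cls, data: Iterable, num_partitions: int = 2) -> "ToyRDD":
        items = list(data)
        n = len(items)
        # Split the data into contiguous, roughly equal chunks (one per partition).
        bounds = [(i * n) // num_partitions for i in range(num_partitions + 1)]
        return cls([items[bounds[i]:bounds[i + 1]] for i in range(num_partitions)])

    def map(self, f: Callable) -> "ToyRDD":
        # f is applied to each element inside its own partition.
        return ToyRDD([[f(x) for x in part] for part in self.partitions])

    def filter(self, pred: Callable) -> "ToyRDD":
        # Keeps the partitioning; partitions may simply get smaller.
        return ToyRDD([[x for x in part if pred(x)] for part in self.partitions])

    def collect(self) -> list:
        # Concatenate the partitions in order, like gathering results on the driver.
        return [x for part in self.partitions for x in part]


toy_rdd = ToyRDD.parallelize([1, 2, 3, 4, 5], num_partitions=2)
print(toy_rdd.partitions)                              # [[1, 2], [3, 4, 5]]
print(toy_rdd.filter(lambda x: x % 2 == 0).collect())  # [2, 4]
```

A DataFrame wraps the same idea with a schema, so each element becomes a Row with named, typed columns.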

In [3]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()



In [4]:
df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

We will reproduce the following SQL query using RDD operations only:

```
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
```
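Before translating the query into RDD operations, it can help to see what it computes. The sketch below is a plain-Python equivalent running on an in-memory list. `Trip` and `revenue_by_hour_and_zone` are hypothetical names; the first two trips are taken from the `df_green.rdd.take(3)` output shown further down, and the other two are made up to exercise the grouping and the `WHERE` clause.

```python
from collections import defaultdict, namedtuple
from datetime import datetime

# Hypothetical stand-in for a Row with only the three columns the query needs.
Trip = namedtuple("Trip", ["lpep_pickup_datetime", "PULocationID", "total_amount"])


def revenue_by_hour_and_zone(trips):
    """Plain-Python equivalent of the SQL query: returns {(hour, zone): (amount, count)}."""
    start = datetime(2020, 1, 1)
    groups = defaultdict(lambda: [0.0, 0])
    for trip in trips:
        # WHERE lpep_pickup_datetime >= '2020-01-01 00:00:00'
        if trip.lpep_pickup_datetime < start:
            continue
        # date_trunc('hour', ...): drop minutes, seconds and microseconds
        hour = trip.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
        acc = groups[(hour, trip.PULocationID)]  # GROUP BY 1, 2
        acc[0] += trip.total_amount              # SUM(total_amount)
        acc[1] += 1                              # COUNT(1)
    return {key: (amount, count) for key, (amount, count) in groups.items()}


trips = [
    Trip(datetime(2020, 1, 28, 9, 54, 40), 92, 16.8),  # from the dataset output
    Trip(datetime(2020, 1, 8, 20, 11, 55), 189, 5.3),  # from the dataset output
    Trip(datetime(2020, 1, 28, 9, 5, 0), 92, 10.0),    # made up: same hour and zone
    Trip(datetime(2019, 12, 31, 23, 0, 0), 92, 7.0),   # made up: removed by WHERE
]
for key, value in revenue_by_hour_and_zone(trips).items():
    print(key, value)
```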

In [5]:
rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

In [6]:
print(type(df_green.rdd.take(1)[0]))


<class 'pyspark.sql.types.Row'>


                                                                                

In [7]:
df_green.rdd.take(3)


[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 28, 9, 54, 40), lpep_dropoff_datetime=datetime.datetime(2020, 1, 28, 10, 6, 19), store_and_fwd_flag='N', RatecodeID=1, PULocationID=92, DOLocationID=82, passenger_count=1, trip_distance=4.08, fare_amount=14.5, extra=0.0, mta_tax=0.5, tip_amount=1.5, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=16.8, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 8, 20, 11, 55), lpep_dropoff_datetime=datetime.datetime(2020, 1, 8, 20, 14, 35), store_and_fwd_flag='N', RatecodeID=1, PULocationID=189, DOLocationID=181, passenger_count=1, trip_distance=0.56, fare_amount=4.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=5.3, payment_type=2, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 10, 11, 23), lpep_dropoff_date

In [8]:
from datetime import datetime

In [9]:
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

In [10]:
rows = rdd.take(10)
row = rows[0]

In [11]:
# Each element of the RDD is a pyspark.sql.types.Row object.

row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 28, 9, 54, 40), PULocationID=92, total_amount=16.8)

# Operations on RDDs: 
## filter, map, reduceByKey, map, toDF
### .filter() because we don’t want outliers
### .map() to generate intermediate results better suited for aggregation
### .reduceByKey() to merge the values for each key
### .map() to unwrap the rows
### .toDF() to return the rows to a dataframe properly

In [None]:
# Keeping every row: this returns the first row.
rdd.filter(lambda row: True).take(1)

# Dropping every row: this filters out the whole dataset and returns an empty list.
rdd.filter(lambda row: False).take(1)

# A standalone example that keeps only the even numbers.
# See https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.RDD.filter.html
# Use a new variable name so that `rdd` (the taxi data) is not overwritten.
sc = spark.sparkContext
numbers = sc.parallelize([1, 2, 3, 4, 5])
numbers.filter(lambda x: x % 2 == 0).collect()


### We will use .filter() to remove trips whose pickup timestamp is before 2020.



In [1]:
from datetime import datetime

start = datetime(year=2020, month=1, day=1)

rdd.filter(lambda row: row.lpep_pickup_datetime >= start).take(5)

# Better: a named function, because lambdas get messy quite fast.
def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

rdd \
    .filter(filter_outliers) \
    .take(3)

## To implement the equivalent of GROUP BY, we first need the .map() function. map() applies a transformation to every Row and returns a transformed RDD.

We create a function that transforms a row into a tuple composed of a key and a value. The key is a tuple of hour and zone, the same two columns as in the GROUP BY. The value is a tuple of amount and number of records, the same two aggregates that the SQL query above returns.

In [29]:
def prepare_for_grouping(row): 
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .take(5)

[((datetime.datetime(2020, 1, 28, 9, 0), 92), (16.8, 1)),
 ((datetime.datetime(2020, 1, 8, 20, 0), 189), (5.3, 1)),
 ((datetime.datetime(2020, 1, 10, 11, 0), 196), (13.46, 1)),
 ((datetime.datetime(2020, 1, 23, 15, 0), 260), (6.3, 1)),
 ((datetime.datetime(2020, 1, 17, 10, 0), 197), (10.8, 1))]

### .reduceByKey(): reduce by key

Now we aggregate the transformed RDD by key.

To do so, we use pyspark.RDD.reduceByKey, which merges the values for each key using an associative and commutative reduce function.

Here, left_value and right_value are both tuples of (amount, number of records).
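The reduce function has to be associative and commutative because Spark is free to merge values in any order: according to the PySpark documentation, reduceByKey first merges values locally on each mapper (like a MapReduce "combiner") and then merges those partial results after the shuffle. The plain-Python model below uses a hypothetical `reduce_by_key` helper and two hand-made partitions (the first two pairs match the map output above; `(10.0, 1)` is made up) to show that both routes give the same totals. With floats, a different merge order can in general change the last bits.

```python
from datetime import datetime


def calculate_revenue(left_value, right_value):
    # Same reduce function as in the notebook: add amounts and counts.
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    return (left_amount + right_amount, left_count + right_count)


def reduce_by_key(pairs, func):
    """Hypothetical plain-Python model of reduceByKey over one list of (key, value) pairs."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


h9 = datetime(2020, 1, 28, 9, 0)
h20 = datetime(2020, 1, 8, 20, 0)

# Two partitions of (key, value) pairs, as produced by prepare_for_grouping.
partition_1 = [((h9, 92), (16.8, 1)), ((h20, 189), (5.3, 1))]
partition_2 = [((h9, 92), (10.0, 1))]

# Step 1: combine locally inside each partition.
partials = [reduce_by_key(part, calculate_revenue) for part in (partition_1, partition_2)]
# Step 2: merge the partial results for each key (what happens after the shuffle).
combined = reduce_by_key([kv for part in partials for kv in part.items()], calculate_revenue)

# Reducing all pairs in a single pass gives the same totals.
single_pass = reduce_by_key(partition_1 + partition_2, calculate_revenue)
print(combined == single_pass)  # True
```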

In [None]:
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

# take(5) returns a Python list of (key, value) tuples, not a DataFrame.
result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .take(5)



In [None]:
# The nested tuple structure ((hour, zone), (amount, count)) isn't nice to work with, and we want to get back to a DataFrame.

# To do so, we apply another map() that unwraps each pair into the desired columns.

In [10]:
from collections import namedtuple

In [11]:
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [12]:
def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .take(5)

### Returning to a dataframe
To return to a DataFrame properly, we define the result schema explicitly instead of letting Spark infer the column types.



In [13]:
from pyspark.sql import types

In [14]:
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

In [None]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema)

df_result.show(5)

In [None]:
df_result.printSchema()

In [16]:
df_result.write.parquet('tmp/green-revenue')

                                                                                

## mapPartitions() returns a new RDD by applying a function to each partition of this RDD.
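The practical difference from map() is how often your function runs: map() calls it once per element, while mapPartitions() calls it once per partition, passing an iterator over that partition's rows and concatenating the iterables that come back. That makes mapPartitions() a natural place for expensive per-batch work such as applying a model. The toy sketch below models partitions as plain Python lists (it is not Spark) and counts the calls.

```python
# Toy model: partitions as plain Python lists (this is not Spark).
partitions = [[1, 2, 3], [4, 5], [6]]
calls = {"map": 0, "mapPartitions": 0}


def times_ten(x):
    # With map(), this runs once for every element.
    calls["map"] += 1
    return x * 10


def partition_sum(rows):
    # With mapPartitions(), this runs once per partition and receives an
    # iterator over all of that partition's rows; it must return an iterable.
    calls["mapPartitions"] += 1
    return [sum(rows)]


map_result = [times_ten(x) for part in partitions for x in part]
# The iterables returned for each partition are concatenated into one result.
map_partitions_result = [y for part in partitions for y in partition_sum(iter(part))]

print(map_result)             # [10, 20, 30, 40, 50, 60]
print(map_partitions_result)  # [6, 9, 6]
print(calls)                  # {'map': 6, 'mapPartitions': 3}
```

This is the same shape as the `[1, 1, 1, 1]` result of the first example: one output element per partition, flattened into a single list.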



In [17]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = df_green \
    .select(columns) \
    .rdd
duration_rdd.take(5)

## Below are a few simple examples that help to understand how mapPartitions() works.

The code below returns [1] for each partition, and Spark flattens the per-partition lists into a single list.

In [None]:
def apply_model_in_batch(partition):
    return [1]

rdd.mapPartitions(apply_model_in_batch).collect()
# [1, 1, 1, 1]

In [None]:
def apply_model_in_batch(partition):
    cnt = 0

    for row in partition:
        cnt = cnt + 1

    return [cnt]

rdd.mapPartitions(apply_model_in_batch).collect()
# [1141148, 436983, 433476, 292910]

In [None]:
import pandas as pd

def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    cnt = len(df)
    return [cnt]

duration_rdd.mapPartitions(apply_model_in_batch).collect()
# [1141148, 436983, 433476, 292910]

## This puts the entire partition into a single pandas DataFrame, which can be a problem for large partitions because the whole partition must fit in memory. To avoid that, you can use itertools.islice() to break a partition into sub-batches of, for example, 100,000 rows and process them separately.
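A minimal sketch of that idea, assuming rows with the same five columns as `duration_rdd` (trip_distance last): `batched_rows` uses `itertools.islice` to pull at most `batch_size` rows at a time from the partition iterator, so only one batch is held in memory. Both function names are hypothetical, and the list comprehension stands in for building a pandas DataFrame per batch and calling `model_predict`, to keep the example free of dependencies.

```python
from itertools import islice


def batched_rows(rows, batch_size):
    """Yield lists of at most batch_size rows from any iterable, such as a partition iterator."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def apply_model_in_batches(rows, batch_size=100_000):
    """Hypothetical batched variant of apply_model_in_batch.

    Rows are assumed to be 5-tuples in the order of `columns`:
    (VendorID, lpep_pickup_datetime, PULocationID, DOLocationID, trip_distance).
    """
    for batch in batched_rows(rows, batch_size):
        # Stand-in for pd.DataFrame(batch, columns=columns) followed by model_predict(df).
        predictions = [row[-1] * 5 for row in batch]
        for row, prediction in zip(batch, predictions):
            # Append the prediction as an extra column.
            yield tuple(row) + (prediction,)


print(list(batched_rows(range(5), batch_size=2)))  # [[0, 1], [2, 3], [4]]
```

In the notebook, this could be plugged in as `duration_rdd.mapPartitions(lambda rows: apply_model_in_batches(rows, 100_000))`. Because it is a generator, rows are yielded as each batch is processed instead of after the whole partition has been loaded.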

In [19]:
# rows = duration_rdd.take(10)

In [20]:
# df = pd.DataFrame(rows, columns=columns)

In [None]:
# model = ...

def model_predict(df):
    # y_pred = model.predict(df)
    y_pred = df.trip_distance * 5
    return y_pred

def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions

    for row in df.itertuples():
        yield row

df_predicts = duration_rdd \
    .mapPartitions(apply_model_in_batch) \
    .toDF() \
    .drop('Index')

df_predicts.select('predicted_duration').show(10)

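Here, itertuples() iterates over DataFrame rows as namedtuples. The first field of each namedtuple is the pandas index, which is why we drop the Index column above.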



In [None]:
rows = duration_rdd.take(5)
df = pd.DataFrame(rows, columns=columns)
list(df.itertuples())

The yield keyword in Python is similar to a return statement in that it hands a value back to the caller. The difference is that calling a function that contains yield does not run its body right away: it returns a generator object, which produces the values lazily, one at a time, as you iterate over it.

In [None]:
def infinite_seq(flag: bool):
    i = 0
    while True:
        yield i
        i = i + 1

        if flag and i > 10:
            break

In [None]:
seq = infinite_seq(False)

In [None]:
seq
# A generator object: nothing runs until we iterate over it. With flag=False, this sequence is infinite.


In [None]:
# With flag=True, the generator stops after yielding 0 through 10, so this sequence is finite.
seq = infinite_seq(True)

In [None]:
seq

In [None]:
list(seq)
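One consequence of this laziness matters for mapPartitions(): the partition your function receives is an iterator, and an iterator or generator can be consumed only once. The small example below (`countdown` is a hypothetical name) shows both the lazy start and the exhaustion. In apply_model_in_batch, `pd.DataFrame(rows, ...)` already consumes the partition, so iterating over `rows` again afterwards would yield nothing.

```python
def countdown(n):
    # Hypothetical example generator.
    print("countdown started")
    while n > 0:
        yield n
        n -= 1


gen = countdown(3)    # nothing is printed yet: the body has not started running
first = list(gen)     # prints "countdown started", then collects the values
second = list(gen)    # the generator is already exhausted
print(first, second)  # [3, 2, 1] []
```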