In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session that uses every available core ("local[*]").
# getOrCreate() reuses an already-running session when the cell is re-run.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

24/02/26 19:56:49 WARN Utils: Your hostname, hp-computer resolves to a loopback address: 127.0.1.1; using 192.168.178.105 instead (on interface wlp0s20f3)
24/02/26 19:56:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/26 19:56:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 5.5.1 Operations on Spark RDDs

In [2]:
# Load the green taxi dataset: the glob reads every parquet file found two
# directory levels below the raw "green" folder into a single DataFrame.
# NOTE(review): presumably the two levels are year/month partitions — confirm.
df_green = spark.read.parquet("../data/data/raw/green/*/*")

                                                                                

**Task:** Converting this SQL query to RDD computations

```sql
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(*) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2;
```

In [3]:
# Preview the first rows to see which columns the SQL query refers to.
df_green.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2019-12-18 16:52:30|  2019-12-18 16:54:39|                 N|       1.0|         264|         264|            5.0|          0.0|        3.5|  0.5|    0.

                                                                                

### `WHERE` vs `filter()`

Accessing the underlying RDD:

In [4]:
# Keep only the three columns the query needs, then drop down from the
# DataFrame API to the underlying RDD of Row objects.
query_columns = df_green.select("lpep_pickup_datetime", "PULocationID", "total_amount")
rdd = query_columns.rdd

In [5]:
# filter() keeps the rows for which the predicate returns True. A predicate
# that is always True keeps every record, so take(1) returns the first row.
rdd.filter(lambda row: True).take(1)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2019, 12, 18, 16, 52, 30), PULocationID=264, total_amount=4.81)]

In [6]:
# A predicate that is always False discards every record, so there is
# nothing left to take and the result is an empty list.
rdd.filter(lambda row: False).take(1)

                                                                                

[]

In [7]:
from datetime import datetime
from collections import namedtuple
from pyspark.sql import types

In [8]:
# Lower bound of the WHERE clause in the SQL query being translated:
#     lpep_pickup_datetime >= '2020-01-01 00:00:00'
# (the previous value, 2021-01-01, silently dropped all of 2020).
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    """RDD equivalent of the SQL ``WHERE`` clause.

    Args:
        row: Record exposing a ``lpep_pickup_datetime`` attribute
            (a ``datetime`` or ``None``).

    Returns:
        bool: True when the pickup time is on or after ``start``. Rows with a
        missing pickup time are rejected, mirroring SQL where a comparison
        against NULL never satisfies the WHERE condition.
    """
    pickup = row.lpep_pickup_datetime
    # Guard first: comparing None with a datetime raises TypeError.
    return pickup is not None and pickup >= start

def prepare_for_grouping(row):
    """Turn a trip row into a ``(key, value)`` pair ready for ``reduceByKey``.

    The key is ``(pickup hour, pickup zone)`` - the RDD counterpart of
    ``GROUP BY 1, 2`` - and the value is ``(total_amount, 1)`` so that summing
    values per key yields both the revenue and the record count.
    """
    # Truncate the pickup timestamp to the start of its hour.
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return ((pickup_hour, row.PULocationID), (row.total_amount, 1))

def calculate_revenue(left_value, right_value):
    """Combine two ``(amount, count)`` pairs that share the same key.

    Used as the reducer for ``reduceByKey``: both components are added
    element-wise, giving the running revenue and record count.
    """
    (amount_a, count_a), (amount_b, count_b) = left_value, right_value
    return (amount_a + amount_b, count_a + count_b)

# Flat record type for one aggregated (hour, zone) group.
RevenueRow = namedtuple("RevenueRow", ["hour", "zone", "revenue", "count"])

def unwrap(row):
    """Flatten a reduced ``((hour, zone), (revenue, count))`` pair.

    Returns a single ``RevenueRow`` so the RDD can be converted back into a
    DataFrame with one column per field.
    """
    key, value = row[0], row[1]
    return RevenueRow(key[0], key[1], value[0], value[1])

# Explicit schema for the aggregated rows, passed to toDF() so the column
# types do not have to be inferred from the RDD. Every field is nullable.
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    )
])

In [9]:
# RDD translation of the SQL query, step by step:
#   filter      -> WHERE
#   map         -> build the (hour, zone) key and (amount, 1) value
#   reduceByKey -> GROUP BY with SUM / COUNT
#   map + toDF  -> flatten the pairs and return to a typed DataFrame
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [10]:
# Persist the aggregated revenue as parquet, replacing any previous run's output.
df_result.write.mode("overwrite").parquet("tmp/green-revenue")

[Stage 7:>                                                          (0 + 8) / 8]

24/02/26 19:58:14 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [11]:
# Inspect the aggregated result: one row per (hour, zone) with revenue and count.
df_result.show()

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-10-01 03:00:00|  74|             31.96|    3|
|2021-10-01 03:00:00|  82|              14.3|    1|
|2021-10-01 03:00:00|  42|             45.91|    2|
|2021-10-01 04:00:00|  95|               8.8|    1|
|2021-10-01 05:00:00|  92|              30.3|    1|
|2021-10-01 05:00:00| 244|             98.46|    2|
|2021-10-01 06:00:00| 129|              70.1|    4|
|2021-10-01 06:00:00|  41|              10.3|    1|
|2021-10-01 07:00:00|   7|              39.6|    1|
|2021-10-01 07:00:00| 159|             99.05|    1|
|2021-10-01 07:00:00|  71|             97.85|    1|
|2021-10-01 08:00:00| 200|121.19999999999999|    2|
|2021-10-01 08:00:00|  24|               8.8|    1|
|2021-10-01 08:00:00| 152|             28.55|    1|
|2021-10-01 09:00:00| 146|               5.3|    1|
|2021-10-01 09:00:00|  74|480.67000000000024|   36|
|2021-10-01 

### 5.5.2 Spark RDD mapPartitions

*Example Application*: Using data from a Spark DataFrame for machine learning

In [14]:
# Columns handed to the model; the same list is reused later to label the
# pandas DataFrame built from each partition.
columns = ["VendorID", "lpep_pickup_datetime", "PULocationID", "DOLocationID", "trip_distance"]

# Project the trips onto those columns and expose them as an RDD of Rows.
model_input_df = df_green.select(columns)
duration_rdd = model_input_df.rdd

In [15]:
# Peek at a few Row objects to confirm the selected fields.
duration_rdd.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2019, 12, 18, 16, 52, 30), PULocationID=264, DOLocationID=264, trip_distance=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 1, 1, 45, 58), PULocationID=66, DOLocationID=65, trip_distance=1.28),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 1, 1, 41, 38), PULocationID=181, DOLocationID=228, trip_distance=2.47),
 Row(VendorID=1, lpep_pickup_datetime=datetime.datetime(2020, 1, 1, 1, 52, 46), PULocationID=129, DOLocationID=263, trip_distance=6.3),
 Row(VendorID=1, lpep_pickup_datetime=datetime.datetime(2020, 1, 1, 1, 19, 57), PULocationID=210, DOLocationID=150, trip_distance=2.3)]

In [25]:
import pandas as pd

In [26]:
# Small sample of Row objects for trying out the batch function locally
# before running it on every partition.
rows = duration_rdd.take(10)

In [29]:
# model = ...

def model_precit(df):
    """Placeholder for a trained model's ``predict`` call.

    Takes a pandas DataFrame with a ``trip_distance`` column and returns a
    Series with one prediction per row (here simply ``trip_distance * 5``).
    """
    # A real model would be invoked here, e.g. model.predict(df).
    return df.trip_distance * 5

In [31]:
def apply_model_in_batch(rows):
    """Score one partition of rows with the model and yield the augmented rows.

    Designed to be passed to ``RDD.mapPartitions``. All rows of the partition
    are materialised into one pandas DataFrame (so a partition must fit in
    memory), the model is applied once to the whole batch, and each row is
    yielded back with an extra ``predicted_duration`` field.

    Args:
        rows: Iterable of records whose fields line up with the notebook-level
            ``columns`` list.

    Yields:
        Named tuples with the fields in ``columns`` followed by
        ``predicted_duration``.
    """
    df = pd.DataFrame(rows, columns=columns)
    df["predicted_duration"] = model_precit(df)

    # index=False: the pandas positional index restarts at 0 in every
    # partition and is not data; leaving it in would add a meaningless
    # `Index` column to the resulting Spark DataFrame.
    yield from df.itertuples(index=False)

In [33]:
# Run the model once per partition, convert the yielded rows back into a
# DataFrame and display only the prediction column.
(
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF()
    .select("predicted_duration")
    .show()
)

[Stage 21:>                                                         (0 + 1) / 1]

+------------------+
|predicted_duration|
+------------------+
|               0.0|
|               6.4|
|12.350000000000001|
|              31.5|
|              11.5|
|              15.0|
|             13.85|
|24.900000000000002|
|              3.55|
|               4.0|
|               7.6|
|              19.0|
|5.6000000000000005|
|             28.35|
|               9.3|
|               7.1|
|              77.4|
|              5.75|
|              11.5|
|               5.0|
+------------------+
only showing top 20 rows



                                                                                