In [1]:
import pyspark
from pyspark.sql import SparkSession
from datetime import datetime

In [2]:
# Build (or reuse) the Spark session for this notebook, using the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/02 16:01:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


For this session, we would like to replicate, using RDDs, the following query that we wrote in previous parts:

SELECT
    date_trunc("hour", lpep_pickup_datetime) AS hour,
    PULocationID AS zone,
     

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records

FROM 
    green
WHERE 
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY 
    1,2

In [3]:
# Let's start by reading the green taxi dataset from its parquet files

df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

In [6]:
# Every dataframe has an attribute called `rdd`: the underlying RDD of Row objects it is built on
df_green.rdd.take(10)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 13, 38, 16), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=130, passenger_count=1, trip_distance=12.77, fare_amount=36.0, extra=0.0, mta_tax=0.5, tip_amount=2.05, tolls_amount=6.12, ehail_fee=None, improvement_surcharge=0.3, total_amount=44.97, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 20, 15, 46), store_and_fwd_flag=None, RatecodeID=None, PULocationID=67, DOLocationID=39, passenger_count=None, trip_distance=8.0, fare_amount=29.9, extra=2.75, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=33.45, payment_type=None, trip_type=None, congestion_surcharge=None),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41)

In [7]:
df_green.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 13, 38, 16), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=130, passenger_count=1, trip_distance=12.77, fare_amount=36.0, extra=0.0, mta_tax=0.5, tip_amount=2.05, tolls_amount=6.12, ehail_fee=None, improvement_surcharge=0.3, total_amount=44.97, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 20, 15, 46), store_and_fwd_flag=None, RatecodeID=None, PULocationID=67, DOLocationID=39, passenger_count=None, trip_distance=8.0, fare_amount=29.9, extra=2.75, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=33.45, payment_type=None, trip_type=None, congestion_surcharge=None),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41)

In [8]:
# Project the dataframe down to the three columns the query needs,
# then switch to the RDD view of those rows.
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [9]:
rdd.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, total_amount=33.45),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, total_amount=12.74)]

In [16]:
rows = rdd.take(10)
row = rows[0]

In [21]:
row.total_amount

44.97

In [15]:
# first thing we can do is filter the data, implement the where statement from the query

# Inclusive lower bound for pickup timestamps (the WHERE clause of the query).
start = datetime(2020, 1, 1)


def filter_outliers(row):
    """Replicate the WHERE condition: keep rows picked up on or after `start`.

    `row` must expose a `lpep_pickup_datetime` attribute comparable to a datetime.
    Returns the result of that comparison (True keeps the row).
    """
    pickup_time = row.lpep_pickup_datetime
    return pickup_time >= start

In [27]:
# let's also create a function to transform the data so that it conforms to the map transformation.
# In order to do a groupby, we need to prepare the data to a key,value format
def prepare_for_grouping(row):
    """Turn a trip row into the ``(key, value)`` pair expected by ``reduceByKey``.

    The key is ``(hour, zone)``: the pickup timestamp truncated to the hour and
    the pickup location id. The value is ``(amount, count)``: the row's total
    amount and a count of 1, so that values can later be summed per key.
    """
    # Key: truncate the pickup time to the start of its hour, pair it with the zone
    truncated_pickup = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    grouping_key = (truncated_pickup, row.PULocationID)

    # Value: this row's amount, counted as a single record
    aggregates = (row.total_amount, 1)

    return (grouping_key, aggregates)

In [30]:
# Now let's work on the reduce function, which needs to go by parts, so it'll take first two values from the same key and reduce them to one
# then it will continue with that reduced key value pair and perform the same operation with the next record sharing the same key. This same process will
# continue until it finishes.

def calculate_revenue(left_value, right_value):
    """Merge two ``(amount, count)`` values that share the same key.

    Both arguments are 2-tuples; the result is a 2-tuple holding the summed
    amounts and the summed counts, which becomes the new value for the key.
    """
    # Unpack both value tuples in one go
    (amount_a, count_a), (amount_b, count_b) = left_value, right_value

    # Element-wise sum: total amount first, record count second
    return (amount_a + amount_b, count_a + count_b)

In [35]:
'''
Finally, the reduce by key function would return a key, value structure for each row. They'd look like this:
[((datetime.datetime(2020, 1, 15, 20, 0), 260), (163.90000000000003, 14)),
 ((datetime.datetime(2020, 1, 29, 19, 0), 166), (695.0099999999999, 45)),
 ((datetime.datetime(2020, 1, 16, 8, 0), 41), (736.1399999999996, 54)),
 ((datetime.datetime(2020, 1, 4, 20, 0), 129), (583.27, 38)),
 ((datetime.datetime(2020, 1, 2, 8, 0), 66), (197.69, 10)),
 ((datetime.datetime(2020, 1, 3, 9, 0), 61), (142.21, 9)),
 ((datetime.datetime(2020, 1, 17, 21, 0), 236), (33.6, 4)),
 ((datetime.datetime(2020, 1, 12, 12, 0), 82), (290.41, 14)),
 ((datetime.datetime(2020, 1, 28, 16, 0), 197), (831.4399999999998, 18)),
 ((datetime.datetime(2020, 1, 10, 22, 0), 95), (407.7100000000002, 37))]

 where each key is a tuple and each value is also a tuple of aggregated values. But this is not very manageable, so let's 
 turn it once again into a dataframe by unwrapping it.
'''

def unwrap(row):
    """Flatten a ``((hour, zone), (amount, count))`` pair into one 4-tuple."""
    key, value = row[0], row[1]
    return (key[0], key[1], value[0], value[1])

In [37]:
# Full pipeline: filter -> key/value pairs -> aggregate per key -> flatten -> dataframe -> display
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
    .show()
)

                                                                                

+-------------------+---+------------------+---+
|                 _1| _2|                _3| _4|
+-------------------+---+------------------+---+
|2020-01-15 20:00:00|260|163.90000000000003| 14|
|2020-01-29 19:00:00|166| 695.0099999999999| 45|
|2020-01-16 08:00:00| 41| 736.1399999999996| 54|
|2020-01-04 20:00:00|129|            583.27| 38|
|2020-01-02 08:00:00| 66|            197.69| 10|
|2020-01-03 09:00:00| 61|            142.21|  9|
|2020-01-17 21:00:00|236|              33.6|  4|
|2020-01-12 12:00:00| 82|            290.41| 14|
|2020-01-28 16:00:00|197| 831.4399999999998| 18|
|2020-01-10 22:00:00| 95| 407.7100000000002| 37|
|2020-01-10 01:00:00|215|            109.69|  2|
|2020-01-07 18:00:00| 25| 554.2900000000001| 37|
|2020-01-18 07:00:00| 55|              48.3|  1|
|2020-01-28 09:00:00|166| 473.0200000000002| 36|
|2020-01-12 15:00:00| 82| 265.7900000000001| 29|
|2020-01-10 20:00:00| 66|            405.88| 21|
|2020-01-31 15:00:00| 43|345.58000000000004| 19|
|2020-01-31 21:00:00

In [39]:
# The main issue with this solution right now is that the column names (and the rest of the schema) are lost. This can be solved the following way:

from collections import namedtuple

In [40]:
# Named record for one aggregated output row; the field names become the dataframe's column names.
RevenueRow = namedtuple('RevenueRow', 'hour zone revenue count')

In [41]:
def unwrap(row):
    """Flatten a ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``.

    Returning a named record (instead of a plain tuple) lets the resulting
    dataframe pick up the column names from the record's fields.
    """
    key, value = row[0], row[1]
    return RevenueRow(key[0], key[1], value[0], value[1])

In [42]:
# Same pipeline as before, now producing named columns via RevenueRow
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
)

                                                                                

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-15 20:00:00| 260|163.90000000000003|   14|
|2020-01-29 19:00:00| 166| 695.0099999999999|   45|
|2020-01-16 08:00:00|  41| 736.1399999999996|   54|
|2020-01-04 20:00:00| 129|            583.27|   38|
|2020-01-02 08:00:00|  66|            197.69|   10|
|2020-01-03 09:00:00|  61|            142.21|    9|
|2020-01-17 21:00:00| 236|              33.6|    4|
|2020-01-12 12:00:00|  82|            290.41|   14|
|2020-01-28 16:00:00| 197| 831.4399999999998|   18|
|2020-01-10 22:00:00|  95| 407.7100000000002|   37|
|2020-01-10 01:00:00| 215|            109.69|    2|
|2020-01-07 18:00:00|  25| 554.2900000000001|   37|
|2020-01-18 07:00:00|  55|              48.3|    1|
|2020-01-28 09:00:00| 166| 473.0200000000002|   36|
|2020-01-12 15:00:00|  82| 265.7900000000001|   29|
|2020-01-10 20:00:00|  66|            405.88|   21|
|2020-01-31 

In [45]:
# Rebuild the result dataframe (schema still inferred from the RevenueRow records)
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
)

                                                                                

In [46]:
# The schema was only inferred (zone and count come out as LongType), so let's print it and then define it explicitly by hand (e.g. by editing the printed output in VS Code)
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

In [47]:
from pyspark.sql import types

In [48]:
# Explicit schema for the aggregated result: same column names as RevenueRow,
# with integer (rather than inferred long) types for zone and count.
result_schema = types.StructType([
    types.StructField(name='hour', dataType=types.TimestampType(), nullable=True),
    types.StructField(name='zone', dataType=types.IntegerType(), nullable=True),
    types.StructField(name='revenue', dataType=types.DoubleType(), nullable=True),
    types.StructField(name='count', dataType=types.IntegerType(), nullable=True),
])

In [49]:
# Same pipeline, but the dataframe is built with the explicit schema instead of an inferred one
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [50]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', IntegerType(), True), StructField('revenue', DoubleType(), True), StructField('count', IntegerType(), True)])

In [51]:
df_result.show()



+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-15 20:00:00| 260|163.90000000000003|   14|
|2020-01-29 19:00:00| 166| 695.0099999999999|   45|
|2020-01-16 08:00:00|  41| 736.1399999999996|   54|
|2020-01-04 20:00:00| 129|            583.27|   38|
|2020-01-02 08:00:00|  66|            197.69|   10|
|2020-01-03 09:00:00|  61|            142.21|    9|
|2020-01-17 21:00:00| 236|              33.6|    4|
|2020-01-12 12:00:00|  82|            290.41|   14|
|2020-01-28 16:00:00| 197| 831.4399999999998|   18|
|2020-01-10 22:00:00|  95| 407.7100000000002|   37|
|2020-01-10 01:00:00| 215|            109.69|    2|
|2020-01-07 18:00:00|  25| 554.2900000000001|   37|
|2020-01-18 07:00:00|  55|              48.3|    1|
|2020-01-28 09:00:00| 166| 473.0200000000002|   36|
|2020-01-12 15:00:00|  82| 265.7900000000001|   29|
|2020-01-10 20:00:00|  66|            405.88|   21|
|2020-01-31 

                                                                                

In [52]:
# Let's save it to see the full execution DAG.
# mode='overwrite' keeps this cell re-runnable: with the default save mode the write
# raises an error as soon as the output path already exists (e.g. from a previous run).

df_result.write.parquet('data/tmp/green-revenue', mode='overwrite')

                                                                                

In [5]:
'''
mapPartition
'''

'\nmapPartition\n'

In [8]:
# Columns kept for the mapPartitions example
columns = [
    'VendorID',
    'lpep_pickup_datetime',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
]

# RDD view of the green taxi dataframe restricted to those columns
duration_rdd = df_green.select(columns).rdd

In [9]:
duration_rdd.take(5)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, DOLocationID=130, trip_distance=12.77),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, DOLocationID=39, trip_distance=8.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, DOLocationID=157, trip_distance=1.27),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, DOLocationID=83, trip_distance=1.25),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, DOLocationID=42, trip_distance=1.84)]

In [16]:
import pandas as pd

In [18]:
rows = duration_rdd.take(10)

In [20]:
pd.DataFrame(rows,columns=columns)

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,DOLocationID,trip_distance
0,2.0,2020-01-23 13:10:15,74,130,12.77
1,,2020-01-20 15:09:00,67,39,8.0
2,2.0,2020-01-15 20:23:41,260,157,1.27
3,2.0,2020-01-05 16:32:26,82,83,1.25
4,2.0,2020-01-29 19:22:42,166,42,1.84
5,2.0,2020-01-15 11:07:42,179,223,0.76
6,2.0,2020-01-16 08:22:29,41,237,3.32
7,2.0,2020-01-28 17:05:28,75,161,2.21
8,1.0,2020-01-22 14:51:37,152,166,0.9
9,2.0,2020-01-31 10:25:04,75,234,6.1


In [25]:
# model = ...

def model_predict(df):
    """Placeholder for a real model: returns ``trip_distance * 5`` for each row of ``df``.

    ``df`` must expose a ``trip_distance`` attribute (e.g. a pandas DataFrame column).
    """
    # A real implementation would call something like ``model.predict(df)`` here.
    predictions = df.trip_distance * 5
    return predictions

In [29]:
def apply_model_in_batch(rows):
    """Run the model over one batch of rows and yield the rows with predictions attached.

    ``rows`` is an iterable of records laid out according to the module-level
    ``columns`` list. The batch is loaded into a pandas DataFrame, scored with
    ``model_predict``, and each resulting row (including the pandas index and the
    new ``predicted_duration`` column) is yielded as a named tuple.
    """
    batch = pd.DataFrame(rows, columns=columns)

    # Cast the timestamps to strings so Spark can understand the dates
    batch['lpep_pickup_datetime'] = batch['lpep_pickup_datetime'].astype(str)

    batch['predicted_duration'] = model_predict(batch)

    yield from batch.itertuples()

In [30]:
# Score every partition in one batch each, then display the result as a dataframe
(
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF()
    .show()
)

[Stage 14:>                                                         (0 + 1) / 1]

+-----+--------+--------------------+------------+------------+-------------+------------------+
|Index|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+-----+--------+--------------------+------------+------------+-------------+------------------+
|    0|     2.0| 2020-01-23 13:10:15|          74|         130|        12.77|63.849999999999994|
|    1|     NaN| 2020-01-20 15:09:00|          67|          39|          8.0|              40.0|
|    2|     2.0| 2020-01-15 20:23:41|         260|         157|         1.27|              6.35|
|    3|     2.0| 2020-01-05 16:32:26|          82|          83|         1.25|              6.25|
|    4|     2.0| 2020-01-29 19:22:42|         166|          42|         1.84| 9.200000000000001|
|    5|     2.0| 2020-01-15 11:07:42|         179|         223|         0.76|               3.8|
|    6|     2.0| 2020-01-16 08:22:29|          41|         237|         3.32|16.599999999999998|
|    7|     2.0| 2020-01-28 17

                                                                                