### Importing Required Dependencies

In [7]:
from datetime import datetime
from collections import namedtuple

### Starting Spark Session

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

22/06/09 08:13:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/09 08:13:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


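### Map and Reduce

The goal of this section is to reproduce the SQL aggregation below (revenue and number of trips per pickup hour and pickup zone, for trips from 2020-01-01 onwards) using only RDD operations: `filter`, `map` and `reduceByKey`.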

In [2]:
# Read every green-taxi parquet file; the two-level glob presumably matches
# data/pq/green/<year>/<month>/ partitions (TODO confirm layout).
df_green = spark.read.format('parquet').load('data/pq/green/*/*')

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

```
SELECT
  date_trunc('hour', lpep_pickup_datetime) AS hour,
  PULocationID AS zone,  

  SUM(total_amount) AS amount,
  COUNT(1) AS number_records

FROM 
    green
WHERE 
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY 
    1, 2
```

In [3]:
# Peek at the first two records as Row objects through the DataFrame's underlying RDD.
df_green.rdd.take(num=2)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 4, 5, 54), lpep_dropoff_datetime=datetime.datetime(2020, 1, 11, 4, 13, 49), store_and_fwd_flag='N', RatecodeID=1.0, PULocationID=129, DOLocationID=129, passenger_count=1.0, trip_distance=0.81, fare_amount=6.5, extra=0.5, mta_tax=0.5, tip_amount=0.71, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=8.51, payment_type=1.0, trip_type=1.0, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 17, 19, 33, 5), lpep_dropoff_datetime=datetime.datetime(2020, 1, 17, 19, 51, 8), store_and_fwd_flag='N', RatecodeID=1.0, PULocationID=75, DOLocationID=42, passenger_count=3.0, trip_distance=2.69, fare_amount=13.5, extra=1.0, mta_tax=0.5, tip_amount=3.06, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=18.36, payment_type=1.0, trip_type=1.0, congestion_surcharge=0.0)]

In [4]:
# Keep only the three columns the aggregation needs, then drop down to the RDD API.
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [5]:
# First two projected rows: pickup time, pickup zone, total amount.
rdd.take(num=2)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 4, 5, 54), PULocationID=129, total_amount=8.51),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 17, 19, 33, 5), PULocationID=75, total_amount=18.36)]

In [8]:
# Quick sanity check of an RDD filter: show two trips picked up in 2021 or later.
# Uses its own name instead of `start`: `filter_outliers` reads the global
# `start` at call time, so reassigning it here would silently change the
# aggregation cutoff whenever this cell is re-run after the one defining it.
start_2021 = datetime(year=2021, month=1, day=1)

rdd \
    .filter(lambda row: row.lpep_pickup_datetime >= start_2021) \
    .take(2)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2021, 10, 30, 11, 55, 9), PULocationID=33, total_amount=25.86),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 10, 7, 16, 12), PULocationID=71, total_amount=22.0)]

In [9]:
# Cutoff read by filter_outliers: trips picked up on/after 2020-01-01 are kept
# (same condition as the WHERE clause of the SQL query above).
start = datetime(2020, 1, 1)
# Output record of the RDD aggregation: one row per (hour, zone) pair.
RevenueRow = namedtuple('RevenueRow', 'hour zone revenue count')

# Keep trips picked up on/after the cutoff (the global `start` unless overridden).
def filter_outliers(row, threshold=None):
    """Return True if the trip should be kept in the aggregation.

    Mirrors ``WHERE lpep_pickup_datetime >= '2020-01-01 00:00:00'`` from the
    SQL version of this query.

    Args:
        row: a record exposing ``lpep_pickup_datetime`` (a datetime or None).
        threshold: optional cutoff datetime. When omitted, the module-level
            ``start`` is used (looked up at call time).

    Returns:
        bool: True when the pickup time is known and not before the cutoff.
    """
    cutoff = start if threshold is None else threshold
    pickup = row.lpep_pickup_datetime
    # In SQL `NULL >= ts` is not true, so NULL pickups are filtered out; in
    # Python `None >= datetime` raises TypeError and would fail the Spark job.
    return pickup is not None and pickup >= cutoff

# Map each trip to a key-value pair: key = (pickup hour, pickup zone),
# value = (amount, 1) so amounts and counts can both be summed per key.
def prepare_for_grouping(row):
    """Turn a trip row into ``((hour, zone), (total_amount, 1))``.

    The pickup timestamp is truncated to the start of its hour, matching
    ``date_trunc('hour', lpep_pickup_datetime)`` in the SQL query.
    """
    truncated_hour = row.lpep_pickup_datetime.replace(
        minute=0, second=0, microsecond=0
    )
    group_key = (truncated_hour, row.PULocationID)
    # The trailing 1 is this row's contribution to COUNT(1).
    return group_key, (row.total_amount, 1)

# Reducer for reduceByKey: merge two partial (amount, count) aggregates of one key.
def calculate_revenue(left_value, right_value):
    """Combine two ``(amount, count)`` partial aggregates.

    Amounts follow SQL ``SUM`` semantics: a None amount is ignored, and the
    merged amount is None only when both sides are None. Counts are added.

    Args:
        left_value: ``(amount, count)`` tuple.
        right_value: ``(amount, count)`` tuple.

    Returns:
        tuple: merged ``(amount, count)``.
    """
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    # A plain `None + float` raises TypeError inside reduceByKey and kills the job.
    if left_amount is None:
        output_amount = right_amount
    elif right_amount is None:
        output_amount = left_amount
    else:
        output_amount = left_amount + right_amount

    output_count = left_count + right_count

    return (output_amount, output_count)

# Flatten a reduced ((hour, zone), (revenue, count)) pair into one RevenueRow.
def unwrap(row):
    """Convert a ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``."""
    key, value = row[0], row[1]
    return RevenueRow(
        hour=key[0],
        zone=key[1],
        revenue=value[0],
        count=value[1],
    )

In [35]:
# Run the full map-reduce pipeline (filter -> key/value -> reduce -> flatten)
# and display the result as a DataFrame.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
    .show()
)

                                                                                

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-04 21:00:00| 129|471.95000000000016|   34|
|2020-01-21 06:00:00|  37|             86.45|    3|
|2020-01-03 16:00:00|   7| 413.2300000000001|   33|
|2020-01-06 17:00:00|  82| 823.4299999999997|   56|
|2020-01-20 15:00:00| 155|             76.34|    3|
|2020-01-23 17:00:00|  75|1581.9899999999977|  103|
|2020-01-11 18:00:00|  25|254.21000000000004|   18|
|2020-01-16 18:00:00|  91|             71.03|    3|
|2020-01-04 13:00:00|  66|252.98000000000008|   17|
|2020-01-03 19:00:00| 181|            171.49|   15|
|2020-01-05 08:00:00| 166|             81.36|    4|
|2020-01-13 22:00:00|  97|            211.24|   15|
|2020-01-02 07:00:00| 260|            168.06|    9|
|2020-01-11 15:00:00|  83|            134.48|    9|
|2020-01-10 12:00:00| 173|             34.82|    2|
|2020-01-12 13:00:00| 244|             286.1|   14|
|2020-01-30 

In [36]:
# Same pipeline, but keep the result as a DataFrame. No schema is passed, so
# toDF() has to look at the data to infer the column types, which launches a
# Spark job right away (see the progress-bar line in the output).
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
)

                                                                                

In [37]:
# Inspect the schema Spark inferred for df_result (used to write result_schema below).
df_result.schema

StructType(List(StructField(hour,TimestampType,true),StructField(zone,LongType,true),StructField(revenue,DoubleType,true),StructField(count,LongType,true)))

In [40]:
# Explicit schema for the aggregated rows. It mirrors the inferred schema shown
# above, except that zone and count are declared IntegerType instead of LongType.
from pyspark.sql import types

result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    ]
])

In [41]:
# Same pipeline with the explicit schema: toDF() no longer needs to sample the
# data, so this cell only builds the plan. Work starts at the next action
# (show / write).
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [42]:
# Display the first five aggregated rows.
df_result.show(n=5)



+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-04 21:00:00| 129|471.95000000000016|   34|
|2020-01-21 06:00:00|  37|             86.45|    3|
|2020-01-03 16:00:00|   7| 413.2300000000001|   33|
|2020-01-06 17:00:00|  82| 823.4299999999997|   56|
|2020-01-20 15:00:00| 155|             76.34|    3|
+-------------------+----+------------------+-----+
only showing top 5 rows



                                                                                

In [43]:
# Persist the aggregated revenue as parquet. mode='overwrite' keeps the cell
# re-runnable: the default save mode raises an error when tmp/green-revenue
# already exists, which breaks "Restart & Run All" on the second run.
df_result.write.parquet('tmp/green-revenue', mode='overwrite')

                                                                                

### Map Partitions

In [48]:
# Columns used by the duration model. The projection is exposed as an RDD so it
# can be processed one partition at a time with mapPartitions.
columns = ['VendorID', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = (
    df_green
    .select(*columns)
    .rdd
)

In [47]:
# Preview the first 10 records of duration_rdd as a pandas DataFrame.
import pandas as pd

rows = duration_rdd.take(num=10)
pd.DataFrame(data=rows, columns=columns)

Unnamed: 0,VendorID,PULocationID,DOLocationID,trip_distance
0,2,129,129,0.81
1,2,75,42,2.69
2,2,117,188,13.11
3,2,41,151,2.13
4,2,129,260,0.89
5,2,75,75,0.88
6,2,66,232,2.25
7,2,129,129,0.91
8,2,41,168,2.69
9,2,37,33,5.35


In [46]:
# Placeholder "model" used to demonstrate batch scoring: 5 x trip_distance.
def model_predict(df):
    """Return a predicted duration for every row of ``df``.

    Stand-in for a real model: scales the ``trip_distance`` column by 5.
    """
    distance = df['trip_distance']
    return distance * 5

# mapPartitions function: score a whole partition at once with pandas.
def apply_model_in_batch(rows):
    """Score one partition and yield one namedtuple per input row.

    ``rows`` is the partition iterator passed in by ``mapPartitions``. The rows
    are loaded into a pandas DataFrame using the module-level ``columns``,
    scored with ``model_predict`` and emitted with ``itertuples()``. Each
    yielded tuple therefore starts with the pandas ``Index``, followed by the
    columns and ``predicted_duration``.
    """
    batch = pd.DataFrame(rows, columns=columns)
    batch['predicted_duration'] = model_predict(batch)
    yield from batch.itertuples()

In [41]:
# Apply the model partition by partition and peek at the first five results.
(
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .take(num=5)
)

                                                                                

[Pandas(Index=0, VendorID=2, PULocationID=129, DOLocationID=129, trip_distance=0.81, predicted_duration=4.050000000000001),
 Pandas(Index=1, VendorID=2, PULocationID=75, DOLocationID=42, trip_distance=2.69, predicted_duration=13.45),
 Pandas(Index=2, VendorID=2, PULocationID=117, DOLocationID=188, trip_distance=13.11, predicted_duration=65.55),
 Pandas(Index=3, VendorID=2, PULocationID=41, DOLocationID=151, trip_distance=2.13, predicted_duration=10.649999999999999),
 Pandas(Index=4, VendorID=2, PULocationID=129, DOLocationID=260, trip_distance=0.89, predicted_duration=4.45)]

In [42]:
# Schema for the tuples yielded by apply_model_in_batch: the pandas Index first,
# then the selected columns, then the prediction.

from pyspark.sql import types

predict_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('Index', types.IntegerType()),
        ('VendorID', types.IntegerType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('trip_distance', types.DoubleType()),
        ('predicted_duration', types.DoubleType()),
    ]
])

In [44]:
# Score every partition into a DataFrame, then drop the pandas Index column,
# which only exists because apply_model_in_batch yields itertuples() rows.
df_predict = (
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF(predict_schema)
    .drop('Index')
)

In [45]:
# Display the first ten scored trips.
df_predict.show(n=10)

[Stage 20:>                                                         (0 + 1) / 1]

+--------+------------+------------+-------------+------------------+
|VendorID|PULocationID|DOLocationID|trip_distance|predicted_duration|
+--------+------------+------------+-------------+------------------+
|       2|         129|         129|         0.81| 4.050000000000001|
|       2|          75|          42|         2.69|             13.45|
|       2|         117|         188|        13.11|             65.55|
|       2|          41|         151|         2.13|10.649999999999999|
|       2|         129|         260|         0.89|              4.45|
|       2|          75|          75|         0.88|               4.4|
|       2|          66|         232|         2.25|             11.25|
|       2|         129|         129|         0.91|              4.55|
|       2|          41|         168|         2.69|             13.45|
|       2|          37|          33|         5.35|             26.75|
+--------+------------+------------+-------------+------------------+
only showing top 10 

                                                                                