In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session using all available cores ("local[*]"); getOrCreate()
# hands back the already-running session if this kernel has one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

23/03/12 12:38:14 WARN Utils: Your hostname, padilha-A70-HYB resolves to a loopback address: 127.0.1.1; using 192.168.15.5 instead (on interface wlo1)
23/03/12 12:38:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/12 12:38:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load every green-taxi parquet file two directory levels below data/pq/green/.
df_green = spark.read.format('parquet').load('data/pq/green/*/*')

In [3]:
# Peek at the first 5 rows (returned as a list of Row objects).
df_green.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 8, 15, 1, 13), lpep_dropoff_datetime=datetime.datetime(2020, 1, 8, 15, 12, 35), store_and_fwd_flag='N', RatecodeID=1, PULocationID=97, DOLocationID=61, passenger_count=1, trip_distance=2.18, fare_amount=10.0, extra=0.0, mta_tax=0.5, tip_amount=2.7, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=13.5, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 16, 4), lpep_dropoff_datetime=datetime.datetime(2020, 1, 27, 16, 43), store_and_fwd_flag=None, RatecodeID=None, PULocationID=231, DOLocationID=62, passenger_count=None, trip_distance=6.38, fare_amount=26.16, extra=2.75, mta_tax=0.0, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=29.21, payment_type=None, trip_type=None, congestion_surcharge=None),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 11, 3), lpep_

In [4]:
# Keep only the fields needed for hourly revenue per pickup zone, then
# drop down to the RDD API (an RDD of Row objects).
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [5]:
# Peek at the first 5 projected rows of the RDD.
rdd.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 8, 15, 1, 13), PULocationID=97, total_amount=13.5),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 16, 4), PULocationID=231, total_amount=29.21),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 11, 3), PULocationID=222, total_amount=18.48),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 26, 12, 13), PULocationID=174, total_amount=16.93),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 22, 3, 48, 21), PULocationID=260, total_amount=6.8)]

In [6]:
from datetime import datetime

# Pickups earlier than this instant are treated as outliers and dropped.
# Defined before filter_outliers so the function never depends on a global
# that is only created later in the cell.
start = datetime(year=2020, month=1, day=1)


def filter_outliers(row):
    """Return True if ``row`` should be kept by ``RDD.filter``.

    A row is kept when its ``lpep_pickup_datetime`` is on or after the
    module-level ``start``. Rows with a missing (None) pickup time are
    dropped instead of raising ``TypeError`` (``None >= datetime`` is not
    comparable in Python 3), which would otherwise fail the whole Spark job.
    """
    pickup = row.lpep_pickup_datetime
    return pickup is not None and pickup >= start
# Sanity-check the filter: first 3 rows that survive it.
rdd.filter(filter_outliers).take(3)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 8, 15, 1, 13), PULocationID=97, total_amount=13.5),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 16, 4), PULocationID=231, total_amount=29.21),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 11, 3), PULocationID=222, total_amount=18.48)]

In [7]:
def prepare_for_grouping(row):
    """Turn a trip row into a ``((hour, zone), (amount, 1))`` pair for reduceByKey.

    ``hour`` is the pickup timestamp truncated to the start of its hour,
    ``zone`` is the pickup location ID, and the trailing ``1`` lets the
    reducer count trips while it sums ``total_amount``.
    """
    hour_bucket = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return (hour_bucket, row.PULocationID), (row.total_amount, 1)

In [8]:
def calculate_revenue(left_value, right_value):
    """Merge two partial ``(revenue, trip_count)`` aggregates component-wise.

    Used as the reduceByKey function; Spark expects it to be associative and
    commutative, which plain addition is (up to float rounding).
    """
    (amount_a, count_a), (amount_b, count_b) = left_value, right_value
    return amount_a + amount_b, count_a + count_b

In [9]:
# Hourly revenue and trip count per pickup zone, keyed by (hour, zone).
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .take(5)
)

                                                                                

[((datetime.datetime(2020, 1, 9, 9, 0), 7), (458.36000000000007, 30)),
 ((datetime.datetime(2020, 1, 3, 17, 0), 165), (69.14, 2)),
 ((datetime.datetime(2020, 1, 18, 21, 0), 42), (203.26999999999998, 15)),
 ((datetime.datetime(2020, 1, 3, 17, 0), 241), (22.3, 2)),
 ((datetime.datetime(2020, 1, 1, 7, 0), 80), (1653.3099999999997, 39))]

In [10]:
def unwrap(row):
    """Flatten ``((hour, zone), (revenue, count))`` into ``(hour, zone, revenue, count)``."""
    key, value = row[0], row[1]
    return (key[0], key[1], value[0], value[1])

# Flatten each aggregate into a plain 4-tuple so toDF() can build columns;
# with bare tuples Spark names them _1.._4.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
    .show()
)



+-------------------+---+------------------+---+
|                 _1| _2|                _3| _4|
+-------------------+---+------------------+---+
|2020-01-09 09:00:00|  7|458.36000000000007| 30|
|2020-01-03 17:00:00|165|             69.14|  2|
|2020-01-18 21:00:00| 42|203.26999999999998| 15|
|2020-01-03 17:00:00|241|              22.3|  2|
|2020-01-01 07:00:00| 80|1653.3099999999997| 39|
|2020-01-30 23:00:00|129|            292.88| 25|
|2020-01-13 14:00:00| 25|             441.0| 22|
|2020-01-10 11:00:00| 97| 389.8400000000001| 20|
|2020-01-07 08:00:00|133|163.23000000000002|  7|
|2020-01-22 13:00:00|244| 589.1100000000001| 28|
|2020-01-22 20:00:00|181| 257.4300000000001| 21|
|2020-01-30 18:00:00| 66| 515.8800000000001| 22|
|2020-01-22 13:00:00|130|361.46000000000004| 14|
|2020-01-09 14:00:00|193|             143.4| 11|
|2020-01-07 16:00:00|129|191.92999999999998| 14|
|2020-01-19 02:00:00|255|            469.14| 22|
|2020-01-06 07:00:00|244|            297.64| 13|
|2020-01-25 15:00:00

                                                                                

In [11]:
from collections import namedtuple

# Named record type so toDF() picks up real column names instead of _1.._4.
# Note: the 'count' field shadows tuple.count() on RevenueRow instances.
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])


def unwrap(row):
    """Convert ``((hour, zone), (revenue, count))`` into a ``RevenueRow``."""
    key, value = row[0], row[1]
    return RevenueRow(key[0], key[1], value[0], value[1])

# Same aggregation, now yielding RevenueRow namedtuples so the resulting
# DataFrame has named columns.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
    .show()
)

                                                                                

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-09 09:00:00|   7|458.36000000000007|   30|
|2020-01-03 17:00:00| 165|             69.14|    2|
|2020-01-18 21:00:00|  42|203.26999999999998|   15|
|2020-01-03 17:00:00| 241|              22.3|    2|
|2020-01-01 07:00:00|  80|1653.3099999999997|   39|
|2020-01-30 23:00:00| 129|            292.88|   25|
|2020-01-13 14:00:00|  25|             441.0|   22|
|2020-01-10 11:00:00|  97| 389.8400000000001|   20|
|2020-01-07 08:00:00| 133|163.23000000000002|    7|
|2020-01-22 13:00:00| 244| 589.1100000000001|   28|
|2020-01-22 20:00:00| 181| 257.4300000000001|   21|
|2020-01-30 18:00:00|  66| 515.8800000000001|   22|
|2020-01-22 13:00:00| 130|361.46000000000004|   14|
|2020-01-09 14:00:00| 193|             143.4|   11|
|2020-01-07 16:00:00| 129|191.92999999999998|   14|
|2020-01-19 02:00:00| 255|            469.14|   22|
|2020-01-06 

In [12]:
from pyspark.sql import types

# Column layout of the hourly revenue table; passing it to toDF() pins the
# column types instead of having Spark infer them from the data.
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    )
])

# Same aggregation as the cells above, but built with the explicit schema.
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [13]:
# Show the first rows of the aggregated table (show() defaults to 20 rows).
df_result.show()



+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-09 09:00:00|   7|458.36000000000007|   30|
|2020-01-03 17:00:00| 165|             69.14|    2|
|2020-01-18 21:00:00|  42|203.26999999999998|   15|
|2020-01-03 17:00:00| 241|              22.3|    2|
|2020-01-01 07:00:00|  80|1653.3099999999997|   39|
|2020-01-30 23:00:00| 129|            292.88|   25|
|2020-01-13 14:00:00|  25|             441.0|   22|
|2020-01-10 11:00:00|  97| 389.8400000000001|   20|
|2020-01-07 08:00:00| 133|163.23000000000002|    7|
|2020-01-22 13:00:00| 244| 589.1100000000001|   28|
|2020-01-22 20:00:00| 181| 257.4300000000001|   21|
|2020-01-30 18:00:00|  66| 515.8800000000001|   22|
|2020-01-22 13:00:00| 130|361.46000000000004|   14|
|2020-01-09 14:00:00| 193|             143.4|   11|
|2020-01-07 16:00:00| 129|191.92999999999998|   14|
|2020-01-19 02:00:00| 255|            469.14|   22|
|2020-01-06 

                                                                                

In [14]:
# The bare DataFrame repr shows only the schema (see output), confirming the
# types declared in result_schema.
df_result

DataFrame[hour: timestamp, zone: int, revenue: double, count: int]

In [15]:
# Persist the hourly revenue table, replacing any previous output at this path.
df_result.write.mode('overwrite').parquet('tmp/green-revenue')

23/03/12 12:38:36 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
23/03/12 12:38:36 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 84,44% for 9 writers
23/03/12 12:38:36 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 76,00% for 10 writers
23/03/12 12:38:36 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 69,09% for 11 writers
23/03/12 12:38:36 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 63,33% for 12 writers
23/03/12 12:38:36 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 58,46% for 13 writers
23/03/12 12:38:36 WARN MemoryManager: Total allocation exceeds 95,

In [16]:
# Column subset used by the per-partition (mapPartitions) experiments below.
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

df_green.select(*columns).show()

+--------+--------------------+------------+------------+-------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|
+--------+--------------------+------------+------------+-------------+
|       2| 2020-01-08 15:01:13|          97|          61|         2.18|
|    null| 2020-01-27 16:04:00|         231|          62|         6.38|
|    null| 2020-01-11 11:03:00|         222|          36|         3.95|
|    null| 2020-01-26 12:13:00|         174|         174|         0.39|
|       2| 2020-01-22 03:48:21|         260|          83|         0.98|
|    null| 2020-01-02 11:09:00|          75|         203|        19.09|
|       2| 2020-01-09 09:06:23|           7|         193|         1.08|
|       2| 2020-01-03 10:45:17|          43|         142|         2.85|
|       2| 2020-01-04 15:18:15|          74|         168|         2.34|
|       2| 2020-01-25 17:09:50|         244|         161|         8.25|
|       2| 2020-01-24 18:37:58|          75|         238|       

In [17]:
# The same column subset as an RDD of Row objects.
duration_rdd = df_green.select(columns).rdd

duration_rdd.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 8, 15, 1, 13), PULocationID=97, DOLocationID=61, trip_distance=2.18),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 16, 4), PULocationID=231, DOLocationID=62, trip_distance=6.38),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 11, 3), PULocationID=222, DOLocationID=36, trip_distance=3.95),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 26, 12, 13), PULocationID=174, DOLocationID=174, trip_distance=0.39),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 22, 3, 48, 21), PULocationID=260, DOLocationID=83, trip_distance=0.98)]

In [18]:
def count_rows(partition):
    """Return ``[n]`` where ``n`` is the number of items in ``partition``.

    Meant for ``RDD.mapPartitions``: it consumes the partition iterator and
    emits a one-element list, so the result has one count per partition.
    """
    return [sum(1 for _ in partition)]

In [19]:
import pandas as pd

def count_rows_using_pandas(partition):
    """Count a partition's rows by loading them into a pandas DataFrame.

    The rows are labelled with the global ``columns`` list; the DataFrame's
    row count is returned as a one-element list for ``RDD.mapPartitions``.
    """
    frame = pd.DataFrame(partition, columns=columns)
    return [frame.shape[0]]

In [20]:
# Rows per partition, counted by materialising each partition as a pandas DataFrame.
duration_rdd.mapPartitions(count_rows_using_pandas).collect()

                                                                                

[335827,
 311259,
 255167,
 191335,
 93024,
 87098,
 88286,
 88451,
 84766,
 87464,
 85349,
 83479,
 82787,
 79927,
 93517,
 83460,
 80349,
 66263,
 26709]

In [None]:
def model_predict(df):
    """Placeholder "model": predicted duration = 5 * trip_distance.

    Stands in for a trained model's ``predict``; it reads only the
    ``trip_distance`` column and returns a Series aligned with ``df``.
    """
    # TODO: swap in a real model, e.g. ``model.predict(df)``.
    return df['trip_distance'] * 5

def apply_model_in_batch(rows):
    """Score one Spark partition with the model and yield the enriched rows.

    Intended for ``RDD.mapPartitions``. ``rows`` is the partition's iterator
    of Rows, labelled with the global ``columns`` list. The partition is
    materialised as a pandas DataFrame so ``model_predict`` runs once per
    partition (vectorised). Each row is yielded back as a namedtuple with an
    extra ``predicted_duration`` field.

    The pandas index is not emitted (``index=False``). It is only a
    per-partition 0..n-1 counter, so sending it back to Spark just to drop
    it again is wasted serialisation work.

    NOTE(review): pandas stores nulls in integer columns as float NaN (e.g.
    the ``VendorID=None`` rows), so such columns likely come back to Spark as
    doubles containing NaN rather than integer nulls. Confirm this is
    acceptable downstream.
    """
    df = pd.DataFrame(rows, columns=columns)
    df['predicted_duration'] = model_predict(df)
    yield from df.itertuples(index=False)

# Score every partition in batch and turn the yielded namedtuples back into a
# Spark DataFrame. drop('Index') removes the pandas index field that
# DataFrame.itertuples() emits by default (Spark ignores a missing column).
df_predictions = (
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF()
    .drop('Index')
)

In [None]:
# Inspect the first rows of the scored DataFrame.
df_predictions.show()

In [None]:
# Only the model output column.
df_predictions.select('predicted_duration').show()