In [4]:
import pyspark
from pyspark.sql import SparkSession

# Build a local-mode Spark session named 'test', or pick up the one that
# already exists in this kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/19 15:16:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Read every parquet file matched by the glob under data/pq/green into one DataFrame.
df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

In [6]:
# Preview the first rows of the loaded DataFrame.
df_green.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-27 23:23:13|  2020-01-27 23:33:22|                 N|         1|         260|         223|              1|         2.98|       11.5|  0.5|    0.

In [7]:
# Drop down to the underlying RDD of Row objects and look at two of them.
df_green.rdd.take(2)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), lpep_dropoff_datetime=datetime.datetime(2020, 1, 27, 23, 33, 22), store_and_fwd_flag='N', RatecodeID=1, PULocationID=260, DOLocationID=223, passenger_count=1, trip_distance=2.98, fare_amount=11.5, extra=0.5, mta_tax=0.5, tip_amount=3.2, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=16.0, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 30, 20, 30), lpep_dropoff_datetime=datetime.datetime(2020, 1, 30, 20, 51), store_and_fwd_flag=None, RatecodeID=None, PULocationID=238, DOLocationID=47, passenger_count=None, trip_distance=7.65, fare_amount=25.46, extra=2.75, mta_tax=0.0, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=28.51, payment_type=None, trip_type=None, congestion_surcharge=None)]

### Implement this query with the RDD API

SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2

In [12]:
# Keep only the three columns the aggregation reads, then switch to the RDD API.
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [13]:
# Pass-through predicate (always True): keeps every row, so this only
# demonstrates the shape of an RDD filter call.
rdd.filter(lambda row: True).take(2)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), PULocationID=260, total_amount=16.0),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 30, 20, 30), PULocationID=238, total_amount=28.51)]

In [15]:
from datetime import datetime

In [19]:
def filter_outliers(row, start=datetime(year=2021, month=1, day=1)):
    """Return True when the row's pickup time is on or after ``start``.

    Args:
        row: Object exposing a ``lpep_pickup_datetime`` attribute holding a
            ``datetime``.
        start: Inclusive lower bound for the pickup time. It is built once,
            when the function is defined, instead of once per row. The
            default keeps the cutoff this notebook has been using.

    Returns:
        bool: result of ``row.lpep_pickup_datetime >= start``.

    NOTE(review): the SQL query this notebook reproduces filters on
    '2020-01-01 00:00:00', while the default here is 2021-01-01 — confirm
    which cutoff is intended.
    """
    return row.lpep_pickup_datetime >= start

In [20]:
# Try the predicate on the RDD: fetch one row that passes it.
rdd.filter(filter_outliers).take(1)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2021, 6, 11, 20, 4, 53), PULocationID=244, total_amount=33.55)]

In [21]:
# Pull ten rows to the driver and keep the first one as a sample Row.
rows = rdd.take(10)
row = rows[0]

In [22]:
# Display the sample Row.
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), PULocationID=260, total_amount=16.0)

In [25]:
def prepare_for_grouping(row):
    """Turn one trip row into a ``(key, value)`` pair for a keyed reduction.

    The key is ``(pickup time truncated to the hour, PULocationID)`` and the
    value is ``(total_amount, 1)``; the trailing 1 lets a reducer count
    records by summing.

    Args:
        row: Object with ``lpep_pickup_datetime``, ``PULocationID`` and
            ``total_amount`` attributes.

    Returns:
        tuple: ``((hour, zone), (amount, 1))``.
    """
    # Zero out everything below the hour, mirroring date_trunc('hour', ...).
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return ((pickup_hour, row.PULocationID), (row.total_amount, 1))

In [27]:
def calculate_revenue(left_value, right_value):
    """Combine two partial ``(amount, count)`` aggregates into one.

    Used as the reducer passed to ``reduceByKey``: both arguments belong to
    the same key and are merged element-wise.

    Args:
        left_value: ``(amount, count)`` pair.
        right_value: ``(amount, count)`` pair.

    Returns:
        tuple: ``(left_amount + right_amount, left_count + right_count)``.
    """
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    return (left_amount + right_amount, left_count + right_count)
    

In [34]:
from collections import namedtuple

# Named record for one aggregated result: hour, zone, summed revenue, record count.
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [35]:
def unwrap(row):
    """Flatten a ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``.

    Args:
        row: Two-element sequence whose first item is the ``(hour, zone)``
            key and whose second item is the ``(revenue, count)`` value.

    Returns:
        RevenueRow: the four fields as one flat named tuple.
    """
    key = row[0]
    value = row[1]
    return RevenueRow(key[0], key[1], value[0], value[1])

In [42]:
from pyspark.sql import types

# Explicit schema for the aggregated result; every field is nullable.
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    ]
])

In [43]:
# RDD version of the SQL aggregation:
# filter rows -> key by (hour, zone) -> sum per key -> flatten -> DataFrame.
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [44]:
# Check that the DataFrame carries the explicit schema.
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', IntegerType(), True), StructField('revenue', DoubleType(), True), StructField('count', IntegerType(), True)])

In [45]:
# Preview the aggregated revenue per hour and zone.
df_result.show()



+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-06-17 11:00:00|   7|             95.56|    6|
|2021-06-05 13:00:00| 198|              43.0|    1|
|2021-06-17 22:00:00|  93|            164.06|    3|
|2021-06-26 10:00:00|  42|            179.66|    9|
|2021-06-27 16:00:00| 173|53.599999999999994|    2|
|2021-06-10 16:00:00|  72|              25.5|    1|
|2021-06-07 21:00:00|  75|57.290000000000006|    5|
|2021-06-27 10:00:00|  74|            194.33|   12|
|2021-06-13 07:00:00|   8|             19.88|    1|
|2021-06-03 10:00:00| 155|53.730000000000004|    2|
|2021-06-29 12:00:00|  92|             34.09|    1|
|2021-06-08 12:00:00|  24|             61.54|    2|
|2021-06-24 22:00:00|  41|            122.05|    8|
|2021-06-25 07:00:00|  74| 522.4200000000001|   36|
|2021-06-18 08:00:00|  28|             38.97|    1|
|2021-06-27 13:00:00|   3|             53.22|    1|
|2021-06-09 

                                                                                

## Let's use `mapPartitions`, which takes one partition as input and returns a modified partition
- This is useful when the data is really large and we want to apply, say, an ML model to it
- We can pass the ML model to `mapPartitions` and let it process the data one partition at a time
- An example is shown below

In [46]:
# Preview the source DataFrame again before selecting columns from it.
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-27 23:23:13|  2020-01-27 23:33:22|                 N|         1|         260|         223|              1|         2.98|       11.5|  0.5|    0.

In [51]:
# Columns to feed to the model.
columns = [
    'VendorID',
    'lpep_pickup_datetime',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
]

# Restrict the DataFrame to those columns and expose it as an RDD of Rows.
duration_rdd = df_green.select(columns).rdd

In [52]:
# Look at five rows of the selected columns.
duration_rdd.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), PULocationID=260, DOLocationID=223, trip_distance=2.98),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 30, 20, 30), PULocationID=238, DOLocationID=47, trip_distance=7.65),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 22, 18, 13), PULocationID=82, DOLocationID=95, trip_distance=1.83),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 18, 12, 24), PULocationID=42, DOLocationID=244, trip_distance=1.65),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 11, 53), PULocationID=82, DOLocationID=197, trip_distance=6.38)]

In [57]:
def apply_model_in_batch(partiton):
    """Count the rows in one partition.

    Stand-in for real per-partition work: it only consumes the iterable and
    reports how many items it held.

    Args:
        partiton: Iterable over the rows of a single partition.

    Returns:
        list: one-element list containing the row count.
    """
    return [sum(1 for _ in partiton)]

In [58]:
# Run the function once per partition and collect the per-partition results.
# NOTE(review): this runs on `rdd`, not the `duration_rdd` built just above — confirm that is intended.
rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[746744, 418184, 219219, 215981, 232941, 214666, 178150, 78632]

In [59]:
import pandas as pd

In [61]:
# Pull ten rows of the selected columns to the driver and display them.
# This rebinds `rows`, which an earlier cell also assigned.
rows = duration_rdd.take(10)
rows

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), PULocationID=260, DOLocationID=223, trip_distance=2.98),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 30, 20, 30), PULocationID=238, DOLocationID=47, trip_distance=7.65),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 22, 18, 13), PULocationID=82, DOLocationID=95, trip_distance=1.83),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 18, 12, 24), PULocationID=42, DOLocationID=244, trip_distance=1.65),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 11, 53), PULocationID=82, DOLocationID=197, trip_distance=6.38),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 15, 35), PULocationID=7, DOLocationID=174, trip_distance=10.4),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 2, 11, 32), PULocationID=66, DOLocationID=217, trip_distance=2.95),
 Row(VendorID=1, lpep_pickup_datetime=datetime.d

In [63]:
# Build a pandas DataFrame from the sampled Rows, labelling it with the selected column names.
pd.DataFrame(rows, columns=columns)

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,DOLocationID,trip_distance
0,2.0,2020-01-27 23:23:13,260,223,2.98
1,,2020-01-30 20:30:00,238,47,7.65
2,,2020-01-22 18:13:00,82,95,1.83
3,2.0,2020-01-04 18:12:24,42,244,1.65
4,2.0,2020-01-27 11:53:00,82,197,6.38
5,2.0,2020-01-29 15:35:00,7,174,10.4
6,,2020-01-02 11:32:00,66,217,2.95
7,1.0,2020-01-04 09:40:17,42,41,1.0
8,1.0,2020-01-18 21:41:43,129,129,0.4
9,2.0,2020-01-11 18:43:11,41,41,0.91
