In [None]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse, via getOrCreate) a Spark session running in local mode.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [2]:
# Load every parquet path matching the two-level glob under data/pq/green
# into a single DataFrame.
df_green = spark.read.parquet('data/pq/green/*/*')

26/01/15 23:09:43 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: data/pq/green/*/*.
java.io.FileNotFoundException: File data/pq/green/*/* does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:980)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1301)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:970)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.sinks.FileStreamSink$.hasMetadata(FileStreamSink.scala:58)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:384)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveData

The RDD code below reimplements this Spark SQL aggregation by hand:

```sql
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
```

In [5]:
# Keep only the three columns the aggregation needs, then switch to the
# RDD API (the result is an RDD of Row objects).
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [6]:
from datetime import datetime

In [7]:
# Inclusive lower bound for pickup timestamps kept by filter_outliers.
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    """Return True when the row's pickup time is on or after ``start``.

    Rows whose pickup time is missing (None) are rejected instead of raising
    TypeError on the comparison, which matches how a SQL
    ``WHERE ts >= ...`` clause drops NULL values.

    Args:
        row: object exposing a ``lpep_pickup_datetime`` attribute
            (a ``datetime`` or None).

    Returns:
        bool: True if the row should be kept, False otherwise.
    """
    pickup = row.lpep_pickup_datetime
    return pickup is not None and pickup >= start

In [8]:
# Pull a small sample to the driver so a single Row can be inspected.
rows = rdd.take(10)
row = rows[0]

                                                                                

In [9]:
# Display the sampled Row to see its fields and value types.
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 14, 32), PULocationID=97, total_amount=7.3)

In [10]:
def prepare_for_grouping(row):
    """Turn a trip row into a ``(key, value)`` pair for ``reduceByKey``.

    The key is ``(pickup time truncated to the hour, PULocationID)``; the
    value is ``(total_amount, 1)`` so that amounts and record counts can be
    summed per key.
    """
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return (pickup_hour, row.PULocationID), (row.total_amount, 1)

In [11]:
# Preview the first three (key, value) pairs after filtering and keying.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .take(3)
)

[((datetime.datetime(2020, 1, 29, 14, 0), 97), (7.3, 1)),
 ((datetime.datetime(2020, 1, 24, 4, 0), 7), (42.75, 1)),
 ((datetime.datetime(2020, 1, 23, 13, 0), 43), (8.3, 1))]

In [22]:
# Toy example of reduceByKey: values that share a key are combined with `add`.
from operator import add
sc = spark.sparkContext
rdd2 = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# Sort the collected pairs so the displayed order is stable.
sorted(rdd2.reduceByKey(add).collect())

                                                                                

[('a', 2), ('b', 1)]

In [24]:
def calculate_revenue(left_value, right_value):
    """Merge two ``(amount, count)`` pairs by adding them element-wise.

    Used as the reducer for ``reduceByKey``: both arguments and the result
    share the same two-element shape.
    """
    (amount_a, count_a), (amount_b, count_b) = left_value, right_value
    return amount_a + amount_b, count_a + count_b

In [26]:
# Preview three aggregated results: one (amount, count) pair per (hour, zone) key.
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .take(3)
)

                                                                                

[((datetime.datetime(2020, 1, 29, 10, 0), 89), (149.54999999999998, 5)),
 ((datetime.datetime(2020, 1, 9, 9, 0), 7), (458.3600000000001, 30)),
 ((datetime.datetime(2020, 1, 8, 15, 0), 123), (138.32999999999998, 6))]

In [27]:
from collections import namedtuple

In [31]:
# Lightweight record type holding one aggregated result: hour, zone, revenue, count.
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [32]:
def unwrap(row):
    """Flatten a ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``."""
    key, value = row[0], row[1]
    return RevenueRow(hour=key[0], zone=key[1], revenue=value[0], count=value[1])

In [33]:
from pyspark.sql import types

In [34]:
# Explicit schema for the aggregated rows; every column is declared nullable.
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    ]
])

In [35]:
# Full pipeline: filter, key by (hour, zone), sum per key, flatten each
# pair, and convert back to a DataFrame using the explicit schema.
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [36]:
# Overwrite any previous output so that re-running the notebook does not fail
# because the target path already exists (the default save mode raises).
df_result.write.parquet('tmp/green-revenue', mode='overwrite')

26/01/15 23:30:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
26/01/15 23:30:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
26/01/15 23:30:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
26/01/15 23:30:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
26/01/15 23:30:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
26/01/15 23:30:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
26/01/15 23:30:09 WARN MemoryManager: Total allocation exceeds 95.

In [37]:
# Columns passed to the duration-prediction step further down.
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

# Project df_green onto those columns and expose the result as an RDD of Rows.
duration_rdd = (
    df_green
    .select(columns)
    .rdd
)

In [38]:
import pandas as pd

In [40]:
# Sample ten rows on the driver to prototype the pandas conversion.
rows = duration_rdd.take(10)
rows

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 14, 32), PULocationID=97, DOLocationID=97, trip_distance=0.81),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 24, 4, 36, 44), PULocationID=7, DOLocationID=141, trip_distance=3.02),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 52), PULocationID=43, DOLocationID=24, trip_distance=1.32),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 26, 22, 18, 3), PULocationID=260, DOLocationID=260, trip_distance=0.89),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 16, 0), PULocationID=177, DOLocationID=49, trip_distance=2.83),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 21, 23, 2, 4), PULocationID=24, DOLocationID=41, trip_distance=1.33),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 25, 19, 9, 59), PULocationID=82, DOLocationID=145, trip_distance=3.41),
 Row(VendorID=1, lpep_pickup_datetime=datetime.dateti

In [41]:
# Build a pandas DataFrame from the sampled rows, labelling columns by name.
df = pd.DataFrame(rows, columns=columns)

In [42]:
# Show the column list for reference.
columns

['VendorID',
 'lpep_pickup_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_distance']

In [59]:
# Placeholder: no trained model is loaded in this notebook (model = ...).

def model_predict(df):
    """Return stand-in predictions: the ``trip_distance`` column times 5.

    A real implementation would call ``model.predict(df)`` here instead.
    """
    return df.trip_distance * 5

In [60]:
def apply_model_in_batch(rows):
    """Score one batch of rows and yield them with a prediction attached.

    The rows are loaded into a pandas DataFrame whose columns are named by
    the module-level ``columns`` list, ``model_predict`` supplies the
    ``predicted_duration`` column, and every record is yielded as a pandas
    named tuple (its first field, ``Index``, is the frame index).
    """
    batch = pd.DataFrame(rows, columns=columns)
    batch['predicted_duration'] = model_predict(batch)
    yield from batch.itertuples()

In [61]:
# Run the batch scorer once per partition, convert the yielded tuples to a
# DataFrame, and drop the pandas index column that itertuples() adds.
df_predicts = (
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF()
    .drop('Index')
)

                                                                                

In [56]:
# Each partition emits a single element, so the collected length equals the partition count.
len(duration_rdd.mapPartitions(lambda a: [1]).collect())

                                                                                

16

In [57]:
# Cross-check: ask Spark for the partition count directly.
duration_rdd.getNumPartitions()

16

In [58]:
def foo(partition):
    """Count the items in one partition and return the count in a one-item list."""
    return [sum(1 for _ in partition)]

# Row count per partition; the output below shows how unevenly rows are spread.
duration_rdd.mapPartitions(foo).collect()

                                                                                

[335827,
 311259,
 311019,
 183043,
 110877,
 109971,
 110447,
 106762,
 107864,
 104436,
 103052,
 117050,
 103812,
 96126,
 75166,
 17806]

In [62]:
# Preview the predicted values; the output below shows the top 20 rows.
df_predicts.select('predicted_duration').show()

[Stage 26:>                                                         (0 + 1) / 1]

+------------------+
|predicted_duration|
+------------------+
| 4.050000000000001|
|              15.1|
|6.6000000000000005|
|              4.45|
|             14.15|
|              6.65|
|             17.05|
|               7.5|
|               1.0|
|12.350000000000001|
|              97.6|
|               2.6|
|              40.9|
|             14.75|
|               7.9|
|               4.3|
|              79.2|
| 6.550000000000001|
|3.5999999999999996|
|               5.5|
+------------------+
only showing top 20 rows


Traceback (most recent call last):                                              
  File "/usr/local/Cellar/apache-spark/4.1.1/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
  File "/usr/local/Cellar/apache-spark/4.1.1/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
BrokenPipeError: [Errno 32] Broken pipe
