In [1]:
%%capture
!pip install pyspark
!pip install findspark
!pip install pyngrok

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
        .appName('testColab') \
        .getOrCreate()

In [56]:
from pyngrok import ngrok, conf
import getpass

print("Enter your authtoken, which can be copied "
"from https://dashboard.ngrok.com/get-started/your-authtoken")
conf.get_default().auth_token = getpass.getpass()

ui_port = 4040
public_url = ngrok.connect(ui_port).public_url
print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{ui_port}\"")

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken
··········
 * ngrok tunnel "https://82f5-34-30-144-98.ngrok-free.app" -> "http://127.0.0.1:4040"


In [6]:
from google.colab import drive
drive.mount('/content/gdrive',force_remount=True)

Mounted at /content/gdrive


In [7]:
df_green = spark.read.parquet('gdrive/MyDrive/module5/pq/green/*/*')

In [None]:
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2

In [11]:
rdd = df_green \
     .select('lpep_pickup_datetime', 'PULocationID', 'total_amount' )\
     .rdd

In [16]:
from datetime import datetime

In [22]:
start = datetime(year=2021, month=1, day=1)

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

In [23]:
rows= rdd.take(10)
row=rows[0]

In [25]:
row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)

datetime.datetime(2020, 1, 23, 13, 0)

In [26]:
def prepare_for_grouping(row):
    hour= row.lpep_pickup_datetime
    zone= row.PULocationID
    key=(hour,zone)

    amount=row.total_amount
    count= 1
    value=(amount,count)

    return(key,value)

In [36]:
def calculate_revenue (left_value,right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    output_amount=left_amount+right_amount
    output_count=left_count+right_count
    return (output_amount,output_count)


In [44]:
from collections import namedtuple

In [38]:
rdd \
    .filter(filter_outliers)\
    .map(prepare_for_grouping)\
    .reduceByKey(calculate_revenue)\
    .take(10)

[((datetime.datetime(2021, 6, 24, 22, 19, 13), 41), (18.55, 1)),
 ((datetime.datetime(2021, 6, 23, 19, 56, 24), 22), (63.3, 1)),
 ((datetime.datetime(2021, 6, 12, 10, 40, 5), 41), (16.31, 1)),
 ((datetime.datetime(2021, 6, 12, 21, 47), 189), (26.83, 1)),
 ((datetime.datetime(2021, 6, 10, 15, 46, 25), 41), (41.2, 1)),
 ((datetime.datetime(2021, 6, 18, 15, 50, 37), 74), (14.8, 1)),
 ((datetime.datetime(2021, 6, 22, 7, 10, 17), 75), (3.3, 1)),
 ((datetime.datetime(2021, 6, 14, 21, 26, 41), 93), (42.3, 1)),
 ((datetime.datetime(2021, 6, 24, 19, 32, 1), 75), (37.85, 1)),
 ((datetime.datetime(2021, 6, 4, 8, 28, 9), 130), (26.76, 1))]

In [45]:
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [46]:
def unwrap(row):
    return RevenueRow(
        hour=row[0][0],
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1])

In [50]:
from pyspark.sql import types

In [51]:
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

In [52]:
df_result= rdd \
    .filter(filter_outliers)\
    .map(prepare_for_grouping)\
    .reduceByKey(calculate_revenue)\
    .map(unwrap)\
    .toDF(result_schema)

In [53]:
df_result.show()

+-------------------+----+-------+-----+
|               hour|zone|revenue|count|
+-------------------+----+-------+-----+
|2021-06-24 22:19:13|  41|  18.55|    1|
|2021-06-23 19:56:24|  22|   63.3|    1|
|2021-06-12 10:40:05|  41|  16.31|    1|
|2021-06-12 21:47:00| 189|  26.83|    1|
|2021-06-10 15:46:25|  41|   41.2|    1|
|2021-06-18 15:50:37|  74|   14.8|    1|
|2021-06-22 07:10:17|  75|    3.3|    1|
|2021-06-14 21:26:41|  93|   42.3|    1|
|2021-06-24 19:32:01|  75|  37.85|    1|
|2021-06-04 08:28:09| 130|  26.76|    1|
|2021-06-10 13:10:58|  75|   10.8|    1|
|2021-06-22 15:22:23| 134|   29.8|    1|
|2021-06-18 20:29:00| 116|   11.4|    1|
|2021-06-10 19:06:28|  41|    8.8|    1|
|2021-06-15 16:38:11| 116|   15.3|    1|
|2021-06-29 13:22:00| 145|  27.85|    1|
|2021-06-13 20:00:00|  26|  26.87|    1|
|2021-06-17 10:57:00| 242|  23.85|    1|
|2021-06-03 10:46:22|  82|    9.8|    1|
|2021-06-30 08:48:14| 116|   12.3|    1|
+-------------------+----+-------+-----+
only showing top

In [54]:
df_result.write.parquet('gdrive/MyDrive/module5/tmp/green-revenue')

PREDICT THE DURATION OF A TRIP

In [59]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = df_green \
    .select(columns) \
    .rdd


In [66]:
import pandas as pd

In [67]:
rows= duration_rdd.take(10)

In [76]:
df= pd.DataFrame(rows, columns=columns)

In [77]:
list(df.itertuples())

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-23 13:10:15'), PULocationID=74, DOLocationID=130, trip_distance=12.77),
 Pandas(Index=1, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-20 15:09:00'), PULocationID=67, DOLocationID=39, trip_distance=8.0),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 20:23:41'), PULocationID=260, DOLocationID=157, trip_distance=1.27),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-05 16:32:26'), PULocationID=82, DOLocationID=83, trip_distance=1.25),
 Pandas(Index=4, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-29 19:22:42'), PULocationID=166, DOLocationID=42, trip_distance=1.84),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 11:07:42'), PULocationID=179, DOLocationID=223, trip_distance=0.76),
 Pandas(Index=6, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-16 08:22:29'), PULocationID=41, DOLocationID=237, trip_distance=3.32),
 Panda

In [72]:
#model=...
def model_predict(df):
  #y_pred = model.predict(df)
  y_pred = df.trip_distance * 5
  return y_pred


In [84]:
def infinite_seq():
  i=0
  while True:
    yield i
    i=i+1

    if i>15:
      break

In [85]:
seq=infinite_seq()

In [86]:
list(seq)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

In [82]:
next(seq)

2

In [83]:
for i in seq:
  print(i)

  if i>= 10:
    break

3
4
5
6
7
8
9
10


In [87]:
def apply_model_in_batch(rows):
    df=pd.DataFrame(rows, columns=columns)
    predictions=model_predict(df)
    df['predicted_duration']=predictions

    for row in df.itertuples():
      yield row


In [90]:
df_predicts = duration_rdd \
    .mapPartitions(apply_model_in_batch)\
    .toDF() \
    .drop('Index')

In [91]:
df_predicts.select('predicted_duration').show()

+------------------+
|predicted_duration|
+------------------+
|63.849999999999994|
|              40.0|
|              6.35|
|              6.25|
| 9.200000000000001|
|               3.8|
|16.599999999999998|
|             11.05|
|               4.5|
|              30.5|
|               8.7|
|5.8999999999999995|
|              11.0|
|              15.2|
|              4.25|
|25.299999999999997|
|7.8500000000000005|
|              34.0|
| 5.300000000000001|
|              6.15|
+------------------+
only showing top 20 rows

