In [0]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql.types import *


In [0]:
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('test') \
        .getOrCreate()

In [0]:
df = spark.read.parquet('/FileStore/tables/fhv_tripdata_2023_01.parquet', header=True)

In [0]:
df.show(10)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00008|2023-01-01 00:30:00|2023-01-01 01:00:00|        null|        null|   null|                B00008|
|              B00078|2023-01-01 00:01:00|2023-01-01 03:15:00|        null|        null|   null|                B00078|
|              B00111|2023-01-01 00:30:00|2023-01-01 01:05:00|        null|        null|   null|                B03406|
|              B00112|2023-01-01 00:34:45|2023-01-01 00:52:03|        null|        14.0|   null|                B00112|
|              B00112|2023-01-01 00:11:20|2023-01-01 00:22:03|        null|        14.0|   null|                B00112|
|              B00112|2023-01-01 00:33:2

In [0]:
from pyspark.sql import functions as F
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num%7 == 0:
        return f's/{num:03x}'
    elif num%3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'
    
crazy_stuff('B02884')


Out[13]: 's/b44'

In [0]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [0]:
df.withColumn('pickup_date',F.to_date(df.pickup_datetime)) \
   .withColumn('dropoff_date',F.to_date(df.dropOff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show(10)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/008| 2023-01-01|  2023-01-01|        null|        null|
|  a/04e| 2023-01-01|  2023-01-01|        null|        null|
|  a/06f| 2023-01-01|  2023-01-01|        null|        null|
|  s/070| 2023-01-01|  2023-01-01|        null|        14.0|
|  s/070| 2023-01-01|  2023-01-01|        null|        14.0|
|  s/070| 2023-01-01|  2023-01-01|        null|        29.0|
|  s/070| 2023-01-01|  2023-01-01|        null|        14.0|
|  s/070| 2023-01-01|  2023-01-01|        null|        14.0|
|  s/070| 2023-01-01|  2023-01-01|        null|        14.0|
|  s/070| 2023-01-01|  2023-01-01|        null|        14.0|
+-------+-----------+------------+------------+------------+
only showing top 10 rows



In [0]:
df.columns

Out[44]: ['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [0]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID',  'DOLocationId') \
    .filter(df.DOlocationID == '14').show(10)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationId|
+-------------------+-------------------+------------+------------+
|2023-01-01 00:34:45|2023-01-01 00:52:03|        null|        14.0|
|2023-01-01 00:11:20|2023-01-01 00:22:03|        null|        14.0|
|2023-01-01 00:33:11|2023-01-01 00:48:45|        null|        14.0|
|2023-01-01 00:55:24|2023-01-01 01:02:55|        null|        14.0|
|2023-01-01 00:39:16|2023-01-01 00:39:23|        null|        14.0|
|2023-01-01 00:50:10|2023-01-01 00:50:17|        null|        14.0|
|2023-01-01 00:37:04|2023-01-01 00:57:13|        null|        14.0|
|2023-01-01 00:31:15|2023-01-01 00:35:57|        null|        14.0|
|2023-01-01 00:19:50|2023-01-01 00:23:52|        null|        14.0|
|2023-01-01 00:51:14|2023-01-01 01:04:34|        null|        14.0|
+-------------------+-------------------+------------+------------+
only showing top 10 rows



In [0]:
df_green = spark.read.parquet('/FileStore/tables/green_tripdata_2023_01.parquet', header=True)

In [0]:
df_yellow = spark.read.parquet('/FileStore/tables/yellow_tripdata_2023_01.parquet', header=True)

In [0]:
df_green = df_green.withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
            .withColumnRenamed('lpep_dropoff_datetime', 'drop_off_date')

In [0]:
df_yellow = df_yellow.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
            .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [0]:
common_columns = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)

In [0]:
from pyspark.sql import functions as F

In [0]:
df_green_sel = df_green \
                .select(common_columns) \
                .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
                    .select(common_columns) \
                    .withColumn('service_type', F.lit('yellow'))

In [0]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [0]:
df_trips_data.show(10)

+--------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2023-01-01 00:26:10|                 N|       1.0|         166|         143|            1.0|         2.58|       14.9|  1.0|    0.5|      4.03|         0.0|                  1.0|       24.18|         1.0|                2.75|       green|
|   

In [0]:
df_trips_data.groupBy('service_type').count().show()

+------------+-------+
|service_type|  count|
+------------+-------+
|       green|  68211|
|      yellow|3066766|
+------------+-------+



In [0]:
df_trips_data.columns

Out[72]: ['VendorID',
 'pickup_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

In [0]:
df_trips_data.registerTempTable('trips_data')



In [0]:
%sql

select count(*) from trips_data

count(1)
3134977


In [0]:
spark.sql("""select 
  service_type, count(1)
from 
  trips_data
group by 
  service_type""").show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green|   68211|
|      yellow| 3066766|
+------------+--------+




### Group by and Join

In [0]:
df_green.createOrReplaceTempView('green')


In [0]:
df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) as amount,
    count(1) as number_records
FROM
    green
GROUP BY 
1, 2
""")

In [0]:
df_green_revenue.show(10)

+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2023-01-01 01:00:00| 210|              57.6|             2|
|2023-01-01 07:00:00| 152|              11.5|             1|
|2023-01-01 11:00:00| 116|             42.48|             2|
|2023-01-01 18:00:00| 129|              65.1|             4|
|2023-01-02 09:00:00|   7|              10.8|             1|
|2023-01-02 14:00:00|  33|              24.1|             1|
|2023-01-02 15:00:00|  75|154.97000000000003|             9|
|2023-01-03 05:00:00|  75|               9.7|             1|
|2023-01-03 08:00:00|  47|              18.5|             1|
|2023-01-03 16:00:00|  43|390.28000000000003|            15|
+-------------------+----+------------------+--------------+
only showing top 10 rows



### rdd


%md

SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2


In [0]:
df_green.columns

Out[89]: ['VendorID',
 'pickup_datetime',
 'drop_off_date',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [0]:
rdd = df_green \
    .select('pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

In [0]:
rdd.take(10)

Out[93]: [Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 26, 10), PULocationID=166, total_amount=24.18),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 51, 3), PULocationID=24, total_amount=15.84),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 35, 12), PULocationID=223, total_amount=11.64),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 13, 14), PULocationID=41, total_amount=10.2),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 33, 4), PULocationID=41, total_amount=8.0),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 53, 31), PULocationID=41, total_amount=22.95),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 9, 14), PULocationID=181, total_amount=29.2),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 11, 58), PULocationID=24, total_amount=16.7),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 41, 29), PULocationID=41, total_amount=10.7),
 Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 50, 32), PULocationID=24, total_am

In [0]:
def prepare_for_grouping(row):
    hour = row.pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)

    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

In [0]:
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    output_amount = left_amount + right_amount
    output_count = left_count + right_count

    return (output_amount, output_count)

In [0]:
from collections import namedtuple


In [0]:
Revenuerow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [0]:
def unwrap(row):
    return Revenuerow(hour=row[0][0],
                      zone=row[0][1],
                      revenue=row[1][0],
                      count=row[1][1])

In [0]:
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

reduceByKey operation takes the values associated with the same keys and applies the provided function to reduce them.

In [0]:
df_result = rdd \
            .map(prepare_for_grouping) \
            .reduceByKey(calculate_revenue) \
            .map(unwrap) \
            .toDF(result_schema)

In [0]:
df_result.show(5)

+-------------------+----+-----------------+-----+
|               hour|zone|          revenue|count|
+-------------------+----+-----------------+-----+
|2023-01-01 00:00:00| 166|86.53999999999999|    3|
|2023-01-01 00:00:00|  24|            132.3|    5|
|2023-01-01 00:00:00| 223|            11.64|    1|
|2023-01-01 00:00:00|  41|98.75999999999999|    8|
|2023-01-01 00:00:00| 181|             29.2|    1|
+-------------------+----+-----------------+-----+
only showing top 5 rows

