In [1]:
import pandas as pd
import pyspark

from pyspark.sql import SparkSession, Row, functions, types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.executor.memory", "3gb") \
    .appName('test') \
    .getOrCreate()

23/03/10 14:05:11 WARN Utils: Your hostname, celeron resolves to a loopback address: 127.0.1.1; using 192.168.10.58 instead (on interface wlo1)
23/03/10 14:05:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/10 14:05:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/10 14:05:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
! cat download_data.sh


set -e

TAXI_TYPE=$1 # "yellow"
YEAR=$2 # 2020

URL_PREFIX="https://github.com/DataTalksClub/nyc-tlc-data/releases/download"

for MONTH in {1..12}; do
  FMONTH=`printf "%02d" ${MONTH}`

  URL="${URL_PREFIX}/${TAXI_TYPE}/${TAXI_TYPE}_tripdata_${YEAR}-${FMONTH}.csv.gz"

  LOCAL_PREFIX="data/raw/${TAXI_TYPE}/${YEAR}/${FMONTH}"
  LOCAL_FILE="${TAXI_TYPE}_tripdata_${YEAR}_${FMONTH}.csv.gz"
  LOCAL_PATH="${LOCAL_PREFIX}/${LOCAL_FILE}"

  echo "downloading ${URL} to ${LOCAL_PATH}"
  mkdir -p ${LOCAL_PREFIX}
  wget ${URL} -O ${LOCAL_PATH}

done


In [3]:
! bash download_data yellow 2020

data		     seminar_numpy.ipynb    seminar_spark_template.ipynb
download_data.sh     seminar_pandas.ipynb   seminar_sql.ipynb
img		     seminar_plots.ipynb    spark-warehouse
README.md	     seminar_spark_1.ipynb
seminar_mrjob.ipynb  seminar_spark_2.ipynb


In [6]:
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [8]:
def save_to_parquet(year, taxi_type, taxi_schema):

    for month in range(1, 13):
        print(f'processing data for {year}/{month}')

        input_path = f'data/raw/{taxi_type}/{year}/{month:02d}/'
        output_path = f'data/pq/{taxi_type}/{year}/{month:02d}/'

        df = spark.read \
            .option("header", "true") \
            .schema(taxi_schema) \
            .csv(input_path)

        df.repartition(4).write.parquet(output_path)

In [9]:
save_to_parquet(2020, 'green', green_schema)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [10]:
save_to_parquet(2021, 'green', green_schema)

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7
processing data for 2021/8
processing data for 2021/9


AnalysisException: Path does not exist: file:/home/sanityseeker/Documents/Projects/lspy-2023/data/raw/green/2021/09

In [11]:
save_to_parquet(2020, 'yellow', yellow_schema)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3


                                                                                

processing data for 2020/4


                                                                                

processing data for 2020/5


                                                                                

processing data for 2020/6


                                                                                

processing data for 2020/7


                                                                                

processing data for 2020/8


                                                                                

processing data for 2020/9


                                                                                

processing data for 2020/10


                                                                                

processing data for 2020/11


                                                                                

processing data for 2020/12


                                                                                

In [12]:
save_to_parquet(2021, 'yellow', yellow_schema)

processing data for 2021/1


                                                                                

processing data for 2021/2


                                                                                

processing data for 2021/3


                                                                                

processing data for 2021/4


                                                                                

processing data for 2021/5


                                                                                

processing data for 2021/6


                                                                                

processing data for 2021/7


[Stage 115:>                                                        (0 + 4) / 4]

processing data for 2021/8
processing data for 2021/9




AnalysisException: Path does not exist: file:/home/sanityseeker/Documents/Projects/lspy-2023/data/raw/yellow/2021/09

Read new data

In [14]:
! ls data

crime-punishment.txt  digits  pq  raw  report  word2def.txt  words.txt


In [15]:
df_green = spark.read.parquet('data/pq/green/*/*')

In [16]:
df_yellow = spark.read.parquet('data/pq/yellow/*/*')

In [21]:
df_yellow.take(1)[0]

Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2020, 1, 16, 10, 6, 42), tpep_dropoff_datetime=datetime.datetime(2020, 1, 16, 10, 36, 24), passenger_count=1, trip_distance=5.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=13, payment_type=1, fare_amount=22.0, extra=2.5, mta_tax=0.5, tip_amount=5.05, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=30.35, congestion_surcharge=2.5)

In [22]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       1| 2020-01-31 15:00:30|  2020-01-31 15:06:05|                 N|         1|         196|          95|              1|          1.2|        6.0|  0.0|    0.

In [23]:
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [24]:
df_green.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [26]:
df_green = df_green \
        .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
        .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
    
df_yellow = df_yellow \
        .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
        .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [28]:
set(df_green.columns) & set(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [29]:
common_columns = []

yel_cols = set(df_yellow.columns)
for column in df_green.columns:
    if column in yel_cols:
        common_columns.append(column)

In [30]:
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [33]:
df_green_select = df_green.select(common_columns).withColumn('service_type', functions.lit('green'))
df_yellow_select = df_yellow.select(common_columns).withColumn('service_type', functions.lit('yellow'))

In [35]:
df_united_taxi_data = df_yellow_select.unionAll(df_green_select)

In [38]:
df_united_taxi_data.registerTempTable('united_taxi_data')



In [37]:
df_united_taxi_data.groupBy('service_type').count().show()



+------------+--------+
|service_type|   count|
+------------+--------+
|      yellow|39649199|
|       green| 2304517|
+------------+--------+



                                                                                

In [39]:
spark.sql(
"""
select service_type, count(1)
from united_taxi_data
group by 1
"""
).show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|      yellow|39649199|
|       green| 2304517|
+------------+--------+





In [42]:
df_revenue = spark.sql(
"""
select PULocationID as zone
, date_trunc('month', pickup_datetime) as month
, service_type
, sum(tip_amount) as monthly_tips
, sum(total_amount) as monthly_total
, avg(trip_distance) as avg_monthly_distance
from united_taxi_data
group by 1, 2, 3
"""
)

In [43]:
df_revenue.coalesce(1).write.parquet('data/result/revenue/', mode='overwrite')

                                                                                

In [44]:
df_revenue_recovered = spark.read.parquet('data/result/*')

In [49]:
df_revenue_recovered.registerTempTable('revenue')



In [51]:
spark.sql(
"""
select * from revenue
order by month desc
"""
).show(100)

+----+-------------------+------------+------------------+------------------+--------------------+
|zone|              month|service_type|      monthly_tips|     monthly_total|avg_monthly_distance|
+----+-------------------+------------+------------------+------------------+--------------------+
| 193|2041-08-01 00:00:00|       green|               0.0|               0.0|                 0.0|
| 231|2029-05-01 00:00:00|      yellow|              2.16|             12.96|                1.69|
| 107|2021-12-01 00:00:00|      yellow|              2.45|             12.25|                0.78|
| 233|2021-12-01 00:00:00|      yellow|              2.06|             12.36|                1.23|
| 163|2021-12-01 00:00:00|      yellow|             13.82|             69.12|                 7.8|
| 234|2021-12-01 00:00:00|      yellow|               0.0|              11.3|                1.62|
|  79|2021-12-01 00:00:00|      yellow|              3.08|             15.38|                1.77|
|  41|2021

In [53]:
df_green.registerTempTable('green')



In [54]:
df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    pickup_datetime >= '2020-01-01 00:00:00' and pickup_datetime <= '2022-01-01 00:00:00'
GROUP BY
    1, 2
""")

In [55]:
df_yellow.registerTempTable('yellow')

In [56]:
df_yellow_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    pickup_datetime >= '2020-01-01 00:00:00' and pickup_datetime <= '2022-01-01 00:00:00'
GROUP BY
    1, 2
""")

In [58]:
df_yellow_revenue.repartition(4).write.parquet('data/result/revenue/yellow', mode='overwrite')

                                                                                

In [59]:
df_yellow_revenue.show()



+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2020-01-16 07:00:00|  74|1009.9200000000001|            62|
|2020-01-13 09:00:00| 163| 4503.789999999998|           236|
|2020-01-06 12:00:00|  43|3048.8299999999995|           193|
|2020-01-26 17:00:00| 239| 5902.589999999995|           378|
|2020-01-23 06:00:00|  79|1235.4600000000003|            78|
|2020-01-03 22:00:00| 142| 7562.159999999994|           473|
|2020-01-12 12:00:00| 163| 4355.149999999998|           258|
|2020-01-04 13:00:00| 143|2408.8799999999997|           171|
|2020-01-25 22:00:00| 114| 5290.099999999997|           301|
|2020-01-17 17:00:00| 170| 8916.779999999999|           497|
|2020-01-25 10:00:00| 186| 6621.049999999993|           448|
|2020-01-04 11:00:00| 230| 5140.639999999996|           297|
|2020-01-24 16:00:00| 186| 8362.349999999995|           429|
|2020-01-03 14:00:00| 13

                                                                                

In [61]:
df_green_revenue_ = df_green_revenue \
.withColumnRenamed('amount', 'green_amount') \
.withColumnRenamed('number_records', 'green_number_records')

df_yellow_revenue_ = df_yellow_revenue \
.withColumnRenamed('amount', 'yellow_amount') \
.withColumnRenamed('number_records', 'yellow_number_records')

In [66]:
df_joined_revenue = df_yellow_revenue_.join(functions.broadcast(df_green_revenue_), on=['hour', 'zone'], how='outer')

In [67]:
df_joined_revenue.write.parquet('data/result/revenue/total', mode='overwrite')

23/03/10 15:09:09 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for full outer join.




23/03/10 15:09:12 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for full outer join.




23/03/10 15:09:12 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for full outer join.
23/03/10 15:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/03/10 15:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/03/10 15:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/03/10 15:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/03/10 15:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
23/03/10 15:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) 



In [72]:
spark.sql(
"""
select min(pickup_datetime),
max(pickup_datetime)
from green
"""    
).show()

+--------------------+--------------------+
|min(pickup_datetime)|max(pickup_datetime)|
+--------------------+--------------------+
| 2008-12-31 22:06:48| 2041-08-17 16:24:38|
+--------------------+--------------------+



In [68]:
green_rdd = df_green.select('pickup_datetime', 'PULocationID', 'total_amount').rdd

In [69]:
green_rdd

MapPartitionsRDD[363] at javaToPython at NativeMethodAccessorImpl.java:0

In [70]:
from datetime import datetime

In [73]:
start_dt = datetime(year=2020, month=1, day=1)
end_dt = datetime(year=2022, month=1, day=1)

In [75]:
sample_row = green_rdd.take(1)[0]

In [79]:
sample_row.pickup_datetime

datetime.datetime(2020, 1, 31, 15, 0, 30)

In [83]:
def filter_dt_outliers(row):
    start_dt = datetime(year=2020, month=1, day=1)
    end_dt = datetime(year=2022, month=1, day=1)
    return row.pickup_datetime >= start_dt and row.pickup_datetime <= end_dt

In [84]:
green_rdd.filter(filter_dt_outliers).collect()

                                                                                

[Row(pickup_datetime=datetime.datetime(2020, 1, 31, 15, 0, 30), PULocationID=196, total_amount=6.8),
 Row(pickup_datetime=datetime.datetime(2020, 1, 17, 7, 52, 53), PULocationID=52, total_amount=30.05),
 Row(pickup_datetime=datetime.datetime(2020, 1, 31, 2, 19, 5), PULocationID=82, total_amount=16.8),
 Row(pickup_datetime=datetime.datetime(2020, 1, 21, 0, 19, 28), PULocationID=7, total_amount=21.3),
 Row(pickup_datetime=datetime.datetime(2020, 1, 24, 19, 2, 40), PULocationID=191, total_amount=12.25),
 Row(pickup_datetime=datetime.datetime(2020, 1, 14, 13, 11, 28), PULocationID=166, total_amount=12.3),
 Row(pickup_datetime=datetime.datetime(2020, 1, 29, 21, 37, 20), PULocationID=181, total_amount=9.3),
 Row(pickup_datetime=datetime.datetime(2020, 1, 22, 14, 54, 23), PULocationID=166, total_amount=32.46),
 Row(pickup_datetime=datetime.datetime(2020, 1, 29, 21, 34, 1), PULocationID=25, total_amount=21.8),
 Row(pickup_datetime=datetime.datetime(2020, 1, 14, 15, 54, 16), PULocationID=244, t

In [89]:
sample_row.pickup_datetime.replace(minute=0, second=0)

datetime.datetime(2020, 1, 31, 15, 0)

In [90]:
def mapper(row):
    hour = row.pickup_datetime.replace(minute=0, second=0)
    location = row.PULocationID
    
    amount = row.total_amount
    
    return ((location, hour), (amount, 1))

In [91]:
def reducer(left_value, right_value):
    l_amount, l_count = left_value
    r_amount, r_count = right_value
    
    return l_amount + r_amount, l_count + r_count

Important that if the schema is not specified, the calculations will be made prematurely to infer it.

In [116]:
df_result = green_rdd \
    .filter(filter_dt_outliers) \
    .map(mapper) \
    .reduceByKey(reducer) \
    .map(lambda row: (row[0][0], row[0][1], row[1][0], row[1][1])) \
    .toDF()

                                                                                

In [117]:
df_result.take(1)[0]

Row(_1=131, _2=datetime.datetime(2020, 1, 8, 9, 0), _3=88.34, _4=4)

Columns are infereded but don't have names. We can pass them with NamedTuple

In [118]:
from collections import namedtuple

ResultRow = namedtuple('ResultRow', ['hour', 'zone', 'revenue', 'count'])

df_result = green_rdd \
    .filter(filter_dt_outliers) \
    .map(mapper) \
    .reduceByKey(reducer) \
    .map(lambda row: ResultRow(row[0][0], row[0][1], row[1][0], row[1][1])) \
    .toDF() # unwrapping ((keys), (values)) to (key_1, ..., value_n) and using namedtuple for schema

                                                                                

In [119]:
df_result.take(1)[0]

Row(hour=131, zone=datetime.datetime(2020, 1, 8, 9, 0), revenue=88.34, count=4)

But we are still having premature calculations. What if we specify the schema?

In [120]:
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

df_result = green_rdd \
    .filter(filter_dt_outliers) \
    .map(mapper) \
    .reduceByKey(reducer) \
    .map(lambda row: ResultRow(row[0][0], row[0][1], row[1][0], row[1][1])) \
    .toDF(result_schema) # unwrapping ((keys), (values)) to (key_1, ..., value_n)

It works, no calculations are being made

---

**Credits**: 
DataTalks course by Alexey Grigoryev

https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/week_5_batch_processing