## Import Library

In [1]:
!pip install findspark

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import findspark
findspark.init()

In [3]:
from argparse import ArgumentParser
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from datetime import datetime
import sys

## Define Functions

In [4]:
def merge_data(spark: SparkSession, output_dir: str, start_date: str, end_date: str):
    sy, sm = start_date.split('-')
    ey, em = end_date.split('-')
    tmp_df = []
    for year in range(int(sy), int(ey)+1):
        for month in range(1, 13):
            if year == int(sy) and month < int(sm):
                continue
            if year == int(ey) and month > int(em):
                continue
            file_path = f'{output_dir}/fhvhv_tripdata_{year}-{str(month).zfill(2)}.parquet'
            tmp_df.append(spark.read.parquet(file_path))

    if len(tmp_df) == 1:
        return tmp_df[0]
    else:
        tmp = tmp_df[0]
        for idx in range(1, len(tmp_df)):
            tmp = tmp.union(tmp_df[idx])
        return tmp

## Run SparkSession

In [5]:
conf = SparkConf() \
    .setAppName("NYC Taxi Data Analysis") \
    .setMaster("spark://spark-master:7077") \
    .set("spark.executor.memory", "20g")

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/04 04:26:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
output_dir = "hdfs://spark-master:9000/data"
result_dir = "hdfs://spark-master:9000/result"
start_date = "2023-01"
end_date = "2023-01"

In [7]:
# load dataframe from hdfs
df = merge_data(spark, output_dir, start_date, end_date)

                                                                                

In [8]:
# extract cols
df = df.select(df.pickup_datetime, df.trip_miles, df.base_passenger_fare)

In [9]:
# remove nan values
df = df.dropna(subset=["pickup_datetime", "trip_miles", "base_passenger_fare"])

In [10]:
# remove invalid values
df = df.filter((df.trip_miles > 0) & (df.base_passenger_fare > 0))

In [11]:
df.cache()

DataFrame[pickup_datetime: timestamp_ntz, trip_miles: double, base_passenger_fare: double]

In [12]:
# filtering trip_miles upper than 30 miles
filtered_df = df.filter(df.trip_miles > 50)
filtered_df.show(5)



+-------------------+----------+-------------------+
|    pickup_datetime|trip_miles|base_passenger_fare|
+-------------------+----------+-------------------+
|2023-01-01 00:44:46|     51.23|             167.74|
|2023-01-01 00:16:59|     52.71|             146.72|
|2023-01-01 00:19:43|    57.256|             135.21|
|2023-01-01 00:26:12|     82.85|             247.89|
|2023-01-01 00:52:31|     50.46|             441.36|
+-------------------+----------+-------------------+
only showing top 5 rows



                                                                                

In [13]:
# count total numter of trips
print(df.count())



18462090


                                                                                

In [14]:
# calculate average trip distance
print(df.agg(F.avg(F.col("trip_miles"))).collect()[0][0])

4.8704881980318335


In [15]:
print(df.agg(F.sum(F.col("base_passenger_fare"))).collect()[0][0])

398359394.00305015


### Preprocess Weather dataframe

In [16]:
# load weather csv
weather_df = spark.read.csv(output_dir+"/2023_weather.csv")

In [17]:
# remove first row
first_row = weather_df.first()
columns = [str(cell) for cell in first_row]

weather_df = weather_df.filter(weather_df['_c0'] != first_row[0])
weather_df = weather_df.toDF(*columns)

24/08/04 04:27:10 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [18]:
not_use_cols = ['NAME', 'REPORT_TYPE', 'CALL_SIGN', 'QUALITY_CONTROL', 'CIG', 'VIS', 'DEW', 'SLP', 'AA2', 'AA3', 'AB1', 'AD1', 'AE1', 'AH1', 'AH2', 'AH3', 
                'AH4', 'AH5', 'AH6', 'AI1', 'AI2', 'AI3', 'AI4', 'AI5', 'AI6', 'AJ1', 'AK1', 'AL1', 'AM1', 'AN1', 'AT1', 'AT2', 'AT3', 'AT4', 'AT5', 'AU1',
                'AU2', 'AW1', 'AW2', 'AW3', 'AX1', 'AX2', 'AX3', 'AX4', 'GA1', 'GA2', 'GA3', 'GD1', 'GD2', 'GD3', 'GE1', 'GF1', 'KA1', 'KA2', 'KB1', 'KB2',
                'KB3', 'KC1', 'KC2', 'KD1', 'KD2', 'KE1', 'KG1', 'KG2', 'MA1', 'MD1', 'MF1', 'MG1', 'MH1', 'MK1', 'MW1', 'OC1', 'OD1', 'OE1', 'OE2', 'OE3',
                'RH1', 'RH2', 'RH3', 'WA1', 'REM', 'EQD', 'SOURCE', 'LATITUDE', 'LONGITUDE', 'ELEVATION']

In [19]:
weather_df = weather_df.drop(*not_use_cols)

In [20]:
weather_df.show(5)

+-----------+-------------------+--------------+-------+-----------+
|    STATION|               DATE|           WND|    TMP|        AA1|
+-----------+-------------------+--------------+-------+-----------+
|72505394728|2023-01-01T00:06:00|999,9,C,0000,5|+0100,5|01,0000,2,1|
|72505394728|2023-01-01T00:18:00|030,5,N,0015,5|+0100,5|01,0000,2,1|
|72505394728|2023-01-01T00:31:00|999,9,C,0000,5|+0100,5|01,0002,3,1|
|72505394728|2023-01-01T00:44:00|999,9,C,0000,5|+0100,5|01,0005,3,1|
|72505394728|2023-01-01T00:49:00|999,9,C,0000,5|+0100,5|       null|
+-----------+-------------------+--------------+-------+-----------+
only showing top 5 rows



In [21]:
# TMP / WND 전처리
weather_df = weather_df.withColumn('Temperature(C)', F.regexp_extract('TMP', r'(\+|-)?\d{4}', 0).cast('int') / 10) # 섭씨 온도
weather_df = weather_df.withColumn('Wind_Power(m/s)', F.regexp_extract('WND', r'\d{3},\d,[A-Z],(\d{4})', 1).cast('int') / 10) # 풍속

In [22]:
# UTC => 실제 뉴욕 시간으로 변경
weather_df = weather_df.withColumn('DATE', F.col('DATE').cast('timestamp'))

In [23]:
# 강수량 전처리
weather_df = weather_df.withColumn('RAIN_PER_HOUR', F.regexp_extract('AA1', r'\d{2},(\d{4})', 1).cast('int') / 10) # 강수량

In [24]:
# 일별 시간대 컬럼 추가
weather_df = weather_df.withColumn('DAY', F.dayofmonth(F.col('DATE')))
weather_df = weather_df.withColumn('HOUR', F.hour(F.col('DATE')))
weather_df = weather_df.withColumn('MONTH', F.month(F.col('DATE'))) 
weather_df = weather_df.withColumn('Year', F.year(F.col('DATE')))

In [25]:
# DATE가 동일한 컬럼 제거
print("Before count:", weather_df.count())
weather_df = weather_df.dropDuplicates(['DATE'])
print("After count:", weather_df.count())

Before count: 11842
After count: 11824


In [26]:
# temperature 결측치 처리
weather_df = weather_df.withColumn('Temperature(C)', F.when(F.col('Temperature(C)') == 999.9, None).otherwise(F.col('Temperature(C)')))
window_spec = Window.partitionBy('STATION').orderBy('DATE').rowsBetween(-3, 3)
weather_df = weather_df.withColumn('Temperature(C)', F.when(F.col('Temperature(C)').isNull(), F.avg('Temperature(C)').over(window_spec)).otherwise(F.col('Temperature(C)')))

In [27]:
# wind_power 결측치 처리
weather_df = weather_df.withColumn('Wind_Power(m/s)', F.when(F.col('Wind_Power(m/s)') == 999.9, None).otherwise(F.col('Wind_Power(m/s)')))
window_spec = Window.partitionBy('STATION').orderBy('DATE').rowsBetween(-3, 3)
weather_df = weather_df.withColumn('Wind_Power(m/s)', F.when(F.col('Wind_Power(m/s)').isNull(), F.avg('Wind_Power(m/s)').over(window_spec)).otherwise(F.col('Wind_Power(m/s)')))

In [28]:
# RAIN_PER_HOUR
weather_df = weather_df.withColumn('RAIN_PER_HOUR', F.when(F.col('RAIN_PER_HOUR') == 999.9, None).otherwise(F.col('RAIN_PER_HOUR')))
window_spec = Window.partitionBy('STATION').orderBy('DATE').rowsBetween(-3, 3)
weather_df = weather_df.withColumn('RAIN_PER_HOUR', F.when(F.col('RAIN_PER_HOUR').isNull(), F.avg('RAIN_PER_HOUR').over(window_spec)).otherwise(F.col('RAIN_PER_HOUR')))

In [29]:
# aggregation by Hour
weather_avg_df = weather_df.groupBy('Year', 'Month', 'Day', 'Hour').agg(
    F.round(F.avg('Temperature(C)'), 2).alias('Avg_temp(C)'),
    F.round(F.avg('Wind_Power(m/s)'), 2).alias('Avg_wind_power(m/s)'),
    F.round(F.avg('RAIN_PER_HOUR'), 2).alias('Avg_rain_per_hour(mm)')
).orderBy('Year', 'Month', 'Day', 'Hour')

### Merge Two Dataframes

In [30]:
# add cols
df = df.withColumn('dfYear', F.year(F.col('pickup_datetime')))
df = df.withColumn('dfMonth', F.month(F.col('pickup_datetime')))
df = df.withColumn('dfDay', F.dayofmonth(F.col('pickup_datetime')))
df = df.withColumn('dfHour', F.hour(F.col('pickup_datetime')))

In [31]:
joined_df = df.join(weather_avg_df, (df.dfYear==weather_avg_df.Year) & (df.dfMonth==weather_avg_df.Month) & (df.dfDay==weather_avg_df.Day) & (df.dfHour==weather_avg_df.Hour), how='left')

In [32]:
joined_df.show(5)

+-------------------+----------+-------------------+------+-------+-----+------+----+-----+---+----+-----------+-------------------+---------------------+
|    pickup_datetime|trip_miles|base_passenger_fare|dfYear|dfMonth|dfDay|dfHour|Year|Month|Day|Hour|Avg_temp(C)|Avg_wind_power(m/s)|Avg_rain_per_hour(mm)|
+-------------------+----------+-------------------+------+-------+-----+------+----+-----+---+----+-----------+-------------------+---------------------+
|2023-01-01 00:19:38|      0.94|              25.95|  2023|      1|    1|     0|2023|    1|  1|   0|       10.0|               0.25|                 0.32|
|2023-01-01 00:58:39|      2.78|              60.14|  2023|      1|    1|     0|2023|    1|  1|   0|       10.0|               0.25|                 0.32|
|2023-01-01 00:20:27|      8.81|              24.37|  2023|      1|    1|     0|2023|    1|  1|   0|       10.0|               0.25|                 0.32|
|2023-01-01 00:41:05|      0.67|               13.8|  2023|      1|   

In [33]:
joined_df = joined_df.select(F.col("pickup_datetime"), F.col("trip_miles"), F.col("base_passenger_fare"), F.col("Year"), F.col("Month"), F.col("Day"), F.col("Hour"), F.col("Avg_wind_power(m/s)"), F.col("Avg_rain_per_hour(mm)"), F.col("Avg_temp(C)"))

### Extract sample data and save to hdfs

In [34]:
# extract sample
sample_data = joined_df.limit(10).collect()
print("Sample Data:")
for row in sample_data:
    print(row)

Sample Data:
Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 19, 38), trip_miles=0.94, base_passenger_fare=25.95, Year=2023, Month=1, Day=1, Hour=0, Avg_wind_power(m/s)=0.25, Avg_rain_per_hour(mm)=0.32, Avg_temp(C)=10.0)
Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 58, 39), trip_miles=2.78, base_passenger_fare=60.14, Year=2023, Month=1, Day=1, Hour=0, Avg_wind_power(m/s)=0.25, Avg_rain_per_hour(mm)=0.32, Avg_temp(C)=10.0)
Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 20, 27), trip_miles=8.81, base_passenger_fare=24.37, Year=2023, Month=1, Day=1, Hour=0, Avg_wind_power(m/s)=0.25, Avg_rain_per_hour(mm)=0.32, Avg_temp(C)=10.0)
Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 41, 5), trip_miles=0.67, base_passenger_fare=13.8, Year=2023, Month=1, Day=1, Hour=0, Avg_wind_power(m/s)=0.25, Avg_rain_per_hour(mm)=0.32, Avg_temp(C)=10.0)
Row(pickup_datetime=datetime.datetime(2023, 1, 1, 0, 52, 47), trip_miles=4.38, base_passenger_fare=20.49, Year=2023, Month=1, Day=1, Hour

In [155]:
# save to hdfs
joined_df = joined_df.coalesce(1)
joined_df.write.mode("overwrite").parquet(result_dir+"/joined_df")

                                                                                