In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
import findspark
findspark.init('/usr/local/spark')

In [2]:
spark = SparkSession\
        .builder\
        .master("yarn")\
        .config("spark.driver.memory", "16g")\
        .config("spark.driver.maxResultSize", "12g")\
        .config('spark.executor.instances','16')\
        .config('spark.executor.memory','9G')\
        .appName("feature engineering")\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-31 09:41:48,875 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [4]:
spark.sparkContext.appName
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

In [5]:
import pyspark.pandas as ps
ps.set_option("compute.default_index_type", "distributed")



In [6]:
# read parquet
df = ps.read_parquet(f'hdfs://nncluster:/data/etag/m05/m05_holidays.parquet/')

                                                                                

In [7]:
# read holiday csv
df_h = ps.read_csv(f'hdfs://nncluster:/data/etag/m05/2015_2021_holidays_order.csv')
# df_h drop useless columns
df_h = df_h.drop(['_c0','Year'],axis=1)
# pick specific holiday and preprocess
df_h = df_h[df_h['Holiday'].isin(['Lunar New Year','dragon','moon','New Year','national'])]
df_h = df_h.rename(columns = {"Date Start":"Date_Start"})

                                                                                

In [8]:
# add date string
df["Date_Start"] = ps.to_datetime(df["Time"],format = "%Y/%m/%d").dt.strftime('%Y%m%d')

In [9]:
# merge order df to main df
df = df.merge(df_h,on='Date_Start')
df.head()

                                                                                

Unnamed: 0,Time,S-Station,E-Station,V-Type,Speed,Count,year,Date_Start,Holiday,Order
68719476736,2019/02/07 02:35,03F2614N,03F2535N,42,0,0,2019,20190207,Lunar New Year,5
68719476737,2019/02/07 02:35,03F2614N,03F2535N,5,0,0,2019,20190207,Lunar New Year,5
68719476738,2019/02/07 02:35,03F2447S,03F2535S,31,111,19,2019,20190207,Lunar New Year,5
68719476739,2019/02/07 02:35,03F2447S,03F2535S,32,113,9,2019,20190207,Lunar New Year,5
68719476740,2019/02/07 02:35,03F2447S,03F2535S,41,0,0,2019,20190207,Lunar New Year,5


In [10]:
# save a parquet(just in case)
df.to_parquet("hdfs://nncluster:/data/etag/m05/m05_holidays_step1.parquet", partition_cols="year")
df = ps.read_parquet(f'hdfs://nncluster:/data/etag/m05/m05_holidays_step1.parquet/')

                                                                                

In [11]:
# df2-6 preprocess
df_d = ps.read_csv(f'hdfs://nncluster:/data/etag/m05/m05_DaysPriorToDragon.csv')
df_lny = ps.read_csv(f'hdfs://nncluster:/data/etag/m05/m05_DaysPriorToLunar.csv')
df_m = ps.read_csv(f'hdfs://nncluster:/data/etag/m05/m05_DaysPriorToMoon.csv')
df_n = ps.read_csv(f'hdfs://nncluster:/data/etag/m05/m05_DaysPriorToNation.csv')
df_ny = ps.read_csv(f'hdfs://nncluster:/data/etag/m05/m05_DaysPriorToNewYear.csv')

df_d["Date_Start"] = ps.to_datetime(df_d["Time"],format = "%Y/%m/%d").dt.strftime('%Y%m%d')
df_d['Holiday'] = 'dragon'
df_d['Order'] = -1
df_d['year'] = df_d['Date_Start'].apply(lambda x:str(x)[:4])

df_lny["Date_Start"] = ps.to_datetime(df_lny["Time"],format = "%Y/%m/%d").dt.strftime('%Y%m%d')
df_lny['Holiday'] = 'Lunar New Year'
df_lny['Order'] = -1
df_lny['year'] = df_lny['Date_Start'].apply(lambda x:str(x)[:4])

df_m["Date_Start"] = ps.to_datetime(df_m["Time"],format = "%Y/%m/%d").dt.strftime('%Y%m%d')
df_m['Holiday'] = 'moon'
df_m['Order'] = -1
df_m['year'] = df_m['Date_Start'].apply(lambda x:str(x)[:4])

df_n["Date_Start"] = ps.to_datetime(df_n["Time"],format = "%Y/%m/%d").dt.strftime('%Y%m%d')
df_n['Holiday'] = 'national'
df_n['Order'] = -1
df_n['year'] = df_n['Date_Start'].apply(lambda x:str(x)[:4])

df_ny["Date_Start"] = ps.to_datetime(df_ny["Time"],format = "%Y/%m/%d").dt.strftime('%Y%m%d')
df_ny['Holiday'] = 'New Year'
df_ny['Order'] = -1
df_ny['year'] = df_ny['Date_Start'].apply(lambda x:str(x)[:4])

                                                                                

In [14]:
# concat dataframes
for i in [df_d,df_lny,df_m,df_n,df_ny]:
    df = ps.concat([df,i])
df.head()

                                                                                

Unnamed: 0,Time,S-Station,E-Station,V-Type,Speed,Count,Date_Start,Holiday,Order,year
8589934592,2019/02/07 02:35,03F2614N,03F2535N,42,0,0,20190207,Lunar New Year,5,2019
8589934593,2019/02/07 02:35,03F2614N,03F2535N,5,0,0,20190207,Lunar New Year,5,2019
8589934594,2019/02/07 02:35,03F2447S,03F2535S,31,111,19,20190207,Lunar New Year,5,2019
8589934595,2019/02/07 02:35,03F2447S,03F2535S,32,113,9,20190207,Lunar New Year,5,2019
8589934596,2019/02/07 02:35,03F2447S,03F2535S,41,0,0,20190207,Lunar New Year,5,2019


In [15]:
# save a parquet(just in case)
df.to_parquet("hdfs://nncluster:/data/etag/m05/m05_holidays_step2.parquet", partition_cols="year")
df = ps.read_parquet(f'hdfs://nncluster:/data/etag/m05/m05_holidays_step2.parquet/')

                                                                                

In [16]:
# remane some stations
df.loc[(df['S-Station']=='01F0509S'),'S-Station'] = '01F0511S'
df.loc[(df['S-Station']=='01F0509N'),'S-Station'] = '01F0511N'
df.loc[(df['E-Station']=='01F0509S'),'E-Station'] = '01F0511S'
df.loc[(df['E-Station']=='01F0509N'),'E-Station'] = '01F0511N'

In [17]:
# add Segment feature
df["Segment"] = df["S-Station"]+"-"+df["E-Station"]

In [18]:
# delete diff direction and road
def filters(x):
    if x[:2] != x[9:11]:
        return 0
    elif x[7] != x[-1]:
        return 0
    else:
        return 1

# keep wanted rows
df['Keep'] = df["Segment"].apply(lambda x : filters(x))
df = df[df['Keep']==1]

In [19]:
# add Direction feature
df["Direction"] = df["S-Station"].apply(lambda x: 0 if x[-1] == "S" else 1)

                                                                                

In [20]:
# add hour feature
df["Hour"] = df["Time"].apply(lambda x: int(x[-5:-3]))

                                                                                

In [21]:
# set speed = 0 to na and drop it
df.loc[df["Speed"]==0,"Speed"] = np.nan
df = df.dropna(subset="Speed")

In [22]:
# groupby every hour data then sum count and avg speeds
df = df.groupby(['Hour','Date_Start','Segment', 'Direction','V-Type','Holiday','Order','year']).agg({"Count":"sum","Speed":"mean"}).reset_index()

In [23]:
# save a parquet(just in case)
df.to_parquet("hdfs://nncluster:/data/etag/m05/m05_holidays_step3.parquet", partition_cols="year")
df = ps.read_parquet(f'hdfs://nncluster:/data/etag/m05/m05_holidays_step3.parquet/')

                                                                                

In [24]:
# add month weekday feature
date = ps.to_datetime(df["Date_Start"],format = "%Y%m%d")

df["Month"] = date.dt.strftime("%m").astype(int)
df["Wday"] = date.dt.strftime("%w").astype(int)

In [25]:
# add segment category feature
df["Seg_cat"] = df["Segment"]
df["Seg_cat"] = df["Seg_cat"].astype("category").cat.codes.astype(int)

In [26]:
# sort data by date & segment & hour
df = df.sort_values(["Date_Start","Seg_cat","Hour"])

In [27]:
# save a parquet
df.to_parquet("hdfs://nncluster:/data/etag/m05/m05_holidays_final.parquet", partition_cols="year")
df = ps.read_parquet(f'hdfs://nncluster:/data/etag/m05/m05_holidays_final.parquet/')

                                                                                

In [28]:
df.head()

Unnamed: 0,Hour,Date_Start,Segment,Direction,V-Type,Holiday,Order,Count,Speed,Month,Wday,Seg_cat,year
25769803776,0,20160101,01F0005S-01F0017S,0,32,New Year,0,88,82.0,1,5,0,2016
25769803777,0,20160101,01F0005S-01F0017S,0,41,New Year,0,10,82.428571,1,5,0,2016
25769803778,0,20160101,01F0005S-01F0017S,0,42,New Year,0,7,85.833333,1,5,0,2016
25769803779,0,20160101,01F0005S-01F0017S,0,31,New Year,0,426,85.75,1,5,0,2016
25769803780,1,20160101,01F0005S-01F0017S,0,41,New Year,0,1,72.0,1,5,0,2016


In [None]:
# seperate each holiday to parquet
df_lny = df[df['Holiday']=='Lunar New Year']
df_lny.to_parquet("hdfs://nncluster:/data/etag/m05/holidays_lunar_new_year_0521_AFE.parquet", partition_cols="year")

df_d = df[df['Holiday']=='dragon']
df_d.to_parquet("hdfs://nncluster:/data/etag/m05/holidays_dragon_0521_AFE.parquet", partition_cols="year")

df_m = df[df['Holiday']=='moon']
df_m.to_parquet("hdfs://nncluster:/data/etag/m05/holidays_moon_0521_AFE.parquet", partition_cols="year")

df_na = df[df['Holiday']=='national']
df_na.to_parquet("hdfs://nncluster:/data/etag/m05/holidays_national_0521_AFE.parquet", partition_cols="year")

df_ny = df[df['Holiday']=='New Year']
df_ny.to_parquet("hdfs://nncluster:/data/etag/m05/holidays_new_year_0521_AFE.parquet", partition_cols="year")

In [None]:
df_lny = ps.read_parquet("hdfs://nncluster:/data/etag/m05/holidays_lunar_new_year_0521_AFE.parquet")
pdf_lny = df_lny.to_pandas()
pdf_lny.to_csv('holidays_lunar_new_year_0521_AFE.csv')


df_d = ps.read_parquet("hdfs://nncluster:/data/etag/m05/holidays_dragon_0521_AFE.parquet")
pdf_d = df_d.to_pandas()
pdf_d.to_csv('holidays_dragon_0521_AFE.csv')

df_m = ps.read_parquet("hdfs://nncluster:/data/etag/m05/holidays_moon_0521_AFE.parquet")
pdf_m = df_m.to_pandas()
pdf_m.to_csv('holidays_moon_0521_AFE.csv')

df_na = ps.read_parquet("hdfs://nncluster:/data/etag/m05/holidays_national_0521_AFE.parquet")
pdf_na = df_na.to_pandas()
pdf_na.to_csv('holidays_national_0521_AFE.csv')

df_ny = ps.read_parquet("hdfs://nncluster:/data/etag/m05/holidays_new_year_0521_AFE.parquet")
pdf_ny = df_ny.to_pandas()
pdf_ny.to_csv('holidays_new_year_0521_AFE.csv')