In [None]:
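# PySpark imports used by this notebook.
# NOTE: the star imports below (notably `pyspark.sql.functions`) can shadow Python builtins
# such as sum, min, max and round; prefer the explicit `F.` alias in new code.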

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
import  pyspark.sql.functions as F
from pyspark.sql.functions import concat_ws

In [None]:
# Create (or reuse) the Spark session and load the raw traffic and weather datasets from S3.
# The reader format is pinned to parquet explicitly: `spark.read.load` without `format=`
# falls back to the `spark.sql.sources.default` setting, so pinning it keeps this cell
# independent of cluster configuration.
TRAFFIC_SRC = 's3://bigdata-seg/data-traffic/traffic.parquet'
WEATHER_SRC = 's3://bigdata-seg/data-weather/weatherbit_weather_2010_2022.parquet'

spark = SparkSession.builder.appName("traffic").getOrCreate()
df_traffic = spark.read.parquet(TRAFFIC_SRC)
df_weather = spark.read.parquet(WEATHER_SRC)

In [None]:
# Add calendar and time-window columns to the traffic DataFrame, all derived from
# `measurement_tstamp` (these are DataFrame column transformations, not RDD operations).
#   hour_of_day   : hour of the timestamp (0-23)
#   year, month   : calendar year and month number
#   day           : day of the year, as a string (Spark datetime pattern "D")
#   window_of_day : the day split into four 6-hour windows, numbered 1-4
#   day_of_week   : abbreviated weekday name (pattern "E")
#   window_id     : "<year>.<day>.<window_of_day>" — presumably the key used to line
#                   traffic rows up with weather rows; confirm against downstream use
df_traffic = (
    df_traffic
    .withColumn("hour_of_day", F.hour("measurement_tstamp"))
    .withColumn("year", F.year("measurement_tstamp"))
    .withColumn("month", F.month("measurement_tstamp"))
    .withColumn("day", F.date_format(F.col("measurement_tstamp"), "D"))
    # divide the hour by 6, truncate via the integer cast, then shift to a 1-based window number
    .withColumn("window_of_day", (F.col("hour_of_day").cast("integer") / 6).cast("integer") + 1)
    .withColumn("day_of_week", F.date_format(F.col("measurement_tstamp"), "E"))
    .withColumn("window_id", F.concat_ws(".", "year", "day", "window_of_day"))
)

In [None]:
# Add the same calendar and time-window columns to the weather DataFrame, derived from
# `timestamp_local` (DataFrame column transformations, not RDD operations).
#   hour_of_day   : hour of the timestamp (0-23)
#   year, month   : calendar year and month number
#   day           : day of the year, as a string (Spark datetime pattern "D")
#   window_of_day : the day split into four 6-hour windows, numbered 1-4
#   day_of_week   : abbreviated weekday name (pattern "E")
#   window_id     : "<year>.<day>.<window_of_day>", built the same way as for the
#                   traffic data so the two can presumably be matched on it
df_weather = (
    df_weather
    .withColumn("hour_of_day", F.hour("timestamp_local"))
    .withColumn("year", F.year("timestamp_local"))
    .withColumn("month", F.month("timestamp_local"))
    .withColumn("day", F.date_format(F.col("timestamp_local"), "D"))
    # divide the hour by 6, truncate via the integer cast, then shift to a 1-based window number
    .withColumn("window_of_day", (F.col("hour_of_day").cast("integer") / 6).cast("integer") + 1)
    .withColumn("day_of_week", F.date_format(F.col("timestamp_local"), "E"))
    .withColumn("window_id", F.concat_ws(".", "year", "day", "window_of_day"))
)

In [None]:
# Keep only weather observations inside an approximate bounding box around
# Davidson County, TN (Nashville), with corner coordinates taken from geojson.io.
# `Column.between` is inclusive on both ends, i.e. LAT_MIN <= lat <= LAT_MAX and
# LON_MIN <= lon <= LON_MAX. Rows with a null coordinate are dropped.
DAVIDSON_LAT_MIN, DAVIDSON_LAT_MAX = 36.057, 36.306
DAVIDSON_LON_MIN, DAVIDSON_LON_MAX = -86.937, -86.621

df_weather = df_weather.filter(
    F.col("gps_coordinate_latitude").between(DAVIDSON_LAT_MIN, DAVIDSON_LAT_MAX)
    & F.col("gps_coordinate_longitude").between(DAVIDSON_LON_MIN, DAVIDSON_LON_MAX)
)

In [None]:
# Persist both processed DataFrames as parquet under new S3 paths.
# mode='overwrite' keeps the cell re-runnable: with Spark's default save mode
# ("errorifexists") a second Restart & Run All fails because the paths already exist.
# NOTE(review): in this notebook only df_weather is row-filtered; df_traffic only gains
# derived time columns, so the "traffic-data-filtered" path name may be misleading — confirm.
df_weather.write.parquet(path='s3://bigdata-seg/weather-data-filtered', mode='overwrite')
df_traffic.write.parquet(path='s3://bigdata-seg/traffic-data-filtered', mode='overwrite')