## Uploading Data to S3
1. **Access S3 Service:**
   - Navigate to the AWS Management Console.
   - Under "All services," find and click on "S3" listed under the Storage category.

2. **Create a New Bucket:**
   - In the S3 console, click on "Create Bucket."
   - Provide the necessary details such as bucket name and region. 
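   - Select "US East (N. Virginia)" as the region so the bucket is in the same region as the Athena workgroup created below; keeping data and compute in one region avoids cross-region data transfer charges.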
   - Click "Create Bucket" at the bottom of the screen.

3. **Verify Bucket Creation:**
   - The new bucket should now be visible in the S3 console.

4. **Upload Your Data:**
   - Click your new bucket, then click "Upload" at the top to import your dataset into the bucket.
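The console steps above can also be scripted. This is a minimal sketch assuming boto3's S3 client, whose `create_bucket` and `upload_file` methods are real; the helper name `create_bucket_and_upload`, the `RecordingClient` stand-in, and the bucket name are hypothetical. The client is passed in so the logic can be dry-run without AWS credentials.

```python
def create_bucket_and_upload(s3_client, bucket, local_path, key):
    """Create a bucket in us-east-1 and upload one local file to it.

    s3_client is assumed to be a boto3 S3 client created with
    boto3.client("s3", region_name="us-east-1").
    """
    # In us-east-1 (N. Virginia), create_bucket is called without a
    # CreateBucketConfiguration / LocationConstraint.
    s3_client.create_bucket(Bucket=bucket)
    # upload_file(Filename, Bucket, Key) copies the local file into the bucket
    s3_client.upload_file(local_path, bucket, key)
    # Return the URI in the form spark.read.csv accepts
    return f"s3://{bucket}/{key}"


class RecordingClient:
    """Hypothetical stand-in for the boto3 client that only records calls."""

    def __init__(self):
        self.calls = []

    def create_bucket(self, **kwargs):
        self.calls.append(("create_bucket", kwargs))

    def upload_file(self, filename, bucket, key):
        self.calls.append(("upload_file", (filename, bucket, key)))


# Dry run with the stand-in; use boto3.client("s3", region_name="us-east-1") for real uploads
client = RecordingClient()
print(create_bucket_and_upload(client, "my-traffic-bucket",
                               "us_congestion_2016_2022.csv", "us_congestion_2016_2022.csv"))
```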

## Creating an Amazon Athena Workgroup

1. **Navigate to Amazon Athena:**
   - Go to Amazon Athena in the AWS console.
   - Select "Workgroups" from the left menu and click on "Create workgroup."

2. **Set Up Workgroup:**
   - Ensure the region is set to "N. Virginia." Change it if necessary.
   - Fill out the configuration fields:
     - Name your workgroup.
     - Choose "Apache Spark" as the Analytics Engine.
     - Select "LabRole" as the IAM service role from "Additional Configurations."
   - Complete the setup by clicking on "Create workgroup."

3. **Create Notebook:**
   - Click your workgroup and choose "Create notebook."
   - Alternatively, to upload an existing notebook (such as this one), choose "Import file."
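For reference, a Spark-enabled workgroup can in principle also be created with boto3's Athena client (`create_work_group`). The sketch below only builds the keyword arguments; the `EngineVersion` value `"PySpark engine version 3"`, the helper name, and the example role ARN (a placeholder account ID with the lab-provided `LabRole`) are assumptions to check against the Athena API reference before use.

```python
def spark_workgroup_request(name, execution_role_arn):
    """Build kwargs for athena_client.create_work_group(**kwargs) (assumed request shape)."""
    return {
        "Name": name,
        "Configuration": {
            # Selecting the PySpark engine is what makes this an Apache Spark workgroup
            "EngineVersion": {"SelectedEngineVersion": "PySpark engine version 3"},
            # IAM role that Spark sessions in this workgroup assume (e.g. LabRole)
            "ExecutionRole": execution_role_arn,
        },
    }


# Hypothetical account ID; replace with your own role ARN
req = spark_workgroup_request("traffic-weather", "arn:aws:iam::123456789012:role/LabRole")
print(req["Configuration"]["EngineVersion"])
# boto3.client("athena", region_name="us-east-1").create_work_group(**req)
```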
 



## Define the path to the S3 bucket and load traffic data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
from pyspark.sql.types import IntegerType, FloatType, StringType, TimestampType



# S3 path to the traffic congestion dataset
s3_result_path = "s3://plingareddy6/us_congestion_2016_2022.csv/"
dft = spark.read.option("header", "true").csv(s3_result_path)
pandas_df = dft.limit(10).toPandas()
print(pandas_df)


     ID Severity  ... Weather_Event Weather_Conditions
0   C-1        0  ...          None               Fair
1   C-2        0  ...          None               Fair
2   C-3        1  ...          None               Fair
3   C-4        0  ...          None      Mostly Cloudy
4   C-5        1  ...          None      Mostly Cloudy
5   C-6        0  ...          None      Mostly Cloudy
6   C-7        0  ...          None               Fair
7   C-8        1  ...          None               None
8   C-9        0  ...          None               None
9  C-10        0  ...          None               Fair

[10 rows x 30 columns]
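Because the CSV is read with only the `header` option, Spark treats every column as a string, as the schema printed next confirms. Passing `.option("inferSchema", "true")` is Spark's built-in alternative, at the cost of an extra pass over the data. Python's standard `csv` module behaves the same way; this small illustration uses made-up rows with a subset of the congestion columns:

```python
import csv
import io

# Two made-up rows shaped like a subset of the congestion CSV
sample = io.StringIO(
    "ID,Severity,Start_Lat,Distance(mi)\n"
    "C-1,0,40.1147,0.5\n"
    "C-2,1,37.6340,1.25\n"
)

rows = list(csv.DictReader(sample))
# Without a schema, every field comes back as text, as with spark.read.csv
for row in rows:
    print({name: type(value).__name__ for name, value in row.items()})
```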



## Review the traffic data schema and identify columns that need type casting

In [2]:
# printing schema 
dft.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Start_Lat: string (nullable = true)
 |-- Start_Lng: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- Distance(mi): string (nullable = true)
 |-- DelayFromTypicalTraffic(mins): string (nullable = true)
 |-- DelayFromFreeFlowSpeed(mins): string (nullable = true)
 |-- Congestion_Speed: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- ZipCode: string (nullable = true)
 |-- LocalTimeZone: string (nullable = true)
 |-- WeatherStation_AirportCode: string (nullable = true)
 |-- WeatherTimeStamp: string (nullable = true)
 |-- Temperature(F): string (nullable = true)
 |-- WindChill(F): string (nullable = true)
 |-- Humidit

In [3]:
# casting columns for the traffic congestion DataFrame ('dft')
dft = dft \
    .withColumn("ID", col("ID").cast(StringType())) \
    .withColumn("Severity", col("Severity").cast(StringType())) \
    .withColumn("Start_Lat", col("Start_Lat").cast(FloatType())) \
    .withColumn("Start_Lng", col("Start_Lng").cast(FloatType())) \
    .withColumn("StartTime", col("StartTime").cast(TimestampType())) \
    .withColumn("EndTime", col("EndTime").cast(TimestampType())) \
    .withColumn("Distance(mi)", col("Distance(mi)").cast(FloatType())) \
    .withColumn("DelayFromTypicalTraffic(mins)", col("DelayFromTypicalTraffic(mins)").cast(IntegerType())) \
    .withColumn("DelayFromFreeFlowSpeed(mins)", col("DelayFromFreeFlowSpeed(mins)").cast(IntegerType())) \
    .withColumn("Congestion_Speed", col("Congestion_Speed").cast(StringType())) \
    .withColumn("Description", col("Description").cast(StringType())) \
    .withColumn("Street", col("Street").cast(StringType())) \
    .withColumn("City", col("City").cast(StringType())) \
    .withColumn("County", col("County").cast(StringType())) \
    .withColumn("State", col("State").cast(StringType())) \
    .withColumn("Country", col("Country").cast(StringType())) \
    .withColumn("ZipCode", col("ZipCode").cast(StringType())) \
    .withColumn("LocalTimeZone", col("LocalTimeZone").cast(StringType())) \
    .withColumn("WeatherStation_AirportCode", col("WeatherStation_AirportCode").cast(StringType())) \
    .withColumn("WeatherTimeStamp", col("WeatherTimeStamp").cast(TimestampType())) \
    .withColumn("Temperature(F)", col("Temperature(F)").cast(FloatType())) \
    .withColumn("WindChill(F)", col("WindChill(F)").cast(FloatType())) \
    .withColumn("Humidity(%)", col("Humidity(%)").cast(IntegerType())) \
    .withColumn("Pressure(in)", col("Pressure(in)").cast(FloatType())) \
    .withColumn("Visibility(mi)", col("Visibility(mi)").cast(FloatType())) \
    .withColumn("WindDir", col("WindDir").cast(StringType())) \
    .withColumn("WindSpeed(mph)", col("WindSpeed(mph)").cast(FloatType())) \
    .withColumn("Precipitation(in)", col("Precipitation(in)").cast(FloatType())) \
    .withColumn("Weather_Event", col("Weather_Event").cast(StringType())) \
    .withColumn("Weather_Conditions", col("Weather_Conditions").cast(StringType()))
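The long `withColumn` chain can also be driven by one column-to-type mapping, for example `dft.select([col(c).cast(t).alias(c) for c, t in casts.items()])` in PySpark, assuming `casts` lists every column with its Spark type; this keeps the schema in one place. The pure-Python sketch below shows the same idea on dictionaries. `CASTS` is a hypothetical subset of the schema, and turning unparseable values into `None` only loosely mirrors Spark's default (non-ANSI) cast, which yields null.

```python
from datetime import datetime

# Hypothetical subset of the traffic schema: column name -> converter
CASTS = {
    "Start_Lat": float,
    "Distance(mi)": float,
    "DelayFromTypicalTraffic(mins)": int,
    "StartTime": datetime.fromisoformat,
}


def cast_row(row, casts=CASTS):
    """Return a copy of row with mapped columns converted; failures become None."""
    out = dict(row)
    for name, convert in casts.items():
        value = row.get(name)
        try:
            out[name] = None if value is None else convert(value)
        except ValueError:
            # Bad input becomes None instead of raising, similar to a null from Spark
            out[name] = None
    return out


row = {"ID": "C-1", "Start_Lat": "40.1147", "Distance(mi)": "n/a",
       "DelayFromTypicalTraffic(mins)": "3", "StartTime": "2016-01-06 23:14:00"}
print(cast_row(row))
```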




In [4]:
dft.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Start_Lat: float (nullable = true)
 |-- Start_Lng: float (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- EndTime: timestamp (nullable = true)
 |-- Distance(mi): float (nullable = true)
 |-- DelayFromTypicalTraffic(mins): integer (nullable = true)
 |-- DelayFromFreeFlowSpeed(mins): integer (nullable = true)
 |-- Congestion_Speed: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- ZipCode: string (nullable = true)
 |-- LocalTimeZone: string (nullable = true)
 |-- WeatherStation_AirportCode: string (nullable = true)
 |-- WeatherTimeStamp: timestamp (nullable = true)
 |-- Temperature(F): float (nullable = true)
 |-- WindChill(F): float (nullable = true)
 |-- H

## Load Weather Data from S3 into a DataFrame

In [5]:
# S3 path to the weather events dataset
s3_result_path = "s3://plingareddy6/WeatherEvents_Jan2016-Dec2022.csv/"
dfw = spark.read.option("header", "true").csv(s3_result_path)
dfw.show(5)

+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|EventId|Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|   TimeZone|AirportCode|LocationLat|LocationLng|    City|  County|State|ZipCode|
+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|    W-1|Snow|   Light|2016-01-06 23:14:00|2016-01-07 00:34:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-2|Snow|   Light|2016-01-07 04:14:00|2016-01-07 04:54:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-3|Snow|   Light|2016-01-07 05:54:00|2016-01-07 15:34:00|             0.03|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    

In [6]:
# Print the schema of the DataFrame
dfw.printSchema()

root
 |-- EventId: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- StartTime(UTC): string (nullable = true)
 |-- EndTime(UTC): string (nullable = true)
 |-- Precipitation(in): string (nullable = true)
 |-- TimeZone: string (nullable = true)
 |-- AirportCode: string (nullable = true)
 |-- LocationLat: string (nullable = true)
 |-- LocationLng: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: string (nullable = true)



In [7]:
dfw = dfw \
    .withColumn("EventId", col("EventId").cast(StringType())) \
    .withColumn("Type", col("Type").cast(StringType())) \
    .withColumn("Severity", col("Severity").cast(StringType())) \
    .withColumn("StartTime(UTC)", col("StartTime(UTC)").cast(TimestampType())) \
    .withColumn("EndTime(UTC)", col("EndTime(UTC)").cast(TimestampType())) \
    .withColumn("Precipitation(in)", col("Precipitation(in)").cast(FloatType())) \
    .withColumn("TimeZone", col("TimeZone").cast(StringType())) \
    .withColumn("AirportCode", col("AirportCode").cast(StringType())) \
    .withColumn("LocationLat", col("LocationLat").cast(FloatType())) \
    .withColumn("LocationLng", col("LocationLng").cast(FloatType())) \
    .withColumn("City", col("City").cast(StringType())) \
    .withColumn("County", col("County").cast(StringType())) \
    .withColumn("State", col("State").cast(StringType())) \
    .withColumn("ZipCode", col("ZipCode").cast(StringType()))



In [8]:
dfw.printSchema()

root
 |-- EventId: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- StartTime(UTC): timestamp (nullable = true)
 |-- EndTime(UTC): timestamp (nullable = true)
 |-- Precipitation(in): float (nullable = true)
 |-- TimeZone: string (nullable = true)
 |-- AirportCode: string (nullable = true)
 |-- LocationLat: float (nullable = true)
 |-- LocationLng: float (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: string (nullable = true)



## Data Cleaning - Handling Missing Values and Dropping Columns

In [9]:
# Dropping columns from the 'dft' DataFrame
dft = dft.drop('ID', 'Description','Street', 'Country', 'WeatherStation_AirportCode')

# Dropping columns from the 'dfw' DataFrame
dfw = dfw.drop('EventId', 'AirportCode','LocationLat', 'LocationLng')



In [10]:
def get_missing(df):
    # counting the number of NULLs
    agg_expr = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    missing_count_df = df.select(*agg_expr)
    
    # calculate percentages
    total_rows = df.count()
    percent_expr = [(col(c) / total_rows).alias(c) for c in missing_count_df.columns]
    
    # calculate missing percentages
    missing_percent_df = missing_count_df.select(*percent_expr)
    
    # combine Total and Percent into one DataFrame
    missing_count_pd = missing_count_df.toPandas().transpose()
    missing_count_pd.columns = ['Total']
    missing_percent_pd = (missing_percent_df.toPandas() * 100).transpose()  # Convert to percentage
    missing_percent_pd.columns = ['Percent']
    missing_data = missing_count_pd.join(missing_percent_pd)
    missing_data = missing_data.sort_values(by='Total', ascending=False)
    
    return missing_data
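`get_missing` computes, for each column, the null count and `100 * nulls / total_rows`, then sorts by count. A dependency-free sketch of the same calculation on a list of dictionaries (hypothetical helper name, toy data) makes the arithmetic explicit:

```python
def missing_report(rows):
    """Return [(column, null_count, percent)] sorted by null_count, descending."""
    total = len(rows)
    columns = list(rows[0]) if rows else []
    report = []
    for c in columns:
        nulls = sum(1 for r in rows if r.get(c) is None)
        report.append((c, nulls, 100.0 * nulls / total))
    # Same ordering as get_missing: most-missing columns first
    report.sort(key=lambda item: item[1], reverse=True)
    return report


toy = [
    {"Weather_Event": None, "City": "Saguache", "WindChill(F)": None},
    {"Weather_Event": None, "City": "Saguache", "WindChill(F)": 12.0},
    {"Weather_Event": "Snow", "City": None, "WindChill(F)": 10.5},
    {"Weather_Event": None, "City": "Denver", "WindChill(F)": 8.0},
]
for column, nulls, pct in missing_report(toy):
    print(f"{column:15s} {nulls:3d} {pct:6.2f}")
```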






In [11]:
missing_data_dft = get_missing(dft)
print(missing_data_dft)

                                  Total    Percent
Weather_Event                  31165872  93.579407
Precipitation(in)              13753412  41.296330
WindChill(F)                   12655176  37.998740
WindSpeed(mph)                  3159466   9.486690
Visibility(mi)                   973286   2.922412
Weather_Conditions               949198   2.850085
WindDir                          789689   2.371139
Humidity(%)                      766202   2.300617
Temperature(F)                   729827   2.191396
Pressure(in)                     682662   2.049778
WeatherTimeStamp                 534357   1.604473
LocalTimeZone                     30837   0.092592
StartTime                         30826   0.092559
EndTime                           30826   0.092559
ZipCode                           22773   0.068379
City                               2164   0.006498
State                                12   0.000036
County                               11   0.000033
Start_La

In [12]:
missing_data_dfw = get_missing(dfw)
print(missing_data_dfw)

                   Total   Percent
ZipCode            69199  0.802104
City               16912  0.196032
Type                   0  0.000000
Severity               0  0.000000
StartTime(UTC)         0  0.000000
EndTime(UTC)           0  0.000000
Precipitation(in)      0  0.000000
TimeZone               0  0.000000
County                 0  0.000000
State                  0  0.000000



In [13]:
# dropping specified columns
df_drop1 = dft.drop('Weather_Event')
df_drop2 = df_drop1.drop('Precipitation(in)')
df_drop3 = df_drop2.drop('WindChill(F)')
df_drop4 = df_drop3.drop('WindSpeed(mph)')

# dropping rows with any null values
df_drop1 = df_drop1.dropna()
df_drop2 = df_drop2.dropna()
df_drop3 = df_drop3.dropna()
df_drop4 = df_drop4.dropna()

def data_saved(no_null_df, original_df):
    original_count = original_df.count()
    no_null_count = no_null_df.count()
    percent_preserved = (no_null_count / original_count) * 100
    print(f'Preserves {percent_preserved}% of data points or {no_null_count} rows')


data_saved(df_drop1, dft)
data_saved(df_drop2, dft)
data_saved(df_drop3, dft)
data_saved(df_drop4, dft)


Preserves 53.88139495563308% of data points or 17944767 rows
Preserves 61.2174999314651% of data points or 20387998 rows
Preserves 89.10618447841966% of data points or 29676101 rows
Preserves 95.65877563967234% of data points or 31858389 rows
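The cell above compares hand-picked column drops against the share of rows that survive `dropna()`. The same trade-off can be explored programmatically: drop the sparsest columns one at a time (the order `get_missing` reports) until a row-wise `dropna()` keeps at least a target share of rows. This is a hypothetical helper on toy data, not what the notebook runs:

```python
def columns_to_drop(rows, target_share):
    """Greedily drop the most-missing columns until enough complete rows remain.

    Returns (dropped_columns, share_of_rows_kept_by_dropna).
    """
    columns = list(rows[0])
    # Sparsest columns first, like the ordering printed by get_missing
    by_missing = sorted(columns, key=lambda c: sum(r[c] is None for r in rows), reverse=True)
    dropped = []
    while True:
        kept = [c for c in columns if c not in dropped]
        # A row survives dropna() only if none of the kept columns is null
        complete = sum(all(r[c] is not None for c in kept) for r in rows)
        share = complete / len(rows)
        if share >= target_share or len(dropped) == len(columns):
            return dropped, share
        dropped.append(by_missing[len(dropped)])


sample = [
    {"Weather_Event": None, "WindChill(F)": None, "Temperature(F)": 30.0},
    {"Weather_Event": None, "WindChill(F)": 20.0, "Temperature(F)": 25.0},
    {"Weather_Event": "Rain", "WindChill(F)": None, "Temperature(F)": None},
    {"Weather_Event": None, "WindChill(F)": 15.0, "Temperature(F)": 28.0},
]
print(columns_to_drop(sample, 0.75))
```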



In [14]:
dft_drop_rows = dft.dropna()
data_saved(dft_drop_rows, dft)

Preserves 2.736708965737323% of data points or 911439 rows



In [15]:
# dropping rows with any null values in the 'dfw' DataFrame
dfw_drop_rows = dfw.dropna()
data_saved(dfw_drop_rows, dfw)


Preserves 99.19789558141878% of data points or 8557982 rows



In [16]:
def count_null_values_spark(df):
    null_counts = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    result = df.select(*null_counts).collect()[0]
    total_nulls = sum(result.asDict().values())
    return total_nulls
print('Number of NaN values in dft after cleanup:', count_null_values_spark(df_drop4))
print('Number of NaN values in dfw after cleanup:', count_null_values_spark(dfw_drop_rows))



Number of NaN values in dft after cleanup: 0
Number of NaN values in dfw after cleanup: 0



In [17]:
dft = df_drop4
dft.printSchema()

root
 |-- Severity: string (nullable = true)
 |-- Start_Lat: float (nullable = true)
 |-- Start_Lng: float (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- EndTime: timestamp (nullable = true)
 |-- Distance(mi): float (nullable = true)
 |-- DelayFromTypicalTraffic(mins): integer (nullable = true)
 |-- DelayFromFreeFlowSpeed(mins): integer (nullable = true)
 |-- Congestion_Speed: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: string (nullable = true)
 |-- LocalTimeZone: string (nullable = true)
 |-- WeatherTimeStamp: timestamp (nullable = true)
 |-- Temperature(F): float (nullable = true)
 |-- Humidity(%): integer (nullable = true)
 |-- Pressure(in): float (nullable = true)
 |-- Visibility(mi): float (nullable = true)
 |-- WindDir: string (nullable = true)
 |-- Weather_Conditions: string (nullable = true)



In [18]:
dfw = dfw_drop_rows
dfw.printSchema()

root
 |-- Type: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- StartTime(UTC): timestamp (nullable = true)
 |-- EndTime(UTC): timestamp (nullable = true)
 |-- Precipitation(in): float (nullable = true)
 |-- TimeZone: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: string (nullable = true)



## Formatting Time Columns

In [19]:
from pyspark.sql.functions import to_date, date_format

# For dfw DataFrame
dfw = dfw.withColumn("StartDate(UTC)", to_date("StartTime(UTC)")) \
         .withColumn("EndDate(UTC)", to_date("EndTime(UTC)")) \
         .withColumn("StartTime(UTC)_Time", date_format("StartTime(UTC)", "HH:mm:ss")) \
         .withColumn("EndTime(UTC)_Time", date_format("EndTime(UTC)", "HH:mm:ss")) \
         .drop("StartTime(UTC)", "EndTime(UTC)")

# For dft DataFrame
dft = dft.withColumn("StartDate", to_date("StartTime")) \
         .withColumn("EndDate", to_date("EndTime")) \
         .withColumn("StartTime_Time", date_format("StartTime", "HH:mm:ss")) \
         .withColumn("EndTime_Time", date_format("EndTime", "HH:mm:ss")) \
         .drop("StartTime", "EndTime")
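`to_date` keeps the calendar date, and `date_format(..., "HH:mm:ss")` renders the clock time as a string; Spark's `HH:mm:ss` pattern corresponds to `%H:%M:%S` in Python's `strftime`. A plain-Python equivalent for a single value (hypothetical helper name):

```python
from datetime import date, datetime


def split_timestamp(ts):
    """Split a datetime into (date, 'HH:MM:SS' string), like to_date + date_format."""
    return ts.date(), ts.strftime("%H:%M:%S")


start = datetime(2016, 1, 6, 23, 14, 0)
print(split_timestamp(start))
```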




In [20]:
# adding suffix to dfw columns
dfw_suffixed = dfw.toDF(*(c + '_w' for c in dfw.columns))

# adding suffix to dft columns
dft_suffixed = dft.toDF(*(c + '_t' for c in dft.columns))



## Joining the traffic and weather data 

In [21]:
# defining the join conditions
cond = [
    (dft_suffixed["County_t"] == dfw_suffixed["County_w"]),
    (dft_suffixed["State_t"] == dfw_suffixed["State_w"]),
    (dft_suffixed["City_t"] == dfw_suffixed["City_w"]),
    (dft_suffixed["StartDate_t"] == dfw_suffixed["StartDate(UTC)_w"]),
    (dft_suffixed["ZipCode_t"] == dfw_suffixed["ZipCode_w"])
]

# join
dft_suffixed = dft_suffixed.join(dfw_suffixed, cond, 'inner')
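The inner join keeps a congestion record only when a weather event shares its county, state, city, ZIP code, and start date. A record that matches several events on that day appears once per event, which is why the merged preview later in the notebook repeats the same `Start_Lat_t` with different weather times. The toy hash join below (hypothetical helper and data) illustrates that row multiplication. Note also that the traffic `StartDate_t` is compared with the weather `StartDate(UTC)_w`; if the traffic timestamps are not in UTC, matches near midnight may shift by a day.

```python
from collections import defaultdict

KEY = ("County", "State", "City", "ZipCode", "Date")


def inner_join(traffic, weather, key=KEY):
    """Hash join: index weather rows by the composite key, then probe with traffic rows."""
    index = defaultdict(list)
    for w in weather:
        index[tuple(w[k] for k in key)].append(w)
    joined = []
    for t in traffic:
        for w in index.get(tuple(t[k] for k in key), []):
            # Suffix column names as the notebook does so both sides stay distinguishable
            merged = {f"{k}_t": v for k, v in t.items()}
            merged.update({f"{k}_w": v for k, v in w.items()})
            joined.append(merged)
    return joined


loc = {"County": "Saguache", "State": "CO", "City": "Saguache", "ZipCode": "81149"}
traffic = [dict(loc, Date="2016-01-07", Severity="1"),
           dict(loc, Date="2016-01-08", Severity="0")]
weather = [dict(loc, Date="2016-01-07", Type="Snow", Start="04:14:00"),
           dict(loc, Date="2016-01-07", Type="Snow", Start="05:54:00")]
rows = inner_join(traffic, weather)
print(len(rows))  # one traffic record matched two weather events
```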





## Save Merged DataFrame to S3 (Commented Out)

In [1]:
# Define the S3 path
#s3_path = "s3://plingareddy6/merged3.csv"

# Save the joined DataFrame to CSV in S3
# Coalesce the DataFrame into 1 partition (use cautiously)
#dft_suffixed.coalesce(1).write.mode("overwrite").option("header", "true").csv(s3_path)
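Even with `coalesce(1)`, Spark writes `merged3.csv` as a directory (prefix) that typically contains a single `part-*.csv` data file plus a `_SUCCESS` marker, which is why the next step reads `s3://plingareddy6/merged3.csv/` as a folder. If a single object key is needed outside Spark, a small helper like this hypothetical one can pick it out of a key listing (the keys shown are illustrative):

```python
from fnmatch import fnmatch


def find_part_files(keys, prefix):
    """Return the data files Spark wrote under prefix, ignoring markers like _SUCCESS."""
    return sorted(
        k for k in keys
        if k.startswith(prefix) and fnmatch(k.rsplit("/", 1)[-1], "part-*.csv")
    )


listing = [
    "merged3.csv/_SUCCESS",
    "merged3.csv/part-00000-0a1b2c3d-c000.csv",
    "other/part-00000.csv",
]
print(find_part_files(listing, "merged3.csv/"))
```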



## Load merged dataset and check for missing values

In [22]:
# S3 path to the merged traffic and weather dataset saved earlier
s3_result_path = "s3://plingareddy6/merged3.csv/"
dft = spark.read.option("header", "true").csv(s3_result_path)
pandas_df = dft.limit(10).toPandas()
print(pandas_df)

  Severity_t Start_Lat_t  ... StartTime(UTC)_Time_w EndTime(UTC)_Time_w
0          1   40.114704  ...              00:04:00            00:29:00
1          1   40.114704  ...              00:29:00            01:02:00
2          1   40.114704  ...              01:02:00            01:19:00
3          1   40.114704  ...              02:07:00            13:42:00
4          1   40.114704  ...              13:51:00            13:57:00
5          1   40.114704  ...              14:44:00            15:20:00
6          1   37.633984  ...              01:54:00            02:54:00
7          1   37.633984  ...              03:54:00            04:16:00
8          1   37.633984  ...              05:33:00            05:41:00
9          1   37.633984  ...              07:37:00            07:54:00

[10 rows x 35 columns]



In [23]:
dft.printSchema()

root
 |-- Severity_t: string (nullable = true)
 |-- Start_Lat_t: string (nullable = true)
 |-- Start_Lng_t: string (nullable = true)
 |-- Distance(mi)_t: string (nullable = true)
 |-- DelayFromTypicalTraffic(mins)_t: string (nullable = true)
 |-- DelayFromFreeFlowSpeed(mins)_t: string (nullable = true)
 |-- Congestion_Speed_t: string (nullable = true)
 |-- City_t: string (nullable = true)
 |-- County_t: string (nullable = true)
 |-- State_t: string (nullable = true)
 |-- ZipCode_t: string (nullable = true)
 |-- LocalTimeZone_t: string (nullable = true)
 |-- WeatherTimeStamp_t: string (nullable = true)
 |-- Temperature(F)_t: string (nullable = true)
 |-- Humidity(%)_t: string (nullable = true)
 |-- Pressure(in)_t: string (nullable = true)
 |-- Visibility(mi)_t: string (nullable = true)
 |-- WindDir_t: string (nullable = true)
 |-- Weather_Conditions_t: string (nullable = true)
 |-- StartDate_t: string (nullable = true)
 |-- EndDate_t: string (nullable = true)
 |--

In [24]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, FloatType, IntegerType

# Casting numeric columns, which come back as strings after reloading the CSV
dft = dft.withColumn("Start_Lat_t", col("Start_Lat_t").cast(FloatType())) \
         .withColumn("Start_Lng_t", col("Start_Lng_t").cast(FloatType())) \
         .withColumn("Distance(mi)_t", col("Distance(mi)_t").cast(FloatType())) \
         .withColumn("DelayFromTypicalTraffic(mins)_t", col("DelayFromTypicalTraffic(mins)_t").cast(IntegerType())) \
         .withColumn("DelayFromFreeFlowSpeed(mins)_t", col("DelayFromFreeFlowSpeed(mins)_t").cast(IntegerType())) \
         .withColumn("Temperature(F)_t", col("Temperature(F)_t").cast(FloatType())) \
         .withColumn("Humidity(%)_t", col("Humidity(%)_t").cast(IntegerType())) \
         .withColumn("Pressure(in)_t", col("Pressure(in)_t").cast(FloatType())) \
         .withColumn("Visibility(mi)_t", col("Visibility(mi)_t").cast(FloatType())) \
         .withColumn("Precipitation(in)_w", col("Precipitation(in)_w").cast(FloatType()))

# Casting date fields to DateType
dft = dft.withColumn("StartDate_t", col("StartDate_t").cast(DateType())) \
         .withColumn("EndDate_t", col("EndDate_t").cast(DateType())) \
         .withColumn("StartDate(UTC)_w", col("StartDate(UTC)_w").cast(DateType())) \
         .withColumn("EndDate(UTC)_w", col("EndDate(UTC)_w").cast(DateType()))

# Check the schema to confirm the changes
dft.printSchema()



In [25]:
missing_data_dft = get_missing(dft)
print(missing_data_dft)

                                 Total  Percent
Severity_t                           0      0.0
TimeZone_w                           0      0.0
EndDate_t                            0      0.0
StartTime_Time_t                     0      0.0
EndTime_Time_t                       0      0.0
Type_w                               0      0.0
Severity_w                           0      0.0
Precipitation(in)_w                  0      0.0
City_w                               0      0.0
Weather_Conditions_t                 0      0.0
County_w                             0      0.0
State_w                              0      0.0
ZipCode_w                            0      0.0
StartDate(UTC)_w                     0      0.0
EndDate(UTC)_w                       0      0.0
StartTime(UTC)_Time_w                0      0.0
StartDate_t                          0      0.0
WindDir_t                            0      0.0
Start_Lat_t                          0      0.0
County_t         

## Format multiple time columns to aid with visualizations

In [26]:
from pyspark.sql.functions import concat, col, lit, to_timestamp

# Combining date and time for start and end, and naming them as 'FormattedStartTime' and 'FormattedEndTime'
# For the traffic data (t)
dft = dft.withColumn("FormattedStartTime_t",
                     to_timestamp(concat(col("StartDate_t"), lit(" "), col("StartTime_Time_t")), "yyyy-MM-dd HH:mm:ss"))

dft = dft.withColumn("FormattedEndTime_t",
                     to_timestamp(concat(col("EndDate_t"), lit(" "), col("EndTime_Time_t")), "yyyy-MM-dd HH:mm:ss"))

# For the weather data (w), appending '_w' to distinguish them
dft = dft.withColumn("FormattedStartTime_w",
                     to_timestamp(concat(col("StartDate(UTC)_w"), lit(" "), col("StartTime(UTC)_Time_w")), "yyyy-MM-dd HH:mm:ss"))

dft = dft.withColumn("FormattedEndTime_w",
                     to_timestamp(concat(col("EndDate(UTC)_w"), lit(" "), col("EndTime(UTC)_Time_w")), "yyyy-MM-dd HH:mm:ss"))
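These cells rebuild full timestamps because the merged CSV stores dates and times as separate strings. `to_timestamp(concat(date, " ", time), "yyyy-MM-dd HH:mm:ss")` corresponds to parsing with Python's `%Y-%m-%d %H:%M:%S`. The pure-Python sketch below (hypothetical helper name) uses the times of weather event W-2 from the preview earlier; returning `None` for malformed input is a choice of this sketch.

```python
from datetime import datetime


def combine_date_time(date_str, time_str):
    """Join 'yyyy-MM-dd' and 'HH:mm:ss' strings into a datetime; None if malformed."""
    try:
        return datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None


start_w = combine_date_time("2016-01-07", "04:14:00")
end_w = combine_date_time("2016-01-07", "04:54:00")
print(start_w, end_w - start_w)
```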




## Save Merged DataFrame to S3 (Commented Out)

In [2]:
# Define the S3 path
#s3_path = "s3://plingareddy6/merged4.csv"

# Save the joined DataFrame to CSV in S3
# Coalesce the DataFrame into 1 partition (use cautiously)
#dft.coalesce(1).write.mode("overwrite").option("header", "true").csv(s3_path)

