In [22]:
from pyspark.sql import SparkSession

# Create the Spark session
spark = SparkSession.builder \
    .appName("CSV Reader") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026") \
    .getOrCreate()

# Define the path to the CSV file in S3
csv_path = "s3a://final-project-group-14/US_Accidents_March23.csv"

# Read the CSV file using the DataFrame API
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv_path)

# Show the DataFrame content
df.show()



+----+-------+--------+-------------------+-------------------+------------------+------------------+-------+-------+------------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|  ID| Source|Severity|         Start_Time|           End_Time|         Start_Lat|         Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|A

                                                                                

In [7]:
# List of columns to drop (all previously mentioned columns)
columns_to_drop = [
    "Astronomical_Twilight",
    "Nautical_Twilight",
    "Civil_Twilight",
    "Wind_Direction",
    "Pressure(in)",
    "Humidity(%)",
    "Wind_Chill(F)",
    "Temperature(F)",
    "Airport_Code",
     "Railway",
    "End_Lng",
    "End_Lat",
    "Source",
    "Description",
    "Timezone",
    "Weather_Timestamp",
    "Amenity",
    "Bump",
    "Give_Way",
    "Roundabout",
    "Station",
    "Traffic_Calming",
    "Turning_Loop",
    "No_Exit"  # Added this column to the list
]

# Drop the specified columns
df_cleaned = df.drop(*columns_to_drop)

# Show the cleaned DataFrame content
df_cleaned.show()

+----+--------+-------------------+-------------------+------------------+------------------+------------+--------------------+------------+----------+-----+----------+-------+--------------+---------------+-----------------+-----------------+--------+--------+-----+--------------+--------------+
|  ID|Severity|         Start_Time|           End_Time|         Start_Lat|         Start_Lng|Distance(mi)|              Street|        City|    County|State|   Zipcode|Country|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Crossing|Junction| Stop|Traffic_Signal|Sunrise_Sunset|
+----+--------+-------------------+-------------------+------------------+------------------+------------+--------------------+------------+----------+-----+----------+-------+--------------+---------------+-----------------+-----------------+--------+--------+-----+--------------+--------------+
| A-1|       3|2016-02-08 05:46:00|2016-02-08 11:00:00|         39.865147|        -84.058723|        0.01|

In [8]:
# Count the number of rows in the DataFrame
row_count = df_cleaned.count()

# Print the number of rows
print(f"The number of rows in the cleaned DataFrame is: {row_count}")



The number of rows in the cleaned DataFrame is: 7728394


                                                                                

In [9]:
# Drop rows with any null values
df_cleaned = df.dropna()

# Show the cleaned DataFrame content
df_cleaned.show()



+---------+-------+--------+-------------------+-------------------+---------+------------------+-----------------+------------------+-------------------+--------------------+---------------+-----------+----------+-----+-------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|       ID| Source|Severity|         Start_Time|           End_Time|Start_Lat|         Start_Lng|          End_Lat|           End_Lng|       Distance(mi)|         Description|         Street|       City|    County|State|Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipi

                                                                                

In [10]:
# Count the number of rows in the DataFrame
row_count = df_cleaned.count()

# Print the number of rows
print(f"The number of rows in the cleaned DataFrame is: {row_count}")



The number of rows in the cleaned DataFrame is: 3554549


                                                                                

In [11]:
# Filter the DataFrame to keep only rows where Country is 'US' and State is 'FL'
df_cleaned = df.filter((df.Country == 'US') & (df.State == 'FL'))

# Show the cleaned DataFrame content
df_cleaned.show()

[Stage 18:>                                                         (0 + 1) / 1]

+--------+-------+--------+-------------------+-------------------+------------------+------------------+-------+-------+------------+--------------------+--------------------+---------------+------------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+--------------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|      ID| Source|Severity|         Start_Time|           End_Time|         Start_Lat|         Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|              Street|           City|      County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in

                                                                                

In [12]:
# Count the number of rows in the DataFrame
row_count = df_cleaned.count()

# Print the number of rows
print(f"The number of rows in the cleaned DataFrame is: {row_count}")



The number of rows in the cleaned DataFrame is: 880192


                                                                                

In [13]:
from pyspark.sql import functions as F

# Filter the DataFrame for rows where State is 'FL'
df_fl = df.filter(df.State == 'FL')

# Group by 'County', count occurrences, and sort by count in descending order
top_counties = df_fl.groupBy("County").count() \
    .orderBy(F.desc("count")) \
    .limit(3)

# Show the top 3 most mentioned counties
top_counties.show()



+----------+------+
|    County| count|
+----------+------+
|Miami-Dade|251601|
|    Orange|124321|
|   Broward| 53865|
+----------+------+



                                                                                

In [14]:
# Define the counties to keep
counties_to_keep = ['Miami-Dade', 'Orange', 'Broward']

# Filter the DataFrame to keep only the rows with specified counties
df_cleaned = df.filter(df.County.isin(counties_to_keep))

# Show the cleaned DataFrame content
df_cleaned.show()

+-------+-------+--------+-------------------+-------------------+------------------+-------------------+-------+-------+------------+--------------------+---------------+-------------+------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|     ID| Source|Severity|         Start_Time|           End_Time|         Start_Lat|          Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|         Street|         City|County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| 

In [15]:
# Count the number of rows in the DataFrame
row_count = df_cleaned.count()

# Print the number of rows
print(f"The number of rows in the cleaned DataFrame is: {row_count}")



The number of rows in the cleaned DataFrame is: 546741


                                                                                

In [16]:
# List of columns to drop (all previously mentioned columns)
columns_to_drop = [
    "Astronomical_Twilight",
    "Nautical_Twilight",
    "Civil_Twilight",
    "Wind_Direction",
    "Pressure(in)",
    "Humidity(%)",
    "Wind_Chill(F)",
    "Temperature(F)",
    "Airport_Code",
     "Railway",
    "End_Lng",
    "End_Lat",
    "Source",
    "Description",
    "Timezone",
    "Weather_Timestamp",
    "Amenity",
    "Bump",
    "Give_Way",
    "Roundabout",
    "Station",
    "Traffic_Calming",
    "Turning_Loop",
    "No_Exit"  
]


In [17]:
# Show all the column names in the DataFrame
column_names = df_cleaned.columns
print("The columns in the DataFrame are:")
print(column_names)

The columns in the DataFrame are:
['ID', 'Source', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Street', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight']


In [18]:
# Ensure you are checking the right DataFrame
print("Columns in df (original DataFrame):")
print(df.columns)

print("Columns in df_cleaned (filtered DataFrame):")
print(df_cleaned.columns)

Columns in df (original DataFrame):
['ID', 'Source', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Street', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight']
Columns in df_cleaned (filtered DataFrame):
['ID', 'Source', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Street', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chil

In [19]:
# List of columns to drop
columns_to_drop = [
    "Astronomical_Twilight",
    "Nautical_Twilight",
    "Civil_Twilight",
    "Wind_Direction",
    "Pressure(in)",
    "Humidity(%)",
    "Wind_Chill(F)",
    "Temperature(F)",
    "Airport_Code",
    "Railway",
    "End_Lng",
    "End_Lat",
    "Source",
    "Description",
    "Timezone",
    "Weather_Timestamp",
    "Amenity",
    "Bump",
    "Give_Way",
    "Roundabout",
    "Station",
    "Traffic_Calming",
    "Turning_Loop",
    "No_Exit"
]

# Drop the specified columns
df_cleaned = df_cleaned.drop(*columns_to_drop)

# Show the cleaned DataFrame content
df_cleaned.show()

# Optionally, you can verify the remaining columns
print("Columns remaining in the cleaned DataFrame:")
print(df_cleaned.columns)

+-------+--------+-------------------+-------------------+------------------+-------------------+------------+---------------+-------------+------+-----+----------+-------+--------------+---------------+-----------------+-----------------+--------+--------+-----+--------------+--------------+
|     ID|Severity|         Start_Time|           End_Time|         Start_Lat|          Start_Lng|Distance(mi)|         Street|         City|County|State|   Zipcode|Country|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Crossing|Junction| Stop|Traffic_Signal|Sunrise_Sunset|
+-------+--------+-------------------+-------------------+------------------+-------------------+------------+---------------+-------------+------+-----+----------+-------+--------------+---------------+-----------------+-----------------+--------+--------+-----+--------------+--------------+
|A-42872|       2|2016-06-21 09:31:44|2016-06-21 11:00:00|         33.797401|-117.87716699999999|         0.0|        

In [20]:
# Define the S3 path to save the cleaned DataFrame
s3_path = "s3a://your-bucket-name/cleaned_data.csv"  # Replace with your actual S3 bucket name and desired file name

# Write the DataFrame to S3 as a CSV file
df_cleaned.write \
    .mode("overwrite") \  # Use "overwrite" to replace any existing file with the same name
    .option("header", "true") \  # Include the header row
    .csv(s3_path)

print("Cleaned DataFrame has been saved to S3 as a CSV file.")

SyntaxError: unexpected character after line continuation character (4127394012.py, line 6)

In [21]:
# Define the S3 path to save the cleaned DataFrame
s3_path = "s3a://final-project-group-14/cleaned_data.csv"  # Replace with your actual S3 bucket name and desired file name

# Write the DataFrame to S3 as a CSV file
df_cleaned.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(s3_path)

print("Cleaned DataFrame has been saved to S3 as a CSV file.")


24/08/05 01:38:58 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 01:38:59 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 01:38:59 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 01:39:04 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 01:39:04 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 01:39:08 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 01:39:08 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 01:39:11 WA

Cleaned DataFrame has been saved to S3 as a CSV file.


In [23]:
# Define the S3 path to save the cleaned DataFrame
s3_path = "s3a://final-project-group-14/cleaned_data.csv"  # Replace with your actual S3 bucket name and desired file name

# Coalesce the DataFrame to a single partition
df_single_partition = df_cleaned.coalesce(1)

# Write the DataFrame to S3 as a CSV file
df_single_partition.write \
    .mode("overwrite") \  # Use "overwrite" to replace any existing file with the same name
    .option("header", "true") \  # Include the header row
    .csv(s3_path)

print("Cleaned DataFrame has been saved to S3 as a single CSV file.")

SyntaxError: unexpected character after line continuation character (4211049412.py, line 9)

In [25]:
# Define the S3 path to save the cleaned DataFrame
s3_path = "s3a://final-project-group-14/cleaned_data.csv"  # Replace with your actual S3 bucket name and desired file name

# Coalesce the DataFrame to a single partition
df_single_partition = df_cleaned.coalesce(1)

# Write the DataFrame to S3 as a CSV file
df_single_partition.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(s3_path)

print("Cleaned DataFrame has been saved to S3 as a single CSV file.")

24/08/05 02:01:44 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/08/05 02:01:44 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

Cleaned DataFrame has been saved to S3 as a single CSV file.


In [26]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Traffic Data Wrangling") \
    .getOrCreate()

# Load the cleaned data
df = spark.read.csv("s3a://final-project-group-14/cleaned_traffic_data.csv", header=True, inferSchema=True)

24/08/05 02:14:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                