In [1]:

!apt-get install openjdk-17-jdk-headless -qq > /dev/null
!pip install pyspark



In [57]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL Test").getOrCreate()

print(" Spark started successfully!")
print("Spark version:", spark.version)

 Spark started successfully!
Spark version: 3.5.1


In [58]:
from google.colab import files
uploaded = files.upload()

Saving urban_noise_levels.csv to urban_noise_levels (1).csv


In [59]:
df = spark.read.csv("urban_noise_levels.csv", header=True, inferSchema=True)
df.show()

+---+------------------+------------------+-------------------+------------------+----+-----------+----------+------------------+------------------+------------------+-------------------+---------------+------------+------------+-----------------+------------------+--------------+---------------+-------------+--------------+------------+-------+-----------+----------------+---------+
| id|          latitude|         longitude|           datetime|     decibel_level|hour|day_of_week|is_weekend|     temperature_c|        humidity_%|    wind_speed_kmh|   precipitation_mm|traffic_density|near_airport|near_highway|near_construction|population_density|park_proximity|industrial_zone|vehicle_count|honking_events|public_event|holiday|school_zone|noise_complaints|sensor_id|
+---+------------------+------------------+-------------------+------------------+----+-----------+----------+------------------+------------------+------------------+-------------------+---------------+------------+----------

In [60]:
df.count()

2000

In [61]:
df.columns

['id',
 'latitude',
 'longitude',
 'datetime',
 'decibel_level',
 'hour',
 'day_of_week',
 'is_weekend',
 'temperature_c',
 'humidity_%',
 'wind_speed_kmh',
 'precipitation_mm',
 'traffic_density',
 'near_airport',
 'near_highway',
 'near_construction',
 'population_density',
 'park_proximity',
 'industrial_zone',
 'vehicle_count',
 'honking_events',
 'public_event',
 'holiday',
 'school_zone',
 'noise_complaints',
 'sensor_id']

In [None]:
df.dtypes

In [63]:
from pyspark.sql.functions import col, sum

df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+---+--------+---------+--------+-------------+----+-----------+----------+-------------+----------+--------------+----------------+---------------+------------+------------+-----------------+------------------+--------------+---------------+-------------+--------------+------------+-------+-----------+----------------+---------+
| id|latitude|longitude|datetime|decibel_level|hour|day_of_week|is_weekend|temperature_c|humidity_%|wind_speed_kmh|precipitation_mm|traffic_density|near_airport|near_highway|near_construction|population_density|park_proximity|industrial_zone|vehicle_count|honking_events|public_event|holiday|school_zone|noise_complaints|sensor_id|
+---+--------+---------+--------+-------------+----+-----------+----------+-------------+----------+--------------+----------------+---------------+------------+------------+-----------------+------------------+--------------+---------------+-------------+--------------+------------+-------+-----------+----------------+---------+
|  0

In [64]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Check total rows before and after removing duplicates
print("Before removing duplicates:", df.count())
df = df.dropDuplicates()
print("After removing duplicates:", df.count())

# Reassign IDs (starting from 1)
window = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("new_id", row_number().over(window))

# Drop old id column and rename new_id to id
df = df.drop("id").withColumnRenamed("new_id", "id")

#  Verify
df.select("id").show(10)

Before removing duplicates: 2000
After removing duplicates: 2000
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
+---+
only showing top 10 rows



In [65]:
unique_rows = df.dropDuplicates().count()
print("Unique rows:", unique_rows)

Unique rows: 2000


In [66]:
duplicates = df.count() - unique_rows
print("Number of duplicate rows:", duplicates)

Number of duplicate rows: 0


In [67]:
from pyspark.sql.functions import regexp_replace, col

df = df.withColumn("humidity_%", regexp_replace(col("humidity_%"), "%", "").cast("double"))

In [68]:
df = df.withColumn("temperature_c", regexp_replace(col("temperature_c"), "[^0-9.-]", "").cast("double"))

In [69]:
from pyspark.sql.functions import regexp_extract

df = df.withColumn("date_only", regexp_extract(col("datetime"), r"^\d{4}-\d{2}-\d{2}", 0))

In [70]:
from pyspark.sql.functions import to_timestamp, date_format, hour, dayofweek

df = df.withColumn("datetime", to_timestamp("datetime"))
df = df.withColumn("date", date_format("datetime", "yyyy-MM-dd"))
df = df.withColumn("hour_of_day", hour("datetime"))
df = df.withColumn("day_of_week", dayofweek("datetime"))

In [71]:
df = df.filter((df.decibel_level >= 30) & (df.decibel_level <= 120))

In [72]:
from pyspark.sql.functions import when

df = df.withColumn(
    "noise_category",
    when(df.decibel_level < 55, "Low")
    .when(df.decibel_level < 85, "Moderate")
    .otherwise("High")
)

In [73]:
for c in df.columns:
    df = df.withColumnRenamed(c, c.lower())

In [74]:

df.coalesce(1).write.csv("cleaned_noise_data", header=True)

# Zip folder for easy download
!zip -r cleaned_noise_data.zip cleaned_noise_data

# Download the file
files.download("cleaned_noise_data.zip")

  adding: cleaned_noise_data/ (stored 0%)
  adding: cleaned_noise_data/part-00000-c84cc068-d215-431e-9600-79842f6a22fd-c000.csv (deflated 60%)
  adding: cleaned_noise_data/.part-00000-c84cc068-d215-431e-9600-79842f6a22fd-c000.csv.crc (stored 0%)
  adding: cleaned_noise_data/_SUCCESS (stored 0%)
  adding: cleaned_noise_data/._SUCCESS.crc (stored 0%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>