In [0]:
# Raw layer: load the source climate CSV from the raw S3 folder. The first
# row supplies column names and Spark infers each column's type from the data.
df_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("s3://demobucketdatabricks/raw_data/Indian_Climate_Dataset_2024_2025.csv")
)


In [0]:
from pyspark.sql.functions import year, month, to_date

# Normalise "Date" to a DateType, then derive the year and month columns
# used as partition keys when the bronze copy is written.
df_raw_chunks = (
    df_raw
    .withColumn("Date", to_date("Date"))
    .withColumn("year", year("Date"))
    .withColumn("month", month("Date"))
)


In [0]:
# Bronze layer: persist the raw rows as header-bearing CSV laid out in a
# year=/month= folder hierarchy, replacing any previous output at this path.
(
    df_raw_chunks.write
    .mode("overwrite")
    .option("header", True)
    .partitionBy("year", "month")
    .csv("s3://demobucketdatabricks/raw_data/raw_partition_data/")
)


In [0]:
# Bronze layer: read back the year=/month= partitioned CSVs written above.
# No inferSchema here, so every data column arrives as a string; the silver
# step performs the casts.
#
# Read from the dataset root rather than a `*/*` glob. The glob expands to the
# individual month folders, so Spark's partition discovery starts *inside*
# them and the `year` / `month` partition columns are silently dropped.
# Reading the root (equivalently, setting the `basePath` option) keeps them.
BRONZE_PATH = "s3://demobucketdatabricks/raw_data/raw_partition_data/"

df_bronze = spark.read \
    .option("header", True) \
    .csv(BRONZE_PATH)


In [0]:
# Preview the first 10 bronze rows.
df_bronze.show(n=10)

In [0]:
from pyspark.sql.functions import col

# Silver layer, step 1: append cleanly named, double-typed copies of the
# measurement columns (the source names embed units and symbols), then cast
# AQI to an integer in place. The unit-suffixed originals are still present
# afterwards and are dropped in a later cell.
df_silver = (
    df_bronze
    .select(
        "*",
        *[
            col(source_name).cast("double").alias(target_name)
            for target_name, source_name in [
                ("Temperature_Max_C", "Temperature_Max (°C)"),
                ("Temperature_Min_C", "Temperature_Min (°C)"),
                ("Temperature_Avg_C", "Temperature_Avg (°C)"),
                ("Humidity_Percent", "Humidity (%)"),
                ("Rainfall_mm", "Rainfall (mm)"),
                ("Wind_Speed_kmh", "Wind_Speed (km/h)"),
                ("Pressure_hPa", "Pressure (hPa)"),
                ("Cloud_Cover_Percent", "Cloud_Cover (%)"),
            ]
        ],
    )
    .withColumn("AQI", col("AQI").cast("int"))
)


In [0]:
# Preview the silver frame: both the original unit-suffixed columns and their
# typed, renamed copies are still present, so the originals are dropped next.
df_silver.show(n=10)

In [0]:
# Remove the original unit-suffixed columns now that typed copies with clean
# names exist (dropping a column that is already gone is a no-op).
df_silver = df_silver.drop(*[
    "Temperature_Max (°C)",
    "Temperature_Min (°C)",
    "Temperature_Avg (°C)",
    "Humidity (%)",
    "Rainfall (mm)",
    "Wind_Speed (km/h)",
    "Pressure (hPa)",
    "Cloud_Cover (%)",
])


In [0]:
# Preview the silver frame after the original columns were dropped.
df_silver.show(n=10)

In [0]:
from pyspark.sql.functions import col

# Keep only rows with usable readings:
#   * both temperature bounds are present and max >= min;
#   * AQI is non-negative. A null AQI is dropped too, because `null >= 0`
#     evaluates to null and a filter keeps only rows whose condition is true.
df_silver_clean = df_silver.where(
    col("Temperature_Max_C").isNotNull()
    & col("Temperature_Min_C").isNotNull()
    & (col("Temperature_Max_C") >= col("Temperature_Min_C"))
    & (col("AQI") >= 0)
)


In [0]:
# Preview the validated silver rows.
df_silver_clean.show(n=10)

In [0]:
from pyspark.sql.functions import year, month, to_date

# Derive the partition keys for the silver write: convert "Date" to a DateType
# and extract its year and month, then show the resulting schema.
df_silver_clean = (
    df_silver_clean
    .withColumn("Date", to_date("Date"))
    .withColumn("year", year("Date"))
    .withColumn("month", month("Date"))
)
df_silver_clean.printSchema()


In [0]:
# Silver layer: write the validated rows as Parquet in a year=/month= folder
# hierarchy, replacing any previous output at this path.
(
    df_silver_clean.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet("s3://demobucketdatabricks/curated_data/curated_partition_data/")
)


In [0]:
# Reload the silver layer from its persisted Parquet copy so the downstream
# steps work from what was actually written to S3.
df_silver_clean = (
    spark.read
    .format("parquet")
    .load("s3://demobucketdatabricks/curated_data/curated_partition_data/")
)


In [0]:
# Preview the reloaded silver data.
df_silver_clean.show(n=10)


In [0]:
# Reconciliation between the bronze and silver layers: compare row counts.
# A non-zero difference includes the rows removed by the silver validation
# filters (temperature and AQI checks); anything beyond that would indicate
# rows lost during transformation.
bronze_count, silver_count = df_bronze.count(), df_silver_clean.count()

print("\n".join([
    f"Bronze Count: {bronze_count}",
    f"Silver Count: {silver_count}",
    f"Difference: {bronze_count - silver_count}",
]))


In [0]:
# Gold layer: monthly per-city climate summary for final reporting.
#
# The Spark functions are used through a module alias instead of
# `from pyspark.sql.functions import avg, sum, max, min`. That star-style import
# rebinds Python's built-in `sum`, `max` and `min` for every later cell in
# this notebook, so ordinary Python calls would silently get Spark column
# functions instead.
from pyspark.sql import functions as F

df_gold_city_month = (
    df_silver_clean
    .groupBy("City", "State", "year", "month")
    .agg(
        F.avg("Temperature_Avg_C").alias("avg_temp_c"),
        F.max("Temperature_Max_C").alias("max_temp_c"),
        F.min("Temperature_Min_C").alias("min_temp_c"),
        F.sum("Rainfall_mm").alias("total_rainfall_mm"),
        F.avg("AQI").alias("avg_aqi"),
    )
)


In [0]:
# Report how many (City, State, year, month) groups were produced and preview
# them. The count is printed explicitly: a bare expression that is not the
# last line of a cell still runs its Spark job, but its value is never shown.
print(f"Gold city-month rows: {df_gold_city_month.count()}")
df_gold_city_month.show(10)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

# Rank cities within each (year, month) by average AQI, highest first.
# rank() gives tied values the same rank and leaves a gap after the tie.
window = Window.partitionBy("year", "month").orderBy(col("avg_aqi").desc())

df_gold_final = df_gold_city_month.select(
    "*",
    rank().over(window).alias("pollution_rank"),
)


In [0]:
# Gold layer: persist the ranked monthly summary as Parquet in a year=/month=
# folder hierarchy, replacing any previous output at this path.
(
    df_gold_final.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet("s3://demobucketdatabricks/final_data/")
)


In [0]:
# Inspect the gold table: a 5-row preview, its schema, and (as the cell's
# displayed result) its total row count.
df_gold_final.show(n=5)
df_gold_final.printSchema()
df_gold_final.count()


In [0]:
# Export the gold table as header-bearing CSV for reporting tools.
#
# The export goes to a sibling of the gold Parquet root, not inside it.
# CSV part files under final_data/CSV/ would sit next to the year=/month=
# partition folders, and `spark.read.parquet("s3://demobucketdatabricks/final_data/")`
# would then hit conflicting directory structures or try to read CSV files as
# Parquet.
# NOTE(review): downstream consumers of the old final_data/CSV/ location must
# be pointed at this path.
GOLD_CSV_PATH = "s3://demobucketdatabricks/final_data_csv/"

(
    df_gold_final.write
    .mode("overwrite")
    .option("header", "true")
    .csv(GOLD_CSV_PATH)
)


In [0]:
# Final sanity checks on the gold table:
#   * total row count,
#   * which (year, month) periods are present, in chronological order,
#   * how many groups ended up with a null average AQI.
# Both counts are printed explicitly. Previously only the last expression was
# displayed, so the first count ran a Spark job whose result was never shown.
print(f"Gold rows: {df_gold_final.count()}")
df_gold_final.select("year", "month").distinct().orderBy("year", "month").show()
print(f"Rows with null avg_aqi: {df_gold_final.filter(col('avg_aqi').isNull()).count()}")
