# Read Climate Data

In [None]:
# Location of the raw climate CSV (placeholder to be filled in per environment).
climate_data_url = "<raw_climate_data_url>"

# Load the raw CSV: the first row is the header and column types are inferred.
# NOTE(review): "treatEmptyValuesAsNulls" looks like a legacy spark-csv option;
# confirm the built-in CSV reader still honours it (see `nullValue`/`emptyValue`).
countries_climate_df = (
    spark.read
    .options(
        header="true",
        treatEmptyValuesAsNulls="true",
        inferSchema="true",
    )
    .csv(climate_data_url)
)

In [None]:
# Preview the first 10 raw rows to sanity-check the load.
countries_climate_df.show(n=10)

# Clean Data

In [None]:
import pyspark.sql.functions as F

# Clean the raw climate data:
#   1. Drop rows missing either temperature measurement. This runs BEFORE the
#      renames so the filter refers to column names that actually exist at
#      that point in the chain (filtering on the old names after renaming
#      only works if the analyzer resolves them from the child plan).
#   2. Normalise the column names to snake_case.
#   3. Derive `year` and `month` from the `dt` column, then drop `dt`.
cleaned_climate_data = (
    countries_climate_df
    .filter(
        F.col("AverageTemperature").isNotNull()
        & F.col("AverageTemperatureUncertainty").isNotNull()
    )
    .withColumnRenamed("AverageTemperature", "avg_temp")
    .withColumnRenamed("AverageTemperatureUncertainty", "avg_temp_uncertainty")
    .withColumnRenamed("Country", "country")
    .withColumn("year", F.year("dt"))
    .withColumn("month", F.month("dt"))
    .drop(F.col("dt"))
)

# Preview the cleaned rows.
cleaned_climate_data.show(10)

# Write to Transformed Layer

In [None]:
# Persist the cleaned data as Parquet, split into one directory per `month`
# value. The frame is coalesced to 2 partitions before writing, and any data
# already at the target path is overwritten.
(
    cleaned_climate_data
    .coalesce(2)
    .write
    .partitionBy("month")
    .mode("overwrite")
    .parquet("<transformed_climate_data_url>")
)



# Read From Transformed Layer

In [None]:
# Read back the Parquet dataset written to the transformed layer.
transformed_climate_data = spark.read.parquet("<transformed_climate_data_url>")

# Preview the first 20 rows.
transformed_climate_data.show(n=20)

# Select the hottest historical months per country

In [None]:
# For every (month, country) pair take the highest `avg_temp` seen,
# rounded to 2 decimals, and sort the result hottest-first.
hottest_months_per_countries_all_time = (
    transformed_climate_data
    .groupBy("month", "country")
    .agg(F.round(F.max("avg_temp"), 2).alias("max_avg_temp"))
    .orderBy(F.col("max_avg_temp").desc())
)

# Show the 10 hottest (month, country) combinations.
hottest_months_per_countries_all_time.show(n=10)

# Save Hottest Historical Months per Country to Trusted Layer

In [None]:
# Persist the aggregated result as Parquet in the trusted layer,
# overwriting any data already at the target path.
(
    hottest_months_per_countries_all_time
    .write
    .mode("overwrite")
    .parquet("<trusted_climate_data_url>")
)