Install pyspark

In [None]:
# Left commented out; uncomment on a runtime that does not already provide pyspark.
#!pip install pyspark




Import the necessary libraries

In [1]:
# NOTE: the second import binds Spark's `sum` under its own name (as well as under
# `spark_sum`), so from here on the bare name `sum` is the Spark column aggregate,
# not Python's built-in. `max` is only imported as `spark_max`, so it is unaffected.
from pyspark.sql import SparkSession;
from pyspark.sql.functions import col, trim, size, split,sum, when, sum as spark_sum, count, avg, max as spark_max, month, dayofmonth, year, weekofyear, to_date


 Create Spark session

In [2]:

# Reuse the running session if there is one, otherwise start a new one
# registered under the application name "WeatherAnalysis".
session_builder = SparkSession.builder.appName("WeatherAnalysis")
spark = session_builder.getOrCreate()


Mount the drive

In [3]:
# Colab-only step: attach Google Drive at /content/drive so the CSV paths
# defined below can be read like local files.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


Load the CSV Files

In [4]:
# Locations of the two input CSV files on the mounted drive.
# weather_path: per-location daily weather rows; location_path: location lookup table
# (both are joined on `location_id` further down).
weather_path = "/content/drive/MyDrive/weatherData.csv"
location_path = "/content/drive/MyDrive/locationData.csv"

In [5]:
# Both files have a header row; let Spark infer the column types from the data.
csv_options = dict(header=True, inferSchema=True)

weather_df = spark.read.csv(weather_path, **csv_options)
location_df = spark.read.csv(location_path, **csv_options)

In [6]:
# Print the first five rows of each input, weather first, then locations.
for preview_df in (weather_df, location_df):
    preview_df.show(5)

+-----------+--------+-----------------------+-----------------------+-----------------------+------------------------+-----------------------------+-----------------------------+------------------------------+---------------------+---------------------+----------------------+-------------+-----------------------+-------------------------+-------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------+-------------------+
|location_id|    date|weather_code (wmo code)|temperature_2m_max (°C)|temperature_2m_min (°C)|temperature_2m_mean (°C)|apparent_temperature_max (°C)|apparent_temperature_min (°C)|apparent_temperature_mean (°C)|daylight_duration (s)|sunshine_duration (s)|precipitation_sum (mm)|rain_sum (mm)|precipitation_hours (h)|wind_speed_10m_max (km/h)|wind_gusts_10m_max (km/h)|wind_direction_10m_dominant (°)|shortwave_radiation_sum (MJ/m²)|et0_fao_evapotranspiration (mm)|            sunrise|             

Data Preprocessing

In [7]:
# Give the unit-suffixed source headers that the analysis uses short, plain names.
# Every other weather column keeps its original header.
#
# NOTE(review): the previous version also chained
#     .withColumnRenamed("city_name", "district")
# onto weather_df. That call had no effect: withColumnRenamed silently does nothing
# when the column is absent, and the joined frame still appears to carry `city_name`
# (it comes from location_df). The dead call is removed so the cell no longer
# suggests a `district` column exists. If one is wanted, rename it on location_df
# or on the joined frame -- confirm against the later cells before doing so.
weather_df = (
    weather_df
    .withColumnRenamed("shortwave_radiation_sum (MJ/m²)", "shortwave_radiation")
    .withColumnRenamed("temperature_2m_max (°C)", "temperature_max")
    .withColumnRenamed("temperature_2m_mean (°C)", "temperature_mean")
    .withColumnRenamed("temperature_2m_min (°C)", "temperature_min")
)

# Attach the location attributes to every weather row. Joining on the column name
# (rather than an expression) keeps a single `location_id` column in the result.
# Replace 'location_id' here if location_df uses a different key name.
df = weather_df.join(location_df, "location_id", "inner")
df.show(5)

+-----------+--------+-----------------------+---------------+---------------+----------------+-----------------------------+-----------------------------+------------------------------+---------------------+---------------------+----------------------+-------------+-----------------------+-------------------------+-------------------------+-------------------------------+-------------------+-------------------------------+-------------------+-------------------+--------+---------+---------+------------------+------------+---------------------+---------+
|location_id|    date|weather_code (wmo code)|temperature_max|temperature_min|temperature_mean|apparent_temperature_max (°C)|apparent_temperature_min (°C)|apparent_temperature_mean (°C)|daylight_duration (s)|sunshine_duration (s)|precipitation_sum (mm)|rain_sum (mm)|precipitation_hours (h)|wind_speed_10m_max (km/h)|wind_gusts_10m_max (km/h)|wind_direction_10m_dominant (°)|shortwave_radiation|et0_fao_evapotranspiration (mm)|            

In [21]:
# For every column, sum a 0/1 "is null" flag to get that column's null count.
# The result is a single row that reuses the original column names.
null_flag_sums = []
for column_name in df.columns:
    is_missing = col(column_name).isNull().cast("int")
    null_flag_sums.append(spark_sum(is_missing).alias(column_name))

null_counts = df.select(null_flag_sums)
null_counts.show(truncate=False)

+-----------+----+-----------------------+---------------+---------------+----------------+-----------------------------+-----------------------------+------------------------------+---------------------+---------------------+----------------------+-------------+-----------------------+-------------------------+-------------------------+-------------------------------+-------------------+-------------------------------+-------+------+--------+---------+---------+------------------+--------+---------------------+---------+---+-----+----+----+
|location_id|date|weather_code (wmo code)|temperature_max|temperature_min|temperature_mean|apparent_temperature_max (°C)|apparent_temperature_min (°C)|apparent_temperature_mean (°C)|daylight_duration (s)|sunshine_duration (s)|precipitation_sum (mm)|rain_sum (mm)|precipitation_hours (h)|wind_speed_10m_max (km/h)|wind_gusts_10m_max (km/h)|wind_direction_10m_dominant (°)|shortwave_radiation|et0_fao_evapotranspiration (mm)|sunrise|sunset|latitude|longi

In [23]:
# Drop rows that are exact duplicates across all columns (no subset is given).
df = df.dropDuplicates()


Task 1 – % of Shortwave Radiation > 15 MJ/m²

In [9]:


# Use the legacy (pre-Spark-3) datetime parser for the "M/d/yyyy" pattern below.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Parse `date` only while it is still a raw string. Without this guard, re-running
# the cell would feed an already-parsed date (rendered as yyyy-MM-dd) back through
# the "M/d/yyyy" pattern and turn the whole column into nulls.
if dict(df.dtypes)["date"] == "string":
    df = df.withColumn("date", to_date(col("date"), "M/d/yyyy"))

# Calendar parts used as grouping keys by the tasks that follow.
# withColumn overwrites a same-named column, so these three are safe to re-run.
df = (
    df.withColumn("day", dayofmonth("date"))
      .withColumn("month", month("date"))
      .withColumn("year", year("date"))
)

df.show(5)


+-----------+----------+-----------------------+---------------+---------------+----------------+-----------------------------+-----------------------------+------------------------------+---------------------+---------------------+----------------------+-------------+-----------------------+-------------------------+-------------------------+-------------------------------+-------------------+-------------------------------+-------------------+-------------------+--------+---------+---------+------------------+------------+---------------------+---------+---+-----+----+
|location_id|      date|weather_code (wmo code)|temperature_max|temperature_min|temperature_mean|apparent_temperature_max (°C)|apparent_temperature_min (°C)|apparent_temperature_mean (°C)|daylight_duration (s)|sunshine_duration (s)|precipitation_sum (mm)|rain_sum (mm)|precipitation_hours (h)|wind_speed_10m_max (km/h)|wind_gusts_10m_max (km/h)|wind_direction_10m_dominant (°)|shortwave_radiation|et0_fao_evapotranspiratio

In [10]:
# Daily shortwave-radiation totals must be strictly above this value to count
# towards the numerator.
RADIATION_THRESHOLD = 15

radiation = col("shortwave_radiation")

# One pass over the data per calendar month (all years pooled):
#   radiation_gt_15 -- sum of the daily totals that exceed the threshold
#   total_radiation -- sum of all daily totals
# Using a conditional sum instead of "filter, aggregate, inner-join" means a month
# with no day above the threshold still appears (as 0%) rather than being silently
# dropped by the join.
monthly_radiation = df.groupBy("month").agg(
    spark_sum(
        when(radiation > RADIATION_THRESHOLD, radiation).otherwise(0.0)
    ).alias("radiation_gt_15"),
    spark_sum(radiation).alias("total_radiation"),
)

# Per-metric views kept under their previous names in case other cells use them.
rad_gt_15 = monthly_radiation.select("month", "radiation_gt_15")
rad_total = monthly_radiation.select("month", "total_radiation")

# Share of each month's total radiation contributed by the above-threshold days.
percentage_df = monthly_radiation.withColumn(
    "percentage",
    (col("radiation_gt_15") / col("total_radiation")) * 100,
)

percentage_df.orderBy("month").show()


+-----+------------------+------------------+-----------------+
|month|   radiation_gt_15|   total_radiation|       percentage|
+-----+------------------+------------------+-----------------+
|    1|195123.55999999976|224034.82000000018|87.09519350608072|
|    2| 218474.7199999993| 231686.1899999992|94.29768774737937|
|    3| 273972.8599999999| 279413.4399999998|98.05285672729275|
|    4|254211.86999999898|260929.18999999895| 97.4256157388907|
|    5|218443.91999999998|240430.56999999972|90.85530180292807|
|    6|         205310.72|222496.83999999982|  92.275791422476|
|    7|210292.58999999973|226119.78999999966|93.00052419118205|
|    8|212817.15000000002|229471.57000000027|92.74227304062101|
|    9| 199174.5300000001| 218665.0600000003|91.08658237397407|
|   10|174549.07999999993|206502.56999999983|84.52634754134056|
|   11|128007.03000000038| 177111.5500000005|72.27480647083718|
|   12|128307.64000000047|178735.05999999994| 71.7864978477085|
+-----+------------------+--------------

Task 2 - Weekly Max Temperatures for Hottest Month

In [11]:
# Tag every row with the week-of-year number of its date; the weekly maxima for the
# hottest months are grouped on this column further down.
df = df.withColumn("week", weekofyear("date"))


Find the month with the highest average `temperature_max` in each year.

In [12]:
# Mean of the daily maximum temperature for each (year, month) pair.
mean_of_daily_max = avg("temperature_max").alias("avg_monthly_temp")
monthly_avg = df.groupBy("year", "month").agg(mean_of_daily_max)


Monthly averages and hottest month per year

In [13]:
# Mean of the daily maximum temperature per (year, month).
# NOTE: this repeats the aggregation from the previous cell and re-binds
# `monthly_avg` to an equivalent result; one of the two cells is redundant.
monthly_avg = (
    df.groupBy("year", "month")
      .agg(avg("temperature_max").alias("avg_monthly_temp"))
)



In [15]:
# Highest monthly average reached within each year.
yearly_peak = monthly_avg.groupBy("year").agg(
    spark_max("avg_monthly_temp").alias("max_temp_in_year")
)

# The key is exposed as `year_max` so it does not clash with `year` when this
# frame is joined back onto monthly_avg.
max_month_temp = yearly_peak.withColumnRenamed("year", "year_max")


In [16]:
# A (year, month) row is kept when its monthly average equals that year's peak.
# Column names are unique across the two frames (year vs. year_max,
# avg_monthly_temp vs. max_temp_in_year), so they can be referenced by name.
is_peak_month = (
    (col("year") == col("year_max"))
    & (col("avg_monthly_temp") == col("max_temp_in_year"))
)

hottest_months = (
    monthly_avg.join(max_month_temp, on=is_peak_month, how="inner")
               .select("year", "month")  # only the keys are needed downstream
)


In [18]:
# Keep only the daily rows that fall inside each year's hottest month.
hot_month_data = df.join(hottest_months, on=["year", "month"], how="inner")


In [20]:
# Within each hottest month, take the highest daily maximum per week and list the
# weeks in chronological order.
week_keys = ["year", "month", "week"]
weekly_peak = spark_max("temperature_max").alias("weekly_max_temperature")

weekly_max_temp = (
    hot_month_data.groupBy(*week_keys)
                  .agg(weekly_peak)
                  .orderBy(*week_keys)
)

weekly_max_temp.show()


+----+-----+----+----------------------+
|year|month|week|weekly_max_temperature|
+----+-----+----+----------------------+
|2010|    3|   9|                  36.1|
|2010|    3|  10|                  36.7|
|2010|    3|  11|                  37.2|
|2010|    3|  12|                  36.6|
|2010|    3|  13|                  33.9|
|2011|    6|  22|                  35.1|
|2011|    6|  23|                  35.2|
|2011|    6|  24|                  35.5|
|2011|    6|  25|                  36.4|
|2011|    6|  26|                  37.0|
|2012|    5|  18|                  35.4|
|2012|    5|  19|                  37.1|
|2012|    5|  20|                  37.4|
|2012|    5|  21|                  36.2|
|2012|    5|  22|                  36.4|
|2013|    4|  14|                  36.5|
|2013|    4|  15|                  36.0|
|2013|    4|  16|                  36.2|
|2013|    4|  17|                  34.8|
|2013|    4|  18|                  34.8|
+----+-----+----+----------------------+
only showing top