**PySpark Installation**

In [7]:
# %pip installs into the running kernel's environment; the version is pinned to
# the Spark release this notebook documents (3.5.1) so re-runs are reproducible.
%pip install -q pyspark==3.5.1



**Fetching the CSV file from the S3 bucket**

In [28]:
import urllib.request

from pathlib import Path

import os

# Local path of the cached CSV; main() reads the dataset from `source`.
source = "/home/TWD_18_22.csv"

# The download URL comes from the TWD_CSV_URL environment variable so that no
# presigned URL (which embeds temporary AWS credentials and expires shortly
# after being issued) is ever committed to the notebook. Generate a fresh
# presigned URL and export it before running this cell.
# NOTE(review): the fallback is the plain object URL and only works if the
# object is publicly readable -- confirm the bucket policy.
url = os.environ.get(
    "TWD_CSV_URL",
    "https://pragyanshu-test-s3-bucket.s3.ap-south-1.amazonaws.com/TWD_18_22.csv",
)

sourceFile = Path(source)

# Download only when no local copy exists, so re-running the cell is cheap.
if not sourceFile.is_file():
    # Fetch into a temporary name first: an interrupted download must not leave
    # a truncated file at `source` that the is_file() check would later accept.
    partial_path = source + ".part"
    urllib.request.urlretrieve(url, partial_path)
    os.replace(partial_path, source)

**Application Logic**
- Loading the CSV data into a DataFrame.
- Cleaning and transforming the data.
- Analysing and ranking the data by:
  - How long each district and mandal has been recording data
  - Total rainfall per district and mandal over the years
  - The year with the heaviest rainfall for each district and mandal
  - Averages of all factors for a given month across the years
  - The best and worst year for each factor in a given month

In [29]:
"""
Created on Sat Dec 5
@author: jing0703

Data source:
https://www.metoffice.gov.uk/research/climate/maps-and-data/historic-station-data

Data format:

|district          |mandal|date      |rainfall|temp_min|temp_max|humidity_min|humidity_max|wind_speed_min|wind_speed_max|

|Medchal-Malkajgiri| Uppal|01-01-2018|     0.0|    12.1|    32.6|        23.8|       100.0|           0.0|           6.6|
|Medchal-Malkajgiri| Uppal|02-01-2018|     0.0|    11.6|    32.6|        23.2|       100.0|           0.0|           4.7|
|Medchal-Malkajgiri| Uppal|03-01-2018|     0.0|    13.0|    33.0|        31.5|       100.0|           0.0|           6.3|
|Medchal-Malkajgiri| Uppal|04-01-2018|     0.0|     9.7|    31.7|        27.4|       100.0|           0.0|           5.2|

Step:
1. Load and parse Telangana Weather Station History data
2. Clean and transform data before running the analyzer
3. Summary statistics for Districts and Mandals

Package:
Spark: 3.5.1
Python: 3.10.12
"""

# Month labels used in printed headings and generated column names;
# index 0 is January, so callers look up a 1-based month with month_list[month-1].
month_list = "JAN FEB MAR APR MAY JUN JUL AUG SEPT OCT NOV DEC".split()


def main():
    """
    Parse, clean up and analyze the weather data.

    Reads the CSV at the module-level ``source`` path with the module-level
    ``spark`` session, splits the ``date`` column into ``year``, ``month`` and
    ``day`` columns, registers the result as the ``weather_table`` temp view,
    then prints every summary produced by the analysis functions along with
    the total run time.
    """
    print("Program start")
    start_time = timeit.default_timer()
    df_all = spark.read.csv(source, header=True)

    print("Dataframe generated using the csv input:")
    df_all.show()

    # Dates are split on '-' or '/'; the parts are read as [day, month, year].
    date_col = split(df_all["date"], '[-/]')
    df_all = (
        df_all.withColumn("year", date_col.getItem(2))
        .withColumn("month", date_col.getItem(1))
        .withColumn("day", date_col.getItem(0))
        # DataFrames are immutable: drop() returns a new frame, so its result
        # has to be kept for the column to actually go away.
        .drop("date")
    )

    print("After modifying the dataframe:")
    df_all.show()

    # get_by_mandal_or_district() queries this view through spark.sql.
    df_all.createOrReplaceTempView("weather_table")

    mandal_list = get_by_mandal_or_district(True)
    district_list = get_by_mandal_or_district()

    print("Districts with data count:")
    district_list.show()
    print("Mandals with data count:")
    mandal_list.show()

    history_rank(df_all)
    history_rank(df_all, "mandal")
    rain_rank(df_all)
    rain_rank(df_all, "mandal")
    worst_rain(df_all)
    worst_rain(df_all, "mandal")
    month_avg(df_all)
    month_avg(df_all, 5)
    best_worst_year_by_month_climate(df_all)
    best_worst_year_by_month_climate(df_all, 9)

    # finish timing program
    end_time = timeit.default_timer()
    print(f"Program duration: {(end_time - start_time)}s")

def get_by_mandal_or_district(get_mandal = False):
    """
    Count the rows of ``weather_table`` per mandal or per district.

    get_mandal: when true, group by ``mandal``; otherwise group by ``district``.

    Returns the result of ``spark.sql`` for the grouping query, ordered by
    row count (largest first) and then by the grouping column.
    """
    if get_mandal:
        group_col = "mandal"
    else:
        group_col = "district"

    query = f"""
    SELECT {group_col}, COUNT(*) AS row_count
    FROM weather_table
    GROUP BY {group_col}
    ORDER BY row_count desc, {group_col}
    """

    return spark.sql(query)


def round_column(col_names):
    """
    Build a transformer that rounds the given columns to two decimals.

    col_names: iterable of column names to round.

    Returns a function that takes a dataframe and returns it with each named
    column replaced (via ``withColumn``) by its value rounded to 2 places.
    """
    def _round_all(frame):
        rounded = frame
        for name in col_names:
            rounded_values = round(rounded[name], 2)
            rounded = rounded.withColumn(name, rounded_values)
        return rounded

    return _round_all



def history_rank(df_complete, group="district"):
    """
    a. Rank districts or mandals by how long they have been online.

    df_complete: dataframe with ``year``, ``month`` and ``day`` columns plus
        the grouping column.
    group: name of the column to group by ("district" by default).

    Prints a table with the earliest recorded date of each group
    (``online_y``, ``online_m``, ``online_d``) and its dense rank
    (``online_time_rank``, 1 = earliest).
    """
    print(f"Rank {group} by how long they have been online:")

    # The earliest date must be taken as ONE (year, month, day) tuple. Taking
    # min(year), min(month) and min(day) independently mixes parts of
    # different rows and yields a date that may never have been recorded.
    # NOTE(review): the parts are compared as they were parsed (strings when
    # the CSV is read without a schema), which orders correctly for
    # zero-padded values such as "01" -- confirm the whole file is padded.
    df_online = (
        df_complete.groupBy(group)
        .agg(min(struct("year", "month", "day")).alias("online"))
        .select(
            col(group),
            col("online.year").alias("online_y"),
            col("online.month").alias("online_m"),
            col("online.day").alias("online_d"),
        )
    )

    # get dense rank of online time with window and ranking functions
    window = Window.orderBy(asc("online_y"), asc("online_m"), asc("online_d"))

    df_online_rank = df_online.withColumn(
        "online_time_rank",
        dense_rank().over(window)
    ).orderBy("online_time_rank", group)  # explicit, deterministic display order

    limit = 60 if group == "district" else 500

    # showing the resultant df with the online ranking
    df_online_rank.show(limit, truncate = False)



def rain_rank(df_complete, group="district"):
    """
    b. Rank districts or mandals by their total rainfall.

    df_complete: dataframe with a ``rainfall`` column plus the grouping column.
    group: name of the column to group by ("district" by default).

    Prints each group's summed rainfall (rounded to 2 decimals) together with
    its dense rank, where rank 1 is the largest total.
    """
    print(f"{group} rank by total rain fall:")

    # Sum rainfall per group.
    totals = df_complete.groupBy(group).agg(
        sum("rainfall").alias("total_rainfall")
    )
    totals_sorted = totals.orderBy("total_rainfall")

    # Rank the totals from highest to lowest.
    by_rain_desc = Window.orderBy(desc("total_rainfall"))
    ranked = totals_sorted.withColumn(
        "rainfall_rank", dense_rank().over(by_rain_desc)
    )

    if group == "district":
        limit = 60
    else:
        limit = 500

    # Round only for display, after the ranking has been computed.
    rounded = round_column(["total_rainfall"])(ranked)
    rounded.show(limit, truncate = False)




def worst_rain(df_complete, group="district"):
    """
    c. Find the year with the largest total rainfall for each district or mandal.

    df_complete: dataframe with ``rainfall`` and ``year`` columns plus the
        grouping column.
    group: name of the column to group by ("district" by default).

    Prints, per group, the year of its largest yearly rainfall total and that
    total (rounded to 2 decimals), sorted from largest to smallest.
    """
    print(f"Worst rainfall for each {group.capitalize()}:")

    # Total rainfall per (group, year).
    yearly_totals = (
        df_complete.select([group, "rainfall", "year"])
        .groupBy(group, "year")
        .agg(sum("rainfall").alias("total_rainfall"))
    )

    # max() over a struct compares total_rainfall first, so the struct that
    # wins carries the year belonging to the largest total.
    wettest = yearly_totals.groupBy(group).agg(
        max(struct(col("total_rainfall"), col("year"))).alias("max_rain")
    )

    df_worst_rain = (
        wettest.select(
            col(group), col("max_rain.year"), col("max_rain.total_rainfall")
        )
        .orderBy(desc("max_rain.total_rainfall"))
    )

    if group == "district":
        limit = 60
    else:
        limit = 500
    round_column(["total_rainfall"])(df_worst_rain).show(limit, truncate = False)


def month_avg(df_complete, month=7):
    """
    d. Show the averages of every factor for one month across all rows.

    df_complete: dataframe with a ``month`` column and the temperature,
        humidity, wind speed and rainfall columns.
    month: 1-based month number (7 by default).

    Prints a single row of averages, each rounded to 2 decimals.
    """
    print(f"Average for {month_list[month-1]} across all district:")

    # (source column, output alias) pairs, in display order.
    metrics = [
        ("temp_max", "avg_tmax"),
        ("temp_min", "avg_tmin"),
        ("humidity_max", "avg_hmax"),
        ("humidity_min", "avg_hmin"),
        ("wind_speed_max", "avg_WSmax"),
        ("wind_speed_min", "avg_WSmin"),
        ("rainfall", "avg_rain"),
    ]
    aggregates = [avg(source_col).alias(alias) for source_col, alias in metrics]

    rows_for_month = df_complete.where(col("month") == month)
    df_month_avg = rows_for_month.agg(*aggregates)

    aliases = [alias for _, alias in metrics]
    round_column(aliases)(df_month_avg).show(truncate = False)


def best_worst_year_by_month_climate(df_complete, month=7):
    """
    e. What were the best / worst years for a given month across all districts.

    df_complete: dataframe with ``year`` and ``month`` columns and the
        temperature, humidity and wind speed min/max columns.
    month: 1-based month number (7 by default).

    For temperature, humidity and wind speed, prints the year with the highest
    average maximum and the year with the lowest average minimum for the month.
    Each factor is ranked by its own average.
    """
    currentMonth = month_list[month-1]

    # Per-year averages for the chosen month; cached because six rankings
    # are derived from it below.
    df_may_avg = df_complete.where(col("month") == month) \
                            .groupBy("year") \
                            .agg(
                                avg("temp_max").alias("avg_tmax"),
                                avg("temp_min").alias("avg_tmin"),
                                avg("humidity_max").alias("avg_hmax"),
                                avg("humidity_min").alias("avg_hmin"),
                                avg("wind_speed_max").alias("avg_WSmax"),
                                avg("wind_speed_min").alias("avg_WSmin")
                            ) \
                            .orderBy("year").cache()

    try:
        print(f"Best and worst year of temperature for {currentMonth}:")
        _show_extreme_year(df_may_avg, "avg_tmax", False, f"{currentMonth}_temp_max")
        _show_extreme_year(df_may_avg, "avg_tmin", True, f"{currentMonth}_temp_min")

        print(f"Best and worst year of humidity for {currentMonth}:")
        _show_extreme_year(df_may_avg, "avg_hmax", False, f"{currentMonth}_humidity_max")
        _show_extreme_year(df_may_avg, "avg_hmin", True, f"{currentMonth}_humidity_min")

        print(f"Best and worst year of wind speed for {currentMonth}:")
        _show_extreme_year(df_may_avg, "avg_WSmax", False, f"{currentMonth}_wind_speed_max")
        _show_extreme_year(df_may_avg, "avg_WSmin", True, f"{currentMonth}_wind_speed_min")
    finally:
        # Release the cached averages once every ranking has been shown.
        df_may_avg.unpersist()


def _show_extreme_year(df_avg, avg_col, ascending, out_col):
    """
    Print the year(s) whose ``avg_col`` is the lowest (ascending) or highest.

    df_avg: dataframe with a ``year`` column and the ``avg_col`` column.
    avg_col: name of the averaged column to rank by.
    ascending: True to pick the smallest value, False to pick the largest.
    out_col: name under which the value (rounded to 2 decimals) is displayed.
    """
    ordering = asc(avg_col) if ascending else desc(avg_col)
    rank_col = f"{avg_col}_rank"
    df_avg.withColumn(rank_col, dense_rank().over(Window.orderBy(ordering))) \
          .filter(col(rank_col) == 1) \
          .select("year", round(col(avg_col), 2).alias(out_col)) \
          .show()

**Runner Code**

In [30]:
if __name__ == '__main__':
    # The analysis functions rely on names pulled in by these star imports
    # (col, split, struct, avg, dense_rank, ... and the Spark versions of
    # min / max / sum / round). The import order is kept as-is because later
    # imports override same-named earlier ones.
    from pyspark import SparkContext, SparkFiles
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, SQLContext, DataFrame
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql.window import Window
    from functools import reduce
    import timeit

    # Spark configuration
    conf = SparkConf().setAppName("Spark Temp App")

    # Local driver session; main() and the helpers use it through the
    # module-level name `spark`.
    spark = (
        SparkSession.builder
        .master("local[*]")
        .config(conf = conf)
        .appName("ClimateAnalysis")
        .getOrCreate()
    )

    # Only log errors.
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    main()

Program start
Dataframe generated using the csv input:
+------------------+------+----------+--------+--------+--------+------------+------------+--------------+--------------+
|          district|mandal|      date|rainfall|temp_min|temp_max|humidity_min|humidity_max|wind_speed_min|wind_speed_max|
+------------------+------+----------+--------+--------+--------+------------+------------+--------------+--------------+
|Medchal-Malkajgiri| Uppal|01-01-2018|     0.0|    12.1|    32.6|        23.8|       100.0|           0.0|           6.6|
|Medchal-Malkajgiri| Uppal|02-01-2018|     0.0|    11.6|    32.6|        23.2|       100.0|           0.0|           4.7|
|Medchal-Malkajgiri| Uppal|03-01-2018|     0.0|    13.0|    33.0|        31.5|       100.0|           0.0|           6.3|
|Medchal-Malkajgiri| Uppal|04-01-2018|     0.0|     9.7|    31.7|        27.4|       100.0|           0.0|           5.2|
|Medchal-Malkajgiri| Uppal|05-01-2018|     0.0|     8.8|    31.0|        28.6|       100.0|