# In [3]:
import timeit

"""
Created on Sat Dec 5 
@author: jing0703

Data source: 
https://www.metoffice.gov.uk/research/climate/maps-and-data/historic-station-data

Data format:
   1914   1    5.2     0.7    ---     52.0    ---
   1914   2    9.2     3.5    ---     28.0    ---
   1914   3   ---     ---     ---     ---     ---

Steps:
1. Load and parse the UK weather station history data
2. Combine all stations so that queries can run across all stations and all time periods
3. Compute summary statistics for the stations

Package:
Spark: 2.3.2
Python: 3.6.4
"""

def main():
    """
    Download, parse and analyse the history data of every listed station.

    For each station the raw text file is fetched through the Spark context,
    parsed by ``read_data`` and counted. The per-station frames are then
    unioned into one cached DataFrame on which the five analysis functions
    are run. The per-station row counts and the total duration are printed.

    Relies on ``sc``, ``SparkFiles``, ``DataFrame`` and ``reduce`` being
    available in the enclosing session.
    """
    station_list = ["aberporth",
                    "armagh",
                    "ballypatrick",
                    "bradford",
                    "braemar",
                    "camborne",
                    "cambridge",
                    "cardiff",
                    "chivenor",
                    "dunstaffnage",
                    "durham",
                    "eastbourne",
                    "eskdalemuir",
                    "heathrow",
                    "hurn",
                    "lerwick",
                    "leuchars",
                    "whitby",
                    "cwmystwyth",
                    "lowestoft",
                    "manston",
                    "nairn",
                    "newtonrigg",
                    "oxford",
                    "paisley",
                    "ringway",
                    "rossonwye",
                    "shawbury",
                    "sheffield",
                    "southampton",
                    "stornoway",
                    "suttonbonington",
                    "tiree",
                    "valley",
                    "waddington",
                    "wickairport",
                    "yeovilton"]

    start_time = timeit.default_timer()

    # per-station frames and their row counts, filled in station_list order
    df_all = []
    station_count = {}
    df_complete = None

    try:
        # read data for each station and append to list
        for station in station_list:
            url = f"https://www.metoffice.gov.uk/pub/data/weather/uk/climate/stationdata/{station}data.txt"
            sc.addFile(url)
            rdd = sc.textFile("file://" + SparkFiles.get(f"{station}data.txt"))
            df = read_data(rdd, station)
            # register the frame before counting so that it is released in
            # the finally block even when the count itself fails
            df_all.append(df)
            station_count[station] = df.count()

        print("Row count for each station:", station_count)

        # union list of station data into complete df as all history data
        # (union is the non-deprecated name of unionAll since Spark 2.0)
        df_complete = reduce(DataFrame.union, df_all).cache()

        # climate data analysis for question 4
        station_history_rank(df_complete)
        station_rain_sun_rank(df_complete)
        worst_rain_best_sun(df_complete)
        may_avg(df_complete)
        best_worst_year_may_climate(df_complete)
    finally:
        # `del` only drops the Python name; the cached blocks are released
        # by unpersist(), for the combined frame and each station frame
        if df_complete is not None:
            df_complete.unpersist()
        for df in df_all:
            df.unpersist()

    # finish timing program
    end_time = timeit.default_timer()
    print(f"Program duration: {(end_time - start_time)}s")


def read_data(rdd, station):
    """
    Parse the raw text lines of one station into a typed, cached DataFrame.

    Every line up to and including the units header line is discarded, lines
    containing one of the marker characters ``*``, ``#`` or ``$`` are
    dropped, and only lines that split into exactly seven fields are kept
    (which removes rows carrying an extra token for provisional data).

    :param rdd: RDD of text lines for a single station
    :param station: station name stored in the ``station`` column
    :return: cached DataFrame with columns station, year, month, tmax, tmin,
             af, rain, sun
    :raises ValueError: if the units header line is not present in ``rdd``
    """
    # list of df column name
    new_colnames = ["station", "year", "month", "tmax", "tmin", "af", "rain", "sun"]
    units_header = "              degC    degC    days      mm   hours"
    markers = ("*", "#", "$")

    # index the lines once and reuse the result for lookup and filtering
    indexed = rdd.zipWithIndex()

    # find the index for the end of header
    head_index = indexed.lookup(units_header)
    if not head_index:
        raise ValueError(f"units header line not found in data for station '{station}'")
    header_end = head_index[0]

    # keep only lines after the header that contain none of the markers;
    # the previous `any(e not in x ...)` test was true for practically every
    # line and therefore filtered nothing
    cli_data = indexed.filter(lambda row_index: row_index[1] > header_end).keys() \
                      .filter(lambda line: not any(m in line for m in markers))

    # split rdd, add station name, convert to df
    line_df = cli_data.map(lambda line: (station, line.split(" "))).toDF(("station", "data"))

    # remove empty array elements from data column
    # (for spark >= 2.4 the built-in array_remove function could be used)
    drop_array = udf(drop_from_array, ArrayType(StringType()))

    # remove rows with extra character for provisional data
    df = line_df.withColumn("data", drop_array("data", lit(""))).filter(size("data") == 7)

    # split column data to multiple columns and rename new column
    df1 = df.select([df.station] + [df.data[i] for i in range(7)])

    # NOTE(review): missing measurements ("---") are replaced by zero before
    # the casts, so downstream sums and averages see them as real zeros;
    # confirm this is intended.
    df2 = df1.toDF(*new_colnames) \
             .replace("---", None) \
             .na.fill({"tmax": 0.0, "tmin": 0.0, "af": 0, "rain": 0.0, "sun": 0.0})

    # cast every measurement column from string to its numeric type
    column_types = [("year", "integer"),
                    ("month", "integer"),
                    ("tmax", "float"),
                    ("tmin", "float"),
                    ("af", "integer"),
                    ("rain", "float"),
                    ("sun", "float")]
    for name, type_name in column_types:
        df2 = df2.withColumn(name, col(name).cast(type_name))

    return df2.cache()


def drop_from_array(arr, item):
    """
    Return a new list with the elements of ``arr`` that are not equal to
    ``item``, in their original order.
    """
    kept = []
    for element in arr:
        if element != item:
            kept.append(element)
    return kept


def round_column(col_names):
    """
    Build a transformation that rounds the given columns to 2 decimals.

    The returned function takes a DataFrame and replaces each column named in
    ``col_names`` by its rounded value, one ``withColumn`` call per name.
    """
    def inner(df):
        rounded = df
        for name in col_names:
            rounded = rounded.withColumn(name, round(rounded[name], 2))
        return rounded

    return inner



def station_history_rank(df_complete):
    """
    a. Rank stations by how long they have been online

    The online date of a station is the (year, month) of its earliest
    record. Stations are dense-ranked on that date, earliest first, and the
    result is printed.

    :param df_complete: DataFrame with at least station, year and month columns
    """
    print("Rank stations by how long they have been online:")

    # groupby station & take the earliest (year, month) pair as one value:
    # a struct is compared field by field, so the month returned belongs to
    # the earliest year instead of being the smallest month of any year
    df_online = df_complete.groupBy("station") \
        .agg(min(struct(col("year"), col("month"))).alias("first_record")) \
        .select(
            col("station"),
            col("first_record.year").alias("online_y"),
            col("first_record.month").alias("online_m")
        )

    # get dense rank of online time with window and ranking functions
    window = Window.orderBy(asc("online_y"), asc("online_m"))

    # sort explicitly so the printed order does not depend on how the
    # window was evaluated; station name breaks ties deterministically
    df_online.withColumn("online_time_rank", dense_rank().over(window)) \
        .orderBy("online_time_rank", "station") \
        .show(37, truncate=False)



def station_rain_sun_rank(df_complete):
    """
    b. Rank stations by rainfall and / or sunshine

    Sums rain and sun per station, adds a dense rank for each total
    (largest total first) and prints the totals rounded to 2 decimals.
    """
    print("Station rank by total sunshine and rain fall:")

    # per-station totals
    totals = [
        sum("rain").alias("total_rainfall"),
        sum("sun").alias("total_sunshine"),
    ]
    per_station = df_complete.groupBy("station").agg(*totals)
    per_station = per_station.orderBy("total_rainfall", "total_sunshine")

    # one descending window per total
    by_rain = Window.orderBy(desc("total_rainfall"))
    by_sun = Window.orderBy(desc("total_sunshine"))

    ranked = per_station.withColumn("rainfall_rank", dense_rank().over(by_rain))
    ranked = ranked.withColumn("sunshine_rank", dense_rank().over(by_sun))

    round_totals = round_column(["total_rainfall", "total_sunshine"])
    round_totals(ranked).show(37, truncate = False)
    



def worst_rain_best_sun(df_complete):
    """
    c. When was the worst rainfall and / or best sunshine for each station

    For each of rain and sun, the monthly total per station is computed, the
    month with the largest total is picked per station, and the result is
    printed with the total rounded to 2 decimals, largest first.
    """
    def peak_month(metric, total_name, holder):
        # label every row with its "year-month" and total the metric per label
        labelled = df_complete.withColumn(
            "year_month", concat(col("year"), lit("-"), col("month"))
        )
        monthly = labelled.select(["station", metric, "year_month"]) \
                          .groupBy("station", "year_month") \
                          .agg(sum(metric).alias(total_name))

        # max over a (total, year_month) struct keeps the label of the peak
        peak = monthly.groupBy("station") \
                      .agg(max(struct(col(total_name), col("year_month"))).alias(holder))

        picked = peak.select(
            col("station"),
            col(holder + ".year_month"),
            col(holder + "." + total_name)
        )
        return picked.orderBy(desc(holder + "." + total_name))

    print("Worst rainfall for each station:")
    worst_rain = peak_month("rain", "total_rainfall", "max_rain")
    round_column(["total_rainfall"])(worst_rain).show(37, truncate = False)

    print("Best sunshine for each station:")
    best_sun = peak_month("sun", "total_sunshine", "max_sun")
    round_column(["total_sunshine"])(best_sun).show(37, truncate = False)


def may_avg(df_complete):
    """
    d. What are the averages for May across all stations

    Averages tmax, tmin, af, rain and sun over every row whose month is 5
    and prints the single result row rounded to 2 decimals.
    """
    print("Average for May across all stations:")

    measures = ["tmax", "tmin", "af", "rain", "sun"]
    avg_names = ["avg_tmax", "avg_tmin", "avg_af", "avg_rain", "avg_sun"]

    may_rows = df_complete.filter(col("month") == 5)
    averages = [avg(source).alias(target) for source, target in zip(measures, avg_names)]
    may_summary = may_rows.agg(*averages)

    round_column(avg_names)(may_summary).show(truncate = False)


def _show_may_extreme_year(df_may_avg, avg_col, out_col, ordering):
    """
    Print the year(s) ranked first on ``avg_col`` under the given ordering.

    :param df_may_avg: DataFrame with a year column and the ``avg_col`` column
    :param avg_col: name of the averaged column to rank on
    :param out_col: name under which the rounded value is printed
    :param ordering: ``asc`` or ``desc``, applied to ``avg_col``
    """
    window = Window.orderBy(ordering(avg_col))

    # filter on the rank while the column still exists, then project
    df_may_avg.withColumn("extreme_rank", dense_rank().over(window)) \
              .filter(col("extreme_rank") == 1) \
              .select("year", round(col(avg_col), 2).alias(out_col)) \
              .orderBy("year") \
              .show()


def best_worst_year_may_climate(df_complete):
    """
    e. What were the best / worst years for May across all stations

    Averages each measurement over all stations per year for the rows whose
    month is 5, then prints, for temperature, air frost, sunshine and
    rainfall, the year(s) with the highest and the lowest average. Every
    printed value is rounded to 2 decimals.

    :param df_complete: DataFrame with year, month, tmax, tmin, af, rain and
                        sun columns
    """
    # create df for avg climate of May across all stations and groupby year
    df_may_avg = df_complete.where(col("month") == 5) \
                            .groupBy("year") \
                            .agg(
                                avg("tmax").alias("avg_tmax"),
                                avg("tmin").alias("avg_tmin"),
                                avg("af").alias("avg_af"),
                                avg("rain").alias("avg_rain"),
                                avg("sun").alias("avg_sun")
                            ) \
                            .cache()

    try:
        print("Best and worst year of temperature for May:")
        _show_may_extreme_year(df_may_avg, "avg_tmax", "may_temp_max", desc)
        _show_may_extreme_year(df_may_avg, "avg_tmin", "may_temp_min", asc)

        print("Best and worst year of days of air frost for May:")
        _show_may_extreme_year(df_may_avg, "avg_af", "may_af_max", desc)
        _show_may_extreme_year(df_may_avg, "avg_af", "may_af_min", asc)

        print("Best and worst year of sunshine for May:")
        _show_may_extreme_year(df_may_avg, "avg_sun", "may_sun_max", desc)
        _show_may_extreme_year(df_may_avg, "avg_sun", "may_sun_min", asc)

        print("Best and worst year of rainfall for May:")
        _show_may_extreme_year(df_may_avg, "avg_rain", "may_rain_max", desc)
        _show_may_extreme_year(df_may_avg, "avg_rain", "may_rain_min", asc)
    finally:
        # release the cached yearly averages once all reports are printed
        df_may_avg.unpersist()