# Part 1: Getting to know the data

In [6]:
# imports
from pyspark.sql import SparkSession, functions, types, Row
import sys
assert sys.version_info >= (3, 5)
import re

# Configuration
## DataFrames
spark = SparkSession.builder.appName('Canadian wind').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
assert spark.version >= '3.0'  # make sure we have Spark 3.0+
## RDDs
sc = spark.sparkContext
assert sc.version >= '3.0'

In [7]:
# General variables
data_path = "data.nosync"

## Metadata
### Stations
See [documentation](https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt)
IV. FORMAT OF "ghcnd-stations.txt"

In [8]:
def parse_line(line):
    id = '(\S+)'
    latitude = '([-+]?(?:\d*\.\d+|\d+))'
    longitude = '([-+]?(?:\d*\.\d+|\d+))'
    elevation = '([-+]?(?:\d*\.\d+|\d+))'
    state = '([-a-zA-Z0-9_][-a-zA-Z0-9_])'
    name = '((\S+\s)+)'
    delimiter = '\s+'
    any = ".*"
    line_re = re.compile(r'^'+ id + delimiter + latitude + delimiter + longitude + delimiter + elevation + delimiter + state + delimiter + name + any + '$')
    splitted_line = re.match(line_re, line)
    return Row(splitted_line.group(1), float(splitted_line.group(2)), float(splitted_line.group(3)), float(splitted_line.group(4)), splitted_line.group(5), splitted_line.group(6))

In [9]:
def stations_schema():
    return types.StructType([
        types.StructField("id", types.StringType()),
        types.StructField("latitude", types.FloatType()),
        types.StructField("longitude", types.FloatType()),
        types.StructField("elevation", types.FloatType()),
        types.StructField("state", types.StringType()),
        types.StructField("name", types.StringType()),
        #types.StructField("gsn_flag", types.StringType()),
        #types.StructField("crn_flag", types.StringType()),
        #types.StructField("wmo_id", types.StringType()),
    ])

In [10]:
stations_input = sc.textFile(data_path + "/ghcnd-stations.txt")
formatted_lines = stations_input.filter(lambda line: line.startswith("CA")).map(parse_line)
cleaned_stations = spark.createDataFrame(data=formatted_lines, schema = stations_schema())
cleaned_stations.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-----------+--------+---------+---------+-----+--------------------+
|         id|latitude|longitude|elevation|state|                name|
+-----------+--------+---------+---------+-----+--------------------+
|CA001010066| 48.8667|-123.2833|      4.0|   BC|        ACTIVE PASS |
|CA001010235|    48.4|-123.4833|     17.0|   BC|        ALBERT HEAD |
|CA001010595| 48.5833|-123.5167|     85.0|   BC|BAMBERTON OCEAN C...|
|CA001010720|    48.5|   -124.0|    351.0|   BC|         BEAR CREEK |
|CA001010774|    48.5|  -123.35|     61.0|   BC|        BEAVER LAKE |
|CA001010780| 48.3333|-123.6333|     12.0|   BC|         BECHER BAY |
|CA001010960|    48.6|-123.4667|     38.0|   BC|    BRENTWOOD BAY 2 |
|CA001010961| 48.5667|  -123.45|     31.0|   BC|BRENTWOOD CLARKE ...|
|CA001010965| 48.5667|-123.4333|     91.0|   BC|BRENTWOOD W SAANI...|
|CA001011467| 48.5833|-123.4167|     53.0|   BC|CENTRAL SAANICH V...|
|CA0010114F6| 48.5667|   -123.4|     38.0|   BC|CENTRAL SAANICH I...|
|CA0010114FF|   48.5

                                                                                

In [11]:
cleaned_stations.write.parquet(data_path + "/ghcnd-stations-cleaned", mode="overwrite")

                                                                                

### Countries
See [documentation](https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt)
V. FORMAT OF "ghcnd-countries.txt"

In [12]:
def parse_line(line):
    code = '([A-Z][A-Z])\s'
    name = '(([,A-Za-z\[\]\(\)]+\s*)+)'
    line_re = re.compile(r'^'+ code + name + '\s*$')
    splitted_line = re.match(line_re, line)
    return Row(splitted_line.group(1), splitted_line.group(2))

In [13]:
def countries_schema():
    return types.StructType([
        types.StructField("code", types.StringType()),
        types.StructField("name", types.StringType()),
    ])

In [14]:
countries_input = sc.textFile(data_path + "/ghcnd-countries.txt")
formatted_lines = countries_input.map(parse_line)
cleaned_countries = spark.createDataFrame(data=formatted_lines, schema = countries_schema())
cleaned_countries.show()

+----+--------------------+
|code|                name|
+----+--------------------+
|  AC|Antigua and Barbuda |
|  AE|United Arab Emira...|
|  AF|         Afghanistan|
|  AG|            Algeria |
|  AJ|         Azerbaijan |
|  AL|             Albania|
|  AM|            Armenia |
|  AO|             Angola |
|  AQ|American Samoa [U...|
|  AR|          Argentina |
|  AS|          Australia |
|  AU|            Austria |
|  AY|         Antarctica |
|  BA|            Bahrain |
|  BB|           Barbados |
|  BC|           Botswana |
|  BD|Bermuda [United K...|
|  BE|            Belgium |
|  BF|       Bahamas, The |
|  BG|          Bangladesh|
+----+--------------------+
only showing top 20 rows



### States
See [documentation](https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt)
VI. FORMAT OF "ghcnd-states.txt"

In [15]:
def parse_line(line):
    code = '([A-Z][A-Z])\s'
    name = '(([A-Z]+\s*)+)'
    line_re = re.compile(r'^'+ code + name + '\s*$')
    splitted_line = re.match(line_re, line)
    return Row(splitted_line.group(1), splitted_line.group(2))

In [16]:
def states_schema():
    return types.StructType([
        types.StructField("code", types.StringType()),
        types.StructField("name", types.StringType()),
    ])

In [17]:
states_input = sc.textFile(data_path + "/ghcnd-states.txt")
formatted_lines = states_input.map(parse_line)
cleaned_states = spark.createDataFrame(data=formatted_lines, schema = states_schema())
cleaned_states.show()

+----+--------------------+
|code|                name|
+----+--------------------+
|  AB|             ALBERTA|
|  AK|              ALASKA|
|  AL|             ALABAMA|
|  AR|            ARKANSAS|
|  AS|      AMERICAN SAMOA|
|  AZ|             ARIZONA|
|  BC|    BRITISH COLUMBIA|
|  CA|          CALIFORNIA|
|  CO|            COLORADO|
|  CT|         CONNECTICUT|
|  DC|DISTRICT OF COLUMBIA|
|  DE|            DELAWARE|
|  FL|             FLORIDA|
|  FM|          MICRONESIA|
|  GA|             GEORGIA|
|  GU|                GUAM|
|  HI|              HAWAII|
|  IA|                IOWA|
|  ID|               IDAHO|
|  IL|            ILLINOIS|
+----+--------------------+
only showing top 20 rows



### Inventory
See [documentation](https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt)
VII. FORMAT OF "ghcnd-inventory.txt"

In [18]:
# Cleanup
def toDF(data):
    splitted_data = data.split(" ")
    while "" in splitted_data:
        splitted_data.remove("")

    return Row(splitted_data[0], float(splitted_data[1]), float(splitted_data[2]), splitted_data[3], int(splitted_data[4]), int(splitted_data[5]))

In [19]:
def inventory_schema():
    return types.StructType([
        types.StructField("id", types.StringType()),
        types.StructField("latitude", types.FloatType()),
        types.StructField("longitude", types.FloatType()),
        types.StructField("element", types.StringType()),
        types.StructField("first_year", types.IntegerType()),
        types.StructField("last_year", types.IntegerType()),
    ])

In [20]:
inventory_input = sc.textFile(data_path + "/ghcnd-inventory.txt")
formatted_lines = inventory_input.filter(lambda line: line.startswith("CA")).map(toDF)
cleaned_inventory = spark.createDataFrame(data=formatted_lines, schema = inventory_schema())
cleaned_inventory.show()

[Stage 4:>                                                          (0 + 1) / 1]

22/11/11 12:18:16 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 4 (TID 5): Attempting to kill Python Worker
+-----------+--------+---------+-------+----------+---------+
|         id|latitude|longitude|element|first_year|last_year|
+-----------+--------+---------+-------+----------+---------+
|CA001010066| 48.8667|-123.2833|   PRCP|      1984|     1996|
|CA001010066| 48.8667|-123.2833|   SNOW|      1984|     1996|
|CA001010066| 48.8667|-123.2833|   SNWD|      1984|     1996|
|CA001010066| 48.8667|-123.2833|   MDPR|      1984|     1996|
|CA001010066| 48.8667|-123.2833|   MDSF|      1984|     1990|
|CA001010235|    48.4|-123.4833|   TMAX|      1976|     1978|
|CA001010235|    48.4|-123.4833|   TMIN|      1976|     1978|
|CA001010235|    48.4|-123.4833|   PRCP|      1971|     1995|
|CA001010235|    48.4|-123.4833|   SNOW|      1971|     1995|
|CA001010235|    48.4|-123.4833|   SNWD|      1991|     1995|
|CA001010235|    48.4|-123.4833|   MDPR|      1971|     1995

                                                                                

In [21]:
cleaned_inventory.write.parquet(data_path + "/ghcnd-inventory-cleaned", mode="overwrite")

                                                                                

## Data
### Daily summaries latest


In [22]:
def daily_summaries_schema():
    return types.StructType([
        types.StructField("station", types.StringType()),
        types.StructField("date", types.StringType()),
        types.StructField("latitude", types.StringType()),
        types.StructField("longtitude", types.StringType()),
        types.StructField("elevation", types.StringType()),
        types.StructField("name", types.StringType()),
        types.StructField("prcp", types.StringType()),
        types.StructField("prcp_attributes", types.StringType()),
        types.StructField("snow", types.StringType()),
        types.StructField("snow_attributes", types.StringType()),
        types.StructField("snwd", types.StringType()),
        types.StructField("snwd_attributes", types.StringType()),
        types.StructField("dapr", types.StringType()),
        types.StructField("dapr_attributes", types.StringType()),
        types.StructField("mdpr", types.StringType()),
        types.StructField("mdpr_attributes", types.StringType()),
        types.StructField("wesd", types.StringType()),
        types.StructField("wesd_attributes", types.StringType()),
    ])
# show the data for the station CA1AB000001
daily_summaries_data = spark.read.csv(data_path + "/ghcnd-daily-summaries-latest-canada/CA1AB000001.csv", sep=",", header=True)
daily_summaries_data.show()

+-----------+----------+---------+-----------+---------+--------------------+-----+---------------+-----+---------------+-----+---------------+----+---------------+----+---------------+----+---------------+
|    STATION|      DATE| LATITUDE|  LONGITUDE|ELEVATION|                NAME| PRCP|PRCP_ATTRIBUTES| SNOW|SNOW_ATTRIBUTES| SNWD|SNWD_ATTRIBUTES|DAPR|DAPR_ATTRIBUTES|MDPR|MDPR_ATTRIBUTES|WESD|WESD_ATTRIBUTES|
+-----------+----------+---------+-----------+---------+--------------------+-----+---------------+-----+---------------+-----+---------------+----+---------------+----+---------------+----+---------------+
|CA1AB000001|2014-07-04|53.606907|-113.561926|    686.7|EDMONTON 9.1 NNW,...|    0|            ,,N|    0|            ,,N|    0|            ,,N|null|           null|null|           null|null|           null|
|CA1AB000001|2014-07-05|53.606907|-113.561926|    686.7|EDMONTON 9.1 NNW,...|  241|            ,,N|    0|            ,,N|    0|            ,,N|null|           null|null|   

### GHCND-all
See [documentation](https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt)
III. FORMAT OF DATA FILES (".dly" FILES)

In [23]:
def parse_line(line):
    station = line[:11]
    year = line[11:15]
    month = line[15:17]
    element = line[17:21]
    start_index=21
    data = [station, year, month, element]
    for i in range(31):
        value_i = line[start_index:start_index+5]
        mflag_i = line[start_index+5:start_index+6]
        qflag_i = line[start_index+6:start_index+7]
        sflag_i = line[start_index+7:start_index+8]
        start_index=start_index+8
        data.append(int(value_i))
        data.append(mflag_i)
        data.append(qflag_i)
        data.append(sflag_i)

    return data

def ghcnd_all_columns():
    columns = ["station", "year", "month", "element"]
    for i in range(31):
        value_i = "value" + str(i+1)
        mflag_i = "mflag" + str(i+1)
        qflag_i = "qflag" + str(i+1)
        sflag_i = "sflag" + str(i+1)
        columns.append(value_i)
        columns.append(mflag_i)
        columns.append(qflag_i)
        columns.append(sflag_i)

    return columns

In [24]:
station = "CA1AB000001"
ghcnd_all_input = sc.textFile(data_path + "/ghcnd-all-canada/"+ station +".dly")
formatted_lines = ghcnd_all_input.map(parse_line)
cleaned_ghcnd_all = formatted_lines.toDF(ghcnd_all_columns())
cleaned_ghcnd_all.show()

22/11/11 12:18:18 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-----------+----+-----+-------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+

In [25]:
cleaned_ghcnd_all.write.partitionBy("station").parquet(data_path + "/ghcnd-all-canada-cleaned", mode="overwrite")

### Wind

Questions:
- what is a multiday? - lasting for more than one day: HOW LONG
-
-

In [27]:
# wind speed in tenth of meters per second
wind_day_average_speed = "AWND"
wind_day_max_1min_speed = "WSF1"
wind_day_max_2min_speed = "WSF2"
wind_day_max_5min_speed = "WSF5"
wind_day_max_instantaneous_speed = "WSFI"
wind_day_peak_speed = "WSFG"
wind_speed = [wind_day_average_speed, wind_day_max_1min_speed, wind_day_max_2min_speed, wind_day_max_5min_speed, wind_day_max_instantaneous_speed, wind_day_peak_speed]

# wind movement in km/h
wind_time_max_movement = "FMTM"
wind_multiday_movement = "MDWM"
wind_multiday_movement_number_of_days = "DAWM"
wind_day_movement = "WDMV"
wind_movement = [wind_time_max_movement, wind_multiday_movement, wind_multiday_movement_number_of_days, wind_day_movement]

# wind directions in degree
wind_day_average_direction = "AWDR"
wind_day_max_1min_direction = "WDF1"
wind_day_max_2min_direction = "WDF2"
wind_day_max_5min_direction = "WDF5"
wind_day_peak_direction = "WDFG"
wind_day_max_instantaneous_direction = "WDFI"
wind_direction = [wind_day_average_direction, wind_day_max_1min_direction, wind_day_max_2min_direction, wind_day_max_5min_direction, wind_day_peak_direction, wind_day_max_instantaneous_direction]

# temperature in tenths of degrees C
temperature_day_min = "TMIN"
temperature_multiday_min = "MDTN"
temperature_multiday_min_number_of_days = "DATN"
temperature_day_max = "TMAX"
temperature_multiday_max = "MDTX"
temperature_multiday_max_number_of_days = "DATX"
temperature = [temperature_day_min, temperature_multiday_min, temperature_multiday_min_number_of_days, temperature_day_max, temperature_multiday_max, temperature_multiday_max_number_of_days]

In [39]:
parquet_data = spark.read.parquet(data_path + "/ghcnd-all-canada-cleaned/station=" + station)
parquet_data.groupby(parquet_data["element"]).count().show()

+-------+-----+
|element|count|
+-------+-----+
|   WESD|    1|
|   MDPR|   41|
|   DAPR|   41|
|   SNOW|   80|
|   SNWD|   16|
|   PRCP|   80|
+-------+-----+



### Temperature

### Weather

# Part 2: Clean up the data
## CSV to Parquet