# <center style="color: #003b7f"> Analyzing Temperature and Precipitation Patterns <br/> for Climate Assessment </center> <a class='tocSkip'>
    
**MSDS 2023 Term 4 SLT1B** | Loraine Menorca, BJ Enrik Yepes

This notebook documents the data collection and cleaning process and serves as a supplement to the main report of this project.
    
**! Note:** This notebook was run using PySpark via AWS EMR Studio. Rerunning the entire notebook outside a similar environment is not advisable.
    
***

In [1]:
spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 5 | application_1685926317650_0006 | pyspark | idle | Link | Link | | ✔ |


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7efd58cd35d0>

In [2]:
# Import necessary libraries
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F
import pyspark.pandas as ps

from datetime import datetime
import pytz

In [11]:
# Set the timezone to Philippine time for timestamps on file saving
ph_tz = pytz.timezone('Asia/Manila')

# Get the current datetime in Philippine time
current_datetime = datetime.now(ph_tz)

# Convert the datetime to string
current_datetime_str = current_datetime.strftime('%Y-%m-%d_%H:%M')

# Print the current datetime as string
print(current_datetime_str)

2023-06-05_14:50

# I. Location details

As detailed under the Data Source section of the main notebook report, **bdcc_lab2_main.ipynb**, details on the location of the stations, such as the country and country code, were collected. The raw files were in .txt format and were converted into CSV files for ease of processing.
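
As a small local illustration of the two fields mentioned above, the sketch below parses country lines in plain Python. The sample lines and the helper name `parse_country_line` are assumptions made for illustration, not part of the original pipeline. The point it tries to show is that only the first space separates the code from the name, since a country name may itself contain spaces.

```python
# Hypothetical sample lines in a "<code> <name>" layout (illustrative only).
sample_lines = [
    "RP Philippines",
    "AE United Arab Emirates ",
]


def parse_country_line(line):
    """Split a line into (code, country) on the first space only."""
    code, _, name = line.partition(" ")
    return code, name.strip()


parsed = [parse_country_line(line) for line in sample_lines]
print(parsed)
```

Splitting on every space instead would keep only the first word of a multi-word name, which is worth checking for when converting this file.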

In [None]:
# Read the raw file on location
fnames = spark.read.text('s3://noaa-ghcn-pds/ghcnd-countries.txt')

In [None]:
fnames.show()

**SAVE**

In [None]:
# Output path for the converted countries file
out_fpath = "s3://bdcc-lab2-2023/ghcnd/ghcnd_countries.csv"
out_fpath

In [None]:
# Split each line into the country code and the country name.
# The limit of 2 splits on the first space only, so multi-word names stay intact.
(fnames.withColumn('columns', F.split(fnames.value, ' ', 2))
       .withColumn('code', F.col('columns')[0])
       .withColumn('country', F.trim(F.col('columns')[1]))
       .drop('columns', 'value')
).write.csv(out_fpath)

# II. Station details

Similarly, details about the stations where the climate and weather data were recorded, such as their elevation, geographical coordinates, and name, were collected. The raw text file was converted into a CSV file.
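
The station file is a fixed-width text file, so one way to sanity-check a conversion locally is to slice a record by character position. The sketch below assumes the column positions listed in the GHCN-Daily readme (ID 1-11, latitude 13-20, longitude 22-30, elevation 32-37, state 39-40, name 42-71, GSN flag 73-75, HCN/CRN flag 77-79, WMO ID 81-85). The `LAYOUT` mapping, the helper `parse_station_line`, and the sample record are hypothetical and only illustrate the idea.

```python
# Assumed (start, end) character positions, 1-based and inclusive,
# taken from the GHCN-Daily readme for ghcnd-stations.txt.
LAYOUT = {
    "id": (1, 11),
    "latitude": (13, 20),
    "longitude": (22, 30),
    "elevation": (32, 37),
    "state": (39, 40),
    "name": (42, 71),
    "gsn_flag": (73, 75),
    "hcn_crn_flag": (77, 79),
    "wmo_id": (81, 85),
}


def parse_station_line(line):
    """Slice one fixed-width record into a dict of stripped strings."""
    return {key: line[start - 1:end].strip() for key, (start, end) in LAYOUT.items()}


# Synthetic record assembled to match the layout (values are illustrative only).
sample = "{:<11} {:>8} {:>9} {:>6} {:<2} {:<30} {:<3} {:<3} {:<5}".format(
    "XX000000001", "14.5833", "121.0000", "15.0", "", "EXAMPLE STATION", "GSN", "", "98430"
)

record = parse_station_line(sample)
print(record["name"], float(record["latitude"]), record["wmo_id"])
```

Slicing by position keeps empty fields (such as a blank state) in place and leaves multi-word station names intact, which splitting on whitespace does not guarantee.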

In [None]:
# Read raw file on stations
fnames = spark.read.text('s3://noaa-ghcn-pds/ghcnd-stations.txt')

In [None]:
fnames.show(vertical=True, truncate=False)

**SAVE**

In [None]:
# Output path for the converted stations file
out_fpath = "s3://bdcc-lab2-2023/ghcnd/ghcnd_stations.csv"
out_fpath

In [None]:
# Parse the fixed-width station records into typed columns.
# Positions assume the layout documented in the GHCN-Daily readme; splitting on
# runs of spaces is unreliable because names contain spaces and blank fields vanish.
v = F.col('value')
(fnames.select(
        F.trim(F.substring(v, 1, 11)).alias('id'),
        F.trim(F.substring(v, 13, 8)).cast('float').alias('latitude'),
        F.trim(F.substring(v, 22, 9)).cast('float').alias('longitude'),
        F.trim(F.substring(v, 32, 6)).cast('float').alias('elevation'),
        F.trim(F.substring(v, 39, 2)).alias('state'),
        F.trim(F.substring(v, 42, 30)).alias('name'),
        F.trim(F.substring(v, 73, 3)).alias('gsn_flag'),
        F.trim(F.substring(v, 77, 3)).alias('hcn_crn_flag'),
        F.trim(F.substring(v, 81, 5)).alias('wmo_id'))
).write.csv(out_fpath)

# III. Raw data on Climate

The main dataset was read from the path `'s3a://noaa-ghcn-pds/csv/by_year/20*'`, whose `20*` pattern matches the yearly files from 2000 onward.
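
To make the effect of the wildcard concrete, the sketch below uses Python's `fnmatch` as a stand-in for the glob matching done on the S3 path. This is an assumption for illustration only: the candidate file names are hypothetical and simply follow the `<year>.csv` naming, and `fnmatch` is not what Spark itself uses.

```python
from fnmatch import fnmatch

pattern = "20*"
# Hypothetical yearly file names following the <year>.csv naming convention.
candidates = ["1999.csv", "2000.csv", "2015.csv", "2023.csv"]

# Keep only the names that the wildcard pattern would select.
selected = [name for name in candidates if fnmatch(name, pattern)]
print(selected)
```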

In [3]:
# Read the raw CSV files
fpath = 's3a://noaa-ghcn-pds/csv/by_year/20*'
df = spark.read.csv(fpath, header=True, sep=',').withColumn('filename', F.input_file_name())

In [4]:
# Checker: total count should exceed the unique count because each station and date has one row per element
df.select('filename', 'ID', 'DATE').agg(
    F.countDistinct('filename', 'ID', 'DATE').alias('unique_count'),
    F.count('*').alias('total_count')
).show()

+------------+-----------+
|unique_count|total_count|
+------------+-----------+
|   264665977|  871923626|
+------------+-----------+

## Elements

The `ELEMENT` field contains the features of interest. To simplify processing and aggregation, this field was pivoted from rows to columns: each element now has its own column, with the corresponding `DATA_VALUE` as its value.
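
The row-to-column transformation described above can be sketched on a handful of rows in plain Python. The station IDs, values, and the helper name `pivot_elements` are hypothetical; this is a minimal illustration of the reshaping idea, not the Spark implementation used in this notebook.

```python
from collections import defaultdict

# Hypothetical long-format rows: (ID, DATE, ELEMENT, DATA_VALUE).
long_rows = [
    ("STN001", "20000806", "TMAX", "312"),
    ("STN001", "20000806", "TMIN", "241"),
    ("STN001", "20000806", "PRCP", "0"),
    ("STN002", "20000806", "PRCP", "15"),
]


def pivot_elements(rows):
    """Turn one-row-per-element data into one dict per (ID, DATE)."""
    wide = defaultdict(dict)
    for station_id, date, element, value in rows:
        # Keep the first value seen for an element, mirroring a "first" aggregate.
        wide[(station_id, date)].setdefault(element, value)
    return dict(wide)


wide_rows = pivot_elements(long_rows)
print(len(long_rows), len(wide_rows))
print(wide_rows[("STN001", "20000806")])
```

Four long-format rows collapse into two records here, which is the same reason the total row count of the raw data exceeds the count of unique station and date combinations. Elements that a station did not report are simply absent in this sketch, whereas a pivoted DataFrame holds nulls in those columns.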

In [5]:
# Collect needed information on elements per record
df_elements = (df
                .select('filename', 'ID', 'DATE', 'ELEMENT', 'DATA_VALUE')
                .groupBy('filename', 'ID', 'DATE')
                .pivot('ELEMENT')
                .agg(F.first('DATA_VALUE'))
            )

## Tags

Because the original data has one row per `ID`, `DATE`, `ELEMENT`, and `DATA_VALUE`, the values of each tag are duplicated across rows. Since the `ELEMENT` and `DATA_VALUE` columns have now been pivoted into a single row per record, the tags can also be deduplicated.
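
The deduplication of tags can be pictured as keeping the tag values from the first row seen for each station and date. The sketch below shows this in plain Python; the rows and the helper name `first_tags` are hypothetical and serve only as an illustration.

```python
# Hypothetical rows: (ID, DATE, ELEMENT, M_FLAG, Q_FLAG, S_FLAG, OBS_TIME).
rows = [
    ("STN001", "20000806", "TMAX", None, None, "a", None),
    ("STN001", "20000806", "TMIN", None, None, "a", None),
    ("STN002", "20000806", "PRCP", None, None, "S", "0700"),
]


def first_tags(rows):
    """Keep the tag values from the first row seen for each (ID, DATE)."""
    tags = {}
    for station_id, date, _element, m_flag, q_flag, s_flag, obs_time in rows:
        key = (station_id, date)
        if key not in tags:
            tags[key] = {
                "M_FLAG": m_flag,
                "Q_FLAG": q_flag,
                "S_FLAG": s_flag,
                "OBS_TIME": obs_time,
            }
    return tags


tags = first_tags(rows)
print(tags)
```

One caveat worth keeping in mind: in Spark, a `first` aggregate without an explicit ordering depends on row order, which may not be deterministic after a shuffle. If the flags differ between elements of the same station and date, which value is kept is therefore not guaranteed.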

In [6]:
# Collect needed information on tags per record
df_tags = (df
            .groupBy('ID', 'DATE')
            .agg(
                F.first("M_FLAG").alias("M_FLAG"),
                F.first("Q_FLAG").alias("Q_FLAG"),
                F.first("S_FLAG").alias("S_FLAG"),
                F.first("OBS_TIME").alias("OBS_TIME")
            )
        )

## Merged Elements & Tags

In [7]:
# Merge information on elements and tags
df_final = df_tags.join(df_elements, on=['ID', 'DATE'], how="inner")

## Supplementary

The converted CSV files on the stations and country locations are now merged with the final data on climate and weather events.
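
The merge can be thought of as two lookups per record: one by station ID and one by the country code taken from the first two characters of that ID. The plain-Python sketch below illustrates the left-join behaviour on hypothetical data; the lookup tables, IDs, and the helper name `enrich` are assumptions for illustration only.

```python
# Hypothetical lookup tables keyed like the converted CSV files.
stations = {"RP000000001": {"NAME": "EXAMPLE STATION", "ELEVATION": 15.0}}
countries = {"RP": "Philippines"}


def enrich(record):
    """Attach the country code, station name, and country name to a record."""
    enriched = dict(record)
    # The country code is the first two characters of the station ID.
    enriched["COUNTRY_CODE"] = record["ID"][:2]
    # Left-join behaviour: unmatched keys yield None instead of dropping the record.
    station = stations.get(record["ID"], {})
    enriched["NAME"] = station.get("NAME")
    enriched["COUNTRY"] = countries.get(enriched["COUNTRY_CODE"])
    return enriched


matched = enrich({"ID": "RP000000001", "DATE": "20000806"})
unmatched = enrich({"ID": "ZZ000000000", "DATE": "20000806"})
print(matched)
print(unmatched)
```

Because both joins are left joins, a record without a matching station or country is kept with empty supplementary fields, so the row count of the merged data stays the same as before the merge.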

In [8]:
fpath_stations =  f"s3://bdcc-lab2-2023/ghcnd/ghcnd_stations.csv"
fpath_countries =  f"s3://bdcc-lab2-2023/ghcnd/ghcnd_countries.csv"

# stations
schema_stations = StructType([
    StructField("ID", StringType(), nullable=True),
    StructField("LATITUDE", DoubleType(), nullable=True),
    StructField("LONGITUDE", DoubleType(), nullable=True),
    StructField("ELEVATION", DoubleType(), nullable=True),
    StructField("STATE", StringType(), nullable=True),
    StructField("NAME", StringType(), nullable=True),
    StructField("GSN_FLAG", StringType(), nullable=True),
    StructField("HCN_CRN", StringType(), nullable=True),
    StructField("WMO_ID", StringType(), nullable=True)
])

df_stations = spark.read.csv(fpath_stations, header=False, sep=',', schema=schema_stations)

# countries
schema_countries = StructType([
    StructField("COUNTRY_CODE", StringType(), nullable=True),
    StructField("COUNTRY", StringType(), nullable=True)
])
df_countries = spark.read.csv(fpath_countries, header=False, sep=',', schema=schema_countries)

In [9]:
# Merged climate and weather events + Location + Station information
df_merged = (df_final
                .withColumn("COUNTRY_CODE", F.expr("substr(ID, 1, 2)"))
                .withColumn('DATE', F.to_date(F.col('DATE'), 'yyyyMMdd'))
                .withColumn('YEAR', F.year(F.col('DATE')))
                .withColumn('MONTH', F.month(F.col('DATE')))
                .withColumn('DAY', F.dayofmonth(F.col('DATE')))
                .join(df_stations, on='ID', how='left')
                .join(df_countries, on='COUNTRY_CODE', how='left')
            )

In [10]:
df_merged.count()

264665977

## Save as Parquet

In [13]:
out_fpath =  f"s3://bdcc-lab2-2023/ghcnd/processed/ghcnd_processed_{current_datetime_str}.parquet"
out_fpath

's3://bdcc-lab2-2023/ghcnd/processed/ghcnd_processed_2023-06-05_14:50.parquet'

In [14]:
df_merged.write.parquet(out_fpath)

## Check saved data

In [17]:
in_fpath = f"s3://bdcc-lab2-2023/ghcnd/processed/ghcnd_processed_2023-06-05_14:50.parquet"
df_processed = spark.read.parquet(in_fpath)

The schema of the preprocessed data is shown below. This file serves as the working dataset for the succeeding steps of the project.

In [18]:
df_processed.printSchema()

root
 |-- COUNTRY_CODE: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- M_FLAG: string (nullable = true)
 |-- Q_FLAG: string (nullable = true)
 |-- S_FLAG: string (nullable = true)
 |-- OBS_TIME: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- ACMH: string (nullable = true)
 |-- ACSH: string (nullable = true)
 |-- ADPT: string (nullable = true)
 |-- ASLP: string (nullable = true)
 |-- ASTP: string (nullable = true)
 |-- AWBT: string (nullable = true)
 |-- AWDR: string (nullable = true)
 |-- AWND: string (nullable = true)
 |-- DAEV: string (nullable = true)
 |-- DAPR: string (nullable = true)
 |-- DASF: string (nullable = true)
 |-- DATN: string (nullable = true)
 |-- DATX: string (nullable = true)
 |-- DAWM: string (nullable = true)
 |-- DWPR: string (nullable = true)
 |-- EVAP: string (nullable = true)
 |-- FMTM: string (nullable = true)
 |-- MDEV: string (nullable = true)
 |-- MDPR: string (nullable = true)
 |-- 

In [19]:
df_processed.show(vertical=True, truncate=False)

-RECORD 0------------------------------------------------
 COUNTRY_CODE | AS                                       
 ID           | ASN00030051                              
 DATE         | 2000-08-06                               
 M_FLAG       | null                                     
 Q_FLAG       | null                                     
 S_FLAG       | a                                        
 OBS_TIME     | null                                     
 filename     | s3a://noaa-ghcn-pds/csv/by_year/2000.csv 
 ACMH         | null                                     
 ACSH         | null                                     
 ADPT         | null                                     
 ASLP         | null                                     
 ASTP         | null                                     
 AWBT         | null                                     
 AWDR         | null                                     
 AWND         | null                                     
 DAEV         

# Check saved data

## Countries

In [None]:
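in_fpath = "s3://bdcc-lab2-2023/ghcnd/ghcnd_countries.csv"
# The CSV was written without a header row, so read it with header=False
# and assign the column names explicitly.
df_countries = spark.read.csv(in_fpath, header=False).toDF('COUNTRY_CODE', 'COUNTRY')

df_countries.show(truncate=False)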

## Raw

In [None]:
in_fpath = f"s3://bdcc-lab2-2023/ghcnd/ghcnd_raw.parquet"
df_raw = spark.read.parquet(in_fpath)

In [None]:
df_raw.count()

In [None]:
df_raw.show(truncate=False)

In [None]:
df_raw.printSchema()

In [None]:
# Checker: compare the number of distinct station IDs with the total row count
(df_raw.agg(F.countDistinct('ID').alias('unique_count'), F.count('ID').alias('total_count'))).show()

-End of Data Preparation Notebook-