## About the project
The goal is to use the WT08 variable (smoke or haze) to analyze whether there is a relationship between COVID-19 and US air quality in 2020. According to our plan, the experimental design is as follows:

1. Extract daily values of the variables for January to April of every year up to the present, worldwide.
2. Quantify historic variability in each variable at a site-by-site level: e.g., mean and CI (parametric or non-parametric quantile intervals) by site over all years through 2019. This may depend on data scarcity/availability.
3. Evaluate where the 2020 observations fall relative to long-term variability using an appropriate statistical test.
4. Potential use of PCA: across all sites (not site by site), we could check whether the components responsible for the greatest variation in the 2020 data differ from those of prior years (not sure that is possible; it might work for precipitation and temperature, but probably not for smoke/haze and fog).
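Steps 2 and 3 could be prototyped roughly as follows. This is a minimal sketch in plain Python, assuming the data have already been reduced to one number per site and year (for example, the count of WT08 days from January to April). The helper names, the 2.5%/97.5% bounds and the numbers are illustrative assumptions, not part of the plan, and with only a few baseline years (2017 to 2019 in the test download) an empirical quantile interval is very coarse.

```python
import statistics


def quantile(values, p):
    """Empirical quantile with linear interpolation between order statistics."""
    ordered = sorted(values)
    pos = p * (len(ordered) - 1)
    lower = int(pos)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower)


def place_in_history(history, value, lo_p=0.025, hi_p=0.975):
    """Compare one 2020 value with a site's baseline years (hypothetical helper)."""
    lo, hi = quantile(history, lo_p), quantile(history, hi_p)
    if value < lo:
        position = "below"
    elif value > hi:
        position = "above"
    else:
        position = "within"
    return {"mean": statistics.mean(history), "interval": (lo, hi), "position": position}


def assess_sites(history_by_site, value_2020_by_site):
    """Apply place_in_history to every site that has both baseline and 2020 data."""
    return {
        site: place_in_history(history_by_site[site], value)
        for site, value in value_2020_by_site.items()
        if history_by_site.get(site)
    }


# Made-up numbers: WT08 days (Jan-Apr) per baseline year for two hypothetical sites
history = {"SITE_A": [20, 25, 30, 35, 40], "SITE_B": [5, 6, 7]}
observed_2020 = {"SITE_A": 12, "SITE_B": 6}
for site, result in assess_sites(history, observed_2020).items():
    print(site, result)
```

A formal test (step 3) would replace the simple below/within/above label; the sketch only shows where the per-site baseline and the 2020 value meet.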


In [1]:
import pandas as pd
import numpy as np
import sklearn as sk
import urllib
import math

# Locate the local Spark installation so that pyspark can be imported
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SQLContext

# Local Spark context with 4 worker threads
sc = SparkContext(master="local[4]")
#sc.version

# SQLContext is the entry point for reading files and running SQL queries below
sqlContext = SQLContext(sc)

## About the data
These are just some small tests. I downloaded NOAA GHCN-D data for the years 2017-2020 from AWS and saved it in the directory ~/weather_data/test.
The stations text file is saved in the same directory.
Each yearly file stores weather data for the whole world, while we are mainly interested in the WT08 variable for US stations from January to April of each year, so we first need to do some data cleaning. Take 2020 as an example.
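The cleaning rule described above (US stations, WT08, January to April) can be written as a single predicate that works for any year, instead of a fixed cutoff such as `20200501`. A minimal plain-Python sketch, assuming the by_year CSV layout used in the cells below; `keep_row` and `filter_by_year_lines` are hypothetical helper names and the sample lines are made up. In PySpark the same conditions could be expressed with `col("_c0").startswith("US")`, `col("_c2") == "WT08"` and `substring(col("_c1"), 5, 2).cast("int").isin(1, 2, 3, 4)`. Note that the 2020 cells below filter on element and date only, so non-US stations can remain in `weather_df` until it is joined with the US station list.

```python
import csv


def keep_row(station_id, date, element, months=(1, 2, 3, 4)):
    """True for a US station's WT08 record dated in January-April (date is YYYYMMDD)."""
    return station_id.startswith("US") and element == "WT08" and int(date[4:6]) in months


def filter_by_year_lines(lines):
    """Yield (station, date) pairs from GHCN-D by_year CSV lines that pass keep_row."""
    for row in csv.reader(lines):
        station_id, date, element = row[0], row[1], row[2]
        if keep_row(station_id, date, element):
            yield station_id, date


# Made-up sample lines in the by_year layout (ID, date, element, value, flags, time)
sample = [
    "USW00000001,20200101,WT08,1,,,,",
    "USW00000001,20200101,PRCP,0,,,,",
    "CA000000001,20200215,WT08,1,,,,",
    "USW00000001,20190501,WT08,1,,,,",
]
print(list(filter_by_year_lines(sample)))
```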

In [3]:
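# stations.txt is a fixed-width file; splitting on single spaces produces a ragged
# set of columns (_c0 ... _c37) whose positions depend on how each line is padded
stations = sqlContext.read.csv("/home/fyjj/weather_data/test/stations.txt", sep=" ", mode="PERMISSIVE", header=False)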
stations.show(4)

+-----------+----+-------+----+--------+-------+----+----+----+----+----+----+-------+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+----+----+----+----+----+----+
|        _c0| _c1|    _c2| _c3|     _c4|    _c5| _c6| _c7| _c8| _c9|_c10|_c11|   _c12|    _c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30| _c31|_c32|_c33|_c34|_c35|_c36|_c37|
+-----------+----+-------+----+--------+-------+----+----+----+----+----+----+-------+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+----+----+----+----+----+----+
|ACW00011604|null|17.1167|null|-61.7833|   null|null|10.1|null|null|null|  ST|  JOHNS|COOLIDGE| FLD|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| null|null|null|null|null|null|null|
|ACW00011647|null|17.1333|null|-61.7833|   null|null|19.2|null|null|null|  ST|  JOHNS|    null|null|null|null|null|null|null|null|nu

In [5]:
from pyspark.sql.functions import col
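# GHCN-D station IDs begin with a two-letter country code; keep US stations only
stations = stations.filter(col("_c0").like("US%"))
# Keep the ID and _c7, intended as the state; because of the ragged space-split,
# _c7 sometimes holds a number instead (see the output below)
stations = stations.select(col("_c0"), col("_c7"))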
stations.show(4)

+-----------+---+
|        _c0|_c7|
+-----------+---+
|US009052008| SD|
|US10RMHS145|1.6|
|US10adam001| NE|
|US10adam002| NE|
+-----------+---+
only showing top 4 rows
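The `_c7` column above is not a reliable state code: `stations.txt` is a fixed-width file, so splitting on single spaces shifts fields depending on padding, which is why `_c7` holds `10.1` for `ACW00011604` and `1.6` for `US10RMHS145`. A sketch that slices fields by position instead, assuming the column layout documented in the GHCN-D readme for `ghcnd-stations.txt` (ID 1-11, latitude 13-20, longitude 22-30, elevation 32-37, state 39-40, name 42-71; worth checking against the readme that comes with the download). The helper names and the sample line are made up. In PySpark the same idea would be `sqlContext.read.text(path)` followed by `substring("value", 39, 2)`, with `trim` for padded fields.

```python
# 1-based, inclusive column ranges for ghcnd-stations.txt (assumed from the GHCN-D readme)
STATION_FIELDS = {
    "id": (1, 11),
    "latitude": (13, 20),
    "longitude": (22, 30),
    "elevation": (32, 37),
    "state": (39, 40),
    "name": (42, 71),
}


def parse_station_line(line):
    """Slice one fixed-width station line into a dict of stripped strings."""
    return {field: line[start - 1:end].strip() for field, (start, end) in STATION_FIELDS.items()}


def us_station_states(lines):
    """Map US station IDs to their two-letter state code (blank codes are skipped)."""
    states = {}
    for line in lines:
        record = parse_station_line(line)
        if record["id"].startswith("US") and record["state"]:
            states[record["id"]] = record["state"]
    return states


# Made-up station line laid out with the documented field widths
sample = f"{'USW00000001':<11} {'44.0000':>8} {'-100.0000':>9} {'500.0':>6} {'SD':<2} {'EXAMPLE STATION':<30}"
print(parse_station_line(sample))
```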



In [6]:
filename = '/home/fyjj/weather_data/test/2020.csv'
# The by_year files have no header row: ID, date, element, value, M/Q/S flags, observation time
weather_df = sqlContext.read.format("csv").option("header", "false").load(filename)
weather_df.show(4)

+-----------+--------+----+---+----+----+---+----+
|        _c0|     _c1| _c2|_c3| _c4| _c5|_c6| _c7|
+-----------+--------+----+---+----+----+---+----+
|US1FLSL0019|20200101|PRCP|  0|null|null|  N|null|
|US1FLSL0019|20200101|SNOW|  0|null|null|  N|null|
|US1NVNY0012|20200101|PRCP|  0|null|null|  N|null|
|US1NVNY0012|20200101|SNOW|  0|null|null|  N|null|
+-----------+--------+----+---+----+----+---+----+
only showing top 4 rows
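The unnamed columns `_c0` to `_c7` follow the layout in NOAA's by_year readme: station ID, date (YYYYMMDD), element, data value, measurement flag, quality flag, source flag and observation time. A small plain-Python sketch that names them and keeps only records with a blank quality flag, which GHCN-D uses for values that did not fail a quality check; the snake_case names and helper functions are my own labels, not NOAA's. The two lines are the first rows shown above written back out as CSV. In PySpark, `weather_df.toDF(*BY_YEAR_COLUMNS)` would apply the same names, and the quality filter would need to run before `_c5` is dropped.

```python
import csv

# Hypothetical snake_case names for the eight by_year columns (_c0 ... _c7)
BY_YEAR_COLUMNS = ["station", "date", "element", "value",
                   "m_flag", "q_flag", "s_flag", "obs_time"]


def read_by_year(lines):
    """Parse by_year CSV lines into dicts keyed by BY_YEAR_COLUMNS."""
    return list(csv.DictReader(lines, fieldnames=BY_YEAR_COLUMNS))


def passed_qc(record):
    """A blank quality flag means the value did not fail a GHCN-D quality check."""
    return record["q_flag"] == ""


# First two rows from the output above, as CSV text (null shown as empty)
lines = [
    "US1FLSL0019,20200101,PRCP,0,,,N,",
    "US1FLSL0019,20200101,SNOW,0,,,N,",
]
records = [r for r in read_by_year(lines) if passed_qc(r)]
print(records[0]["element"], records[0]["s_flag"])
```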



In [8]:
# Drop the value, flag and observation-time columns (_c3 to _c7); this analysis
# only uses the presence of WT08 records. Note that this also drops _c5, the quality flag.
weather_df = weather_df.drop("_c3", "_c4", "_c5", "_c6", "_c7")
# _c1 holds the observation date as YYYYMMDD
weather_df = weather_df.withColumnRenamed("_c0", "station").withColumnRenamed("_c1", "date").withColumnRenamed("_c2", "element")
weather_df.show(4)

+-----------+--------+-------+
|    station|    date|element|
+-----------+--------+-------+
|US1FLSL0019|20200101|   PRCP|
|US1FLSL0019|20200101|   SNOW|
|US1NVNY0012|20200101|   PRCP|
|US1NVNY0012|20200101|   SNOW|
+-----------+--------+-------+
only showing top 4 rows



In [9]:
# Keep smoke/haze (WT08) records dated before 2020-05-01, i.e. January to April 2020
weather_df = weather_df.filter(weather_df["element"] == "WT08")
weather_df = weather_df.filter(weather_df["date"] < 20200501)
weather_df.show(4)

+-----------+--------+-------+
|    station|    date|element|
+-----------+--------+-------+
|USW00013927|20200101|   WT08|
|USW00023061|20200101|   WT08|
|USW00026615|20200101|   WT08|
|USW00053120|20200101|   WT08|
+-----------+--------+-------+
only showing top 4 rows



In [14]:
# Register the DataFrame as a temporary SQL view, then count WT08 days per station
weather_df.createOrReplaceTempView('weather_df')
sqlContext.sql('select station, count(station) as count from weather_df group by station order by count desc').show(5)

+-----------+-----+
|    station|count|
+-----------+-----+
|USW00023234|  112|
|USW00012972|   91|
|USW00003166|   81|
|USW00094938|   80|
|USW00094931|   78|
+-----------+-----+
only showing top 5 rows
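Raw counts are hard to compare across years because January to April has 121 days in a leap year such as 2020 and 120 days otherwise. One option, sketched here with hypothetical helper names, is to divide each station's WT08 count by the number of calendar days in the window; the counts are the ones printed above. Using calendar days as the denominator assumes each station reported every day; if reporting coverage varies, the number of days with any observation would be a better denominator.

```python
import calendar


def days_jan_to_apr(year):
    """Number of calendar days from January 1 through April 30 of `year`."""
    return sum(calendar.monthrange(year, month)[1] for month in range(1, 5))


def wt08_frequency(counts, year):
    """Fraction of January-April days on which each station reported WT08."""
    n_days = days_jan_to_apr(year)
    return {station: count / n_days for station, count in counts.items()}


# Top-5 counts from the 2020 query output above
counts_2020 = {
    "USW00023234": 112,
    "USW00012972": 91,
    "USW00003166": 81,
    "USW00094938": 80,
    "USW00094931": 78,
}
for station, freq in wt08_frequency(counts_2020, 2020).items():
    print(f"{station}: {freq:.3f}")
```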



Next, I want to join the weather data with the station data to determine which state each station is in (the stations file has a state field and a station name, but no separate city field). I am not yet sure how to do it.
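One possible approach, sketched in plain Python under stated assumptions: build a lookup from station ID to state (for instance from a fixed-width parse of the stations file, since `_c7` from the space-split is not always the state), then attach it to each WT08 record, leaving `None` where a station is not in the US list. This is a left join on the station ID; in PySpark the analogous call would be `weather_df.join(stations, weather_df["station"] == stations["_c0"], "left")`. The helper names, station IDs and states below are made up.

```python
from collections import Counter


def attach_state(weather_rows, station_states):
    """Left join: add the station's state (or None) to each (station, date) row."""
    return [(station, date, station_states.get(station)) for station, date in weather_rows]


def wt08_days_by_state(weather_rows, station_states):
    """Count WT08 station-days per state, ignoring rows with no matching state."""
    joined = attach_state(weather_rows, station_states)
    return Counter(state for _, _, state in joined if state is not None)


# Made-up lookup and WT08 rows, for illustration only
station_states = {"USW00000001": "SD", "USW00000002": "NE"}
wt08_rows = [
    ("USW00000001", "20200101"),
    ("USW00000001", "20200102"),
    ("USW00000002", "20200101"),
    ("CA000000001", "20200101"),
]
print(attach_state(wt08_rows, station_states))
print(wt08_days_by_state(wt08_rows, station_states))
```

An inner join (`"inner"` in PySpark) would instead drop the non-US row entirely, which also removes stations outside the US list from the weather data.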