In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

from datetime import datetime

from user_definition import *
# Build (or reuse) the session first and take the context from it.
# NOTE(review): the original called SparkContext.getOrCreate(), but SparkContext is
# not imported by name in the import cell above -- it presumably leaked in through
# one of the wildcard imports. Deriving the context from the session removes that
# hidden dependency while yielding the same context object the session runs on.
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

In [64]:

# Parameters shared by the queries below.
n = 10                       # number of rows passed to .show(n)
start_string = "S"           # county-name prefix used by the Q4 filter
start_date = "2020-01-01"    # inclusive lower date bound for Q5
end_date = "2020-01-07"      # inclusive upper date bound for Q5
site_id_val = 60_270_022     # site selected in Q6

# Q1

**Reading the Data**

In [30]:
def convert_date(x):
    """Parse a dash-separated 'year-month-day' string into a datetime.

    Only the first three dash-separated fields are used; the returned
    datetime has its time component left at midnight.
    """
    fields = x.split('-')
    year = int(fields[0])
    month = int(fields[1])
    day = int(fields[2])
    return datetime(year, month, day)

In [41]:
# Raw rows: each text line split into its comma-separated string fields.
airquality = sc.textFile(air_quality_file).map(lambda line: line.split(','))

# Typed rows: date, integer site id, then two float measurements
# (same field order as the first four raw columns).
airquality_ = airquality.map(
    lambda fields: [
        convert_date(fields[0]),
        int(fields[1]),
        float(fields[2]),
        float(fields[3]),
    ]
)
airquality_.first()

[datetime.datetime(2020, 1, 1, 0, 0), 60070008, 27.0, 25.0]

In [42]:
# Peek at the first raw (still all-string) row for comparison with the typed row above.
airquality.first()

['2020-01-01', '60070008', '27', '25']

**Defining the schema**

In [43]:
# Column layout for the air-quality DataFrame; every field is declared non-nullable.
schema = (
    StructType()
    .add("date", DateType(), nullable=False)
    .add("site_id", IntegerType(), nullable=False)
    .add("daily_mean_pm10_concentration", FloatType(), nullable=False)
    .add("daily_aqi_value", FloatType(), nullable=False)
)

**Converting the RDD object to a DataFrame**

In [44]:
# Attach the explicit schema to the typed air-quality RDD.
df_airquality = ss.createDataFrame(data=airquality_, schema=schema)

In [45]:
# Quick sanity check: display the first five air-quality rows.
df_airquality.show(5)

+----------+--------+-----------------------------+---------------+
|      date| site_id|daily_mean_pm10_concentration|daily_aqi_value|
+----------+--------+-----------------------------+---------------+
|2020-01-01|60070008|                         27.0|           25.0|
|2020-01-02|60070008|                         22.0|           20.0|
|2020-01-03|60070008|                         30.0|           28.0|
|2020-01-04|60070008|                         17.0|           16.0|
|2020-01-05|60070008|                         18.0|           17.0|
+----------+--------+-----------------------------+---------------+
only showing top 5 rows



In [46]:
# Confirm the column types and nullability that were applied to the DataFrame.
df_airquality.printSchema()

root
 |-- date: date (nullable = false)
 |-- site_id: integer (nullable = false)
 |-- daily_mean_pm10_concentration: float (nullable = false)
 |-- daily_aqi_value: float (nullable = false)



# Q2

In [49]:
# Raw rows: each text line split into its comma-separated string fields.
location = sc.textFile(location_file).map(lambda line: line.split(','))

# Typed rows: integer id, name, two float coordinates, then two string fields.
location_ = location.map(
    lambda fields: [
        int(fields[0]),
        fields[1],
        float(fields[2]),
        float(fields[3]),
        fields[4],
        fields[5],
    ]
)
location_.first()

[60090001,
 'San Andreas-Gold Strike Road',
 38.20185,
 -120.680275,
 'Calaveras',
 'California']

In [50]:
# Column layout for the site-location DataFrame; every field is declared non-nullable.
schema_loc = (
    StructType()
    .add("site_id", IntegerType(), nullable=False)
    .add("site_name", StringType(), nullable=False)
    .add("site_latitude", FloatType(), nullable=False)
    .add("site_longitude", FloatType(), nullable=False)
    .add("county", StringType(), nullable=False)
    .add("state", StringType(), nullable=False)
)


In [53]:
# Attach the explicit schema to the typed location RDD.
df_location = ss.createDataFrame(data=location_, schema=schema_loc)

In [54]:
# Display the location rows using show()'s default row limit.
df_location.show()

+--------+--------------------+-------------+--------------+---------+----------+
| site_id|           site_name|site_latitude|site_longitude|   county|     state|
+--------+--------------------+-------------+--------------+---------+----------+
|60090001|San Andreas-Gold ...|     38.20185|   -120.680275|Calaveras|California|
|60111002| Colusa-Sunrise Blvd|     39.18919|    -121.99887|   Colusa|California|
|60170011|South Lake Tahoe-...|     38.94498|    -119.97061|El Dorado|California|
|60190007|     Fresno-Drummond|    36.705475|    -119.74133|   Fresno|California|
|60190011|    Fresno - Garland|     36.78538|    -119.77321|   Fresno|California|
|60190500|Table Mountain Ai...|     36.98512|    -119.65834|   Fresno|California|
|60195001|        Clovis-Villa|     36.81945|    -119.71643|   Fresno|California|
|60210003|Willows-Colusa St...|     39.53387|   -122.190834|    Glenn|California|
|60231004|              Jacobs|     40.77678|    -124.17949| Humboldt|California|
|60250005|Calexi

In [55]:
# Confirm the column types and nullability that were applied to the DataFrame.
df_location.printSchema()

root
 |-- site_id: integer (nullable = false)
 |-- site_name: string (nullable = false)
 |-- site_latitude: float (nullable = false)
 |-- site_longitude: float (nullable = false)
 |-- county: string (nullable = false)
 |-- state: string (nullable = false)



# Q3

In [57]:
# Highest AQI first; rows with equal AQI are ordered by ascending date.
df_airquality.orderBy(
    df_airquality['daily_aqi_value'].desc(),
    df_airquality['date'].asc(),
).show(n)

+----------+--------+-----------------------------+---------------+
|      date| site_id|daily_mean_pm10_concentration|daily_aqi_value|
+----------+--------+-----------------------------+---------------+
|2020-02-02|60270022|                        562.0|          458.0|
|2020-02-02|60270025|                        537.0|          433.0|
|2020-02-02|60270030|                        361.0|          210.0|
|2020-02-02|60271003|                        253.0|          150.0|
|2020-02-02|60270028|                        212.0|          129.0|
+----------+--------+-----------------------------+---------------+
only showing top 5 rows



# Q4 

In [67]:
# Count sites per county for counties whose name begins with start_string,
# ordered by descending count and then alphabetically by county.
(
    df_location
    .where(df_location['county'].startswith(start_string))
    .groupBy('county')
    .count()
    .orderBy(['count', 'county'], ascending=[False, True])
    .show(n)
)

+---------------+-----+
|         county|count|
+---------------+-----+
| San Bernardino|    9|
|San Luis Obispo|    7|
|  Santa Barbara|    5|
|     Sacramento|    4|
|      San Diego|    3|
|    San Joaquin|    3|
|         Shasta|    3|
|         Sonoma|    3|
|     Stanislaus|    2|
|     San Benito|    1|
+---------------+-----+
only showing top 10 rows



# Q5

In [75]:
# Restrict to the inclusive [start_date, end_date] range, then add a per-site
# average of daily_aqi_value computed over a date-ordered window.
(
    df_airquality
    .where(df_airquality['date'].between(start_date, end_date))
    .select(
        'site_id',
        'date',
        'daily_aqi_value',
        avg('daily_aqi_value')
        .over(Window.partitionBy('site_id').orderBy('date'))
        .alias("avg_daily_aqi"),
    )
    .show(n)
)

+--------+----------+---------------+------------------+
| site_id|      date|daily_aqi_value|     avg_daily_aqi|
+--------+----------+---------------+------------------+
|60070008|2020-01-01|           25.0|              25.0|
|60070008|2020-01-02|           20.0|              22.5|
|60070008|2020-01-03|           28.0|24.333333333333332|
|60070008|2020-01-04|           16.0|             22.25|
|60070008|2020-01-05|           17.0|              21.2|
|60070008|2020-01-06|           12.0|19.666666666666668|
|60070008|2020-01-07|           24.0|20.285714285714285|
|60090001|2020-01-01|           18.0|              18.0|
|60090001|2020-01-02|           13.0|              15.5|
|60090001|2020-01-03|           13.0|14.666666666666666|
+--------+----------+---------------+------------------+
only showing top 10 rows



# Q6

In [85]:
# Schema of the selected site's location row left-joined to its air-quality rows.
# The site filter is a Column comparison rather than an f-string SQL fragment, so
# site_id_val is passed as a typed literal instead of being formatted into text
# and re-parsed by the SQL expression parser.
(
    df_location
    .filter(df_location['site_id'] == site_id_val)
    .join(df_airquality, 'site_id', 'left_outer')
    .sort('date')
    .printSchema()
)

root
 |-- site_id: integer (nullable = false)
 |-- site_name: string (nullable = false)
 |-- site_latitude: float (nullable = false)
 |-- site_longitude: float (nullable = false)
 |-- county: string (nullable = false)
 |-- state: string (nullable = false)
 |-- date: date (nullable = true)
 |-- daily_mean_pm10_concentration: float (nullable = true)
 |-- daily_aqi_value: float (nullable = true)



In [87]:
# Location details for the selected site alongside its air-quality readings,
# ordered by date. The site filter is a Column comparison rather than an
# f-string SQL fragment, so site_id_val is passed as a typed literal instead of
# being formatted into text and re-parsed by the SQL expression parser.
(
    df_location
    .filter(df_location['site_id'] == site_id_val)
    .join(df_airquality, 'site_id', 'left_outer')
    .sort('date')
    .select(
        'date',
        'site_name',
        'site_latitude',
        'site_longitude',
        'county',
        'state',
        'daily_mean_pm10_concentration',
        'daily_aqi_value',
    )
    .show(n)
)

+----------+-----------+-------------+--------------+------+----------+-----------------------------+---------------+
|      date|  site_name|site_latitude|site_longitude|county|     state|daily_mean_pm10_concentration|daily_aqi_value|
+----------+-----------+-------------+--------------+------+----------+-----------------------------+---------------+
|2020-01-01|Dirty Socks|     36.32617|    -117.95512|  Inyo|California|                          3.0|            3.0|
|2020-01-02|Dirty Socks|     36.32617|    -117.95512|  Inyo|California|                          5.0|            5.0|
|2020-01-03|Dirty Socks|     36.32617|    -117.95512|  Inyo|California|                          4.0|            4.0|
|2020-01-04|Dirty Socks|     36.32617|    -117.95512|  Inyo|California|                          4.0|            4.0|
|2020-01-05|Dirty Socks|     36.32617|    -117.95512|  Inyo|California|                          6.0|            6.0|
|2020-01-06|Dirty Socks|     36.32617|    -117.95512|  I

In [88]:
# Release Spark resources: stop the context, then the session.
sc.stop()
ss.stop()