In [3]:
! pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


# Analyzing temperature and API datasets across US states using PySpark


In [5]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as fn

sc = SparkContext.getOrCreate()

spark = SparkSession\
    .builder\
    .appName('02')\
    .getOrCreate()

In [None]:
# download the temperature and aqi data sets
%%bash

if [[ ! -f us-daily-temperatures-2021.csv.csv ]]; then
 wget https://syr-bda.s3.us-east-2.amazonaws.com/us-daily-temperatures-2021.csv -q
fi

if [[ ! -f us-daily-aqi-2021.csv.csv ]]; then
 wget https://syr-bda.s3.us-east-2.amazonaws.com/us-daily-aqi-2021.csv -q
fi

In [None]:
temperature = spark.read.csv('us-daily-temperatures-2021.csv', header =  True)
aqi = spark.read.csv('us-daily-aqi-2021.csv', header = True)
temperature = temperature.withColumn("mean_temperature_f", temperature["mean_temperature_f"].cast("int"))
temperature = temperature.withColumn("max_temp_f", temperature["max_temp_f"].cast("int"))
temperature = temperature.withColumn("sites_reporting", temperature["sites_reporting"].cast("int"))
temperature = temperature.withColumn("max_temp_hour", temperature["max_temp_hour"].cast("int"))
temperature.printSchema()
temperature.count()
aqi.printSchema()
aqi.count()



root
 |-- date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)
 |-- city: string (nullable = true)
 |-- sites_reporting: integer (nullable = true)
 |-- mean_temperature_f: integer (nullable = true)
 |-- max_temp_f: integer (nullable = true)
 |-- max_temp_hour: integer (nullable = true)

root
 |-- date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)
 |-- aqi: string (nullable = true)
 |-- category: string (nullable = true)



130922

Finding the **mean** temperature, the **max** temperature, and the **total** sites reporting, for each unique `date`, `state`, and `county` in the `temperature` data frame.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *


temperature_county = temperature.groupBy("date", "state", "county").agg(
    round(mean("mean_temperature_f")).alias("mean_temperature_f"),
    max("max_temp_f").alias("max_temp_f"),
    sum("sites_reporting").alias("sites_reporting"))
temperature_county.show()




+----------+------------+----------+------------------+----------+---------------+
|      date|       state|    county|mean_temperature_f|max_temp_f|sites_reporting|
+----------+------------+----------+------------------+----------+---------------+
|2021-01-01|       Idaho|  Shoshone|              33.0|        37|              1|
|2021-01-01|       Texas|  Brazoria|              44.0|        53|              2|
|2021-01-02|North Dakota|     Burke|              32.0|        38|              1|
|2021-01-03|  California| Riverside|              52.0|        72|             14|
|2021-01-03|South Dakota|    Custer|              37.0|        43|              1|
|2021-01-04|       Idaho|Twin Falls|              36.0|        44|              1|
|2021-01-05|    Illinois| Champaign|              29.0|        31|              1|
|2021-01-05|    Maryland| Baltimore|              38.0|        41|              3|
|2021-01-05|     Wyoming|     Teton|              22.0|        31|              3|
|202

In [None]:
print('Rows in temperature_county:', temperature_county.count())
temperature_county.orderBy('date', 'state', 'county').show(10)



Rows in temperature_county: 138555
+----------+--------+--------------------+------------------+----------+---------------+
|      date|   state|              county|mean_temperature_f|max_temp_f|sites_reporting|
+----------+--------+--------------------+------------------+----------+---------------+
|2021-01-01| Alabama|            Escambia|              63.0|        68|              1|
|2021-01-01| Alabama|           Jefferson|              65.0|        73|              2|
|2021-01-01|  Alaska|              Denali|               0.0|         6|              1|
|2021-01-01|  Alaska|Fairbanks North Star|              -9.0|        -1|              7|
|2021-01-01| Arizona|             Cochise|              39.0|        49|              1|
|2021-01-01| Arizona|            Coconino|              29.0|        35|              1|
|2021-01-01| Arizona|            Maricopa|              49.0|        65|              1|
|2021-01-01| Arizona|              Navajo|              29.0|        42|   



Creating a new data frame called `county_max_temp_hour` that reports the `max_temp_hour` at the same level of aggregation as `temperature_county` in the previous step.

In [None]:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window_spec = Window.partitionBy("date", "state", 'county') \
 .orderBy(desc('max_temp_f'))

df = temperature\
  .withColumn('row_num', row_number()\
              .over(window_spec))
df.count()


df2 = df.filter(df.row_num == 1)


county_max_temp_hour = df2.select("date", "state", "county","max_temp_hour")
county_max_temp_hour.show()

temperature_county_final = temperature_county.join(
    county_max_temp_hour,
    on=["date", "state", "county"],
    how="left"
)

temperature_county_final.count()



+----------+----------+--------------------+-------------+
|      date|     state|              county|max_temp_hour|
+----------+----------+--------------------+-------------+
|2021-01-01|   Alabama|            Escambia|            0|
|2021-01-01|    Alaska|              Denali|            4|
|2021-01-01|    Alaska|Fairbanks North Star|           12|
|2021-01-01|   Arizona|            Coconino|           12|
|2021-01-01|   Arizona|            Maricopa|           13|
|2021-01-01|   Arizona|              Navajo|           14|
|2021-01-01|   Arizona|                Pima|           15|
|2021-01-01|California|               Butte|           15|
|2021-01-01|California|               Glenn|           14|
|2021-01-01|California|            Imperial|           14|
|2021-01-01|California|            Mariposa|           12|
|2021-01-01|California|          Sacramento|           13|
|2021-01-01|California|          San Benito|           13|
|2021-01-01|California|      San Bernardino|           1

In [None]:
print('Rows in county_max_temp_hour:', county_max_temp_hour.count())
county_max_temp_hour.orderBy('date', 'state', 'county').show(10)

Rows in county_max_temp_hour: 138555
+----------+--------+--------------------+-------------+
|      date|   state|              county|max_temp_hour|
+----------+--------+--------------------+-------------+
|2021-01-01| Alabama|            Escambia|            0|
|2021-01-01| Alabama|           Jefferson|           13|
|2021-01-01|  Alaska|              Denali|            4|
|2021-01-01|  Alaska|Fairbanks North Star|           12|
|2021-01-01| Arizona|             Cochise|           12|
|2021-01-01| Arizona|            Coconino|           12|
|2021-01-01| Arizona|            Maricopa|           13|
|2021-01-01| Arizona|              Navajo|           14|
|2021-01-01| Arizona|                Pima|           15|
|2021-01-01|Arkansas|             Pulaski|           10|
+----------+--------+--------------------+-------------+
only showing top 10 rows



In [None]:
print('Rows in temperature_county_final:', temperature_county_final.count())
temperature_county_final.orderBy('date', 'state', 'county').show(10)

aqi.show()

Rows in temperature_county_final: 138555
+----------+--------+--------------------+------------------+----------+---------------+-------------+
|      date|   state|              county|mean_temperature_f|max_temp_f|sites_reporting|max_temp_hour|
+----------+--------+--------------------+------------------+----------+---------------+-------------+
|2021-01-01| Alabama|            Escambia|              63.0|        68|              1|            0|
|2021-01-01| Alabama|           Jefferson|              65.0|        73|              2|           13|
|2021-01-01|  Alaska|              Denali|               0.0|         6|              1|            4|
|2021-01-01|  Alaska|Fairbanks North Star|              -9.0|        -1|              7|           12|
|2021-01-01| Arizona|             Cochise|              39.0|        49|              1|           12|
|2021-01-01| Arizona|            Coconino|              29.0|        35|              1|           12|
|2021-01-01| Arizona|           



Joining `aqi` to `temperature_county_final` and call the resulting data frame `daily_county_measurements`.Then finding out the value where the **highest recorded `aqi`** occurred in 2021. In the event of a tie, we will include the first instance.

In [None]:
# join aqi to temperature_county_final
daily_county_measurements = temperature_county_final.join(aqi,
                                                          on=['date', 'state', 'county'], how= 'inner')
highest_aqi = daily_county_measurements.select('date','county','state','aqi').filter(year("date")==2021).orderBy(col("aqi"), ascending=False)


highest_aqi_values = highest_aqi.first()
date = highest_aqi_values['date']
county = highest_aqi_values['county']
state = highest_aqi_values['state']
aqi_value = highest_aqi_values['aqi']
print(f"The highest recorded AQI value in 2021 occurred on {date}, in {county} County, {state}, and had a value of {aqi_value}.")


The highest recorded AQI value in 2021 occurred on 2021-01-11, in Kern County, California, and had a value of 99.


 Creating a new data frame called `highest_temperates_by_state_2021` that contains one row per state, and shows the `date`, `state`, and `max_temp_f` for the **highest recorded temperature** in that state in 2021.

In [None]:

highest_temperates_by_state_2021 = temperature_county_final.filter(year('date')==2021).groupBy('state').agg(max('max_temp_f'))

highest_temperatures_by_state_2021 = temperature_county_final \
    .filter(year("date") == 2021) \
    .groupBy("state","county") \
    .agg(max(col("max_temp_f")))
highest_temperates_by_state_2021.show()

+--------------------+---------------+
|               state|max(max_temp_f)|
+--------------------+---------------+
|                Utah|            115|
|              Hawaii|             89|
|   Country Of Mexico|            123|
|           Minnesota|             98|
|                Ohio|             99|
|              Oregon|            116|
|            Arkansas|            101|
|District Of Columbia|             97|
|               Texas|            107|
|        North Dakota|            103|
|        Pennsylvania|             97|
|         Connecticut|            100|
|             Vermont|             92|
|            Nebraska|            102|
|              Nevada|            118|
|          Washington|            116|
|            Illinois|             99|
|            Oklahoma|            103|
|            Delaware|             97|
|              Alaska|             87|
+--------------------+---------------+
only showing top 20 rows

