## This notebook has two major sections. The first loads in the large, combined dataset and outputs monthly files.

## The second loads in these monthly files and performs instrument calculations and aggregations.

# Begin Splitting

In [1]:
import findspark
findspark.init()

In [427]:
import pyspark
import pandas as pd 
import numpy as np
import os 
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.window import Window
from math import radians, cos, sin, asin, sqrt
from pyspark.sql import functions as F
from pyspark.sql.functions import col, row_number, round, substring, count, when, isnan, min, max, avg, stddev_samp, abs, sum, count
from pyspark.ml.feature import MinMaxScaler, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

from datetime import date, timedelta

import datetime

from pyspark_dist_explore import hist
import matplotlib.pyplot as plt

from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

pd.set_option('display.max_columns', None)

In [3]:
spark = SparkSession.builder.master("local[*]").config("spark.executor.memory", "48g").config("spark.driver.memory", "48g").config("spark.driver.maxResultSize","0").getOrCreate()

In [5]:
gdrive_path = 'I:\\.shortcut-targets-by-id\\11wLy1WKwOTcthBs1rpfEzkqax2BZG-6E\\W210_Capstone\\Data\\'
local_path = 'C:\\Users\\matts\\Documents\\Berkeley MIDS\\DataSci 210 Capstone\\non-push files\\data\\'

In [7]:
wind_grid_points = spark.read.option("header",True).csv(os.path.join(local_path,'all_wind_measurements.csv'))

In [8]:
wind_grid_points = wind_grid_points.withColumnRenamed('lat','wind_lat').withColumnRenamed('lon','wind_lon')

In [9]:
used_grid_points = spark.read.option("header",True).csv(os.path.join(local_path,'all_used_wind_sites.csv'))

In [10]:
used_grid_points.limit(5).show()

+----------+---------+-----------+
|grid_index| wind_lat|   wind_lon|
+----------+---------+-----------+
|       443|37.779999|-122.160004|
|       722|34.529999|-120.410004|
|       631|38.279999|-120.910004|
|       589|39.279999|-121.160004|
|      1105|33.779999|-117.910004|
+----------+---------+-----------+



In [11]:
used_grid_points.printSchema()

root
 |-- grid_index: string (nullable = true)
 |-- wind_lat: string (nullable = true)
 |-- wind_lon: string (nullable = true)



In [12]:
used_grid_points = used_grid_points.withColumn("wind_lat",used_grid_points.wind_lat.cast('double'))
used_grid_points = used_grid_points.withColumn("wind_lon",used_grid_points.wind_lon.cast('double'))

In [16]:
wind_grid_points = wind_grid_points.withColumn("wind_lat",wind_grid_points.wind_lat.cast('double'))
wind_grid_points = wind_grid_points.withColumn("wind_lon",wind_grid_points.wind_lon.cast('double'))
wind_grid_points = wind_grid_points.withColumn("u",wind_grid_points.u.cast('double'))
wind_grid_points = wind_grid_points.withColumn("v",wind_grid_points.v.cast('double'))
wind_grid_points = wind_grid_points.withColumn("wdir",wind_grid_points.wdir.cast('double'))
wind_grid_points = wind_grid_points.withColumn("wspd",wind_grid_points.wspd.cast('double'))
wind_grid_points = wind_grid_points.drop('_c0')

In [22]:
wind_grid_points = wind_grid_points.withColumn("wind_lat",round(col('wind_lat'),6))
wind_grid_points = wind_grid_points.withColumn("wind_lon",round(col('wind_lon'),6))
wind_grid_points = wind_grid_points.withColumn("u",round(col('u'),6))
wind_grid_points = wind_grid_points.withColumn("v",round(col('v'),6))
wind_grid_points = wind_grid_points.withColumn("wdir",round(col('wdir'),6))
wind_grid_points = wind_grid_points.withColumn("wspd",round(col('wspd'),6))

In [23]:
wind_grid_points.limit(5).show()

+---------+-----------+--------+---------+----------+--------+-------------------+
| wind_lat|   wind_lon|       u|        v|      wdir|    wspd|           Datetime|
+---------+-----------+--------+---------+----------+--------+-------------------+
|42.279999|-124.410004|1.316132| -4.17089|287.513185|4.373617|2001-01-01 00:00:00|
|42.029999|-124.410004|1.720276|-4.124691|292.639445|4.469052|2001-01-01 00:00:00|
|41.779999|-124.410004|2.337209|-4.626282| 296.80302|5.183149|2001-01-01 00:00:00|
|41.529999|-124.410004|2.451185|-5.043875|295.918485|5.607939|2001-01-01 00:00:00|
|41.279999|-124.410004| 2.09671|-5.050475|292.545832|5.468408|2001-01-01 00:00:00|
+---------+-----------+--------+---------+----------+--------+-------------------+



In [17]:
wind_grid_points.printSchema()

root
 |-- wind_lat: double (nullable = true)
 |-- wind_lon: double (nullable = true)
 |-- u: double (nullable = true)
 |-- v: double (nullable = true)
 |-- wdir: double (nullable = true)
 |-- wspd: double (nullable = true)
 |-- Datetime: string (nullable = true)



## Use inner join to filter out unused points

In [24]:
used_wind_observations = wind_grid_points.join(used_grid_points, ['wind_lat','wind_lon'], how='inner')

In [25]:
used_wind_observations.count()

71825712

### Pre-compute min-max for scaled version of instrument below

In [99]:
min_wspd = used_wind_observations.select(min('wspd')).collect()
max_wspd = used_wind_observations.select(max('wspd')).collect()

print("Max wspd is ",max_wspd,"; min wspd is",min_wspd,".",sep="")

Max wspd is [Row(max(wspd)=19.395623)]; min wspd is[Row(min(wspd)=0.000415)].


In [103]:
min_wspd = min_wspd[0][0]
max_wspd = max_wspd[0][0]

TypeError: 'float' object is not subscriptable

In [104]:
print("Max wspd is ",max_wspd,"; min wspd is ",min_wspd,".",sep="")

Max wspd is 19.395623; min wspd is 0.000415.


   ## Save off subset

In [26]:
used_wind_observations.write.parquet(os.path.join(local_path,'wind_subset'))

## Start splitting out --

In [57]:
# create by-month data structure

month_bins_pd = pd.date_range(start='2000-12-01',end='2017-12-01',freq='m')

month_bins = []

for month in month_bins_pd:
    month_bins.append(datetime.datetime.strftime(month+timedelta(days=1), "%Y-%m"))

print(month_bins)

['2001-01', '2001-02', '2001-03', '2001-04', '2001-05', '2001-06', '2001-07', '2001-08', '2001-09', '2001-10', '2001-11', '2001-12', '2002-01', '2002-02', '2002-03', '2002-04', '2002-05', '2002-06', '2002-07', '2002-08', '2002-09', '2002-10', '2002-11', '2002-12', '2003-01', '2003-02', '2003-03', '2003-04', '2003-05', '2003-06', '2003-07', '2003-08', '2003-09', '2003-10', '2003-11', '2003-12', '2004-01', '2004-02', '2004-03', '2004-04', '2004-05', '2004-06', '2004-07', '2004-08', '2004-09', '2004-10', '2004-11', '2004-12', '2005-01', '2005-02', '2005-03', '2005-04', '2005-05', '2005-06', '2005-07', '2005-08', '2005-09', '2005-10', '2005-11', '2005-12', '2006-01', '2006-02', '2006-03', '2006-04', '2006-05', '2006-06', '2006-07', '2006-08', '2006-09', '2006-10', '2006-11', '2006-12', '2007-01', '2007-02', '2007-03', '2007-04', '2007-05', '2007-06', '2007-07', '2007-08', '2007-09', '2007-10', '2007-11', '2007-12', '2008-01', '2008-02', '2008-03', '2008-04', '2008-05', '2008-06', '2008-07'

In [60]:
# create compare column

used_wind_observations = used_wind_observations.withColumn('y-m', substring('Datetime', 1,7))

In [61]:
used_wind_observations.limit(25).show()

+---------+-----------+---------+---------+----------+--------+-------------------+----------+-------+
| wind_lat|   wind_lon|        u|        v|      wdir|    wspd|           Datetime|grid_index|    y-m|
+---------+-----------+---------+---------+----------+--------+-------------------+----------+-------+
|42.029999|-124.160004| 0.170099| -1.91193|275.084065|1.919482|2001-01-01 00:00:00|        44|2001-01|
|41.779999|-124.160004| 0.610318|-2.619918|283.113362|2.690066|2001-01-01 00:00:00|       123|2001-01|
|41.529999|-124.160004| 0.897872|-3.260106|285.398186|3.381489|2001-01-01 00:00:00|       124|2001-01|
|41.279999|-124.160004| 0.849772|-3.431103| 283.91038|3.534768|2001-01-01 00:00:00|       125|2001-01|
|41.029999|-124.160004| 0.826245|-3.327305|283.945749|3.428358|2001-01-01 00:00:00|       126|2001-01|
|40.779999|-124.160004| 0.781805|-3.174908|283.833566|3.269749|2001-01-01 00:00:00|       127|2001-01|
|40.529999|-124.160004| 0.707041|-2.862313|283.875283|2.948346|2001-01-01

In [75]:
# Check for nulls

used_wind_observations.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in used_wind_observations.columns]).show()

+--------+--------+---+---+----+----+--------+----------+---+
|wind_lat|wind_lon|  u|  v|wdir|wspd|Datetime|grid_index|y-m|
+--------+--------+---+---+----+----+--------+----------+---+
|       0|       0|  0|  0|   0|   0|       0|         0|  0|
+--------+--------+---+---+----+----+--------+----------+---+



In [84]:
# Check for large values

used_wind_observations.select([count(when((col(c).cast('integer') > 360), c)).alias(c) for c in used_wind_observations.columns]).show()

+--------+--------+---+---+----+----+--------+----------+---+
|wind_lat|wind_lon|  u|  v|wdir|wspd|Datetime|grid_index|y-m|
+--------+--------+---+---+----+----+--------+----------+---+
|       0|       0|  0|  0|   0|   0|       0|  62139672|  0|
+--------+--------+---+---+----+----+--------+----------+---+



In [91]:
# Check for zeros (are all of these valid??)

used_wind_observations.select([count(when((col(c).cast('float') == 0), c)).alias(c) for c in used_wind_observations.columns]).show()

+--------+--------+---+---+----+----+--------+----------+---+
|wind_lat|wind_lon|  u|  v|wdir|wspd|Datetime|grid_index|y-m|
+--------+--------+---+---+----+----+--------+----------+---+
|       0|       0|  0|  0|   0|   0|       0|         0|  0|
+--------+--------+---+---+----+----+--------+----------+---+



In [92]:
print("U columns with 0 values:",used_wind_observations.filter(col('u') == 0).count())
print("V columns with 0 values:",used_wind_observations.filter(col('v') == 0).count())

U columns with 0 values: 0
V columns with 0 values: 0


In [90]:
# Check for u and v being 0

used_wind_observations.filter((col('u') + col('v')) == 0).count()

14

In [64]:
# create and write out dataframe for each month

for ym in month_bins:
    
    print("Now working on",ym,"subset.")
    
    file_name = os.path.join(local_path,'wind_subset_by_month\\', ym)
    
    temp_df = used_wind_observations.filter(col("y-m") == ym)
    
    print("Total observations for ",ym,": ",temp_df.count(),".",sep="")
    print("Now writing")
    
    temp_df.write.parquet(file_name)

Now working on 2001-01 subset.
Total observations for 2001-01: 358608.
Now writing
Now working on 2001-02 subset.
Total observations for 2001-02: 323904.
Now writing
Now working on 2001-03 subset.
Total observations for 2001-03: 358608.
Now writing
Now working on 2001-04 subset.
Total observations for 2001-04: 347040.
Now writing
Now working on 2001-05 subset.
Total observations for 2001-05: 358608.
Now writing
Now working on 2001-06 subset.
Total observations for 2001-06: 347040.
Now writing
Now working on 2001-07 subset.
Total observations for 2001-07: 358608.
Now writing
Now working on 2001-08 subset.
Total observations for 2001-08: 358608.
Now writing
Now working on 2001-09 subset.
Total observations for 2001-09: 347040.
Now writing
Now working on 2001-10 subset.
Total observations for 2001-10: 358608.
Now writing
Now working on 2001-11 subset.
Total observations for 2001-11: 347040.
Now writing
Now working on 2001-12 subset.
Total observations for 2001-12: 358608.
Now writing
Now 

Now working on 2009-04 subset.
Total observations for 2009-04: 347040.
Now writing
Now working on 2009-05 subset.
Total observations for 2009-05: 358608.
Now writing
Now working on 2009-06 subset.
Total observations for 2009-06: 347040.
Now writing
Now working on 2009-07 subset.
Total observations for 2009-07: 358608.
Now writing
Now working on 2009-08 subset.
Total observations for 2009-08: 358608.
Now writing
Now working on 2009-09 subset.
Total observations for 2009-09: 347040.
Now writing
Now working on 2009-10 subset.
Total observations for 2009-10: 358608.
Now writing
Now working on 2009-11 subset.
Total observations for 2009-11: 347040.
Now writing
Now working on 2009-12 subset.
Total observations for 2009-12: 358608.
Now writing
Now working on 2010-01 subset.
Total observations for 2010-01: 358608.
Now writing
Now working on 2010-02 subset.
Total observations for 2010-02: 323904.
Now writing
Now working on 2010-03 subset.
Total observations for 2010-03: 358608.
Now writing
Now 

Now working on 2017-07 subset.
Total observations for 2017-07: 358608.
Now writing
Now working on 2017-08 subset.
Total observations for 2017-08: 358608.
Now writing
Now working on 2017-09 subset.
Total observations for 2017-09: 347040.
Now writing
Now working on 2017-10 subset.
Total observations for 2017-10: 358608.
Now writing
Now working on 2017-11 subset.
Total observations for 2017-11: 347040.
Now writing
Now working on 2017-12 subset.
Total observations for 2017-12: 358608.
Now writing


# End Splitting

___________________


# Begin Calculations/Aggregation Step-through
## If you want to just run this, go down to Begin Calculation/Aggregation Loop

## Load in lookup tables and data sources

## Quick overview

### Pre-compute scalars for distance and TPY norming:
#### Load in `school_year_to_point_lookup_top_5_filtered`, select point_source_index, point_source_pm25_tpy,school_to_ps_geod_dist_m
#### Calculate avg, stddev_samp, max, min (AFTER standard scaling)
#### Save as scalars (outside of loop)

### Make list to hold Pandas dataframes of aggregated instruments

## Start loop

### initialize empty Pandas dataframe

### Load in month of wind data
### Temporarily store backup of wind readings for self-joining (remove lat/lon/y-m) (temp_wind_readings_df)
### Temporarily store simple averages of wdir/wspd per zip code (to Pandas?) (temp_df_avgs_by_zip)

--- wind_temp_df

## Perform joins:

### First join: inner: wind points to schools from pre-computed lookup (school_lookup)

---compute zip code avgs, save off

--add column for current year
join year lookup for ps
drop column for current year

### Second itty-bitty join: measurement year to ps_lookup year to avoid duplicates

### third join: left: schools to top five point sources from pre-computed lookup (school_to_ps_lookup)
--join on CDSCode and lookup_year

### fourth join: left: point sources to associated wind grid points from pre-computed lookup (ps_lookup)
### Fifth join: left: point source wind grid indices to wind measurements at the same time marker (wind_temp_df)
join on grid_index and Datetime
renamed u, v, wspd, wdir_wrt_0N

### Compute Θd for each row (wind_alignment)
#### **Be sure to subtract the raw value from 180 so that high values indicate good alignment**

### Add columns for normed TPY and Dps (ps_pm25_tpy_normed, school_to_ps_geod_dist_m_normed)

((X - Xmin) / (Xmax - Xmin))

min_wspd
max_wspd
ps_TPY_mean
ps_dist_mean
ps_TPY_sd
ps_dist_sd
ps_TPY_min
ps_dist_min
ps_TPY_max
ps_dist_max


### Add columns for normed Θd and wspd for v5 (wind_alignment_normed, wspd_normed)

### Compute each instrument for each row:
#### Izmd_v1_unnormed
#### Izmd_v2_nodist_unnormed
#### Izmd_v3_normed_D_and_TPY
#### Izmd_v4_nodist_normed_TPY
#### Izmd_v5_all_normed

### Save off completely un-aggregated version (wind_subset_by_month_joined_unaggregated / yyyy-mm)

### Aggregate to CDSCode level, summing each instrument

### Aggregate at school zip code, averaging each instrument 

### Rejoin with simple avgs

### Save off version aggregated at school level (aggregated_inst_by_month / yyyy-mm)

### Convert aggregated version to Pandas df and append to list

## End loop

### Append list of aggregated instruments into single dataframe and save off

## Below, we will walk through a single example month to check the code.
## After that, we will define the loop to run through all months and run it.

In [336]:
# load files

local_dir = 'C:\\Users\\matts\\Documents\\Berkeley MIDS\\DataSci 210 Capstone\\non-push files\\data\\'

school_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'wind_grid_to_school_lookup_filtered.csv'))
ps_year_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'year_lookup.csv'))
school_to_ps_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'school_year_to_point_lookup_top_5_filtered.csv'))
ps_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'wind_grid_to_ps_point_lookup_filtered.csv'))

In [164]:
school_lookup.limit(5).show()

+-----------------+--------------+----------+----------+-----------+--------------------------+
|school_grid_index|       CDSCode|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|
+-----------------+--------------+----------+----------+-----------+--------------------------+
|              789|10101080119628|     93706| 36.730273|-119.807915|               10656.24466|
|              866|10621096005839|     93631| 36.554793|-119.504582|               8904.455363|
|              827|10621176109920|     93720| 36.875241|-119.759962|               13828.73923|
|              827|10621176116313|     93611|   36.8173|-119.674236|                4329.91988|
|              753|10621251030477|     93234| 36.208894|-120.098567|               9630.587612|
+-----------------+--------------+----------+----------+-----------+--------------------------+



In [165]:
school_to_ps_lookup.limit(5).show()

+--------------+--------+-----------------+---------------------+----------------+------------------------+---------------+----------------+
|       CDSCode|ps_index|point_source_year|point_source_pm25_tpy|point_source_zip|school_to_ps_geod_dist_m|angle_to_school|ps_distance_rank|
+--------------+--------+-----------------+---------------------+----------------+------------------------+---------------+----------------+
|10621171030071|     111|             2002|           2.30478261|           93612|             710.4600631|   -53.65448057|               1|
|10621171030071|     624|             2002|               10.089|           93727|              6602.12932|    -1.83659538|               2|
|10621171030071|     787|             2002|              29.2146|           93711|             11296.27967|    100.0364033|               3|
|10621171030071|      95|             2002|          2.230020551|           93721|             14026.48193|    32.45898722|               4|
|106211710300

In [166]:
ps_lookup.limit(5).show()

+-------------+--------+----------------------+
|ps_grid_index|ps_index|wind_to_ps_geod_dist_m|
+-------------+--------+----------------------+
|          443|    1019|           9658.554153|
|          722|    1097|           8926.231706|
|          631|     110|           11822.85566|
|          589|    1103|           13572.37684|
|         1105|     124|           15949.34107|
+-------------+--------+----------------------+



In [106]:
# TPY/dist stats part 1

cols_to_drop = ['point_source_index','CDSCode', 'point_source_year', 'point_source_zip', 'angle_to_school', 'ps_distance_rank']

ps_agg = school_to_ps_lookup.drop(*cols_to_drop).distinct().cache()

ps_agg.limit(5).show()

+---------------------+------------------------+
|point_source_pm25_tpy|school_to_ps_geod_dist_m|
+---------------------+------------------------+
|            5.3369886|             2390.635165|
|            14.182397|             6092.817218|
|              1.58245|             4700.183159|
|           4.59813891|             3372.730667|
|           3.65458735|              3347.69715|
+---------------------+------------------------+



In [113]:
# TPY/dist stats part 2
# compute mean/sd scalars

ps_stats = ps_agg.select(avg('point_source_pm25_tpy'), avg('school_to_ps_geod_dist_m'), 
                         stddev_samp('point_source_pm25_tpy'), stddev_samp('school_to_ps_geod_dist_m')).collect()

ps_stats

ps_stats = ps_stats[0]

print(ps_stats)

Row(avg(point_source_pm25_tpy)=13.225952014379654, avg(school_to_ps_geod_dist_m)=10453.018640947166, stddev_samp(point_source_pm25_tpy)=50.486910084637685, stddev_samp(school_to_ps_geod_dist_m)=13818.527648504192)


In [114]:
# TPY/dist stats part 3
# save out scalars for mean/sd

ps_TPY_mean = ps_stats[0]
ps_dist_mean = ps_stats[1]
ps_TPY_sd = ps_stats[2]
ps_dist_sd = ps_stats[3]

print("avg tpy:", ps_TPY_mean)

avg tpy: 13.225952014379654


In [116]:
# TPY/dist stats part 4
# use scalars to scale values

ps_agg = ps_agg.withColumn('TPY_norm', (col('point_source_pm25_tpy') - ps_TPY_mean)/ps_TPY_sd).withColumn('dist_norm', (col('school_to_ps_geod_dist_m') - ps_dist_mean)/ps_dist_sd)

ps_stats_mm = ps_agg.select(min('TPY_norm'), min('dist_norm'), max('TPY_norm'), max('dist_norm')).collect()

ps_stats_mm = ps_stats_mm[0]

print(ps_stats_mm)

Row(min(TPY_norm)=-0.2464301038332982, min(dist_norm)=-0.7541348842439959, max(TPY_norm)=61.78923457101895, max(dist_norm)=12.084924082134723)


In [118]:
# TPY/dist stats part 3
# save out scalars for min/max

ps_TPY_min = ps_stats_mm[0]
ps_dist_min = ps_stats_mm[1]
ps_TPY_max = ps_stats_mm[2]
ps_dist_max = ps_stats_mm[3]

print("min_norm_tpy:", ps_TPY_min)

min_norm_tpy: -0.2464301038332982


## Data structures (testing)

In [316]:
zmy_agg_list = []
school_my_agg_list = []
df_avgs_list = []

In [317]:
in_dir = 'C:\\Users\\matts\\Documents\\Berkeley MIDS\\DataSci 210 Capstone\\non-push files\\data\\wind_subset_by_month\\'

out_dir_unagged = 'C:\\Users\\matts\\Documents\\Berkeley MIDS\\DataSci 210 Capstone\\non-push files\\data\\raw_my_spark_dfs'
out_dir_zmy = 'C:\\Users\\matts\\Documents\\Berkeley MIDS\\DataSci 210 Capstone\\non-push files\\data\\zmy_agged_dfs\\'
out_dir_school_my = 'C:\\Users\\matts\\Documents\\Berkeley MIDS\\DataSci 210 Capstone\\non-push files\\data\\school_my_agged_dfs\\'

for parquet_file in os.listdir(in_dir):
    
    if parquet_file == '2001-01':
    
        # for holding augmented df at the zip code level
        temp_zmy_df = pd.DataFrame()
        
        # for holding augmented df at the school level
        temp_school_my_df = pd.DataFrame()

        # for holding summmary statistics
        temp_df_avgs = pd.DataFrame()

        # read in one month
        temp_meas_df = spark.read.parquet(os.path.join(in_dir, parquet_file))

In [318]:
temp_meas_df.limit(5).show()

+---------+-----------+--------+---------+----------+--------+-------------------+----------+-------+
| wind_lat|   wind_lon|       u|        v|      wdir|    wspd|           Datetime|grid_index|    y-m|
+---------+-----------+--------+---------+----------+--------+-------------------+----------+-------+
|42.029999|-124.160004|0.170099| -1.91193|275.084065|1.919482|2001-01-01 00:00:00|        44|2001-01|
|41.779999|-124.160004|0.610318|-2.619918|283.113362|2.690066|2001-01-01 00:00:00|       123|2001-01|
|41.529999|-124.160004|0.897872|-3.260106|285.398186|3.381489|2001-01-01 00:00:00|       124|2001-01|
|41.279999|-124.160004|0.849772|-3.431103| 283.91038|3.534768|2001-01-01 00:00:00|       125|2001-01|
|41.029999|-124.160004|0.826245|-3.327305|283.945749|3.428358|2001-01-01 00:00:00|       126|2001-01|
+---------+-----------+--------+---------+----------+--------+-------------------+----------+-------+



In [319]:
# compute wind dir wrt 0N

temp_meas_df = temp_meas_df.withColumn('wdir_wrt_0N',(180*F.atan2(col('u'), col('v'))/(3.141592653589793238462)).cast('double'))

In [320]:
# drop lat/lon, wdir, and y-m and store temp df to re-join for ps wind readings (dropped can be recovered if needed)
# this assumes wdir calc is correct--that is verified below but these were run out of order

wind_temp_df = temp_meas_df.drop('wind_lat','wind_lon','wdir','y-m')

wind_temp_df.cache()

wind_temp_df.limit(10).show()

+---------+---------+--------+-------------------+----------+-------------------+
|        u|        v|    wspd|           Datetime|grid_index|        wdir_wrt_0N|
+---------+---------+--------+-------------------+----------+-------------------+
| 0.170099| -1.91193|1.919482|2001-01-01 00:00:00|        44| 174.91594219384388|
| 0.610318|-2.619918|2.690066|2001-01-01 00:00:00|       123| 166.88664073725573|
| 0.897872|-3.260106|3.381489|2001-01-01 00:00:00|       124| 164.60181000653935|
| 0.849772|-3.431103|3.534768|2001-01-01 00:00:00|       125| 166.08961753606766|
| 0.826245|-3.327305|3.428358|2001-01-01 00:00:00|       126| 166.05424716779632|
| 0.781805|-3.174908|3.269749|2001-01-01 00:00:00|       127| 166.16642852745528|
| 0.707041|-2.862313|2.948346|2001-01-01 00:00:00|       128|  166.1247092524355|
| 1.005051|-3.470103|3.612719|2001-01-01 00:00:00|       129| 163.84732058815214|
|-0.543557|-1.178743|1.298033|2001-01-01 00:00:00|       161|-155.24402584649116|
|-0.377821|-1.73

In [321]:
# rename for explicitness of measurements

temp_meas_df = (temp_meas_df
                .withColumnRenamed('wind_lat','school_wind_lat')
                .withColumnRenamed('wind_lon','school_wind_lon')
                .withColumnRenamed('u','school_u')
                .withColumnRenamed('v','school_v')
                .withColumnRenamed('grid_index','school_grid_index')
               ).drop('wdir').drop('y-m') # wdir is wrt 0° E and is confusing; y-m not needed

In [322]:
# spot check calculations (run out of order but this checks the wind dir calc above)
temp_meas_df.limit(25).show()

+---------------+---------------+---------+---------+--------+-------------------+-----------------+-------------------+
|school_wind_lat|school_wind_lon| school_u| school_v|    wspd|           Datetime|school_grid_index|        wdir_wrt_0N|
+---------------+---------------+---------+---------+--------+-------------------+-----------------+-------------------+
|      42.029999|    -124.160004| 0.170099| -1.91193|1.919482|2001-01-01 00:00:00|               44| 174.91594219384388|
|      41.779999|    -124.160004| 0.610318|-2.619918|2.690066|2001-01-01 00:00:00|              123| 166.88664073725573|
|      41.529999|    -124.160004| 0.897872|-3.260106|3.381489|2001-01-01 00:00:00|              124| 164.60181000653935|
|      41.279999|    -124.160004| 0.849772|-3.431103|3.534768|2001-01-01 00:00:00|              125| 166.08961753606766|
|      41.029999|    -124.160004| 0.826245|-3.327305|3.428358|2001-01-01 00:00:00|              126| 166.05424716779632|
|      40.779999|    -124.160004

In [323]:
temp_meas_df.count()

358608

In [368]:
combined_df = temp_meas_df.join(school_lookup, ['school_grid_index'], how='inner')

In [369]:
combined_df.limit(5).show()

combined_df.count()

+-----------------+---------------+---------------+--------+---------+--------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+
|school_grid_index|school_wind_lat|school_wind_lon|school_u| school_v|    wspd|           Datetime|       wdir_wrt_0N|      CDSCode|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|
+-----------------+---------------+---------------+--------+---------+--------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+
|               44|      42.029999|    -124.160004|0.170099| -1.91193|1.919482|2001-01-01 00:00:00|174.91594219384388|8618206005458|     95567| 41.927578| -124.15199|               11395.56698|
|              123|      41.779999|    -124.160004|0.610318|-2.619918|2.690066|2001-01-01 00:00:00|166.88664073725573|8618206005391|     95531| 41.755659|-124.206615|               4725.461031|
|              123|      41.77

9892968

In [370]:
# compute zip code averages for wdir, wspd

zip_avgs = (combined_df.groupBy('school_zip')
            .avg('wspd','wdir_wrt_0N', 'school_u','school_v')
            .withColumnRenamed("school_zip","zip_code")
            .withColumnRenamed("avg(wspd)","avg_wspd")
            .withColumnRenamed("avg(wdir_wrt_0N)","avg_wdir_0N")
            .withColumnRenamed("avg(school_u)","avg_u")
            .withColumnRenamed("avg(school_v)","avg_v")
            .toPandas()
           )

zip_avgs['y-m'] = parquet_file

display(zip_avgs)

df_avgs_list.append(temp_df_avgs)

Unnamed: 0,zip_code,avg_wspd,avg_wdir_0N,avg_u,avg_v,y-m
0,95519,3.834603,-31.618201,-1.502319,0.392380,2017-12
1,94102,3.098176,-12.994330,-0.207525,-0.305454,2017-12
2,95134,2.068261,-16.739172,-0.410638,0.179738,2017-12
3,93924,2.484223,8.085609,-0.109868,-0.133446,2017-12
4,93545,1.316469,77.580772,0.943365,-0.255368,2017-12
...,...,...,...,...,...,...
1534,90006,1.964524,-47.490534,-0.286081,-0.714206,2017-12
1535,92408,1.559641,-59.124129,-0.641338,-0.298651,2017-12
1536,95776,2.484882,40.662983,-0.086244,-0.236341,2017-12
1537,93662,1.618309,14.061419,-0.110235,-0.182299,2017-12


In [371]:
combined_df = (combined_df
               .withColumnRenamed('wspd','school_wspd')
               .withColumnRenamed('wdir_wrt_0N','school_wdir_0N') 
              )

combined_df.limit(5).show()

+-----------------+---------------+---------------+--------+---------+-----------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+
|school_grid_index|school_wind_lat|school_wind_lon|school_u| school_v|school_wspd|           Datetime|    school_wdir_0N|      CDSCode|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|
+-----------------+---------------+---------------+--------+---------+-----------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+
|               44|      42.029999|    -124.160004|0.170099| -1.91193|   1.919482|2001-01-01 00:00:00|174.91594219384388|8618206005458|     95567| 41.927578| -124.15199|               11395.56698|
|              123|      41.779999|    -124.160004|0.610318|-2.619918|   2.690066|2001-01-01 00:00:00|166.88664073725573|8618206005391|     95531| 41.755659|-124.206615|               4725.461031|
|              

In [375]:
# need to lookup by CDSCode and year, so substring for year

combined_df = combined_df.withColumn("year", substring(col('Datetime'),1,4))

combined_df.limit(5).show()

+-----------------+---------------+---------------+--------+---------+-----------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+----+
|school_grid_index|school_wind_lat|school_wind_lon|school_u| school_v|school_wspd|           Datetime|    school_wdir_0N|      CDSCode|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|year|
+-----------------+---------------+---------------+--------+---------+-----------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+----+
|               44|      42.029999|    -124.160004|0.170099| -1.91193|   1.919482|2001-01-01 00:00:00|174.91594219384388|8618206005458|     95567| 41.927578| -124.15199|               11395.56698|2001|
|              123|      41.779999|    -124.160004|0.610318|-2.619918|   2.690066|2001-01-01 00:00:00|166.88664073725573|8618206005391|     95531| 41.755659|-124.206615|               4725.461

In [376]:
# join in ps <-> year lookup

combined_df = combined_df.join(ps_year_lookup, ['year'], how='left').drop('year')

combined_df.count()

9892968

In [377]:
combined_df.limit(5).show()

+-----------------+---------------+---------------+--------+---------+-----------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+-------+
|school_grid_index|school_wind_lat|school_wind_lon|school_u| school_v|school_wspd|           Datetime|    school_wdir_0N|      CDSCode|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|ps_year|
+-----------------+---------------+---------------+--------+---------+-----------+-------------------+------------------+-------------+----------+----------+-----------+--------------------------+-------+
|               44|      42.029999|    -124.160004|0.170099| -1.91193|   1.919482|2001-01-01 00:00:00|174.91594219384388|8618206005458|     95567| 41.927578| -124.15199|               11395.56698|   2002|
|              123|      41.779999|    -124.160004|0.610318|-2.619918|   2.690066|2001-01-01 00:00:00|166.88664073725573|8618206005391|     95531| 41.755659|-124.206615|           

In [378]:
combined_df = combined_df.join(school_to_ps_lookup, ['CDSCode','ps_year'], how='left')

In [379]:
combined_df.limit(5).show()

+-------------+-------+-----------------+---------------+---------------+--------+--------+-----------+-------------------+------------------+----------+----------+----------+--------------------------+--------+---------+-----------+-----------+------+-----------+---------------+----------------+
|      CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon|school_u|school_v|school_wspd|           Datetime|    school_wdir_0N|school_zip|school_lat|school_lon|wind_to_school_geod_dist_m|ps_index|   ps_lat|     ps_lon|ps_pm25_tpy|ps_zip|geod_dist_m|angle_to_school|ps_distance_rank|
+-------------+-------+-----------------+---------------+---------------+--------+--------+-----------+-------------------+------------------+----------+----------+----------+--------------------------+--------+---------+-----------+-----------+------+-----------+---------------+----------------+
|8618206005458|   2002|               44|      42.029999|    -124.160004|0.170099|-1.91193|   1.919482|200

In [380]:
combined_df.count()

49464840

In [381]:
combined_df = combined_df.join(ps_lookup, ['ps_index'], how='left')

# length should not have changed

combined_df.count()

49464840

In [399]:
combined_df = combined_df.withColumnRenamed('geod_dist_m', 'school_to_ps_geod_dist_m')

In [382]:
combined_df.limit(5).show()

+--------+-------------+-------+-----------------+---------------+---------------+--------+--------+-----------+-------------------+------------------+----------+----------+----------+--------------------------+---------+-----------+-----------+------+-----------+---------------+----------------+-------------+----------------------+
|ps_index|      CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon|school_u|school_v|school_wspd|           Datetime|    school_wdir_0N|school_zip|school_lat|school_lon|wind_to_school_geod_dist_m|   ps_lat|     ps_lon|ps_pm25_tpy|ps_zip|geod_dist_m|angle_to_school|ps_distance_rank|ps_grid_index|wind_to_ps_geod_dist_m|
+--------+-------------+-------+-----------------+---------------+---------------+--------+--------+-----------+-------------------+------------------+----------+----------+----------+--------------------------+---------+-----------+-----------+------+-----------+---------------+----------------+-------------+-------------------

In [383]:
combined_df = combined_df.withColumnRenamed("ps_grid_index","grid_index")

In [384]:
# join in saved-off wind measurements

combined_df = combined_df.join(wind_temp_df, ['grid_index',"Datetime"], how='left')

In [385]:
combined_df = (combined_df
                .withColumnRenamed('u','ps_u')
                .withColumnRenamed('v','ps_v')
                .withColumnRenamed('wspd','ps_wspd')
                .withColumnRenamed('wdir_wrt_0N','ps_wdir_0N')
               )

In [386]:
combined_df.count()

49464840

In [387]:
combined_df.limit(5).show()

+----------+-------------------+--------+-------------+-------+-----------------+---------------+---------------+--------+---------+-----------+------------------+----------+----------+-----------+--------------------------+---------+-----------+-----------+------+-----------+---------------+----------------+----------------------+--------+---------+--------+------------------+
|grid_index|           Datetime|ps_index|      CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon|school_u| school_v|school_wspd|    school_wdir_0N|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|   ps_lat|     ps_lon|ps_pm25_tpy|ps_zip|geod_dist_m|angle_to_school|ps_distance_rank|wind_to_ps_geod_dist_m|    ps_u|     ps_v| ps_wspd|        ps_wdir_0N|
+----------+-------------------+--------+-------------+-------+-----------------+---------------+---------------+--------+---------+-----------+------------------+----------+----------+-----------+--------------------------+---------+----

In [388]:
combined_df.printSchema()

root
 |-- grid_index: string (nullable = true)
 |-- Datetime: string (nullable = true)
 |-- ps_index: string (nullable = true)
 |-- CDSCode: string (nullable = true)
 |-- ps_year: string (nullable = true)
 |-- school_grid_index: string (nullable = true)
 |-- school_wind_lat: double (nullable = true)
 |-- school_wind_lon: double (nullable = true)
 |-- school_u: double (nullable = true)
 |-- school_v: double (nullable = true)
 |-- school_wspd: double (nullable = true)
 |-- school_wdir_0N: double (nullable = true)
 |-- school_zip: string (nullable = true)
 |-- school_lat: string (nullable = true)
 |-- school_lon: string (nullable = true)
 |-- wind_to_school_geod_dist_m: string (nullable = true)
 |-- ps_lat: string (nullable = true)
 |-- ps_lon: string (nullable = true)
 |-- ps_pm25_tpy: string (nullable = true)
 |-- ps_zip: string (nullable = true)
 |-- geod_dist_m: string (nullable = true)
 |-- angle_to_school: string (nullable = true)
 |-- ps_distance_rank: string (nullable = true)
 |--

In [389]:
combined_df = combined_df.withColumn("angle_to_school",col("angle_to_school").cast("double"))

In [390]:
combined_df.printSchema()

root
 |-- grid_index: string (nullable = true)
 |-- Datetime: string (nullable = true)
 |-- ps_index: string (nullable = true)
 |-- CDSCode: string (nullable = true)
 |-- ps_year: string (nullable = true)
 |-- school_grid_index: string (nullable = true)
 |-- school_wind_lat: double (nullable = true)
 |-- school_wind_lon: double (nullable = true)
 |-- school_u: double (nullable = true)
 |-- school_v: double (nullable = true)
 |-- school_wspd: double (nullable = true)
 |-- school_wdir_0N: double (nullable = true)
 |-- school_zip: string (nullable = true)
 |-- school_lat: string (nullable = true)
 |-- school_lon: string (nullable = true)
 |-- wind_to_school_geod_dist_m: string (nullable = true)
 |-- ps_lat: string (nullable = true)
 |-- ps_lon: string (nullable = true)
 |-- ps_pm25_tpy: string (nullable = true)
 |-- ps_zip: string (nullable = true)
 |-- geod_dist_m: string (nullable = true)
 |-- angle_to_school: double (nullable = true)
 |-- ps_distance_rank: string (nullable = true)
 |--

In [391]:
# spot check for nulls

combined_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in combined_df.columns]).show()

+----------+--------+--------+-------+-------+-----------------+---------------+---------------+--------+--------+-----------+--------------+----------+----------+----------+--------------------------+------+------+-----------+------+-----------+---------------+----------------+----------------------+----+----+-------+----------+
|grid_index|Datetime|ps_index|CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon|school_u|school_v|school_wspd|school_wdir_0N|school_zip|school_lat|school_lon|wind_to_school_geod_dist_m|ps_lat|ps_lon|ps_pm25_tpy|ps_zip|geod_dist_m|angle_to_school|ps_distance_rank|wind_to_ps_geod_dist_m|ps_u|ps_v|ps_wspd|ps_wdir_0N|
+----------+--------+--------+-------+-------+-----------------+---------------+---------------+--------+--------+-----------+--------------+----------+----------+----------+--------------------------+------+------+-----------+------+-----------+---------------+----------------+----------------------+----+----+-------+----------+
|   

In [392]:
# first angle is wind angle, second angle is heading to school (both wrt 0N)

def calculateDifferenceBetweenAngles(firstAngle, secondAngle):
    difference = secondAngle - firstAngle
    if (difference <= -180): 
        difference += 360
        return difference
    elif (difference >= 180): 
        difference -= 360
        return difference
    return difference

udf_calculateDifferenceBetweenAngles = F.udf(calculateDifferenceBetweenAngles)

In [393]:
combined_df = (combined_df
               .withColumn("school_wind_alignment",
                           udf_calculateDifferenceBetweenAngles(
                           col('school_wdir_0N'), col('angle_to_school')).cast('double'))
               .withColumn("ps_wind_alignment",
                           udf_calculateDifferenceBetweenAngles(
                           col('ps_wdir_0N'), col('angle_to_school')).cast('double'))
              )

In [394]:
combined_df.limit(5).show()

+----------+-------------------+--------+--------------+-------+-----------------+---------------+---------------+---------+---------+-----------+------------------+----------+----------+-----------+--------------------------+---------+-----------+-----------+------+-----------+---------------+----------------+----------------------+--------+---------+--------+-----------------+---------------------+-------------------+
|grid_index|           Datetime|ps_index|       CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon| school_u| school_v|school_wspd|    school_wdir_0N|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|   ps_lat|     ps_lon|ps_pm25_tpy|ps_zip|geod_dist_m|angle_to_school|ps_distance_rank|wind_to_ps_geod_dist_m|    ps_u|     ps_v| ps_wspd|       ps_wdir_0N|school_wind_alignment|  ps_wind_alignment|
+----------+-------------------+--------+--------------+-------+-----------------+---------------+---------------+---------+---------+-----------+------

In [395]:
# spot check for nulls

combined_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in combined_df.columns]).show()

+----------+--------+--------+-------+-------+-----------------+---------------+---------------+--------+--------+-----------+--------------+----------+----------+----------+--------------------------+------+------+-----------+------+-----------+---------------+----------------+----------------------+----+----+-------+----------+---------------------+-----------------+
|grid_index|Datetime|ps_index|CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon|school_u|school_v|school_wspd|school_wdir_0N|school_zip|school_lat|school_lon|wind_to_school_geod_dist_m|ps_lat|ps_lon|ps_pm25_tpy|ps_zip|geod_dist_m|angle_to_school|ps_distance_rank|wind_to_ps_geod_dist_m|ps_u|ps_v|ps_wspd|ps_wdir_0N|school_wind_alignment|ps_wind_alignment|
+----------+--------+--------+-------+-------+-----------------+---------------+---------------+--------+--------+-----------+--------------+----------+----------+----------+--------------------------+------+------+-----------+------+-----------+----------

In [396]:
combined_df = combined_df.withColumn("central_wind_alignment_180_high", 
                                     (180 - abs(((col('school_wind_alignment') + col('ps_wind_alignment'))/2))).cast('double'))

In [397]:
combined_df.limit(5).show()

+----------+-------------------+--------+--------------+-------+-----------------+---------------+---------------+---------+---------+-----------+------------------+----------+----------+-----------+--------------------------+---------+-----------+-----------+------+-----------+---------------+----------------+----------------------+--------+---------+--------+-----------------+---------------------+-------------------+-------------------------------+
|grid_index|           Datetime|ps_index|       CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon| school_u| school_v|school_wspd|    school_wdir_0N|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|   ps_lat|     ps_lon|ps_pm25_tpy|ps_zip|geod_dist_m|angle_to_school|ps_distance_rank|wind_to_ps_geod_dist_m|    ps_u|     ps_v| ps_wspd|       ps_wdir_0N|school_wind_alignment|  ps_wind_alignment|central_wind_alignment_180_high|
+----------+-------------------+--------+--------------+-------+-----------------+------

In [400]:
# compute normed TPY and dist using scalars from above

combined_df = (combined_df
               .withColumn('ps_pm25_tpy_normed', 
                           ((((col('ps_pm25_tpy') - ps_TPY_mean) / ps_TPY_sd) - ps_TPY_min) / (ps_TPY_max - ps_TPY_min)).cast('double'))
               .withColumn('school_to_ps_geod_dist_m_normed',((((col('school_to_ps_geod_dist_m') - ps_dist_mean) / ps_dist_sd) - ps_dist_min) / (ps_dist_max - ps_dist_min)).cast('double'))
              )

In [407]:
# compute normed wspd and wind alignment for Instrument v5

combined_df = (combined_df
               .withColumn('avg_wspd',((col('school_wspd') + col('ps_wspd'))/2).cast('double'))
              )

combined_df = (combined_df
               .withColumn('central_wind_alignment_180_high_normed',
                           (col('central_wind_alignment_180_high')/180).cast('double'))
               .withColumn('avg_wspd_normed',
                           ((col('avg_wspd') - min_wspd) / (max_wspd - min_wspd)).cast('double'))
              )

In [409]:
combined_df.limit(10).show()

+----------+-------------------+--------+--------------+-------+-----------------+---------------+---------------+---------+---------+-----------+-------------------+----------+----------+-----------+--------------------------+---------+-----------+-----------+------+------------------------+---------------+----------------+----------------------+--------+---------+--------+-----------------+---------------------+-------------------+-------------------------------+--------------------+-------------------------------+--------------------------------------+--------------------+-------------------+
|grid_index|           Datetime|ps_index|       CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon| school_u| school_v|school_wspd|     school_wdir_0N|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|   ps_lat|     ps_lon|ps_pm25_tpy|ps_zip|school_to_ps_geod_dist_m|angle_to_school|ps_distance_rank|wind_to_ps_geod_dist_m|    ps_u|     ps_v| ps_wspd|       ps_wdir_0N|sc

In [410]:
combined_df = (combined_df
               .withColumn('Izmd_v1_unnormed',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy') * (col('avg_wspd') / col('school_to_ps_geod_dist_m'))).cast('double'))
               .withColumn('Izmd_v2_nodist_unnormed',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy') * col('avg_wspd')).cast('double'))
               .withColumn('Izmd_v3_normed_D_and_TPY',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy_normed') * (col('avg_wspd') / col('school_to_ps_geod_dist_m_normed'))).cast('double'))
               .withColumn('Izmd_v4_nodist_normed_TPY',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy_normed') * col('avg_wspd')).cast('double'))
               .withColumn('Izmd_v5_all_normed',(col('central_wind_alignment_180_high_normed') * col('ps_pm25_tpy_normed') * (col('avg_wspd_normed') / col('school_to_ps_geod_dist_m_normed'))).cast('double'))
              )

In [411]:
combined_df.limit(10).show()

+----------+-------------------+--------+--------------+-------+-----------------+---------------+---------------+---------+---------+-----------+-------------------+----------+----------+-----------+--------------------------+---------+-----------+-----------+------+------------------------+---------------+----------------+----------------------+--------+---------+--------+-----------------+---------------------+-------------------+-------------------------------+--------------------+-------------------------------+--------------------------------------+--------------------+-------------------+--------------------+-----------------------+------------------------+-------------------------+--------------------+
|grid_index|           Datetime|ps_index|       CDSCode|ps_year|school_grid_index|school_wind_lat|school_wind_lon| school_u| school_v|school_wspd|     school_wdir_0N|school_zip|school_lat| school_lon|wind_to_school_geod_dist_m|   ps_lat|     ps_lon|ps_pm25_tpy|ps_zip|school_to_ps

In [412]:
combined_df.columns

['grid_index',
 'Datetime',
 'ps_index',
 'CDSCode',
 'ps_year',
 'school_grid_index',
 'school_wind_lat',
 'school_wind_lon',
 'school_u',
 'school_v',
 'school_wspd',
 'school_wdir_0N',
 'school_zip',
 'school_lat',
 'school_lon',
 'wind_to_school_geod_dist_m',
 'ps_lat',
 'ps_lon',
 'ps_pm25_tpy',
 'ps_zip',
 'school_to_ps_geod_dist_m',
 'angle_to_school',
 'ps_distance_rank',
 'wind_to_ps_geod_dist_m',
 'ps_u',
 'ps_v',
 'ps_wspd',
 'ps_wdir_0N',
 'school_wind_alignment',
 'ps_wind_alignment',
 'central_wind_alignment_180_high',
 'ps_pm25_tpy_normed',
 'school_to_ps_geod_dist_m_normed',
 'central_wind_alignment_180_high_normed',
 'avg_wspd_normed',
 'avg_wspd',
 'Izmd_v1_unnormed',
 'Izmd_v2_nodist_unnormed',
 'Izmd_v3_normed_D_and_TPY',
 'Izmd_v4_nodist_normed_TPY',
 'Izmd_v5_all_normed']

In [415]:
# start selecting and aggregating down

cols_to_select = ['CDSCode',
                'school_zip',
                'ps_distance_rank',
                'ps_pm25_tpy_normed',
                'ps_pm25_tpy',
                'angle_to_school',
                'school_to_ps_geod_dist_m_normed',
                'school_to_ps_geod_dist_m',
                 'central_wind_alignment_180_high',
                 'central_wind_alignment_180_high_normed',
                 'avg_wspd_normed',
                 'avg_wspd',
                 'Izmd_v1_unnormed',
                 'Izmd_v2_nodist_unnormed',
                 'Izmd_v3_normed_D_and_TPY',
                 'Izmd_v4_nodist_normed_TPY',
                 'Izmd_v5_all_normed']

combined_df = combined_df.select(*cols_to_select)

combined_df.limit(5).show()

+--------------+----------+----------------+--------------------+-----------+---------------+-------------------------------+------------------------+-------------------------------+--------------------------------------+--------------------+------------------+--------------------+-----------------------+------------------------+-------------------------+--------------------+
|       CDSCode|school_zip|ps_distance_rank|  ps_pm25_tpy_normed|ps_pm25_tpy|angle_to_school|school_to_ps_geod_dist_m_normed|school_to_ps_geod_dist_m|central_wind_alignment_180_high|central_wind_alignment_180_high_normed|     avg_wspd_normed|          avg_wspd|    Izmd_v1_unnormed|Izmd_v2_nodist_unnormed|Izmd_v3_normed_D_and_TPY|Izmd_v4_nodist_normed_TPY|  Izmd_v5_all_normed|
+--------------+----------+----------------+--------------------+-----------+---------------+-------------------------------+------------------------+-------------------------------+--------------------------------------+--------------------+

In [417]:
### AGGREGATION 1/3: reduce to m-y-school-ps(-zip) level ###

group_by_cols = ['CDSCode',
                'school_zip',
                'ps_distance_rank',
                'ps_pm25_tpy_normed',
                'ps_pm25_tpy',
                'angle_to_school',
                'school_to_ps_geod_dist_m_normed',
                'school_to_ps_geod_dist_m']

combined_df = combined_df.groupBy(*group_by_cols) \
                .agg(
                avg('central_wind_alignment_180_high').alias('central_wind_alignment_180_high'), \
                avg('central_wind_alignment_180_high_normed').alias('central_wind_alignment_180_high_normed'), \
                avg('avg_wspd_normed').alias('avg_wspd_normed'), \
                avg('avg_wspd').alias('avg_wspd'), \
                sum('Izmd_v1_unnormed').alias('Izmd_v1_unnormed'), \
                sum('Izmd_v2_nodist_unnormed').alias('Izmd_v2_nodist_unnormed'), \
                sum('Izmd_v3_normed_D_and_TPY').alias('Izmd_v3_normed_D_and_TPY'), \
                sum('Izmd_v4_nodist_normed_TPY').alias('Izmd_v4_nodist_normed_TPY'), \
                sum('Izmd_v5_all_normed').alias('Izmd_v5_all_normed') \
                    )

combined_df.limit(10).show()

+--------------+----------+----------------+--------------------+-----------+---------------+-------------------------------+------------------------+-------------------------------+--------------------------------------+-------------------+------------------+------------------+-----------------------+------------------------+-------------------------+------------------+
|       CDSCode|school_zip|ps_distance_rank|  ps_pm25_tpy_normed|ps_pm25_tpy|angle_to_school|school_to_ps_geod_dist_m_normed|school_to_ps_geod_dist_m|central_wind_alignment_180_high|central_wind_alignment_180_high_normed|    avg_wspd_normed|          avg_wspd|  Izmd_v1_unnormed|Izmd_v2_nodist_unnormed|Izmd_v3_normed_D_and_TPY|Izmd_v4_nodist_normed_TPY|Izmd_v5_all_normed|
+--------------+----------+----------------+--------------------+-----------+---------------+-------------------------------+------------------------+-------------------------------+--------------------------------------+-------------------+-----------

In [418]:
combined_df.count()

66485

In [419]:
### AGGREGATION 2/3: reduce to m-y-school(-zip) level ###

combined_df = combined_df.groupBy("CDSCode", "school_zip") \
                .agg(
                avg('central_wind_alignment_180_high').alias('central_wind_alignment_180_high'), \
                avg('ps_pm25_tpy_normed').alias('ps_pm25_tpy_normed'), \
                avg('school_to_ps_geod_dist_m_normed').alias('school_to_ps_geod_dist_m_normed'), \
                avg('ps_pm25_tpy').alias('ps_pm25_tpy'), \
                avg('school_to_ps_geod_dist_m').alias('school_to_ps_geod_dist_m'), \
                avg('central_wind_alignment_180_high_normed').alias('central_wind_alignment_180_high_normed'), \
                avg('avg_wspd_normed').alias('avg_wspd_normed'), \
                avg('avg_wspd').alias('avg_wspd'), \
                sum('Izmd_v1_unnormed').alias('Izmd_v1_unnormed'), \
                sum('Izmd_v2_nodist_unnormed').alias('Izmd_v2_nodist_unnormed'), \
                sum('Izmd_v3_normed_D_and_TPY').alias('Izmd_v3_normed_D_and_TPY'), \
                sum('Izmd_v4_nodist_normed_TPY').alias('Izmd_v4_nodist_normed_TPY'), \
                sum('Izmd_v5_all_normed').alias('Izmd_v5_all_normed') \
                    )

combined_df.limit(10).show()

+--------------+----------+-------------------------------+--------------------+-------------------------------+------------------+------------------------+--------------------------------------+-------------------+------------------+------------------+-----------------------+------------------------+-------------------------+------------------+
|       CDSCode|school_zip|central_wind_alignment_180_high|  ps_pm25_tpy_normed|school_to_ps_geod_dist_m_normed|       ps_pm25_tpy|school_to_ps_geod_dist_m|central_wind_alignment_180_high_normed|    avg_wspd_normed|          avg_wspd|  Izmd_v1_unnormed|Izmd_v2_nodist_unnormed|Izmd_v3_normed_D_and_TPY|Izmd_v4_nodist_normed_TPY|Izmd_v5_all_normed|
+--------------+----------+-------------------------------+--------------------+-------------------------------+------------------+------------------------+--------------------------------------+-------------------+------------------+------------------+-----------------------+------------------------+--

In [420]:
combined_df.count()

13297

In [421]:
### AGGREGATION 3/3: reduce to m-y-zip level ###

combined_df = combined_df.groupBy("school_zip") \
                .agg(
                avg('central_wind_alignment_180_high').alias('central_wind_alignment_180_high'), \
                avg('ps_pm25_tpy_normed').alias('ps_pm25_tpy_normed'), \
                avg('school_to_ps_geod_dist_m_normed').alias('school_to_ps_geod_dist_m_normed'), \
                avg('ps_pm25_tpy').alias('ps_pm25_tpy'), \
                avg('school_to_ps_geod_dist_m').alias('school_to_ps_geod_dist_m'), \
                avg('central_wind_alignment_180_high_normed').alias('central_wind_alignment_180_high_normed'), \
                avg('avg_wspd_normed').alias('avg_wspd_normed'), \
                avg('avg_wspd').alias('avg_wspd'), \
                avg('Izmd_v1_unnormed').alias('Izmd_v1_unnormed'), \
                avg('Izmd_v2_nodist_unnormed').alias('Izmd_v2_nodist_unnormed'), \
                avg('Izmd_v3_normed_D_and_TPY').alias('Izmd_v3_normed_D_and_TPY'), \
                avg('Izmd_v4_nodist_normed_TPY').alias('Izmd_v4_nodist_normed_TPY'), \
                avg('Izmd_v5_all_normed').alias('Izmd_v5_all_normed'), \
                count('CDSCode')
                )

In [422]:
combined_df.limit(10).show()

+----------+-------------------------------+--------------------+-------------------------------+------------------+------------------------+--------------------------------------+-------------------+------------------+------------------+-----------------------+------------------------+-------------------------+------------------+
|school_zip|central_wind_alignment_180_high|  ps_pm25_tpy_normed|school_to_ps_geod_dist_m_normed|       ps_pm25_tpy|school_to_ps_geod_dist_m|central_wind_alignment_180_high_normed|    avg_wspd_normed|          avg_wspd|  Izmd_v1_unnormed|Izmd_v2_nodist_unnormed|Izmd_v3_normed_D_and_TPY|Izmd_v4_nodist_normed_TPY|Izmd_v5_all_normed|
+----------+-------------------------------+--------------------+-------------------------------+------------------+------------------------+--------------------------------------+-------------------+------------------+------------------+-----------------------+------------------------+-------------------------+------------------+
|

In [423]:
combined_df.count()

1539

In [424]:
pd_combined_df = combined_df.toPandas()

pd_combined_df['y-m'] = parquet_file

zmy_agg_list.append(combined_df)

NameError: name 'zmy_agg_list' is not defined

In [None]:
# test final join
zip_avgs
df_merged = pd.merge(df, df_avgs, left_on=["school_zip","y-m"], right_on=["zip code", "y-m"], how="left")

## Basic Instrument form (distance is present in some versions only)

## $$I_{zmy} = \sum_{ps=1}^{3} \sum_{d_{m}=1}^{D_{m}}\theta_{downstream_{zd_{m}}} \times TPY_{ps} \times \frac{S_{zd_{m}}}{D_{ps}}$$

V1: as written, no normalizing - Our original IV </br>
V2: no dividing by distance, no normalizing - Cornelia wants this</br>
V3: as written, normalizing - normalize TPY, Dps (z-score, min max)</br>
V4: no dividing by distance, normalizing - normalize TPY, Dps (z-score, min max)</br>
V5: as written, all quantities normed


# End Calculations/Aggregation Step-through

___________________


# Begin Calculation/Aggregation Loop

In [429]:
# Setup directories/variables

local_dir = 'C:\\Users\\matts\\Documents\\Berkeley MIDS\\DataSci 210 Capstone\\non-push files\\data\\'

out_dir_zmy_raw_avgs = os.path.join(local_dir,'naive_zmy_avgs\\')
out_dir_unagged = os.path.join(local_dir,'raw_my_spark_dfs\\')
out_dir_zmy = os.path.join(local_dir,'zmy_agged_dfs\\')

In [428]:
def aggregate_zmy(early_stopping: int = 0):
    """Process and Aggregate Monthly Data
    Before running this, set input/output directories above.

    This will take a long time and generate many files.
    Its only direct output is a pandas dataframe, but it 
    will save monthly Pandas dataframes to disk in case 
    something needs revisiting.

    Inputs: early_stopping (int): for testing, set to max iterations to perform
    Outputs: combined Pandas dataframe with all aggregated y-m data
    """
    ### Opening Section: Data Load and Preprocessing ###

    # read in files
    school_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'wind_grid_to_school_lookup_filtered.csv'))
    ps_year_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'year_lookup.csv'))
    school_to_ps_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'school_year_to_point_lookup_top_5_filtered.csv'))
    ps_lookup = spark.read.option("header",True).csv(os.path.join(local_dir, 'wind_grid_to_ps_point_lookup_filtered.csv'))

    # Calculate and save out statistics for first normalizing (standard scaler)
    cols_to_drop = ['ps_index','CDSCode', 'ps_year', 'ps_zip', 'angle_to_school', 'ps_distance_rank']
    ps_agg = school_to_ps_lookup.drop(*cols_to_drop).distinct().cache()

    ps_stats = ps_agg.select(avg('ps_pm25_tpy'), avg('school_to_ps_geod_dist_m'), 
                             stddev_samp('ps_pm25_tpy'), stddev_samp('school_to_ps_geod_dist_m')).collect()

    ps_stats = ps_stats[0]

    ps_TPY_mean = ps_stats[0]
    ps_dist_mean = ps_stats[1]
    ps_TPY_sd = ps_stats[2]
    ps_dist_sd = ps_stats[3]

    # Calculate and save statistics for second normalizing (min-max)
    ps_agg = ps_agg.withColumn('TPY_norm', (col('point_source_pm25_tpy') - ps_TPY_mean)/ps_TPY_sd).withColumn('dist_norm', (col('school_to_ps_geod_dist_m') - ps_dist_mean)/ps_dist_sd)
    ps_stats_mm = ps_agg.select(min('TPY_norm'), min('dist_norm'), max('TPY_norm'), max('dist_norm')).collect()
    ps_stats_mm = ps_stats_mm[0]

    ps_TPY_min = ps_stats_mm[0]
    ps_dist_min = ps_stats_mm[1]
    ps_TPY_max = ps_stats_mm[2]
    ps_dist_max = ps_stats_mm[3]

    # lists to contain pandas dataframes
    zmy_agg_list = []
    df_avgs_list = []
    
    counter = 0

    # loop through files
    for parquet_file in os.listdir(in_dir):
        
        if (early_stopping == 0 or counter < early_stopping):

            # for holding augmented df at the zip code level
            temp_zmy_df = pd.DataFrame()

            # for holding augmented df at the school level
            temp_school_my_df = pd.DataFrame()

            # read in one month
            temp_meas_df = spark.read.parquet(os.path.join(in_dir, parquet_file))

            temp_meas_df = (temp_meas_df
                            .withColumn('wdir_wrt_0N',(180*F.atan2(col('u'), col('v'))
                                                       /(3.141592653589793238462)).cast('double')
                                       )
                            )

            # drop lat/lon, wdir, and y-m and store temp df to re-join for ps wind readings (dropped can be recovered if needed)
            wind_temp_df = temp_meas_df.drop('wind_lat','wind_lon','wdir','y-m')
            wind_temp_df.cache()

            # rename for explicitness of measurements
            temp_meas_df = (temp_meas_df
                            .withColumnRenamed('wind_lat','school_wind_lat')
                            .withColumnRenamed('wind_lon','school_wind_lon')
                            .withColumnRenamed('u','school_u')
                            .withColumnRenamed('v','school_v')
                            .withColumnRenamed('grid_index','school_grid_index')
                           ).drop('wdir').drop('y-m') # wdir is wrt 0° E and is confusing; y-m not needed

            combined_df = temp_meas_df.join(school_lookup, ['school_grid_index'], how='inner')

            # compute zip code averages for wdir, wspd
            zip_avgs = (combined_df.groupBy('school_zip')
                        .avg('wspd','wdir_wrt_0N', 'school_u','school_v')
                        .withColumnRenamed("school_zip","zip_code")
                        .withColumnRenamed("avg(wspd)","avg_wspd")
                        .withColumnRenamed("avg(wdir_wrt_0N)","avg_wdir_0N")
                        .withColumnRenamed("avg(school_u)","avg_u")
                        .withColumnRenamed("avg(school_v)","avg_v")
                        .toPandas()
                       )

            zip_avgs['y-m'] = parquet_file

            df_avgs_list.append(zip_avgs)

            combined_df = (combined_df
                           .withColumnRenamed('wspd','school_wspd')
                           .withColumnRenamed('wdir_wrt_0N','school_wdir_0N') 
                          )

            ### Middle Section: joins ###

            # need to lookup by CDSCode and year, so substring for year
            combined_df = combined_df.withColumn("year", substring(col('Datetime'),1,4))

            # join in ps <-> year lookup
            combined_df = combined_df.join(ps_year_lookup, ['year'], how='left').drop('year')

            # use lookup year to join in PSs to each school
            combined_df = combined_df.join(school_to_ps_lookup, ['CDSCode','ps_year'], how='left')

            # join lookup table to get nearest wind grid index for each PS
            combined_df = combined_df.join(ps_lookup, ['ps_index'], how='left')

            # rename for clarity
            combined_df = combined_df.withColumnRenamed('geod_dist_m', 'school_to_ps_geod_dist_m')

            # rename of ease/cleanliness of join           
            combined_df = combined_df.withColumnRenamed("ps_grid_index","grid_index")

            # join wind measurements at PS that we had saved off
            combined_df = combined_df.join(wind_temp_df, ['grid_index',"Datetime"], how='left')

            # rename for clarity
            combined_df = (combined_df
                            .withColumnRenamed('u','ps_u')
                            .withColumnRenamed('v','ps_v')
                            .withColumnRenamed('wspd','ps_wspd')
                            .withColumnRenamed('wdir_wrt_0N','ps_wdir_0N')
                           )

            combined_df = combined_df.withColumn("angle_to_school",col("angle_to_school").cast("double"))

            ### Second-to-last Section: Computations ###

            # function to compute better difference between alignments, factoring the zero-crossing
            # first angle is wind angle, second angle is heading to school (both wrt 0N)
            def calculateDifferenceBetweenAngles(firstAngle, secondAngle):
                difference = secondAngle - firstAngle
                if (difference <= -180): 
                    difference += 360
                    return difference
                elif (difference >= 180): 
                    difference -= 360
                    return difference
                return difference

            udf_calculateDifferenceBetweenAngles = F.udf(calculateDifferenceBetweenAngles)

            # compute alignment at each site and "central" (avg) alignment, with 180 being high and 0 being low
            combined_df = (combined_df
                           .withColumn("school_wind_alignment",
                                       udf_calculateDifferenceBetweenAngles(
                                       col('school_wdir_0N'), col('angle_to_school')).cast('double'))
                           .withColumn("ps_wind_alignment",
                                       udf_calculateDifferenceBetweenAngles(
                                       col('ps_wdir_0N'), col('angle_to_school')).cast('double'))
                          )

            combined_df = combined_df.withColumn("central_wind_alignment_180_high", 
                                                 (180 - abs(((col('school_wind_alignment') + col('ps_wind_alignment'))/2))).cast('double'))

            # compute normed TPY and dist using scalars from above
            combined_df = (combined_df
                           .withColumn('ps_pm25_tpy_normed', 
                                       ((((col('ps_pm25_tpy') - ps_TPY_mean) / ps_TPY_sd) - ps_TPY_min) / (ps_TPY_max - ps_TPY_min)).cast('double'))
                           .withColumn('school_to_ps_geod_dist_m_normed',((((col('school_to_ps_geod_dist_m') - ps_dist_mean) / ps_dist_sd) - ps_dist_min) / (ps_dist_max - ps_dist_min)).cast('double'))
                          )

            # compute normed wspd and wind alignment for Instrument v5

            combined_df = (combined_df
                           .withColumn('avg_wspd',((col('school_wspd') + col('ps_wspd'))/2).cast('double'))
                          )

            combined_df = (combined_df
                           .withColumn('central_wind_alignment_180_high_normed',
                                       (col('central_wind_alignment_180_high')/180).cast('double'))
                           .withColumn('avg_wspd_normed',
                                       ((col('avg_wspd') - min_wspd) / (max_wspd - min_wspd)).cast('double'))
                          )

            combined_df = (combined_df
                           .withColumn('Izmd_v1_unnormed',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy') * (col('avg_wspd') / col('school_to_ps_geod_dist_m'))).cast('double'))
                           .withColumn('Izmd_v2_nodist_unnormed',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy') * col('avg_wspd')).cast('double'))
                           .withColumn('Izmd_v3_normed_D_and_TPY',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy_normed') * (col('avg_wspd') / col('school_to_ps_geod_dist_m_normed'))).cast('double'))
                           .withColumn('Izmd_v4_nodist_normed_TPY',(col('central_wind_alignment_180_high') * col('ps_pm25_tpy_normed') * col('avg_wspd')).cast('double'))
                           .withColumn('Izmd_v5_all_normed',(col('central_wind_alignment_180_high_normed') * col('ps_pm25_tpy_normed') * (col('avg_wspd_normed') / col('school_to_ps_geod_dist_m_normed'))).cast('double'))
                          )

            ### Final Section: Aggregations and Saving Out ###

            # write out raw (unaggregated) df
            file_name = os.path.join(out_dir_unagged, parquet_file)
            combined_df.write.parquet(file_name)

            # start selecting and aggregating down

            cols_to_select = ['CDSCode',
                            'school_zip',
                            'ps_distance_rank',
                            'ps_pm25_tpy_normed',
                            'ps_pm25_tpy',
                            'angle_to_school',
                            'school_to_ps_geod_dist_m_normed',
                            'school_to_ps_geod_dist_m',
                             'central_wind_alignment_180_high',
                             'central_wind_alignment_180_high_normed',
                             'avg_wspd_normed',
                             'avg_wspd',
                             'Izmd_v1_unnormed',
                             'Izmd_v2_nodist_unnormed',
                             'Izmd_v3_normed_D_and_TPY',
                             'Izmd_v4_nodist_normed_TPY',
                             'Izmd_v5_all_normed']

            combined_df = combined_df.select(*cols_to_select)

            ### AGGREGATION 1/3: reduce to m-y-school-ps(-zip) level ###

            group_by_cols = ['CDSCode',
                            'school_zip',
                            'ps_distance_rank',
                            'ps_pm25_tpy_normed',
                            'ps_pm25_tpy',
                            'angle_to_school',
                            'school_to_ps_geod_dist_m_normed',
                            'school_to_ps_geod_dist_m']

            combined_df = combined_df.groupBy(*group_by_cols) \
                            .agg(
                            avg('central_wind_alignment_180_high').alias('central_wind_alignment_180_high'), \
                            avg('central_wind_alignment_180_high_normed').alias('central_wind_alignment_180_high_normed'), \
                            avg('avg_wspd_normed').alias('avg_wspd_normed'), \
                            avg('avg_wspd').alias('avg_wspd'), \
                            sum('Izmd_v1_unnormed').alias('Izmd_v1_unnormed'), \
                            sum('Izmd_v2_nodist_unnormed').alias('Izmd_v2_nodist_unnormed'), \
                            sum('Izmd_v3_normed_D_and_TPY').alias('Izmd_v3_normed_D_and_TPY'), \
                            sum('Izmd_v4_nodist_normed_TPY').alias('Izmd_v4_nodist_normed_TPY'), \
                            sum('Izmd_v5_all_normed').alias('Izmd_v5_all_normed') \
                                )

            ### AGGREGATION 2/3: reduce to m-y-school(-zip) level ###

            combined_df = combined_df.groupBy("CDSCode", "school_zip") \
                            .agg(
                            avg('central_wind_alignment_180_high').alias('central_wind_alignment_180_high'), \
                            avg('ps_pm25_tpy_normed').alias('ps_pm25_tpy_normed'), \
                            avg('school_to_ps_geod_dist_m_normed').alias('school_to_ps_geod_dist_m_normed'), \
                            avg('ps_pm25_tpy').alias('ps_pm25_tpy'), \
                            avg('school_to_ps_geod_dist_m').alias('school_to_ps_geod_dist_m'), \
                            avg('central_wind_alignment_180_high_normed').alias('central_wind_alignment_180_high_normed'), \
                            avg('avg_wspd_normed').alias('avg_wspd_normed'), \
                            avg('avg_wspd').alias('avg_wspd'), \
                            sum('Izmd_v1_unnormed').alias('Izmd_v1_unnormed'), \
                            sum('Izmd_v2_nodist_unnormed').alias('Izmd_v2_nodist_unnormed'), \
                            sum('Izmd_v3_normed_D_and_TPY').alias('Izmd_v3_normed_D_and_TPY'), \
                            sum('Izmd_v4_nodist_normed_TPY').alias('Izmd_v4_nodist_normed_TPY'), \
                            sum('Izmd_v5_all_normed').alias('Izmd_v5_all_normed') \
                                )

            ### AGGREGATION 3/3: reduce to m-y-zip level ###

            combined_df = combined_df.groupBy("school_zip") \
                            .agg(
                            avg('central_wind_alignment_180_high').alias('central_wind_alignment_180_high'), \
                            avg('ps_pm25_tpy_normed').alias('ps_pm25_tpy_normed'), \
                            avg('school_to_ps_geod_dist_m_normed').alias('school_to_ps_geod_dist_m_normed'), \
                            avg('ps_pm25_tpy').alias('ps_pm25_tpy'), \
                            avg('school_to_ps_geod_dist_m').alias('school_to_ps_geod_dist_m'), \
                            avg('central_wind_alignment_180_high_normed').alias('central_wind_alignment_180_high_normed'), \
                            avg('avg_wspd_normed').alias('avg_wspd_normed'), \
                            avg('avg_wspd').alias('avg_wspd'), \
                            avg('Izmd_v1_unnormed').alias('Izmd_v1_unnormed'), \
                            avg('Izmd_v2_nodist_unnormed').alias('Izmd_v2_nodist_unnormed'), \
                            avg('Izmd_v3_normed_D_and_TPY').alias('Izmd_v3_normed_D_and_TPY'), \
                            avg('Izmd_v4_nodist_normed_TPY').alias('Izmd_v4_nodist_normed_TPY'), \
                            avg('Izmd_v5_all_normed').alias('Izmd_v5_all_normed'), \
                            count('CDSCode')).toPandas()

            combined_df['y-m'] = parquet_file

            zmy_agg_list.append(combined_df)

            # counter for early stopping
            counter += 1
    df_avgs = pd.concat(df_avgs_list)
    df_avgs.to_csv(os.path.join(out_dir_zmy_raw_avgs,'df_zmy_avgs.csv'))
                            
    df = pd.concat(zmy_agg_list)
    
    df_merged = pd.merge(df, df_avgs, left_on=["school_zip","y-m"], right_on=["zip code", "y-m"], how="left")

    return df_avgs_list

In [None]:
df_avgs_list = aggregate_zmy(early_stopping = 6)