# Project 2 Lab 6 - Parcel Feature Extraction

Next, we will illustrate the construction of features related to our main task: finding the relationship between property development and water quality over time.  In a previous lab, you identified lakes for which we have complete information for the years from 2004 to 2015.  In this lab, we will

[Original Data and variable information](https://gisdata.mn.gov/organization/us-mn-state-metrogis?q=Metro+Regional+Parcel+Dataset&sort=score+desc%2C+metadata_modified+desc)

## Problem 1 - Read/filter/union/write the combined parcel data

Next, we will use a pipe to read, filter, union and write the parcel data, with the resulting file being partitioned by the year and lake ID.

#### Task 1.

Create a pipe that

1. Finds a list of paths for the parquet "files" created in a previous lab.
    * We only need the years 2004-2014.
2. Reads/filters each of the parcel parquet files by mapping the first helper function from the last step to each path. 
    * You should use the imported list of lakes with complete information to filter on lakes.
    * Use the distance category to only include parcels within 1600 m of the respective lake. 
    * You can drop the centroid lat & long, along with the distance information, once the filters are applied.
3. Union the resulting data frames into one data frame.

#### Task 2.

Write the combined parcel file to a parquet file that is partitioned by the lake ID and year (in that order).  This is our silver table for the parcel data.
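
As a rough illustration of the read/filter/union pattern outside of Spark, the sketch below applies the same three steps to a list of hypothetical file names using only the standard library. The paths, the `fake_read` helper, and the toy rows are assumptions made for illustration; in the lab each path would instead be read with `spark.read.parquet` and the frames combined with `DataFrame.union`.

```python
import re
from functools import reduce

# Hypothetical file names; the real list would come from a glob over ./data.
paths = [f"./data/parcel_{year}.parquet" for year in range(2002, 2017)]

extract_year = re.compile(r"parcel_(\d{4})\.parquet$")

def year_of(path):
    """Pull the four-digit year out of a parcel file name."""
    return int(extract_year.search(path).group(1))

# Step 1: keep only the years 2004-2014.
kept = [p for p in paths if 2004 <= year_of(p) <= 2014]

# Step 2: "read" each file. fake_read is a stand-in that returns a list of rows.
def fake_read(path):
    return [{"Year": year_of(path), "source": path}]

frames = [fake_read(p) for p in kept]

# Step 3: fold the per-year tables into one table, the same shape as reduce(union).
combined = reduce(lambda acc, rows: acc + rows, frames)

years = [row["Year"] for row in combined]
print(years)
```

The fold in step 3 is the part that generalizes: any binary "combine two tables" function can be reduced over a list of tables to produce a single table.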

In [1]:
import re
import pandas as pd
from pyspark.sql import SparkSession
from lake import lakes_w_complete_info
from pyspark.sql.functions import col, avg, stddev, count, when, sum, max, lit
from more_pyspark import to_pandas
from composable.strict import map, filter
from composable.sequence import reduce
from composable.glob import glob


pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [1]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .config("spark.executor.memory", '8g')
         .config("spark.driver.memory", '8g')
         .appName('Ops')
         .getOrCreate())



In [236]:
# Your code here

extract_year = re.compile(r'data/parcel_(\d{4})\.parquet')
year_val = lambda path: extract_year.search(path).group(1)

year_filter = lambda parcel_file: 2003 < int(year_val(parcel_file)) < 2015 
read_data_frame = lambda parcel_data_file : spark.read.parquet(parcel_data_file)

filter_lakes_w_complete_info = lambda df: df.where(col('Monit_MAP_CODE1').isin(lakes_w_complete_info))
filter_within_1600m_distance = lambda df: df.where(col('distance_categories').isin(['within 500m','between 501-1600m']))
drop_columns = lambda df: df.drop('centroid_lat', 'centroid_long', 'distance_categories', 'Distance_Parcel_Lake_meters')

union_data_frames = lambda acc, df: acc.union(df)

file_paths = './data/parcel_2*.parquet'

In [237]:
parcel_data_frame = (file_paths
 >> glob
 >> filter(year_filter)
 >> map(read_data_frame)
 >> map(filter_lakes_w_complete_info)
 >> map(filter_within_1600m_distance)
 >> map(drop_columns)
 >> reduce(union_data_frames)
)

                                                                                

In [238]:
parcel_data_frame.groupBy('Year').count().orderBy(col('Year').asc()).show()

                                                                                

+----+------+
|Year| count|
+----+------+
|2004|118873|
|2005|119739|
|2006|120794|
|2007|121822|
|2008|124284|
|2009|123848|
|2010|125678|
|2011|125275|
|2012|125650|
|2013|125996|
|2014|126422|
+----+------+



In [23]:
(parcel_data_frame
 .write
 .partitionBy('Monit_MAP_CODE1','Year')
 .mode('overwrite')
 .parquet('./data/parcels_combined.parquet')
)




                                                                                

## Problem 2 - Feature construction

**Overview.** Remember that our target output file will have one row per year-lake combination.  To attach property information, we will need to group and aggregate the parcel data to create features for each lake-year combination.  When grouping the data, be sure to maintain the variables needed to join to the water quality data, namely the lake ID and year.  Since we are looking at tracking property development/change over time, we will want to generate features tracking

* Number of properties close to each lake,
* The value of properties close to each lake,
* Aggregate size and type of the properties, and
* Other features that might impact water quality.
    
#### Task 1. Understanding parcel variables

Before we can construct features, we need to make sure we understand the parcel data.  The metro parcel data is provided by the State of Minnesota and the metadata can be found online.  For example, searching for *metro parcel 2014* led to [this site](https://geo.btaa.org/catalog/304cf3d8-a53b-4ea9-b02a-f550bd68e320).  Clicking on the *Meta data* button in the top left brought up more information.  Clicking *Download* opened the metadata [in a separate page](https://resources.gisdata.mn.gov/pub/gdrs/data/pub/us_mn_state_metrogis/plan_regonal_parcels_2014/metadata/metadata.html).

Look through **Section 4: Attributes** and identify variables that might impact the water quality of nearby lakes.

> <font color="red"> 
    The variables that might impact the water quality of the nearby lakes in my opinion could be:<br>
    <ol>
        <li>ACRES_DEED: Deeded Acreage</li>
        <li>BASEMENT: Basement Y/N</li>
        <li>COOLING: Cooling type</li>
        <li>HEATING: Heating type</li>
        <li>DWELL_TYPE: Dwelling Type</li>
        <li>EMV_TOTAL: Est. Market Value - Total</li>
        <li>FIN_SQ_FT: Square Footage</li>
        <li>GARAGE: Garage Y/N</li>
        <li>GARAGESQFT: Garage Square Footage</li>
        <li>NUM_UNITS: Number of Units</li>
        <li>TOTAL_TAX: Total Tax</li>
        <li>SALE_VALUE: Last Sales Value</li>
    </ol>

</font>

#### Task 2. Brainstorm about features

Remember that we need to aggregate down to a table with one row per lake-year, which means that feature construction will involve computing summary statistics. Below are some techniques for feature construction that you might employ.

1. **Numerical summaries.** For numeric variables, you should compute one or more summary statistics (mean, median, SD, IQR, etc.) per group.
2. **Categorical summaries.** For text data, we will have some more work.  Here are some strategies.
    * **Success rates.** Compute success rates for binary variables.  For example, we could compute the percent/fraction of residences that have a basement.
    * **Clean labels.** Be sure to inspect the unique labels and clean up duplicate/similar labels.
    * **Make broader classifications.**  Some categorical variables will have too many categories that apply to a small number of properties.  These should be recoded into a smaller set of broad categories.  Try to eliminate or combine rare categories in the process.
    * **Indicator columns.** Another strategy is to create indicator variables then aggregate, where the result can be either zero-one (presence/absence) or the total/proportion over all rows.  For example, we could create the number of properties of each use type.
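
The categorical strategies above can be sketched on a handful of toy rows using plain Python. The rows, the `USE` column name, and the `broad` mapping are assumptions made for illustration and are not taken from the parcel data.

```python
from collections import Counter

# Toy parcel rows for a single lake-year (values are made up for illustration).
rows = [
    {"BASEMENT": "Y", "USE": "Residential "},
    {"BASEMENT": "N", "USE": "residential"},
    {"BASEMENT": "Y", "USE": "COMMERCIAL"},
    {"BASEMENT": "Y", "USE": "Vacant Land"},
]

# Success rate: fraction of rows where the binary variable is "Y".
basement_rate = sum(r["BASEMENT"] == "Y" for r in rows) / len(rows)

# Clean labels: strip whitespace and unify case so near-duplicates collapse.
def clean(label):
    return label.strip().lower()

# Broader classes: map cleaned labels to a small set, everything else to "other".
broad = {"residential": "residential", "commercial": "commercial"}
classes = [broad.get(clean(r["USE"]), "other") for r in rows]

# Indicator columns then aggregate: summing a 0/1 indicator gives a count per class.
use_counts = Counter(classes)

print(basement_rate, dict(use_counts))
```

Dividing each count by the number of rows would turn the totals into proportions, which are easier to compare across lakes with very different numbers of parcels.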

Consider the variables you identified in the last step, and develop a feature construction strategy for each.

> <font color="red"> 
    <ol>
        <li>ACRES_DEED: Deeded Acreage: We can use numerical summaries</li>
        <li>BASEMENT: Basement Y/N: We can make indicator column and compute success rates</li>
        <li>COOLING: Cooling type: We can make indicator column and compute success rates</li>
        <li>HEATING: Heating type: We can make indicator column and compute success rates</li>
        <li>DWELL_TYPE: Dwelling Type: We can make indicator column</li>
        <li>EMV_TOTAL: Est. Market Value - Total: We can use numerical summaries</li>
        <li>FIN_SQ_FT: Square Footage: We can use numerical summaries</li>
        <li>GARAGE: Garage Y/N: We can make indicator column and compute success rates</li>
        <li>GARAGESQFT: Garage Square Footage: We can use numerical summaries</li>
        <li>NUM_UNITS: Number of Units: We can use numerical summaries</li>
        <li>TOTAL_TAX: Total Tax: We can use numerical summaries</li>
        <li>SALE_VALUE: Last Sales Value: We can use numerical summaries</li>
    </ol>

</font>

#### Task 3.  Numerical Summaries

Two important categories of property data involve the size (e.g., finished square footage) and value (e.g., assessed value and/or taxes paid).

**Tasks.** 

1. Identify 2-3 variables for each of these categories.
2. Write a query that computes the summary statistics for each of these variables for each lake-year.  
3. Write this summary table out to a parquet file named `parcel_numerical_summaries.parquet`.  Again, you should partition by lake ID and year.
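
To make the per-group statistics concrete, here is a minimal sketch that computes a mean and a sample standard deviation for each lake-year with the standard library. The lake IDs and values are made up. As far as we know, Spark's `stddev` is also the sample standard deviation, so `statistics.stdev` is the matching choice here.

```python
from collections import defaultdict
from statistics import mean, stdev

# Toy rows: (lake ID, year, EMV_TOTAL). Values are made up for illustration.
rows = [
    ("lake-A", 2010, 100.0),
    ("lake-A", 2010, 200.0),
    ("lake-A", 2010, 300.0),
    ("lake-A", 2011, 150.0),
    ("lake-A", 2011, 250.0),
    ("lake-B", 2010, 400.0),
]

# Group the values by the (lake, year) key.
groups = defaultdict(list)
for lake, year, value in rows:
    groups[(lake, year)].append(value)

summaries = {
    key: {
        "mean": mean(values),
        # A sample SD needs at least two values; a singleton group has none.
        "sd": stdev(values) if len(values) > 1 else None,
    }
    for key, values in groups.items()
}

for key, stats in sorted(summaries.items()):
    print(key, stats)
```

Groups with a single parcel have no sample standard deviation, so it is worth checking how many rows sit behind each summary before using it as a feature.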

> <font color='red'>For Size: I am choosing GARAGESQFT and FIN_SQ_FT <br>
    For Value: I am choosing SALE_VALUE, EMV_TOTAL, and TOTAL_TAX</font>

In [239]:
# Your code here.
all_parcel_df = read_data_frame('data/parcels_combined.parquet')

In [240]:
all_parcel_df.take(2) >>to_pandas

Unnamed: 0,ACRES_DEED,ACRES_POLY,AGPRE_ENRD,AGPRE_EXPD,AG_PRESERV,BASEMENT,BLDG_NUM,BLOCK,CITY,CITY_USPS,COOLING,COUNTY_ID,DWELL_TYPE,EMV_BLDG,EMV_LAND,EMV_TOTAL,FIN_SQ_FT,GARAGESQFT,GREEN_ACRE,HEATING,HOME_STYLE,LANDMARK,LOT,MULTI_USES,NUM_UNITS,OPEN_SPACE,OWNER_MORE,OWNER_NAME,OWN_ADD_L1,OWN_ADD_L2,OWN_ADD_L3,PARC_CODE,PIN,PLAT_NAME,PREFIXTYPE,PREFIX_DIR,SALE_DATE,SALE_VALUE,SCHOOL_DST,SPEC_ASSES,STREETNAME,STREETTYPE,SUFFIX_DIR,Shape_Area,Shape_Leng,TAX_ADD_L1,TAX_ADD_L2,TAX_ADD_L3,TAX_CAPAC,TAX_EXEMPT,TAX_NAME,TOTAL_TAX,UNIT_INFO,USE1_DESC,USE2_DESC,USE3_DESC,USE4_DESC,WSHD_DIST,XUSE1_DESC,XUSE2_DESC,XUSE3_DESC,XUSE4_DESC,YEAR_BUILT,ZIP,ZIP4,Distance_Parcel_Lake_meters,Monit_MAP_CODE1,Year
0,0.0,0.35,,,N,,9000,,ST. LOUIS PARK,ST. LOUIS PARK,,53,,0.0,100.0,100.0,0.0,,N,,,,,,,N,,JAMES P CONWAY,,,,0.0,053-1811721340030,UNPLATTED 18 117 21,,,,0.0,283,0.0,STATE HWY NO 7,,,,,1080 W COUNTY ROAD E,SHOREVIEW MN 55126,,2.0,N,JAMES P CONWAY,4.0,,Vacant Land - Commercial,,,,Minnehaha Creek,,,,,0.0,55426,,1569.5857499400609,27005300-01,2011
1,0.0,3.09,,,N,,3720,1.0,ST. LOUIS PARK,ST. LOUIS PARK,,53,,0.0,0.0,0.0,0.0,,N,,,,125.0,,,N,,LOHMANS AMHURST HOMEOWNERS,,,,0.0,053-1811721330154,LOHMANS AMHURST 2ND ADDN,,,,0.0,283,0.0,INDEPENDENCE AVE S,,,12488.7280061,2076.59403611,3680 INDEPENDENCE AVE S,ST LOUIS PARK MN 55426,,0.0,N,LOHMANS AMHURST HMWNRS ASSOC,0.0,,Common Area (No Value),,,,Minnehaha Creek,,,,,0.0,55426,,1583.1520609230608,27005300-01,2011


In [241]:
numerical_summaries = (all_parcel_df
 .groupBy('Monit_MAP_CODE1','Year')
 .agg(avg(col('EMV_TOTAL')).alias('Mean_EMV_Total'),stddev(col('EMV_TOTAL')).alias('STD_EMV_Total'),
      avg(col('SALE_VALUE')).alias('Mean_Sale_Value'),stddev(col('SALE_VALUE')).alias('STD_Sale_Value'),
      avg(col('TOTAL_TAX')).alias("Mean_Total_Tax"), stddev(col('TOTAL_TAX')).alias("STD_Total_Tax"),
      avg(col('GARAGESQFT')).alias('Mean_Garage_Size'), stddev(col('GARAGESQFT')).alias('STD_Garage_Size'),
      avg(col('FIN_SQ_FT')).alias("Mean_Fin_SQ_FT"), stddev(col('FIN_SQ_FT')).alias("STD_Fin_SQ_FT")
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

In [30]:
numerical_summaries.collect() >> to_pandas

                                                                                

Unnamed: 0,Monit_MAP_CODE1,Year,Mean_EMV_Total,STD_EMV_Total,Mean_Sale_Value,STD_Sale_Value,Mean_Total_Tax,STD_Total_Tax,Mean_Garage_Size,STD_Garage_Size,Mean_Fin_SQ_FT,STD_Fin_SQ_FT
0,02000500-01,2014,215637.592745,459023.9,122485.615829,183414.9,3313.685903,17837.81,,,1849.473207,8220.487298
1,02000500-01,2013,196764.633141,454192.4,120601.723001,190129.5,3319.081616,18199.95,,,1841.112119,8221.467234
2,02000500-01,2012,200414.333057,480356.0,118315.128418,190091.0,3460.064623,19346.92,,,1811.576636,8223.512958
3,02000500-01,2011,216297.932175,536941.0,111218.812242,190312.5,3459.313482,19320.29,,,1780.282051,8215.092878
4,02000500-01,2010,222789.430223,559656.1,110765.119736,190448.3,3384.645747,18646.63,,,1760.876135,8210.519477
5,02000500-01,2009,242463.996697,591927.1,107504.150289,190962.5,3489.229562,19418.66,,,1737.592898,8212.984261
6,02000500-01,2008,251564.905037,639093.2,98506.346821,180956.6,3317.029727,19578.78,,,1730.542527,8224.659654
7,02000500-01,2007,238607.467283,625283.2,96000.966898,213258.7,3230.503464,18438.44,,,1675.535027,7962.522155
8,02000500-01,2006,235092.781833,609406.3,78391.635036,174394.7,3435.78751,23647.02,,,1652.141119,8163.855783
9,02000500-01,2005,232921.084864,651323.0,81763.251094,198193.2,0.0,0.0,,,1734.965879,8447.712659


In [31]:
(numerical_summaries
 .write
 .partitionBy('Monit_MAP_CODE1','Year')
 .mode('overwrite')
 .parquet('./data/parcel_numerical_summaries.parquet')
)

                                                                                

## Problem 3.  Simple categorical summaries.

In this part, you will create summary statistics for some of the simpler categorical variables.

**Binary variables.** There are two examples of binary variables, listed below.  You will need to compute the percent of `Yes` for each.

* GARAGE: Garage Y/N
* BASEMENT: Basement Y/N

**Other categorical variables.** There are a number of other categorical variables.  You need to select one of these variables, inspect/clean your variable as needed, create indicator variables for each resulting label, and compute summary statistics for each label.

* HOMESTEAD: Homestead Status
* TAX_EXEMPT: Tax Exempt Status 
* DWELL_TYPE: Dwelling Type 
* HOME_STYLE: Home Style
* HEATING: Heating type
* COOLING: Cooling type

**Tasks.**
Create a query that

1. Selects one binary and two other categorical variables for feature construction.
2. Reads in the parcel data and selects the relevant columns (be sure to keep the lake ID and year).
3. Inspects unique labels and recodes/cleans as needed.
4. Creates indicator columns for all labels.
5. Groups/aggregates to compute summary statistics for each lake-year.

Write this summary table out to a parquet file named `parcel_categorical_summaries.parquet`.  Again, you should partition by lake ID and year.
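
One design choice when computing a percent of `Yes` is the denominator. Binary columns in data like this often contain missing labels as well as `Y` and `N`, and the two options below can give quite different answers. The values are made up for illustration, with `None` standing in for a missing label.

```python
# Toy BASEMENT values for one lake-year; None stands in for a null label.
basement = ["Y", "Y", "N", None, None]

yes = sum(v == "Y" for v in basement)
known = sum(v is not None for v in basement)

# Denominator = all rows: nulls are effectively treated as "not Yes".
pct_of_all = 100 * yes / len(basement)

# Denominator = rows with a recorded label: nulls are ignored.
pct_of_known = 100 * yes / known if known else None

print(pct_of_all, pct_of_known)
```

Neither option is wrong, but the choice should be made deliberately and applied the same way to every binary feature.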

In [242]:
# Your code here
all_parcel_df.select("BASEMENT").distinct().show()



+--------+
|BASEMENT|
+--------+
|    null|
|       Y|
|       N|
+--------+




                                                                                

In [316]:
all_parcel_df.select("TAX_EXEMPT").distinct().show()




+----------+
|TAX_EXEMPT|
+----------+
|      null|
|         Y|
|         N|
+----------+





                                                                                

In [314]:
binary_summaries = (all_parcel_df
.withColumn("GARAGE", when(col("GARAGESQFT") > 0, "Y").otherwise("N"))
.groupBy('Monit_MAP_CODE1','Year')
.agg((count(when(col("BASEMENT") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Basement"),
      (count(when(col("GARAGE") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Garage"),
      (count(when(col("TAX_EXEMPT") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Tax_Exempt")
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

In [315]:
binary_summaries.collect() >> to_pandas

                                                                                

Unnamed: 0,Monit_MAP_CODE1,Year,Percentage_Yes_Basement,Percentage_Yes_Garage,Percentage_Yes_Tax_Exempt
0,02000500-01,2014,70.486397,0.0,7.914262
1,02000500-01,2013,69.991756,0.0,7.914262
2,02000500-01,2012,70.173985,0.0,8.119304
3,02000500-01,2011,76.840364,0.0,8.271299
4,02000500-01,2010,75.887696,0.0,8.092486
5,02000500-01,2009,74.566474,0.0,7.927333
6,02000500-01,2008,73.575557,0.0,7.927333
7,02000500-01,2007,72.440339,0.0,8.314088
8,02000500-01,2006,70.072993,0.0,8.434712
9,02000500-01,2005,75.065617,0.0,8.748906


In [271]:
list(all_parcel_df.select("COOLING").distinct().toPandas()['COOLING'])

                                                                                

['FORCED AIR',
 'PKG RF TOP',
 'WALL MOUNT',
 'CENTRAL',
 None,
 'GRAVITY/WA',
 'Forced Air',
 'RAD/BBELEC',
 'Y',
 'Unknown',
 'N',
 'STEAM W A/',
 'HOT WATER',
 'NONE',
 'CNTRL',
 'A/CON',
 '1 AC UNIT',
 'CEN.EVAP',
 '3 AC UNITS',
 '2 AC UNITS',
 'CEN. REFRI',
 '4 AC UNITS',
 'CENTRAL W/AIR COND',
 'OTHER W A/',
 'Evaporative Cooling',
 'Space Heater',
 'CHILL WATR',
 'CEN.REFRIG']

In [355]:
air_cooling = ['FORCED AIR','Forced Air','STEAM W A/', 'OTHER W A/']
ac_cooling = ['A/CON','1 AC UNIT','2 AC UNITS', '3 AC UNITS','4 AC UNITS','WALL MOUNT']
central_cooling = ['CENTRAL','CNTRL','CEN. REFRI','CENTRAL W/AIR COND','CEN.REFRIG','CEN.EVAP']
other_cooling = ['RAD/BBELEC','GRAVITY/WA','PKG RF TOP','HOT WATER','Evaporative Cooling','Space Heater','CHILL WATR', 'Y', 'Unknown']
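
An alternative to separate lists is a single lookup table keyed on a normalized label, which removes the need to list both `FORCED AIR` and `Forced Air`. The sketch below is plain Python with a hypothetical, shortened grouping; the `groups` dictionary and the `recode` helper are assumptions made for illustration, not part of the lab code.

```python
# Hypothetical grouping of a few raw labels; a full version would cover every label.
groups = {
    "air": ["FORCED AIR", "Forced Air"],
    "central": ["CENTRAL", "CNTRL", "CEN.REFRIG"],
    "none": ["NONE", "N"],
}

# Invert to label -> class, normalizing case and whitespace on the way in.
lookup = {
    label.strip().upper(): cls
    for cls, labels in groups.items()
    for label in labels
}

def recode(label):
    """Return the broad class; None stays None, unseen labels become 'other'."""
    if label is None:
        return None
    return lookup.get(label.strip().upper(), "other")

recoded = [recode(x) for x in ["forced air", "CNTRL ", "N", "CHILL WATR", None]]
print(recoded)
```

Sending unseen labels to `other` keeps the recoding from silently passing raw labels through when a new year of data introduces a spelling that was not anticipated.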

In [358]:
cooling_summaries=(all_parcel_df
 .select('Monit_MAP_CODE1','Year', 'COOLING')
 .withColumn("cooling_type", when(col("COOLING").isin(air_cooling),"air")
                            .when(col("COOLING").isin(ac_cooling),"ac")
                            .when(col("COOLING").isin(central_cooling),"central")
                            .when(col("COOLING").isin(other_cooling),"other")
                            .when(col("COOLING") == "NONE", "N")
                            .otherwise(col("COOLING"))
            )
 .groupBy('Monit_MAP_CODE1','Year')
 .agg((count(when(col("cooling_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Cooling"),
      (count(when(col("cooling_type") == "ac", 1))/count(lit(1))*100).alias("Percentage_AC_Cooling"),
      (count(when(col("cooling_type") == "central", 1))/count(lit(1))*100).alias("Percentage_Central_Cooling"),
      (count(when(col("cooling_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Cooling"),
      (count(when(col("cooling_type") == "N", 1))/count(lit(1))*100).alias("Percentage_No_Cooling"),
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

In [359]:
cooling_summaries.collect() >> to_pandas

                                                                                

Unnamed: 0,Monit_MAP_CODE1,Year,Percentage_Air_Cooling,Percentage_AC_Cooling,Percentage_Central_Cooling,Percentage_Other_Cooling,Percentage_No_Cooling
0,02000500-01,2014,0.0,0.0,0.0,2.308326,97.691674
1,02000500-01,2013,0.824402,0.0,0.0,1.154163,0.0
2,02000500-01,2012,0.0,0.0,0.0,0.0,0.0
3,02000500-01,2011,0.0,0.0,0.0,0.0,0.0
4,02000500-01,2010,0.0,0.0,0.0,0.0,0.0
5,02000500-01,2009,0.0,0.0,0.0,0.0,0.0
6,02000500-01,2008,0.0,0.0,0.0,0.0,0.0
7,02000500-01,2007,0.0,0.0,0.0,0.0,0.0
8,02000500-01,2006,0.0,0.0,0.0,0.0,0.0
9,02000500-01,2005,0.0,0.0,0.0,0.0,0.0


In [321]:
list(all_parcel_df.select("HEATING").distinct().toPandas()['HEATING'])

                                                                                

['FORCED AIR',
 '0',
 'FA Gas',
 None,
 'GRAVITY/WA',
 'Forced Air',
 'RAD/BBELEC',
 'SPACE HTR',
 'Yes',
 'Electric',
 'Gravity',
 'FRC AIR ND',
 'STEAM W A/',
 'HOT WATER',
 'NONE',
 'RAD INFRED',
 'Hot Water',
 'Other',
 'Oil F.A.',
 'H. Water',
 'No',
 'FHA Gas',
 'HOT AIR',
 'Wood',
 'AIR DUCTED',
 'ENG F AIR',
 'IN FLOOR',
 'RAD WATER',
 'FHA',
 'CONVECTION',
 'ELECTRIC',
 'SPACE HEAT',
 'OTHER W A/',
 'Forced Air Furnace',
 'Y',
 'Electric Baseboard',
 'Complete HVAC',
 'N',
 'Radiant Space Heaters',
 'Package Unit',
 'ELEC BASBD',
 'STEAM',
 'Evaporative Cooling',
 'Baseboard, Hot Water',
 'Gravity Furnace',
 'Space Heater',
 'RAD ELEC',
 'ELEC WALL',
 'GEO THERM',
 'Solar',
 'LP',
 'ENG STEAM',
 'HEAT PUMP',
 'SP HT W/FN',
 'SPACE-FAN']

In [343]:
air_heating = ['FORCED AIR','FA Gas','Forced Air','STEAM W A/','Oil F.A.','HOT AIR',
               'AIR DUCTED','ENG F AIR','FHA','CONVECTION','OTHER W A/','Forced Air Furnace', 'FHA Gas', 'FRC AIR ND']

space_heating = ['SPACE HTR','SPACE HEAT','Radiant Space Heaters','Space Heater','SP HT W/FN','SPACE-FAN'] 

water_heating = ['HOT WATER','H. Water','RAD WATER','STEAM','Evaporative Cooling',
                 'Baseboard, Hot Water','ENG STEAM','HEAT PUMP', 'Hot Water']

electic_heating = ['Electric','ELECTRIC','Electric Baseboard','ELEC BASBD','RAD ELEC','ELEC WALL']

other_heating = ['RAD/BBELEC','Yes','Gravity','RAD INFRED','Other','Wood','IN FLOOR','Y',
                 'Complete HVAC','GRAVITY/WA','Package Unit', 'Gravity Furnace','GEO THERM','Solar','LP']

no_heating = ['0','No','NONE', "N"]

In [360]:
heating_summaries=(all_parcel_df
 .select('Monit_MAP_CODE1','Year', 'HEATING')
 .withColumn("heating_type", when(col("HEATING").isin(air_heating),"air")
                            .when(col("HEATING").isin(space_heating),"space")
                            .when(col("HEATING").isin(water_heating),"water")
                            .when(col("HEATING").isin(electic_heating),"electric")
                            .when(col("HEATING").isin(other_heating),"other")
                            .when(col("HEATING").isin(no_heating), "no_heating")
                            .otherwise(col("HEATING"))
            )
 .groupBy('Monit_MAP_CODE1','Year')
 .agg((count(when(col("heating_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Heating"),
      (count(when(col("heating_type") == "space", 1))/count(lit(1))*100).alias("Percentage_Space_Heating"),
      (count(when(col("heating_type") == "water", 1))/count(lit(1))*100).alias("Percentage_Water_Heating"),
      (count(when(col("heating_type") == "electric", 1))/count(lit(1))*100).alias("Percentage_Electric_Heating"),
      (count(when(col("heating_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Heating"),
      (count(when(col("heating_type") == "no_heating", 1))/count(lit(1))*100).alias("Percentage_No_Heating"),
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

In [361]:
heating_summaries.collect() >> to_pandas

                                                                                

Unnamed: 0,Monit_MAP_CODE1,Year,Percentage_Air_Heating,Percentage_Space_Heating,Percentage_Water_Heating,Percentage_Electric_Heating,Percentage_Other_Heating,Percentage_No_Heating
0,02000500-01,2014,0.0,0.0,0.0,0.0,85.325639,14.674361
1,02000500-01,2013,79.060181,0.824402,1.154163,2.802968,1.154163,0.0
2,02000500-01,2012,0.0,0.0,0.0,0.0,0.0,0.0
3,02000500-01,2011,0.0,0.0,0.0,0.0,0.0,0.0
4,02000500-01,2010,0.0,0.0,0.0,0.0,0.0,0.0
5,02000500-01,2009,0.0,0.0,0.0,0.0,0.0,0.0
6,02000500-01,2008,0.0,0.0,0.0,0.0,0.0,0.0
7,02000500-01,2007,0.0,0.0,0.0,0.0,0.0,0.0
8,02000500-01,2006,0.0,0.0,0.0,0.0,0.0,0.0
9,02000500-01,2005,0.0,0.0,0.0,0.0,0.0,0.0


In [362]:
categorical_summaries = (all_parcel_df
 .select('Monit_MAP_CODE1','Year', 'COOLING', 'HEATING')
 .withColumn("cooling_type", when(col("COOLING").isin(air_cooling),"air")
                            .when(col("COOLING").isin(ac_cooling),"ac")
                            .when(col("COOLING").isin(central_cooling),"central")
                            .when(col("COOLING").isin(other_cooling),"other")
                            .when(col("COOLING") == "NONE", "N")
                            .otherwise(col("COOLING"))
            )
 .withColumn("heating_type", when(col("HEATING").isin(air_heating),"air")
                            .when(col("HEATING").isin(space_heating),"space")
                            .when(col("HEATING").isin(water_heating),"water")
                            .when(col("HEATING").isin(electic_heating),"electric")
                            .when(col("HEATING").isin(other_heating),"other")
                            .when(col("HEATING").isin(no_heating), "no_heating")
                            .otherwise(col("HEATING"))
            )
 .groupBy('Monit_MAP_CODE1','Year')
 .agg((count(when(col("cooling_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Cooling"),
      (count(when(col("cooling_type") == "ac", 1))/count(lit(1))*100).alias("Percentage_AC_Cooling"),
      (count(when(col("cooling_type") == "central", 1))/count(lit(1))*100).alias("Percentage_Central_Cooling"),
      (count(when(col("cooling_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Cooling"),
      (count(when(col("cooling_type") == "N", 1))/count(lit(1))*100).alias("Percentage_No_Cooling"),
      
      (count(when(col("heating_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Heating"),
      (count(when(col("heating_type") == "space", 1))/count(lit(1))*100).alias("Percentage_Space_Heating"),
      (count(when(col("heating_type") == "water", 1))/count(lit(1))*100).alias("Percentage_Water_Heating"),
      (count(when(col("heating_type") == "electric", 1))/count(lit(1))*100).alias("Percentage_Electric_Heating"),
      (count(when(col("heating_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Heating"),
      (count(when(col("heating_type") == "no_heating", 1))/count(lit(1))*100).alias("Percentage_No_Heating"),
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

In [363]:
categorical_summaries.collect() >> to_pandas

                                                                                

Unnamed: 0,Monit_MAP_CODE1,Year,Percentage_Air_Cooling,Percentage_AC_Cooling,Percentage_Central_Cooling,Percentage_Other_Cooling,Percentage_No_Cooling,Percentage_Air_Heating,Percentage_Space_Heating,Percentage_Water_Heating,Percentage_Electric_Heating,Percentage_Other_Heating,Percentage_No_Heating
0,02000500-01,2014,0.0,0.0,0.0,2.308326,97.691674,0.0,0.0,0.0,0.0,85.325639,14.674361
1,02000500-01,2013,0.824402,0.0,0.0,1.154163,0.0,79.060181,0.824402,1.154163,2.802968,1.154163,0.0
2,02000500-01,2012,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,02000500-01,2011,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,02000500-01,2010,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,02000500-01,2009,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6,02000500-01,2008,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,02000500-01,2007,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
8,02000500-01,2006,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,02000500-01,2005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [367]:
all_summaries = (binary_summaries
.join(categorical_summaries, on=['Monit_MAP_CODE1','Year'], how='left')
.orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

In [368]:
all_summaries.collect() >> to_pandas

                                                                                

Unnamed: 0,Monit_MAP_CODE1,Year,Percentage_Yes_Basement,Percentage_Yes_Garage,Percentage_Yes_Tax_Exempt,Percentage_Air_Cooling,Percentage_AC_Cooling,Percentage_Central_Cooling,Percentage_Other_Cooling,Percentage_No_Cooling,Percentage_Air_Heating,Percentage_Space_Heating,Percentage_Water_Heating,Percentage_Electric_Heating,Percentage_Other_Heating,Percentage_No_Heating
0,02000500-01,2014,70.486397,0.0,7.914262,0.0,0.0,0.0,2.308326,97.691674,0.0,0.0,0.0,0.0,85.325639,14.674361
1,02000500-01,2013,69.991756,0.0,7.914262,0.824402,0.0,0.0,1.154163,0.0,79.060181,0.824402,1.154163,2.802968,1.154163,0.0
2,02000500-01,2012,70.173985,0.0,8.119304,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,02000500-01,2011,76.840364,0.0,8.271299,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,02000500-01,2010,75.887696,0.0,8.092486,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,02000500-01,2009,74.566474,0.0,7.927333,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6,02000500-01,2008,73.575557,0.0,7.927333,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,02000500-01,2007,72.440339,0.0,8.314088,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
8,02000500-01,2006,70.072993,0.0,8.434712,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,02000500-01,2005,75.065617,0.0,8.748906,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [369]:
(all_summaries
 .write
 .partitionBy('Monit_MAP_CODE1','Year')
 .mode('overwrite')
 .parquet('./data/parcel_categorical_summaries.parquet')
)

                                                                                

## Problem 4.  Join all the summaries.

Finally, you need to join all the summaries created above, along with the water quality summaries created in a previous lab, into one overall summary file.  Write the resulting table to a CSV file named `water_quality_and_parcel_summaries_2004_to_2015.csv`.
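
The join logic can be sketched with dictionaries keyed on the (lake ID, year) pair. The tables, column names, and the `left_join` helper below are made up for illustration. Which table to start from is a design choice: starting from the water quality table keeps exactly the lake-years that have a water quality measurement.

```python
# Toy per-lake-year tables keyed by (lake ID, year); values are made up.
water = {
    ("lake-A", 2010): {"secchi": 1.2},
    ("lake-A", 2011): {"secchi": 1.4},
}
numeric = {
    ("lake-A", 2010): {"mean_emv": 200.0},
    ("lake-A", 2011): {"mean_emv": 210.0},
    ("lake-B", 2010): {"mean_emv": 400.0},
}
categorical = {
    ("lake-A", 2010): {"pct_basement": 75.0},
}

def left_join(left, right):
    """Keep every key in `left`; add columns from `right` where the key matches."""
    return {key: {**row, **right.get(key, {})} for key, row in left.items()}

# Start from the water quality table, then attach each parcel summary in turn.
joined = left_join(left_join(water, numeric), categorical)

for key, row in sorted(joined.items()):
    print(key, row)
```

In this sketch an unmatched key simply lacks the extra columns, whereas a Spark left join would fill those columns with nulls.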

Next, we need to recode the 

In [376]:
num_summaries = read_data_frame('data/parcel_numerical_summaries.parquet')
cat_summaries = read_data_frame('data/parcel_categorical_summaries.parquet')
lake_summaries = read_data_frame('data/water_quality_by_year.parquet')

In [372]:
num_summaries.take(2)>>to_pandas

Unnamed: 0,Mean_EMV_Total,STD_EMV_Total,Mean_Sale_Value,STD_Sale_Value,Mean_Total_Tax,STD_Total_Tax,Mean_Garage_Size,STD_Garage_Size,Mean_Fin_SQ_FT,STD_Fin_SQ_FT,Monit_MAP_CODE1,Year
0,348010.984595,1152360.0,360569.497656,1332838.0,0.0,0.0,475.82931,148.595251,1422.606832,754.835089,82008900-01,2011
1,294502.497716,437567.9,208876.556808,188467.4,15.017971,317.225788,642.85362,237.909092,1666.18276,821.250171,82009700-01,2014


In [374]:
cat_summaries.take(2)>>to_pandas

Unnamed: 0,Percentage_Yes_Basement,Percentage_Yes_Garage,Percentage_Yes_Tax_Exempt,Percentage_Air_Cooling,Percentage_AC_Cooling,Percentage_Central_Cooling,Percentage_Other_Cooling,Percentage_No_Cooling,Percentage_Air_Heating,Percentage_Space_Heating,Percentage_Water_Heating,Percentage_Electric_Heating,Percentage_Other_Heating,Percentage_No_Heating,Monit_MAP_CODE1,Year
0,0.0,0.0,4.802111,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,19002700-01,2004
1,80.81761,74.842767,6.603774,0.0,0.0,0.0,66.666667,0.0,78.930818,0.0,0.0,0.314465,1.572327,0.0,82015300-01,2010


In [377]:
lake_summaries.take(2)>>to_pandas

Unnamed: 0,LAKE_NAME,DNR_ID_Site_Number,Mean_Secchi_Depth_Result,Mean_Phosporus_Result,Year
0,Crystal Lake,19002700-01,1.468182,0.064,2005
1,Fish Lake,82013700-01,1.2308,0.06115,2005


In [455]:
water_quality_and_parcel_summaries = (lake_summaries
 .join(num_summaries, on=[lake_summaries['DNR_ID_Site_Number'] == num_summaries['Monit_MAP_CODE1'],
                          lake_summaries['Year'] == num_summaries['Year']], how='right')
 .drop(lake_summaries.Year)
 .join(cat_summaries, on=['Monit_MAP_CODE1','Year'], how='left')
 .drop('DNR_ID_Site_Number')
 .orderBy(col('Monit_MAP_CODE1').asc(), num_summaries['Year'].desc())
)

In [460]:
summaries_pandas = water_quality_and_parcel_summaries.collect() >> to_pandas

In [461]:
summaries_pandas

Unnamed: 0,Monit_MAP_CODE1,Year,LAKE_NAME,Mean_Secchi_Depth_Result,Mean_Phosporus_Result,Mean_EMV_Total,STD_EMV_Total,Mean_Sale_Value,STD_Sale_Value,Mean_Total_Tax,STD_Total_Tax,Mean_Garage_Size,STD_Garage_Size,Mean_Fin_SQ_FT,STD_Fin_SQ_FT,Percentage_Yes_Basement,Percentage_Yes_Garage,Percentage_Yes_Tax_Exempt,Percentage_Air_Cooling,Percentage_AC_Cooling,Percentage_Central_Cooling,Percentage_Other_Cooling,Percentage_No_Cooling,Percentage_Air_Heating,Percentage_Space_Heating,Percentage_Water_Heating,Percentage_Electric_Heating,Percentage_Other_Heating,Percentage_No_Heating
0,02000500-01,2014,George Watch Lake,0.716667,0.108778,215637.592745,459023.9,122485.615829,183414.9,3313.685903,17837.81,,,1849.473207,8220.487298,70.486397,0.0,7.914262,0.0,0.0,0.0,2.308326,97.691674,0.0,0.0,0.0,0.0,85.325639,14.674361
1,02000500-01,2013,George Watch Lake,0.365,0.3105,196764.633141,454192.4,120601.723001,190129.5,3319.081616,18199.95,,,1841.112119,8221.467234,69.991756,0.0,7.914262,0.824402,0.0,0.0,1.154163,0.0,79.060181,0.824402,1.154163,2.802968,1.154163,0.0
2,02000500-01,2012,George Watch Lake,0.359,0.2649,200414.333057,480356.0,118315.128418,190091.0,3460.064623,19346.92,,,1811.576636,8223.512958,70.173985,0.0,8.119304,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,02000500-01,2011,George Watch Lake,0.973333,0.119417,216297.932175,536941.0,111218.812242,190312.5,3459.313482,19320.29,,,1780.282051,8215.092878,76.840364,0.0,8.271299,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,02000500-01,2010,George Watch Lake,0.493333,0.173,222789.430223,559656.1,110765.119736,190448.3,3384.645747,18646.63,,,1760.876135,8210.519477,75.887696,0.0,8.092486,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,02000500-01,2009,George Watch Lake,0.538,0.1056,242463.996697,591927.1,107504.150289,190962.5,3489.229562,19418.66,,,1737.592898,8212.984261,74.566474,0.0,7.927333,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6,02000500-01,2008,George Watch Lake,0.55,0.148833,251564.905037,639093.2,98506.346821,180956.6,3317.029727,19578.78,,,1730.542527,8224.659654,73.575557,0.0,7.927333,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,02000500-01,2007,George Watch Lake,0.562857,0.203714,238607.467283,625283.2,96000.966898,213258.7,3230.503464,18438.44,,,1675.535027,7962.522155,72.440339,0.0,8.314088,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
8,02000500-01,2006,George Watch Lake,0.728571,0.164286,235092.781833,609406.3,78391.635036,174394.7,3435.78751,23647.02,,,1652.141119,8163.855783,70.072993,0.0,8.434712,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,02000500-01,2005,George Watch Lake,0.681667,0.210083,232921.084864,651323.0,81763.251094,198193.2,0.0,0.0,,,1734.965879,8447.712659,75.065617,0.0,8.748906,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [459]:
(water_quality_and_parcel_summaries
 .write
 .mode('overwrite')
 .csv('./data/water_quality_and_parcel_summaries_2004_to_2014.csv')
)

In [463]:
summaries_pandas.to_csv('./data/water_quality_and_parcel_summaries_2004_to_2015.csv')

## Problem 5.  Put it all together

It is often useful to package all of the data construction steps together in one convenient place.  Your last task is to

1. Gather all of your data construction code below.
    * You don't need to include exploratory code, e.g., exploring join mismatches; only the code necessary to combine, clean, and write your data.
2. Clean/refactor the code.
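
One way to approach the refactoring step is to give each anonymous function in a pipe a descriptive name and then compose the named pieces. The following is a minimal sketch in plain Python that uses `functools.reduce` in place of the `composable` pipe. The column lists and the helper names (`make_column_set`, `find_intersection`, `common_columns`) are made up for illustration and are not the real parcel schemas.

```python
from functools import reduce

# Hypothetical column lists standing in for three yearly parcel files.
columns_by_year = {
    2004: ["PIN", "EMV_TOTAL", "SALE_VALUE", "Year"],
    2005: ["PIN", "EMV_TOTAL", "SALE_VALUE", "Year", "GARAGESQFT"],
    2006: ["PIN", "EMV_TOTAL", "Year", "GARAGESQFT"],
}


def make_column_set(columns):
    """Turn one file's column list into a set."""
    return set(columns)


def find_intersection(acc, s):
    """Keep only the columns seen in every file so far."""
    return acc.intersection(s)


def common_columns(column_lists):
    """Return the sorted list of columns shared by all files."""
    return sorted(reduce(find_intersection, map(make_column_set, column_lists)))


sorted_common_cols = common_columns(columns_by_year.values())
print(sorted_common_cols)
```

Naming the steps this way keeps the pipe itself short and lets each piece be checked on a small input before it is run on the full data.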

In [None]:
# Your code here.

# Find the columns common to all parcel datasets and write them to a Python file
common_cols_set = ('./data/MinneMUDAC_raw_files/*parcels.txt'
              >> glob
              >> filter(lambda parcel_file: int(get_year(parcel_file)) > 2003)
              >> map(lambda parcel_data_file: make_data_frame(parcel_data_file))
              >> map(lambda parcel_data_frame: set(parcel_data_frame.columns))
              >> reduce(lambda acc, s: acc.intersection(s))
              )
sorted_common_cols_list = (common_cols_set 
                    >> to_list() 
                    >> sorted)

with open('parcel.py', 'w') as f: 
    f.write(f'common_columns_2004_to_2015 = {common_cols_set}')
    f.write('\n')
    f.write(f'sorted_common_columns_2004_to_2015 = {sorted_common_cols_list}')
    
# writing xref file to parquet
xref_data = spark.read.csv('./data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt',
                              header = True,
                              sep='\t')
xref_data_w_distance_var = (xref_data
 .select('Monit_MAP_CODE1','Distance_Parcel_Lake_meters','centroid_long','centroid_lat')
 .withColumn("distance_categories", when(col('Distance_Parcel_Lake_meters') <= 500, 'within 500m')
                         .when((col('Distance_Parcel_Lake_meters') > 500)
                             & (col('Distance_Parcel_Lake_meters') <= 1600), 'between 501-1600m')
                         .otherwise('over 1600m')
            )
)

(xref_data_w_distance_var
 .write
 .partitionBy('Monit_MAP_CODE1','distance_categories')
 .mode('overwrite')
 .parquet('./data/xref.parquet')
)

# writing parcel by year to parquet
xref_parquet = spark.read.parquet("data/xref.parquet")

def parcel_join_with_xref(file_path):
    # Use the list built above; parcel.py only stores a copy of it on disk.
    join_xref = (make_data_frame(file_path)
                 .select(sorted_common_cols_list)
                 .join(xref_data_w_distance_var, on=["centroid_lat", "centroid_long"], how='inner')
                )
    return join_xref

def create_partition(year, df):
    (df
    .write
    .partitionBy('Monit_MAP_CODE1','distance_categories')
    .mode('overwrite')
    .parquet(f'./data/parcel_{year}.parquet')
    )
    
('./data/MinneMUDAC_raw_files/*parcels.txt'
 >> glob
 >> filter(lambda parcel_file: int(get_year(parcel_file)) > 2003)
 >> map(lambda parcel_data_file: (get_year(parcel_data_file),parcel_join_with_xref(parcel_data_file)))
 >> star_map(create_partition)
)

#water quality analysis and writing to parquet
water_quality = spark.read.csv('data/MinneMUDAC_raw_files/mces_lakes_1999_2014.txt', sep='\t', header=True)

new_water_quality = (
    water_quality
    .where((col('Secchi_Depth_QUALIFIER') == 'Approved') & (col('Total_Phosphorus_QUALIFIER') == 'Approved'))
    .withColumn('Year', year(col('END_DATE')))
    .where(col('Year') > 2003)
    .groupBy('LAKE_NAME', 'DNR_ID_Site_Number', 'Year')
    .agg(avg('Secchi_Depth_RESULT').alias('Mean_Secchi_Depth_Result'),
         avg('Total_Phosphorus_RESULT').alias('Mean_Phosporus_Result'))
)
complete_lakes = (new_water_quality
 .groupBy('LAKE_NAME', 'DNR_ID_Site_Number')
 .count()
 .where(col('count') == 11)
)
lakes_w_complete_info = list(complete_lakes.select('DNR_ID_Site_Number').toPandas()['DNR_ID_Site_Number'])
with open('lake.py', 'w') as f: 
    f.write(f'lakes_w_complete_info = {lakes_w_complete_info}')
    
(new_water_quality
 .where(col('DNR_ID_Site_Number').isin(lakes_w_complete_info))
 .write
 .partitionBy('Year')
 .mode('overwrite')
 .parquet('./data/water_quality_by_year.parquet')
)

# aggregating summaries and writing to parquet and csv

extract_year = re.compile(r'data/parcel_(\d{4}).parquet')
year_val = lambda path: extract_year.search(path).group(1)

year_filter = lambda parcel_file: 2003 < int(year_val(parcel_file)) < 2015 
read_data_frame = lambda parcel_data_file : spark.read.parquet(parcel_data_file)

filter_lakes_w_complete_info = lambda df: df.where(col('Monit_MAP_CODE1').isin(lakes_w_complete_info))
filter_within_1600m_distance = lambda df: df.where(col('distance_categories').isin(['within 500m','between 501-1600m']))
drop_columns = lambda df: df.drop('centroid_lat', 'centroid_long', 'distance_categories', 'Distance_Parcel_Lake_meters')

union_data_frames = lambda acc, df: acc.union(df)

file_paths = './data/parcel_2*.parquet'

parcel_data_frame = (file_paths
 >> glob
 >> filter(year_filter)
 >> map(read_data_frame)
 >> map(filter_lakes_w_complete_info)
 >> map(filter_within_1600m_distance)
 >> map(drop_columns)
 >> reduce(union_data_frames)
)

(parcel_data_frame
 .write
 .partitionBy('Monit_MAP_CODE1','Year')
 .mode('overwrite')
 .parquet('./data/parcels_combined.parquet')
)

all_parcel_df = read_data_frame('data/parcels_combined.parquet')
num_summaries = (all_parcel_df
 .groupBy('Monit_MAP_CODE1','Year')
 .agg(avg(col('EMV_TOTAL')).alias('Mean_EMV_Total'),stddev(col('EMV_TOTAL')).alias('STD_EMV_Total'),
      avg(col('SALE_VALUE')).alias('Mean_Sale_Value'),stddev(col('SALE_VALUE')).alias('STD_Sale_Value'),
      avg(col('TOTAL_TAX')).alias("Mean_Total_Tax"), stddev(col('TOTAL_TAX')).alias("STD_Total_Tax"),
      avg(col('GARAGESQFT')).alias('Mean_Garage_Size'), stddev(col('GARAGESQFT')).alias('STD_Garage_Size'),
      avg(col('FIN_SQ_FT')).alias("Mean_Fin_SQ_FT"), stddev(col('FIN_SQ_FT')).alias("STD_Fin_SQ_FT")
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

(num_summaries
 .write
 .partitionBy('Monit_MAP_CODE1','Year')
 .mode('overwrite')
 .parquet('./data/parcel_numerical_summaries.parquet')
)

binary_summaries = (all_parcel_df
.withColumn("GARAGE", when(col("GARAGESQFT") > 0, "Y").otherwise("N"))
.groupBy('Monit_MAP_CODE1','Year')
.agg((count(when(col("BASEMENT") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Basement"),
      (count(when(col("GARAGE") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Garage"),
      (count(when(col("TAX_EXEMPT") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Tax_Exempt")
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)
air_cooling = ['FORCED AIR','Forced Air','STEAM W A/', 'OTHER W A/']
ac_cooling = ['A/CON','1 AC UNIT','2 AC UNITS', '3 AC UNITS','4 AC UNITS','WALL MOUNT']
central_cooling = ['CENTRAL','CNTRL','CEN. REFRI','CENTRAL W/AIR COND','CEN.REFRIG','CEN.EVAP']
other_cooling = ['RAD/BBELEC','GRAVITY/WA','PKG RF TOP','PKG RF TOP','HOT WATER','Evaporative Cooling','Space Heater','CHILL WATR', 'Y', 'Unknown']
air_heating = ['FORCED AIR','FA Gas','Forced Air','Forced Air','STEAM W A/','Oil F.A.','HOT AIR',
               'AIR DUCTED','ENG F AIR','FHA','CONVECTION','OTHER W A/','Forced Air Furnace', 'FHA Gas', 'FRC AIR ND',]
space_heating = ['SPACE HTR','SPACE HEAT','Radiant Space Heaters','Space Heater','SP HT W/FN','SPACE-FAN'] 
water_heating = ['HOT WATER','HOT WATER','H. Water','RAD WATER','STEAM','Evaporative Cooling',
                 'Baseboard, Hot Water','ENG STEAM','HEAT PUMP', 'Hot Water']
electic_heating = ['Electric','ELECTRIC','Electric Baseboard','ELEC BASBD','RAD ELEC','ELEC WALL']
other_heating = ['RAD/BBELEC','Yes','Gravity','RAD INFRED','Other','Wood','IN FLOOR','Y',
                 'Complete HVAC','GRAVITY/WA','Package Unit', 'Gravity Furnace','GEO THERM','Solar','LP']
no_heating = ['0','No','NONE', "N"]

categorical_summaries = (all_parcel_df
 .select('Monit_MAP_CODE1','Year', 'COOLING', 'HEATING')
 .withColumn("cooling_type", when(col("COOLING").isin(air_cooling),"air")
                            .when(col("COOLING").isin(ac_cooling),"ac")
                            .when(col("COOLING").isin(central_cooling),"central")
                            .when(col("COOLING").isin(other_cooling),"other")
                            .when(col("COOLING") == "NONE", "N")
                            .otherwise(col("COOLING"))
            )
 .withColumn("heating_type", when(col("HEATING").isin(air_heating),"air")
                            .when(col("HEATING").isin(space_heating),"space")
                            .when(col("HEATING").isin(water_heating),"water")
                            .when(col("HEATING").isin(electic_heating),"electric")
                            .when(col("HEATING").isin(other_heating),"other")
                            .when(col("HEATING").isin(no_heating), "no_heating")
                            .otherwise(col("HEATING"))
            )
 .groupBy('Monit_MAP_CODE1','Year')
 .agg((count(when(col("cooling_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Cooling"),
      (count(when(col("cooling_type") == "ac", 1))/count(lit(1))*100).alias("Percentage_AC_Cooling"),
      (count(when(col("cooling_type") == "central", 1))/count(lit(1))*100).alias("Percentage_Central_Cooling"),
      (count(when(col("cooling_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Cooling"),
      (count(when(col("cooling_type") == "N", 1))/count(lit(1))*100).alias("Percentage_No_Cooling"),
      
      (count(when(col("heating_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Heating"),
      (count(when(col("heating_type") == "space", 1))/count(lit(1))*100).alias("Percentage_Space_Heating"),
      (count(when(col("heating_type") == "water", 1))/count(lit(1))*100).alias("Percentage_Water_Heating"),
      (count(when(col("heating_type") == "electric", 1))/count(lit(1))*100).alias("Percentage_Electric_Heating"),
      (count(when(col("heating_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Heating"),
      (count(when(col("heating_type") == "no_heating", 1))/count(lit(1))*100).alias("Percentage_No_Heating"),
     )
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

all_summaries = (binary_summaries
.join(categorical_summaries, on=['Monit_MAP_CODE1','Year'], how='left')
.orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

(all_summaries
 .write
 .partitionBy('Monit_MAP_CODE1','Year')
 .mode('overwrite')
 .parquet('./data/parcel_categorical_summaries.parquet')
)

num_summaries = read_data_frame('data/parcel_numerical_summaries.parquet')
cat_summaries = read_data_frame('data/parcel_categorical_summaries.parquet')
lake_summaries = read_data_frame('data/water_quality_by_year.parquet')

water_quality_and_parcel_summaries = (lake_summaries
 .join(num_summaries, on=[lake_summaries['DNR_ID_Site_Number'] == num_summaries['Monit_MAP_CODE1'],
                          lake_summaries['Year'] == num_summaries['Year']], how='right')
 .drop(lake_summaries.Year)
 .join(cat_summaries, on=['Monit_MAP_CODE1','Year'], how='left')
 .drop('DNR_ID_Site_Number')
 .orderBy(col('Monit_MAP_CODE1').asc(), num_summaries['Year'].desc())
)

summaries_pandas = water_quality_and_parcel_summaries.collect() >> to_pandas

(water_quality_and_parcel_summaries
 .write
 .mode('overwrite')
 .csv('./data/water_quality_and_parcel_summaries_2004_to_2014.csv')
)

summaries_pandas.to_csv('./data/water_quality_and_parcel_summaries_2004_to_2015.csv')

In [17]:
import re
from utility import get_year, make_data_frame
from composable.sequence import reduce, to_list
from composable.strict import map, filter, sorted
from pyspark.sql.functions import year
from pyspark.sql.functions import col, avg, stddev, count, when, sum, max, lit
from composable.glob import glob
from more_pyspark import to_pandas

In [15]:
#Refactoring code
find_year_gt_2003 = lambda parcel_file: int(get_year(parcel_file)) > 2003
make_all_parcel_df = lambda parcel_data_file: make_data_frame(parcel_data_file)
make_column_set = lambda parcel_data_frame: set(parcel_data_frame.columns)
find_intersection = lambda acc, s: acc.intersection(s)

distance_categories = (when(col('Distance_Parcel_Lake_meters') <= 500, 'within 500m')
                         .when((col('Distance_Parcel_Lake_meters') > 500)
                         & (col('Distance_Parcel_Lake_meters') <= 1600), 'between 501-1600m').otherwise('over 1600m'))
create_year_parcel_w_xref_tuple = lambda parcel_data_file: (get_year(parcel_data_file),parcel_join_with_xref(parcel_data_file))

filter_water_quality_approved = (col('Secchi_Depth_QUALIFIER') == 'Approved') & (col('Total_Phosphorus_QUALIFIER') == 'Approved')
calculate_avg_secchi = avg('Secchi_Depth_RESULT').alias('Mean_Secchi_Depth_Result')
calculate_avg_phosphorus = avg('Total_Phosphorus_RESULT').alias('Mean_Phosporus_Result')

extract_year = re.compile(r'data/parcel_(\d{4}).parquet')
year_val = lambda path: extract_year.search(path).group(1)

year_filter = lambda parcel_file: 2003 < int(year_val(parcel_file)) < 2015 
read_data_frame = lambda parcel_data_file : spark.read.parquet(parcel_data_file)

filter_lakes_w_complete_info = lambda df: df.where(col('Monit_MAP_CODE1').isin(lakes_w_complete_info))
filter_within_1600m_distance = lambda df: df.where(col('distance_categories').isin(['within 500m','between 501-1600m']))
drop_columns = lambda df: df.drop('centroid_lat', 'centroid_long', 'distance_categories', 'Distance_Parcel_Lake_meters')

union_data_frames = lambda acc, df: acc.union(df)

file_paths = './data/parcel_2*.parquet'

avg_emv = avg(col('EMV_TOTAL')).alias('Mean_EMV_Total')
std_emv = stddev(col('EMV_TOTAL')).alias('STD_EMV_Total')
avg_sale = avg(col('SALE_VALUE')).alias('Mean_Sale_Value')
std_sale = stddev(col('SALE_VALUE')).alias('STD_Sale_Value')
avg_tax = avg(col('TOTAL_TAX')).alias("Mean_Total_Tax")
std_tax = stddev(col('TOTAL_TAX')).alias("STD_Total_Tax")
avg_garage = avg(col('GARAGESQFT')).alias('Mean_Garage_Size')
std_garage = stddev(col('GARAGESQFT')).alias('STD_Garage_Size')
avg_finished_sqft = avg(col('FIN_SQ_FT')).alias("Mean_Fin_SQ_FT")
std_finished_sqft = stddev(col('FIN_SQ_FT')).alias("STD_Fin_SQ_FT")

percentage_yes_basement=(count(when(col("BASEMENT") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Basement")
percentage_yes_garage=(count(when(col("GARAGE") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Garage")
percentage_yes_tax_exempt=(count(when(col("TAX_EXEMPT") == "Y", 1))/count(lit(1))*100).alias("Percentage_Yes_Tax_Exempt")

air_cooling = ['FORCED AIR','Forced Air','STEAM W A/', 'OTHER W A/']
ac_cooling = ['A/CON','1 AC UNIT','2 AC UNITS', '3 AC UNITS','4 AC UNITS','WALL MOUNT']
central_cooling = ['CENTRAL','CNTRL','CEN. REFRI','CENTRAL W/AIR COND','CEN.REFRIG','CEN.EVAP']
other_cooling = ['RAD/BBELEC','GRAVITY/WA','PKG RF TOP','PKG RF TOP','HOT WATER','Evaporative Cooling','Space Heater','CHILL WATR', 'Y', 'Unknown']
air_heating = ['FORCED AIR','FA Gas','Forced Air','Forced Air','STEAM W A/','Oil F.A.','HOT AIR',
               'AIR DUCTED','ENG F AIR','FHA','CONVECTION','OTHER W A/','Forced Air Furnace', 'FHA Gas', 'FRC AIR ND',]
space_heating = ['SPACE HTR','SPACE HEAT','Radiant Space Heaters','Space Heater','SP HT W/FN','SPACE-FAN'] 
water_heating = ['HOT WATER','HOT WATER','H. Water','RAD WATER','STEAM','Evaporative Cooling',
                 'Baseboard, Hot Water','ENG STEAM','HEAT PUMP', 'Hot Water']
electic_heating = ['Electric','ELECTRIC','Electric Baseboard','ELEC BASBD','RAD ELEC','ELEC WALL']
other_heating = ['RAD/BBELEC','Yes','Gravity','RAD INFRED','Other','Wood','IN FLOOR','Y',
                 'Complete HVAC','GRAVITY/WA','Package Unit', 'Gravity Furnace','GEO THERM','Solar','LP']
no_heating = ['0','No','NONE', "N"]

refactor_cooling_types = (when(col("COOLING").isin(air_cooling),"air")
                          .when(col("COOLING").isin(ac_cooling),"ac")
                          .when(col("COOLING").isin(central_cooling),"central")
                          .when(col("COOLING").isin(other_cooling),"other")
                          .when(col("COOLING") == "NONE", "N")
                          .otherwise(col("COOLING")))
refactor_heating_types = (when(col("HEATING").isin(air_heating),"air")
                          .when(col("HEATING").isin(space_heating),"space")
                          .when(col("HEATING").isin(water_heating),"water")
                          .when(col("HEATING").isin(electic_heating),"electric")
                          .when(col("HEATING").isin(other_heating),"other")
                          .when(col("HEATING").isin(no_heating), "no_heating")
                          .otherwise(col("HEATING")))
percentage_air_cooling=(count(when(col("cooling_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Cooling")
percentage_ac_cooling=(count(when(col("cooling_type") == "ac", 1))/count(lit(1))*100).alias("Percentage_AC_Cooling")
percentage_central_cooling=(count(when(col("cooling_type") == "central", 1))/count(lit(1))*100).alias("Percentage_Central_Cooling")
percentage_other_cooling=(count(when(col("cooling_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Cooling")
percentage_no_cooling=(count(when(col("cooling_type") == "N", 1))/count(lit(1))*100).alias("Percentage_No_Cooling")
      
percentage_air_heating=(count(when(col("heating_type") == "air", 1))/count(lit(1))*100).alias("Percentage_Air_Heating")
percentage_space_heating=(count(when(col("heating_type") == "space", 1))/count(lit(1))*100).alias("Percentage_Space_Heating")
percentage_water_heating=(count(when(col("heating_type") == "water", 1))/count(lit(1))*100).alias("Percentage_Water_Heating")
percentage_electric_heating=(count(when(col("heating_type") == "electric", 1))/count(lit(1))*100).alias("Percentage_Electric_Heating")
percentage_other_heating=(count(when(col("heating_type") == "other", 1))/count(lit(1))*100).alias("Percentage_Other_Heating")
percentage_no_heating=(count(when(col("heating_type") == "no_heating", 1))/count(lit(1))*100).alias("Percentage_No_Heating")

In [18]:
# Find the columns common to all parcel datasets and write them to a Python file
common_cols_set = ('./data/MinneMUDAC_raw_files/*parcels.txt' >> glob 
                   >> filter(find_year_gt_2003) >> map(make_all_parcel_df) 
                   >> map(make_column_set) >> reduce(find_intersection)
              )
sorted_common_cols_list = (common_cols_set >> to_list() >> sorted)

with open('parcel.py', 'w') as f: 
    f.write(f'common_columns_2004_to_2015 = {common_cols_set}')
    f.write('\n')
    f.write(f'sorted_common_columns_2004_to_2015 = {sorted_common_cols_list}')
    
# writing xref file to parquet
xref_data = spark.read.csv('./data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt',header = True, sep='\t')
xref_data_w_distance_var = (xref_data
 .select('Monit_MAP_CODE1','Distance_Parcel_Lake_meters','centroid_long','centroid_lat')
 .withColumn("distance_categories", distance_categories))

(xref_data_w_distance_var.write.partitionBy('Monit_MAP_CODE1','distance_categories').mode('overwrite')
 .parquet('./data/xref.parquet'))

# writing parcel by year to parquet
def parcel_join_with_xref(file_path):
    # Use the list built above; parcel.py only stores a copy of it on disk.
    return (make_data_frame(file_path)
                 .select(sorted_common_cols_list)
                 .join(xref_data_w_distance_var, on=["centroid_lat", "centroid_long"], how='inner'))

def create_partition(year, df):
    (df
    .write
    .partitionBy('Monit_MAP_CODE1','distance_categories')
    .mode('overwrite')
    .parquet(f'./data/parcel_{year}.parquet')
    )
    
('./data/MinneMUDAC_raw_files/*parcels.txt' >> glob >> filter(find_year_gt_2003) 
 >> map(create_year_parcel_w_xref_tuple) >> star_map(create_partition))

#water quality analysis and writing to parquet
water_quality = spark.read.csv('data/MinneMUDAC_raw_files/mces_lakes_1999_2014.txt', sep='\t', header=True)

new_water_quality = (water_quality.where(filter_water_quality_approved).withColumn('Year', year(col('END_DATE')))
    .where(col('Year') > 2003).groupBy('LAKE_NAME', 'DNR_ID_Site_Number', 'Year')
    .agg(calculate_avg_secchi,calculate_avg_phosphorus)
)
complete_lakes = (new_water_quality.groupBy('LAKE_NAME', 'DNR_ID_Site_Number').count().where(col('count') == 11))

lakes_w_complete_info = list(complete_lakes.select('DNR_ID_Site_Number').toPandas()['DNR_ID_Site_Number'])

with open('lake.py', 'w') as f: 
    f.write(f'lakes_w_complete_info = {lakes_w_complete_info}')
    
(new_water_quality
 .where(col('DNR_ID_Site_Number').isin(lakes_w_complete_info)).write.partitionBy('Year').mode('overwrite')
 .parquet('./data/water_quality_by_year.parquet'))


# aggregating summaries and writing to parquet and csv

parcel_data_frame = (file_paths
 >> glob >> filter(year_filter) >> map(read_data_frame) >> map(filter_lakes_w_complete_info)
 >> map(filter_within_1600m_distance) >> map(drop_columns)>> reduce(union_data_frames))

(parcel_data_frame.write.partitionBy('Monit_MAP_CODE1','Year').mode('overwrite')
 .parquet('./data/parcels_combined.parquet'))

num_summaries = (parcel_data_frame
 .groupBy('Monit_MAP_CODE1','Year')
 .agg(avg_emv,std_emv,avg_sale,std_sale,avg_tax,std_tax,avg_garage,std_garage,avg_finished_sqft,std_finished_sqft)
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

(num_summaries.write.partitionBy('Monit_MAP_CODE1','Year').mode('overwrite')
 .parquet('./data/parcel_numerical_summaries.parquet'))

binary_summaries = (parcel_data_frame
.withColumn("GARAGE", when(col("GARAGESQFT") > 0, "Y").otherwise("N"))
.groupBy('Monit_MAP_CODE1','Year')
.agg(percentage_yes_basement,percentage_yes_garage,percentage_yes_tax_exempt)
.orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc()))

categorical_summaries = (parcel_data_frame
 .select('Monit_MAP_CODE1','Year', 'COOLING', 'HEATING')
 .withColumn("cooling_type", refactor_cooling_types)
 .withColumn("heating_type", refactor_heating_types)
 .groupBy('Monit_MAP_CODE1','Year')
 .agg(percentage_air_cooling, percentage_ac_cooling,percentage_central_cooling,percentage_other_cooling,
      percentage_no_cooling,percentage_air_heating,percentage_space_heating,percentage_water_heating,
      percentage_electric_heating,percentage_other_heating,percentage_no_heating)
 .orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

all_summaries = (binary_summaries
.join(categorical_summaries, on=['Monit_MAP_CODE1','Year'], how='left')
.orderBy(col('Monit_MAP_CODE1').asc(), col('Year').desc())
)

(all_summaries.write.partitionBy('Monit_MAP_CODE1','Year').mode('overwrite')
 .parquet('./data/parcel_categorical_summaries.parquet'))

lake_summaries = read_data_frame('data/water_quality_by_year.parquet')

water_quality_and_parcel_summaries = (lake_summaries
 .join(num_summaries, on=[lake_summaries['DNR_ID_Site_Number'] == num_summaries['Monit_MAP_CODE1'],
                          lake_summaries['Year'] == num_summaries['Year']], how='right')
 .drop(lake_summaries.Year)
 .join(all_summaries, on=['Monit_MAP_CODE1','Year'], how='left')
 .drop('DNR_ID_Site_Number')
 .orderBy(col('Monit_MAP_CODE1').asc(), num_summaries['Year'].desc()))

summaries_pandas = water_quality_and_parcel_summaries.collect() >> to_pandas

(water_quality_and_parcel_summaries.write.mode('overwrite')
 .csv('./data/water_quality_and_parcel_summaries_2004_to_2014.csv'))

summaries_pandas.to_csv('./data/water_quality_and_parcel_summaries_2004_to_2015.csv')

## Deliverables.

Make sure you have pushed all of your lab notebooks, along with the final combined `CSV`, to the GitHub Classroom repo.  Submit a Word document on D2L that includes

1. A link to your repository.
2. Screenshots verifying the construction of the larger parquet files. You don't need to (and probably can't) record all of the folders/files, but you should be able to capture the basic structure/partitioning.
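
To capture the partitioning for the screenshots, it may help to print only the first few partition folders instead of browsing the whole directory tree. Spark's `partitionBy` writes Hive-style folders named `column=value`, so a partitioned parquet "file" is really a nested directory. The sketch below uses only the standard library; the helper names `partition_dirs` and `show_partitions` are hypothetical, and the depth of 2 assumes two partition columns such as `Monit_MAP_CODE1` and `Year`.

```python
from pathlib import Path


def partition_dirs(root, max_depth=2):
    """Return relative paths of Hive-style partition folders under root.

    Only folders whose every path component looks like 'column=value'
    are kept, down to max_depth levels.
    """
    root = Path(root)
    found = []
    for path in sorted(root.rglob("*")):
        rel = path.relative_to(root)
        if (path.is_dir()
                and len(rel.parts) <= max_depth
                and all("=" in part for part in rel.parts)):
            found.append(rel.as_posix())
    return found


def show_partitions(root, limit=10):
    """Print the first few partition folders and the total count."""
    dirs = partition_dirs(root)
    for d in dirs[:limit]:
        print(d)
    print(f"... {len(dirs)} partition folders in total")
```

For example, `show_partitions('./data/parcels_combined.parquet')` would list the first ten lake and lake/year folders, which should be enough to show the structure in a screenshot.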