# About this notebook

This notebook was used to prototype logic for the data pipeline interactively. It is preserved for reference, but should not be expected to run as written, because the package implementation has evolved over the lifetime of the notebook. For a tutorial on how to run the pipeline or use the package methods, see **TODO: add link to tutorial**. For the data quality expectations development notebook, see `expectations.ipynb`.

In [55]:
import os
import re
from typing import Optional, Tuple

import great_expectations as ge
import numpy as np
import pandas as pd

import rad_pipeline.rad_pipeline as rp
import rad_pipeline.zipcodes as zc

In [2]:
import importlib

# Re-execute the project modules so source edits are picked up without restarting the kernel.
# NOTE(review): importlib.reload does not refresh names another module bound via
# `from ... import ...`; if rp imports from zc, reloading zc first may be needed.
reloaded_modules = [importlib.reload(module) for module in (rp, zc)]
reloaded_modules[-1]

<module 'rad_pipeline.zipcodes' from '/Users/alexhasha/repos/massenergize/rad_pipeline/rad_pipeline/zipcodes.py'>

### Output data structure

Because the datasets provide different fields (so some will support richer metrics than others), and 
because we may want to present data aggregated at multiple levels, I propose the following output data structure:

- locale: str (e.g. "02186" or "Milton" or "Norfolk County")
- zipcodes: List[str], list of zipcodes contained in the locale
- technology: str (e.g. "ASHP" or "Solar Panels")
- sector: str (e.g. "Residential", "Commercial", "Municipal", "Industrial", etc.)
- metric_name: str, The name of the metric (e.g. "Total Cost" or "Percent income support")
- value: decimal  (One could imagine wanting to compute metrics of non-numeric type, but we can deal with that separately)
- value_unit: str (e.g., Dollars, kWh, Count, BTU/h, etc.)
- start_date: datetime, beginning of time period of aggregation
- end_date: datetime, end of time period of aggregation
- data_update_date: datetime, date of most recent update of data source
- data_source_id: int (identifier of raw data source metric was calculated from)


**Aggregate quantities of interest at the municipality level**

* Quantity
* Total Rebates
* Average Rebate
* Total Cost
* Average Cost
* Quantity Income-Eligible

## Air-source Heat Pumps

In [34]:
# Load the cleaned ASHP data and look up its source-specific column mapping.
source = "Air-source Heat Pumps"
field_map = rp.FIELDS[source]
ashp_cleaned = rp.clean_data_load(source)

In [None]:
def summarize_ashp(df_cleaned: pd.DataFrame, locale_field: str) -> pd.DataFrame:
    """
    Summarize cleaned air-source heat pump data into per-locale metric rows.

    NOT YET IMPLEMENTED: this is a placeholder; the logic is being prototyped
    in the cells below.

    Parameters
    - df_cleaned: cleaned ASHP data (presumably the output of rp.clean_data_load)
    - locale_field: column to aggregate on (e.g. "town" or "zip_cleaned")

    Intended output: DataFrame
    - locale: str (e.g. "02186" or "Milton" or "Norfolk County")
    - zipcodes: List[str], list of zipcodes contained in the locale  <- This is a function of locale and should be in a separate table
    - technology: str (="Air-source Heat Pumps")
    - sector: str (="Unknown")  <- NOTE(review): later cells set "Residential"; decide which is correct
    - metric_name: str, The name of the metric (e.g. "Total Cost" or "Percent income support")
    - value: decimal  (One could imagine wanting to compute metrics of non-numeric type, but we can deal with that separately)
    - value_unit: str (e.g., Dollars, kWh, Count, BTU/h, etc.)
    - start_date: datetime, beginning of time period of aggregation
    - end_date: datetime, end of time period of aggregation
    - data_update_date: datetime, date of most recent update of data source
    - data_source_id: int (identifier of raw data source metric was calculated from)

    Raises
    - NotImplementedError: always, until the function is implemented.
    """
    # `assert False` is stripped under `python -O`, which would make this stub
    # silently return None instead of a DataFrame; raise explicitly instead.
    raise NotImplementedError("summarize_ashp is not implemented yet")

# Generic name for the cleaned frame, matching the `df_cleaned` parameter name used by the helpers.
df_cleaned = ashp_cleaned

In [5]:
## Zip aggregation
# Column the metrics in the next cell are grouped by.
locale_field = "zip_cleaned"

In [7]:
# Per-locale metrics; every series below is indexed by `locale_field`.
by_locale = ashp_cleaned.groupby(locale_field)

#* Quantity (non-null zip_cleaned entries per group)
quantity = by_locale['zip_cleaned'].count()
#* Total Rebates / Average Rebate
total_rebates = by_locale["rebate"].sum()
average_rebate = by_locale['rebate'].mean()
#* Total Cost / Average Cost
total_cost = by_locale['cost'].sum()
average_cost = by_locale['cost'].mean()

In [29]:
# Sorted distinct zip codes observed in each town, indexed by "locale".
zipcodes = (
    ashp_cleaned
    .groupby("town")['zip_cleaned']
    .apply(lambda town_zips: list(np.unique(town_zips)))
    .rename_axis("locale")
)

In [30]:
# Inspect the town -> zip code mapping.
zipcodes

locale
Abington                                                   [02351]
Acton                                                      [01720]
Acushnet                                                   [02743]
Adams                                                      [01220]
Agawam                                                     [01001]
                                       ...                        
Woods Hole                                                 [02543]
Worcester        [01602, 01603, 01604, 01605, 01606, 01607, 016...
Worthington                                                [01098]
Wrentham                                                   [02093]
Yarmouth Port                                              [02675]
Name: zip_cleaned, Length: 425, dtype: object

In [32]:
# List the columns available in the cleaned ASHP data.
ashp_cleaned.columns

Index(['zip_cleaned', 'zip4_cleaned', 'zip_valid',
       'Date Rebate Payment Approved by MassCEC', 'Site City/Town',
       'Site Zip Code', 'Installer Company Name',
       'Heating Fuel Being Replaced', 'Cooling Type Being Replaced',
       '# of Outdoor Units', '# of Indoor Units',
       'Capacity of Heat Pumps at 5°F', 'Single- Head Heat Pump #1',
       'Single- Head Heat Pump #2', 'Single- Head Heat Pump #3',
       'Multi-Head Heat Pump #1', 'Multi-Head Heat Pump #2',
       'Multi-Head Heat Pump #3', 'Total System Costs',
       'Receiving an Income-Based Adder?', 'Rebate Amount ', 'town',
       'zip_exists', 'town_valid', 'rebate', 'cost'],
      dtype='object')

In [36]:
# Earliest record date per town (date column looked up via the source field map).
start_date = (
    ashp_cleaned
    .groupby("town")[field_map['date']]
    .min()
    .rename_axis("locale")
    .rename("start_date")
)
start_date

locale
Abington        2016-03-16
Acton           2015-05-06
Acushnet        2016-01-13
Adams           2018-10-11
Agawam          2015-02-05
                   ...    
Woods Hole      2015-07-07
Worcester       2015-08-19
Worthington     2017-05-17
Wrentham        2015-08-19
Yarmouth Port   2015-01-08
Name: start_date, Length: 425, dtype: datetime64[ns]

In [37]:
# Latest record date per town.
end_date = (
    ashp_cleaned
    .groupby("town")[field_map['date']]
    .max()
    .rename_axis("locale")
    .rename("end_date")
)

In [41]:
# Records per town (non-null zip_cleaned), named "value" to match the output schema.
quantity = (
    ashp_cleaned
    .groupby("town")['zip_cleaned']
    .count()
    .rename_axis("locale")
    .rename("value")
)
quantity

locale
Abington          23
Acton            130
Acushnet          44
Adams              1
Agawam            44
                ... 
Woods Hole        10
Worcester        243
Worthington        8
Wrentham          43
Yarmouth Port     45
Name: value, Length: 425, dtype: int64

In [47]:
# Assemble the town-level Quantity metric in the proposed output schema.
# NOTE(review): the saved output below shows sector "unknown" — it predates the switch to "Residential".
result = pd.DataFrame(
    {
        "zipcodes": zipcodes,
        "start_date": start_date,
        "end_date": end_date,
        "value": quantity,
    }
).assign(
    value_unit="count",
    technology=source,
    sector="Residential",
    metric_name="Quantity",
)
result

Unnamed: 0_level_0,zipcodes,start_date,end_date,value,value_unit,technology,sector,metric_name
locale,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
Abington,[02351],2016-03-16,2019-04-10,23,count,Air-source Heat Pumps,unknown,Quantity
Acton,[01720],2015-05-06,2019-05-08,130,count,Air-source Heat Pumps,unknown,Quantity
Acushnet,[02743],2016-01-13,2019-06-26,44,count,Air-source Heat Pumps,unknown,Quantity
Adams,[01220],2018-10-11,2018-10-11,1,count,Air-source Heat Pumps,unknown,Quantity
Agawam,[01001],2015-02-05,2019-05-01,44,count,Air-source Heat Pumps,unknown,Quantity
...,...,...,...,...,...,...,...,...
Woods Hole,[02543],2015-07-07,2019-05-29,10,count,Air-source Heat Pumps,unknown,Quantity
Worcester,"[01602, 01603, 01604, 01605, 01606, 01607, 016...",2015-08-19,2019-05-29,243,count,Air-source Heat Pumps,unknown,Quantity
Worthington,[01098],2017-05-17,2019-01-30,8,count,Air-source Heat Pumps,unknown,Quantity
Wrentham,[02093],2015-08-19,2019-06-05,43,count,Air-source Heat Pumps,unknown,Quantity


In [53]:
# Spot-check a single town's row.
result.loc["Milton"]

zipcodes                     [02186]
start_date       2015-03-12 00:00:00
end_date         2019-05-15 00:00:00
value                             85
value_unit                     count
technology     Air-source Heat Pumps
sector                       unknown
metric_name                 Quantity
Name: Milton, dtype: object

In [57]:
# Town-level groupby object; its type is inspected in the next cells.
groups = ashp_cleaned.groupby("town")

In [58]:
# Check the groupby object's type (presumably to write the return annotation of locale_aggregation).
type(groups)

pandas.core.groupby.generic.DataFrameGroupBy

In [61]:
# Confirm the fully qualified class path used in locale_aggregation's return annotation resolves.
pd.core.groupby.generic.DataFrameGroupBy

pandas.core.groupby.generic.DataFrameGroupBy

In [54]:
# Spot-check another town's row.
result.loc["Wayland"]


zipcodes                     [01778]
start_date       2015-09-03 00:00:00
end_date         2019-08-21 00:00:00
value                             77
value_unit                     count
technology     Air-source Heat Pumps
sector                       unknown
metric_name                 Quantity
Name: Wayland, dtype: object

In [63]:
def locale_aggregation(
    df_cleaned: pd.DataFrame,
    locale_field: str,
    source: str,
    date_field: Optional[str] = None,
) -> Tuple[pd.core.groupby.generic.DataFrameGroupBy, pd.DataFrame]:
    """Group cleaned records by locale and build the per-locale base frame.

    Parameters
    ----------
    df_cleaned : pd.DataFrame
        Cleaned data for one source. Must contain ``zip_cleaned``,
        ``locale_field`` and the date column.
    locale_field : str
        Column to aggregate on (e.g. ``"town"`` or ``"zip_cleaned"``).
    source : str
        Data source name; used to look up the date column as
        ``rp.FIELDS[source]['date']`` when ``date_field`` is not given.
    date_field : str, optional
        Name of the date column. Overrides the ``rp.FIELDS`` lookup.

    Returns
    -------
    groups : DataFrameGroupBy
        ``df_cleaned`` grouped by ``locale_field``, for computing metrics.
    result : pd.DataFrame
        Indexed by ``"locale"``, with columns ``zipcodes`` (sorted list of
        the distinct non-null zip codes in the locale), ``start_date`` and
        ``end_date`` (earliest / latest date in the locale).
    """
    if date_field is None:
        date_field = rp.FIELDS[source]['date']

    groups = df_cleaned.groupby(locale_field)

    # np.unique sorts, and raises TypeError on object columns mixing zip strings
    # with None/NaN, so drop missing zips before collecting distinct values.
    zipcodes = (
        groups['zip_cleaned']
        .apply(lambda zips: sorted(zips.dropna().unique()))
        .rename_axis("locale")
    )
    start_date = groups[date_field].min().rename_axis("locale").rename("start_date")
    end_date = groups[date_field].max().rename_axis("locale").rename("end_date")

    result = pd.DataFrame(data={
        "zipcodes": zipcodes,
        "start_date": start_date,
        "end_date": end_date,
    })

    return groups, result
    
    

In [74]:
# Town-level groups plus base frame (zipcodes, start_date, end_date indexed by locale).
groups, town_base = locale_aggregation(ashp_cleaned, "town", "Air-source Heat Pumps")

In [77]:
# Non-null rebate count per town; the saved output matches the earlier Quantity values.
groups['rebate'].count()

town
Abington          23
Acton            130
Acushnet          44
Adams              1
Agawam            44
                ... 
Woods Hole        10
Worcester        243
Worthington        8
Wrentham          43
Yarmouth Port     45
Name: rebate, Length: 425, dtype: int64

In [79]:
# Town-level Quantity metric built on the base frame returned by locale_aggregation.
# Copy `town_base`, not the global `result`, which other cells rebind (hidden state).
metric_group = town_base.copy()
metric_group["value_unit"] = "count"
metric_group["technology"] = source
metric_group["sector"] = "Residential"
metric_group["metric_name"] = "Quantity"
# Aligns on locale values: groups are keyed by town, town_base is indexed by locale (town).
metric_group["value"] = groups['rebate'].count()

In [80]:
# Columns of the base frame, for comparison with metric_group's columns below.
town_base.columns

Index(['zipcodes', 'start_date', 'end_date'], dtype='object')

In [81]:
# Columns after adding the metric fields to the base frame.
metric_group.columns

Index(['zipcodes', 'start_date', 'end_date', 'value_unit', 'technology',
       'sector', 'metric_name', 'value'],
      dtype='object')

In [69]:
# Zip-level aggregation; note this rebinds `groups` to the zip-level groupby.
groups, zip_base = locale_aggregation(ashp_cleaned, "zip_cleaned", "Air-source Heat Pumps")

In [67]:
# Zip-level base frame (the saved output below is indexed by zip code).
zip_base

Unnamed: 0_level_0,zipcodes,start_date,end_date
locale,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
01001,[01001],2015-02-05,2019-05-01
01002,[01002],2014-12-26,2019-06-19
01004,[01004],2015-05-13,2019-05-29
01005,[01005],2015-01-15,2019-05-15
01007,[01007],2015-02-05,2019-08-14
...,...,...,...
02770,[02770],2015-11-18,2019-05-29
02771,[02771],2015-03-12,2019-04-24
02777,[02777],2015-06-09,2019-05-29
02790,[02790],2015-03-26,2019-06-26


## Ground-source Heat Pumps

In [None]:
# Load the cleaned ground-source heat pump data.
gshp_cleaned = rp.clean_data_load("Ground-source Heat Pumps")

In [None]:
# Validate the cleaned GSHP data against its saved expectation suite. Stored under its own
# name so it does not clobber the global `result` metric frame used by other cells.
validation_result = ge.from_pandas(gshp_cleaned).validate("../data/expectations/gshp_clean_expectations.json")
print(validation_result)
assert validation_result.success

In [85]:
# Build the Quantity metric for every source whose cleaned file exists, at both town
# and zip level, collecting everything into one `metrics` frame.
# The loop variable is `data_source` so the global `source` used by other cells is not clobbered.
metric_groups = []
for data_source in rp.CLEAN_DATA_FILES.keys():
    try:
        df_cleaned = rp.clean_data_load(data_source)
        print(f"Loaded {data_source}")
    except FileNotFoundError:
        print(f"Skipping {data_source}")
        continue

    for locale_field in ["town", "zip_cleaned"]:
        groups, locale_base = locale_aggregation(df_cleaned, locale_field, data_source)

        # Start from this source/level's base frame, not a stale global frame.
        metric_group = locale_base.copy()
        metric_group["value_unit"] = "count"
        metric_group["technology"] = data_source
        # NOTE(review): sector is hard-coded for every source; confirm it applies beyond ASHP.
        metric_group["sector"] = "Residential"
        metric_group["metric_name"] = "Quantity"
        # NOTE(review): assumes every cleaned source has a 'rebate' column.
        metric_group["value"] = groups['rebate'].count()
        metric_groups.append(metric_group)

# Concatenate once rather than growing a frame inside the loop.
metrics = pd.concat(metric_groups) if metric_groups else pd.DataFrame()
    

Loaded Air-source Heat Pumps
Skipping Solar Panels
Skipping Ground-source Heat Pumps
Skipping EVs
