# Catalyst Cooperative Jupyter Notebook Template
This notebook lays out a standard format and some best practices for creating interactive / exploratory notebooks which can be relatively easily shared between different PUDL users, and turned into reusable Python modules for integration into our underlying Python packages.

## Begin with a narrative outline
Each notebook should start with a brief Markdown explanation of the purpose of the analysis, outlining the stages or steps taken to accomplish it.
As the analysis develops, you can add new sections or details to this section.

## Notebooks should be runnable
Insofar as it's possible, another PUDL user who has cloned the repository that the notebook is part of should be able to update their `pudl-dev` conda environment, open the notebook, and run all cells successfully.
If there are required data or other prerequisites that the notebook cannot manage on its own -- like a file that needs to be downloaded by hand and placed in a particular location -- those steps should be laid out clearly at the beginning of the notebook.

## Avoid Troublesome Elements

### Don't hardcode passwords or access tokens
Most of our work is done in public GitHub repositories.
No authentication information should ever appear in a notebook.
These values can be stored in environment variables on your local computer.
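A minimal sketch of reading a secret at runtime, assuming you have exported an environment variable before starting Jupyter. The name `API_KEY_EIA` is only an example and `get_secret` is a hypothetical helper; failing with a clear message is friendlier than a bare `KeyError` deep in the notebook.

```python
import os


def get_secret(name: str) -> str:
    """Return environment variable ``name``, or fail with a helpful message.

    Hypothetical helper: set the variable in your shell (for example
    ``export API_KEY_EIA=...``) before launching Jupyter.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set. Export it in your shell "
            "before starting Jupyter; never paste secrets into the notebook."
        )
    return value


# Example usage (commented out so the cell still runs without the key):
# API_KEY_EIA = get_secret("API_KEY_EIA")
```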

### Don't hardcode values, especially late in the notebook
If the analysis depends on particular choices of input values, those should be called out explicitly at the beginning of the notebook.
(NB: We should explore ways to parameterize notebooks; [papermill](https://papermill.readthedocs.io/en/latest/) is one tool that does this.)

### Don't hardcode absolute file paths
If anyone else is going to be able to use the notebook, the files it uses need to be stored somewhere that makes sense on both your computer and theirs.
We assume that anyone using this template has the PUDL package installed, and has a local PUDL data management environment set up.
  * Input data that needs to be stored on disk and accessed via a shared location should be saved under `<PUDL_IN>/data/local/<data_source>/`.
  * Intermediate saved data products (e.g. a pickled result of a computationally intensive process) and results should be saved to a location relative to the notebook, and within the directory hierarchy of the repository that the notebook is part of.
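A minimal sketch of building both kinds of paths with `pathlib`, assuming a `pudl_settings` dictionary with a `pudl_in` key like the one returned by `pudl.workspace.setup.get_defaults()` later in this notebook. The helper names and the `my_new_data_source` directory are hypothetical.

```python
import pathlib


def local_input_dir(pudl_settings: dict, data_source: str) -> pathlib.Path:
    """Shared input location: <PUDL_IN>/data/local/<data_source>/ (hypothetical helper)."""
    return pathlib.Path(pudl_settings["pudl_in"]) / "data" / "local" / data_source


def notebook_output_dir(subdir: str = "outputs") -> pathlib.Path:
    """Directory for intermediate results, relative to the notebook.

    Assumes the kernel's working directory is the notebook's directory,
    which is Jupyter's usual default. Creates the directory if needed.
    """
    out_dir = pathlib.Path.cwd() / subdir
    out_dir.mkdir(parents=True, exist_ok=True)
    return out_dir


# Placeholder settings; in practice use pudl.workspace.setup.get_defaults()
example_settings = {"pudl_in": "/path/to/PUDL_DIR"}
raw_dir = local_input_dir(example_settings, "my_new_data_source")
```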
  
### Don't require avoidable long-running computations
Consider persisting to disk the results of computations that take more than a few minutes, if the outputs are small enough to be checked into the repository and shared with other users.
Only require the expensive computation to be run if this pre-computed output is not available.
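One way to do this is a small load-or-compute wrapper, sketched below with `pickle` from the standard library; `load_or_compute` is a hypothetical helper. Only unpickle files you trust, and note that pickles of pandas objects may not load under a different library version, so for small tables checked into the repository a plain format such as CSV may be more durable.

```python
import pathlib
import pickle


def load_or_compute(cache_path, compute_fn, *args, **kwargs):
    """Return the cached result at ``cache_path`` if present; otherwise compute and cache it.

    ``compute_fn`` stands in for any slow function. Delete the cache file
    to force the expensive computation to run again.
    """
    cache_path = pathlib.Path(cache_path)
    if cache_path.exists():
        with cache_path.open("rb") as f:
            return pickle.load(f)
    result = compute_fn(*args, **kwargs)
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with cache_path.open("wb") as f:
        pickle.dump(result, f)
    return result
```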

### Don't litter
Don't leave lots of additional code lying around, even commented out, "just in case" you want to look at it again later.
Notebooks need to be relatively linear in the end, even though the thought processes and exploratory analyses that generate them may not be.
Once you have a working analysis, either prune those branches, or encapsulate them as options within functions.

### Don't load unnecessary libraries
Only import libraries which are required by the notebook, to avoid unnecessary dependencies.
If your analysis requires a new library that isn't yet part of the shared `pudl-dev` environment, add it to the `devtools/environment.yml` file so that others will get it when they update their environment.

## Related Resources:
Lots of these guidelines are taken directly from Emily Riederer's post: [RMarkdown Driven Development](https://emilyriederer.netlify.app/post/rmarkdown-driven-development/).
For a more in-depth explanation of the motivations behind this layout, do go check it out!

# Import Libraries
* Because it's very likely that you will be editing the PUDL Python packages or your own local module under development while working in the notebook, use the %autoreload magic with autoreload level 2 to ensure that any changes you've made in those files are always reflected in the code that's running in the notebook.
* Put all import statements at the top of the notebook, so everyone can see what its dependencies are up front, and so that if an import is going to fail, it fails early, before the rest of the notebook is run.
* Try to avoid importing individual functions and classes from deep within packages: later in the notebook it may not be clear to other users where those names came from, and it increases the chance of namespace collisions.

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Standard libraries
import logging
import os
import pathlib
import sys
from functools import reduce

# 3rd party libraries
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import sqlalchemy as sa

# Local libraries
import pudl
import pudl.constants as pc

# Configure Display Parameters

In [3]:
sns.set()
%matplotlib inline
mpl.rcParams['figure.figsize'] = (10,4)
mpl.rcParams['figure.dpi'] = 150
pd.options.display.max_columns = 100
pd.options.display.max_rows = 100

# Use Python Logging facilities
* Using a logger from the beginning will make the transition into the PUDL package easier.
* Creating a logging handler here will also allow you to see the logging output coming from PUDL and other underlying packages.
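Inside a Python module, the usual convention is a logger named after the module rather than the root logger used in this notebook. A minimal sketch (the module path and function are hypothetical); its records propagate up to the root logger, so the handler configured in the next cell still displays them.

```python
import logging

# In a module such as src/my_analysis.py (hypothetical name), create a logger
# named after the module instead of using print(). Its records propagate up
# to the root logger, whose handler prints them in the notebook.
logger = logging.getLogger(__name__)


def count_rows(records):
    """Toy example: report progress through logging rather than print()."""
    logger.info("counting %d records", len(records))
    return len(records)
```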

In [4]:
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
logger.handlers = [handler]

# Define Functions
In many cases, the eventual product of a notebook analysis is going to be the creation of new, reusable functions that are integrated into the underlying PUDL code. You should begin the process of accumulating and organizing those functions as soon as you notice repeated patterns in your code.
* Functions should be used to encapsulate any potentially reusable code.
* Functions should be defined immediately after the imports, to avoid accidental dependence on zombie variables that are defined further down in the code.
* While they may evolve over time, having brief docstrings explaining what they do will help others understand them.
* If there's a particular type of plot or visualization that is made repeatedly in the notebook, it's good to parameterize and functionalize it here too, so that as you refine the presentation of the data and results, those improvements can be made in a single place, and shown uniformly throughout the notebook.
* As these functions mature and become more general-purpose tools, you will probably want to start migrating them into their own local module, under a `src` directory in the same directory as the notebook. Import that module at the top of the notebook along with the other imports, so that `%autoreload 2` picks up your edits to it.
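As an example of encapsulating a repeated pattern, the loop later in this notebook that drops the stray `unnamed_0` column from every EIA 861 table could become a small documented function. `drop_columns_if_present` is a hypothetical name, not an existing PUDL helper.

```python
import pandas as pd


def drop_columns_if_present(dfs, cols):
    """Return a new dict of DataFrames with ``cols`` dropped wherever they exist.

    Hypothetical helper: tables lacking a column are returned unchanged,
    and the input DataFrames are not modified.
    """
    return {
        name: df.drop(columns=[c for c in cols if c in df.columns])
        for name, df in dfs.items()
    }
```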

In [5]:
# pudl_settings = pudl.workspace.setup.get_defaults()
# pudl_in = pathlib.Path(pudl_settings['pudl_in'])
# ds = pudl.workspace.datastore.Datastore(pudl_in, sandbox=True)
# eia861_raw_dfs = pudl.extract.eia861.Extractor(ds).extract([2019])

## Dummy EIA 861 ETL

In [6]:
def test_etl_eia(eia_inputs, pudl_settings):
    """
    This is a dummy function that runs the first part of the EIA ETL
    process -- everything up until the entity harvesting begins. For
    use in this notebook only.

    """
    eia860_tables = eia_inputs["eia860_tables"]
    eia860_years = eia_inputs["eia860_years"]
    eia861_tables = eia_inputs["eia861_tables"]
    eia861_years = eia_inputs["eia861_years"]
    eia923_tables = eia_inputs["eia923_tables"]
    eia923_years = eia_inputs["eia923_years"]

    # generate CSVs for the static EIA tables, return the list of tables
    #static_tables = _load_static_tables_eia(datapkg_dir)
    
    # For new Datastore args
    pudl_in = pathlib.Path(pudl_settings['pudl_in'])
    ds = pudl.workspace.datastore.Datastore(pudl_in, sandbox=True)
    
    # Extract EIA forms 860, 861, 923
    eia860_raw_dfs = pudl.extract.eia860.Extractor(ds).extract(eia860_years)
    eia861_raw_dfs = pudl.extract.eia861.Extractor(ds).extract(eia861_years)
    eia923_raw_dfs = pudl.extract.eia923.Extractor(ds).extract(eia923_years)

    # Transform EIA forms 860, 861, 923
    eia860_transformed_dfs = pudl.transform.eia860.transform(eia860_raw_dfs, eia860_tables=eia860_tables)
    eia861_transformed_dfs = pudl.transform.eia861.transform(eia861_raw_dfs, eia861_tables=eia861_tables)
    eia923_transformed_dfs = pudl.transform.eia923.transform(eia923_raw_dfs, eia923_tables=eia923_tables)

    # create an eia transformed dfs dictionary
    eia_transformed_dfs = eia860_transformed_dfs.copy()
    eia_transformed_dfs.update(eia861_transformed_dfs.copy())
    eia_transformed_dfs.update(eia923_transformed_dfs.copy())

    # Convert columns to the standard EIA dtypes
    eia_transformed_dfs = pudl.helpers.convert_dfs_dict_dtypes(eia_transformed_dfs, 'eia')

    return eia860_raw_dfs, eia861_raw_dfs, eia923_raw_dfs, eia_transformed_dfs

# Define Notebook Parameters
If there are overarching parameters which determine the nature of the analysis -- which US states to look at, which utilities are of interest, a particular start and end date -- state those clearly at the beginning of the analysis, so that they can be referred to by the rest of the notebook and easily changed if need be.
* If the analysis depends on local (non-PUDL managed) datasets, define the paths to those data here.
* If there are external URLs or other resource locations that will be accessed, define those here as well.
* This is also where you should create your `pudl_settings` dictionary and connections to your local PUDL databases
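A sketch of what a parameters cell might look like if you adopt papermill's convention of tagging it `parameters`; papermill then injects any values passed to `papermill.execute_notebook(input_path, output_path, parameters={...})` in a new cell directly after it. Apart from `EIA861_YEARS`, the parameter names and values are hypothetical.

```python
# --- Parameters cell (tag it "parameters" so papermill can override it) ---
EIA861_YEARS = list(range(2001, 2020))
STATES = ["CO", "NM"]      # hypothetical: limit the analysis to these states
START_DATE = "2010-01-01"  # hypothetical analysis window, YYYY-MM-DD
END_DATE = "2019-12-31"

# --- Separate validation cell ---
# papermill injects overrides in a new cell *after* the parameters cell,
# so validation must live in its own later cell to see the overridden values.
assert EIA861_YEARS == sorted(EIA861_YEARS), "EIA861_YEARS should be ascending"
assert all(len(s) == 2 and s.isupper() for s in STATES), "Use two-letter state codes"
# Same-format YYYY-MM-DD strings sort chronologically, so plain comparison works.
assert START_DATE <= END_DATE, "START_DATE must not be after END_DATE"
```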

In [7]:
EIA861_YEARS = list(range(2001, 2020))
EIA923_YEARS = list(range(2010, 2012))
pudl_settings = pudl.workspace.setup.get_defaults()
display(pudl_settings)

ferc1_engine = sa.create_engine(pudl_settings['ferc1_db'])
display(ferc1_engine)

pudl_engine = sa.create_engine(pudl_settings['pudl_db'])
display(pudl_engine)

pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine)  # freq can also be set for monthly/annual aggregation; check PudlTabl for accepted values


# Is there other external data you need to pull in?
# If so, put it in a (relatively) standard place on the filesystem.
my_new_data_url = "https://mynewdata.website.gov/path/to/new/data.csv"
my_new_datadir = pathlib.Path(pudl_settings["data_dir"]) / "local/new_data_source"

# Store API keys and other secrets in environment variables
# and read them in here, if needed:
# API_KEY_EIA = os.environ["API_KEY_EIA"]
# API_KEY_FRED = os.environ["API_KEY_FRED"]

{'pudl_in': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR',
 'data_dir': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/data',
 'settings_dir': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/settings',
 'pudl_out': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR',
 'sqlite_dir': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/sqlite',
 'parquet_dir': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/parquet',
 'datapkg_dir': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/datapkg',
 'notebook_dir': '/Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/notebook',
 'ferc1_db': 'sqlite:////Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/sqlite/ferc1.sqlite',
 'pudl_db': 'sqlite:////Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/sqlite/pudl.sqlite'}

Engine(sqlite:////Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/sqlite/ferc1.sqlite)

Engine(sqlite:////Users/aesharpe/Desktop/Work/Catalyst_Coop/PUDL_DIR/sqlite/pudl.sqlite)

# Load Data
* Given the above parameters and functions, it should now be possible to load the required data into local variables for further wrangling, analysis, and visualization.
* If the data is not yet present on the machine of the person running the notebook, this step should also acquire the data from its original source, and place it in the appropriate location under `<PUDL_IN>/data/local/`.
* If there are steps which have to be done manually here, put them first so that they fail first if the user hasn't read the instructions, and they can fix the situation before a bunch of other work gets done. Try to minimize the amount of work in the filesystem that has to be done manually though.
* If this process takes a little while, don't be shy about producing `logging` output.
* Using the `%%time` cell magic can also help users understand which steps of computation or data acquisition are slow.
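To make manual prerequisites fail first, one option is a small guard like the hypothetical `require_manual_file` below, run at the top of this section. The commented usage line reuses the placeholder `my_new_datadir` and `my_new_data_url` names from the parameters cell; the file name `data.csv` is an assumption.

```python
import logging
import pathlib


def require_manual_file(path, instructions):
    """Stop the notebook early if a file that must be fetched by hand is missing."""
    path = pathlib.Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Missing required file {path}. {instructions}")
    logging.getLogger(__name__).info("Found required input %s", path)
    return path


# Example usage with the placeholder names from the parameters cell:
# require_manual_file(my_new_datadir / "data.csv",
#                     f"Download it from {my_new_data_url} and save it there.")
```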

## EIA 861 (2001-2019)
* Not yet fully integrated into PUDL
* Post-transform harvesting process isn't compatible with the EIA 861 structure
* Only getting the `sales_eia861`, `balancing_authority_eia861`, and `service_territory_eia861` tables

### Already Transformed EIA 861 DataFrames

In [8]:
%%time
eia_inputs = {
    "eia860_years": [],
    "eia860_tables": pudl.constants.pudl_tables["eia860"],
    "eia861_years": EIA861_YEARS,
    "eia861_tables": pudl.constants.pudl_tables["eia861"],
    "eia923_years": [],
    "eia923_tables": pudl.constants.pudl_tables["eia923"],
}
eia860_raw_dfs, eia861_raw_dfs, eia923_raw_dfs, eia_transformed_dfs = test_etl_eia(eia_inputs=eia_inputs, pudl_settings=pudl_settings)

No years given. Not extracting eia860 spreadsheet data.
Extracting eia861 spreadsheet data.


The data has not yet been validated, and the structure may change.


No years given. Not extracting eia923 spreadsheet data.
No raw EIA 860 dataframes found. Not transforming EIA 860.
Transforming raw EIA 861 DataFrames for service_territory_eia861 concatenated across all years.



### Sanity Check Data
If there's any validation that can be done on the data which you've loaded to flag if/when it is inappropriate for the analysis that follows, do it here. If you find the data is unusable, use `assert` statements or `raise` Exceptions to stop the notebook from proceeding, and indicate what the problem is.
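A sketch of checks that stop the notebook with a clear message, as suggested above. Both helpers are hypothetical: `check_unique_key` raises if the key columns are missing or repeated (useful, for example, after a table has been grouped to the utility/state/date level), and `rows_where_components_dont_sum` returns rows where component columns fail to add up to a reported total, which is one way to approach the "components sum to the total" item in the TO DO list below. The `1e-3` relative tolerance is an arbitrary choice.

```python
import numpy as np
import pandas as pd


def check_unique_key(df, key_cols, table_name):
    """Raise if ``key_cols`` are missing from ``df`` or don't uniquely identify its rows."""
    missing = [c for c in key_cols if c not in df.columns]
    if missing:
        raise KeyError(f"{table_name} is missing key columns: {missing}")
    dupes = df[df.duplicated(subset=key_cols, keep=False)]
    if not dupes.empty:
        raise ValueError(
            f"{table_name} has {len(dupes)} rows with duplicated {key_cols}; "
            "deduplicate or aggregate before continuing."
        )


def rows_where_components_dont_sum(df, component_cols, total_col, rtol=1e-3):
    """Return the rows where ``component_cols`` don't add up to ``total_col``.

    Rows where both the component sum and the total are null count as matching.
    """
    component_sum = df[component_cols].sum(axis=1, min_count=1)
    ok = np.isclose(
        component_sum.to_numpy(dtype=float, na_value=np.nan),
        df[total_col].to_numpy(dtype=float, na_value=np.nan),
        rtol=rtol,
        equal_nan=True,
    )
    return df[~ok]
```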

TO DO: 
    * compare totals col - result: total_capacity col is more accurate but need total_capacity_mw for conversion of pct to mw
    * ensure that col breakdown is accurate:
        * that all columns are accounted for
        * that the "tech" cols are actually components that sum to the total
        * check on the weird extra cols that are the "half" components.
    * deal with / record duplicates

# RMI Data Wrangling


In [67]:
util_cols = [
    'utility_id_eia',
    'state',
    'report_date',
]
idx_ba = util_cols + ['balancing_authority_code_eia']
idx_nr = util_cols + ['nerc_region']

In [68]:
# Get rid of 2019 unnamed col for new data
for df_name, df in eia_transformed_dfs.items():
    if 'unnamed_0' in df.columns:
        eia_transformed_dfs[df_name] = (
            df.drop('unnamed_0', axis=1)
        )

# Rename the reliability 'standard' column to 'standards_class' so check_and_unpeel() treats it as a class column
eia_transformed_dfs['reliability_eia861'] = (
    eia_transformed_dfs['reliability_eia861'].rename(columns={'standard': 'standards_class'})
)

In [69]:
# Run this cell to reset the dict
table_dict = {
    'advanced_metering_infrastructure_eia861': eia_transformed_dfs['advanced_metering_infrastructure_eia861'].copy(),
     #'balancing_authority_assn_eia861',
     #'balancing_authority_eia861': eia_transformed_dfs['balancing_authority_eia861'].copy(),
     'demand_response_eia861': eia_transformed_dfs['demand_response_eia861'].copy(),
     'demand_response_water_heater_eia861': eia_transformed_dfs['demand_response_water_heater_eia861'].copy(),
     'demand_side_management_ee_dr_eia861': eia_transformed_dfs['demand_side_management_ee_dr_eia861'].copy(),
     'demand_side_management_misc_eia861': eia_transformed_dfs['demand_side_management_misc_eia861'].copy(),
     'demand_side_management_sales_eia861': eia_transformed_dfs['demand_side_management_sales_eia861'].copy(),
     'distributed_generation_fuel_eia861': eia_transformed_dfs['distributed_generation_fuel_eia861'].copy(),
     'distributed_generation_misc_eia861': eia_transformed_dfs['distributed_generation_misc_eia861'].copy(),
     'distributed_generation_tech_eia861': eia_transformed_dfs['distributed_generation_tech_eia861'].copy(),
     'distribution_systems_eia861': eia_transformed_dfs['distribution_systems_eia861'].copy(),
     'dynamic_pricing_eia861': eia_transformed_dfs['dynamic_pricing_eia861'].copy(),
     'energy_efficiency_eia861': eia_transformed_dfs['energy_efficiency_eia861'].copy(),
     'green_pricing_eia861': eia_transformed_dfs['green_pricing_eia861'].copy(),
     'mergers_eia861': eia_transformed_dfs['mergers_eia861'].copy(),
     'net_metering_customer_fuel_class_eia861': eia_transformed_dfs['net_metering_customer_fuel_class_eia861'].copy(),
     'net_metering_misc_eia861': eia_transformed_dfs['net_metering_misc_eia861'].copy(),
     'non_net_metering_customer_fuel_class_eia861': eia_transformed_dfs['non_net_metering_customer_fuel_class_eia861'].copy(),
     'non_net_metering_misc_eia861': eia_transformed_dfs['non_net_metering_misc_eia861'].copy(),
     'operational_data_misc_eia861': eia_transformed_dfs['operational_data_misc_eia861'].copy(),
     'operational_data_revenue_eia861': eia_transformed_dfs['operational_data_revenue_eia861'].copy(),
     'reliability_eia861': eia_transformed_dfs['reliability_eia861'].copy(),
     'sales_eia861': eia_transformed_dfs['sales_eia861'].copy(),
     'service_territory_eia861': eia_transformed_dfs['service_territory_eia861'].copy(),
     #'utility_assn_eia861',
     'utility_data_misc_eia861': eia_transformed_dfs['utility_data_misc_eia861'].copy(),
     'utility_data_nerc_eia861': eia_transformed_dfs['utility_data_nerc_eia861'].copy(),
     'utility_data_rto_eia861': eia_transformed_dfs['utility_data_rto_eia861'].copy(),
}

In [70]:
unpeel_list = [
    'advanced_metering_infrastructure_eia861',
     #'balancing_authority_assn_eia861',
     #'balancing_authority_eia861',
     'demand_response_eia861',
     'demand_response_water_heater_eia861',
     'demand_side_management_ee_dr_eia861',
     'demand_side_management_misc_eia861',
     'demand_side_management_sales_eia861',
     'distributed_generation_fuel_eia861',
     'distributed_generation_misc_eia861',
     'distributed_generation_tech_eia861',
     'distribution_systems_eia861',
     'dynamic_pricing_eia861',
     'energy_efficiency_eia861',
     'green_pricing_eia861',
     'mergers_eia861',
     'net_metering_customer_fuel_class_eia861',
     'net_metering_customer_fuel_class_eia861', # because two classes
     'net_metering_misc_eia861',
     'non_net_metering_customer_fuel_class_eia861',
     'non_net_metering_customer_fuel_class_eia861', # because two classes 
     'non_net_metering_misc_eia861',
     'operational_data_misc_eia861',
     'operational_data_revenue_eia861',
     'reliability_eia861',
     'sales_eia861',
     'service_territory_eia861',
     #'utility_assn_eia861',
     'utility_data_misc_eia861',
     'utility_data_nerc_eia861',
     'utility_data_rto_eia861'
]

In [71]:
moniker_dict = {
    'advanced_metering_infrastructure_eia861': 'AMI',
     #'balancing_authority_assn_eia861',
     #'balancing_authority_eia861',
     'demand_response_eia861': 'DR',
     'demand_response_water_heater_eia861': 'DR',
     'demand_side_management_ee_dr_eia861': 'DSM',
     'demand_side_management_misc_eia861': 'DSM',
     'demand_side_management_sales_eia861': 'DSM',
     'distributed_generation_fuel_eia861': 'DG',
     'distributed_generation_misc_eia861': 'DG',
     'distributed_generation_tech_eia861': 'DG',
     'distribution_systems_eia861': 'DS',
     'dynamic_pricing_eia861': 'DP',
     'energy_efficiency_eia861': 'EE',
     'green_pricing_eia861': 'GP',
     'mergers_eia861': 'M',
     'net_metering_customer_fuel_class_eia861': 'NM',
     'net_metering_misc_eia861': 'NM',
     'non_net_metering_customer_fuel_class_eia861': 'NNM',
     'non_net_metering_misc_eia861': 'NNM',
     'operational_data_misc_eia861': 'OD',
     'operational_data_revenue_eia861': 'OD',
     'reliability_eia861': 'R',
     'sales_eia861': 'S',
     'service_territory_eia861': 'ST',
     #'utility_assn_eia861',
     'utility_data_misc_eia861': 'UD',
     'utility_data_nerc_eia861': 'UD',
     'utility_data_rto_eia861': 'UD',
}

In [72]:
def unpeel(df, df_name, class_name):
    """Make single class name column into suffix for columns - tall-to-wide reformatting"""
    logger.info(f'unpeeling {class_name} from {df_name} table')
    # Include utility_id_eia in qualitative col grab (for index)
    string_df = (
        df[['utility_id_eia']]
        .join(df.select_dtypes(exclude=['int64', 'float']))
    )

    # All qualitative columns except the class column become the row index
    qual_cols = list(string_df.columns)
    qual_cols.remove(class_name)

    # Pivot the class values out into columns: (value_col, class) MultiIndex
    wide_df = (
        df.set_index(qual_cols)
        .pivot(columns=class_name)
    )
    # Flatten to "<class>_<value_col>", e.g. "residential_sales_mwh"
    wide_df.columns = ['_'.join(col[::-1]) for col in wide_df.columns]
    wide_df = wide_df.reset_index()
    return wide_df
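To see what `unpeel` does, here is the same set_index / pivot / reversed-tuple-join sequence applied to a tiny made-up table (the values are invented for illustration).

```python
import pandas as pd

# Toy "tall" table: one row per utility / date / customer_class.
tall = pd.DataFrame({
    "utility_id_eia": [1, 1, 2, 2],
    "report_date": pd.to_datetime(["2019-01-01"] * 4),
    "customer_class": ["residential", "commercial"] * 2,
    "sales_mwh": [10.0, 20.0, 30.0, 40.0],
})

# pivot() with no index argument keeps the existing index and spreads the
# class values into MultiIndex columns like ("sales_mwh", "residential").
wide = tall.set_index(["utility_id_eia", "report_date"]).pivot(columns="customer_class")
# Reverse each tuple so the class comes first: "residential_sales_mwh".
wide.columns = ["_".join(col[::-1]) for col in wide.columns]
wide = wide.reset_index()
print(wide)
```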

In [73]:
def check_and_unpeel(df_name):
    """Run unpeel function on tables that have a class column."""
    df = table_dict[df_name].copy()
    
    # Get rid of categorical columns
    for col in df:
        if 'category' in df[col].dtype.name:
            df[col] = df[col].astype('string')

    # Only unpeel if there is a class column.
    class_names = [col for col in df if 'class' in col]
    if len(class_names) > 0:
        wide_df = unpeel(df, df_name, class_names[0])
    else:
        wide_df = df
    
    return wide_df

In [74]:
def groupby_utils(df, df_name, util_cols):
    """Group EIA861 tables at the utility-level
    
    Some of the qualitative columns may present an aggregation challenge when
    grouping at the utility level (nerc_region and ba_code, specifically). To
    account for all of these values we'll first look to see if there are any
    instances where there are duplicate values (same utility/state/year, diff
    nerc region or ba code). If there are, we'll combine them into a single row
    EX: SERC and MISC to SERC, MISC. We single out the rows that have duplicates
    rather than running this on the whole dataframe to save time.
    
    """
    # Set N/A state values to UNK to prevent issues in the .transform() func
    df['state'] = df['state'].fillna('UNK')
    df = df.set_index(util_cols)
    
    # Separate the df columns into dtypes
    num_df = df.select_dtypes(include=['int64', 'float']).reset_index()
    qual_df = df.select_dtypes(exclude=['int64', 'float']).reset_index()
    
    # See whether any of the columns are duplicated at the utility-state-date level
    qual_df['dup'] = qual_df.duplicated(subset=util_cols, keep=False)
    
    # Divide into duplicated and non-duplicated
    dup_df = qual_df[qual_df['dup']]
    non_dup_df = qual_df[~qual_df['dup']]
    
    if dup_df.empty:
        logger.info(f'{df_name} has no duplicates')
        return df.reset_index()
    else:
        logger.info(f'{df_name} has duplicates; combining qualitative values')
        # Combine duplicated values into "VAL1, VAL2" strings (exclude the boolean 'dup' flag)
        dup_transformed = dup_df.drop(columns='dup').groupby(util_cols).transform(lambda x: ', '.join(x.unique()))
        dup_grouped = (
            dup_df[util_cols]
            .drop_duplicates()
            .join(dup_transformed)
            .groupby(util_cols)
            .first()
            .reset_index()
        )
        # Grab the first value for non-duplicated values
        non_dup_grouped = non_dup_df.groupby(util_cols).first().reset_index()

        # Combine newly grouped duplicates and non duplicates
        qual_grouped = pd.concat([dup_grouped, non_dup_grouped], ignore_index=True)

        # Sum numeric columns
        num_grouped = num_df.groupby(util_cols).sum(min_count=1)

        # Merge numeric and qualitative dataframes back together
        merge_df = pd.merge(num_grouped, qual_grouped, on=util_cols).drop('dup', axis=1)
        
        return merge_df
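The core idea of `groupby_utils` (sum the numbers, join distinct labels into one string) can be sketched on a toy table with a single named aggregation. This is an alternative formulation for illustration only; it assumes you already know which columns are numeric and which are labels, and the data are made up.

```python
import pandas as pd

key_cols = ["utility_id_eia", "state", "report_date"]

# Made-up rows: utility 1 reports two NERC regions in the same state and year.
df = pd.DataFrame({
    "utility_id_eia": [1, 1, 2],
    "state": ["TX", "TX", "CO"],
    "report_date": pd.to_datetime(["2019-01-01"] * 3),
    "nerc_region": ["SERC", "TRE", "WECC"],
    "sales_mwh": [5.0, 7.0, 3.0],
})

grouped = df.groupby(key_cols).agg(
    # Sum numeric values; min_count=1 keeps all-null groups null instead of 0.
    sales_mwh=("sales_mwh", lambda s: s.sum(min_count=1)),
    # Collapse differing labels into one comma-separated string.
    nerc_region=("nerc_region", lambda s: ", ".join(s.dropna().unique())),
).reset_index()
print(grouped)
```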

In [75]:
def mega_merge(table_dict):
    """Merge all the EIA 861 tables together"""
    # Outer-merge every table on util_cols, tagging each table's columns with its
    # moniker (e.g. "_DR_") so same-named columns from different tables stay distinct.
    merge_df = pd.DataFrame(columns=util_cols)
    for df_name, df in table_dict.items():
        logger.info(f'merging {df_name}')
        moniker = moniker_dict[df_name]
        df = df.set_index(util_cols)
        df.columns = df.columns.map(lambda x: str(x) + f'_{moniker}_')
        merge_df = pd.merge(merge_df, df, on=util_cols, how='outer')
    
    return merge_df
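A sketch of the same tag-then-outer-merge pattern on two toy tables, using `functools.reduce` rather than starting from an empty DataFrame. `DataFrame.add_suffix` does the tagging; the table contents are invented.

```python
import functools

import pandas as pd

key_cols = ["utility_id_eia", "report_date"]
tables = {
    "S": pd.DataFrame({"utility_id_eia": [1, 2], "report_date": ["2019", "2019"], "customers": [100, 200]}),
    "DR": pd.DataFrame({"utility_id_eia": [2, 3], "report_date": ["2019", "2019"], "customers": [5, 6]}),
}

# Tag every non-key column with its table moniker so same-named columns don't collide.
tagged = [
    df.set_index(key_cols).add_suffix(f"_{moniker}_").reset_index()
    for moniker, df in tables.items()
]
# Outer merges keep utilities that appear in any of the tables.
merged = functools.reduce(
    lambda left, right: pd.merge(left, right, on=key_cols, how="outer"), tagged
)
print(merged)
```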

In [76]:
def unpeel_group_merge():
    """Re-widen all tables, groupby utility, merge into one mega table."""
    # Go through list of tables and widen. Use unpeel_list because 
    # the non/net_metering tables have to be run twice.
    for df_name in unpeel_list:
        wide_df = check_and_unpeel(df_name)
        table_dict[df_name] = wide_df
    
    # Group each of the widened tables by utility/state/date
    for df_name, df in table_dict.items():
        wide_df = df.copy()
        util_df = groupby_utils(wide_df, df_name, util_cols)
        table_dict[df_name] = util_df
        
    # Merge wide, grouped tables together into one "mega" dataframe
    mega_df = mega_merge(table_dict)
    
    return mega_df

In [77]:
def compare_common_cols(df, col_name):
    """Turn repeat columns into one column with all values."""
    col_list = [col for col in df if col_name in col]
    col_df = df.set_index(util_cols)[col_list]
    temp_df = col_df.fillna('UNK')
    temp_df = temp_df.eq(temp_df.iloc[:, 0], axis=0)
    col_df['bool'] = temp_df.eq(temp_df.iloc[:, 0], axis=0).all(1)
    col_df_false = col_df[col_df['bool']==False].copy()
    col_df_false = col_df_false.astype('object')
    col_df_false.fillna(np.nan)
    col_df_false[col_name] = (
        col_df_false[col_df_false.columns[:-1]]
        .apply(lambda x: ', '.join(x.dropna().unique()), axis=1)
    )
    df = df.drop(col_list, axis=1)
    df = pd.merge(df, col_df_false[[col_name]], on=util_cols, how='outer')
    
    return df

In [78]:
def loop_over_common_cols(mega_df):
    """Drop short-form columns, then collapse columns repeated across source tables."""
    common_cols = [
        'balancing_authority_code_eia',
        'utility_name_eia',
        'nerc_region',
        'entity_type',
    ]
    # get rid of short form cols
    logger.info('removing short form columns')
    drop_list = []
    for col in mega_df:
        if 'short_form' in col:
            drop_list.append(col)
    mega_df = mega_df.drop(drop_list, axis=1)
            
    # Compare duplicate columns in the mega table
    for col in common_cols:
        logger.info(f'comparing column values for {col}')
        mega_df = compare_common_cols(mega_df, col)
    
    return mega_df

In [79]:
mega_df = unpeel_group_merge()

unpeeling customer_class from advanced_metering_infrastructure_eia861 table
unpeeling customer_class from demand_response_eia861 table
unpeeling customer_class from demand_side_management_ee_dr_eia861 table
unpeeling fuel_class from distributed_generation_fuel_eia861 table
unpeeling tech_class from distributed_generation_tech_eia861 table
unpeeling customer_class from dynamic_pricing_eia861 table
unpeeling customer_class from energy_efficiency_eia861 table
unpeeling customer_class from green_pricing_eia861 table
unpeeling customer_class from net_metering_customer_fuel_class_eia861 table
unpeeling tech_class from net_metering_customer_fuel_class_eia861 table
unpeeling customer_class from non_net_metering_customer_fuel_class_eia861 table
unpeeling tech_class from non_net_metering_customer_fuel_class_eia861 table
unpeeling revenue_class from operational_data_revenue_eia861 table
unpeeling standards_class from reliability_eia861 table
unpeeling customer_class from sales_eia861 table
advanc

In [80]:
final_df = loop_over_common_cols(mega_df)

removing short form columns
comparing column values for balancing_authority_code_eia
comparing column values for utility_name_eia
comparing column values for nerc_region
comparing column values for entity_type


In [101]:
final_df.to_excel('mega_eia861.xlsx')

In [23]:
#list(table_dict.keys())

# Data Validation


In [81]:
d_val_dict = {
    '_AMI_': ['advanced_metering_infrastructure_eia861', []],
    '_DG_': ['distributed_generation_eia861', ['capacity_mw']],
    '_DP_': ['dynamic_pricing_eia861', []],
    '_DR_': ['demand_response_eia861', ['cost']],
    '_DS_': ['distribution_systems_eia861', []],
    '_DSM_': ['demand_side_management_eia861', ['cost', 'payment']],
    '_EE_': ['energy_efficiency_eia861', []],
    '_GP_': ['green_pricing_eia861', []],
    '_M_': ['mergers_eia861', []],
    '_NM_': ['net_metering_eia861', []],
    '_NNM_': ['non_net_metering_eia861', []],
    '_OD_': ['operational_data_eia861', []],
    '_R_': ['reliability_eia861', []],
    '_S_': ['sales_eia861', []],
    '_ST_': ['service_territory_eia861', []],
    '_UD_': ['utility_data_eia861', []],
}

In [82]:
val_df = final_df.copy()

In [83]:
val_df = val_df.set_index(util_cols)

In [95]:
# Split the mega table back into chunks matching the original EIA tables,
# in preparation for comparison with the raw data
monikers = sorted(d_val_dict.keys())

by_eia_table_dict = {}
for moniker in monikers:
    # Columns from this source table end with its moniker, e.g. "_DR_"
    moniker_cols = [col for col in val_df if col.endswith(moniker)]
    # Slice the suffix off; str.strip() would remove a *set of characters*, not a suffix
    non_moniker_cols = [col[:-len(moniker)] for col in moniker_cols]
    moniker_df = val_df[moniker_cols].copy()
    moniker_df.columns = non_moniker_cols
    by_eia_table_dict[moniker] = moniker_df.reset_index()

# Drop numeric columns that are entirely null. Note that this also drops
# reported columns that happen to be all null, not only the empty columns
# created by the re-widening process.
for name, table in by_eia_table_dict.items():
    null_cols = [
        col for col in table
        if pd.api.types.is_numeric_dtype(table[col]) and table[col].isnull().all()
    ]
    by_eia_table_dict[name] = table.drop(null_cols, axis=1)
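Why slicing rather than `str.strip` when removing a moniker suffix: `str.strip` treats its argument as a set of characters to trim from both ends, not as a suffix. The helper below is a hypothetical equivalent of `str.removesuffix` for Python versions before 3.9; the column name is made up.

```python
def remove_suffix(name: str, suffix: str) -> str:
    """Remove ``suffix`` from the end of ``name`` if present (like str.removesuffix)."""
    return name[: -len(suffix)] if suffix and name.endswith(suffix) else name


# strip() trims any of the characters '_' and 'M' from both ends, so the
# leading 'M' is lost too; remove_suffix() only touches the exact suffix.
print("MW_capacity_M_".strip("_M_"))            # W_capacity
print(remove_suffix("MW_capacity_M_", "_M_"))   # MW_capacity
```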

In [102]:
# Prep raw data for comparison
raw_dfs_dict = eia861_raw_dfs.copy()

for df_name, df in raw_dfs_dict.items():
    df = pudl.helpers.fix_eia_na(df)
    df = pudl.helpers.convert_to_date(df)
    raw_dfs_dict[df_name] = df
    
raw_dfs_dict = pudl.helpers.convert_dfs_dict_dtypes(raw_dfs_dict, 'eia')

In [105]:
test = raw_dfs_dict['operational_data_eia861'].copy()
test2 = test[test['utility_id_eia'].isna()]
test2.to_excel('OD_NA.xlsx')

In [96]:
# Reverse order of customer_class and tech_class in raw net/non_net metering tables

tc = pudl.constants.TECH_CLASSES
cc = pudl.constants.CUSTOMER_CLASSES

def swap_col_order(df_name):
    """Rename raw '<customer_class>_<tech_class>_...' columns to '<tech_class>_<customer_class>_...'."""
    raw_order_cols = raw_dfs_dict[df_name].columns.tolist()
    new_order_cols = []
    for col in raw_order_cols:
        for c in cc: 
            if c in col:
                for t in tc:
                    if t in col:
                        col = col.replace(f'{c}_{t}_', f'{t}_{c}_')
        new_order_cols.append(col)
        
    raw_dfs_dict[df_name].columns = new_order_cols

swap_col_order('net_metering_eia861')
swap_col_order('non_net_metering_eia861')

In [97]:
#Adapt raw tables to account for data cleaning and manipulation

dr_df = raw_dfs_dict['demand_response_eia861'].copy()
raw_dfs_dict['demand_response_eia861'] = (
    dr_df.drop_duplicates(subset=util_cols+['balancing_authority_code_eia'])
)
dsm_df = raw_dfs_dict['demand_side_management_eia861'].copy()
raw_dfs_dict['demand_side_management_eia861'] = (
    dsm_df.loc[dsm_df['utility_id_eia'] != 88888].copy()
)
nm_df = raw_dfs_dict['net_metering_eia861'].copy()
raw_dfs_dict['net_metering_eia861'] = (
    nm_df.loc[nm_df['utility_id_eia'] != 99999].copy()
)
nnm_df = raw_dfs_dict['non_net_metering_eia861'].copy()
raw_dfs_dict['non_net_metering_eia861'] = (
    nnm_df.loc[nnm_df['utility_id_eia'] != 99999].copy()
)
od_df = raw_dfs_dict['operational_data_eia861'].copy()
raw_dfs_dict['operational_data_eia861'] = (
    od_df.loc[od_df['utility_id_eia'] != 88888].copy()
) #NULLS!
r_df = raw_dfs_dict['reliability_eia861'].copy()
raw_dfs_dict['reliability_eia861'] = (
    r_df.drop_duplicates(subset=util_cols)
)
s_df = raw_dfs_dict['sales_eia861'].copy()
raw_dfs_dict['sales_eia861'] = (
    s_df.drop_duplicates(subset=util_cols + ['balancing_authority_code_eia'])
    .loc[lambda x: (x['utility_id_eia'] != 88888) & (x['utility_id_eia'] != 99999)]
    .copy()
)
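The per-table steps above follow one pattern: drop duplicates on some key, then drop a few utility IDs. A minimal sketch of a reusable helper, assuming every table has a `utility_id_eia` column; `clean_raw_table` is hypothetical. One deliberate difference: `~isin(...)` keeps rows whose `utility_id_eia` is missing, while `!= 88888` on a nullable `Int64` column drops them, so any NA-dropping has to be made explicit.

```python
import pandas as pd

# Utility IDs filtered out in the cleaning cell above.
DROP_IDS = (88888, 99999)


def clean_raw_table(df, drop_ids=(), dedupe_on=None):
    """Drop duplicate rows, then rows whose utility_id_eia is in drop_ids (hypothetical helper).

    ~isin() keeps rows with a missing utility_id_eia; add .dropna(subset=[...])
    if those should go too.
    """
    if dedupe_on is not None:
        df = df.drop_duplicates(subset=dedupe_on)
    if drop_ids:
        df = df.loc[~df["utility_id_eia"].isin(list(drop_ids))]
    return df.copy()


raw = pd.DataFrame({  # made-up rows
    "utility_id_eia": pd.array([1, 1, 88888, 99999, None], dtype="Int64"),
    "state": ["CO", "CO", "TX", "TX", "CA"],
    "report_date": pd.to_datetime(["2019-01-01"] * 5),
})
cleaned = clean_raw_table(
    raw, drop_ids=DROP_IDS, dedupe_on=["utility_id_eia", "state", "report_date"]
)
print(cleaned)  # keeps utility 1 and the row with a missing ID
```

With `util_cols` from earlier in the notebook, the `sales_eia861` steps would become `clean_raw_table(raw_dfs_dict['sales_eia861'], drop_ids=(88888, 99999), dedupe_on=util_cols + ['balancing_authority_code_eia'])`.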

In [90]:
# This comes back empty after running the cleaning cell above: why are there
# no rows left with a missing utility_id_eia?
test = raw_dfs_dict['operational_data_eia861'].copy()
test['utility_id_eia'] = test['utility_id_eia'].astype('float')
test.loc[test['utility_id_eia'].isna()]
# test[['utility_id_eia']].sort_values('utility_id_eia')

Empty DataFrame
Columns: [consumed_by_facility_mwh, consumed_by_respondent_without_charge_mwh, credits_or_adjustments_revenue, data_observed, delivery_customers_revenue, entity_type, exchange_energy_delivered_mwh, exchange_energy_received_mwh, furnished_without_charge_mwh, nerc_region, net_generation_mwh, net_power_exchanged_mwh, net_wheeled_power_mwh, other_revenue, retail_sales_mwh, retail_sales_revenue, sales_for_resale_mwh, sales_for_resale_revenue, short_form, state, summer_peak_demand_mw, total_disposition_mwh, total_energy_losses_mwh, total_revenue, total_sources_mwh, transmission_by_other_losses_mwh, transmission_revenue, unbundled_revenue, utility_id_eia, utility_name_eia, wheeled_power_delivered_mwh, wheeled_power_received_mwh, wholesale_power_purchases_mwh, winter_peak_demand_mw, report_date]
Index: []
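A likely explanation for the empty result, assuming `convert_dfs_dict_dtypes` leaves `utility_id_eia` as pandas' nullable `Int64` type: comparing a missing value with `!= 88888` yields `<NA>` rather than `True`, and recent pandas versions treat `<NA>` in a boolean mask as `False`. The 88888 filter in the cleaning cell would then silently drop the missing-ID rows, and casting to `float` afterwards can't bring them back. With a plain `float64` column, `NaN != 88888` is `True` and the rows survive. The toy IDs below are made up to show the difference.

```python
import pandas as pd

# Made-up IDs: one excluded utility and one missing value.
nullable = pd.Series([1, 88888, None, 2], dtype="Int64")
floats = nullable.astype("float64")  # <NA> becomes NaN

nullable_mask = nullable != 88888  # nullable "boolean" dtype, <NA> where the ID is missing
float_mask = floats != 88888       # plain bool: NaN != 88888 is True

print(nullable.loc[nullable_mask].tolist())  # <NA> in the mask acts as False: missing ID dropped
print(floats.loc[float_mask].tolist())       # missing ID kept as NaN

# To keep missing IDs with the nullable dtype, fill the mask explicitly.
print(nullable.loc[nullable_mask.fillna(True)].tolist())
```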


In [98]:
def check_against_raw_numeric(df, raw_df, df_name_and_exceptions):
    """Compare numeric column totals in a transformed table against its raw counterpart.

    df_name_and_exceptions is a (table_name, exceptions) pair: columns whose names
    contain any of the exception substrings are skipped in the sum comparison.
    """
    logger.info('')
    logger.info(f'checking columns for {df_name_and_exceptions[0]} table')

    num_df = df.select_dtypes(include=['int64', 'float']).set_index('utility_id_eia')
    raw_num_df = raw_df.select_dtypes(include=['int64', 'float']).set_index('utility_id_eia')

    not_in_raw = [col for col in num_df if col not in raw_num_df]
    # Exclude raw "total" columns from the missing-column report
    not_in_transformed = [
        col for col in raw_num_df if col not in num_df and 'total' not in col
    ]
    in_both = [col for col in num_df if col in raw_num_df]
    for exception in df_name_and_exceptions[1]:
        in_both = [col for col in in_both if exception not in col]

    logger.info(f'     columns not in the raw_df: {not_in_raw}')
    logger.info(f'     columns not in the transformed_df {not_in_transformed}')

    # Column totals should match, either as-is or after the transform's
    # thousands-to-ones conversion (transformed total == 1000 * raw total).
    for col in in_both:
        new_sum = round(num_df[col].sum(skipna=True), 0)
        raw_sum = round(raw_num_df[col].sum(skipna=True), 0)
        if new_sum != raw_sum and raw_sum != round(new_sum / 1000, 0):
            print(f'     sum mismatch for col: {col}')
            print(f'     new_sum: {new_sum}, raw_sum: {raw_sum}')
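Comparing `round(new_sum / 1000, 0)` with a rounded raw sum can hide differences of a few hundred units in the transformed column. A sketch of the same two-way comparison using a relative tolerance instead; `column_sums_match` is a hypothetical helper and the 1000 scale factor assumes the thousands-to-ones conversion described in the transformation notes.

```python
import math

import pandas as pd


def column_sums_match(new, raw, scale=1000.0, rel_tol=1e-6):
    """True if two columns have the same total, as-is or with new == raw * scale.

    Hypothetical helper: uses a relative tolerance instead of rounding both sums.
    """
    new_sum = float(new.sum(skipna=True))
    raw_sum = float(raw.sum(skipna=True))
    return (
        math.isclose(new_sum, raw_sum, rel_tol=rel_tol)
        or math.isclose(new_sum, raw_sum * scale, rel_tol=rel_tol)
    )


# Toy data: raw values in thousands, transformed values in ones.
raw = pd.Series([1.5, 2.5, None])
new = pd.Series([1500.0, 2500.0, None])
print(column_sums_match(new, raw))                                      # unit change only
print(column_sums_match(pd.Series([1.0, 2.0]), pd.Series([1.0, 3.0])))  # real mismatch
```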

In [99]:
for moniker, df_name_and_exceptions in d_val_dict.items():
    check_against_raw_numeric(
        by_eia_table_dict[moniker],
        raw_dfs_dict[df_name_and_exceptions[0]],
        df_name_and_exceptions
    )


checking columns for advanced_metering_infrastructure_eia861 table
     columns not in the raw_df: []
     columns not in the transformed_df []

checking columns for distributed_generation_eia861 table
     columns not in the raw_df: []
     columns not in the transformed_df ['backup_capacity_pct', 'combustion_turbine_capacity_pct', 'distributed_generation_owned_capacity_pct', 'hydro_capacity_pct', 'internal_combustion_capacity_pct', 'other_capacity_pct', 'steam_capacity_pct', 'wind_capacity_pct']

checking columns for dynamic_pricing_eia861 table
     columns not in the raw_df: []
     columns not in the transformed_df []

checking columns for demand_response_eia861 table
     columns not in the raw_df: []
     columns not in the transformed_df []

checking columns for distribution_systems_eia861 table
     columns not in the raw_df: []
     columns not in the transformed_df []

checking columns for demand_side_management_eia861 table
     columns not in the raw_df: ['total_price_res

### Explanation of data transformations

**DG** (distributed generation): percentage columns converted to MW, so the column sums will differ.

**DR** (demand response): cost columns converted from thousands to ones; duplicates dropped.

**DSM** (demand-side management): cost / payment columns converted from thousands to ones; utilities with ID 88888 removed.

**NM** (net metering): utilities with ID 99999 removed. Reconstruction adds extra columns that are all NaN; they can be deleted but don't affect the sums.

**NNM** (non-net metering): utilities with ID 99999 removed. *Had to fix an issue with the `capacity_mw` merge deleting the y vs. x column.*

**OD** (operational data): utilities with ID 88888 removed; **utilities with an NA `utility_id_eia` removed**.

**R** (reliability): duplicates dropped.

**S** (sales): utilities with IDs 99999 and 88888 removed; duplicates dropped; revenue columns converted from thousands to ones.
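The thousands-to-ones conversions noted for DR, DSM, and S are why `check_against_raw_numeric` also accepts a transformed total equal to 1000 times the raw total. A minimal sketch of such a conversion, assuming the affected columns can be picked out by name suffix; the suffixes and the `thousands_to_ones` helper are hypothetical, and PUDL's transform may select columns differently.

```python
import pandas as pd

# Hypothetical suffixes for money columns reported in thousands in the raw files.
MONEY_SUFFIXES = ("_cost", "_payment", "_revenue")


def thousands_to_ones(df, suffixes=MONEY_SUFFIXES):
    """Return a copy of df with matching columns multiplied by 1000 (hypothetical helper)."""
    out = df.copy()
    cols = [col for col in out.columns if col.endswith(suffixes)]
    if cols:
        out[cols] = out[cols] * 1000
    return out


raw = pd.DataFrame({"retail_sales_revenue": [1.5, 2.0], "retail_sales_mwh": [10.0, 20.0]})
converted = thousands_to_ones(raw)  # only the revenue column is scaled
print(converted)
```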

In [100]:
test = raw_dfs_dict['operational_data_eia861']
test = test.reset_index()
test[test['utility_id_eia'].isnull()]

Empty DataFrame
Columns: [index, consumed_by_facility_mwh, consumed_by_respondent_without_charge_mwh, credits_or_adjustments_revenue, data_observed, delivery_customers_revenue, entity_type, exchange_energy_delivered_mwh, exchange_energy_received_mwh, furnished_without_charge_mwh, nerc_region, net_generation_mwh, net_power_exchanged_mwh, net_wheeled_power_mwh, other_revenue, retail_sales_mwh, retail_sales_revenue, sales_for_resale_mwh, sales_for_resale_revenue, short_form, state, summer_peak_demand_mw, total_disposition_mwh, total_energy_losses_mwh, total_revenue, total_sources_mwh, transmission_by_other_losses_mwh, transmission_revenue, unbundled_revenue, utility_id_eia, utility_name_eia, wheeled_power_delivered_mwh, wheeled_power_received_mwh, wholesale_power_purchases_mwh, winter_peak_demand_mw, report_date]
Index: []


# Other Data Wrangling
Once all of the data is loaded and looks like it's in good shape, do any initial wrangling that's specific to this particular analysis. This should mostly make use of the higher level functions which were defined above. If this step takes a while, don't be shy about producing `logging` outputs.
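The cells above call a `logger` defined earlier in the notebook. A minimal sketch of one way to set it up with the standard `logging` module, plus a hypothetical `log_step` context manager that times a slow wrangling step; the logger name and the helper are illustrative, not PUDL conventions.

```python
import logging
import sys
import time
from contextlib import contextmanager

logger = logging.getLogger("eia861_notebook")  # hypothetical logger name
logger.setLevel(logging.INFO)
if not logger.handlers:  # re-running the cell shouldn't add duplicate handlers
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)


@contextmanager
def log_step(name):
    """Log when a step starts and how long it took (hypothetical helper)."""
    start = time.perf_counter()
    logger.info(f"starting {name}")
    try:
        yield
    finally:
        logger.info(f"finished {name} in {time.perf_counter() - start:.1f} s")


with log_step("example wrangling step"):
    pass  # the slow wrangling code would go here
```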

In [26]:
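test = eia_transformed_dfs['demand_side_management_misc_eia861'].copy()

In [27]:
# Flag rows that repeat an earlier (utility_id_eia, state, report_date) combination
test['dup'] = test.duplicated(subset=['utility_id_eia', 'state', 'report_date'])

# Compare the number of groups with and without nerc_region: equal counts suggest
# nerc_region doesn't split any (utility_id_eia, state, report_date) group.
print(len(test.groupby(['utility_id_eia', 'state', 'report_date'])))
print(len(test.groupby(['utility_id_eia', 'state', 'report_date', 'nerc_region'])))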

11940
11940
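The comparison above can be wrapped in two small hypothetical helpers: one asks whether a set of columns uniquely identifies rows, the other whether adding a column splits any existing group. Passing `dropna=False` to `groupby` (pandas 1.1+) counts a missing key value as its own group; by default those rows are left out, so rows with a missing `nerc_region` would drop out of the second count above.

```python
import pandas as pd


def is_unique_key(df, cols):
    """True if no two rows share the same values in cols (hypothetical helper)."""
    return not df.duplicated(subset=cols).any()


def column_splits_groups(df, key, extra):
    """True if adding `extra` to the grouping key increases the number of groups."""
    n_key = df.groupby(key, dropna=False).ngroups
    n_extra = df.groupby(key + [extra], dropna=False).ngroups
    return n_extra > n_key


key = ["utility_id_eia", "state", "report_date"]
toy = pd.DataFrame({  # made-up rows
    "utility_id_eia": [1, 1, 2],
    "state": ["CO", "CO", "TX"],
    "report_date": pd.to_datetime(["2019-01-01"] * 3),
    "nerc_region": ["WECC", "WECC", "TRE"],
})
print(is_unique_key(toy, key))                        # rows 0 and 1 repeat the key
print(column_splits_groups(toy, key, "nerc_region"))  # nerc_region doesn't separate them
```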


### Data Validation Test with Pandera

In [309]:
# A z-score isn't a good outlier measure here because utilities vary widely in size.
# Instead, check that the meter-type counts add up to total_meters.

df = eia_transformed_dfs['advanced_metering_infrastructure_eia861'].copy()
meter_cols = ['advanced_metering_infrastructure', 'automated_meter_reading', 'non_amr_ami', 'total_meters']
df[meter_cols] = df[meter_cols].fillna(0)

df = df.assign(
    summ=lambda x: (
        x.advanced_metering_infrastructure
        + x.automated_meter_reading
        + x.non_amr_ami
    ),
    same=lambda x: x.summ == x.total_meters,
)

# Rows whose meter-type counts don't add up to a nonzero total
df[~df['same'] & (df['total_meters'] != 0)]

Empty DataFrame
Columns: [utility_id_eia, state, balancing_authority_code_eia, report_date, short_form, utility_name_eia, customer_class, advanced_metering_infrastructure, automated_meter_reading, daily_digital_access_customers, direct_load_control_customers, energy_served_ami_mwh, home_area_network, non_amr_ami, total_meters, summ, same]
Index: []
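The AMI consistency check above could be turned into a reusable function. This is a plain-pandas sketch (no Pandera API is used here, though a boolean result like this one could back a custom check). Like the cell above, it treats missing counts as zero and skips rows whose `total_meters` is zero or missing. Casting to `float64` first sidesteps nullable-integer dtypes, and `np.isclose` avoids exact float equality. `meter_total_mismatches` is a hypothetical helper.

```python
import numpy as np
import pandas as pd

METER_COLS = ["advanced_metering_infrastructure", "automated_meter_reading", "non_amr_ami"]


def meter_total_mismatches(df):
    """Rows where the meter-type counts don't add up to total_meters (hypothetical helper)."""
    parts = df[METER_COLS].astype("float64").fillna(0).sum(axis=1).to_numpy()
    total = df["total_meters"].astype("float64").fillna(0).to_numpy()
    bad = ~np.isclose(parts, total) & (total != 0)
    return df.loc[bad]


toy = pd.DataFrame({  # made-up meter counts
    "advanced_metering_infrastructure": [10, 10, None, 1],
    "automated_meter_reading": [5, 0, None, 1],
    "non_amr_ami": [5, 0, None, 1],
    "total_meters": [20, 15, 0, None],
})
print(meter_total_mismatches(toy))  # only the second row is flagged
```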
