In [None]:
# import packages
import pandas as pd
import numpy as np
import os
import plotly.express as px

# # Tell python where to look for modules.
import sys

sys.path.append("../../src/")

%reload_ext autoreload
%autoreload 2
import oge.load_data as load_data
from oge.filepaths import *
import oge.data_cleaning as data_cleaning
from oge.logging_util import get_logger, configure_root_logger
from oge.validation import check_for_orphaned_cc_part_in_subplant

# set up logging for this notebook session and create a logger named "test"
# NOTE(review): presumably configure_root_logger() installs the project's standard
# handlers/format — confirm in oge.logging_util
configure_root_logger()
logger = get_logger("test")

In [None]:
# NOTE: Need to introduce the concept of a static earliest year

# `earliest_year` is the earliest year that the OGE dataset will ever cover.
# Data back to 2001 is available in PUDL, but EIA-860 files prior to 2004 were
# distributed in a different format that may not be consistent with later files, and
# may use different primary keys for generators.
# https://catalystcoop-pudl.readthedocs.io/en/v2023.12.01/data_sources/eia860.html#how-much-of-the-data-is-accessible-through-pudl
earliest_year = 2005
# the data year being processed: the last year of CEMS/EIA history that is loaded below,
# and the year of the raw EIA-860 generator file that is read
year = 2022

# NOTE(review): path_prefix is not referenced by any other visible cell — confirm it is
# still needed
path_prefix = f"{year}/"

In [None]:
# Although we could directly load all years at once from the CEMS parquet file, this
# would lead to a MemoryError, so we load one year at a time and drop duplicates before
# concatenating the next year onto the running result.
cems_ids = []
# Start at `earliest_year`, the earliest year that the OGE dataset will ever cover.
# NOTE: the loop variable is deliberately not called `year`. Reusing `year` would
# overwrite the notebook-level data year on every iteration, so an interrupted or failed
# load would leave `year` pointing at an intermediate year for all later cells.
for cems_year in range(earliest_year, year + 1):
    cems_id_year = pd.read_parquet(
        downloads_folder("pudl/hourly_emissions_epacems.parquet"),
        filters=[["year", "==", cems_year]],
        columns=["plant_id_epa", "plant_id_eia", "emissions_unit_id_epa"],
    ).drop_duplicates()
    cems_ids.append(cems_id_year)
    # collapse the list back into a single de-duplicated frame so that only the unique
    # ids seen so far are held in memory
    cems_ids = [pd.concat(cems_ids, axis=0).drop_duplicates()]
cems_ids = (
    pd.concat(cems_ids, axis=0)
    .drop_duplicates()
    .sort_values(by=["plant_id_eia", "emissions_unit_id_epa"])
)

In [None]:
#######################################################
# generate_subplant_ids()

import pudl.analysis.epacamd_eia as epacamd_eia
from pudl.etl.glue_assets import make_subplant_ids

# Load the EPA-EIA crosswalk, then keep only the units that actually exist in CEMS.
crosswalk = load_data.load_epa_eia_crosswalk(year)
filtered_crosswalk = epacamd_eia.filter_crosswalk(crosswalk, cems_ids)

# Identify subplants using graph analysis of the filtered crosswalk.
crosswalk_with_subplant_ids = make_subplant_ids(filtered_crosswalk)

# Store the EIA plant id as an int.
crosswalk_with_subplant_ids["plant_id_eia"] = crosswalk_with_subplant_ids[
    "plant_id_eia"
].astype(int)

# Keep only the id columns, in a fixed order.
crosswalk_with_subplant_ids = crosswalk_with_subplant_ids.loc[
    :,
    [
        "plant_id_epa",
        "emissions_unit_id_epa",
        "plant_id_eia",
        "generator_id",
        "subplant_id",
    ],
]

In [None]:
def load_complete_eia_generators_for_subplants(earliest_year, year):
    """Builds a table with one row per EIA generator for use in subplant assignment.

    Loads the generator history from the PUDL ``denorm_generators_eia`` table, keeps
    report years between ``earliest_year`` and ``year`` (inclusive), reduces the history
    to the most recent record for each generator, removes generators that should not
    receive a subplant (see the filters below), and then fills missing operating dates
    and missing ``unit_id_pudl`` values from the raw EIA-860 generator file for ``year``.

    Args:
        earliest_year: first report year to keep. Generators that retired before this
            year are removed.
        year: last report year to keep, and the year of the raw EIA-860 file that is
            used to fill missing values.

    Returns:
        DataFrame with one row per (``plant_id_eia``, ``generator_id``) containing the
        columns loaded from PUDL plus ``earliest_report_date``, ``unit_id_eia`` and
        ``unit_id_eia_numeric``.
    """
    complete_gens = load_data.load_pudl_table(
        "denorm_generators_eia",
        columns=[
            "report_date",
            "plant_id_eia",
            "generator_id",
            "unit_id_pudl",
            "prime_mover_code",
            "operational_status_code",
            "generator_operating_date",
            "generator_retirement_date",
            "original_planned_generator_operating_date",
            "current_planned_generator_operating_date",
        ],
    )

    # create a column that indicates the earliest year a generator reported data to EIA
    # (computed before the year filter below, so it reflects the full loaded history)
    complete_gens["earliest_report_date"] = complete_gens.groupby(
        ["plant_id_eia", "generator_id"]
    )["report_date"].transform("min")

    # drop any data that was reported prior to the earliest year
    # only keep data for years <= the year
    # this avoids using potentially preliminary early-release data
    report_year = complete_gens["report_date"].dt.year
    complete_gens = complete_gens[
        (report_year >= earliest_year) & (report_year <= year)
    ]

    # put each generator's records in chronological order BEFORE forward filling.
    # ffill() works in row order, so without this sort the fill would depend on
    # whatever order the table happened to be loaded in. The explicit copy also means
    # the column assignments below do not write to a slice of the loaded table.
    complete_gens = complete_gens.sort_values(
        by=["plant_id_eia", "generator_id", "report_date"], ascending=True
    ).copy()

    # for any retired gens, forward fill the most recently available unit_id_pudl to the
    # most recent available year
    complete_gens["unit_id_pudl"] = complete_gens.groupby(
        ["plant_id_eia", "generator_id"]
    )["unit_id_pudl"].ffill()

    # only keep the most recent entry for each generator (rows are already sorted by
    # report_date within each generator)
    complete_gens = complete_gens.drop_duplicates(
        subset=["plant_id_eia", "generator_id"], keep="last"
    )

    # remove generators that are proposed but not yet under construction, or cancelled
    status_codes_to_remove = ["CN", "IP", "P", "L", "T"]
    complete_gens = complete_gens[
        ~complete_gens["operational_status_code"].isin(status_codes_to_remove)
    ]

    # remove generators that retired prior to the earliest year
    complete_gens = complete_gens[
        ~(
            (complete_gens["operational_status_code"] == "RE")
            & (complete_gens["generator_retirement_date"].dt.year < earliest_year)
        )
    ]

    # remove generators that have no operating or retirement date, and the last time they
    # reported data was prior to the current year. This is often proposed plants that are
    # assigned a new plant_id_eia once operational
    complete_gens = complete_gens[
        ~(
            (complete_gens["generator_operating_date"].isna())
            & (complete_gens["generator_retirement_date"].isna())
            & (complete_gens["report_date"].dt.year < year)
        )
    ]

    ####################
    # merge into complete_gens and fill missing operating dates with the EIA-860 data
    # validate="1:1" raises if either side has more than one row per generator
    generator_data_from_eia860 = load_raw_eia860_generator_dates_and_unit_ids(year)
    complete_gens = complete_gens.merge(
        generator_data_from_eia860,
        how="left",
        on=["plant_id_eia", "generator_id"],
        validate="1:1",
    )
    complete_gens["generator_operating_date"] = complete_gens[
        "generator_operating_date"
    ].fillna(complete_gens["operating_date_eia"])
    complete_gens = complete_gens.drop(columns="operating_date_eia")

    #######################
    # shift unit_id_eia_numeric so that it starts one above the highest existing
    # unit_id_pudl at the plant (or stays as-is if the plant has no unit_id_pudl), which
    # keeps the two id ranges from overlapping.
    # if unit_id_eia_numeric is NA, the shifted value is still NA
    complete_gens["unit_id_eia_numeric"] = complete_gens[
        "unit_id_eia_numeric"
    ] + complete_gens.groupby("plant_id_eia")["unit_id_pudl"].transform("max").fillna(0)

    # fill in missing unit_id_pudl with the shifted values
    complete_gens["unit_id_pudl"] = complete_gens["unit_id_pudl"].fillna(
        complete_gens["unit_id_eia_numeric"]
    )

    return complete_gens


def load_raw_eia860_generator_dates_and_unit_ids(year):
    """
    Loads generator operating dates and unit_id_eia codes from the raw EIA-860 to
    fill in missing dates and unit ids in the pudl data. PUDL deletes data for these
    fields if there are inconsistencies across the historical data

    Args:
        year: the EIA-860 data year. Selects the file
            ``eia860/eia860{year}/3_1_Generator_Y{year}.xlsx`` in the downloads folder.

    Returns:
        DataFrame with the operable generators followed by the proposed generators that
        have a unit code. Columns: ``plant_id_eia``, ``generator_id``, ``unit_id_eia``,
        ``operating_date_eia`` (missing for proposed generators) and
        ``unit_id_eia_numeric`` (a per-plant number for each distinct ``unit_id_eia``,
        starting at 1, or NaN where ``unit_id_eia`` is missing).
    """
    generator_file = downloads_folder(f"eia860/eia860{year}/3_1_Generator_Y{year}.xlsx")

    # open the workbook once and read both sheets from it, instead of opening and
    # parsing the same file twice
    with pd.ExcelFile(generator_file) as workbook:
        # load operating dates from the raw EIA-860 file to supplement missing operating
        # dates from pudl
        generator_op_dates_eia860 = pd.read_excel(
            workbook,
            header=1,
            sheet_name="Operable",
            usecols=[
                "Plant Code",
                "Generator ID",
                "Operating Month",
                "Operating Year",
                "Unit Code",
            ],
        ).rename(
            columns={
                "Plant Code": "plant_id_eia",
                "Generator ID": "generator_id",
                "Operating Month": "Month",
                "Operating Year": "Year",
                "Unit Code": "unit_id_eia",
            }
        )

        # load unit codes for proposed generators
        proposed_unit_ids_eia860 = (
            pd.read_excel(
                workbook,
                sheet_name="Proposed",
                header=1,
                usecols=["Plant Code", "Generator ID", "Unit Code"],
            )
            .dropna(subset="Unit Code")
            .rename(
                columns={
                    "Plant Code": "plant_id_eia",
                    "Generator ID": "generator_id",
                    "Unit Code": "unit_id_eia",
                }
            )
        )

    # create a datetime column from the month and year, using the first day of the month
    generator_op_dates_eia860["operating_date_eia"] = pd.to_datetime(
        generator_op_dates_eia860[["Year", "Month"]].assign(Day=1)
    )
    generator_op_dates_eia860 = generator_op_dates_eia860.drop(
        columns=["Month", "Year"]
    )

    # concat the data together
    generator_data_from_eia860 = pd.concat(
        [generator_op_dates_eia860, proposed_unit_ids_eia860], axis=0
    )

    # create a numeric version of the ID, starting at 1
    # pd.factorize() codes missing values as -1, so after adding 1 they become 0
    generator_data_from_eia860["unit_id_eia_numeric"] = (
        generator_data_from_eia860.groupby(
            ["plant_id_eia"]
        )["unit_id_eia"].transform(lambda x: pd.factorize(x)[0] + 1)
    )

    # unit_id_numeric of 0 represents missing unit_id_eia, so we want to replace these
    # with nan (np.nan rather than np.NaN: the capitalized alias was removed in NumPy 2.0)
    generator_data_from_eia860["unit_id_eia_numeric"] = generator_data_from_eia860[
        "unit_id_eia_numeric"
    ].replace(0, np.nan)

    return generator_data_from_eia860

In [None]:
# build the one-row-per-generator table for the configured year range
complete_gens = load_complete_eia_generators_for_subplants(
    earliest_year=earliest_year, year=year
)

In [None]:
##################################################
# update the subplant_crosswalk to ensure completeness

# First outer-join the crosswalk with the complete list of generators (which also adds
# the unit_id_pudl column), then outer-join the complete list of CEMS
# emissions_unit_id_epa, so that every EIA generator and every CEMS unit has a row.
subplant_crosswalk_complete = crosswalk_with_subplant_ids.merge(
    complete_gens,
    on=["plant_id_eia", "generator_id"],
    how="outer",
    validate="m:1",
).merge(
    cems_ids[["plant_id_eia", "emissions_unit_id_epa"]].drop_duplicates(),
    on=["plant_id_eia", "emissions_unit_id_epa"],
    how="outer",
    validate="m:1",
)

In [None]:
# Remove every row that has no generator_id: such records either do not exist in EIA,
# or have not yet been linked to an EIA generator.
# NOTE: This may trigger new validation warnings with test_for_missing_subplant_id(),
# which is desirable, because CEMS data that is not linked to an EIA record should be
# flagged.
subplant_crosswalk_complete = subplant_crosswalk_complete.dropna(
    axis="index", how="any", subset=["generator_id"]
)

In [None]:
# NOTE: plant_parts_eia does not contain all of the unit_id_pudl
# we need to investigate another source (plant 613)

# instead we will use denorm_generators_eia

In [None]:
# NOTE: For any generators that are proposed but not existing, we should always sort these last in case they are
# never completed so that they do not mess up the order
# eg plant 64811 UPT7

In [None]:
# choose one date per row to order generators by: the operating date if it is known,
# otherwise the retirement date, otherwise the originally planned operating date
subplant_crosswalk_complete["sort_date"] = (
    subplant_crosswalk_complete["generator_operating_date"]
    .fillna(subplant_crosswalk_complete["generator_retirement_date"])
    .fillna(subplant_crosswalk_complete["original_planned_generator_operating_date"])
)

# sort values to ensure static order: within each plant, order by the first date the
# generator reported to EIA, then by sort_date
# for any generators built on the same date, sort by generator ID
# rows with a missing sort key are placed last (the sort_values default)

subplant_crosswalk = subplant_crosswalk_complete.sort_values(
    by=[
        "plant_id_eia",
        "earliest_report_date",
        "sort_date",
        "generator_id",
        "emissions_unit_id_epa",
    ],
    ascending=True,
).copy()

In [None]:
# NOTE: connect_ids() seems to be incorrectly grouping ids
# NOTE: subset should probably be [plant_id_eia, id_to_update]

In [None]:
def connect_ids(df, id_to_update, connecting_id):
    """Groups together id values that are connected by an id value in another column.

    Within each plant, if several distinct ``id_to_update`` values share the same
    non-missing ``connecting_id``, all of them are replaced by the smallest
    ``id_to_update`` in that group. For example:

    if multiple subplant_id are connected by a single unit_id_pudl, this groups these
    subplant_id together
    if multiple unit_id_pudl are connected by a single subplant_id, this groups these
    unit_id_pudl together

    The grouped ids are written to a new column named
    ``{id_to_update}_connected_by_{connecting_id}``; the ``id_to_update`` column itself
    is left unchanged. Rows that are not part of any connected group keep their
    existing ``id_to_update`` value in the new column.

    Args:
        df: dataframe containing ``plant_id_eia``, ``id_to_update`` and
            ``connecting_id`` columns. It is not modified.
        id_to_update: name of the column holding the ids that should be grouped.
        connecting_id: name of the column whose shared values connect ids together.

    Returns:
        A new dataframe with the same rows, in the same order, plus the
        ``{id_to_update}_connected_by_{connecting_id}`` column. If any ids were
        connected, the index of the returned dataframe is reset by the merge.
    """
    connected_col = f"{id_to_update}_connected_by_{connecting_id}"
    replacement_col = f"{id_to_update}_to_replace"
    key_cols = ["plant_id_eia", id_to_update, connecting_id]

    # get a table with all unique id_to_update / connecting_id pairs at each plant
    id_pairs = df[key_cols].drop_duplicates()

    # a non-NA connecting_id that appears in more than one pair at a plant is
    # associated with multiple id_to_update values, which need to be grouped together
    duplicates = id_pairs[
        id_pairs.duplicated(subset=["plant_id_eia", connecting_id], keep=False)
        & id_pairs[connecting_id].notna()
    ].copy()

    # start from the existing ids. assign() returns a new frame, so the caller's
    # dataframe is not modified in place
    df = df.assign(**{connected_col: df[id_to_update]})
    if duplicates.empty:
        return df

    # find the lowest id_to_update associated with each shared connecting_id.
    # transform("min") computes this separately for every (plant, connecting_id) group;
    # taking a single value such as .min().iloc[0] would incorrectly apply the first
    # group's minimum to every group at every plant
    duplicates[replacement_col] = duplicates.groupby(["plant_id_eia", connecting_id])[
        id_to_update
    ].transform("min")

    # merge the replacement ids into the dataframe and use them wherever one exists
    df = df.merge(duplicates, how="left", on=key_cols, validate="m:1")
    df[connected_col] = df[replacement_col].fillna(df[connected_col])
    return df.drop(columns=replacement_col)

In [None]:
#####################################
# update_subplant_ids()

# The statements in this cell must run in this order: each connect_ids() call works on
# the columns produced by the previous one.

# update unit_id_pudl using the unit_id_eia loaded from EIA-860, and overwrite
# unit_id_pudl with the connected version
subplant_crosswalk = connect_ids(
    subplant_crosswalk, id_to_update="unit_id_pudl", connecting_id="unit_id_eia_numeric"
)
subplant_crosswalk["unit_id_pudl"] = subplant_crosswalk[
    "unit_id_pudl_connected_by_unit_id_eia_numeric"
]


# Step 1: Create corrected versions of subplant_id and unit_id_pudl
# if multiple unit_id_pudl are connected by a single subplant_id,
# unit_id_pudl_connected_by_subplant_id groups these unit_id_pudl together
subplant_crosswalk = connect_ids(
    subplant_crosswalk, id_to_update="unit_id_pudl", connecting_id="subplant_id"
)

# if multiple subplant_id are connected by a single unit_id_pudl,
# subplant_id_connected_by_unit_id_pudl groups these subplant_id together
subplant_crosswalk = connect_ids(
    subplant_crosswalk, id_to_update="subplant_id", connecting_id="unit_id_pudl"
)

# Step 2: Prepare to fill missing subplant_id
# The next cell fills missing subplant ids from unit_id_pudl and, where that is also
# missing, from a per-generator number, so that each generator that isn't already
# grouped into a unit ends up with its own id.

# create a numeric version of each generator_id
# pd.factorize() numbers the distinct generator_id values within each plant, starting
# at 0, in order of first appearance (sort=False keeps the groups in row order)
# with use_na_sentinel=False, missing generator_id are numbered like any other value
# rather than being coded as -1
subplant_crosswalk["numeric_generator_id"] = subplant_crosswalk.groupby(
    ["plant_id_eia"], dropna=False, sort=False
)["generator_id"].transform(lambda x: pd.factorize(x, use_na_sentinel=False)[0])

In [None]:
# Fill missing subplant ids, in order of preference, from:
#   1. the connected subplant_id,
#   2. the connected unit_id_pudl, offset by 1000,
#   3. the numeric generator id, offset by 1000000.
# The offsets keep the three id ranges from overlapping one another. 1000 was chosen as
# an arbitrarily high number, since the largest unit_id_pudl is ~ 10.
subplant_crosswalk["subplant_id_filled"] = (
    subplant_crosswalk["subplant_id_connected_by_unit_id_pudl"]
    .fillna(subplant_crosswalk["unit_id_pudl_connected_by_subplant_id"].add(1000))
    .fillna(subplant_crosswalk["numeric_generator_id"].add(1000000))
)

# Number the distinct filled ids within each plant (0-based, in row order) to create a
# new unique subplant id.
subplant_crosswalk["new_subplant"] = subplant_crosswalk.groupby(
    ["plant_id_eia"], sort=False, dropna=False
)["subplant_id_filled"].transform(
    lambda filled_ids: pd.factorize(filled_ids, use_na_sentinel=False)[0]
)

In [None]:
# inspect the crosswalk rows for plant 58987
subplant_crosswalk.loc[subplant_crosswalk["plant_id_eia"].eq(58987)]

In [None]:
# Build the final crosswalk: replace subplant_id with the new 1-based id, keep only the
# output columns, and keep the last row for each unique combination of ids.
subplant_crosswalk_final = (
    subplant_crosswalk.assign(subplant_id=subplant_crosswalk["new_subplant"] + 1)
    .reset_index(drop=True)
    .loc[
        :,
        [
            "plant_id_epa",
            "emissions_unit_id_epa",
            "plant_id_eia",
            "generator_id",
            "subplant_id",
            "unit_id_pudl",
            "prime_mover_code",
            "generator_operating_date",
            "generator_retirement_date",
            "current_planned_generator_operating_date",
        ],
    ]
    .drop_duplicates(
        keep="last",
        subset=[
            "plant_id_epa",
            "emissions_unit_id_epa",
            "plant_id_eia",
            "generator_id",
            "subplant_id",
        ],
    )
)

In [None]:
# write the final crosswalk for this `year` to the current working directory; the
# year-comparison cell below reads these files back for 2019 and 2022
subplant_crosswalk_final.to_csv(f"test_subplant_{year}.csv", index=False)

In [None]:
# Compare the subplant ids generated for 2019 and 2022. Both files are written by the
# to_csv cell above, so the notebook must already have been run once with year = 2019
# and once with year = 2022.
s_19 = pd.read_csv("test_subplant_2019.csv")
s_22 = pd.read_csv("test_subplant_2022.csv")

# one row per generator, with the columns from each year suffixed _19 / _22
test = pd.merge(
    s_19,
    s_22,
    on=["plant_id_eia", "generator_id"],
    how="outer",
    suffixes=("_19", "_22"),
)

In [None]:
# NOTE: there are occasionally existing plants that never reported to EIA before, but then begin reporting
# later. This will throw off the subplant IDs if these were built prior to generators that had already
# been reporting (1128, 6), (63580, CAT15)
# we may need to sort by earliest reporting date

In [None]:
# show generators that have a subplant_id in both years, but where the two ids differ
test.loc[
    test["subplant_id_19"].notna()
    & test["subplant_id_22"].notna()
    & test["subplant_id_19"].ne(test["subplant_id_22"]),
    [
        "plant_id_eia",
        "generator_id",
        "unit_id_pudl_19",
        "unit_id_pudl_22",
        "subplant_id_19",
        "subplant_id_22",
    ],
].set_index(["plant_id_eia", "generator_id"])

In [None]:
# combined cycle prime mover codes
cc_pm_codes = ["CA", "CT", "CS", "CC"]
# NOTE: the filter to rows with a combined cycle prime mover is currently disabled. It
# is kept below as a bare string literal, which has no effect, so `cc_pm_codes` is unused
# and every row of the final crosswalk is included.
cc_subplants = subplant_crosswalk_final.copy()
"""cc_subplants = subplant_crosswalk[
    subplant_crosswalk["prime_mover_code"].isin(cc_pm_codes)
]"""
# for each subplant, build a comma-separated string of the unique prime mover codes in
# that subplant (all prime movers, not only CC ones, because the filter is disabled)
cc_subplants = cc_subplants.groupby(["plant_id_eia", "subplant_id"])[
    "prime_mover_code"
].agg(["unique"])
cc_subplants["unique_cc_pms"] = [",".join(map(str, L)) for L in cc_subplants["unique"]]
cc_subplants = cc_subplants.drop(columns="unique")

# identify subplants whose only prime mover is a single CC part ("CA" or "CT")
orphaned_cc_parts = cc_subplants[
    (cc_subplants["unique_cc_pms"] == "CA") | (cc_subplants["unique_cc_pms"] == "CT")
]

In [None]:
# preview the first 20 subplants that contain only a single CC part
orphaned_cc_parts.iloc[:20]

In [None]:
# run the project's validation check on the final crosswalk
# NOTE(review): presumably this flags the same single-part CC subplants as the manual
# check above — confirm in oge.validation
check_for_orphaned_cc_part_in_subplant(subplant_crosswalk_final)

In [None]:
# load the subplant ids published in the PUDL `epacamd_eia_subplant_ids` table, for
# comparison with the subplant ids generated in this notebook
pudl_subplants = load_data.load_pudl_table("epacamd_eia_subplant_ids")

In [None]:
# inspect the complete generator list for plant 375
complete_gens.loc[complete_gens["plant_id_eia"].eq(375)]

In [None]:
# inspect the final crosswalk for plant 1391
subplant_crosswalk_final.loc[subplant_crosswalk_final["plant_id_eia"].eq(1391)]

In [None]:
# inspect the PUDL subplant ids for plant 1128
pudl_subplants.loc[pudl_subplants["plant_id_eia"].eq(1128)]

## Notes

In [None]:
# NOTE: we have to add sort=False to groupby so that it keeps the values in index order
# NOTE: ngroup() creates a unique number across the entire dataframe, not per group
# pd.factorize() instead creates unique IDs within each group

In [None]:
# NOTE: use_na_sentinel=False creates the same generator ID for all Na values
# so this will lead to incorrect subplant groupings. Eg plant 880110 has no generator_id
# so all 4 units are assigned a generator_id of 0.
# This needs to be fixed

In [None]:
# NOTE: in some cases, generator_id is missing for plants that are only in CEMS

In [None]:
# NOTE: The pudl bga table does not contain data for proposed units like Barry A3ST
# Even though this has a unit code that links it to other generators
# this means that some proposed gens won't be properly linked

In [None]:
# TODO: Start subplant IDs at 1 rather than 0 index
# connect CC parts
# check for differences with different years
# check for difference between this and the pudl subplants