# 4. Create Input Files for EPA PMF

This notebook will produce two input files for EPA PMF v5.0: concentration file and uncertainty file.

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import matplotlib.patches as mpatches
import sys
import seaborn as sns
from pathlib import Path
import math

from pandas.tseries.offsets import MonthEnd

# set project root
sys.path.insert(0, str(Path.cwd().parent))

from src.config import *
from src.pmf.input_splitter import *

## 4.1. Handle Missing MDL Values

When preparing input for PMF analysis, follow these steps to fill in missing MDL (Method Detection Limit) values:

- If MDL is reported
    - → Use the reported MDL value directly.
- If MDL is missing, check the availability of blank samples:
    - If 7 or more Field Blanks or Travel Blanks are available
        - → Calculate the standard deviation of the blank values and use:
        - MDL = 3 × standard deviation of blank
    - If fewer than 7 blank values are available
        - → Find the minimum positive concentration measured for the analyte, and use:
        - MDL = 0.5 × minimum positive value


In [None]:
year = 2020
site_id = 100119


In [None]:
sheet_map = get_sheets_for_year(year)

for key in ['nt', 'pm25']:
    csv_path = PROCESSED_DATA_DIR / f"{year}_{site_id}_{key}.csv"

    df = pd.read_csv(
        csv_path,
        index_col="sampling_date",  # column header you wrote out
        parse_dates=True,           # turn it into pandas-datetime
    )
    
    df_filled = fill_missing_mdl(df)
    df_routine = df_filled[df_filled['sampling_type'] == 'R']

    if key == 'nt':
        prefix = 'NT_'
    elif key == 'ion':
        prefix = 'ion_'
    else:
        prefix = ''

    export_analytes(
        df_routine,
        str(PMF_INPUT_DATA_DIR) + f'/{site_id}_tmp',
        prefix=prefix
    )

# export_analytes(
#     str(PROCESSED_DATA_DIR) + f'/{year}_{site_id}_pm25.csv',
#     str(PMF_INPUT_DATA_DIR) + f'/{site_id}_tmp',
#     prefix=''
# )

## 4.2. Coverage Check

In [None]:
def load_analyte_groups(site_id, base_dir):
    """
    Load PMF input analyte files for a given site and classify them.

    Parameters
    ----------
    site_id : int or str
        NAPS site ID used to construct the folder name.
    base_dir : Path or str
        Root directory where PMF input files are stored.

    Returns
    -------
    dict of DataFrames:
        A dictionary with keys 'metals', 'ions', 'others',
        each containing a DataFrame of analytes.
    """
    folder = Path(base_dir) / (str(site_id) + '_tmp')
    all_files = sorted(folder.glob("*.csv"))

    nt_pm25_files = []
    ion_files = []
    other_files = []

    # Classify files
    for file_path in all_files:
        fname = file_path.name
        if fname.startswith("NT_") or fname == "PM25.csv":
            nt_pm25_files.append(file_path)
        elif fname.startswith("ion_"):
            ion_files.append(file_path)
        else:
            other_files.append(file_path)

    # Helper to load and name columns
    def load_files(file_list):
        dfs = []
        for file_path in file_list:
            analyte_name = file_path.stem
            df = pd.read_csv(file_path, parse_dates=['Date'], index_col='Date')
    
            # Check for duplicated dates
            duplicated_dates = df.index[df.index.duplicated()]
            if not duplicated_dates.empty:
                print(f"!!!!! Duplicate dates found in: {file_path.name}")
                print(duplicated_dates)
    
            # Optionally: keep first or handle them differently
            df = df[~df.index.duplicated(keep='first')]
    
            # Use only the concentration column (assumed to be first)
            df = df.iloc[:, [0]]
            df.columns = [analyte_name]
            dfs.append(df)
    
        return pd.concat(dfs, axis=1) if dfs else pd.DataFrame()
    
    return {
        'metals': load_files(nt_pm25_files),
        'ions': load_files(ion_files),
        'others': load_files(other_files),
    }

dfs_dic = load_analyte_groups(site_id=100119, base_dir=PMF_INPUT_DATA_DIR)

df_nt_pm25 = dfs_dic['metals']
# df_ions = dfs_dic['ions']
df_other = dfs_dic['others']

print(df_nt_pm25.columns)
# print(df_ions.columns)
print(df_other.columns)

In [None]:
df = df_nt_pm25.copy()
df.index = pd.to_datetime(df.index, errors='coerce')

# 1. Assign season labels
season_months = {
    "Winter": [12, 1, 2],
    "Spring": [3, 4, 5],
    "Summer": [6, 7, 8],
    "Fall":   [9, 10, 11]
}

def assign_year_season(date):
    year = date.year
    month = date.month
    for season, months in season_months.items():
        if month in months:
            if season == "Winter" and month == 12:
                year += 1
            return f"{year}-{season}"

# 2. Function to count calendar days in a season
def count_calendar_days(year, season):
    months = season_months[season]
    days = 0
    for m in months:
        y = year - 1 if m == 12 else year
        month_end = pd.Timestamp(year=y, month=m, day=1) + MonthEnd(1)
        days += month_end.day
    return days

# 3.
def get_freq(site_id, year):
    """ Return measurement frequency based on the year and site. """
    if site_id == 100119:
        return 3 if year < 2013 else 6
    elif site_id == 60211:
        return 3
    else:
        return 0  # or raise ValueError

# ---- MAIN CALCULATION ----

df['year_season'] = df.index.map(assign_year_season)
analytes = [col for col in df.columns if col not in ['year_season'] and not col.endswith('-MDL') and not col.endswith('-VFlag')]

# 4. Actual counts per season
actual_counts = df.groupby('year_season')[analytes].count()

# 5. Calculate ideal counts per season
ideal_counts = []
for label in actual_counts.index:
    year_str, season = label.split('-')
    year = int(year_str)
    days = count_calendar_days(year, season)
    freq = get_freq(site_id, year)
    ideal = days / freq if freq > 0 else np.nan
    ideal_counts.append(ideal)

# 6. Calculate coverage
ideal_series = pd.Series(ideal_counts, index=actual_counts.index, name='ideal')
coverage_df = actual_counts.divide(ideal_series, axis=0).clip(upper=1.0)

# 7. Sort seasons chronologically
season_order = {'Winter': 1, 'Spring': 2, 'Summer': 3, 'Fall': 4}
coverage_df['sort_key'] = coverage_df.index.map(lambda s: (int(s.split('-')[0]), season_order[s.split('-')[1]]))
coverage_df = coverage_df.sort_values('sort_key').drop(columns='sort_key')

# 8. Categorical heatmap (red/yellow/green for <50%, 50–75%, ≥75%)
# Multiply values by 100 to convert to percentage
coverage_pct = coverage_df * 100

# Define bins in percentage
bins = [0, 50, 75, 100]

# Use a single hue (blue), darker = higher coverage
cmap = sns.light_palette("#006cd1", n_colors=3)

# Bin for coloring
categorized = coverage_pct.apply(lambda col: np.digitize(col.fillna(0), bins) - 1)

# Plot setup
fig = plt.figure(figsize=(len(coverage_pct.columns) * 0.4 + 2, len(coverage_pct) * 0.4 + 2))
gs = gridspec.GridSpec(2, 1, height_ratios=[20, 1])
ax = fig.add_subplot(gs[0])

# Disable actual color bar, so it doesn't interfere
sns.heatmap(
    categorized,
    cmap=cmap,
    annot=coverage_pct.round().astype(int),
    fmt='d',
    linewidths=0.5,
    linecolor='black',
    cbar=False,  # We will use custom legend instead
    ax=ax
)

# Custom color legend (patches)
legend_labels = ['<50%', '50–75%', '≥75%']
legend_colors = cmap
patches = [mpatches.Patch(color=legend_colors[i], label=legend_labels[i]) for i in range(3)]

# Place legend just below the heatmap
ax.legend(
    handles=patches,
    loc='upper center',
    bbox_to_anchor=(0.5, -0.5),
    ncol=3,
    title="Coverage (%)",
    frameon=False
)

# Axis labels and title
ax.set_title("Seasonal Measurement Coverage per Analyte")
ax.set_xlabel("Analyte")
ax.set_ylabel("Season")

plt.tight_layout()
# plt.savefig(coverage_dir + f'/{site_id}_seasonal_75_{target_to_plot}.png', dpi=300, bbox_inches='tight')
plt.show()

## 4.3. Combine Data

In [None]:
start_date = pd.to_datetime('2020-01-01')
end_date = pd.to_datetime('2020-12-31')


In [None]:
def reorder_analyte_columns(df, priority_analytes=['PM25']):
    """
    Reorder the columns of a DataFrame to prioritize specific analytes,
    followed by the rest in alphabetical order.

    Parameters:
    - df: pandas DataFrame with analytes as columns and datetime as index
    - priority_analytes: list of analyte names to place at the front

    Returns:
    - DataFrame with reordered columns
    """
    # Keep only priority analytes that are actually in the DataFrame
    existing_priority = [a for a in priority_analytes if a in df.columns]
    
    # Get the rest of the analytes in alphabetical order
    remaining = sorted([a for a in df.columns if a not in existing_priority])
    
    # Combine into the new order
    new_order = existing_priority + remaining
    
    # Reorder and return
    return df[new_order]
    

### EPA PMF Uncertainty Rules (Plain Language)

When you build inputs for EPA PMF, each measurement needs both a **concentration** and an **uncertainty**. Here’s how we handle different cases:

1. **Good measurements** (concentration above the MDL):  
   - We calculate uncertainty as  
     $
     sqrt{(0.1 \times \text{Concentration})^2 + (0.5 \times \text{MDL})^2}.
     $
   - This means “10% of the measured value” combined with “50% of the detection limit.”

2. **Below the MDL** (concentration at or below the MDL):  
   - We set the concentration to **half the MDL**:  
     `Concentration = 0.5 × MDL`  
   - We set the uncertainty to **five-sixths of the MDL**:  
     `Uncertainty = (5/6) × MDL`

3. **Missing values** (no concentration reported):  
   - We fill the concentration with the **median** value for that chemical (the middle value when you sort all measurements).  
   - We set the uncertainty to **four times** that median.

4. **Special “no data” flag** (e.g. your `-999` values):  
   - We leave these exactly as they are, so you can always spot them later.

This approach gives you realistic error bars for good data, a conservative guess for values below detection, and a consistent way to fill in gaps. Simply paste your concentration and MDL columns into these rules to get the uncertainty column for PMF.


In [None]:
def apply_epa_uncertainty_rules(conc_df: pd.DataFrame, mdl_df: pd.DataFrame):
    """
    Apply EPA PMF rules to generate uncertainty and fill missing values.
    - If conc <= MDL: conc = 0.5 * MDL, unc = 5/6 * MDL
    - If conc is NaN: conc = species median, unc = 4 * species median
    - If conc > MDL: unc = sqrt((0.1*conc)^2 + (0.5*MDL)^2)
    - Keep -999 unchanged in both conc and unc
    """
    conc_out = conc_df.copy()
    unc_out = pd.DataFrame(index=conc_df.index, columns=conc_df.columns, dtype=float)

    for col in conc_df.columns:
        conc = conc_df[col]
        mdl = mdl_df[col]

        # Flags
        is_minus999 = conc == -999
        is_nan      = conc.isna()
        is_below    = (~is_minus999) & (conc <= mdl)
        is_above    = (~is_minus999) & (~is_nan) & (conc > mdl)

        # Species median (exclude NaN and -999)
        median_val = conc[~is_nan & ~is_minus999].median()

        # Prepare filled series
        conc_filled = conc.copy()
        unc_filled  = pd.Series(index=conc.index, dtype=float)

        # Rule 1: conc <= MDL
        conc_filled[is_below] = 0.5 * mdl[is_below]
        unc_filled[is_below]  = (5/6) * mdl[is_below]

        # Rule 2: missing conc
        conc_filled[is_nan] = median_val
        unc_filled[is_nan]  = 4 * median_val

        # Rule 3: conc > MDL
        unc_filled[is_above] = np.sqrt((0.1 * conc[is_above])**2 + (0.5 * mdl[is_above])**2)

        # Rule 4: keep -999
        conc_filled[is_minus999] = -999
        unc_filled[is_minus999]  = -999

        conc_out[col] = conc_filled
        unc_out[col]  = unc_filled

    return conc_out, unc_out

The cell below will create the concentration and uncertainty files for input into EPA PMF.

In [None]:
# new folder for two input files
pmf_input_dir = PMF_INPUT_DATA_DIR / Path(str(site_id))
os.makedirs(pmf_input_dir, exist_ok=True)

# where files are stored
folder_name = str(PMF_INPUT_DATA_DIR) + f'/{site_id}_tmp'
folder =  Path(folder_name)

# Collect series
concentration_series = []
mdl_series           = []

for file_path in sorted(folder.glob("*.csv")):
    df = pd.read_csv(file_path, parse_dates=["Date"], index_col="Date")
    df.index = pd.to_datetime(df.index, errors="coerce")
    df = df[(df.index >= start_date) & (df.index <= end_date)]
    if df.shape[1] < 2:
        continue

    analyte = file_path.stem
    conc    = df.iloc[:, 0].copy()  # concentration column
    mdl     = df.iloc[:, 1].copy()  # MDL column
    conc.name = analyte
    mdl.name  = analyte

    concentration_series.append(conc)
    mdl_series.append(mdl)

# Combine into DataFrames
concentration_df = pd.concat(concentration_series, axis=1)
mdl_df           = pd.concat(mdl_series, axis=1)

# Apply EPA PMF uncertainty rules
concentration_filled, uncertainty_filled = apply_epa_uncertainty_rules(concentration_df, mdl_df)

# reorder columns
concentration_filled = reorder_analyte_columns(concentration_filled)
uncertainty_filled   = reorder_analyte_columns(uncertainty_filled)

# Save final CSVs
concentration_filled.to_csv(pmf_input_dir / f"{site_id}_conc.csv",    index_label="Date")
uncertainty_filled .to_csv(pmf_input_dir / f"{site_id}_unc.csv",     index_label="Date")

# Preview
display(concentration_filled.head(3))
display(uncertainty_filled.head(3))