In [None]:
import os

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

In [None]:
# Custom functions
from utils.data_manipulations import get_info_for_ids

In [None]:
# Root folder of the local MetalliCan database checkout.
# Set the METALLICAN_DB_PATH environment variable to point somewhere else;
# when it is unset, the original machine-specific location is used.
metallican_path = os.environ.get(
    'METALLICAN_DB_PATH',
    r'C:\Users\mp_ma\OneDrive - polymtl\POST_DOC\CODE\metallican_db',
)

In [None]:
def abbreviate_metals(s, metal_map):
    """Abbreviate the metal names found in a comma-separated string.

    Each comma-separated item is stripped of surrounding whitespace and looked
    up in ``metal_map`` by its lower-cased form (so ``metal_map`` must have
    lower-case keys). Items without a match are kept as written. The items are
    re-joined with ", ". Anything that is not a ``str`` is returned unchanged.
    """
    if isinstance(s, str):
        names = (chunk.strip() for chunk in s.split(","))
        return ", ".join(metal_map.get(name.lower(), name) for name in names)
    # Non-string cells (e.g. NaN or None) pass through untouched.
    return s

# Import MetalliCan tables

In [None]:
# Raw MetalliCan tables exported as CSV. Paths are assembled with os.path.join
# so the same cell works whatever the platform's path separator is.
metallican_csv_dir = os.path.join(metallican_path, 'database', 'CSV')
main_table = pd.read_csv(os.path.join(metallican_csv_dir, 'main_table.csv'))
tech_attributes_table = pd.read_csv(os.path.join(metallican_csv_dir, 'tech_attributes_table.csv'))
archetypes_table = pd.read_csv(os.path.join(metallican_csv_dir, 'archetypes_table.csv'))
land_table = pd.read_csv(os.path.join(metallican_csv_dir, 'land_occupation_table.csv'))

In [None]:
# Substance lookup table (substance_id -> substance_name), read from the same CSV export.
substances_table = pd.read_csv(
    os.path.join(metallican_path, 'database', 'CSV', 'substances_table.csv')
)

In [None]:
# Normalized tables, after cleaning.
# Forward slashes match the other relative paths in this notebook and are
# accepted on Windows as well as on POSIX systems.
biosphere_df = pd.read_excel('data/MetalliCan/cleaned_data/biosphere_df_norm.xlsx')
material_df = pd.read_excel('data/MetalliCan/cleaned_data/material_df_sd_norm.xlsx')
energy_df = pd.read_excel('data/MetalliCan/cleaned_data/energy_df_sd_norm.xlsx')

In [None]:
# Discard, in each of the three frames, the rows without a normalized value.
biosphere_df, material_df, energy_df = (
    frame.dropna(subset=['value_normalized'])
    for frame in (biosphere_df, material_df, energy_df)
)

In [None]:
def convert_to_percent(row):
    """Return ``row`` with a 'g/t' value expressed as a percentage.

    ``row`` is any mapping-like record with 'unit' and 'value' entries that
    supports ``.copy()`` (a pandas Series when used with
    ``DataFrame.apply(..., axis=1)``, or a plain dict).

    Rows whose unit is 'g/t' are returned as a converted copy
    (value / 10000, unit '%'); every other row is returned as-is. The input is
    never modified, because mutating the object handed over by
    ``DataFrame.apply`` is not supported by pandas.
    """
    if row['unit'] != 'g/t':
        # '%' or any other unit: nothing to convert.
        return row
    converted = row.copy()
    converted['value'] = converted['value'] / 10000  # 1 g/t = 0.0001%
    converted['unit'] = '%'
    return converted


# Express every value reported in g/t as a percentage.
tech_attributes_table = tech_attributes_table.apply(convert_to_percent, axis=1)

# Canonical attribute name -> the labels that are folded into it.
_material_type_aliases = {
    "Grade": ["Head grade", "Ore grade", "Mill ore grade", "Mill grade", "Mill feed grade"],
    "Recovery rate": ["Recovery rate", "Mill recovery rate", "Copper recovery rate"],
    "Strip ratio": ["Strip ratio"],
    "Concentrate grade": ["Concentrate grade"],
}
mapping = {
    alias: canonical
    for canonical, aliases in _material_type_aliases.items()
    for alias in aliases
}

# Labels absent from the mapping are left unchanged by replace().
tech_attributes_table['material_type'] = tech_attributes_table['material_type'].replace(mapping)

# Energy consumption exploration

## By energy type

In [None]:
from core.constants import nrj_subflow

In [None]:
# Add a subflow_type_agg column to a copy of energy_df, using the nrj_subflow
# dictionary; subflow types missing from the dictionary keep their own name.
energy_df_agg = energy_df.copy()
aggregated_subflow = energy_df_agg['subflow_type'].map(nrj_subflow)
energy_df_agg['subflow_type_agg'] = aggregated_subflow.fillna(energy_df_agg['subflow_type'])

In [None]:
energy_df_agg

In [None]:
# Sum the normalized values per site, year, flow type and aggregated subflow type.
# dropna=False keeps the groups whose key columns contain NaN.
energy_group_keys = [
    'main_id', 'facility_name', 'facility_group_id', 'facility_group_name',
    'company_id', 'year', 'mining_processing_type', 'commodities',
    'flow_type', 'subflow_type_agg',
]
energy_df_agg = energy_df_agg.groupby(
    energy_group_keys, dropna=False, as_index=False
).agg(value_normalized_sum=('value_normalized', 'sum'))

In [None]:
energy_df_agg

In [None]:
from utils.visualisation_functions import plot_stacked_energy_by_site

In [None]:
plot_stacked_energy_by_site(energy_df_agg, export_path='data/Parametrization/nrj_by_type.html')

In [None]:
# Export the aggregated energy table as CSV (without the index).
# NOTE(review): this file lands in the current working directory, whereas the
# other exports of this notebook go under data/Parametrization — confirm the location.
energy_df_agg.to_csv(r'energy_df_agg.csv', index=False)

In [None]:
df = energy_df_agg.copy()

In [None]:
df

In [None]:
import pandas as pd

def summarize_clusters_by_subflow(
    df,
    cluster_cols,       # List of columns to create clusters
    subflow_col,        # Column to group by within clusters (e.g., 'subflow_type')
    value_col,          # Column to compute statistics
    id_cols=None,       # Optional: list of columns to count unique IDs per cluster and subflow
    stats=['min', 'max', 'mean', 'std', 'count']
):
    """
    Summarize statistics for clusters and subflow types in a DataFrame.

    A cluster label is built by joining the string form of ``cluster_cols``
    with ' | '. ``value_col`` is then aggregated with ``stats`` for every
    (cluster, ``subflow_col``) pair.

    For each column in ``id_cols`` two counts are added:
    ``num_unique_<col>`` (distinct values per cluster and subflow) and
    ``num_unique_<col>_in_cluster`` (distinct values per cluster).
    ``cluster_id`` is a 1-based number identifying each cluster.

    The input ``df`` is left untouched; a new DataFrame is returned.
    """
    # Work on a copy so the caller's frame does not silently gain a 'cluster' column.
    data = df.copy()
    data['cluster'] = data[cluster_cols].astype(str).agg(' | '.join, axis=1)
    group_keys = ['cluster', subflow_col]

    # Statistics of the value column per cluster and subflow
    result = data.groupby(group_keys)[value_col].agg(stats).reset_index()

    unique_id_cols = id_cols or []

    # Unique IDs per (cluster, subflow), attached by key rather than by position
    for col in unique_id_cols:
        per_subflow = (
            data.groupby(group_keys)[col]
            .nunique()
            .rename(f'num_unique_{col}')
            .reset_index()
        )
        result = result.merge(per_subflow, on=group_keys, how='left')

    # Unique IDs per cluster
    for col in unique_id_cols:
        per_cluster = (
            data.groupby('cluster')[col]
            .nunique()
            .rename(f'num_unique_{col}_in_cluster')
            .reset_index()
        )
        result = result.merge(per_cluster, on='cluster', how='left')

    # Add cluster_id for easier reference
    result['cluster_id'] = result.groupby('cluster').ngroup() + 1

    return result

# Summarize the energy values per (mining/processing type, commodities) cluster
# and aggregated subflow type. energy_df_agg (and therefore df) stores the summed
# values under 'value_normalized_sum'; it has no 'value_normalized' column.
cluster_stats = summarize_clusters_by_subflow(
    df,
    cluster_cols=['mining_processing_type', 'commodities'],
    subflow_col='subflow_type_agg',
    value_col='value_normalized_sum',
    id_cols=['main_id', 'facility_group_id'],
)

In [None]:
cluster_stats

In [None]:
cluster_stats.to_csv(r'data/Parametrization/cluster_nrj.csv', index=False)

## Total energy

In [None]:
# Total normalized energy per (main_id, facility_group_id); NaN keys are kept as groups.
energy_table = (
    energy_df
    .groupby(['main_id', 'facility_group_id'], dropna=False)['value_normalized']
    .sum()
    .reset_index()
)

In [None]:
# Rename the summed column to energy_MJ.
energy_table = energy_table.rename(columns={'value_normalized': 'energy_MJ'})

In [None]:
# Facility-level rows of energy_table (non-NaN main_id), enriched with the
# facility metadata from main_table; one row is kept per main_id.
facility_info_cols = ['main_id', 'facility_name', 'facility_type', 'province', 'mining_processing_type', 'commodities']
energy_table_f = (
    energy_table.loc[energy_table['main_id'].notna()]
    .merge(main_table[facility_info_cols], on='main_id', how='left')
    .drop_duplicates(subset=['main_id'], keep='first')
)

In [None]:
# Facility-group-level rows of energy_table (NaN main_id, non-NaN facility_group_id),
# enriched with the group metadata from main_table; one row is kept per facility_group_id.
group_info_cols = ['facility_group_id', 'facility_group_name', 'facility_type', 'province', 'mining_processing_type', 'commodities']
is_group_level_row = energy_table['main_id'].isna() & energy_table['facility_group_id'].notna()
energy_table_fg = (
    energy_table.loc[is_group_level_row]
    .merge(main_table[group_info_cols], on='facility_group_id', how='left')
    .drop_duplicates(subset=['facility_group_id'], keep='first')
)


In [None]:
# Give both frames the same column layout before stacking them.
cols_to_keep = ['main_id', 'facility_name', 'facility_group_id', 'facility_group_name', 'facility_type', 'province', 'mining_processing_type', 'commodities', 'energy_MJ']

# A column missing from a frame is created there and filled with None.
for frame in (energy_table_f, energy_table_fg):
    for col in cols_to_keep:
        if col not in frame.columns:
            frame[col] = None

# Put the columns in the shared order.
energy_table_f = energy_table_f[cols_to_keep]
energy_table_fg = energy_table_fg[cols_to_keep]

# Stack the facility-level and facility-group-level rows.
energy_table = pd.concat([energy_table_f, energy_table_fg])

In [None]:
energy_table['energy_GJ'] = energy_table['energy_MJ'] / 1000

In [None]:
from core.constants import metal_map
# Lower-case the keys of metal_map so the lookup is case-insensitive, then
# abbreviate the commodity names of the energy table.
metal_map_lower = {name.lower(): symbol for name, symbol in metal_map.items()}
energy_table['commodities'] = energy_table['commodities'].apply(
    abbreviate_metals, metal_map=metal_map_lower
)

In [None]:
from utils.visualisation_functions import plot_2axes_by_commodity

In [None]:
fig_nrj_html = plot_2axes_by_commodity(energy_table, x_label=' ', y_label='MJ/t ore processed', export_path='data/Parametrization/nrj.html', export_format='html')

# Environmental flows exploration

In [None]:
# Keep only the rows whose source_id is one of the two Government of Canada pages:
# the National Pollutant Release Inventory data explorer and the large-facilities
# greenhouse gas emissions indicator.
official_inventory_sources = [
    'https://www.canada.ca/en/environment-climate-change/services/national-pollutant-release-inventory/tools-resources-data/exploredata.html',
    'https://www.canada.ca/en/environment-climate-change/services/environmental-indicators/greenhouse-gas-emissions/large-facilities.html',
]
from_official_inventory = biosphere_df['source_id'].isin(official_inventory_sources)
biosphere_df = biosphere_df[from_official_inventory]

In [None]:
# Attach the province and facility type (from main_table) and the substance
# name (from substances_table) to every biosphere row.
biosphere_df = (
    biosphere_df
    .merge(main_table[['main_id', 'province', 'facility_type']], on='main_id', how='left')
    .merge(substances_table[['substance_id', 'substance_name']], on='substance_id', how='left')
)

In [None]:
biosphere_df

In [None]:
biosphere_df['commodities'] = biosphere_df['commodities'].apply(lambda x: abbreviate_metals(x, metal_map_lower))

In [None]:
biosphere_mining_df = biosphere_df[biosphere_df['facility_type'] == 'mining']
biosphere_man_df = biosphere_df[biosphere_df['facility_type'] == 'manufacturing']

In [None]:
biosphere_df

In [None]:
from utils.visualisation_functions import plot_biosphere

In [None]:
fig_min = plot_biosphere(
    biosphere_df=biosphere_mining_df,
    x_col='commodities',
    y_col='value_normalized',
    color_col='province',
    symbol_col='mining_processing_type',
    hover_name_cols=['facility_name', 'facility_group_name'],
    y_unit_col='unit_normalized',
    save_path="data/Parametrization/npri_min.html"
)


In [None]:
fig_man = plot_biosphere(
    biosphere_df=biosphere_man_df,
    x_col='commodities',
    y_col='value_normalized',
    color_col='province',
    symbol_col='mining_processing_type',
    hover_name_cols=['facility_name', 'facility_group_name'],
    y_unit_col='unit_normalized',
    save_path="data/Parametrization/npri_man.html"
)

# Land occupation exploration

In [None]:
land_table

In [None]:
# Step 1: main_ids documented by more than one distinct source_id
sources_per_main_id = land_table.groupby('main_id')['source_id'].nunique()
multi_source_main_ids = sources_per_main_id[sources_per_main_id > 1].index

# Step 2: keep the rows of single-source main_ids, plus every row coming from
# the reference article
is_single_source = ~land_table['main_id'].isin(multi_source_main_ids)
is_reference_source = land_table['source_id'] == "https://www.nature.com/articles/s41597-025-05296-y"
land_table = land_table[is_single_source | is_reference_source]

In [None]:
land_table

In [None]:
# For each main_id, give me the sum of area_km2 associated
land_table = land_table.groupby('main_id')['area_km2'].sum().reset_index()

In [None]:
land_table = land_table.merge(main_table[['main_id', 'facility_group_id', 'facility_name', 'facility_group_name', 'province', 'facility_type', 'mining_processing_type', 'commodities']], on='main_id', how='left')

In [None]:
land_table

In [None]:
# Mining facilities only. The explicit copy makes this an independent frame, so
# assigning to its 'commodities' column afterwards does not raise
# SettingWithCopyWarning or depend on land_table.
land_table_mining = land_table[land_table['facility_type'] == 'mining'].copy()

In [None]:
land_table_mining['commodities'] = land_table_mining['commodities'].apply(lambda x: abbreviate_metals(x, metal_map_lower))

In [None]:
land_mining_plot = plot_2axes_by_commodity(land_table_mining, y_col='area_km2',x_label=' ', y_label='km2', export_path='data/Parametrization/land.html', export_format='html')

In [None]:
ta_grade = tech_attributes_table[tech_attributes_table['material_type'] == 'Grade']
ta_strip = tech_attributes_table[tech_attributes_table['material_type'] == 'Strip ratio']

In [None]:
# (main_id, facility_group_id) pairs that report a grade, a strip ratio, and both.
# Each set is built from its own filtered frame (ta_grade / ta_strip), not from
# the full tech_attributes_table, so the sets actually reflect the attribute.
ta_grade_ids = set(ta_grade[['main_id', 'facility_group_id']].apply(tuple, axis=1))
ta_strip_ids = set(ta_strip[['main_id', 'facility_group_id']].apply(tuple, axis=1))
ta_common_ids = ta_grade_ids.intersection(ta_strip_ids)

## Prepare DataFrames

In [None]:
biosphere_param_df = get_info_for_ids(biosphere_df, ta_grade_ids)
biosphere_param_df

In [None]:
# Facility-level rows of biosphere_param_df: those with a non-NaN main_id,
# left-merged with the facility metadata from main_table.
# NOTE(review): drop_duplicates(subset=['main_id']) keeps a single row per
# main_id; if biosphere_param_df holds several rows (e.g. substances) per
# facility, all but the first are discarded here — confirm this is intended.
biosphere_param_df_f = biosphere_param_df[biosphere_param_df['main_id'].notna()]
biosphere_param_df_f = biosphere_param_df_f.merge(main_table[['main_id', 'facility_name', 'facility_type', 'province', 'mining_processing_type', 'commodities']], on='main_id', how='left').drop_duplicates(subset=['main_id'], keep='first')

In [None]:
# Facility-group-level rows of biosphere_param_df: those with a NaN main_id and
# a non-NaN facility_group_id, left-merged with the group metadata from main_table.
# NOTE(review): drop_duplicates(subset=['facility_group_id']) keeps a single row
# per facility group; confirm that collapsing the other rows is intended.
biosphere_param_df_fg = biosphere_param_df[biosphere_param_df['main_id'].isna() & biosphere_param_df['facility_group_id'].notna()]
biosphere_param_df_fg = biosphere_param_df_fg.merge(main_table[['facility_group_id', 'facility_group_name', 'facility_type', 'province', 'mining_processing_type', 'commodities']], on='facility_group_id', how='left').drop_duplicates(subset=['facility_group_id'], keep='first')


In [None]:
biosphere_param_df_f.columns

In [None]:
biosphere_param_df_fg.columns

In [None]:
# Combine the facility-level and facility-group-level biosphere frames built in
# the two cells above. pd.concat aligns the columns by name and fills the ones
# missing from either frame with NaN, so no manual column padding is needed.
# This cell deliberately leaves energy_table, energy_table_f and energy_table_fg
# alone: rebuilding energy_table here would drop its energy_GJ column and its
# abbreviated commodity names.
# NOTE(review): the combined frame is stored under a new name; confirm whether it
# (rather than biosphere_param_df) should be the one exported to Excel.
biosphere_param_table = pd.concat([biosphere_param_df_f, biosphere_param_df_fg])

In [None]:
biosphere_param_df.to_excel(r'data/Parametrization/param_biosphere.xlsx', index=False)

In [None]:
energy_param_df = get_info_for_ids(energy_df, ta_grade_ids)
energy_param_df

## Energy (NRJ)

In [None]:
nrj_param = pd.read_excel(r'data/Parametrization/energy_parametrization.xlsx', sheet_name='RECAP')

In [None]:
# Make sure grade and energy are numeric; unparseable entries become NaN.
for numeric_col in ('Grade', 'energy_MJ'):
    nrj_param[numeric_col] = pd.to_numeric(nrj_param[numeric_col], errors='coerce')

In [None]:
# Apply
nrj_param ['commodities'] = nrj_param['commodities'].apply(lambda x: abbreviate_metals(x, metal_map_lower))

In [None]:
nrj_param

In [None]:
def plot_energy_vs_grade(
    df,
    x_col='Grade',
    y_col='energy_MJ',
    color_col='commodities',
    symbol_col='mining_processing_type',
    hover_name_cols=['facility_name', 'facility_group_name'],
    x_label='Grade (%)',
    y_label='MJ/t ore processed',
    x_log=False,
    font_color="#333333",
    size_marker=10,
    save_path=None
):
    """
    Scatter plot of energy vs grade.

    Color = commodities
    Symbol = mining/processing type
    Fixed mapping for symbols to match the legend

    Parameters
    ----------
    df : DataFrame holding the columns named by the other arguments (not modified).
    x_col, y_col : columns plotted on the x and y axes.
    color_col, symbol_col : categorical columns driving marker color and symbol.
    hover_name_cols : columns used for the hover text. When at least two are
        given and all exist in ``df`` the text is "<first> (<second>)";
        otherwise the first column that exists is used, or no text at all.
    x_label, y_label : axis titles; a falsy value falls back to the column name.
    x_log : use a logarithmic x axis.
    font_color, size_marker : figure font color and marker size.
    save_path : when given, the figure is also written there as HTML.

    Returns the plotly Figure (it is also displayed with ``fig.show()``).
    Relies on ``px`` (plotly.express) and ``go`` (plotly.graph_objects) being
    imported in the notebook.
    """
    df = df.copy()

    # Build hover_name without assuming that every requested column exists
    hover_cols = list(hover_name_cols or [])
    if len(hover_cols) >= 2 and all(col in df.columns for col in hover_cols):
        df['hover_name'] = (
            df[hover_cols[0]].astype(str)
            + " (" + df[hover_cols[1]].astype(str) + ")"
        )
    else:
        fallback_col = next((col for col in hover_cols if col in df.columns), None)
        df['hover_name'] = df[fallback_col] if fallback_col is not None else None

    # Prepare color and symbol sequences
    color_sequence = px.colors.qualitative.Plotly
    symbol_sequence = [
        "circle", "square", "diamond", "cross", "x",
        "triangle-up", "triangle-down", "triangle-left", "triangle-right",
        "star", "hexagon", "pentagon"
    ]

    # Create symbol map for unique values (sequences are cycled when exhausted)
    unique_symbols = df[symbol_col].dropna().unique()
    symbol_map = {sym: symbol_sequence[i % len(symbol_sequence)] for i, sym in enumerate(unique_symbols)}

    # Create color map for unique commodities
    unique_colors = df[color_col].dropna().unique()
    color_map = {col: color_sequence[i % len(color_sequence)] for i, col in enumerate(unique_colors)}

    # Apply the mappings; rows with a missing category get a neutral fallback
    # marker instead of NaN
    df['marker_symbol'] = df[symbol_col].map(symbol_map).fillna("circle")
    df['marker_color'] = df[color_col].map(color_map).fillna("lightgrey")

    # Initialize figure
    fig = go.Figure()

    # Scatter trace (hidden from the legend; the legend is built from dummy traces)
    fig.add_trace(
        go.Scatter(
            x=df[x_col],
            y=df[y_col],
            mode='markers',
            marker=dict(
                size=size_marker,
                color=df['marker_color'],
                symbol=df['marker_symbol']
            ),
            text=df['hover_name'],
            hovertemplate='%{text}<br>%{x:.4f} ' + x_col + '<br>%{y:.2f} ' + y_col + '<extra></extra>',
            showlegend=False
        )
    )

    # Add dummy traces for color legend
    for col, col_color in color_map.items():
        fig.add_trace(
            go.Scatter(
                x=[None], y=[None], mode='markers',
                marker=dict(symbol="circle", size=size_marker, color=col_color),
                legendgroup="Color",
                showlegend=True,
                name=str(col)
            )
        )

    # Add dummy traces for symbol legend
    for sym, sym_marker in symbol_map.items():
        fig.add_trace(
            go.Scatter(
                x=[None], y=[None], mode='markers',
                marker=dict(symbol=sym_marker, size=size_marker, color="grey"),
                legendgroup="Symbol",
                showlegend=True,
                name=str(sym)
            )
        )

    # Layout
    fig.update_layout(
        xaxis_title=x_label if x_label else x_col.replace('_', ' ').title(),
        yaxis_title=y_label if y_label else y_col.replace('_', ' ').title(),
        font=dict(color=font_color, size=14),
        template="plotly_white",
        legend=dict(tracegroupgap=20, itemsizing='constant'),
        height=600
    )

    if x_log:
        fig.update_xaxes(type="log")

    if save_path:
        fig.write_html(save_path, include_plotlyjs='cdn')

    fig.show()
    return fig

In [None]:
# Settings shared by the two energy-vs-grade figures.
energy_grade_plot_kwargs = dict(
    x_col='Grade',
    y_col='energy_MJ',
    color_col='commodities',
    symbol_col='mining_processing_type',
    font_color='black',
)

# All rows, on a logarithmic grade axis.
fig_log = plot_energy_vs_grade(
    nrj_param,
    x_log=True,
    save_path=r'data/Parametrization/param_nrj_log.html',
    **energy_grade_plot_kwargs,
)

# Filtered grades < 0.01
df_filtered = nrj_param[nrj_param['Grade'] < 0.01]
fig_filtered = plot_energy_vs_grade(
    df_filtered,
    save_path=r'data/Parametrization/param_nrj.html',
    **energy_grade_plot_kwargs,
)

## Environmental flows

In [None]:
biosphere_param = pd.read_excel(r'data/Parametrization/biosphere_parametrization.xlsx')

In [None]:
biosphere_param

In [None]:
# Make sure the technical attributes and the normalized values are numeric;
# unparseable entries become NaN.
for numeric_col in ('Grade', 'Strip ratio', 'Recovery rate', 'value_normalized'):
    biosphere_param[numeric_col] = pd.to_numeric(biosphere_param[numeric_col], errors='coerce')

In [None]:
# Apply
biosphere_param['commodities'] = biosphere_param['commodities'].apply(lambda x: abbreviate_metals(x, metal_map_lower))

In [None]:

import plotly.graph_objects as go
import plotly.express as px

def plot_energy_vs_grade_by_substance(
    df,
    x_col='Grade',
    y_col='value',
    substance_col='substance_id',
    color_col='commodities',
    symbol_col='mining_processing_type',
    hover_name_cols=['facility_name', 'facility_group_name'],
    x_label='Grade (%)',
    y_label='MJ/t ore processed',
    x_log=False,
    font_color="#333333",
    size_marker=10,
    save_path=None,
    title_prefix="Energy vs Grade"
):
    """
    Scatter plot of a value vs grade, with a dropdown by substance.

    Color = commodities
    Symbol = mining/processing type
    Dropdown allows filtering by ``substance_col``; one trace is built per
    substance and only the selected one is visible.

    Parameters
    ----------
    df : DataFrame holding the columns named by the other arguments (not modified).
    x_col, y_col : columns plotted on the x and y axes; rows missing either
        value are left out of the traces.
    substance_col : column whose distinct non-null values feed the dropdown.
    color_col, symbol_col : categorical columns driving marker color and symbol.
    hover_name_cols : columns used for the hover text. When at least two are
        given and all exist in ``df`` the text is "<first> (<second>)";
        otherwise the first column that exists is used, or no text at all.
    x_label, y_label : axis titles; a falsy value falls back to the column name.
    x_log : use a logarithmic x axis.
    font_color, size_marker : figure font color and marker size.
    save_path : when given, the figure is also written there as HTML.
    title_prefix : text placed before " – <substance>" in the figure title.

    Returns the plotly Figure (it is also displayed with ``fig.show()``).
    With no substance to plot, a figure without data traces and without a
    dropdown is returned.
    """

    df = df.copy()

    # Build hover_name without assuming that every requested column exists
    hover_cols = list(hover_name_cols or [])
    if len(hover_cols) >= 2 and all(col in df.columns for col in hover_cols):
        df['hover_name'] = (
            df[hover_cols[0]].astype(str)
            + " (" + df[hover_cols[1]].astype(str) + ")"
        )
    else:
        fallback_col = next((col for col in hover_cols if col in df.columns), None)
        df['hover_name'] = df[fallback_col] if fallback_col is not None else None

    # --- Filter for unique substances (sorted by their string form) ---
    substances = df[substance_col].dropna().unique()
    substances = sorted(substances, key=lambda x: str(x))

    # Prepare color and symbol sequences
    color_sequence = px.colors.qualitative.Plotly
    symbol_sequence = [
        "circle", "square", "diamond", "cross", "x",
        "triangle-up", "triangle-down", "triangle-left", "triangle-right",
        "star", "hexagon", "pentagon"
    ]

    # Create symbol map for unique values (sequences are cycled when exhausted)
    unique_symbols = df[symbol_col].dropna().unique()
    symbol_map = {sym: symbol_sequence[i % len(symbol_sequence)] for i, sym in enumerate(unique_symbols)}

    # Create color map for unique commodities
    unique_colors = df[color_col].dropna().unique()
    color_map = {col: color_sequence[i % len(color_sequence)] for i, col in enumerate(unique_colors)}

    # Text shown after the y value in the hover box; falls back to the column
    # name so a falsy y_label cannot break the string concatenation
    hover_y_text = y_label if y_label else y_col

    # Initialize figure
    fig = go.Figure()

    # --- Add one trace per substance; only the first one starts visible ---
    for i, substance in enumerate(substances):
        df_sub = df[df[substance_col] == substance]
        df_sub = df_sub.dropna(subset=[x_col, y_col])

        fig.add_trace(
            go.Scatter(
                x=df_sub[x_col],
                y=df_sub[y_col],
                mode='markers',
                marker=dict(
                    size=size_marker,
                    # Rows with a missing category get a neutral fallback marker
                    color=df_sub[color_col].map(color_map).fillna("lightgrey"),
                    symbol=df_sub[symbol_col].map(symbol_map).fillna("circle")
                ),
                text=df_sub['hover_name'],
                hovertemplate='%{text}<br>%{x:.4f} ' + x_col + '<br>%{y:.2f} ' + hover_y_text + '<extra></extra>',
                name=str(substance),
                visible=(i == 0),
                showlegend=False
            )
        )

    # --- Add dummy traces for color legend ---
    for col, col_color in color_map.items():
        fig.add_trace(
            go.Scatter(
                x=[None], y=[None], mode='markers',
                marker=dict(symbol="circle", size=size_marker, color=col_color),
                legendgroup="Color",
                showlegend=True,
                name=str(col)
            )
        )

    # --- Add dummy traces for symbol legend ---
    for sym, sym_marker in symbol_map.items():
        fig.add_trace(
            go.Scatter(
                x=[None], y=[None], mode='markers',
                marker=dict(symbol=sym_marker, size=size_marker, color="grey"),
                legendgroup="Symbol",
                showlegend=True,
                name=str(sym)
            )
        )

    # --- Dropdown buttons ---
    # Each button shows exactly one substance trace and keeps every legend
    # (dummy) trace visible.
    n_legend_traces = len(color_map) + len(symbol_map)
    buttons = []
    for i, substance in enumerate(substances):
        visibility = [False] * len(substances) + [True] * n_legend_traces
        visibility[i] = True
        buttons.append(
            dict(
                label=str(substance),
                method="update",
                args=[{"visible": visibility},
                      {"title": f"{title_prefix} – {substance}"}]
            )
        )

    # Layout
    layout_kwargs = dict(
        xaxis_title=x_label if x_label else x_col.replace('_', ' ').title(),
        yaxis_title=y_label if y_label else y_col.replace('_', ' ').title(),
        font=dict(color=font_color, size=14),
        template="plotly_white",
        legend=dict(tracegroupgap=20, itemsizing='constant'),
        height=600,
        # Without any substance there is nothing to name in the title
        title=f"{title_prefix} – {substances[0]}" if substances else title_prefix
    )
    if buttons:
        layout_kwargs['updatemenus'] = [dict(buttons=buttons, direction="down", showactive=True)]
    fig.update_layout(**layout_kwargs)

    if x_log:
        fig.update_xaxes(type="log")

    if save_path:
        fig.write_html(save_path, include_plotlyjs='cdn')

    fig.show()
    return fig


In [None]:
# Normalized biosphere values against grade, one dropdown entry per substance_id.
# NOTE(review): the y-axis label below is 'MJ/t ore processed' although the frame
# comes from biosphere_parametrization.xlsx — confirm the unit of value_normalized.
plot_energy_vs_grade_by_substance(
    biosphere_param,
    x_col='Grade',            # or any other quantitative column
    y_col='value_normalized', # for example
    substance_col='substance_id',
    color_col='commodities',
    symbol_col='mining_processing_type',
    y_label='MJ/t ore processed'
)

## Land occupation