In [None]:
# Install required packages
!pip install sentence-transformers transformers nltk spacy pandas torch boto3 rich cohere-aws



In [None]:
# Download required NLTK and spaCy resources
import nltk
# Fetch the NLTK "stopwords" corpus into the local nltk_data directory.
nltk.download('stopwords')
# Install the small English spaCy pipeline through the spaCy CLI.
# The installer output advises restarting the runtime before loading the package.
!python -m spacy download en_core_web_sm


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m68.6 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [None]:
import os
import json
import logging
import numpy as np
import pandas as pd
import torch
from google.colab import files

In [None]:
# AWS credentials setup
from google.colab import userdata


def _resolve_aws_setting(name, default=None):
    """Return the value for `name` from Colab secrets, the environment, or `default`.

    `userdata.get` accepts only the secret name and raises
    `userdata.SecretNotFoundError` when the secret does not exist, so the
    fallback chain is implemented here instead of passing a default to it.
    """
    try:
        secret_value = userdata.get(name)
    except userdata.SecretNotFoundError:
        secret_value = None
    return secret_value or os.environ.get(name) or default


# Securely access AWS credentials stored in Colab secrets
aws_access_key = _resolve_aws_setting('AWS_ACCESS_KEY_ID')
aws_secret_key = _resolve_aws_setting('AWS_SECRET_ACCESS_KEY')
aws_region = _resolve_aws_setting('AWS_REGION', 'us-east-1')

# Fail early with a readable message; assigning None to os.environ would raise a TypeError.
if not aws_access_key or not aws_secret_key:
    raise RuntimeError(
        "AWS credentials not found. Add AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY "
        "to Colab secrets or set them as environment variables."
    )

# Set environment variables for AWS authentication
os.environ['AWS_ACCESS_KEY_ID'] = aws_access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_secret_key
os.environ['AWS_REGION'] = aws_region

# Alternative method if you don't want to use Colab secrets
# Uncomment and fill these lines instead
# os.environ['AWS_ACCESS_KEY_ID'] = 'your-access-key'
# os.environ['AWS_SECRET_ACCESS_KEY'] = 'your-secret-key'
# os.environ['AWS_REGION'] = 'us-east-1'


In [None]:
# Create prompts.py content
%%writefile prompts.py
# Prompt for guiding the LLM to recommend emission factors
# `{input}` is the template placeholder for the business activity description;
# the template is turned into a PromptTemplate by LCAAssistant.
lca_assistant_prompt = """
You are a carbon accounting assistant with expertise in Life Cycle Assessment (LCA) and environmental impact factor mapping.

Your goal is to help identify the most appropriate emission factor for a given business activity or product description. Use your knowledge of carbon accounting standards, environmental impact categories, and manufacturing processes to recommend the best match.

Business Activity Description: {input}

Consider the following potential emission factors that may be relevant:
- NAICS categories for EEIO analysis
- Ecoinvent processes for detailed LCA

Please provide a reasoned recommendation explaining why this emission factor is appropriate for the described activity.
"""

# System prompt for Claude 3 Sonnet
# Sent as the "system" field of the Bedrock request body by LCAAssistant.chat.
system_lca_assistant_prompt = """
You are a carbon accounting expert specializing in emission factor selection for carbon footprinting. Your task is to:

1. Analyze the given business activity or product description.
2. Recommend the most appropriate emission factor from the provided options.
3. Provide a clear justification for your recommendation, explaining why it's the best match.
4. If appropriate, suggest an alternative emission factor as a second choice.

Your recommendations should be precise, technically sound, and follow carbon accounting best practices.
"""

# Skeleton of the ground-truth form for NAICS (EEIO) emission factors.
# "source", the field "id", and "options" are left empty here and are filled in
# per activity by prepare_eio_json.
# The nested dict/list objects are shared by any shallow copy of this template.
eio_groundtruth_json = {
    "source": "",
    "formConfig": {
        "fields": [
            {
                "id": "",
                "type": "radio",
                "label": "Select the most appropriate NAICS emission factor:",
                "required": True,
                "options": []
            }
        ]
    }
}


# Skeleton of the ground-truth form for process-based emission factors.
# "source", the field "id", and "options" are left empty here and are filled in
# per activity by prepare_process_json.
# The nested dict/list objects are shared by any shallow copy of this template.
process_groundtruth_json = {
    "source": "",
    "formConfig": {
        "fields": [
            {
                "id": "",
                "type": "radio",
                "label": "Select the most appropriate process emission factor:",
                "required": True,
                "options": []
            }
        ]
    }
}


Writing prompts.py


In [None]:
from prompts import lca_assistant_prompt, system_lca_assistant_prompt, eio_groundtruth_json, process_groundtruth_json

In [None]:
import ast
import base64
import hashlib
import re
import uuid
from time import time
import requests
from nltk.corpus import stopwords as nltk_stopwords
from spacy.lang.en import stop_words as spacy_stopwords
from sentence_transformers import SentenceTransformer, util
from rich.logging import RichHandler
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn

In [None]:
# Set up logging
def setup_logging(filename="parakeet_debug.log"):
    """Configure and return the "eifmap" logger.

    The logger writes INFO-level records to the console through a
    `RichHandler` and to `filename` through a UTF-8 `FileHandler`.

    Parameters
    ----------
    filename : str
        Path of the debug log file.

    Returns
    -------
    logging.Logger
        The configured "eifmap" logger.
    """
    logger = logging.getLogger("eifmap")
    logger.setLevel(logging.INFO)

    # logging.getLogger returns the same object on every call, so re-running
    # this cell would otherwise stack handlers and duplicate every record.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # Console handler with rich formatting
    shell_handler = RichHandler(level=logging.INFO, rich_tracebacks=True, markup=True)
    shell_handler.setFormatter(logging.Formatter("%(message)s"))

    # File handler for debugging
    file_handler = logging.FileHandler(filename, encoding="utf-8")
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(logging.Formatter("%(levelname)s %(asctime)s [%(filename)s:%(funcName)s:%(lineno)d] %(message)s"))

    logger.addHandler(shell_handler)
    logger.addHandler(file_handler)

    return logger

logger = setup_logging()

In [None]:
def md5_hash(text):
    """Return the hexadecimal MD5 digest of `text` after encoding it to bytes."""
    digest = hashlib.md5()
    digest.update(text.encode())
    return digest.hexdigest()

def uuid4_base64():
    """Return a random UUID4 as a base64 string with the '=' padding removed."""
    raw_bytes = uuid.uuid4().bytes
    encoded = base64.b64encode(raw_bytes).decode()
    # '=' only ever appears as trailing padding in base64 output.
    return encoded.rstrip("=")

def preprocess_texts(texts):
    """Lower-case, strip punctuation from, and tokenize text, dropping stop words.

    Parameters
    ----------
    texts : str or np.ndarray
        A single string, or an array of strings.

    Returns
    -------
    list
        A list of tokens for a string input, or a list of token lists (one
        per element) for an array input.

    Raises
    ------
    TypeError
        If `texts` is neither a string nor an `np.ndarray`.
    """
    # Union of the spaCy and NLTK English stop-word lists.
    excluded = spacy_stopwords.STOP_WORDS.union(set(nltk_stopwords.words("english")))

    def clean_and_tokenize(text):
        # Replace every character that is neither a word character nor whitespace with a space.
        normalized = re.sub(r"[^\w\s]", " ", text.lower())
        return [token for token in normalized.split() if token not in excluded]

    if isinstance(texts, str):
        return clean_and_tokenize(texts)
    if isinstance(texts, np.ndarray):
        return [clean_and_tokenize(item) for item in texts]

    error_message = "Input must be an np.ndarray or a string."
    raise TypeError(error_message)

def get_device():
    """Pick the torch device name used for computing text embeddings.

    Returns
    -------
    str or None
        "cuda" when CUDA is available, otherwise "mps" when the MPS backend
        is available, otherwise None (logged as CPU).
    """
    if torch.cuda.is_available():
        device, label = "cuda", "GPU"
    elif torch.backends.mps.is_available():
        device, label = "mps", "MPS"
    else:
        device, label = None, "CPU"

    logger.info(f"Using {label} to calculate semantic text embedding ...")
    return device

In [None]:
#Track Progress
class RichProgress:
    """Context manager that reports loop progress through a `rich` progress bar.

    Parameters
    ----------
    data : sized
        Collection being processed; only its length is used for the total.
    disable_progress : bool
        When True no progress bar is created and `update` only measures time.
    description : str
        Label shown in front of the "(completed/total)" counter.
    """

    def __init__(self, data, disable_progress=False, description="Processing"):
        self.data = data
        self.description = description
        self.disable_progress = disable_progress
        self.total_iterations = len(data)

    def __enter__(self):
        """Start timing and, unless disabled, create and start the progress bar."""
        if self.disable_progress:
            self.start_time = time()
            self.last_update_time = time()
            return self

        columns = (
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.1f}%"),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            TextColumn("[progress.custom] {task.fields[rate]}"),
        )
        self.progress = Progress(*columns)

        self.task = self.progress.add_task(
            f"{self.description} (0/{self.total_iterations})",
            total=self.total_iterations,
            rate="",
        )

        self.start_time = time()
        self.progress.start()
        self.last_update_time = time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the progress bar if one was started; exceptions are not suppressed."""
        if self.disable_progress:
            return
        self.progress.stop()

    def update(self, advance=1):
        """Advance the bar by `advance` steps and refresh the displayed rate."""
        now = time()
        since_start = now - self.start_time
        since_last = now - self.last_update_time

        if self.disable_progress:
            return

        completed = self.progress.tasks[self.task].completed + advance

        # Slow steps are shown as seconds per iteration, fast ones as iterations per second.
        if since_last > 1:
            rate = f"{since_last:.2f} sec/iteration"
        else:
            per_second = completed / since_start if since_start > 0 else 0
            rate = f"{per_second:.2f} iterations/sec"

        self.progress.update(
            self.task,
            advance=advance,
            rate=rate,
            description=f"{self.description} ({completed}/{self.total_iterations})",
        )

        self.last_update_time = now

In [None]:
def get_ecoinvent_data(ecoinvent_file="https://19913970.fs1.hubspotusercontent-na1.net/hubfs/19913970/Database-Overview-for-ecoinvent-v3.9.1-9.xlsx"):
    """Download the ecoinvent database overview workbook and load its third sheet.

    Parameters
    ----------
    ecoinvent_file : str
        URL of the ecoinvent overview `.xlsx` workbook.

    Returns
    -------
    pandas.DataFrame
        Sheet at position 2 with 'Reference Product Name',
        'Activity UUID & Product UUID', 'Activity Name' and
        'Product Information' renamed to `reference_product`,
        `impact_factor_id`, `impact_factor_name` and `product_info`.

    Raises
    ------
    requests.HTTPError
        If the download returns an HTTP error status.
    """
    from io import BytesIO

    res = requests.get(ecoinvent_file, timeout=60)
    # Surface HTTP failures here instead of as an obscure "not a zip file" parser error.
    res.raise_for_status()

    # Wrap the payload in a file-like object; passing raw bytes to the Excel reader is deprecated.
    excel_data = pd.ExcelFile(BytesIO(res.content), engine='openpyxl')
    eco_df = pd.read_excel(excel_data, sheet_name=2)
    eco_df = eco_df.rename(
        columns={
            'Reference Product Name': 'reference_product',
            'Activity UUID & Product UUID': 'impact_factor_id',
            'Activity Name': 'impact_factor_name',
            'Product Information': 'product_info'
        }
    )
    return eco_df

def get_naics_data(
    useeio_file="https://pasteur.epa.gov/uploads/10.23719/1528686/SupplyChainGHGEmissionFactors_v1.2_NAICS_CO2e_USD2021.csv",
    naics_file="https://www.census.gov/naics/2017NAICS/2017_NAICS_Index_File.xlsx",
):
    """Load NAICS index descriptions joined with USEEIO supply-chain emission factors.

    Parameters
    ----------
    useeio_file : str
        Location of the emission-factor CSV.
    naics_file : str
        Location of the NAICS index workbook.

    Returns
    -------
    pandas.DataFrame
        One row per distinct `naics_desc` (the first row of each group),
        keeping only rows that are complete after the left merge on
        `naics_code`.
    """
    # Source column -> short name; the key order is also the column order kept.
    useeio_columns = {
        "2017 NAICS Code": "naics_code",
        "2017 NAICS Title": "naics_title",
        "Supply Chain Emission Factors with Margins": "co2e_per_dollar",
        "Reference USEEIO Code": "bea_code",
    }
    naics_columns = {
        "NAICS17": "naics_code",
        "INDEX ITEM DESCRIPTION": "naics_desc",
    }

    useeio_df = pd.read_csv(useeio_file)
    useeio_df = useeio_df[list(useeio_columns)].rename(columns=useeio_columns)
    logger.info(f"Loaded {len(useeio_df)} rows from {useeio_file}")

    naics_df = pd.read_excel(naics_file).rename(columns=naics_columns)
    logger.info(f"Loaded {len(naics_df)} rows from {naics_file}")

    # Rows without a matching emission factor have missing values after the merge and are dropped.
    merged = naics_df.merge(useeio_df, on="naics_code", how="left").dropna()
    naics_df = merged.groupby("naics_desc").first().reset_index()
    logger.info(f"Final shape after merge on naics_code: {naics_df.shape}")

    return naics_df

# Recommendation ranking function
def get_ranked_list(
    text,
    semantic_text_model,
    eco_df,
    eco_ref,
    eco_ref_embedding,
    lca_type,
):
    """Rank reference entries by cosine similarity to an activity description.

    Parameters
    ----------
    text : str
        Activity description to embed.
    semantic_text_model : object
        Model exposing `encode(...)`, used to embed `text`.
    eco_df : pandas.DataFrame
        Reference table; used for the non-"process" branch.
    eco_ref : numpy.ndarray
        Reference strings, aligned row-for-row with `eco_ref_embedding`.
    eco_ref_embedding : tensor or array
        Pre-computed embeddings of `eco_ref`.
    lca_type : str
        "process" selects the top 10 reference products; any other value
        selects the top 20 rows of `eco_df`.

    Returns
    -------
    tuple(list[dict], pandas.DataFrame)
        The ranked candidates as records and as a DataFrame. In both
        branches the "index" field identifies the matched reference row.
    """
    activity_embedding = semantic_text_model.encode([text], show_progress_bar=False, batch_size=1)

    # Align device/dtype with the reference embeddings so the similarity
    # product does not mix CPU and accelerator tensors.
    if isinstance(eco_ref_embedding, torch.Tensor):
        activity_embedding = torch.as_tensor(activity_embedding).to(
            device=eco_ref_embedding.device, dtype=eco_ref_embedding.dtype
        )

    k = 10 if lca_type == "process" else 20
    cosine_scores = util.cos_sim(activity_embedding, eco_ref_embedding)
    sorted_cs, indices = cosine_scores.sort(dim=1, descending=True)

    # Row 0 is the single query. Indexing (instead of squeeze()) keeps the
    # result 1-D when there is only one reference, and .cpu() is required
    # before .numpy() for tensors held on an accelerator.
    eco_ix = indices[0][:k].cpu().numpy().tolist()
    top_scores = sorted_cs[0][:k].cpu().numpy()

    # Create a ranked list for collecting ground truth
    if lca_type == "process":
        # "index" carries the row position in eco_ref (not 0..k-1), so callers
        # can look the matches up again with positional indexing.
        topK_df = pd.DataFrame(
            {
                "index": eco_ix,
                "reference_product": np.asarray(eco_ref)[eco_ix],
            }
        )
        ranked_list = topK_df[["index", "reference_product"]].to_dict("records")
    else:
        topK_df = eco_df.iloc[eco_ix].copy(deep=True).reset_index()
        topK_df["cosine_score"] = top_scores
        ranked_list = topK_df[["index", "naics_title", "naics_desc", "naics_code"]].to_dict("records")

    return ranked_list, topK_df

# Ground truth preparation functions
def prepare_eio_json(entry, clean_text, response, uniq_id):
    """Build the ground-truth labelling form for one EEIO (NAICS) prediction.

    Parameters
    ----------
    entry : str
        Original business activity text; punctuation is stripped for display.
    clean_text : str
        Paraphrased description of the activity.
    response : list[dict]
        Recommendations, each with "naics_title", "naics_code" and
        "justification". The first is the top choice, the optional second
        the alternative.
    uniq_id : str
        Identifier stored as the form field id.

    Returns
    -------
    dict
        A new, independent form dict based on `eio_groundtruth_json`.

    Raises
    ------
    ValueError
        If `response` is empty.
    """
    from copy import deepcopy

    if len(response) < 1:
        error_message = "Response length must be at least 1."
        raise ValueError(error_message)

    # Deep copy: the template's nested "formConfig" would otherwise be shared
    # and overwritten across calls.
    gt_json = deepcopy(eio_groundtruth_json)
    gt_json["source"] = "*Business Activity*: {}\n".format(re.sub(r"[^\w\s]", "", entry))
    gt_json["source"] += f"*AI paraphrased description:* {clean_text}\n\n"
    gt_json["source"] += f"*AI top choice:* {response[0]['naics_title']} ({response[0]['naics_code']})\n"
    gt_json["source"] += f"Justification: {response[0]['justification']}\n\n"

    if len(response) > 1:
        gt_json["source"] += f"*AI second choice:* {response[1]['naics_title']} ({response[1]['naics_code']})\n"
        gt_json["source"] += f"Justification: {response[1]['justification']}\n\n"

    gt_json["formConfig"]["fields"][0]["id"] = uniq_id
    # Selectable options: the recommendations followed by three fixed fallback answers.
    gt_json["formConfig"]["fields"][0]["options"] = pd.concat(
        [
            pd.DataFrame(response).drop("justification", axis=1).rename(columns={"naics_code": "value", "naics_title": "label"}),
            pd.DataFrame(
                [
                    {"label": "Not sure", "value": "-1"},
                    {"label": "EIF options are inappropriate, no match", "value": "-2"},
                    {"label": "Activity description is unclear to select an EIF", "value": "-3"},
                ]
            ),
        ]
    ).to_dict("records")

    return gt_json

def prepare_process_json(activity_text, response, sel_eco, uniq_id):
    """Build the ground-truth labelling form for one process-based prediction.

    Parameters
    ----------
    activity_text : str
        Activity description; punctuation is stripped for display.
    response : list[dict]
        Recommendations, each with "impact_factor_name" and "justification".
        The first is the top choice, the optional second the next choice.
    sel_eco : pandas.DataFrame
        Candidate rows providing "impact_factor_name" and "impact_factor_id"
        for the selectable options.
    uniq_id : str
        Identifier stored as the form field id.

    Returns
    -------
    dict
        A new, independent form dict based on `process_groundtruth_json`.

    Raises
    ------
    ValueError
        If `response` is empty.
    """
    from copy import deepcopy

    if len(response) < 1:
        error_message = "Response length must be at least 1."
        raise ValueError(error_message)

    # Deep copy: the template's nested "formConfig" would otherwise be shared
    # and overwritten across calls.
    gt_json = deepcopy(process_groundtruth_json)
    gt_json["source"] = "*Given description:* {}\n".format(re.sub(r"[^\w\s]", "", activity_text))
    gt_json["source"] += "\n*AI top choice:* {}\n".format(response[0]["impact_factor_name"])
    gt_json["source"] += f"\nJustification: {response[0]['justification']}"

    if len(response) > 1:
        gt_json["source"] += "\n\n*AI next choice:* {}\n".format(response[1]["impact_factor_name"])
        gt_json["source"] += f"\nJustification: {response[1]['justification']}"

    gt_json["formConfig"]["fields"][0]["id"] = uniq_id
    # Selectable options: the candidate impact factors followed by three fixed fallback answers.
    gt_json["formConfig"]["fields"][0]["options"] = pd.concat(
        [
            sel_eco[["impact_factor_name", "impact_factor_id"]].rename(columns={"impact_factor_id": "value", "impact_factor_name": "label"}),
            pd.DataFrame(
                [
                    {"label": "None of the impact factors match", "value": "0"},
                    {"label": "Not sure", "value": "-1"},
                    {"label": "Activity text unclear for selection", "value": "-3"},
                ]
            ),
        ]
    ).to_dict("records")

    return gt_json

# Activity data loading function
def read_activities(
    activity_file,
    activity_col,
    start_idx=0,
    end_idx=None,
    sheet_name=0,
):
    """Load activity rows from a CSV or Excel file and return the requested slice.

    Parameters
    ----------
    activity_file : str
        Path to a `.csv` or `.xlsx` file (extension matched case-insensitively).
    activity_col : str
        Name of the column expected to hold the activity text. It is only
        checked for presence here (a warning is logged when missing); all
        columns are returned.
    start_idx : int
        First row position (after de-duplication) to keep.
    end_idx : int or None
        Row position to stop before; None means up to the last row.
    sheet_name : int or str
        Sheet to read when the file is an Excel workbook.

    Returns
    -------
    pandas.DataFrame
        De-duplicated rows `start_idx:end_idx` with missing values replaced
        by empty strings and a fresh 0-based index.

    Raises
    ------
    ValueError
        If the file extension is not supported.
    """
    logger.info(f"Reading {activity_file}")

    _, file_extension = os.path.splitext(activity_file)
    # Compare case-insensitively so "DATA.CSV" or "book.XLSX" are accepted too.
    normalized_extension = file_extension.lower()
    if normalized_extension == ".csv":
        activity_df = pd.read_csv(activity_file)
    elif normalized_extension == ".xlsx":
        activity_df = pd.read_excel(activity_file, sheet_name=sheet_name)
    else:
        error_message = f"Unsupported file extension: {file_extension}"
        raise ValueError(error_message)

    if activity_col not in activity_df.columns:
        logger.warning(f"Column {activity_col!r} not found in {activity_file}")

    activity_df = activity_df.fillna("").reset_index(drop=True).drop_duplicates()
    logger.info(f"Read {len(activity_df)} activities")

    if end_idx is None:
        end_idx = len(activity_df)

    logger.info(f"Will be processing between index {start_idx} and {end_idx}")

    return activity_df.iloc[start_idx:end_idx].reset_index(drop=True)

In [None]:
!pip install langchain langchain_core langchain_community

Collecting langchain_community
  Downloading langchain_community-0.3.19-py3-none-any.whl.metadata (2.4 kB)
Collecting langchain_core
  Downloading langchain_core-0.3.41-py3-none-any.whl.metadata (5.9 kB)
Collecting langchain
  Downloading langchain-0.3.20-py3-none-any.whl.metadata (7.7 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain_community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain_community)
  Downloading pydantic_settings-2.8.1-py3-none-any.whl.metadata (3.5 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain_community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain_community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain_community)
  Downloading typing_inspect-0.9.0-py3-none-a

In [None]:
# Bedrock client implementation
import boto3
import json
from botocore.config import Config
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain_community.llms import Bedrock
from langchain_core.prompts import PromptTemplate

def get_bedrock_client(
    assumed_role=None,
    region=None,
    runtime=True,
):
    """Create a boto3 client for Amazon Bedrock, with optional configuration overrides.

    Parameters
    ----------
    assumed_role : str or None
        ARN of a role to assume through STS before creating the client.
    region : str or None
        AWS region; when None, `AWS_REGION` and then `AWS_DEFAULT_REGION`
        are read from the environment.
    runtime : bool
        True creates a "bedrock-runtime" client, False a "bedrock" client.

    Returns
    -------
    botocore client
        The Bedrock client, configured with up to 10 standard-mode retries.
    """
    if region is None:
        target_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION"))
    else:
        target_region = region

    logger.info(f"Create new client\n Using region: {target_region}")

    session_kwargs = {"region_name": target_region}
    client_kwargs = {**session_kwargs}

    profile_name = os.environ.get("AWS_PROFILE")
    if profile_name:
        logger.info(f" Using profile: {profile_name}")
        session_kwargs["profile_name"] = profile_name

    retry_config = Config(
        region_name=target_region,
        retries={
            "max_attempts": 10,
            "mode": "standard",
        },
    )

    session = boto3.Session(**session_kwargs)

    if assumed_role:
        # Logger methods take no print-style `end` keyword; passing one raises TypeError.
        logger.info(f" Using role: {assumed_role}")
        sts = session.client("sts")
        response = sts.assume_role(RoleArn=str(assumed_role), RoleSessionName="langchain-llm-1")
        logger.info(" ... successful!")
        # Use the temporary credentials of the assumed role for the Bedrock client.
        client_kwargs["aws_access_key_id"] = response["Credentials"]["AccessKeyId"]
        client_kwargs["aws_secret_access_key"] = response["Credentials"]["SecretAccessKey"]
        client_kwargs["aws_session_token"] = response["Credentials"]["SessionToken"]

    if runtime:
        service_name = "bedrock-runtime"
    else:
        service_name = "bedrock"

    bedrock_client = session.client(service_name=service_name, config=retry_config, **client_kwargs)
    logger.info("boto3 Bedrock client successfully created!")
    logger.info(str(bedrock_client._endpoint))

    return bedrock_client

# LCAAssistant implementation
class LCAAssistant:
    """Chat wrapper around an Amazon Bedrock model for emission-factor mapping.

    Models listed in `model_list` are called directly through
    `invoke_model` with an explicit message history. Any other model id is
    driven through a LangChain `ConversationChain` with buffer memory.
    """

    def __init__(self, llm_model="anthropic.claude-3-sonnet-20240229-v1:0"):
        """Create the Bedrock client and the per-model conversation state."""
        self.model_list = [
            "anthropic.claude-3-sonnet-20240229-v1:0"
        ]

        self.llm_model = llm_model
        self.boto3_bedrock = get_bedrock_client()

        if self.llm_model in self.model_list:
            self.history = []
        else:
            assistant_model = Bedrock(
                model_id=llm_model,
                client=self.boto3_bedrock,
                model_kwargs={"temperature": 0},
            )

            # Kept on the instance so reset_mem() can clear it.
            self.memory = ConversationBufferMemory(ai_prefix="Assistant")
            self.conversation = ConversationChain(llm=assistant_model, verbose=False, memory=self.memory)
            self.conversation.prompt = PromptTemplate.from_template(lca_assistant_prompt)

        logger.info("LCA Assistant initialized")

    def reset_mem(self):
        """Forget the conversation so far."""
        if self.llm_model in self.model_list:
            self.history = []
        else:
            self.memory.clear()
            self.conversation.prompt = PromptTemplate.from_template(lca_assistant_prompt)

    def chat(self, text, temperature=0.0):
        """Send `text` to the model and return its reply as a string.

        For models in `model_list`, a failed `invoke_model` call is logged
        and an empty string is returned; the history is left untouched in
        that case so it never ends with an unanswered user turn.
        """
        if self.llm_model in self.model_list:
            user_message = {"role": "user", "content": text}

            try:
                response = self.boto3_bedrock.invoke_model(
                    body=json.dumps(
                        {
                            "anthropic_version": "bedrock-2023-05-31",
                            "temperature": temperature,
                            "max_tokens": 4096,
                            "system": system_lca_assistant_prompt,
                            "messages": self.history + [user_message],
                        }
                    ),
                    modelId=self.llm_model,
                )
            except Exception as e:
                logger.exception(e)
                logger.exception("Returning empty string")
                return ""

            response_body = json.loads(response.get("body").read())
            assistant_message = {key: response_body[key] for key in ["role", "content"]}
            # Record the exchange only once both turns exist.
            self.history.extend([user_message, assistant_message])

            return response_body.get("content")[0]["text"]

        return self.conversation.invoke(text)["response"].strip()

    @staticmethod
    def _parse_structured(response):
        """Parse a model reply as JSON, falling back to a Python literal.

        A surrounding Markdown code fence is removed first. JSON is tried
        before `ast.literal_eval` because the prompts ask for JSON, whose
        `true`/`false`/`null` are not Python literals.
        """
        cleaned = response.strip()
        fenced = re.match(r"^```[A-Za-z0-9_-]*[ \t]*\n(.*?)\n?```$", cleaned, flags=re.DOTALL)
        if fenced:
            cleaned = fenced.group(1).strip()

        try:
            return json.loads(cleaned)
        except ValueError:
            return ast.literal_eval(cleaned)

    def __call__(
        self,
        text,
        format="text",
        reset_mem=False,
        retries=1,
        temperature=0.0,
        validation_fn=None,
    ):
        """Query the assistant, optionally parsing the reply into Python objects.

        Parameters
        ----------
        text : str
            Prompt to send.
        format : str
            "python" parses the reply into Python objects; any other value
            returns the raw reply text.
        reset_mem : bool
            Clear the conversation before sending.
        retries : int
            Number of parse/validate attempts before one final attempt whose
            parse error (if any) propagates to the caller.
        temperature : float
            Sampling temperature passed to `chat`.
        validation_fn : callable or None
            Called with the parsed reply; raising triggers a retry.

        Returns
        -------
        object
            The parsed reply for format="python", otherwise a string.
        """
        if reset_mem:
            self.reset_mem()

        if format == "python":
            # Retry prompts are always built from the original instructions so
            # the error preamble does not nest on repeated failures.
            original_text = text

            while retries > 0:
                try:
                    response = self.chat(text, temperature=temperature)
                    parsed = self._parse_structured(response)

                    if validation_fn:
                        validation_fn(parsed)

                    return parsed
                except Exception as e:
                    logger.exception(e)
                    logger.warning("Retrying again")
                    text = f"Your previous response, when parsed with a python code interpreter, caused this python exception: {e!r}\n. This time generate a response that doesn't cause this exception. ### ORIGINAL INSTRUCTIONS ###\n" + original_text
                    retries -= 1

            return self._parse_structured(self.chat(text, temperature=temperature))

        return self.chat(text, temperature=temperature)


In [None]:
# Main execution flow for Parakeet
def parakeet_predict_eeio(
    activities_df,
    activity_col,
    model_name="all-MiniLM-L6-v2",
    batch_size=32,
    start_idx=0,
    end_idx=None,
    output_file="eeio_predictions.json"
):
    """
    Generate EEIO emission factor recommendations for business activities

    For every activity the NAICS titles are ranked by embedding similarity,
    the LLM is asked to pick from the top five candidates, and a ground-truth
    form is written to ``gt_<md5 of activity>.json``. Failures for a single
    activity are logged and that activity is skipped.

    Parameters:
    -----------
    activities_df : pandas.DataFrame
        DataFrame containing business activities
    activity_col : str
        Column name containing the business activity descriptions
    model_name : str
        Name of the SentenceTransformer model to use
    batch_size : int
        Currently unused; kept for interface compatibility (reference
        embeddings are encoded with a fixed batch size of 128)
    start_idx : int
        Starting index for processing
    end_idx : int
        Ending index for processing
    output_file : str
        File to save predictions

    Returns:
    --------
    list
        One result dict per successfully processed activity
    """
    # Load NAICS data
    logger.info("Loading NAICS data...")
    naics_df = get_naics_data()

    # Initialize SentenceTransformer model
    device = get_device()
    logger.info(f"Loading {model_name} model...")
    semantic_text_model = SentenceTransformer(model_name, device=device)

    # Initialize LCAAssistant
    logger.info("Initializing LCA Assistant...")
    lca_assistant = LCAAssistant()

    # Prepare NAICS embeddings
    logger.info("Computing NAICS embeddings...")
    naics_ref = naics_df["naics_title"].values
    naics_ref_embedding = semantic_text_model.encode(
        naics_ref,
        batch_size=128,
        show_progress_bar=True,
        convert_to_tensor=True
    )

    # Process activities
    if end_idx is None:
        end_idx = len(activities_df)

    activities = activities_df[activity_col].iloc[start_idx:end_idx].tolist()
    logger.info(f"Processing {len(activities)} activities from {start_idx} to {end_idx}")

    results = []

    with RichProgress(activities, description="Processing activities") as progress:
        for i, activity in enumerate(activities):
            # Hashing and slicing below need a string. Missing or numeric cells
            # would otherwise raise outside the try block and abort the whole run.
            if not isinstance(activity, str):
                is_missing = activity is None or (isinstance(activity, float) and activity != activity)
                activity = "" if is_missing else str(activity)

            activity_id = md5_hash(activity)
            logger.info(f"Processing activity {i+1}/{len(activities)}: {activity[:50]}...")

            # Get ranked list of potential emission factors
            ranked_list, topK_df = get_ranked_list(
                activity,
                semantic_text_model,
                naics_df,
                naics_ref,
                naics_ref_embedding,
                "eio"
            )

            # Use LLM to recommend the best emission factor
            # (only the five best-ranked candidates are shown to the model)
            prompt = f"""
I need to map this business activity to the most appropriate NAICS emission factor for carbon footprinting.

Business Activity: {activity}

Here are the top candidate NAICS categories from semantic matching:
{json.dumps(ranked_list[:5], indent=2)}

Please analyze the business activity and candidate NAICS categories, then recommend the most appropriate emission factor.
Return your answer as a JSON list with this structure:
[
  {{
    "naics_code": "string",
    "naics_title": "string",
    "justification": "detailed explanation of why this is the best match"
  }},
  {{
    "naics_code": "string",
    "naics_title": "string",
    "justification": "detailed explanation of why this is a reasonable alternative"
  }}
]
"""

            try:
                response = lca_assistant(prompt, format="python", reset_mem=True)

                # Generate paraphrased version of the activity for better understanding
                paraphrase_prompt = f"Paraphrase this business activity in clear, standardized terminology for carbon accounting purposes: '{activity}'"
                clean_text = lca_assistant(paraphrase_prompt, reset_mem=True)

                # Prepare result
                uniq_id = uuid4_base64()
                result = {
                    "activity_id": activity_id,
                    "activity": activity,
                    "paraphrased": clean_text,
                    "ranked_list": ranked_list,
                    "recommendations": response,
                    "timestamp": pd.Timestamp.now().isoformat()
                }

                results.append(result)

                # Prepare ground truth JSON for human validation
                gt_json = prepare_eio_json(activity, clean_text, response, uniq_id)

                # Save ground truth JSON for this activity
                with open(f"gt_{activity_id}.json", "w") as f:
                    json.dump(gt_json, f, indent=2)

            except Exception as e:
                # Best effort per activity: log and continue with the next one.
                logger.exception(f"Error processing activity {i}: {e}")

            progress.update()

    # Save all results
    with open(output_file, "w") as f:
        json.dump(results, f, indent=2)

    logger.info(f"Saved {len(results)} predictions to {output_file}")
    return results

def parakeet_predict_process(
    activities_df,
    activity_col,
    model_name="all-MiniLM-L6-v2",
    batch_size=32,
    start_idx=0,
    end_idx=None,
    output_file="process_predictions.json"
):
    """
    Generate process-based LCA emission factor recommendations for business activities

    For every activity the ecoinvent reference products are ranked by
    embedding similarity, the LLM is asked to pick from the top five
    candidates, and a ground-truth form is written to
    ``gt_<md5 of activity>.json``. Failures for a single activity are logged
    and that activity is skipped.

    Parameters:
    -----------
    activities_df : pandas.DataFrame
        DataFrame containing business activities
    activity_col : str
        Column name containing the business activity descriptions
    model_name : str
        Name of the SentenceTransformer model to use
    batch_size : int
        Currently unused; kept for interface compatibility (reference
        embeddings are encoded with a fixed batch size of 128)
    start_idx : int
        Starting index for processing
    end_idx : int
        Ending index for processing
    output_file : str
        File to save predictions

    Returns:
    --------
    list
        One result dict per successfully processed activity
    """
    # Load Ecoinvent data
    logger.info("Loading Ecoinvent data...")
    eco_df = get_ecoinvent_data()

    # Initialize SentenceTransformer model
    device = get_device()
    logger.info(f"Loading {model_name} model...")
    semantic_text_model = SentenceTransformer(model_name, device=device)

    # Initialize LCAAssistant
    logger.info("Initializing LCA Assistant...")
    lca_assistant = LCAAssistant()

    # Prepare Ecoinvent embeddings
    logger.info("Computing Ecoinvent embeddings...")
    eco_ref = eco_df["reference_product"].values
    eco_ref_embedding = semantic_text_model.encode(
        eco_ref,
        batch_size=128,
        show_progress_bar=True,
        convert_to_tensor=True
    )

    # Process activities
    if end_idx is None:
        end_idx = len(activities_df)

    activities = activities_df[activity_col].iloc[start_idx:end_idx].tolist()
    logger.info(f"Processing {len(activities)} activities from {start_idx} to {end_idx}")

    results = []

    with RichProgress(activities, description="Processing activities") as progress:
        for i, activity in enumerate(activities):
            # Hashing and slicing below need a string. Missing or numeric cells
            # would otherwise raise outside the try block and abort the whole run.
            if not isinstance(activity, str):
                is_missing = activity is None or (isinstance(activity, float) and activity != activity)
                activity = "" if is_missing else str(activity)

            activity_id = md5_hash(activity)
            logger.info(f"Processing activity {i+1}/{len(activities)}: {activity[:50]}...")

            # Get ranked list of potential emission factors
            ranked_list, topK_df = get_ranked_list(
                activity,
                semantic_text_model,
                eco_df,
                eco_ref,
                eco_ref_embedding,
                "process"
            )

            # Use LLM to recommend the best emission factor
            # NOTE(review): the candidates passed here carry only "index" and
            # "reference_product", while the requested answer asks for
            # impact_factor_id / impact_factor_name. Confirm that the model can
            # supply those fields reliably.
            prompt = f"""
I need to map this activity to the most appropriate process-based emission factor for detailed LCA.

Activity: {activity}

Here are the top candidate emission factors from semantic matching:
{json.dumps(ranked_list[:5], indent=2)}

Please analyze the activity and candidate emission factors, then recommend the most appropriate one.
Return your answer as a JSON list with this structure:
[
  {{
    "impact_factor_id": "string",
    "impact_factor_name": "string",
    "justification": "detailed explanation of why this is the best match"
  }},
  {{
    "impact_factor_id": "string",
    "impact_factor_name": "string",
    "justification": "detailed explanation of why this is a reasonable alternative"
  }}
]
"""

            try:
                response = lca_assistant(prompt, format="python", reset_mem=True)

                # Prepare result
                uniq_id = uuid4_base64()
                result = {
                    "activity_id": activity_id,
                    "activity": activity,
                    "ranked_list": ranked_list,
                    "recommendations": response,
                    "timestamp": pd.Timestamp.now().isoformat()
                }

                results.append(result)

                # Prepare ground truth JSON for human validation
                # NOTE(review): assumes topK_df["index"] holds row positions of
                # eco_df; verify against get_ranked_list.
                gt_json = prepare_process_json(activity, response, eco_df.iloc[topK_df["index"].values], uniq_id)

                # Save ground truth JSON for this activity
                with open(f"gt_{activity_id}.json", "w") as f:
                    json.dump(gt_json, f, indent=2)

            except Exception as e:
                # Best effort per activity: log and continue with the next one.
                logger.exception(f"Error processing activity {i}: {e}")

            progress.update()

    # Save all results
    with open(output_file, "w") as f:
        json.dump(results, f, indent=2)

    logger.info(f"Saved {len(results)} predictions to {output_file}")
    return results


In [None]:
# Helper used by evaluate_predictions to compare identifiers of differing types
def _normalize_factor_id(value):
    """
    Convert a factor identifier to a canonical string so that values read
    from CSV (often numeric) compare equal to values read from JSON (often
    strings).

    Returns None for missing values (None or NaN), which never match.
    """
    if value is None:
        return None
    if isinstance(value, float):
        # NaN is the only float that is not equal to itself
        if value != value:
            return None
        # pandas turns an integer column containing blanks into floats
        # (332913 -> 332913.0); strip the spurious ".0"
        if value.is_integer():
            return str(int(value))
    return str(value).strip()


# Evaluation function to compare predictions against ground truth
def evaluate_predictions(predictions_file, ground_truth_file, lca_type="eio"):
    """
    Evaluate Parakeet predictions against ground truth

    Only ground-truth rows whose ``activity_id`` has a prediction are scored.
    A prediction with missing, empty, or malformed recommendations is counted
    in ``total`` but scored as incorrect instead of aborting the evaluation.
    Identifiers are compared after normalization to strings, so a numeric
    code in the CSV matches the same code given as a string in the JSON.

    Parameters:
    -----------
    predictions_file : str
        JSON file containing Parakeet predictions: a list of objects with
        "activity_id" and "recommendations" keys
    ground_truth_file : str
        CSV file containing ground truth data with "activity_id" and
        "true_factor_id" columns
    lca_type : str
        Type of LCA - 'eio' (compares "naics_code") or any other value,
        e.g. 'process' (compares "impact_factor_id")

    Returns:
    --------
    dict
        Dictionary with keys "total", "correct_top1", "correct_top2",
        "precision_at_1" and "precision_at_2"
    """
    # Load predictions and ground truth
    with open(predictions_file, "r") as f:
        predictions = json.load(f)

    ground_truth = pd.read_csv(ground_truth_file)

    # Create dictionary for easy lookup
    pred_dict = {p["activity_id"]: p for p in predictions}

    # The field holding the predicted identifier depends on the LCA type
    id_key = "naics_code" if lca_type == "eio" else "impact_factor_id"

    total = 0
    correct_top1 = 0
    correct_top2 = 0

    for _, row in ground_truth.iterrows():
        activity_id = row["activity_id"]

        if activity_id not in pred_dict:
            continue

        total += 1
        true_factor = _normalize_factor_id(row["true_factor_id"])

        recommendations = pred_dict[activity_id].get("recommendations")
        if not isinstance(recommendations, list):
            # e.g. an unparsed LLM response; nothing to score
            recommendations = []

        # Keep positions intact: a malformed entry becomes None rather than
        # letting a later recommendation slide into its rank
        top_ids = [
            _normalize_factor_id(rec.get(id_key)) if isinstance(rec, dict) else None
            for rec in recommendations[:2]
        ]

        if true_factor is None or not top_ids:
            continue

        if top_ids[0] == true_factor:
            correct_top1 += 1

        # A hit at rank 1 is also a top-2 hit, even when only one
        # recommendation was returned
        if true_factor in top_ids:
            correct_top2 += 1

    # Calculate metrics (guard against an empty overlap)
    precision_at_1 = correct_top1 / total if total > 0 else 0
    precision_at_2 = correct_top2 / total if total > 0 else 0

    metrics = {
        "total": total,
        "correct_top1": correct_top1,
        "correct_top2": correct_top2,
        "precision_at_1": precision_at_1,
        "precision_at_2": precision_at_2
    }

    return metrics

# Demo with sample data
def demo_parakeet():
    """
    Exercise both Parakeet pipelines on a tiny built-in dataset.

    Builds a five-row frame of plumbing-part descriptions, writes it to
    "sample_activities.csv", runs the EEIO and the process-based predictors
    on it, and prints the top recommendation for the first two activities
    of each run. Returns nothing.
    """
    descriptions = [
        "COUPLING BRASS COMP GJ X COMP GJ 1-1/4 IN",
        "NIPPLE GALV IRON 3/8 X 3 IN",
        "ADAPTER BRASS COMP GJ X MIPT 3/4 IN",
        "COUPLING BRASS COMP GJ X COMP GJ 1 IN",
        "TEE BRASS COMP GJ 3/4 IN",
    ]
    sample_data = pd.DataFrame({"activity_description": descriptions})

    # Keep a CSV copy of the sample on disk
    sample_data.to_csv("sample_activities.csv", index=False)

    # --- EEIO pass ---
    print("Running Parakeet for EEIO recommendations...")
    eeio_results = parakeet_predict_eeio(
        sample_data,
        "activity_description",
        output_file="sample_eeio_predictions.json",
    )

    print("\nSample EEIO Recommendations:")
    for entry in eeio_results[:2]:
        print(f"\nActivity: {entry['activity']}")
        print(f"Paraphrased: {entry['paraphrased']}")
        print("Top recommendation:")
        best = entry['recommendations'][0]
        print(f"  NAICS: {best['naics_code']} - {best['naics_title']}")
        print(f"  Justification: {best['justification'][:200]}...")

    # --- Process-based pass ---
    print("\nRunning Parakeet for process-based recommendations...")
    process_results = parakeet_predict_process(
        sample_data,
        "activity_description",
        output_file="sample_process_predictions.json",
    )

    print("\nSample Process-based Recommendations:")
    for entry in process_results[:2]:
        print(f"\nActivity: {entry['activity']}")
        print("Top recommendation:")
        best = entry['recommendations'][0]
        print(f"  Process: {best['impact_factor_name']}")
        print(f"  Justification: {best['justification'][:200]}...")


In [None]:
# Upload your own dataset
import io

from google.colab import files

uploaded = files.upload()  # This will prompt you to upload your dataset CSV
if not uploaded:
    raise ValueError("No dataset file was uploaded.")

# Read the dataset from the uploaded bytes rather than from a hard-coded path.
# Colab saves an upload under a new name (e.g. "parakeet_austin (1).csv") when
# a file with that name already exists, so a fixed path can load a stale copy.
dataset_filename = next(iter(uploaded))
dataset = pd.read_csv(io.BytesIO(uploaded[dataset_filename]))
print(f"Loaded dataset with {len(dataset)} rows and columns: {', '.join(dataset.columns)}")

# Define the column containing activity descriptions
activity_column = "description"  # Replace with your actual column name

# Fail fast with a readable message instead of erroring deep inside the pipeline
if activity_column not in dataset.columns:
    raise KeyError(
        f"Column '{activity_column}' not found in {dataset_filename}. "
        f"Available columns: {', '.join(map(str, dataset.columns))}"
    )

# Run Parakeet for EEIO recommendations
eeio_results = parakeet_predict_eeio(
    dataset,
    activity_column,
    batch_size=32,
    output_file="eeio_predictions.json"
)

ground_truth_uploaded = files.upload()  # Upload ground_truth.csv
if not ground_truth_uploaded:
    raise ValueError("No ground truth file was uploaded.")

# Write the uploaded bytes to a known path so the evaluation always reads the
# file that was just uploaded, whatever name Colab saved it under.
ground_truth_path = "ground_truth.csv"
with open(ground_truth_path, "wb") as f:
    f.write(next(iter(ground_truth_uploaded.values())))

metrics = evaluate_predictions("eeio_predictions.json", ground_truth_path, lca_type="eio")
print("\nEvaluation metrics:")
for metric, value in metrics.items():
    print(f"  {metric}: {value}")

demo_parakeet()


files.download("eeio_predictions.json")
files.download("sample_eeio_predictions.json")
files.download("sample_process_predictions.json")


Saving parakeet_austin.csv to parakeet_austin (1).csv
Loaded dataset with 4662 rows and columns: COMMODITY, COMMODITY_DESCRIPTION, EXTENDED_DESCRIPTION, CONTRACT_NAME


INFO:eifmap:Loading NAICS data...


INFO:eifmap:Loaded 1016 rows from https://pasteur.epa.gov/uploads/10.23719/1528686/SupplyChainGHGEmissionFactors_v1.2_NAICS_CO2e_USD2021.csv
