In [None]:
from flask import Flask, request, jsonify
from pyngrok import ngrok
import pandas as pd
import joblib
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import re

# Initialize the Flask app; the route decorators further down register on it.
app = Flask(__name__)

# Load the model and tokenizer.
# The model weights come from a local directory, while the tokenizer is
# fetched by the hub id "google/gemma-2-9b" using the access token.
# NOTE(review): presumably the local model was fine-tuned from that same base
# model, so the vocabularies agree -- confirm.
# NOTE(review): the token below is a placeholder; avoid committing a real
# token to source control.
access_token = "your key haha"  # Replace with your actual token
model = AutoModelForCausalLM.from_pretrained("./gemma_activity_classifier")
tokenizer = AutoTokenizer.from_pretrained("google/gemma-2-9b", token=access_token)

# Load the label encoder (a joblib-serialized object read from the working
# directory); it is used to map between encoded labels and activity names.
label_encoder = joblib.load('./label_encoder.joblib')

# Load the emissions spreadsheet once at start-up. get_emission_values reads
# the columns YEAR, REGION, POTENTIAL_USE_CASES, CATEGORY, UNIT_TYPE, ACTIVITY
# and EMISSION_FACTORS from it.
emissions_data = pd.read_excel("activity.xlsx")  # Adjust path if necessary

def get_emission_values(year, location, use_case, category, unit_type):
    """Look up the first emissions row that matches all of the given filters.

    YEAR, REGION, CATEGORY and UNIT_TYPE are compared for exact equality;
    POTENTIAL_USE_CASES only has to contain ``use_case`` as a substring.

    Args:
        year: Value compared against the YEAR column.
        location: Value compared against the REGION column.
        use_case: Free text searched for inside POTENTIAL_USE_CASES.
        category: Value compared against the CATEGORY column.
        unit_type: Value compared against the UNIT_TYPE column.

    Returns:
        A dict with the keys ``'activity'``, ``'CO2e'`` and ``'CH4e'`` built
        from the first matching row, or ``None`` when no row matches.
    """
    matches = emissions_data[
        (emissions_data['YEAR'] == year) &
        (emissions_data['REGION'] == location) &
        # regex=False: use_case is arbitrary user text, not a pattern. With the
        # default regex matching, input such as "C++" or "(" raises re.error
        # and characters like "." silently match the wrong rows.
        (emissions_data['POTENTIAL_USE_CASES'].str.contains(use_case, na=False, regex=False)) &
        (emissions_data['CATEGORY'] == category) &
        (emissions_data['UNIT_TYPE'] == unit_type)
    ]

    if matches.empty:
        return None

    # Only the first match is reported; both factors come from the same cell.
    emission_factors = matches['EMISSION_FACTORS'].values[0]
    return {
        'activity': matches['ACTIVITY'].values[0],
        'CO2e': extract_co2e(emission_factors),
        'CH4e': extract_ch4e(emission_factors)
    }

def extract_co2e(emission_factors):
    """Parse the CO2e figure out of an emission-factors string.

    Searches for the literal ``CO2e`` immediately followed by a number and
    the unit ``kg`` (for example ``"CO2e12.5kg"``).

    Args:
        emission_factors: Text of an emission-factors cell. Non-string values
            (such as the NaN pandas produces for an empty spreadsheet cell)
            are treated as "no data".

    Returns:
        The number as a ``float``, or ``None`` when the input is not a
        string, the pattern is absent, or the captured digits are not a
        valid number.
    """
    if not isinstance(emission_factors, str):
        # re.search would raise TypeError on NaN/None cells.
        return None

    co2e_match = re.search(r'CO2e([\d.]+)kg', emission_factors)
    if co2e_match is None:
        return None

    try:
        return float(co2e_match.group(1))
    except ValueError:
        # [\d.]+ also accepts malformed runs such as "1.2.3" or ".".
        return None

def extract_ch4e(emission_factors):
    """Parse the CH4e figure out of an emission-factors string.

    Searches for the literal ``CH4e`` immediately followed by a number and
    the unit ``kg`` (for example ``"CH4e0.25kg"``).

    Args:
        emission_factors: Text of an emission-factors cell. Non-string values
            (such as the NaN pandas produces for an empty spreadsheet cell)
            are treated as "no data".

    Returns:
        The number as a ``float``, or ``None`` when the input is not a
        string, the pattern is absent, or the captured digits are not a
        valid number.
    """
    if not isinstance(emission_factors, str):
        # re.search would raise TypeError on NaN/None cells.
        return None

    ch4e_match = re.search(r'CH4e([\d.]+)kg', emission_factors)
    if ch4e_match is None:
        return None

    try:
        return float(ch4e_match.group(1))
    except ValueError:
        # [\d.]+ also accepts malformed runs such as "1.2.3" or ".".
        return None

def predict_activity(use_case):
    """Ask the language model which activity a free-text use case belongs to.

    The prompt ``"Use case: <use_case>\\nActivity:"`` is completed by the
    model and whatever follows the last ``"Activity:"`` in the decoded text
    is taken as the answer.

    Args:
        use_case: Free-text description inserted into the prompt.

    Returns:
        The matching entry of ``label_encoder.classes_``.

    Raises:
        ValueError: If the generated text is neither one of the encoder's
            class names nor a valid encoded label id.
    """
    input_ids = tokenizer(f"Use case: {use_case}\nActivity:", return_tensors="pt").input_ids

    # NOTE(review): generate() runs with the model's default length settings
    # here; confirm they leave room for an answer after the prompt.
    with torch.no_grad():
        outputs = model.generate(input_ids)

    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    decoded_activity = generated_text.split("Activity:")[-1].strip()

    # inverse_transform maps encoded integer ids to class names, so feeding it
    # the generated *text* fails for every textual answer. Resolve text
    # against the known class names instead (case-insensitively).
    wanted = decoded_activity.lower()
    for class_name in label_encoder.classes_:
        if str(class_name).lower() == wanted:
            return class_name

    # Still accept a model that emits the encoded label id as digits.
    if decoded_activity.isdecimal():
        return label_encoder.inverse_transform([int(decoded_activity)])[0]

    raise ValueError(f"Model output {decoded_activity!r} is not a known activity")

@app.route('/', methods=['GET'])
def home():
    """Serve the landing page: a bare HTML form that POSTs to /predict."""
    # The field names below are the form keys the /predict handler reads.
    landing_page = '''<h1>Emission Activity Prediction</h1>
              <p>Enter parameters to predict activity and emission values.</p>
              <form action="/predict" method="POST">
                Year: <input type="text" name="year"><br>
                Location: <input type="text" name="location"><br>
                Use Case: <input type="text" name="use_case"><br>
                Category: <input type="text" name="category"><br>
                Unit Type: <input type="text" name="unit_type"><br>
                <input type="submit" value="Submit">
              </form>'''
    return landing_page

@app.route('/predict', methods=['POST'])
def predict():
    """Handle the form POST and return emission factors for the given filters.

    Reads ``year``, ``location``, ``use_case``, ``category`` and ``unit_type``
    from the submitted form and passes them to ``get_emission_values``.

    Returns:
        JSON with ``activity``, ``CO2e`` and ``CH4e`` when a row matches;
        JSON ``{'error': ...}`` when nothing matches; or a 400 response with
        a JSON error when ``year`` is not an integer.
    """
    # Validate the year explicitly: an uncaught ValueError from int() would
    # surface to the client as an opaque HTTP 500.
    try:
        year = int(request.form['year'])
    except ValueError:
        return jsonify({'error': 'Year must be an integer.'}), 400

    location = request.form['location']
    use_case = request.form['use_case']
    category = request.form['category']
    unit_type = request.form['unit_type']

    emission_values = get_emission_values(year, location, use_case, category, unit_type)

    if not emission_values:
        return jsonify({'error': 'No matching activity found.'})

    return jsonify({
        'activity': emission_values['activity'],
        'CO2e': emission_values['CO2e'],
        'CH4e': emission_values['CH4e']
    })

# Start ngrok and get the public URL.
# This runs at module level (outside the __main__ guard below), so the tunnel
# is opened whenever this code is executed or imported.
# The tunnel targets local port 5000; app.run() below is called without a
# port, so this relies on Flask's default port -- presumably 5000, confirm.
public_url = ngrok.connect(5000)
print(f"Public URL: {public_url}")

# Start the Flask development server only when run as a script.
if __name__ == '__main__':
    app.run()

In [None]:
import torch
from sklearn.metrics import accuracy_score, classification_report
from tqdm import tqdm
import pandas as pd
def clean_prediction(text):
    """Normalize raw generated text into a lower-case, single-spaced phrase.

    Consecutive repeats of the same word are collapsed to one occurrence,
    every ``word:`` prefix (e.g. 'sector:', 'energy:') is dropped, runs of
    whitespace become single spaces and the result is lower-cased.
    """
    repeated_word = re.compile(r'\b(\w+)( \1\b)+')
    label_prefix = re.compile(r'\w+:')

    # Collapse "foo foo foo" into "foo" before stripping the prefixes.
    deduplicated = repeated_word.sub(r'\1', text)
    without_prefixes = label_prefix.sub('', deduplicated)

    # split() with no argument discards all leading/trailing/extra whitespace.
    words = without_prefixes.split()
    return ' '.join(words).strip().lower()

def _closest_label(cleaned_activity, possible_labels):
    """Return the label that shares the most words with ``cleaned_activity``."""
    predicted_words = set(cleaned_activity.split())
    # cleaned_activity is already lower-cased, so the label's words are
    # lower-cased for the comparison too. Ties (including "no overlap at
    # all") resolve to the first label in ``possible_labels``.
    return max(
        possible_labels,
        key=lambda label: len(set(str(label).lower().split()) & predicted_words),
    )


def flexible_test_model_with_training_data(model, tokenizer, label_encoder, df, max_length=50, num_beams=5):
    """Evaluate the generative classifier on a labelled DataFrame.

    For every row the prompt ``"Use case: <use case>\\nActivity:"`` is
    completed by the model, the completion is cleaned with
    ``clean_prediction`` and mapped to the encoder class with the largest
    word overlap. Accuracy and a classification report are printed and the
    per-sample details are written to ``prediction_results.csv``.

    Args:
        model: Model exposing ``eval()`` and ``generate()``.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        label_encoder: Fitted encoder providing ``classes_``, ``transform``
            and ``inverse_transform``.
        df: DataFrame with the columns ``POTENTIAL_USE_CASES`` and
            ``activity_label`` (the encoded label).
        max_length: Forwarded to ``model.generate``.
        num_beams: Forwarded to ``model.generate``.

    Returns:
        A tuple ``(accuracy, samples_df)``. ``accuracy`` covers only the
        samples that were processed without error and is ``0.0`` when none
        were; ``samples_df`` holds one row per input sample.
    """
    model.eval()
    possible_labels = label_encoder.classes_  # invariant across samples
    all_predictions = []
    all_true_labels = []
    all_samples = []

    for _, row in tqdm(df.iterrows(), total=len(df), desc="Testing"):
        use_case = row['POTENTIAL_USE_CASES']
        true_label = row['activity_label']
        true_activity = label_encoder.inverse_transform([true_label])[0]

        input_text = f"Use case: {use_case}\nActivity:"
        input_ids = tokenizer(input_text, return_tensors="pt").input_ids

        # Best-effort evaluation: a failing sample is recorded and skipped so
        # one bad generation does not abort the whole run.
        try:
            with torch.no_grad():
                outputs = model.generate(
                    input_ids,
                    max_length=max_length,
                    num_beams=num_beams,
                    early_stopping=True
                )

            predicted_activity = tokenizer.decode(outputs[0], skip_special_tokens=True)
            decoded_activity = predicted_activity.split("Activity:")[-1]
            cleaned_activity = clean_prediction(decoded_activity)

            # Closest match = the class with the MOST words in common.
            closest_match = _closest_label(cleaned_activity, possible_labels)
            predicted_label = label_encoder.transform([closest_match])[0]
        except Exception as e:
            print(f"Error processing sample: {e}")
            all_samples.append({
                'use_case': use_case,
                'true_activity': true_activity,
                'raw_prediction': 'Error in generation',
                'cleaned_prediction': 'Error',
                'matched_prediction': 'Error',
                'correct': False
            })
            continue

        all_predictions.append(predicted_label)
        all_true_labels.append(true_label)
        all_samples.append({
            'use_case': use_case,
            'true_activity': true_activity,
            'raw_prediction': decoded_activity,
            'cleaned_prediction': cleaned_activity,
            'matched_prediction': closest_match,
            'correct': true_activity == closest_match
        })

    # Calculate accuracy (guarded: there is nothing to score if every sample
    # failed or the DataFrame was empty).
    accuracy = accuracy_score(all_true_labels, all_predictions) if all_predictions else 0.0
    print(f"Model Accuracy on Training Data: {accuracy:.4f}")
    print(f"Total samples: {len(df)}")
    print(f"Processed samples: {len(all_predictions)}")

    # Generate a detailed classification report
    if all_predictions:
        class_names = [str(name) for name in possible_labels]
        # Pass labels explicitly: without it the report raises whenever some
        # class never occurs in the evaluated data, because the number of
        # observed classes no longer matches len(target_names).
        report = classification_report(
            all_true_labels,
            all_predictions,
            labels=list(range(len(class_names))),
            target_names=class_names,
            zero_division=0,
        )
        print("\nClassification Report:")
        print(report)

    # Save all samples to CSV
    samples_df = pd.DataFrame(all_samples)
    samples_df.to_csv('prediction_results.csv', index=False)
    print("All prediction results saved to 'prediction_results.csv'")

    return accuracy, samples_df

# Usage example:
# `df` is not defined in the cells shown here; it must already exist and
# provide the 'POTENTIAL_USE_CASES' and 'activity_label' columns that the
# evaluation function reads. Running this also writes prediction_results.csv.
accuracy, results_df = flexible_test_model_with_training_data(model, tokenizer, label_encoder, df)



