In [None]:
!pip install pandas numpy matplotlib seaborn scikit-learn google-generativeai groq python-dotenv tqdm -q

In [None]:
# Suppress deprecation warnings
import warnings
import os

warnings.filterwarnings('ignore', category=FutureWarning, module='google.generativeai')


# Fix SSL/TLS certificate verification for gRPC (required for Google Gemini API on macOS)
os.environ['GRPC_DEFAULT_SSL_ROOTS_FILE_PATH'] = ''
os.environ['GRPC_SSL_CIPHER_SUITES'] = 'HIGH'

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
import time
from datetime import datetime
from tqdm import tqdm
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    confusion_matrix,
    classification_report,
)

import google.generativeai as genai
from groq import Groq
from dotenv import load_dotenv

load_dotenv()

if os.getenv("GOOGLE_API_KEY"):
    genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
if os.getenv("GROQ_API_KEY"):
    groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))

sns.set_style("whitegrid")
plt.rcParams["figure.figsize"] = (14, 6)

print("✓ Setup complete")

## 1. Load Dataset

In [None]:
data_path = (
    "../../FinancialPhraseBank_Analysis/FinancialPhraseBank-v1.0/Sentences_AllAgree.txt"
)

sentences = []
sentiments = []

with open(data_path, "r", encoding="utf-8", errors="ignore") as f:
    for line in f:
        line = line.strip()
        if "@" in line:
            parts = line.rsplit("@", 1)
            if len(parts) == 2:
                sentences.append(parts[0])
                sentiments.append(parts[1])

df = pd.DataFrame({"sentence": sentences, "true_sentiment": sentiments})
print(f"Dataset loaded: {len(df)} sentences")

## 2. Tree-of-Thought Prompt Design

**Multi-Path Reasoning**:
- Path 1: Consider "positive" hypothesis
- Path 2: Consider "negative" hypothesis  
- Path 3: Consider "neutral" hypothesis
- Evaluation: Score each path's evidence strength
- Selection: Choose the most supported hypothesis

In [None]:
def create_tot_prompt(sentence):
    """
    Creates a Tree-of-Thought prompt with multi-path exploration.
    """
    prompt = f"""You are a financial sentiment analysis expert. Analyze this statement using a tree-of-thought approach.

Financial Statement:
"{sentence}"

TASK: Explore three possible sentiment classifications and select the best one.

---
PATH 1: Hypothesis = POSITIVE
Consider if this statement represents positive news for investors.
- What evidence supports this being positive?
- What evidence contradicts this being positive?
- Confidence score (0-1) for this hypothesis:

PATH 2: Hypothesis = NEGATIVE
Consider if this statement represents negative news for investors.
- What evidence supports this being negative?
- What evidence contradicts this being negative?
- Confidence score (0-1) for this hypothesis:

PATH 3: Hypothesis = NEUTRAL
Consider if this statement has no clear market impact.
- What evidence supports this being neutral?
- What evidence contradicts this being neutral?
- Confidence score (0-1) for this hypothesis:

---
FINAL DECISION:
Based on evaluating all three paths, select the hypothesis with the strongest evidence.

Provide your final answer in this exact JSON format:
{{
    "sentiment": "positive/negative/neutral",
    "confidence": 0.0-1.0,
    "rationale": "Explanation of why this hypothesis was selected over the others",
    "path_scores": {{
        "positive": 0.0-1.0,
        "negative": 0.0-1.0,
        "neutral": 0.0-1.0
    }}
}}
"""
    return prompt


# Test prompt
test_sentence = (
    "The company reported mixed results with revenue up 10% but margins declining."
)
print("=" * 80)
print("TREE-OF-THOUGHT PROMPT EXAMPLE")
print("=" * 80)
print(create_tot_prompt(test_sentence))

## 3. Model Inference Functions

In [None]:
def call_gemini(prompt, model_name="gemini-2.0-flash-exp", temperature=0.0):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            model = genai.GenerativeModel(model_name)
            response = model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=1500,  # More tokens for multi-path reasoning
                ),
            )
            return response.text
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2**attempt)
                continue
            return None
    return None


def call_llama(prompt, temperature=0.0):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            chat_completion = groq_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model="llama-3.3-70b-versatile",
                temperature=temperature,
                max_tokens=1500,
            )
            return chat_completion.choices[0].message.content
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2**attempt)
                continue
            return None
    return None


def parse_response(response_text):
    """Parse JSON with path scores from ToT response"""
    try:
        if "```json" in response_text:
            json_str = response_text.split("```json")[1].split("```")[0].strip()
        elif "{" in response_text:
            start = response_text.find("{")
            end = response_text.rfind("}") + 1
            json_str = response_text[start:end]
        else:
            json_str = response_text.strip()

        result = json.loads(json_str)
        return result
    except:
        response_lower = response_text.lower()
        if "positive" in response_lower and "negative" not in response_lower:
            return {
                "sentiment": "positive",
                "confidence": 0.5,
                "rationale": "Parsed",
                "path_scores": {},
            }
        elif "negative" in response_lower:
            return {
                "sentiment": "negative",
                "confidence": 0.5,
                "rationale": "Parsed",
                "path_scores": {},
            }
        elif "neutral" in response_lower:
            return {
                "sentiment": "neutral",
                "confidence": 0.5,
                "rationale": "Parsed",
                "path_scores": {},
            }
        return None


print("✓ Inference functions defined")

## 4. Run Experiments

In [None]:
# Test sample
test_df = df.head(100).copy()


def run_tot_experiment(test_df, model_func, model_name, exp_id):
    print(f"Running {exp_id}: {model_name} (Tree-of-Thought)...")
    results = []

    for idx, row in tqdm(
        test_df.iterrows(), total=len(test_df), desc=f"{exp_id} Progress"
    ):
        prompt = create_tot_prompt(row["sentence"])
        response = model_func(prompt)

        if response:
            parsed = parse_response(response)
            if parsed:
                results.append(
                    {
                        "sentence": row["sentence"],
                        "true_sentiment": row["true_sentiment"],
                        "predicted_sentiment": parsed.get("sentiment", "unknown"),
                        "confidence": parsed.get("confidence", 0),
                        "rationale": parsed.get("rationale", ""),
                        "path_scores": str(parsed.get("path_scores", {})),
                        "full_response": response[:700],
                    }
                )

        time.sleep(0.6)  # ToT requires more processing

    results_df = pd.DataFrame(results)
    print(f"\n✓ {exp_id} completed: {len(results_df)} predictions")
    return results_df


# Run Tree-of-Thought experiments
e10_df = run_tot_experiment(
    test_df, lambda p: call_gemini(p, "gemini-2.0-flash-exp"), "Gemini Pro", "E10"
)
e10b_df = run_tot_experiment(
    test_df, lambda p: call_gemini(p, "gemini-2.0-flash-exp"), "Gemini Flash", "E10b"
)
e10c_df = run_tot_experiment(test_df, call_llama, "Llama-3.3-70B", "E10c")

display(e10_df.head())

## 5. Calculate Metrics & Visualize

In [None]:
def calculate_metrics(df, exp_name):
    valid_df = df[
        df["predicted_sentiment"].isin(["positive", "negative", "neutral"])
    ].copy()
    y_true = valid_df["true_sentiment"]
    y_pred = valid_df["predicted_sentiment"]

    metrics = {
        "Experiment": exp_name,
        "Accuracy": accuracy_score(y_true, y_pred),
        "Macro-F1": f1_score(y_true, y_pred, average="macro"),
        "Macro-Precision": precision_score(y_true, y_pred, average="macro"),
        "Macro-Recall": recall_score(y_true, y_pred, average="macro"),
    }

    cm = confusion_matrix(y_true, y_pred, labels=["positive", "negative", "neutral"])
    return metrics, cm, valid_df


e10_metrics, e10_cm, e10_valid = calculate_metrics(e10_df, "E10: Gemini Pro (ToT)")
e10b_metrics, e10b_cm, e10b_valid = calculate_metrics(
    e10b_df, "E10b: Gemini Flash (ToT)"
)
e10c_metrics, e10c_cm, e10c_valid = calculate_metrics(
    e10c_df, "E10c: Llama-3.3-70B (ToT)"
)

metrics_df = pd.DataFrame([e10_metrics, e10b_metrics, e10c_metrics])

print("\n" + "=" * 80)
print("TREE-OF-THOUGHT PERFORMANCE COMPARISON")
print("=" * 80)
display(metrics_df.round(4))

In [None]:
def calculate_metrics(df, exp_name):
    """Calculate all evaluation metrics"""
    # Check if dataframe is empty or missing required columns
    if df.empty or "predicted_sentiment" not in df.columns:
        print(f"⚠️ Warning: {exp_name} has no valid predictions!")
        return (
            {
                "Experiment": exp_name,
                "Total Samples": 0,
                "Valid Predictions": 0,
                "Accuracy": 0,
                "Macro-F1": 0,
                "Weighted-F1": 0,
                "Macro-Precision": 0,
                "Macro-Recall": 0,
                "Positive_Precision": 0,
                "Positive_Recall": 0,
                "Positive_F1": 0,
                "Negative_Precision": 0,
                "Negative_Recall": 0,
                "Negative_F1": 0,
                "Neutral_Precision": 0,
                "Neutral_Recall": 0,
                "Neutral_F1": 0,
            },
            np.zeros((3, 3)),
            pd.DataFrame(),
        )

    # Filter out errors
    valid_df = df[
        df["predicted_sentiment"].isin(["positive", "negative", "neutral"])
    ].copy()

    # Check if we have valid predictions
    if valid_df.empty:
        print(f"⚠️ Warning: {exp_name} has no valid predictions after filtering!")
        return (
            {
                "Experiment": exp_name,
                "Total Samples": len(df),
                "Valid Predictions": 0,
                "Accuracy": 0,
                "Macro-F1": 0,
                "Weighted-F1": 0,
                "Macro-Precision": 0,
                "Macro-Recall": 0,
                "Positive_Precision": 0,
                "Positive_Recall": 0,
                "Positive_F1": 0,
                "Negative_Precision": 0,
                "Negative_Recall": 0,
                "Negative_F1": 0,
                "Neutral_Precision": 0,
                "Neutral_Recall": 0,
                "Neutral_F1": 0,
            },
            np.zeros((3, 3)),
            pd.DataFrame(),
        )

    y_true = valid_df["true_sentiment"]
    y_pred = valid_df["predicted_sentiment"]

    metrics = {
        "Experiment": exp_name,
        "Total Samples": len(df),
        "Valid Predictions": len(valid_df),
        "Accuracy": accuracy_score(y_true, y_pred),
        "Macro-F1": f1_score(y_true, y_pred, average="macro"),
        "Weighted-F1": f1_score(y_true, y_pred, average="weighted"),
        "Macro-Precision": precision_score(y_true, y_pred, average="macro"),
        "Macro-Recall": recall_score(y_true, y_pred, average="macro"),
    }

    # Per-class metrics
    labels = ["positive", "negative", "neutral"]
    precision_per_class = precision_score(
        y_true, y_pred, labels=labels, average=None, zero_division=0
    )
    recall_per_class = recall_score(
        y_true, y_pred, labels=labels, average=None, zero_division=0
    )
    f1_per_class = f1_score(
        y_true, y_pred, labels=labels, average=None, zero_division=0
    )

    for i, label in enumerate(labels):
        metrics[f"{label.capitalize()}_Precision"] = precision_per_class[i]
        metrics[f"{label.capitalize()}_Recall"] = recall_per_class[i]
        metrics[f"{label.capitalize()}_F1"] = f1_per_class[i]

    cm = confusion_matrix(y_true, y_pred, labels=labels)

    return metrics, cm, valid_df


# Calculate metrics
e10_metrics, e10_cm, e10_valid = calculate_metrics(e10_df, "E10: Gemini ToT")

print("\n" + "=" * 80)
print("TREE-OF-THOUGHT PERFORMANCE")
print("=" * 80)
print(f"\nExperiment: E10")
print(f"Accuracy: {e10_metrics['Accuracy']:.4f}")
print(f"Macro-F1: {e10_metrics['Macro-F1']:.4f}")
print(f"Macro-Precision: {e10_metrics['Macro-Precision']:.4f}")
print(f"Macro-Recall: {e10_metrics['Macro-Recall']:.4f}")
print(f"\nPer-class F1 Scores:")
print(f"  Positive: {e10_metrics['Positive_F1']:.4f}")
print(f"  Negative: {e10_metrics['Negative_F1']:.4f}")
print(f"  Neutral: {e10_metrics['Neutral_F1']:.4f}")

## 6. Save Results

In [None]:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

e10_df.to_csv(f"e10_gemini_pro_tot_{timestamp}.csv", index=False)
e10b_df.to_csv(f"e10b_gemini_flash_tot_{timestamp}.csv", index=False)
e10c_df.to_csv(f"e10c_llama_tot_{timestamp}.csv", index=False)
metrics_df.to_csv(f"tot_metrics_summary_{timestamp}.csv", index=False)

print(f"\n✓ Tree-of-Thought results saved")

## 7. Key Insights

### Tree-of-Thought Analysis:

1. **Multi-Path Reasoning**: How does exploring all three sentiment hypotheses affect accuracy?
2. **Decision Quality**: Are ToT predictions more justified and explainable?
3. **Computational Cost**: Does the added complexity justify performance gains?
4. **Path Score Analysis**: Which sentiment hypotheses receive highest scores for which types of statements?