<a href="https://colab.research.google.com/github/s4ngi/ISYS2001/blob/main/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
# Installing required packages
!pip install pandas gradio hands_on_ai python-dateutil langchain openai faiss-cpu plotly numpy
!pip install plotly pandas gradio sentence-transformers faiss-cpu numpy
# Gradio for UI
!pip install --quiet gradio

# Sentence embeddings and vector search for RAG
!pip install --quiet sentence-transformers faiss-cpu

# Your chatbot module (if pip-installable)
!pip install --quiet hands_on_ai

# Import core libraries
import pandas as pd
import gradio as gr
import os
import hands_on_ai as ha
from datetime import datetime
from hands_on_ai.chat import get_response
from dateutil import parser
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.text_splitter import CharacterTextSplitter

import warnings
warnings.filterwarnings('ignore')

# Report the versions of the key libraries as a quick sanity check.
print("Core libraries loaded successfully!")
print(f"Pandas version: {pd.__version__}")
print(f"Gradio version: {gr.__version__}")
# NOTE(review): os.name is the OS family identifier (e.g. "posix"), not a
# version number, so the "OS version" label is slightly misleading.
print(f"OS version: {os.name}")
print(f"Hands-on-AI version: {ha.__version__}")
print("All libraries loaded successfully!")

from getpass import getpass

# Configure hands-on-ai server connection
# The settings are written to os.environ, i.e. for this process only.
# NOTE(review): the API key is hard-coded in the notebook; presumably a shared
# course key -- confirm it is safe to publish before sharing this file.
os.environ['HANDS_ON_AI_SERVER'] = 'https://ollama.serveur.au'
os.environ['HANDS_ON_AI_MODEL'] = 'granite3.2'
os.environ['HANDS_ON_AI_API_KEY'] = 'isys2001-assignment-key'

print("Hands-on-AI configured successfully!")


Core libraries loaded successfully!
Pandas version: 2.3.3
Gradio version: 5.49.1
OS version: posix
Hands-on-AI version: 0.1.10
All libraries loaded successfully!
Hands-on-AI configured successfully!


In [1]:
import pandas as pd
import gradio as gr
from hands_on_ai.chat import get_response
import warnings
import re
from datetime import datetime
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

warnings.filterwarnings("ignore")

# ============================================
# ✅ CSV Handling and Analysis
# ============================================

def load_and_clean_csv(file):
    """Load an uploaded transactions CSV and normalise its columns.

    Args:
        file: An upload object exposing the file path as ``.name``, or a
            plain path string. ``None`` (no upload) is treated as invalid.

    Returns:
        ``(df, transactions)`` where ``df`` is the cleaned DataFrame and
        ``transactions`` is the same data as a list of row dicts, or
        ``(None, None)`` when the file is missing, unreadable, does not have
        exactly the columns Date, Amount, Category, Description (in that
        order), or has no usable rows.
    """
    if file is None:
        return None, None
    # Fall back to the object itself so plain path strings are accepted too.
    path = getattr(file, "name", file)
    expected_columns = ["Date", "Amount", "Category", "Description"]
    try:
        df = pd.read_csv(path)
        if list(df.columns) != expected_columns:
            return None, None
        # Dates are read day-first; unparseable values become NaT.
        df["Date"] = pd.to_datetime(
            df["Date"].astype(str).str.strip(), dayfirst=True, errors="coerce"
        )
        # Raw string: "\$" in a normal literal is an invalid escape sequence.
        df["Amount"] = df["Amount"].replace(r"[\$,]", "", regex=True).astype(float)
    except Exception:
        # Best-effort loader: any read/parse failure means "invalid CSV", but
        # unlike a bare ``except`` this lets KeyboardInterrupt/SystemExit out.
        return None, None
    df = df.dropna(subset=["Date", "Amount"])
    if df.empty:
        return None, None
    return df, df.to_dict(orient="records")

def summarize_expenses(df):
    """Summarise spending overall and per category.

    Returns a dict holding the overall total ("Total Spending") plus the
    per-category sums ("Category Totals") and means ("Category Averages"),
    all rounded to two decimal places.
    """
    amounts_by_category = df.groupby("Category")["Amount"]
    summary = {"Total Spending": round(df["Amount"].sum(), 2)}
    summary["Category Totals"] = amounts_by_category.sum().round(2).to_dict()
    summary["Category Averages"] = amounts_by_category.mean().round(2).to_dict()
    return summary

def monthly_summary(df):
    """Return the sum and mean of the Amount column, rounded to 2 decimals.

    NOTE(review): despite the "Monthly" labels, no grouping by month happens
    here; the figures cover every row in ``df``.
    """
    amounts = df["Amount"]
    total = round(amounts.sum(), 2)
    average = round(amounts.mean(), 2)
    return {"Monthly Total": total, "Monthly Average": average}

def generate_spending_advice(summary_dict):
    """Turn a spending summary into one advice line per category.

    Reads "Total Spending" and "Category Totals" from ``summary_dict``. A
    category at 20% or more of the total gets a "set a limit" message, one at
    10% or more a "keep an eye on it" message, anything smaller a "no action"
    message. With no total or no categories a single placeholder line is
    returned.
    """
    grand_total = summary_dict.get("Total Spending", 0)
    per_category = summary_dict.get("Category Totals", {})
    if grand_total == 0 or not per_category:
        return ["No spending data available."]

    def describe(name, share):
        if share >= 20:
            return f"{name} is {share:.1f}% of your spending. Consider setting a limit."
        if share >= 10:
            return f"{name} makes up {share:.1f}%. Keep an eye on it."
        return f"{name} is only {share:.1f}%. No action needed."

    return [
        describe(name, (spent / grand_total) * 100)
        for name, spent in per_category.items()
    ]

def format_summary_table(df):
    """Build the rows for the category summary table.

    Each row is ``[category, total, average]`` with the money values rendered
    as dollar strings; a warning sign follows the total of any category that
    makes up at least 20% of overall spending.
    """
    stats = summarize_expenses(df)
    grand_total = stats["Total Spending"]
    averages = stats["Category Averages"]

    def build_row(name, spent):
        mean_spent = averages[name]
        share = (spent / grand_total) * 100
        flag = "⚠️" if share >= 20 else ""
        return [name, f"${spent:.2f} {flag}", f"${mean_spent:.2f}"]

    return [build_row(name, spent) for name, spent in stats["Category Totals"].items()]

def format_monthly_table(df):
    """Return a one-row table holding the formatted total and average."""
    figures = monthly_summary(df)
    total_cell = f"${figures['Monthly Total']:.2f}"
    average_cell = f"${figures['Monthly Average']:.2f}"
    return [[total_cell, average_cell]]

# ============================================
# 💬 Structured Chatbot Logic with Range-Aware Dates
# ============================================

def extract_dates_from_question(question):
    """Find a date or date range written as dd/mm/yyyy in ``question``.

    Returns:
        ``(start, end)`` as ``datetime.date`` objects. Two dates give the
        sorted range, one date gives ``(d, d)``, and anything else (no date,
        or more than two) gives ``(None, None)``. Text that matches the
        pattern but is not a real calendar date is ignored.
    """
    dates = []
    for candidate in re.findall(r"(\d{1,2}/\d{1,2}/\d{4})", question):
        try:
            dates.append(datetime.strptime(candidate, "%d/%m/%Y").date())
        except ValueError:
            # Looks like a date but is not one (e.g. 31/02/2024); skip it
            # rather than letting the exception abort the whole chat turn.
            continue
    if len(dates) == 2:
        start, end = sorted(dates)
        return start, end
    if len(dates) == 1:
        return dates[0], dates[0]
    return None, None

def filter_transactions(transactions, question):
    """Narrow ``transactions`` to the ones relevant to ``question``.

    Filters are applied in order:
      1. the date or date range found in the question, if any;
      2. every category whose name occurs in the question, if any;
      3. descriptions sharing at least one word with the question -- kept
         only when that leaves at least one transaction.

    Category/Description values that are not strings are treated as empty
    text instead of raising AttributeError on ``.lower()``.

    Returns:
        The filtered list of transaction dicts (possibly empty).
    """
    def as_text(value):
        # presumably an empty CSV cell arrives here as NaN (a float), which
        # has no .lower() -- only real strings are used for matching.
        return value.lower() if isinstance(value, str) else ""

    start_date, end_date = extract_dates_from_question(question)
    filtered = transactions

    if start_date and end_date:
        filtered = [t for t in filtered if start_date <= t["Date"].date() <= end_date]

    question_lower = question.lower()
    # Drop the empty name: "" is a substring of every question.
    known_categories = {as_text(t["Category"]) for t in transactions} - {""}
    matched_categories = {c for c in known_categories if c in question_lower}
    if matched_categories:
        filtered = [t for t in filtered if as_text(t["Category"]) in matched_categories]

    words_in_question = set(re.findall(r"\b\w+\b", question_lower))
    filtered_desc = [
        t for t in filtered
        if words_in_question & set(re.findall(r"\b\w+\b", as_text(t["Description"])))
    ]
    if filtered_desc:
        filtered = filtered_desc

    return filtered

def format_transactions_for_prompt(transactions):
    """Render transactions as one text line each for inclusion in a prompt.

    Returns a fixed placeholder sentence when ``transactions`` is empty.
    """
    if not transactions:
        return "No matching transactions found."

    def render(entry):
        day = entry["Date"].strftime("%d/%m/%Y")
        return (
            f"Date: {day}, "
            f"Amount: ${entry['Amount']:.2f}, "
            f"Category: {entry['Category']}, "
            f"Description: {entry['Description']}"
        )

    return "\n".join(render(entry) for entry in transactions)

# ============================================
# 🧠 Local RAG Setup (FAISS + sentence-transformers)
# ============================================

class LocalRAG:
    """Minimal in-memory retrieval store: sentence embeddings + a FAISS L2 index."""

    def __init__(self):
        # Embedding model used for both stored texts and queries.
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.texts = []      # stored texts, position-aligned with the index rows
        self.vectors = None  # float32 embedding matrix, one row per stored text
        self.index = None    # faiss.IndexFlatL2, created on the first add_texts()

    def add_texts(self, new_texts):
        """Embed ``new_texts`` and append them to the store.

        Only the new texts are encoded; previously stored vectors are reused
        instead of re-embedding the whole corpus on every call. An empty
        batch is a no-op.
        """
        new_texts = list(new_texts)
        if not new_texts:
            return
        embeddings = self.model.encode(new_texts, convert_to_numpy=True).astype('float32')
        if self.index is None:
            self.index = faiss.IndexFlatL2(embeddings.shape[1])
            self.vectors = embeddings
        else:
            self.vectors = np.vstack([self.vectors, embeddings])
        self.index.add(embeddings)
        self.texts.extend(new_texts)

    def query(self, question, top_k=3):
        """Return up to ``top_k`` stored texts nearest to ``question``.

        Returns an empty list when nothing has been stored yet.
        """
        if self.index is None or not self.texts:
            return []
        k = min(top_k, len(self.texts))
        if k <= 0:
            return []
        q_vec = self.model.encode([question]).astype('float32')
        _, indices = self.index.search(q_vec, k)
        # FAISS pads missing neighbours with -1; without the lower bound that
        # would index texts[-1] and return the last text as a bogus hit.
        return [self.texts[i] for i in indices[0] if 0 <= i < len(self.texts)]

# Module-level retrieval store: handle_csv feeds it via add_texts() and
# financial_sage_rag reads it via query(). Constructing it instantiates the
# SentenceTransformer model (see LocalRAG.__init__).
rag = LocalRAG()

# ============================================
# 💬 Financial Sage with RAG + Insights
# ============================================

def summarize_habits(transactions):
    """Describe overall spending habits for a list of transaction dicts.

    The sentence reports the transaction count, total, mean amount and the
    category with the largest total. An empty list yields a placeholder.
    """
    if not transactions:
        return "No transactions to analyze."
    frame = pd.DataFrame(transactions)
    per_category = frame.groupby("Category")["Amount"].sum()
    biggest = per_category.idxmax()
    amounts = frame["Amount"]
    sentences = [
        f"You have {len(frame)} transactions totaling ${amounts.sum():.2f}. ",
        f"On average, you spend ${amounts.mean():.2f} per transaction. ",
        f"You spend the most on {biggest} (${per_category[biggest]:.2f}).",
    ]
    return "".join(sentences)

def financial_sage_rag(question, transactions):
    """Answer ``question`` with transaction context and retrieved references.

    The prompt is assembled from the transactions matching the question, a
    habits summary of those transactions and the texts returned by the
    module-level ``rag`` store, then passed to ``get_response``.
    """
    relevant = filter_transactions(transactions, question)
    habits = summarize_habits(relevant)
    references = rag.query(question)
    if references:
        reference_block = "\n".join(references)
    else:
        reference_block = "No additional reference documents."
    transaction_block = format_transactions_for_prompt(relevant)

    sections = [
        "You are the Financial Sage, a friendly guide who explains spending habits clearly.\n",
        f"Matching transactions:\n{transaction_block}\n\n",
        f"Summary of habits:\n{habits}\n\n",
        f"Additional references:\n{reference_block}\n\n",
        f"Answer the user's question: {question}",
    ]
    return get_response("".join(sections))

def get_sage_response(question, transactions_state):
    """Answer ``question``, adding transaction context when any is loaded."""
    if not transactions_state:
        # Nothing loaded: hand the question straight to the chat model.
        return get_response(question)
    return financial_sage_rag(question, transactions_state)

# ============================================
# 💰 Budget Feature - Single Total Budget
# ============================================

def calculate_savings(income, budget, total_expense):
    """
    Compute actual savings and how much of the planned savings was reached.

    Planned savings is ``income - budget``; when that is not positive, 1 is
    used as the denominator instead. Returns ``(savings, percent)`` with
    ``percent`` clamped to the range 0..1.
    """
    savings = income - total_expense
    planned = income - budget
    if not planned > 0:
        planned = 1  # avoid division by zero
    ratio = savings / planned
    if ratio < 0:
        percent = 0
    elif ratio > 1:
        percent = 1
    else:
        percent = ratio
    return savings, percent

def check_budget(transactions, income, budget):
    """
    Returns a summary string and savings percentage for progress bar.

    Args:
        transactions: Transaction records with an "Amount" field; ``None`` or
            an empty collection counts as zero spending.
        income: Income for the period.
        budget: Spending budget for the period.

    Returns:
        ``(advice, percent)``: a four-line text summary and the 0..1 value
        computed by ``calculate_savings``.
    """
    # With no rows the DataFrame has no "Amount" column, so df["Amount"]
    # would raise KeyError; treat that case as zero spending instead.
    if transactions is None or len(transactions) == 0:
        total_expense = 0.0
    else:
        total_expense = pd.DataFrame(transactions)["Amount"].sum()
    savings, percent = calculate_savings(income, budget, total_expense)

    advice = (
        f"Income: ${income:.2f}\n"
        f"Budget: ${budget:.2f}\n"
        f"Total Expenses: ${total_expense:.2f}\n"
        f"Savings: ${savings:.2f}"
    )

    return advice, percent

# ============================================
# 🧩 Gradio App Layout
# ============================================

# Two-column dashboard: CSV upload and summaries on the left, chat on the right.
with gr.Blocks(title="💰 Financial Sage Dashboard") as app:

    gr.Markdown("<h1 style='text-align:center'>💰 Financial Sage Dashboard</h1>")

    # State holders passed into and returned from the callbacks below.
    transactions_state = gr.State(value=None)
    chat_history = gr.State(value=[])

    with gr.Row():
        # CSV Dashboard
        with gr.Column(scale=1, min_width=450):
            gr.Markdown("### 📁 Upload Your CSV")
            file_input = gr.File(file_types=[".csv"])
            summary_output = gr.Dataframe(headers=["Category","Total Spending","Average Spending"], interactive=False)
            monthly_output = gr.Dataframe(headers=["Monthly Total","Monthly Average"], interactive=False)
            advice_output = gr.Textbox(lines=6, interactive=False)


            def handle_csv(file):
                """Parse the uploaded CSV and return values for the four outputs.

                Returns (summary rows, monthly rows, advice text, transactions);
                for an unusable file the tables are empty, the advice box shows
                an error message and the transactions value is None.
                """
                df, transactions = load_and_clean_csv(file)
                if df is None:
                    return [], [], "Invalid CSV file.", None
                # Add CSV as text to RAG
                # Each row becomes one " | "-joined string. NOTE(review): `rag`
                # is module-level, so texts accumulate across uploads.
                rag.add_texts(df.astype(str).apply(lambda row: ' | '.join(row), axis=1).tolist())
                return (
                    format_summary_table(df),
                    format_monthly_table(df),
                    "\n".join(generate_spending_advice(summarize_expenses(df))),
                    transactions
                )

            # Re-run the analysis whenever the file input changes.
            file_input.change(
                fn=handle_csv,
                inputs=file_input,
                outputs=[summary_output, monthly_output, advice_output, transactions_state]
            )

        # Chatbot
        with gr.Column(scale=1, min_width=400):
            gr.Markdown("### 💬 Financial Sage Chat")
            chat_output = gr.Chatbot()
            question_input = gr.Textbox(placeholder="Ask anything about your spending or habits...", lines=2)
            chat_button = gr.Button("Ask the Sage 💭")

            def chat_with_history(question, transactions_state, history):
                """Answer one question and append the exchange to the history.

                Returns the updated history twice (chat display and state)
                plus an empty string that clears the question box.
                """
                answer = get_sage_response(question, transactions_state)
                history = history + [(question, answer)]
                return history, history, ""

            # The button click and the textbox submit share the same handler.
            chat_button.click(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

            question_input.submit(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

# Start the Gradio server for this app.
app.launch()


  df["Amount"] = df["Amount"].replace('[\$,]', '', regex=True).astype(float)


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://03de9f465074c6b102.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [4]:
import pandas as pd
import gradio as gr
from hands_on_ai.chat import get_response
import warnings
import re
from datetime import datetime
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

warnings.filterwarnings("ignore")

# ============================================
# ✅ CSV Handling and Analysis
# ============================================

def load_and_clean_csv(file):
    """Load an uploaded transactions CSV and normalise its columns.

    Args:
        file: An upload object exposing the file path as ``.name``, or a
            plain path string. ``None`` (no upload) is treated as invalid.

    Returns:
        ``(df, transactions)`` where ``df`` is the cleaned DataFrame and
        ``transactions`` is the same data as a list of row dicts, or
        ``(None, None)`` when the file is missing, unreadable, does not have
        exactly the columns Date, Amount, Category, Description (in that
        order), or has no usable rows.
    """
    if file is None:
        return None, None
    # Fall back to the object itself so plain path strings are accepted too.
    path = getattr(file, "name", file)
    expected_columns = ["Date", "Amount", "Category", "Description"]
    try:
        df = pd.read_csv(path)
        if list(df.columns) != expected_columns:
            return None, None
        # Dates are read day-first; unparseable values become NaT.
        df["Date"] = pd.to_datetime(
            df["Date"].astype(str).str.strip(), dayfirst=True, errors="coerce"
        )
        # Raw string: "\$" in a normal literal is an invalid escape sequence.
        df["Amount"] = df["Amount"].replace(r"[\$,]", "", regex=True).astype(float)
    except Exception:
        # Best-effort loader: any read/parse failure means "invalid CSV", but
        # unlike a bare ``except`` this lets KeyboardInterrupt/SystemExit out.
        return None, None
    df = df.dropna(subset=["Date", "Amount"])
    if df.empty:
        return None, None
    return df, df.to_dict(orient="records")

def summarize_expenses(df):
    """Summarise spending overall and per category.

    Returns a dict holding the overall total ("Total Spending") plus the
    per-category sums ("Category Totals") and means ("Category Averages"),
    all rounded to two decimal places.
    """
    amounts_by_category = df.groupby("Category")["Amount"]
    summary = {"Total Spending": round(df["Amount"].sum(), 2)}
    summary["Category Totals"] = amounts_by_category.sum().round(2).to_dict()
    summary["Category Averages"] = amounts_by_category.mean().round(2).to_dict()
    return summary

def monthly_summary(df):
    """Return the sum and mean of the Amount column, rounded to 2 decimals.

    NOTE(review): despite the "Monthly" labels, no grouping by month happens
    here; the figures cover every row in ``df``.
    """
    amounts = df["Amount"]
    total = round(amounts.sum(), 2)
    average = round(amounts.mean(), 2)
    return {"Monthly Total": total, "Monthly Average": average}

def generate_spending_advice(summary_dict):
    """Turn a spending summary into one advice line per category.

    Reads "Total Spending" and "Category Totals" from ``summary_dict``. A
    category at 20% or more of the total gets a "set a limit" message, one at
    10% or more a "keep an eye on it" message, anything smaller a "no action"
    message. With no total or no categories a single placeholder line is
    returned.
    """
    grand_total = summary_dict.get("Total Spending", 0)
    per_category = summary_dict.get("Category Totals", {})
    if grand_total == 0 or not per_category:
        return ["No spending data available."]

    def describe(name, share):
        if share >= 20:
            return f"{name} is {share:.1f}% of your spending. Consider setting a limit."
        if share >= 10:
            return f"{name} makes up {share:.1f}%. Keep an eye on it."
        return f"{name} is only {share:.1f}%. No action needed."

    return [
        describe(name, (spent / grand_total) * 100)
        for name, spent in per_category.items()
    ]

def format_summary_table(df):
    """Build the rows for the category summary table.

    Each row is ``[category, total, average]`` with the money values rendered
    as dollar strings; a warning sign follows the total of any category that
    makes up at least 20% of overall spending.
    """
    stats = summarize_expenses(df)
    grand_total = stats["Total Spending"]
    averages = stats["Category Averages"]

    def build_row(name, spent):
        mean_spent = averages[name]
        share = (spent / grand_total) * 100
        flag = "⚠️" if share >= 20 else ""
        return [name, f"${spent:.2f} {flag}", f"${mean_spent:.2f}"]

    return [build_row(name, spent) for name, spent in stats["Category Totals"].items()]

def format_monthly_table(df):
    """Return a one-row table holding the formatted total and average."""
    figures = monthly_summary(df)
    total_cell = f"${figures['Monthly Total']:.2f}"
    average_cell = f"${figures['Monthly Average']:.2f}"
    return [[total_cell, average_cell]]

# ============================================
# 💬 Structured Chatbot Logic with Range-Aware Dates
# ============================================

def extract_dates_from_question(question):
    """Find a date or date range written as dd/mm/yyyy in ``question``.

    Returns:
        ``(start, end)`` as ``datetime.date`` objects. Two dates give the
        sorted range, one date gives ``(d, d)``, and anything else (no date,
        or more than two) gives ``(None, None)``. Text that matches the
        pattern but is not a real calendar date is ignored.
    """
    dates = []
    for candidate in re.findall(r"(\d{1,2}/\d{1,2}/\d{4})", question):
        try:
            dates.append(datetime.strptime(candidate, "%d/%m/%Y").date())
        except ValueError:
            # Looks like a date but is not one (e.g. 31/02/2024); skip it
            # rather than letting the exception abort the whole chat turn.
            continue
    if len(dates) == 2:
        start, end = sorted(dates)
        return start, end
    if len(dates) == 1:
        return dates[0], dates[0]
    return None, None

def filter_transactions(transactions, question):
    """Narrow ``transactions`` to the ones relevant to ``question``.

    Filters are applied in order:
      1. the date or date range found in the question, if any;
      2. every category whose name occurs in the question, if any;
      3. descriptions sharing at least one word with the question -- kept
         only when that leaves at least one transaction.

    Category/Description values that are not strings are treated as empty
    text instead of raising AttributeError on ``.lower()``.

    Returns:
        The filtered list of transaction dicts (possibly empty).
    """
    def as_text(value):
        # presumably an empty CSV cell arrives here as NaN (a float), which
        # has no .lower() -- only real strings are used for matching.
        return value.lower() if isinstance(value, str) else ""

    start_date, end_date = extract_dates_from_question(question)
    filtered = transactions

    if start_date and end_date:
        filtered = [t for t in filtered if start_date <= t["Date"].date() <= end_date]

    question_lower = question.lower()
    # Drop the empty name: "" is a substring of every question.
    known_categories = {as_text(t["Category"]) for t in transactions} - {""}
    matched_categories = {c for c in known_categories if c in question_lower}
    if matched_categories:
        filtered = [t for t in filtered if as_text(t["Category"]) in matched_categories]

    words_in_question = set(re.findall(r"\b\w+\b", question_lower))
    filtered_desc = [
        t for t in filtered
        if words_in_question & set(re.findall(r"\b\w+\b", as_text(t["Description"])))
    ]
    if filtered_desc:
        filtered = filtered_desc

    return filtered

def format_transactions_for_prompt(transactions):
    """Render transactions as one text line each for inclusion in a prompt.

    Returns a fixed placeholder sentence when ``transactions`` is empty.
    """
    if not transactions:
        return "No matching transactions found."

    def render(entry):
        day = entry["Date"].strftime("%d/%m/%Y")
        return (
            f"Date: {day}, "
            f"Amount: ${entry['Amount']:.2f}, "
            f"Category: {entry['Category']}, "
            f"Description: {entry['Description']}"
        )

    return "\n".join(render(entry) for entry in transactions)

# ============================================
# 🧠 Local RAG Setup (FAISS + sentence-transformers)
# ============================================

class LocalRAG:
    """Minimal in-memory retrieval store: sentence embeddings + a FAISS L2 index."""

    def __init__(self):
        # Embedding model used for both stored texts and queries.
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.texts = []      # stored texts, position-aligned with the index rows
        self.vectors = None  # float32 embedding matrix, one row per stored text
        self.index = None    # faiss.IndexFlatL2, created on the first add_texts()

    def add_texts(self, new_texts):
        """Embed ``new_texts`` and append them to the store.

        Only the new texts are encoded; previously stored vectors are reused
        instead of re-embedding the whole corpus on every call. An empty
        batch is a no-op.
        """
        new_texts = list(new_texts)
        if not new_texts:
            return
        embeddings = self.model.encode(new_texts, convert_to_numpy=True).astype('float32')
        if self.index is None:
            self.index = faiss.IndexFlatL2(embeddings.shape[1])
            self.vectors = embeddings
        else:
            self.vectors = np.vstack([self.vectors, embeddings])
        self.index.add(embeddings)
        self.texts.extend(new_texts)

    def query(self, question, top_k=3):
        """Return up to ``top_k`` stored texts nearest to ``question``.

        Returns an empty list when nothing has been stored yet.
        """
        if self.index is None or not self.texts:
            return []
        k = min(top_k, len(self.texts))
        if k <= 0:
            return []
        q_vec = self.model.encode([question]).astype('float32')
        _, indices = self.index.search(q_vec, k)
        # FAISS pads missing neighbours with -1; without the lower bound that
        # would index texts[-1] and return the last text as a bogus hit.
        return [self.texts[i] for i in indices[0] if 0 <= i < len(self.texts)]

# Module-level retrieval store: handle_csv feeds it via add_texts() and
# financial_sage_rag reads it via query(). Constructing it instantiates the
# SentenceTransformer model (see LocalRAG.__init__).
rag = LocalRAG()

# ============================================
# 💬 Financial Sage with RAG + Insights
# ============================================

def summarize_habits(transactions):
    """Describe overall spending habits for a list of transaction dicts.

    The sentence reports the transaction count, total, mean amount and the
    category with the largest total. An empty list yields a placeholder.
    """
    if not transactions:
        return "No transactions to analyze."
    frame = pd.DataFrame(transactions)
    per_category = frame.groupby("Category")["Amount"].sum()
    biggest = per_category.idxmax()
    amounts = frame["Amount"]
    sentences = [
        f"You have {len(frame)} transactions totaling ${amounts.sum():.2f}. ",
        f"On average, you spend ${amounts.mean():.2f} per transaction. ",
        f"You spend the most on {biggest} (${per_category[biggest]:.2f}).",
    ]
    return "".join(sentences)

def financial_sage_rag(question, transactions):
    """Answer ``question`` with transaction context and retrieved references.

    The prompt is assembled from the transactions matching the question, a
    habits summary of those transactions and the texts returned by the
    module-level ``rag`` store, then passed to ``get_response``.
    """
    relevant = filter_transactions(transactions, question)
    habits = summarize_habits(relevant)
    references = rag.query(question)
    if references:
        reference_block = "\n".join(references)
    else:
        reference_block = "No additional reference documents."
    transaction_block = format_transactions_for_prompt(relevant)

    sections = [
        "You are the Financial Sage, a friendly guide who explains spending habits clearly.\n",
        f"Matching transactions:\n{transaction_block}\n\n",
        f"Summary of habits:\n{habits}\n\n",
        f"Additional references:\n{reference_block}\n\n",
        f"Answer the user's question: {question}",
    ]
    return get_response("".join(sections))

def get_sage_response(question, transactions_state):
    """Answer ``question``, adding transaction context when any is loaded."""
    if not transactions_state:
        # Nothing loaded: hand the question straight to the chat model.
        return get_response(question)
    return financial_sage_rag(question, transactions_state)

# ============================================
# 💰 Budget Feature - Single Total Budget
# ============================================

def calculate_savings(income, budget, total_expense):
    """
    Compute actual savings and how much of the planned savings was reached.

    Planned savings is ``income - budget``; when that is not positive, 1 is
    used as the denominator instead. Returns ``(savings, percent)`` with
    ``percent`` clamped to the range 0..1.
    """
    savings = income - total_expense
    planned = income - budget
    if not planned > 0:
        planned = 1  # avoid division by zero
    ratio = savings / planned
    if ratio < 0:
        percent = 0
    elif ratio > 1:
        percent = 1
    else:
        percent = ratio
    return savings, percent

def check_budget(transactions, income, budget):
    """
    Returns a summary string and savings percentage for progress bar.

    Args:
        transactions: Transaction records with an "Amount" field; ``None`` or
            an empty collection counts as zero spending.
        income: Income for the period.
        budget: Spending budget for the period.

    Returns:
        ``(advice, percent)``: a four-line text summary and the 0..1 value
        computed by ``calculate_savings``.
    """
    # With no rows the DataFrame has no "Amount" column, so df["Amount"]
    # would raise KeyError; treat that case as zero spending instead.
    if transactions is None or len(transactions) == 0:
        total_expense = 0.0
    else:
        total_expense = pd.DataFrame(transactions)["Amount"].sum()
    savings, percent = calculate_savings(income, budget, total_expense)

    advice = (
        f"Income: ${income:.2f}\n"
        f"Budget: ${budget:.2f}\n"
        f"Total Expenses: ${total_expense:.2f}\n"
        f"Savings: ${savings:.2f}"
    )

    return advice, percent

# ============================================
# 🧩 Gradio App Layout
# ============================================

# Two-column dashboard: CSV upload and summaries on the left, chat on the right.
with gr.Blocks(title="💰 Financial Sage Dashboard") as app:

    gr.Markdown("<h1 style='text-align:center'>💰 Financial Sage Dashboard</h1>")

    # State holders passed into and returned from the callbacks below.
    transactions_state = gr.State(value=None)
    chat_history = gr.State(value=[])

    with gr.Row():
        # CSV Dashboard
        with gr.Column(scale=1, min_width=450):
            gr.Markdown("### 📁 Upload Your CSV")
            file_input = gr.File(file_types=[".csv"])
            summary_output = gr.Dataframe(headers=["Category","Total Spending","Average Spending"], interactive=False)
            monthly_output = gr.Dataframe(headers=["Monthly Total","Monthly Average"], interactive=False)
            advice_output = gr.Textbox(lines=6, interactive=False)
            top_category_output = gr.Textbox(label="🏆 Top Spending Category", interactive=False)  # <-- new

            def handle_csv(file):
                """Parse the uploaded CSV and return values for the five outputs.

                Returns (summary rows, monthly rows, advice text, transactions,
                top category); for an unusable file the tables are empty, the
                advice box shows an error message, the transactions value is
                None and the top category is an empty string.
                """
                df, transactions = load_and_clean_csv(file)
                if df is None:
                    return [], [], "Invalid CSV file.", None, ""

                # Add CSV as text to RAG
                # Each row becomes one " | "-joined string. NOTE(review): `rag`
                # is module-level, so texts accumulate across uploads.
                rag.add_texts(df.astype(str).apply(lambda row: ' | '.join(row), axis=1).tolist())

                # Determine Top Spending Category
                # (the key with the largest total, or "N/A" without categories)
                category_totals = summarize_expenses(df)["Category Totals"]
                top_category = max(category_totals, key=category_totals.get) if category_totals else "N/A"

                return (
                    format_summary_table(df),
                    format_monthly_table(df),
                    "\n".join(generate_spending_advice(summarize_expenses(df))),
                    transactions,
                    top_category
                )

            # Re-run the analysis whenever the file input changes.
            file_input.change(
                fn=handle_csv,
                inputs=file_input,
                outputs=[summary_output, monthly_output, advice_output, transactions_state, top_category_output]  # <-- updated
            )


        # Chatbot
        with gr.Column(scale=1, min_width=400):
            gr.Markdown("### 💬 Financial Sage Chat")
            chat_output = gr.Chatbot()
            question_input = gr.Textbox(placeholder="Ask anything about your spending or habits...", lines=2)
            chat_button = gr.Button("Ask the Sage 💭")

            def chat_with_history(question, transactions_state, history):
                """Answer one question and append the exchange to the history.

                Returns the updated history twice (chat display and state)
                plus an empty string that clears the question box.
                """
                answer = get_sage_response(question, transactions_state)
                history = history + [(question, answer)]
                return history, history, ""

            # The button click and the textbox submit share the same handler.
            chat_button.click(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

            question_input.submit(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

# Start the Gradio server for this app.
app.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://147391167ec13ac332.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [9]:
import pandas as pd
import gradio as gr
from hands_on_ai.chat import get_response
import warnings
import re
from datetime import datetime
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import plotly.graph_objects as go

warnings.filterwarnings("ignore")

# ============================================
# ✅ CSV Handling and Analysis
# ============================================

def load_and_clean_csv(file):
    """Load an uploaded transactions CSV and normalise its columns.

    Args:
        file: An upload object exposing the file path as ``.name``, or a
            plain path string. ``None`` (no upload) is treated as invalid.

    Returns:
        ``(df, transactions)`` where ``df`` is the cleaned DataFrame and
        ``transactions`` is the same data as a list of row dicts, or
        ``(None, None)`` when the file is missing, unreadable, does not have
        exactly the columns Date, Amount, Category, Description (in that
        order), or has no usable rows.
    """
    if file is None:
        return None, None
    # Fall back to the object itself so plain path strings are accepted too.
    path = getattr(file, "name", file)
    expected_columns = ["Date", "Amount", "Category", "Description"]
    try:
        df = pd.read_csv(path)
        if list(df.columns) != expected_columns:
            return None, None
        # Dates are read day-first; unparseable values become NaT.
        df["Date"] = pd.to_datetime(
            df["Date"].astype(str).str.strip(), dayfirst=True, errors="coerce"
        )
        # Raw string: "\$" in a normal literal is an invalid escape sequence.
        df["Amount"] = df["Amount"].replace(r"[\$,]", "", regex=True).astype(float)
    except Exception:
        # Best-effort loader: any read/parse failure means "invalid CSV", but
        # unlike a bare ``except`` this lets KeyboardInterrupt/SystemExit out.
        return None, None
    df = df.dropna(subset=["Date", "Amount"])
    if df.empty:
        return None, None
    return df, df.to_dict(orient="records")

def summarize_expenses(df):
    """Summarise spending overall and per category.

    Returns a dict holding the overall total ("Total Spending") plus the
    per-category sums ("Category Totals") and means ("Category Averages"),
    all rounded to two decimal places.
    """
    amounts_by_category = df.groupby("Category")["Amount"]
    summary = {"Total Spending": round(df["Amount"].sum(), 2)}
    summary["Category Totals"] = amounts_by_category.sum().round(2).to_dict()
    summary["Category Averages"] = amounts_by_category.mean().round(2).to_dict()
    return summary

def monthly_summary(df):
    """Return the sum and mean of the Amount column, rounded to 2 decimals.

    NOTE(review): despite the "Monthly" labels, no grouping by month happens
    here; the figures cover every row in ``df``.
    """
    amounts = df["Amount"]
    total = round(amounts.sum(), 2)
    average = round(amounts.mean(), 2)
    return {"Monthly Total": total, "Monthly Average": average}

def generate_spending_advice(summary_dict):
    """Turn a spending summary into one advice line per category.

    Reads "Total Spending" and "Category Totals" from ``summary_dict``. A
    category at 20% or more of the total gets a "set a limit" message, one at
    10% or more a "keep an eye on it" message, anything smaller a "no action"
    message. With no total or no categories a single placeholder line is
    returned.
    """
    grand_total = summary_dict.get("Total Spending", 0)
    per_category = summary_dict.get("Category Totals", {})
    if grand_total == 0 or not per_category:
        return ["No spending data available."]

    def describe(name, share):
        if share >= 20:
            return f"{name} is {share:.1f}% of your spending. Consider setting a limit."
        if share >= 10:
            return f"{name} makes up {share:.1f}%. Keep an eye on it."
        return f"{name} is only {share:.1f}%. No action needed."

    return [
        describe(name, (spent / grand_total) * 100)
        for name, spent in per_category.items()
    ]

def format_summary_table(df):
    """Create the summary table rows, each ending in a budget placeholder.

    Each row is ``[category, total, average, budget]``: the money values are
    dollar strings, a warning sign follows the total of any category that is
    at least 20% of overall spending, and the budget starts at 0.0.
    """
    stats = summarize_expenses(df)
    grand_total = stats["Total Spending"]
    averages = stats["Category Averages"]

    def build_row(name, spent):
        mean_spent = averages[name]
        share = (spent / grand_total) * 100
        flag = "⚠️" if share >= 20 else ""
        # Last cell is the budget, 0.0 until a value is supplied.
        return [name, f"${spent:.2f} {flag}", f"${mean_spent:.2f}", 0.0]

    return [build_row(name, spent) for name, spent in stats["Category Totals"].items()]

def format_monthly_table(df):
    """Return a one-row table holding the formatted total and average."""
    figures = monthly_summary(df)
    total_cell = f"${figures['Monthly Total']:.2f}"
    average_cell = f"${figures['Monthly Average']:.2f}"
    return [[total_cell, average_cell]]

# ============================================
# 📊 NEW: Budget Comparison Chart
# ============================================

def create_budget_comparison_chart(summary_table_data):
    """
    Creates a grouped bar chart comparing actual spending vs budget per category.

    Args:
        summary_table_data: Rows of ``[category, actual_spending,
            avg_spending, budget]``, given either as a list of rows or as a
            pandas DataFrame with the columns in that order.

    Returns:
        A plotly ``Figure``, or ``None`` when there are no rows.
    """
    if isinstance(summary_table_data, pd.DataFrame):
        # ``not df`` raises ValueError and iterating a DataFrame yields its
        # column labels, so convert to plain rows first.
        summary_table_data = summary_table_data.values.tolist()
    if summary_table_data is None or len(summary_table_data) == 0:
        return None

    def to_number(cell):
        # Accepts numbers and strings such as "$12.50 ⚠️"; anything that
        # is blank, non-numeric or NaN counts as 0.0 instead of raising.
        text = str(cell).replace('$', '').replace(',', '').replace('⚠️', '').strip()
        try:
            number = float(text)
        except ValueError:
            return 0.0
        return number if number == number else 0.0

    categories = []
    actual_spending = []
    budgets = []
    colors = []

    for row in summary_table_data:
        actual = to_number(row[1])
        budget = to_number(row[3])

        categories.append(row[0])
        actual_spending.append(actual)
        budgets.append(budget)

        # Color: Red if over budget, Green if under budget, gray if no budget
        if budget > 0:
            colors.append('red' if actual > budget else 'green')
        else:
            colors.append('gray')

    # Create the bar chart
    fig = go.Figure()

    # Actual spending bars
    fig.add_trace(go.Bar(
        name='Actual Spending',
        x=categories,
        y=actual_spending,
        marker_color=colors,
        text=[f'${x:.2f}' for x in actual_spending],
        textposition='outside'
    ))

    # Budget bars
    fig.add_trace(go.Bar(
        name='Budget',
        x=categories,
        y=budgets,
        marker_color='lightblue',
        text=[f'${x:.2f}' for x in budgets],
        textposition='outside'
    ))

    fig.update_layout(
        title='Budget vs Actual Spending Comparison',
        xaxis_title='Category',
        yaxis_title='Amount ($)',
        barmode='group',
        height=400,
        showlegend=True
    )

    return fig

def generate_budget_status_text(summary_table_data):
    """Generate text summary of budget status.

    Args:
        summary_table_data: Rows of ``[category, actual_spending,
            avg_spending, budget]``, given either as a list of rows or as a
            pandas DataFrame with the columns in that order.

    Returns:
        One status line per category, followed by an overall line when any
        budget is set, or a placeholder sentence when there are no rows.
    """
    if isinstance(summary_table_data, pd.DataFrame):
        # ``not df`` raises ValueError and iterating a DataFrame yields its
        # column labels, so convert to plain rows first.
        summary_table_data = summary_table_data.values.tolist()
    if summary_table_data is None or len(summary_table_data) == 0:
        return "No budget data available."

    def to_number(cell):
        # Accepts numbers and strings such as "$12.50 ⚠️"; anything that
        # is blank, non-numeric or NaN counts as 0.0 instead of raising.
        text = str(cell).replace('$', '').replace(',', '').replace('⚠️', '').strip()
        try:
            number = float(text)
        except ValueError:
            return 0.0
        return number if number == number else 0.0

    status_lines = []
    total_actual = 0
    total_budget = 0

    for row in summary_table_data:
        category = row[0]
        actual = to_number(row[1])
        budget = to_number(row[3])

        total_actual += actual
        total_budget += budget

        if budget > 0:
            difference = budget - actual
            if actual > budget:
                status_lines.append(f"🔴 {category}: OVER budget by ${abs(difference):.2f}")
            else:
                status_lines.append(f"🟢 {category}: Under budget by ${difference:.2f}")
        else:
            status_lines.append(f"⚪ {category}: No budget set")

    # Overall summary (only when at least one budget is set)
    if total_budget > 0:
        overall_diff = total_budget - total_actual
        overall_status = "\n\n📊 OVERALL: "
        if total_actual > total_budget:
            overall_status += f"Over total budget by ${abs(overall_diff):.2f}"
        else:
            overall_status += f"Under total budget by ${overall_diff:.2f}"
        status_lines.append(overall_status)

    return "\n".join(status_lines)

# ============================================
# 💬 Structured Chatbot Logic with Range-Aware Dates
# ============================================

def extract_dates_from_question(question):
    """Pull a date range out of free text containing dd/mm/yyyy dates.

    Args:
        question: The user's question as a string.

    Returns:
        ``(start, end)`` as ``datetime.date`` objects. Two valid dates give a
        sorted range, one valid date gives a single-day range, and any other
        count (none, or three or more) gives ``(None, None)``.
    """
    dates = []
    for token in re.findall(r"(\d{1,2}/\d{1,2}/\d{4})", question):
        try:
            dates.append(datetime.strptime(token, "%d/%m/%Y").date())
        except ValueError:
            # Date-shaped but impossible (e.g. 31/02/2024): ignore the token
            # rather than letting the whole chat request fail.
            continue

    if len(dates) == 2:
        start, end = sorted(dates)
        return start, end
    if len(dates) == 1:
        return dates[0], dates[0]
    return None, None

def filter_transactions(transactions, question):
    """Narrow the transaction list to the records a question is about.

    Filters are applied in sequence:
      1. date range parsed from the question (if any),
      2. categories whose name appears as a substring of the question,
      3. records whose description shares at least one word with the
         question; this last filter is only kept when it matches something.

    Args:
        transactions: List of dicts with ``Date`` (object exposing
            ``.date()``), ``Category`` and ``Description`` keys.
        question: The user's question as a string.

    Returns:
        The filtered list (the original list object when nothing applies).
    """
    def _lowered(value):
        # Missing CSV cells arrive as float NaN, not str; treat them as empty
        # text instead of crashing on ``.lower()``.
        return value.lower() if isinstance(value, str) else ""

    start_date, end_date = extract_dates_from_question(question)
    filtered = transactions

    if start_date and end_date:
        filtered = [t for t in filtered if start_date <= t["Date"].date() <= end_date]

    question_lower = question.lower()

    # Drop the empty name: "" is a substring of every question and would
    # otherwise count as a matched category.
    known_categories = {_lowered(t["Category"]) for t in transactions} - {""}
    matched_categories = {c for c in known_categories if c in question_lower}
    if matched_categories:
        filtered = [t for t in filtered if _lowered(t["Category"]) in matched_categories]

    words_in_question = set(re.findall(r"\b\w+\b", question_lower))
    filtered_desc = [
        t for t in filtered
        if words_in_question & set(re.findall(r"\b\w+\b", _lowered(t["Description"])))
    ]
    if filtered_desc:
        filtered = filtered_desc

    return filtered

def format_transactions_for_prompt(transactions):
    """Render transactions as one text line each for inclusion in a prompt.

    Args:
        transactions: List of dicts with ``Date`` (supports ``strftime``),
            ``Amount``, ``Category`` and ``Description`` keys.

    Returns:
        Newline-joined lines, or a fixed placeholder when the list is empty.
    """
    if not transactions:
        return "No matching transactions found."

    def _render(record):
        when = record['Date'].strftime('%d/%m/%Y')
        return (
            f"Date: {when}, "
            f"Amount: ${record['Amount']:.2f}, "
            f"Category: {record['Category']}, "
            f"Description: {record['Description']}"
        )

    return "\n".join(_render(record) for record in transactions)

# ============================================
# 🧠 Local RAG Setup (FAISS + sentence-transformers)
# ============================================

class LocalRAG:
    """In-memory text store searched by sentence-embedding similarity.

    Texts are embedded with a SentenceTransformer model and indexed in a
    FAISS ``IndexFlatL2``; the index is rebuilt from all stored texts every
    time new texts are added.
    """

    def __init__(self):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.texts = []      # stored texts, in the same order as the index rows
        self.vectors = None  # float32 embeddings of self.texts
        self.index = None    # FAISS index; None until the first add_texts call

    def add_texts(self, new_texts):
        """Append texts to the store and rebuild the index over all texts.

        Args:
            new_texts: Iterable of strings. An empty iterable is a no-op.
        """
        new_texts = list(new_texts)
        if not new_texts:
            # Nothing to embed; encoding an empty batch has no usable
            # embedding dimension to build an index from.
            return
        self.texts.extend(new_texts)
        embeddings = self.model.encode(self.texts, convert_to_numpy=True)
        self.vectors = embeddings.astype('float32')
        self.index = faiss.IndexFlatL2(self.vectors.shape[1])
        self.index.add(self.vectors)

    def query(self, question, top_k=3):
        """Return up to ``top_k`` stored texts closest to ``question``.

        Args:
            question: Query string.
            top_k: Maximum number of texts to return.

        Returns:
            List of stored texts, nearest first; empty when nothing is indexed.
        """
        if self.index is None or not self.texts:
            return []
        q_vec = self.model.encode([question]).astype('float32')
        _distances, indices = self.index.search(q_vec, top_k)
        # FAISS pads the result with -1 when fewer than top_k vectors exist;
        # a negative index would silently wrap to the end of the list.
        return [self.texts[i] for i in indices[0] if 0 <= i < len(self.texts)]

# Single module-level store: handle_csv() adds uploaded CSV rows to it and
# financial_sage_rag() queries it. Constructing it instantiates the
# SentenceTransformer model (see LocalRAG.__init__).
rag = LocalRAG()

# ============================================
# 💬 Financial Sage with RAG + Insights
# ============================================

def summarize_habits(transactions):
    """Describe overall spending habits in one short paragraph.

    Args:
        transactions: List of dicts with at least ``Category`` and ``Amount``.

    Returns:
        A sentence-style summary with count, total, average and the
        top-spending category. The top-category sentence is omitted when no
        transaction has a usable category. Returns a fixed placeholder for
        empty input.
    """
    if not transactions:
        return "No transactions to analyze."

    df = pd.DataFrame(transactions)
    total_spent = df["Amount"].sum()
    num_transactions = len(df)
    avg_transaction = df["Amount"].mean()

    sentences = [
        f"You have {num_transactions} transactions totaling ${total_spent:.2f}.",
        f"On average, you spend ${avg_transaction:.2f} per transaction.",
    ]

    # groupby drops NaN categories, so the result can be empty even though
    # there are transactions; idxmax() on an empty Series raises ValueError.
    category_totals = df.groupby("Category")["Amount"].sum()
    if not category_totals.empty:
        top_category = category_totals.idxmax()
        sentences.append(
            f"You spend the most on {top_category} (${category_totals[top_category]:.2f})."
        )

    return " ".join(sentences)

def financial_sage_rag(question, transactions):
    """Answer a question using matching transactions plus retrieved context.

    Builds a single prompt from the filtered transactions, a habits summary
    of those transactions, and the texts returned by the shared ``rag``
    store, then passes it to ``get_response``.

    Args:
        question: The user's question.
        transactions: List of transaction dicts to filter.

    Returns:
        Whatever ``get_response`` returns for the assembled prompt.
    """
    matching = filter_transactions(transactions, question)
    habits = summarize_habits(matching)

    references = rag.query(question)
    if references:
        reference_text = "\n".join(references)
    else:
        reference_text = "No additional reference documents."

    transactions_text = format_transactions_for_prompt(matching)
    sections = [
        "You are the Financial Sage, a friendly guide who explains spending habits clearly.\n",
        f"Matching transactions:\n{transactions_text}\n\n",
        f"Summary of habits:\n{habits}\n\n",
        f"Additional references:\n{reference_text}\n\n",
        f"Answer the user's question: {question}",
    ]
    return get_response("".join(sections))

def get_sage_response(question, transactions_state):
    """Route a question to the transaction-aware path when data is loaded.

    Args:
        question: The user's question.
        transactions_state: Loaded transaction records, or a falsy value
            when no CSV has been loaded.

    Returns:
        The model's answer.
    """
    # No transactions yet: send the bare question to the model.
    if not transactions_state:
        return get_response(question)
    return financial_sage_rag(question, transactions_state)

# ============================================
# 🧩 Gradio App Layout
# ============================================

# Two-column dashboard: the left column handles CSV upload, the summary
# tables and the budget comparison; the right column is the chat panel.
with gr.Blocks(title="💰 Financial Sage Dashboard") as app:

    gr.Markdown("<h1 style='text-align:center'>💰 Financial Sage Dashboard</h1>")

    # State holders: parsed transaction records (None until a CSV loads) and
    # the running list of (question, answer) pairs shown in the chat.
    transactions_state = gr.State(value=None)
    chat_history = gr.State(value=[])

    with gr.Row():
        # CSV Dashboard
        with gr.Column(scale=1, min_width=450):
            gr.Markdown("### 📁 Upload Your CSV")
            file_input = gr.File(file_types=[".csv"])

            # Summary table with editable budget column
            gr.Markdown("#### 📊 Spending Summary (Enter budgets in the last column)")
            summary_output = gr.Dataframe(
                headers=["Category", "Total Spending", "Average Spending", "Budget ($)"],
                interactive=True,  # Make it editable
                datatype=["str", "str", "str", "number"]
            )

            # Button to apply budgets and generate chart
            apply_budget_btn = gr.Button("📊 Apply Budgets & Show Comparison", variant="primary")

            # Budget status and chart
            budget_status_output = gr.Textbox(label="💰 Budget Status", lines=8, interactive=False)
            budget_chart_output = gr.Plot(label="📊 Budget Comparison Chart")

            monthly_output = gr.Dataframe(headers=["Monthly Total","Monthly Average"], interactive=False)
            advice_output = gr.Textbox(label="💡 Spending Advice", lines=6, interactive=False)
            top_category_output = gr.Textbox(label="🏆 Top Spending Category", interactive=False)

            def handle_csv(file):
                """Load the uploaded CSV and refresh every dashboard output.

                Returns a 7-tuple matching the ``outputs`` list wired below:
                summary table, monthly table, advice text, transactions
                state, top category, chart (cleared) and budget status
                (cleared).
                """
                df, transactions = load_and_clean_csv(file)
                if df is None:
                    return [], [], "", None, "", None, ""

                # Add CSV as text to RAG
                rag.add_texts(df.astype(str).apply(lambda row: ' | '.join(row), axis=1).tolist())

                # Summarise once and reuse for both the top category and the advice.
                summary = summarize_expenses(df)
                category_totals = summary["Category Totals"]
                top_category = max(category_totals, key=category_totals.get) if category_totals else "N/A"

                return (
                    format_summary_table(df),
                    format_monthly_table(df),
                    "\n".join(generate_spending_advice(summary)),
                    transactions,
                    top_category,
                    None,  # Clear chart
                    ""     # Clear budget status
                )

            file_input.change(
                fn=handle_csv,
                inputs=file_input,
                outputs=[summary_output, monthly_output, advice_output, transactions_state,
                        top_category_output, budget_chart_output, budget_status_output]
            )

            # Apply budgets button
            def apply_budgets(summary_table_data):
                """Build the budget status text and chart from the edited table.

                Returns ``(status_text, figure)``; the figure is ``None``
                when there is nothing to plot.
                """
                # gr.Dataframe passes a pandas DataFrame by default. Its truth
                # value is ambiguous (``not df`` raises) and iterating it
                # yields column names, so convert to a list of rows first.
                if isinstance(summary_table_data, pd.DataFrame):
                    summary_table_data = summary_table_data.values.tolist()
                elif isinstance(summary_table_data, dict):
                    summary_table_data = summary_table_data.get("data") or []

                if not summary_table_data:
                    return "No data available.", None

                chart = create_budget_comparison_chart(summary_table_data)
                status = generate_budget_status_text(summary_table_data)
                return status, chart

            apply_budget_btn.click(
                fn=apply_budgets,
                inputs=[summary_output],
                outputs=[budget_status_output, budget_chart_output]
            )

        # Chatbot
        with gr.Column(scale=1, min_width=400):
            gr.Markdown("### 💬 Financial Sage Chat")
            chat_output = gr.Chatbot()
            question_input = gr.Textbox(placeholder="Ask anything about your spending or habits...", lines=2)
            chat_button = gr.Button("Ask the Sage 💭")

            def chat_with_history(question, transactions_state, history):
                """Answer one question and append the exchange to the history.

                Returns the updated history twice (chat display and state)
                plus an empty string that clears the input box.
                """
                answer = get_sage_response(question, transactions_state)
                history = history + [(question, answer)]
                return history, history, ""

            # The button and pressing Enter in the textbox trigger the same handler.
            chat_button.click(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

            question_input.submit(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

app.launch()

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://badc59bb41f2d9e728.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [13]:
import pandas as pd
import gradio as gr
from hands_on_ai.chat import get_response
import warnings
import re
from datetime import datetime
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import plotly.graph_objects as go

warnings.filterwarnings("ignore")

# ============================================
# ✅ CSV Handling and Analysis
# ============================================

def load_and_clean_csv(file):
    """Read an uploaded transactions CSV and normalise its columns.

    The file must have exactly the columns ``Date, Amount, Category,
    Description`` in that order. Dates are parsed day-first; amounts may
    carry ``$`` and thousands separators. Rows whose date or amount cannot
    be parsed are dropped.

    Args:
        file: A path string, or an object exposing the path as ``.name``.

    Returns:
        ``(df, transactions)`` where ``transactions`` is ``df`` as a list of
        record dicts, or ``(None, None)`` when the file is missing,
        unreadable, has the wrong columns or contains no usable rows.
    """
    if file is None:
        return None, None

    # Accept either a plain path or an upload object carrying ``.name``.
    path = getattr(file, "name", file)

    try:
        df = pd.read_csv(path)
        expected_columns = ["Date", "Amount", "Category", "Description"]
        if list(df.columns) != expected_columns:
            return None, None

        df["Date"] = pd.to_datetime(df["Date"].astype(str).str.strip(), dayfirst=True, errors="coerce")

        # Coerce instead of astype(float): one malformed amount should drop
        # that row (via dropna below), not reject the whole file.
        amount_text = df["Amount"].astype(str).str.replace(r"[\$,]", "", regex=True).str.strip()
        df["Amount"] = pd.to_numeric(amount_text, errors="coerce").astype(float)

        df = df.dropna(subset=["Date", "Amount"])
        if df.empty:
            return None, None

        transactions = df.to_dict(orient="records")
        return df, transactions
    except Exception:
        # Best-effort loader at the UI boundary: any read or parse failure is
        # reported as "no data". Unlike a bare ``except:``, this lets
        # KeyboardInterrupt and SystemExit propagate.
        return None, None

def summarize_expenses(df):
    """Compute overall and per-category spending figures.

    Args:
        df: DataFrame with ``Category`` and ``Amount`` columns.

    Returns:
        Dict with ``"Total Spending"`` (rounded to 2 dp) plus
        ``"Category Totals"`` and ``"Category Averages"`` dicts keyed by
        category, each value rounded to 2 dp.
    """
    per_category = df.groupby("Category")["Amount"]
    summary = {}
    summary["Total Spending"] = round(df["Amount"].sum(), 2)
    summary["Category Totals"] = per_category.sum().round(2).to_dict()
    summary["Category Averages"] = per_category.mean().round(2).to_dict()
    return summary

def monthly_summary(df):
    """Return the total and mean of the ``Amount`` column, rounded to 2 dp.

    Note: despite the name, no grouping by month happens here; the figures
    cover every row of ``df``.
    """
    amounts = df["Amount"]
    total = round(amounts.sum(), 2)
    average = round(amounts.mean(), 2)
    return {"Monthly Total": total, "Monthly Average": average}

def generate_spending_advice(summary_dict):
    """Turn a spending summary into one advice sentence per category.

    Args:
        summary_dict: Dict with ``"Total Spending"`` and ``"Category Totals"``
            entries (both optional).

    Returns:
        List of advice strings. Categories at 20% or more of the total get a
        "set a limit" message, 10-20% a "keep an eye" message, the rest a
        "no action" message. A single placeholder is returned when there is
        no spending data.
    """
    overall = summary_dict.get("Total Spending", 0)
    per_category = summary_dict.get("Category Totals", {})
    if overall == 0 or not per_category:
        return ["No spending data available."]

    def _advise(name, spent):
        share = (spent / overall) * 100
        if share >= 20:
            return f"{name} is {share:.1f}% of your spending. Consider setting a limit."
        if share >= 10:
            return f"{name} makes up {share:.1f}%. Keep an eye on it."
        return f"{name} is only {share:.1f}%. No action needed."

    return [_advise(name, spent) for name, spent in per_category.items()]

def format_summary_table(df):
    """Build the rows for the editable spending-summary table.

    Each row is ``[category, "$total <flag>", "$average", 0]``; the flag is a
    warning marker for categories at 20% or more of total spending, and the
    trailing 0 is the initial (user-editable) budget.
    """
    stats = summarize_expenses(df)
    grand_total = stats["Total Spending"]
    averages = stats["Category Averages"]

    rows = []
    for name, spent in stats["Category Totals"].items():
        mean_spent = averages[name]
        share = (spent / grand_total) * 100
        if share >= 20:
            flag = "⚠️"
        else:
            flag = ""
        # Budget starts at 0 and is filled in by the user in the table.
        rows.append([name, f"${spent:.2f} {flag}", f"${mean_spent:.2f}", 0])
    return rows

def format_monthly_table(df):
    """Return a single-row table with the total and average as "$x.xx" strings."""
    figures = monthly_summary(df)
    total_cell = f"${figures['Monthly Total']:.2f}"
    average_cell = f"${figures['Monthly Average']:.2f}"
    return [[total_cell, average_cell]]

# ============================================
# 📊 NEW: Budget Comparison Chart
# ============================================

def create_budget_comparison_chart(summary_table_data):
    """
    Creates a bar chart comparing actual spending vs budget for each category.
    summary_table_data format: [category, actual_spending, avg_spending, budget]

    Rows with fewer than four cells are skipped. Cells that are blank, NaN or
    not numeric are treated as 0. Actual-spending bars are red when over
    budget, green when within budget and gray when no budget is set.

    Returns a plotly Figure; on empty input or on any error the figure carries
    only an explanatory title.
    """
    import math  # local import so the block does not depend on file-level imports

    def _to_amount(cell):
        """Parse a table cell into a float; blank, NaN or junk cells become 0.0."""
        if cell is None:
            return 0.0
        text = str(cell)
        for token in ('$', ',', '⚠️', '⚠', '\ufe0f'):
            text = text.replace(token, '')
        try:
            number = float(text.strip())
        except ValueError:
            return 0.0
        # Blank cells of a DataFrame arrive as NaN; plotting them would
        # produce "$nan" labels.
        return 0.0 if math.isnan(number) else number

    try:
        if not summary_table_data or len(summary_table_data) == 0:
            return go.Figure().update_layout(title="No data available")

        categories = []
        actual_spending = []
        budgets = []
        colors = []

        for row in summary_table_data:
            if len(row) < 4:
                continue

            actual = _to_amount(row[1])
            budget = _to_amount(row[3])

            categories.append(str(row[0]))
            actual_spending.append(actual)
            budgets.append(budget)

            # Color: Red if over budget, Green if under budget
            if budget > 0:
                colors.append('#FF4444' if actual > budget else '#44FF44')
            else:
                colors.append('#888888')

        if not categories:
            return go.Figure().update_layout(title="No valid data to display")

        # Create the bar chart
        fig = go.Figure()

        # Actual spending bars
        fig.add_trace(go.Bar(
            name='Actual Spending',
            x=categories,
            y=actual_spending,
            marker_color=colors,
            text=[f'${x:.2f}' for x in actual_spending],
            textposition='outside'
        ))

        # Budget bars
        fig.add_trace(go.Bar(
            name='Budget',
            x=categories,
            y=budgets,
            marker_color='lightblue',
            text=[f'${x:.2f}' for x in budgets],
            textposition='outside',
            opacity=0.7
        ))

        fig.update_layout(
            title='Budget vs Actual Spending Comparison',
            xaxis_title='Category',
            yaxis_title='Amount ($)',
            barmode='group',
            height=450,
            showlegend=True,
            legend=dict(x=0.01, y=0.99)
        )

        return fig

    except Exception as e:
        # UI boundary: report the failure in the figure title instead of
        # letting the callback raise.
        print(f"Chart error: {str(e)}")
        return go.Figure().update_layout(title=f"Error creating chart: {str(e)}")

def generate_budget_status_text(summary_table_data):
    """Generate text summary of budget status.

    Args:
        summary_table_data: Sequence of rows shaped like
            ``[category, total_spending, average_spending, budget]``. Rows
            with fewer than four cells are skipped; cells that are blank,
            NaN or not numeric count as 0.

    Returns:
        One line per category, then either an overall summary (when any
        positive budget exists) or a hint to enter budgets. On unexpected
        errors an error message string is returned instead of raising.
    """
    import math  # local import so the block does not depend on file-level imports

    def _to_amount(cell):
        """Parse a table cell into a float; blank, NaN or junk cells become 0.0."""
        if cell is None:
            return 0.0
        text = str(cell)
        for token in ('$', ',', '⚠️', '⚠', '\ufe0f'):
            text = text.replace(token, '')
        try:
            number = float(text.strip())
        except ValueError:
            return 0.0
        # Blank budget cells of a DataFrame arrive as NaN. Adding NaN to the
        # running total would make ``total_budget > 0`` false and wrongly
        # report that no budgets were set.
        return 0.0 if math.isnan(number) else number

    try:
        if not summary_table_data:
            return "No budget data available."

        status_lines = []
        total_actual = 0
        total_budget = 0

        for row in summary_table_data:
            if len(row) < 4:
                continue

            category = str(row[0])
            actual = _to_amount(row[1])
            budget = _to_amount(row[3])

            total_actual += actual
            total_budget += budget

            if budget > 0:
                difference = budget - actual
                if actual > budget:
                    status_lines.append(f"🔴 {category}: OVER budget by ${abs(difference):.2f} (${actual:.2f} / ${budget:.2f})")
                else:
                    status_lines.append(f"🟢 {category}: Under budget by ${difference:.2f} (${actual:.2f} / ${budget:.2f})")
            else:
                status_lines.append(f"⚪ {category}: No budget set (spent ${actual:.2f})")

        # Overall summary
        if total_budget > 0:
            overall_diff = total_budget - total_actual
            overall_status = f"\n{'='*50}\n📊 OVERALL SUMMARY:\n"
            overall_status += f"   Total Spent: ${total_actual:.2f}\n"
            overall_status += f"   Total Budget: ${total_budget:.2f}\n"
            if total_actual > total_budget:
                overall_status += f"   ❌ OVER total budget by ${abs(overall_diff):.2f}"
            else:
                overall_status += f"   ✅ UNDER total budget by ${overall_diff:.2f}"
            status_lines.append(overall_status)
        else:
            status_lines.append("\n⚠️ No budgets have been set yet. Enter values in the Budget column above.")

        return "\n".join(status_lines)

    except Exception as e:
        # UI boundary: surface the problem in the status box rather than raise.
        return f"Error generating budget status: {str(e)}"

# ============================================
# 💬 Structured Chatbot Logic with Range-Aware Dates
# ============================================

def extract_dates_from_question(question):
    """Pull a date range out of free text containing dd/mm/yyyy dates.

    Args:
        question: The user's question as a string.

    Returns:
        ``(start, end)`` as ``datetime.date`` objects. Two valid dates give a
        sorted range, one valid date gives a single-day range, and any other
        count (none, or three or more) gives ``(None, None)``.
    """
    dates = []
    for token in re.findall(r"(\d{1,2}/\d{1,2}/\d{4})", question):
        try:
            dates.append(datetime.strptime(token, "%d/%m/%Y").date())
        except ValueError:
            # Date-shaped but impossible (e.g. 31/02/2024): ignore the token
            # rather than letting the whole chat request fail.
            continue

    if len(dates) == 2:
        start, end = sorted(dates)
        return start, end
    if len(dates) == 1:
        return dates[0], dates[0]
    return None, None

def filter_transactions(transactions, question):
    """Narrow the transaction list to the records a question is about.

    Filters are applied in sequence:
      1. date range parsed from the question (if any),
      2. categories whose name appears as a substring of the question,
      3. records whose description shares at least one word with the
         question; this last filter is only kept when it matches something.

    Args:
        transactions: List of dicts with ``Date`` (object exposing
            ``.date()``), ``Category`` and ``Description`` keys.
        question: The user's question as a string.

    Returns:
        The filtered list (the original list object when nothing applies).
    """
    def _lowered(value):
        # Missing CSV cells arrive as float NaN, not str; treat them as empty
        # text instead of crashing on ``.lower()``.
        return value.lower() if isinstance(value, str) else ""

    start_date, end_date = extract_dates_from_question(question)
    filtered = transactions

    if start_date and end_date:
        filtered = [t for t in filtered if start_date <= t["Date"].date() <= end_date]

    question_lower = question.lower()

    # Drop the empty name: "" is a substring of every question and would
    # otherwise count as a matched category.
    known_categories = {_lowered(t["Category"]) for t in transactions} - {""}
    matched_categories = {c for c in known_categories if c in question_lower}
    if matched_categories:
        filtered = [t for t in filtered if _lowered(t["Category"]) in matched_categories]

    words_in_question = set(re.findall(r"\b\w+\b", question_lower))
    filtered_desc = [
        t for t in filtered
        if words_in_question & set(re.findall(r"\b\w+\b", _lowered(t["Description"])))
    ]
    if filtered_desc:
        filtered = filtered_desc

    return filtered

def format_transactions_for_prompt(transactions):
    """Render transactions as one text line each for inclusion in a prompt.

    Args:
        transactions: List of dicts with ``Date`` (supports ``strftime``),
            ``Amount``, ``Category`` and ``Description`` keys.

    Returns:
        Newline-joined lines, or a fixed placeholder when the list is empty.
    """
    if not transactions:
        return "No matching transactions found."

    rendered = []
    for record in transactions:
        day = record['Date'].strftime('%d/%m/%Y')
        fields = (
            f"Date: {day}",
            f"Amount: ${record['Amount']:.2f}",
            f"Category: {record['Category']}",
            f"Description: {record['Description']}",
        )
        rendered.append(", ".join(fields))
    return "\n".join(rendered)

# ============================================
# 🧠 Local RAG Setup (FAISS + sentence-transformers)
# ============================================

class LocalRAG:
    """In-memory text store searched by sentence-embedding similarity.

    Texts are embedded with a SentenceTransformer model and indexed in a
    FAISS ``IndexFlatL2``; the index is rebuilt from all stored texts every
    time new texts are added.
    """

    def __init__(self):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.texts = []      # stored texts, in the same order as the index rows
        self.vectors = None  # float32 embeddings of self.texts
        self.index = None    # FAISS index; None until the first add_texts call

    def add_texts(self, new_texts):
        """Append texts to the store and rebuild the index over all texts.

        Args:
            new_texts: Iterable of strings. An empty iterable is a no-op.
        """
        new_texts = list(new_texts)
        if not new_texts:
            # Nothing to embed; encoding an empty batch has no usable
            # embedding dimension to build an index from.
            return
        self.texts.extend(new_texts)
        embeddings = self.model.encode(self.texts, convert_to_numpy=True)
        self.vectors = embeddings.astype('float32')
        self.index = faiss.IndexFlatL2(self.vectors.shape[1])
        self.index.add(self.vectors)

    def query(self, question, top_k=3):
        """Return up to ``top_k`` stored texts closest to ``question``.

        Args:
            question: Query string.
            top_k: Maximum number of texts to return.

        Returns:
            List of stored texts, nearest first; empty when nothing is indexed.
        """
        if self.index is None or not self.texts:
            return []
        q_vec = self.model.encode([question]).astype('float32')
        _distances, indices = self.index.search(q_vec, top_k)
        # FAISS pads the result with -1 when fewer than top_k vectors exist;
        # a negative index would silently wrap to the end of the list.
        return [self.texts[i] for i in indices[0] if 0 <= i < len(self.texts)]

# Single module-level store: handle_csv() adds uploaded CSV rows to it and
# financial_sage_rag() queries it. Constructing it instantiates the
# SentenceTransformer model (see LocalRAG.__init__).
rag = LocalRAG()

# ============================================
# 💬 Financial Sage with RAG + Insights
# ============================================

def summarize_habits(transactions):
    """Describe overall spending habits in one short paragraph.

    Args:
        transactions: List of dicts with at least ``Category`` and ``Amount``.

    Returns:
        A sentence-style summary with count, total, average and the
        top-spending category. The top-category sentence is omitted when no
        transaction has a usable category. Returns a fixed placeholder for
        empty input.
    """
    if not transactions:
        return "No transactions to analyze."

    df = pd.DataFrame(transactions)
    total_spent = df["Amount"].sum()
    num_transactions = len(df)
    avg_transaction = df["Amount"].mean()

    sentences = [
        f"You have {num_transactions} transactions totaling ${total_spent:.2f}.",
        f"On average, you spend ${avg_transaction:.2f} per transaction.",
    ]

    # groupby drops NaN categories, so the result can be empty even though
    # there are transactions; idxmax() on an empty Series raises ValueError.
    category_totals = df.groupby("Category")["Amount"].sum()
    if not category_totals.empty:
        top_category = category_totals.idxmax()
        sentences.append(
            f"You spend the most on {top_category} (${category_totals[top_category]:.2f})."
        )

    return " ".join(sentences)

def financial_sage_rag(question, transactions):
    """Answer a question using matching transactions plus retrieved context.

    Builds a single prompt from the filtered transactions, a habits summary
    of those transactions, and the texts returned by the shared ``rag``
    store, then passes it to ``get_response``.

    Args:
        question: The user's question.
        transactions: List of transaction dicts to filter.

    Returns:
        Whatever ``get_response`` returns for the assembled prompt.
    """
    relevant = filter_transactions(transactions, question)
    habits_text = summarize_habits(relevant)

    retrieved = rag.query(question)
    if not retrieved:
        references_text = "No additional reference documents."
    else:
        references_text = "\n".join(retrieved)

    listing = format_transactions_for_prompt(relevant)
    prompt = "You are the Financial Sage, a friendly guide who explains spending habits clearly.\n"
    prompt += f"Matching transactions:\n{listing}\n\n"
    prompt += f"Summary of habits:\n{habits_text}\n\n"
    prompt += f"Additional references:\n{references_text}\n\n"
    prompt += f"Answer the user's question: {question}"
    return get_response(prompt)

def get_sage_response(question, transactions_state):
    """Route a question to the transaction-aware path when data is loaded.

    Args:
        question: The user's question.
        transactions_state: Loaded transaction records, or a falsy value
            when no CSV has been loaded.

    Returns:
        The model's answer.
    """
    has_data = bool(transactions_state)
    # Without transactions the bare question goes straight to the model.
    return financial_sage_rag(question, transactions_state) if has_data else get_response(question)

# ============================================
# 🧩 Gradio App Layout
# ============================================

# Two-column dashboard: the left column handles CSV upload, the summary
# tables and the budget comparison; the right column is the chat panel.
with gr.Blocks(title="💰 Financial Sage Dashboard") as app:

    gr.Markdown("<h1 style='text-align:center'>💰 Financial Sage Dashboard</h1>")

    # State holders: parsed transaction records (None until a CSV loads) and
    # the running list of (question, answer) pairs shown in the chat.
    transactions_state = gr.State(value=None)
    chat_history = gr.State(value=[])

    with gr.Row():
        # CSV Dashboard
        with gr.Column(scale=1, min_width=450):
            gr.Markdown("### 📁 Upload Your CSV")
            file_input = gr.File(file_types=[".csv"])

            # Summary table with editable budget column
            gr.Markdown("#### 📊 Spending Summary")
            gr.Markdown("💡 *Enter your budget amounts in the last column, then click 'Apply Budgets' below*")
            summary_output = gr.Dataframe(
                headers=["Category", "Total Spending", "Average Spending", "Budget ($)"],
                interactive=True,
                datatype=["str", "str", "str", "number"],
                col_count=(4, "fixed")
            )

            # Button to apply budgets and generate chart
            apply_budget_btn = gr.Button("📊 Apply Budgets & Show Comparison", variant="primary", size="lg")

            # Budget status and chart
            budget_status_output = gr.Textbox(label="💰 Budget Status", lines=10, interactive=False)
            budget_chart_output = gr.Plot(label="📊 Budget Comparison Chart")

            gr.Markdown("---")
            monthly_output = gr.Dataframe(headers=["Monthly Total","Monthly Average"], interactive=False)
            advice_output = gr.Textbox(label="💡 Spending Advice", lines=6, interactive=False)
            top_category_output = gr.Textbox(label="🏆 Top Spending Category", interactive=False)

            def handle_csv(file):
                """Load the uploaded CSV and refresh every dashboard output.

                Returns a 7-tuple matching the ``outputs`` list wired below:
                summary table, monthly table, advice text, transactions
                state, top category, chart (cleared) and budget status text.
                """
                df, transactions = load_and_clean_csv(file)
                if df is None:
                    # Load failed or file removed: reset every output.
                    return [], [], "", None, "", None, ""

                # Add CSV as text to RAG
                rag.add_texts(df.astype(str).apply(lambda row: ' | '.join(row), axis=1).tolist())

                # Determine Top Spending Category
                category_totals = summarize_expenses(df)["Category Totals"]
                top_category = max(category_totals, key=category_totals.get) if category_totals else "N/A"

                return (
                    format_summary_table(df),
                    format_monthly_table(df),
                    "\n".join(generate_spending_advice(summarize_expenses(df))),
                    transactions,
                    top_category,
                    None,  # Clear chart
                    "Upload complete! Now enter your budgets in the table above."
                )

            file_input.change(
                fn=handle_csv,
                inputs=file_input,
                outputs=[summary_output, monthly_output, advice_output, transactions_state,
                        top_category_output, budget_chart_output, budget_status_output]
            )

            # Apply budgets button
            def apply_budgets(summary_table_data):
                """Build the budget status text and chart from the edited table.

                Accepts the table as a dict, a pandas DataFrame or a list of
                rows and normalises it to a list before delegating. Returns
                ``(status_text, figure)``; the figure is ``None`` when there
                is no data or an error occurred.
                """
                try:
                    # Debug: Check what we're receiving
                    print(f"Received data type: {type(summary_table_data)}")
                    print(f"Received data: {summary_table_data}")

                    if summary_table_data is None:
                        return "⚠️ No data available. Please upload a CSV file first.", None

                    # Handle different Gradio dataframe formats
                    if isinstance(summary_table_data, dict):
                        # Gradio 4.x format: dict with 'data' key
                        if 'data' in summary_table_data:
                            data = summary_table_data['data']
                        else:
                            data = list(summary_table_data.values())
                    elif isinstance(summary_table_data, pd.DataFrame):
                        # Convert DataFrame to list of lists
                        data = summary_table_data.values.tolist()
                    else:
                        # Assume it's already a list
                        data = summary_table_data

                    if not data or len(data) == 0:
                        return "⚠️ No data available. Please upload a CSV file first.", None

                    chart = create_budget_comparison_chart(data)
                    status = generate_budget_status_text(data)
                    return status, chart

                except Exception as e:
                    # Print the full traceback to the console and show a
                    # short message in the status box.
                    import traceback
                    error_details = traceback.format_exc()
                    print(f"Full error: {error_details}")
                    return f"❌ Error: {str(e)}\n\nPlease check the console for details.", None

            apply_budget_btn.click(
                fn=apply_budgets,
                inputs=[summary_output],
                outputs=[budget_status_output, budget_chart_output]
            )

        # Chatbot
        with gr.Column(scale=1, min_width=400):
            gr.Markdown("### 💬 Financial Sage Chat")
            chat_output = gr.Chatbot()
            question_input = gr.Textbox(placeholder="Ask anything about your spending or habits...", lines=2)
            chat_button = gr.Button("Ask the Sage 💭")

            def chat_with_history(question, transactions_state, history):
                """Answer one question and append the exchange to the history.

                Returns the updated history twice (chat display and state)
                plus an empty string that clears the input box.
                """
                answer = get_sage_response(question, transactions_state)
                history = history + [(question, answer)]
                return history, history, ""

            # The button and pressing Enter in the textbox trigger the same handler.
            chat_button.click(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

            question_input.submit(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

app.launch()

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://14f78afece47ec0d25.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [21]:
import pandas as pd
import gradio as gr
from hands_on_ai.chat import get_response
import warnings
import re
from datetime import datetime
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import plotly.graph_objects as go

warnings.filterwarnings("ignore")

# ============================================
# ✅ CSV Handling and Analysis
# ============================================

def load_and_clean_csv(file):
    """Read an uploaded transactions CSV and normalise its columns.

    The file must have exactly the columns ``Date, Amount, Category,
    Description`` in that order. Dates are parsed day-first; amounts may
    carry ``$`` and thousands separators. Rows whose date or amount cannot
    be parsed are dropped.

    Args:
        file: A path string, or an object exposing the path as ``.name``.

    Returns:
        ``(df, transactions)`` where ``transactions`` is ``df`` as a list of
        record dicts, or ``(None, None)`` when the file is missing,
        unreadable, has the wrong columns or contains no usable rows.
    """
    if file is None:
        return None, None

    # Accept either a plain path or an upload object carrying ``.name``.
    path = getattr(file, "name", file)

    try:
        df = pd.read_csv(path)
        expected_columns = ["Date", "Amount", "Category", "Description"]
        if list(df.columns) != expected_columns:
            return None, None

        df["Date"] = pd.to_datetime(df["Date"].astype(str).str.strip(), dayfirst=True, errors="coerce")

        # Coerce instead of astype(float): one malformed amount should drop
        # that row (via dropna below), not reject the whole file.
        amount_text = df["Amount"].astype(str).str.replace(r"[\$,]", "", regex=True).str.strip()
        df["Amount"] = pd.to_numeric(amount_text, errors="coerce").astype(float)

        df = df.dropna(subset=["Date", "Amount"])
        if df.empty:
            return None, None

        transactions = df.to_dict(orient="records")
        return df, transactions
    except Exception:
        # Best-effort loader at the UI boundary: any read or parse failure is
        # reported as "no data". Unlike a bare ``except:``, this lets
        # KeyboardInterrupt and SystemExit propagate.
        return None, None

def summarize_expenses(df):
    """Compute overall and per-category spending figures.

    Args:
        df: DataFrame with ``Category`` and ``Amount`` columns.

    Returns:
        Dict with ``"Total Spending"`` (rounded to 2 dp) plus
        ``"Category Totals"`` and ``"Category Averages"`` dicts keyed by
        category, each value rounded to 2 dp.
    """
    grand_total = df["Amount"].sum()
    by_category = df.groupby("Category")["Amount"]
    totals = by_category.sum().round(2)
    averages = by_category.mean().round(2)
    return {
        "Total Spending": round(grand_total, 2),
        "Category Totals": totals.to_dict(),
        "Category Averages": averages.to_dict(),
    }

def monthly_summary(df):
    """Return the total and mean of the ``Amount`` column, rounded to 2 dp.

    Note: despite the name, no grouping by month happens here; the figures
    cover every row of ``df``.
    """
    result = {}
    result["Monthly Total"] = round(df["Amount"].sum(), 2)
    result["Monthly Average"] = round(df["Amount"].mean(), 2)
    return result

def generate_spending_advice(summary_dict):
    """Turn a spending summary into one advice sentence per category.

    Args:
        summary_dict: Dict with ``"Total Spending"`` and ``"Category Totals"``
            entries (both optional).

    Returns:
        List of advice strings. Categories at 20% or more of the total get a
        "set a limit" message, 10-20% a "keep an eye" message, the rest a
        "no action" message. A single placeholder is returned when there is
        no spending data.
    """
    grand_total = summary_dict.get("Total Spending", 0)
    totals_by_category = summary_dict.get("Category Totals", {})

    # Nothing to compare against: avoid dividing by a zero total.
    if grand_total == 0 or not totals_by_category:
        return ["No spending data available."]

    messages = []
    for label in totals_by_category:
        pct = (totals_by_category[label] / grand_total) * 100
        if pct < 10:
            message = f"{label} is only {pct:.1f}%. No action needed."
        elif pct < 20:
            message = f"{label} makes up {pct:.1f}%. Keep an eye on it."
        else:
            message = f"{label} is {pct:.1f}% of your spending. Consider setting a limit."
        messages.append(message)
    return messages

def format_summary_table(df):
    """Build the rows for the editable spending-summary table.

    Each row is ``[category, "$total <flag>", "$average", 0]``; the flag is a
    warning marker for categories at 20% or more of total spending, and the
    trailing 0 is the initial (user-editable) budget.
    """
    stats = summarize_expenses(df)
    grand_total = stats["Total Spending"]
    averages = stats["Category Averages"]

    rows = []
    for name, spent in stats["Category Totals"].items():
        mean_spent = averages[name]
        share = (spent / grand_total) * 100
        if share >= 20:
            flag = "⚠️"
        else:
            flag = ""
        # Budget starts at 0 and is filled in by the user in the table.
        rows.append([name, f"${spent:.2f} {flag}", f"${mean_spent:.2f}", 0])
    return rows

def format_monthly_table(df):
    """Return a single-row table with the total and average as "$x.xx" strings."""
    figures = monthly_summary(df)
    row = [f"${figures[key]:.2f}" for key in ("Monthly Total", "Monthly Average")]
    return [row]

# ============================================
# 📊 NEW: Budget Comparison Chart
# ============================================

def create_budget_comparison_chart(summary_table_data):
    """
    Create a grouped bar chart comparing actual spending vs budget.

    Args:
        summary_table_data: Rows shaped like
            [category, actual_spending, avg_spending, budget].
            ``actual_spending`` may be a display string containing a dollar
            sign, thousands separators or the warning marker. Rows with
            fewer than four cells are skipped. Blank, malformed or NaN
            amounts are treated as 0.

    Returns:
        A plotly ``Figure``. When there is nothing to plot, or an
        unexpected error occurs, an empty figure whose title explains the
        situation is returned instead of raising.
    """
    import math

    import plotly.graph_objects as go

    def _finite_or_zero(value):
        """Map NaN (e.g. a blank numeric cell) to 0.0 so it cannot skew the chart."""
        return 0.0 if math.isnan(value) else value

    try:
        if not summary_table_data:
            return go.Figure().update_layout(title="No data available")

        categories, actual_spending, budgets, colors = [], [], [], []

        for row in summary_table_data:
            if len(row) < 4:
                continue

            category = str(row[0])
            actual_str = str(row[1]).replace('$', '').replace('⚠️', '').replace(',', '').strip()
            try:
                actual = _finite_or_zero(float(actual_str))
            except ValueError:
                actual = 0.0

            # None, '' and non-numeric text all mean "no budget entered".
            try:
                budget = _finite_or_zero(float(row[3]))
            except (TypeError, ValueError, OverflowError):
                budget = 0.0

            categories.append(category)
            actual_spending.append(actual)
            budgets.append(budget)

            # Color palette: soft red for over budget, soft green for under, gray for no budget
            if budget > 0:
                colors.append('#EF4444' if actual > budget else '#22C55E')  # red / green
            else:
                colors.append('#9CA3AF')  # neutral gray

        if not categories:
            return go.Figure().update_layout(title="No valid data to display")

        # Create Figure
        fig = go.Figure()

        # Actual spending bars
        fig.add_trace(go.Bar(
            name='Actual Spending',
            x=categories,
            y=actual_spending,
            marker_color=colors,
            text=[f'${x:.2f}' for x in actual_spending],
            textposition='outside',
            textfont=dict(size=12, color='#333'),
            hovertemplate="<b>%{x}</b><br>Spent: $%{y:.2f}<extra></extra>"
        ))

        # Budget bars
        fig.add_trace(go.Bar(
            name='Budget',
            x=categories,
            y=budgets,
            marker_color='#60A5FA',  # soft blue
            text=[f'${x:.2f}' for x in budgets],
            textposition='outside',
            textfont=dict(size=12, color='#333'),
            hovertemplate="<b>%{x}</b><br>Budget: $%{y:.2f}<extra></extra>",
            opacity=0.7
        ))

        # Layout styling
        fig.update_layout(
            title=dict(
                text='💰 Budget vs Actual Spending',
                x=0.5,
                font=dict(size=22, color='#111', family='Arial Black')
            ),
            xaxis=dict(
                title='Category',
                tickfont=dict(size=13, color='#111', family='Arial'),
                showgrid=False,
                zeroline=False
            ),
            yaxis=dict(
                # The axis title font is set through title.font; the legacy
                # `titlefont` shortcut is rejected by newer plotly releases.
                title=dict(
                    text='Amount ($)',
                    font=dict(size=14, color='#111')
                ),
                tickfont=dict(size=12),
                showgrid=True,
                gridcolor='rgba(200,200,200,0.3)'
            ),
            plot_bgcolor='white',
            paper_bgcolor='white',
            barmode='group',
            bargap=0.25,
            bargroupgap=0.05,
            height=500,
            showlegend=True,
            legend=dict(
                x=0.02, y=0.98,
                bgcolor='rgba(255,255,255,0)',
                bordercolor='rgba(0,0,0,0)',
                font=dict(size=13)
            ),
            margin=dict(t=70, l=50, r=30, b=50)
        )

        # Add a subtle horizontal line for the average budget. Only positive
        # budgets count, and the line is skipped when there are none, so the
        # average is never computed over an empty list.
        positive_budgets = [b for b in budgets if b > 0]
        if positive_budgets:
            avg_budget = sum(positive_budgets) / len(positive_budgets)
            fig.add_shape(
                type="line",
                x0=-0.5, x1=len(categories) - 0.5,
                y0=avg_budget, y1=avg_budget,
                line=dict(color="rgba(30,144,255,0.6)", width=2, dash="dot"),
            )
            fig.add_annotation(
                x=len(categories) - 0.5,
                y=avg_budget,
                text=f"Avg Budget: ${avg_budget:.2f}",
                showarrow=False,
                font=dict(size=12, color="dodgerblue"),
                xanchor="right",
                yanchor="bottom"
            )

        return fig

    except Exception as e:
        # UI boundary: report the problem in the figure rather than crash the app.
        print(f"Chart error: {str(e)}")
        return go.Figure().update_layout(title=f"Error creating chart: {str(e)}")

def generate_budget_status_text(summary_table_data):
    """Generate text summary of budget status.

    Args:
        summary_table_data: Rows shaped like
            [category, actual_spending, avg_spending, budget].
            ``actual_spending`` may be a display string with a dollar sign,
            thousands separators or the warning marker. Rows with fewer than
            four cells are skipped. Blank, malformed or NaN amounts count
            as 0 so a single empty budget cell cannot invalidate the totals.

    Returns:
        A multi-line string: one status line per category followed by an
        overall summary (or a hint to enter budgets when none are set).
        On an unexpected error a message describing it is returned instead
        of raising.
    """
    import math

    def _finite_or_zero(value):
        """Map NaN (e.g. a blank numeric cell) to 0.0 so totals stay meaningful."""
        return 0.0 if math.isnan(value) else value

    try:
        if not summary_table_data:
            return "No budget data available."

        status_lines = []
        total_actual = 0
        total_budget = 0

        for row in summary_table_data:
            if len(row) < 4:
                continue

            category = str(row[0])

            # Extract numeric values safely
            actual_str = str(row[1]).replace('$', '').replace('⚠️', '').replace(',', '').strip()
            try:
                actual = _finite_or_zero(float(actual_str))
            except ValueError:
                actual = 0.0

            # None, '' and non-numeric text all mean "no budget entered".
            try:
                budget = _finite_or_zero(float(row[3]))
            except (TypeError, ValueError, OverflowError):
                budget = 0.0

            total_actual += actual
            total_budget += budget

            if budget > 0:
                difference = budget - actual
                if actual > budget:
                    status_lines.append(f"🔴 {category}: OVER budget by ${abs(difference):.2f} (${actual:.2f} / ${budget:.2f})")
                else:
                    status_lines.append(f"🟢 {category}: Under budget by ${difference:.2f} (${actual:.2f} / ${budget:.2f})")
            else:
                status_lines.append(f"⚪ {category}: No budget set (spent ${actual:.2f})")

        # Overall summary
        if total_budget > 0:
            overall_diff = total_budget - total_actual
            overall_status = f"\n{'='*50}\n📊 OVERALL SUMMARY:\n"
            overall_status += f"   Total Spent: ${total_actual:.2f}\n"
            overall_status += f"   Total Budget: ${total_budget:.2f}\n"
            if total_actual > total_budget:
                overall_status += f"   ❌ OVER total budget by ${abs(overall_diff):.2f}"
            else:
                overall_status += f"   ✅ UNDER total budget by ${overall_diff:.2f}"
            status_lines.append(overall_status)
        else:
            status_lines.append("\n⚠️ No budgets have been set yet. Enter values in the Budget column above.")

        return "\n".join(status_lines)

    except Exception as e:
        # UI boundary: surface the problem as text rather than crash the app.
        return f"Error generating budget status: {str(e)}"

# ============================================
# 💬 Structured Chatbot Logic with Range-Aware Dates
# ============================================

def extract_dates_from_question(question):
    """Extract a (start, end) date range from ``dd/mm/yyyy`` dates in the text.

    Args:
        question: Free-form user text.

    Returns:
        ``(start, end)`` as ``datetime.date`` objects:
        - two valid dates  -> the earlier and the later one;
        - one valid date   -> that date for both start and end;
        - any other count  -> ``(None, None)``.

    Date-like tokens that are not real calendar dates (e.g. ``31/02/2024``)
    are ignored rather than raising, because the text is typed by the user.
    """
    dates = []
    for token in re.findall(r"(\d{1,2}/\d{1,2}/\d{4})", question):
        try:
            dates.append(datetime.strptime(token, "%d/%m/%Y").date())
        except ValueError:
            # The pattern only checks digit counts, so impossible dates reach here.
            continue

    if len(dates) == 2:
        start, end = sorted(dates)
        return start, end
    if len(dates) == 1:
        return dates[0], dates[0]
    return None, None

def filter_transactions(transactions, question):
    """Narrow ``transactions`` to those relevant to ``question``.

    Three filters are applied in sequence:
    1. Date range: if the question yields a date range, keep transactions
       whose ``Date`` falls inside it (inclusive).
    2. Category: if any known category name appears in the question text,
       keep only transactions in those categories.
    3. Description: if any remaining transaction shares at least one word
       with the question, keep only those; otherwise keep the result of the
       earlier steps unchanged.

    Args:
        transactions: Sequence of dicts with ``Date``, ``Category`` and
            ``Description`` keys.
        question: Free-form user text.

    Returns:
        The filtered list (or the original sequence when nothing narrowed it).
    """
    start_date, end_date = extract_dates_from_question(question)
    lowered = question.lower()

    matches = transactions
    if start_date and end_date:
        matches = [t for t in matches if start_date <= t["Date"].date() <= end_date]

    # Categories are detected by substring match against the lowercased question.
    mentioned = {
        name
        for name in (t["Category"].lower() for t in transactions)
        if name in lowered
    }
    if mentioned:
        matches = [t for t in matches if t["Category"].lower() in mentioned]

    question_words = set(re.findall(r"\b\w+\b", lowered))
    by_description = [
        t
        for t in matches
        if question_words & set(re.findall(r"\b\w+\b", t["Description"].lower()))
    ]

    # Description matching only narrows the result when it finds something.
    return by_description or matches

def format_transactions_for_prompt(transactions):
    """Render transactions as one human-readable line each for the LLM prompt.

    Args:
        transactions: Sequence of dicts with ``Date`` (supports
            ``strftime``), ``Amount``, ``Category`` and ``Description``.

    Returns:
        Newline-joined lines, or a fixed "no match" message when the
        sequence is empty.
    """
    if not transactions:
        return "No matching transactions found."

    def _render(entry):
        return (
            f"Date: {entry['Date'].strftime('%d/%m/%Y')}, "
            f"Amount: ${entry['Amount']:.2f}, "
            f"Category: {entry['Category']}, "
            f"Description: {entry['Description']}"
        )

    return "\n".join(_render(entry) for entry in transactions)

# ============================================
# 🧠 Local RAG Setup (FAISS + sentence-transformers)
# ============================================

class LocalRAG:
    """Minimal in-memory retrieval store: sentence embeddings + a FAISS L2 index.

    Attributes are kept public because callers in this file read and reset
    them directly.
    """

    def __init__(self):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.texts = []      # raw strings, positionally aligned with the index
        self.vectors = None  # float32 embedding matrix for self.texts
        self.index = None    # FAISS index over self.vectors, None until texts are added

    def add_texts(self, new_texts):
        """Append ``new_texts`` and rebuild the index over all stored texts.

        An empty ``new_texts`` is a no-op; encoding an empty list would
        yield an array without a second dimension to size the index from.
        """
        new_texts = list(new_texts)
        if not new_texts:
            return
        self.texts.extend(new_texts)
        embeddings = self.model.encode(self.texts, convert_to_numpy=True)
        self.vectors = embeddings.astype('float32')
        self.index = faiss.IndexFlatL2(self.vectors.shape[1])
        self.index.add(self.vectors)

    def query(self, question, top_k=3):
        """Return up to ``top_k`` stored texts nearest to ``question``.

        Returns an empty list when nothing has been indexed.
        """
        if self.index is None or not self.texts:
            return []
        k = min(top_k, len(self.texts))
        if k <= 0:
            return []
        q_vec = self.model.encode([question]).astype('float32')
        _distances, indices = self.index.search(q_vec, k)
        # FAISS pads missing neighbours with -1; without the lower bound a
        # -1 would silently index the last stored text.
        return [self.texts[i] for i in indices[0] if 0 <= i < len(self.texts)]

# Shared module-level instance: handle_csv() indexes uploaded rows into it and
# financial_sage_rag() queries it for reference text.
rag = LocalRAG()

# ============================================
# 💬 Financial Sage with RAG + Insights
# ============================================

def summarize_habits(transactions):
    """Describe overall spending in a short English paragraph.

    Args:
        transactions: Sequence of dicts with at least ``Category`` and
            ``Amount`` keys.

    Returns:
        A sentence trio giving the transaction count and total, the mean
        amount per transaction, and the category with the highest total;
        or a fixed message when there are no transactions.
    """
    if not transactions:
        return "No transactions to analyze."

    frame = pd.DataFrame(transactions)
    amounts = frame["Amount"]
    per_category = frame.groupby("Category")["Amount"].sum()
    biggest = per_category.idxmax()

    return (
        f"You have {len(frame)} transactions totaling ${amounts.sum():.2f}. "
        f"On average, you spend ${amounts.mean():.2f} per transaction. "
        f"You spend the most on {biggest} (${per_category[biggest]:.2f})."
    )

def financial_sage_rag(question, transactions):
    """Answer ``question`` using matching transactions plus retrieved references.

    The prompt sent to ``get_response`` is assembled from: the transactions
    selected by ``filter_transactions``, a habits summary of that selection,
    the texts returned by ``rag.query`` and the question itself.

    Returns:
        Whatever ``get_response`` returns for the assembled prompt.
    """
    relevant = filter_transactions(transactions, question)
    habits = summarize_habits(relevant)

    references = rag.query(question)
    if references:
        reference_text = "\n".join(references)
    else:
        reference_text = "No additional reference documents."

    sections = [
        "You are the Financial Sage, a friendly guide who explains spending habits clearly.\n",
        f"Matching transactions:\n{format_transactions_for_prompt(relevant)}\n\n",
        f"Summary of habits:\n{habits}\n\n",
        f"Additional references:\n{reference_text}\n\n",
        f"Answer the user's question: {question}",
    ]
    return get_response("".join(sections))

def get_sage_response(question, transactions_state):
    """Route a question to the RAG pipeline when transactions are loaded.

    Without any transactions (``None`` or empty) the question is sent to
    ``get_response`` unchanged.
    """
    if not transactions_state:
        return get_response(question)
    return financial_sage_rag(question, transactions_state)

# ============================================
# 🧩 Gradio App Layout
# ============================================

with gr.Blocks(title="💰 Financial Sage Dashboard", theme=gr.themes.Soft()) as app:

    # 1️⃣ Consistency: Title and visual identity are clear and uniform
    gr.Markdown(
        """
        <h1 style='text-align:center; color:#2E8B57;'>💰 Financial Sage Dashboard</h1>
        <p style='text-align:center; font-size:16px;'>
        Your all-in-one assistant for tracking spending, applying budgets, and gaining financial insights.<br>
        </p>
        """
    )

    transactions_state = gr.State(value=None)
    chat_history = gr.State(value=[])

    with gr.Row():
        # ===============================
        # 📊 CSV UPLOAD & DASHBOARD PANEL
        # ===============================
        with gr.Column(scale=1, min_width=450):
            # 2️⃣ Universal Usability: clear, logical order of steps
            gr.Markdown("## 📁 Step 1: Upload & Analyze Your Spending")

            file_input = gr.File(
                label="Upload CSV File",
                file_types=[".csv"],
                file_count="single",
                interactive=True
            )

            gr.Markdown(
                "💡 **Tip:** Ensure your CSV contains columns: `Date`, `Amount`, `Category`, and `Description`."
            )

            # 3️⃣ Informative Feedback: visually clear summary with editable fields
            gr.Markdown("### 📊 Spending Summary")
            gr.Markdown("💡 *Enter your budget in the last column, then click 'Apply Budgets' below.*")

            summary_output = gr.Dataframe(
                headers=["Category", "Total Spending", "Average Spending", "Budget ($)"],
                interactive=True,
                datatype=["str", "str", "str", "number"],
                col_count=(4, "fixed"),
                wrap=True
            )

            # 4️⃣ Design Dialogs for Closure: explicit action button to close each step
            apply_budget_btn = gr.Button(
                "📊 Apply Budgets & Show Comparison",
                variant="primary"
            )

            budget_status_output = gr.Textbox(
                label="💰 Budget Status",
                lines=10,
                interactive=False,
                placeholder="Your budget summary will appear here..."
            )

            budget_chart_output = gr.Plot(label="📉 Budget Comparison Chart")

            gr.Markdown("---")
            monthly_output = gr.Dataframe(
                headers=["Monthly Total", "Monthly Average"],
                interactive=False
            )

            advice_output = gr.Textbox(
                label="💡 Personalized Spending Advice",
                lines=6,
                interactive=False
            )

            top_category_output = gr.Textbox(
                label="🏆 Top Spending Category",
                interactive=False
            )

            # 5️⃣ Simple Error Handling & Feedback
            def handle_csv(file):
                """Load an uploaded CSV and populate every dashboard output.

                Returns a 7-tuple matching the ``outputs`` wired to
                ``file_input.change``: summary table rows, monthly table
                rows, advice text, transactions state, top category, chart
                (cleared to ``None``) and a status message.
                """
                df, transactions = load_and_clean_csv(file)
                if df is None:
                    return [], [], "", None, "", None, "❌ Invalid file. Please upload a valid CSV."

                # Start from an empty store so rows from a previously uploaded
                # file (or a repeat upload of the same file) do not linger in,
                # or get duplicated in, the retrieval results.
                rag.texts = []
                rag.vectors = None
                rag.index = None
                rows = df.astype(str).apply(lambda row: ' | '.join(row), axis=1).tolist()
                if rows:
                    rag.add_texts(rows)

                # Computed once and reused for both the top category and the advice.
                summary = summarize_expenses(df)
                category_totals = summary["Category Totals"]
                top_category = max(category_totals, key=category_totals.get) if category_totals else "N/A"

                return (
                    format_summary_table(df),
                    format_monthly_table(df),
                    "\n".join(generate_spending_advice(summary)),
                    transactions,
                    top_category,
                    None,
                    "✅ Upload complete! Enter budgets in the table above."
                )

            file_input.change(
                fn=handle_csv,
                inputs=file_input,
                outputs=[
                    summary_output, monthly_output, advice_output, transactions_state,
                    top_category_output, budget_chart_output, budget_status_output
                ]
            )

            # 6️⃣ Easy Reversal of Actions: user can re-upload CSV anytime to reset data
            def apply_budgets(summary_table_data):
                """Build the budget status text and comparison chart from the table.

                Accepts the summary table as a DataFrame, a dict (using its
                ``'data'`` entry when present, otherwise its values) or a
                list of rows.

                Returns:
                    ``(status_text, chart)``; the chart is ``None`` when
                    there is no data or an error occurred.
                """
                no_data = ("⚠️ No data available. Please upload a CSV file first.", None)
                try:
                    # Debug: Check what we're receiving
                    print(f"Received data type: {type(summary_table_data)}")
                    print(f"Received data: {summary_table_data}")

                    if summary_table_data is None:
                        return no_data

                    # Normalise the different Gradio dataframe formats to a list of rows
                    if isinstance(summary_table_data, pd.DataFrame):
                        rows = summary_table_data.values.tolist()
                    elif isinstance(summary_table_data, dict):
                        if 'data' in summary_table_data:
                            rows = summary_table_data['data']
                        else:
                            rows = list(summary_table_data.values())
                    else:
                        # Assume it's already a list
                        rows = summary_table_data

                    if not rows or len(rows) == 0:
                        return no_data

                    figure = create_budget_comparison_chart(rows)
                    status_text = generate_budget_status_text(rows)
                    return status_text, figure

                except Exception as e:
                    import traceback
                    print(f"Full error: {traceback.format_exc()}")
                    return f"❌ Error: {str(e)}\n\nPlease check the console for details.", None

            apply_budget_btn.click(
                fn=apply_budgets,
                inputs=[summary_output],
                outputs=[budget_status_output, budget_chart_output]
            )


        # ===============================
        # 💬 FINANCIAL CHATBOT PANEL
        # ===============================
        with gr.Column(scale=1, min_width=400):
            gr.Markdown("## 💬 Step 2: Chat with the Financial Sage")
            gr.Markdown(
                "Ask questions like *'What’s my top spending category?'* or *'How can I save more?'*"
            )

            chat_output = gr.Chatbot(label="Chat History", height=400)
            question_input = gr.Textbox(
                placeholder="Type your question here...",
                lines=2,
                label="Ask the Sage"
            )

            chat_button = gr.Button("Ask the Sage 💭", variant="secondary")

            # 7️⃣ Internal Locus of Control: user explicitly triggers chatbot actions
            def chat_with_history(question, transactions_state, history):
                """Ask the Sage and append the exchange to the chat history.

                Returns the updated history twice (for the chatbot display
                and for the stored state) plus an empty string that clears
                the question box.
                """
                reply = get_sage_response(question, transactions_state)
                updated_history = history + [(question, reply)]
                return updated_history, updated_history, ""

            chat_button.click(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

            question_input.submit(
                fn=chat_with_history,
                inputs=[question_input, transactions_state, chat_history],
                outputs=[chat_output, chat_history, question_input]
            )

    # 8️⃣ Reduce Short-Term Memory Load: guide users visually
    gr.Markdown(
        """
        ---
        ### 🧭 User Guidance Summary
        - **Step 1:** Upload CSV → Review Spending → Enter Budgets
        - **Step 2:** Click "Apply Budgets" → Review Chart & Advice
        - **Step 3:** Chat with Financial Sage for insights
        <br><br>
        ✅ Created by Sangi
        """
    )

# Start the Gradio server for the dashboard defined above.
app.launch()



It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://22703ca3369ddde237.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


