In [None]:
!pip install pandas numpy matplotlib scikit-learn tensorflow gradio




In [None]:
"""
Robust Time-Series Forecasting Gradio App
- Input: company name (must exist in CSV).
- Output: performance_dashboard.png, forecast_plot.png, and a summary message.
Place the dataset where the loader searches for it (see DATA_PATHS below, e.g.
synthetic_stock_data.csv in the working directory or under /content/), and
optionally a model.h5 in the working directory to reuse a pre-saved LSTM.
"""

import os, sys, subprocess, warnings, io
warnings.filterwarnings("ignore")

# ---------- helper: resilient imports ----------
def try_import(name, import_from=None):
    """Import a module, or one attribute of a module, without raising.

    Args:
        name: Module to import (dotted names allowed) or, when ``import_from``
            is given, the attribute to fetch from that module.
        import_from: Optional module that ``name`` is looked up on
            (the equivalent of ``from import_from import name``).

    Returns:
        ``(obj, None)`` on success, ``(None, exc)`` when the import or the
        attribute lookup fails.
    """
    import importlib  # local import keeps this helper self-contained

    try:
        if import_from:
            # A non-empty fromlist makes __import__ return the named module
            # itself and also import `name` when it is a submodule.
            module = __import__(import_from, fromlist=[name])
            return getattr(module, name), None
        # import_module returns the leaf module for dotted names, whereas a
        # bare __import__("a.b") would hand back the top-level package "a".
        return importlib.import_module(name), None
    except Exception as e:  # deliberately broad: optional deps can fail in many ways
        return None, e

# optional forecasting libs
# Each library is optional: the matching *_available flag tells the forecasting
# routine whether that model can be run at all.
auto_arima, err = try_import('auto_arima', import_from='pmdarima')
arima_available = err is None

# Prophet ships under two package names; try the current one first and fall
# back to the legacy "fbprophet" distribution.
Prophet = None
for _prophet_pkg in ('prophet', 'fbprophet'):
    obj, err = try_import('Prophet', import_from=_prophet_pkg)
    if obj is not None:
        Prophet = obj
        break
prophet_available = Prophet is not None

# plotting libs
# Only a missing package can be fixed by installing it, so the fallback is
# limited to ImportError; any other import failure propagates unchanged
# instead of triggering a pointless pip run.
try:
    import matplotlib.pyplot as plt
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "matplotlib"])
    import matplotlib.pyplot as plt

# core libs
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout

# reporting / images (FPDF/Openpyxl not needed inside Gradio display)
import gradio as gr

# reproducibility
# Seed the NumPy and TensorFlow global random generators once, at import time.
# The seeds are not reset per request, so successive forecasts served by the
# same process continue from the already-advanced generator state.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# ---------- load dataset (adjust path if needed) ----------
# Candidate locations, checked in order; the first existing file is used.
DATA_PATHS = [
    "synthetic_stock_data.csv",
    "/content/synthetic_stock_data.csv",
    "/mnt/data/synthetic_stock_data.csv",
    "stock_data.csv"
]
dataset_path = next((p for p in DATA_PATHS if os.path.exists(p)), None)

if dataset_path is None:
    raise FileNotFoundError("Couldn't find dataset. Place CSV as 'synthetic_stock_data.csv' or update DATA_PATHS.")

raw_df = pd.read_csv(dataset_path)
# Normalise headers so " Date" / "CLOSE" / non-string labels all match below.
raw_df.columns = [str(c).strip().lower() for c in raw_df.columns]

# Resolve the source columns. Without a "date" header the first column is
# assumed to be date-like.
date_col = "date" if "date" in raw_df.columns else raw_df.columns[0]

if "close" in raw_df.columns:
    close_col = "close"
else:
    # Fall back to the last numeric column, never the date column itself:
    # mapping one source column to both "Date" and "Close" would drop "Date".
    numeric_cols = raw_df.select_dtypes(include=np.number).columns.drop(date_col, errors="ignore")
    if numeric_cols.empty:
        raise ValueError(
            f"Dataset '{dataset_path}' has no 'close' column and no numeric column to use as the closing price."
        )
    close_col = numeric_cols[-1]

company_col = "company" if "company" in raw_df.columns else None

# rename canonical
raw_df = raw_df.rename(columns={date_col: "Date", close_col: "Close"})
if company_col:
    raw_df = raw_df.rename(columns={company_col: "Company"})
else:
    # create a dummy company column if missing (so user can query that name)
    raw_df["Company"] = "COMPANY"

# parse date and sort; rows whose date or price cannot be parsed are dropped
raw_df["Date"] = pd.to_datetime(raw_df["Date"], errors="coerce")
raw_df["Close"] = pd.to_numeric(raw_df["Close"], errors="coerce")
raw_df = raw_df.dropna(subset=["Date", "Close"]).sort_values("Date").reset_index(drop=True)

# ---------- helper functions ----------
def evaluate_model(y_true, y_pred, name="Model"):
    """Score a forecast with MAE and RMSE.

    The two inputs are flattened and truncated to their common length
    (leading elements are kept). Pairs in which either value is NaN or
    infinite are skipped, so a single bad prediction cannot abort scoring.

    Args:
        y_true: Observed values (any array-like of numbers).
        y_pred: Predicted values (any array-like of numbers).
        name: Label stored under the ``"Model"`` key.

    Returns:
        ``{"Model": name, "MAE": float, "RMSE": float}``; both metrics are
        NaN when there is no usable pair.
    """
    actual = np.asarray(y_true, dtype=float).ravel()
    predicted = np.asarray(y_pred, dtype=float).ravel()

    # align lengths
    overlap = min(actual.size, predicted.size)
    actual, predicted = actual[:overlap], predicted[:overlap]

    usable = np.isfinite(actual) & np.isfinite(predicted)
    if not usable.any():
        return {"Model": name, "MAE": np.nan, "RMSE": np.nan}

    errors = actual[usable] - predicted[usable]
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(np.mean(np.square(errors))))
    return {"Model": name, "MAE": mae, "RMSE": rmse}

def save_performance_plot(results_list, outpath="performance_dashboard.png"):
    """Render a grouped bar chart of MAE/RMSE per model and save it as an image.

    Args:
        results_list: List of dicts with ``"Model"``, ``"MAE"`` and ``"RMSE"``
            keys (the shape returned by ``evaluate_model``). May be empty.
        outpath: Destination image path.

    Returns:
        ``outpath``. When no row carries a metric, a placeholder image with
        the text "No performance metrics" is written instead of a chart.
    """
    metric_cols = ["MAE", "RMSE"]
    # reindex guarantees the expected columns exist, so an empty results list
    # reaches the placeholder branch instead of failing in dropna(subset=...).
    df = pd.DataFrame(results_list).reindex(columns=["Model"] + metric_cols)
    # drop rows where both metrics are missing
    df = df.dropna(subset=metric_cols, how="all")

    # Explicit figure handles (rather than the implicit pyplot "current
    # figure") ensure exactly the figure created here is saved and closed.
    if df.empty:
        fig, ax = plt.subplots(figsize=(6, 3))
        try:
            ax.text(0.5, 0.5, "No performance metrics", ha="center", va="center")
            ax.axis("off")
            fig.savefig(outpath, bbox_inches="tight", dpi=150)
        finally:
            plt.close(fig)
        return outpath

    fig, ax = plt.subplots(figsize=(8, 4))
    try:
        df.set_index("Model")[metric_cols].plot(kind="bar", ax=ax, rot=0)
        ax.set_ylabel("Error")
        ax.set_title("Model Performance (MAE & RMSE)")
        fig.tight_layout()
        fig.savefig(outpath, dpi=150)
    finally:
        plt.close(fig)
    return outpath

def save_forecast_plot(df_train, df_test, preds_dict, outpath="forecast_plot.png"):
    """Plot train/test closing prices with every model's forecast overlaid.

    Args:
        df_train: DataFrame with ``"Date"`` and ``"Close"`` columns (history).
        df_test: DataFrame with ``"Date"`` and ``"Close"`` columns (hold-out).
        preds_dict: Mapping of model name -> array-like of predictions for the
            hold-out window.
        outpath: Destination image path.

    Returns:
        ``outpath``.
    """
    horizon = len(df_test)
    fig, ax = plt.subplots(figsize=(12, 5))
    try:
        ax.plot(df_train["Date"], df_train["Close"], label="Train")
        ax.plot(df_test["Date"], df_test["Close"], label="Test", color="black")
        for name, series in preds_dict.items():
            values = np.asarray(series, dtype=float).ravel()
            if values.size < horizon:
                # Short forecasts are right-aligned: pad the front with NaN.
                values = np.concatenate([np.full(horizon - values.size, np.nan), values])
            elif values.size > horizon:
                # Long forecasts are right-aligned too (keep the tail); a
                # length mismatch would otherwise make plot() raise.
                values = values[values.size - horizon:]
            ax.plot(df_test["Date"], values, label=name)
        ax.set_title("Forecast Comparison")
        ax.set_xlabel("Date")
        ax.set_ylabel("Close")
        ax.legend()
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(outpath, dpi=150)
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)
    return outpath

def make_sequences(data, window):
    """Build sliding-window samples from the first column of a 2-D array.

    For every index ``i`` in ``range(window, len(data))`` the sample is
    ``data[i-window:i, 0]`` and its target is ``data[i, 0]``.

    Returns:
        ``(X, y)`` as NumPy arrays; both are empty when ``data`` has no more
        than ``window`` rows.
    """
    stops = range(window, len(data))
    samples = [data[stop - window:stop, 0] for stop in stops]
    targets = [data[stop, 0] for stop in stops]
    return np.array(samples), np.array(targets)

def safe_float_array(a):
    """Return a new float64 NumPy array holding the values of ``a``."""
    as_array = np.asarray(a)
    return as_array.astype(np.float64)

# ---------- main forecasting routine for a company ----------
def _select_company_rows(company_name):
    """Return ``(rows, error)`` for the single company matching ``company_name``.

    An exact case-insensitive match wins; otherwise a literal substring match
    is accepted only when it identifies exactly one company.
    """
    needle = company_name.lower()
    names = raw_df["Company"].str.lower()

    exact = raw_df[names == needle]
    if not exact.empty:
        return exact.copy(), None

    # regex=False: the query is user text, not a pattern (e.g. "(" must not
    # raise); na=False: rows without a company name never match.
    partial_mask = names.str.contains(needle, regex=False, na=False)
    distinct = names[partial_mask].nunique()
    if distinct == 0:
        return None, f"❌ Company '{company_name}' not found in dataset."
    if distinct > 1:
        # Mixing rows of several companies would produce a meaningless series.
        candidates = sorted(map(str, raw_df.loc[partial_mask, "Company"].drop_duplicates()))
        listed = ", ".join(candidates[:10])
        return None, (
            f"❌ Company '{company_name}' matches several companies ({listed}). "
            "Please enter the full name."
        )
    return raw_df[partial_mask].copy(), None


def _choose_window_and_holdout(n):
    """Return ``(sequence_length, test_days)`` sized for a series of ``n`` rows."""
    seq_len = 10
    # if dataset small, reduce the window dynamically
    if n < seq_len + 5:
        seq_len = max(3, int(n * 0.12))  # keep small but meaningful
    if seq_len >= n:
        seq_len = max(1, n - 1)

    # at most 180 or 20% of the data, but leave at least one training row
    test_days = min(180, max(1, int(n * 0.2)))
    if n - test_days < 1:
        test_days = max(1, n - 1)
    return seq_len, test_days


def run_forecast_for_company(company_name: str, use_saved_lstm=True, lstm_epochs=6):
    """Run all available forecasters for one company and save comparison plots.

    The company's rows are split into a training part and a hold-out tail.
    Naive and moving-average baselines always run; ARIMA and Prophet run when
    their libraries were importable and enough training rows exist; an LSTM is
    either loaded from ``model.h5`` or trained on the fly.

    Args:
        company_name: Company to look up (case-insensitive; a substring is
            accepted when it identifies exactly one company).
        use_saved_lstm: Try ``model.h5`` in the working directory before
            training a fresh LSTM.
        lstm_epochs: Epochs for the on-the-fly LSTM (a minimum of 3 is used).

    Returns:
        ``(performance_plot_path, forecast_plot_path, message)``. On invalid
        input both paths are ``None`` and ``message`` explains the problem.
    """
    # sanitize input
    company_name = (company_name or "").strip()
    if not company_name:
        return None, None, "❌ Please enter a company name."

    df_company, error = _select_company_rows(company_name)
    if error:
        return None, None, error

    # reset index & ensure sorted
    df_company = df_company.sort_values("Date").reset_index(drop=True)
    n = len(df_company)
    seq_len, test_days = _choose_window_and_holdout(n)

    # scaled data and sequences
    # NOTE(review): the scaler is fitted on the whole series, hold-out rows
    # included, so LSTM scores may be slightly optimistic.
    scaler = MinMaxScaler()
    scaled = scaler.fit_transform(df_company[["Close"]].values)
    X_all, y_all = make_sequences(scaled, seq_len)
    if len(X_all) == 0:
        return None, None, "❌ Not enough rows to build sequences. Need at least (SEQ+1) rows."

    # The hold-out cannot be longer than the number of available sequences.
    if test_days > len(X_all):
        test_days = max(1, len(X_all) // 2)

    # train/test for sequences (the training part may be empty)
    X_train, y_train = X_all[:-test_days], y_all[:-test_days]
    X_test = X_all[-test_days:]

    # The last `test_days` rows are the hold-out; they line up one-to-one with
    # the targets of X_test.
    df_test = df_company.iloc[-test_days:].copy().reset_index(drop=True)
    df_train = df_company.iloc[: n - test_days].copy().reset_index(drop=True)
    actual = df_test["Close"].values

    preds = {}
    results = []

    # --- Baseline: Naive (last value from df_train) ---
    last_val = df_train["Close"].iloc[-1] if len(df_train) else df_company["Close"].iloc[0]
    naive_pred = np.repeat(last_val, len(df_test))
    preds["Naive"] = naive_pred
    results.append(evaluate_model(actual, naive_pred, "Naive"))

    # --- Baseline: MA(window) = mean of the last `ma_window` training closes ---
    ma_window = min(7, max(1, len(df_train)))
    if len(df_train):
        ma_last_val = df_train["Close"].tail(ma_window).mean()
    else:
        ma_last_val = df_company["Close"].mean()
    ma_pred = np.repeat(ma_last_val, len(df_test))
    preds[f"MA({ma_window})"] = ma_pred
    results.append(evaluate_model(actual, ma_pred, f"MA({ma_window})"))

    # --- ARIMA (if available & enough training data) ---
    if arima_available and auto_arima and len(df_train) >= 12:
        try:
            arima_model = auto_arima(df_train["Close"].values, seasonal=False, suppress_warnings=True, error_action="ignore")
            arima_forecast = np.asarray(arima_model.predict(n_periods=len(df_test)))
            preds["ARIMA"] = arima_forecast
            results.append(evaluate_model(actual, arima_forecast, "ARIMA"))
        except Exception as e:
            # best effort: skip ARIMA on errors, the other models still run
            print("ARIMA error:", e)

    # --- Prophet (if available & enough data) ---
    if prophet_available and Prophet and len(df_train) >= 10:
        try:
            prophet_df = df_train[["Date", "Close"]].rename(columns={"Date": "ds", "Close": "y"})
            model_prophet = Prophet(daily_seasonality=True)
            model_prophet.fit(prophet_df)
            future = pd.DataFrame(df_test["Date"]).rename(columns={"Date": "ds"})
            prophet_pred = model_prophet.predict(future)["yhat"].values
            preds["Prophet"] = prophet_pred
            results.append(evaluate_model(actual, prophet_pred, "Prophet"))
        except Exception as e:
            print("Prophet error:", e)

    # --- LSTM: saved model.h5 when compatible, else a lightweight model trained here ---
    lstm_used = False
    if use_saved_lstm and os.path.exists("model.h5"):
        try:
            loaded = load_model("model.h5")
            # The model is fed (batch, seq, 1). A declared sequence length of
            # None means any length is accepted.
            expected_seq = loaded.input_shape[1]
            if expected_seq in (None, X_test.shape[1]):
                y_pred_scaled = loaded.predict(X_test.reshape((-1, X_test.shape[1], 1)))
                y_pred = scaler.inverse_transform(y_pred_scaled).ravel()
                preds["LSTM"] = y_pred
                results.append(evaluate_model(actual[:len(y_pred)], y_pred, "LSTM"))
                lstm_used = True
            else:
                # mismatch sequence length -> will attempt on-the-fly train
                print("Saved model sequence mismatch: saved seq", expected_seq, "data seq", X_test.shape[1])
        except Exception as e:
            print("Could not load model.h5:", e)

    # On-the-fly LSTM training (only if no saved model was used and there are
    # enough training examples)
    if (not lstm_used) and len(X_train) >= 5:
        try:
            X_train_tf = X_train.reshape((-1, X_train.shape[1], 1))
            X_test_tf = X_test.reshape((-1, X_test.shape[1], 1))
            model = Sequential([LSTM(64, input_shape=(X_train_tf.shape[1], 1)), Dropout(0.2), Dense(1)])
            model.compile(optimizer="adam", loss="mse")
            # small epochs to keep interactive time low
            model.fit(X_train_tf, y_train, epochs=max(3, lstm_epochs), batch_size=16, verbose=0)
            y_pred_scaled = model.predict(X_test_tf)
            y_pred = scaler.inverse_transform(y_pred_scaled).ravel()
            preds["LSTM"] = y_pred
            results.append(evaluate_model(actual[:len(y_pred)], y_pred, "LSTM"))
            lstm_used = True
        except Exception as e:
            print("On-the-fly LSTM failed:", e)

    # create result images (performance & forecast)
    perf_path = save_performance_plot(results, outpath="performance_dashboard.png")
    fcst_path = save_forecast_plot(df_train, df_test, preds, outpath="forecast_plot.png")

    last_date = df_test["Date"].max().strftime("%Y-%m-%d")
    message = f"✅ Generates performance dashboard & forecast plots with Date up to {last_date} (Company: {company_name}, rows: {n})"

    return perf_path, fcst_path, message

# ---------- Gradio UI ----------
title = "📈 Time Series Forecast Dashboard (by Company)"
desc = "Enter company name from the dataset. The app filters rows for that company, runs quick forecasts and returns performance & forecast plots."

# Single free-text input: only the company name is supplied by the UI, so the
# forecasting routine runs with its default keyword arguments.
company_box = gr.Textbox(lines=1, placeholder="Enter company name (case-insensitive)")

# Output order mirrors the (performance plot, forecast plot, message) tuple
# returned by run_forecast_for_company.
result_views = [
    gr.Image(type="filepath", label="Performance Dashboard"),
    gr.Image(type="filepath", label="Forecast Plot"),
    gr.Textbox(label="Message"),
]

iface = gr.Interface(
    fn=run_forecast_for_company,
    inputs=company_box,
    outputs=result_views,
    title=title,
    description=desc,
    allow_flagging="never",
    examples=[],
)

if __name__ == "__main__":
    iface.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://a844faf29d099b5952.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
