# In [1]:
from __future__ import annotations

import json
import pickle
import random
from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ---------------------- PATHS (Windows-safe raw strings) ----------------------
# Product CSV that main() loads via load_csv().
DATA_CSV = r"C:\Users\sagni\Downloads\Price Sense\archive\flipkart_com-ecommerce_sample.csv"
# Folder that holds the model artifacts below; main() also writes its outputs here.
ARTIFACT_DIR = r"C:\Users\sagni\Downloads\Price Sense"
H5_MODEL = Path(ARTIFACT_DIR) / "price_lstm.h5"  # Keras model read by load_lstm_model()
FAKE_REVIEW_PKL = Path(ARTIFACT_DIR) / "fake_review_lr.pkl"  # pickled classifier used for fake-review scoring

# Prediction controls (edit as you like)
PRODUCT_QUERY = "phone"   # text searched for in the product title (case-insensitive). If None/empty -> use PRODUCT_INDEX.
PRODUCT_INDEX = 0         # positional row in the full (unfiltered) CSV, used when PRODUCT_QUERY is unset or matches nothing
FORECAST_DAYS = 7         # number of future days to predict
SEED_DAYS = 60            # must match training's synthetic seed_days for similar behavior
SEQ_LEN = 7               # must match training's lstm_sequence_len

# For reproducibility (synthetic series generation)
RANDOM_SEED = 42

# -----------------------------------------------------------------------------

# --- Optional TensorFlow for true LSTM inference (fallback to moving average) ---
# TensorFlow is optional: any exception raised while importing it (not only
# ImportError) just clears TF_AVAILABLE, in which case load_lstm_model()
# returns None and no Keras model is used.
try:
    import tensorflow as tf  # noqa: F401
    TF_AVAILABLE = True
except Exception:
    TF_AVAILABLE = False


# -------------------- Data loading / column mapping ---------------------------
def load_csv(csv_path: str | Path) -> pd.DataFrame:
    """Read the product CSV at ``csv_path`` into a DataFrame.

    The file is decoded as UTF-8 first; if that raises
    ``UnicodeDecodeError`` the read is retried once as Latin-1.

    Raises:
        FileNotFoundError: if ``csv_path`` does not exist.
    """
    source = Path(csv_path)
    if source.exists():
        try:
            frame = pd.read_csv(source, encoding="utf-8")
        except UnicodeDecodeError:
            # Not valid UTF-8: Latin-1 can decode any byte sequence.
            frame = pd.read_csv(source, encoding="latin1")
        return frame
    raise FileNotFoundError(f"CSV not found at: {source}")


def map_columns(df: pd.DataFrame) -> Dict[str, Optional[str]]:
    """Guess which DataFrame columns play each logical role.

    Column names are compared case-insensitively and by substring, in
    column order. For every role the candidate substrings are tried in
    priority order and the first column that contains one of them wins.

    Returns:
        A dict with the keys ``title``, ``category``, ``brand``,
        ``retail_price``, ``discounted_price`` and ``description``. Each
        value is the original column name, or ``None`` when nothing matched
        (``title`` falls back to the first column instead of ``None``).
    """
    by_lower_name = {original.lower(): original for original in df.columns}

    def find(name: str):
        # First column (in insertion order) whose lower-cased name contains `name`.
        hits = (original for lowered, original in by_lower_name.items() if name in lowered)
        return next(hits, None)

    def first_match(*needles: str):
        # Try each substring in turn; stop at the first one that yields a column.
        for needle in needles:
            column = find(needle)
            if column:
                return column
        return None

    title_column = first_match("product_name", "title")
    if not title_column:
        title_column = list(df.columns)[0]

    mapping = {"title": title_column}
    mapping["category"] = first_match("product_category_tree", "category")
    mapping["brand"] = find("brand")
    mapping["retail_price"] = first_match("retail_price", "mrp", "price")
    mapping["discounted_price"] = first_match("discounted_price", "discount_price")
    mapping["description"] = first_match("description", "product_description")
    return mapping


# ------------------------ Synthetic price series ------------------------------
def synthetic_series(base_price: float,
                     days: int,
                     noise_frac: float = 0.08,
                     deal_prob: float = 0.07,
                     deal_frac: float = 0.12) -> np.ndarray:
    """Generate a random-walk price history of ``days`` points.

    Starting from ``base_price`` (clamped to at least 1.0), each day the
    price first moves by a uniform random fraction in
    ``[-noise_frac, noise_frac]`` of itself, and then, with probability
    ``deal_prob``, is cut by ``deal_frac``. The price never drops below 1.0.

    Randomness comes from both ``np.random`` (noise) and the stdlib
    ``random`` module (deal draw), so both must be seeded for
    reproducible output.

    Returns:
        A float32 array of length ``days``.
    """
    floor = 1.0
    keep_fraction = 1.0 - deal_frac
    current = float(max(floor, base_price))
    trajectory = []
    for _day in range(days):
        # Daily jitter is proportional to the current price.
        jitter = np.random.uniform(-noise_frac, noise_frac) * current
        current = max(floor, current + jitter)
        # Occasional promotion applied on top of the jittered price.
        on_sale = random.random() < deal_prob
        if on_sale:
            current = max(floor, current * keep_fraction)
        trajectory.append(current)
    return np.array(trajectory, dtype=np.float32)


def make_sequences(series: np.ndarray, seq_len: int) -> Tuple[np.ndarray, np.ndarray]:
    """Cut ``series`` into sliding windows and their next-step targets.

    For every start offset ``s`` with ``s + seq_len < len(series)``, the
    window is ``series[s:s + seq_len]`` and the target is
    ``series[s + seq_len]``.

    Returns:
        ``(X, y)`` as float32 arrays: the stacked windows and the value that
        follows each window. Both are empty when the series is not longer
        than ``seq_len``.
    """
    starts = range(len(series) - seq_len)
    windows = [series[s:s + seq_len] for s in starts]
    targets = [series[s + seq_len] for s in starts]
    return np.array(windows, dtype=np.float32), np.array(targets, dtype=np.float32)


# --------------------------- Price forecasting --------------------------------
def load_lstm_model(h5_path: Path):
    """Load the Keras price model from ``h5_path`` if that is possible.

    Loading is best-effort: the function never raises. It returns ``None``
    when TensorFlow could not be imported, when the file does not exist, or
    when Keras fails to load it. In the last case the error is reported on
    stdout so it is visible why no model is being used.

    Returns:
        The loaded Keras model, or ``None``.
    """
    if not TF_AVAILABLE:
        return None
    if not h5_path.exists():
        return None
    try:
        return tf.keras.models.load_model(h5_path)
    except Exception as exc:
        # Deliberately broad: a broken or incompatible model file must not
        # abort the run, but the failure should not be hidden either.
        print(f"[WARN] Could not load LSTM model from {h5_path}: {exc!r}; continuing without it.")
        return None


def forecast_next_k(series: np.ndarray, k: int, seq_len: int, model=None) -> List[float]:
    """Forecast the next ``k`` values that follow ``series``.

    With a model: the history is divided by its maximum (floored at 1e-6),
    and the model is called autoregressively on the last ``seq_len``
    normalized values, reshaped to ``(1, seq_len, 1)``. A history shorter
    than ``seq_len`` is left-padded with its first value. Each prediction is
    appended to the normalized history and scaled back before being
    returned. If a ``model.predict`` call fails, the mean of the current
    window is used for that step instead.

    Without a model: the mean of the last ``min(seq_len, len(series))``
    values is repeated ``k`` times.

    Args:
        series: Observed prices, oldest first.
        k: Number of steps to forecast; ``k <= 0`` yields an empty list.
        seq_len: Window length the model expects.
        model: Object with a Keras-style ``predict(window, verbose=0)``
            method returning a 2-D array, or ``None``.

    Returns:
        A list of ``k`` forecast prices as Python floats.

    Raises:
        ValueError: if ``series`` is empty.
    """
    raw = np.asarray(series)
    if raw.size == 0:
        # Previously this produced NaN forecasts (no model) or an opaque
        # numpy reduction error (with a model).
        raise ValueError("series must contain at least one price")
    if k <= 0:
        return []

    if model is None:
        # Fallback: moving average of last seq_len
        span = min(seq_len, len(raw))
        moving_avg = float(np.mean(raw[-span:]))
        return [moving_avg] * k

    scale = max(1e-6, float(np.max(raw)))
    hist_norm = [v / scale for v in raw.astype(np.float32)]
    preds: List[float] = []
    warned = False
    for _ in range(k):
        recent = hist_norm[-seq_len:]
        # Left-pad with the oldest value while the history is still short.
        padded = [hist_norm[0]] * (seq_len - len(recent)) + recent
        window = np.array(padded, dtype=np.float32).reshape(1, seq_len, 1)
        try:
            pred_norm = float(model.predict(window, verbose=0)[0, 0])
        except Exception as exc:
            # Best-effort: keep forecasting, but say so once.
            if not warned:
                print(f"[WARN] LSTM prediction failed ({exc!r}); using the window mean instead.")
                warned = True
            pred_norm = float(np.mean(recent))
        preds.append(float(pred_norm * scale))
        hist_norm.append(pred_norm)
    return preds


# ----------------------- Simple sentiment (optional) --------------------------
def simple_sentiment_label(t: str) -> str:
    """Label text as NEGATIVE, POSITIVE or NEUTRAL using keyword lookup.

    Matching is case-insensitive and by plain substring, so a keyword also
    matches inside a longer word. Negative keywords are checked first and
    therefore win when both kinds are present. Falsy input (``None``,
    ``""``) is treated as empty text and yields NEUTRAL.
    """
    lowered = t.lower() if t else ""
    rules = (
        ("NEGATIVE", ("bad", "poor", "worst", "awful")),
        ("POSITIVE", ("good", "great", "excellent", "amazing", "love")),
    )
    for label, cues in rules:
        for cue in cues:
            if cue in lowered:
                return label
    return "NEUTRAL"


# -------------------- Fake review features (match training) -------------------
@dataclass
class ReviewFeatures:
    """Hand-crafted numeric features for one review text, as built by featurize()."""
    length: int          # number of characters in the text
    exclam: int          # number of "!" characters
    caps_ratio: float    # upper-case characters / (characters + 1e-6)
    unique_ratio: float  # distinct lower-cased words / (words + 1e-6)
    digits: int          # number of digit characters
    words: int           # number of word tokens


def featurize(text: str) -> ReviewFeatures:
    """Compute the ReviewFeatures of ``text``.

    A word is a run of ASCII letters, digits or apostrophes. Both ratios
    add 1e-6 to the denominator so that empty text gives 0.0 instead of a
    division error. Falsy input (``None``, ``""``) is treated as empty text.
    """
    import re

    raw = text or ""
    tokens = re.findall(r"[A-Za-z0-9']+", raw)
    n_tokens = len(tokens)
    n_chars = len(raw)

    distinct_tokens = {token.lower() for token in tokens}
    upper_count = len([ch for ch in raw if ch.isupper()])
    digit_count = len([ch for ch in raw if ch.isdigit()])

    return ReviewFeatures(
        length=n_chars,
        exclam=raw.count("!"),
        caps_ratio=float(upper_count / (n_chars + 1e-6)),
        unique_ratio=float(len(distinct_tokens) / (n_tokens + 1e-6)),
        digits=int(digit_count),
        words=n_tokens,
    )


def vectorize(f: ReviewFeatures) -> List[float]:
    """Flatten ``f`` into a feature list.

    The order is fixed: length, exclam, caps_ratio, unique_ratio, digits,
    words. Values are passed through unchanged.
    """
    ordered_fields = ("length", "exclam", "caps_ratio", "unique_ratio", "digits", "words")
    return [getattr(f, field_name) for field_name in ordered_fields]


# -------------------------------- Main workflow -------------------------------
def pick_product(df: pd.DataFrame, cols: Dict[str, str | None],
                 query: Optional[str], fallback_index: int = 0) -> Tuple[int, Dict]:
    """Select one product row, by title search or by position.

    When ``query`` is non-empty, the first row whose title contains it is
    chosen. The match is a literal, case-insensitive substring test: regex
    metacharacters in ``query`` (e.g. ``"c++"``) have no special meaning,
    and rows with a missing title never match. If there is no query or no
    match, the row at position ``fallback_index`` of the whole frame is used.

    Args:
        df: Product table.
        cols: Column mapping; only ``cols["title"]`` is read here.
        query: Text to look for in the title, or ``None``/``""`` to skip
            the search.
        fallback_index: Positional index used when the search yields nothing.

    Returns:
        ``(index_label, row_dict)`` for the selected row.

    Raises:
        IndexError: if the fallback is needed and ``fallback_index`` is out
            of range (including an empty ``df``).
    """
    title_col = cols["title"]
    if query:
        # fillna("") keeps missing titles from turning into the text "nan",
        # which would otherwise match queries such as "an" or "na".
        titles = df[title_col].fillna("").astype(str)
        # regex=False: the query is user text, not a pattern.
        hits = df[titles.str.contains(query, case=False, na=False, regex=False)]
        if not hits.empty:
            return int(hits.index[0]), hits.iloc[0].to_dict()
    # fallback: nth row overall
    row = df.iloc[fallback_index].to_dict()
    return int(df.index[fallback_index]), row


def _to_price(value) -> Optional[float]:
    """Convert a raw CSV cell to a finite float price, or None if that is impossible."""
    try:
        price = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    # Rejects NaN (including the text "nan") and infinities.
    return price if np.isfinite(price) else None


def _to_text(value) -> str:
    """Convert a raw CSV cell to text, mapping missing values (None/NaN) to ''."""
    if isinstance(value, str):
        return value
    if value is None or pd.isna(value):
        return ""
    return str(value)


def main():
    """Run the end-to-end prediction workflow for one product.

    Steps:
      1. Seed both RNGs, load the CSV and map its columns.
      2. Pick a product (PRODUCT_QUERY, else PRODUCT_INDEX) and derive a base
         price: discounted price, else retail price, else 100.0.
      3. Generate a synthetic SEED_DAYS price history ending today.
      4. Forecast FORECAST_DAYS ahead with the LSTM model if it loads,
         otherwise with a moving average.
      5-6. Write ``price_forecast.csv`` and ``price_forecast.png``.
      7. If the pickled classifier exists, score pseudo-reviews cut from the
         description (or title) and write ``fake_review_scores.csv``.
      8. Write ``sentiment_pie.png`` for the same pseudo-reviews.
      9. Print a JSON summary.

    All outputs go to ARTIFACT_DIR, which is created if needed.
    """
    np.random.seed(RANDOM_SEED)
    random.seed(RANDOM_SEED)

    out_dir = Path(ARTIFACT_DIR)
    out_dir.mkdir(parents=True, exist_ok=True)

    # 1) Load data and map columns
    df = load_csv(DATA_CSV)
    cols = map_columns(df)

    # 2) Choose product
    idx, row = pick_product(df, cols, PRODUCT_QUERY, PRODUCT_INDEX)
    title = str(row.get(cols["title"]))
    # A missing description must become "" (not the text "nan") so that the
    # title fallback in step 7 can take effect.
    desc = _to_text(row.get(cols["description"])) if cols["description"] else ""

    # Prefer the discounted price, then the retail price, then a fixed default.
    base = _to_price(row.get(cols["discounted_price"]))
    if base is None:
        base = _to_price(row.get(cols["retail_price"]))
    if base is None:
        base = 100.0

    print(f"[INFO] Selected product @index {idx}: {title[:90]}{'...' if len(title)>90 else ''}")
    print(f"[INFO] Base price used: {base:.2f}")

    # 3) Build synthetic last SEED_DAYS price history (same logic as training)
    history = synthetic_series(base_price=base, days=SEED_DAYS,
                               noise_frac=0.08, deal_prob=0.07, deal_frac=0.12)
    # Oldest first; the last history date is today.
    dates_hist = [date.today() - timedelta(days=(SEED_DAYS - 1 - i)) for i in range(SEED_DAYS)]

    # 4) Load LSTM model (if available) + forecast
    model = load_lstm_model(H5_MODEL)
    preds = forecast_next_k(history, FORECAST_DAYS, SEQ_LEN, model=model)
    future_dates = [dates_hist[-1] + timedelta(days=i) for i in range(1, FORECAST_DAYS + 1)]

    # 5) Save forecast CSV
    df_forecast = pd.DataFrame({
        "date": future_dates,
        "pred_price": preds
    })
    forecast_csv = out_dir / "price_forecast.csv"
    df_forecast.to_csv(forecast_csv, index=False, encoding="utf-8")
    print(f"[OK] Forecast CSV -> {forecast_csv}")

    # 6) Plot: history + forecast
    fig, ax = plt.subplots(figsize=(8, 4.5), dpi=140)
    ax.plot(dates_hist, history, marker="o", linewidth=1, label="History")
    ax.plot(future_dates, preds, marker="o", linewidth=1, label=f"Forecast {FORECAST_DAYS}d")
    ax.set_title(f"Price History & Forecast\n{title[:70]}{'...' if len(title)>70 else ''}")
    ax.set_xlabel("Date")
    ax.set_ylabel("Price (₹)")
    ax.legend()
    fig.tight_layout()
    out_png = out_dir / "price_forecast.png"
    fig.savefig(out_png, bbox_inches="tight")
    plt.close(fig)
    print(f"[OK] Forecast plot -> {out_png}")

    # 7) Fake review scoring for the selected product
    # Build pseudo "reviews" from description/title slices (same as training sensibly)
    text_src = desc or title
    chunks = [text_src[:140], text_src[140:280], text_src[280:420]]
    # Keep only slices with more than 10 non-blank characters.
    reviews = [c.strip() for c in chunks if c and len(c.strip()) > 10]

    if FAKE_REVIEW_PKL.exists() and reviews:
        # SECURITY: unpickling executes arbitrary code from the file. Only
        # load a classifier file that you produced yourself or fully trust.
        with open(FAKE_REVIEW_PKL, "rb") as f:
            clf = pickle.load(f)
        X = np.array([vectorize(featurize(t)) for t in reviews], dtype=float)
        if hasattr(clf, "predict_proba"):
            # Column 1 is taken as the "suspicious" class probability.
            probs = clf.predict_proba(X)[:, 1]
        elif hasattr(clf, "decision_function"):
            # Squash raw decision scores into (0, 1) with a logistic function.
            z = clf.decision_function(X)
            probs = 1.0 / (1.0 + np.exp(-z))
        else:
            probs = clf.predict(X).astype(float)

        df_scores = pd.DataFrame({
            "review_snippet": reviews,
            "suspicious_proba": probs
        }).sort_values("suspicious_proba", ascending=False)
        out_scores = out_dir / "fake_review_scores.csv"
        df_scores.to_csv(out_scores, index=False, encoding="utf-8")
        print(f"[OK] Fake-review scores -> {out_scores}")
    else:
        print("[WARN] No reviews extracted OR model file missing; skipping fake-review scoring.")

    # 8) Optional: naive sentiment pie for the same reviews (quick visual)
    if reviews:
        labels = [simple_sentiment_label(t) for t in reviews]
        lab, counts = np.unique(labels, return_counts=True)
        fig2, ax2 = plt.subplots(figsize=(5, 4), dpi=140)
        ax2.pie(counts, labels=lab, autopct="%1.0f%%")
        ax2.set_title("Simple Sentiment (rule-based)")
        fig2.tight_layout()
        pie_png = out_dir / "sentiment_pie.png"
        fig2.savefig(pie_png, bbox_inches="tight")
        plt.close(fig2)
        print(f"[OK] Sentiment pie -> {pie_png}")

    # 9) Print a tiny summary
    summary = {
        "product_index": idx,
        "product_title": title,
        "history_points": len(history),
        "forecast_days": FORECAST_DAYS,
        "price_forecast_png": str(out_png),
        "price_forecast_csv": str(forecast_csv),
    }
    print("\n=== Prediction Summary ===")
    print(json.dumps(summary, indent=2, ensure_ascii=False))

# Run the workflow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


# ------------------------------------------------------------------------------
# Sample output captured from one run of the script above:
#
# [INFO] Selected product @index 1081: JRB 1038 Smallest Mobile Powered By OTG Enabled Android Smart Phone Portable 1038 USB Fan
# [INFO] Base price used: 249.00
# [OK] Forecast CSV -> C:\Users\sagni\Downloads\Price Sense\price_forecast.csv
# [OK] Forecast plot -> C:\Users\sagni\Downloads\Price Sense\price_forecast.png
# [OK] Fake-review scores -> C:\Users\sagni\Downloads\Price Sense\fake_review_scores.csv
# [OK] Sentiment pie -> C:\Users\sagni\Downloads\Price Sense\sentiment_pie.png
#
# === Prediction Summary ===
# {
#   "product_index": 1081,
#   "product_title": "JRB 1038 Smallest Mobile Powered By OTG Enabled Android Smart Phone Portable 1038 USB Fan",
#   "history_points": 60,
#   "forecast_days": 7,
#   "price_forecast_png": "C:\\Users\\sagni\\Downloads\\Price Sense\\price_forecast.png",
#   "price_forecast_csv": "C:\\Users\\sagni\\Downloads\\Price Sense\\price_forecast.csv"
# }
