In [2]:
"""
customer_feedback_sentiment.py

Run in Jupyter or as a script. Adjust the input paths if needed.

What it does (high level):
- Reads a raw CSV of reviews (customer_reviews.csv).
- Loads large positive/negative lexicon files (text files).
- Cleans and tokenizes each review.
- Matches multi-word or single-word lexicon entries as whole-phrase matches.
- Applies simple negation handling (flip polarity if a negation occurs within a small window).
- Counts positive/negative matches, normalizes to a sentiment score, assigns a label.
- Saves processed CSV, summary CSV, Excel workbook and two PNG charts.

Edit the variables in the CONFIG section below to point to your files.
"""

import os
import re
import sys
from collections import defaultdict
from typing import List, Tuple, Set

import pandas as pd
import matplotlib.pyplot as plt

# ----------------- CONFIG: adjust these paths for your machine -----------------
RAW_REVIEWS_CSV = r"C:\Users\victor\Downloads\customer_reviews.csv"
POSITIVE_LEXICON_PATH = r"C:\Users\victor\Desktop\VIC\dsc\positive-words.txt"
NEGATIVE_LEXICON_PATH = r"C:\Users\victor\Desktop\VIC\dsc\negative-words.txt"

# Fallback path if the exact windows path is not found (useful for testing here)
FALLBACK_POS_PATH = "positive-words.txt"   # change if you put the file elsewhere
FALLBACK_NEG_PATH = "negative-words.txt"

# Output filenames (will be created in the current working directory)
OUT_PROCESSED_CSV = "customer_reviews_processed_by_you.csv"
OUT_SUMMARY_CSV = "summary_by_product_by_you.csv"
OUT_EXCEL = "customer_feedback_dashboard_by_you.xlsx"
OUT_SENT_PNG = "sentiment_distribution_by_you.png"
OUT_RATING_PNG = "avg_rating_by_product_by_you.png"

# Parameters you might want to tune
NEGATION_WINDOW = 3     # how many tokens before a matched phrase to look for a negation
POSITIVE_THRESHOLD = 0.08
NEGATIVE_THRESHOLD = -0.05

# ----------------- Helper functions -----------------
def try_open_lexicon(path: str) -> List[str]:
    """
    Read a lexicon file, ignoring comment lines (starting with ';' or '#') and blank lines.
    Attempt utf-8 first, then latin-1 (common for older lexicon files).
    Returns a list of raw lines (not yet cleaned).
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Lexicon file not found: {path}")

    for encoding in ("utf-8", "latin-1"):
        try:
            with open(path, "r", encoding=encoding) as f:
                lines = [ln.rstrip("\n") for ln in f]
            # filter out comment header lines and blank lines
            items = []
            for ln in lines:
                ln_stripped = ln.strip()
                if not ln_stripped:
                    continue
                if ln_stripped.startswith(";") or ln_stripped.startswith("#"):
                    continue
                items.append(ln_stripped)
            return items
        except UnicodeDecodeError:
            continue
    # If we reach here, raise a helpful error
    raise UnicodeDecodeError(f"Cannot decode lexicon file {path} with utf-8 or latin-1.")


_re_non_alnum = re.compile(r"[^a-z0-9\s]")  # compile once

def clean_text_to_string(s: str) -> str:
    """
    Normalize text for phrase matching:
    - Lowercase
    - Replace non-alphanumeric characters with spaces
    - Collapse multiple spaces
    - Surround with single spaces (padding) to enable whole-phrase substring matching.
    Returns cleaned string, e.g. " this is text "
    """
    if s is None:
        s = ""
    s = str(s).lower()
    s = _re_non_alnum.sub(" ", s)         # replace punctuation with space
    s = re.sub(r"\s+", " ", s).strip()   # collapse multi-space
    return f" {s} "                       # pad with spaces

def phrase_to_clean(phrase: str) -> str:
    """
    Convert a lexicon phrase into the same cleaned-padded format used for review text.
    For example "fast delivery" -> " fast delivery "
    """
    p = phrase.lower()
    p = _re_non_alnum.sub(" ", p)
    p = re.sub(r"\s+", " ", p).strip()
    return f" {p} "

def find_phrase_positions(tokens: List[str], phrase_tokens: List[str]) -> List[int]:
    """
    Return list of start indices in tokens where phrase_tokens exactly match.
    Example: tokens = ['i','love','it','fast','delivery']; phrase_tokens = ['fast','delivery'] -> [3]
    This is token-level detection (used if you choose token-based approach).
    Note: we primarily use cleaned-string substring matching (below) for simplicity with multi-word phrases.
    """
    positions = []
    L = len(phrase_tokens)
    if L == 0:
        return positions
    for i in range(len(tokens) - L + 1):
        if tokens[i:i+L] == phrase_tokens:
            positions.append(i)
    return positions

# ----------------- Core sentiment function -----------------
class SentimentLexicon:
    def __init__(self, positive_list: List[str], negative_list: List[str]):
        """
        Prepare lexicon: create cleaned phrase -> original phrase mapping,
        and order entries by descending token length so longer phrases match first.
        """
        # Clean each entry and ignore empty lines
        def prepare(list_in):
            cleaned = []
            for ln in list_in:
                ln2 = ln.strip()
                if not ln2:
                    continue
                c = phrase_to_clean(ln2)    # " phrase words "
                # store pairs (cleaned_phrase, token_length)
                token_length = len(c.split())
                cleaned.append((c, token_length, ln2))
            # sort by token_length descending to match longer phrases first
            cleaned.sort(key=lambda x: -x[1])
            return cleaned

        self.pos = prepare(positive_list)
        self.neg = prepare(negative_list)

        # small set of negation words (expandable)
        self.negations = {"not","no","never","none","cannot","can't","dont","don't","didnt","didn't","doesnt","doesn't","won't","wouldn't","couldn't","isn't","isnt"}

    def analyze_review(self, raw_text: str, negation_window:int = NEGATION_WINDOW) -> Tuple[int,int,int,List[Tuple[str,str,int]]]:
        """
        Analyze a single review text.
        Returns:
         - pos_count (int)
         - neg_count (int)
         - token_count (int)
         - matches (list of tuples): (matched_phrase_orig, polarity('pos'/'neg'), start_char_index_in_cleaned_text)
        Explanation:
         - We do substring checks on a cleaned, padded string to detect multi-word phrases as whole-phrases.
         - We avoid double-counting overlapping matched tokens by tracking used character index ranges.
         - We apply simple negation handling: if a negation word appears within `negation_window` tokens BEFORE the matched phrase, the phrase polarity flips.
        """
        cleaned = clean_text_to_string(raw_text)  # e.g. " i love it fast delivery "
        token_count = 0
        # token_count approximate: count words in cleaned (strip spaces then split)
        token_count = len(cleaned.strip().split())

        used_char_indices: Set[int] = set()
        pos_count = 0
        neg_count = 0
        matches = []

        # helper: find token index of char position to check negation window
        cleaned_tokens = cleaned.strip().split()  # no external punctuation now
        # create a mapping from token index to char pos in cleaned string
        # (we can compute approximate token positions by splitting the cleaned string)
        cum_chars = []
        cpos = 1  # because cleaned string starts with a space we added; tokenization uses strip
        for t in cleaned_tokens:
            cum_chars.append(cpos)
            cpos += len(t) + 1  # token + single space

        # function to check negation within window tokens before a token index (word idx)
        def has_negation_before_word(word_idx:int) -> bool:
            # check previous negation_window tokens
            start = max(0, word_idx - negation_window)
            for idx in range(start, word_idx):
                if cleaned_tokens[idx] in self.negations:
                    return True
            return False

        # A function to mark used char ranges (avoid overlap)
        def range_overlaps_any(start_char:int, length:int) -> bool:
            for ch in range(start_char, start_char + length):
                if ch in used_char_indices:
                    return True
            return False

        def mark_range(start_char:int, length:int):
            for ch in range(start_char, start_char + length):
                used_char_indices.add(ch)

        # We'll search positive phrases first, then negative phrases.
        # This order is a design choice: long positive phrases should match before short negative tokens
        # to reduce accidental partial overlaps. The lists are pre-sorted by token length descending.
        for cleaned_list, polarity in ((self.pos, "pos"), (self.neg, "neg")):
            for cleaned_phrase, _, orig_phrase in cleaned_list:
                # cleaned_phrase includes leading/trailing space, cleaned
                # find all substring positions
                start_idx = 0
                while True:
                    found_at = cleaned.find(cleaned_phrase, start_idx)
                    if found_at == -1:
                        break
                    # ensure we don't double-count overlapping matches
                    if range_overlaps_any(found_at, len(cleaned_phrase)):
                        # skip this occurrence and continue searching after it
                        start_idx = found_at + 1
                        continue

                    # Determine the approximate word index where phrase starts:
                    # count how many spaces are before found_at in cleaned
                    # (since cleaned has single spaces) -> token_index = number of tokens before this match
                    # We'll use split on the substring up to found_at
                    prefix = cleaned[:found_at].strip()
                    if prefix == "":
                        word_idx = 0
                    else:
                        word_idx = len(prefix.split())

                    # negation check: if negation present within the previous NEGATION_WINDOW tokens, flip polarity
                    negated = has_negation_before_word(word_idx)

                    # Decide final polarity for counting
                    if polarity == "pos":
                        if negated:
                            # counts as negative
                            neg_count += 1
                            matches.append((orig_phrase, "pos->neg", found_at))
                        else:
                            pos_count += 1
                            matches.append((orig_phrase, "pos", found_at))
                    else:  # polarity == "neg"
                        if negated:
                            # negation of negative -> becomes positive
                            pos_count += 1
                            matches.append((orig_phrase, "neg->pos", found_at))
                        else:
                            neg_count += 1
                            matches.append((orig_phrase, "neg", found_at))

                    # mark char range as used and move start_idx forward
                    mark_range(found_at, len(cleaned_phrase))
                    start_idx = found_at + len(cleaned_phrase)

        return pos_count, neg_count, token_count, matches

# ----------------- Main processing routine -----------------
def process_reviews(csv_path: str,
                    pos_lex_path: str,
                    neg_lex_path: str,
                    out_processed_csv: str = OUT_PROCESSED_CSV,
                    out_summary_csv: str = OUT_SUMMARY_CSV,
                    out_excel: str = OUT_EXCEL):
    # 1) Read reviews CSV
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Reviews CSV not found at: {csv_path}")
    df = pd.read_csv(csv_path, parse_dates=["review_date"])
    print(f"Loaded {len(df)} reviews from {csv_path}")

    # 2) Load lexicons (with helpful fallbacks)
    def load_safe(path, fallback):
        if os.path.exists(path):
            try:
                items = try_open_lexicon(path)
                print(f"Loaded {len(items)} entries from {path}")
                return items
            except Exception as e:
                print(f"Error reading {path}: {e}")
                # try fallback if available
        if os.path.exists(fallback):
            try:
                items = try_open_lexicon(fallback)
                print(f"Loaded {len(items)} entries from fallback {fallback}")
                return items
            except Exception as e:
                raise RuntimeError(f"Failed to read both {path} and fallback {fallback}: {e}")
        raise FileNotFoundError(f"Neither {path} nor fallback {fallback} exist.")

    pos_list = load_safe(pos_lex_path, FALLBACK_POS_PATH)
    neg_list = load_safe(neg_lex_path, FALLBACK_NEG_PATH)

    # 3) Initialize lexicon analyzer
    lex = SentimentLexicon(pos_list, neg_list)

    # 4) Compute sentiment for each review. We'll create columns:
    #    sentiment_score, sentiment_label, pos_count, neg_count, token_count
    results = []
    for i, text in enumerate(df['review_text'].fillna("").astype(str)):
        pos_c, neg_c, token_c, matches = lex.analyze_review(text, negation_window=NEGATION_WINDOW)
        score = (pos_c - neg_c) / max(1, token_c)
        if score > POSITIVE_THRESHOLD:
            label = "positive"
        elif score < NEGATIVE_THRESHOLD:
            label = "negative"
        else:
            label = "neutral"
        results.append({
            "pos_count": pos_c,
            "neg_count": neg_c,
            "token_count": token_c,
            "sentiment_score": round(score, 4),
            "sentiment_label": label,
            # (optional) keep a small summary of matches for debugging - join first few matches:
            "match_preview": "; ".join([f"{m[0]}|{m[1]}" for m in matches[:6]])
        })
        # (Optional) progress print every 200 rows
        if (i+1) % 200 == 0:
            print(f"Processed {i+1} reviews...")

    # join results into dataframe
    rdf = pd.DataFrame(results)
    df_out = pd.concat([df.reset_index(drop=True), rdf.reset_index(drop=True)], axis=1)

    # 5) Save processed CSV
    df_out.to_csv(out_processed_csv, index=False)
    print(f"Saved processed reviews to {out_processed_csv}")

    # 6) Aggregate summary by product
    summary = df_out.groupby(['product_id','product_name']).agg(
        total_reviews = ('review_id','count'),
        avg_rating = ('rating','mean'),
        avg_sentiment_score = ('sentiment_score','mean'),
        positive_count = ('sentiment_label', lambda x: (x=='positive').sum()),
        neutral_count = ('sentiment_label', lambda x: (x=='neutral').sum()),
        negative_count = ('sentiment_label', lambda x: (x=='negative').sum())
    ).reset_index()
    summary.to_csv(out_summary_csv, index=False)
    print(f"Saved summary by product to {out_summary_csv}")

    # 7) Save Excel workbook with two sheets
    with pd.ExcelWriter(out_excel, engine='openpyxl') as writer:
        df_out.to_excel(writer, sheet_name='processed_reviews', index=False)
        summary.to_excel(writer, sheet_name='summary_by_product', index=False)
    print(f"Wrote Excel dashboard to {out_excel}")

    # 8) Create two charts and save PNGs
    # Sentiment distribution (overall)
    sent_order = ['positive','neutral','negative']
    sent_counts = df_out['sentiment_label'].value_counts().reindex(sent_order).fillna(0)
    plt.figure(figsize=(6,4))
    sent_counts.plot(kind='bar')
    plt.title("Customer Review Sentiment Distribution")
    plt.xlabel("Sentiment")
    plt.ylabel("Count")
    plt.tight_layout()
    plt.savefig(OUT_SENT_PNG)
    plt.close()
    print(f"Saved sentiment distribution chart to {OUT_SENT_PNG}")

    # Average rating by product
    plt.figure(figsize=(8,4))
    summary_sorted = summary.sort_values('avg_rating', ascending=False)
    plt.bar(summary_sorted['product_name'], summary_sorted['avg_rating'])
    plt.title("Average Customer Rating by Product")
    plt.xlabel("Product")
    plt.ylabel("Average Rating")
    plt.xticks(rotation=20, ha='right')
    plt.tight_layout()
    plt.savefig(OUT_RATING_PNG)
    plt.close()
    print(f"Saved avg rating chart to {OUT_RATING_PNG}")

    # Done
    return df_out, summary

# --------------- If executed as a script, run process ----------------
if __name__ == "__main__":
    # attempt to handle common mistakes - inform user if files are missing
    try:
        df_processed, summary_tbl = process_reviews(RAW_REVIEWS_CSV, POSITIVE_LEXICON_PATH, NEGATIVE_LEXICON_PATH)
        print("Processing complete. Check the generated CSV/Excel/PNG files in the current folder.")
    except Exception as e:
        print("Error during processing:", e)
        print("Tips:")
        print("- Verify RAW_REVIEWS_CSV, POSITIVE_LEXICON_PATH and NEGATIVE_LEXICON_PATH exist and are correct.")
        print("- If lexicon files fail to decode, try opening them and saving as UTF-8 or use Latin-1.")
        sys.exit(1)


Loaded 120 reviews from C:\Users\victor\Downloads\customer_reviews.csv
Loaded 2006 entries from C:\Users\victor\Desktop\VIC\dsc\positive-words.txt
Loaded 4783 entries from C:\Users\victor\Desktop\VIC\dsc\negative-words.txt
Saved processed reviews to customer_reviews_processed_by_you.csv
Saved summary by product to summary_by_product_by_you.csv
Wrote Excel dashboard to customer_feedback_dashboard_by_you.xlsx
Saved sentiment distribution chart to sentiment_distribution_by_you.png
Saved avg rating chart to avg_rating_by_product_by_you.png
Processing complete. Check the generated CSV/Excel/PNG files in the current folder.
