In [1]:
# =========================
# 02_data_layer.ipynb
# Data Layer Construction (pandas table abstraction)
# =========================

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Tuple

import pandas as pd


# ---------- Config ----------
RAW_FILE = "play_reviews_ingested.csv"
PROCESSED_FILE = "reviews_enriched.csv"

RAW_TABLE_NAME = "play_reviews"
PROCESSED_TABLE_NAME = "reviews_enriched"

JOIN_KEY = "review_id"

# Optional output (set to None if you don't want to save)
MERGED_OUTPUT_NAME = "reviews_merged.csv"


# ---------- Helpers ----------
def resolve_repo_root() -> Path:
    """
    Robustly find repo root when running from:
    - repo root
    - notebooks/ directory
    """
    cwd = Path.cwd()

    # Common patterns
    if (cwd / "data").exists() and (cwd / "notebooks").exists():
        return cwd

    if cwd.name == "notebooks" and (cwd.parent / "data").exists():
        return cwd.parent

    # Fallback: climb up a few levels looking for /data
    cur = cwd
    for _ in range(5):
        if (cur / "data").exists():
            return cur
        cur = cur.parent

    raise FileNotFoundError(
        "Could not locate repo root containing a 'data/' directory. "
        "Run this notebook from repo root or notebooks/."
    )


def assert_columns_exist(df: pd.DataFrame, required: Tuple[str, ...], table_name: str) -> None:
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Table '{table_name}' is missing required columns: {missing}")


def safe_read_csv(path: Path, table_name: str) -> pd.DataFrame:
    if not path.exists():
        raise FileNotFoundError(f"Missing file for table '{table_name}': {path}")
    return pd.read_csv(path)


@dataclass
class DataLayer:
    """
    Lightweight relational-style data layer using pandas tables.
    """
    repo_root: Path
    raw_dir: Path
    processed_dir: Path
    tables_raw: Dict[str, pd.DataFrame]
    tables_processed: Dict[str, pd.DataFrame]

    @classmethod
    def build(cls) -> "DataLayer":
        repo_root = resolve_repo_root()
        raw_dir = repo_root / "data" / "raw"
        processed_dir = repo_root / "data" / "processed"

        # Load as explicit "tables"
        raw_path = raw_dir / RAW_FILE
        processed_path = processed_dir / PROCESSED_FILE

        tables_raw = {
            RAW_TABLE_NAME: safe_read_csv(raw_path, RAW_TABLE_NAME)
        }
        tables_processed = {
            PROCESSED_TABLE_NAME: safe_read_csv(processed_path, PROCESSED_TABLE_NAME)
        }

        return cls(
            repo_root=repo_root,
            raw_dir=raw_dir,
            processed_dir=processed_dir,
            tables_raw=tables_raw,
            tables_processed=tables_processed
        )

    def describe(self) -> None:
        print("Repo root:", self.repo_root)
        print("\n[RAW TABLES]")
        for name, df in self.tables_raw.items():
            print(f" - {name}: shape={df.shape}")
        print("\n[PROCESSED TABLES]")
        for name, df in self.tables_processed.items():
            print(f" - {name}: shape={df.shape}")

    def integrity_checks(self, join_key: str = JOIN_KEY) -> Dict[str, float]:
        raw = self.tables_raw[RAW_TABLE_NAME]
        processed = self.tables_processed[PROCESSED_TABLE_NAME]

        assert_columns_exist(raw, (join_key,), RAW_TABLE_NAME)
        assert_columns_exist(processed, (join_key,), PROCESSED_TABLE_NAME)

        raw_ids = raw[join_key].astype(str)
        processed_ids = processed[join_key].astype(str)

        raw_unique = raw_ids.nunique(dropna=True)
        processed_unique = processed_ids.nunique(dropna=True)

        overlap = len(set(raw_ids.dropna()) & set(processed_ids.dropna()))
        coverage = overlap / raw_unique if raw_unique else 0.0

        # Duplicates
        raw_dup_rate = 1 - (raw_unique / len(raw_ids)) if len(raw_ids) else 0.0
        processed_dup_rate = 1 - (processed_unique / len(processed_ids)) if len(processed_ids) else 0.0

        results = {
            "raw_rows": float(len(raw_ids)),
            "raw_unique_ids": float(raw_unique),
            "processed_rows": float(len(processed_ids)),
            "processed_unique_ids": float(processed_unique),
            "id_overlap": float(overlap),
            "processed_coverage_of_raw": float(coverage),
            "raw_duplicate_rate": float(raw_dup_rate),
            "processed_duplicate_rate": float(processed_dup_rate),
        }
        return results

    def build_analysis_view(
        self,
        join_key: str = JOIN_KEY,
        processed_keep_cols: Optional[list] = None
    ) -> pd.DataFrame:
        """
        Create an analysis-ready merged dataframe (like a SQL view).
        """
        raw = self.tables_raw[RAW_TABLE_NAME].copy()
        processed = self.tables_processed[PROCESSED_TABLE_NAME].copy()

        assert_columns_exist(raw, (join_key,), RAW_TABLE_NAME)
        assert_columns_exist(processed, (join_key,), PROCESSED_TABLE_NAME)

        # normalize join key to string to avoid dtype mismatch
        raw[join_key] = raw[join_key].astype(str)
        processed[join_key] = processed[join_key].astype(str)

        # choose columns from processed to avoid duplicate columns explosion
        if processed_keep_cols is None:
            # Default: keep everything except columns that definitely exist in raw (excluding join_key)
            raw_cols = set(raw.columns)
            keep = [c for c in processed.columns if (c == join_key) or (c not in raw_cols)]
            processed_keep_cols = keep

        merged = raw.merge(
            processed[processed_keep_cols],
            on=join_key,
            how="left"
        )

        return merged

    def save_merged(self, merged: pd.DataFrame, filename: str = MERGED_OUTPUT_NAME) -> Optional[Path]:
        if not filename:
            return None
        out_path = self.processed_dir / filename
        merged.to_csv(out_path, index=False)
        return out_path


# ---------- Run the data layer ----------
dl = DataLayer.build()
dl.describe()

print("\n[RELATIONAL INTEGRITY CHECKS]")
checks = dl.integrity_checks()
for k, v in checks.items():
    if "rate" in k or "coverage" in k:
        print(f"{k}: {v:.2%}")
    else:
        print(f"{k}: {int(v)}")

print("\n[BUILD ANALYSIS VIEW]")
# You can customize keep columns here if you want only a few from processed:
# processed_keep_cols = ["review_id", "sentiment_label", "sentiment_score", "language", "review_len"]
merged = dl.build_analysis_view()

print("Merged shape:", merged.shape)
display(merged.head(5))

# ---------- Optional: save merged ----------
out_path = dl.save_merged(merged)
print("\nSaved merged dataset to:", out_path)


Repo root: d:\intern project\google-play-reviews

[RAW TABLES]
 - play_reviews: shape=(26880, 11)

[PROCESSED TABLES]
 - reviews_enriched: shape=(26880, 13)

[RELATIONAL INTEGRITY CHECKS]
raw_rows: 26880
raw_unique_ids: 26880
processed_rows: 26880
processed_unique_ids: 26880
id_overlap: 26880
processed_coverage_of_raw: 100.00%
raw_duplicate_rate: 0.00%
processed_duplicate_rate: 0.00%

[BUILD ANALYSIS VIEW]
Merged shape: (26880, 17)


Unnamed: 0,review_id,user_name,user_image_url,review_text,rating,thumbs_up,review_created_version,review_time,reply_text,reply_time,app_version,thumbs_up_count,reply_content,replied_at,review_len,word_count,sentiment
0,2876fe36-e40d-45e4-98c6-509d937232a9,Mahakal Gulshan Sharma,https://play-lh.googleusercontent.com/a-/ALV-U...,radhe radhe,3,0,,2025-12-29 10:43:25,,,,0,,,11,2,0.0
1,f45da16f-dd83-4224-8093-d857a616afb9,Donthi Naresh,https://play-lh.googleusercontent.com/a/ACg8oc...,super,5,0,1.2025.350,2025-12-29 10:41:40,,,1.2025.350,0,,,5,1,0.5994
2,6af6aec0-a07b-4ffc-8b27-5231ebf1ff23,Royrex Ndlovu,https://play-lh.googleusercontent.com/a-/ALV-U...,ultra super app,5,0,1.2025.350,2025-12-29 10:41:21,,,1.2025.350,0,,,15,3,0.5994
3,2686a432-6404-4082-82fa-cb9e1144c226,mohamed,https://play-lh.googleusercontent.com/a/ACg8oc...,It is great,5,0,1.2025.350,2025-12-29 10:40:17,,,1.2025.350,0,,,11,3,0.6249
4,87f13bab-ce7f-4bca-9ab1-48075715b93c,Rahul Rajput,https://play-lh.googleusercontent.com/a/ACg8oc...,bhut bura aap h,1,0,,2025-12-29 10:40:15,,,,0,,,15,4,0.0



Saved merged dataset to: d:\intern project\google-play-reviews\data\processed\reviews_merged.csv
