In [23]:
# Import Dependencies
import pandas as pd
import re
from datetime import datetime
import tabulate

# 1. Conceptualize the Data

**Grain**  
Each row = one streaming event (a single user–content interaction), identified by `USER_ID`, `SESSION_ID` and `CONTENT_ID`.

**Key Metrics**  
- **Binge & Retention:** binge session count, total binge minutes, `CHURN_RISK_SCORE`  
- **Ad Performance:** `AD_IMPRESSIONS`, `AD_CLICKS`, CTR (`AD_CLICKS/AD_IMPRESSIONS`), `ECPM_USD`, ecommerce conversion rate  
- **Inventory & Revenue:** `FILL_RATE`, average `ECPM_USD`, `REVENUE_USD`, `MARGIN_USD`

**Key Dimensions**  
- **User & Segment:** `AGE_GROUP`, `SUBSCRIPTION_TYPE`  
- **Time:** `VIEW_DATE` (date), session window (e.g. last 30 days)  
- **Geography & Platform:** `COUNTRY_CODE`, `DEVICE_TYPE`, `PLATFORM`  
- **Content:** `CONTENT_TYPE`, `GENRE`, and combinations like `PLATFORM`×`GENRE`

**Example Record Story**  
> On **2024-10-12** at **19:30**, **User U01234** (Age 25–34, Premium) watched **Content C04567** (“Episode Title 123”, Drama) in a binge session:  
> - **Watch Duration:** 1,200 s (75% complete)  
> - **Binge Sessions (30 d):** 4 sessions; **Binge Minutes:** 80 min  
> - **Ad Performance:** 4 impressions, 1 click → **CTR:** 25%; **eCPM:** \$8.50; **Ecom Conversion:** Y  
> - **Inventory Metrics:** **Fill Rate:** 85%; **Revenue:** \$3.50; **Margin:** \$2.10  
> - **Churn Risk Score:** 0.10  


# 2. Locate Solvable Problems

In [24]:
# 1. Load raw data
# Read the raw content-events extract; the path is relative to the notebook's folder.
df = pd.read_csv(filepath_or_buffer="../data/raw/content_events_50k_raw.csv")

In [25]:
# 2. Define your issues‐logging class

class IssuesLogger:
    """
    IssuesLogger performs data validation on a DataFrame,
    logging issues related to single-column and cross-field inconsistencies.

    Every check appends one record per offending row to ``self.issues`` (see
    ``log``); ``issues_df`` rolls those records up into a per-issue summary.
    The checks only read from the DataFrame, they never modify it.

    It supports:
    - Null/missing value checks
    - Data type, format, range, and categorical validation
    - Duplicate detection
    - Cross-field logical checks (e.g., date ordering, session/user ID consistency)
    """

    def __init__(self, df):
        """
        Initialize with the DataFrame to check.

        Parameters:
            df (pd.DataFrame): The data to validate. It must contain the columns
                used below to build the default expectations (CONTENT_TYPE,
                VIEWING_MODE, SUBSCRIPTION_TYPE, EXIT_REASON, DATA_SOURCE).
        """
        self.df = df
        self.issues = []

        # Default allowed categories per column (None = not checked here).
        # NOTE: the lists derived from `df` itself contain every non-null value
        # observed in that column, so check_categoricals can never flag those
        # columns. Replace them with a fixed vocabulary to make the check bite.
        self.EXPECTED = {
            "AGE_GROUP":        ["18-24","25-34","35-44","45-54","55-64","65+"],
            "GENDER":           ["Male","Female","Other","Prefer not to say"],
            "COUNTRY_CODE":     None,  # checked by regex
            "IS_RETURNING_USER": ["Y","N"],
            "CONTENT_TYPE":     df["CONTENT_TYPE"].dropna().unique().tolist(),
            "IS_BINGE_SESSION": ["Y","N"],
            "VIEWING_MODE":     df["VIEWING_MODE"].dropna().unique().tolist(),
            "SUBSCRIPTION_TYPE":df["SUBSCRIPTION_TYPE"].dropna().unique().tolist(),
            "AD_EXPOSURE":      ["Y","N"],
            "ECOMMERCE_CONVERSION":["Y","N"],
            "IS_1ST_PARTY_DATA": ["Y","N"],
            "EXIT_REASON":      df["EXIT_REASON"].dropna().unique().tolist(),
            "DATA_SOURCE":      df["DATA_SOURCE"].dropna().unique().tolist(),
        }

        # Default format rules: column -> regex the stringified value must match.
        self.REGEX = {
            "USER_ID":    r"^U\d+$",
            "SESSION_ID": r"^U\d+_S\d+$",
            "CONTENT_ID": r"^C\d+$",
            "COUNTRY_CODE": r"^[A-Z]{2}$",
        }

        # Default numeric bounds: column -> (min, max); None means unbounded.
        self.RANGES = {
            "CHURN_RISK_SCORE":   (0.0, 1.0),
            "PERCENT_COMPLETED":  (0.0, 100.0),
            "DURATION_SEC":       (0, None),
            "WATCH_DURATION_SEC": (0, None),
            "FILL_RATE":          (0.0, 100.0),
            # add other numeric ranges as needed
        }

    def log(self, idx, col, issue, value, solvable="TBC", resolution=None):
        """
        Append an issue record to the issues list.

        Parameters:
            idx (int): Row index in original DataFrame.
            col (str or None): Column name related to the issue.
            issue (str): Description of the issue.
            value: Offending value(s).
            solvable (str): Whether issue is solvable ("Yes", "No", "TBC").
            resolution (str or None): Suggested resolution action.
        """
        self.issues.append({
            "row":       idx,
            "column":    col,
            "issue":     issue,
            "value":     value,
            "solvable?": solvable,
            "resolution": resolution
        })

    def check_nulls(self):
        """Check for null or missing values across all columns."""
        for col in self.df:
            for idx in self.df.index[self.df[col].isna()]:
                self.log(idx, col, "Null/Missing", None, "TBC",
                         "Decide on drop or impute")

    def check_duplicates(self):
        """Identify fully duplicated rows (every copy is logged, including the first)."""
        for idx in self.df.index[self.df.duplicated(keep=False)]:
            self.log(idx, None, "Duplicate row", None, "Yes",
                     "drop_duplicates() later")

    def check_categoricals(self, expected_dict=None):
        """
        Validate categorical columns against expected categories.

        A value that differs from an allowed category only by letter case is
        logged as solvable ("Yes"); any other unexpected value is "TBC".

        Parameters:
            expected_dict (dict): Mapping of column to list of allowed categories.
                A value of None skips the column. Defaults to ``self.EXPECTED``.
        """
        if expected_dict is None:
            expected_dict = self.EXPECTED

        for col, allowed in expected_dict.items():
            if allowed is None:
                continue
            # Built once per column; str() keeps non-string categories
            # (e.g. numeric codes) from raising when lower-cased.
            allowed_lower = {str(category).lower() for category in allowed}
            res = f"Map to one of {allowed}"
            for idx, val in self.df[col].dropna().items():
                if val not in allowed:
                    sol = "Yes" if str(val).lower() in allowed_lower else "TBC"
                    self.log(idx, col, "Unexpected category", val, sol, res)

    def check_regex(self, regex_dict=None):
        """
        Validate string formats using regex patterns.

        Values are stringified and matched from the start of the string
        (``re.match`` semantics); nulls are skipped.

        Parameters:
            regex_dict (dict): Mapping of column to regex pattern.
                Defaults to ``self.REGEX``.
        """
        if regex_dict is None:
            regex_dict = self.REGEX

        for col, pat in regex_dict.items():
            matches = re.compile(pat).match  # compile once per column
            for idx, val in self.df[col].dropna().items():
                if not matches(str(val)):
                    self.log(idx, col, f"Bad format (≠ {pat})", val, "TBC",
                             f"Enforce regex {pat}")

    def check_ranges(self, ranges_dict=None):
        """
        Check numeric columns are within specified ranges.

        Values that cannot be converted to a number become NaN and are not
        reported by this check.

        Parameters:
            ranges_dict (dict): Mapping of column to (min, max) tuple; either
                bound may be None for "unbounded". Defaults to ``self.RANGES``.
        """
        if ranges_dict is None:
            ranges_dict = self.RANGES

        for col, (lo, hi) in ranges_dict.items():
            ser = pd.to_numeric(self.df[col], errors="coerce")
            out_of_bounds = pd.Series(False, index=ser.index)
            if lo is not None:
                out_of_bounds |= ser < lo
            if hi is not None:
                out_of_bounds |= ser > hi
            out_of_bounds &= ser.notna()
            for idx in ser.index[out_of_bounds]:
                self.log(idx, col, f"Out of bounds [{lo},{hi}]", self.df.at[idx,col],
                         "TBC", "Clamp or investigate")

    def check_date_formats(self, col, fmt="%Y-%m-%d"):
        """
        Validate that date columns conform to a specified format.

        Parameters:
            col (str): Column name.
            fmt (str): Expected date format string.
        """
        for idx, val in self.df[col].dropna().items():
            try:
                datetime.strptime(str(val), fmt)
            except ValueError:
                # strptime signals a non-matching string with ValueError.
                self.log(idx, col, f"Bad date format (not {fmt})", val, "Yes",
                         f"Reformat to {fmt}")

    def check_user_id(self):
        """
        Validate USER_ID values, flag missing or BAD_ID patterns.

        When the row's SESSION_ID contains an underscore, the issue is logged as
        potentially recoverable from the SESSION_ID prefix; otherwise it is
        logged as having nothing to infer the USER_ID from.
        """
        for idx, val in self.df["USER_ID"].items():
            if not (pd.isna(val) or str(val).startswith("BAD_ID_")):
                continue
            session_id = self.df.at[idx, "SESSION_ID"]
            # str() so a non-string SESSION_ID cannot break the substring test.
            if pd.notna(session_id) and "_" in str(session_id):
                self.log(
                    idx,
                    "USER_ID",
                    "Missing or bad USER_ID, possible from SESSION_ID",
                    val,
                    "TBC",
                    "Check with stakeholder, then use USER_ID from SESSION_ID"
                )
            else:
                self.log(
                    idx,
                    "USER_ID",
                    "Missing or bad USER_ID",
                    val,
                    "TBC",
                    "No SESSION_ID to infer USER_ID from"
                )

    def check_cross_field(self):
        """
        Perform cross-field validations including logical date orders,
        session and user ID consistency, ads rules, and duration checks.

        The individual rules live in private helpers and run in a fixed order:
        time ordering, duration, ad rules, episode numbers, session/user match.
        """
        # Parse the session timestamps once; unparseable values become NaT and
        # simply fail every comparison below.
        start = pd.to_datetime(self.df["START_TIME"], errors="coerce")
        end = pd.to_datetime(self.df["END_TIME"], errors="coerce")

        self._check_time_ordering(start, end)
        self._check_duration(start, end)
        self._check_ad_rules()
        self._check_episode_numbers()
        self._check_session_user_match()

    def _check_time_ordering(self, start, end):
        """Rules 1-4: chronological consistency between the date/time columns."""
        acd = pd.to_datetime(self.df["ACCOUNT_CREATION_DATE"], errors="coerce")
        rel = pd.to_datetime(self.df["RELEASE_DATE"], errors="coerce")

        # 1) START_TIME ≤ END_TIME
        for idx in self.df.index[start > end]:
            self.log(idx, "START_TIME/END_TIME", "start > end",
                     (self.df.at[idx,"START_TIME"], self.df.at[idx,"END_TIME"]),
                     "TBC", "Swap or investigate timestamps")

        # 2) RELEASE_DATE ≤ START_TIME
        for idx in self.df.index[rel > start]:
            self.log(idx, "RELEASE_DATE/START_TIME", "release after start",
                     (self.df.at[idx,"RELEASE_DATE"], self.df.at[idx,"START_TIME"]),
                     "TBC", "Fix source or drop")

        # 3) ACCOUNT_CREATION_DATE ≤ START_TIME
        for idx in self.df.index[acd > start]:
            self.log(idx, "ACCOUNT_CREATION_DATE/START_TIME", "account created after start",
                     (self.df.at[idx,"ACCOUNT_CREATION_DATE"], self.df.at[idx,"START_TIME"]),
                     "TBC", "Check source or drop")

        # 4) START_TIME and END_TIME within VIEW_DATE (if VIEW_DATE present)
        if "VIEW_DATE" in self.df:
            # Compare at day resolution on datetime64 values (midnight-normalized)
            # instead of materializing Python date objects.
            view_day = pd.to_datetime(self.df["VIEW_DATE"], errors="coerce").dt.normalize()
            outside_view = (start.dt.normalize() < view_day) | (end.dt.normalize() > view_day)
            for idx in self.df.index[outside_view]:
                self.log(idx, "START_TIME/END_TIME/VIEW_DATE", "session times outside VIEW_DATE",
                         (self.df.at[idx,"START_TIME"], self.df.at[idx,"END_TIME"], self.df.at[idx,"VIEW_DATE"]),
                         "TBC", "Check session and view date consistency")

    def _check_duration(self, start, end):
        """Rule 5: WATCH_DURATION_SEC ≈ END_TIME - START_TIME (±1 second)."""
        elapsed = (end - start).dt.total_seconds()
        watched = pd.to_numeric(self.df["WATCH_DURATION_SEC"], errors="coerce")
        # Rows with an unparseable timestamp or a missing/non-numeric duration
        # yield NaN here, and NaN > 1 is False, so they are skipped without
        # needing a blanket try/except.
        mismatch = (elapsed - watched).abs() > 1
        for idx in self.df.index[mismatch]:
            self.log(idx, "WATCH_DURATION_SEC", "Duration mismatch",
                     self.df.at[idx, "WATCH_DURATION_SEC"], "TBC",
                     "Recompute from timestamps")

    def _check_ad_rules(self):
        """Rules 6-7: field requirements for ad rows and for non-ad rows."""
        content_type = self.df["CONTENT_TYPE"].str.lower()

        # 6) Ads specific rules
        for idx in self.df.index[content_type == "ad"]:
            # content_title must include "Ad Creative"
            title = self.df.at[idx, "CONTENT_TITLE"]
            if "ad creative" not in str(title).lower():
                self.log(idx, "CONTENT_TITLE", "Ad missing 'Ad Creative'", title, "TBC")

            # AD_EXPOSURE must be "Y"
            exposure = self.df.at[idx, "AD_EXPOSURE"]
            if exposure != "Y":
                self.log(idx, "AD_EXPOSURE", "Ad must have exposure 'Y'", exposure, "Yes", "Set to 'Y'")

            # AD_ID must be present
            if pd.isna(self.df.at[idx, "AD_ID"]):
                self.log(idx, "AD_ID", "Missing AD_ID for Ad content", None, "TBC")

            # RATING should be null for ads
            rating = self.df.at[idx, "RATING"]
            if pd.notna(rating):
                self.log(idx, "RATING", "Ads should not have rating", rating, "Yes", "Drop rating")

        # 7) CONTENT_TYPE and AD_EXPOSURE consistency for non-ads
        # (rows with a null CONTENT_TYPE compare unequal to "ad" and land here)
        has_ad_id = "AD_ID" in self.df
        for idx in self.df.index[content_type != "ad"]:
            exposure = self.df.at[idx, "AD_EXPOSURE"]
            # pandas stores missing values as NaN, not None, so nulls need an
            # explicit notna() test to count as "no exposure recorded".
            if pd.notna(exposure) and exposure not in ("N", "n", ""):
                self.log(idx, "AD_EXPOSURE", "Non-ad content should not have AD_EXPOSURE 'Y'", exposure, "Yes", "Set to 'N' or null")

            ad_id = self.df.at[idx, "AD_ID"] if has_ad_id else None
            if pd.notna(ad_id):
                self.log(idx, "AD_ID", "Non-ad content should not have AD_ID", ad_id, "Yes", "Clear AD_ID")

    def _check_episode_numbers(self):
        """Rule 8: EPISODE_NUMBER numeric & positive if CONTENT_TYPE == "show"."""
        show_mask = self.df["CONTENT_TYPE"].str.lower() == "show"
        has_episode = "EPISODE_NUMBER" in self.df
        for idx in self.df.index[show_mask]:
            val = self.df.at[idx, "EPISODE_NUMBER"] if has_episode else None
            # is_number also accepts NumPy scalars such as numpy.int64, which
            # isinstance(val, (int, float)) rejects for integer-dtype columns.
            is_positive_number = pd.api.types.is_number(val) and not pd.isna(val) and val > 0
            if not is_positive_number:
                self.log(idx, "EPISODE_NUMBER", "Invalid episode number for show content", val, "TBC", "Ensure positive integer episode number")

    def _check_session_user_match(self):
        """Rule 9: the SESSION_ID prefix (text before the first "_") must equal USER_ID."""
        for idx, session_val in self.df["SESSION_ID"].dropna().items():
            # str() so a non-string SESSION_ID cannot break the split.
            user_part, separator, _ = str(session_val).partition("_")
            if not separator:
                continue
            actual_user = self.df.at[idx, "USER_ID"]
            if actual_user != user_part:
                self.log(idx, "SESSION_ID/USER_ID", "SESSION_ID user part and USER_ID mismatch",
                         (actual_user, user_part), "TBC", "Investigate source or align USER_ID with SESSION_ID")

    def run_all(self):
        """
        Run all checks in a logical sequence.

        Single-column checks run first, then the USER_ID check, then the
        cross-field rules. VIEW_DATE is only format-checked when the column exists.
        """
        self.check_nulls()
        self.check_duplicates()
        self.check_categoricals()
        self.check_regex()
        self.check_ranges()
        self.check_date_formats("ACCOUNT_CREATION_DATE", "%Y-%m-%d")
        self.check_date_formats("RELEASE_DATE", "%Y-%m-%d")
        if "VIEW_DATE" in self.df:
            self.check_date_formats("VIEW_DATE", "%Y-%m-%d")
        self.check_user_id()
        self.check_cross_field()

    def issues_df(self):
        """
        Aggregate issues by type and return a summary DataFrame with counts.

        Returns:
            pd.DataFrame: Summary of issues grouped by column, issue, solvable, and resolution,
                          with row counts of affected rows. Empty (but with the same
                          columns) when nothing has been logged.
        """
        summary_cols = ["column","issue","row_count","solvable?","resolution"]
        logged = pd.DataFrame(self.issues)
        if logged.empty:
            return pd.DataFrame(columns=summary_cols)

        summary = (
            logged
            .groupby(
                ["column","issue","solvable?","resolution"],
                dropna=False,  # keep groups whose column/resolution is None
                as_index=False
            )
            .agg(row_count=pd.NamedAgg(column="row", aggfunc="nunique"))
        )
        return summary[summary_cols]


In [26]:
# 3. Run the audit: execute every check against the raw frame
audit = IssuesLogger(df)
audit.run_all()

# Summarise the logged issues and rank them so the most widespread come first
issues = audit.issues_df().sort_values(by="row_count", ascending=False)

# Show the 15 most frequent issues
print(issues.head(15))


                           column  \
8                  EPISODE_NUMBER   
12                         RATING   
2                     AD_EXPOSURE   
4                           AD_ID   
5                           AD_ID   
7                        ECPM_USD   
13        RELEASE_DATE/START_TIME   
6                    COUNTRY_CODE   
11                         RATING   
15             SESSION_ID/USER_ID   
3                           AD_ID   
1                     AD_EXPOSURE   
9                          GENDER   
16  START_TIME/END_TIME/VIEW_DATE   
10              IS_RETURNING_USER   

                                             issue  row_count solvable?  \
8                                     Null/Missing      30909       TBC   
12                                    Null/Missing      28530       TBC   
2   Non-ad content should not have AD_EXPOSURE 'Y'      25911       Yes   
4             Non-ad content should not have AD_ID      25911       Yes   
5                               

In [27]:
# 4. Export your skeleton

# Save the sorted issue summary (one row per issue group, with its row count) as CSV
issues.to_csv(path_or_buf="../reports/01_data_issues_log.csv", index=False)

# Also print the same summary as a Markdown table (to_markdown relies on tabulate)
print(issues.to_markdown(index=False))

| column                        | issue                                            |   row_count | solvable?   | resolution                                                     |
|:------------------------------|:-------------------------------------------------|------------:|:------------|:---------------------------------------------------------------|
| EPISODE_NUMBER                | Null/Missing                                     |       30909 | TBC         | Decide on drop or impute                                       |
| RATING                        | Null/Missing                                     |       28530 | TBC         | Decide on drop or impute                                       |
| AD_EXPOSURE                   | Non-ad content should not have AD_EXPOSURE 'Y'   |       25911 | Yes         | Set to 'N' or null                                             |
| AD_ID                         | Non-ad content should not have AD_ID             |       25911 | Yes        

## 5. Evaluate Unsolvable Issues

Filter our issues log for the rows we marked `solvable? = No` or `solvable? = TBC`, and record either why we can’t fix them now or what we need to know before we can fix them.
