In [4]:
# === Quick annotation helpers from two CSVs ===
# Uses:
#   - brand_active_leaders__by_label_skin.csv  (brand metrics per Label × Skin)
#   - product_active_counts__by_label_skin.csv (product rows with active_count_skin)
#
# What you get:
#   - state_summary(label, skin, max_price=None, topn=5)
#   - top_products_for_brand(label, skin, brand, max_price=None, topk=3)
#
# Example calls are at the bottom.

from pathlib import Path
import pandas as pd
import numpy as np

# -------- locate files (adjust names/paths if yours differ) --------
CANDIDATES = [
    Path("data/analysis_tables"),
    Path("data"),
    Path("."),
]

def find_in_candidates(filename):
    for d in CANDIDATES:
        p = d / filename
        if p.exists():
            return p
    raise FileNotFoundError(f"Could not find {filename} in {', '.join(map(str, CANDIDATES))}")

BRAND_CSV  = find_in_candidates("brand_active_leaders__by_label_skin.csv")
PROD_CSV   = find_in_candidates("product_active_counts__by_label_skin.csv")

brand_df = pd.read_csv(BRAND_CSV)
prod_df  = pd.read_csv(PROD_CSV)

# ---- normalize column names we rely on; raise if missing ----
def need(df, cols, name):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{name} missing columns: {missing}")

need(brand_df, ["Label","skin","brand","num_products","avg_active","total_active","avg_price","avg_rating"], "brand_active_leaders__by_label_skin.csv")
need(prod_df,  ["Label","skin","brand","name","product_id","active_count_skin","price","rank"], "product_active_counts__by_label_skin.csv")

# clean types
prod_df["price"] = pd.to_numeric(prod_df["price"], errors="coerce")
prod_df["rank"]  = pd.to_numeric(prod_df["rank"], errors="coerce")
prod_df["active_count_skin"] = pd.to_numeric(prod_df["active_count_skin"], errors="coerce").fillna(0).astype(int)

brand_df["avg_price"]  = pd.to_numeric(brand_df["avg_price"], errors="coerce")
brand_df["avg_rating"] = pd.to_numeric(brand_df["avg_rating"], errors="coerce")
brand_df["avg_active"] = pd.to_numeric(brand_df["avg_active"], errors="coerce")
brand_df["total_active"]= pd.to_numeric(brand_df["total_active"], errors="coerce").fillna(0).astype(int)
brand_df["num_products"]= pd.to_numeric(brand_df["num_products"], errors="coerce").fillna(0).astype(int)

# -------- helpers you’ll actually call --------
def state_summary(label: str, skin: str, max_price: float | None = None, topn: int = 5) -> dict:
    """
    Returns three ranked tables (DataFrames):
      - by_avg_active, by_rating, by_value   for the given (label, skin)
    If max_price is provided, brand metrics are recomputed from products <= max_price.
    """
    label = str(label); skin = str(skin)

    if max_price is None:
        b = brand_df[(brand_df["Label"]==label) & (brand_df["skin"]==skin)].copy()
    else:
        # Recompute brand metrics under the budget from products table
        sub = prod_df[(prod_df["Label"]==label) & (prod_df["skin"]==skin)]
        sub = sub[sub["price"].le(max_price)]
        if sub.empty:
            return {"by_avg_active": pd.DataFrame(), "by_rating": pd.DataFrame(), "by_value": pd.DataFrame()}
        b = (sub.groupby("brand", dropna=False)
                .agg(num_products=("product_id","nunique"),
                     avg_active=("active_count_skin","mean"),
                     total_active=("active_count_skin","sum"),
                     avg_price=("price","mean"),
                     avg_rating=("rank","mean"))
                .reset_index())
    if b.empty:
        return {"by_avg_active": b, "by_rating": b, "by_value": b}

    # Value = rating per dollar (guard against 0)
    b = b.assign(value=lambda d: d["avg_rating"] / d["avg_price"].replace(0, np.nan))

    by_avg_active = b.sort_values(["avg_active","total_active","num_products"],
                                  ascending=[False,False,False]).head(topn)
    by_rating     = b.sort_values(["avg_rating","num_products"], ascending=[False,False]).head(topn)
    by_value      = b.sort_values(["value","avg_rating"],     ascending=[False,False]).head(topn)

    # Friendly formatting for quick reading in the notebook
    fmt = lambda df: df.assign(
        avg_active=lambda d: d["avg_active"].round(2),
        avg_price=lambda d: d["avg_price"].round(2),
        avg_rating=lambda d: d["avg_rating"].round(2),
        value=lambda d: (d.get("value") if "value" in d else np.nan).round(3)
    )
    return {
        "by_avg_active": fmt(by_avg_active),
        "by_rating":     fmt(by_rating),
        "by_value":      fmt(by_value),
    }

def top_products_for_brand(label: str, skin: str, brand: str,
                           max_price: float | None = None, topk: int = 3) -> pd.DataFrame:
    """
    For a winner brand in this (label, skin), list the top products
    (sorted by rating desc, then active_count desc, then price asc).
    """
    sub = prod_df[(prod_df["Label"]==label) & (prod_df["skin"]==skin) &
                  (prod_df["brand"].str.casefold()==str(brand).casefold())].copy()
    if max_price is not None:
        sub = sub[sub["price"].le(max_price)]
    if sub.empty:
        return pd.DataFrame(columns=["brand","name","price","rank","active_count_skin"])

    sub = sub.sort_values(["rank","active_count_skin","price"], ascending=[False,False,True])
    out = sub[["brand","name","price","rank","active_count_skin"]].head(topk).copy()
    out["price"] = out["price"].round(2)
    out["rank"]  = out["rank"].round(2)
    return out.reset_index(drop=True)

# --------- EXAMPLES (uncomment & run) ----------
label, skin = "Moisturizer", "Oily"
max_price   = 60  # set None to ignore budget
sums = state_summary(label, skin, max_price=max_price, topn=5)
print(f"\nTop brands by avg_active — {label} / {skin} / budget ≤ ${max_price}:")
print(sums["by_avg_active"].to_string(index=False))
print(f"\nTop brands by rating — {label} / {skin}:")
print(sums["by_rating"].to_string(index=False))
print(f"\nBest value brands (rating per $) — {label} / {skin}:")
print(sums["by_value"].to_string(index=False))
#
# # Pick a brand from one of the tables:
brand = sums["by_avg_active"]["brand"].iloc[0] if not sums["by_avg_active"].empty else "IT COSMETICS"
print(f"\nTop products for {brand}:")
# === Quick annotation helpers from two CSVs ===
# Uses:
#   - brand_active_leaders__by_label_skin.csv  (brand metrics per Label × Skin)
#   - product_active_counts__by_label_skin.csv (product rows with active_count_skin)
#
# What you get:
#   - state_summary(label, skin, max_price=None, topn=5)
#   - top_products_for_brand(label, skin, brand, max_price=None, topk=3)
#
# Example calls are at the bottom.

from pathlib import Path
import pandas as pd
import numpy as np

# -------- locate files (adjust names/paths if yours differ) --------
CANDIDATES = [
    Path("data/analysis_tables"),
    Path("data"),
    Path("."),
]

def find_in_candidates(filename):
    for d in CANDIDATES:
        p = d / filename
        if p.exists():
            return p
    raise FileNotFoundError(f"Could not find {filename} in {', '.join(map(str, CANDIDATES))}")

BRAND_CSV  = find_in_candidates("brand_active_leaders__by_label_skin.csv")
PROD_CSV   = find_in_candidates("product_active_counts__by_label_skin.csv")

brand_df = pd.read_csv(BRAND_CSV)
prod_df  = pd.read_csv(PROD_CSV)

# ---- normalize column names we rely on; raise if missing ----
def need(df, cols, name):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{name} missing columns: {missing}")

need(brand_df, ["Label","skin","brand","num_products","avg_active","total_active","avg_price","avg_rating"], "brand_active_leaders__by_label_skin.csv")
need(prod_df,  ["Label","skin","brand","name","product_id","active_count_skin","price","rank"], "product_active_counts__by_label_skin.csv")

# clean types
prod_df["price"] = pd.to_numeric(prod_df["price"], errors="coerce")
prod_df["rank"]  = pd.to_numeric(prod_df["rank"], errors="coerce")
prod_df["active_count_skin"] = pd.to_numeric(prod_df["active_count_skin"], errors="coerce").fillna(0).astype(int)

brand_df["avg_price"]  = pd.to_numeric(brand_df["avg_price"], errors="coerce")
brand_df["avg_rating"] = pd.to_numeric(brand_df["avg_rating"], errors="coerce")
brand_df["avg_active"] = pd.to_numeric(brand_df["avg_active"], errors="coerce")
brand_df["total_active"]= pd.to_numeric(brand_df["total_active"], errors="coerce").fillna(0).astype(int)
brand_df["num_products"]= pd.to_numeric(brand_df["num_products"], errors="coerce").fillna(0).astype(int)

# -------- helpers you’ll actually call --------
def state_summary(label: str, skin: str, max_price: float | None = None, topn: int = 5) -> dict:
    """
    Returns three ranked tables (DataFrames):
      - by_avg_active, by_rating, by_value   for the given (label, skin)
    If max_price is provided, brand metrics are recomputed from products <= max_price.
    """
    label = str(label); skin = str(skin)

    if max_price is None:
        b = brand_df[(brand_df["Label"]==label) & (brand_df["skin"]==skin)].copy()
    else:
        # Recompute brand metrics under the budget from products table
        sub = prod_df[(prod_df["Label"]==label) & (prod_df["skin"]==skin)]
        sub = sub[sub["price"].le(max_price)]
        if sub.empty:
            return {"by_avg_active": pd.DataFrame(), "by_rating": pd.DataFrame(), "by_value": pd.DataFrame()}
        b = (sub.groupby("brand", dropna=False)
                .agg(num_products=("product_id","nunique"),
                     avg_active=("active_count_skin","mean"),
                     total_active=("active_count_skin","sum"),
                     avg_price=("price","mean"),
                     avg_rating=("rank","mean"))
                .reset_index())
    if b.empty:
        return {"by_avg_active": b, "by_rating": b, "by_value": b}

    # Value = rating per dollar (guard against 0)
    b = b.assign(value=lambda d: d["avg_rating"] / d["avg_price"].replace(0, np.nan))

    by_avg_active = b.sort_values(["avg_active","total_active","num_products"],
                                  ascending=[False,False,False]).head(topn)
    by_rating     = b.sort_values(["avg_rating","num_products"], ascending=[False,False]).head(topn)
    by_value      = b.sort_values(["value","avg_rating"],     ascending=[False,False]).head(topn)

    # Friendly formatting for quick reading in the notebook
    fmt = lambda df: df.assign(
        avg_active=lambda d: d["avg_active"].round(2),
        avg_price=lambda d: d["avg_price"].round(2),
        avg_rating=lambda d: d["avg_rating"].round(2),
        value=lambda d: (d.get("value") if "value" in d else np.nan).round(3)
    )
    return {
        "by_avg_active": fmt(by_avg_active),
        "by_rating":     fmt(by_rating),
        "by_value":      fmt(by_value),
    }

def top_products_for_brand(label: str, skin: str, brand: str,
                           max_price: float | None = None, topk: int = 3) -> pd.DataFrame:
    """
    For a winner brand in this (label, skin), list the top products
    (sorted by rating desc, then active_count desc, then price asc).
    """
    sub = prod_df[(prod_df["Label"]==label) & (prod_df["skin"]==skin) &
                  (prod_df["brand"].str.casefold()==str(brand).casefold())].copy()
    if max_price is not None:
        sub = sub[sub["price"].le(max_price)]
    if sub.empty:
        return pd.DataFrame(columns=["brand","name","price","rank","active_count_skin"])

    sub = sub.sort_values(["rank","active_count_skin","price"], ascending=[False,False,True])
    out = sub[["brand","name","price","rank","active_count_skin"]].head(topk).copy()
    out["price"] = out["price"].round(2)
    out["rank"]  = out["rank"].round(2)
    return out.reset_index(drop=True)

# --------- EXAMPLES (uncomment & run) ----------
label, skin = "Moisturizer", "Oily"
max_price   = 60  # set None to ignore budget
sums = state_summary(label, skin, max_price=max_price, topn=5)
print(f"\nTop brands by avg_active — {label} / {skin} / budget ≤ ${max_price}:")
print(sums["by_avg_active"].to_string(index=False))
print(f"\nTop brands by rating — {label} / {skin}:")
print(sums["by_rating"].to_string(index=False))
print(f"\nBest value brands (rating per $) — {label} / {skin}:")
print(sums["by_value"].to_string(index=False))
#
# # Pick a brand from one of the tables:
brand = sums["by_avg_active"]["brand"].iloc[0] if not sums["by_avg_active"].empty else "IT COSMETICS"
print(f"\nTop products for {brand}:")
print(top_products_for_brand(label, skin, brand, max_price=max_price, topk=3).to_string(index=False))
print(top_products_for_brand(label, skin, brand, max_price=max_price, topk=3).to_string(index=False))



Top brands by avg_active — Moisturizer / Oily / budget ≤ $60:
                    brand  num_products  avg_active  total_active  avg_price  avg_rating  value
             IT COSMETICS             7        4.29            30      38.14        4.07  0.107
                  CLARINS             1        3.00             3      57.00        4.20  0.074
DR. DENNIS GROSS SKINCARE             1        3.00             3      60.00        4.20  0.070
               JACK BLACK             1        3.00             3      30.00        4.50  0.150
                 GLAMGLOW             3        2.67             8      55.67        4.17  0.075

Top brands by rating — Moisturizer / Oily:
     brand  num_products  avg_active  total_active  avg_price  avg_rating  value
SON & PARK             1        2.00             2       30.0        4.60  0.153
   LANEIGE             6        2.17            13       34.5        4.57  0.132
 BIOSSANCE             2        1.50             3       55.0        4.55 

In [6]:
import pandas as pd, numpy as np, json
from pathlib import Path

# ---------- Locate the data/ folder no matter where this notebook runs ----------
def find_data_dir():
    here = Path.cwd()
    for p in [here, *here.parents]:
        maybe = p / "data"
        if maybe.is_dir():
            return maybe
    raise FileNotFoundError("Could not locate a 'data/' directory up the tree.")

DATA = find_data_dir()
AN   = DATA / "analysis_tables"
print("Using data dir:", DATA.as_posix())

# ---------- Choose the product-level CSV ----------
# Preferred: full product_active_counts_by_label_skin.csv
# Fallback : product_active_counts__TOPBRANDS_only.csv
candidates = [
    AN / "product_active_counts_by_label_skin.csv",
    AN / "product_active_counts__TOPBRANDS_only.csv",
]
prod_path = next((p for p in candidates if p.exists()), None)
if prod_path is None:
    raise FileNotFoundError("Couldn't find the product_active_counts CSV in data/analysis_tables/")

print("Loading:", prod_path.name)
prod = pd.read_csv(prod_path)

# ---------- Basic cleanup ----------
for c in ["price","rank","active_count_skin"]:
    if c in prod.columns:
        prod[c] = pd.to_numeric(prod[c], errors="coerce")

req_cols = ["brand","name","Label","skin","price","rank","active_count_skin"]
missing = [c for c in req_cols if c not in prod.columns]
if missing:
    raise ValueError(f"CSV is missing required columns: {missing}")

# ---------- Budget buckets you’ll mirror in the UI ----------
BUDGET_EDGES = [25, 40, 60, 80, 120, 200, 1e9]
def bucket_edge(p):
    p = float(p) if pd.notna(p) else 1e9
    for e in BUDGET_EDGES:
        if p <= e + 1e-9:
            return int(e if e < 1e9 else 0)   # 0 = uncapped bucket
    return 0

prod["budget_edge"] = prod["price"].apply(bucket_edge)

def state_key(label, skin, edge):  # Label__Skin__max{edge}
    return f"{label}__{skin}__max{int(edge)}"

# ---------- Build state_winners.json ----------
state_winners = {}
prod_use = prod.dropna(subset=["price"]).copy()
prod_use["active_count_skin"] = prod_use["active_count_skin"].fillna(0)

for (label, skin, edge), g in prod_use.groupby(["Label","skin","budget_edge"]):
    brand_stats = (g.groupby("brand", as_index=False)
                     .agg(num_products=("name","nunique"),
                          avg_active=("active_count_skin","mean"),
                          total_active=("active_count_skin","sum"),
                          avg_price=("price","mean"),
                          avg_rating=("rank","mean")))
    if brand_stats.empty:
        continue

    def top_by_avg_active(df, n=3):
        x = df.sort_values(["avg_active","avg_rating"], ascending=[False, False]).head(n)
        x["value"] = x["avg_active"] * x["avg_rating"]
        return x[["brand","num_products","avg_active","total_active","avg_price","avg_rating","value"]].to_dict("records")

    def top_by_rating(df, n=3):
        x = df.sort_values(["avg_rating","avg_price"], ascending=[False, True]).head(n)
        x["value"] = x["avg_rating"] / x["avg_price"].replace(0, np.nan)
        return x[["brand","num_products","avg_active","total_active","avg_price","avg_rating","value"]].to_dict("records")

    def top_by_value(df, n=3):
        y = df.assign(value=lambda d: d["avg_rating"]/d["avg_price"].replace(0, np.nan))
        x = y.sort_values("value", ascending=False).head(n)
        return x[["brand","num_products","avg_active","total_active","avg_price","avg_rating","value"]].to_dict("records")

    winners = {
        "top_by_avg_active": top_by_avg_active(brand_stats),
        "top_by_rating":     top_by_rating(brand_stats),
        "top_by_value":      top_by_value(brand_stats),
    }
    state_winners[state_key(label, skin, edge)] = winners

# ---------- Build brand_top_products.json ----------
brand_top_products = {}
for (label, skin, edge), g in prod_use.groupby(["Label","skin","budget_edge"]):
    key = state_key(label, skin, edge)
    bucket = {}
    for brand, gb in g.groupby("brand"):
        top_prods = (gb.sort_values(["active_count_skin","rank","price"],
                                    ascending=[False, False, True])
                       .head(3)[["name","price","rank","active_count_skin"]]
                       .rename(columns={"rank":"rating",
                                        "active_count_skin":"active_count"}))
        bucket[brand] = top_prods.to_dict("records")
    brand_top_products[key] = bucket

# ---------- Write JSONs to data/ ----------
(DATA).mkdir(parents=True, exist_ok=True)
with open(DATA / "best_brand_for_skin_types.json", "w") as f:
    json.dump(state_winners, f, indent=2)
with open(DATA / "best_products_for_brand.json", "w") as f:
    json.dump(brand_top_products, f, indent=2)

print("Wrote:")
print(" -", (DATA / "best_brand_for_skin_types.json").as_posix())
print(" -", (DATA / "brand_top_products.json").as_posix())


Using data dir: /Users/tatiannasanchez/Desktop/lab-1/portfolio/projects/skincare-viz/data
Loading: product_active_counts__TOPBRANDS_only.csv
Wrote:
 - /Users/tatiannasanchez/Desktop/lab-1/portfolio/projects/skincare-viz/data/best_brand_for_skin_types.json
 - /Users/tatiannasanchez/Desktop/lab-1/portfolio/projects/skincare-viz/data/brand_top_products.json
