In [None]:
# ===============================================
# file: playwright_runner.py
# Weekly Shopify Channel Performance -> Summary -> Google Sheets
# - Dynamic SINCE/UNTIL via CLI args or .env (defaults UNTIL=today)
# - Optional AUTO_LOGIN using SHOPIFY_EMAIL/SHOPIFY_PASSWORD (+ optional 2FA)
# - First successful login saves session to playwright_storage_state.json for future runs
# ===============================================
import os
import sys
import time
import json
import asyncio
from pathlib import Path
import re
from pathlib import Path
from datetime import datetime, timedelta, date

# --- Windows stability: required for Playwright subprocess in some notebook/event-loop setups
# Playwright drives the browser through a child process; on Windows, asyncio
# subprocess support is provided by the Proactor event loop, so select it
# explicitly before any loop is created. No-op on other platforms.
if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

# --- deps (auto-install if missing)
# pip distribution name -> importable module name, for the distributions used
# by this script whose two names differ. Probing the distribution name itself
# (e.g. "python-dotenv") can never succeed, which would re-run pip on every
# execution even when the package is already installed.
_IMPORT_NAMES = {
    "python-dotenv": "dotenv",
    "google-api-python-client": "googleapiclient",
    "google-auth": "google.auth",
    "google-auth-oauthlib": "google_auth_oauthlib",
    "google-auth-httplib2": "google_auth_httplib2",
}


def ensure(pkg: str, module=None):
    """Make sure the distribution *pkg* is importable, installing it if not.

    Args:
        pkg: Name passed to ``pip install`` when the import probe fails.
        module: Importable module name to probe. When omitted, a built-in
            table of known distribution/module pairs is consulted, falling
            back to *pkg* with ``-`` replaced by ``_``.

    Raises:
        subprocess.CalledProcessError: if ``pip install`` exits non-zero.
    """
    import importlib

    import_name = module or _IMPORT_NAMES.get(pkg, pkg.replace("-", "_"))
    try:
        importlib.import_module(import_name)
        return
    except ModuleNotFoundError:
        pass

    import subprocess

    print(f"{pkg} not found; installing...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
    # Let the import system see the freshly installed files in this process.
    importlib.invalidate_caches()

# Distributions that must be present before the third-party imports below run.
for _dist in (
    "python-dotenv",
    "playwright",
    "pandas",
    "google-api-python-client",
    "google-auth",
    "google-auth-oauthlib",
    "google-auth-httplib2",
):
    ensure(_dist)

from dotenv import load_dotenv
import pandas as pd
from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError

from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

# Ensure browser binaries exist (idempotent)
def ensure_playwright_browsers():
    """Run ``playwright install chromium`` with the current interpreter.

    Best effort: a failure is reported on stdout together with the manual
    command to run, and execution continues.
    """
    import subprocess

    install_cmd = [sys.executable, "-m", "playwright", "install", "chromium"]
    try:
        subprocess.check_call(install_cmd)
    except Exception as err:
        print("Failed to install Playwright Chromium automatically:", err)
        print("Run manually: python -m playwright install chromium")

# Executed at import time: the Playwright installer is invoked on every run.
ensure_playwright_browsers()

# Load .env before any of the os.getenv() configuration reads further down.
load_dotenv()

python-dotenv not found; installing...
google-api-python-client not found; installing...
google-auth not found; installing...
google-auth-oauthlib not found; installing...
google-auth-httplib2 not found; installing...


True

In [None]:
# -------------------------
# Helpers: parsing args + date logic
# -------------------------
def parse_ymd(s: str) -> date:
    """Parse a ``YYYY-MM-DD`` string into a ``date`` (ValueError if malformed)."""
    parsed = datetime.strptime(s, "%Y-%m-%d")
    return parsed.date()

def ymd(d: date) -> str:
    """Return the text produced by ``d.isoformat()`` (``YYYY-MM-DD`` for a date)."""
    iso_text = d.isoformat()
    return iso_text

def get_date_range_from_args_or_env():
    """
    Resolve the reporting window as a ``(since, until)`` pair of YYYY-MM-DD strings.

    Precedence:
      1) --last-days N (N > 0): the N days ending today, inclusive
      2) --last-full-week: the most recent completed Mon-Sun week
      3) CLI args --since/--until
      4) .env SINCE/UNTIL
      5) fallback SINCE="2025-09-01", UNTIL=today

    Steps 3-5 are resolved independently for each of the two values. The
    strings from steps 3-4 are returned as given; they are not validated here.
    """
    # Local import: argparse is not part of the module-level import block.
    import argparse

    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument("--since", type=str, default=None, help="YYYY-MM-DD (inclusive)")
    parser.add_argument("--until", type=str, default=None, help="YYYY-MM-DD (inclusive)")
    parser.add_argument("--last-days", type=int, default=None, help="Override since/until: last N days ending today (inclusive)")
    parser.add_argument("--last-full-week", action="store_true", help="Override since/until: last full Mon-Sun week")
    # parse_known_args tolerates arguments this script does not define (for
    # example the "-f <connection file>" a Jupyter kernel is started with)
    # instead of terminating the process with a usage error.
    args, _unknown = parser.parse_known_args()

    today = datetime.now().date()

    if args.last_days and args.last_days > 0:
        first_day = today - timedelta(days=args.last_days - 1)
        return first_day.isoformat(), today.isoformat()

    if args.last_full_week:
        # weekday(): Mon=0 ... Sun=6. Days back to the most recent Sunday
        # strictly before today; when today is Sunday (0) go back a full week.
        days_since_sun = (today.weekday() - 6) % 7
        last_sun = today - timedelta(days=days_since_sun or 7)
        last_mon = last_sun - timedelta(days=6)
        return last_mon.isoformat(), last_sun.isoformat()

    since = args.since or os.getenv("SINCE") or "2025-09-01"
    until = args.until or os.getenv("UNTIL") or today.isoformat()
    return since, until

In [None]:
# -----------------
# Config from .env
# -----------------

# Store handle interpolated into https://admin.shopify.com/store/<slug>/... URLs.
STORE_SLUG = os.getenv("SHOPIFY_STORE_SLUG")
# Where exported CSVs and summaries are written; created at import time if missing.
DOWNLOAD_DIR = Path(os.getenv("DOWNLOAD_DIR", "./downloads")).resolve()
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Reporting window as YYYY-MM-DD strings.
# NOTE: this call parses command-line arguments at import time.
SINCE, UNTIL = get_date_range_from_args_or_env()
# Passed as the report URL's `country` parameter and used in output file names.
COUNTRY = os.getenv("COUNTRY", "US")

# Visible browser for first run recommended
# Headless only when CHROME_HEADLESS is set to something other than
# "0"/"false"/"no"; the default ("0") keeps the browser window visible.
HEADLESS = os.getenv("CHROME_HEADLESS", "0").lower() not in ("0", "false", "no")

# Playwright storage state (saved login session) reused across runs.
STATE_FILE = Path(os.getenv("PLAYWRIGHT_STATE_FILE", "playwright_storage_state.json")).resolve()

# Google Sheets upload is opt-in: enabled only for "1"/"true"/"yes".
UPLOAD_TO_SHEET = os.getenv("UPLOAD_TO_SHEET", "0").lower() in ("1", "true", "yes")
SHEET_ID = os.getenv("SHEET_ID")
SHEET_NAME = os.getenv("SHEET_NAME", "Automated_reports_updated")
SHEET_MODE = os.getenv("SHEET_MODE", "append").lower()  # append | overwrite

# OAuth client secrets file and the cached user token written after consent.
CREDENTIALS_JSON = Path(os.getenv("GOOGLE_CREDENTIALS_JSON", "credentials.json")).resolve()
TOKEN_JSON = Path(os.getenv("GOOGLE_TOKEN_JSON", "token.json")).resolve()

# Weekly loop settings:
# WEEK_START = 0 means Monday, 6 means Sunday
# (same numbering as date.weekday(); the value is not range-checked here).
WEEK_START = int(os.getenv("WEEK_START", "0"))

# Optional: custom labels per week
# WEEK_LABELS format: "YYYY-MM-DD:YYYY-MM-DD=Label;YYYY-MM-DD:YYYY-MM-DD=Label2"
WEEK_LABELS_RAW = os.getenv("WEEK_LABELS", "").strip()

# Optional: auto login to Shopify
# Enabled only for "1"/"true"/"yes"; the credentials are read from the environment.
AUTO_LOGIN = os.getenv("AUTO_LOGIN", "0").lower() in ("1", "true", "yes")
SHOPIFY_EMAIL = os.getenv("SHOPIFY_EMAIL")
SHOPIFY_PASSWORD = os.getenv("SHOPIFY_PASSWORD")

# REPORT_URL = (
#     f"https://admin.shopify.com/store/{STORE_SLUG}/marketing/reports/channels"
#     f"?attributionModel=last_click_non_direct"
#     f"&since={SINCE}&until={UNTIL}"
#     f"&sortColumn=sessions&sortDirection=desc"
#     f"&country={COUNTRY}"
# )


In [6]:
# ----------------------------
# Google Sheets OAuth helpers
#----------------------------
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]


def get_sheets_service():
    """
    Build an authorized Google Sheets v4 client (OAuth installed-app flow).

    - A token cached in TOKEN_JSON is used as-is while it is valid.
    - An expired token that carries a refresh token is refreshed.
    - Otherwise a local-server consent flow is started from CREDENTIALS_JSON.
    Refreshed or newly obtained credentials are written back to TOKEN_JSON.

    Raises:
        FileNotFoundError: consent is needed but CREDENTIALS_JSON does not exist.
    """
    cached = None
    if TOKEN_JSON.exists():
        cached = Credentials.from_authorized_user_file(str(TOKEN_JSON), SCOPES)

    # Fast path: nothing to refresh and nothing to persist.
    if cached and cached.valid:
        return build("sheets", "v4", credentials=cached)

    if cached and cached.expired and cached.refresh_token:
        cached.refresh(Request())
        creds = cached
    else:
        if not CREDENTIALS_JSON.exists():
            raise FileNotFoundError(
                f"Missing {CREDENTIALS_JSON}. Save your OAuth JSON as 'credentials.json' next to this script."
            )
        consent_flow = InstalledAppFlow.from_client_secrets_file(str(CREDENTIALS_JSON), SCOPES)
        creds = consent_flow.run_local_server(port=0)

    TOKEN_JSON.write_text(creds.to_json(), encoding="utf-8")
    return build("sheets", "v4", credentials=creds)

def ensure_tab(service, spreadsheet_id: str, sheet_name: str):
    """Add a tab titled *sheet_name* to the spreadsheet unless one already exists."""
    metadata = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()

    titles = []
    for sheet in metadata.get("sheets", []):
        titles.append(sheet["properties"]["title"])

    if sheet_name not in titles:
        add_request = {"addSheet": {"properties": {"title": sheet_name}}}
        service.spreadsheets().batchUpdate(
            spreadsheetId=spreadsheet_id,
            body={"requests": [add_request]},
        ).execute()

def upload_df_to_sheet(df: pd.DataFrame, spreadsheet_id: str, sheet_name: str, mode: str = "append"):
    """
    Write *df* to the tab *sheet_name* of a Google spreadsheet.

    Args:
        df: Frame to upload; every cell is sent as text and interpreted by
            Sheets as if typed by a user (USER_ENTERED).
        spreadsheet_id: Target spreadsheet ID.
        sheet_name: Tab title; the tab is created when it does not exist.
        mode: "overwrite" clears columns A:ZZ of the tab and writes header +
            rows from A1. Any other value appends: header + rows are written
            when cell A1 is empty, otherwise only the rows are appended.

    Raises:
        ValueError: if *spreadsheet_id* is empty.
    """
    if not spreadsheet_id:
        raise ValueError("SHEET_ID is missing in .env")
    service = get_sheets_service()
    ensure_tab(service, spreadsheet_id, sheet_name)

    # A1 notation requires single quotes around tab names containing spaces or
    # special characters; quoting is also valid for plain names. An embedded
    # single quote is escaped by doubling it.
    quoted_tab = "'" + sheet_name.replace("'", "''") + "'"
    values_api = service.spreadsheets().values()

    header = df.columns.tolist()
    data_rows = df.astype(str).values.tolist()

    def write_from_a1(values):
        """Write *values* starting at the tab's top-left cell."""
        values_api.update(
            spreadsheetId=spreadsheet_id,
            range=f"{quoted_tab}!A1",
            valueInputOption="USER_ENTERED",
            body={"values": values},
        ).execute()

    if mode == "overwrite":
        values_api.clear(
            spreadsheetId=spreadsheet_id,
            range=f"{quoted_tab}!A:ZZ",
            body={},
        ).execute()
        write_from_a1([header] + data_rows)
        print(f"✅ Uploaded to Google Sheet (overwrite): {sheet_name}")
        return

    # append: an empty A1 means the tab has no header row yet.
    first_cell = values_api.get(
        spreadsheetId=spreadsheet_id,
        range=f"{quoted_tab}!A1:A1",
    ).execute().get("values", [])

    if not first_cell:
        write_from_a1([header] + data_rows)
        print(f"✅ Uploaded to Google Sheet (new tab): {sheet_name}")
    else:
        values_api.append(
            spreadsheetId=spreadsheet_id,
            range=f"{quoted_tab}!A1",
            valueInputOption="USER_ENTERED",
            insertDataOption="INSERT_ROWS",
            body={"values": data_rows},
        ).execute()
        print(f"✅ Appended to Google Sheet: {sheet_name}")

In [None]:
# -------------------------
# Helpers: week labels + formatting
# -------------------------
def format_date_range(start_ymd: str, end_ymd: str) -> str:
    """Render two ``YYYY-MM-DD`` strings as ``M/D-M/D`` (no zero padding, no year)."""
    first, last = (datetime.strptime(text, "%Y-%m-%d") for text in (start_ymd, end_ymd))
    return "{}/{}-{}/{}".format(first.month, first.day, last.month, last.day)

def month_name_from_date(d: date) -> str:
    """Return the full month name of *d* as formatted by ``%B``."""
    return f"{d:%B}"

def week_start_for(d: date, week_start: int = 0) -> date:
    """Return the latest day on or before *d* whose ``weekday()`` equals *week_start*."""
    days_into_week = (d.weekday() - week_start) % 7
    return d - timedelta(days=days_into_week)

def iter_weeks(since: date, until: date, week_start: int = 0):
    """
    Split [since, until] into consecutive inclusive (start, end) chunks.

    Each chunk ends on the last day of the week_start-aligned week that
    contains its first day, or on *until* if that comes sooner. The first
    chunk begins exactly at *since*, so it may be a partial week. Nothing is
    yielded when since > until.
    """
    chunk_start = since
    while chunk_start <= until:
        aligned_end = week_start_for(chunk_start, week_start) + timedelta(days=6)
        chunk_end = until if until < aligned_end else aligned_end
        yield (chunk_start, chunk_end)
        chunk_start = chunk_end + timedelta(days=1)

def parse_week_labels(raw: str):
    """
    Parse WEEK_LABELS into a ``{"YYYY-MM-DD:YYYY-MM-DD": "Label"}`` dict.

    Input format: "YYYY-MM-DD:YYYY-MM-DD=Label;YYYY-MM-DD:YYYY-MM-DD=Label2".
    Entries lacking either "=" or ":" are ignored; each entry is split on its
    first "=", and both the key and the label are stripped of whitespace.
    """
    if not raw:
        return {}
    entries = (chunk.strip() for chunk in raw.split(";"))
    pairs = (entry.partition("=") for entry in entries if "=" in entry and ":" in entry)
    return {key.strip(): label.strip() for key, _, label in pairs}

# Parsed once at import time from the WEEK_LABELS environment value.
WEEK_LABELS = parse_week_labels(WEEK_LABELS_RAW)


def get_week_label(start: date, end: date) -> str:
    """Return the custom label configured for exactly this start/end pair, else ""."""
    lookup_key = ":".join((start.isoformat(), end.isoformat()))
    return WEEK_LABELS.get(lookup_key, "")

# -------------------------
# Helpers: parsing Shopify export
# -------------------------
def to_number(x) -> float:
    """Convert an export cell to a finite float, treating anything else as 0.0.

    ``None``, blank text, dash placeholders, unparseable text and non-finite
    values (NaN / infinity in any spelling ``float()`` accepts) all yield 0.0,
    so a single odd cell cannot turn a summed total into NaN or inf. ``$`` and
    thousands separators (``,``) are removed before parsing.
    """
    import math

    if x is None:
        return 0.0
    text = str(x).strip()
    if text in ("", "—", "-", "nan", "None"):
        return 0.0
    try:
        value = float(text.replace("$", "").replace(",", ""))
    except ValueError:
        return 0.0
    # float() happily parses "NaN", "inf", "Infinity", ...; reject those too.
    return value if math.isfinite(value) else 0.0

def bucket_row(ref_platform: str, channel: str, typ: str) -> str:
    """Map one export row to the summary column (bucket) its sales belong to.

    Matching is case- and whitespace-insensitive. Rules are checked in order:
    direct, Google paid, Google organic, Attentive, Privy, ActiveCampaign;
    everything else falls into "Other Channel Sales MISC".

    Missing values may be ``None``, empty, or float NaN (how pandas represents
    an empty CSV cell); all of them are treated as an empty string.
    """
    def _clean(value) -> str:
        # NaN is truthy and has no .strip(), so it needs an explicit check.
        if not value or (isinstance(value, float) and value != value):
            return ""
        return str(value).strip().lower()

    rp = _clean(ref_platform)
    ch = _clean(channel)
    ty = _clean(typ)

    if "direct" in (ch, rp):
        return "Direct Website Sales (Organic)"

    if ch == "google" and ty == "paid":
        return "Google ads (Sales)"

    if ch == "google" and ty == "organic":
        return "Google Search (Organic Sales)"

    if "attentive" in (ch, rp):
        return "Attentive SMS (Sales)"

    if "privy" in (ch, rp):
        # The label spelling ("Privey") is used as a lookup key by callers;
        # keep it byte-identical.
        return "Privey Email Marketing (Sales)"

    if "activecampaign" in (ch, rp):
        return "ActiveCampaign (Sales)"

    return "Other Channel Sales MISC"

def build_misc_notes(df_other: pd.DataFrame, channel_col: str, type_col: str, sales_col: str) -> str:
    """
    Summarize the uncategorized rows as "Channel (Type) $amount | ...".

    Sales are summed per "channel (type)" name, sorted by total descending,
    limited to the first ten names, and names whose total is not positive are
    left out. Returns "" for an empty frame.
    """
    if df_other.empty:
        return ""

    def _display_name(record):
        channel_text = str(record[channel_col]).strip()
        type_text = str(record[type_col]).strip()
        return f"{channel_text} ({type_text})"

    work = df_other.copy()
    work["_name"] = work.apply(_display_name, axis=1)
    work["_sales"] = work[sales_col].apply(to_number)

    totals = work.groupby("_name", as_index=False)["_sales"].sum()
    top_ten = totals.sort_values("_sales", ascending=False).head(10)

    notes = [
        f"{name} ${amount:,.2f}"
        for name, amount in zip(top_ten["_name"], top_ten["_sales"])
        if amount > 0
    ]
    return " | ".join(notes)

def summarize_channel_csv_to_weekly_row(csv_path: Path, start: date, end: date) -> pd.DataFrame:
    """
    Collapse one exported channel-performance CSV into a single summary row.

    Args:
        csv_path: Path of the exported CSV. It must contain the columns
            "Referring platform", "Channel", "Type" and "Sales" (matched
            case-insensitively, ignoring surrounding whitespace); "Cost" is
            optional and counts as 0 when absent.
        start: First day covered by the export.
        end: Last day covered by the export.

    Returns:
        A one-row DataFrame with per-bucket sales, selected per-bucket costs,
        totals, a margin ratio, free-text notes for uncategorized channels and
        the range/country metadata. A header-only export yields an all-zero row.

    Raises:
        ValueError: if a required column is missing.
    """
    df = pd.read_csv(csv_path)

    # Header lookup that ignores case and surrounding whitespace.
    by_lower = {c.lower().strip(): c for c in df.columns}
    rp_col = by_lower.get("referring platform")
    ch_col = by_lower.get("channel")
    ty_col = by_lower.get("type")
    sales_col = by_lower.get("sales")
    cost_col = by_lower.get("cost")

    if not (rp_col and ch_col and ty_col and sales_col):
        raise ValueError(f"CSV missing required columns. Found: {list(df.columns)}")

    # astype(float) keeps the helper columns numeric even when there are no rows.
    df["_sales"] = df[sales_col].apply(to_number).astype(float)
    df["_cost"] = df[cost_col].apply(to_number).astype(float) if cost_col else 0.0

    # Column-wise zip instead of DataFrame.apply(axis=1): on a header-only
    # export, apply() hands back a DataFrame, which cannot be assigned to a
    # single column and would abort the whole run on an empty week.
    df["_bucket"] = [
        bucket_row(rp, ch, ty)
        for rp, ch, ty in zip(df[rp_col], df[ch_col], df[ty_col])
    ]

    per_bucket = df.groupby("_bucket")[["_sales", "_cost"]].sum()
    sales_by = {bucket: float(v) for bucket, v in per_bucket["_sales"].items()}
    cost_by = {bucket: float(v) for bucket, v in per_bucket["_cost"].items()}

    other_df = df[df["_bucket"] == "Other Channel Sales MISC"].copy()
    misc_notes = build_misc_notes(other_df, ch_col, ty_col, sales_col)

    tot_sales = float(df["_sales"].sum())
    tot_cost = float(df["_cost"].sum())

    # Not true gross margin; this is marketing margin proxy
    marketing_margin = ((tot_sales - tot_cost) / tot_sales) if tot_sales else 0.0

    start_str = start.isoformat()
    end_str = end.isoformat()

    dates_week = format_date_range(start_str, end_str)
    label = get_week_label(start, end)
    if label:
        dates_week = f"{dates_week} ({label})"

    def bucket_sales(bucket: str) -> float:
        return round(sales_by.get(bucket, 0.0), 2)

    def bucket_cost(bucket: str) -> float:
        # Costs are keyed by the same bucket names as sales.
        return round(cost_by.get(bucket, 0.0), 2)

    # Insertion order of this dict is the column order of the output.
    row = {
        "Month": month_name_from_date(start),
        "Dates/ Week": dates_week,

        "Direct Website Sales (Organic)": bucket_sales("Direct Website Sales (Organic)"),
        "Google ads (Sales)": bucket_sales("Google ads (Sales)"),
        "Google Search (Organic Sales)": bucket_sales("Google Search (Organic Sales)"),
        "Attentive SMS (Sales)": bucket_sales("Attentive SMS (Sales)"),
        "Privey Email Marketing (Sales)": bucket_sales("Privey Email Marketing (Sales)"),
        "ActiveCampaign (Sales)": bucket_sales("ActiveCampaign (Sales)"),
        "Other Channel Sales MISC": bucket_sales("Other Channel Sales MISC"),

        "Tot Sales": round(tot_sales, 2),

        "Google ads (Cost)": bucket_cost("Google ads (Sales)"),
        "Privey Email Marketing (Cost)": bucket_cost("Privey Email Marketing (Sales)"),
        "Attentive SMS (Cost)": bucket_cost("Attentive SMS (Sales)"),
        "Total Cost": round(tot_cost, 2),

        "GPM": round(marketing_margin, 4),
        "MISC.": misc_notes,

        "Range_Start": start_str,
        "Range_End": end_str,
        "Country": COUNTRY,
    }

    return pd.DataFrame([list(row.values())], columns=list(row))


In [None]:
# -------------------------
# Shopify: build URL + click export
# -------------------------
def build_report_url(store_slug: str, since_ymd: str, until_ymd: str, country: str) -> str:
    """
    Build the Shopify admin marketing channel-report URL for one date range.

    The report uses last-click-non-direct attribution, sorted by sessions
    descending. Arguments are inserted verbatim (no URL encoding is applied).
    """
    base = f"https://admin.shopify.com/store/{store_slug}/marketing/reports/channels"
    query = "&".join(
        (
            "attributionModel=last_click_non_direct",
            f"since={since_ymd}",
            f"until={until_ymd}",
            "sortColumn=sessions",
            "sortDirection=desc",
            f"country={country}",
        )
    )
    return f"{base}?{query}"

async def click_export_flow(page):
    """
    Drive the report page's export UI until the export is confirmed.

    Steps: wait for the page, click Export (directly, or via an overflow
    menu), pick CSV if offered, then click the confirming Export button.
    Every step except the in-menu "text=Export" click is best effort: a
    selector that cannot be clicked is skipped and the next one is tried.
    """
    async def click_first(selectors, timeout):
        """Click the first selector that succeeds; return whether any did."""
        for selector in selectors:
            try:
                await page.click(selector, timeout=timeout)
            except Exception:
                continue
            return True
        return False

    # Wait until the page is interactive; the report UI can take extra time,
    # so also wait for any button to be rendered.
    await page.wait_for_load_state("domcontentloaded")
    await page.wait_for_timeout(1200)
    await page.wait_for_selector("button", timeout=60000)

    # 1) Direct Export button (several variants)
    opened_directly = await click_first(
        (
            "button:has-text('Export')",
            "button[aria-label='Export']",
            "[role='button']:has-text('Export')",
        ),
        5000,
    )

    # 2) Otherwise open the overflow menu, then click Export inside it
    if not opened_directly:
        await click_first(
            (
                "button[aria-label='More actions']",
                "button[aria-haspopup='menu']",
                "button:has-text('More actions')",
                "button:has-text('More')",
            ),
            5000,
        )
        await page.click("text=Export", timeout=15000)

    await page.wait_for_timeout(800)

    # 3) Choose CSV if there is a choice
    await click_first(
        (
            "text=CSV",
            "button:has-text('CSV')",
            "[role='menuitem']:has-text('CSV')",
            "label:has-text('CSV')",
        ),
        2500,
    )

    # 4) Confirm Export (dialog)
    await click_first(
        (
            "button:has-text('Export')",
            "button[aria-label='Export']",
            "text=Export",
        ),
        8000,
    )

async def auto_login_shopify(page):
    """
    Log into Shopify admin with SHOPIFY_EMAIL / SHOPIFY_PASSWORD.

    The email, password and 2FA steps are each best effort: if a step's form
    elements are not found in time, the step is skipped. When a 2FA field
    appears, the code comes from SHOPIFY_OTP or, failing that, an interactive
    prompt. The function finally waits for the admin UI to be present.

    Raises:
        ValueError: if the email or password is not configured.
    """
    if not (SHOPIFY_EMAIL and SHOPIFY_PASSWORD):
        raise ValueError("AUTO_LOGIN=1 but SHOPIFY_EMAIL/SHOPIFY_PASSWORD missing in .env")

    submit_button = "button[name='commit']"

    async def fill_and_submit(field_selector, value):
        """Fill one login field and submit; ignore the step if it is absent."""
        try:
            await page.fill(field_selector, value, timeout=15000)
            await page.click(submit_button, timeout=15000)
        except Exception:
            pass

    await page.goto(f"https://admin.shopify.com/store/{STORE_SLUG}", timeout=60000)
    await page.wait_for_load_state("domcontentloaded")

    # Email step, then password step
    await fill_and_submit("#account_email", SHOPIFY_EMAIL)
    await fill_and_submit("#account_password", SHOPIFY_PASSWORD)

    # 2FA step (if present)
    otp_field = "input[name='two_factor_code'], input[name='otp']"
    try:
        await page.wait_for_selector(otp_field, timeout=8000)
        code = os.getenv("SHOPIFY_OTP", "").strip()
        if not code:
            code = input("Enter Shopify 2FA code: ").strip()
        await page.fill(otp_field, code)
        await page.click(submit_button, timeout=15000)
    except Exception:
        pass

    # Wait for admin UI
    await page.wait_for_selector("nav[aria-label='Primary'], #AppFrameMain", timeout=60000)

In [None]:
# -------------------------
# Main automation
# -------------------------
async def run():
    """
    Export, summarize and optionally upload one report per week in SINCE..UNTIL.

    For each week chunk: open the channel report, trigger the CSV export, save
    the download under DOWNLOAD_DIR, write a one-row *_SUMMARY.csv next to it
    and, when UPLOAD_TO_SHEET is set, push that row to the Google Sheet. After
    the loop a combined summary CSV of all weeks is written.

    Raises:
        ValueError: if SHOPIFY_STORE_SLUG is missing or UNTIL precedes SINCE.
    """
    if not STORE_SLUG:
        raise ValueError("Missing SHOPIFY_STORE_SLUG in .env")

    since_d = parse_ymd(SINCE)
    until_d = parse_ymd(UNTIL)
    if until_d < since_d:
        raise ValueError("UNTIL must be >= SINCE")

    print(f"DOWNLOAD_DIR: {DOWNLOAD_DIR}")
    print(f"Range: {since_d} -> {until_d} (inclusive)")
    print(f"Weekly loop (week_start={WEEK_START}, Mon=0)")
    print(f"UPLOAD_TO_SHEET={UPLOAD_TO_SHEET} SHEET_MODE={SHEET_MODE}")

    # In overwrite mode only the first uploaded week clears the tab; every
    # later week is appended below it.
    overwrite_first = (SHEET_MODE == "overwrite")

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=HEADLESS,
            args=["--disable-dev-shm-usage", "--no-sandbox", "--disable-setuid-sandbox"]
        )

        # Reuse the saved login session when one exists.
        if STATE_FILE.exists():
            context = await browser.new_context(storage_state=str(STATE_FILE), accept_downloads=True)
        else:
            context = await browser.new_context(accept_downloads=True)

        page = await context.new_page()

        try:
            await page.goto(f"https://admin.shopify.com/store/{STORE_SLUG}", timeout=60000)

            # First run (no saved session): log in automatically or wait for
            # the user to do it by hand, then persist the session to STATE_FILE.
            if not STATE_FILE.exists():
                if AUTO_LOGIN:
                    print("\nAUTO_LOGIN enabled: attempting Shopify login...")
                    await auto_login_shopify(page)
                else:
                    print("\n✅ FIRST RUN: Log into Shopify in the opened browser window.")
                    print("   When Shopify Admin is fully loaded, return here and press Enter.\n")
                    input("Press Enter after Shopify login is complete... ")

                await context.storage_state(path=str(STATE_FILE))
                print(f"✅ Saved Shopify session to: {STATE_FILE}\n")

            all_summary_rows = []

            for start, end in iter_weeks(since_d, until_d, week_start=WEEK_START):
                since_ymd = start.isoformat()
                until_ymd = end.isoformat()
                report_url = build_report_url(STORE_SLUG, since_ymd, until_ymd, COUNTRY)

                print(f"\n--- Exporting {since_ymd} to {until_ymd} ---")
                await page.goto(report_url, timeout=60000)

                # The download listener must be armed before the clicks that
                # trigger the export; allow up to 3 minutes for the file.
                async with page.expect_download(timeout=180000) as dl_info:
                    await click_export_flow(page)
                download = await dl_info.value

                # Timestamp keeps repeated runs for the same range from overwriting each other.
                stamp = time.strftime("%Y%m%d_%H%M%S")
                dest = DOWNLOAD_DIR / f"shopify_channel_perf_{COUNTRY}_{since_ymd}_{until_ymd}_{stamp}.csv"
                await download.save_as(str(dest))
                print(f"✅ Downloaded: {dest}")

                summary_df = summarize_channel_csv_to_weekly_row(dest, start, end)
                all_summary_rows.append(summary_df)

                summary_path = dest.with_name(dest.stem + "_SUMMARY.csv")
                summary_df.to_csv(summary_path, index=False)
                print(f"✅ Summary saved: {summary_path}")

                if UPLOAD_TO_SHEET:
                    mode = "overwrite" if overwrite_first else "append"
                    upload_df_to_sheet(summary_df, SHEET_ID, SHEET_NAME, mode=mode)
                    overwrite_first = False

            # One CSV with a row per exported week.
            if all_summary_rows:
                combined = pd.concat(all_summary_rows, ignore_index=True)
                combined_path = DOWNLOAD_DIR / f"shopify_weekly_summary_{COUNTRY}_{SINCE}_{UNTIL}_{time.strftime('%Y%m%d_%H%M%S')}.csv"
                combined.to_csv(combined_path, index=False)
                print(f"\n✅ Combined summary saved: {combined_path}")

        finally:
            # Always release the browser, even when an export step fails.
            await context.close()
            await browser.close()

# Script entry point. asyncio.run() starts its own event loop and raises
# RuntimeError when called from a thread that already has one running.
if __name__ == "__main__":
    asyncio.run(run())