# In [2]:
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Union
from datetime import date as date_type

import clickhouse_connect
import pandas as pd
import numpy as np


# ============================================================
# Dataclasses
# ============================================================

@dataclass
class AdMetric:
    """One day of delivery metrics for a single ad, aggregated from ``fact_metrics``.

    ``_get_ad_metrics`` builds these rows with empty ``campaignName``,
    ``adSetName`` and ``adName``. ``_apply_latest_names_to_metrics`` fills the
    names in afterwards.
    """

    date: Union[str, date_type]  # reporting day; _get_ad_metrics stores the raw value returned by ClickHouse toDate(...)
    platform: str
    campaignId: str
    campaignName: str
    adSetId: str
    adSetName: str
    adId: str
    adName: str
    spend: float  # sum of spend for the day
    impressions: int
    interactions: int
    clicks: int
    conversions: int
    conversionValue: float  # sum of fact_metrics.conversionsValue for the day


@dataclass
class AMAdStatus:
    """Latest configuration and status of one campaign, ad set or ad.

    Rows are built by ``_get_ad_status``. ``productType``, ``budget`` and
    ``targetROAS`` are only populated for campaign rows; ad-set and ad rows
    carry empty values there.
    """

    type: str  # 'campaign' | 'adSet' | 'ad'
    platform: str
    campaignId: str
    adSetId: str  # '' for campaign rows
    adId: str  # '' for campaign and ad-set rows
    name: str
    productType: str
    budget: str  # "<budget> <budgetPeriod>" for campaigns, '' otherwise
    targetROAS: float  # 0.0 when the source value is empty
    isActive: bool


# ============================================================
# ClickHouse connection helper
# ============================================================

def connect_to_clickhouse(
    host: str = 'localhost',
    port: int = 8123,
    username: str = 'default',
    **client_kwargs: Any,
):
    """Open a ClickHouse HTTP client.

    The defaults reproduce the original hard-coded local connection
    (``localhost:8123`` as user ``default``), so existing zero-argument
    callers behave exactly as before.

    Args:
        host: ClickHouse server host name.
        port: HTTP(S) port of the server.
        username: User to authenticate as.
        **client_kwargs: Extra options (e.g. ``password``, ``database``,
            ``secure``) forwarded unchanged to ``clickhouse_connect.get_client``.

    Returns:
        The client object returned by ``clickhouse_connect.get_client``.
    """
    return clickhouse_connect.get_client(
        host=host,
        port=port,
        username=username,
        **client_kwargs,
    )


# ============================================================
# Name resolution (latest names from fact_metrics)
#   - safer keys include campaignId/adSetId to avoid collisions
# ============================================================

def _get_latest_names(website_id: str, market: str, client=None) -> Dict[str, str]:
    """Look up the most recent non-empty name of every campaign, ad set and ad.

    Names come from ``fact_metrics`` (non-deleted rows of *website_id*,
    optionally restricted to *market* unless it is ``'all'``). For each
    entity, the name on the row with the greatest ``timestamp`` among rows
    carrying a non-empty name wins. Entities without any non-empty name are
    omitted.

    Returns:
        Mapping ``"type::platform::campaignId::adSetId::adId" -> name``, where
        ``type`` is ``campaign`` / ``adSet`` / ``ad`` and the id slots below
        that level are empty strings.
    """
    client = client if client is not None else connect_to_clickhouse()

    filter_by_market = market != 'all'
    where_market = "AND market = {market: String}" if filter_by_market else ""

    query = f"""
    WITH filtered_data AS (
        SELECT
            timestamp,
            platform,
            campaignId,
            campaignName,
            adSetId,
            adSetName,
            adId,
            adName
        FROM fact_metrics FINAL
        WHERE websiteId = {{websiteId: String}}
          {where_market}
          AND isDelete = false
    )
    SELECT
        'campaign' AS type,
        platform,
        campaignId,
        '' AS adSetId,
        '' AS adId,
        argMaxIf(campaignName, timestamp, campaignName IS NOT NULL AND campaignName != '') AS name
    FROM filtered_data
    GROUP BY platform, campaignId
    HAVING name != ''

    UNION ALL

    SELECT
        'adSet' AS type,
        platform,
        campaignId,
        adSetId,
        '' AS adId,
        argMaxIf(adSetName, timestamp, adSetName IS NOT NULL AND adSetName != '') AS name
    FROM filtered_data
    GROUP BY platform, campaignId, adSetId
    HAVING name != ''

    UNION ALL

    SELECT
        'ad' AS type,
        platform,
        campaignId,
        adSetId,
        adId,
        argMaxIf(adName, timestamp, adName IS NOT NULL AND adName != '') AS name
    FROM filtered_data
    GROUP BY platform, campaignId, adSetId, adId
    HAVING name != ''
    """

    params = {'websiteId': website_id}
    if filter_by_market:
        params['market'] = market

    rows = client.query(query, parameters=params).result_rows

    lookup = {}
    for kind, platform, campaign_id, ad_set_id, ad_id, name in rows:
        lookup[f"{kind}::{platform}::{campaign_id}::{ad_set_id}::{ad_id}"] = name
    return lookup


def _apply_latest_names_to_metrics(items: List[AdMetric], names_lookup: Dict[str, str]) -> None:
    """Overwrite the campaign, ad-set and ad names of *items* in place.

    Keys follow the ``type::platform::campaignId::adSetId::adId`` layout
    produced by ``_get_latest_names`` (id slots below the entity's level are
    empty). A name is replaced only when its key is present in
    *names_lookup*, so unmatched items keep their current names.
    """
    for metric in items:
        scope = f"{metric.platform}::{metric.campaignId}"
        candidates = (
            ('campaignName', f"campaign::{scope}::::"),
            ('adSetName', f"adSet::{scope}::{metric.adSetId}::"),
            ('adName', f"ad::{scope}::{metric.adSetId}::{metric.adId}"),
        )
        for field_name, key in candidates:
            if key in names_lookup:
                setattr(metric, field_name, names_lookup[key])


# ============================================================
# Ad-level metrics from fact_metrics
# ============================================================

def _get_ad_metrics(
    website_id: str,
    start_date: str,
    end_date: str,
    timezone: str,
    market: str,
    client=None
) -> List[AdMetric]:
    """Load daily ad-level delivery totals from ``fact_metrics``.

    Rows are bucketed by ``toDate(timestamp, timezone)`` and restricted to
    ``[start_date, end_date)``, to non-deleted rows, and to *market* unless
    it is ``'all'``. Name fields are left empty; see
    ``_apply_latest_names_to_metrics``.

    Returns:
        One ``AdMetric`` per (date, platform, campaignId, adSetId, adId),
        ordered by date. NULL ids become ``''`` and NULL numbers become 0.
    """
    if client is None:
        client = connect_to_clickhouse()

    where_market = "AND market = {market: String}" if market != 'all' else ""

    query = f"""
    SELECT
      toDate(timestamp, {{timezone: String}}) AS date,
      platform,
      campaignId,
      adSetId,
      adId,
      sum(toFloat64(spend)) AS spend,
      sum(impressions) AS impressions,
      sum(interactions) AS interactions,
      sum(clicks) AS clicks,
      sum(conversions) AS conversions,
      sum(toFloat64(conversionsValue)) AS conversionValue
    FROM fact_metrics FINAL
    WHERE
      websiteId = {{websiteId: String}}
      AND toDate(timestamp, {{timezone: String}}) >= toDate({{startDate: String}})
      AND toDate(timestamp, {{timezone: String}}) <  toDate({{endDate: String}})
      {where_market}
      AND isDelete = false
    GROUP BY date, platform, campaignId, adSetId, adId
    ORDER BY date
    """

    params = dict(
        websiteId=website_id,
        startDate=start_date,
        endDate=end_date,
        timezone=timezone,
    )
    if market != 'all':
        params['market'] = market

    def as_text(value):
        return '' if value is None else str(value)

    def as_float(value):
        return 0.0 if value is None else float(value)

    def as_int(value):
        return 0 if value is None else int(value)

    metrics = []
    for (day, platform, campaign_id, ad_set_id, ad_id,
         spend, impressions, interactions, clicks,
         conversions, conversion_value) in client.query(query, parameters=params).result_rows:
        metrics.append(AdMetric(
            date=day,
            platform=as_text(platform),
            campaignId=as_text(campaign_id),
            campaignName='',
            adSetId=as_text(ad_set_id),
            adSetName='',
            adId=as_text(ad_id),
            adName='',
            spend=as_float(spend),
            impressions=as_int(impressions),
            interactions=as_int(interactions),
            clicks=as_int(clicks),
            conversions=as_int(conversions),
            conversionValue=as_float(conversion_value),
        ))
    return metrics


# ============================================================
# SKU-level contributions (sku_weight + sku_gross_profit + share)
#   - FIXES:
#     1) No alias in WHERE (JSONExtractString(track,'model') instead)
#     2) Returns productId, productName, sku_weight (qty), share
#     3) sku_spend/impressions computed from fact_metrics * share
# ============================================================

def _get_sku_contrib(
    website_id: str,
    start_date: str,
    end_date: str,
    customer_type: str,
    market: str,
    timezone: str,
    modelType: str = 'linear',
    client=None,
) -> pd.DataFrame:
    """Attribute ad-day delivery metrics and gross profit down to SKUs.

    For every order dated in ``[start_date, end_date)`` (in *timezone*), the
    order's ``eventTracksJSON`` tracks whose ``model`` equals *modelType* are
    unwound into (platform, campaignId, adSetId, adId) touchpoints. Each
    touchpoint is joined to the order's SKUs. Per ad-day and SKU this yields:

    * ``sku_weight``: summed SKU quantity (not scaled by the track weight);
    * ``sku_grossProfit``: SKU gross profit multiplied by the track ``weight``;
    * ``share``: ``sku_weight`` divided by the total ``sku_weight`` of the same
      ad-day (NULL when that total is 0);
    * ``sku_spend`` / ``sku_impressions`` / ``sku_clicks``: the ad-day totals
      from ``fact_metrics`` multiplied by ``share``;

    plus product / product-group ids and names.

    Args:
        website_id: Website to query.
        start_date: Inclusive start date string.
        end_date: Exclusive end date string.
        customer_type: ``'new'`` / ``'return'`` filters on the customer's first
            order relative to *start_date*; any other value applies no filter.
        market: Market filter, or ``'all'``.
        timezone: Timezone used to bucket timestamps into dates.
        modelType: Attribution model matched against each track's ``model``.
        client: Optional ClickHouse client; a new one is opened when ``None``.

    Returns:
        DataFrame (``client.query_df``) with one row per
        (date, platform, campaignId, adSetId, adId, productId, productGroupId).
    """
    if client is None:
        client = connect_to_clickhouse()

    where_parts = [
        'o.websiteId = {websiteId: String}',
        'toDate(o.orderedAt, {timezone: String}) >= toDate({startDate: String})',
        'toDate(o.orderedAt, {timezone: String}) <  toDate({endDate: String})'
    ]

    if market != 'all':
        where_parts.append('o.market = {market: String}')
    if customer_type == 'return':
        where_parts.append('o.customerFirstOrderedAt < toDateTime({startDate: String}, {timezone: String})')
    elif customer_type == 'new':
        where_parts.append('o.customerFirstOrderedAt >= toDateTime({startDate: String}, {timezone: String})')

    # NOTE(review): the same `o.`-qualified filter is applied to both
    # fact_order_products and fact_order_intelligence below, which assumes
    # both tables expose orderedAt / market / customerFirstOrderedAt.
    where_clause = ' AND '.join(where_parts)
    where_market_metrics = "AND market = {market: String}" if market != 'all' else ""

    query = f"""
    WITH
    order_sku AS (
        SELECT
            websiteId,
            orderId,
            productId,
            sum(orderProductQuantity) AS sku_qty,
            sum(grossProfit) AS sku_grossProfit
        FROM fact_order_products o FINAL
        WHERE {where_clause}
        GROUP BY websiteId, orderId, productId
    ),

    order_totals AS (
        SELECT
            o.websiteId AS websiteId,
            o.orderId   AS orderId,
            toDate(argMax(o.orderedAt, o.lastUpdatedAt), {{timezone: String}}) AS date,
            argMax(o.eventTracksJSON, o.lastUpdatedAt) AS eventTracksJSON
        FROM fact_order_intelligence o FINAL
        WHERE {where_clause}
        GROUP BY o.websiteId, o.orderId
    ),

    -- Exactly one row per (websiteId, productId): the product's latest group
    -- and name. Grouping by productGroupId, or joining a group table that
    -- holds several names per group, would fan out the joins below and
    -- double-count sku_weight / sku_grossProfit and distort share.
    product_details AS (
        SELECT
            a.websiteId            AS websiteId,
            a.productId            AS productId,
            a.latestProductGroupId AS productGroupId,
            a.latestProductName    AS productName,
            b.groupName            AS productGroupName
        FROM (
            SELECT
                websiteId,
                productId,
                argMax(productGroupId, updatedAt) AS latestProductGroupId,
                argMax(name, updatedAt)           AS latestProductName
            FROM raw_products_new FINAL
            WHERE websiteId = {{websiteId: String}}
            GROUP BY websiteId, productId
        ) AS a
        LEFT JOIN (
            SELECT
                websiteId,
                productGroupId,
                -- max() keeps a single, deterministic name per group
                max(name) AS groupName
            FROM raw_product_groups_new FINAL
            WHERE websiteId = {{websiteId: String}}
            GROUP BY websiteId, productGroupId
        ) AS b
            ON a.websiteId = b.websiteId
           AND a.latestProductGroupId = b.productGroupId
    ),

    unwind_tracks AS (
        SELECT
            ot.websiteId AS websiteId,
            ot.orderId   AS orderId,
            ot.date      AS date,
            JSONExtractString(track, 'platform')    AS platform,
            JSONExtractString(track, 'campaignId')  AS campaignId,
            JSONExtractString(track, 'adSetId')     AS adSetId,
            JSONExtractString(track, 'adId')        AS adId,
            toFloat64OrZero(JSONExtractString(track, 'weight')) AS weight
        FROM order_totals ot
        ARRAY JOIN JSONExtractArrayRaw(ot.eventTracksJSON) AS track
        WHERE JSONExtractString(track, 'model') = {{modelType: String}}
    ),

    sku_contrib AS (
        SELECT
            t.websiteId as websiteId,
            t.date as date,
            t.platform as platform,
            t.campaignId as campaignId,
            t.adSetId as adSetId,
            t.adId as adId,
            s.productId as productId,
            pd.productGroupId as productGroupId,
            sum(s.sku_qty) AS sku_weight,
            sum(s.sku_grossProfit * t.weight) AS sku_grossProfit
        FROM unwind_tracks t
        INNER JOIN order_sku s
            ON s.websiteId = t.websiteId
           AND s.orderId   = t.orderId
        LEFT JOIN product_details pd
            ON pd.websiteId = t.websiteId
           AND pd.productId = s.productId
        GROUP BY
            websiteId, date, platform, campaignId, adSetId, adId, productId, productGroupId
    ),

    sku_share AS (
        SELECT
            websiteId,
            date,
            platform,
            campaignId,
            adSetId,
            adId,
            productId,
            productGroupId,
            sku_weight,
            sku_grossProfit,
            sku_weight
              / NULLIF(
                  sum(sku_weight) OVER (PARTITION BY date, platform, campaignId, adSetId, adId),
                  0
                ) AS share
        FROM sku_contrib
    ),

    metrics AS (
        SELECT
            toDate(timestamp, {{timezone: String}}) AS date,
            platform,
            campaignId,
            adSetId,
            adId,
            sum(toFloat64(spend)) AS spend,
            sum(impressions) AS impressions,
            sum(interactions) AS interactions,
            sum(clicks) AS clicks
        FROM fact_metrics FINAL
        WHERE
            websiteId = {{websiteId: String}}
            AND toDate(timestamp, {{timezone: String}}) >= toDate({{startDate: String}})
            AND toDate(timestamp, {{timezone: String}}) <  toDate({{endDate: String}})
            {where_market_metrics}
            AND isDelete = false
        GROUP BY date, platform, campaignId, adSetId, adId
    )

    SELECT
        s.date as date,
        s.platform as platform,
        s.campaignId as campaignId,
        s.adSetId  as adSetId,
        s.adId as adId,

        s.productId as productId,
        s.productGroupId as productGroupId,

        s.sku_weight as sku_weight,
        s.share as share,

        m.spend * s.share       AS sku_spend,
        m.impressions * s.share AS sku_impressions,
        m.clicks * s.share       AS sku_clicks,

        s.sku_grossProfit       AS sku_grossProfit,

        pd.productName as productName,
        pd.productGroupName as productGroupName
    FROM sku_share s
    LEFT JOIN metrics m
        ON m.date       = s.date
       AND m.platform   = s.platform
       AND m.campaignId = s.campaignId
       AND m.adSetId    = s.adSetId
       AND m.adId       = s.adId
    LEFT JOIN product_details pd
        ON pd.websiteId = s.websiteId
       AND pd.productId = s.productId
    SETTINGS final = 1
    """

    params = {
        'websiteId': website_id,
        'startDate': start_date,
        'endDate': end_date,
        'timezone': timezone,
        'modelType': modelType,
    }
    if market != 'all':
        params['market'] = market

    return client.query_df(query, parameters=params)


# ============================================================
# SKU sales (all orders, including “organic”) from fact_order_products
# ============================================================

def _get_sku_sales(
    website_id: str,
    start_date: str,
    end_date: str,
    customer_type: str,
    market: str,
    timezone: str,
    client=None,
) -> List[Dict[str, Any]]:
    """Return daily per-product sales and profit totals for all orders.

    Joins ``fact_order_intelligence`` (alias ``o``) with
    ``fact_order_products`` (alias ``p``) on website and order. It keeps
    orders dated in ``[start_date, end_date)`` (in *timezone*), optionally
    restricted to *market* (unless ``'all'``) and to ``'new'`` / ``'return'``
    customers. The result is grouped by day and productId.

    NOTE(review): if ``fact_order_intelligence`` can hold several rows per
    orderId even with ``final = 1``, this join would repeat product rows and
    inflate the sums. Confirm the table's deduplication key.

    Returns:
        One dict per (date, productId) with ``date``, ``productId`` (``''``
        when NULL), the float money columns (0.0 when NULL) and
        ``ordersCount`` (distinct orders, 0 when NULL).
    """
    if client is None:
        client = connect_to_clickhouse()

    customer_filters = {
        'return': 'o.customerFirstOrderedAt < toDateTime({startDate: String}, {timezone: String})',
        'new': 'o.customerFirstOrderedAt >= toDateTime({startDate: String}, {timezone: String})',
    }

    conditions = [
        'o.websiteId = {websiteId: String}',
        'toDate(o.orderedAt, {timezone: String}) >= toDate({startDate: String})',
        'toDate(o.orderedAt, {timezone: String}) <  toDate({endDate: String})',
    ]
    if market != 'all':
        conditions.append('o.market = {market: String}')
    customer_condition = customer_filters.get(customer_type)
    if customer_condition is not None:
        conditions.append(customer_condition)

    where_clause = ' AND '.join(conditions)

    query = f"""
    SELECT
      toDate(o.orderedAt, {{timezone: String}}) AS date,
      p.productId,
      sum(p.grossSales)                  AS grossSales,
      sum(p.productDiscounts)            AS productDiscounts,
      sum(p.orderRefundTotalPreShipping) AS orderRefundTotalPreShipping,
      sum(p.productRefundProportionated) AS productRefundProportionated,
      sum(p.netSales)                    AS netSales,
      sum(p.productPurchaseCost)         AS productPurchaseCost,
      sum(p.refundProductPurchaseCost)   AS refundProductPurchaseCost,
      sum(p.grossProfit)                 AS grossProfit,
      sum(p.variableCost)                AS variableCost,
      sum(p.trackCost)                   AS trackCost,
      sum(p.trackCostByCPC)              AS trackCostByCPC,
      sum(p.marginalContribution)        AS marginalContribution,
      sum(p.marginalContributionByCPC)   AS marginalContributionByCPC,
      sum(p.fixedCost)                   AS fixedCost,
      sum(p.netProfit)                   AS netProfit,
      sum(p.netProfitByCPC)              AS netProfitByCPC,
      countDistinct(o.orderId)           AS ordersCount
    FROM fact_order_intelligence o
    INNER JOIN fact_order_products p
      ON p.websiteId = o.websiteId
     AND p.orderId   = o.orderId
    WHERE {where_clause}
    GROUP BY date, p.productId
    SETTINGS final = 1
    """

    params = {
        'websiteId': website_id,
        'startDate': start_date,
        'endDate': end_date,
        'timezone': timezone,
    }
    if market != 'all':
        params['market'] = market

    # Result columns 2..17, in SELECT order; all are converted to float.
    money_columns = (
        'grossSales',
        'productDiscounts',
        'orderRefundTotalPreShipping',
        'productRefundProportionated',
        'netSales',
        'productPurchaseCost',
        'refundProductPurchaseCost',
        'grossProfit',
        'variableCost',
        'trackCost',
        'trackCostByCPC',
        'marginalContribution',
        'marginalContributionByCPC',
        'fixedCost',
        'netProfit',
        'netProfitByCPC',
    )

    records: List[Dict[str, Any]] = []
    for row in client.query(query, parameters=params).result_rows:
        record: Dict[str, Any] = {
            'date': row[0],
            'productId': '' if row[1] is None else str(row[1]),
        }
        for position, column in enumerate(money_columns, start=2):
            cell = row[position]
            record[column] = 0.0 if cell is None else float(cell)
        record['ordersCount'] = 0 if row[18] is None else int(row[18])
        records.append(record)
    return records


# ============================================================
# Klaviyo cost + ad status (kept as-is, minor tidy)
# ============================================================

def _get_klaviyo_cost(website_id: str, start_date: str, end_date: str, timezone: str, client=None) -> Optional[float]:
    """Return the Klaviyo cost falling inside ``[start_date, end_date)``.

    Each ``raw_cost_metrics`` record with ``code = 'klaviyo'`` has its
    ``value`` spread evenly over the days from ``startedAt`` up to (not
    including) ``endedAt``, rounded to 4 decimals per day. The days inside
    the window are then summed and rounded to 2 decimals.

    Returns:
        The total as a float, or ``None`` when the query yields no row or a
        NULL total.
    """
    if client is None:
        client = connect_to_clickhouse()

    query = """
    WITH daily_spends AS (
        SELECT
          toDate(arrayJoin(
            arrayMap(x -> startedAt + toIntervalDay(x), range(0, dateDiff('day', startedAt, endedAt)))
          ), {timezone: String}) AS date,
          ROUND(value / dateDiff('day', startedAt, endedAt), 4) AS spend
        FROM raw_cost_metrics m FINAL
        WHERE websiteId = {websiteId: String} AND code = 'klaviyo'
          AND endedAt >= toDate({startDate: String}) AND startedAt <= toDate({endDate: String})
    )
    SELECT ROUND(SUM(spend), 2) AS spend
    FROM daily_spends
    WHERE date >= toDate({startDate: String}) AND date < toDate({endDate: String})
    """

    params = {
        'websiteId': website_id,
        'startDate': start_date,
        'endDate': end_date,
        'timezone': timezone
    }
    rows = client.query(query, parameters=params).result_rows
    if not rows:
        return None
    total = rows[0][0]
    return None if total is None else float(total)


def _get_ad_status(
    website_id: str,
    campaign_search_strings: List[str],
    klaviyo_search_strings: List[str],   # kept for parity even if unused
    ad_set_search_strings: List[str],
    ad_search_strings: List[str],
    client=None
) -> List[AMAdStatus]:
    """Fetch the latest name/config/status of the given campaigns, ad sets and ads.

    Ids are sent in batches of 1000 per list. A list that is exhausted in a
    given batch is replaced by ``['']`` so the ``IN`` filter stays valid.
    Within each entity, the latest value by ``insertedAt`` wins (argMax).
    *klaviyo_search_strings* only affects how many batches are run.

    Returns:
        ``AMAdStatus`` rows for all batches, in query order. Campaign rows
        carry productType / budget / targetROAS; other rows leave them empty.
    """
    if client is None:
        client = connect_to_clickhouse()

    batch_size = 1000
    longest = max([
        len(campaign_search_strings),
        len(klaviyo_search_strings),
        len(ad_set_search_strings),
        len(ad_search_strings),
    ] + [1])

    # The query text does not depend on the batch, so build it once.
    status_query = """
          WITH final_campaigns AS (
            SELECT 'campaign' AS type, c.platform, c.campaignId, '' AS adSetId, '' AS adId,
              argMax(c.name, c.insertedAt) AS name,
              argMax(c.productType, c.insertedAt) AS productType,
              argMax(CONCAT(c.budget, ' ', c.budgetPeriod), c.insertedAt) AS budget,
              argMax(toString(c.targetROAS), c.insertedAt) AS targetROAS,
              argMax(c.isActive, c.insertedAt) AS isActive
            FROM raw_campaigns c
            WHERE c.websiteId = {websiteId: String}
              AND c.campaignId IN ({campaignSearchStrings: Array(String)})
            GROUP BY c.websiteId, c.platform, c.campaignId
          ),
          final_ad_sets AS (
            SELECT 'adSet' AS type, a.platform, a.campaignId, a.adSetId, '' AS adId,
              argMax(a.name, a.insertedAt) AS name,
              '' AS productType, '' AS budget, '' AS targetROAS,
              argMax(a.isActive, a.insertedAt) AS isActive
            FROM raw_ad_sets a
            WHERE a.websiteId = {websiteId: String}
              AND a.adSetId IN ({adSetSearchStrings: Array(String)})
            GROUP BY a.websiteId, a.platform, a.campaignId, a.adSetId
          ),
          final_ads AS (
            SELECT 'ad' AS type, a.platform, a.campaignId, a.adSetId, a.adId,
              argMax(a.name, a.insertedAt) AS name,
              '' AS productType, '' AS budget, '' AS targetROAS,
              argMax(a.isActive, a.insertedAt) AS isActive
            FROM raw_ads a
            WHERE a.websiteId = {websiteId: String}
              AND a.adId IN ({adSearchStrings: Array(String)})
            GROUP BY a.websiteId, a.platform, a.campaignId, a.adSetId, a.adId
          )
          SELECT type, platform, campaignId, adSetId, adId, name, productType, budget, targetROAS, isActive
          FROM (
            SELECT * FROM final_campaigns
            UNION ALL SELECT * FROM final_ad_sets
            UNION ALL SELECT * FROM final_ads
          )
          SETTINGS final = 1
        """

    def chunk(values: List[str], start: int) -> List[str]:
        # An empty slice becomes [''] so the Array(String) parameter is never empty.
        return values[start:start + batch_size] or ['']

    def id_text(value) -> str:
        return '' if value is None else str(value)

    def blank_if_none(value):
        return '' if value is None else value

    statuses: List[AMAdStatus] = []
    for offset in range(0, longest, batch_size):
        response = client.query(status_query, parameters={
            'websiteId': website_id,
            'campaignSearchStrings': chunk(campaign_search_strings, offset),
            'adSetSearchStrings': chunk(ad_set_search_strings, offset),
            'adSearchStrings': chunk(ad_search_strings, offset)
        })

        for row in response.result_rows:
            raw_roas = row[8]
            statuses.append(AMAdStatus(
                type=row[0],
                platform=row[1],
                campaignId=id_text(row[2]),
                adSetId=id_text(row[3]),
                adId=id_text(row[4]),
                name=blank_if_none(row[5]),
                productType=blank_if_none(row[6]),
                budget=blank_if_none(row[7]),
                targetROAS=0.0 if (not raw_roas or raw_roas == '') else float(raw_roas),
                isActive=bool(row[9])
            ))

    return statuses


# ============================================================
# Main API handler
# ============================================================

def _ad_metrics_to_frame(ad_metrics: List["AdMetric"]) -> pd.DataFrame:
    """Build the ad-day ``adData`` frame (datetime ``date``, string id columns)."""
    df_ad = pd.DataFrame([{
        'date': m.date,
        'platform': m.platform,
        'campaignId': str(m.campaignId),
        'campaignName': m.campaignName,
        'adSetId': str(m.adSetId),
        'adSetName': m.adSetName,
        'adId': str(m.adId),
        'adName': m.adName,
        'spend': m.spend,
        'impressions': m.impressions,
        'interactions': m.interactions,
        'clicks': m.clicks,
        'conversions': m.conversions,
        'conversionValue': m.conversionValue,
    } for m in ad_metrics])

    if not df_ad.empty:
        df_ad['date'] = pd.to_datetime(df_ad['date'])
        for c in ['campaignId', 'adSetId', 'adId', 'platform']:
            df_ad[c] = df_ad[c].astype(str)
    return df_ad


def _ad_status_frame(website_id: str, ad_metrics: List["AdMetric"], client) -> pd.DataFrame:
    """Fetch the latest status of every campaign / ad set / ad id seen in *ad_metrics*."""
    campaign_ids = list({m.campaignId for m in ad_metrics if m.campaignId})
    adset_ids = list({m.adSetId for m in ad_metrics if m.adSetId})
    ad_ids = list({m.adId for m in ad_metrics if m.adId})

    ad_status_rows = _get_ad_status(
        website_id=website_id,
        campaign_search_strings=[str(x) for x in campaign_ids],
        klaviyo_search_strings=[],
        ad_set_search_strings=[str(x) for x in adset_ids],
        ad_search_strings=[str(x) for x in ad_ids],
        client=client
    )

    return pd.DataFrame([{
        'type': s.type,
        'platform': s.platform,
        'campaignId': s.campaignId,
        'adSetId': s.adSetId,
        'adId': s.adId,
        'name': s.name,
        'productType': s.productType,
        'budget': s.budget,
        'targetROAS': s.targetROAS,
        'isActive': s.isActive,
    } for s in ad_status_rows])


def _sort_ad_frame(df_ad: pd.DataFrame) -> pd.DataFrame:
    """Sort ``adData`` by date and ad hierarchy ids (empty frames are returned as-is)."""
    if df_ad.empty:
        return df_ad
    return df_ad.sort_values(['date', 'platform', 'campaignId', 'adSetId', 'adId']).reset_index(drop=True)


def main_api_handler_sku_attr(
    website_id: str,
    start_date: str,
    end_date: str,
    customer_type: str,
    market: str,
    timezone: str = 'UTC',
    campaign_prod: Optional[pd.DataFrame] = None,
    client=None
) -> Dict[str, Any]:
    """Assemble ad-level and SKU-level attribution data for one website.

    Steps:
    1. Resolve the website timezone.
    2. Load ad-day metrics, with the latest names applied.
    3. Load SKU attribution from ``_get_sku_contrib``.
    4. Derive "fair" (share-weighted) and "lead-only" allocations. Lead-only
       covers only SKUs whose productGroupId is listed for the ad in
       *campaign_prod*.
    5. Join SKU sales.
    6. Fetch ad status and Klaviyo cost.

    Args:
        website_id: Website to report on.
        start_date: Inclusive start date string.
        end_date: Exclusive end date string.
        customer_type: ``'new'`` / ``'return'``; any other value means all customers.
        market: Market filter, or ``'all'``.
        timezone: Fallback timezone; replaced by ``websites.timezone`` when that is set.
        campaign_prod: Optional frame with campaignId / adSetId / adId and
            productGroupId (or a list column ``productGroupIds``). It marks which
            SKUs count as "lead" for each ad.
        client: Optional ClickHouse client; a new one is opened when ``None``.

    Returns:
        Dict with keys ``adData``, ``skuAllocation``, ``skuPerformance``,
        ``adStatus`` and ``klaviyoCost``. A non-empty ``adData`` always has an
        ``ad_gross_profit`` column and is sorted. When no SKU attribution rows
        exist, ``skuAllocation`` and ``skuPerformance`` are empty DataFrames,
        while ``adData`` (with ``ad_gross_profit`` = 0), ``adStatus`` and
        ``klaviyoCost`` are still populated.
    """
    if client is None:
        client = connect_to_clickhouse()

    # website timezone override
    tz_df = client.query_df(
        "SELECT DISTINCT timezone FROM websites FINAL WHERE websiteId = {websiteId: String}",
        parameters={'websiteId': website_id}
    )
    if not tz_df.empty and tz_df.iloc[0].get('timezone'):
        timezone = tz_df.iloc[0]['timezone']

    # 1) ad metrics (totals) with latest names
    ad_metrics = _get_ad_metrics(
        website_id=website_id,
        start_date=start_date,
        end_date=end_date,
        timezone=timezone,
        market=market,
        client=client
    )

    names_lookup = _get_latest_names(website_id, market, client)
    _apply_latest_names_to_metrics(ad_metrics, names_lookup)

    df_ad = _ad_metrics_to_frame(ad_metrics)

    # 2) sku contrib (weights + share + attributed gp by sku + fair spend/impressions)
    df_alloc = _get_sku_contrib(
        website_id=website_id,
        start_date=start_date,
        end_date=end_date,
        customer_type=customer_type,
        market=market,
        timezone=timezone,
        client=client
    )

    if df_alloc is None or df_alloc.empty:
        # No attributed SKU rows. adData keeps the same shape as on the main
        # path (sorted, with ad_gross_profit), and ad status is still fetched
        # because it depends only on the ad metrics, not on attribution.
        if not df_ad.empty:
            df_ad['ad_gross_profit'] = 0.0
        return {
            'adData': _sort_ad_frame(df_ad),
            'skuAllocation': pd.DataFrame(),
            'skuPerformance': pd.DataFrame(),
            'adStatus': _ad_status_frame(website_id, ad_metrics, client),
            'klaviyoCost': _get_klaviyo_cost(website_id, start_date, end_date, timezone, client),
        }

    # normalize dtypes
    df_alloc['date'] = pd.to_datetime(df_alloc['date'])
    for col in ['productId', 'productGroupId', 'campaignId', 'adSetId', 'adId', 'platform']:
        if col in df_alloc.columns:
            df_alloc[col] = df_alloc[col].astype(str)

    # 3) join ad totals into allocation base (do NOT overwrite later)
    if not df_ad.empty:
        df_ad_for_join = df_ad[[
            'date', 'platform', 'campaignId', 'adSetId', 'adId',
            'spend', 'impressions', 'interactions', 'clicks', 'campaignName', 'adSetName', 'adName'
        ]].rename(columns={
            'spend': 'ad_spend_total',
            'impressions': 'ad_impressions_total',
            'interactions': 'ad_interactions_total',
            'clicks': 'ad_clicks_total'
        })

        df_alloc = df_alloc.merge(
            df_ad_for_join,
            on=['date', 'platform', 'campaignId', 'adSetId', 'adId'],
            how='left'
        )
    else:
        df_alloc['ad_spend_total'] = 0.0
        df_alloc['ad_impressions_total'] = 0.0
        df_alloc['ad_interactions_total'] = 0.0
        df_alloc['ad_clicks_total'] = 0.0
        df_alloc['campaignName'] = ''
        df_alloc['adSetName'] = ''
        df_alloc['adName'] = ''

    for total_col in ['ad_spend_total', 'ad_impressions_total', 'ad_interactions_total', 'ad_clicks_total']:
        df_alloc[total_col] = df_alloc[total_col].fillna(0.0).astype(float)

    # 4) isLead tagging (optional): a row is "lead" when its productGroupId is
    #    listed for that campaign/adSet/ad in campaign_prod
    if campaign_prod is not None and not campaign_prod.empty:
        cp = campaign_prod.copy()
        if 'productGroupIds' in cp.columns:
            cp = cp.explode('productGroupIds').rename(columns={'productGroupIds': 'productGroupId'})

        for col in ['productGroupId', 'campaignId', 'adSetId', 'adId']:
            if col in cp.columns:
                cp[col] = cp[col].astype(str)

        cp['isLead'] = 1
        cp = cp.drop_duplicates(subset=['campaignId', 'adSetId', 'adId', 'productGroupId'])

        df_alloc = df_alloc.merge(
            cp[['campaignId', 'adSetId', 'adId', 'productGroupId', 'isLead']],
            on=['campaignId', 'adSetId', 'adId', 'productGroupId'],
            how='left'
        )
        df_alloc['isLead'] = df_alloc['isLead'].fillna(0).astype(int)
    else:
        df_alloc['isLead'] = 0

    # 5) compute weights + lead-only shares
    group_keys = ['date', 'platform', 'campaignId', 'adSetId', 'adId']

    df_alloc['sku_weight'] = pd.to_numeric(df_alloc['sku_weight'], errors='coerce').fillna(0.0)
    df_alloc['share'] = pd.to_numeric(df_alloc['share'], errors='coerce').fillna(0.0)

    df_alloc['total_weight_all'] = df_alloc.groupby(group_keys)['sku_weight'].transform('sum')
    df_alloc['lead_weight'] = np.where(df_alloc['isLead'] == 1, df_alloc['sku_weight'], 0.0)
    df_alloc['total_weight_lead'] = df_alloc.groupby(group_keys)['lead_weight'].transform('sum')

    # Only compute gross profit total from attributed rows
    df_alloc['ad_gross_profit_total'] = df_alloc.groupby(group_keys)['sku_grossProfit'].transform('sum')

    df_alloc['share_lead_only'] = np.where(
        df_alloc['total_weight_lead'] > 0,
        df_alloc['lead_weight'] / df_alloc['total_weight_lead'],
        0.0
    )

    # lead only (strict): allocate FULL ad totals only across lead SKUs if lead_weight exists
    df_alloc['spend_lead_only'] = df_alloc['ad_spend_total'] * df_alloc['share_lead_only']
    df_alloc['impressions_lead_only'] = df_alloc['ad_impressions_total'] * df_alloc['share_lead_only']
    df_alloc['clicks_lead_only'] = df_alloc['ad_clicks_total'] * df_alloc['share_lead_only']
    df_alloc['gross_profit_lead_only'] = df_alloc['ad_gross_profit_total'] * df_alloc['share_lead_only']

    # 6) skuAllocation output (fair = already proportional in SQL)
    keep_cols = [
        'date',
        'productId',
        'productGroupId',
        'productGroupName',
        'productName',
        'platform',
        'campaignId',
        'campaignName',
        'adSetId',
        'adSetName',
        'adId',
        'adName',
        'isLead',
        'sku_weight',
        'share',
        'sku_grossProfit',
        'sku_spend',
        'sku_impressions',
        'sku_clicks',
        'gross_profit_lead_only',
        'spend_lead_only',
        'impressions_lead_only',
        'clicks_lead_only',
    ]
    for c in keep_cols:
        if c not in df_alloc.columns:
            df_alloc[c] = np.nan

    df_sku_alloc = (
        df_alloc[keep_cols]
        .rename(columns={
            'sku_grossProfit': 'gross_profit_fair',
            'sku_spend': 'spend_fair',
            'sku_impressions': 'impressions_fair',
            'sku_clicks': 'clicks_fair',
        })
        .sort_values(['date', 'productId', 'platform', 'campaignId', 'adSetId', 'adId'])
        .reset_index(drop=True)
    )

    # 7) put gross profit total on df_ad (ad-day level) from df_alloc
    if not df_ad.empty:
        df_gp = (
            df_alloc.groupby(group_keys, as_index=False)['sku_grossProfit']
            .sum()
            .rename(columns={'sku_grossProfit': 'ad_gross_profit'})
        )
        df_ad = df_ad.merge(df_gp, on=group_keys, how='left')
        df_ad['ad_gross_profit'] = df_ad['ad_gross_profit'].fillna(0.0)

    # 8) sku sales (organic + all)
    sku_sales_rows = _get_sku_sales(
        website_id=website_id,
        start_date=start_date,
        end_date=end_date,
        customer_type=customer_type,
        market=market,
        timezone=timezone,
        client=client
    )
    df_sku_sales = pd.DataFrame(sku_sales_rows)
    if not df_sku_sales.empty:
        df_sku_sales['date'] = pd.to_datetime(df_sku_sales['date'])
        df_sku_sales['productId'] = df_sku_sales['productId'].astype(str)

    # 9) skuPerformance = sales + fair + lead-only
    if not df_sku_sales.empty and not df_sku_alloc.empty:
        agg = df_sku_alloc.groupby(['date', 'productId'], as_index=False).agg(
            sku_spend_fair=('spend_fair', 'sum'),
            sku_impressions_fair=('impressions_fair', 'sum'),
            sku_gross_profit_fair=('gross_profit_fair', 'sum'),
            sku_clicks_fair=('clicks_fair', 'sum'),

            sku_spend_lead_only=('spend_lead_only', 'sum'),
            sku_impressions_lead_only=('impressions_lead_only', 'sum'),
            sku_gross_profit_lead_only=('gross_profit_lead_only', 'sum'),
            sku_clicks_lead_only=('clicks_lead_only', 'sum'),
        )

        df_sku_perf = df_sku_sales.merge(agg, on=['date', 'productId'], how='left')

        fill_cols = [c for c in agg.columns if c not in ('date', 'productId')]
        for c in fill_cols:
            df_sku_perf[c] = df_sku_perf[c].fillna(0.0)

        # attach product dimensions (one row per productId)
        dims = df_sku_alloc[['productId', 'productGroupId', 'productGroupName', 'productName']].drop_duplicates('productId')
        df_sku_perf = df_sku_perf.merge(dims, on='productId', how='left')
        df_sku_perf = df_sku_perf.sort_values(['date', 'productId']).reset_index(drop=True)
    else:
        df_sku_perf = pd.DataFrame()

    # 10) ad status + klaviyo cost
    df_ad_status = _ad_status_frame(website_id, ad_metrics, client)
    klaviyo_cost = _get_klaviyo_cost(website_id, start_date, end_date, timezone, client)

    return {
        'adData': _sort_ad_frame(df_ad),
        'skuAllocation': df_sku_alloc,
        'skuPerformance': df_sku_perf,
        'adStatus': df_ad_status,
        'klaviyoCost': klaviyo_cost,
    }


# In [3]:
# ============================================================
# ClickHouse utils with AWS Secrets (optional)
# ============================================================
from botocore.config import Config 
from botocore.exceptions import ClientError, ParamValidationError
import boto3
import json, joblib



def initialize_credentials(
    secret_name="SHARED_LAMBDA_CREDENTIALS",
    region_name="ap-southeast-2",
    timeout=15,
    profile_name="default"
):
    """
    Fetch the shared secret bundle and publish it as the module-level ``credentials``.

    Delegates to ``get_secret_with_retry`` (same arguments) and keeps only the
    parsed dict it returns, discarding the raw secret string.

    Returns:
        The parsed secret dict, which is also stored in the global ``credentials``.
    """
    global credentials
    secret_dict, _raw_secret = get_secret_with_retry(
        secret_name=secret_name,
        region_name=region_name,
        timeout=timeout,
        profile_name=profile_name,
    )
    credentials = secret_dict
    return credentials


def get_secret_with_retry(
    secret_name="SHARED_LAMBDA_CREDENTIALS",
    region_name="ap-southeast-2",
    timeout=15,
    profile_name="default"
):
    """
    Retrieve a JSON secret from AWS Secrets Manager.

    The secret is first read through the named AWS profile (``profile_name``).
    If that attempt raises for any reason (for example, the profile does not
    exist in the runtime, as in Lambda), one fallback attempt is made with a
    session built from boto3's default credential chain. Despite the function
    name there is no further retrying.

    Args:
        secret_name: Secrets Manager ``SecretId``.
        region_name: AWS region used for the session and the client.
        timeout: Connect and read timeout, in seconds, for the boto3 client.
        profile_name: AWS profile used for the first attempt.

    Returns:
        A ``(secret_dict, secret_string)`` tuple: the parsed JSON object and
        the raw ``SecretString`` it was parsed from.

    Raises:
        Whatever the fallback fetch raises if both attempts fail, and
        ``json.JSONDecodeError`` if the secret string is not valid JSON.
    """
    config = Config(connect_timeout=timeout, read_timeout=timeout)

    def _fetch_secret_string(session):
        # One Secrets Manager round-trip using the given session's credentials.
        sm_client = session.client(service_name='secretsmanager', region_name=region_name, config=config)
        resp = sm_client.get_secret_value(
            SecretId=secret_name,
            VersionStage='AWSCURRENT'
        )
        return resp['SecretString']

    try:
        secret = _fetch_secret_string(
            boto3.session.Session(profile_name=profile_name, region_name=region_name)
        )
    except Exception as profile_err:
        # Deliberately broad: any failure with the named profile falls back to
        # the default session (e.g. Lambda). Log it instead of hiding it.
        print(f"Secret fetch with profile '{profile_name}' failed ({profile_err}); "
              f"falling back to default session")
        secret = _fetch_secret_string(boto3.session.Session(region_name=region_name))

    # Parse outside the try: a malformed secret is not a credentials problem,
    # so retrying with another session would not help.
    secret_dict = json.loads(secret)
    return secret_dict, secret


def create_clickhouse_client(credentials, database='profitpeak', verify=False):
    """
    Create a ClickHouse client using credentials from Secrets Manager.

    Connects to ``CLICKHOUSE_URL`` (scheme and trailing slash stripped) over
    HTTPS. If that fails and the host contains ``'vpce.'``, one more attempt is
    made with ``'vpce.'`` removed. In the captured notebook output the
    ``vpce.`` hostname failed DNS resolution, so presumably it is only
    resolvable inside the VPC.

    Args:
        credentials: Mapping with ``CLICKHOUSE_URL``, ``CLICKHOUSE_PORT``,
            ``CLICKHOUSE_USER`` and ``CLICKHOUSE_PASSWORD`` keys.
        database: Default database for the client.
        verify: Passed to ``clickhouse_connect.get_client``. Defaults to False
            (TLS certificate verification disabled) to preserve existing
            behavior; pass True where certificate checks are possible.

    Returns:
        A connected ``clickhouse_connect`` client.

    Raises:
        KeyError / ValueError for missing or malformed credential values, and
        re-raises the last connection error if every host attempt fails.
    """
    url = credentials['CLICKHOUSE_URL'].strip()
    for scheme in ('https://', 'http://'):
        if url.startswith(scheme):
            url = url[len(scheme):]
            break
    original_host = url.rstrip('/')

    # Everything except the host is shared between the two attempts.
    connect_kwargs = dict(
        port=int(credentials['CLICKHOUSE_PORT']),
        secure=True,
        username=credentials['CLICKHOUSE_USER'],
        verify=verify,
        password=credentials['CLICKHOUSE_PASSWORD'],
        database=database,
    )

    try:
        return clickhouse_connect.get_client(host=original_host, **connect_kwargs)
    except Exception as first_err:
        print(f"First attempt failed with host='{original_host}': {first_err}")
        alternative_host = original_host.replace('vpce.', '')
        if alternative_host == original_host:
            # Nothing to rewrite; surface the original connection error.
            raise
        print(f"Trying again after removing 'vpce.': {alternative_host}")
        try:
            return clickhouse_connect.get_client(host=alternative_host, **connect_kwargs)
        except Exception as second_err:
            print(f"Second attempt also failed with host='{alternative_host}': {second_err}")
            raise


In [4]:
# Load the shared secret bundle through the 'live' AWS profile, then connect to ClickHouse.
credentials = initialize_credentials(profile_name='live')
client = create_clickhouse_client(credentials=credentials)

Unexpected Http Driver Exception


First attempt failed with host='vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud': Error HTTPSConnectionPool(host='vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud', port=8443): Max retries exceeded with url: /? (Caused by NameResolutionError("HTTPSConnection(host='vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud', port=8443): Failed to resolve 'vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud' ([Errno -2] Name or service not known)")) executing HTTP request attempt 1 (https://vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud:8443)
Trying again after removing 'vpce.': vuh9osomeb.ap-southeast-2.aws.clickhouse.cloud


In [5]:
pd.options.display.max_columns = None  # show every column when rendering DataFrames

In [6]:
# Campaign/ad -> product-group mapping (columns shown in the preview below).
# joblib.load unpickles, so only load files produced by a trusted source.
campaign_prod = joblib.load(filename='df_final_lead_prods_campaign.pkl')
campaign_prod.head(n=3)

Unnamed: 0,destinationUrl_item,productGroupIds,names,matched_words_collection_list,matched_words_url_list,destinationUrl,campaignId,adSetId,adId,adName,adSetName,campaignName
0,babyboofashion.com/en-us/collections/afterpay-...,"[1882382893119, 1882383089727, 1882491387967, ...","[Ada Mini Dress - Red, Adella Mini Dress - Ivo...",[afterpay],[afterpayday],babyboofashion.com/en-us/collections/afterpay-...,1826992347106322,1826992347106338,1826992348120193,ad1,Pure Pixel,Product Sales | Catalog | Seasonal Sale 25% | ...
1,http://babyboofashion.com/collections/bodycon-...,"[11233141268, 1397312487487, 142584643604, 143...","[Ada Mini Dress - Red, Addison Mini Dress - Ba...",[bodycondresses],[bodycondresses],"[""http://babyboofashion.com/collections/bodyco...",2088184682,114427534683,485179032363,,,
2,http://babyboofashion.com/collections/bodycon-...,"[11233141268, 1397312487487, 142584643604, 143...","[Ada Mini Dress - Red, Addison Mini Dress - Ba...",[bodycondresses],[bodycondresses],"[""http://babyboofashion.com/collections/bodyco...",2088184682,114427534683,485179032366,,,


In [7]:
# Refresh credentials and the connection, then run the SKU attribution handler
# for start_date 2025-10-01 / end_date 2025-12-01.
credentials = initialize_credentials(profile_name='live')
client = create_clickhouse_client(credentials)
res = main_api_handler_sku_attr(
    website_id='6839260124a2adf314674a5e',
    start_date='2025-10-01',
    end_date='2025-12-01',
    customer_type='all',
    market='all',
    client=client,
    campaign_prod=campaign_prod,
)


Unexpected Http Driver Exception


First attempt failed with host='vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud': Error HTTPSConnectionPool(host='vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud', port=8443): Max retries exceeded with url: /? (Caused by NameResolutionError("HTTPSConnection(host='vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud', port=8443): Failed to resolve 'vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud' ([Errno -2] Name or service not known)")) executing HTTP request attempt 1 (https://vuh9osomeb.ap-southeast-2.vpce.aws.clickhouse.cloud:8443)
Trying again after removing 'vpce.': vuh9osomeb.ap-southeast-2.aws.clickhouse.cloud


KeyboardInterrupt: 

In [None]:
# website_id = '6839260124a2adf314674a5e'
# tz_df = client.query_df(
#     f"SELECT DISTINCT timezone FROM websites FINAL WHERE websiteId = '{website_id}'"
# )
# if not tz_df.empty and tz_df.iloc[0]['timezone']:
#     timezone = tz_df.iloc[0]['timezone']


# df_alloc = _get_sku_contrib(
#     website_id='6839260124a2adf314674a5e',
#     start_date='2025-10-01', end_date='2025-12-01', customer_type='all', market = 'all', 
#     timezone=timezone,
#     client=client
# )

# if campaign_prod is not None and not campaign_prod.empty:
#     cp = campaign_prod.copy()  
#     if 'productGroupIds' in cp.columns:
#         cp = cp.explode('productGroupIds').rename(columns={'productGroupIds': 'productGroupId'})
#     for col in ['productGroupId', 'campaignId', 'adSetId', 'adId']:
#         cp[col] = cp[col].astype(str)                 
#     cp['isLead'] = 1
#     cp = cp.drop_duplicates(subset=['campaignId', 'adSetId', 'adId', 'productGroupId'])

#     print(df_alloc.shape)
#     df_alloc = df_alloc.merge(
#         cp[['campaignId', 'adSetId', 'adId', 'productGroupId', 'isLead']],
#         on=['campaignId', 'adSetId', 'adId', 'productGroupId'],
#         how='left'
#     )
#     df_alloc['isLead'] = df_alloc['isLead'].fillna(0).astype(int)
#     print(df_alloc.shape)
# else:
#     df_alloc['isLead'] = 0

In [None]:
# Export the SKU allocation table. Fill NaNs on a copy: ``temp`` used to alias
# res['skuAllocation'], so ``fillna('', inplace=True)`` also rewrote NaN cells
# of the handler result to '' (object dtype), which can break or silently alter
# later numeric aggregations on res['skuAllocation'].
temp = res['skuAllocation'].fillna('')

temp.to_csv('sku_allocation_v2.csv', index=False)
# temp.head(20)
# find rows sharing date, productGroupId, ad keys and platform (possible duplicates)
# dup_keys = ['date', 'productGroupId', 'campaignId', 'adSetId', 'adId', 'platform']
# temp[temp.duplicated(subset=dup_keys, keep=False) & (temp.platform == 'google')]
# temp[(temp.productGroupId == '7181358956607') & (temp.date == '2025-10-01') & (temp.adId == '659231372798')]

In [8]:
# Reload the exported allocation table from disk.
temp = pd.read_csv(filepath_or_buffer='sku_allocation_v2.csv')

In [9]:
temp.iloc[:30]  # preview the first 30 rows

Unnamed: 0,date,productId,productGroupId,productGroupName,productName,platform,campaignId,campaignName,adSetId,adSetName,adId,adName,isLead,sku_weight,share,gross_profit_fair,spend_fair,impressions_fair,clicks_fair,gross_profit_lead_only,spend_lead_only,impressions_lead_only,clicks_lead_only
0,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,direct,,,,,,,0,9,0.001237,8.01819,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,google,20157356312,,150915637364,,659104112618,,1,2,0.086957,37.512,12.705043,109.130435,8.869565,67.636196,13.282545,114.090909,9.272727
2,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,google,21934620170,AU | PMAX | Generic [BE]],,,,,0,2,0.006349,37.512,9.375897,1184.203175,15.174603,0.0,0.0,0.0,0.0
3,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,organic_referral,couponfollow,,,,,,0,3,0.014778,2.67273,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,organic_referral,simplycodes,,,,,,0,3,0.028037,2.67273,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,organic_referral,wethrift,,,,,,0,1,0.043478,0.89091,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,organic_search,google,,,,,,0,3,0.00099,2.67273,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,2025-10-01,31423777374271,4409129271359,Allie Mini Dress - White,M,organic_unclassified,retailmenot,,,,,,0,2,0.009174,1.78182,0.0,0.0,0.0,0.0,0.0,0.0,0.0
8,2025-10-01,33329682874431,4920899338303,Seamless Clear Strap Thong - Nude,XS,klaviyo,campaign,,email,,01K4YA48NHYQEE8P2EEFC9FMTA,,0,2,0.5,19.37,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,2025-10-01,33329682874431,4920899338303,Seamless Clear Strap Thong - Nude,XS,meta,6863214555659,Newness: Collection Ads | Stories & Reels Fun...,6880632381659,Cold #2 17/09/2025 Story & Reels,6882810556659,Paula Black Reaction | Instagram | ED: 24/12/2025,0,2,0.5,19.37,33.03,1449.0,77.0,0.0,0.0,0.0,0.0


In [13]:
# Distinct Google campaignIds whose campaignName is missing (NaN appears when the ID itself is missing).
temp.loc[
    temp['campaignName'].isnull() & (temp['platform'] == 'google'),
    ['campaignName', 'campaignId'],
].drop_duplicates()['campaignId'].tolist()

['20157356312',
 '20168349790',
 '18589491816',
 '20234327407',
 '20153153907',
 '22712563199',
 nan,
 '20224960311',
 '22111399894',
 '22885369836',
 '22887454236',
 '20160594940',
 '22472868880',
 '20266748620',
 '23046959893',
 '20587831882',
 '20160582946',
 '20168297014',
 '23208825318',
 '23218208347',
 '22905057599',
 '23039486380',
 '23294695607',
 '20952555014',
 '23330211787',
 '22798369148',
 '22897293520',
 '23226608306',
 '23321089392',
 '20160601141',
 '20259405484',
 '23321064627',
 '23326130579',
 '23293065116']

In [None]:
# Column totals for each metric, *_fair next to its *_lead_only counterpart.
res['skuAllocation'][[
    'gross_profit_fair', 'gross_profit_lead_only',
    'spend_fair', 'spend_lead_only',
    'impressions_fair', 'impressions_lead_only',
    'clicks_fair', 'clicks_lead_only',
]].sum()

In [None]:
# Earliest date on which each (campaignId, adSetId, adId) of Google campaign
# 22472868880 appears, most recent first-seen dates first.
# Fixes the `temp.campaigId` typo, which raised AttributeError.
# NOTE(review): min() on 'date' assumes ISO 'YYYY-MM-DD' strings (as read from CSV).
(
    temp.loc[
        (temp['campaignId'] == '22472868880') & (temp['platform'] == 'google'),
        ['campaignId', 'adSetId', 'adId', 'date'],
    ]
    .groupby(['campaignId', 'adSetId', 'adId'])['date']
    .min()
    .reset_index()
    .sort_values('date', ascending=False)
    .head(20)
)

In [None]:
# ### check the difference in amount
# temp1 = res['skuAllocation'].groupby(['date', 'platform'], observed= False)[['ad_spend', 'ad_gross_profit']].agg('sum').reset_index()
# temp1 = temp1[temp1.ad_spend > 0]
# temp2 = res['adData'].groupby(['date', 'platform'], observed= False)[['spend']].agg('sum').reset_index()

# temp3 = temp1.merge(temp2, on= ['date', 'platform'], how= 'left')
# temp3[np.abs(temp3.ad_spend - temp3.spend) > 1000]


In [None]:
# cp = campaign_prod.copy()
# if 'productGroupIds' in cp.columns:
#     cp = cp.explode('productGroupIds')
#     cp = cp.rename(columns={'productGroupIds': 'productGroupId'})

# cp['productGroupId'] = cp['productGroupId'].astype(str)
# print("campaign_prod duplicate keys:", cp.duplicated(subset=["campaignId","adSetId","adId","productGroupId"]).sum())


In [None]:
# import joblib 

# res1 = joblib.load('res1.pkl')
# res1 = res1['skuAllocation']

# res2 = res['skuAllocation']

# k = ["date","platform","campaignId","adSetId","adId"]
# cmp = res1.merge(res2, on=k, how="outer", suffixes=("_s1","_s2")).fillna(0)
