In [None]:

# Snowpark Python bootstrap: pull file from stage, (Excel→CSV if needed), profile, chunk if large, and push CSV back to stage.
# References: Python worksheets & Snowpark session/file APIs (docs)  [1](https://docs.snowflake.com/en/developer-guide/snowpark/python/python-worksheets)[2](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.FileOperation.put)

import os, io, time, json, logging
from datetime import datetime
import pandas as pd
from pandas.api.types import is_numeric_dtype, is_datetime64_any_dtype
from snowflake.snowpark.context import get_active_session

# ---- Config (edit if needed) ----
# Names of the Snowflake objects and files this cell works with; every later step in the cell reads these.
DB = "BCG"
SCHEMA = "BCG_SCHEMA"                  # no space in this name; stage_url() double-quotes it anyway when building stage paths
STAGE = "BCG_STAGE"
SRC_FILE = "Data for POC project data 1 (1).csv"  # current staged file name
TARGET_CSV = "data_for_poc_project_data_1.csv"    # normalized CSV name in stage
LOCAL_DIR = "/tmp/pb_ingest"           # local scratch directory for the downloaded and generated files
os.makedirs(LOCAL_DIR, exist_ok=True)

session = get_active_session()
# Raise the root logger to INFO so the logging.info(...) progress messages below are emitted.
logging.getLogger().setLevel(logging.INFO)

def stage_url(db, schema, stage, file=None):
    """Build a stage reference of the form @db."schema".stage[/file].

    The schema is always wrapped in double quotes; db and stage are used as given.
    When `file` is falsy (None or empty) only the stage location is returned.
    """
    location = '@{}."{}".{}'.format(db, schema, stage)
    if not file:
        return location
    return "{}/{}".format(location, file)

# ---- Download from stage ----
t0 = time.time()  # start of the wall-clock timer reported at the end of this cell
get_res = session.file.get(stage_url(DB, SCHEMA, STAGE, SRC_FILE), LOCAL_DIR)
logging.info(f"GET results: {get_res}")
local_path = os.path.join(LOCAL_DIR, SRC_FILE)
assert os.path.exists(local_path), f"Local file not found: {local_path}"

# ---- Read & convert (supports .xlsx, .xls and .csv) ----
# Whatever the source format, the data is re-saved under TARGET_CSV so later steps see one consistent name.
ext = os.path.splitext(local_path)[1].lower()
csv_path = os.path.join(LOCAL_DIR, TARGET_CSV)
if ext in (".xlsx", ".xls"):
    # openpyxl cannot read legacy .xls workbooks, so only force it for .xlsx;
    # engine=None lets pandas choose a reader that matches the file type.
    excel_engine = "openpyxl" if ext == ".xlsx" else None
    df = pd.read_excel(local_path, engine=excel_engine)
    done_msg = f"Converted Excel → CSV: {csv_path}"
else:
    # Already CSV: load and re-save to the normalized name to enforce consistent naming
    df = pd.read_csv(local_path)
    done_msg = f"CSV normalized: {csv_path}"
df.to_csv(csv_path, index=False)
logging.info(done_msg)

# ---- Profile schema & emit data dictionary ----
def _describe_column(name, values):
    """Return the data-dictionary entry (a plain dict) for one DataFrame column.

    min/max are numeric bounds for numeric columns, ISO timestamps for datetime
    columns, and string lengths for everything else; `samples` is only filled
    for that last group (up to five distinct non-null values as strings).
    """
    total = len(values)
    null_count = int(values.isna().sum())
    lo = hi = None
    examples = None

    if is_numeric_dtype(values):
        numeric = pd.to_numeric(values, errors="coerce")
        if numeric.notna().any():
            lo = float(numeric.min())
            hi = float(numeric.max())
    elif is_datetime64_any_dtype(values):
        stamps = pd.to_datetime(values, errors="coerce")
        earliest, latest = stamps.min(), stamps.max()
        lo = earliest.isoformat() if pd.notna(earliest) else None
        hi = latest.isoformat() if pd.notna(latest) else None
    else:
        # Lengths are measured on the string form of the non-null values only.
        text_lengths = values.astype(str).where(values.notna(), None).dropna().str.len()
        if len(text_lengths):
            lo = int(text_lengths.min())
            hi = int(text_lengths.max())
        examples = list(values.dropna().astype(str).unique()[:5])

    return {
        "column": name,
        "dtype": str(values.dtype),
        "nulls": null_count,
        "null_pct": round((null_count / total) * 100, 2) if total else 0.0,
        "cardinality": int(values.nunique(dropna=True)),
        "min": lo,
        "max": hi,
        "samples": examples,
    }


n_rows, n_cols = df.shape
profile = [_describe_column(column_name, df[column_name]) for column_name in df.columns]

# Persist the profile next to the working files as JSON.
summary = {"rows": n_rows, "columns": n_cols, "data_dictionary": profile}
dict_path = os.path.join(LOCAL_DIR, "data_profile_summary.json")
with open(dict_path, "w") as profile_file:
    json.dump(summary, profile_file, indent=2)
logging.info(f"Profile saved: {dict_path}")

# ---- Chunk large files (rare for 3k rows; included for robustness) ----
# Files over ~100MB are split into row-based parts; anything smaller is uploaded as the single normalized CSV.
size_mb = os.path.getsize(csv_path) / (1024 * 1024)
if size_mb <= 100:
    chunks = [csv_path]
else:
    CHUNK_ROWS = 250_000
    part_stem = os.path.splitext(TARGET_CSV)[0]
    chunks = []
    for part_no, offset in enumerate(range(0, len(df), CHUNK_ROWS), start=1):
        part_path = os.path.join(LOCAL_DIR, f"{part_stem}_part{part_no}.csv")
        # Each part is a full CSV with its own header row.
        df.iloc[offset:offset + CHUNK_ROWS].to_csv(part_path, index=False)
        chunks.append(part_path)
    logging.info(f"Chunked into {len(chunks)} part(s).")

# ---- Upload CSV back to stage (overwrite, auto-compress for transport) ----
# Every file in `chunks` goes to the stage root; the PUT results are collected into one flat list.
stage_target = stage_url(DB, SCHEMA, STAGE)
put_results = []
for local_csv in chunks:
    uploaded = session.file.put(local_csv, stage_target, auto_compress=True, overwrite=True)
    put_results.extend(uploaded)
logging.info(f"PUT results: {put_results}")  # session.file.put docs  [2](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.FileOperation.put)

# ---- Timing & LIST sanity check ----
# Print a small JSON run report: data shape, local names of the uploaded files, and seconds since t0.
elapsed = round(time.time() - t0, 2)
run_report = {
    "rows": n_rows,
    "cols": n_cols,
    "uploaded_files": list(map(os.path.basename, chunks)),
    "elapsed_sec": elapsed,
}
print(json.dumps(run_report, indent=2))

# Optional: list stage files for confirmation (SQL LIST command is available separately in Cell 2).
# Staging/PUT guidance  [3](https://docs.snowflake.cn/en/user-guide/data-load-local-file-system-stage)


In [None]:


    -- Session context for the unqualified object names below.
    USE WAREHOUSE COMPUTE_WH;
    USE DATABASE BCG;
    USE SCHEMA "BCG_SCHEMA";

    -- Internal stage that holds the uploaded source file and the normalized CSV(s).
    CREATE STAGE IF NOT EXISTS BCG_STAGE COMMENT = 'POC staging for billing/performance workbook';

    -- Credit guard-rail: notify at 80% of the 150-credit quota, suspend at 100%.
    CREATE OR REPLACE RESOURCE MONITOR RM_PB_WH WITH CREDIT_QUOTA = 150
      TRIGGERS ON 80 PERCENT DO NOTIFY
               ON 100 PERCENT DO SUSPEND;

    -- A resource monitor only takes effect once it is assigned to a warehouse (or the account);
    -- attach it to the warehouse this notebook runs on, otherwise the triggers above never apply.
    ALTER WAREHOUSE COMPUTE_WH SET RESOURCE_MONITOR = RM_PB_WH;


    -- Governance tags recorded on the stage.
    CREATE TAG IF NOT EXISTS LINEAGE_SOURCE;
    CREATE TAG IF NOT EXISTS DATA_CLASSIFICATION;
    ALTER STAGE BCG_STAGE SET TAG LINEAGE_SOURCE = 'poc_upload', DATA_CLASSIFICATION = 'non_pii';


In [None]:

# List the files currently in @BCG."BCG_SCHEMA".BCG_STAGE (the CSV upload itself happens in the first cell).
# Snowpark FileOperation.put reference  [2](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.FileOperation.put)
# Staging/LIST reference  [3](https://docs.snowflake.cn/en/user-guide/data-load-local-file-system-stage)

from snowflake.snowpark.context import get_active_session
session = get_active_session()

DB, SCHEMA, STAGE = "BCG", "BCG_SCHEMA", "BCG_STAGE"
quoted_schema = f'"{SCHEMA}"'
stage_base = f'@{DB}.{quoted_schema}.{STAGE}'

# Re-list the stage after uploads (Cell 0 already put the CSV).
res = session.sql(f"LIST {stage_base}").collect()
# LIST rows are (name, size, md5, last_modified): position 0 is the staged file path,
# position 1 is its size in bytes, so the file names must come from position 0.
print({"stage": stage_base, "file_count": len(res), "files": [r[0] for r in res]})


In [None]:


  -- CSV format for the billing extract: one header row, comma-delimited, optional double-quote enclosure.
  CREATE OR REPLACE FILE FORMAT FF_CSV
    TYPE = CSV
    SKIP_HEADER = 1
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '\"'
    NULL_IF = ('\\N','NULL','',' ')
    COMPRESSION = 'AUTO'
    COMMENT = 'CSV file format for POC billing';

  -- JSON format kept available for future sources.
  CREATE OR REPLACE FILE FORMAT FF_JSON
    TYPE = JSON
    COMMENT = 'For extensibility (disabled by default)';



In [None]:

-- First version of the landing table: the four date columns are numeric Excel serials.
-- NOTE(review): the later "Cell 4_1" cell runs CREATE OR REPLACE TABLE BCG.BCG_SCHEMA.T_BILLING_LANDING
-- with those date columns as STRING; assuming this cell runs in the same schema (the name here is
-- unqualified), that later definition replaces this one. The downstream views call TRY_TO_DATE /
-- TRY_TO_NUMBER on the date columns, which matches the STRING version — confirm whether this cell is still needed.
CREATE OR REPLACE TABLE T_BILLING_LANDING (
  ProjectCode STRING,
  ProjectName STRING,
  ClientName STRING,
  EmpID NUMBER(38,0),
  EmpName STRING,
  WeekStartDate NUMBER(38,0),   -- Excel serial → convert in curated view
  WeekEndDate   NUMBER(38,0),   -- Excel serial → convert in curated view
  CurrencyCode STRING,
  HoursFilled NUMBER(18,2),
  DailyHourRate NUMBER(18,2),
  NetClientCharges NUMBER(18,2),
  GrossClientCharges NUMBER(18,2),
  PartnerEffortHours NUMBER(18,2),
  PartnerCost NUMBER(18,2),
  PartnerRevenue NUMBER(18,2),
  TotalRevenue NUMBER(18,2),
  Country STRING,
  City STRING,
  BillingStatus STRING,
  InvoiceNumber STRING,
  Department STRING,
  Role STRING,
  ProjectStartDate NUMBER(38,0), -- Excel serial
  ProjectEndDate NUMBER(38,0),   -- Excel serial
  -- Quoted identifiers: the '%' character requires double quotes wherever these columns are referenced.
  "Utilization%" NUMBER(18,2),
  "DiscountApplied%" NUMBER(18,2),
  TaxAmount NUMBER(18,2),
  BillingCycle STRING,
  ProjectStatus STRING,
  ClientIndustry STRING,
  ManagerName STRING,
  CostCenter STRING
) COMMENT='Landing table: raw types from CSV (Excel serial dates), curated view will cast & validate';


In [None]:

# Cell 3.1 — Build & load weekly FX (landing first, fallback to stage CSV) — PATCHED
from snowflake.snowpark.context import get_active_session
import pandas as pd

session = get_active_session()

# Context: pin warehouse, database and schema for this cell.
for context_stmt in (
    "USE WAREHOUSE COMPUTE_WH",
    "USE DATABASE BCG",
    "USE SCHEMA BCG_SCHEMA",
):
    session.sql(context_stmt).collect()

# --- Ensure T_FX_RATES_WEEKLY exists with all four columns ---
# Step 1: create the table with the full 4-column schema when it is absent.
session.sql("""
CREATE TABLE IF NOT EXISTS BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY (
  currency_code        STRING,
  week_start_d         DATE,
  rate_usd_per_curr    NUMBER(18,9),  -- USD per 1 local unit
  rate_curr_per_usd    NUMBER(18,9)   -- Local per 1 USD (derived as 1 / rate_usd_per_curr)
)
""").collect()

# Step 2: a table left over from the older 3-column layout lacks RATE_CURR_PER_USD;
# look the column up in INFORMATION_SCHEMA and add it when the lookup finds nothing.
fx_column_lookup = session.sql("""
SELECT COUNT(*) AS c
FROM BCG.INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'BCG_SCHEMA'
  AND TABLE_NAME   = 'T_FX_RATES_WEEKLY'
  AND COLUMN_NAME  = 'RATE_CURR_PER_USD'
""").collect()
col_exists = fx_column_lookup[0][0]

if not col_exists:
    session.sql("""
    ALTER TABLE BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY
    ADD COLUMN rate_curr_per_usd NUMBER(18,9)
    """).collect()

# NOTE: the table is truncated further down, immediately before the COPY, and only after the new
# rates have been built and validated. Truncating up front would leave T_FX_RATES_WEEKLY empty
# whenever one of the ValueError checks below aborts the cell.

# Determine source availability: use the landing table when it has rows, else fall back to the staged CSV
landing_count = session.sql("SELECT COUNT(*) AS c FROM BCG.BCG_SCHEMA.T_BILLING_LANDING").collect()
use_stage = (len(landing_count) == 0) or (landing_count[0][0] == 0)

# Pull distinct (currency_code, week_start_d) from the best source.
# Week start is accepted either as a date string or as an Excel serial (days since 1899-12-30).
if not use_stage:
    qry = """
    SELECT DISTINCT
      TRIM(CurrencyCode) AS currency_code,
      COALESCE(
        TRY_TO_DATE(WeekStartDate),
        DATEADD('day', TRY_TO_NUMBER(WeekStartDate), '1899-12-30')
      ) AS week_start_d
    FROM BCG.BCG_SCHEMA.T_BILLING_LANDING
    WHERE CurrencyCode IS NOT NULL
      AND (TRY_TO_DATE(WeekStartDate) IS NOT NULL OR TRY_TO_NUMBER(WeekStartDate) IS NOT NULL)
    """
    df = session.sql(qry).to_pandas()
else:
    # Fallback: read from staged CSV using the FF_CSV file format (positional columns $6 / $8)
    qry_stage = """
    SELECT DISTINCT
      TRIM($8) AS currency_code,
      COALESCE(
        TRY_TO_DATE($6),
        DATEADD('day', TRY_TO_NUMBER($6), '1899-12-30')
      ) AS week_start_d
    FROM @BCG.BCG_SCHEMA.BCG_STAGE/data_for_poc_project_data_1.csv
    ( FILE_FORMAT => 'FF_CSV' )
    WHERE $8 IS NOT NULL
      AND (TRY_TO_DATE($6) IS NOT NULL OR TRY_TO_NUMBER($6) IS NOT NULL)
    """
    df = session.sql(qry_stage).to_pandas()

# Normalize and validate
df.columns = [c.lower() for c in df.columns]
if df.empty:
    raise ValueError("No (currency_code, week_start_d) found from landing or staged CSV. Confirm file name and column positions ($6=WeekStartDate, $8=CurrencyCode).")

# Synthetic FX map (edit/extend as needed); values are USD per 1 unit of the local currency
FX_MAP = {
    "USD": 1.00,  # USD per USD
    "INR": 0.012, # USD per INR
    "EUR": 1.10,
    "GBP": 1.27,
    "AUD": 0.68,
    "CAD": 0.74,
    "SGD": 0.75,
    "AED": 0.27,
    "CHF": 1.25
}

def usd_per_local(curr: str):
    """Return the FX_MAP rate (USD per 1 local unit) for a currency code, or None when not configured."""
    curr = (curr or "").upper().strip()
    return FX_MAP.get(curr, None)

df["currency_code"] = df["currency_code"].str.upper().str.strip()
df["rate_usd_per_curr"] = df["currency_code"].apply(usd_per_local)
# pandas may store a missing rate as NaN rather than None, so test with pd.isna instead of `is None`.
df["rate_curr_per_usd"] = df["rate_usd_per_curr"].apply(
    lambda r: None if pd.isna(r) or r == 0 else round(1.0 / float(r), 9)
)

# QA: list missing currencies (no rate found)
missing_currencies = sorted(set(df.loc[df["rate_usd_per_curr"].isna(), "currency_code"].tolist()))
if missing_currencies:
    print("⚠️ No FX rate configured for currencies:", missing_currencies)

# Keep only rows with a valid rate; drop dupes on key
df_nonnull = (
    df[df["rate_usd_per_curr"].notnull()]
      .drop_duplicates(subset=["currency_code", "week_start_d"])
      .copy()
)

if df_nonnull.empty:
    raise ValueError("All currencies missing FX rates. Populate FX_MAP or load a real FX source.")

# Persist to stage and COPY into the table with an explicit column list
csv_path = "/tmp/fx_rates_weekly.csv"
df_nonnull = df_nonnull[["currency_code", "week_start_d", "rate_usd_per_curr", "rate_curr_per_usd"]]
df_nonnull.to_csv(csv_path, index=False)

# Ensure stage exists (in case Cell 4_1 hasn't run yet)
session.sql("CREATE STAGE IF NOT EXISTS BCG.BCG_SCHEMA.BCG_STAGE").collect()

session.file.put(csv_path, '@BCG.BCG_SCHEMA.BCG_STAGE', overwrite=True, auto_compress=False)

# Idempotent load: clear the table right before reloading so a re-run does not duplicate rows
session.sql("TRUNCATE TABLE BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY").collect()

session.sql("""
COPY INTO BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY (currency_code, week_start_d, rate_usd_per_curr, rate_curr_per_usd)
FROM @BCG.BCG_SCHEMA.BCG_STAGE
FILES = ('fx_rates_weekly.csv')
FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1, FIELD_DELIMITER = ',')
ON_ERROR = 'ABORT_STATEMENT'
""").collect()

# Verify: row count plus a small ordered sample of what was loaded
session.sql("SELECT COUNT(*) AS rows_loaded FROM BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY").show()
session.sql("""
SELECT currency_code, week_start_d, rate_usd_per_curr, rate_curr_per_usd
FROM BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY
ORDER BY currency_code, week_start_d
LIMIT 10
""").show()

# Optional: surface missing currencies to a Snowflake temp table for quick review
if missing_currencies:
    session.sql("CREATE OR REPLACE TEMP TABLE MISSING_FX_CURRENCIES (currency_code STRING)").collect()
    # The codes come straight from the source data, so escape backslashes and single quotes
    # before embedding them in SQL string literals; an unescaped quote would break the INSERT.
    values_clause = ", ".join(
        "('{}')".format(str(c).replace("\\", "\\\\").replace("'", "''"))
        for c in missing_currencies
    )
    session.sql(f"INSERT INTO MISSING_FX_CURRENCIES VALUES {values_clause}").collect()
    session.sql("SELECT * FROM MISSING_FX_CURRENCIES").show()


In [None]:

-- Cell 4_1 — UPDATED
-- Purpose: make sure the stage and the FF_CSV file format exist, then (re)create the raw landing table.

USE WAREHOUSE COMPUTE_WH;
USE DATABASE BCG;
USE SCHEMA BCG_SCHEMA;

-- Ensure stage & CSV file format exist (used by Cell 3_1 fallback)
CREATE STAGE IF NOT EXISTS BCG.BCG_SCHEMA.BCG_STAGE;
-- IF NOT EXISTS: when FF_CSV was already created (an earlier cell creates it with CREATE OR REPLACE
-- and a longer NULL_IF list), this statement leaves that existing definition untouched.
CREATE FILE FORMAT IF NOT EXISTS BCG.BCG_SCHEMA.FF_CSV
  TYPE = 'CSV'
  SKIP_HEADER = 1
  FIELD_DELIMITER = ','
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  NULL_IF = ('', 'NULL');

-- Landing table for raw ingestion
-- CREATE OR REPLACE drops any rows already in the table, so the COPY cell must be re-run afterwards.
-- NOTE(review): the COPY that fills this table has no column list, so this column order presumably
-- has to match the CSV column order — confirm before reordering.
CREATE OR REPLACE TABLE BCG.BCG_SCHEMA.T_BILLING_LANDING (
  ProjectCode STRING,
  ProjectName STRING,
  ClientName STRING,
  EmpID NUMBER(38,0),
  EmpName STRING,

  -- Store source dates as STRING (can be 'YYYY-MM-DD' or Excel serial text)
  WeekStartDate STRING,
  WeekEndDate   STRING,

  CurrencyCode STRING,
  HoursFilled NUMBER(18,2),
  DailyHourRate NUMBER(18,2),
  NetClientCharges NUMBER(18,2),
  GrossClientCharges NUMBER(18,2),
  PartnerEffortHours NUMBER(18,2),
  PartnerCost NUMBER(18,2),
  PartnerRevenue NUMBER(18,2),
  TotalRevenue NUMBER(18,2),
  Country STRING,
  City STRING,
  BillingStatus STRING,
  InvoiceNumber STRING,
  Department STRING,
  Role STRING,

  -- Project dates as STRING too
  ProjectStartDate STRING,
  ProjectEndDate   STRING,

  -- Quoted identifiers: the '%' character requires double quotes wherever these columns are referenced.
  "Utilization%" NUMBER(18,2),
  "DiscountApplied%" NUMBER(18,2),
  TaxAmount NUMBER(18,2),
  BillingCycle STRING,
  ProjectStatus STRING,
  ClientIndustry STRING,
  ManagerName STRING,
  CostCenter STRING
) COMMENT='Landing table: raw values; date columns kept as STRING for simple COPY.';


In [None]:

-- Load the normalized, gzip-compressed CSV from the stage into the landing table.
USE WAREHOUSE COMPUTE_WH;
USE DATABASE BCG;
USE SCHEMA BCG_SCHEMA;

-- If the file is gzip, point to .csv.gz; adjust the file name if needed.
-- NOTE(review): the upload cell names chunked output "<name>_partN.csv"; if chunking ever triggers,
-- this single-file path would presumably not pick those parts up — confirm and switch to PATTERN if needed.
-- NOTE(review): FORCE = TRUE reloads the file even when it was loaded before, so re-running this cell
-- without first recreating or truncating the table looks like it would append duplicate rows — confirm intent.
COPY INTO BCG.BCG_SCHEMA.T_BILLING_LANDING
FROM @BCG.BCG_SCHEMA.BCG_STAGE/data_for_poc_project_data_1.csv.gz
FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1, FIELD_DELIMITER = ',', FIELD_OPTIONALLY_ENCLOSED_BY = '"')
ON_ERROR = 'CONTINUE'   -- keep loading even if a few bad rows exist; switch to ABORT once clean
FORCE = TRUE;


In [None]:

USE WAREHOUSE COMPUTE_WH;
USE DATABASE BCG;
USE SCHEMA BCG_SCHEMA;

-- Curated base view over T_BILLING_LANDING:
--   * trims the text dimensions and exposes them with a _d suffix,
--   * casts the STRING date columns (ISO date text or Excel serial text) to dates,
--   * exposes the numeric facts with an _f suffix plus two arithmetic consistency flags,
--   * adds USD versions of the money columns via the weekly FX table.
-- USD columns are NULL when a non-USD row has no FX rate for its (currency, week); the amounts are
-- never passed through at an assumed 1:1 rate, because that would report local currency as USD.
CREATE OR REPLACE VIEW BCG.BCG_SCHEMA.VW_BILLING_BASE
COMMENT = 'Curated base: robust DATE casting (ISO or serial) + weekly FX → USD duplicates.'
AS
WITH base AS (
  SELECT
    -- Canonical dims
    TRIM(ProjectCode) AS project_code_d,
    TRIM(ProjectName) AS project_name_d,
    TRIM(ClientName)  AS client_name_d,
    TRIM(Department)  AS department_d,
    TRIM(Role)        AS role_d,
    TRIM(Country)     AS country_d,
    TRIM(City)        AS city_d,
    TRIM(CurrencyCode) AS currency_code_d,
    TRIM(InvoiceNumber) AS invoice_number_d,
    TRIM(BillingStatus) AS billing_status_d,
    TRIM(BillingCycle)  AS billing_cycle_d,
    TRIM(ProjectStatus) AS project_status_d,
    TRIM(ClientIndustry) AS client_industry_d,
    TRIM(ManagerName)   AS manager_name_d,
    TRIM(CostCenter)    AS cost_center_d,

    /* Date casting: accept ISO 'YYYY-MM-DD' or Excel serial (as text) */
    COALESCE(
      TRY_TO_DATE(WeekStartDate),
      DATEADD('day', TRY_TO_NUMBER(WeekStartDate), '1899-12-30')
    ) AS week_start_d,

    COALESCE(
      TRY_TO_DATE(WeekEndDate),
      DATEADD('day', TRY_TO_NUMBER(WeekEndDate), '1899-12-30')
    ) AS week_end_d,

    DATE_TRUNC(
      'month',
      COALESCE(TRY_TO_DATE(WeekStartDate),
               DATEADD('day', TRY_TO_NUMBER(WeekStartDate), '1899-12-30'))
    ) AS month_bucket_d,

    COALESCE(
      TRY_TO_DATE(ProjectStartDate),
      DATEADD('day', TRY_TO_NUMBER(ProjectStartDate), '1899-12-30')
    ) AS project_start_d,

    COALESCE(
      TRY_TO_DATE(ProjectEndDate),
      DATEADD('day', TRY_TO_NUMBER(ProjectEndDate), '1899-12-30')
    ) AS project_end_d,

    -- Facts (explicit decimal types everywhere to avoid integer coercion)
    TRY_TO_DECIMAL(HoursFilled,         18, 2) AS hours_f,
    TRY_TO_DECIMAL(DailyHourRate,       18, 2) AS rate_f,
    TRY_TO_DECIMAL(NetClientCharges,    18, 2) AS net_f,
    TRY_TO_DECIMAL(GrossClientCharges,  18, 2) AS gross_f,
    TRY_TO_DECIMAL(PartnerEffortHours,  18, 2) AS partner_hours_f,
    TRY_TO_DECIMAL(PartnerCost,         18, 2) AS partner_cost_f,
    TRY_TO_DECIMAL(PartnerRevenue,      18, 2) AS partner_rev_f,
    TRY_TO_DECIMAL(TotalRevenue,        18, 2) AS total_rev_f,
    -- Percent columns are divided by 100 here, i.e. exposed as fractions
    TRY_TO_DECIMAL("Utilization%",      18, 2)/100.0 AS utilization_f,
    TRY_TO_DECIMAL("DiscountApplied%",  18, 2)/100.0 AS discount_rate_f,
    TRY_TO_DECIMAL(TaxAmount,           18, 2) AS tax_amount_f,

    -- Validation flags: TRUE when hours × rate matches the stated amount within 0.01
    (ABS(
       TRY_TO_DECIMAL(HoursFilled, 18, 2) * TRY_TO_DECIMAL(DailyHourRate, 18, 2)
       - TRY_TO_DECIMAL(GrossClientCharges, 18, 2)
     ) <= 0.01) AS gross_calc_ok_f,

    (ABS(
       TRY_TO_DECIMAL(PartnerEffortHours, 18, 2) * TRY_TO_DECIMAL(PartnerCost, 18, 2)
       - TRY_TO_DECIMAL(PartnerRevenue, 18, 2)
     ) <= 0.01) AS partner_calc_ok_f
  FROM BCG.BCG_SCHEMA.T_BILLING_LANDING
)
SELECT
  b.*,
  fx.rate_usd_per_curr,

  -- USD duplicates (weekly FX join). USD rows pass through unchanged; other currencies are
  -- multiplied by rate_usd_per_curr (NUMBER(18,9)) and come out NULL when no rate was found.
  CASE WHEN UPPER(b.currency_code_d) = 'USD' THEN b.rate_f
       ELSE b.rate_f * fx.rate_usd_per_curr END           AS rate_f_usd,

  CASE WHEN UPPER(b.currency_code_d) = 'USD' THEN b.gross_f
       ELSE b.gross_f * fx.rate_usd_per_curr END          AS gross_f_usd,

  CASE WHEN UPPER(b.currency_code_d) = 'USD' THEN b.net_f
       ELSE b.net_f * fx.rate_usd_per_curr END            AS net_f_usd,

  CASE WHEN UPPER(b.currency_code_d) = 'USD' THEN b.partner_cost_f
       ELSE b.partner_cost_f * fx.rate_usd_per_curr END   AS partner_cost_f_usd,

  CASE WHEN UPPER(b.currency_code_d) = 'USD' THEN b.partner_rev_f
       ELSE b.partner_rev_f * fx.rate_usd_per_curr END    AS partner_rev_f_usd,

  CASE WHEN UPPER(b.currency_code_d) = 'USD' THEN b.total_rev_f
       ELSE b.total_rev_f * fx.rate_usd_per_curr END      AS total_rev_f_usd,

  CASE WHEN UPPER(b.currency_code_d) = 'USD' THEN b.tax_amount_f
       ELSE b.tax_amount_f * fx.rate_usd_per_curr END     AS tax_amount_usd

FROM base b
-- The FX loader stores currency codes upper-cased, so compare against the upper-cased fact code.
LEFT JOIN BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY fx
  ON fx.currency_code = UPPER(b.currency_code_d)
 AND fx.week_start_d  = b.week_start_d;


In [None]:

USE WAREHOUSE COMPUTE_WH;
USE DATABASE BCG;
USE SCHEMA BCG_SCHEMA;

-- Client dimension: one row per distinct cleaned (upper-cased, trimmed) client name,
-- keyed by the MD5 of that cleaned name.
CREATE OR REPLACE VIEW BCG.BCG_SCHEMA.VW_CLIENT_DIM AS
SELECT DISTINCT
  cleaned.client_name_clean_d,
  MD5(cleaned.client_name_clean_d) AS client_sk_d
FROM (
  SELECT UPPER(TRIM(ClientName)) AS client_name_clean_d
  FROM BCG.BCG_SCHEMA.T_BILLING_LANDING
  WHERE ClientName IS NOT NULL
) AS cleaned;

-- Project dimension: distinct cleaned (code, name) pairs, keyed by the MD5 of the cleaned code.
CREATE OR REPLACE VIEW BCG.BCG_SCHEMA.VW_PROJECT_DIM AS
SELECT DISTINCT
  cleaned.project_code_clean_d,
  cleaned.project_name_clean_d,
  MD5(cleaned.project_code_clean_d) AS project_sk_d
FROM (
  SELECT
    UPPER(TRIM(ProjectCode)) AS project_code_clean_d,
    UPPER(TRIM(ProjectName)) AS project_name_clean_d
  FROM BCG.BCG_SCHEMA.T_BILLING_LANDING
  WHERE ProjectCode IS NOT NULL
) AS cleaned;


In [None]:

-- CELL 8.1: Monthly × Currency aggregation MV (single-table; NO DISTINCT aggregates)
-- Groups landing rows by (month of WeekStartDate, CurrencyCode) and aggregates amounts in local currency.
-- The month expression is repeated verbatim in SELECT and GROUP BY; keep the two copies identical.
-- NOTE(review): CurrencyCode is grouped as stored (no TRIM/UPPER), whereas VW_BILLING_BASE trims it,
-- so codes differing only by whitespace or case would form separate groups here — confirm intended.
CREATE OR REPLACE MATERIALIZED VIEW BCG.BCG_SCHEMA.MV_MONTH_CURRENCY_AGG
COMMENT = 'Monthly currency-level aggregates from T_BILLING_LANDING; compile-safe single-table MV without DISTINCT. Note: MVs consume credits.'
CLUSTER BY (month_bucket_d, currency_code_d)
AS
SELECT
  /* month bucket using the same casting rule used in VW_BILLING_BASE */
  DATE_TRUNC('month',
    COALESCE(
      TRY_TO_DATE(t.WeekStartDate),
      DATEADD('day', TRY_TO_NUMBER(t.WeekStartDate), '1899-12-30')
    )
  ) AS month_bucket_d,
  t.CurrencyCode AS currency_code_d,

  /* local aggregates with decimal safety */
  SUM(TRY_TO_DECIMAL(t.GrossClientCharges, 18, 2)) AS total_gross_m,
  SUM(TRY_TO_DECIMAL(t.NetClientCharges,   18, 2)) AS total_net_m,
  AVG(TRY_TO_DECIMAL(t.DailyHourRate,      18, 2)) AS avg_rate_m,
  SUM(TRY_TO_DECIMAL(t.HoursFilled,        18, 2)) AS hours_m
FROM BCG.BCG_SCHEMA.T_BILLING_LANDING AS t
GROUP BY
  DATE_TRUNC('month',
    COALESCE(
      TRY_TO_DATE(t.WeekStartDate),
      DATEADD('day', TRY_TO_NUMBER(t.WeekStartDate), '1899-12-30')
    )
  ),
  t.CurrencyCode;



-- CELL 8.2: Client × Month aggregation MV (single-table; already compliant)
-- Same shape as the MV above, grouped by (month of WeekStartDate, ClientName) instead of currency.
-- NOTE(review): the sums are in each row's local currency and no currency column is in the grouping,
-- so a client billed in several currencies would have mixed-currency totals — confirm acceptable.
CREATE OR REPLACE MATERIALIZED VIEW BCG.BCG_SCHEMA.MV_CLIENT_MONTH_AGG
COMMENT = 'Client-month aggregates from T_BILLING_LANDING; compile-safe single-table MV. Note: MVs consume credits.'
CLUSTER BY (month_bucket_d, client_name_d)
AS
SELECT
  DATE_TRUNC('month',
    COALESCE(
      TRY_TO_DATE(t.WeekStartDate),
      DATEADD('day', TRY_TO_NUMBER(t.WeekStartDate), '1899-12-30')
    )
  ) AS month_bucket_d,
  t.ClientName AS client_name_d,

  /* local aggregates with decimal safety */
  SUM(TRY_TO_DECIMAL(t.TotalRevenue,      18, 2)) AS total_revenue_m,
  SUM(TRY_TO_DECIMAL(t.NetClientCharges,  18, 2)) AS total_net_m,
  AVG(TRY_TO_DECIMAL(t.DailyHourRate,     18, 2)) AS avg_rate_m,
  SUM(TRY_TO_DECIMAL(t.HoursFilled,       18, 2)) AS hours_m
FROM BCG.BCG_SCHEMA.T_BILLING_LANDING AS t
GROUP BY
  DATE_TRUNC('month',
    COALESCE(
      TRY_TO_DATE(t.WeekStartDate),
      DATEADD('day', TRY_TO_NUMBER(t.WeekStartDate), '1899-12-30')
    )
  ),
  t.ClientName;


In [None]:

CREATE OR REPLACE VIEW BCG.BCG_SCHEMA.V_BILLING_SEMANTIC AS
/* 
  Row-level semantic view: landing rows + best-available weekly FX rate + normalized percents.

  - FX: T_FX_RATES_WEEKLY(currency_code, week_start_d, rate_usd_per_curr, rate_curr_per_usd)
        where rate_usd_per_curr = USD per 1 local unit.
  - Landing: T_BILLING_LANDING with numeric percent columns:
      "Utilization%" NUMBER(18,2), "DiscountApplied%" NUMBER(18,2)
  - Row-level conversion first; aggregate in BI later.
  - USD columns are NULL when no FX rate (exact or forward-filled) exists; see FX_Missing_Flag.
*/

WITH l AS (
  SELECT
    -- Raw columns
    ProjectCode, ProjectName, ClientName, EmpID, EmpName,
    WeekStartDate, WeekEndDate,
    CurrencyCode,
    HoursFilled, DailyHourRate, NetClientCharges, GrossClientCharges,
    PartnerEffortHours, PartnerCost, PartnerRevenue, TotalRevenue,
    Country, City, BillingStatus, InvoiceNumber, Department, Role,
    ProjectStartDate, ProjectEndDate, "Utilization%", "DiscountApplied%",
    TaxAmount, BillingCycle, ProjectStatus, ClientIndustry, ManagerName, CostCenter,

    -- Normalize week start date (ISO string or Excel serial)
    COALESCE(
      TRY_TO_DATE(WeekStartDate),
      DATEADD('day', TRY_TO_NUMBER(WeekStartDate), '1899-12-30')
    ) AS week_start_d_norm
  FROM BCG.BCG_SCHEMA.T_BILLING_LANDING
),

-- De-duplicate FX to 1 row per (currency, week)
fx_dedup AS (
  SELECT
    currency_code,
    week_start_d,
    rate_usd_per_curr,
    rate_curr_per_usd
  FROM BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY
  QUALIFY ROW_NUMBER() OVER (PARTITION BY currency_code, week_start_d ORDER BY currency_code) = 1
),

-- (currency, week) present in facts
weeks AS (
  SELECT DISTINCT
    UPPER(TRIM(CurrencyCode)) AS currency_code,
    week_start_d_norm         AS week_start_d
  FROM l
  WHERE week_start_d_norm IS NOT NULL
    AND CurrencyCode IS NOT NULL
),

-- Forward-fill FX across weeks per currency:
-- for each fact week, carry the most recent non-null rate seen at or before that week.
fx_filled AS (
  SELECT
    w.currency_code,
    w.week_start_d,
    d.rate_usd_per_curr AS rate_usd_per_curr_exact,
    d.rate_curr_per_usd AS rate_curr_per_usd_exact,

    LAST_VALUE(d.rate_usd_per_curr) IGNORE NULLS OVER (
      PARTITION BY w.currency_code
      ORDER BY w.week_start_d
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS rate_usd_per_curr_ff,

    LAST_VALUE(d.rate_curr_per_usd) IGNORE NULLS OVER (
      PARTITION BY w.currency_code
      ORDER BY w.week_start_d
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS rate_curr_per_usd_ff
  FROM weeks w
  LEFT JOIN fx_dedup d
    ON d.currency_code = w.currency_code
   AND d.week_start_d  = w.week_start_d
),

-- Prefer the exact-week rate, fall back to the forward-filled one, and label which was used
fx_best AS (
  SELECT
    currency_code,
    week_start_d,
    COALESCE(rate_usd_per_curr_exact, rate_usd_per_curr_ff) AS rate_usd_per_curr,
    COALESCE(rate_curr_per_usd_exact, rate_curr_per_usd_ff) AS rate_curr_per_usd,
    CASE 
      WHEN rate_usd_per_curr_exact IS NOT NULL THEN 'EXACT'
      WHEN rate_usd_per_curr_ff    IS NOT NULL THEN 'FFILL'
      ELSE 'MISSING'
    END AS fx_match_type
  FROM fx_filled
),

-- Percent normalization: input is numeric already (NUMBER(18,2))
norm AS (
  SELECT
    l.*,

    /* Normalize to 0..100 domain
       - treat 0..1 as fraction → *100
       - treat values above 1 up to 100 as percent already
       - otherwise (negative or above 100) NULL
       Negative inputs are excluded explicitly: without the lower bound they would match
       the "<= 1" branch and be scaled to a negative percent instead of being nulled.
    */
    CASE
      WHEN "Utilization%" IS NULL THEN NULL
      WHEN "Utilization%" < 0 THEN NULL
      WHEN "Utilization%" <= 1 THEN "Utilization%" * 100
      WHEN "Utilization%" > 1 AND "Utilization%" <= 100 THEN "Utilization%"
      ELSE NULL
    END AS Utilization_Pct_100,

    CASE
      WHEN "DiscountApplied%" IS NULL THEN NULL
      WHEN "DiscountApplied%" < 0 THEN NULL
      WHEN "DiscountApplied%" <= 1 THEN "DiscountApplied%" * 100
      WHEN "DiscountApplied%" > 1 AND "DiscountApplied%" <= 100 THEN "DiscountApplied%"
      ELSE NULL
    END AS DiscountApplied_Pct_100
  FROM l
)

SELECT
  -- Dimensions
  n.ProjectCode, n.ProjectName, n.ClientName, n.EmpID, n.EmpName,
  n.Country, n.City, n.Department, n.Role, n.BillingStatus, n.InvoiceNumber,
  n.BillingCycle, n.ProjectStatus, n.ClientIndustry, n.ManagerName, n.CostCenter,

  -- Dates
  n.WeekStartDate, n.WeekEndDate, n.week_start_d_norm AS WeekStartDate_D,
  n.ProjectStartDate, n.ProjectEndDate,

  -- Currency & FX
  UPPER(TRIM(n.CurrencyCode)) AS CurrencyCode,
  fx.rate_usd_per_curr,
  fx.rate_curr_per_usd,
  fx.fx_match_type,

  -- Local measures
  n.HoursFilled,
  n.DailyHourRate,
  n.NetClientCharges,
  n.GrossClientCharges,
  n.PartnerEffortHours,
  n.PartnerCost,
  n.PartnerRevenue,
  n.TotalRevenue,

  -- ✅ USD conversions (USD per local → multiply)
  (n.DailyHourRate      * fx.rate_usd_per_curr) AS DailyHourRate_USD,
  (n.NetClientCharges   * fx.rate_usd_per_curr) AS NetClientCharges_USD,
  (n.GrossClientCharges * fx.rate_usd_per_curr) AS GrossClientCharges_USD,
  (n.PartnerCost        * fx.rate_usd_per_curr) AS PartnerCost_USD,
  (n.PartnerRevenue     * fx.rate_usd_per_curr) AS PartnerRevenue_USD,
  (n.TotalRevenue       * fx.rate_usd_per_curr) AS TotalRevenue_USD,

  -- ✅ Percents normalized (0..100) + helpers
  n.Utilization_Pct_100,
  n.DiscountApplied_Pct_100,
  ROUND(n.Utilization_Pct_100, 2)       AS Utilization_Pct_100_Rounded,
  ROUND(n.DiscountApplied_Pct_100, 2)   AS DiscountApplied_Pct_100_Rounded,

  -- Weighted average helpers (numerators; NULL when the weight is zero)
  (n.Utilization_Pct_100 * NULLIF(n.HoursFilled, 0))         AS Utilization_Pct_WNumer,
  (n.DiscountApplied_Pct_100 * NULLIF(n.NetClientCharges, 0)) AS DiscountApplied_Pct_WNumer,

  -- QA flags
  (fx.rate_usd_per_curr IS NULL) AS FX_Missing_Flag,
  CASE 
    WHEN fx.rate_usd_per_curr IS NULL THEN NULL
    WHEN UPPER(TRIM(n.CurrencyCode)) = 'USD' AND ABS(fx.rate_usd_per_curr - 1.0) > 0.000001 THEN TRUE
    ELSE FALSE
  END AS FX_Unexpected_USD_Rate_Flag,

  CASE
    WHEN "Utilization%" IS NOT NULL AND ("Utilization%" < 0 OR "Utilization%" > 100) THEN TRUE
    ELSE FALSE
  END AS Utilization_OutOfRange_Flag,

  -- Traceability
  CASE 
    WHEN fx.rate_usd_per_curr IS NULL THEN NULL
    ELSE 'USD_PER_LOCAL_MULTIPLY'
  END AS FX_Method

FROM norm n
LEFT JOIN fx_best fx
  ON fx.currency_code = UPPER(TRIM(n.CurrencyCode))
 AND fx.week_start_d  = n.week_start_d_norm;


Note: the cell below does not need to be executed.


In [None]:

CREATE OR REPLACE VIEW BCG.BCG_SCHEMA.VM_SEM_BILLING AS
/* 
  - FX table: BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY(currency_code, week_start_d, rate_usd_per_curr, rate_curr_per_usd)
      where rate_usd_per_curr = USD per 1 local unit (e.g., INR → 0.012).
  - Landing table: BCG.BCG_SCHEMA.T_BILLING_LANDING
      with numeric percent cols: "Utilization%", "DiscountApplied%" (NUMBER(18,2)).
  - Row-level conversion first; aggregate in BI later.
*/

WITH l AS (
  SELECT
    -- Raw columns
    ProjectCode, ProjectName, ClientName, EmpID, EmpName,
    WeekStartDate, WeekEndDate,
    CurrencyCode,
    HoursFilled, DailyHourRate, NetClientCharges, GrossClientCharges,
    PartnerEffortHours, PartnerCost, PartnerRevenue, TotalRevenue,
    Country, City, BillingStatus, InvoiceNumber, Department, Role,
    ProjectStartDate, ProjectEndDate, "Utilization%", "DiscountApplied%",
    TaxAmount, BillingCycle, ProjectStatus, ClientIndustry, ManagerName, CostCenter,

    -- Normalize week-start date (supports ISO date or Excel serial)
    COALESCE(
      TRY_TO_DATE(WeekStartDate),
      DATEADD('day', TRY_TO_NUMBER(WeekStartDate), '1899-12-30')
    ) AS week_start_d_norm
  FROM BCG.BCG_SCHEMA.T_BILLING_LANDING
),

-- Ensure one FX row per (currency, week)
fx_dedup AS (
  SELECT
    currency_code,
    week_start_d,
    rate_usd_per_curr,
    rate_curr_per_usd
  FROM BCG.BCG_SCHEMA.T_FX_RATES_WEEKLY
  QUALIFY ROW_NUMBER() OVER (PARTITION BY currency_code, week_start_d ORDER BY currency_code) = 1
),

-- (currency, week) pairs present in facts
weeks AS (
  SELECT DISTINCT
    UPPER(TRIM(CurrencyCode)) AS currency_code,
    week_start_d_norm         AS week_start_d
  FROM l
  WHERE week_start_d_norm IS NOT NULL
    AND CurrencyCode IS NOT NULL
),

-- Forward-fill FX across weeks per currency
fx_filled AS (
  SELECT
    w.currency_code,
    w.week_start_d,
    d.rate_usd_per_curr AS rate_usd_per_curr_exact,
    d.rate_curr_per_usd AS rate_curr_per_usd_exact,

    LAST_VALUE(d.rate_usd_per_curr) IGNORE NULLS OVER (
      PARTITION BY w.currency_code
      ORDER BY w.week_start_d
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS rate_usd_per_curr_ff,

    LAST_VALUE(d.rate_curr_per_usd) IGNORE NULLS OVER (
      PARTITION BY w.currency_code
      ORDER BY w.week_start_d
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS rate_curr_per_usd_ff
  FROM weeks w
  LEFT JOIN fx_dedup d
    ON d.currency_code = w.currency_code
   AND d.week_start_d  = w.week_start_d
),

fx_best AS (
  SELECT
    currency_code,
    week_start_d,
    COALESCE(rate_usd_per_curr_exact, rate_usd_per_curr_ff) AS rate_usd_per_curr,
    COALESCE(rate_curr_per_usd_exact, rate_curr_per_usd_ff) AS rate_curr_per_usd,
    CASE 
      WHEN rate_usd_per_curr_exact IS NOT NULL THEN 'EXACT'
      WHEN rate_usd_per_curr_ff    IS NOT NULL THEN 'FFILL'
      ELSE 'MISSING'
    END AS fx_match_type
  FROM fx_filled
),

-- Percent normalization (input is numeric NUMBER(18,2)), applied to
-- "Utilization%" and "DiscountApplied%":
--   NULL                  → NULL
--   0 .. 1   (inclusive)  → treated as a fraction, multiplied by 100
--   > 1 .. 100            → treated as already a percent, passed through
--   anything else         → NULL (negative values and values above 100)
-- The fraction branch has an explicit lower bound of 0 so that negative inputs
-- fall through to NULL instead of being scaled by 100.
-- All columns of `l` are passed through unchanged alongside the two new columns.
norm AS (
  SELECT
    l.*,
    CASE
      WHEN "Utilization%" IS NULL THEN NULL
      WHEN "Utilization%" BETWEEN 0 AND 1 THEN "Utilization%" * 100
      WHEN "Utilization%" > 1 AND "Utilization%" <= 100 THEN "Utilization%"
      ELSE NULL
    END AS Utilization_Pct_100,

    CASE
      WHEN "DiscountApplied%" IS NULL THEN NULL
      WHEN "DiscountApplied%" BETWEEN 0 AND 1 THEN "DiscountApplied%" * 100
      WHEN "DiscountApplied%" > 1 AND "DiscountApplied%" <= 100 THEN "DiscountApplied%"
      ELSE NULL
    END AS DiscountApplied_Pct_100
  FROM l
)

-- Final projection: every fact row from `norm`, enriched with the weekly FX rate
-- resolved in `fx_best`. The LEFT JOIN keeps fact rows that have no rate; their
-- USD columns are NULL and FX_Missing_Flag is TRUE.
SELECT
  -- Dimensions
  src.ProjectCode, src.ProjectName, src.ClientName, src.EmpID, src.EmpName,
  src.Country, src.City, src.Department, src.Role, src.BillingStatus, src.InvoiceNumber,
  src.BillingCycle, src.ProjectStatus, src.ClientIndustry, src.ManagerName, src.CostCenter,

  -- Dates
  src.WeekStartDate, src.WeekEndDate, src.week_start_d_norm AS WeekStartDate_D,
  src.ProjectStartDate, src.ProjectEndDate,

  -- Currency & FX context
  UPPER(TRIM(src.CurrencyCode)) AS CurrencyCode,
  fxr.rate_usd_per_curr,
  fxr.rate_curr_per_usd,
  fxr.fx_match_type,

  -- Local-currency measures (passed through as-is)
  src.HoursFilled,
  src.DailyHourRate,
  src.NetClientCharges,
  src.GrossClientCharges,
  src.PartnerEffortHours,
  src.PartnerCost,
  src.PartnerRevenue,
  src.TotalRevenue,

  -- USD conversions: the rate is USD per one unit of local currency, so multiply
  src.DailyHourRate      * fxr.rate_usd_per_curr AS DailyHourRate_USD,
  src.NetClientCharges   * fxr.rate_usd_per_curr AS NetClientCharges_USD,
  src.GrossClientCharges * fxr.rate_usd_per_curr AS GrossClientCharges_USD,
  src.PartnerCost        * fxr.rate_usd_per_curr AS PartnerCost_USD,
  src.PartnerRevenue     * fxr.rate_usd_per_curr AS PartnerRevenue_USD,
  src.TotalRevenue       * fxr.rate_usd_per_curr AS TotalRevenue_USD,

  -- Percents normalized to the 0..100 scale, plus 2-decimal rounded copies
  src.Utilization_Pct_100,
  src.DiscountApplied_Pct_100,
  ROUND(src.Utilization_Pct_100, 2)     AS Utilization_Pct_100_Rounded,
  ROUND(src.DiscountApplied_Pct_100, 2) AS DiscountApplied_Pct_100_Rounded,

  -- Weighted-average numerators (the ratio itself is built in BI):
  --   WAVG Utilization = SUM(Utilization_Pct_WNumer) / NULLIF(SUM(HoursFilled), 0)
  -- A zero weight is turned into NULL, so the numerator is NULL for that row.
  src.Utilization_Pct_100     * NULLIF(src.HoursFilled, 0)      AS Utilization_Pct_WNumer,
  src.DiscountApplied_Pct_100 * NULLIF(src.NetClientCharges, 0) AS DiscountApplied_Pct_WNumer,

  -- QA flags
  (fxr.rate_usd_per_curr IS NULL) AS FX_Missing_Flag,

  -- NULL when there is no rate; TRUE when a USD row carries a rate other than 1
  IFF(
    fxr.rate_usd_per_curr IS NULL,
    NULL,
    IFF(
      UPPER(TRIM(src.CurrencyCode)) = 'USD' AND ABS(fxr.rate_usd_per_curr - 1.0) > 0.000001,
      TRUE,
      FALSE
    )
  ) AS FX_Unexpected_USD_Rate_Flag,

  -- TRUE when the raw utilization is below 0 or above 100; FALSE otherwise (incl. NULL)
  COALESCE(src."Utilization%" < 0 OR src."Utilization%" > 100, FALSE) AS Utilization_OutOfRange_Flag,

  -- Traceability: names the conversion method whenever a rate was applied
  IFF(fxr.rate_usd_per_curr IS NOT NULL, 'USD_PER_LOCAL_MULTIPLY', NULL) AS FX_Method

FROM norm AS src
LEFT JOIN fx_best AS fxr
  ON UPPER(TRIM(src.CurrencyCode)) = fxr.currency_code
 AND src.week_start_d_norm         = fxr.week_start_d;


In [None]:

-- CELL 11.1: Query history restricted to statements whose text mentions
-- SEMANTIC_VIEW or VW_SEM_BILLING (case-insensitive substring match).
-- Note: "_" in an ILIKE pattern matches any single character, so text such as
-- "SEMANTIC VIEW" also satisfies the first pattern.
CREATE OR REPLACE VIEW BCG.BCG_SCHEMA.VW_QH_ANALYST_SV
COMMENT = 'Query history filtered for SEMANTIC_VIEW operations and VW_SEM_BILLING usage.'
AS
SELECT
  hist.QUERY_ID,
  hist.USER_NAME,
  hist.ROLE_NAME,
  hist.WAREHOUSE_NAME,
  hist.DATABASE_NAME,
  hist.SCHEMA_NAME,
  hist.QUERY_TEXT,
  hist.START_TIME,
  hist.END_TIME,
  hist.EXECUTION_STATUS,
  hist.BYTES_SCANNED,
  hist.ROWS_PRODUCED
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY hist
WHERE hist.QUERY_TEXT ILIKE ANY ('%SEMANTIC_VIEW%', '%VW_SEM_BILLING%');

-- CELL 11.2: Warehouse metering focused on COMPUTE_WH
-- Exposes the metering rows (interval start/end plus total, compute and
-- cloud-services credits) for the single warehouse named in the WHERE clause.
-- The view keeps its historical "_PB" name so existing references keep working;
-- the description below names the warehouse that is actually filtered on.
CREATE OR REPLACE VIEW BCG.BCG_SCHEMA.VW_WH_METERING_PB
COMMENT = 'Warehouse metering for COMPUTE_WH (credits, compute time) to monitor trial costs.'
AS
SELECT
  wm.WAREHOUSE_NAME,
  wm.START_TIME,
  wm.END_TIME,
  wm.CREDITS_USED,
  wm.CREDITS_USED_COMPUTE,
  wm.CREDITS_USED_CLOUD_SERVICES
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY AS wm
WHERE wm.WAREHOUSE_NAME = 'COMPUTE_WH';


In [None]:
-- Semantic view VW_SEM_BILLING over a single logical table, `po`, which maps to
-- BCG.BCG_SCHEMA.VW_BILLING_BASE.
--   FACTS      : row-level numeric columns exposed under their own names (local and USD).
--   DIMENSIONS : descriptive attributes exposed under their own names.
--   METRICS    : SUM / AVG / COUNT(DISTINCT) aggregates over the facts and dimensions;
--                monetary and hour aggregates are cast with TO_DECIMAL(..., 18, 2).
-- Synonyms and comments on each entry are natural-language hints attached to the entry.
CREATE OR REPLACE SEMANTIC VIEW BCG.BCG_SCHEMA.VW_SEM_BILLING
TABLES (
  -- NOTE(review): project_code_d is declared as the primary key, while the fact comments
  -- below describe employee-week rows — confirm it is actually unique in VW_BILLING_BASE.
  po AS BCG.BCG_SCHEMA.VW_BILLING_BASE PRIMARY KEY (project_code_d)
)
FACTS (
  -- local & USD facts exposed from the base view
  -- NOTE(review): rate_f carries the synonym 'hourly rate' but its comment says 'Daily hour rate' — confirm the unit.
  PUBLIC po.hours_f                 AS po.hours_f             WITH SYNONYMS = ('hours', 'worked hours')         COMMENT = 'Hours filled for the employee-week',
  PUBLIC po.rate_f                  AS po.rate_f              WITH SYNONYMS = ('hourly rate', 'rate')           COMMENT = 'Daily hour rate (local currency)',
  PUBLIC po.gross_f                 AS po.gross_f             WITH SYNONYMS = ('gross charges', 'gross')        COMMENT = 'Gross client charges (local)',
  PUBLIC po.net_f                   AS po.net_f               WITH SYNONYMS = ('net charges', 'net')            COMMENT = 'Net client charges (local)',
  PUBLIC po.partner_hours_f         AS po.partner_hours_f     COMMENT = 'Partner effort hours (unitless)',
  PUBLIC po.partner_cost_f          AS po.partner_cost_f      COMMENT = 'Partner cost (local)',
  PUBLIC po.partner_rev_f           AS po.partner_rev_f       COMMENT = 'Partner revenue (local)',
  PUBLIC po.total_rev_f             AS po.total_rev_f         WITH SYNONYMS = ('total revenue', 'revenue')      COMMENT = 'Total revenue (local)',
  PUBLIC po.discount_rate_f         AS po.discount_rate_f     WITH SYNONYMS = ('discount %')                    COMMENT = 'Discount applied percentage (unitless)',
  PUBLIC po.utilization_f           AS po.utilization_f       WITH SYNONYMS = ('utilization %')                 COMMENT = 'Utilization percentage (unitless)',
  PUBLIC po.gross_calc_ok_f         AS po.gross_calc_ok_f     COMMENT = 'Flag: gross calculation OK (boolean-like)',
  PUBLIC po.partner_calc_ok_f       AS po.partner_calc_ok_f   COMMENT = 'Flag: partner calculation OK (boolean-like)',

  -- USD facts (already computed in base view)
  PUBLIC po.rate_f_usd              AS po.rate_f_usd          WITH SYNONYMS = ('rate usd')                      COMMENT = 'Hourly/daily rate converted to USD',
  PUBLIC po.gross_f_usd             AS po.gross_f_usd         WITH SYNONYMS = ('gross usd')                     COMMENT = 'Gross charges converted to USD',
  PUBLIC po.net_f_usd               AS po.net_f_usd           WITH SYNONYMS = ('net usd')                       COMMENT = 'Net charges converted to USD',
  PUBLIC po.partner_cost_f_usd      AS po.partner_cost_f_usd  COMMENT = 'Partner cost converted to USD',
  PUBLIC po.partner_rev_f_usd       AS po.partner_rev_f_usd   COMMENT = 'Partner revenue converted to USD',
  PUBLIC po.total_rev_f_usd         AS po.total_rev_f_usd     WITH SYNONYMS = ('revenue usd')                   COMMENT = 'Total revenue converted to USD',
  PUBLIC po.tax_amount_usd          AS po.tax_amount_usd      WITH SYNONYMS = ('tax usd')                       COMMENT = 'Tax amount converted to USD'
)
DIMENSIONS (
  -- dimensional attributes
  PUBLIC po.month_bucket_d          AS po.month_bucket_d      WITH SYNONYMS = ('month', 'month bucket')         COMMENT = 'Month bucket based on week_start_d',
  PUBLIC po.currency_code_d         AS po.currency_code_d     WITH SYNONYMS = ('currency')                      COMMENT = 'Currency code from source',
  PUBLIC po.client_name_d           AS po.client_name_d       WITH SYNONYMS = ('client')                        COMMENT = 'Client name',
  PUBLIC po.project_code_d          AS po.project_code_d      WITH SYNONYMS = ('project code')                  COMMENT = 'Project code (primary key in table alias)',
  PUBLIC po.project_name_d          AS po.project_name_d      WITH SYNONYMS = ('project')                       COMMENT = 'Project name',
  PUBLIC po.manager_name_d          AS po.manager_name_d      WITH SYNONYMS = ('manager')                       COMMENT = 'Manager name',
  PUBLIC po.department_d            AS po.department_d        WITH SYNONYMS = ('dept')                          COMMENT = 'Department',
  PUBLIC po.role_d                  AS po.role_d              WITH SYNONYMS = ('employee role')                 COMMENT = 'Role',
  PUBLIC po.country_d               AS po.country_d           WITH SYNONYMS = ('country')                       COMMENT = 'Country',
  PUBLIC po.city_d                  AS po.city_d              WITH SYNONYMS = ('city')                          COMMENT = 'City',
  PUBLIC po.billing_status_d        AS po.billing_status_d    WITH SYNONYMS = ('billing status')                COMMENT = 'Billing status',
  PUBLIC po.invoice_number_d        AS po.invoice_number_d    WITH SYNONYMS = ('invoice')                       COMMENT = 'Invoice number',
  PUBLIC po.billing_cycle_d         AS po.billing_cycle_d     WITH SYNONYMS = ('cycle')                         COMMENT = 'Billing cycle',
  PUBLIC po.project_status_d        AS po.project_status_d    WITH SYNONYMS = ('status')                        COMMENT = 'Project status',
  PUBLIC po.client_industry_d       AS po.client_industry_d   WITH SYNONYMS = ('industry')                      COMMENT = 'Client industry',
  PUBLIC po.cost_center_d           AS po.cost_center_d       WITH SYNONYMS = ('cost center')                   COMMENT = 'Cost center'
)
METRICS (
  -- Local metrics (DECIMAL scale enforced on aggregates without TRY_ because inputs are numeric)
  -- The AVG metrics below are plain row averages of the fact column (not weighted).
  PUBLIC po.total_gross_m           AS TO_DECIMAL(SUM(po.gross_f),        18, 2)  WITH SYNONYMS = ('gross total')   COMMENT = 'Sum of gross client charges (local)',
  PUBLIC po.total_net_m             AS TO_DECIMAL(SUM(po.net_f),          18, 2)  WITH SYNONYMS = ('net total')     COMMENT = 'Sum of net client charges (local)',
  PUBLIC po.total_partner_rev_m     AS TO_DECIMAL(SUM(po.partner_rev_f),  18, 2)  COMMENT = 'Sum of partner revenue (local)',
  PUBLIC po.total_partner_cost_m    AS TO_DECIMAL(SUM(po.partner_cost_f), 18, 2)  COMMENT = 'Sum of partner cost (local)',
  PUBLIC po.total_revenue_m         AS TO_DECIMAL(SUM(po.total_rev_f),    18, 2)  WITH SYNONYMS = ('revenue total') COMMENT = 'Sum of total revenue (local)',
  PUBLIC po.hours_total_m           AS TO_DECIMAL(SUM(po.hours_f),        18, 2)  WITH SYNONYMS = ('hours total')   COMMENT = 'Total hours (unitless aggregated)',
  PUBLIC po.partner_hours_total_m   AS TO_DECIMAL(SUM(po.partner_hours_f),18, 2)  COMMENT = 'Total partner hours (unitless aggregated)',
  PUBLIC po.avg_rate_m              AS TO_DECIMAL(AVG(po.rate_f),         18, 2)  WITH SYNONYMS = ('avg rate')      COMMENT = 'Average rate (local)',
  PUBLIC po.avg_utilization_m       AS TO_DECIMAL(AVG(po.utilization_f),  18, 2)  WITH SYNONYMS = ('avg util')      COMMENT = 'Average utilization % (unitless)',
  PUBLIC po.avg_discount_m          AS TO_DECIMAL(AVG(po.discount_rate_f),18, 2)  WITH SYNONYMS = ('avg discount')  COMMENT = 'Average discount % (unitless)',
  PUBLIC po.invoice_count_m         AS COUNT(DISTINCT po.invoice_number_d)         WITH SYNONYMS = ('invoices')      COMMENT = 'Distinct invoice count',

  -- Derived local margins/ratios (safe division)
  -- The ratio is NULL when the gross total is 0 and is kept at 6 decimal places.
  PUBLIC po.margin_local_m          AS TO_DECIMAL(SUM(po.total_rev_f) - SUM(po.partner_cost_f), 18, 2) COMMENT = 'Local margin = revenue - partner cost',
  PUBLIC po.gross_to_net_ratio_m    AS TO_DECIMAL(CASE WHEN SUM(po.gross_f) = 0 THEN NULL ELSE SUM(po.net_f) / SUM(po.gross_f) END, 18, 6) COMMENT = 'Net/Gross ratio (local)',

  -- USD metrics
  PUBLIC po.total_gross_usd_m       AS TO_DECIMAL(SUM(po.gross_f_usd),       18, 2) WITH SYNONYMS = ('gross usd total')    COMMENT = 'Sum of gross (USD)',
  PUBLIC po.total_net_usd_m         AS TO_DECIMAL(SUM(po.net_f_usd),         18, 2) WITH SYNONYMS = ('net usd total')      COMMENT = 'Sum of net (USD)',
  PUBLIC po.total_revenue_usd_m     AS TO_DECIMAL(SUM(po.total_rev_f_usd),   18, 2) WITH SYNONYMS = ('revenue usd total')  COMMENT = 'Sum of revenue (USD)',
  PUBLIC po.avg_rate_usd_m          AS TO_DECIMAL(AVG(po.rate_f_usd),        18, 2) WITH SYNONYMS = ('avg rate usd')       COMMENT = 'Average rate (USD)',
  PUBLIC po.total_tax_usd_m         AS TO_DECIMAL(SUM(po.tax_amount_usd),    18, 2) WITH SYNONYMS = ('tax usd total')      COMMENT = 'Sum of tax (USD)',

  -- Derived USD metrics
  PUBLIC po.margin_usd_m            AS TO_DECIMAL(SUM(po.total_rev_f_usd) - SUM(po.partner_cost_f_usd), 18, 2) COMMENT = 'USD margin = revenue_usd - partner_cost_usd',
  PUBLIC po.revenue_after_tax_usd_m AS TO_DECIMAL(SUM(po.total_rev_f_usd) - SUM(po.tax_amount_usd),     18, 2) COMMENT = 'USD revenue minus tax (simple calc)',
  -- Distinct-count metric (a count of client names, not a USD amount)
  PUBLIC po.client_count_m          AS COUNT(DISTINCT po.client_name_d)                                      COMMENT = 'Distinct client count'
)
COMMENT = 'Expanded semantic view over VW_BILLING_BASE with enriched facts, dimensions, and local/USD metrics for Snowflake Cortex Analyst.';
