In [12]:
from simple_salesforce import Salesforce
import pandas as pd
from dotenv import load_dotenv
from datetime import date
import os

In [13]:
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine
from urllib.parse import quote_plus

load_dotenv()

# Connection settings are read from the environment (populated from .env by
# load_dotenv above) so that no credentials are stored in the notebook.
username = os.getenv("PG_USERNAME")
password = os.getenv("PG_PASSWORD")
host = os.getenv("PG_HOST")
port = os.getenv("PG_PORT")
database = os.getenv("PG_DATABASE_EXPORT")

# Fail fast with a readable message: an unset password would otherwise crash
# inside quote_plus(None), and an unset host/port would silently produce a
# connection URL containing the text "None".
_missing_pg_vars = [
    var_name
    for var_name, value in (
        ("PG_USERNAME", username),
        ("PG_PASSWORD", password),
        ("PG_HOST", host),
        ("PG_PORT", port),
        ("PG_DATABASE_EXPORT", database),
    )
    if not value
]
if _missing_pg_vars:
    raise ValueError(
        "Missing one or more PostgreSQL environment variables: "
        + ", ".join(_missing_pg_vars)
    )

# URL-encode the password so characters such as "@" or "/" cannot break the URL
encoded_password = quote_plus(password)

engine = create_engine(
    f"postgresql+psycopg2://{username}:{encoded_password}@{host}:{port}/{database}"
)

In [14]:
# Selects every column of the staging rows whose programme_name is the literal 'Unknown'.
query1 = """
SELECT
*
FROM staging.leads_opp_staging
where programme_name = 'Unknown'
"""

In [15]:
# Run query1 through the engine and load the result into a DataFrame.
unknown_df = pd.read_sql(query1, engine)

In [17]:
# Size check of the loaded frame: (rows, columns).
unknown_df.shape

(433194, 30)

In [19]:
# --- Load files ---
# All tier sheets live in a single workbook. The location can be overridden with
# the PROGRAMME_MAPPING_XLSX environment variable; the default is the original path.
MAPPING_WORKBOOK = os.getenv(
    "PROGRAMME_MAPPING_XLSX",
    r"C:/Users/112363/OneDrive - Taylor's Education Group/DWH_WIP/programme_code_mapping.xlsx",
)

# Passing a list of sheet names reads the workbook in one call and returns a
# dict of {sheet name: DataFrame}, instead of re-opening the file per sheet.
_tier_sheets = pd.read_excel(
    MAPPING_WORKBOOK,
    sheet_name=["Tier1", "Tier2", "Tier3", "Tier4", "ODL"],
)

first_tier  = _tier_sheets["Tier1"]
second_tier = _tier_sheets["Tier2"]
third_tier  = _tier_sheets["Tier3"]
fourth_tier = _tier_sheets["Tier4"]
odl_tier    = _tier_sheets["ODL"]

In [20]:
import re
import pandas as pd

In [21]:
import re
import pandas as pd
from functools import lru_cache

# -------------------------
# 0) Normalize columns (vectorized)
# -------------------------
def normalize_cols(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` whose string column labels have no surrounding whitespace.

    Non-string labels (for example a numeric header read from a spreadsheet)
    are kept exactly as they are. Using the ``.str`` accessor on the column
    index would turn such labels into NaN on a mixed index and raise on an
    all-numeric one, so each label is handled individually instead.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose column labels should be cleaned. It is not modified.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with cleaned column labels.
    """
    df = df.copy()
    df.columns = pd.Index(
        [label.strip() if isinstance(label, str) else label for label in df.columns],
        name=df.columns.name,
    )
    return df

# Clean the column labels of every tier sheet in one pass.
first_tier, second_tier, third_tier, fourth_tier, odl_tier = (
    normalize_cols(sheet)
    for sheet in (first_tier, second_tier, third_tier, fourth_tier, odl_tier)
)
# Work on a cleaned copy of the rows loaded from the database.
data_df = normalize_cols(unknown_df)

# Ensure intake_year column exists & numeric:
# adopt the first alternative spelling that is present, if the canonical one is not.
if "intake_year" not in data_df.columns:
    for alt in ("Intake Year", "intakeyear", "IntakeYear"):
        if alt not in data_df.columns:
            continue
        data_df = data_df.rename(columns={alt: "intake_year"})
        break

if "intake_year" in data_df.columns:
    # Unparseable values become <NA>; the nullable Int64 dtype keeps the rest integral.
    _numeric_year = pd.to_numeric(data_df["intake_year"], errors="coerce")
    data_df["intake_year"] = _numeric_year.astype("Int64")

# Lowercase helpers for consistent text compares
for col in ("programme1", "level1", "vertical1"):
    if col not in data_df.columns:
        continue
    data_df[f"{col}__lc"] = data_df[col].astype(str).str.lower()
        
# -------------------------
# 1) Year range parser (vectorized)
# -------------------------
def prepare_rule_tier(df_rules: pd.DataFrame) -> pd.DataFrame:
    """Add numeric ``start_year`` / ``end_year`` columns parsed from ``"Intake Year"``.

    Accepted cell contents (surrounding whitespace is ignored):

    * a single year, ``"2022"``; start and end are both that year;
    * a range, ``"2019-2022"``; an en dash or em dash is accepted as well;
    * a float-rendered year, ``"2022.0"``, which is what a numeric spreadsheet
      column turns into once it also contains blank cells.

    Anything else yields NaN in both columns. If the frame has no
    ``"Intake Year"`` column, an unchanged copy is returned.

    Parameters
    ----------
    df_rules : pd.DataFrame
        Rule or mapping sheet. It is not modified.

    Returns
    -------
    pd.DataFrame
        Copy of ``df_rules``, with ``start_year`` and ``end_year`` added when
        ``"Intake Year"`` is present.
    """
    df = df_rules.copy()
    if "Intake Year" in df.columns:
        yr = df["Intake Year"].astype(str).str.strip()
        # A numeric column with blanks is read as float, so 2022 arrives as
        # "2022.0"; without this step it would not parse and the rule would
        # silently lose its year restriction.
        yr = yr.str.replace(r"^(\d{4})\.0+$", r"\1", regex=True)
        # Accept "YYYY" or "YYYY-YYYY" (hyphen, en dash or em dash)
        span = yr.str.extract(r"^\s*(\d{4})\s*[-\u2013\u2014]\s*(\d{4})\s*$")
        single = yr.str.extract(r"^\s*(\d{4})\s*$")
        df["start_year"] = pd.to_numeric(span[0].fillna(single[0]), errors="coerce")
        df["end_year"]   = pd.to_numeric(span[1].fillna(single[0]), errors="coerce")
    return df

# Attach start_year / end_year to every tier sheet (same order as before).
first_tier, second_tier, third_tier, fourth_tier, odl_tier = (
    prepare_rule_tier(sheet)
    for sheet in (first_tier, second_tier, third_tier, fourth_tier, odl_tier)
)

# -------------------------
# 2) Wildmatch helpers (cached + vectorized)
# -------------------------
@lru_cache(maxsize=2048)
def _compile_wildcard(pattern: str):
    """Translate a wildcard pattern into a compiled, case-insensitive regex.

    ``*`` stands for any run of characters and ``?`` for exactly one character;
    every other character is matched literally. The regex is anchored at both
    ends. Compiled results are memoised per pattern.
    """
    pieces = []
    for ch in str(pattern):
        if ch == "*":
            pieces.append(".*")
        elif ch == "?":
            pieces.append(".")
        else:
            pieces.append(re.escape(ch))
    return re.compile("^" + "".join(pieces) + "$", flags=re.IGNORECASE)

def series_wildmatch(series: pd.Series, patterns):
    """Return a boolean Series that is True where a value matches any pattern.

    Missing values are treated as empty strings and all values are compared as
    text. Each pattern is tested with one vectorised pass and the per-pattern
    results are OR-ed together. With no patterns, the result is all False.
    """
    text = series.fillna("").astype(str)
    hits = [text.str.match(_compile_wildcard(pat)) for pat in patterns]
    if len(hits) == 0:
        return pd.Series(False, index=series.index)
    combined, *remaining = hits
    for extra in remaining:
        combined = combined | extra
    return combined

# -------------------------
# 3) Compile each Programme_Code_Rule into a vectorized mask
# -------------------------
# Matches one `wildmatch(<column>, <pattern list>)` call inside a rule string.
#   group(1) -> the column identifier
#   group(2) -> the raw, still-quoted pattern list
# Quoted patterns are consumed as whole units, so a ")" inside a pattern
# (e.g. '*(Honours)*') does not end the call early.
_wild_re = re.compile(
    r"""wildmatch\s*\(\s*([A-Za-z_][A-Za-z0-9_]*)\s*,\s*((?:'[^']*'|"[^"]*"|[^)'"])+?)\s*\)""",
    flags=re.IGNORECASE
)

# Matches either a quoted literal (group 1, which must be left untouched) or a
# bare logical keyword (group 2) in a rule expression.
_rule_keyword_re = re.compile(
    r"""('[^']*'|"[^"]*")|\b(AND|OR|NOT)\b""",
    flags=re.IGNORECASE
)

def _parse_patterns(arg_text: str):
    """Return the contents of every quoted pattern in a wildmatch argument list.

    Both ' and " are accepted as quote characters; text outside quotes is
    ignored. Returns an empty list when nothing is quoted.
    """
    return [hit.group(2) for hit in re.finditer(r"""(['"])(.*?)\1""", arg_text)]

def _translate_logic_keywords(expr: str) -> str:
    """Lower-case AND/OR/NOT outside quoted literals; quoted text is returned verbatim."""
    def _swap(match):
        literal = match.group(1)
        return literal if literal is not None else match.group(2).lower()
    return _rule_keyword_re.sub(_swap, expr)

def compile_rule_to_mask(rule_str: str, df: pd.DataFrame) -> pd.Series:
    """
    Turn a Qlik-like rule string into a vectorized boolean pandas Series.
    Supports:
      - wildmatch(col, 'pat1','pat2',...)
      - AND/OR/NOT (case-insensitive)
      - simple numeric/string comparisons against df columns (e.g. intake_year < 2022)

    How the rule is processed:
      1. Every wildmatch(...) call is evaluated with series_wildmatch and
         replaced by a placeholder name. If a "<col>__lc" column exists it is
         used instead of "<col>"; an unknown column behaves like a column of
         empty strings.
      2. AND/OR/NOT outside quoted literals become and/or/not, which the
         pandas expression parser treats like &, | and ~ but with boolean
         precedence (so "NOT a == b" means "not (a == b)"). Text inside
         quotes is never altered.
      3. The expression is evaluated with pd.eval, with the df columns and
         the placeholders available as variables.

    Rows for which the expression evaluates to a missing value count as "no
    match". If the expression cannot be evaluated at all, a warning naming
    the rule is emitted and an all-False mask is returned.

    NOTE(review): rule strings are evaluated as expressions, so they should
    only come from a trusted source.

    Parameters
    ----------
    rule_str : str
        The rule text; it is converted with str() and stripped first.
    df : pd.DataFrame
        Data to evaluate the rule against.

    Returns
    -------
    pd.Series
        Boolean mask aligned to ``df.index``.
    """
    import warnings

    expr = str(rule_str).strip()
    masks = {}

    def _to_placeholder(match):
        # Evaluate this wildmatch call now and leave a variable name in its place.
        placeholder = f"_WMASK_{len(masks)}_"
        col = match.group(1)
        # choose lowercased column if present to be consistent
        col_use = col + "__lc" if (col + "__lc") in df.columns else col
        source = df[col_use] if col_use in df.columns else pd.Series("", index=df.index)
        masks[placeholder] = series_wildmatch(source, _parse_patterns(match.group(2)))
        return placeholder

    # Placeholders go in first so that pattern text is already out of the
    # expression before the logical keywords are translated.
    final_expr = _translate_logic_keywords(_wild_re.sub(_to_placeholder, expr))

    # Create evaluation context: df columns as variables
    ctx = {c: df[c] for c in df.columns}
    ctx.update(masks)

    try:
        result = pd.eval(final_expr, engine="python", local_dict=ctx)
        if not isinstance(result, pd.Series):
            # A constant expression yields a scalar; apply it to every row.
            result = pd.Series(result, index=df.index)
        if result.isna().any():
            # Comparisons on nullable columns yield <NA>; treat those rows as
            # not matching rather than failing the whole rule.
            mask = pd.Series(
                result.to_numpy(dtype=object, na_value=False).astype(bool),
                index=result.index,
            )
        else:
            mask = result.astype(bool)
    except Exception as exc:
        # Best effort: a broken rule matches nothing, but say so.
        warnings.warn(
            f"Rule could not be evaluated and matched no rows: {rule_str!r} "
            f"({type(exc).__name__}: {exc})"
        )
        mask = pd.Series(False, index=df.index)
    return mask

# -------------------------
# 4) Rule-tier evaluator (vectorized per rule, not per row)
# -------------------------
def apply_rule_tier(df_rules: pd.DataFrame, df: pd.DataFrame):
    """
    Evaluate a rule-based tier against ``df``.

    Rules are tried in the row order of ``df_rules`` and the first rule that
    matches a row decides that row's result. A rule matches where its
    "Programme_Code_Rule" expression holds and, when the rule has a start_year
    and ``df`` has an intake_year column, where intake_year lies within
    start_year..end_year (inclusive).

    Returns two object Series aligned to ``df.index``: (name_out, code_out).
    Both are entirely <NA> when ``df_rules`` lacks the "Programme_Code_Rule"
    or "Programme Name" column.
    """
    name_out = pd.Series(pd.NA, index=df.index, dtype="object")
    code_out = pd.Series(pd.NA, index=df.index, dtype="object")

    required = {"Programme_Code_Rule", "Programme Name"}
    if not required <= set(df_rules.columns):
        return name_out, code_out

    # Column presence does not change between rules, so look it up once.
    has_code = "Programme Code" in df_rules.columns
    can_filter_year = "intake_year" in df.columns

    for _, rule_row in df_rules.iterrows():
        candidates = pd.Series(True, index=df.index)

        # Restrict to the rule's intake-year window, when it has one.
        if "start_year" in rule_row and pd.notna(rule_row["start_year"]) and can_filter_year:
            first_year = int(rule_row["start_year"])
            last_year = int(rule_row["end_year"])
            candidates = candidates & df["intake_year"].between(first_year, last_year)

        # Restrict to the rows satisfying the rule expression.
        candidates = candidates & compile_rule_to_mask(rule_row["Programme_Code_Rule"], df)

        # Earlier rules take precedence: only rows without a name are filled.
        to_fill = candidates & name_out.isna()
        if to_fill.any():
            name_out.loc[to_fill] = rule_row["Programme Name"]
            if has_code and pd.notna(rule_row.get("Programme Code", pd.NA)):
                code_out.loc[to_fill] = str(rule_row["Programme Code"])

        # Nothing left to resolve, so the remaining rules can be skipped.
        if not name_out.isna().any():
            break

    return name_out, code_out

# -------------------------
# 5) Mapping-tier evaluator (vectorized via maps/merges)
# -------------------------
def apply_mapping_tier(df_map: pd.DataFrame, df: pd.DataFrame):
    """
    Resolve programme name/code by exact (lower-cased) key lookup.

    Key combinations are tried in this priority order, and a row keeps the
    result of the first combination that resolves it:
      1) (programme1, level1, vertical1)
      2) (programme1, level1)
      3) (programme1, vertical1)
      4) (programme1)

    ``df`` must already carry the "<key>__lc" helper columns; the lower-cased
    keys of ``df_map`` are built here. A combination is skipped when any of its
    helper columns is missing on either side. When ``df_map`` holds several
    rows for the same key, the first one is used.

    Returns name_out, code_out Series (object dtype, aligned to ``df.index``);
    both are entirely <NA> when ``df_map`` has no "Programme Name" column.
    """
    name_out = pd.Series(pd.NA, index=df.index, dtype="object")
    code_out = pd.Series(pd.NA, index=df.index, dtype="object")
    if "Programme Name" not in df_map.columns:
        return name_out, code_out

    # Lower-cased copies of the lookup keys, mirroring the helpers on df.
    lookup = df_map.copy()
    for base in ("programme1", "level1", "vertical1"):
        if base in lookup.columns:
            lookup[base + "__lc"] = lookup[base].astype(str).str.lower()

    def _composite_key(frame, cols):
        # One "a|b|c" string per row built from the given columns.
        return frame[cols].astype(str).agg("|".join, axis=1)

    priority = (
        ["programme1", "level1", "vertical1"],
        ["programme1", "level1"],
        ["programme1", "vertical1"],
        ["programme1"],
    )

    for keys in priority:
        lc_cols = [k + "__lc" for k in keys]
        missing_in_data = any(c not in df.columns for c in lc_cols)
        missing_in_map = any(c not in lookup.columns for c in lc_cols)
        if missing_in_data or missing_in_map:
            continue

        data_keys = _composite_key(df, lc_cols)
        unique_rows = lookup.drop_duplicates(subset=lc_cols).copy()
        unique_rows["__k__"] = _composite_key(unique_rows, lc_cols)
        name_dict = dict(zip(unique_rows["__k__"], unique_rows["Programme Name"]))
        if "Programme Code" in unique_rows.columns:
            code_dict = dict(zip(unique_rows["__k__"], unique_rows["Programme Code"]))
        else:
            code_dict = {}

        unresolved = name_out.isna()
        if not unresolved.any():
            continue

        # Blank out the keys of already-resolved rows so they cannot be overwritten.
        pending = data_keys.where(unresolved)
        name_out.update(pending.map(name_dict))
        if code_dict:
            code_out.update(pending.map(code_dict))

    return name_out, code_out

# -------------------------
# 6) Apply tiers in priority order (fully vectorized)
# -------------------------
# Tiers in priority order: a row keeps the result of the first tier that resolves it.
TIERS = [first_tier, second_tier, third_tier, fourth_tier, odl_tier]

out_name = pd.Series(pd.NA, index=data_df.index, dtype="object")
out_code = pd.Series(pd.NA, index=data_df.index, dtype="object")

for tier_df in TIERS:
    # Sheets carrying a rule column are rule tiers; every other sheet is a lookup table.
    n, c = (
        apply_rule_tier
        if "Programme_Code_Rule" in tier_df.columns
        else apply_mapping_tier
    )(tier_df, data_df)

    # Take this tier's answer only for rows that are still unresolved.
    fill = n.notna() & out_name.isna()
    if fill.any():
        out_name.loc[fill] = n.loc[fill]
        out_code.loc[fill] = c.loc[fill]

    # Early exit if all rows resolved
    if not out_name.isna().any():
        break

# Fallbacks: unresolved names stay 'Unknown'; unresolved codes stay missing.
out_name = out_name.fillna("Unknown")
out_code = out_code.astype("object")

data_df["programme_name"] = out_name
data_df["programme_code"] = out_code


# Distribution of the resolved programme names.
data_df[["programme_name"]].value_counts()

programme_name                                                                   
Unknown                                                                              332106
General Certificate of Education - Advanced Level (GCE A Level)                       44892
Master of Teaching and Learning (ODL)                                                 33395
Foundation in Computing                                                               13464
Advanced Diploma In Patisserie and Gastronomic Cuisine                                 5226
Bachelor of Accounting and Finance (Honours)                                           2635
Bachelor of Mass Communication (Honours) in Public Relations and Event Management       895
Bachelor of Electrical and Electronic Engineering with Honours                          412
Bachelor of Actuarial Studies (Honours)                                                 119
Bachelor of Mass Communication (Honours) in Advertising and Brand Management             5

In [22]:
# The "__lc" helper columns are only needed for matching, and each one is only
# created when its source column exists. errors="ignore" therefore avoids a
# KeyError for a helper that was never created, and keeps this cell safe to re-run.
data_df = data_df.drop(
    columns=['programme1__lc', 'level1__lc', 'vertical1__lc'],
    errors="ignore",
)

In [23]:
# Selects every column of the staging rows whose programme_name is anything other than 'Unknown'.
# NOTE(review): in SQL, `!=` does not match NULL, so rows with a NULL programme_name
# would be returned by neither this query nor the `= 'Unknown'` one — confirm none exist.
query2 = """
SELECT
*
FROM staging.leads_opp_staging
where programme_name != 'Unknown'
"""

In [24]:
# Run query2 through the engine and load the result into a DataFrame.
known_df = pd.read_sql(query2, engine)

In [25]:
# Recombine using data_df (the re-mapped copy of the 'Unknown' rows), not the raw
# unknown_df: concatenating unknown_df would discard every programme_name /
# programme_code resolved above and write the original values back unchanged.
# NOTE(review): if programme_code is not already a column of the staging table,
# it is new here and will be empty for the known_df rows — confirm that is wanted.
combined_df = pd.concat([data_df, known_df], axis=0, ignore_index=True)

In [26]:
# Size check of the recombined frame: (rows, columns).
combined_df.shape

(5396042, 30)

In [27]:
# Dispose of the engine that was used for the reads above.
engine.dispose()
import os
from urllib.parse import quote
from sqlalchemy import create_engine
from dotenv import load_dotenv
 
def marcommdb_connection():
    """Build a SQLAlchemy engine for the PostgreSQL export database.

    The connection settings are read from the PG_USERNAME, PG_PASSWORD,
    PG_HOST, PG_PORT and PG_DATABASE_EXPORT environment variables, after
    re-loading the .env file with override enabled.

    Returns
    -------
    The engine returned by ``create_engine`` for the assembled URL.

    Raises
    ------
    ValueError
        If any of the five variables is unset or empty.
    """
    # override=True lets values from .env replace ones already in the environment.
    load_dotenv(override=True)

    settings = {
        key: os.getenv(key)
        for key in ("PG_USERNAME", "PG_PASSWORD", "PG_HOST", "PG_PORT", "PG_DATABASE_EXPORT")
    }
    if any(not value for value in settings.values()):
        raise ValueError("Missing one or more PostgreSQL environment variables!")

    username = settings["PG_USERNAME"]
    password = settings["PG_PASSWORD"]
    host = settings["PG_HOST"]
    port = settings["PG_PORT"]
    database = settings["PG_DATABASE_EXPORT"]

    # Percent-encode every reserved character (safe="") so the password cannot
    # break the URL; the check above guarantees it is non-empty at this point.
    encoded_password = quote(password, safe="")

    DATABASE_URL = f"postgresql+psycopg2://{username}:{encoded_password}@{host}:{port}/{database}"
    return create_engine(DATABASE_URL)

In [28]:
# Create a fresh engine (from freshly loaded environment settings) for the write below.
engine= marcommdb_connection()

In [29]:
from sqlalchemy.types import Integer

# Overwrite staging.leads_opp_staging with the recombined frame.
# if_exists='replace' drops and recreates the table, so column types are
# re-derived from the DataFrame.
# chunksize bounds how many rows are written per batch; without it pandas
# writes all rows at once, which is memory-heavy for a multi-million-row frame.
combined_df.to_sql(
    'leads_opp_staging',
    engine,
    schema='staging',
    if_exists='replace',
    index=False,
    chunksize=50_000,
)

42