In [0]:
# =========================
# UNION Bronze -> Silver (handle dots in column names)
# =========================
import re
from pyspark.sql import functions as F
from functools import reduce

# ===== CONFIG =====
# Fully qualified table names are built as CATALOG.`schema`.table.
CATALOG = "lapse_scoring_dev"
SRC_SCHEMA = "01_bronze"
DST_SCHEMA = "02_silver"

# Bronze tables to load and stack.
TABLES = ["fwd_max_daily_fwd_max_member"]

# Original (raw) Bronze column names to keep; several of them end with a dot.
WANTED_RAW = [
    "no.",
    "member_id",
    "client_code",
    "fwd_max_card_no.",
    "name",
    "reg_date",
    "birth_date",
    "email",
    "mobile_no.",
    "member_type",
    "policy_no.",
    "card_type",
    "gender",
    "passion",
    "member_status",
    "address",
    "city",
]

# Silver table to write, and the save mode handed to the DataFrame writer.
TARGET_TABLE = "fwd_max_member_silver"
WRITE_MODE = "overwrite"

# ===== Helpers =====
def safe_name(s: str) -> str:
    """Derive a safe column name from ``s``.

    Every character outside ``[a-zA-Z0-9_]`` is replaced by an underscore, the
    result is lowercased, runs of underscores collapse into one, and leading
    and trailing underscores are removed.

    Returns:
        The cleaned name; ``"col"`` when nothing is left after cleaning, or the
        cleaned name prefixed with ``"c_"`` when it starts with a digit.
    """
    replaced = re.sub(r"[^a-zA-Z0-9_]", "_", s).lower()
    cleaned = re.sub(r"_+", "_", replaced).strip("_")
    if not cleaned:
        return "col"
    if cleaned[0].isdigit():
        return f"c_{cleaned}"
    return cleaned

# Raw -> safe column-name lookup (e.g. "no." -> "no", "policy_no." -> "policy_no")
RAW2SAFE = dict(zip(WANTED_RAW, map(safe_name, WANTED_RAW)))

def select_wanted_with_alias(df, raw2safe):
    """Project ``df`` onto the wanted raw columns, renamed to their safe names.

    Args:
        df: Source DataFrame; only ``df.columns`` and ``df.select`` are used.
        raw2safe: Mapping of raw column name -> safe output name. Output
            columns follow the mapping's iteration order.

    Returns:
        A DataFrame with one string column per mapping entry. A raw column
        that is not found in ``df.columns`` (exact, case-sensitive membership
        test) is emitted as a NULL string column, so every source produces the
        same set of columns.
    """
    present = set(df.columns)
    projected = []
    for raw, safe in raw2safe.items():
        if raw in present:
            # Backtick-quote the name so a dot in it is not parsed as nested
            # field access. A literal backtick inside the name is escaped by
            # doubling it; without this the quoted identifier would not parse.
            quoted = "`" + raw.replace("`", "``") + "`"
            source_col = F.col(quoted)
        else:
            source_col = F.lit(None)
        projected.append(source_col.cast("string").alias(safe))
    return df.select(*projected)

def union_all(dfs):
    """Fold ``dfs`` left to right into one DataFrame with ``unionByName``.

    Columns are matched by name and ``allowMissingColumns=True`` is passed on
    every pairwise union. A single-element input is returned as is; an empty
    input raises ``TypeError`` from ``reduce``.
    """
    def _merge(left, right):
        return left.unionByName(right, allowMissingColumns=True)

    return reduce(_merge, dfs)

# ===== Guard: at least one source table is required =====
# Fail before touching the catalog; otherwise an empty TABLES list only
# surfaces later as an opaque "reduce() of empty iterable" TypeError.
if not TABLES:
    raise ValueError("TABLES is empty: there is nothing to union into Silver.")

# ===== Ensure destination schema exists =====
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.`{DST_SCHEMA}`")

# ===== Load, align, and union =====
dfs = []
for t in TABLES:
    src_fqn = f"{CATALOG}.`{SRC_SCHEMA}`.{t}"
    df0 = spark.table(src_fqn)

    # Align every source to the same string columns, then add lineage:
    # the source table name and the timestamp of this union run.
    dfi = (
        select_wanted_with_alias(df0, RAW2SAFE)
        .withColumn("_source_table", F.lit(t))
        .withColumn("_ingest_ts_union", F.current_timestamp())
    )

    dfs.append(dfi)

df_union = dfs[0] if len(dfs) == 1 else union_all(dfs)

# ===== Write to Silver =====
TARGET_FQN = f"{CATALOG}.`{DST_SCHEMA}`.{TARGET_TABLE}"
(
    df_union.write
      .mode(WRITE_MODE)
      .option("mergeSchema","true")
      .option("overwriteSchema","true")
      .format("delta")
      .saveAsTable(TARGET_FQN)
)

print(f"Write OK -> {TARGET_FQN}")
print("Rows:", spark.table(TARGET_FQN).count())
# NOTE(review): this preview renders the name/email/mobile_no columns; clear
# the cell output before sharing the notebook.
display(spark.sql(f"SELECT * FROM {TARGET_FQN} LIMIT 50"))


In [0]:
# =========================
# Rename columns to business names (make a new Silver table)
# =========================
from pyspark.sql import functions as F

# Source is the table written by the union step; destination is a new Silver
# table whose columns carry business names ("bn" = business names).
CATALOG = "lapse_scoring_dev"
SRC_SCHEMA = "02_silver"
SRC_TABLE = "fwd_max_member_silver"
DST_SCHEMA = "02_silver"
DST_TABLE = "fwd_max_member_bn_silver"

# Mapping: current column name -> business column name (edit if needed).
BUSINESS_MAP = dict([
    ("no", "Row_No"),
    ("member_id", "Member_ID"),
    ("client_code", "Client_Number"),
    ("fwd_max_card_no", "FWD_Max_Card_No"),
    ("name", "Member_Name"),
    ("reg_date", "Registration_Date"),
    ("birth_date", "Birth_Date"),
    ("email", "Email_Address"),
    ("mobile_no", "Mobile_Number"),
    ("member_type", "Member_Type"),
    ("policy_no", "Contract_Number"),
    ("card_type", "Card_Type"),
    ("gender", "Gender"),
    ("passion", "Passion_Category"),
    ("member_status", "Member_Status"),
    ("address", "Address"),
    ("city", "City"),
    # lineage columns
    ("_source_table", "source_table"),
    ("_ingest_ts_union", "union_ingest_ts"),
])

# Fully qualified names; the schema part is backtick-quoted.
SRC_FQN = ".".join([CATALOG, f"`{SRC_SCHEMA}`", SRC_TABLE])
DST_FQN = ".".join([CATALOG, f"`{DST_SCHEMA}`", DST_TABLE])

df = spark.table(SRC_FQN)

# Split the mapping into entries the source table has and entries it lacks.
existing = set(df.columns)
mapped_existing = {}
missing_in_source = []
for old_name, new_name in BUSINESS_MAP.items():
    if old_name in existing:
        mapped_existing[old_name] = new_name
    else:
        missing_in_source.append(old_name)
if missing_in_source:
    print("⚠️ Missing in source (ignored):", missing_in_source)

# Select the mapped columns under their business names, in mapping order.
# Source columns that have no entry in BUSINESS_MAP are not carried over.
renamed_cols = [
    F.col(old_name).alias(new_name)
    for old_name, new_name in mapped_existing.items()
]
df_bn = df.select(renamed_cols)

# Write the result as a new Delta table.
bn_writer = (
    df_bn.write
    .mode("overwrite")
    .option("mergeSchema","true")
    .option("overwriteSchema","true")
    .format("delta")
)
bn_writer.saveAsTable(DST_FQN)

print(f"Write OK -> {DST_FQN}")
display(spark.sql(f"SELECT * FROM {DST_FQN} LIMIT 50"))


In [0]:
# ============================================
# CAST tipe data dari BN -> simpan ke tabel BARU
# ============================================
from pyspark.sql import functions as F

# --- CONFIG ---
# Source is the business-named (BN) table; the typed result goes to a NEW table.
CATALOG = "lapse_scoring_dev"
SRC_SCHEMA = "02_silver"
SRC_TABLE = "fwd_max_member_bn_silver"
DST_SCHEMA = "02_silver"
DST_TABLE = "fwd_max_member_dt_silver"

# Fully qualified names; the schema part is backtick-quoted.
SRC_FQN = ".".join([CATALOG, f"`{SRC_SCHEMA}`", SRC_TABLE])
DST_FQN = ".".join([CATALOG, f"`{DST_SCHEMA}`", DST_TABLE])

# Make sure the destination schema exists
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.`{DST_SCHEMA}`")

# --- LOAD ---
df = spark.table(SRC_FQN)

# 1) Trim every string column
string_cols = [name for name, dtype in df.dtypes if dtype.startswith("string")]
for c in string_cols:
    df = df.withColumn(c, F.trim(F.col(c)))

# 2) Uppercase code/flag columns (extend the list if more are needed)
uppercase_cols = ["Member_Type", "Gender"]
for c in uppercase_cols:
    if c not in df.columns:
        continue
    df = df.withColumn(c, F.upper(F.col(c)))

# 3) Parse DATES: a column-specific format first, then a tolerant fallback
#    that strips every non-digit and reads what is left as yyyyMMdd.
date_primary_formats = {
    "Registration_Date": "yyyy-MM-dd",
    "Birth_Date": "dd/MM/yyyy",
}
for date_col, primary_fmt in date_primary_formats.items():
    if date_col not in df.columns:
        continue
    primary_parse = F.expr(f"try_to_date({date_col}, '{primary_fmt}')")
    fallback_parse = F.expr(
        f"try_to_date(regexp_replace(CAST({date_col} AS STRING), '[^0-9]', ''), 'yyyyMMdd')"
    )
    # The first non-NULL parse wins; a value neither parse accepts stays NULL.
    df = df.withColumn(date_col, F.coalesce(primary_parse, fallback_parse))
# 4) Numeric (double): normalise locale formatting, then cast to double
def to_double(col):
    """Convert a locale-formatted numeric string column to ``double``.

    All whitespace is removed first. The text is then normalised according to
    the first pattern it matches (an optional leading ``+``/``-`` is accepted
    by every pattern):

    * ``1.234.567,89`` (dot thousands, comma decimal): dots are removed and
      the comma becomes a dot.
    * ``1234,56`` (digits, one comma, digits): the comma becomes a dot. Note
      that ``1,234`` therefore reads as 1.234, not 1234.
    * ``1,234,567.89`` (comma thousands, dot decimal): commas are removed.
    * anything else is passed to the cast unchanged.

    Args:
        col: Column to convert; it is cast to string before cleaning.

    Returns:
        A Column holding the normalised text cast to ``double``.
    """
    s = F.regexp_replace(col.cast("string"), r"\s", "")
    # The optional sign lets signed values such as "-1.234,56" go through the
    # same normalisation instead of falling through to the raw cast.
    return (
        F.when(s.rlike(r"^[+-]?\d{1,3}(\.\d{3})+,\d+$"), F.regexp_replace(F.regexp_replace(s, r"\.", ""), ",", "."))
         .when(s.rlike(r"^[+-]?\d+,\d+$"),               F.regexp_replace(s, ",", "."))
         .when(s.rlike(r"^[+-]?\d{1,3}(,\d{3})+\.\d+$"), F.regexp_replace(s, ",", ""))
         .otherwise(s)
    ).cast("double")

double_cols = [
    # Add further numeric columns here if needed, e.g.:
    # "Loan_Value", "Total_Fee", "Coverage_Debt"
]

# 5) Integer columns (the original note mentions age, and that Life_Number is
#    left as a string if it is an ID; neither appears in this list).
# NOTE(review): Member_ID is cast to int although it is named like an
# identifier -- confirm the values are purely numeric and fit in 32 bits.
int_cols = ["Row_No", "Member_ID", "Member_Status"]

# 6) Keep the main ID/text columns as STRING (defensive)
id_string_cols = [
    "FWD_Max_Card_No",
    "Client_Number",
    "Member_Name",
    "Email_Address",
    "Contract_Number",
    "Card_Type",
    "Passion_Category",
    "Mobile_Number",
    "Address",
    "City",
]

# Apply the cast groups in order: double, int, string. Columns that are not
# present in the table are skipped.
cast_plan = [
    (double_cols, to_double),
    (int_cols, lambda col: col.cast("int")),
    (id_string_cols, lambda col: col.cast("string")),
]
for cols, convert in cast_plan:
    for c in cols:
        if c in df.columns:
            df = df.withColumn(c, convert(F.col(c)))

# 7) Write to the NEW table (the source table is not touched)
typed_writer = (
    df.write
    .mode("overwrite")              # create or replace the new table
    .option("mergeSchema", "true")
    .option("overwriteSchema", "true")
    .format("delta")
)
typed_writer.saveAsTable(DST_FQN)

# Report: confirmation, row count, resulting schema, and a small sample.
print(f"Typed table written -> {DST_FQN}")
print("Rows:", spark.table(DST_FQN).count())
display(spark.sql(f"DESCRIBE {DST_FQN}"))
display(spark.sql(f"SELECT * FROM {DST_FQN} LIMIT 20"))
