# 02_silver_notebook_arias

**Purpose**: Migrate the Arias data pipeline from the Alteryx ETL workflow to a Fabric PySpark notebook.

**Source**: `APAC_CRM_Analytics_LH.src_arias_crb` (Bronze, Japanese columns)

**Output**: `APAC_Reporting_LH.clean_arias` (Silver)

**Reference Tables** (all in `APAC_CRM_Analytics_LH`):
- `ref_Chloe_asia_currency_mapping` — Currency exchange rates (JPY → USD)
- `ref_Chloe_arias_product_mapping` — Product / LOB mapping
- `ref_Chloe_insurer_mapping` — Insurer mapping (shared)

**Alteryx Tool Mapping**:
| Cell | Alteryx Tools |
| :--- | :--- |
| Cell 2 | Input (src_arias_crb), Rename Japanese columns |
| Cell 3 | Formula (Tool 147) — date conversion, hardcoded fields, derived fields |
| Cell 4 | N/A — single stream, no union |
| Cell 5 | Currency Join, Product Join, Insurer Join, Currency Formula (JPY → USD), Final Select |
| Cell 6 | Output to Silver |

In [None]:
# =============================================================================
# Cell 1: Setup & Configuration
# =============================================================================
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, DateType, IntegerType, LongType, FloatType, DecimalType
from pyspark.sql.utils import AnalysisException

def _qualified(lakehouse: str, table: str) -> str:
    """Return the ``lakehouse.table`` name used to address a table."""
    return f"{lakehouse}.{table}"


# Lakehouses: the source and reference tables are read from the first one,
# the cleaned output is written to the second one.
BRONZE_LH = "APAC_CRM_Analytics_LH"
SILVER_LH = "APAC_Reporting_LH"

# Fully qualified source and target tables.
SOURCE_TABLE = _qualified(BRONZE_LH, "src_arias_crb")
TARGET_TABLE = _qualified(SILVER_LH, "clean_arias")

# Reference (mapping) tables; all of them are addressed through BRONZE_LH.
REF_CURRENCY = _qualified(BRONZE_LH, "ref_Chloe_asia_currency_mapping")
REF_PRODUCT = _qualified(BRONZE_LH, "ref_Chloe_arias_product_mapping")
REF_INSURER = _qualified(BRONZE_LH, "ref_Chloe_insurer_mapping")

In [None]:
# =============================================================================
# Cell 2: Load Bronze Data & Rename Columns
# =============================================================================
# Load the source table. Only the load itself is guarded: an AnalysisException
# raised by any later step in this cell should surface with its real message
# rather than being reported as a missing source table.
try:
    df_arias = spark.sql(f"SELECT * FROM {SOURCE_TABLE}")
except AnalysisException as e:
    # Chain the original error so the underlying cause stays in the traceback.
    raise Exception(f"ERROR: Source table {SOURCE_TABLE} not found.") from e

print("=== SOURCE SCHEMA ===")
df_arias.printSchema()
print("\n=== SOURCE COLUMNS ===")
print(df_arias.columns)
print("\n=== SAMPLE DATA (first 3 rows) ===")
display(df_arias.limit(3))

# Source column name -> English name used by the following cells.
# NOTE(review): "手数料（税抜_" ends with an underscore where a closing bracket
# would be expected — presumably the name was sanitised on ingestion; confirm
# against the printed source schema.
COLUMN_RENAMES = {
    "請求書番号": "Invoice No.",
    "保険始期": "From",
    "保険終期": "To",
    "契約者名": "Name of Client",
    "保険会社名": "Insurer",
    "保険種類": "Class of Insurance",
    "保険料": "Premium",
    "手数料（税抜_": "Full Commission",
    "収益認識日": "Premium Receipt/Paid Date",
    "Policy No": "Policy No.",
    "チーム名": "Team",
    "ＡＥ名": "A/E",
    "Recurring": "Recurring/Non-Recurring",
    "６分類": "6分類",
}

# withColumnRenamed is a silent no-op when the source column does not exist,
# which would only show up later as an unresolved-column error. Report missing
# columns here instead. The comparison ignores case (assumes Spark's default
# case-insensitive column resolution). This is a warning only: not every
# renamed column is consumed by the later cells.
existing_columns = {name.lower() for name in df_arias.columns}
missing_columns = [
    source_name for source_name in COLUMN_RENAMES
    if source_name.lower() not in existing_columns
]
if missing_columns:
    print(f"\nWARNING: expected source columns not found (rename skipped): {missing_columns}")

# Rename Japanese columns to English
df_arias_renamed = df_arias
for source_name, english_name in COLUMN_RENAMES.items():
    df_arias_renamed = df_arias_renamed.withColumnRenamed(source_name, english_name)

print("\n=== RENAMED SCHEMA ===")
df_arias_renamed.printSchema()
print(f"\n=== ROW COUNT: {df_arias_renamed.count()} ===")

In [None]:
# =============================================================================
# Cell 3: Transformation Logic (Alteryx Tool 147 - Arias Data Formula)
#   - Force correct data types BEFORE transformation
#   - Date conversion with multi-format coalesce
#   - Hardcoded fields, derived fields, column renames
# =============================================================================

# --- Step 1: Force correct data types BEFORE formulas ---
# Premium and Full Commission may come in as strings from Excel source
df_typed = (df_arias_renamed
    .withColumn("Premium", F.col("Premium").cast(DoubleType()))
    .withColumn("Full Commission", F.col("`Full Commission`").cast(DoubleType()))
)


# --- Step 2: Date Conversion ---
def _parse_date(column_name):
    """Build a date expression for a source date column of unknown type.

    The column is first rendered as a string, then parsed with the first
    approach that yields a non-null value:
      1. ``yyyyMMdd``   (e.g. a numeric 20240131 rendered as text)
      2. ``yyyy-MM-dd``
      3. a plain string-to-date cast

    Args:
        column_name: Name of the column in ``df_typed`` to parse.

    Returns:
        A Column expression producing the parsed date, or null when no
        approach succeeds.
    """
    as_text = F.col(column_name).cast(StringType())
    return F.coalesce(
        F.to_date(as_text, "yyyyMMdd"),
        F.to_date(as_text, "yyyy-MM-dd"),
        # Cast the string form rather than the raw column: Spark 3 rejects a
        # direct numeric -> date cast when the query is analysed, which would
        # break the numeric yyyyMMdd case this cell is meant to support. For
        # string, date and timestamp inputs the result is the same as a
        # direct cast.
        as_text.cast(DateType()),
    )


df_transformed = (df_typed
    .withColumn("InvoiceDate", _parse_date("Premium Receipt/Paid Date"))
    .withColumn("InceptionDate", _parse_date("From"))
    .withColumn("ExpiryDate", _parse_date("To"))
    # FinalDate is a copy of the inception date.
    .withColumn("FinalDate", F.col("InceptionDate"))
)

# Show raw vs parsed values side by side so parsing failures (nulls) are visible.
print("=== DATE PARSING CHECK ===")
df_transformed.select(
    F.col("Premium Receipt/Paid Date"), F.col("InvoiceDate"),
    F.col("From"), F.col("InceptionDate"),
    F.col("To"), F.col("ExpiryDate"),
    F.col("Premium"), F.col("Full Commission")
).show(5, truncate=False)

# --- Step 3: Hardcoded Fields (Alteryx Formula Tool 147) ---
# NOTE(review): Segment, ReinsuranceDescription, PolicyDescription and
# Department are set to the four-character string "null", not to SQL NULL.
# Left unchanged because downstream consumers may rely on it — confirm against
# the Alteryx formula whether a real NULL was intended.
df_transformed = (df_transformed
    .withColumn("DataSource", F.lit("Arias"))
    .withColumn("RevenueCountry", F.lit("Japan"))
    .withColumn("Segment", F.lit("null"))
    .withColumn("InsurerCountry", F.lit("JAPAN"))
    .withColumn("DunsNumber", F.lit("UNKNOWN-ARIAS"))
    .withColumn("BusinessType", F.lit("UNKNOWN"))
    .withColumn("PartyIdWtw", F.lit("UNKNOWN-ARIAS"))
    .withColumn("Ccy", F.lit("JPY"))
    .withColumn("ReinsuranceDescription", F.lit("null"))
    .withColumn("PolicyDescription", F.lit("null"))
    .withColumn("Department", F.lit("null"))
    # "R" -> RENEWAL, "N" -> NEW; any other value is passed through unchanged.
    .withColumn("TransactionType",
        F.when(F.col("`Recurring/Non-Recurring`") == "R", F.lit("RENEWAL"))
        .when(F.col("`Recurring/Non-Recurring`") == "N", F.lit("NEW"))
        .otherwise(F.col("`Recurring/Non-Recurring`"))
    )
)

# --- Step 4: Derived Fields ---
# CLIENT ID = [Name of Client], SYSTEM ID = [Name of Client]
# INCEPTION YEAR = DateTimeYear([INCEPTION DATE])
# CCYYEAR = "JPY" + "-" + [INCEPTION YEAR]
df_transformed = (df_transformed
    .withColumn("ClientIdWtw", F.col("Name of Client"))
    .withColumn("SystemId", F.col("Name of Client"))
    .withColumn("InceptionYear", F.year(F.col("InceptionDate")).cast(StringType()))
    # concat yields null when InceptionYear is null, so rows without a parsed
    # inception date get a null Ccyyear.
    .withColumn("Ccyyear", F.concat(F.lit("JPY-"), F.col("InceptionYear")))
)

# --- Step 5: Rename remaining columns ---
# Done after Step 4 because the derived fields read "Name of Client".
df_transformed = (df_transformed
    .withColumnRenamed("Name of Client", "ClientName")
    .withColumnRenamed("Insurer", "InsurerName")
    .withColumnRenamed("Class of Insurance", "SystemProductId")
    .withColumnRenamed("A/E", "AccountHandler")
    .withColumnRenamed("Policy No.", "InvoicePolicyNumber")
)

print("=== SCHEMA AFTER FORMULAS ===")
df_transformed.printSchema()
print("\n=== SAMPLE (first 3 rows) ===")
display(df_transformed.limit(3))

In [None]:
# =============================================================================
# Cell 4: Union & Unification — SKIPPED (single stream)
# =============================================================================
# The Arias feed is a single data stream, so there is nothing to union and no
# columns to reconcile in this cell; it only reports that it was skipped.
_cell4_notice = "Cell 4: Skipped — single stream, no union required."
print(_cell4_notice)

In [None]:
# =============================================================================
# Cell 5: Reference Joins + Currency Conversion + Final Select
# =============================================================================

# --- Load reference tables ---
try:
    df_currency = spark.sql(f"SELECT * FROM {REF_CURRENCY}")
    df_product = spark.sql(f"SELECT * FROM {REF_PRODUCT}")
    df_insurer = spark.sql(f"SELECT * FROM {REF_INSURER}")
except AnalysisException as e:
    print(f"WARNING: Reference table not found — {e}")
    raise

# Row count before the joins, used below to detect row multiplication caused
# by duplicate keys in a reference table.
rows_before_joins = df_transformed.count()

# All three joins compare upper-cased, trimmed keys on both sides and are left
# joins, so source rows without a match are kept with null reference columns.

# --- Join 1: Currency Mapping ---
# Key: Ccyyear (= "JPY-" + YEAR(InceptionDate))
# Brings in: Value (exchange rate for USD conversion)
df_currency_ref = df_currency.select(
    F.upper(F.trim(F.col("CCYYEAR"))).alias("_currency_join_key"),
    F.col("Value").alias("Ccyvalue")
)

df = df_transformed.join(
    df_currency_ref,
    F.trim(F.upper(df_transformed["Ccyyear"])) == df_currency_ref["_currency_join_key"],
    "left"
).drop("_currency_join_key")

# --- Join 2: Product Mapping ---
# Key: SystemProductId (= Class of Insurance, already renamed)
# Brings in: Lvl 2 Mapping, GLOBs, GLOBS SPLIT P&C
df_product_ref = df_product.select(
    F.upper(F.trim(F.col("`Class of Insurance`"))).alias("_product_join_key"),
    F.col("`Lvl 2 Mapping`"),
    F.col("GLOBs"),
    F.col("`GLOBS SPLIT P&C`")
)

df = df.join(
    df_product_ref,
    F.trim(F.upper(df["SystemProductId"])) == df_product_ref["_product_join_key"],
    "left"
).drop("_product_join_key")

# --- Join 3: Insurer Mapping ---
# Key: InsurerName (= Insurer, already renamed)
# Brings in: MAPPED_INSURER, Lloyd's Asia or Lloyd's London
df_insurer_ref = df_insurer.select(
    F.upper(F.trim(F.col("Insurer"))).alias("_insurer_join_key"),
    F.col("MAPPED_INSURER"),
    F.col("`Lloyd's Asia or Lloyd's London`")
)

df = df.join(
    df_insurer_ref,
    F.trim(F.upper(df["InsurerName"])) == df_insurer_ref["_insurer_join_key"],
    "left"
).drop("_insurer_join_key")

# --- Currency Formula ---
# Convert JPY to USD using the exchange rate Ccyvalue.
# A missing rate is treated as 0, so unmatched rows end up with 0 USD amounts;
# the diagnostics below report how many rows are affected.
exchange_rate = F.coalesce(F.col("Ccyvalue"), F.lit(0))
df = (df
    .withColumn("PremiumUsd", (F.col("Premium") * exchange_rate).cast(DoubleType()))
    .withColumn("BrokerageUsd", (F.col("`Full Commission`") * exchange_rate).cast(DoubleType()))
)

# --- Final Select: PascalCase aliases ---
# (source column, output name, output type), in output column order.
# Columns not listed here (e.g. Segment, Ccy, Ccyvalue) are not part of the
# output, so no explicit drop is needed after the select.
FINAL_COLUMNS = [
    ("BrokerageUsd", "BrokerageUsd", DoubleType()),
    ("BusinessType", "BusinessType", StringType()),
    ("ClientIdWtw", "ClientIdWtw", StringType()),
    ("ClientName", "ClientName", StringType()),
    ("DataSource", "DataSource", StringType()),
    ("Department", "Department", StringType()),
    ("DunsNumber", "DunsNumber", StringType()),
    ("InvoiceDate", "InvoiceDate", DateType()),
    ("ExpiryDate", "ExpiryDate", DateType()),
    ("FinalDate", "FinalDate", DateType()),
    ("GLOBs", "Globs", StringType()),
    ("`GLOBS SPLIT P&C`", "GlobsSplitPc", StringType()),
    ("InsurerCountry", "InsurerCountry", StringType()),
    ("InsurerName", "InsurerName", StringType()),
    ("`Lloyd's Asia or Lloyd's London`", "Lloyds", StringType()),
    ("`Lvl 2 Mapping`", "SubProductClass", StringType()),
    ("MAPPED_INSURER", "InsurerMapping", StringType()),
    ("PartyIdWtw", "PartyIdWtw", StringType()),
    ("PolicyDescription", "PolicyDescription", StringType()),
    ("SystemProductId", "SystemProductId", StringType()),
    ("InvoicePolicyNumber", "InvoicePolicyNumber", StringType()),
    ("PremiumUsd", "PremiumUsd", DoubleType()),
    ("ReinsuranceDescription", "ReinsuranceDescription", StringType()),
    ("RevenueCountry", "RevenueCountry", StringType()),
    ("AccountHandler", "AccountHandler", StringType()),
    ("InceptionDate", "InceptionDate", DateType()),
    ("SystemId", "SystemId", StringType()),
    ("TransactionType", "TransactionType", StringType()),
]

df_final = df.select(*[
    F.col(source_name).cast(output_type).alias(output_name)
    for source_name, output_name, output_type in FINAL_COLUMNS
])

# --- Diagnostics ---
# One pass over the joined data: total rows (the select above does not change
# the row count) and rows for which no exchange rate was found.
join_stats = df.agg(
    F.count(F.lit(1)).alias("row_count"),
    F.sum(F.when(F.col("Ccyvalue").isNull(), 1).otherwise(0)).alias("missing_rate_rows"),
).first()
final_row_count = join_stats["row_count"]
missing_rate_rows = join_stats["missing_rate_rows"] or 0  # sum is null on empty input

print("=== FINAL SCHEMA ===")
df_final.printSchema()
print(f"\n=== FINAL ROW COUNT: {final_row_count} ===")
if final_row_count != rows_before_joins:
    print(
        f"WARNING: row count changed from {rows_before_joins} to {final_row_count} "
        "during the reference joins — check the reference tables for duplicate join keys."
    )
if missing_rate_rows:
    print(
        f"WARNING: {missing_rate_rows} row(s) have no exchange rate; "
        "PremiumUsd and BrokerageUsd are 0 for these rows."
    )
print("\n=== FINAL SAMPLE (first 5 rows) ===")
display(df_final.limit(5))

In [None]:
# =============================================================================
# Cell 6: Write to Silver
# =============================================================================
# Replace the target table with the final DataFrame, written as Delta in
# overwrite mode with the overwriteSchema option enabled.
print(f"Writing to {TARGET_TABLE}...")
silver_writer = (
    df_final.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
silver_writer.saveAsTable(TARGET_TABLE)

# Read the table back so the report reflects what was actually persisted.
df_written = spark.table(TARGET_TABLE)
print(f"Success. Rows written: {df_written.count()}")
print(f"Columns: {len(df_written.columns)}")
display(df_written.limit(5))