# Research Notebook: Test Transformation Strategies Step by Step

This notebook validates the transformation helpers and utilities in this application step by step.

Covered:
- Expressions (EXP), Filters (FIL), Joins (JNR)
- Lookups (LKP), Routers (RTR), Aggregations (AGG)
- Sequence generation (SEQ); Merge (UPD_STR) as an optional demo
- Validation helpers, AST extraction/normalization, logic derivation, code validator


In [None]:
# Setup: imports, a local Spark session, and the project helper modules.
#
# Every dependency here is optional. When one cannot be set up, its handle
# (`spark`, `tfw`, `agt`) is left as None and a message is printed, so the
# following cells can test the handle and skip themselves.
import importlib
import json
import sys

spark = None
try:
    from pyspark.sql import SparkSession
    spark = (
        SparkSession.builder.appName("research-strategy-tests")
        .master("local[*]")
        .config("spark.ui.showConsoleProgress", "false")
        .getOrCreate()
    )
    print("Spark version:", spark.version)
except ImportError as e:
    # pyspark itself is missing.
    print("pyspark not available — Spark tests will be skipped.")
    print(e)
except Exception as e:
    # pyspark imported but the session could not be created or queried.
    # Report it as such instead of claiming that pyspark is missing, and
    # reset the handle so later cells really do skip.
    spark = None
    print("Spark session could not be started — Spark tests will be skipped.")
    print(f"{type(e).__name__}: {e}")


def _load_optional_module(module_name):
    """Import ``module_name`` and return it, or return None if the import fails.

    The failure is printed rather than raised. The catch is deliberately broad
    because a local module may raise anything while it is being imported.
    """
    try:
        module = importlib.import_module(module_name)
    except Exception as e:
        print(f"Could not load {module_name}.py", e)
        return None
    print(f"Loaded {module_name}.py")
    return module


# Local transformation framework utilities.
tfw = _load_optional_module("transform_framework")

# AST / validator agent utilities.
agt = _load_optional_module("infa_to_pyspark_agents")


## Sample DataFrames
We create small DataFrames to exercise each helper.

In [None]:
# Build the two small DataFrames (`df_src`, `df_lkp`) that later cells use.
if spark:
    # Imported inside the guard: at module level these imports would raise
    # ImportError when pyspark is missing, defeating the skip branch below.
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    # The setup cell leaves `tfw` as None when the framework failed to load;
    # the sample data can still be created without the session tuning.
    if tfw is not None:
        tfw.setup_spark(spark, shuffle_partitions=8)
    else:
        print("transform_framework not loaded — skipping setup_spark.")

    # Employee source rows.
    src_schema = T.StructType([
        T.StructField("emp_id", T.IntegerType(), False),
        T.StructField("emp_name", T.StringType(), True),
        T.StructField("dept_id", T.IntegerType(), True),
        T.StructField("gross_sal", T.DoubleType(), True),
        T.StructField("tax_rate", T.DoubleType(), True),
        T.StructField("active_flg", T.StringType(), True),
        T.StructField("hire_date", T.StringType(), True),
    ])
    src_rows = [
        # The first name carries trailing whitespace (a space and a tab) on
        # purpose, written as an escape so it stays visible in the source.
        (1, "Alice \t", 10, 100000.0, 30.0, "Y", "2020-01-01"),
        (2, "Bob", 10, 80000.0, 25.0, "N", "2021-06-15"),
        (3, "Charlie", 20, 120000.0, 28.0, "Y", "2019-11-30"),
    ]
    df_src = spark.createDataFrame(src_rows, src_schema)

    # Department lookup rows, including an emp_id range per department.
    lkp_schema = T.StructType([
        T.StructField("dept_id", T.IntegerType(), False),
        T.StructField("dept_name", T.StringType(), True),
        T.StructField("min_emp_id", T.IntegerType(), True),
        T.StructField("max_emp_id", T.IntegerType(), True),
    ])
    lkp_rows = [
        (10, "Engineering", 1, 100),
        (20, "Finance", 100, 200),
    ]
    df_lkp = spark.createDataFrame(lkp_rows, lkp_schema)

    df_src.show()
    df_lkp.show()
else:
    print("Skipping — Spark not available.")


## EXP: apply_expression
- Trim names, compute `tax_amt` and `net_sal`, add `load_ts`

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    # (output column, expression) pairs, expanded below into the
    # {"name": ..., "expr": ...} dicts passed to apply_expression.
    # NOTE(review): Spark SQL trim() strips space characters only by default;
    # the sample name that ends in a tab may keep it — confirm this is intended.
    derived_columns = (
        ("emp_name", "trim(emp_name)"),
        ("tax_amt", "gross_sal * tax_rate / 100"),
        ("net_sal", "gross_sal - (gross_sal * tax_rate / 100)"),
        ("load_ts", "current_timestamp()"),
    )
    exprs = [
        {"name": column, "expr": expression}
        for column, expression in derived_columns
    ]
    df_exp = tfw.apply_expression(df_src, exprs)
    df_exp.show()


## FIL: apply_filter
- Keep only active employees

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    # Filter the EXP output down to rows whose active flag is 'Y'.
    df_fil = tfw.apply_filter(
        df_exp,
        "active_flg = 'Y'",
    )
    df_fil.show()


## JNR: apply_join
- Join with department lookup by `dept_id`

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    # Only the key and the department name are taken from the lookup side.
    df_jnr = tfw.apply_join(
        df_fil,
        df_lkp.select("dept_id", "dept_name"),
        on=["dept_id"],
        how="left",
    )
    df_jnr.show()


## LKP: apply_lookup_range
- Range lookup by `emp_id` between `[min_emp_id, max_emp_id)`

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    # Look up `dept_name` by locating emp_id within each lookup row's
    # min_emp_id / max_emp_id bounds.
    # NOTE(review): df_jnr already has a `dept_name` column from the join —
    # confirm how apply_lookup_range handles the name clash.
    df_lkp_range = tfw.apply_lookup_range(
        df_jnr,
        df_lkp,
        key="emp_id",
        min_col="min_emp_id",
        max_col="max_emp_id",
        value_cols=["dept_name"],
    )
    df_lkp_range.show()


## RTR: apply_router
- Split into salary buckets (LOW, MID, HIGH)

In [None]:
if spark:
    # (group name, condition) pairs on net salary.
    rules = [
        ("LOW", "net_sal < 90000"),
        ("MID", "net_sal >= 90000 AND net_sal < 110000"),
        ("HIGH", "net_sal >= 110000"),
    ]
    routed = tfw.apply_router(df_lkp_range, rules)

    # Row count per routed group. The notebook auto-displays only a trailing
    # top-level expression; a bare expression nested inside this `if` would be
    # evaluated and silently dropped, so the counts are printed explicitly.
    route_counts = {k: v.count() for k, v in routed.items()}
    print(route_counts)
else:
    print("Skipping — Spark not available.")


## AGG: apply_aggregations
- Total net salary by department

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    # One output row per dept_id, with the summed net salary as `total_net`.
    df_agg = tfw.apply_aggregations(
        df_lkp_range,
        group_cols=["dept_id"],
        agg_exprs={"total_net": "sum(net_sal)"},
    )
    df_agg.show()


## SEQ: add_sequence
- Add a surrogate key by row_number() over all rows

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    # Add the surrogate key column `emp_sk`, then show it next to the
    # natural key and the name.
    df_seq = tfw.add_sequence(
        df_lkp_range,
        col_name="emp_sk",
        method="row_number",
    )
    df_seq.select("emp_sk", "emp_id", "emp_name").show()


## Validation Helpers
- Enforce schema, not-null, unique, surrogate key checks

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    # Expected (column, type) pairs for the source DataFrame.
    expected = [
        ("emp_id", "int"),
        ("emp_name", "string"),
        ("dept_id", "int"),
        ("gross_sal", "double"),
        ("tax_rate", "double"),
        ("active_flg", "string"),
        ("hire_date", "string"),
    ]
    # Select exactly the expected columns, in the same order, before enforcing.
    selected = df_src.select(*[column for column, _ in expected])
    df_enf = tfw.enforce_schema_df(selected, expected)
    df_enf.printSchema()

    # Gather the findings of each validator into one list.
    issues = []
    issues.extend(tfw.validate_not_null_df(df_enf, ["emp_id"]))
    issues.extend(tfw.validate_unique_df(df_enf, ["emp_id"]))
    issues.extend(
        tfw.validate_surrogate_key_df(tfw.add_sequence(df_enf, "emp_sk"), "emp_sk")
    )

    print("Validation issues:")
    for issue in issues:
        print("-", issue)
    if not issues:
        print("No issues found.")


## MERGE Demo (optional)
This cell requires Delta Lake support. On a vanilla local Spark installation without Delta, the cell catches the failure and prints a skip message instead of running the merge.
In Databricks, set `target_table` to a managed Delta table name and run.

In [None]:
if not spark:
    print("Skipping — Spark not available.")
else:
    try:
        # Two-row source for the merge, taken from the SEQ cell's output.
        df_merge_src = df_seq.select("emp_id", "emp_name").limit(2)
        target_table = "default.emp_merge_demo"

        spark.sql("CREATE DATABASE IF NOT EXISTS default")
        # Creating the target as a Delta table may fail when Delta is not installed.
        create_table_sql = (
            f"CREATE TABLE IF NOT EXISTS {target_table} "
            "(emp_id INT, emp_name STRING) USING delta"
        )
        spark.sql(create_table_sql)

        # Update the name on matching emp_id; insert both columns otherwise.
        merge_options = dict(
            target_table=target_table,
            on="t.emp_id = s.emp_id",
            update_set={"emp_name": "s.emp_name"},
            insert_set={"emp_id": "s.emp_id", "emp_name": "s.emp_name"},
            when_not_matched_insert=True,
            overwrite_fallback=False,
        )
        ok, msg = tfw.safe_delta_merge(spark, df_merge_src, **merge_options)
        print("MERGE result:", ok, msg)
    except Exception as e:
        # Best-effort demo: any failure is reported and the cell moves on.
        print("Skipping MERGE demo — Delta likely unavailable.")
        print(e)


## AST Extraction and Normalization
- Parse a tiny Informatica mapping XML and inspect the AST

In [None]:
# Minimal mapping XML: one source, one target, and one Filter transformation.
sample_xml = """<MAPPING NAME="m_demo">
  <SOURCE NAME="SRC_EMP">
    <SOURCEFIELD NAME="EMP_ID" DATATYPE="integer"/>
  </SOURCE>
  <TARGET NAME="TGT_EMP">
    <TARGETFIELD NAME="EMP_ID" DATATYPE="integer"/>
  </TARGET>
  <TRANSFORMATION NAME="FIL_ACTIVE" TYPE="Filter">
    <TABLEATTRIBUTE NAME="Filter Condition" VALUE="ACTIVE_FLG='Y'"/>
  </TRANSFORMATION>
</MAPPING>"""

if agt:
    # Extract the raw AST from the XML, then normalize it. `norm_ast` is
    # reused by the logic-derivation cell that follows.
    raw_ast = agt.extractor(sample_xml)
    norm_ast = agt.normalizer(raw_ast)
    print("AST:")
    print(raw_ast)
    # The heading is a single-line literal with escaped newlines; a string
    # broken across a physical line here is a SyntaxError.
    print("\nNormalized AST:\n", norm_ast)
else:
    print("Agents module not available.")


## Logic Derivation and Code Validator
- Derive logic from AST metadata and run the rule-based validator on a sample PySpark snippet

In [None]:
if not agt:
    print("Agents module not available.")
else:
    # Derive the logic description from the normalized AST of the previous cell.
    logic = agt.derive_logic(norm_ast)
    print("Derived Logic:")
    print(logic)

    # Small PySpark snippet given to the validator as the code under review.
    sample_pyspark = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "A")], ['emp_id','emp_name'])
df.write.mode('overwrite').format('delta').saveAsTable('analytics.emp_payroll')
"""
    review = agt.validator(
        sample_pyspark,
        norm_ast,
        sql_code=None,
        intended_logic="keep active only",
        extra_target_names=["emp_payroll"],
    )
    print("Validator output:")
    print(review)


## Notes
- For full MERGE and Delta operations, run this notebook on Databricks with Delta Lake.
- The Streamlit app (`streamlit_app.py`) can generate a Databricks-style notebook that embeds these helpers.