In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline


# Numeric preprocessing pipeline: first replace missing values with the
# column mean, then standardize the result with StandardScaler.
num_pipe = make_pipeline(
    SimpleImputer(strategy="mean"),
    StandardScaler(),
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# SCD Type 1 (overwrite-in-place) sync of the source table into the target
# table over JDBC: rows whose key exists in both tables but whose tracked
# columns differ are replaced by the source version, and rows that exist
# only in the source are inserted. Rows that exist only in the target are
# kept untouched.

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SCD Type 1 Implementation for Employees") \
    .config("spark.jars", "C:/jars/postgresql-42.7.4.jar") \
    .getOrCreate()

try:
    # Database connection details
    # NOTE(review): credentials are hard-coded in the notebook; consider
    # loading them from the environment or a secrets store instead.
    db_url = "jdbc:postgresql://localhost:5432/NewDB"  # Replace with your host, port, and database
    db_properties = {
        "user": "postgres",      # Replace with your PostgreSQL username
        "password": "postgres",  # Replace with your PostgreSQL password
        "driver": "org.postgresql.Driver"
    }

    # Load source (employee) and target (mydata) tables
    source_table = "employee"
    target_table = "mydata"
    # Scratch table used to materialize the rebuilt target before the real
    # target is touched (see Step 4). It is left in place after the run.
    staging_table = f"{target_table}_scd1_staging"

    source_df = spark.read.jdbc(url=db_url, table=source_table, properties=db_properties)
    target_df = spark.read.jdbc(url=db_url, table=target_table, properties=db_properties)

    # Define the primary key and columns to update
    key_column = "EMPLOYEE_ID"  # Replace with the primary key column name
    update_columns = ["FIRST_NAME", "LAST_NAME", "EMAIL", "PHONE_NUMBER", "HIRE_DATE", "JOB_ID", "SALARY", "COMMISSION_PCT", "MANAGER_ID", "DEPARTMENT_ID"]

    # Perform SCD Type 1 Logic
    # Step 1: Join source and target tables
    joined_df = source_df.alias("source").join(
        target_df.alias("target"),
        col(f"source.{key_column}") == col(f"target.{key_column}"),
        "outer"
    )

    # Step 2: Identify rows to update
    # Build the "any tracked column differs" predicate from update_columns.
    # eqNullSafe is used instead of != because a plain comparison evaluates
    # to NULL when either side is NULL, which would silently hide changes
    # such as NULL -> value or value -> NULL.
    has_changed = lit(False)
    for column_name in update_columns:
        has_changed = has_changed | (
            ~col(f"source.{column_name}").eqNullSafe(col(f"target.{column_name}"))
        )

    # Both keys must be present: the outer join also yields target-only
    # rows (source side all NULL), which must not be treated as updates.
    to_update = joined_df.filter(
        col(f"source.{key_column}").isNotNull()
        & col(f"target.{key_column}").isNotNull()
        & has_changed
    ).select("source.*")

    # Step 3: Identify rows to insert (new records in source)
    to_insert = joined_df.filter(col(f"target.{key_column}").isNull()).select("source.*")

    # Step 4: Write updates and inserts back to PostgreSQL
    # Count both sets before any write so neither count observes a
    # half-written target table.
    update_count = to_update.count()
    insert_count = to_insert.count()

    if update_count > 0:
        # Spark's JDBC writer has no row-level UPDATE, so the target is
        # rebuilt: every target row that is not being replaced, plus the
        # replacement rows, plus any new rows.
        # NOTE: unionByName assumes source and target expose the same
        # column names — confirm against the actual table schemas.
        untouched_rows = target_df.join(
            to_update.select(key_column), on=key_column, how="left_anti"
        )
        rebuilt_target = untouched_rows.unionByName(to_update)
        if insert_count > 0:
            rebuilt_target = rebuilt_target.unionByName(to_insert)

        # The rebuilt DataFrame is lazily derived from the target table
        # itself, so overwriting the target directly would clear it before
        # it has been fully read. Materialize into the staging table first.
        rebuilt_target.write \
            .jdbc(url=db_url, table=staging_table, mode="overwrite", properties=db_properties)

        # truncate=true keeps the existing target table definition instead
        # of dropping and recreating it with Spark-inferred column types.
        spark.read.jdbc(url=db_url, table=staging_table, properties=db_properties) \
            .write \
            .option("truncate", "true") \
            .jdbc(url=db_url, table=target_table, mode="overwrite", properties=db_properties)
    elif insert_count > 0:
        # Inserts only: a plain append leaves the existing rows alone.
        to_insert.write \
            .jdbc(url=db_url, table=target_table, mode="append", properties=db_properties)
finally:
    # Stop Spark session even when a read or write above fails
    spark.stop()
