In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, row_number, coalesce
from pyspark.sql.window import Window

In [None]:
# Initialize the SparkSession (app name "simple_etl") and register the
# PostgreSQL JDBC driver jar through the "spark.jars" setting.
# The jar location can be overridden with the POSTGRES_JDBC_JAR environment
# variable; when it is unset, the default path below is used.

postgres_jdbc_jar = os.environ.get("POSTGRES_JDBC_JAR", "/postgresql-42.7.54.jar")

spark = (
    SparkSession.builder
    .appName("simple_etl")
    .config("spark.jars", postgres_jdbc_jar)
    .getOrCreate()
)

In [None]:
# Extract: load the existing "transactions" table from PostgreSQL over JDBC.
# Credentials are read from the environment (PGUSER / PGPASSWORD) so that no
# password is stored in the notebook.

pg_user = os.environ.get("PGUSER", "postgres")
# Fail fast with a KeyError when PGPASSWORD is unset instead of falling back
# to a hard-coded secret.
pg_password = os.environ["PGPASSWORD"]

transactions = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/postgres")
    .option("dbtable", "transactions")
    .option("user", pg_user)
    .option("password", pg_password)
    .load()
)

In [None]:
# Name of the PostgreSQL table that will receive the transformed
# (de-duplicated) transactions when the result is written back over JDBC.

new_table_name = "updated_transactions"

In [None]:
# Transformation: remove duplicate transactions.
# Two rows count as duplicates when every column listed in `dedup_keys`
# matches; within each such group the row with the lowest transaction_id
# is the one that is kept.

dedup_keys = [
    "user_id",
    "amount",
    "transaction_type",
    "transaction_date",
    "status",
    "method",
]

window_spec = (
    Window
    .partitionBy(*dedup_keys)
    .orderBy(col("transaction_id"))
)

# Number the rows inside each duplicate group, starting at 1.
transactions_with_duplicates = transactions.withColumn(
    "row_num",
    row_number().over(window_spec),
)

# Keep the first row of every group, then discard the helper column.
transactions_deduplicated = (
    transactions_with_duplicates
    .filter(col("row_num") == 1)
    .drop("row_num")
)

In [None]:
# Load: write the de-duplicated rows to the PostgreSQL table named by
# `new_table_name`. After this cell runs, the table is visible in pgAdmin.
#
# The save mode is set only through DataFrameWriter.mode(); "overwrite"
# replaces any existing contents of the target table. Do not additionally
# pass a "mode" key via .option() -- it does not change the save mode.
#
# Credentials are read from the environment (PGUSER / PGPASSWORD) so that no
# password is stored in the notebook.

pg_user = os.environ.get("PGUSER", "postgres")
# Fail fast with a KeyError when PGPASSWORD is unset instead of falling back
# to a hard-coded secret.
pg_password = os.environ["PGPASSWORD"]

(
    transactions_deduplicated.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/postgres")
    .option("dbtable", new_table_name)
    .option("user", pg_user)
    .option("password", pg_password)
    .save()
)