In [25]:
from pyspark.sql import SparkSession

# Replace with your Oracle connection details
jdbc_url = "jdbc:oracle:thin:@//localhost:1521/xe"
username = "HR"
password = "HR"

try:
    # Create a Spark session with JDBC options
    spark = SparkSession.builder.appName("JDBCConnectivityTest").getOrCreate()
    EMP_SRC_df = spark.read.jdbc(url=jdbc_url, table="EMP_SRC",  properties={"user": username, "password": password})
    EMP_PYSPARK_SCD1_df = spark.read.jdbc(url=jdbc_url, table="EMP_PYSPARK_SCD1",  properties={"user": username, "password": password})
    # If the connection was successful, you can print the result
    EMP_SRC_df.show()
    EMP_PYSPARK_SCD1_df.show()
   # spark.stop()
    print("JDBC connection test successful.")
except Exception as e:
    print(f"JDBC connection test failed: {str(e)}")

+-----+------+---------+----+-------------------+-------+-------+------+
|EMPNO| ENAME|      JOB| MGR|           HIREDATE|    SAL|   COMM|DEPTNO|
+-----+------+---------+----+-------------------+-------+-------+------+
| 7839|  KING|PRESIDENT|NULL|1981-11-17 00:00:00|5000.00|   NULL|    10|
| 7698| BLAKE|  MANAGER|7839|1981-05-01 00:00:00|2850.00|   NULL|    30|
| 7782| CLARK|  MANAGER|7839|1981-06-09 00:00:00|2450.00|   NULL|    10|
| 7566| JONES|  MANAGER|7839|1981-04-02 00:00:00|2975.00|   NULL|    20|
| 7788| SCOTT|  ANALYST|7566|1987-04-19 00:00:00|3000.00|   NULL|    20|
| 7902|  FORD|  ANALYST|7566|1981-12-03 00:00:00|3000.00|   NULL|    20|
| 7369| SMITH|    CLERK|7902|1980-12-17 00:00:00| 800.00|   NULL|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|1600.00| 300.00|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22 00:00:00|1250.00| 500.00|    30|
| 7654|MARTIN| SALESMAN|7698|1981-09-28 00:00:00|1250.00|1400.00|    30|
| 7844|TURNER| SALESMAN|7698|1981-09-08 00:00:00|15

In [26]:
from pyspark.sql.functions import col

# Step 1: Identify records to update
updates_df = EMP_SRC_df.alias("src") \
    .join(EMP_PYSPARK_SCD1_df.alias("tgt"), on="EMPNO") \
    .filter(
        (col("src.JOB") != col("tgt.JOB")) |
        (col("src.SAL") != col("tgt.SAL")) |
        (col("src.COMM") != col("tgt.COMM")) |
        (col("src.MGR") != col("tgt.MGR")) |
        (col("src.DEPTNO") != col("tgt.DEPTNO"))
    ) \
    .select("src.*")
updates_df.show()

+-----+------+-------+----+-------------------+-------+----+------+
|EMPNO| ENAME|    JOB| MGR|           HIREDATE|    SAL|COMM|DEPTNO|
+-----+------+-------+----+-------------------+-------+----+------+
| 7900| JAMES|  CLERK|7698|1981-12-03 00:00:00| 950.00|NULL|    10|
| 7934|MILLER|  CLERK|7782|1982-01-23 00:00:00|1500.00|NULL|    10|
| 7945|   ROB|ANALYST|7698|1983-01-20 00:00:00|3500.00|NULL|    10|
+-----+------+-------+----+-------------------+-------+----+------+



In [27]:
# Step 2: Identify records to insert
# Left anti join to find records in EMP_SRC that are not in EMP_PYSPARK_SCD1
inserts_df = EMP_SRC_df.alias("src") \
    .join(EMP_PYSPARK_SCD1_df.alias("tgt"), on="EMPNO", how="left_anti") \
    .select("src.*")
inserts_df.show()

+-----+-----+---+---+--------+---+----+------+
|EMPNO|ENAME|JOB|MGR|HIREDATE|SAL|COMM|DEPTNO|
+-----+-----+---+---+--------+---+----+------+
+-----+-----+---+---+--------+---+----+------+



In [28]:
# Step 3 : Identify records unchanged
# Left join to find records in EMP_PYSPARK_SCD1 that are not in EMP_SRC
unchanged_df = EMP_PYSPARK_SCD1_df.alias("tgt") \
    .join(EMP_SRC_df.alias("src"), on="EMPNO", how="left_anti") \
    .select("tgt.*")
unchanged_df.show()

+-----+-----+---+---+--------+---+----+------+
|EMPNO|ENAME|JOB|MGR|HIREDATE|SAL|COMM|DEPTNO|
+-----+-----+---+---+--------+---+----+------+
+-----+-----+---+---+--------+---+----+------+



In [None]:
# Step 4: Final SCD1 union of all data
final_df = unchanged_df.unionByName(updates_df).unionByName(inserts_df)
# Show the final DataFrame
final_df.show()

+-----+------+-------+----+-------------------+-------+----+------+
|EMPNO| ENAME|    JOB| MGR|           HIREDATE|    SAL|COMM|DEPTNO|
+-----+------+-------+----+-------------------+-------+----+------+
| 7900| JAMES|  CLERK|7698|1981-12-03 00:00:00| 950.00|NULL|    10|
| 7934|MILLER|  CLERK|7782|1982-01-23 00:00:00|1500.00|NULL|    10|
| 7945|   ROB|ANALYST|7698|1983-01-20 00:00:00|3500.00|NULL|    10|
+-----+------+-------+----+-------------------+-------+----+------+



In [30]:
# Replace with your Oracle connection details
jdbc_url = "jdbc:oracle:thin:@//localhost:1521/xe"
username = "HR"
password = "HR"

final_df.write.jdbc(
url=jdbc_url, 
table="EMP_PYSPARK_SCD1", 
mode="overwrite", 
properties={"user": username, "password": password}
 )
    