In [2]:
import os
# Import required modules
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import functions as f
from pyspark.sql import *
from pyspark.sql.types import *
# Delta is a storage layer for data lakes
from delta.tables import * 
# DeltaTable is the main class for Delta tables
from delta.tables import DeltaTable 

# Create (or reuse) a single-core local SparkSession. The JDBC driver jar has
# to be on the driver classpath, so every jar under work/jars is added to it.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("BridgeMySQL")
    .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/*")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")


def _jdbc_setting(conf_key, env_var, default):
    """Resolve one JDBC setting.

    Lookup order: Spark conf entry `conf_key`, then environment variable
    `env_var`, then `default`. Always returns a string.
    """
    return spark.conf.get(conf_key, os.environ.get(env_var, default))


# Database connection details. Each value can be overridden through the Spark
# conf or the environment; the literals are local demo fallbacks only.
jdbcDriver = _jdbc_setting("spark.jdbc.driver.class", "JDBC_DRIVER_CLASS", "org.mariadb.jdbc.Driver")
dbHost = _jdbc_setting("spark.jdbc.host", "MYSQL_HOST", "mysql")
dbPort = _jdbc_setting("spark.jdbc.port", "MYSQL_PORT", "3306")
defaultDb = _jdbc_setting("spark.jdbc.default.db", "MYSQL_DATABASE", "default")
dbTable = _jdbc_setting("spark.jdbc.table", "MYSQL_TABLE", "customers")
dbUser = _jdbc_setting("spark.jdbc.user", "MYSQL_USER", "dataeng")
# NOTE: demo-only fallback credential. Supply the real password through
# spark.jdbc.password or MYSQL_PASSWORD instead of editing this literal.
dbPass = _jdbc_setting("spark.jdbc.password", "MYSQL_PASSWORD", "dataengineering_user")

# JDBC connection URL.
# NOTE(review): the URL uses the jdbc:mysql: scheme with the MariaDB driver;
# newer MariaDB Connector/J releases may need jdbc:mariadb: or the
# permitMysqlScheme URL option - confirm against the jar in work/jars.
connection_url = f'jdbc:mysql://{dbHost}:{dbPort}/{defaultDb}'

# Read the customers table from MySQL over JDBC.
customers_sdf = (
    spark.read
    .format("jdbc")
    .option("url", connection_url)
    .option("driver", jdbcDriver)
    .option("dbtable", dbTable)
    .option("user", dbUser)
    .option("password", dbPass)
    .load()
)

customers_sdf.printSchema()
# Expose the table to Spark SQL under the name "customers".
customers_sdf.createOrReplaceTempView("customers")

# Show the rows currently in the table.
customers_sdf.show()

root
 |-- id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+---+-------------------+-------------------+----------+---------+--------------------+
| id|            created|            updated|first_name|last_name|               email|
+---+-------------------+-------------------+----------+---------+--------------------+
|  1|2023-02-22 02:07:05|2023-02-22 02:07:05|     Scott|   Haines|  scott@coffeeco.com|
|  2|2023-02-22 02:07:05|2023-02-22 02:07:05|      John|     Hamm|  john.hamm@acme.com|
|  3|2023-02-22 02:07:05|2023-02-22 02:07:05|      Milo|   Haines|mhaines@coffeeco.com|
+---+-------------------+-------------------+----------+---------+--------------------+



                                                                                

In [5]:
# Schema for the new customer rows. The field names mirror the columns of the
# MySQL customers table (id, created, updated, ...) so that the two DataFrames
# can be combined by column name rather than by position.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("created", TimestampType(), True),
    StructField("updated", TimestampType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True)
])

# New customers, with timestamps written as 'YYYY-MM-DD HH:MM:SS' strings.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
raw_records = [
    ('4', '2023-02-22 02:46:00', '2023-02-21 02:49:00', 'Penny', 'Haines', 'penny@coffeeco.com'),
    ('5', '2023-02-22 02:47:00', '2023-02-21 02:50:00', 'Cloud', 'Fast', 'cloud.fast@acme.com'),
    ('6', '2023-02-22 02:48:00', '2023-02-21 02:51:00', 'Marshal', 'Haines', 'paws@coffeeco.com')
]

# TimestampType columns need datetime objects, so parse the two timestamp
# strings of every record before building the DataFrame.
records = [
    (
        customer_id,
        datetime.strptime(created, TIMESTAMP_FORMAT),
        datetime.strptime(updated, TIMESTAMP_FORMAT),
        first_name,
        last_name,
        email,
    )
    for customer_id, created, updated, first_name, last_name, email in raw_records
]

new_customers_sdf = spark.createDataFrame(records, schema=schema)

# Expose the new rows to Spark SQL under the name "new_customers".
new_customers_sdf.createOrReplaceTempView("new_customers")

# Stack existing and new customers, then keep one row per id.
# unionByName matches columns by name, so a schema drift between the two
# frames raises an error instead of silently mixing up columns.
# NOTE: when an id appears in both frames, which of the rows dropDuplicates
# keeps is not controlled here.
all_customers = customers_sdf.unionByName(new_customers_sdf).dropDuplicates(['id'])

all_customers.show()



+---+-------------------+-------------------+----------+---------+--------------------+
| id|            created|            updated|first_name|last_name|               email|
+---+-------------------+-------------------+----------+---------+--------------------+
|  1|2023-02-22 02:07:05|2023-02-22 02:07:05|     Scott|   Haines|  scott@coffeeco.com|
|  2|2023-02-22 02:07:05|2023-02-22 02:07:05|      John|     Hamm|  john.hamm@acme.com|
|  3|2023-02-22 02:07:05|2023-02-22 02:07:05|      Milo|   Haines|mhaines@coffeeco.com|
|  4|2023-02-22 02:46:00|2023-02-21 02:49:00|     Penny|   Haines|  penny@coffeeco.com|
|  5|2023-02-22 02:47:00|2023-02-21 02:50:00|     Cloud|     Fast| cloud.fast@acme.com|
|  6|2023-02-22 02:48:00|2023-02-21 02:51:00|   Marshal|   Haines|   paws@coffeeco.com|
+---+-------------------+-------------------+----------+---------+--------------------+



                                                                                

In [6]:
# Write the new customers back to MySQL.
# all_customers also contains the rows that were read from the table, so
# appending it as-is would insert those rows a second time. The anti-join
# keeps only the ids that are not in the table yet, which also makes this
# cell safe to re-run.
customers_to_insert = all_customers.join(
    customers_sdf.select("id"), on="id", how="left_anti"
)

(
    customers_to_insert.write
    .format("jdbc")
    .option("url", connection_url)
    .option("driver", jdbcDriver)  # same driver class as the read
    .option("dbtable", dbTable)
    .option("user", dbUser)
    .option("password", dbPass)
    .mode("append")
    .save()
)

# Show the combined customer set.
all_customers.show()

+---+-------------------+-------------------+----------+---------+-------------------+
| id|            created|            updated|first_name|last_name|              email|
+---+-------------------+-------------------+----------+---------+-------------------+
|  4|2023-02-22 02:46:00|2023-02-21 02:49:00|     Penny|   Haines| penny@coffeeco.com|
|  5|2023-02-22 02:47:00|2023-02-21 02:50:00|     Cloud|     Fast|cloud.fast@acme.com|
|  6|2023-02-22 02:48:00|2023-02-21 02:51:00|   Marshal|   Haines|  paws@coffeeco.com|
+---+-------------------+-------------------+----------+---------+-------------------+

