In [None]:
%pip install azure-eventhub azure-storage-blob pyodbc

In [None]:
%sh
# Install Microsoft "ODBC Driver 18 for SQL Server" (the DRIVER named in the pyodbc connection
# string further down) on the machine that runs this shell cell.
# 1) Trust Microsoft's package-signing key and register the Ubuntu 22.04 package repository.
curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc
curl https://packages.microsoft.com/config/ubuntu/22.04/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
# 2) Refresh the package index so the new repository is visible.
sudo apt-get update
# 3) Install the driver; ACCEPT_EULA=Y accepts the msodbcsql18 licence non-interactively.
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18

In [None]:
# Mount the two Azure Blob Storage containers this notebook writes to:
#   container_name  -> mountPoint  : raw change-stream parquet data + its checkpoint
#   container_name2 -> mountPoint2 : checkpoint of the Azure SQL foreachBatch query
# NOTE(review): prefer dbutils.secrets.get(...) over pasting the account key into the notebook.

account_name = '<your account name>'
account_key = '<your account key>'
connection_string = '<your connection string>'

container_name = '<your blob container name>'
mountPoint = '/mnt/blobstorage'  # any unused mount path can be chosen here

container_name2 = '<your second container name>'
mountPoint2 = '/mnt/checkpt'  # any unused mount path can be chosen here

# Mount points that already exist, so re-running this cell does not try to mount twice.
mount_point_list = [mount.mountPoint for mount in dbutils.fs.mounts()]


def mount_blob_container(container, mount_point, existing_mounts=None):
    """Mount blob `container` of storage account `account_name` at `mount_point` if not already mounted.

    Args:
        container: Name of the blob container inside the storage account.
        mount_point: Path to mount the container at (e.g. '/mnt/blobstorage').
        existing_mounts: Mount points already present; when None, read from dbutils.fs.mounts().

    Mount failures are printed rather than raised so the rest of the notebook can still run.
    """
    if existing_mounts is None:
        existing_mounts = [mount.mountPoint for mount in dbutils.fs.mounts()]
    if mount_point in existing_mounts:
        # Report the container actually being checked (the old copy-pasted code printed the first one).
        print('Already mounted ->', container)
        return
    try:
        dbutils.fs.mount(
            source=f'wasbs://{container}@{account_name}.blob.core.windows.net',
            mount_point=mount_point,
            extra_configs={f'fs.azure.account.key.{account_name}.blob.core.windows.net': account_key},
        )
        print('successfuly mounted ', container)
    except Exception as e:
        print("mount exception", e)


mount_blob_container(container_name, mountPoint, mount_point_list)
mount_blob_container(container_name2, mountPoint2, mount_point_list)



In [None]:
import urllib.parse

# MongoDB Atlas credentials. quote_plus percent-encodes characters such as '@', ':' and '/'
# that would otherwise break the URI.
# NOTE(review): load these from a secret scope / environment instead of hardcoding them.
username = urllib.parse.quote_plus('<mongodb username>')
password = urllib.parse.quote_plus('<mongodb password>')

# Host of your Atlas cluster (the part after '@' in the Atlas "connect" string).
mongo_cluster_host = 'cluster0.tbj0yoy.mongodb.net'

# Replace the placeholders with your Atlas credentials; this URI is passed to the Spark MongoDB connector.
uri = f"mongodb+srv://{username}:{password}@{mongo_cluster_host}/?retryWrites=true&w=majority"

In [None]:
# Define the schema of the MongoDB change-stream events (the source is MongoDB, not Event Hubs)

from pyspark.sql.types import StructField,StructType, TimestampType, StringType, IntegerType, ArrayType, MapType, BooleanType

# Schema applied to the MongoDB change stream read in the next cell. That reader sets
# change.stream.publish.full.document.only to False, so each row is a whole change event.
# fullDocument and updatedFields are typed as string->string maps so that documents of any
# shape fit a single schema.
json_schema = StructType([
    StructField("_id", StructType([
        StructField("_data", StringType(), nullable=False),
    ]), nullable=False),
    StructField("operationType", StringType(), nullable=False),  # insert / update / delete / ...
    StructField("clusterTime", StringType(), nullable=False),
    StructField("wallTime", TimestampType(), nullable=False),
    StructField("fullDocument", MapType(StringType(), StringType())),
    StructField("updateDescription", StructType([
        StructField("updatedFields", MapType(StringType(), StringType())),
        StructField("removedFields", ArrayType(StringType())),
        # MongoDB reports array truncations as documents {field, newSize}; the previous
        # ArrayType(BooleanType()) could not represent those elements.
        StructField("truncatedArrays", ArrayType(StructType([
            StructField("field", StringType()),
            StructField("newSize", IntegerType()),
        ]))),
    ])),
    StructField("ns", StructType([
        StructField("db", StringType(), nullable=False),
        StructField("coll", StringType(), nullable=False),
    ]), nullable=False),
    StructField("documentKey", StructType([
        StructField("_id", StringType(), nullable=False),
    ]), nullable=False),
])

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Get (or create) the SparkSession, asking for the MongoDB Spark connector 10.2.0 package.
spark = (
    SparkSession.builder
    .appName("streamingExampleRead")
    .config('spark.mongodb.input.uri', uri)
    .config('spark.jars.packages', 'org.mongodb.mongo-spark-connector_2.12:10.2.0')
    .getOrCreate()
)

# Open a change stream on spark_mongo_data.spark_data_coll. Whole change events (not just the
# changed documents) are published and parsed with the explicit json_schema.
# NOTE(review): forceDeleteTempCheckpointLocation is normally a writer-side option; on a reader
# it is presumably ignored — confirm.
streamingDF1 = (
    spark.readStream
    .format("mongodb")
    .option('spark.mongodb.connection.uri', uri)
    .option('spark.mongodb.database', 'spark_mongo_data')
    .option('spark.mongodb.collection', 'spark_data_coll')
    .option("forceDeleteTempCheckpointLocation", False)
    .option('spark.mongodb.change.stream.publish.full.document.only', False)
    .schema(json_schema)
    .load()
)


     

In [None]:
# Show the schema applied to the change stream (the json_schema passed to .schema()).
streamingDF1.printSchema()

In [None]:
# Sanity check: should be True, since streamingDF1 was created with spark.readStream.
streamingDF1.isStreaming

In [None]:
# Store the raw change stream in the Azure Blob container mounted at /mnt/blobstorage.
#   path               -> where the parquet data files are written
#   checkpointLocation -> where Spark records streaming progress so the query can resume
# Keep a handle on the query (like query1 below) so it can be inspected or stopped,
# e.g. raw_query.status / raw_query.stop().
raw_query = (
    streamingDF1.writeStream
    .format("parquet")
    .option("path", "/mnt/blobstorage/spark_data_coll/data")
    .option("checkpointLocation", "/mnt/blobstorage/spark_data_coll/checkpoint")
    .start()
)

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import from_json
from pyspark.sql.functions import col,expr
from datetime import datetime

In [None]:
# Flatten each nested change event into top-level columns. The explicit aliases spell out the
# names the nested-field selections already get ('_data', '_id'); insert_into_sql1 looks the
# resulting JSON keys up by these names.
df1 = streamingDF1.select(
    F.col('_id._data').alias('_data'),
    F.col('operationType'),
    # NOTE(review): wallTime is already TimestampType in json_schema, so this parse looks like a
    # no-op — confirm before removing.
    F.to_timestamp(F.col('wallTime'), "yyyy-MM-dd HH:mm:ss.SSS").alias('wallTime'),
    F.col('documentKey._id').alias('_id'),
    F.col('fullDocument'),
    F.col('updateDescription.updatedFields').alias('updatedFields'),
    F.col('updateDescription.removedFields').alias('removedFields'),
    F.col('updateDescription.truncatedArrays').alias('truncatedArrays'),
    F.col('ns.db').alias('database'),
    F.col('ns.coll').alias('collection'),
)

In [None]:
# Check the flattened column names (operationType, wallTime, _id, fullDocument, updatedFields,
# removedFields, collection, ...) that insert_into_sql1 reads from each row.
df1.printSchema()

In [None]:
# Render the flattened change events in the notebook.
# NOTE(review): on Databricks, display() of a streaming DataFrame starts its own streaming
# query — confirm that is intended in addition to the parquet and SQL sinks.
display(df1)

In [None]:
import pyodbc

# Azure SQL Database connection settings — replace the placeholders.
# NOTE(review): prefer a secret scope (dbutils.secrets.get) over hardcoded credentials.
server_name = "<your-sql-server>.database.windows.net"
port = 1433
database_name = "<your sql database>"
username = "<sql db username>"
password = "<sql db password>"


def _odbc_value(value):
    """Brace-quote an ODBC connection-string value so ';', '=' or '}' inside it cannot break parsing."""
    return "{" + str(value).replace("}", "}}") + "}"


# Build the connection string from the settings above (previously they were defined but unused,
# and the string repeated its own placeholders).
# TrustServerCertificate=no keeps TLS certificate validation on; 'yes' would accept any certificate.
connection_string = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER=tcp:{server_name},{port};"
    f"DATABASE={_odbc_value(database_name)};"
    f"UID={_odbc_value(username)};"
    f"PWD={_odbc_value(password)};"
    "Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30"
)

# Create the connection; autocommit means each statement issued later is committed immediately.
conn = pyodbc.connect(connection_string)
conn.autocommit = True
print("Connected to Azure SQL Database!")

# Shared cursor used by insert_into_sql1 for every micro-batch.
cursor = conn.cursor()


In [None]:
import json

def _quote_ident(name):
    """Bracket-quote a SQL Server identifier, doubling any embedded ']' (same rule as QUOTENAME)."""
    return "[" + str(name).replace("]", "]]") + "]"


def _target_table(collection):
    """Return the `[dbo].[<collection>]` table a MongoDB collection is mirrored into."""
    return f"[dbo].{_quote_ident(collection)}"


def _apply_insert(event):
    """INSERT the event's fullDocument (plus wallTime) as a new row; values are bound as parameters."""
    row = dict(event['fullDocument'])
    row['wallTime'] = event['wallTime']  # overrides a same-named document field instead of duplicating the column
    columns = ", ".join(_quote_ident(column) for column in row)
    placeholders = ", ".join("?" for _ in row)
    query = f"INSERT INTO {_target_table(event['collection'])} ({columns}) VALUES ({placeholders});"
    print(query)
    cursor.execute(query, *row.values())
    print("Inserted successfully")


def _apply_update(event):
    """UPDATE the row whose _id matches: set updated fields, NULL removed fields, refresh wallTime."""
    # Spark's toJSON omits null columns, so these keys can be absent.
    updated = event.get('updatedFields') or {}
    removed = event.get('removedFields') or []
    if not updated and not removed:
        return
    assignments = dict(updated)
    assignments.update({field: None for field in removed})  # None is bound as SQL NULL
    assignments['wallTime'] = event['wallTime']
    set_clause = ", ".join(f"{_quote_ident(column)} = ?" for column in assignments)
    query = f"UPDATE {_target_table(event['collection'])} SET {set_clause} WHERE [_id] = ?;"
    print(query)
    cursor.execute(query, *assignments.values(), event['_id'])
    print("Updated successfully")


def _apply_delete(event):
    """DELETE the row whose _id matches the event's document key."""
    query = f"DELETE FROM {_target_table(event['collection'])} WHERE [_id] = ?;"
    cursor.execute(query, event['_id'])
    print("Deleted successfully")


# Change-event operationType -> handler. Other types (e.g. 'replace') are skipped, as before.
_HANDLERS = {
    "insert": _apply_insert,
    "update": _apply_update,
    "delete": _apply_delete,
}


def insert_into_sql1(json_row, batch_id):
    """foreachBatch sink: replay one micro-batch of MongoDB change events against Azure SQL.

    Args:
        json_row: The micro-batch DataFrame (rows shaped like df1).
        batch_id: Micro-batch id supplied by foreachBatch (unused).

    All values are passed as query parameters, so quotes or None in the data can neither
    break nor inject SQL. A failing event is reported and skipped so the rest of the batch
    is still applied (best effort, as before).
    """
    json_data = json_row.toJSON().collect()
    print(len(json_data))
    for raw_event in json_data:
        event = json.loads(raw_event)
        operation = event.get('operationType')
        handler = _HANDLERS.get(operation)
        if handler is None:
            continue
        try:
            handler(event)
        except Exception as e:
            print(f"Failed to apply {operation} for _id={event.get('_id')}: {e}")

  
# Mirror every micro-batch of flattened change events into Azure SQL via insert_into_sql1.
# Its checkpoint is kept in the second mounted container (/mnt/checkpt) so the query can resume.
query1 = (
    df1.writeStream
    .foreachBatch(insert_into_sql1)
    .option("checkpointLocation", "/mnt/checkpt/spark_data_coll/checkpoint/")
    .start()
)



In [None]:
# query1.awaitTermination()

In [None]:
# Block this cell until any active streaming query in this Spark session terminates
# (stopped or failed), keeping the notebook's streams running in the meantime.
spark.streams.awaitAnyTermination()