In [16]:
from datetime import datetime
from pyspark.sql.types import MapType,StringType,IntegerType,TimestampType,DoubleType,FloatType,DecimalType,StructType,StructField
import notebookutils
import traceback

spark.conf.set('spark.sql.caseSensitive', True)
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.omnisync.Logger")

def decodeEntity(operation,entity,dict):
    if entity == 'Currency':
        if operation == 'insert': 
            df = spark.sql("SELECT NVL(MAX(CurrencyKey),0)+1 AS NextCurrencyKey FROM " + "OmniSync_DE_LH_300_Gold_Contoso.dbo." + entity )
            nextValue = df.first()['NextCurrencyKey']
            logger.info('Next value is '+ str(nextValue))
            staged_rows = [(nextValue,dict['CurrencyName'],dict['CurrencyDescription'], dict['ConversionRate'],
                           datetime.now(), None)]
        elif operation == 'update':
            df = spark.sql("SELECT CurrencyKey,CreatedDate FROM OmniSync_DE_LH_300_Gold_Contoso.dbo." + entity + \
                           " WHERE CurrencyName='" + dict['CurrencyName'] + "'")

            df.show()
            if df.first() == None:
                raise Exception("Currency " + dict['CurrencyName'] + " has not been found.")

            currencyKeyValue = df.first()['CurrencyKey']
            createdDateValue = df.first()['CreatedDate']

            staged_rows = {"CurrencyKey": lit(currencyKeyValue), "CurrencyName": lit(dict['CurrencyName']),
                           "CurrencyDescription": lit(dict['CurrencyDescription']), "ConversionRate": lit(dict['ConversionRate']),
                           "CreatedDate": lit(createdDateValue), "UpdatedDate":  lit(datetime.now())}
    elif entity == 'Customer':
        geometryKey = notebookutils.notebook.run("OmniSync_DE_NB_GeographyCDC", 90, {"latitude": dict['Latitude'], "longitude": dict['Longitude'] })
        #geometryKey = notebookutils.notebook.run("OmniSync_DE_NB_GeographyCDC", 10, {"latitude":23, "longitude": 44 })
        
        if str(dict['CreatedDate']).isdigit():
            createdDateTimeStamp = datetime.datetime.fromtimestamp(dict['CreatedDate'])
            print(createdDateTimeStamp.strftime('%Y-%m-%d %H:%M:%S'))

        if str(dict['UpdatedDate']).isdigit():
            updatedDateTimeStamp = datetime.datetime.fromtimestamp(dict['UpdatedDate'])
            print(updatedDateTimeStamp.strftime('%Y-%m-%d %H:%M:%S'))

        if operation == 'insert':
            df = spark.sql("SELECT NVL(MAX(CustomerKey),0)+1 AS NextKey FROM OmniSync_DE_LH_300_Gold_Contoso.dbo.Customer")
            nextValue = df.first()['NextKey']
            logger.info('Next value is '+ str(nextValue))

            staged_rows = [(nextValue,geometryKey, None,None,None,None,None,
                            dict['EmailAddress'],None,None,None,None,None,None,None,
                            dict['AddressLine1'], dict['Phone'],None,'Company', dict['CompanyName'],
                            createdDateTimeStamp,updatedDateTimeStamp)]
        elif operation == 'update':
            df = spark.sql("SELECT CustomerKey,CreatedDate FROM OmniSync_DE_LH_300_Gold_Contoso.dbo.Customer" + \
                           " WHERE CustomerKey=" + dict['CustomerKey'])
            df.show()
            if df.first() == None:
                raise Exception(entity + " with CustomerKey " + dict['CustomerKey'] + \
                                " has not been found.")

            keyValue = df.first()['CustomerKey']
            createdDateValue = df.first()['CreatedDate']

            if str(createdDateValue).isdigit():
                createdDateTimeStamp = datetime.datetime.fromtimestamp(createdDateValue)
                print(createdDateTimeStamp.strftime('%Y-%m-%d %H:%M:%S'))

            staged_rows = {"CustomerKey": lit(keyValue), "AddressLine1": lit(dict['AddressLine1']),
                           "EmailAddress": lit(dict['EmailAddress']),"Phone": lit(dict['Phone']),
                           "CustomerType": "Company",
                           "CompanyName": lit(dict['CompanyName']), "CreatedDate": lit(createdDateTimeStamp), 
                           "UpdatedDate":  lit(updatedDateTimeStamp)}
        else:
            staged_rows = None
    else:
       staged_rows = None
    return staged_rows

def getSchema(entity):
    if entity == 'Currency':
        return StructType([ \
                StructField("CurrencyKey",IntegerType(),False), \
                StructField("CurrencyName",StringType(),False), \
                StructField("CurrencyDescription",StringType(),True), \
                StructField("ConversionRate",DoubleType(),True), \
                StructField("CreatedDate", TimestampType(), True), \
                StructField("UpdatedDate", TimestampType(), True)
            ])
    elif entity == 'Customer':
        return StructType([ \
                StructField("CustomerKey",IntegerType(),False), \
                StructField("GeographyKey",IntegerType(),False), \
                StructField("FirstName",StringType(),True), \
                StructField("LastName",StringType(),True), \
                StructField("BirthDate",TimestampType(),True), \
                StructField("MaritalStatus",StringType(),True), \
                StructField("Gender",StringType(),True), \
                StructField("EmailAddress",StringType(),True), \
                StructField("YearlyIncome",IntegerType(),True), \
                StructField("TotalChildren",IntegerType(),True), \
                StructField("NumberChildrenAtHome",IntegerType(),True), \
                StructField("Education",StringType(),True), \
                StructField("Occupation",StringType(),True), \
                StructField("HouseOwnerFlag",StringType(),True), \
                StructField("NumberCarsOwned",IntegerType(),True), \
                StructField("AddressLine1",StringType(),True), \
                StructField("Phone",StringType(),True), \
                StructField("DateFirstPurchase",TimestampType(),True), \
                StructField("CustomerType",StringType(),False), \
                StructField("CompanyName",StringType(),True), \
                StructField("CreatedDate", TimestampType(), False), \
                StructField("UpdatedDate", TimestampType(), True)
            ])
    elif entity == 'Product':
        return StructType([ \
                StructField("ProductKey",IntegerType(),False), \
                StructField("ProductName",StringType(),False), \
                StructField("ProductDescription",StringType(),True), \
                StructField("ProductSubcategoryKey",IntegerType(),False), \
                StructField("Manufacturer",StringType(),True), \
                StructField("BrandID",IntegerType(),True), \
                StructField("BrandName",StringType(),True), \
                StructField("ClassID",IntegerType(),True), \
                StructField("ClassName",StringType(),True), \
                StructField("ColorID",IntegerType(),True), \
                StructField("ColorName",StringType(),True), \
                StructField("Size",StringType(),True), \
                StructField("SizeUnitMeasureID",IntegerType(),True), \
                StructField("SizeMeasureName",StringType(),True), \
                StructField("Weight",FloatType(),True), \
                StructField("WeightUnitMeasureID",IntegerType(),True), \
                StructField("WeightUnitMeasureName",StringType(),True), \
                StructField("UnitCost",DecimalType(),False), \
                StructField("UnitPrice",DecimalType(),False), \
                StructField("CreatedDate", TimestampType(), False), \
                StructField("UpdatedDate", TimestampType(), True)
            ])
    elif entity == 'SalesOrders':
        return StructType([ \
                StructField("SalesOrdersKey",IntegerType(),False), \
                StructField("DateKey",TimestampType(),False), \
                StructField("StoreKey",IntegerType(),False), \
                StructField("ProductKey",IntegerType(),False), \
                StructField("CurrencyKey",IntegerType(),False), \
                StructField("CustomerKey",IntegerType(),False), \
                StructField("SalesOrderNumber",StringType(),False), \
                StructField("SalesOrderLineNumber",StringType(),False), \
                StructField("SalesQuantity",IntegerType(),False), \
                StructField("SalesAmount",DecimalType(),True), \
                StructField("TotalCost",DecimalType(),False), \
                StructField("UnitCost",DecimalType(),False), \
                StructField("UnitPrice",DecimalType(),False), \
                StructField("CreatedDate", TimestampType(), False), \
                StructField("UpdatedDate", TimestampType(), True)
            ])
    elif entity == 'Sales':
        return StructType([ \
                StructField("SalesKey",IntegerType(),False), \
                StructField("DateKey",TimestampType(),False), \
                StructField("StoreKey",IntegerType(),False), \
                StructField("ProductKey",IntegerType(),False), \
                StructField("CurrencyKey",IntegerType(),False), \
                StructField("UnitCost",DecimalType(),False), \
                StructField("UnitPrice",DecimalType(),False), \
                StructField("SalesQuantity",IntegerType(),False), \
                StructField("TotalCost",DecimalType(),False), \
                StructField("SalesAmount",DecimalType(),True), \
                StructField("CreatedDate", TimestampType(), False), \
                StructField("UpdatedDate", TimestampType(), True)
            ])
    elif entity == 'Store':
        return StructType([ \
                StructField("StoreKey",IntegerType(),False), \
                StructField("GeographyKey",IntegerType(),False), \
                StructField("StoreTypeID",IntegerType(),False), \
                StructField("StoreType",StringType(),False), \
                StructField("StoreName",StringType(),True), \
                StructField("StoreDescription",StringType(),False), \
                StructField("StorePhone",StringType(),False), \
                StructField("StoreFax",StringType(),False), \
                StructField("AddressLine1",StringType(),True), \
                StructField("AddressLine2",StringType(),False), \
                StructField("EmployeeCount",IntegerType(),False), \
                StructField("Longitude",DoubleType(),False), \
                StructField("Latitude",DoubleType(),True), \
                StructField("CreatedDate", TimestampType(), True), \
                StructField("UpdatedDate", TimestampType(), True)
            ])
    elif entity == 'MasterDataMapping':
        return StructType([ \
                StructField("MasterDataMappingKey",IntegerType(),False), \
                StructField("FabricId",StringType(),False), \
                StructField("SalesForceId",StringType(),False), \
                StructField("SAPId",StringType(),False), \
                StructField("StoreName",StringType(),True), \
                StructField("Entity",StringType(),False), \
                StructField("Name",StringType(),False), \
                StructField("CreatedDate", TimestampType(), True), \
                StructField("UpdatedDate", TimestampType(), True)
            ])
        
    else:
       return None 

def checkIfInsertNeeded(table, key, value):
    df = spark.sql("SELECT * FROM OmniSync_DE_LH_300_Gold_Contoso.dbo." + table + " WHERE " + key + "='" + value + "'")
    return False if df.count() > 0 else True

def insertEntity(row, entity):
    staged_df = spark.createDataFrame(row,getSchema(entity)) \
                        .write.mode("append").format("delta").saveAsTable(entity)
        
def getNaturalKey(entity):
    if entity == 'Currency':
        return 'CurrencyName'
    else: 
        return entity + 'Key'

def getPrimaryKey(entity):
    if entity == 'Currency':
        return 'CurrencyName'
    else: 
        return entity + 'Key'

def fixJson(jsonString):
    jsonString = jsonString.replace("\t","")
    jsonString = jsonString.replace("\r\n","")

    print(jsonString)
    
    return jsonString

def mergeMasterDataMapping(dict, entity, key):
    salesForceId = dict['SalesForceId'] 
    SAPId = dict['SAPId']

    if salesForceId != None: 
        df_mapping = spark.sql("SELECT * FROM OmniSync_DE_LH_300_Gold_Contoso.dbo.MasterDataMapping "\
                    "WHERE Entity='"+ entity +"' AND SalesForceId='" + salesForceId +"'" )
    elif SAPId != None: 
        df_mapping = spark.sql("SELECT * FROM OmniSync_DE_LH_300_Gold_Contoso.dbo.MasterDataMapping "\
                    "WHERE Entity='"+ entity +"' AND SAPId='" + SAPId + "'" )
    else:
        raise Exception("No SAPId or SalesForceId on CDC row")

    if df_mapping.count() == 0 :

        df_mapping_next = spark.sql("SELECT NVL(MAX(MasterDataMappingKey),0)+1 AS NextKey FROM \
                                        OmniSync_DE_LH_300_Gold_Contoso.dbo.MasterDataMapping")
        nextValueDataMapping = df_mapping_next.first()['NextKey']
        logger.info('Next value for MasterDataMapping is '+ str(nextValueDataMapping))

        if salesForceId != None: 
            mapping_rows = [(nextValueDataMapping, nextValue, salesForceId , None, \
                            '"+ entity +"', key , createdDateTimeStamp,updatedDateTimeStamp)]
        elif SAPId != None: 
            mapping_rows = [(nextValueDataMapping, nextValue, None, SAPId , \
                            '"+ entity +"', key , createdDateTimeStamp,updatedDateTimeStamp)]

        spark.createDataFrame(mapping_rows, getSchema('MasterDataMapping')) \
             .write.mode("append").format("delta").saveAsTable('MasterDataMapping')
        logger.info('Created MasterDataMapping: ' + mapping_rows)

    else:

        logger.info('Mapping with SalesForceId:' + dict['SalesForceId'] + ' already in the system. Updating...')

        id = df_mapping['MasterDataMappingKey']

        if (salesForceId == None ):
            salesForceId =  df_mapping['SalesForceId'] 

        if (SAPId == None ):
            SAPId =  df_mapping['SAPId'] 

        updatedDateValue = dict['UpdatedDate']

        if str(updatedDateValue).isdigit():
            updatedDateTimeStamp = datetime.datetime.fromtimestamp(updatedDateValue)
            print(updatedDateTimeStamp.strftime('%Y-%m-%d %H:%M:%S'))
    
        mapping_rows = [(id, df_mapping['FabricId'], salesForceId , SAPId, \
                            'MasterDataMapping', df_mapping['Name'] , \
                            df_mapping['CreatedDate'],updatedDateTimeStamp)]

        logger.info('Update key to check: ' + dict[pk])
        deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/MasterDataMapping')

        deltaTable.update(
            condition = col('MasterDataMappingKey') == id,
            set = mapping_rows
        )

def getFabricIdFromSalesForceOrSAPId(dict):

    salesForceId = dict['SalesForceId']
    df = spark.sql("SELECT * FROM OmniSync_DE_LH_300_Gold_Contoso.dbo.MasterDataMapping LIMIT 1000")
    display(df)


def handleException(logger,  ex: Exception):
    # By this way we can know about the type of error occurring
    ex_t = type(ex).__name__
    err = str(ex)
    err_msg = f'[{ex_t}] - {err}'
    print(err_msg)
    logger.error(err_msg)
    # go through the trackback lines and individually add those to the log as an error
    for l in traceback.format_exc().splitlines():
        logger.error(l)
        print(l)



StatementMeta(, 0db808bc-4ea8-4d3c-9810-7a6f9f572168, 18, Finished, Available, Finished)

In [18]:
from pyspark.sql.types import * 
from delta.tables import *
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql.functions import from_json
import json
import traceback

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.omnisync.Logger")

externalCDCSchema = spark.read.parquet("Tables/dbo/ExternalCDC").schema

df = spark.readStream.schema(externalCDCSchema).format("parquet").option("path", "Tables/dbo/ExternalCDC").load()

def sendToSinkTable(df, epoch_id):
    
    print('------------------------------Stream received---------------------------------------')
    
    try:
        dataCollect = df.collect()
        for row in dataCollect:
            logger.info(row)
            try:
                operation=row.Operation.lower()
                entity=row.Entity
                values = row.Values
                print("-----------------------Step 0-------------------------")
                dict = json.loads(fixJson(values))
                print("-----------------------Step 1-------------------------")
                naturalKey = getNaturalKey(entity)
                pk = getPrimaryKey(entity)  

                if operation == 'create':         
                    logger.info('Create key to check: ' + dict[pk])
                    entityRow = decodeEntity('Insert', entity, dict)
                    if (entityRow == None):
                        logger.info("Could not decode entity: " + entity )
                        continue

                    if checkIfInsertNeeded(entity, pk, dict[pk]):
                        insertEntity(entityRow, entity)
                        logger.info('Created ' + entity + ": " + entityRow)
                    else:
                        logger.info(entity + ' with id:' + dict[pk] + \
                                    ' already in the system. Skipping insert...')
                        
                    mergeMasterDataMapping(dict, entity, naturalKey)
                elif operation == 'update':
                    logger.info('Update key to check: ' + dict[pk])
                    deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/'+ entity)
                    entityRow = decodeEntity('Update', entity, dict)
                    if (entityRow == None):
                        logger.info("Could not decode entity: " + entity )
                        continue

                    deltaTable.update(
                        condition = col(pk) == dict[pk],
                        set = entityRow
                    )
                    mergeMasterDataMapping(dict, entity, naturalKey)
                elif operation == 'delete':
                    logger.info('Delete key to check: ' + dict[pk])
                    deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/'+ entity)
                    deltaTable.delete(col(pk) == dict[pk])
                    mergeMasterDataMapping(dict, entity, naturalKey)
                else:
                    logger.info('Error. Opration not recognized: ' + operation)
            except Exception as ex:
                print("-----------------------Error-------------------------")
                handleException(logger, ex)
    except Exception as e:
        # By this way we can know about the type of error occurring
        print("-----------------------Outside Error-------------------------")
        logger.error("The error is: outer-------")

logger.info('------------------------------Starting---------------------------------------')

df.writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation","Files/__checkpoint__") \
    .format("delta") \
    .foreachBatch(sendToSinkTable) \
    .start() \
    .awaitTermination()


StatementMeta(, 0db808bc-4ea8-4d3c-9810-7a6f9f572168, 20, Submitted, Running, Running)

------------------------------Stream received---------------------------------------
-----------------------Step 0-------------------------
{            "SalesForceId": "001d1000009ocrPAAQ","CompanyName": "kk","EmailAddress": "","Phone": "","CustomerType": "Company","AddressLine1": "    ", "Latitude": "","Longitude": ""  }
-----------------------Step 1-------------------------
-----------------------Error-------------------------
[KeyError] - 'CustomerKey'
Traceback (most recent call last):
  File "/tmp/ipykernel_9586/3228181397.py", line 35, in sendToSinkTable
    logger.info('Create key to check: ' + dict[pk])
                                          ~~~~^^^^
KeyError: 'CustomerKey'
