## Template for performing event-based ingestion and merging from Attunity change files

Scalable CDC from DBMS to Azure Databricks

1. Summary of algorithm

  - A CDC program (Attunity) sends change data from various tables into an ADLS Gen2 folder; each table has its own folder.
  - Each change data file comes with a schema file (.dfm) that describes the schema of the data file.
  - Azure Event Grid listens for new files landing in the subscribed folder and creates a message with the location and the type of operation for each file.
  - The main program reads messages from the message queue, sorts them by table, then processes them in batches of a predefined size. Sorting keeps the number of tables in each batch as small as possible.
  - Within each batch, the process_files function groups the files by table and retrieves the schema file(s) and data files for each table. It builds the schema from the schema file and uses it to read the data.
  - Inserts are applied with a regular append. Updates and deletes use MERGE to apply the changes to the target table.
2. Libraries: install azure-storage-queue (the legacy 2.x release, which provides QueueService), azure, and applicationinsights on the Databricks cluster.

3. Data: we use sample CDC files generated by Attunity. Upload the sample data so that each folder represents the landing location of one table.

4. Configuring Azure Eventgrid and Storage Queue

   https://docs.microsoft.com/en-us/azure/event-grid/custom-event-to-queue-storage

5. Set up an event subscription in your storage account

   Route the event to a Storage Queue
6. TODOs:
    - Error handling of failures & bad data
    - Data type change of schema
    - Deduplication at target table
    - Optimization of target delta tables
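The batching rule in step 1 (sort the queued files by table, then take a limited number of tables per batch) can be sketched in plain Python without any Azure dependency. This is a minimal illustration under stated assumptions, not the notebook's code: `plan_batch`, the `(table_folder, dfm_file)` input tuples, and the sample file names are hypothetical. Event Grid delivery is at-least-once, so duplicate notifications for the same file are collapsed with a set.

```python
from itertools import groupby


def plan_batch(events, max_num_tbl=1):
    """Group (table, dfm_file) pairs by table and keep at most max_num_tbl tables.

    events: iterable of (table_folder, dfm_file_name) tuples, possibly with duplicates.
    Returns (selected, released): selected maps table -> sorted list of files to process now;
    released lists the (table, file) pairs to hand back to the queue for a later batch.
    """
    # Deduplicate, then sort by table and file name
    pairs = sorted(set(events))
    grouped = {tbl: [f for _, f in items] for tbl, items in groupby(pairs, key=lambda p: p[0])}
    # Keep only the first max_num_tbl tables (dicts preserve insertion order)
    selected = {tbl: grouped[tbl] for tbl in list(grouped)[:max_num_tbl]}
    released = [(tbl, f) for tbl, f in pairs if tbl not in selected]
    return selected, released


events = [
    ("ORDERS__ct", "0002.dfm"),
    ("CUSTOMERS__ct", "0001.dfm"),
    ("ORDERS__ct", "0001.dfm"),
    ("ORDERS__ct", "0002.dfm"),  # duplicate notification
]
selected, released = plan_batch(events, max_num_tbl=1)
print(selected)  # {'CUSTOMERS__ct': ['0001.dfm']}
print(released)  # [('ORDERS__ct', '0001.dfm'), ('ORDERS__ct', '0002.dfm')]
```

Tables that are not selected go back to the queue for a later batch; in main_proc that is done by shortening the visibility timeout of their messages.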

In [3]:
# Read connection settings from notebook widgets
# Reset the widgets
dbutils.widgets.removeAll()

dbutils.widgets.text("STORAGE_ACCOUNT", "")
dbutils.widgets.text("SAS_KEY", "")
dbutils.widgets.text("ACCOUNT_KEY", "")
dbutils.widgets.text("QUEUE_NAME", "")
dbutils.widgets.text("ARCHIVE_QUEUE_NAME", "")
dbutils.widgets.text("INSTRUMENT_KEY", "")
dbutils.widgets.text("ROOT_PATH", "")

account_name = dbutils.widgets.get("STORAGE_ACCOUNT").strip()
sas = dbutils.widgets.get("SAS_KEY").strip()                            # SAS token for the Storage Queue
account_key = dbutils.widgets.get("ACCOUNT_KEY").strip()                # account key for ADLS Gen2
queue_name = dbutils.widgets.get("QUEUE_NAME").strip()                  # queue receiving Event Grid messages
archive_queue_name = dbutils.widgets.get("ARCHIVE_QUEUE_NAME").strip()  # queue for processed messages
instrument_key = dbutils.widgets.get("INSTRUMENT_KEY").strip()          # Application Insights instrumentation key
# Folder that holds one landing folder per table; must end with "/" because table names are appended to it
root_path = dbutils.widgets.get("ROOT_PATH").strip()

# Authenticate Spark to the ADLS Gen2 account where the change files land
conf_key = "fs.azure.account.key.{storage_acct}.dfs.core.windows.net".format(storage_acct=account_name)
spark.conf.set(conf_key, account_key)

# Enable Delta automatic schema evolution (autoMerge)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

## Utility functions to parse schema and load data
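The type mapping performed by get_file_info can be illustrated without Spark. The sketch below assumes the .dfm layout that function reads (`dataInfo.columns` entries with `name`, `type`, `precision`, `scale` and `primaryKeyPos`), returns Spark SQL type names instead of `DataType` objects, and uses a hypothetical `parse_dfm` helper with made-up sample columns.

```python
import json

# Same source-type mapping as get_file_info, expressed as Spark SQL type names
TYPE_MAP = {"STRING": "string", "DATETIME": "date", "NUMERIC": "decimal", "REAL8": "decimal"}


def parse_dfm(dfm_text):
    """Return ([(column, spark_type)], [primary_key_columns]) from a .dfm JSON document."""
    columns = json.loads(dfm_text)["dataInfo"]["columns"]
    fields, primary_keys = [], []
    for col in columns:
        spark_type = TYPE_MAP.get(col["type"], "string")  # default to string
        if spark_type == "decimal":
            precision, scale = int(col["precision"]), int(col["scale"])
            # No precision/scale in the metadata: fall back to decimal(38,10)
            if precision == 0 and scale == 0:
                precision, scale = 38, 10
            spark_type = "decimal({},{})".format(precision, scale)
        fields.append((col["name"], spark_type))
        if col["primaryKeyPos"] > 0:
            primary_keys.append(col["name"])
    return fields, primary_keys


sample_dfm = json.dumps({"dataInfo": {"columns": [
    {"name": "header__change_seq", "type": "STRING", "precision": 35, "scale": 0, "primaryKeyPos": 0},
    {"name": "ID", "type": "NUMERIC", "precision": 10, "scale": 0, "primaryKeyPos": 1},
    {"name": "PRICE", "type": "REAL8", "precision": 0, "scale": 0, "primaryKeyPos": 0},
    {"name": "UPDATED", "type": "DATETIME", "precision": 0, "scale": 0, "primaryKeyPos": 0},
]}})
fields, pks = parse_dfm(sample_dfm)
print(fields)
# [('header__change_seq', 'string'), ('ID', 'decimal(10,0)'), ('PRICE', 'decimal(38,10)'), ('UPDATED', 'date')]
print(pks)  # ['ID']
```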

In [5]:

from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StringType, DateType, DecimalType

def get_file_info(metadata_file_paths, datafile_basepath="", datafile_extension="json"):
  """
  Parse Attunity .dfm metadata files and return the schema(s) needed to load the data files.

  Assumes each table has its own directory, and that every data file in that directory has a
  matching metadata file describing its schema and other information.
  Normally all files of a table share one schema in each load, but if the source schema changed
  the function groups the data files by schema.

  Parameters
  --------
  metadata_file_paths: list, required
    full paths to the metadata files that contain the schema and other information
  datafile_basepath: str
    base path down to the last folder containing the data files
  datafile_extension: str
    extension of the data files, e.g. json or csv

  Returns
  --------
  (schema_datafiles_map, primary_keys): a dict mapping each StructType to the list of data file
  paths that use it, and the list of primary key column names
  """
  schemas = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json(metadata_file_paths)
  # Mapping from Oracle/Attunity data types to Spark SQL types; extend it to cover more source types.
  # Note: DateType keeps only the date; TimestampType would keep the time of day.
  ora_pyspark_map = {'STRING': StringType(), 'DATETIME': DateType(), 'NUMERIC': DecimalType(), 'REAL8': DecimalType()}

  schemas = schemas.select(["dataInfo", "fileInfo"]).collect()
  primary_keys = []         # primary key column names
  file_schema_mapping = {}  # data file path -> StructType
  for num, schema in enumerate(schemas):
    targetSchema = StructType()
    for item in schema['dataInfo']['columns']:
      # Default to StringType if no mapping is found
      target_type = ora_pyspark_map.get(item['type'], StringType())
      if isinstance(target_type, DecimalType):
        scale = int(item['scale'])
        precision = int(item['precision'])
        if scale != 0 or precision != 0:
          target_type = DecimalType(precision, scale)
        else:
          # No precision/scale in the metadata: fall back to a wide decimal
          target_type = DecimalType(38, 10)

      targetSchema.add(item['name'], target_type)
      # Only the first schema is checked, assuming primary keys do not change
      if num == 0 and item['primaryKeyPos'] > 0:
        primary_keys.append(item['name'])

    # The data file sits in the same folder as its metadata file, so its full path can be derived
    file_schema_mapping[datafile_basepath + schema['fileInfo']['name'] + "." + datafile_extension] = targetSchema

  # Invert the mapping to schema -> [data files] (one-to-many)
  schema_datafiles_map = {}
  for key, value in sorted(file_schema_mapping.items()):
    schema_datafiles_map.setdefault(value, []).append(key)
  return schema_datafiles_map, primary_keys


def merge(updatesDF, target_tbl_path, schema, primary_keys, field_names):
  """
  Apply inserts, updates and deletes from a change DataFrame to a target Delta table.

  Parameters
  --------
  updatesDF: DataFrame
    change data to apply, one row per primary key (latest change only)
  target_tbl_path: str
    name of the target Delta table (used with saveAsTable and DeltaTable.forName)
  schema: StructType, optional
    schema of the target table; currently unused
  primary_keys: list
    primary key column(s) used to match rows for update or delete
  field_names: list
    columns of the target table
  --------
  """
  # Inserts: append, letting Delta add any new columns (mergeSchema)
  insert_df = updatesDF.filter("header__change_oper='I'")
  insert_df.write.format("delta").option("mergeSchema", "true").mode("append").saveAsTable(target_tbl_path)

  # Updates and deletes: MERGE on the primary key(s)
  upsert_df = updatesDF.filter("header__change_oper='U' or header__change_oper='D'")
  update_alias = "updates"
  target_tbl_alias = "tgt_tbl"
  # Build the match condition dynamically, e.g. "tgt_tbl.ID = updates.ID and tgt_tbl.REGION = updates.REGION"
  match_condition = " and ".join(
    "{0}.{1} = {2}.{1}".format(target_tbl_alias, key, update_alias) for key in primary_keys)
  update_dict = {fieldname: update_alias + "." + fieldname for fieldname in field_names}

  targetTable = DeltaTable.forName(spark, target_tbl_path)
  (targetTable.alias(target_tbl_alias)
    .merge(upsert_df.alias(update_alias), match_condition)
    .whenMatchedUpdate(condition=update_alias + ".header__change_oper='U'", set=update_dict)
    .whenMatchedDelete(condition=update_alias + ".header__change_oper='D'")
    .execute())
  updatesDF.unpersist()

def process_files(metadata_files, datafile_basepath, target_tbl_path, tc):
  """
  Process the data/metadata files of one table and apply the changes to its target table.

  Parameters
  --------
  metadata_files: list, required
    names of the .dfm metadata files to process
  datafile_basepath: str
    base path down to the last folder containing the data files (ends with "/")
  target_tbl_path: str
    name of the target Delta table
  tc: TelemetryClient
    Application Insights client used to track metrics and events
  --------
  """
  datafile_extension = 'csv'  # Spark reader format
  zip_extension = "gz"        # data files are gzip-compressed; Spark decompresses .gz files on read
  chg_tbl_name = "chg_tbl"
  metadata_file_paths = [datafile_basepath + metafile for metafile in metadata_files]
  schema_datafiles_map, primary_keys = get_file_info(metadata_file_paths, datafile_basepath, zip_extension)
  # One iteration per distinct schema (normally just one)
  for schema, datafile_paths in schema_datafiles_map.items():
    print("number of files for the table: ", len(datafile_paths))
    tc.track_metric("Number of files", len(datafile_paths), properties={"table": target_tbl_path})

    field_names = schema.fieldNames()
    data = spark.read.format(datafile_extension).schema(schema).load(datafile_paths)
    data.createOrReplaceTempView(chg_tbl_name)
    # Keep only the latest change per primary key (highest header__change_seq)
    sql_query = ("select {0} from (select {0}, RANK() OVER (PARTITION BY {1} ORDER BY header__change_seq DESC) AS RNK "
                 "from {2}) A where RNK = 1").format(",".join(field_names), ",".join(primary_keys), chg_tbl_name)
    updatesDF = spark.sql(sql_query)
    updatesDF.cache()
    print("Files in the update: ", datafile_paths)
    tc.track_event("Files in the update", {'files': datafile_paths, "table": target_tbl_path})
    updatesize = updatesDF.count()
    print("Size of the updates: ", updatesize)
    tc.track_metric("Size of update", updatesize, properties={"table": target_tbl_path})
    merge(updatesDF, target_tbl_path, schema, primary_keys, field_names)
    tc.track_event('Table processing', {'table': target_tbl_path})
    tc.flush()
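Together, process_files and merge keep only the newest change per primary key (the `RANK() ... ORDER BY header__change_seq DESC` query) and then apply it as an insert, update or delete. The in-memory sketch below mimics that logic with dictionaries so the semantics can be checked without Spark; `latest_changes`, `apply_changes` and the sample rows are hypothetical, and a simple maximum over `header__change_seq` stands in for the window function.

```python
def latest_changes(rows, primary_keys):
    """Keep the row with the highest header__change_seq for each primary key."""
    latest = {}
    for row in rows:
        key = tuple(row[k] for k in primary_keys)
        if key not in latest or row["header__change_seq"] > latest[key]["header__change_seq"]:
            latest[key] = row
    return list(latest.values())


def apply_changes(target, changes, primary_keys):
    """Apply I/U/D changes to target, a dict keyed by primary-key tuple (mirrors merge())."""
    for row in changes:
        key = tuple(row[k] for k in primary_keys)
        oper = row["header__change_oper"]
        if oper == "I":
            target[key] = row   # insert (Delta appends; a dict cannot hold duplicate keys)
        elif oper == "U" and key in target:
            target[key] = row   # whenMatchedUpdate
        elif oper == "D" and key in target:
            del target[key]     # whenMatchedDelete
    return target


target = {(1,): {"ID": 1, "NAME": "a", "header__change_oper": "I", "header__change_seq": "0001"},
          (2,): {"ID": 2, "NAME": "b", "header__change_oper": "I", "header__change_seq": "0002"}}
batch = [
    {"ID": 1, "NAME": "a2", "header__change_oper": "U", "header__change_seq": "0003"},
    {"ID": 1, "NAME": "a3", "header__change_oper": "U", "header__change_seq": "0004"},
    {"ID": 2, "NAME": "b", "header__change_oper": "D", "header__change_seq": "0005"},
    {"ID": 3, "NAME": "c", "header__change_oper": "I", "header__change_seq": "0006"},
    {"ID": 3, "NAME": "c2", "header__change_oper": "U", "header__change_seq": "0007"},
]
result = apply_changes(target, latest_changes(batch, ["ID"]), ["ID"])
print(sorted((k[0], v["NAME"]) for k, v in result.items()))  # [(1, 'a3')]
```

Row 3 disappears: its newest change is an update, and, as in merge, an update whose key is not yet in the target is ignored. Adding a `whenNotMatchedInsert` clause to the MERGE is one way to cover rows inserted and updated within the same batch.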
  

### Main procedure to process incoming data from Eventgrid

In [7]:
from azure.storage.queue import QueueService, QueueMessageFormat  # legacy azure-storage-queue 2.x API
import json
import time
from applicationinsights import TelemetryClient

tc = TelemetryClient(instrument_key)
# Spark authenticates to the Data Lake where the files land in the configuration cell.
# You can use ABFS or WASB depending on the authentication method.


def main_proc(max_num_tbl=1):
  """
  Poll the Storage Queue indefinitely, process up to max_num_tbl tables per batch,
  and archive the messages of each table that was processed successfully.
  """
  # Authenticate to the Storage Queue service with a SAS token
  queue_service = QueueService(account_name=account_name, sas_token=sas)

  # Messages read with a visibility timeout stay hidden from other consumers (e.g. parallel clusters)
  # until the timeout expires. They return to the queue unless explicitly deleted, which is done only
  # after a successful merge. One get_messages call returns at most 32 messages, so it is called
  # repeatedly. The outer while loop keeps processing new files as they arrive.
  batch_size = 32                    # max messages per get_messages call
  batch = 0
  max_messages_explored = 800        # max messages inspected per batch
  init_visibility_timeout = 60*30    # seconds a claimed message stays hidden while it is processed
  visibility_timeout = 2             # seconds before messages not selected for this batch reappear
  wait_time = 10                     # seconds to wait when the queue is empty
  while True:
    dfm_filelist = []
    table_list = []
    # Approximate queue length, for logging only
    metadata = queue_service.get_queue_metadata(queue_name)
    count = metadata.approximate_message_count
    print("Begin processing, entire queue length is ", count)
    start_queue = time.time()
    tc.track_event('Queue ingesting', {'queue_length': count})
    tc.flush()

    # Read past the 32-message limit by calling get_messages repeatedly
    messages = []
    messages_ids = []
    while True:
      batch_messages = queue_service.get_messages(
        queue_name, num_messages=batch_size, visibility_timeout=init_visibility_timeout)
      messages = messages + batch_messages
      if len(batch_messages) == 0 or len(messages) >= max_messages_explored:
        break

    # Keep only .dfm files created in change-table folders (names containing "__ct").
    # Messages for other files (e.g. data files) are neither deleted nor released here;
    # they become visible again after init_visibility_timeout.
    for message in messages:
      content = QueueMessageFormat.binary_base64decode(message.content).decode('utf8')
      data_content = json.loads(content)['data']
      file_name = data_content['url'].split("/")[-1]
      tbl_name = data_content['url'].split("/")[-2]
      if data_content['api'] in ("CopyBlob", "CreateFile") and ".dfm" in file_name and "__ct" in tbl_name:
        dfm_filelist.append(file_name)
        table_list.append(tbl_name)
        messages_ids.append({"file_name": file_name, "tbl": tbl_name, "id": message.id,
                             "pop_receipt": message.pop_receipt, "content": message.content})

    # Group the .dfm files by table ({table: [dfm files]}), sorted by table then file name;
    # the set removes duplicate notifications for the same file
    reduced_tbl_file = {}
    for tbl, file in sorted(set(zip(table_list, dfm_filelist))):
      reduced_tbl_file.setdefault(tbl, []).append(file)

    # Only process the first max_num_tbl tables in this batch
    reduced_tbl_file = dict(list(reduced_tbl_file.items())[:max_num_tbl])
    dfm_filelist = [(tbl, file) for tbl, files in reduced_tbl_file.items() for file in files]
    # Release messages of tables outside this batch by shortening their visibility timeout
    tmp_messages_ids = []
    for message in messages_ids:
      if (message["tbl"], message["file_name"]) in dfm_filelist:
        tmp_messages_ids.append(message)
      else:
        queue_service.update_message(queue_name, message['id'], message['pop_receipt'], visibility_timeout)
    messages_ids = tmp_messages_ids

    filelistlen = len(dfm_filelist)
    if filelistlen > 0:
      print("Start processing ", filelistlen, " files in this batch")
      start_batch = time.time()
      tc.track_event('start batch processing', {'number of files': filelistlen})
      tc.flush()
      for tbl in reduced_tbl_file.keys():
        # Change-table folders end with "__ct"; the target table name drops that suffix
        tbl_path = tbl[:-4] if tbl.endswith("__ct") else tbl
        tbl_messages_ids = [message for message in messages_ids if message['tbl'] == tbl]

        print("start processing: " + tbl + " for saving at path: " + tbl_path)
        start_tbl = time.time()
        tc.track_event('Table processing', {'table': tbl_path})
        tc.flush()
        try:
          process_files(reduced_tbl_file[tbl], root_path + tbl + "/", tbl_path, tc)
          elapsed_tbl_time = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_tbl))
          print("Finished processing table {0} in {1}".format(tbl, elapsed_tbl_time))
          # Success: remove the messages from the work queue and keep a copy in the archive queue
          for message in tbl_messages_ids:
            queue_service.delete_message(queue_name, message['id'], message['pop_receipt'])
            queue_service.put_message(archive_queue_name, message['content'])
        except Exception as e:
          # Failure: the messages stay in the queue and reappear after init_visibility_timeout
          print(e)
          failed_messages = [message['id'] for message in tbl_messages_ids]
          tc.track_event('exception', {'error message': str(e), 'failed messageid': str(failed_messages)})
          tc.flush()

      elapsed_batch_time = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_batch))
      print("finish batch {0}, processed {1} files in {2}".format(batch, filelistlen, elapsed_batch_time))
      tc.track_event('finish batch processing', {'number of files': filelistlen, 'duration': elapsed_batch_time})
      tc.flush()
      batch = batch + 1
    else:
      # Nothing to process: wait for new messages to arrive
      elapsed_queue_time = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_queue))
      print("Finished queue in ", elapsed_queue_time)
      print("Nothing in queue, wait {} seconds for next batch".format(wait_time))
      tc.track_event('waiting for new messages', {'wait_time': wait_time})
      tc.flush()
      time.sleep(wait_time)
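The message handling at the top of main_proc can be tried in isolation. The sketch below builds a hypothetical Event Grid BlobCreated-style payload, base64-encodes it the way it arrives in the Storage Queue, and applies the same filter as main_proc (a `CopyBlob` or `CreateFile` call on a `.dfm` file inside a `__ct` folder). It uses only the standard library; the URL and the `parse_queue_message` name are made up for illustration.

```python
import base64
import json


def parse_queue_message(content):
    """Decode one queue message; return (table_folder, dfm_file) or None if it should be skipped."""
    event = json.loads(base64.b64decode(content).decode("utf8"))
    data = event["data"]
    parts = data["url"].split("/")
    file_name, tbl_name = parts[-1], parts[-2]
    if data["api"] in ("CopyBlob", "CreateFile") and ".dfm" in file_name and "__ct" in tbl_name:
        return tbl_name, file_name
    return None


def encode(event):
    """Base64-encode an event dict as it would appear in the queue (hypothetical helper)."""
    return base64.b64encode(json.dumps(event).encode("utf8")).decode("ascii")


base_url = "https://myaccount.dfs.core.windows.net/landing/ORDERS__ct/"
dfm_event = {"eventType": "Microsoft.Storage.BlobCreated",
             "data": {"api": "CreateFile", "url": base_url + "20200101-0001.dfm"}}
data_event = {"eventType": "Microsoft.Storage.BlobCreated",
              "data": {"api": "CreateFile", "url": base_url + "20200101-0001.csv.gz"}}
print(parse_queue_message(encode(dfm_event)))   # ('ORDERS__ct', '20200101-0001.dfm')
print(parse_queue_message(encode(data_event)))  # None
```

The payload is parsed with `json.loads`; `ast.literal_eval` would reject JSON literals such as `true`, `false` and `null`, which are not valid Python literals.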




In [8]:
# Run one consumer that processes up to 2 tables per batch (main_proc loops until the cell is cancelled)
main_proc(2)

In [9]:
# Test idea: take one table (CLARITY.AVAILABILITY), run the merge, check the table version, change the origin, then the destination -> see how the original

In [10]:

import threading
# Start 5 consumers concurrently on the driver; the queue's visibility timeout keeps them from picking up the same messages
for i in range(5):
    t = threading.Thread(target=main_proc)
    t.start()
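Running several main_proc threads in parallel relies on the queue's visibility timeout: a message returned by get_messages stays hidden from other readers until its timeout expires or it is deleted. The toy in-memory queue below (a hypothetical `ToyQueue` class, not the Azure SDK) models only that rule, with an explicit clock instead of wall time and no pop receipts, to show why two consumers do not receive the same message while it is in flight, and why a message that is never deleted reappears.

```python
class ToyQueue:
    """In-memory model of visibility-timeout semantics only (not the Azure Storage SDK)."""

    def __init__(self, items):
        self.items = dict(enumerate(items))           # message id -> content
        self.visible_at = {i: 0 for i in self.items}  # message id -> time it becomes visible

    def get_messages(self, now, num_messages, visibility_timeout):
        ready = [i for i in sorted(self.items) if self.visible_at[i] <= now][:num_messages]
        for i in ready:
            # Hide the message from other consumers until the timeout expires
            self.visible_at[i] = now + visibility_timeout
        return [(i, self.items[i]) for i in ready]

    def delete_message(self, message_id):
        # Called only after successful processing; the message never comes back
        del self.items[message_id]
        del self.visible_at[message_id]


q = ToyQueue(["a.dfm", "b.dfm", "c.dfm"])
consumer1 = q.get_messages(now=0, num_messages=2, visibility_timeout=1800)
consumer2 = q.get_messages(now=1, num_messages=2, visibility_timeout=1800)
print(consumer1)  # [(0, 'a.dfm'), (1, 'b.dfm')]
print(consumer2)  # [(2, 'c.dfm')]
q.delete_message(0)  # consumer 1 merged a.dfm successfully
q.delete_message(2)  # consumer 2 merged c.dfm successfully
# b.dfm was never deleted (say its merge failed), so it reappears once its timeout has passed
print(q.get_messages(now=1799, num_messages=32, visibility_timeout=1800))  # []
retry = q.get_messages(now=1800, num_messages=32, visibility_timeout=1800)
print(retry)  # [(1, 'b.dfm')]
```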