In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime
import logging
df_target = spark.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),(22,'Tom',2000,'usa','HR','2/11/2019'),
                                 (33,'Kom',3500,'uk','IT','2/11/2019'),(44,'Nom',4000,'can','HR','2/11/2019'),
                                 (55,'Vom',5000,'mex','IT','2/11/2019'),(66,'XYZ',5000,'mex','IT','2/11/2019')],
                                 ['Id','Name','Sal','Address','Dept','Join_Date']) 
df_source = spark.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),(22,'Tom',2000,'usa','HR','2/11/2019'),
                                  (33,'Kom',3000,'GB','MKT','12/12/2019'),(44,'Nom',4000,'can','HR','2/11/2019'),
                                  (45,'Xom',5000,'mex','IT','2/11/2019'),(77,'XYZ',5000,'mex','IT','2/11/2019')],
                                  ['Id','Name','Sal','Address','Dept','Join_Date']) 

In [0]:
#target = add_metadata(df_target)
#source = add_metadata(df_source)
target = df_target
source = df_source
cols = target.columns[1:]
print(cols)

['Name', 'Sal', 'Address', 'Dept', 'Join_Date']


In [0]:
def check_changes(col1,col2):
  compare_cols = (col1 == col2)
  return ((col1.isNull()) & (col2.isNull())) | compare_cols

def add_to_string_col(col,fail_string,check_col):
 """
 append a string to error_col
 """
 fail_text = (F.when((check_col == True), "")
              .otherwise(F.when(col == "", fail_string)
                         .otherwise(f",{fail_string}")))
 return F.concat(col, fail_text)


In [0]:
def find_changes(logging,df_source_exists,cols):
  change_col = "data_changes"
  col = df_source_exists.columns[0]
  logging.info('Using ID column: ' + col)
  print(df_source_exists.columns)
  df_changes = df_source_exists.withColumn(change_col,F.lit(""))
 
  logging.info('Tracking changes for:')
  loop_count = 0
  for col in cols:
    logging.info(col)
    print(col)
    loop_count += 1
    df_changes = df_changes.withColumn(change_col, add_to_string_col(
      df_changes[change_col],f"{col}",
      check_changes(df_changes[col],df_changes[col+'_t'])))
    # incase if you want to avoid over writing previous non null values
    # df_changes = df_changes.withColumn(col,F.coalesce(df_changes[col],df_changes[col+'_t']))
    if loop_count %3 == 0:
      df_changes.cache().count()
      
  return df_changes


In [0]:
def process_save_changes(logging,df_source,df_target,change_cols):
  """
  identify changes in two dataframes
  
  parameters:
    df_source - dataframe containing new dataset
    df_target - existing datasets (target dataframe will updated accordingly)
    change_cols - list of varibles to for capturing chnages    
  """
  logging.info('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  print('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  change_col = "data_changes"
  col = df_source.columns[0]
  
  # separate target data by live_flag
  df_active = df_target#.filter(df_target['live_flag'] == True) 
  df_inactive = df_target#.filter(df_target['live_flag'] == False) 
  df_active.cache().count()
  df_inactive.cache().count()
  df_source.cache().count()

  # records already exist
  df_t = df_active.select(*(F.col(x).alias(x + '_t') for x in df_active.columns))
  df_amends = df_source.join(df_t,df_source[col] == df_t[col+'_t'],'inner')\
                           .select('*')\
                           .dropDuplicates([col]) 
  
  # birth
  df_birth = df_source.join(df_target, on=[col], how='leftanti')
  df_birth = df_birth.withColumn(change_col,F.lit(2))
  #df_birth.show(truncate=False)   
  
  df_birth.cache().count()
  
  # death
  df_death = df_target.join(df_source, on=[col], how='leftanti')
  df_death = df_death.withColumn(change_col,F.lit(99))
  #df_death.show(truncate=False)   
  df_death.cache().count()
  
  # identify updated records
  df_changes = find_changes(logging,df_amends,change_cols)
  df_changes = df_changes.drop(*df_t.columns)
  #df_changes.show(truncate=False)   
  
  # join new and updated source records
  df_all_changes = df_birth.union(df_changes)
  df_all_changes = df_all_changes.union(df_death)
  #df_all_changes.show(truncate=False)   
                             
  return  df_all_changes

In [0]:

df = process_save_changes(logging,source,target,cols)


START - identify changes: 202302271311
['Id', 'Name', 'Sal', 'Address', 'Dept', 'Join_Date', 'Id_t', 'Name_t', 'Sal_t', 'Address_t', 'Dept_t', 'Join_Date_t']
Name
Sal
Address
Dept
Join_Date


In [0]:
df.show(truncate=False)

+---+----+----+-------+----+----------+--------------------------+
|Id |Name|Sal |Address|Dept|Join_Date |data_changes              |
+---+----+----+-------+----+----------+--------------------------+
|45 |Xom |5000|mex    |IT  |2/11/2019 |2                         |
|77 |XYZ |5000|mex    |IT  |2/11/2019 |2                         |
|22 |Tom |2000|usa    |HR  |2/11/2019 |                          |
|33 |Kom |3000|GB     |MKT |12/12/2019|Sal,Address,Dept,Join_Date|
|44 |Nom |4000|can    |HR  |2/11/2019 |                          |
|11 |Sam |1000|ind    |IT  |2/11/2019 |                          |
|55 |Vom |5000|mex    |IT  |2/11/2019 |99                        |
|66 |XYZ |5000|mex    |IT  |2/11/2019 |99                        |
+---+----+----+-------+----+----------+--------------------------+

