In [0]:
# Notebook parameters.
#   Process    - name of the process to validate; used later as the key into `object_name`.
#   start_date - inclusive lower bound applied to modified_date__v.
#   end_date   - inclusive upper bound applied to modified_date__v.
dbutils.widgets.dropdown('Process','audit',['audit', 'bona_fides_verification', 'capa_action', 'change_action', 'common', 'effectiveness_check', 'extension_request', 'organization', 'pharmaceutical_development_report', 'qualification', 'quality_event', 'schedule', 'scope_of_service'])
dbutils.widgets.text('start_date','1990-09-10T23:58:01.000Z')
# The default must be a real calendar date: a value such as month 15 / day 00 only
# works for plain string comparison and cannot be cast when modified_date__v is a
# timestamp column (NULL predicate or a cast error, depending on ANSI mode).
dbutils.widgets.text('end_date','2024-12-31T23:58:01.000Z')

In [0]:
# Pull the three run parameters out of the notebook widgets in one pass and
# echo them so the chosen process and date window are visible in the cell output.
process, start_date, end_date = (
    dbutils.widgets.get(widget_name)
    for widget_name in ('Process', 'start_date', 'end_date')
)
print(process, start_date, end_date)

In [0]:
%run ./Parameters

In [0]:
# Look up the objects to validate for the selected process.
# `object_name` is not defined in this notebook; presumably it is a mapping of
# process -> list of object names loaded by `%run ./Parameters` (confirm there).
# An unknown process raises KeyError here.
object_list = object_name[process]
print(object_list)

In [0]:
from pyspark.sql.functions import *

# Veeva extract vs. S3 landing validation.
#
# For every object in `object_list` this cell:
#   1. reads the Veeva CSV extract and the S3 delta files, both restricted to
#      the [start_date, end_date] window on modified_date__v;
#   2. keeps only the latest S3 version of each id;
#   3. compares the record counts;
#   4. runs a two-way MINUS over the normalised column lists of both sides.
# One summary row per object is collected; the summary is displayed and
# registered as the temp view `veeva_s3_temp_table`.
#
# Relies on names defined outside this cell: spark, process, start_date,
# end_date, object_list and excluded_columns.

data_val = None
veeva_s3_temp_table = "veeva_s3_temp_table"
veeva_s3_rows = []
veeva_s3_result_columns = ['object', 'veeva_cnt', 's3_cnt', 'count_status', 'veeva_m_s3', 's3_m_veeva', 'data_val_status','veeva_m_s3_query','s3_m_veeva_query']

# `object_nm` rather than `object`, so the Python builtin is not shadowed.
for object_nm in object_list:
  # --- Veeva side -----------------------------------------------------------
  veeva_df = spark.read.csv(f'dbfs:/FileStore/Project_name/{process}/{object_nm}.csv',header=True,sep=',',escape='"', multiLine=True)
  veeva_df = veeva_df.filter(f"modified_date__v between '{start_date}' and '{end_date}'")
  print(object_nm)
  # Generic alias of the current object's extract (overwritten on every iteration).
  veeva_df.createOrReplaceTempView('Veeva')
  veeva_object = 'Veeva_'+object_nm
  veeva_df.createOrReplaceTempView(veeva_object)
  veeva_cnt = veeva_df.count()
  print(veeva_cnt)

  # --- S3 side --------------------------------------------------------------
  s3_df = spark.read.csv(f's3://dev_bckt/inbound/project_name/{process}/{object_nm}/delta/*',header=True,sep='|')
  s3_df = s3_df.filter(f"modified_date__v between '{start_date}' and '{end_date}'").distinct()
  s3_temp_table = 'S3'
  s3_df.createOrReplaceTempView(s3_temp_table)
  # Keep only the most recent version of each id.
  s3_count_query = f"select distinct * from {s3_temp_table} where (id,modified_date__v) in(select id,max(modified_date__v) from {s3_temp_table} group by id)"
  s3_latest_df = spark.sql(s3_count_query)
  s3_object = 'S3_'+object_nm
  s3_latest_df.createOrReplaceTempView(s3_object)
  # The query already selects DISTINCT rows, so a plain count is sufficient.
  s3_cnt = s3_latest_df.count()

  # --- Count comparison -----------------------------------------------------
  # 'No Records' only when BOTH sides are empty; an empty side facing a
  # non-empty one is a mismatch and must be reported as 'Fail'.
  if veeva_cnt == 0 and s3_cnt == 0:
    count_status = 'No Records'
  elif veeva_cnt == s3_cnt:
    count_status = 'Pass'
  else:
    count_status = 'Fail'

  # --- Column expressions for Veeva ----------------------------------------
  # *.global_id columns: keep only the part after the first '_'.
  # Every other column: normalise backslashes, double quotes, newlines and tabs.
  veeva_exprs = []
  for col_name in veeva_df.columns:
    if col_name in excluded_columns:
      continue
    if '.' in col_name and col_name.split('.')[1] == 'global_id':
      select_expr = f"substr(`{col_name}`,instr(`{col_name}`, '_')+1) as `{col_name}`"
    else:
      select_expr = f"""replace(replace(replace(replace(`{col_name}`, "\\\\", "\\\\ "),'"',"''"),'\\n',' '),'\\t',' ') as `{col_name}`"""
    # Column renames.
    # NOTE(review): the replace runs over the whole expression, so the source
    # column reference inside substr()/instr() is renamed as well as the alias
    # - confirm this is intended.
    select_expr = select_expr.replace('site__c.global_id','site__c').replace('proactive_initiative__qdm.global_id','proactive_initiative__qdm')
    veeva_exprs.append(select_expr)
  # Order by alias (names without '.' first) so both sides line up positionally.
  veeva_columns = ','.join(sorted(veeva_exprs, key=lambda e: ('.' in e.rsplit(' as ', 1)[1], e.rsplit(' as ', 1)[1])))

  # --- Column expressions for S3 -------------------------------------------
  s3_exprs = []
  for col_name in s3_latest_df.columns:
    if col_name in excluded_columns:
      continue
    quoted_name = f"`{col_name}`"
    alias_name = quoted_name.replace('site__cr.id','site__c').replace('proactive_initiative__qdmr.id','proactive_initiative__qdm') # columns rename
    s3_exprs.append(f"{quoted_name} as {alias_name}")
  s3_columns = ','.join(sorted(s3_exprs, key=lambda e: ('.' in e.rsplit(' as ', 1)[1], e.rsplit(' as ', 1)[1])))

  # --- Two-way data comparison ---------------------------------------------
  veeva_m_s3_query = f"select {veeva_columns} from {veeva_object} minus select {s3_columns} from {s3_object} where (id,modified_date__v) in(select id,max(modified_date__v) from {s3_object} group by id)"
  print(veeva_m_s3_query)
  veeva_m_s3 = spark.sql(veeva_m_s3_query).count()
  s3_m_veeva_query = f"select {s3_columns} from {s3_object} where (id,modified_date__v) in(select id,max(modified_date__v) from {s3_object} group by id) minus select {veeva_columns} from {veeva_object}"
  s3_m_veeva = spark.sql(s3_m_veeva_query).count()

  if veeva_cnt == 0 and s3_cnt == 0:
    data_val_status = 'No Records'
  elif veeva_m_s3 == 0 and s3_m_veeva == 0:
    data_val_status = 'Pass'
  else:
    data_val_status = 'Fail'

  veeva_s3_rows.append((object_nm, veeva_cnt, s3_cnt, count_status, veeva_m_s3, s3_m_veeva, data_val_status,veeva_m_s3_query,s3_m_veeva_query))

# Build the summary once instead of chaining one union per object, and do not
# fail on an empty object list.
if veeva_s3_rows:
  data_val = spark.createDataFrame(veeva_s3_rows, veeva_s3_result_columns)
  data_val.display()
  data_val.createOrReplaceTempView(veeva_s3_temp_table)
else:
  print(f"No objects configured for process '{process}'; nothing to validate.")


In [0]:
from pyspark.sql.functions import *

# S3 landing vs. src schema validation.
#
# For every object of the selected process this cell compares the distinct S3
# rows inside the [start_date, end_date] window with the rows of
# env_name.src_schema.<object> in the same window: first by count, then by a
# two-way MINUS over the sorted column lists. The src audit columns are left
# out of the data comparison. One summary row per object is collected and
# displayed at the end.
#
# Relies on names defined outside this cell: spark, process, start_date,
# end_date and object_name.

s3_src_val = None
s3_src_rows = []
# Each query text appears exactly once (no duplicated column names).
s3_src_result_columns = ['object','s3_count','src_count', 'cnt_status','s3_minus_src','src_minus_s3', 'data_validation','s3_cnt_query','src_count_query','s3_minus_src_query','src_minus_s3_query']
src_audit_columns = ['edf_created_ts', 'edf_created_by', 'edf_updated_ts', 'edf_updated_by', 'edf_hash_key', 'edf_pipeline_name', 'edf_delete_flag']

# `object_nm` rather than `object`, so the Python builtin is not shadowed.
for object_nm in object_name[process]:
  print(object_nm)

  # --- S3 side --------------------------------------------------------------
  s3_raw_df = spark.read.csv(f's3://dev_bckt/inbound/project_name/{process}/{object_nm}/delta/*', header=True, inferSchema=False, sep='|')
  s3_window_df = s3_raw_df.filter(f"modified_date__v between '{start_date}' and '{end_date}'")
  s3_object = object_nm
  s3_window_df.createOrReplaceTempView(s3_object)
  s3_cnt_query = f"select distinct(*) from {s3_object}"
  s3_distinct_df = spark.sql(s3_cnt_query)
  s3_cnt = s3_distinct_df.count()

  # Sort on the raw names and quote afterwards, exactly as for the src side
  # below, so both column lists are guaranteed to use the same ordering.
  s3_cols = ', '.join(f"`{name}`" for name in sorted(s3_window_df.columns))

  s3_temptable = "s3_" + object_nm
  s3_distinct_df.createOrReplaceTempView(s3_temptable)

  # --- src side -------------------------------------------------------------
  src_count_query = """select * from env_name.src_schema.{} where modified_date__v between '{}' AND '{}'""".format(object_nm, start_date, end_date)
  src_df = spark.sql(src_count_query)
  src_temptable ="src_" + object_nm
  src_count = src_df.count()
  src_columns = ', '.join(f"`{name}`" for name in sorted(src_df.columns) if name not in src_audit_columns)
  src_df.createOrReplaceTempView(src_temptable)

  # --- Count comparison -----------------------------------------------------
  # 'No Records' only when BOTH sides are empty; an empty side facing a
  # non-empty one is a mismatch and must be reported as 'Fail'.
  if s3_cnt == 0 and src_count == 0:
    status = 'No Records'
  elif s3_cnt == src_count:
    status = 'Pass'
  else:
    status = 'Fail'

  # --- Two-way data comparison ---------------------------------------------
  s3_minus_src = f"select distinct {s3_cols} from {s3_temptable} minus select {src_columns} from {src_temptable}"
  count_s3_minus_src = spark.sql(s3_minus_src).count()

  # No trailing WHERE: a dangling keyword makes the statement unparsable.
  src_minus_s3 = f"select {src_columns} from {src_temptable} minus select {s3_cols} from {s3_temptable}"
  count_src_minus_s3 = spark.sql(src_minus_s3).count()

  if s3_cnt == 0 and src_count == 0:
    data_validation = 'No Records'
  elif count_s3_minus_src == 0 and count_src_minus_s3 == 0:
    data_validation = 'Pass'
  else:
    data_validation = 'Fail'

  s3_src_rows.append((object_nm,s3_cnt,src_count, status,count_s3_minus_src,count_src_minus_s3, data_validation,s3_cnt_query,src_count_query,s3_minus_src,src_minus_s3))

# Build the summary once instead of chaining one union per object, and do not
# fail on an empty object list.
if s3_src_rows:
  s3_src_val = spark.createDataFrame(s3_src_rows, s3_src_result_columns)
  s3_src_val.display()
else:
  print(f"No objects configured for process '{process}'; nothing to validate.")

In [0]:
from pyspark.sql.functions import *

# src schema vs. pub schema validation.
#
# For every object of the selected process this cell compares
# env_name.src_schema.<object> with env_name.pub_schema.<object> inside the
# [start_date, end_date] window on modified_date__v: first by count, then by a
# two-way MINUS over the sorted column lists. One summary row per object is
# collected and displayed at the end.
#
# Relies on names defined outside this cell: spark, process, start_date,
# end_date and object_name. It deliberately uses only counts computed in this
# cell, so it runs correctly on a fresh kernel and never reads a stale value
# left behind by another cell's loop.

src_pub_val = None
src_pub_rows = []
src_pub_result_columns = ['object','src_count','pub_count', 'cnt_status','src_minus_pub','pub_minus_src', 'data_validation','src_count_query','pub_count_query','src_minus_pub_query','pub_minus_src_query']

# `object_nm` rather than `object`, so the Python builtin is not shadowed.
for object_nm in object_name[process]:
  print(object_nm)

  # --- src side -------------------------------------------------------------
  src_count_query = """select * from env_name.src_schema.{} where modified_date__v between '{}' AND '{}'""".format(object_nm, start_date, end_date)
  src_df = spark.sql(src_count_query)
  src_temptable ="src_" + object_nm
  src_count = src_df.count()
  # NOTE(review): every column is compared here, including any pipeline audit
  # columns the tables may carry - confirm that is intended.
  src_columns = ', '.join(sorted(src_df.columns))
  src_df.createOrReplaceTempView(src_temptable)

  # --- pub side -------------------------------------------------------------
  pub_count_query = """select * from env_name.pub_schema.{} where modified_date__v between '{}' AND '{}'""".format(object_nm, start_date, end_date)
  pub_df = spark.sql(pub_count_query)
  pub_temptable ="pub_" + object_nm
  pub_count = pub_df.count()
  pub_columns = ', '.join(sorted(pub_df.columns))
  pub_df.createOrReplaceTempView(pub_temptable)

  # --- Count comparison -----------------------------------------------------
  # 'No Records' only when BOTH sides are empty; an empty side facing a
  # non-empty one is a mismatch and must be reported as 'Fail'.
  if src_count == 0 and pub_count == 0:
    status = 'No Records'
  elif src_count == pub_count:
    status = 'Pass'
  else:
    status = 'Fail'

  # --- Two-way data comparison ---------------------------------------------
  src_minus_pub = f"select {src_columns} from {src_temptable} minus select {pub_columns} from {pub_temptable}"
  count_src_minus_pub = spark.sql(src_minus_pub).count()

  pub_minus_src = f"select {pub_columns} from {pub_temptable} minus select {src_columns} from {src_temptable}"
  count_pub_minus_src = spark.sql(pub_minus_src).count()

  if src_count == 0 and pub_count == 0:
    data_validation = 'No Records'
  elif count_src_minus_pub == 0 and count_pub_minus_src == 0:
    data_validation = 'Pass'
  else:
    data_validation = 'Fail'

  src_pub_rows.append((object_nm,src_count,pub_count, status,count_src_minus_pub,count_pub_minus_src, data_validation,src_count_query,pub_count_query,src_minus_pub,pub_minus_src))

# Build the summary once instead of chaining one union per object, and do not
# fail on an empty object list.
if src_pub_rows:
  src_pub_val = spark.createDataFrame(src_pub_rows, src_pub_result_columns)
  src_pub_val.display()
else:
  print(f"No objects configured for process '{process}'; nothing to validate.")