In [0]:
# Importing packages and initiating variables

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, lit
import pandas as pd

port = 1433
server = "xlc-azu-eus2-eds-prd-dw-server01.database.windows.net"
database = "dw_xle_sz"
synapse_schema_suffix, adls_schema_suffix = "", "_polybase"
schemas = ["cad"]

In [0]:
# Function to run SELECT query on Synapse
def run_synapse_query(query):
  jdbcUrl = f"jdbc:sqlserver://{server}:{port};database={database};encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30"

  df = spark.read \
    .format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", query) \
    .option("user", "") \
    .option("password", "") \
    .load()

  return df


In [0]:
# Function to compare DataFrames (Table Structure)

def compare_table_structure(df1, df2):
  compared_df = (df1.withColumnRenamed("data_type", "adls_data_type")
                .join(df2.withColumnRenamed("data_type", "synapse_data_type"),
                      on = ["table_name", "col_name"], how = "inner")
                ).filter(col("synapse_data_type") != col("adls_data_type"))

  differences = {}
  for row in compared_df.collect():
    if row["adls_data_type"] != row["synapse_data_type"]:
      differences[row["col_name"]] = f"{row['synapse_data_type']} -> {row['adls_data_type']}"

  return str(differences)

In [0]:
# Compare rows, columns, data types and data for each table
products, table_names, synapse_size, adls_size, col_dtypes, syn_adls_rows, adls_syn_rows = [], [], [], [], [], [], []

for schema in schemas:
  synapse_schema, adls_schema = f"{schema}{synapse_schema_suffix}", f"{schema}{adls_schema_suffix}"
  
  table_query = f"""SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{adls_schema}' """
  tables = run_synapse_query(table_query)
  
  for table in tables.collect():
    table_name = table["table_name"]

    try:
      synapse_row_count = run_synapse_query(f"SELECT COUNT(1) AS count FROM [{synapse_schema}].[{table_name}]").collect()[0]["count"]
      synapse_column_query = f"""SELECT
        TABLE_NAME AS table_name,
        COLUMN_NAME AS col_name, 
        CASE DATA_TYPE
          WHEN 'int' THEN DATA_TYPE
          WHEN 'char' THEN CONCAT(DATA_TYPE, '(', TRIM(STR(CHARACTER_MAXIMUM_LENGTH)), ')' )
          WHEN 'varchar' THEN CONCAT(DATA_TYPE, '(', TRIM(STR(CHARACTER_MAXIMUM_LENGTH)) ,')' )
          WHEN 'decimal' THEN CONCAT(DATA_TYPE, '(', TRIM(STR(NUMERIC_PRECISION)), ',', TRIM(STR(NUMERIC_SCALE)), ')' )
          ELSE DATA_TYPE END AS data_type
        FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}' AND TABLE_SCHEMA = '{synapse_schema}' """
      synapse_columns = run_synapse_query(f"{synapse_column_query}")
      synapse_col_count = synapse_columns.count()
      synapse_rc = (f"({synapse_row_count}, {synapse_col_count})")
      synapse_adls_data_match_query = f"""SELECT * FROM [{synapse_schema}].[{table_name}] EXCEPT SELECT * FROM [{adls_schema}].[{table_name}]"""
      synapse_adls_data = run_synapse_query(synapse_adls_data_match_query).count()
      compared_columns = "-"
    
    except:
      synapse_row_count, synapse_col_count, compared_columns, adls_synapse_data, synapse_adls_data = "-", "-", "{}", "-", "-"

    adls_row_count = run_synapse_query(f"SELECT COUNT(1) AS count FROM [{adls_schema}].[{table_name}]").collect()[0]["count"]
    adls_column_query = f"""SELECT
      TABLE_NAME AS table_name,
      COLUMN_NAME AS col_name, 
      CASE DATA_TYPE
        WHEN 'int' THEN DATA_TYPE
        WHEN 'char' THEN CONCAT(DATA_TYPE, '(', TRIM(STR(CHARACTER_MAXIMUM_LENGTH)), ')' )
        WHEN 'varchar' THEN CONCAT(DATA_TYPE, '(', TRIM(STR(CHARACTER_MAXIMUM_LENGTH)) ,')' )
        WHEN 'decimal' THEN CONCAT(DATA_TYPE, '(', TRIM(STR(NUMERIC_PRECISION)), ',', TRIM(STR(NUMERIC_SCALE)), ')' )
        ELSE DATA_TYPE END AS data_type
      FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}' AND TABLE_SCHEMA = '{adls_schema}' """
    adls_columns = run_synapse_query(f"{adls_column_query}")
    adls_col_count = adls_columns.count()
    adls_rc = (f"({adls_row_count}, {adls_col_count})")

    if compared_columns == "-":
      adls_synapse_data_match_query = f"""SELECT * FROM [{adls_schema}].[{table_name}] EXCEPT SELECT * FROM [{synapse_schema}].[{table_name}]"""
      adls_synapse_data = run_synapse_query(adls_synapse_data_match_query).count()
      compared_columns = compare_table_structure(adls_columns, synapse_columns)
    
    products.append(f"{schema}")
    table_names.append(f"{table_name}")
    synapse_size.append(synapse_rc)
    adls_size.append(adls_rc)
    col_dtypes.append(compared_columns)
    syn_adls_rows.append(synapse_adls_data)
    adls_syn_rows.append(adls_synapse_data)

data = {
  'product': products,
  'table_name': table_names,
  'synapse_size': synapse_size,
  'adls_size': adls_size,
  'synapse_adls_dtypes_mismatch': col_dtypes,
  'synapse-adls_mismatch_rows': syn_adls_rows,
  'adls-synapse_mismatch_rows': adls_syn_rows
  }

final_df = pd.DataFrame(data)
display(final_df)

In [0]:

synapse_schema = 'gismo_btl'
table_name = 't_ax_supp_policy_details'
adls_schema = 'dw_btl_gismo'
table_name_adls = 't_ax_supp_policy_details'
# table_query = f"""SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{synapse_schema}' """
table_query = f"""select * from {synapse_schema}.{table_name}"""
df = run_synapse_query(table_query)
df.createOrReplaceTempView(f"""{table_name}_vw_rs""")





#### compare the columns
synapse_tbl_column = df.columns

df = spark.sql("describe table dw_btl_rdapp.claim_metrics")
adls_column_list = [row.col_name for row in df.select('col_name').collect()]
diff_columns = set(synapse_tbl_column) - set(adls_column_list)
if diff_columns:
  print('column mismatch, the difference', diff_columns)

syn_count = run_synapse_query(f'select * from {synapse_schema}.{table_name}').count()
adls_count = spark.sql(f'select count(*) from {adls_schema}.{table_name_adls}').count()



column mismatch, the difference {'incoming_office_region_l2', 'primary_layer', 'incept_to_issue_days', 'insured_client', 'gismo_line_of_business', 'pfo_date', 'layering_flag', 'policy_inception_date', 'underwriting_assistant', 'last12months3matured', 'quote_number', 'parent_client_name', 'modifiedhandoverdate', 'invoice_issue_date', 'underwriting_year', 'endorsement_date', 'incoming_office_region_l1', 'program_currency', 'brokeraddref', 'incoming_producing', 'run_date', 'master_broker_group', 'network_partner_instruction_date', 'true_gp', 'net_ceded_premium', 'policy_id', 'layered_policies', 'endorsement_number', 'incept_to_instruct_days', 'instruct_to_issue_days', 'fir_to_instruct_days', 'local_currency', 'policy_current_status', 'policy_identification', 'date_premium_received_by_np', 'rsr_date', 'incoming_office_type_descr', 'gismo_local_plcy_id', 'pending_due_yn', 'local_broker_group', 'global_program_number', 'edit_date', 'fir_to_issue_days', 'contract_type_code', 'pfx_date', 'prod

In [0]:
%sql
describe dw_btl_gismo.t_ax_supp_policy_details;

col_name,data_type,comment
policy_number,string,
endorsement_number,string,
policy_identification,string,
policy_current_status,string,
global_program_number,string,
account_name,string,
insured_client,string,
local_broker_group,varchar(100),
master_broker_group,varchar(100),
captive_flag,boolean,


In [0]:
synapse_query = "SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 't_ax_supp_policy_gismo_local_policy' AND TABLE_SCHEMA = 'gismo_btl'"
df = run_synapse_query(synapse_query)
display(df)

COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH


In [0]:

syn_count = run_synapse_query(f'select * from {synapse_schema}.{table_name}').count()
adls_count = spark.sql(f'select * from {adls_schema}.{table_name_adls}').count()
print(syn_count)
print(adls_count)


187720
187720


In [0]:
diff_df = spark.sql(f"""
                    select policy_rate_of_exchange from gismo_btl.t_ax_supp_policy_details
                    minus 
                    select policy_rate_of_exchange from dw_btl_gismo.t_ax_supp_policy_details
                    """
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-453372701124406>, line 1[0m
[0;32m----> 1[0m diff_df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43mf[39;49m[38;5;124;43m"""[39;49m
[1;32m      2[0m [38;5;124;43m                    select policy_rate_of_exchange from gismo_btl.t_ax_supp_policy_details[39;49m
[1;32m      3[0m [38;5;124;43m                    minus [39;49m
[1;32m      4[0m [38;5;124;43m                    select policy_rate_of_exchange from dw_btl_gismo.t_ax_supp_policy_details[39;49m
[1;32m      5[0m [38;5;124;43m                    [39;49m[38;5;124;43m"""[39;49m
[1;32m      6[0m [43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m

In [0]:
%sql
select * from {table_name}_vw_rs;

In [0]:
df = spark.sql("Select net_ceded_premium from dw_btl_gismo.t_ax_supp_policy_details");
display(df)

net_ceded_premium
2975.0
14337.990234375
2700.0
2125.0
116070.046875
-647.1900024414062
7943.60986328125
645537.125
1700.0
1703.75


In [0]:
df = spark.sql(f"SELECT net_ceded_premium FROM {table_name}_vw_rs");
display(df)

net_ceded_premium
2625.0
2700.0
1275.0
2975.0
4250.0
2125.0
6882.39013671875
2550.0
5451.58984375
2454.800048828125


In [0]:
display(diff_df)

Claim Identifier,Claim Number,Global Claim Number,Claim Incident Date,Claim Cause of Loss Description,Claim Status Description,Planning Line of Business Name,Large Catastrophe Attritional Threat Code,Claim Event Code,Claim Event Name,Bulk Bordereaux Indicator
22821886,M990902747,125317.0,1900-01-01,Unknown,,Unknown,CAT,UNM,Unmapped,
23024578,N40020680300,,1900-01-01,Unknown,,Unknown,CAT,UKN,Unknown,
22698539,N60053630103,197992.0,1900-01-01,Unknown,,Unknown,CAT,UNM,Unmapped,
22715753,N60051550302,196631.0,1900-01-01,Unknown,,Unknown,Large,UNM,Unmapped,
23324995,N60163691101,244569.0,1900-01-01,Unknown,,Unknown,Large,UNM,Unmapped,
23134235,A61026640998,32199.0,1900-01-01,Unknown,,Unknown,Large,UNM,Unmapped,
23422389,A61018540901,29632.0,1900-01-01,Unknown,,Unknown,Large,UNM,Unmapped,
23132401,A60008560204,12910.0,1900-01-01,Unknown,,Unknown,Large,UNM,Unmapped,
22940143,N60062651199,203630.0,1900-01-01,Unknown,,Unknown,CAT,UNM,Unmapped,
22933820,M020103082,,1900-01-01,Unknown,,Unknown,Large,UKN,Unknown,


In [0]:
%sql
select * from btl_src.art_claims limit 10;

Claim_currency,Claim_handling_office,Claim_title,ClaimantKey,ClaimHandlerKey,ClaimKey,ClaimReference,ClaimsCauseCategoryKey,ClaimStatusKey,Date_of_closing,Date_of_loss,Date_of_notification,dAxaShareOfTotal,dBase2ClaimExchangeRate,dCurrentExpenses,dCurrentExpensesLegal,dCurrentExpensesReserve,dCurrentIndemnity,dCurrentIndemnityReserve,dCurrentPaidTotal,dCurrentReserveTotal,dGrossTotalIncurred,dInitialExpensesReserve,dInitialIndemnityReserve,dInitialReserve,dRecovery,dtpolicyperiodfrom,EventKey,Last_payment,Loss_location_country,LossTypeKey,OfficeKey,PolicyKey,PolicyLocationKey,PolicyReference,sEventCode
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,31311,45658,11929,80212,11,2054,2001-11-19T00:00:00Z,2000-09-19T00:00:00Z,2000-09-26T00:00:00Z,100.0,1.0,364.66,364.66,0.0,0.0,0.0,364.66,0.0,364.66,0.0,14483.0,14483.0,,2004-01-01T00:00:00Z,,2001-11-19T00:00:00Z,,,7,58544,,808008,
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,33671,45658,12256,80213,11,2118,2002-10-04T00:00:00Z,2000-10-03T00:00:00Z,2000-10-06T00:00:00Z,100.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,152.0,152.0,,2000-03-21T00:00:00Z,,2002-10-04T00:00:00Z,,,7,65268,,840944,
EUR,Paris,Date de cr?ation du dossier : 06/11/2000,33879,45658,11685,80214,11,2054,2000-11-22T00:00:00Z,2000-10-23T00:00:00Z,2000-11-03T00:00:00Z,100.0,1.0,364.66,364.66,0.0,0.0,0.0,364.66,0.0,364.66,0.0,6098.0,6098.0,,2005-08-01T00:00:00Z,,2000-11-22T00:00:00Z,,,7,60869,,811162,
EUR,Paris,Date de cr?ation du dossier : 13/11/2000,31483,45658,11446,80215,11,2054,2000-12-04T00:00:00Z,2000-10-13T00:00:00Z,2000-10-18T00:00:00Z,100.0,1.0,0.0,0.0,0.0,274.41,0.0,274.41,0.0,274.41,0.0,274.0,274.0,,2004-12-01T00:00:00Z,,2000-12-04T00:00:00Z,,,7,58687,,808267,
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,31877,45658,11447,80216,11,2054,2006-10-16T00:00:00Z,2000-10-03T00:00:00Z,2000-10-18T00:00:00Z,100.0,1.0,3152.8,2554.8,0.0,18000.0,0.0,21152.8,0.0,21152.8,0.0,7622.0,7622.0,,2000-12-27T00:00:00Z,,2006-10-16T00:00:00Z,,,7,59144,,809086,
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,32782,45658,11817,80217,11,2118,2003-10-16T00:00:00Z,2000-03-14T00:00:00Z,2000-10-09T00:00:00Z,100.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,7622.0,7622.0,,1999-05-01T00:00:00Z,,2003-10-16T00:00:00Z,,,7,59848,,810037,
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,33092,45658,12001,80218,11,2054,2001-01-10T00:00:00Z,2000-10-23T00:00:00Z,2000-10-30T00:00:00Z,100.0,1.0,0.0,0.0,0.0,503.08,0.0,503.08,0.0,503.08,0.0,1067.0,1067.0,,2002-04-01T00:00:00Z,,2001-01-10T00:00:00Z,,,7,60288,,810533,
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,33239,45658,10124,80219,11,2118,2000-12-12T00:00:00Z,2000-09-01T00:00:00Z,2000-10-30T00:00:00Z,100.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3811.0,3811.0,,2000-06-01T00:00:00Z,,2000-12-12T00:00:00Z,,,7,60373,,810626,
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,33321,45658,11819,80220,11,2118,2004-04-02T00:00:00Z,2000-10-24T00:00:00Z,2000-11-02T00:00:00Z,100.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,152.0,152.0,,2002-06-01T00:00:00Z,,2004-04-02T00:00:00Z,,,7,60783,,811070,
EUR,Paris,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX-X,30818,45658,12254,80221,11,2118,2003-08-12T00:00:00Z,2000-10-15T00:00:00Z,2000-10-26T00:00:00Z,100.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2287.0,2287.0,,2000-06-06T00:00:00Z,,2003-08-12T00:00:00Z,,,7,65396,,841074,
