- **JIRA Ticket** : PLI-13998
- **Version** : 0.1v
- **Last_Modified_Date** : 03/28/2024 
- **Author** : Fnu Pratik
- **Remark** : Created Script to iterate through source files, identify specific node within each XML file, and verify 
               updates in corresponding Hive tables in raw and Datamart.


In [0]:
# Define the widget parameters
widget_params = {
    "Date": {
        "type": "date",
        "default": "2024-03-22", 
    },
    "BaseFolderPath": {
        "type": "text",
        "default": "/mnt/raw/API_DATA/MAJESCO_PAS/POLICY/XML/",  
    },
    "Load_Folder": {
        "type": "text",
        "default": "No",  
    }
}
#  dbfs:/mnt/raw/API_DATA/MAJESCO_PAS/POLICY/XML/2024/03/22/2000001673-01_2024-03-19T11:47:53_convertQuote.xml

# Register the widgets
for widget_name, widget_config in widget_params.items():
    dbutils.widgets.text(widget_name, widget_config.get("default"), widget_config.get("label"))

# Create a multi-select widget to choose nodes
dbutils.widgets.text("Nodes", "QuotePolicy")

# Get the selected nodes from the widget
selected_nodes = dbutils.widgets.get("Nodes").split(',')

In [0]:
import os
from datetime import datetime
 
# Retrieve the widget value
selected_date_str = dbutils.widgets.get("Date")
selected_date = datetime.strptime(selected_date_str, "%Y-%m-%d")
# Process the retrieved date to the required format
selected_date_formatted = selected_date.strftime("%Y/%m/%d")
print("Selected Date:", selected_date_formatted)

# Read processed file names - This is to exclude these files in the current run as they are successfully 
# processed in the last run 
 
file_path = f"/tmp/data_validation_test/API_DATA/MAJESCO_PAS/POLICY/XML/{selected_date_formatted}/processed_file.txt"
 
try:
  # Read processed-file.txt from DBFS
  head_lines = dbutils.fs.head(file_path)
    
  # Split the content by commas and store in a list
  processed_files_list = [file.strip() for file in head_lines.split(',')]
  print("Processed files:", processed_files_list)
except Exception as e:
  # Create an empty file if it doesn't exist
  dbutils.fs.put(file_path, "", overwrite=True)
  processed_files_list = []
  print("Processed files: (processed_file.txt not found, created new file)")

Selected Date: 2024/03/22
Processed files: ['2000001673-01_2024-03-19T11:47:53_convertQuote.xml', '2000001751-01_2024-03-22T10:07:26_endorsePolicy.xml', '2000001790-01_2024-03-22T10:17:25_quoteEndorseConvert.xml', '2000001790-01_2024-03-22T09:53:56_createPolicy.xml', '2000001786-01_2024-03-22T08:15:16_cancellation.xml', '2000001791-01_2024-03-22T10:33:46_convertQuote.xml']


In [0]:
new_files = []

try:
    # Get the folder path
    base_folder_path = dbutils.widgets.get("BaseFolderPath")
    folder_path = f"{base_folder_path}{selected_date_formatted}"
    print("Folder path:", folder_path)
    
    # List files in the folder
    files = dbutils.fs.ls(folder_path)
    
    # Check if Load_Folder is Yes
    load_folder = dbutils.widgets.get("Load_Folder")
    if load_folder.lower() == "yes":
        for file in files:
            new_files.append(file.name)
    else:
        # Extract file names and filter out those not present in processed-file.txt
        for file in files:
            file_name = file.name
            if file_name not in processed_files_list:
                new_files.append(file_name)

    print("New files:", new_files)
 
except FileNotFoundError:
    print("Folder not found:", folder_path)
 
except Exception as e:
    print("An error occurred while listing files:", str(e))


An error occurred while listing files: name 'selected_date_formatted' is not defined


In [0]:
import re

# Define the destination folder path where you want to copy the files
destination_folder_path = f"/tmp/data_validation_test/API_DATA/MAJESCO_PAS/POLICY/XML/{selected_date_formatted}/new_files/"

# Create a new list to store the updated destination file names
updated_new_files = []

# Create an empty dictionary to store the mapping of original filenames to destination filenames
file_mapping = {}

# List files in the destination folder
existing_files = [file.name for file in dbutils.fs.ls(destination_folder_path)]

for file_name in new_files:
    try:
        # Extract the prefix of the filename up to the first occurrence of "T"
        prefix, _, suffix = file_name.partition("T")
        
        # Extract the suffix of the filename after the last occurrence of "_"
        suffix = suffix.rsplit("_", 1)[-1]
        
        destination_file_name = f"{prefix}{suffix}"
        
        # Check if the destination file already exists
        if destination_file_name in existing_files:
            print(f"File '{destination_file_name}' already exists in '{destination_folder_path}'. Skipping copy.")
        else:
            source_file_path = f"{folder_path}/{file_name}"
            destination_file_path = f"{destination_folder_path}/{destination_file_name}"

            # Copy the file with updated name to the destination folder
            dbutils.fs.cp(source_file_path, destination_file_path)

            print(f"File '{file_name}' copied to '{destination_folder_path}' with updated name '{destination_file_name}'.")

        # Append the destination file name to the updated new files list
        updated_new_files.append(destination_file_name)
        
        # Add mapping of original filename to the new destination filename in the dictionary
        file_mapping[file_name] = destination_file_name
    except Exception as e:
        print(f"An error occurred while processing '{file_name}': {str(e)}")

# Update the new_files list with the updated destination file names
new_files = updated_new_files

# Print the updated new_files list
print("Updated new files list:")
print(new_files)

# Print the file mapping dictionary
print("File mapping dictionary:")
print(file_mapping)


File '2000001786-01_2024-03-22cancellation.xml' already exists in '/tmp/data_validation_test/API_DATA/MAJESCO_PAS/POLICY/XML/2024/03/22/new_files/'. Skipping copy.
File '2000001790-01_2024-03-22createPolicy.xml' already exists in '/tmp/data_validation_test/API_DATA/MAJESCO_PAS/POLICY/XML/2024/03/22/new_files/'. Skipping copy.
File '2000001790-01_2024-03-22quoteEndorseConvert.xml' already exists in '/tmp/data_validation_test/API_DATA/MAJESCO_PAS/POLICY/XML/2024/03/22/new_files/'. Skipping copy.
File '2000001791-01_2024-03-22convertQuote.xml' already exists in '/tmp/data_validation_test/API_DATA/MAJESCO_PAS/POLICY/XML/2024/03/22/new_files/'. Skipping copy.
Updated new files list:
['2000001786-01_2024-03-22cancellation.xml', '2000001790-01_2024-03-22createPolicy.xml', '2000001790-01_2024-03-22quoteEndorseConvert.xml', '2000001791-01_2024-03-22convertQuote.xml']
File mapping dictionary:
{'2000001786-01_2024-03-22T08:15:16_cancellation.xml': '2000001786-01_2024-03-22cancellation.xml', '2000

##### Step - 4 - Iterate through new XML files and extract node information

In [0]:
# Mapping of nodes to raw tables
node_table_mapping = {
    "QuotePolicy": "raw.maj_policy_Wc_QuotePolicy",
    "InsuredList": "raw.maj_policy_wc_insuredlist_insured",
    "ProducerList": "raw.maj_policy_wc_producerlist_producer",
    "Product": "raw.maj_policy_wc_product_form",
    "CommissionList": "raw.maj_policy_wc_commissionlist_commission",
    "SurchargesList": "raw.maj_policy_wc_product_surcharges"
}

# Mapping of tables to entity reference columns
entity_reference_col_mapping = {
    "raw.maj_policy_Wc_QuotePolicy": "QPList_QP_EntityReference",
    "raw.maj_policy_wc_insuredlist_insured": "InsuredList_IL_EntityReference",
    "raw.maj_policy_wc_producerlist_producer": "ProducerList_PR_EntityReference",
    "raw.maj_policy_wc_product_form": "Product_PR_EntityReference",
    "raw.maj_policy_wc_commissionlist_commission": "CList_Comm_PolicyReference_newValue",
    "raw.maj_policy_wc_product_surcharges": "SurchargesList_SL_EntityReference"
}


In [0]:
# Define function to run SQL count queries
def run_count_query(table_name, entity_reference_value):
    # Get the corresponding entity reference column name
    entity_reference_col = entity_reference_col_mapping.get(table_name)
    if not entity_reference_col:
        print(f"No entity reference column found for table: {table_name}")
        return None
    # Generate SQL query with the appropriate column name
    sql_query = f"SELECT count(*) as count FROM {table_name} WHERE {entity_reference_col} = '{entity_reference_value}'"
    # Run the count SQL query using Spark SQL
    count_df = spark.sql(sql_query)
    count_value = count_df.collect()[0]["count"]
    return count_value

In [0]:
# Function to insert values into Hive table
def insert_into_hive_table(load_id, created_datetime, updated_datetime, xml_filename, listed_nodes, updated_table_name, status, remark, file_received_date):
    try:
        spark.sql(f"""
            INSERT INTO TABLE test.data_load_monitoring_v1
            VALUES (
                '{load_id}',
                '{created_datetime}',
                '{updated_datetime}',
                '{xml_filename}',
                '{listed_nodes}',
                '{updated_table_name}',
                '{status}',
                '{remark}',
                '{file_received_date}'
            )
        """)
        print("Row inserted into test.data_load_monitoring_v1 successfully.")
    except Exception as e:
        print(f"Error inserting row into test.data_load_monitoring_v1: {str(e)}")


In [0]:
from pyspark.sql import SparkSession
from datetime import datetime, date
import uuid

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("XML Processing") \
    .config("spark.sql.caseSensitive", "true") \
    .getOrCreate()

# Loop over new_files
for file_name in new_files:
    file_path = f"{destination_folder_path}{file_name}"
    
    try:
        # Read the XML file into a Spark DataFrame
        spark_df = spark.read \
                        .format("xml") \
                        .option('rowTag', 'QuotePolicy') \
                        .load(f"dbfs:{file_path}")
        
        # Check if the Spark DataFrame is not null (i.e., not empty)
        if spark_df:
            # Check if the specified columns are present
            column_existence = {col_name: col_name in spark_df.columns for col_name in selected_nodes}
            column_existence["QuotePolicy"] = True
            entity_reference_value = spark_df.select("_EntityReference").first()[0]
            print(f"File: {file_name}")
            print("EntityReference Value:", entity_reference_value)
            updated_table_list =[]
            fail_update_table_list= []

            for col_name, exists in column_existence.items():
                if exists:
                    table_name = node_table_mapping.get(col_name)
                    if table_name:
                        count_value = run_count_query(table_name, entity_reference_value)
                        if count_value is not None:
                            if count_value == 1:
                                print(f"{col_name} node is updated in {table_name}")
                                updated_table_list.append(table_name)
                                policy_master_id_df = spark.sql("""
                                    SELECT DISTINCT PolicyMasterID 
                                    FROM mart.policydimension 
                                    WHERE SystemID = 70 AND SourcePolicyGUID = '{0}'
                                """.format(entity_reference_value))

                                policy_master_id_row = policy_master_id_df.first()

                                if policy_master_id_row:
                                    updated_table_list.append('mart.policydimension')
                                    policy_master_id = policy_master_id_row["PolicyMasterID"]
                                    policy_transaction_detail_df = spark.sql("""
                                        SELECT *
                                        FROM mart.policytransactiondetail
                                        WHERE Policymasterid = '{0}'
                                    """.format(policy_master_id))
                                    total_count = policy_transaction_detail_df.count()

                                    if total_count > 0:
                                        updated_table_list.append('mart.policytransactiondetail')
                                        print(f"{col_name} node is updated in mart.policydimension and mart.policytransactiondetail table")
                                    else:
                                        print(f"{entity_reference_value} is not updated in mart.policytransactiondetail ")
                                        fail_update_table_list.append('mart.policytransactiondetail')
                                else:
                                    print(f"{entity_reference_value} is not updated in mart.policydimension ")
                                    fail_update_table_list.append('mart.policydimension')

                            else:
                                print(f"{col_name} node is not updated in {table_name}")
                                fail_update_table_list.append(table_name)
                    else:
                        print(f"No mapping found for {col_name}")

            # Generate UUID for load_id
            load_id = str(uuid.uuid4())

            # Generate FileReceivedDate
            file_received_date = selected_date
            updated_table_list = list(set(updated_table_list))
            fail_update_table_list = list(set(fail_update_table_list))

            # Get original file name from file_mapping dictionary
            original_filename = next((key for key, val in file_mapping.items() if val == file_name), None)
            

            # Insert into Hive table test.data_load_monitoring_v1
            status = "Fail" if fail_update_table_list else "Success"
            remark = "Update failed for tables: " + ", ".join(fail_update_table_list) if fail_update_table_list else "Update successful for all tables"
            insert_into_hive_table(
                load_id=load_id, 
                created_datetime=datetime.now(), 
                updated_datetime=datetime.now(), 
                xml_filename=original_filename, 
                listed_nodes=", ".join(selected_nodes), 
                updated_table_name=", ".join(updated_table_list), 
                status=status, 
                remark=remark,
                file_received_date=file_received_date
            )

            print("-----------------------------------------------------------")
        else:
            print(f"File: {file_name} - No data found in the XML file.")

    except Exception as e:
        print(f"An error occurred while processing {file_name}: {str(e)}")





File: 2000001786-01_2024-03-22cancellation.xml
EntityReference Value: P01WK0000017860001
QuotePolicy node is updated in raw.maj_policy_Wc_QuotePolicy
QuotePolicy node is updated in mart.policydimension and mart.policytransactiondetail table
CommissionList node is not updated in raw.maj_policy_wc_commissionlist_commission
Row inserted into test.data_load_monitoring_v1 successfully.
-----------------------------------------------------------
File: 2000001790-01_2024-03-22createPolicy.xml
EntityReference Value: P01WK0000017900000
QuotePolicy node is updated in raw.maj_policy_Wc_QuotePolicy
QuotePolicy node is updated in mart.policydimension and mart.policytransactiondetail table
CommissionList node is updated in raw.maj_policy_wc_commissionlist_commission
CommissionList node is updated in mart.policydimension and mart.policytransactiondetail table
Row inserted into test.data_load_monitoring_v1 successfully.
-----------------------------------------------------------
File: 2000001790-01_20

##### Step - 5 - Add the new file names to processed_file.txt and delete new files from the temp folder

In [0]:

# Execute SQL query
result_df = spark.sql("""
    SELECT DISTINCT XML_Filename 
    FROM test.data_load_monitoring_v1 
    WHERE FileReceivedDate = '2024-03-22'
""")

# Collect distinct filenames into a Python list
distinct_filenames = [row['XML_Filename'] for row in result_df.collect()]

# Print or use the distinct filenames as needed
print(distinct_filenames)

['2000001790-01_2024-03-22T10:17:25_quoteEndorseConvert.xml', '2000001790-01_2024-03-22T09:53:56_createPolicy.xml', '2000001786-01_2024-03-22T08:15:16_cancellation.xml', '2000001791-01_2024-03-22T10:33:46_convertQuote.xml']


In [0]:
# Read the entire file
head_lines = dbutils.fs.head(file_path)

# Concatenate the existing content with the filenames obtained from SQL
updated_content = head_lines.strip() + ', ' + ', '.join(distinct_filenames)

# Write the updated content back to the file
dbutils.fs.put(file_path, updated_content, overwrite=True)


Wrote 317 bytes.
Out[20]: True

### -------------------------------------------------------------------------------

##### miscellaneous

In [0]:
%sql

-- Create a new table with the required schema and partition by FileReceivedDate
CREATE TABLE IF NOT EXISTS test.data_load_monitoring_v1
(
    Load_ID STRING,
    CreatedDateTime TIMESTAMP,
    UpdatedDateTime TIMESTAMP,
    XML_Filename STRING,
    ListedNodes STRING,
    UpdatedTableName STRING,
    Status STRING,
    Remark STRING
)
USING DELTA
PARTITIONED BY (FileReceivedDate DATE);



In [0]:
%sql
select PolicyMasterID from mart.policydimension where SystemID = 70 and SourcePolicyGUID = "P01WK0000017900001"

PolicyMasterID
90020650


In [0]:
%sql
select * from mart.policytransactiondetail where Policymasterid = '90020650'

PolicyTransactionDetailID,PolicyPrefix,CoverageID,TransactionEffectiveDate,PremiumType,CompanyCode,LineOfBusinessCode,RatingType,ClassCode,RiskNumber,UserCode,LimitDeductibleID,PolicyVehicleID,SystemID,ExposureState,PolicyEffectiveDate,PolicyNumber,EntryDate,EntryTime,RSN,RiskID,TransactionType,WorkCompClassID,RevenueTypeDescription,RevenueType,AccountID,Policymasterid,PolicyDecNumber,UnitNumber,CoverageCode,SIRAmount,RevenueAmount,PolicyCoverageMasterID,AddDate,AgentID,BranchNumber,DeleteDate,DriverIndexNumber,EarnedInFullIndicator,EarnedPremium,ExcludeFromCommission,InforceChange,InforceChangeCalculated,InforceNew,InforceOld,InsuredID,LimitCode,ManualCorrectionFlag,Master1099ID,PolicyDriverID,PolicyPrefixID,ProtectiveShareAmount,RatingPeriodID,RedBrickRecordKey,ScorecardPointsID,SliceNumber,SplitNumber,StopDate,TruckingBureauCoveragesID,TruckingCoverageID,VehicleUnitType,WrittenChangeCalculated,WrittenNew,WrittenOld,WrittenChange,Rate,DWHCreatedBy,DWHCreatedDateTime,DWHUpdatedBy,DWHUpdatedDateTime,CompanyID,LineOfBusinessID,AnnualPremium,EvaluatedPremium,PolicyTermPremium,PriorPremium,TotalProRatedChangePremium,Premium,ChangePremium,Commission,CoverageTypeCode
90026114,AW,7394,2024-03-22T00:00:00.000+0000,P,PIC,SBWC,,16,10,kayec@protective.com,0,,70,,2024-03-22T00:00:00.000+0000,2000001790,2024-03-22,101725,,2190757,,14865939,,,20952,90020650,1,0,WR,,,,2024-03-22T10:17:25.000+0000,30318,,,,,,N,3035.0,3035.0,3035.0,0.0,5084412,,,,,139,,,,,,,,,,,3035.0,3035.0,0.0,3035.0,,5649e4c2-3d5f-4377-be81-3c1e7949d907,2024-03-22T15:25:52.123+0000,5649e4c2-3d5f-4377-be81-3c1e7949d907,2024-03-22T15:25:52.123+0000,11,10,3035.0,,3035.0,0.0,3035.0,3035.0,3035.0,242.8,
90026115,AW,7394,2024-03-22T00:00:00.000+0000,P,PIC,SBWC,,35,10,kayec@protective.com,0,,70,,2024-03-22T00:00:00.000+0000,2000001790,2024-03-22,101725,,2190757,,14865941,,,20952,90020650,1,0,WR,,,,2024-03-22T10:17:25.000+0000,30318,,,,,,N,0.0,0.0,3250.0,3250.0,5084412,,,,,139,,,,,,,,,,,0.0,3250.0,3250.0,0.0,,5649e4c2-3d5f-4377-be81-3c1e7949d907,2024-03-22T15:25:52.123+0000,5649e4c2-3d5f-4377-be81-3c1e7949d907,2024-03-22T15:25:52.123+0000,11,10,3250.0,,3250.0,3250.0,0.0,3250.0,3250.0,260.0,


In [0]:


# Qp  -- raw.maj_policy_Wc_QuotePolicy
# Insured -- raw.
# Producer
# Product
# comission  -- raw.maj_policy_wc_commissionlist_commission
# surcharges
 
 
# Mart.
# PolicyDimension
# PolicyTransactionDetail
# PolicySurchargeTransacxtionDetail
# AccountDimension
# InsuredDImesnion

In [0]:
# spark.sql("SELECT * FROM raw.maj_policy_Wc_QuotePolicy WHERE QPList_QP_EntityReference = 'P01WK0000017900001'").show()

spark.sql("SELECT DISTINCT * FROM raw.maj_policy_wc_commissionlist_commission WHERE CList_Comm_PolicyReference_newValue = 'P01WK0000017860001'").show()


+--------------------+--------------------------------+--------------------------------+------------------------------+------------------------------+--------------------------------+--------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+--------------------------------+----------------------+----------------------+-------------------------------------+-------------------------------------+-----------------------------------+-----------------------------------+---------------------------------+---------------------------------+-----------------------------------+-----------------------------------+--------------------------------+--------------------------------+------------------------+------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------------+--------------------------------+------------

In [0]:
%sql 

SELECT DISTINCT * FROM raw.maj_policy_wc_commissionlist_commission WHERE CList_Comm_PolicyReference_newValue = 'P01WK0000017860001'

CList_Comm_AuditFlag,CList_Comm_CommPlanComm_oldValue,CList_Comm_CommPlanComm_newValue,CList_Comm_CommPlanId_oldValue,CList_Comm_CommPlanId_newValue,CList_Comm_CommPlanName_oldValue,CList_Comm_CommPlanName_newValue,CList_Comm_Commission_oldValue,CList_Comm_Commission_newValue,CList_Comm_DateCreated_oldValue,CList_Comm_DateCreated_newValue,CList_Comm_DateModified_newValue,CList_Comm_Id_oldValue,CList_Comm_Id_newValue,CList_Comm_IsCommPlanPercent_oldValue,CList_Comm_IsCommPlanPercent_newValue,CList_Comm_IsCommPlanTired_oldValue,CList_Comm_IsCommPlanTired_newValue,CList_Comm_NetCommission_oldValue,CList_Comm_NetCommission_newValue,CList_Comm_PolicyReference_oldValue,CList_Comm_PolicyReference_newValue,CList_Comm_ProducerCode_oldValue,CList_Comm_ProducerCode_newValue,CList_Comm_Type_oldValue,CList_Comm_Type_newValue,CList_Comm_UserCreated_oldValue,CList_Comm_UserCreated_newValue,CList_Comm_UserModified_newValue,CList_Comm_DateModified_oldValue,CList_Comm_UserModified_oldValue,CList_Comm_AdjustedCommission_oldValue,CList_Comm_AdjustedCommission_newValue,CList_Comm_CommPlanAdjustment_oldValue,CList_Comm_CommPlanAdjustment_newValue,CList_Comm_CommPlanDesc_oldValue,CList_Comm_CommPlanDesc_newValue
M,10,10,433,433,Test,Test,133.5,0,2024-03-21T06:52:59,2024-03-22T08:17:19,,3824,3838,Y,Y,0,0,133.5,0,P01WK0000017860000,P01WK0000017860001,40901003,40901003,Commission,Commission,batchuser@protective.com,batchuser@protective.com,,2024-03-21T06:52:59,batchuser@protective.com,,,,,,
M,17,17,658,658,Commission1,Commission1,226.95,0,2024-03-21T06:52:59,2024-03-22T08:17:19,,3823,3837,Y,Y,0,0,226.95,0,P01WK0000017860000,P01WK0000017860001,40901003,40901003,Commission,Commission,batchuser@protective.com,batchuser@protective.com,,2024-03-21T06:52:59,batchuser@protective.com,,,,,Protective comm plan,Protective comm plan
