In [26]:
import pandas as pd
import os
import json
import xmltodict
import json
import xml.etree.ElementTree as ET

In [27]:
# parse nodes - Microsoft.Package 8


# Define namespaces
namespaces = {
    'DTS': 'www.microsoft.com/SqlServer/Dts'
}

# Function to parse Executable elements (Control Flow)
def parse_executables(executables, depth=0):
    executables_info = []
    for executable in executables:
        exec_type = executable.attrib.get(f'{{{namespaces["DTS"]}}}ExecutableType')
        name = executable.attrib.get(f'{{{namespaces["DTS"]}}}Name')
        refid = executable.attrib.get(f'{{{namespaces["DTS"]}}}refId')
        description = executable.attrib.get(f'{{{namespaces["DTS"]}}}Description')
        object_data = executable.attrib.get(f'{{{namespaces["DTS"]}}}ObjectData')

        executables_info.append({'name': name, 'type': exec_type, 'depth': depth, 'refid': refid, 'description': description, 'object_data': object_data})
        
        # Check for nested executables
        nested_executables = executable.findall('DTS:Executables/DTS:Executable', namespaces)
        if nested_executables:
            executables_info.extend(parse_executables(nested_executables, depth + 1))
    
    return executables_info

# Function to parse Data Flow components
def parse_data_flows(data_flow_tasks):
    data_flow_info = []
    for task in data_flow_tasks:
        task_name = task.attrib.get(f'{{{namespaces["DTS"]}}}Name')
        components = task.findall('.//component', namespaces)
        for component in components:
            comp_name = component.attrib.get('name')
            comp_class_id = component.attrib.get('componentClassID')
            refid = component.attrib.get('refId')
            name = component.attrib.get('name')
            data_flow_info.append({'task_name': task_name, 'component_name': comp_name, 'classID': comp_class_id, 'name':name, 'refid': refid})
    return data_flow_info

# Load and parse the uploaded .dtsx file
def parse_dtsx(file_path):
    tree = ET.parse(file_path)
    root = tree.getroot()

    # Parse control flow
    control_flow = parse_executables(root.findall('.//DTS:Executable', namespaces))
    
    # Parse data flow tasks
    data_flow_tasks = root.findall('.//DTS:Executable[@DTS:ExecutableType="Microsoft.Pipeline"]', namespaces)
    data_flow_info = parse_data_flows(data_flow_tasks)
    
    return control_flow, data_flow_info



In [28]:
# parse nodes - DailyETLMain.dtsx specific

# Define namespaces
namespaces = {
    'DTS': 'www.microsoft.com/SqlServer/Dts'
}

# Function to parse Executable elements (Control Flow)
def parse_executables(executables, depth=0):
    executables_info = []
    for executable in executables:
        print(executable)

       # executables_info.append({'name': name, 'type': exec_type, 'depth': depth, 'refid': refid, 'description': description, 'object_data': object_data})
        
        # Check for nested executables
        nested_executables = executable.findall('DTS:Executables/DTS:Executable')#, namespaces)
        if nested_executables:
            for i in nested_executables:
                print("    " + i)
    
    return executables_info


# Load and parse the uploaded .dtsx file
def parse_dtsx(file_path):
    tree = ET.parse(file_path)
    root = tree.getroot()

    # Parse control flow
    control_flow = parse_executables(root.findall('.//DTS:Executable'))
    
    return control_flow

In [29]:
file_path = 'data/DailyETLMain.dtsx'
control_flow, data_flow_info = parse_dtsx(file_path)

(control_flow, data_flow_info)

SyntaxError: prefix 'DTS' not found in prefix map (<string>)

In [30]:
file_path = 'data/ISP_01.dtsx' # good
control_flow, data_flow_info = parse_dtsx(file_path)

(control_flow, data_flow_info)

SyntaxError: prefix 'DTS' not found in prefix map (<string>)

In [31]:
file_path = 'data/ISP_02.dtsx' # good
control_flow, data_flow_info = parse_dtsx(file_path)

(control_flow, data_flow_info)

SyntaxError: prefix 'DTS' not found in prefix map (<string>)

In [32]:
file_path = 'data/ISP_04.dtsx' # good
control_flow, data_flow_info = parse_dtsx(file_path)

(control_flow, data_flow_info)

SyntaxError: prefix 'DTS' not found in prefix map (<string>)

## Convert to json

In [33]:
class Load():
    def __init__(self, path):
        self.path = path


    def remove_at_signs(self, obj):
        if isinstance(obj, dict):
            return {key.replace('@', ''): self.remove_at_signs(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self.remove_at_signs(item) for item in obj]
        else:
            return obj
        

    def remove_first_layer(self, json_dict):
        # Extract values from the first layer
        values = list(json_dict.values())
        return values

    def run(self):

        # Path to the XML file
        with open(self.path, 'rb') as f:
            self.xml = f.read()

   
        # open the xml file
        o = xmltodict.parse(self.xml)  # every time you reload the file in colab the key changes (file (1).xml becomes file (2).xml ...)

        json = self.remove_first_layer(self.remove_at_signs(o))[0]

        return json
    


files = [file for file in os.listdir('data/')]
#files = [file for file in os.listdir('data/Creating a Simple ETL Package/Completed Packages/')]

nodes_list = []
filter_list = []

for file in files[:2]:
    

    if file.endswith('.dtsx'):
        print(file)
        json = Load(f"data/{file}").run()
        #json = Load(f'data/Creating a Simple ETL Package/Completed Packages/{file}').run()
    #print(json)

DailyETLMain.dtsx


In [41]:
# print control flow nodes and data flow nodes tree


for control_node in json['DTS:Executables']['DTS:Executable']:

    print("control node name: ",control_node['DTS:ObjectName'])
    print("control node description: ",control_node['DTS:Description'])

    print()

    if control_node['DTS:Description'] == 'Sequence Container':
        for executable in control_node['DTS:Executables']['DTS:Executable']:
           # print("     ", executable)
            print("     executable name: ", executable['DTS:ObjectName'])
            print("     executable description: ", executable['DTS:Description'])

            #print()

            if executable['DTS:CreationName'] == 'Microsoft.Pipeline': # if the node is a pipeline, then get into data flow components (source and destination)

                for component in executable['DTS:ObjectData']['pipeline']['components']['component']:
                    print()
                    print("          component name: ", component['name'])
                    print("          component description: ", component['description'])
                    print("          component properties: ", component['properties'])

                    for property in component['properties']['property']:

                        # !!! remove the ifs, does not depend on source and destination!!!!!
                        if property['name'] == 'SqlCommand' and component['description'] == 'OLE DB Source': # extract sql command
                            print("          SQL command: ", property['#text'])
                        if property['name'] == 'OpenRowset' and component['description'] == 'OLE DB Destination':  # extract source or destination tables (change if)
                            print("          table name: ", property['#text'])
                        if property['name'] == 'ParameterMapping' and component['description'] == 'OLE DB Source': # list of variables of sql query
                            print("          parameters query: ", property['#text'])
                    print()

            elif executable['DTS:CreationName'] =='Microsoft.ExecuteSQLTask': # if the task is ExecuteSQL then print query and variables
                print("     SQL query: ", executable['DTS:ObjectData']['SQLTask:SqlTaskData']['SQLTask:SqlStatementSource'])
                try:
                    variables = executable['DTS:ObjectData']['SQLTask:SqlTaskData']['SQLTask:ParameterBinding']
                    if type(variables) == list:
                        print("     Variables: ", [i['SQLTask:DtsVariableName'] for i in executable['DTS:ObjectData']['SQLTask:SqlTaskData']['SQLTask:ParameterBinding']])
                    elif type(variables) == dict:
                        print("     Variables: ", executable['DTS:ObjectData']['SQLTask:SqlTaskData']['SQLTask:ParameterBinding']['SQLTask:DtsVariableName'])
                except: 
                    pass

                print()

            elif executable['DTS:CreationName'] =='Microsoft.ExpressionTask': 
                print("     SQL query: ", executable['DTS:ObjectData']['ExpressionTask']['Expression'])
                print()


control node name:  Calculate ETL Cutoff Time backup
control node description:  Expression Task

control node name:  Ensure Date Dimension includes current year
control node description:  Execute SQL Task

control node name:  Load City Dimension
control node description:  Sequence Container

     executable name:  Extract Updated City Data to Staging
     executable description:  Data Flow Task

          component name:  Integration_City_Staging
          component description:  OLE DB Destination
          component properties:  {'property': [{'dataType': 'System.Int32', 'description': 'The number of seconds before a command times out.  A value of 0 indicates an infinite time-out.', 'name': 'CommandTimeout', '#text': '0'}, {'dataType': 'System.String', 'description': 'Specifies the name of the database object used to open a rowset.', 'name': 'OpenRowset', '#text': '[Integration].[City_Staging]'}, {'dataType': 'System.String', 'description': 'Specifies the variable that contains the n

In [37]:
# order nodes in sequence containers

def sort_precedeneces(precedences_list, precedences_list_sorted):

    # for element in precedence list, recursively add all the node one by one in order
    for i in precedences_list:
        # if the last element of the sorted list is the same as the first element of the list of lists
        if precedences_list_sorted[-1] == i[0]:

            precedences_list_sorted.append(i[1]) # append to sorted list
            sort_precedeneces(precedences_list, precedences_list_sorted) # recursively recall the function

    return precedences_list_sorted


            
for control_node in json['DTS:Executables']['DTS:Executable']:
    #print(control_node)
    precedences_list = []

    # if node is sequence node
    if control_node['DTS:Description'] == 'Sequence Container':
        print("control node name: ", control_node['DTS:ObjectName'])

        # for precedences in precedence, create list of lists where each list contains the from and to nodes
        for constraint in control_node['DTS:PrecedenceConstraints']['DTS:PrecedenceConstraint']:
            ffrom = f"{constraint['DTS:From']}".split("\\")[-1]
            to = f"{constraint['DTS:To']}".split("\\")[-1]

            precedences_list.append([ffrom, to])

        list_1 = [i[1] for i in precedences_list] # list with second elements of all pairs

        precedences_list_sorted = []

        # for element in precedence list, 
        for i in precedences_list:
            # if the first node is never a second node then the node is the first one in the sequence container, therefore append both elements to list, as first and second, then keep adding until done
            if i[0] not in list_1:
                precedences_list_sorted.append(i[0])
                precedences_list_sorted.append(i[1])


        print(sort_precedeneces(precedences_list, precedences_list_sorted))
        print()


control node name:  Load City Dimension
['Set TableName to City', 'Get Lineage Key', 'Truncate City_Staging', 'Get Last City ETL Cutoff Time', 'Extract Updated City Data to Staging', 'Migrate Staged City Data']

control node name:  Load Customer Dimension
['Set TableName to Customer', 'Get Lineage Key', 'Truncate Customer_Staging', 'Get Last Customer ETL Cutoff Time', 'Extract Updated Customer Data to Staging', 'Migrate Staged Customer Data']

control node name:  Load Employee Dimension
['Set TableName to Employee', 'Get Lineage Key', 'Truncate Employee_Staging', 'Get Last Employee ETL Cutoff Time', 'Extract Updated Employee Data to Staging', 'Migrate Staged Employee Data']

control node name:  Load Movement Fact
['Set TableName to Movement', 'Get Lineage Key', 'Truncate Movement_Staging', 'Get Last Movement ETL Cutoff Time', 'Extract Updated Movement Data to Staging', 'Migrate Staged Movement Data']

control node name:  Load Order Fact
['Set TableName to Order', 'Get Lineage Key', 'Tr