In [None]:
# Notebook parameters. All four default to empty strings in this cell.
# NOTE(review): presumably these are overridden by the caller at run time - confirm.
# The spellings "Conatiner" and "SISS" are the names the rest of the notebook reads,
# so they can only be corrected by renaming every reference at the same time.
SSIS_AdlsAccountName = ''  # storage account name used when building the abfss:// URLs
SSIS_AdlsConatinerName = ''  # container name used when building the abfss:// URLs
SSIS_PackageFileName = ''  # file name of the SSIS package JSON under .../ssis-connector/working/
SISS_AppName = ''  # name handed to log4j's Logger.getLogger

StatementMeta(notebookrun, 13, 75, Finished, Available)



In [None]:
import json
import re
import os
from enum import Enum
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import explode
from pyspark.sql import SparkSession, Row
from notebookutils import mssparkutils
from pyspark.sql.functions import to_json

StatementMeta(notebookrun, 22, 2, Finished, Available)



In [None]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Point spark.jars at the log4j jar under $SPARK_HOME/jars.
# NOTE(review): if SPARK_HOME is unset, my_jars is None and the path becomes
# "None/jars/log4j-1.2.17.jar" - confirm the variable is always defined here.
my_jars = os.environ.get("SPARK_HOME")

myconf = SparkConf()
myconf.set("spark.jars", "%s/jars/log4j-1.2.17.jar" % my_jars)

# Build (or reuse) the Spark session with that configuration.
spark = (
    SparkSession.builder
    .appName("DB2_Test")
    .config(conf=myconf)
    .getOrCreate()
)

# log4j logger obtained through the JVM gateway, named after the SISS_AppName parameter.
Logger = spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(SISS_AppName)

StatementMeta(notebookrun, 22, 59, Finished, Available)



In [None]:
def log_msgs(msg_type, msg, logger=None):
    """Print a message and forward the same message to the logger at the matching level.

    Args:
        msg_type: Level name, compared case-insensitively. "ERROR", "DEBUG" and
            "TRACE" are recognised; any other value is handled as "INFO".
        msg: The message text to print and log.
        logger: Optional object exposing ``error``, ``debug``, ``trace`` and
            ``info`` methods. When omitted, the notebook-level ``mylogger`` is used.
    """
    level = msg_type.upper()
    if level not in ("ERROR", "DEBUG", "TRACE"):
        level = "INFO"

    print('%s: %s' % (level, msg))

    target = mylogger if logger is None else logger
    # Forward the actual message at every level (previously only ERROR did;
    # the other levels logged a fixed placeholder string instead of msg).
    getattr(target, level.lower())(msg)

# Emit a first INFO message through the logging helper.
log_msgs('INFO', "Logger started!")
           

StatementMeta(notebookrun, 22, 60, Finished, Available)

INFO: Logger started!

In [None]:
# Report each required notebook parameter that was left empty.
# Only an ERROR message is logged; execution continues afterwards.
if SSIS_AdlsAccountName == '':
    log_msgs('ERROR',"SSIS_AdlsAccountName not configured")
if SSIS_AdlsConatinerName == '':
    log_msgs('ERROR',"SSIS_AdlsConatinerName not configured")
if SSIS_PackageFileName == '':
    # Name the parameter that is actually missing (the message used to repeat the container name).
    log_msgs('ERROR',"SSIS_PackageFileName not configured")

StatementMeta(notebookrun, 22, 61, Finished, Available)



In [None]:
# File names of the two Purview type templates; they are appended to
# template_path when the templates are loaded.
package_template_nm = 'legacy_ssis_package_template.json'
process_template_nm = 'legacy_ssis_package_process_template.json'

StatementMeta(notebookrun, 22, 62, Finished, Available)



In [None]:
# Locations of the SSIS inputs, all under <container>@<account>/pccsa_main/ssis-connector/
_ssis_connector_root = ''.join([
    'abfss://', SSIS_AdlsConatinerName, '@', SSIS_AdlsAccountName,
    '.dfs.core.windows.net/pccsa_main/ssis-connector/',
])
package_path = _ssis_connector_root + 'working/' + SSIS_PackageFileName       # the package JSON itself
param_path = _ssis_connector_root + 'ssis-package/pulled_params.txt'          # parameter export (read as CSV)
connection_path = _ssis_connector_root + 'working/*.conmgr.json'              # glob for connection manager files


StatementMeta(notebookrun, 22, 63, Finished, Available)



In [None]:
# Purview type-template folder and the file the result is written to,
# both under <container>@<account>/pccsa_main/
_pccsa_main_root = ''.join([
    'abfss://', SSIS_AdlsConatinerName, '@', SSIS_AdlsAccountName,
    '.dfs.core.windows.net/pccsa_main/',
])
template_path = _pccsa_main_root + 'ssis-connector/ssis-type-templates/'
result_blob_path = _pccsa_main_root + 'incoming/results1.json'

StatementMeta(notebookrun, 22, 64, Finished, Available)



## Pull SSIS Package

In [None]:
# Load the SSIS package (multi-line JSON) and the parameter export (CSV with a header row).
package_df = (
    spark.read
    .option("multiLine", "true")
    .format("json")
    .load(package_path)
)
params_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(param_path)
)

StatementMeta(notebookrun, 22, 65, Finished, Available)



In [None]:
def get_parameter_values(package, params):
    """Build a name -> value dictionary from the SSIS parameter export.

    Args:
        package: The SSIS package DataFrame. Not used by this function; kept so
            existing callers do not need to change.
        params: DataFrame-like object whose rows carry ``parameter_name`` and
            ``parameter_value`` columns. It must support ``toJSON().collect()``.

    Returns:
        dict mapping parameter name to parameter value. A leading ``'CM.'`` is
        removed from the name. Returns an empty dict when reading the
        parameters raises ``AnalysisException``.
    """
    try:
        # Use the argument that was passed in, not the notebook-level params_df.
        rows = params.toJSON().collect()
    except AnalysisException:
        # This package seems to have no parameters
        return {}

    d_param = {}
    for row in rows:
        dict_p = json.loads(row)
        param_name = dict_p["parameter_name"]
        if param_name.startswith('CM.'):
            param_name = param_name[3:]
        # NOTE(review): Spark's JSON output can leave out fields whose value is
        # null, so a parameter without a value is stored as None instead of
        # raising KeyError - confirm this is the desired handling.
        d_param[param_name] = dict_p.get("parameter_value")
    return d_param

StatementMeta(notebookrun, 22, 66, Finished, Available)



In [None]:
# Convert the DataFrames into plain Python objects.
# package_json is the first row of package_df parsed from its JSON form
# (assumes the package file produces at least one row - TODO confirm);
# package_params is the name -> value dictionary built by get_parameter_values.
package_json = json.loads(package_df.toJSON().collect()[0])
package_params = get_parameter_values(package_df, params_df)

StatementMeta(notebookrun, 22, 67, Finished, Available)



## Extract SSIS Package Data

In [None]:
# Grouping string manipulations
# Examples:
# REFERENCE_ID: input = 'Package\\Data Flow Task\\Customers SQL MI.Inputs[OLE DB Destination Input]'
#   output = 'Package\\Data Flow Task\\Customers SQL MI'
# PACKAGE_PROPERTIES: input =  '@[$Package::BlobName]'
#   output = 'BlobName'
# CONMGR_TYPE: input = 'Package.ConnectionManagers[SSIS Connection Manager for Azure Storage]'
#   output = 'Package.'
# CONMGR_CON: input = 'Project.ConnectionManagers[purview-sqlmi.public.1ff9b19f84fa.database.windows.net,3342.purview-sqlmi-db.sqladmin]'
#   output = 'purview-sqlmi.public.1ff9b19f84fa.database.windows.net,3342.purview-sqlmi-db.sqladmin'


class string_type (Enum):
    """Kinds of string extraction understood by transform_string."""
    PARAM_SEARCH = 0          # declared, but transform_string has no rule for it
    REFERENCE_ID = 1          # text before the last '.'
    PACKAGE_PROPERTIES = 2    # text between '::' and the last ']'
    CONMGR_TYPE = 3           # text up to and including the first '.'
    CONMGR_CON = 4            # text inside the first '[...]'


# (compiled pattern, group to return) per manipulation type.
# Raw strings keep the regex escapes (\. \[ \]) from being read as string escapes.
_TRANSFORM_RULES = {
    string_type.REFERENCE_ID: (re.compile(r'(.+)\.'), 1),
    string_type.PACKAGE_PROPERTIES: (re.compile(r'(?<=::)(.*)(?=\])'), 0),
    # Non-greedy so a dotted name inside the brackets
    # (e.g. 'Package.ConnectionManagers[server.db.user]') still yields 'Package.'.
    string_type.CONMGR_TYPE: (re.compile(r'.+?\.'), 0),
    string_type.CONMGR_CON: (re.compile(r'\[(.*?)\]'), 1),
}


def transform_string(manip_type, t_string):
    """Extract one piece of an SSIS reference string.

    Args:
        manip_type: A ``string_type`` member selecting the extraction rule.
        t_string: The string to search.

    Returns:
        The extracted text, or None when ``manip_type`` has no rule
        (``string_type.PARAM_SEARCH``).

    Raises:
        ValueError: if ``t_string`` does not match the selected rule.
    """
    rule = _TRANSFORM_RULES.get(manip_type)
    if rule is None:
        return None
    pattern, group = rule
    match = pattern.search(t_string)
    if match is None:
        raise ValueError("Cannot apply %s to %r" % (manip_type, t_string))
    return match.group(group)

# tests
transform_string(string_type.REFERENCE_ID, "Package\\Data Flow Task\\Customers SQL MI.Inputs[OLE DB Destination Input]")

StatementMeta(notebookrun, 22, 69, Finished, Available)

'Package\\Data Flow Task\\Customers SQL MI'

In [None]:
def get_ssis_property(package, component, property_name, params):
    """Return the value of a named property of an SSIS component.

    A property that carries a "_value_" entry yields that entry. A property
    without one is looked up among the package's property expressions under
    the name "[<component name>].[<property name>]"; the parameter named by
    the expression is then read from ``params``. Returns None when nothing
    is found.
    """
    resolved = None
    candidates = (
        entry for entry in component["properties"]["property"]
        if entry["@name"] == property_name
    )
    for entry in candidates:
        if "_value_" in entry:
            resolved = entry["_value_"]
            continue
        if not params:
            # no literal value and no parameters to resolve an expression against
            continue
        expressions = package["DTS:Executable"]["DTS:Executables"]["DTS:Executable"]["DTS:PropertyExpression"]
        for expression in expressions:
            expected_name = "[" + component["@name"] + "].[" + entry["@name"] + "]"
            if expression["DTS:@Name"] == expected_name:
                param_key = transform_string(string_type.PACKAGE_PROPERTIES, expression["_value_"])
                resolved = params[param_key]
    return resolved

StatementMeta(notebookrun, 22, 70, Finished, Available)



In [None]:
def connection_string_to_dictionary(constr):
    """Split a ';'-separated ``name=value`` connection string into a dictionary.

    Args:
        constr: Connection string such as
            ``"Data Source=host;Initial Catalog=db;"``.

    Returns:
        dict of name -> value. Empty or whitespace-only segments are skipped.
        Only the first '=' of a segment separates name from value, so values
        that themselves contain '=' are kept whole. A segment without '=' maps
        its text to an empty string.
    """
    output = {}
    for nm_val in constr.split(';'):
        if not nm_val.strip():
            continue
        name, _, value = nm_val.partition('=')
        output[name] = value
    return output

StatementMeta(notebookrun, 22, 96, Finished, Available)

{'Data Source': '20.206.80.219', 'User ID': 'vmadmin', 'Initial Catalog': 'purview-sqlmi-db', 'Provider': 'SQLNCLI11.1', 'Auto Translate': 'False'}

In [None]:
def _resolve_connection_string(package, params, conmgr_ref_id):
    """Return the connection string behind a connection manager reference, or None if none is found."""
    if transform_string(string_type.CONMGR_TYPE, conmgr_ref_id) != "Package.":
        # Not a package-level manager: the bracketed text of the reference is used as-is.
        return transform_string(string_type.CONMGR_CON, conmgr_ref_id)

    connection = transform_string(string_type.CONMGR_CON, conmgr_ref_id)
    param_key = connection + '.ConnectionString'
    if param_key in params:
        # A parameter overrides whatever is stored in the package.
        return params[param_key]

    con_str = None
    managers = package["DTS:Executable"]["DTS:ConnectionManagers"]["DTS:ConnectionManager"]
    for con in managers:
        if con["DTS:@refId"] != conmgr_ref_id:
            continue
        creation_name = con["DTS:@CreationName"]
        if creation_name == "OLEDB":
            con_str = con["DTS:ObjectData"]["DTS:ConnectionManager"]["DTS:@ConnectionString"]
        elif creation_name == "AzureStorage":
            con_str = con["DTS:ObjectData"]["AzureStorageConnectionManager"]["@ConnectionString"]
    return con_str


def _require_connection_string(con_str, component_name, conmgr_ref_id):
    """Return con_str, raising a descriptive ValueError when it could not be resolved."""
    if con_str is None:
        raise ValueError(
            "No connection string could be resolved for component %r (connection manager %r)"
            % (component_name, conmgr_ref_id)
        )
    return con_str


def get_mappings(package, params):
    """Derive the dataset and column mappings of the package's data flow.

    Args:
        package: The SSIS package as a nested dict (parsed package JSON).
        params: Dict of parameter name -> value. It is searched for
            "<connection>.ConnectionString" overrides and must contain
            "Table" when an "OLE DB Destination" component is present.

    Returns:
        A one-element list holding a dict with:
          * "DatasetMapping": {"Source": <blob URL>, "Sink": <mssql:// URL>};
            each key is set only when the matching component
            ("Azure Blob Source" / "OLE DB Destination") is present.
          * "ColumnMapping": list of {"Source", "Sink"} column pairs, set
            when an "OLE DB Destination" component is present.

    Raises:
        ValueError: if a source or destination component's connection string
            cannot be resolved.
    """
    ds_map = {"DatasetMapping": {}}

    components = package["DTS:Executable"]["DTS:Executables"]["DTS:Executable"]["DTS:ObjectData"]\
                                            ["pipeline"]["components"]["component"]

    # Need to loop through paths and figure out how to create multiple outputs in lineage

    for component in components:
        conmgr_ref_id = component["connections"]["connection"]["@connectionManagerRefId"]
        # Resolved afresh for every component, so one component's connection
        # string can never leak into the next.
        con_str = _resolve_connection_string(package, params, conmgr_ref_id)
        component_name = component["@name"]

        if component_name == "Azure Blob Source":
            # Read properties from the package that was passed in, not a notebook global.
            blob_name = get_ssis_property(package, component, "Blob Name", params)
            blob_container = get_ssis_property(package, component, "Blob Container", params)
            # e.g. "AccountName=purviewaccdl;DefaultEndpointsProtocol=https"
            #   -> {'AccountName': 'purviewaccdl', 'DefaultEndpointsProtocol': 'https'}
            # The shared parser copes with a missing trailing ';' as well.
            con_props = connection_string_to_dictionary(
                _require_connection_string(con_str, component_name, conmgr_ref_id)
            )
            ds_map["DatasetMapping"]["Source"] = con_props["DefaultEndpointsProtocol"] + "://" + con_props["AccountName"] + ".blob." + con_props["EndpointSuffix"] + "/" \
                                                + blob_container + "/" + blob_name.strip()

        if component_name == "OLE DB Destination":
            # "[dbo].[Table]" -> "dbo/Table"
            tbl = params["Table"].replace('[', '').replace(']', '').replace('.', '/')
            con_dict = connection_string_to_dictionary(
                _require_connection_string(con_str, component_name, conmgr_ref_id)
            )
            ds_map["DatasetMapping"]["Sink"] = "mssql://" + con_dict['Data Source'] + "/" + con_dict['Initial Catalog'] + "/" + tbl
            ds_map["ColumnMapping"] = []
            columns = component["inputs"]["input"]["inputColumns"]["inputColumn"]
            for col in columns:
                src_col = col["@cachedName"]
                dst_col = col["@lineageId"]
                # keep the text inside the last '[...]' of the lineage id
                dst_col = dst_col[dst_col.rfind('[')+1:-1]
                ds_map["ColumnMapping"].append({"Source": src_col,"Sink": dst_col})

    return [ds_map]

StatementMeta(notebookrun, 22, 102, Finished, Available)



## Transform Purview Templates

In [None]:
# Load the two Purview output templates (multi-line JSON files under template_path).
purview_package_df = (
    spark.read
    .option("multiLine", "true")
    .format("json")
    .load(template_path + package_template_nm)
)
purview_package_process_df = (
    spark.read
    .option("multiLine", "true")
    .format("json")
    .load(template_path + process_template_nm)
)

StatementMeta(notebookrun, 22, 85, Finished, Available)



In [None]:
# Convert the template DataFrames to plain Python objects by parsing the
# JSON form of each DataFrame's first row.
# Assumes each template file produces at least one row - TODO confirm.
purview_package_json = json.loads(purview_package_df.toJSON().collect()[0])
purview_package_process_json = json.loads(purview_package_process_df.toJSON().collect()[0])

StatementMeta(notebookrun, 22, 86, Finished, Available)



In [None]:
def transform(source, process_template, package_template, params=None):
    """Fill the two Purview templates from an SSIS package.

    Args:
        source: The SSIS package as a nested dict (parsed package JSON).
        process_template: Purview package-process template dict; modified in place.
        package_template: Purview package template dict; modified in place.
        params: Optional dict of parameter name -> value passed on to
            get_mappings. Defaults to the notebook-level ``package_params``.

    Returns:
        Tuple ``(package_template, process_template)`` - the same objects that
        were passed in, after modification.
    """
    # Note: this 'hello world' example is designed for clarity and easy customization in a PoC / MVP.
    # ToDo: Make these pipeline variables
    package_type = "legacy_ssis_package"
    package_process_type = "legacy_ssis_package_process"
    package_fqn = "ssisleg://ssis_legacy.net/adflineage/PurviewSSISTest/SSIS_Movie_Loader_package_3"
    package_process_fqn = "ssisleg://ssis_legacy.net/adflineage/PurviewSSISTest/Data_Flow_Task"

    if params is None:
        params = package_params

    # source FQN, destination FQN, Column mappings string.
    # Computed from the package passed in as `source`, not from a notebook global.
    in_out_map = get_mappings(source, params)[0]
    dataset_map = in_out_map["DatasetMapping"]

    executable = source["DTS:Executable"]
    data_flow = executable["DTS:Executables"]["DTS:Executable"]
    components = data_flow["DTS:ObjectData"]["pipeline"]["components"]["component"]

    # Apply mappings legacy_ssis_package_mapping
    package_attrs = package_template["attributes"]
    package_attrs["name"] = executable["DTS:@ObjectName"]
    package_attrs["description"] = executable["DTS:@ExecutableType"]
    package_attrs["qualifiedName"] = package_fqn
    package_template["typeName"] = package_type

    # Apply mappings legacy_ssis_package_process_mapping
    process_attrs = process_template["attributes"]
    process_rels = process_template["relationshipAttributes"]
    process_attrs["qualifiedName"] = package_process_fqn
    process_attrs["name"] = data_flow["DTS:@ObjectName"]
    process_attrs["inputs"][0]["uniqueAttributes"]["qualifiedName"] = dataset_map["Source"]
    process_attrs["outputs"][0]["uniqueAttributes"]["qualifiedName"] = dataset_map["Sink"]
    # NOTE(review): component 0 is taken as the input and component 1 as the
    # output - confirm that this ordering holds for every package.
    process_rels["inputs"][0]["displayText"] = components[0]["@description"]
    process_rels["inputs"][0]["qualifiedName"] = dataset_map["Source"]
    process_rels["outputs"][0]["displayText"] = components[1]["@description"]
    process_rels["outputs"][0]["qualifiedName"] = dataset_map["Sink"]
    process_rels["parent"]["displayText"] = data_flow["DTS:@refId"]

    # Need to substitute '*' for source - todo change this
    # NOTE(review): only this one hard-coded URL is replaced; any other source
    # URL is left unchanged in the column mapping string.
    mystr = json.dumps([in_out_map])
    mystr = mystr.replace("https://purviewaccdl.blob.core.windows.net/sourcefiles/Movies.csv", "*")

    process_attrs["columnMapping"] = mystr
    process_template["typeName"] = package_process_type

    return package_template, process_template


StatementMeta(notebookrun, 22, 103, Finished, Available)



In [None]:
# Run the transformation, then package up the results to write to file.
# Argument order follows transform(source, process_template, package_template):
# the process template is passed second and the package template third.
# The return order is (package template, process template).
package_transformed, package_process_transformed = transform(package_json,purview_package_process_json,purview_package_json)
concat_transform = [package_transformed,package_process_transformed]
# Serialize both entities as one JSON array string.
concat_transform_str = json.dumps(concat_transform)

StatementMeta(notebookrun, 22, 106, Finished, Available)



In [None]:
# Test output: pass the serialized result to log_msgs at TRACE level,
# which also prints it in the cell output.
log_msgs('TRACE', concat_transform_str)

StatementMeta(notebookrun, 22, 107, Finished, Available)

TRACE: [{"attributes": {"description": "Microsoft.Package", "lastRunTime": "", "name": "Package", "qualifiedName": "ssisleg://ssis_legacy.net/adflineage/PurviewSSISTest/SSIS_Movie_Loader_package_3"}, "typeName": "legacy_ssis_package"}, {"attributes": {"columnMapping": "[{\"DatasetMapping\": {\"Source\": \"https://pccsastorage2732136.blob.core.windows.net/pccsa/pccsa_main/ssis-connector/example-data/MovCustomers.csv\", \"Sink\": \"mssql://20.206.80.219/purview-sqlmi-db/dbo/MovCustomers\"}, \"ColumnMapping\": [{\"Source\": \"Customer_ID\", \"Sink\": \"Customer_ID\"}, {\"Source\": \"Last_Name\", \"Sink\": \"Last_Name\"}, {\"Source\": \"First_Name\", \"Sink\": \"First_Name\"}, {\"Source\": \"Addr_1\", \"Sink\": \"Addr_1\"}, {\"Source\": \"Addr_2\", \"Sink\": \"Addr_2\"}, {\"Source\": \"City\", \"Sink\": \"City\"}, {\"Source\": \"State\", \"Sink\": \"State\"}, {\"Source\": \"Zip_Code\", \"Sink\": \"Zip_Code\"}, {\"Source\": \"Phone_Number\", \"Sink\": \"Phone_Number\"}, {\"Source\": \"Creat

## Copy JSON to ADLS

In [None]:
# Write the serialized result to result_blob_path.
# overwrite=True is passed so an existing file at that path is replaced.
mssparkutils.fs.put(result_blob_path, concat_transform_str, overwrite=True)

StatementMeta(notebookrun, 22, 108, Finished, Available)

True