#### Create Microsoft Purview SQL Server Database Stored Procedure Lineage Dynamically
The code below automates the creation of metadata entities and lineage information for Azure SQL Database stored procedures in Microsoft Purview. It dynamically retrieves the tables and stored procedures from the database, asks an Azure OpenAI model to identify each procedure's source and target tables, and then uses the PyApacheAtlas API to establish the metadata relationships.

In [1]:
import json
import os
import openai
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasClassification, AtlasEntity, AtlasProcess
from pyapacheatlas.core.util import AtlasException
import pyodbc
import uuid
from dotenv import load_dotenv  

### Load environment variables and set up the database connection

In [2]:
load_dotenv()

# Azure OpenAI settings (key, endpoint, API version, deployment) read from the environment
openai.api_type = "azure"
openai.api_key = os.environ.get("AZURE_OPENAI_API_KEY")
openai.api_base = os.environ.get("AZURE_OPENAI_ENDPOINT")
openai.api_version = os.environ.get("AZURE_OPENAI_API_VERSION")
model = os.environ.get("AZURE_OPENAI_DEPLOYED_MODEL")

# SQL Server connection settings read from the environment
server = os.getenv("DB_SERVER")
database = os.getenv("DATABASE")
driver = os.getenv("DRIVER")
username = os.getenv("USER_NAME")
password = os.getenv("PASSWORD")
schema = os.getenv("SCHEMA")

# Prefix shared by every Purview qualified name built in this notebook;
# a table or procedure name is appended directly after the trailing slash.
qualified_name = "mssql://{}.database.windows.net/{}/{}/".format(server, database, schema)
print(model)

text-davinci-003


In [3]:
# Build the service-principal credential used to talk to the Atlas endpoint
oauth = ServicePrincipalAuthentication(
    tenant_id=os.getenv("TENANT_ID"),
    client_id=os.getenv("CLIENT_ID"),
    client_secret=os.getenv("CLIENT_SECRET"),
)
client = PurviewClient(
    account_name=os.getenv("PURVIEW_NAME", ""),
    authentication=oauth,
)

# Open the SQL Server connection; the ODBC connection string is assembled
# from the settings loaded earlier (a missing setting raises TypeError here).
connection_string = "".join([
    "DRIVER=", driver,
    ";SERVER=", server,
    ";PORT=1433;DATABASE=", database,
    ";UID=", username,
    ";PWD=", password,
])
conn = pyodbc.connect(connection_string)

In [4]:
# Check if the table exists in Purview
def check_entity_if_exist(client, qualifiedName, typeName):
    """Return True when Purview already holds an entity with this name and type.

    Args:
        client: Object exposing ``get_entity(qualifiedName=..., typeName=...)``
            (the notebook passes a ``PurviewClient``).
        qualifiedName: Fully qualified name of the entity to look up.
        typeName: Atlas type name of the entity, e.g. ``"azure_sql_table"``.

    Returns:
        bool: True if the lookup returned at least one entity, else False.
    """
    response = client.get_entity(qualifiedName=qualifiedName, typeName=typeName)

    # NOTE(review): the lookup is expected to return a dict carrying an
    # "entities" list -- confirm against the pyapacheatlas version in use.
    # Counting the dict's keys would report a hit even when that list is
    # empty, so inspect the list itself whenever it is present.
    if isinstance(response, dict) and "entities" in response:
        return bool(response["entities"])

    # Fallback for any other response shape: empty / None means "not found".
    return bool(response)

#check_entity_if_exist(client, "mssql://xxxx.database.windows.net/sqldb/SalesLT/TargetTableName1", "azure_sql_table")

#### Create tables in Purview

In [8]:
table_type = "azure_sql_table"

# List only the tables of the configured schema. Every entity created below is
# registered under `qualified_name`, which already embeds that schema, so a
# table from any other schema would end up with a wrong qualified name.
sql_query = """
    select schema_name(t.schema_id) as schema_name,
       t.name as table_name,
       t.create_date,
       t.modify_date
    from sys.tables t
    where schema_name(t.schema_id) = ?
    order by schema_name, table_name;
"""

# Read all rows up front and always release the cursor, even if the query fails
cursor = conn.cursor()
try:
    cursor.execute(sql_query, schema)
    table_rows = cursor.fetchall()
finally:
    cursor.close()

# Register each table that Purview does not know about yet
for row in table_rows:
    tb_name = row.table_name
    source_qual_name = f"{qualified_name}{tb_name}"
    if check_entity_if_exist(client, source_qual_name, table_type):
        continue

    tb_input = AtlasEntity(
        name=tb_name,
        typeName=table_type,
        qualified_name=source_qual_name,
    )
    results = client.upload_entities(batch=[tb_input])
    print(f"Table {tb_name} created in Purview \n")


### Create Lineage

In [6]:
# Query returning every stored procedure's name together with its definition text
sql_query = """
    SELECT name AS sp_name, object_definition(object_id) AS sp_text
    FROM sys.procedures
"""

# Run the query on a fresh cursor
cursor = conn.cursor()
cursor.execute(sql_query)

# Map procedure name -> procedure text for use by the later cells
sp_name_text_dict = {proc.sp_name: proc.sp_text for proc in cursor.fetchall()}

# Release the cursor; the connection itself stays open
cursor.close()


In [7]:
# Close the database connection; the cells below only use the in-memory
# stored-procedure dictionary and the Purview client, not `conn`.
conn.close()

In [None]:
# Dump each stored procedure's name and definition text for a quick visual check
for sp_name, sp_text in sp_name_text_dict.items():
    print(
        f"Stored Procedure Name: {sp_name}",
        f"Stored Procedure Text:\n{sp_text}\n",
        sep="\n",
    )

In [9]:
table_type = "azure_sql_table"
process_type = "Process"

# Lineage endpoints of the stored procedure currently being processed.
# They are rebuilt for every procedure inside the loop below.
inputs = []
outputs = []


def _existing_table_entities(table_names):
    """Build AtlasEntity references for the named tables that exist in Purview.

    Args:
        table_names: Iterable of bare table names as returned by the model.

    Returns:
        list: One ``AtlasEntity`` per name for which ``check_entity_if_exist``
        reports a match; names with no match are skipped.
    """
    entities = []
    for table_name in table_names:
        table_qual_name = f"{qualified_name}{table_name}"
        if check_entity_if_exist(client, table_qual_name, table_type):
            print(table_name)
            entities.append(
                AtlasEntity(
                    name=table_name,
                    typeName=table_type,
                    qualified_name=table_qual_name,
                )
            )
    return entities


# For each stored procedure: ask the model for its source/target tables,
# then upload a Process entity linking the tables that exist in Purview.
for sp_name, sp_text in sp_name_text_dict.items():
    # Start from empty lists so this procedure's lineage does not inherit the
    # tables collected for previously processed procedures.
    inputs = []
    outputs = []

    PROMPT_TEMPLATE = f"""
                    Instructions:

                    Use below stored procedure to generate the Source and Target Table names processed by the stored procedure. Do not include temp table names. 

                    Output source table name and target table name in below json format only:

                    [{{"SourceTableName": ["SourceTableName1", "SourceTableName2"]}}, {{"TargetTableName": ["TargetTableName1", "TargetTableName2"]}}]'

                    **DO NOT provide any Solution or explanation or any other output except for SourceTableName and TargetTableName**

                    Input:
                    {sp_text}
                    Output: 

                    """
    process_name = sp_name

    response = openai.Completion.create(
        engine=model,
        prompt=PROMPT_TEMPLATE,
        temperature=0.7,
        max_tokens=2000,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None)

    # The completion is free text; skip this procedure rather than abort the
    # whole run when the model does not return parseable JSON.
    try:
        data_list = json.loads(response.choices[0].text)
    except json.JSONDecodeError as err:
        print(f"Skipping {sp_name}: model response is not valid JSON ({err})")
        continue
    print(data_list)

    # Expected shape (per the prompt): a list of single-key dicts, keyed by
    # "SourceTableName" or "TargetTableName", each holding a list of names.
    for item in data_list:
        for key, values in item.items():
            if key == "SourceTableName":
                inputs.extend(_existing_table_entities(values))
            elif key == "TargetTableName":
                outputs.extend(_existing_table_entities(values))

    # Create the AtlasProcess and upload all entities. Either source or target table name is required to create the process.
    if inputs or outputs:
        process = AtlasProcess(
            name=process_name,
            typeName=process_type,
            qualified_name=f"{qualified_name}{process_name}",
            inputs=inputs,
            outputs=outputs,
            guid="-108"  # fixed placeholder guid; each upload is its own batch
        )

        # Upload the process together with the table entities it references
        results = client.upload_entities(
            batch=[process] + inputs + outputs
        )

[{'SourceTableName': []}, {'TargetTableName': []}]
[{'SourceTableName': ['ErrorLog']}, {'TargetTableName': ['ErrorLog']}]
ErrorLog
ErrorLog
