This workbook does 4 things:

1. first it provides some setup cells, to enable authentication and create storage dataframes for the results

2. it explores all workspaces to which the provided user/service principal has access and extracts:

    - Tables with their columns (this uses a SQL query, therefore it can be performed also from a SQL client application external to Fabric)

    - Reports with their source tables (using the Semantic Link Labs library; therefore this can only be performed within Fabric, unless you call directly the low-level APIs that Semantic Link Labs itself uses)

    - Details of Copy pipelines with column-level mappings (the source JSON is extracted via the Semantic Link Labs library)

3. finally, all metadata is uploaded into MS Purview Data Governance, and lineage links with column mappings are created for the artifacts discovered above

4. some examples are provided for the use of Semantic Link Labs, Scanner APIs, etc. for extracting Fabric metadata and lineage to external sources, e.g. Azure Databricks.


**1. Setup**

In [None]:
# First, install the packages this notebook needs:
# semantic-link-labs (imported below as sempy_labs) and pyapacheatlas.

%pip install semantic-link-labs pyapacheatlas

In [3]:
# Enter your SERVICE PRINCIPAL credentials here or, better, read them from an Azure Key Vault.
# - tenant_id, client_id, client_secret are used to access one or more MS Fabric workspaces.
# - tenantid4purview, clientid4purview, clientsecret4purview are used to access the Purview
#   Data Governance instance where the metadata (tables, columns, reports, pipelines) is uploaded.
#   By default they reuse the Fabric values; assign different values to use another Service Principal.
# - demo_workspace and demo_workspace_id can be used for demo/test purposes to scan a single workspace.

tenant_id="<your tenant id>"
tenantid4purview=tenant_id
client_id="<the client ID of the Service Principal>"
clientid4purview=client_id
client_secret="<the client secret, only for non production. Preferably use a KeyVault >"
clientsecret4purview=client_secret
PurviewAccount_name="<your Purview Account name>"
# To limit extraction to just one workspace for a quick try, uncomment the two lines that follow
# the "hack for demo" comment in the cell named "Extract metadata from workspaces".
demo_workspace="<the workspace name>"
demo_workspace_id="<the workspace id>"

StatementMeta(, 16f63d8f-a1ac-4875-9731-887abccedfa4, 11, Finished, Available, Finished)

In [5]:
import sempy_labs as labs
import sempy_labs.report as rep
from sempy_labs.report import ReportWrapper

import requests
import pyodbc
import json

StatementMeta(, a51aff94-49ec-4185-805d-4c2c2aeef1e3, 13, Finished, Available, Finished)

In [6]:
# First authenticate: request an OAuth2 access token for the Fabric REST API
# through the client-credentials flow of the Service Principal.
base_url_auth = 'https://login.microsoftonline.com'
relative_url_auth = f'/{tenant_id}/oauth2/v2.0/token'
# relative_url_auth already starts with "/": concatenate directly so the URL has no "//" in its path
url_auth = f'{base_url_auth}{relative_url_auth}'
# The form fields are passed as a dict so that requests URL-encodes every value:
# a secret containing "&", "+" or "=" is transmitted intact, and no newline ends up in the body.
data_auth = {
    "client_id": client_id,
    "client_secret": client_secret,
    "grant_type": "client_credentials",
    "scope": "https://api.fabric.microsoft.com/.default",
}
header_auth = {"Content-Type": "application/x-www-form-urlencoded"}

authentication_response = requests.post(url_auth, data=data_auth, headers=header_auth)

StatementMeta(, a51aff94-49ec-4185-805d-4c2c2aeef1e3, 14, Finished, Available, Finished)

In [8]:
# Extract the access token to use in Fabric API requests.
# If the login was rejected, stop here with the HTTP error returned by the identity provider
# rather than with a KeyError on the missing "access_token" field.
authentication_response.raise_for_status()

auth_token = json.loads(authentication_response.text)["access_token"]
bearer = f'Bearer {auth_token}'

StatementMeta(, a51aff94-49ec-4185-805d-4c2c2aeef1e3, 16, Finished, Available, Finished)

In [None]:
# SQL statements (and the ODBC connection template) used to scan tables and their columns.
# The same statements can be run from an external SQL client instead of a notebook inside Fabric.

# The ODBC user id of a Service Principal must follow the pattern client_id@tenant_id
service_principal_id = f"{client_id}@{tenant_id}"

# Only the parts shared by every connection are assembled here; "SERVER=<Fabric SQL connection string>"
# and "DATABASE=<database name>" are appended later, for each workspace item being scanned.
odbc_connection_string_template = "".join(
    f"{keyword}={value};"
    for keyword, value in (
        ("DRIVER", "{ODBC Driver 18 for SQL Server}"),
        ("UID", service_principal_id),
        ("PWD", client_secret),
        ("Authentication", "ActiveDirectoryServicePrincipal"),
    )
)

query_tables = 'SELECT name, object_id, create_date, modify_date, type, type_desc from sys.tables'

# The column query is split in two halves: the table name is inserted between them.
# Result columns, by position: 0=column_name, 1=column_id, 2=type_schema, 3=type_name, ...
query_columns_template1 = " ".join([
    "SELECT c.name AS column_name",
    ",c.column_id",
    ",SCHEMA_NAME(t.schema_id) AS type_schema",
    ",t.name AS type_name",
    ",t.is_user_defined",
    ",t.is_assembly_type",
    ",c.max_length",
    ",c.precision",
    ",c.scale",
    "FROM sys.columns AS c",
    "JOIN sys.types AS t ON c.user_type_id=t.user_type_id",
    "WHERE c.object_id = OBJECT_ID('",
])
query_columns_template2 = "') ORDER BY c.column_id"

In [None]:
# We create some container DataFrames to hold the metadata we gather.
# For each kind of metadata three objects are defined:
#   schema_df_<kind>   : the Spark schema (every column is a nullable string)
#   df_<kind>_datarow  : a record template (dict) that is copied and filled in for each new row
#   df_<kind>          : the (initially empty) Spark DataFrame collecting the rows

from pyspark.sql.types import StructType,StructField, StringType, IntegerType


def _string_schema(column_names):
    """Return a StructType made of nullable string columns, in the given order."""
    return StructType([StructField(name, StringType(), True) for name in column_names])


def _row_template(column_names, workspace_placeholder):
    """Return the default record for the given columns.

    Every field is an empty string, except WorkspaceName (set to workspace_placeholder)
    and WorkspaceID (set to "dummy_id").
    """
    template = dict.fromkeys(column_names, "")
    template.update({"WorkspaceName": workspace_placeholder, "WorkspaceID": "dummy_id"})
    return template


# --- table columns metadata ---
_columns_fields = [
    "WorkspaceName", "WorkspaceID",
    "WarehouseName", "WarehouseID",
    "LakehouseName", "LakehouseID",
    "TableName", "TableID",
    "ColumnName", "ColumnType", "ColumnID",
]
df_columns_datarow = _row_template(_columns_fields, "dummy_column1")
schema_df_columns = _string_schema(_columns_fields)
df_columns = spark.createDataFrame(data=[], schema=schema_df_columns)

# --- table metadata ---
# TableType is meant to distinguish tables/views/stored procedures; currently only tables are extracted.
# It is part of the schema so that the value set by the extraction is not silently dropped
# when a record is converted into a DataFrame row.
_tables_fields = [
    "WorkspaceName", "WorkspaceID",
    "WarehouseName", "WarehouseID",
    "LakehouseName", "LakehouseID",
    "TableName", "TableType", "TableID",
    "PurviewGUID", "PurviewFQN",
]
df_tables_datarow = _row_template(_tables_fields, "dummy_table_1")
schema_df_tables = _string_schema(_tables_fields)
df_tables = spark.createDataFrame(data=[], schema=schema_df_tables)

# --- report metadata ---
# The source can be a table or a view. Only tables are currently extracted.
_reports_fields = [
    "WorkspaceName", "WorkspaceID",
    "ReportName", "ReportID",
    "ColumnName",
    "SourceName", "SourceType", "SourceID",
    "PurviewGUID", "PurviewFQN",
]
df_reports_datarow = _row_template(_reports_fields, "dummy_report_1")
schema_df_reports = _string_schema(_reports_fields)
df_reports = spark.createDataFrame(data=[], schema=schema_df_reports)

# --- DataPipelines (lineage) metadata ---
# The source can be a table or a view. Only tables are currently extracted.
# The template keys are exactly the schema column names (SinkContainerName / SinkContainerID, no spaces).
_lineage_fields = [
    "WorkspaceName", "WorkspaceID",
    "PipelineID", "PipelineName",
    "ActivityName", "ActivityType",
    "SourceName", "SourceType", "SourceContainerName", "SourceContainerID",
    "SinkName", "SinkType", "SinkContainerName", "SinkContainerID",
    "ColumnMappings",
    "PurviewGUID", "PurviewFQN",
]
df_lineage_datarow = _row_template(_lineage_fields, "dummy_lineage_1")
schema_df_lineage = _string_schema(_lineage_fields)
df_lineage = spark.createDataFrame(data=[], schema=schema_df_lineage)


**2. Start of main metadata extraction process**

**Extract metadata from workspaces**

In [None]:
# This is the actual metadata extraction process.
# For every workspace accessible by the Service Principal we authenticated with, the items are
# listed through the Fabric REST API and the supported item types are scanned:
#   - Warehouse / Lakehouse: tables and columns, queried through the SQL endpoint (pyodbc)
#   - Report: semantic-model columns used by the report (ReportWrapper from Semantic Link Labs)
#   - DataPipeline: Copy activity with its column mappings (pipeline JSON from Semantic Link Labs)
# The records found are appended to df_tables, df_columns, df_reports and df_lineage
# once per workspace (one union per workspace instead of one union per record).

get_workspaces_url = 'https://api.fabric.microsoft.com/v1/workspaces'
header_token={"Authorization": bearer, 'Content-Type': 'application/json'}
data_dummy={"dummy":"true"}


def _get_json(url):
    """GET a Fabric REST API url and return the decoded JSON body; raise on an HTTP error status."""
    response = requests.get(url, headers=header_token, data=data_dummy)
    response.raise_for_status()
    return json.loads(response.text)


def _append_records(df, records, schema):
    """Return df extended with records (a list of dicts); df is returned unchanged if records is empty."""
    if not records:
        return df
    return df.union(spark.createDataFrame(data=records, schema=schema))


def _scan_sql_endpoint(container_kind, container_name, container_id, server, workspace_name, workspace_id):
    """Collect the table and column records of one Warehouse or Lakehouse.

    container_kind is "Warehouse" or "Lakehouse": it selects which <kind>Name / <kind>ID fields
    of the record templates are filled in. server is the SERVER part of the SQL connection string.
    Returns a (table_records, column_records) tuple of lists of dicts.
    """
    shared_fields = {
        "WorkspaceName": workspace_name,
        "WorkspaceID": workspace_id,
        f"{container_kind}Name": f"{container_name}",
        f"{container_kind}ID": f"{container_id}",
    }
    table_records = []
    column_records = []
    odbc_conn_string = f'{odbc_connection_string_template}SERVER={server};DATABASE={container_name}'
    conn = pyodbc.connect(odbc_conn_string)
    # try/finally: the cursor and the connection are released even if a query fails
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query_tables)
            # fetchall() materializes the table list, so the cursor can be reused for the column queries
            for table in cursor.fetchall():
                print("Found table: ", table)
                table_name = table[0]
                table_fields = {"TableName": f'{table_name}', "TableID": f'{table[1]}'}

                table_record = df_tables_datarow.copy()
                table_record.update(shared_fields)
                table_record.update(table_fields)
                table_record["TableType"] = 'Table'
                table_records.append(table_record)

                # extract the details of the table columns
                # NOTE(review): the table name is inserted into the SQL text without its schema name;
                # tables outside the default schema may not be resolved by OBJECT_ID - to be confirmed.
                cursor.execute(f'{query_columns_template1}{table_name}{query_columns_template2}')
                for column in cursor.fetchall():
                    # positions in the result: 0=column_name, 1=column_id, 3=type_name
                    column_record = df_columns_datarow.copy()
                    column_record.update(shared_fields)
                    column_record.update(table_fields)
                    column_record.update({
                        "ColumnName": f'{column[0]}',
                        "ColumnID": f'{column[1]}',
                        "ColumnType": f'{column[3]}',
                    })
                    column_records.append(column_record)
        finally:
            cursor.close()
    finally:
        conn.close()
    return table_records, column_records


def _scan_report(report_name, report_id, workspace_name, workspace_id):
    """Collect one record for every semantic-model column used by a report.

    Reports that cannot be parsed (e.g. not in the new PBIP/PBIR format) are reported on the
    console and skipped. Returns a list of dicts (possibly empty).
    """
    report_records = []
    try:
        rpt = ReportWrapper(report=report_name, workspace=workspace_name)
        for used_object in rpt.list_semantic_model_objects().itertuples():
            # itertuples(): position 0 is the index; the code relies on
            # 1=source (table) name, 2=object name, 3=object type
            if used_object[3] != "Column":
                continue
            report_record = df_reports_datarow.copy()
            report_record.update({
                "WorkspaceName": workspace_name,
                "WorkspaceID": workspace_id,
                "ReportName": f'{report_name}',
                "ReportID": f'{report_id}',
                "ColumnName": f'{used_object[2]}',
                "SourceName": f'{used_object[1]}',
                "SourceType": "Table",  # only tables so far
            })
            report_records.append(report_record)
    except Exception as report_err:
        # Deliberate best-effort boundary: a report that cannot be parsed, or an API error
        # for a single report, must not abort the scan of the remaining items.
        print("report not valid")
        print("error=", report_err)
    return report_records


def _scan_copy_pipeline(pipeline_name, pipeline_id, workspace_name, workspace_id):
    """Collect the lineage record of the first Copy activity of a data pipeline.

    Only pipelines with a Copy activity are managed so far. A pipeline whose definition lacks
    the expected fields is reported on the console and skipped. Returns a list of dicts.
    """
    pipeline_definition = labs.get_data_pipeline_definition(pipeline_name, workspace_id, decode=True)
    all_activities = json.loads(pipeline_definition)["properties"]["activities"]
    lineage_records = []
    try:
        for activity in all_activities:
            activity_type = activity["type"]
            if activity_type != "Copy":
                continue
            source = activity["typeProperties"]["source"]
            sink = activity["typeProperties"]["sink"]
            mappings_list = []
            for mapping_item in activity["typeProperties"]["translator"]["mappings"]:
                mapped_source = mapping_item["source"]
                mapped_sink = mapping_item["sink"]
                mappings_list.append({
                    "Source": f'{mapped_source["name"]} ({mapped_source["type"]} {mapped_source["physicalType"]})',
                    # the sink side is described with the sink column, not with the source one
                    "Sink": f'{mapped_sink["name"]} ({mapped_sink.get("physicalType", "")})',
                })
            mappings_string = json.dumps(mappings_list)
            print("mappings to string", mappings_string)

            lineage_record = df_lineage_datarow.copy()
            lineage_record.update({
                "WorkspaceName": workspace_name,
                "WorkspaceID": workspace_id,
                "PipelineName": pipeline_name,
                "PipelineID": pipeline_id,
                "ActivityName": activity["name"],
                "ActivityType": activity_type,
                "SourceName": source["datasetSettings"]["typeProperties"]["table"],
                "SourceType": source["type"],
                "SourceContainerName": source["datasetSettings"]["linkedService"]["name"],
                "SourceContainerID": source["datasetSettings"]["linkedService"]["properties"]["typeProperties"]["artifactId"],
                "SinkType": sink["type"],
                "SinkName": sink["datasetSettings"]["typeProperties"]["table"],
                "SinkContainerName": sink["datasetSettings"]["linkedService"]["name"],
                "SinkContainerID": sink["datasetSettings"]["linkedService"]["properties"]["typeProperties"]["artifactId"],
                "ColumnMappings": f'{mappings_string}',
                "PurviewFQN": "",
            })
            lineage_records.append(lineage_record)
            # NOTE(review): only the first Copy activity of a pipeline is recorded;
            # confirm whether further Copy activities should be tracked too.
            break
    except KeyError as keyerr:
        print("Error parsing DataPipelines definition: ", keyerr)
        print("error details ", keyerr)
    except NameError as namerr:
        print("Error parsing DataPipelines definition: ", namerr)
    return lineage_records


# first get the list of workspaces accessible by the Service Principal we authenticated with
workspaces_response=requests.get(get_workspaces_url,headers=header_token, data=data_dummy)
# stop with the HTTP error, rather than with a KeyError on "value", if the listing was rejected
workspaces_response.raise_for_status()
print("List of workspaces that can be accessed for metadata extraction:\n",workspaces_response.text)
print()

# loop through the workspaces and extract their items
wkspaces=json.loads(workspaces_response.text)["value"]
for wkspc in wkspaces:

    current_wkspace_id=wkspc["id"]
    current_wkspace_name=wkspc["displayName"]
    ## hack for demo: if you need to just display one workspace (for test or demo) uncomment these 2 lines
    #if current_wkspace_id != demo_workspace_id:
    #  continue

    print(f'Processing workspace:  {current_wkspace_name} ({current_wkspace_id})\n')
    # a row without table name is added to the list of tables, to track the workspaceID
    workspace_record = df_tables_datarow.copy()
    workspace_record.update({"WorkspaceName": current_wkspace_name, "WorkspaceID": current_wkspace_id})
    workspace_tables = [workspace_record]
    workspace_columns = []
    workspace_reports = []
    workspace_lineage = []

    # get the items within the current workspace
    get_items_url = f'https://api.fabric.microsoft.com/v1/workspaces/{current_wkspace_id}/items'
    items = _get_json(get_items_url)["value"]

    # loop through the items to extract the ones we can manage
    for item in items:
        item_name=item["displayName"]
        item_type=item["type"]
        item_id=item["id"]
        print(f'\n{item_name} is of type {item_type} and has id={item["id"]}')

        if item_type in ("Warehouse", "Lakehouse"):
            print(f"Exploring {item_name}")
            # get the SERVER part of the SQL connection string from the item properties:
            # https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/warehouses/{warehouseId}
            # https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}
            get_SQLString_url = f'https://api.fabric.microsoft.com/v1/workspaces/{current_wkspace_id}/{item_type.lower()}s/{item_id}'
            item_properties = _get_json(get_SQLString_url)["properties"]
            if item_type == "Warehouse":
                fabric_SQL_connection_string = item_properties["connectionString"]
            else:
                fabric_SQL_connection_string = item_properties["sqlEndpointProperties"]["connectionString"]
            print(f"SQLconn_string for {item_name} is {fabric_SQL_connection_string}")
            found_tables, found_columns = _scan_sql_endpoint(
                item_type, item_name, item_id, fabric_SQL_connection_string,
                current_wkspace_name, current_wkspace_id,
            )
            workspace_tables.extend(found_tables)
            workspace_columns.extend(found_columns)

        elif item_type == "Report":
            print(f"Exploring {item_name}")
            workspace_reports.extend(
                _scan_report(item_name, item_id, current_wkspace_name, current_wkspace_id)
            )

        elif item_type == "DataPipeline":
            print(f"Exploring {item_name}")
            workspace_lineage.extend(
                _scan_copy_pipeline(item_name, item_id, current_wkspace_name, current_wkspace_id)
            )

    # store what was found in this workspace
    df_tables = _append_records(df_tables, workspace_tables, schema_df_tables)
    df_columns = _append_records(df_columns, workspace_columns, schema_df_columns)
    df_reports = _append_records(df_reports, workspace_reports, schema_df_reports)
    df_lineage = _append_records(df_lineage, workspace_lineage, schema_df_lineage)


print("Recap of Metadata found:")

print("Found Tables (includes partial rows for Lakehouses and Warehouses):")
display(df_tables)

print("\n\nFound Table columns (for all tables) found:")
display(df_columns)

print("\n\nFound PBIR Reports lineage to tables/views:")
display(df_reports)

print("\n\nFound Copy DataPipelines, with lineage to tables/views:")
display(df_lineage)

print("\nMetadata Extraction Completed.")

**3. Upload metadata into Purview Data Governance**

The upload code is adapted from the PyApacheAtlas samples by Will Johnson ( https://github.com/wjohnson/pyapacheatlas )

In [14]:
import json
import pyapacheatlas
# PyApacheAtlas packages are used to upload the metadata into MS Purview Data Governance
# Connect to Atlas (MS Purview DG) via a Service Principal
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess
from pyapacheatlas.core.typedef import EntityTypeDef, AtlasAttributeDef
from pyapacheatlas.core.util import GuidTracker

from pyspark.sql import Row
from pyspark.sql.functions import col, when
import pyspark.sql

StatementMeta(, a51aff94-49ec-4185-805d-4c2c2aeef1e3, 22, Finished, Available, Finished)

In [15]:
# Connect to the MS Purview account APIs with a Service Principal.
# By default the *4purview variables hold the same Service Principal used for accessing Fabric;
# change them in the credentials cell to use a different Service Principal.
oauth = ServicePrincipalAuthentication(tenant_id=tenantid4purview, client_id=clientid4purview, client_secret=clientsecret4purview)
client = PurviewClient(account_name=PurviewAccount_name, authentication=oauth)
    

StatementMeta(, a51aff94-49ec-4185-805d-4c2c2aeef1e3, 23, Finished, Available, Finished)

In [16]:
# Lineage with column-level details needs a custom process entity type:
# a sub-type of "Process" that defines a columnMapping attribute.
# Preferably run this cell only once (further runs would only overwrite the entity type).
column_mapping_attribute = AtlasAttributeDef("columnMapping")
procType = EntityTypeDef("Lineage_Extractor_Process", superTypes=["Process"], attributeDefs=[column_mapping_attribute])

# Upload the type definition to Purview, overwriting any existing definition with the same name
type_results = client.upload_typedefs(entityDefs=[procType], force_update=True)

StatementMeta(, a51aff94-49ec-4185-805d-4c2c2aeef1e3, 24, Finished, Available, Finished)

**Upload tables and their columns**

In [None]:
# Register every discovered table in Purview as a DataSet entity and build df_tables_registered:
# a Spark DataFrame (same schema as df_tables) where each table row also carries the
# Purview GUID and fullyQualifiedName assigned to it.

created_GUIDs=[] # used to keep track of newly created items, in case we want to move such items to a new collection
registered_rows=[] # records of the uploaded tables; converted into a DataFrame once, after the loop

# create Table nodes
gt = GuidTracker()
for table in df_tables.collect():
    table_dict=table.asDict()
    tablename=table_dict["TableName"]

    # rows without a table name only track a workspace: there is nothing to upload for them
    if not tablename:
        continue

    # a table belongs either to a Warehouse or to a Lakehouse
    if table_dict["WarehouseName"]:
        artifactName=table_dict["WarehouseName"]
        artifactType="warehouse"
        artifactID=table_dict["WarehouseID"]
    else:
        artifactName=table_dict["LakehouseName"]
        artifactType="lakehouse"
        artifactID=table_dict["LakehouseID"]

    table_entity = AtlasEntity(
        name=tablename,
        typeName="DataSet",
        qualified_name=f'LineageExtractor://workspace/{table_dict["WorkspaceID"]}/{artifactType}/{artifactID}/table/{table_dict["TableID"]}',
        guid=gt.get_guid(),
        attributes={
            "userDescription": f'<div>Fabric Table<p> {tablename} is a Table contained within {artifactType} {artifactName} in Workspace {table_dict["WorkspaceName"]}</p></div>'
        },
        # This custom attribute flips a switch inside of the Purview UI to render
        # the rich text description.
        customAttributes={
            "microsoft_isDescriptionRichText": "true"
        }
    )
    results = client.upload_entities([table_entity])

    # the upload response maps the temporary guid of the entity to the GUID assigned by Purview
    table_dict["PurviewGUID"] = results["guidAssignments"][str(table_entity.guid)]
    table_dict["PurviewFQN"] = str(table_entity.qualifiedName)
    created_GUIDs.append(table_dict["PurviewGUID"])

    # start from the record template so that every expected field is present
    new_dict=df_tables_datarow.copy()
    new_dict.update(table_dict)
    registered_rows.append(new_dict)

df_tables_registered=spark.createDataFrame(data=registered_rows,schema=schema_df_tables)

display(df_tables_registered)

**Upload reports and their lineage**

In [None]:
# Working copy of df_reports: its PurviewGUID / PurviewFQN columns are filled
# in below as each report is matched to (or registered as) a Purview entity.
df_reports_registered=df_reports.alias("df_reports_registered")

gt = GuidTracker()

# Create the report nodes and their lineage, one distinct report at a time.
report_distinctIds=df_reports.select('ReportID').distinct().collect()

for report_id in report_distinctIds:
    current_report_id=report_id["ReportID"]
    report_rows=df_reports_registered.filter(col("ReportID")==current_report_id)
    report_sources=report_rows.collect()

    # The first row is used as the description of the report itself
    # (presumably all rows of one report share the report-level columns).
    report_metadata=report_sources[0].asDict()
    report_guid=report_metadata["PurviewGUID"]
    report_fqn=report_metadata["PurviewFQN"]
    report_workspace_id=report_metadata["WorkspaceID"]

    if report_guid == "":
        # The report has no Purview identity yet: first look for an entity
        # created by a Purview scan, using the qualified name a scan would give it.
        scanned_report_FQN=f'https://app.powerbi.com/groups/{report_workspace_id}/reports/{current_report_id}'
        response_if_any_report_entity = client.get_entity(typeName="powerbi_report",qualifiedName=scanned_report_FQN)
        # Only trust the response when it really carries at least one entity;
        # a non-empty response without entities must not be indexed.
        scanned_entities=(response_if_any_report_entity or {}).get("entities") or []
        if scanned_entities:
            scanned_report=scanned_entities[0]
            report_guid = scanned_report['guid']
            report_fqn = scanned_report['attributes']['qualifiedName']
            print(f"Found scanned report: {report_metadata['ReportName']} ; Purview GUID={report_guid} ; Purview QualifiedName={report_fqn}")
        else:
            # Nothing scanned: register a new entity for the report.
            report_entity = AtlasEntity(
                name=report_metadata["ReportName"],
                typeName="DataSet",
                qualified_name=f'LineageExtractor://workspace/{report_metadata["WorkspaceID"]}/report/{current_report_id}',
                guid=gt.get_guid(),
                attributes={
                    "userDescription": f'<div>Fabric Report<p>in PBIR format</p><p>contained within {report_metadata["WorkspaceName"]}</p><p>Fabric ID is {current_report_id}</p>'
                },
                # This custom attribute flips a switch inside of the Purview UI to render
                # the rich text description.
                customAttributes={
                    "microsoft_isDescriptionRichText": "true"
                }
            )
            results = client.upload_entities([report_entity])

            # The upload maps the temporary (tracker) guid to the real Purview guid.
            report_guid = results["guidAssignments"][str(report_entity.guid)]
            report_fqn = str(report_entity.qualifiedName)

            created_GUIDs.append(report_guid)

        # Write the resolved GUID and FQN onto every row of this report.
        df_reports_registered=df_reports_registered.withColumn("PurviewGUID", when(col("ReportID") == current_report_id, report_guid).otherwise(col("PurviewGUID")))
        df_reports_registered=df_reports_registered.withColumn("PurviewFQN", when(col("ReportID") == current_report_id, report_fqn).otherwise(col("PurviewFQN")))
        # Refresh report_rows so that it sees the updated columns.
        report_rows=df_reports_registered.filter(col("ReportID")==current_report_id)

    # Join the report's source names with the registered tables: each resulting
    # row is one (source table column -> report) lineage link.
    reports_short=report_rows.select('ReportID','SourceName',"ColumnName","PurviewFQN").withColumnRenamed("PurviewFQN","ReportFQN")
    lineage_links=reports_short.join(df_tables_registered.select("TableName","PurviewFQN"),reports_short["SourceName"] == df_tables_registered["TableName"],"inner").orderBy("TableName").collect()

    if not lineage_links:
        # None of the report's sources is a registered table, so there is
        # nothing to link; the report entity itself has been handled above.
        print(f"No registered source table found for report: {report_metadata['ReportName']} ; skipping lineage creation")
        continue

    # Group the links per source table. The rows are ordered by TableName, so
    # the (insertion-ordered) dict keeps the tables in that same order.
    # NOTE: tables are matched and grouped by name only; registered tables that
    # share a name end up in the same group.
    links_by_table={}
    for link in lineage_links:
        link_dict=link.asDict()
        links_by_table.setdefault(link_dict["TableName"], []).append(link_dict)

    column_mapping_total=[]
    inputs=[]
    for table_links in links_by_table.values():
        # The dataset-level mapping uses the FQNs of the group's last link.
        sourceFQN=table_links[-1]["PurviewFQN"]
        sinkFQN=table_links[-1]["ReportFQN"]

        column_mapping_total.append(
            # This object defines the column lineage between one source table and the current report
            {"ColumnMapping": [{"Source": table_link["ColumnName"],"Sink":"Visual"} for table_link in table_links],
            "DatasetMapping": {
                    "Source": sourceFQN, "Sink": sinkFQN}
            }
        )
        registered_table_entity = client.get_entity(typeName="DataSet",qualifiedName=sourceFQN)["entities"][0]
        inputs.append(registered_table_entity)

    registered_report_entity = client.get_entity(typeName="DataSet",qualifiedName=report_fqn)["entities"][0]

    print('\n\n\ncolumn_mapping_total=', json.dumps(column_mapping_total))
    lineage_link_obj = AtlasProcess(
        name=f'Report Lineage for {report_metadata["ReportName"]}',
        typeName="Lineage_Extractor_Process",
        qualified_name=f'LineageExtractor://workspace/{report_metadata["WorkspaceID"]}/report/{current_report_id}/ReportLineage',
        inputs=inputs,
        outputs=[registered_report_entity],
        guid=gt.get_guid(),
        attributes={
            "columnMapping": json.dumps(column_mapping_total),
            "userDescription": f'<div>Fabric Report Mapping<p> for Report {report_metadata["ReportName"]} contained within workspace {report_metadata["WorkspaceName"]}</p> <p> Report Id is {current_report_id}</p>'
        },
        # This custom attribute flips a switch inside of the Purview UI to render
        # the rich text description.
        customAttributes={
            "microsoft_isDescriptionRichText": "true"
        }
    )
    results = client.upload_entities([lineage_link_obj])

 **Upload pipelines and their lineage**

In [None]:
df_pipelines_registered=df_lineage.alias("df_pipelines_registered")

gt = GuidTracker()


def _find_registered_table(table_name, workspace_id, container_id):
    """Return the first registered-table row with this name, workspace and container.

    The container id is compared against both the LakehouseID and the
    WarehouseID columns. Returns None when no registered table matches.
    """
    return df_tables_registered.filter(
        (col("TableName")== table_name)
        & (col("WorkspaceID")==workspace_id)
        & ((col("LakehouseID")==container_id) | (col("WarehouseID")==container_id))
    ).first()


# Create one lineage process per detected pipeline row,
# linking its source table to its sink table.
pipelines_list=df_pipelines_registered.collect()
print("\npipelines_list:")
display(pipelines_list)
print()


for pipeline in pipelines_list:
    source_table=_find_registered_table(pipeline["SourceName"], pipeline["WorkspaceID"], pipeline["SourceContainerID"])
    sink_table=_find_registered_table(pipeline["SinkName"], pipeline["WorkspaceID"], pipeline["SinkContainerID"])
    if source_table is None or sink_table is None:
        # Lineage can only be drawn between two registered tables; skip the
        # pipeline instead of failing on the missing row.
        print(f'Skipping pipeline {pipeline["PipelineName"]}: source table {pipeline["SourceName"]} or sink table {pipeline["SinkName"]} is not among the registered tables')
        continue

    source_table_entity = client.get_entity(typeName="DataSet",qualifiedName=source_table["PurviewFQN"])["entities"][0]
    inputs=[source_table_entity]

    sink_table_entity = client.get_entity(typeName="DataSet",qualifiedName=sink_table["PurviewFQN"])["entities"][0]
    outputs=[sink_table_entity]

    # ColumnMappings holds the column-level mapping as a JSON string; a missing
    # or empty value is treated as "no column mappings".
    raw_column_mappings=pipeline["ColumnMappings"]
    column_mapping_partial=json.loads(raw_column_mappings) if raw_column_mappings else []
    column_mapping_total=[{"ColumnMapping": column_mapping_partial,
        "DatasetMapping": {"Source": source_table["PurviewFQN"], "Sink": sink_table["PurviewFQN"]}
        }]

    pipeline_entity=AtlasProcess(
        name=f'DataPipeline {pipeline["PipelineName"]}',
        typeName="Lineage_Extractor_Process",
        qualified_name=f'DataPipeline://workspace/{pipeline["WorkspaceID"]}/pipeline/{pipeline["PipelineID"]}/CopyActivity',
        inputs=inputs,
        outputs=outputs,
        guid=gt.get_guid(),
        attributes={
            "columnMapping": json.dumps(column_mapping_total),
            "userDescription": f'<div>Fabric DataPipeline Mapping<p> contained within {pipeline["WorkspaceName"]}</p> <p> </p>'
        },
        # This custom attribute flips a switch inside of the Purview UI to render
        # the rich text description.
        customAttributes={
            "microsoft_isDescriptionRichText": "true"
        }
    )
    results = client.upload_entities([pipeline_entity])

**The End**

In [None]:
# Show the Purview GUIDs collected in created_GUIDs by the table and report upload cells
# (the lineage process entities uploaded by the report and pipeline cells are not appended to it).
# The list of newly created entities in Purview DG can be used to move them to a specific Collection using the Move Entity API
created_GUIDs

**APPENDIX**

 **Example: Scanner API call sequence to extract metadata from Fabric**


In [None]:
# Extract list of Workspaces accessible by the Service Principal
#
# IMPORTANT NOTE: The Svc Principal must NOT have "_itemType_.ReadAll" and "_itemType_.ReadWriteAll" authorizations
# for Tables, DataPipelines, Workspaces, etc. when used with Scanner APIs
#
# This is just an example/demo cell and does not interfere with the metadata extraction in the previous cells
#
# this part uses the API Call: GET https://api.powerbi.com/v1.0/myorg/admin/workspaces/modified
get_modified_wkspc_url = 'https://api.powerbi.com/v1.0/myorg/admin/workspaces/modified'
header_token={"Authorization": bearer, 'Content-Type': 'application/json'}

# Plain GET without a request body, like the other Scanner API GET calls in this notebook.
response_modified_wkspc_url=requests.get(get_modified_wkspc_url,headers=header_token)
print("List of accessible workspaces that have been recently modified: \n",response_modified_wkspc_url.text)

In [None]:
# Submit a metadata request (asynchronous scan) for the demo workspace
#
# this is just an example/demo cell and does not interfere with the metadata extraction in the previous cells
# you can skip it
#
# this part uses the API Call: POST https://api.powerbi.com/v1.0/myorg/admin/workspaces/getInfo
get_scanner_url = 'https://api.powerbi.com/v1.0/myorg/admin/workspaces/getInfo?reportObjects=True&getTridentArtifacts=True&getArtifactUsers=true&lineage=True&datasourceDetails=True&datasetSchema=True&datasetExpressions=True'
header_token={"Authorization": bearer, 'Content-Type': 'application/json'}

# Request body: the workspaces to scan. It is passed through `json=` so that
# requests serializes it as valid JSON (double-quoted, properly escaped).
list_of_workspaces={"workspaces": [demo_workspace_id]}

response_scanner_url=requests.post(get_scanner_url,headers=header_token, json=list_of_workspaces)
print("response=",response_scanner_url.text)

In [None]:
# Ask the Scanner API for the status of the metadata request submitted above.
# The scan runs asynchronously on the server, so the result may not be ready yet.
# API Call: GET https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanStatus/{scanId}

# The scan id is read from the "id" field of the getInfo response.
scan_submission=response_scanner_url.json()
scanid=scan_submission.get('id')
print("scan_id=",scanid)

header_token={
    "Authorization": bearer,
    'Content-Type': 'application/json',
}
get_scanstatus_url = 'https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanStatus/{}'.format(scanid)

response_scanstatus_url=requests.get(url=get_scanstatus_url,headers=header_token)
print("scan status is:\n",response_scanstatus_url.text)
# Wait until the printed "status" is "Succeeded" before running the next cell.

In [None]:
# 4. Download the metadata produced by the scan.
# API Call: GET https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanResult/{scanId}
# Example/demo cell only: it does not interfere with the metadata extraction
# performed in the other cells, so it can be skipped.
header_token={
    "Authorization": bearer,
    'Content-Type': 'application/json',
}
get_scanresult_url = 'https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanResult/{}'.format(scanid)

workspaces_response=requests.get(url=get_scanresult_url,headers=header_token)
print("workspaces_response:\n",workspaces_response.text)
# The printed JSON can be large: a JSON editor or targeted JSON queries make it easier to explore.

 **Example: Metadata extraction using SQL**

In [None]:
# EXTRACT TABLES USING SQL
# This is provided just as an example: it selects name, object id, creation and
# modification dates and type of each entry of sys.tables through spark.sql,
# then displays the resulting DataFrame.
df2 = spark.sql("SELECT name, object_id, create_date, modify_date, type, type_desc from sys.tables")
display(df2)

 **Example: use of Semantic Link Library**

In [None]:
# Example Use of Semantic Link Labs
# For getting report source tables and columns in Fabric
#
# this is just an example/demo cell and does not interfere with the metadata extraction in the previous cells
#
# ReportWrapper and demo_workspace are defined elsewhere in the notebook, outside this cell.
report_name = '<Name of report here>' # Enter the report name
report_workspace = demo_workspace # Enter the workspace in which the report exists
rpt = ReportWrapper(report=report_name, workspace=report_workspace)

# Last expression of the cell, so the notebook renders its return value.
rpt.list_semantic_model_objects()

In [None]:
# Example Use of Semantic Link Labs
# For getting source tables and columns for Fabric report whose sources are in Azure Databricks
#
# this is just an example/demo cell and does not interfere with the metadata extraction in the previous cells
# 
import sempy.fabric as sdtfabric
import sempy_labs as labs
# NOTE(review): this value is passed as `dataset` to list_partitions, so it is
# presumably the name of the semantic model behind the report - confirm.
dataset = '<Name of report here>' # Enter report name with Databricks source tables
workspace = None # Enter workspace name
 
# List the partitions of the dataset and display them as a table.
df = sdtfabric.list_partitions(dataset=dataset, workspace=workspace)
display(df)
# use the M code in the "Query" column in the result to get source tables and fields from non-Fabric sources e.g. Azure Databricks