In [28]:
from azure.purview.catalog import PurviewCatalogClient
from azure.purview.administration.account import PurviewAccountClient
from azure.identity import DefaultAzureCredential
import pandas as pd
import math

In [None]:
# Purview account name: the "<name>" in https://<name>.purview.azure.com.
# This is the value the client helpers and the update loop actually read.
reference_name_purview = ''
# Derived from reference_name_purview so the two settings cannot drift apart;
# kept only for any cell that still refers to these names.
purview_account_name = reference_name_purview
purview_endpoint = f"https://{purview_account_name}.purview.azure.com"
# Path to the input CSV. Columns read by the update loop: CollectionName, AssetName,
# AssetFQN, AssetDescription, OwnerId, ParentAssetFQN, IsColumn.
csv_file_path = ""

In [30]:
def replace_nan_with_none(obj):
    """Return a copy of ``obj`` with every float NaN replaced by ``None``.

    Dicts and lists are walked recursively and rebuilt as plain ``dict``/``list``.
    Any other value (including tuples and non-NaN floats) is returned as-is.
    """
    if isinstance(obj, dict):
        return {key: replace_nan_with_none(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return list(map(replace_nan_with_none, obj))
    is_nan_float = isinstance(obj, float) and math.isnan(obj)
    return None if is_nan_float else obj

# Credential shared by every client built through get_credentials(); created lazily.
_credential_cache = None

def get_credentials():
    """Return a shared DefaultAzureCredential, creating it on first use.

    Callers may build many Purview clients. Reusing a single credential instance
    avoids re-running the credential chain for each one. Azure Identity documents
    that credentials cache acquired tokens per instance, so sharing one is the
    recommended pattern.

    Returns:
        The same credential object on every call.
    """
    global _credential_cache
    if _credential_cache is None:
        _credential_cache = DefaultAzureCredential()
    return _credential_cache

def get_catalog_client(reference_name_purview):
    """Build a PurviewCatalogClient for the given Purview account.

    The endpoint is ``https://<reference_name_purview>.purview.azure.com/``, the
    credential comes from get_credentials(), and HTTP logging is enabled.
    """
    cred = get_credentials()
    account_endpoint = f"https://{reference_name_purview}.purview.azure.com/"
    return PurviewCatalogClient(
        endpoint=account_endpoint,
        credential=cred,
        logging_enable=True,
    )

def get_admin_client(reference_name_purview):
    """Build a PurviewAccountClient (collections/administration API) for the given account.

    The endpoint is ``https://<reference_name_purview>.purview.azure.com/``, the
    credential comes from get_credentials(), and HTTP logging is enabled.
    """
    cred = get_credentials()
    account_endpoint = f"https://{reference_name_purview}.purview.azure.com/"
    return PurviewAccountClient(
        endpoint=account_endpoint,
        credential=cred,
        logging_enable=True,
    )

def get_collection_Id(collection_name, reference_name_purview=None):
    """Return the internal id (``name``) of the collection whose friendly name matches.

    The comparison is case-insensitive on ``friendlyName``. Collections without a
    ``friendlyName`` are ignored.

    Args:
        collection_name: Friendly name to look up, typically a CSV cell. ``None``/NaN
            yields ``''``. Non-string values are compared via ``str()``.
        reference_name_purview: Purview account name. Defaults to the notebook-level
            ``reference_name_purview`` setting, for compatibility with existing callers.

    Returns:
        The matching collection id, or ``''`` when nothing matches. If several
        collections share the friendly name, a warning is printed and the last one
        listed is returned (the previous behaviour).
    """
    if collection_name is None or (isinstance(collection_name, float) and math.isnan(collection_name)):
        print("No collection name given; cannot look up a collection id")
        return ''
    if reference_name_purview is None:
        # The parameter shadows the global of the same name, so read it explicitly.
        reference_name_purview = globals()['reference_name_purview']

    target = str(collection_name).lower()
    client = get_admin_client(reference_name_purview)
    matches = []
    for collection in client.collections.list_collections():
        friendly_name = collection.get("friendlyName")
        if friendly_name and friendly_name.lower() == target:
            matches.append(collection["name"])
            print('Collection ID found:', collection["name"])

    if not matches:
        print(f"No collection found with friendly name '{collection_name}'")
        return ''
    if len(matches) > 1:
        # Friendly names are not guaranteed unique; make the ambiguity visible.
        print(f"Warning: {len(matches)} collections share the friendly name '{collection_name}' {matches}; using the last one")
    return matches[-1]

def queryCollection(collection_name, reference_name_purview, page_size=1000):
    """Search the Purview catalog for every asset in the named collection.

    The collection id is resolved with ``get_collection_Id(collection_name)``. The
    search (keywords ``"*"``, filtered on that collection id) is then paged with
    ``offset``/``limit`` until all results are fetched. A single unpaged call only
    returns the service's default page, which silently truncates large collections.

    Args:
        collection_name: Friendly name of the collection.
        reference_name_purview: Purview account name used to build the catalog client.
        page_size: Results requested per search call (the ``limit`` field).
            NOTE(review): 1000 is believed to be the service maximum; confirm for your API version.

    Returns:
        The first search response dict with ``value`` replaced by the results of
        all pages, or ``{'value': []}`` if the collection id could not be resolved.

    Raises:
        ValueError: If ``page_size`` is less than 1.
    """
    if page_size < 1:
        raise ValueError(f"page_size must be >= 1, got {page_size}")

    collection_id = get_collection_Id(collection_name)
    if not collection_id:
        # An empty id makes the filter meaningless; there is nothing to search.
        print(f"Skipping search: no collection id for '{collection_name}'")
        return {'value': []}

    catalog_client = get_catalog_client(reference_name_purview)
    all_assets = []
    first_page = None
    offset = 0
    while True:
        payload = {
            "keywords": "*",
            "offset": offset,
            "limit": page_size,
            "filter": {
                "and": [
                    {
                        "or": [
                            {
                                "collectionId": collection_id
                            }
                        ]
                    }
                ]
            }
        }
        page = catalog_client.discovery.query(payload)
        if first_page is None:
            first_page = page
        batch = page.get('value') or []
        all_assets.extend(batch)
        offset += len(batch)
        total = page.get('@search.count')
        # Stop on a short page, or once the reported total has been reached.
        if len(batch) < page_size or (total is not None and offset >= total):
            break

    result = dict(first_page)
    result['value'] = all_assets
    return result

In [31]:
def _first_description_by_name(df_subset_columns):
    """Map each AssetName to the AssetDescription of its first row (built once, O(rows))."""
    lookup = {}
    for name, desc in zip(df_subset_columns['AssetName'], df_subset_columns['AssetDescription']):
        lookup.setdefault(name, desc)
    return lookup


def _prepare_column_updates(referred_entities, descriptions):
    """Build {guid: {'typeName', 'attributes'}} for referred entities that have a usable CSV description."""
    prepared = {}
    for column_guid, column_metadata in list(referred_entities.items()):
        try:
            if not column_metadata or not isinstance(column_metadata, dict):
                print(f"Skipping invalid referred entity (null or not a dict): {column_guid}")
                continue

            attrs = column_metadata.get('attributes')
            type_name = column_metadata.get('typeName') or column_metadata.get('type')

            if not attrs or not isinstance(attrs, dict):
                print(f"Skipping entity without attributes or bad attributes: {column_guid}")
                continue

            col_name = attrs.get('name')
            if not col_name:
                print(f"Skipping entity without a column name: {column_guid}")
                continue

            if col_name not in descriptions:
                continue  # entity is not in scope for this CSV subset

            column_desc = descriptions[col_name]
            if pd.isna(column_desc):
                print(f" - Description is NaN for column {col_name}, skipping")
                continue

            if not type_name:
                # create_or_update needs a typeName; sending None would only produce a server-side failure.
                print(f" - No typeName for column {col_name} ({column_guid}), skipping")
                continue

            # Build a new attribute dict so the caller's referred entity is not modified.
            new_attrs = dict(attrs)
            new_attrs['userDescription'] = column_desc
            filtered_attrs = {k: v for k, v in new_attrs.items() if v is not None}
            prepared[column_guid] = {'typeName': type_name, 'attributes': filtered_attrs}
        except Exception as e:
            print(f"Error preparing column {column_guid} for update: {e}")
    return prepared


def _send_column_updates(catalog_client, updated_referred):
    """Send one create_or_update call per prepared entity; return a success/failure summary."""
    failures = {}
    success_count = 0
    for guid, ent in updated_referred.items():
        per_entity_payload = {
            'entity': {
                'guid': guid,
                'typeName': ent.get('typeName'),
                'attributes': ent.get('attributes')
            }
        }
        try:
            catalog_client.entity.create_or_update(per_entity_payload)
            success_count += 1
        except Exception as e:
            print(f"Failed to update entity {guid}: {e}")
            # Best-effort extra diagnostics: never let them mask the original failure.
            try:
                resp = getattr(e, 'response', None)
                if resp is not None:
                    print('Exception response:', resp)
            except Exception:
                pass
            failures[guid] = str(e)

    print(f"Column updates finished: {success_count} success, {len(failures)} failures")
    if failures:
        print('Failures detail (guid -> error):')
        print(failures)
    return {'success': success_count, 'failures': failures}


def update_column_metadata(catalog_client, referred_entities, df_subset_columns, dry_run=False):
    """Update column descriptions in referred entities and push the changes to Purview.

    An entity is updated when its ``attributes['name']`` equals an ``AssetName`` in
    ``df_subset_columns``. The first matching row's ``AssetDescription`` becomes its
    ``userDescription``. Rows with a NaN description are skipped, as are entities
    without a ``typeName``/``type``. One API call is sent per updated entity, which
    limits the blast radius and gives per-entity error info.

    Args:
        catalog_client: Client whose ``entity.create_or_update(payload)`` is called.
            Unused when ``dry_run`` is True.
        referred_entities: dict mapping guid -> entity metadata (``typeName``/``type``
            and ``attributes``).
        df_subset_columns: DataFrame with at least ``AssetName`` and
            ``AssetDescription`` columns.
        dry_run: If True, print a summary of the prepared payloads and return them
            without calling Purview (safe testing).

    Returns:
        ``None`` when there is nothing to update. ``{'referredEntities': {...}}`` in
        dry-run mode. Otherwise ``{'success': int, 'failures': {guid: error_message}}``.
    """
    if not referred_entities:
        print("No referred entities to update")
        return
    if df_subset_columns.empty:
        print("No columns in scope for update")
        return

    descriptions = _first_description_by_name(df_subset_columns)
    updated_referred = _prepare_column_updates(referred_entities, descriptions)

    if not updated_referred:
        print('No referred column entities required updating')
        return

    if dry_run:
        print('Dry run enabled - prepared per-entity payloads:')
        print({k: {'typeName': v.get('typeName'), 'attributes_keys': list(v.get('attributes', {}).keys())} for k, v in updated_referred.items()})
        return {'referredEntities': updated_referred}

    return _send_column_updates(catalog_client, updated_referred)


In [None]:
# Apply the CSV metadata to Purview. For every collection in the CSV: find its assets,
# update each asset's description and owner, then update the descriptions of its columns.
# CSV columns read here: CollectionName, AssetName, AssetFQN, AssetDescription, OwnerId,
# ParentAssetFQN, IsColumn.
df = pd.read_csv(csv_file_path)

# Create one catalog client and reuse it for every asset.
catalog_client = get_catalog_client(reference_name_purview)

# A blank CollectionName cannot be resolved to a Purview collection.
for collection_name in df['CollectionName'].dropna().unique():
    df_subset = df[df['CollectionName'] == collection_name]
    # Use a set for O(1) membership tests against the search results.
    csv_fqns = set(df_subset['AssetFQN'].dropna())

    response = queryCollection(collection_name, reference_name_purview)
    assets = response['value']
    assets_to_update = [asset for asset in assets if asset['qualifiedName'] in csv_fqns]
    print(f"Found {len(assets_to_update)} assets to update in collection: {collection_name}")

    for asset in assets_to_update:
        asset_name = asset['name']
        asset_id = asset['id']
        asset_fqn = asset['qualifiedName']
        # Match the CSV row by fully-qualified name within this collection. Matching by
        # AssetName across the whole file can pick a column, or an asset from another
        # collection, that happens to share the name.
        asset_row = df_subset[df_subset['AssetFQN'] == asset_fqn].iloc[0]

        try:
            entity_response = catalog_client.entity.get_by_guid(asset_id)
            entity = entity_response['entity']
        except Exception as e:
            print(f"Error fetching asset {asset_name}: {str(e)}")
            continue

        # Replace userDescription with the enhanced description from the CSV. A blank
        # cell leaves the current value untouched, as for OwnerId and for columns.
        enhanced_description = asset_row['AssetDescription']
        if pd.isna(enhanced_description):
            print(f"Skipping description update for asset '{asset_name}' due to missing AssetDescription.")
        else:
            attributes = entity.get('attributes') or {}
            attributes['userDescription'] = enhanced_description
            entity['attributes'] = attributes

        new_owner_id = asset_row['OwnerId']
        print(f"New owner ID from CSV: {new_owner_id}")

        # Only update if OwnerId is valid
        if pd.isna(new_owner_id) or str(new_owner_id).lower() == 'nan':
            print(f"Skipping owner update for asset '{asset_name}' due to missing OwnerId.")
        else:
            # `or {}` also covers an explicit "contacts": None in the response.
            contacts = entity.get('contacts') or {}
            # Always set the Owner list to the new owner
            contacts['Owner'] = [{'id': new_owner_id}]
            entity['contacts'] = contacts
            print(f"Setting owner: {new_owner_id}")

        # NaN is not valid JSON; convert any leftover NaN to None before sending.
        entity_response = replace_nan_with_none(entity_response)

        try:
            catalog_client.entity.create_or_update(entity_response)
            print(f"Successfully updated asset: {asset_name}")
        except Exception as e:
            print(f"Error updating asset {asset_name}: {str(e)}")
            continue

        # Update the asset's columns (CSV rows flagged IsColumn == "Yes" under this asset).
        try:
            response_referredEntities = entity_response.get('referredEntities', {})
            df_subset_columns = df_subset[(df_subset['ParentAssetFQN'] == asset_fqn) & (df_subset['IsColumn'] == "Yes")]
            update_column_metadata(catalog_client, response_referredEntities, df_subset_columns)
        except Exception as e:
            print(f"Error updating columns for asset {asset_name}: {str(e)}")
