In [None]:
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, LongType
import json

def find_purview_logs_database(data_provider, target_table="PurviewDataSensitivityLogs"):
    """
    Find the first database that contains the target table.
    
    Args:
        data_provider: Object with .list_databases() and .list_tables(database).
        target_table (str): The table name to look for. Defaults to 'PurviewDataSensitivityLogs'.
    
    Returns:
        str | None: The database name if found, otherwise None.
    """
    for database in data_provider.list_databases():
        tables = {table.name for table in data_provider.list_tables(database)}
        if target_table in tables:
            return database
    return None

data_provider = MicrosoftSentinelProvider(spark)

table_name = 'PurviewDataSensitivityLogs'
workspace_name = find_purview_logs_database(data_provider, table_name)

if workspace_name:
    print(f"Found '{workspace_name}' containing the table.")
else:
    print("Table not found in any database.")


StatementMeta(MSGSmall, 30, 7, Finished, Available, Finished)

Table not found in any database.


In [None]:
def translate_source_type(source_type: str) -> str:
    """
    Translate Purview Data SourceType enums into normalized OneTrust categories.
    Defaults to 'Unknown' if no mapping exists.
    """

    mapping = {
        "Fabric": "Azure",
        "AzureSubscription": "Azure",
        "AzureResourceGroup": "Azure",
        "AzureSynapseWorkspace": "Azure",
        "AzureSynapse": "Azure SQL Server",
        "AzureBlob": "Azure Blob Storage",
        "AdlsGen1": "Azure Blob Storage",
        "AdlsGen2": "Azure Blob Storage",
        "AmazonAccount": "AWS",
        "AmazonS3": "AWS",
        "AmazonSql": "Amazon RDS for SQL Server",
        "AzureCosmosDb": "Azure Cosmos",
        "AzureDataExplorer": "Azure",
        "AzureFileService": "Azure Files",
        "AzureSqlDatabase": "Azure SQL Server",
        "ArcEnabledSqlServer": "Microsoft SQL",
        "AmazonPostgreSql": "PostgreSQL",
        "AzurePostgreSql": "PostgreSQL",
        "Databricks": "Databricks",
        "SqlServerDatabase": "Microsoft SQL",
        "AzureSqlDatabaseManagedInstance": "Azure SQL Server",
        "AzureSqlDataWarehouse": "Azure SQL Server",
        "AzureMySql": "MySQL",
        "Hdfs": "Hive",
        "TableauServer": "Tableau",
        "AzureStorage": "Azure Blob Storage",
        "Teradata": "Teradata",
        "Oracle": "Oracle RDBMS",
        "PostgreSql": "PostgreSQL",
        "AmazonRedShift": "Redshift",
        "DatabricksHms": "Databricks",
        "SapS4Hana": "SAP HANA",
        "SapEcc": "SAP HANA",
        "Snowflake": "Snowflake",
        "PowerBI": "Microsoft Teams",  # closest in Purview/M365 world
        "Trident": "Custom Connector",
        "Dataverse": "Azure",
        "DatabricksUnityCatalog": "Databricks"
    }

    return mapping.get(source_type, "Generic JDBC")

def update_classifications_objects_in_string(obj_str) -> str:
    """
    Remove 'Id' fields from each object in a JSON array string.
    If parsing fails, return the original string.
    """
    try:
        arr = json.loads(obj_str)
        for obj in arr:
            if isinstance(obj, dict) and 'Id' in obj:
                del obj['Id']
        return json.dumps(arr)
    except Exception:
        return obj_str  # Return original if parsing fails

def azure_region_to_geo(region: str) -> str:
    """
    Convert Azure region codes into geographic locations (country or region).
    Based on official Azure region list.
    Returns 'Unknown' if region not mapped.
    """
    mapping = {
        # United States
        "eastus": "United States",
        "eastus2": "United States",
        "eastus2euap": "United States",
        "eastusstage": "United States",
        "eastus2stage": "United States",
        "eastusstg": "United States",
        "eastus3": "United States",
        "centralus": "United States",
        "centraluseuap": "United States",
        "centralusstage": "United States",
        "northcentralus": "United States",
        "northcentralusstage": "United States",
        "southcentralus": "United States",
        "southcentralusstage": "United States",
        "southcentralusstg": "United States",
        "westus": "United States",
        "westus2": "United States",
        "westus3": "United States",
        "westusstage": "United States",
        "westus2stage": "United States",
        "westcentralus": "United States",
        "unitedstates": "United States",
        "unitedstateseuap": "United States",

        # Canada
        "canadacentral": "Canada",
        "canadaeast": "Canada",
        "canada": "Canada",

        # Mexico
        "mexicocentral": "Mexico",
        "mexico": "Mexico",

        # South America
        "brazilsouth": "Brazil",
        "brazilsoutheast": "Brazil",
        "brazilus": "Brazil",
        "brazil": "Brazil",
        "chilecentral": "Chile",

        # Europe
        "northeurope": "Europe",
        "westeurope": "Europe",
        "austriaeast": "Austria",
        "francecentral": "France",
        "francesouth": "France",
        "germanywestcentral": "Germany",
        "germanynorth": "Germany",
        "italynorth": "Italy",
        "norwayeast": "Norway",
        "norwaywest": "Norway",
        "polandcentral": "Poland",
        "spaincentral": "Spain",
        "swedencentral": "Sweden",
        "switzerlandnorth": "Switzerland",
        "switzerlandwest": "Switzerland",
        "uksouth": "United Kingdom",
        "ukwest": "United Kingdom",
        "denmarkeast": "Denmark",
        "greececentral": "Greece",
        "europe": "Europe",
        "france": "France",
        "germany": "Germany",
        "italy": "Italy",
        "norway": "Norway",
        "poland": "Poland",
        "spain": "Spain",
        "sweden": "Sweden",
        "switzerland": "Switzerland",
        "uk": "United Kingdom",

        # Middle East
        "uaenorth": "United Arab Emirates",
        "uaecentral": "United Arab Emirates",
        "qatarcentral": "Qatar",
        "israelcentral": "Israel",
        "saudiarabiacentral": "Saudi Arabia",
        "uae": "United Arab Emirates",
        "qatar": "Qatar",
        "israel": "Israel",

        # Africa
        "southafricanorth": "South Africa",
        "southafricawest": "South Africa",
        "southafrica": "South Africa",

        # Asia Pacific
        "eastasia": "Hong Kong",
        "eastasiastage": "Hong Kong",
        "southeastasia": "Singapore",
        "southeastasiastage": "Singapore",
        "indonesiacentral": "Indonesia",
        "indonesia": "Indonesia",
        "japaneast": "Japan",
        "japanwest": "Japan",
        "japan": "Japan",
        "koreacentral": "South Korea",
        "koreasouth": "South Korea",
        "korea": "South Korea",
        "malaysiawest": "Malaysia",
        "malaysia": "Malaysia",
        "newzealandnorth": "New Zealand",
        "newzealand": "New Zealand",
        "australiaeast": "Australia",
        "australiasoutheast": "Australia",
        "australiacentral": "Australia",
        "australiacentral2": "Australia",
        "australia": "Australia",
        "centralindia": "India",
        "southindia": "India",
        "westindia": "India",
        "india": "India",
        "jioindiawest": "India",
        "jioindiacentral": "India",
        "taiwannorth": "Taiwan",
        "taiwan": "Taiwan",
        "singapore": "Singapore",
        "asiapacific": "Asia",
        "asia": "Asia",

        # Global
        "global": "Global",

        # Government
        "usgovvirginia": "United States",
        "usgovarizona": "United States",
        "usgovtexas": "United States",
        "usgoviowa": "United States",
        "usdodeast": "United States",
        "usdodcentral": "United States",
        "ussecwestcentral": "United States",
    }

    return mapping.get(region.lower(), "Unknown")

translate_source_type_udf = F.udf(translate_source_type, StringType())
update_classifications_objects_in_string_udf = F.udf(update_classifications_objects_in_string, StringType())
azure_region_to_geo_udf = F.udf(azure_region_to_geo, StringType())


StatementMeta(MSGSmall, 30, 8, Finished, Available, Finished)

In [8]:
data = data_provider.read_table(table_name, workspace_name)

# Target table
unique_purview_table = "UniquePurviewTable_SPRK"

# Delete existing table if needed
# try:
#     data_provider.delete_table(unique_purview_table, "default")
# except Exception as e:
#     print(f"Error deleting existing table: {e}")

# Read existing table if present
try:
    existing_table = data_provider.read_table(unique_purview_table, "default")
except Exception:
    existing_table = None

# Window for row_number per AssetPath (latest per asset)
window_spec = Window.partitionBy("AssetPath").orderBy(F.col("TimeGenerated").desc())

# UDF to sort array safely
def sort_array_safe(arr):
    if arr is None:
        return []
    return sorted([str(x) for x in arr])

# note: current UDF to sort does a string sort on Classification, not actual array because they are stored as a string in the table
sort_array_udf = F.udf(sort_array_safe, ArrayType(StringType()))

# Filter valid records
distinct_records = data.filter(
    (F.col("Classification").isNotNull()) &
    (~F.col("Classification").isin("", "[]")) &           # remove empty arrays (stored as strings in table)
    (F.col("ClassificationDetails").isNotNull()) &
    (F.length(F.col("ClassificationDetails")) > 2) &
    (F.col("TimeGenerated") >= F.expr("current_timestamp() - interval 1440 hours")) &
    (F.col("ActivityType") == "Classification") &
    (F.col("AssetType").isin("File", "Table"))
) \
.withColumn("row_num", F.row_number().over(window_spec)) \
.filter(F.col("row_num") == 1) \
.filter(~F.col("AssetPath").contains("#")) \
.withColumn("Classification_sorted", sort_array_udf(F.col("Classification"))) \
.withColumn(
    "Id",
    F.sha2(
        F.concat_ws("|", F.col("AssetPath"), F.concat_ws(",", F.col("Classification_sorted"))),
        256
    )
) \
.select(
    F.col("AssetPath"),
    F.col("Classification"),
    F.col("ClassificationDetails"),
    F.col("TimeGenerated"),
    F.col("Id"),
    F.col("SourceName").alias("ExternalID"),
    F.col("SourceName"),
    F.col("SourceType"),
    F.col("SourceRegion"),
    F.col("AssetName"),
    F.col("AssetType"),
    F.col("ItOwner") if "ItOwner" in data.columns else F.lit(None).alias("ItOwner") # TODO: Set to null if not present
)

# Remove duplicates already in the target table
if existing_table:
    distinct_records = distinct_records.join(
        existing_table.select("id").alias("existing"),
        on="Id",
        how="left_anti"
    )

distinct_records = distinct_records.orderBy(F.col("TimeGenerated").asc())

distinct_records = (
    distinct_records
    .withColumn("SourceType", translate_source_type_udf(F.col("SourceType")))
    .withColumn("ClassificationDetails", update_classifications_objects_in_string_udf(F.col("ClassificationDetails")))
    .withColumn("SourceRegion", azure_region_to_geo_udf(F.col("SourceRegion")))
)

# Show sample
distinct_records.show(10, False)

# Save records
record_count = distinct_records.count()
print("new_record_count:", record_count)

write_options = {'mode': 'append'}
if record_count > 0:
    data_provider.save_as_table(distinct_records, unique_purview_table, 'default', write_options)
    print(f"Appended {record_count} new records to {unique_purview_table}.")
else:
    print("No new records to append. Table is already up-to-date.")


StatementMeta(MSGSmall, 30, 9, Finished, Available, Finished)

{"level": "INFO", "run_id": "047dc0a4-1cf7-4682-a2d7-5fb5dcbe4721", "message": "Loading table: PurviewDataSensitivityLogsz"}
{"level": "ERROR", "run_id": "047dc0a4-1cf7-4682-a2d7-5fb5dcbe4721", "message": "Table PurviewDataSensitivityLogsz not found"}


TableNotFoundException: Table PurviewDataSensitivityLogsz not found during getlaketabledef

In [None]:
df = data_provider.read_table(unique_purview_table)
# Sort by RowID ascending
df = df.orderBy(F.col("TimeGenerated").asc())
df.show(20, False)