# External Access Audit — PySpark Notebook

This notebook filters **activityEventEntities** in a Lakehouse to find events indicating **external data access**, then writes summary tables for Power BI.

**External-access operations included:**
- `AcceptExternalDataShare`
- `AddExternalResource`
- `AddLinkToExternalResource`
- `AnalyzedByExternalApplication`

> Source: Microsoft Fabric Operation list (Audit) and Activity Events API.

In [None]:
# --- Configuration: edit these for your environment ---
lakehouse_path = "/lakehouse/audit/activityEventEntities"  # Delta path containing exported activity events
external_ops = [
    "AcceptExternalDataShare",
    "AddExternalResource",
    "AddLinkToExternalResource",
    "AnalyzedByExternalApplication"
]

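# Date range (UTC). A date-only value compares as midnight at the start of that day,
# so with the inclusive filter below, events later on end_date are excluded.
start_date = "2025-11-01"
end_date   = "2025-12-02"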

# Output locations (Delta)
actor_summary_path = "/lakehouse/audit/summary_actor"
operation_summary_path = "/lakehouse/audit/summary_operation"


In [None]:
# --- PySpark analysis ---
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.appName("ExternalAccessAuditAnalysis").getOrCreate()

# Load activity events from Delta
df = spark.read.format("delta").load(lakehouse_path)

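# Filter by operation and date range.
# The cast keeps the comparison chronological even if Timestamp is stored as an ISO-8601 string.
event_time = col("Timestamp").cast("timestamp")
filtered_df = df.filter(
    (col("Operation").isin(external_ops)) &
    (event_time >= start_date) &
    (event_time <= end_date)
)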

display_cols = ["Timestamp", "Actor", "Operation", "WorkspaceId", "ItemId"]
filtered_df.select(*display_cols).show(50, truncate=False)

# Summaries: count("*") counts every event row, including rows where Actor or Operation is null
summary_actor = filtered_df.groupBy("Actor").agg(count("*").alias("ExternalAccessCount"))
summary_operation = filtered_df.groupBy("Operation").agg(count("*").alias("AccessCount"))

summary_actor.show()
summary_operation.show()

# Persist summaries to Delta for Power BI
summary_actor.write.format("delta").mode("overwrite").save(actor_summary_path)
summary_operation.write.format("delta").mode("overwrite").save(operation_summary_path)

# (Optional) Register the Delta outputs as tables for SQL access in Fabric
spark.sql("DROP TABLE IF EXISTS ActorSummary")
spark.sql("DROP TABLE IF EXISTS OperationSummary")
spark.sql(f"CREATE TABLE ActorSummary USING DELTA LOCATION '{actor_summary_path}'")
spark.sql(f"CREATE TABLE OperationSummary USING DELTA LOCATION '{operation_summary_path}'")


### Notes
- The `Timestamp` column is expected to be in ISO-8601 (UTC).
- If your lakehouse path differs, update `lakehouse_path`.
- If the events are not stored as Delta, change `read.format("delta")` to `"parquet"` or `"csv"` to match how they were exported.
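
Because date-only bounds compare as midnight, a closed range ending on `end_date` misses almost all of that day. Below is a minimal sketch of one way around this, assuming a half-open range suits the report. The helper `utc_day_bounds` is hypothetical (it is not part of the notebook); it turns the two configured date strings into UTC datetimes that cover both calendar days in full.

```python
from datetime import date, datetime, time, timedelta, timezone


def utc_day_bounds(start_date: str, end_date: str) -> tuple[datetime, datetime]:
    """Return (lower, upper) UTC datetimes for the half-open range [lower, upper).

    lower is midnight at the start of start_date; upper is midnight at the
    start of the day AFTER end_date, so all of end_date falls inside the range.
    """
    lower = datetime.combine(date.fromisoformat(start_date), time.min, tzinfo=timezone.utc)
    upper = datetime.combine(
        date.fromisoformat(end_date) + timedelta(days=1), time.min, tzinfo=timezone.utc
    )
    if upper <= lower:
        raise ValueError("end_date must not be earlier than start_date")
    return lower, upper


lower, upper = utc_day_bounds("2025-11-01", "2025-12-02")
print(lower.isoformat(), upper.isoformat())

# Assumed usage in the notebook's filter (half-open instead of closed):
#   event_time = col("Timestamp").cast("timestamp")
#   df.filter((event_time >= lower) & (event_time < upper))
```

With these bounds the filter would use `>= lower` and `< upper` in place of the closed range. Whether the final day should be included at all is a reporting decision that this sketch does not settle.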