In [0]:
# Register the storage account access key with the current Spark session so
# that abfss:// paths on this account can be read and written.
storage_account = "stocsocailogs"
# NOTE(review): the key is pasted inline; prefer reading it from a Databricks
# secret scope (dbutils.secrets.get) so it is never saved with the notebook.
storage_key = "PASTE YOUR KEY HERE"   # your NEW key

# clear any old/bad config for both endpoints
for host in ["dfs.core.windows.net", "blob.core.windows.net"]:
    try:
        spark.conf.unset(f"fs.azure.account.key.{storage_account}.{host}")
    except Exception:
        # Best-effort cleanup: a failure to unset must not stop the cell, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        pass

# set for ADLS Gen2 (dfs endpoint)
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_key.strip()
)

# Sanity check that prints only the length of the key, never its content.
print("Key length:", len(storage_key.strip()))



Key length: 88


In [0]:
# Hard-reset and re-register the storage account access key on the Spark
# session (same account as above, using the freshly rotated key).
storage_account = "stocsocailogs"

# paste the NEW Key1 from THIS SAME storage account (Access keys)
# NOTE(review): prefer reading the key from a Databricks secret scope
# (dbutils.secrets.get) instead of pasting it into the notebook.
storage_key = "PASTE YOUR KEY HERE"

# hard reset configs (dfs + blob)
for host in ["dfs.core.windows.net", "blob.core.windows.net"]:
    try:
        spark.conf.unset(f"fs.azure.account.key.{storage_account}.{host}")
    except Exception:
        # Best-effort cleanup: ignore ordinary errors from unset, but let
        # KeyboardInterrupt/SystemExit propagate.
        pass

# set key
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_key.strip()
)

# Only the length is printed. The previous "Key preview" line echoed the first
# and last four characters of the secret into the saved notebook output, so it
# has been removed.
print("Key length:", len(storage_key.strip()))



Key length: 88
Key preview: S+dT .... +A==


In [0]:
# Connectivity check: list the root of the "curated" container using the key
# configured above; the returned FileInfo list is shown as the cell output.
dbutils.fs.ls(
    f"abfss://curated@{storage_account}.dfs.core.windows.net/"
)



[FileInfo(path='abfss://curated@stocsocailogs.dfs.core.windows.net/silver_security_events/', name='silver_security_events/', size=0, modificationTime=1767733827000)]

In [0]:

# Load the silver-layer security events Delta table from the curated container.
silver_path = f"abfss://curated@{storage_account}.dfs.core.windows.net/silver_security_events"
silver_df = (
    spark.read
    .format("delta")
    .load(silver_path)
)

# Report the row count, then render the table for a visual check.
print("Silver rows:", silver_df.count())
display(silver_df)


Silver rows: 5


event_type,severity,source_ip,username,event_time,severity_score
LoginFailure,High,203.0.113.45,jdoe,2025-01-01T09:12:33Z,3
LoginSuccess,Low,10.0.0.5,asmith,2025-01-01T09:15:02Z,1
MalwareDetected,Critical,198.51.100.23,system,2025-01-01T09:18:45Z,4
PortScan,Medium,192.0.2.10,unknown,2025-01-01T09:22:10Z,2
LoginFailure,High,203.0.113.45,jdoe,2025-01-01T09:30:55Z,3


In [0]:
from pyspark.sql.functions import col, window, count, desc


In [0]:

# Gold aggregate: number of LoginFailure events per username,
# ordered from most to fewest failures.
gold_failed_by_user = (
    silver_df
    .where(col("event_type") == "LoginFailure")
    .groupBy("username")
    .agg(count("*").alias("failed_login_count"))
    .sort(col("failed_login_count").desc())
)

display(gold_failed_by_user)


username,failed_login_count
jdoe,2


In [0]:
# Gold aggregate: number of events with severity "Critical" per source IP,
# ordered from most to fewest events.
gold_critical_by_ip = (
    silver_df
    .where(col("severity") == "Critical")
    .groupBy("source_ip")
    .agg(count("*").alias("critical_event_count"))
    .sort(col("critical_event_count").desc())
)

display(gold_critical_by_ip)



source_ip,critical_event_count
198.51.100.23,1


In [0]:

# Gold aggregate: how many events fall into each severity level,
# ordered from the most common level to the least common.
gold_severity_dist = (
    silver_df
    .groupBy("severity")
    .agg(count("*").alias("event_count"))
    .sort(col("event_count").desc())
)

display(gold_severity_dist)


severity,event_count
High,2
Low,1
Medium,1
Critical,1


In [0]:

# Gold aggregate: event counts bucketed into 10-minute windows of event_time,
# ordered by the window column.
gold_events_time = (
    silver_df
    .groupBy(window("event_time", "10 minutes"))
    .agg(count("*").alias("event_count"))
    .sort("window")
)

display(gold_events_time)


window,event_count
"List(2025-01-01T09:10:00Z, 2025-01-01T09:20:00Z)",3
"List(2025-01-01T09:20:00Z, 2025-01-01T09:30:00Z)",1
"List(2025-01-01T09:30:00Z, 2025-01-01T09:40:00Z)",1


In [0]:
# Persist each gold aggregate as a Delta table under the "gold" folder of the
# curated container, replacing whatever was written by a previous run.
gold_base_path = f"abfss://curated@{storage_account}.dfs.core.windows.net/gold"

(
    gold_failed_by_user.write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_base_path}/failed_logins_by_user")
)

(
    gold_critical_by_ip.write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_base_path}/critical_events_by_ip")
)

(
    gold_severity_dist.write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_base_path}/event_severity_distribution")
)

(
    gold_events_time.write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_base_path}/events_over_time_10min")
)



In [0]:
 gold_base_path = f"abfss://curated@{storage_account}.dfs.core.windows.net/gold"
export_base = f"abfss://curated@{storage_account}.dfs.core.windows.net/powerbi"

tables = {
    "failed_logins_by_user": gold_failed_by_user,
    "critical_events_by_ip": gold_critical_by_ip,
    "event_severity_distribution": gold_severity_dist,
    "events_over_time_10min": gold_events_time
}

for name, df in tables.items():
    df.write.mode("overwrite").parquet(f"{export_base}/{name}")
