In [0]:
dbutils.library.installPyPI("mlflow")
dbutils.library.restartPython()

In [0]:
from pyspark.sql.types import StructType
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *      # for window() function
from typing import List
from pyspark.sql.types import *
import pandas as pd
import time
from datetime import datetime, timedelta, timezone
import mlflow
from pyspark.sql.types import StructType
import dateutil.parser
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime
from pyspark.sql.types import StructType

version = "job_v-3.17"

In [0]:
%sql
drop table raw_log_data_delta_PN_job;
drop table anomalies_data_delta_PN_job;

In [0]:
%sql
CREATE TABLE IF NOT EXISTS raw_log_data_delta_PN_job (
  account_id STRING,
  agent_id STRING,
  event STRING, 
  timestamp TIMESTAMP
 )
USING DELTA;


CREATE TABLE IF NOT EXISTS anomalies_data_delta_PN_job (
  user_id STRING,
  Ips LONG,
  prediction DOUBLE
)
USING DELTA;


In [0]:
eventsDF = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "td-guardian.tdx.sandcitadel.com:9093")
  .option("subscribe", "event-splitter.audit_logs")
  .load())


In [0]:
display(eventsDF)

key,value,topic,partition,offset,timestamp,timestampType
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyMjQ2NFoiLCJhY2NvdW50X2lkIjogImFjY291bnRfNTgwIiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439344,2020-10-15T14:21:06.822+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyMzU5MFoiLCJhY2NvdW50X2lkIjogImFjY291bnRfNjA5IiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439345,2020-10-15T14:21:06.823+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyNDcxNVoiLCJhY2NvdW50X2lkIjogImFjY291bnRfNzI3IiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439346,2020-10-15T14:21:06.824+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyNTgzNFoiLCJhY2NvdW50X2lkIjogImFjY291bnRfODYwIiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439347,2020-10-15T14:21:06.825+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyNjk1NloiLCJhY2NvdW50X2lkIjogImFjY291bnRfMjM1IiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439348,2020-10-15T14:21:06.826+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyODA4OFoiLCJhY2NvdW50X2lkIjogImFjY291bnRfNjIxIiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439349,2020-10-15T14:21:06.828+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyODEyOFoiLCJhY2NvdW50X2lkIjogImFjY291bnRfNzE4IiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439350,2020-10-15T14:21:06.828+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgyOTI0NVoiLCJhY2NvdW50X2lkIjogImFjY291bnRfOTI5IiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439351,2020-10-15T14:21:06.829+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgzMDM4MFoiLCJhY2NvdW50X2lkIjogImFjY291bnRfNDE0IiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439352,2020-10-15T14:21:06.830+0000,0
,eyJ0aW1lc3RhbXAiOiIyMDIwLTEwLTE1VDE0OjIxOjA2LjgzMTUwOFoiLCJhY2NvdW50X2lkIjogImFjY291bnRfNTE4IiwiZXZlbnRfdHlwZSI6ICJjcmVhdGVfYXVkaXRfb3BlcmF0aW9uIiwiZXZlbnQiOiB7ImF1ZGl0Ijo= (truncated),event-splitter.audit_logs,0,439353,2020-10-15T14:21:06.831+0000,0


In [0]:
#57c7413abca837e974000009
inputPath = "dbfs:/mnt/kafka_raw/57c7413abca837e974000009/"

schema_raw_logs = (  StructType()
  .add("account_id","string")
  .add("agent_id","string")
  .add("event","string")
  .add("timestamp","timestamp") 
)



#reads From Kafka
eventsDF = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "td-guardian.tdx.sandcitadel.com:9093")
  .option("subscribe", "event-splitter.audit_logs")
  .load())


eventsDF = eventsDF.selectExpr("CAST(value AS STRING)")

#reads From S3
#eventsDF = (
#  spark
#    .readStream
#    .schema(schema_raw_logs) # Set the schema of the JSON data
#    .option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
#    .json(inputPath)
#)


display(eventsDF)

key,value
,"{""timestamp"":""2020-10-15T14:31:19.778496Z"",""account_id"": ""account_441"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_60636"",""ip_address"": [""user_60636_ip_2""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.779625Z"",""account_id"": ""account_798"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_30847"",""ip_address"": [""user_30847_ip_1""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.780761Z"",""account_id"": ""account_457"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_7707"",""ip_address"": [""user_7707_ip_2""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.781892Z"",""account_id"": ""account_268"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_27802"",""ip_address"": [""user_27802_ip_1""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.783017Z"",""account_id"": ""account_675"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_94104"",""ip_address"": [""user_94104_ip_1""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.783055Z"",""account_id"": ""account_235"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_22389"",""ip_address"": [""user_22389_ip_0""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.784174Z"",""account_id"": ""account_641"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_18072"",""ip_address"": [""user_18072_ip_0""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.785310Z"",""account_id"": ""account_766"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_45042"",""ip_address"": [""user_45042_ip_0""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.786549Z"",""account_id"": ""account_872"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_39556"",""ip_address"": [""user_39556_ip_2""]}}}"
,"{""timestamp"":""2020-10-15T14:31:19.787685Z"",""account_id"": ""account_72"",""event_type"": ""create_audit_operation"",""event"": {""audit"": {""operation"": ""user_session_created""},""actor"": {""user_id"":""user_14687"",""ip_address"": [""user_14687_ip_1""]}}}"


In [0]:
#writes to Raw table
(eventsDF.writeStream.queryName("write_raw_table")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json_PN_"+version)
  .table("raw_log_data_delta_PN_job")
)



In [0]:
#We may need to user OPTIMIZE, which deals with small files, merge them and compact them into larger files
raw_data = spark.readStream.format("delta").table("raw_log_data_delta_PN_job")



In [0]:
fullschema = (  StructType()
  .add("logger_event_id", "string")
  .add("logger_timestamp","timestamp")                
  .add("account_id","string")
  .add("agent_id","string")
  .add("event",StructType())
         .add("actor",StructType()
             .add("user_id","string")
             .add("ip_addresses",ArrayType(StringType()))
             .add("session_id","string")
             .add("impersonated_user_id","string")
             .add("id","string")
             .add("type","string")
             .add("user_agent","string")
         )
         .add("account_id","string")
         .add("event_type","string")
         .add("audit",StructType()
              .add("severity","string")
              .add("resource_id","string")
              .add("operation","string")
              .add("timestamp","timestamp")
              .add("status","string")
         )
         .add("logger_event_id","string")     
         .add("object",StructType())
         .add("timestamp","timestamp") 
   
   .add("timestamp","timestamp") 
)


In [0]:

run_id = "09840597c6e04f279aaa27be313c6e73"
model_uri = "runs:/" + run_id + "/sklearn-model"
model = mlflow.pyfunc.spark_udf(spark, model_uri)


filtered_data = (raw_data
                  .select( "timestamp"    ,from_json("event", fullschema).alias("data"))
                  .withColumn("timestamp",to_timestamp(to_date("timestamp","yyyy-MM-dd"),"yyyy-MM-dd"))
                  .select( "timestamp" ,   "data.actor.user_id",  "data.actor.ip_addresses")       
                  .where(col("user_id").isNotNull())
                  .where(col("timestamp") >= datetime.now().astimezone(timezone.utc).strftime("%Y-%m-%dT00:00:00.000+0000") )
                  .withWatermark("timestamp", "24 hours")
                  .groupBy(col("timestamp"),"user_id").agg(approx_count_distinct('ip_addresses').alias('Ips'))
                  .select("user_id","Ips")
                  .withColumn("prediction", model("Ips"))
                )





In [0]:
#write data to anomalies' table
(filtered_data.writeStream.queryName("write_anomalies_table")
  .format("delta") 
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/anomalies_"+version)
  .table("anomalies_data_delta_PN_job")
)


In [0]:
%sql
select * from anomalies_data_delta_PN_job

user_id,Ips,prediction


In [0]:
%s