## To-do list
- Filter out large gaps (non-alerting field)
- Fix streaming abort (daily/weekly/monthly)

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.types import StructType
import json
# Schema of the JSON documents in the Kafka `value` field of the cmsweb_logs
# topic: a `metadata` envelope and the `data` payload. Every field is added
# with the default nullable=True; all are strings except metadata.timestamp
# (a long). The name `emailSchema` is kept because the reader cells use it.
_cmsweb_metadata_type = (
    StructType()
    .add("path", StringType())
    .add("_attachment_mimetype", StringType())
    .add("type_prefix", StringType())
    .add("host", StringType())
    .add("json", StringType())
    .add("producer", StringType())
    .add("topic", StringType())
    .add("_id", StringType())
    .add("type", StringType())
    .add("timestamp", LongType())
)

_cmsweb_data_type = (
    StructType()
    .add("code", StringType())
    .add("system", StringType())
    .add("uri_path", StringType())
    .add("method", StringType())
    .add("clientip", StringType())
    .add("client", StringType())
    .add("rec_date", StringType())
    .add("dn", StringType())
    .add("api", StringType())
    .add("rec_timestamp", StringType())
    .add("frontend", StringType())
)

emailSchema = (
    StructType()
    .add("metadata", _cmsweb_metadata_type)
    .add("data", _cmsweb_data_type)
)

In [2]:

# Subscribe to one topic: the live `cmsweb_logs` stream.
# Alternative settings that were used during development:
# .option("kafka.bootstrap.servers", "188.185.79.229:9092")\
# .option("startingOffsets", "earliest") \

# The Kafka `value` payload is parsed once into a struct column ("msg") and
# the needed fields are projected from it, rather than re-running from_json
# for every field. Resulting columns: host, timestamp, system, user, api.
raw_data = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "monit-kafka.cern.ch:9092")
    .option("subscribe", "cmsweb_logs")
    # Do not fail the query when offsets may be out of range (e.g. removed by
    # Kafka retention); such records are skipped instead.
    .option("failOnDataLoss", False)
    .load()
    .select(
        from_json(col("value").cast("string"), emailSchema).alias("msg"),
        col("timestamp"),
    )
    .select(
        col("msg.metadata.host").alias("host"),
        col("timestamp"),  # Kafka record timestamp, used as event time below
        col("msg.data.system").alias("system"),
        col("msg.data.dn").alias("user"),
        col("msg.data.api").alias("api"),
    )
)

# Drop `system` values starting with "%" or "/": they are not real CMS systems.
# Rows with a null `system` are dropped as well (the negated rlike is null).
raw_data = raw_data.filter(~col("system").rlike("^(%|/)"))

raw_data.printSchema()

root
 |-- host: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- system: string (nullable = true)
 |-- user: string (nullable = true)
 |-- api: string (nullable = true)



Exclude `system` values starting with `%` or `/` from the query,
since they do not represent real CMS systems.


In [3]:
# Per-minute request counts for each (system, api, user) combination, using
# tumbling 1-minute windows on the Kafka timestamp. With a 1-minute watermark,
# events that arrive more than a minute behind the latest event time may be
# dropped; in append output mode a window is emitted only after the watermark
# passes its end.
# NOTE(review): count("api") ignores null api values, so a group whose api is
# null reports 0 — count("*") may be what was intended.
groupped_req_data = (
    raw_data
    .withWatermark("timestamp", "1 minute")
    .groupBy(window("timestamp", "1 minute", "1 minute"), "system", "api", "user")
    .agg(count("api").alias("count_req"))
)

groupped_req_data.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- system: string (nullable = true)
 |-- api: string (nullable = true)
 |-- user: string (nullable = true)
 |-- count_req: long (nullable = false)



In [11]:
# Debug sink: collect every parsed record into the in-memory table "email"
# (readable with spark.sql). The memory sink keeps its whole output on the
# driver, so avoid leaving it running on the full stream.
raw_data_flow = (
    raw_data.writeStream
    .queryName("email")
    .outputMode("Append")
    .format("memory")
    .start()
)

In [13]:
# Debug sink: expose the per-minute request counts as the in-memory table
# "groupped_req". In append mode only windows already closed by the
# watermark appear in the table.
groupped_req_flow = (
    groupped_req_data.writeStream
    .queryName("groupped_req")
    .outputMode("Append")
    .format("memory")
    .start()
)

In [14]:
# Persist the per-minute request counts to HDFS as Parquet files.
# File sinks support only the "append" output mode (set once here). The
# checkpoint directory records this query's progress so a restart resumes
# from it; it should not be shared with any other query.
req_hdfs_flow = (
    groupped_req_data.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "/cms/users/carizapo/ming/data_cmsweb_logs")
    .option("checkpointLocation", "/cms/users/carizapo/ming/req_checkpoint_cmsweb_logs")
    .start()
)

In [15]:
# Streaming queries currently running in this Spark session.
active_queries = spark.streams.active
active_queries

[<pyspark.sql.streaming.StreamingQuery at 0x7f9328af7438>,
 <pyspark.sql.streaming.StreamingQuery at 0x7f9328b0b080>,
 <pyspark.sql.streaming.StreamingQuery at 0x7f9328af7208>]

In [3]:
# Disabled experiment: per-minute request count per system only — a coarser
# variant of groupped_req_data (which also groups by api and user).
# groupped_data=raw_data\
# .withWatermark("timestamp", "1 minute")\
# .groupBy(window('timestamp', "1 minute", "1 minute"),"system")\
# .agg(count("system").alias("count"))
# groupped_data.printSchema()

In [77]:
# Disabled experiment: per-minute request load per (system, api, user).
# Same grouping as groupped_req_data, with the count column named
# count_req_load; the following disabled cells join per-system, per-user and
# per-api totals onto this frame.
# groupped_load_data=raw_data\
# .withWatermark("timestamp", "1 minute")\
# .groupBy(window('timestamp', "1 minute", "1 minute"),"system","api","user")\
# .agg(count("api").alias("count_req_load"))

# groupped_load_data.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- system: string (nullable = true)
 |-- api: string (nullable = true)
 |-- user: string (nullable = true)
 |-- count_req_load: long (nullable = false)



In [79]:
# Disabled experiment: per-minute request count per system, joined onto
# groupped_load_data by (system, window).
# NOTE(review): this joins two streaming aggregations. The Spark 2.4
# Structured Streaming guide lists "streaming aggregations before joins" as
# unsupported, so starting a query on the joined frame is expected to fail —
# confirm against the Spark version in use before re-enabling.
# groupped_sys_load=raw_data\
# .withWatermark("timestamp", "1 minute")\
# .groupBy(window('timestamp', "1 minute", "1 minute"),"system")\
# .agg(count("system").alias("count_sys_load"))

# groupped_load_data=groupped_load_data.join(groupped_sys_load,["system","window"],"inner")
# groupped_load_data.printSchema()

root
 |-- system: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- api: string (nullable = true)
 |-- user: string (nullable = true)
 |-- count_req_load: long (nullable = false)
 |-- count_sys_load: long (nullable = false)



In [81]:
# Disabled experiment: per-minute request count per user, joined onto
# groupped_load_data by (user, window).
# NOTE(review): like the per-system join, this joins streaming aggregations,
# which the Spark 2.4 Structured Streaming guide lists as unsupported — confirm
# before re-enabling.
# groupped_user_load=raw_data\
# .withWatermark("timestamp", "1 minute")\
# .groupBy(window('timestamp', "1 minute", "1 minute"),"user")\
# .agg(count("user").alias("count_user_load"))

# groupped_load_data=groupped_load_data.join(groupped_user_load,["user","window"],"inner")
# groupped_load_data.printSchema()

root
 |-- user: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- system: string (nullable = true)
 |-- api: string (nullable = true)
 |-- count_req_load: long (nullable = false)
 |-- count_sys_load: long (nullable = false)
 |-- count_user_load: long (nullable = false)



In [87]:
# Disabled experiment: per-minute request count per api, joined onto
# groupped_load_data by (api, window).
# NOTE(review): joins streaming aggregations, which the Spark 2.4 Structured
# Streaming guide lists as unsupported — confirm before re-enabling.
# groupped_api_load=raw_data\
# .withWatermark("timestamp", "1 minute")\
# .groupBy(window('timestamp', "1 minute", "1 minute"),"api")\
# .agg(count("api").alias("count_api_load"))

# groupped_load_data=groupped_load_data.join(groupped_api_load,["api","window"],"inner")
# groupped_load_data.printSchema()

root
 |-- api: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- user: string (nullable = true)
 |-- system: string (nullable = true)
 |-- count_req_load: long (nullable = false)
 |-- count_sys_load: long (nullable = false)
 |-- count_user_load: long (nullable = false)
 |-- count_api_load: long (nullable = false)



In [89]:
# Disabled: would expose groupped_load_data as the in-memory table
# "groupped_load". NOTE(review): groupped_load_data is built by joining
# streaming aggregations, so start() is expected to fail — confirm.
# groupped_load_flow = groupped_load_data.writeStream.queryName("groupped_load").outputMode("Append").format("memory").start()

In [63]:
# Disabled: would expose the per-system counts (groupped_data) as the
# in-memory table "groupped".
# groupped_data_flow = groupped_data.writeStream.queryName("groupped").outputMode("Append").format("memory").start()

In [6]:
# Disabled: would write the per-system counts (groupped_data) to HDFS as
# Parquet. Note that .outputMode("append") appears twice in this chain; one
# call is enough if it is re-enabled.
# hdfs_data_flow=groupped_data.writeStream \
# .outputMode("append")\
# .format("parquet")\
#  .option("path", "/cms/users/carizapo/ming/groupdata_cmsweb_logs") \
#  .option("checkpointLocation", "/cms/users/carizapo/ming/checkpoint_cmsweb_logs") \
#  .outputMode("append") \
#  .start()

In [11]:
# Ad-hoc monitoring of the streaming queries: lastProgress shows the latest
# micro-batch report, isActive whether the query is still running, and
# processAllAvailable() blocks until all currently available input has been
# processed (Spark documents it as intended for testing).
# The recorded output of this cell is presumably from an earlier, uncommented
# version — a fully commented cell produces no output.
# groupped_req_flow.lastProgress
# groupped_req_flow.processAllAvailable()
# groupped_req_flow.isActive
# groupped_load_flow.lastProgress
# groupped_load_flow.isActive
# req_hdfs_flow.processAllAvailable()
# req_hdfs_flow.lastProgress
# req_hdfs_flow.isActive

True

In [54]:
# Stop the "email" memory-sink query that collects the raw parsed records.
raw_data_flow.stop()

In [94]:
# Stop both per-minute request-count queries (memory table, then HDFS sink).
for req_flow in (groupped_req_flow, req_hdfs_flow):
    req_flow.stop()

In [86]:
# Disabled: stops for the queries whose start cells are commented out.
# groupped_data_flow.stop()
# groupped_load_flow.stop()

In [93]:
# Inspect the per-minute request counts collected by the "groupped_req"
# memory sink (show() prints the first 20 rows, truncated).
groupped_req_sql = "select * from groupped_req"
alerts = spark.sql(groupped_req_sql)
alerts.show()

+--------------------+-------------+--------------------+--------------------+---------+
|              window|       system|                 api|                user|count_req|
+--------------------+-------------+--------------------+--------------------+---------+
|[2019-07-17 09:12...|      couchdb|pdmvserv_task_EXO...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...|      couchdb|pdmvserv_task_HIG...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...|      couchdb|pdmvserv_task_SUS...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...|      couchdb|pdmvserv_task_EXO...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...|      couchdb|nwickram_RVCMSSW_...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...|wmstatsserver|zhenhu_RVCMSSW_10...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...|      couchdb|cmsunified_ACDC0_...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...|      couchdb|cmsunified_ACDC0_...|/DC=ch/DC=cern/OU...|        1|
|[2019-07-17 09:12...

In [None]:
# Replay the cmsweb_logs topic from the earliest offset Kafka still retains,
# feeding the hourly/daily/weekly/monthly aggregations below.
# Columns: host, timestamp, system, user.
# Start position is chosen with the Spark source option `startingOffsets`;
# Kafka's own `auto.offset.reset` is not a Spark source option.
raw_data_old = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "monit-kafka.cern.ch:9092")
    .option("subscribe", "cmsweb_logs")
    .option("startingOffsets", "earliest")
    # Same setting as the live stream: replaying from "earliest" can hit
    # offsets already removed by retention, and failing on them would abort
    # the long-running aggregation queries.
    .option("failOnDataLoss", False)
    .load()
    .select(
        from_json(col("value").cast("string"), emailSchema).alias("msg"),
        col("timestamp"),
    )
    .select(
        col("msg.metadata.host").alias("host"),
        col("timestamp"),
        col("msg.data.system").alias("system"),
        col("msg.data.dn").alias("user"),
    )
)

# Drop `system` values starting with "%" or "/" (not real CMS systems).
# The predicate must reference this DataFrame's own `system` column; a column
# taken from another DataFrame (e.g. raw_data.system) cannot be resolved
# against this plan.
raw_data_old = raw_data_old.filter(~col("system").rlike("^(%|/)"))

In [None]:
# Per-system request counts over 1-hour windows sliding every 30 minutes, so
# each event is counted in two overlapping windows. Events more than
# 15 minutes older than the latest event time seen may be dropped as late.
groupped_data_hour = (
    raw_data_old
    .withWatermark("timestamp", "15 minutes")
    .groupBy(window("timestamp", "1 hour", "30 minutes"), "system")
    .agg(count("system").alias("count"))
)

In [None]:
# Per-system request counts over tumbling 1-day windows. Windows are aligned
# to the Unix epoch, so each one starts at 00:00 UTC. In append mode a day is
# emitted only after the 1-hour watermark passes its end, i.e. once events
# from after 01:00 UTC the next day have arrived.
groupped_data_day = (
    raw_data_old
    .withWatermark("timestamp", "1 hours")
    .groupBy(window("timestamp", "1 day", "1 day"), "system")
    .agg(count("system").alias("count"))
)

In [None]:
# Per-system request counts over tumbling 1-week windows. Windows are aligned
# to the Unix epoch (1970-01-01 was a Thursday), so weeks run Thursday to
# Thursday, 00:00 UTC.
groupped_data_week = (
    raw_data_old
    .withWatermark("timestamp", "1 hours")
    .groupBy(window("timestamp", "1 week", "1 week"), "system")
    .agg(count("system").alias("count"))
)

In [None]:
# "Monthly" per-system request counts: 4-week windows sliding every 2 weeks,
# an approximation of a month rather than calendar months. Each event is
# counted in two overlapping windows.
groupped_data_month = (
    raw_data_old
    .withWatermark("timestamp", "1 hours")
    .groupBy(window("timestamp", "4 weeks", "2 weeks"), "system")
    .agg(count("system").alias("count"))
)

In [None]:
# Write the hourly per-system counts to HDFS as Parquet.
# File sinks support only the "append" output mode (set once). The
# checkpoint directory tracks this query's progress and should not be shared
# with other queries.
hdfs_data_hour_flow = (
    groupped_data_hour.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "/cms/users/carizapo/ming/groupdata_hour_cmsweb_logs")
    .option("checkpointLocation", "/cms/users/carizapo/ming/checkpoint_1a_cmsweb_logs")
    .start()
)

In [None]:
# Write the daily per-system counts to HDFS as Parquet.
# File sinks support only the "append" output mode (set once). The
# checkpoint directory tracks this query's progress and should not be shared
# with other queries.
hdfs_data_day_flow = (
    groupped_data_day.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "/cms/users/carizapo/ming/groupdata_day_cmsweb_logs")
    .option("checkpointLocation", "/cms/users/carizapo/ming/checkpoint_2a_cmsweb_logs")
    .start()
)

In [None]:
# Write the weekly per-system counts to HDFS as Parquet.
# File sinks support only the "append" output mode (set once). The
# checkpoint directory tracks this query's progress and should not be shared
# with other queries.
hdfs_data_week_flow = (
    groupped_data_week.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "/cms/users/carizapo/ming/groupdata_week_cmsweb_logs")
    .option("checkpointLocation", "/cms/users/carizapo/ming/checkpoint_3a_cmsweb_logs")
    .start()
)

In [None]:
# Write the 4-week ("monthly") per-system counts to HDFS as Parquet.
# File sinks support only the "append" output mode (set once). The
# checkpoint directory tracks this query's progress and should not be shared
# with other queries.
hdfs_data_month_flow = (
    groupped_data_month.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "/cms/users/carizapo/ming/groupdata_month_cmsweb_logs")
    .option("checkpointLocation", "/cms/users/carizapo/ming/checkpoint_4a_cmsweb_logs")
    .start()
)

In [None]:
# Stop the four HDFS aggregation queries, in hour/day/week/month order.
# hdfs_data_flow.stop()
for hdfs_flow in (
    hdfs_data_hour_flow,
    hdfs_data_day_flow,
    hdfs_data_week_flow,
    hdfs_data_month_flow,
):
    hdfs_flow.stop()

In [None]:
# Debug sink: collect every replayed record into the in-memory table
# "old_data". The memory sink keeps its whole output on the driver, and this
# stream starts from the earliest retained offset, so the table can grow large.
raw_data_old_flow = (
    raw_data_old.writeStream
    .queryName("old_data")
    .outputMode("Append")
    .format("memory")
    .start()
)

In [None]:
# Inspect the replayed records collected by the "old_data" memory sink
# (show() prints the first 20 rows, truncated).
old_data_sql = "select * from old_data"
alerts = spark.sql(old_data_sql)
alerts.show()