In [None]:
!pip3 int

In [1]:
from pyspark import SparkContext, SparkConf, SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from os import listdir
from os.path import isfile, join
import platform
import pandas as pd
from itertools import chain
import pyspark.sql.functions as func
from datetime import datetime
import numpy as np
import os

In [2]:
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 --conf spark.cassandra.connection.port=9042 --conf spark.cassandra.auth.username=cassandra --conf spark.cassandra.auth.password=cassandra pyspark-shell"

In [3]:
KAFKA_BOOTSTRAP_SERVER = "localhost:9092"

In [4]:
spark = SparkSession \
        .builder \
        .appName("LogsAnalysisWithSpark") \
        .master("local[*]") \
        .getOrCreate()

In [5]:
sample = spark.read.json("logs/calls_data.json")
schema = sample.schema

In [6]:
def get_stream_for_topics(topics, spark, kafka_server):
    return spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", kafka_server)\
    .option("subscribe", ",".join(topics))\
    .load()

In [7]:
streamingInputDF = get_stream_for_topics(["callListUpdate", "callInfoUpdate"], spark, KAFKA_BOOTSTRAP_SERVER)

In [8]:
valuesDF = streamingInputDF.selectExpr("CAST(value AS STRING)")

In [9]:
eventsDF = valuesDF.withColumn("event", from_json(valuesDF.value, schema)).select("event")

In [10]:
eventsDF.printSchema()

root
 |-- event: struct (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- message: struct (nullable = true)
 |    |    |-- messageId: long (nullable = true)
 |    |    |-- subscriptionIndex: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- updates: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- call: string (nullable = true)
 |    |    |    |    |-- callCorrelator: string (nullable = true)
 |    |    |    |    |-- callType: string (nullable = true)
 |    |    |    |    |-- distributedInstances: long (nullable = true)
 |    |    |    |    |-- endpointRecording: string (nullable = true)
 |    |    |    |    |-- lockState: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- participants: long (nullable = true)
 |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |-- recording: string (nullable = tr

In [11]:
messagesDF = eventsDF.withColumn("date", eventsDF.event.date)\
                     .withColumn("message", eventsDF.event.message)\
                     .select("message", "date")

In [12]:
preprocessedDF = messagesDF.select("date", explode(messagesDF.message.updates))

In [13]:
preprocessedDF.printSchema()

root
 |-- date: string (nullable = true)
 |-- col: struct (nullable = true)
 |    |-- call: string (nullable = true)
 |    |-- callCorrelator: string (nullable = true)
 |    |-- callType: string (nullable = true)
 |    |-- distributedInstances: long (nullable = true)
 |    |-- endpointRecording: string (nullable = true)
 |    |-- lockState: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- participants: long (nullable = true)
 |    |-- reason: string (nullable = true)
 |    |-- recording: string (nullable = true)
 |    |-- streaming: string (nullable = true)
 |    |-- updateType: string (nullable = true)



In [14]:
col = preprocessedDF.col

In [15]:
finalDF = preprocessedDF.select(col.call.alias("call"),
                      col.callCorrelator.alias("callCorrelator"),
                      col.callType.alias("callType"),
                      col.distributedInstances.alias("distributedInstances"),
                      col.endpointRecording.alias("endpointRecording"),
                      col.lockState.alias("lockState"),
                      col.participants.alias("participants"),
                      col.reason.alias("reason"),
                      col.recording.alias("recording"),
                      col.streaming.alias("streaming"),
                      col.updateType.alias("updateType"),
                      preprocessedDF.date
                     )

In [16]:
# Czas rzeczywisty -> current_time
# Czas od rozpoczęcia spotkania -> time_diff
# Spotkanie nagrywane -> recording
# Spotkanie streamowane -> streaming
# Spotkanie zablokowane -> locked
# Spotkanie adHoc -> adhoc
# Spotkanie
# Aktualna liczba uczestników -> current_participants
# Średnia liczba uczestników w danym spotkaniu -> mean_participants
# Maxymalna liczba uczestników danego spotkania -> max_participants

In [17]:
groupedDF = finalDF.groupBy("call")\
                       .agg(func.sort_array(func.collect_list(finalDF.date)).alias("dateArray"), 
                        func.collect_list(finalDF.recording).alias("recordingArray"),
                        func.collect_list(finalDF.streaming).alias("streamingArray"),
                        func.collect_list(finalDF.lockState).alias("lockStateArray"),
                        reverse(func.collect_list(finalDF.callType)).getItem(0).alias("callType"),
                        reverse(func.collect_list(finalDF.participants)).getItem(0).alias("current_participants"),
                        func.max(finalDF.participants).alias("max_participants"),
                        func.mean(finalDF.participants).alias("mean_participants"))

In [18]:
groupedDF.printSchema()

root
 |-- call: string (nullable = true)
 |-- dateArray: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- recordingArray: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- streamingArray: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- lockStateArray: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- callType: string (nullable = true)
 |-- current_participants: long (nullable = true)
 |-- max_participants: long (nullable = true)
 |-- mean_participants: double (nullable = true)



In [19]:
def find_diff(dates):
    pattern = '%Y-%m-%dT%H:%M:%S.%f'
    start_date = datetime.strptime(dates[0], pattern)
    end_date = datetime.strptime(dates[-1], pattern)
    return int((end_date - start_date).total_seconds())

find_diff_udf = udf(lambda x: find_diff(x))

In [20]:
def get_last_date(dates):
    return dates[-1]

get_last_date_udf = udf(lambda x: get_last_date(x))

In [21]:
def get_last_nonempty_value(values):
    nonempty_values = [i for i in values if i]
    return nonempty_values[-1] if nonempty_values else None

In [22]:
def get_if_active(values):
    state = get_last_nonempty_value(values)
    return state == "active"    
    
get_if_active_udf = udf(lambda x: get_if_active(x))

In [23]:
def get_if_locked(values):
    state = get_last_nonempty_value(values)
    return state == "locked"

get_if_locked_udf = udf(lambda x: get_if_locked(x))

In [24]:
def get_if_type(current_type, expected_type):
    return current_type == expected_type

get_if_adhoc_udf = udf(lambda x: get_if_type(x, "adHoc"))
get_if_lync_udf = udf(lambda x: get_if_type(x, "lyncConferencing"))
get_if_forwarding_udf = udf(lambda x: get_if_type(x, "forwarding"))
get_if_cospace_udf = udf(lambda x: get_if_type(x, "coSpace"))

In [25]:
concat_udf = udf(lambda cols: "".join([x if x is not None else "*" for x in cols]), StringType())

In [26]:
finalDF = groupedDF\
                .withColumn("current_time", get_last_date_udf(groupedDF.dateArray))\
                    .withColumn("time_diff", find_diff_udf(groupedDF.dateArray))\
                    .withColumn("call_id", groupedDF.call)\
                    .withColumn("recording", get_if_active_udf(groupedDF.recordingArray))\
                    .withColumn("streaming", get_if_active_udf(groupedDF.streamingArray))\
                    .withColumn("locked", get_if_locked_udf(groupedDF.lockStateArray))\
                    .withColumn("cospace", get_if_cospace_udf(groupedDF.callType))\
                    .withColumn("adhoc", get_if_adhoc_udf(groupedDF.callType))\
                    .withColumn("lync_conferencing", get_if_lync_udf(groupedDF.callType))\
                    .withColumn("forwarding", get_if_forwarding_udf(groupedDF.callType))\
                    .select("current_time", "time_diff", "call_id", "recording", "streaming", 
                            "locked", "cospace", "adhoc", "lync_conferencing", "forwarding",
                            "current_participants", "mean_participants", "max_participants")

In [27]:
finalDF.printSchema()

root
 |-- current_time: string (nullable = true)
 |-- time_diff: string (nullable = true)
 |-- call_id: string (nullable = true)
 |-- recording: string (nullable = true)
 |-- streaming: string (nullable = true)
 |-- locked: string (nullable = true)
 |-- cospace: string (nullable = true)
 |-- adhoc: string (nullable = true)
 |-- lync_conferencing: string (nullable = true)
 |-- forwarding: string (nullable = true)
 |-- current_participants: long (nullable = true)
 |-- mean_participants: double (nullable = true)
 |-- max_participants: long (nullable = true)



In [28]:
testDF = finalDF.withColumn("id", concat_udf(func.array(finalDF.call_id, finalDF.current_time)))

In [29]:
testDF.printSchema()

root
 |-- current_time: string (nullable = true)
 |-- time_diff: string (nullable = true)
 |-- call_id: string (nullable = true)
 |-- recording: string (nullable = true)
 |-- streaming: string (nullable = true)
 |-- locked: string (nullable = true)
 |-- cospace: string (nullable = true)
 |-- adhoc: string (nullable = true)
 |-- lync_conferencing: string (nullable = true)
 |-- forwarding: string (nullable = true)
 |-- current_participants: long (nullable = true)
 |-- mean_participants: double (nullable = true)
 |-- max_participants: long (nullable = true)
 |-- id: string (nullable = true)



In [30]:
def writeToCassandra(writeDF, epochId):
     writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="test", keyspace="engineering")\
        .mode("append") \
        .save()

In [31]:
writer = testDF\
        .writeStream\
        .trigger(processingTime="10 seconds") \
        .outputMode("update")\
        .foreachBatch(writeToCassandra)

In [None]:
query = writer.start()
query.awaitTermination()