### Install and load packages

In [1]:
!pip install influxdb-client



In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

import os
import json
import numpy as np
import pandas as pd

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

### Spark Instance

In [3]:
spark = SparkSession.builder.master('local[*]').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')



:: loading settings :: url = jar:file:/usr/local/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-20239031-5d56-4b2e-93d2-59374dbe30e0;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central

### Define an input stream

In [4]:
# Define a schema for the data frame
cols = ['Time',
 'HMI_FIT101', 'HMI_LIT101', 'HMI_AIT201', 'HMI_AIT202', 'HMI_AIT203', 'HMI_FIT201',
 'HMI_DPIT301', 'HMI_FIT301', 'HMI_LIT301', 'HMI_AIT401', 'HMI_AIT402', 'HMI_FIT401', 
 'HMI_LIT401', 'HMI_AIT501', 'HMI_AIT502', 'HMI_AIT503', 'HMI_AIT504', 'HMI_FIT501',
 'HMI_FIT502', 'HMI_FIT503', 'HMI_FIT504', 'HMI_PIT501', 'HMI_PIT502', 'HMI_PIT503', 
 'HMI_FIT601', 'HMI_MV101', 'HMI_P101', 'HMI_P102', 'HMI_MV201', 'HMI_P201', 'HMI_P202',
 'HMI_P203', 'HMI_P204', 'HMI_P205', 'HMI_P206','HMI_MV301', 'HMI_MV302', 'HMI_MV303',
 'HMI_MV304', 'HMI_P301', 'HMI_P302', 'HMI_P401', 'HMI_P402', 'HMI_P403', 'HMI_P404',
 'HMI_P501', 'HMI_P502', 'HMI_P601', 'HMI_P602', 'HMI_P603', 'HMI_UV401']

fields = [StructField(col_name, StringType(), True) for col_name in cols]
schema = StructType(fields)

In [5]:
# Read stream from json and fit schema
inputStream = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "SWAT")\
    .load()

inputStream = inputStream.select(col("value").cast("string").alias("data"))
# inputStream = inputStream.select(from_json(col("value").cast("string"), schema).alias("data"))
inputStream.printSchema()
# data = inputStream.select("data.*")

root
 |-- data: string (nullable = true)



### Batch Processing and Storage

In [6]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

def pca(data):
    sc = StandardScaler()
    data = sc.fit_transform(data)
    pca = PCA(n_components = .95)
    pca.fit(data)
    reduced = pca.transform(data)
    principal_components = pd.DataFrame(data = reduced, columns=[f"P{col + 1}" for col in range(reduced.shape[1])])
    return principal_components

In [7]:
from pyspark.sql import Row
from datetime import datetime
import pickle

class InfluxDBWriter:
    def __init__(self, cloud=False):
        self.url = "http://influxdb:8086"
        self.token = "iJHZR-dq4I5LIpFZCc5bTUHx-I7dyz29ZTO-B4W5DpU4mhPVDFg-aAb2jK4Vz1C6n0DDb6ddA-bJ3EZAanAOUw=="
        self.org = "primary"
        self.bucket = "swat"
        if cloud: # Connect to InfluxDB Cloud
            self.client = InfluxDBClient(
                url="https://westeurope-1.azure.cloud2.influxdata.com", 
                token="iJHZR-dq4I5LIgFZCc5jTUNx-I7dyz29ZTO-B4W5DpU4mhPVDFg-aAb2jK4Vz1C6n0DDb6ddA-bJ3EZAanAOUw==", 
                org="ahr9oi@inf.elte.hu"
            )
        else: # Connect to a local instance of InfluxDB
            self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
        # Create a writer API
        self.write_api = self.client.write_api()

    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True
    
    def preprocess(self, row):
        row_dict = json.loads(row)
        for key, val in row_dict.items():
            if key == 'AIT401' or key == 'P102' or key == 'P202' or key == 'P204'\
            or key == 'P206' or key == 'P302' or key == 'P402' or key == 'P403'\
            or key == 'P404' or key == 'P502' or key == 'P601' or key == 'P603':
                row_dict.pop(key)
                
        row_list = [val for key, val in row_dict if key != 'Time']
        principal_comps = pca(row_list)
        return principal_comps

    def process(self, row):
        try:
            preprocessed = self.preprocess(row["data"])
            print(len(preprocessed))
            #self.write_api.write(bucket=self.bucket, org=self.org, record=self._row_to_point(row["data"]))
            #print(f"> Inserted into {self.bucket} bucket")
        except Exception as ex:
            print(f"[x] Error {str(ex)}")

    def _row_to_point(self, row):
        # Load to dictionary
        row_dict = json.loads(row)
        # String to timestamp
        # timestamp = datetime.datetime.strptime(row_dict["Time"], "%d/%m/%Y %H:%M:%S.%f %p")
        # Create a data point
        point = Point("data").tag("anomaly", "false")
        # Add fields to the point
        for key, val in row_dict.items():
            if key != 'Time':
                point.field(key, float(val))
        # Add timestamp
        point.time(datetime.now())
        return point
    
    def _is_anomaly(self, row):
        row_dict = json.loads(row)
        data_row = []
        # Process dict to array of data
        for key, val in row_dict.items():
            if key != 'Time':
                data_row.append(val)
        # Polynomial SVC on Isolation Trees model
        svc = pickle.load(open('./models/svc_poly.pickle', 'rb'))
        # Detect anomalies
        preds = pca(np.asarray(data_row).reshape(-1, 1))
        return preds
        

### Delete records from a bucket

In [8]:
# DELETE RECORDS
#from datetime import datetime

#client = InfluxDBClient(url="http://influxdb:8086", token="iJHZR-dq4I5LIpFZCc5bTUHx-I7dyz29ZTO-B4W5DpU4mhPVDFg-aAb2jK4Vz1C6n0DDb6ddA-bJ3EZAanAOUw==", org="primary")
#delete_api = client.delete_api()
#start = "1970-01-01T00:00:00Z"
#measurement = "mem"
#delete_api.delete(start, datetime.now(), f'_measurement="{measurement}"', bucket="primary", org="primary")
#print("> Deleted")

### Read stream and process

In [9]:
# 3. Output data: show result in the console
query = (inputStream
         .writeStream
         .outputMode("append")
         .foreach(InfluxDBWriter())
         .start())

query.awaitTermination()

Opened 0, 0                                                         (0 + 1) / 1]
Opened 0, 1                                                         (0 + 1) / 1]
51
51
51
51
51
Opened 0, 2                                                                     
51
Opened 0, 3
51
Opened 0, 4
51tage 4:>                                                          (0 + 1) / 1]
Opened 0, 5                                                         (0 + 1) / 1]
51
Opened 0, 6                                                                     
51
Opened 0, 7                                                         (0 + 1) / 1]
51
Opened 0, 8                                                                     
51
Opened 0, 9
51


KeyboardInterrupt: 