In [16]:
# !pip install pyspark

In [17]:
# !pip install scikit-learn

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

In [2]:
# Initialize a Spark session
spark = SparkSession.builder\
        .appName("KafkaSparkStructuredStreaming")\
        .master("local[2]")\
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")\
        .getOrCreate()



23/12/17 16:07:48 WARN Utils: Your hostname, jbara-Dell-G15-5511 resolves to a loopback address: 127.0.1.1; using 192.168.0.71 instead (on interface wlp0s20f3)
23/12/17 16:07:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/jbara/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jbara/.ivy2/cache
The jars for the packages stored in: /home/jbara/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-59e346e6-2ff0-4abb-b369-59e720b500b9;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 307ms :: artifacts dl 8ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central in

In [3]:
spark

In [4]:
# Define the Kafka parameters
kafka_bootstrap_servers = "localhost:9093"  # Use the Kafka broker's host and port
kafka_topic = "spark_stream"


In [5]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType() \
      .add("Datetime",StringType()) \
      .add("Temperature",FloatType()) \
      .add("Humidity",FloatType()) \
      .add("WindSpeed",FloatType()) \
      .add("GeneralDiffuseFlows",FloatType()) \
      .add("DiffuseFlows",FloatType()) \
      .add("PowerConsumption_Zone1",FloatType()).add("PowerConsumption_Zone2",FloatType()).add("PowerConsumption_Zone3",FloatType())

In [6]:
# Read data from Kafka in a streaming manner
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .load()

In [7]:
personStringDF = df.selectExpr("CAST(value AS STRING)")

In [8]:
personStringDF

DataFrame[value: string]

In [9]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [10]:
from pyspark.sql.functions import from_json, col
df = df.select(col("value").cast("string").alias("data"))
df.printSchema()

root
 |-- data: string (nullable = true)



In [19]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
import joblib
from confluent_kafka import Producer
import json
from influx_line_protocol import Metric
from datetime import datetime, timezone
import time
from datetime import datetime

In [38]:

features = ['Temperature', 'Humidity', 'WindSpeed', 'GeneralDiffuseFlows', 'DiffuseFlows',
            'PowerConsumption_Zone1', 'PowerConsumption_Zone2', 'PowerConsumption_Zone3']

# Kafka producer configuration
kafka_producer_conf = {
    'bootstrap.servers': 'localhost:9093',  # Adjust the Kafka broker address as needed
}

# Create a Kafka producer instance
output_kafka_topic = 'telegraf'

# Kafka topic to which you want to send all the data
kafka_producer = Producer(kafka_producer_conf)



def preprocess_batch(df_batch, df_batch_id):
    collected_data = df_batch.collect()
    if len(collected_data) == 400:
        batch = pd.DataFrame([row.asDict() for row in collected_data])
        row_data = batch.copy()
        #print(row_data)

        # missing values handling
        batch[features] = batch[features].fillna(batch[features].mean())

        # scaling
        scaler = StandardScaler()
        batch[features] = scaler.fit_transform(batch[features])

        anomalies = anomaly(batch)
        #print(anomalies)
        row_data['label'] = anomalies[0]

        features1 = ['Datetime', 'Temperature', 'Humidity', 'WindSpeed', 'GeneralDiffuseFlows', 'DiffuseFlows',
            'PowerConsumption_Zone1', 'PowerConsumption_Zone2', 'PowerConsumption_Zone3', 'label']

        # Send all data to another Kafka topic
        counter = 0
        for _, row in row_data.iterrows():
            json_payload = row[features1].to_dict()
            #date_string = json_payload['Datetime']
            # Parse the date string
            #date_object = datetime.strptime(date_string, "%m/%d/%Y %H:%M")
            # Convert the datetime object to a Unix timestamp with nanosecond precision
            #unix_timestamp_ns = int(date_object.replace(tzinfo=timezone.utc).timestamp() * 1e9)
            current_time_ns = time.time_ns()
            metric = Metric("jbara")
            metric.with_timestamp(current_time_ns)
            metric.add_value('Temperature', float(json_payload['Temperature']))
            metric.add_value('Humidity', float(json_payload['Humidity']))
            metric.add_value('WindSpeed', float(json_payload['WindSpeed']))
            metric.add_value('GeneralDiffuseFlows', float(json_payload['GeneralDiffuseFlows']))
            metric.add_value('DiffuseFlows', float(json_payload['DiffuseFlows']))
            metric.add_value('PowerConsumption_Zone1', float(json_payload['PowerConsumption_Zone1']))
            metric.add_value('PowerConsumption_Zone2', float(json_payload['PowerConsumption_Zone2']))
            metric.add_value('PowerConsumption_Zone3', float(json_payload['PowerConsumption_Zone3']))
            metric.add_value('label', json_payload['label'])
            #producer.flush()
            #if(counter%50==0):
            
            #counter+=1
            kafka_producer.produce(output_kafka_topic, key=None, value=str(metric))
            kafka_producer.flush()
            time.sleep(1.5)

    else:
        pass

    #kafka_producer.flush()

In [39]:
def anomaly(processed_data):
    #first model : isolation forest
    iso_model = joblib.load(filename='isolation_forest_model.joblib')
    anomaly_scores_iso=iso_model.predict(processed_data[features].values)
    
    #second model : LOF

    lof_model  = joblib.load(filename='local_outlier_factor.joblib')
    anomaly_scores_lof=iso_model.predict(processed_data[features].values)
    
    #third model: SVM

    svm_model  = joblib.load(filename='local_outlier_factor.joblib')
    anomaly_scores_svm=svm_model.fit_predict(processed_data[features].values)

    anomalies=anomaly_scores_svm+anomaly_scores_lof+anomaly_scores_iso
    anomaly_df = pd.DataFrame(anomalies<=-1)
    anomaly_df[0]= anomaly_df[0].apply(lambda x: 1 if x == True else 0)
    #print(anomaly_df)
    print(pd.DataFrame(anomalies<=-1).groupby(0).size())
    return anomaly_df


    

# Pipeline

In [40]:
# Read data from Kafka in a streaming manner with the defined schema
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)").select(from_json(col("value"), schema).alias("data")).select("data.*")
# Print the streaming data to the console
query = df \
    .writeStream \
    .outputMode("update") \
    .foreachBatch(preprocess_batch) \
    .trigger(processingTime='5 seconds') \
    .start()

# Wait for the termination of the query (Ctrl+C to stop in Jupyter)
query.awaitTermination()

# Stop the Spark session
spark.stop()


23/12/17 18:14:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b8e288c0-6c6f-43bc-add0-ecab07cd5dbd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/17 18:14:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/17 18:14:28 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
23/12/17 18:14:28 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
23/12/17 18:14:28 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
23/12/17 18:14:28 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known con

0
False    398
True       2
dtype: int64
0
False    398
True       2
dtype: int64


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/jbara/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/jbara/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/jbara/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

23/12/17 18:24:38 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 603655 milliseconds
23/12/17 18:24:38 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 603681 milliseconds


0
False    398
True       2
dtype: int64
0
False    398
True       2
dtype: int64


23/12/17 18:40:47 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 602901 milliseconds
23/12/17 18:40:47 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 602902 milliseconds
                                                                                

Defaulting to user installation because normal site-packages is not writeable


In [1]:
import sklearn
print(sklearn.__version__)

1.3.2


In [25]:
!pip install sklearn==1.3.2


[31mERROR: Ignored the following yanked versions: 0.0.post2[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement sklearn==1.3.2 (from versions: 0.0, 0.0.post1, 0.0.post4, 0.0.post5, 0.0.post7, 0.0.post9, 0.0.post10, 0.0.post11, 0.0.post12)[0m[31m
[0m[31mERROR: No matching distribution found for sklearn==1.3.2[0m[31m
[0m

In [21]:
!pip install cython



In [20]:
!pip install blosc2~=2.0.0

Collecting blosc2~=2.0.0
  Downloading blosc2-2.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m:01[0m
Installing collected packages: blosc2
Successfully installed blosc2-2.0.0


In [28]:
from datetime import datetime

# Get the current time
current_time = datetime.now()

# Print the current time
print("Current Time:", current_time)

Current Time: 2023-12-17 18:12:21.599288


In [37]:
import time

# Get the current time in Unix format with nanosecond precision
current_time_ns = time.time_ns()

# Print the current time
print(type(current_time_ns))

<class 'int'>


In [29]:
date_string = "2023-12-17 18:11:41"
            # Parse the date string
date_object = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
            # Convert the datetime object to a Unix timestamp with nanosecond precision
unix_timestamp_ns = int(date_object.replace(tzinfo=timezone.utc).timestamp() * 1e9)

In [30]:
unix_timestamp_ns

1702836701000000000

In [None]:
1702833173975256151
1702836701000000000