In [1]:
import os
from datetime import datetime
import tarfile
from six.moves import urllib
import numpy as np
import pandas as pd
from zlib import crc32
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedShuffleSplit
from pandas.plotting import scatter_matrix
from sklearn.impute import SimpleImputer

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel

In [2]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = SparkContext.getOrCreate()

In [3]:
# download data from csv
data = pd.read_csv('2023-01-27_17_10_influxdb_data.csv')

In [4]:
# view existing keys
filtered_data = data.drop(["result", "table", "_start", "_stop", "_measurement", "key", "Unnamed: 0"], axis=1)
filtered_data["_field"].value_counts()

buzz    45326
fill    45326
stol    45326
Name: _field, dtype: int64

In [5]:
# parse keys into seperate dataframes
buzz_df = pd.DataFrame([element for element in filtered_data.to_numpy() if element[2] == "buzz"])
fill_df = pd.DataFrame([element for element in filtered_data.to_numpy() if element[2] == "fill"])
stol_df = pd.DataFrame([element for element in filtered_data.to_numpy() if element[2] == "stol"])

In [6]:
# filter past and future values for prediction
#future_values = [filtered_data['_value'].loc[[int(i[0]+1440)]][i[0]+1440] for i in enumerate(filtered_data.to_numpy()) if i[0] < len(filtered_data['_value']-1440) and i[1][2] == "fill"]
#dist_past_30m = [filtered_data['_value'].loc[[int(i[0]-1440)]][i[0]-1440] for i in enumerate(filtered_data.to_numpy()) if i[0] > 0 and i[1][2] == "fill"]
#dist_past_30m.insert(0, dist_past_30m[0])
past_values_30m = [filtered_data["_value"].loc[[i[0]-131]][i[0]-131] for i in enumerate(filtered_data.to_numpy()) if i[0]>45326+131 and i[0]<90653]
for i in range(0,131):
    past_values_30m.insert(0, -1)
past_values_2h = [filtered_data["_value"].loc[[i[0]-545]][i[0]-545] for i in enumerate(filtered_data.to_numpy()) if i[0]>45326+545 and i[0]<90653]
for i in range(0,545):
    past_values_2h.insert(0, -1)
past_values_4h = [filtered_data["_value"].loc[[i[0]-1091]][i[0]-1091] for i in enumerate(filtered_data.to_numpy()) if i[0]>45326+1091 and i[0]<90653]
for i in range(0,1091):
    past_values_4h.insert(0, -1)
future_values = [filtered_data["_value"].loc[[i[0]+1091]][i[0]+1091] for i in enumerate(filtered_data.to_numpy()) if i[0]>45326 and i[0]<89561 and i[1][2] == "fill"]
for i in range(0,1092):
    future_values.append(-1)

In [7]:
# merge dataframe for use in spark
temp_all_rows = pd.merge(fill_df, buzz_df, how='inner', on=0)
temp_all_rows["fill_future_4h"] = future_values
temp_all_rows["fill_past_30m"] = past_values_30m
temp_all_rows["fill_past_2h"] = past_values_2h
temp_all_rows["fill_past_4h"] = past_values_4h
all_rows = pd.merge(temp_all_rows, stol_df, how='inner', on=0)
renamed_rows = all_rows.rename(columns={'1_x': 'fill', '1_y': 'buzz', 1: 'stol', 0: 'tod'})
pandas_df = renamed_rows.drop(['2_x', '2_y', 2], axis=1)
pandas_df["tod"] = [datetime.strptime(row, '%Y-%m-%dT%H:%M:%S') for row in [row.split('Z')[0] for row in [row.split('.')[0] for row in pandas_df["tod"]]]]
pandas_df["year"] = [date.year for date in pandas_df["tod"]]
pandas_df["month"] = [date.month for date in pandas_df["tod"]]
pandas_df["day"] = [date.day for date in pandas_df["tod"]]
pandas_df["hour"] = [date.hour for date in pandas_df["tod"]]
pandas_df["minute"] = [date.minute for date in pandas_df["tod"]]
pandas_df["second"] = [date.second for date in pandas_df["tod"]]
pandas_df = pandas_df.drop(['tod'], axis=1)
pandas_df = pandas_df.drop(pandas_df[pandas_df.fill_future_4h < 0].index)
pandas_df = pandas_df.drop(pandas_df[pandas_df.fill_past_4h < 0].index)

Unnamed: 0,fill,buzz,fill_future_4h,fill_past_30m,fill_past_2h,fill_past_4h,stol,year,month,day,hour,minute,second
1091,39,0,39,39,39,39,0,2023,1,19,14,4,10
1092,39,0,39,39,39,39,0,2023,1,19,14,4,20
1093,39,0,39,39,39,39,0,2023,1,19,14,4,40
1094,39,0,39,39,39,39,0,2023,1,19,14,4,50
1095,39,0,39,39,39,39,0,2023,1,19,14,5,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
44229,22,0,97,22,35,47,0,2023,1,26,8,28,20
44230,22,0,97,22,35,47,0,2023,1,26,8,28,40
44231,22,0,97,22,35,45,0,2023,1,26,8,28,50
44232,22,0,97,22,35,47,0,2023,1,26,8,29,0


In [8]:
# create spark dataframe
sparkDF=spark.createDataFrame(pandas_df)

In [9]:
# view spark types (only number types allowed)
sparkDF.dtypes

[('fill', 'bigint'),
 ('buzz', 'bigint'),
 ('fill_future_4h', 'bigint'),
 ('fill_past_30m', 'bigint'),
 ('fill_past_2h', 'bigint'),
 ('fill_past_4h', 'bigint'),
 ('stol', 'bigint'),
 ('year', 'bigint'),
 ('month', 'bigint'),
 ('day', 'bigint'),
 ('hour', 'bigint'),
 ('minute', 'bigint'),
 ('second', 'bigint')]

In [10]:
# specify feature columns to prepare for ML
feature_cols = ['year', 'month', 'day', 'hour', 'minute', 'second', 'fill', 'buzz', 'stol', 'fill_past_30m', 'fill_past_2h', 'fill_past_4h']

vect_assembler = VectorAssembler(inputCols = feature_cols, outputCol = "features")

data_w_features = vect_assembler.transform(sparkDF)

In [11]:
# specify output column
data_for_training = data_w_features.select('features', 'fill_future_4h')

In [12]:
# train-test-split
train_dataset, test_dataset = data_for_training.randomSplit([0.7, 0.3])

In [13]:
# train model
LinReg = LinearRegression(featuresCol = "features", labelCol = "fill_future_4h")

# Train the model on the training using fit() method.
model = LinReg.fit(train_dataset)

# Predict the Grades using the evulate method
pred = model.evaluate(test_dataset)

#pred.predictions.show()

+--------------------+--------------+------------------+
|            features|fill_future_4h|        prediction|
+--------------------+--------------+------------------+
|[2023.0,1.0,19.0,...|            39| 52.13295376805202|
|[2023.0,1.0,19.0,...|            39| 52.14657228475502|
|[2023.0,1.0,19.0,...|            39| 52.15338154310652|
|[2023.0,1.0,19.0,...|            39| 52.11724556106994|
|[2023.0,1.0,19.0,...|            39|52.121965129142374|
|[2023.0,1.0,19.0,...|            39| 52.09944766380881|
|[2023.0,1.0,19.0,...|            39|52.083739456826734|
|[2023.0,1.0,19.0,...|            39| 52.23379603517424|
|[2023.0,1.0,19.0,...|            39|  52.0659415595656|
|[2023.0,1.0,19.0,...|            39| 52.23851560324667|
|[2023.0,1.0,19.0,...|            39|  52.2228073962646|
|[2023.0,1.0,19.0,...|            39|52.229616654616095|
|[2023.0,1.0,19.0,...|            39| 52.20028993093103|
|[2023.0,1.0,19.0,...|            39| 52.21390844763403|
|[2023.0,1.0,19.0,...|         

In [14]:
#evaluate predictions
from pyspark.ml.evaluation import RegressionEvaluator

evaluation = RegressionEvaluator(labelCol="fill_future_4h", predictionCol="prediction")
rmse = evaluation.evaluate(pred.predictions, {evaluation.metricName: "rmse"})
#print("RMSE: %.3f" % rmse)
mae = evaluation.evaluate(pred.predictions, {evaluation.metricName: "mae"})
#print("MAE: %.3f" % mae)
r2 = evaluation.evaluate(pred.predictions, {evaluation.metricName: "r2"})
#print("r2: %.3f" %r2)

RMSE: 11.865
MAE: 7.892
r2: 0.638


In [15]:
# using it for detection
#model.write().overwrite().save('./models/')


In [16]:
#model_copy = LinearRegressionModel.load('./models/')

In [50]:
#print(type(model))

<class 'pyspark.ml.regression.LinearRegressionModel'>


In [18]:
#pred2 = model_copy.evaluate(test_dataset)
#pred2.predictions.show()

+--------------------+--------------+------------------+
|            features|fill_future_4h|        prediction|
+--------------------+--------------+------------------+
|[2023.0,1.0,19.0,...|            39| 52.13295376805202|
|[2023.0,1.0,19.0,...|            39| 52.14657228475502|
|[2023.0,1.0,19.0,...|            39| 52.15338154310652|
|[2023.0,1.0,19.0,...|            39| 52.11724556106994|
|[2023.0,1.0,19.0,...|            39|52.121965129142374|
|[2023.0,1.0,19.0,...|            39| 52.09944766380881|
|[2023.0,1.0,19.0,...|            39|52.083739456826734|
|[2023.0,1.0,19.0,...|            39| 52.23379603517424|
|[2023.0,1.0,19.0,...|            39|  52.0659415595656|
|[2023.0,1.0,19.0,...|            39| 52.23851560324667|
|[2023.0,1.0,19.0,...|            39|  52.2228073962646|
|[2023.0,1.0,19.0,...|            39|52.229616654616095|
|[2023.0,1.0,19.0,...|            39| 52.20028993093103|
|[2023.0,1.0,19.0,...|            39| 52.21390844763403|
|[2023.0,1.0,19.0,...|         

In [19]:
#test_dataset.show()

In [75]:
def predictFillLevel(current, buzz, stol, last_30m, last_2h, last_4h):
    data = [{"year": datetime.now().year, "month": datetime.now().month, "day": datetime.now().day, "hour": datetime.now().hour, "minute": datetime.now().minute, "second": datetime.now().second, "fill": current, "buzz": buzz , "stol": stol, "fill_past_30m": last_30m, "fill_past_2h": last_2h, "fill_past_4h": last_4h, "fill_future_4h":-1}]
    spark_data = spark.createDataFrame(data)
    feature_cols = ['year', 'month', 'day', 'hour', 'minute', 'second', 'fill', 'buzz', 'stol', 'fill_past_30m', 'fill_past_2h', 'fill_past_4h']
    vect_assembler = VectorAssembler(inputCols = feature_cols, outputCol = "features")
    data_w_features = vect_assembler.transform(spark_data)
    data_for_training = data_w_features.select('features', 'fill_future_4h')
    pred = model.evaluate(data_for_training)
    filtered_preds = pred.predictions.drop("fill_future_4h")
    filtered_preds.show()

In [77]:
#predictFillLevel(12, 0, 0, 12, 12, 12)

+--------------------+-----------------+
|            features|       prediction|
+--------------------+-----------------+
|[2023.0,1.0,29.0,...|1.313549944846848|
+--------------------+-----------------+



----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 37616)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.8/socketserver.py", line 720, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pysp

In [54]:
data = [{"year": 2023, "month": 1, "day": 27, "hour": 10, "minute": 10, "second": 1, "fill": 30, "buzz": 0 , "stol": 0, "fill_past_30m": 30, "fill_past_2h": 30, "fill_past_4h": 30, "fill_future_4h":0}]
spark_data = spark.createDataFrame(data)
#'year', 'month', 'day', 'hour', 'minute', 'second', 'fill', 'buzz', 'stol', 'fill_past_30m', 'fill_past_2h', 'fill_past_4h']
feature_cols = ['year', 'month', 'day', 'hour', 'minute', 'second', 'fill', 'buzz', 'stol', 'fill_past_30m', 'fill_past_2h', 'fill_past_4h']

vect_assembler = VectorAssembler(inputCols = feature_cols, outputCol = "features")

data_w_features = vect_assembler.transform(spark_data)

data_for_training = data_w_features.select('features', 'fill_future_4h')
pred = model.evaluate(data_for_training)

In [59]:
#model.predict(np.array([0.0]))

Py4JJavaError: An error occurred while calling z:org.apache.spark.ml.python.MLSerDe.loads.
: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
	at org.apache.spark.mllib.api.python.SerDeBase.loads(PythonMLLibAPI.scala:1322)
	at org.apache.spark.ml.python.MLSerDe.loads(MLSerDe.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
