Commit: format and lint fixes
lalmei committed May 3, 2021
1 parent a89f539 commit e43274c
Showing 7 changed files with 145 additions and 156 deletions.
15 changes: 7 additions & 8 deletions examples/rov_whylogs/kafkaConnector.py
@@ -1,22 +1,21 @@
-from kafka import KafkaConsumer, TopicPartition, KafkaProducer
 import json
 
+from kafka import KafkaConsumer, KafkaProducer, TopicPartition
+
 
 def create_producer():
-    return KafkaProducer(bootstrap_servers='localhost:9092',
-                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    return KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda v: json.dumps(v).encode("utf-8"))
 
 
 def create_consumer(topic):
-    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
-                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+    consumer = KafkaConsumer(bootstrap_servers="localhost:9092", value_deserializer=lambda x: json.loads(x.decode("utf-8")))
    # Manually assign partitions
    # https://github.com/dpkp/kafka-python/issues/601#issuecomment-331419097
    assignments = []
    partitions = consumer.partitions_for_topic(topic)
    for p in partitions:
-        print(f'topic {topic} - partition {p}')
+        print(f"topic {topic} - partition {p}")
        assignments.append(TopicPartition(topic, p))
    consumer.assign(assignments)
 
    return consumer
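For context, a minimal usage sketch of the two helpers above; the payload values are illustrative and not taken from this commit:

    # Round-trip one JSON message through the helpers above (values made up).
    from kafkaConnector import create_consumer, create_producer

    producer = create_producer()
    producer.send("telemetry-rov", {"timestamp": 1620000000000, "GYROZ": 0.01})
    producer.flush()  # block until the buffered message is handed to the broker

    consumer = create_consumer("telemetry-rov")
    for partition, messages in consumer.poll(timeout_ms=1000).items():
        for msg in messages:
            print(msg.value)  # already a dict, thanks to value_deserializer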
15 changes: 8 additions & 7 deletions examples/rov_whylogs/logger_tools.py
@@ -1,12 +1,13 @@
 from datetime import datetime
-from whylogs import get_or_create_session
+
 import pandas as pd
+from whylogs import get_or_create_session
 
-def log_session(dataset_name,session_data):
+
+def log_session(dataset_name, session_data):
     session = get_or_create_session()
     df = pd.DataFrame(session_data)
-    df["timestamp"] = pd.to_datetime(df['timestamp'],unit="ms")
-    df_minutes = df.groupby(pd.Grouper(key='timestamp',freq='min'))
-    for minute_batch,batch in df_minutes:
-        with session.logger(dataset_name=dataset_name,dataset_timestamp=minute_batch) as logger:
+    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
+    df_minutes = df.groupby(pd.Grouper(key="timestamp", freq="min"))
+    for minute_batch, batch in df_minutes:
+        with session.logger(dataset_name=dataset_name, dataset_timestamp=minute_batch) as logger:
             logger.log_dataframe(batch)
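To make the behavior above concrete: log_session buckets the records by minute and writes one whylogs profile per minute, stamped with that minute. A hedged example with made-up records:

    # Each distinct minute below ends up in its own whylogs profile.
    from logger_tools import log_session

    session_data = [
        {"timestamp": 1620000000000, "residual": 0.12},  # epoch milliseconds
        {"timestamp": 1620000030000, "residual": 0.09},  # same minute -> same profile
        {"timestamp": 1620000090000, "residual": 0.31},  # next minute -> new profile
    ]
    log_session("rov_prediction_0", session_data)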
38 changes: 13 additions & 25 deletions examples/rov_whylogs/model_features.py
@@ -1,34 +1,22 @@
 features = [
-    "mtarg1",
-    "mtarg2",
+    "mtarg1",
+    "mtarg2",
     "mtarg3",
-
     "roll",
     "pitch",
-
-    "LACCX",
-    "LACCY",
+    "LACCX",
+    "LACCY",
     "LACCZ",
-
     "GYROX",
-    "GYROY",
-
-    "SC1I",
-    "SC2I",
+    "GYROY",
+    "SC1I",
+    "SC2I",
     "SC3I",
-
-
-    "BT1I",
-    "BT2I",
-    "vout",
-    "iout",
+    "BT1I",
+    "BT2I",
+    "vout",
+    "iout",
     "cpuUsage",
-
-]
+]
 
-fault_features = [
-    'fault',
-    'fault_type',
-    'fault_value',
-    'fault_duration'
-]
+fault_features = ["fault", "fault_type", "fault_value", "fault_duration"]
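For scale: features lists the 18 telemetry channels the regression model consumes, and its order matters because it defines the column order of the model's input vector. A hypothetical record, vectorized the same way predict-producer-longrun.py does it:

    # Build the (1, 18) input vector from one telemetry record (values made up).
    import numpy as np

    from model_features import features

    record = {ft: 0.0 for ft in features}  # stand-in for one Kafka message
    x = np.array([float(record[ft]) for ft in features]).reshape(1, -1)
    print(x.shape)  # (1, 18)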
62 changes: 31 additions & 31 deletions examples/rov_whylogs/predict-producer-longrun.py
@@ -1,72 +1,72 @@
 import json
-import joblib
-import pandas as pd
-from model_features import features,fault_features
+import os
 import pickle
+
+import joblib
 import numpy as np
-from kafkaConnector import create_producer,create_consumer
-import os
+from kafkaConnector import create_consumer, create_producer
+from model_features import features
 
 # Loading regression model
 model_folder = "model_files"
-reg_model = joblib.load(open(os.path.join(model_folder,'BL_GYROZ.joblib'), 'rb'))
-target = 'GYROZ'
+reg_model = joblib.load(open(os.path.join(model_folder, "BL_GYROZ.joblib"), "rb"))
+target = "GYROZ"
 
 # Initializing Kafka consumer (for telemetry topic) and producer (for prediction topic)
-topic = 'telemetry-rov'
+topic = "telemetry-rov"
 producer = create_producer()
 consumer = create_consumer(topic)
 
 
 def main():
 
     consumer.seek_to_beginning()
     session_number = 0
     prev = None
     session_data = []
     session_timeout = 10
     while True:
-        record = consumer.poll(timeout_ms=session_timeout*1000, max_records=100, update_offsets=True)
+        record = consumer.poll(timeout_ms=session_timeout * 1000, max_records=100, update_offsets=True)
         if not record:
             print("{} seconds without new data!".format(session_timeout))
-        for k,v in record.items():
+        for k, v in record.items():
             for row in v:
                 current = row.value
                 current_ts = row.value.get("timestamp")
-                res = calculate_residual(current,prev)
-                to_send = {'residual':res,'timestamp':current_ts}
-                producer.send('prediction-rov', to_send)
+                res = calculate_residual(current, prev)
+                to_send = {"residual": res, "timestamp": current_ts}
+                producer.send("prediction-rov", to_send)
                 producer.flush()
                 prev = current
                 prev_ts = current_ts
         finished = False
 
-def calculate_residual(current,prev):
+
+def calculate_residual(current, prev):
     if prev:
         # More than 0.5s has passed without the ROV sending new telemetry data
-        if current['timestamp']-prev['timestamp']>500:
+        if current["timestamp"] - prev["timestamp"] > 500:
             print("OVER TIME LIMIT")
             return np.nan
         else:
-            prev = { ft: float(prev[ft]) for ft in features }
+            prev = {ft: float(prev[ft]) for ft in features}
             x = np.array(list(prev.values()))
-            x=x.reshape(1,-1)
+            x = x.reshape(1, -1)
 
             # Min-max scaler for input features
-            with open(os.path.join(model_folder,'BL_x.pickle'), 'rb') as f:
-                scaler_x=pickle.load(f)
+            with open(os.path.join(model_folder, "BL_x.pickle"), "rb") as f:
+                scaler_x = pickle.load(f)
 
-            x=scaler_x.transform(x)
+            x = scaler_x.transform(x)
 
             py = reg_model.predict(x)
             # Min-max scaler for the target - GYROZ
-            with open(os.path.join(model_folder,'BL_y.pickle'), 'rb') as f:
-                scaler_y=pickle.load(f)
-            py = py.reshape(1,-1)
+            with open(os.path.join(model_folder, "BL_y.pickle"), "rb") as f:
+                scaler_y = pickle.load(f)
+            py = py.reshape(1, -1)
             py = scaler_y.inverse_transform(py)
             py = py.ravel()
             y = float(current[target])
             # return residual (diff. between predicted and current)
-            return abs(py[0]-y)
+            return abs(py[0] - y)
     else:
         return np.nan
-if (__name__=='__main__'):
-    main()
+
+
+if __name__ == "__main__":
+    main()
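The core of calculate_residual above is a scale / predict / inverse-scale / compare pipeline. Below is a stand-in sketch of that happy path using synthetic scikit-learn objects; the real scalers and model come from BL_x.pickle, BL_y.pickle, and BL_GYROZ.joblib, and this sketch only assumes they behave like a MinMaxScaler and a regressor:

    # Synthetic reproduction of the residual math; all data here is made up.
    import numpy as np
    from sklearn.linear_model import LinearRegression
    from sklearn.preprocessing import MinMaxScaler

    rng = np.random.default_rng(0)
    X_train, y_train = rng.random((100, 18)), rng.random(100)

    scaler_x = MinMaxScaler().fit(X_train)                 # plays the role of BL_x.pickle
    scaler_y = MinMaxScaler().fit(y_train.reshape(-1, 1))  # plays the role of BL_y.pickle
    reg_model = LinearRegression().fit(scaler_x.transform(X_train), y_train)

    x = scaler_x.transform(rng.random((1, 18)))            # previous telemetry row, scaled
    py = reg_model.predict(x).reshape(1, -1)               # scaled GYROZ prediction
    py = scaler_y.inverse_transform(py).ravel()            # back to raw GYROZ units
    print(abs(py[0] - 0.02))                               # residual vs. a current reading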
57 changes: 30 additions & 27 deletions examples/rov_whylogs/predict_logger_longrun.py
@@ -1,12 +1,13 @@
 import json
 from collections import deque
-from logger_tools import log_session
-from kafkaConnector import create_consumer
 
 import numpy as np
+from kafkaConnector import create_consumer
+from logger_tools import log_session
 
-topic = 'prediction-rov'
+topic = "prediction-rov"
 consumer = create_consumer(topic)
 
+
 def main():
     resid_window = deque(maxlen=15)
     session_number = 0
@@ -17,58 +18,60 @@ def main():
     total = 0
 
     while True:
-        record = consumer.poll(timeout_ms=session_timeout*1000, max_records=100, update_offsets=True)
+        record = consumer.poll(timeout_ms=session_timeout * 1000, max_records=100, update_offsets=True)
 
         if not record:
             print("{} Seconds without new data!".format(session_timeout))
-            print("total points>>",total)
+            print("total points>>", total)
             if session_data:
-                print("Logging remaining data points into session ",session_number)
+                print("Logging remaining data points into session ", session_number)
                 dataset_name = "rov_prediction_{}".format(session_number)
-                log_session(dataset_name,session_data)
+                log_session(dataset_name, session_data)
 
                 session_data = []
-                session_number+=1
-        for k,v in record.items():
+                session_number += 1
+
+        for k, v in record.items():
             for row in v:
                 total += 1
                 current = row.value
-                current_ts = current['timestamp']
-                resid_window.append(current['residual'])
-                current = add_moving_averages(resid_window,current)
+                current_ts = current["timestamp"]
+                resid_window.append(current["residual"])
+                current = add_moving_averages(resid_window, current)
 
                 if not prev_ts:
                     prev_ts = current_ts
 
-                if abs(current_ts - prev_ts) > 5*60*1000:
-                    print("Logging session ",session_number)
+                if abs(current_ts - prev_ts) > 5 * 60 * 1000:
+                    print("Logging session ", session_number)
                     dataset_name = "rov_prediction_{}".format(session_number)
-                    log_session(dataset_name,session_data)
+                    log_session(dataset_name, session_data)
                     session_data = []
-                    session_number+=1
+                    session_number += 1
                 session_data.append(current)
                 prev_ts = current_ts
 
-def moving_average(slice_,wd):
+
+def moving_average(slice_, wd):
     if np.nan in slice_:
         return np.nan
     if len(slice_) != wd:
         return np.nan
-    return sum(slice_)/wd
+    return sum(slice_) / wd
 
 
-def add_moving_averages(resid_window,current):
+def add_moving_averages(resid_window, current):
 
     r_5 = list(resid_window)[-5:]
     r_10 = list(resid_window)[-10:]
     r_15 = list(resid_window)[-15:]
 
-    current['residual_m5'] = moving_average(r_5,5)
-    current['residual_m10'] = moving_average(r_10,10)
-    current['residual_m15'] = moving_average(r_15,15)
+    current["residual_m5"] = moving_average(r_5, 5)
+    current["residual_m10"] = moving_average(r_10, 10)
+    current["residual_m15"] = moving_average(r_15, 15)
 
     return current
 
 
-if (__name__=='__main__'):
-    main()
+if __name__ == "__main__":
+    main()
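The moving-average helpers above return NaN until the window holds enough points, which keeps a partially filled window from producing misleadingly smooth values. A toy check, with invented residuals and the two helpers from this file assumed to be in scope (e.g. pasted into a REPL):

    # Assumes moving_average / add_moving_averages from the file above are in scope.
    from collections import deque

    resid_window = deque(maxlen=15)
    for r in [0.1, 0.2, 0.3, 0.4, 0.5]:
        resid_window.append(r)

    current = {"timestamp": 1620000000000, "residual": 0.5}
    current = add_moving_averages(resid_window, current)
    print(current["residual_m5"])   # 0.3: mean of the last five residuals
    print(current["residual_m10"])  # nan: fewer than ten values in the window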
42 changes: 21 additions & 21 deletions examples/rov_whylogs/telemetry_logger_longrun.py
@@ -1,10 +1,10 @@
 import json
-from logger_tools import log_session
 from kafkaConnector import create_consumer
+from logger_tools import log_session
 
-topic = 'telemetry-rov'
+topic = "telemetry-rov"
 consumer = create_consumer(topic)
 
 
 def main():
     consumer.seek_to_beginning()
@@ -15,38 +15,38 @@ def main():
     session_timeout = 10
 
     while True:
-        finished = True
-        record = consumer.poll(timeout_ms=session_timeout*1000, max_records=100, update_offsets=True)
+        record = consumer.poll(timeout_ms=session_timeout * 1000, max_records=100, update_offsets=True)
 
         if not record:
             print("{} Seconds without new data!".format(session_timeout))
-            print("Total points:",total)
+            print("Total points:", total)
             if session_data:
-                print("Logging remaining data points into session ",session_number)
+                print("Logging remaining data points into session ", session_number)
                 dataset_name = "rov_telemetry_{}".format(session_number)
-                log_session(dataset_name,session_data)
+                log_session(dataset_name, session_data)
 
                 session_data = []
-                session_number+=1
+                session_number += 1
 
-        for k,v in record.items():
+        for k, v in record.items():
             for row in v:
-                total+=1
-                current_ts = row.value.get('timestamp',None)
+                total += 1
+                current_ts = row.value.get("timestamp", None)
 
                 if not prev_ts:
                     prev_ts = current_ts
-                if abs(current_ts - prev_ts) > 5*60*1000:
-                    print("Logging session ",session_number)
+
+                if abs(current_ts - prev_ts) > 5 * 60 * 1000:
+                    print("Logging session ", session_number)
                     dataset_name = "rov_telemetry_{}".format(session_number)
-                    log_session(dataset_name,session_data)
+                    log_session(dataset_name, session_data)
 
                     session_data = []
-                    session_number+=1
+                    session_number += 1
 
                 session_data.append(row.value)
                 prev_ts = current_ts
 
-if (__name__=='__main__'):
-    main()
+
+if __name__ == "__main__":
+    main()
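Both logger scripts above share the same session-splitting rule: a gap of more than five minutes between consecutive message timestamps closes the current session and starts a new one. A toy illustration of just that rule, with invented timestamps:

    # A >5 min jump between consecutive timestamps starts a new session.
    GAP_MS = 5 * 60 * 1000  # illustrative name; the scripts inline this constant

    timestamps = [0, 60_000, 120_000, 600_001, 660_001]  # ms; fourth point jumps ~8 min
    session_number, prev_ts = 0, None
    for ts in timestamps:
        if prev_ts is not None and abs(ts - prev_ts) > GAP_MS:
            session_number += 1
        prev_ts = ts
    print(session_number)  # 1: the single large gap produced one new session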