In [66]:
from kafka import KafkaProducer, KafkaConsumer
import json

In [67]:
# Consume the unprocessed kafka stream
class StreamPreprocessor:
    """Regroup JSON records from one Kafka topic and republish them to another.

    Each consumed record is a flat JSON object. It is rewritten as
    ``{'time': ..., 'features': {...}, 'labels': {...}}`` (see ``map_keys``)
    and sent, JSON-encoded as UTF-8, to the output topic.

    Args:
        input_topic: Topic the consumer subscribes to.
        output_topic: Topic the regrouped records are sent to.
        input_keys: Iterable of keys to copy out of every incoming record.
        bootstrap_servers: Broker address(es), passed to both the consumer
            and the producer.
        auto_offset_reset: Passed through to ``KafkaConsumer``.
    """

    def __init__(
        self,
        input_topic,
        output_topic,
        input_keys,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
    ):
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.input_keys = input_keys
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
            # Incoming payloads are UTF-8 encoded JSON documents.
            value_deserializer=lambda x: json.loads(x.decode('utf-8')))
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def map_keys(self, message, input_keys):
        """Split a flat record into time, feature and label groups.

        Args:
            message: Mapping holding a ``'time'`` entry plus one entry for
                every name in ``input_keys``. It is only read, never modified.
            input_keys: Keys to copy. ``'time'`` is skipped here because it is
                always copied to the top level; keys starting with
                ``'attack'`` go under ``'labels'``, all others under
                ``'features'``.

        Returns:
            dict: ``{'time': ..., 'features': {...}, 'labels': {...}}``.

        Raises:
            KeyError: If ``'time'`` or any requested key is missing.
        """
        # Read with indexing rather than pop() so the caller's dict stays intact.
        output_dict = {
            'time': message['time'],
            'features': {},
            'labels': {},
        }
        for key in input_keys:
            if key == 'time':
                continue
            group = 'labels' if key.startswith('attack') else 'features'
            output_dict[group][key] = message[key]

        return output_dict

    def run(self):
        """Consume records, regroup them and publish them to the output topic.

        Iterates over ``self.consumer`` until that iteration ends or an
        exception (for example a kernel interrupt) propagates. Prints a
        progress line every 10000 records; the number printed is the
        zero-based index of the record just handled.
        """
        try:
            for i, message in enumerate(self.consumer):
                output_dict = self.map_keys(message.value, self.input_keys)
                self.producer.send(self.output_topic, json.dumps(output_dict).encode('utf-8'))
                if i % 10000 == 0:
                    print(f'Processed {i} messages')
        finally:
            # send() only queues the record; flush so nothing still buffered
            # is dropped when the loop stops or is interrupted.
            self.producer.flush()

In [68]:
# Keys copied out of every incoming record, in this order: the timestamp,
# then the P1–P4 columns, then the four `attack*` columns.
input_columns = [
    'time',
    # P1
    'P1_B2004', 'P1_B2016', 'P1_B3004', 'P1_B3005',
    'P1_B4002', 'P1_B4005', 'P1_B400B', 'P1_B4022',
    'P1_FCV01D', 'P1_FCV01Z', 'P1_FCV02D', 'P1_FCV02Z', 'P1_FCV03D', 'P1_FCV03Z',
    'P1_FT01', 'P1_FT01Z', 'P1_FT02', 'P1_FT02Z', 'P1_FT03', 'P1_FT03Z',
    'P1_LCV01D', 'P1_LCV01Z', 'P1_LIT01',
    'P1_PCV01D', 'P1_PCV01Z', 'P1_PCV02D', 'P1_PCV02Z',
    'P1_PIT01', 'P1_PIT02',
    'P1_PP01AD', 'P1_PP01AR', 'P1_PP01BD', 'P1_PP01BR', 'P1_PP02D', 'P1_PP02R',
    'P1_STSP', 'P1_TIT01', 'P1_TIT02',
    # P2
    'P2_24Vdc', 'P2_ASD', 'P2_AutoGO', 'P2_CO_rpm', 'P2_Emerg', 'P2_HILout',
    'P2_MSD', 'P2_ManualGO', 'P2_OnOff', 'P2_RTR', 'P2_SIT01', 'P2_SIT02',
    'P2_TripEx', 'P2_VT01', 'P2_VTR01', 'P2_VTR02', 'P2_VTR03', 'P2_VTR04',
    'P2_VXT02', 'P2_VXT03', 'P2_VYT02', 'P2_VYT03',
    # P3
    'P3_FIT01', 'P3_LCP01D', 'P3_LCV01D', 'P3_LH', 'P3_LIT01', 'P3_LL', 'P3_PIT01',
    # P4
    'P4_HT_FD', 'P4_HT_LD', 'P4_HT_PO', 'P4_HT_PS', 'P4_LD',
    'P4_ST_FD', 'P4_ST_GOV', 'P4_ST_LD', 'P4_ST_PO', 'P4_ST_PS',
    'P4_ST_PT01', 'P4_ST_TT01',
    # attack columns
    'attack', 'attack_P1', 'attack_P2', 'attack_P3',
]

In [69]:
# Wire the preprocessor: read from 'hai-input', write to 'hai-preprocessed'.
# Broker address and offset-reset policy keep the constructor defaults.
stream_preprocessor = StreamPreprocessor(
    input_topic='hai-input',
    output_topic='hai-preprocessed',
    input_keys=input_columns,
)

In [None]:
# Start the consume -> regroup -> publish loop. run() iterates over the Kafka
# consumer, so this cell presumably keeps running until the kernel is
# interrupted (no consumer timeout is configured) -- TODO confirm.
stream_preprocessor.run()

Processed 0 messages
Processed 10000 messages
Processed 20000 messages
Processed 30000 messages
Processed 40000 messages
