In [7]:
import redis 

client = redis.Redis('host.docker.internal', 6379, 0)

client.flushdb()

True

In [8]:
import json
import numpy as np
import threading
import time
import io
from concurrent.futures import ThreadPoolExecutor
from pcomp_utils.kafka_confluent_utils import KafkaProducerHandler, KafkaConsumerHandler
from pcomp_utils.activation_functions import ACTIVATIONS, relu, softmax
from pcomp_utils.redis_utils import RedisHandler
from pcomp_utils.minio_utils import MinioClient
from pcomp_utils.utils import batch_generator

In [9]:
# Kafka Configuration
KAFKA_BROKER = 'kafka:9092'

class Neuron(threading.Thread):
    def __init__(self, layer_id, neuron_id, weights, bias, activation, is_final_layer):
        threading.Thread.__init__(self)
        self.layer_id = layer_id
        self.layer_id_num = int(self.layer_id.replace("layer_", ""))
        self.neuron_id = neuron_id
        self.weights = np.array(weights)
        self.bias = np.array(bias)
        self.activation_func = ACTIVATIONS.get(activation, relu)
        self.is_final_layer = is_final_layer
        self.redis_handler = RedisHandler('host.docker.internal', 6379, 0)
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.producer = None

    def fetch_input(self, batch_id, batch_size, columns_size):
        key = f"initial_data:{batch_id}" if self.layer_id == 'layer_0' else f"layer_{int(self.layer_id[-1]) - 1}_{batch_id}"
        # Poll Redis until the data is available.
        while True:
            data = self.redis_handler.get_batch(key, int(batch_size), int(columns_size))
            if data is not None:
                return data

    def process_and_send(self, batch_id, batch_size, columns_size):
        input_data = self.fetch_input(batch_id, batch_size, columns_size)
        z = np.dot(input_data, self.weights) + self.bias
        output = z if self.is_final_layer else self.activation_func(z)
        self.redis_handler(f"n_{self.layer_id_num}_{self.neuron_id}", output, True, 1000)
        msg = f"{self.neuron_id}|{batch_id}|{batch_size}|{columns_size}"
        self.producer.send(f'layer-{self.layer_id_num}-complete', msg, self.layer_id_num)
        #self.producer.send(f'requests-responses', 'www.neuron.example')

    def run(self):
        # Instantiate Kafka consumer and producer inside the thread.
        consumer = KafkaConsumerHandler(f'layer-{self.layer_id_num}', KAFKA_BROKER, partition=self.neuron_id)
        self.producer = KafkaProducerHandler(KAFKA_BROKER)
        last_msg_time = time.time()
        while True:
            got_message = False
            for message in consumer.consume():
                got_message = True
                last_msg_time = time.time()
                layer, batch_id_str, batch_size, columns_size = message.split('|')
                if layer == self.layer_id:
                    batch_id = int(batch_id_str)
                    self.executor.submit(self.process_and_send, batch_id, batch_size, columns_size)
            if not got_message and (time.time() - last_msg_time > 10):
                consumer.commit()
                consumer.close()
                self.producer.close()
                break
            time.sleep(0.05)


class LayerCoordinator(threading.Thread):
    def __init__(self, layer_id, neuron_count, is_final_layer=False):
        threading.Thread.__init__(self)
        self.layer_id = layer_id
        self.layer_id_num = int(self.layer_id.replace("layer_", ""))
        self.neuron_count = neuron_count
        self.is_final_layer = is_final_layer
        self.completed_neurons = {}
        self.redis_handler = RedisHandler('host.docker.internal', 6379, 0)
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.producer = None

    def run(self):
        consumer = KafkaConsumerHandler(f'layer-{self.layer_id_num}-complete', KAFKA_BROKER, self.layer_id_num)
        self.producer = KafkaProducerHandler(KAFKA_BROKER)
        last_msg_time = time.time()
        while True:
            got_message = False
            for message in consumer.consume():
                got_message = True
                last_msg_time = time.time()
                neuron_id, batch_id, batch_size, columns_size = message.split('|')
                neuron_id = int(neuron_id)
                batch_id = int(batch_id)
                if batch_id not in self.completed_neurons:
                    self.completed_neurons[batch_id] = set()
                self.completed_neurons[batch_id].add(neuron_id)

                if len(self.completed_neurons[batch_id]) == self.neuron_count:
                    batch_size = int(batch_size)
                    columns_size = int(columns_size)
                    local_outputs = self.outputs[batch_id].copy()
                    self.executor.submit(self.aggregate_neuron_outputs, batch_id, local_outputs, batch_size, columns_size)
                    del self.completed_neurons[batch_id]
            if not got_message and (time.time() - last_msg_time > 10):
                consumer.commit()
                consumer.close()
                self.producer.close()
                break
            time.sleep(0.05)

    def aggregate_neuron_outputs(self, batch_id, local_outputs, batch_size, columns_size):
        #outputs = np.array([local_outputs.get(neuron_id) for neuron_id in range(self.neuron_count)])
        outputs = np.stack([self.redis_handler.get_batch(f"n_{self.layer_id_num}_{neuron}") for neuron in range(self.neuron_count)], axis=1)
        # Store the aggregated result in Redis.
        self.redis_handler.set(f"{self.layer_id}_{batch_id}", outputs)
        if not self.is_final_layer:
            self.activate_next_layer(batch_id)
        else:
            self.redis_handler.hset('predictions', batch_id, prediction)
            self.redis_handler.delete(f"initial_data:{batch_id}")

    def activate_next_layer(self, batch_id):
        next_layer = f'layer_{self.layer_id_num + 1}'
        self.producer.send('activate-layer', f"{next_layer}|{batch_id}", self.layer_id_num + 1)
        #self.producer.send(f'requests-responses', 'www.layercoordinator.example')
            

class Layer(threading.Thread):
    def __init__(self, layer_id, neuron_count):
        threading.Thread.__init__(self)
        self.layer_id = layer_id
        self.neuron_count = neuron_count
        self.layer_id_num = int(self.layer_id.replace("layer_", ""))
        self.executor = ThreadPoolExecutor(max_workers=8)
        self.producer = None

    def activate_neurons(self, batch_id, batch_size, columns_size):
        for neuron_id in range(self.neuron_count):
            self.executor.submit(self.send_activation, neuron_id, batch_id, batch_size, columns_size)
            print(f"✅ Activated Neuron {neuron_id} in {self.layer_id} for image {batch_id}")

    def send_activation(self, neuron_id, batch_id, batch_size, columns_size):
        self.producer.send(f'layer-{self.layer_id_num}', f"{self.layer_id}|{batch_id}|{batch_size}|{columns_size}", neuron_id)
        #self.producer.send(f'requests-responses', 'www.layer.example')

    def run(self):
        consumer = KafkaConsumerHandler('activate-layer', KAFKA_BROKER, self.layer_id_num)
        self.producer = KafkaProducerHandler(KAFKA_BROKER)
        last_msg_time = time.time()
        while True:
            got_message = False
            for message in consumer.consume():
                got_message = True
                last_msg_time = time.time()
                layer, batch_id_str, batch_size, columns_size = message.split('|')
                if layer == self.layer_id:
                    batch_id = int(batch_id_str)
                    self.activate_neurons(batch_id, batch_size, columns_size)
            if not got_message and (time.time() - last_msg_time > 10):
                consumer.commit()
                consumer.close()
                self.producer.close()
                break
            time.sleep(0.05)

def predict_data():
    producer = KafkaProducerHandler(KAFKA_BROKER)
    redis_handler = RedisHandler('host.docker.internal', 6379, 0)
    file = MinioClient("host.docker.internal:9000", "admin", "admin123").get_object("my-bucket", "mnist.csv")
    data = np.genfromtxt(io.StringIO(file.read().decode('utf-8')), delimiter=',', skip_header=1)
    features = data[:, :-1]
    for idx, batch in enumerate(batch_generator(features, 1000), start=0):
        redis_handler.set(f"initial_data:{idx}", batch, True, 1000)
        producer.send('activate-layer', f"layer_0|{idx}|1000|784", 0)
    producer.close()

# Load network and dataset
data = json.load(open("node_based_model.json"))
#df = pd.read_csv('data/mnist.csv').head(10)

neurons = []
layers = []
coordinators = []

for layer_name, layer_info in data.items():
    neurons += [Neuron(layer_id=layer_name, neuron_id=i, weights=node['weights'], bias=node['biases'], activation=node['activation'], is_final_layer=(layer_name == list(data.keys())[-1])) for i, node in enumerate(layer_info['nodes'])]
    layers.append(Layer(layer_id=layer_name, neuron_count=len(layer_info['nodes'])))
    coordinators.append(LayerCoordinator(layer_id=layer_name, neuron_count=len(layer_info['nodes']), is_final_layer=(layer_name == list(data.keys())[-1])))

# Start all threads
for thread in neurons + layers + coordinators:
    thread.start()

print("Threads started")

predict_data()

%4|1742662485.998|TERMINATE|rdkafka#producer-563| [thrd:app]: Producer terminating with 1 message (18 bytes) still in queue or transit: use flush() to wait for outstanding message delivery


Threads started


%4|1742662489.279|OFFSET|rdkafka#consumer-847| [thrd:main]: activate-layer [0]: offset reset (at offset 4244 (leader epoch 0), broker 1) to offset END (leader epoch -1): fetch failed due to requested offset not available on the broker: Broker: Offset out of range


✅ Activated Neuron 0 in layer_0 for image 0
✅ Activated Neuron 1 in layer_0 for image 0
✅ Activated Neuron 2 in layer_0 for image 0
✅ Activated Neuron 3 in layer_0 for image 0
✅ Activated Neuron 4 in layer_0 for image 0
✅ Activated Neuron 5 in layer_0 for image 0
✅ Activated Neuron 6 in layer_0 for image 0
✅ Activated Neuron 7 in layer_0 for image 0
✅ Activated Neuron 8 in layer_0 for image 0
✅ Activated Neuron 9 in layer_0 for image 0
✅ Activated Neuron 10 in layer_0 for image 0
✅ Activated Neuron 11 in layer_0 for image 0
✅ Activated Neuron 12 in layer_0 for image 0
✅ Activated Neuron 13 in layer_0 for image 0
✅ Activated Neuron 14 in layer_0 for image 0
✅ Activated Neuron 15 in layer_0 for image 0
✅ Activated Neuron 16 in layer_0 for image 0
✅ Activated Neuron 17 in layer_0 for image 0
✅ Activated Neuron 18 in layer_0 for image 0
✅ Activated Neuron 19 in layer_0 for image 0
✅ Activated Neuron 20 in layer_0 for image 0
✅ Activated Neuron 21 in layer_0 for image 0
✅ Activated Neuron 2

%5|1742662539.961|REQTMOUT|rdkafka#consumer-847| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out LeaveGroupRequest in flight (after 5014ms, timeout #0)
%5|1742662539.961|REQTMOUT|rdkafka#consumer-849| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out LeaveGroupRequest in flight (after 5014ms, timeout #0): possibly held back by preceeding HeartbeatRequest with timeout in 29158ms
%4|1742662539.961|REQTMOUT|rdkafka#consumer-849| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1742662539.961|REQTMOUT|rdkafka#consumer-847| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1742662539.963|FAIL|rdkafka#consumer-849| [thrd:GroupCoordinator]: GroupCoordinator: host.docker.internal:9092: 1 request(s) timed out: disconnect (average rtt 711.432ms) (after 53711ms in state UP)
%3|1742662539.963|FAIL|rdkafka#consumer-847| [thrd:GroupCoord

In [6]:
# file = MinioClient("host.docker.internal:9000", "admin", "admin123").get_object("my-bucket", "mnist.csv")
# data = np.genfromtxt(io.StringIO(file.read().decode('utf-8')), delimiter=',', skip_header=1)
# features = data[:, :-1]
# labels = data[:, -1]

# redis_handler = RedisHandler('host.docker.internal', 6379, 0)

# pipe = redis_handler.pipeline()
# for idx, label in enumerate(labels):
#     pipe.hset("images_label", str(idx), int(label))

# pipe.execute()

In [7]:
# for idx, batch in enumerate(batch_generator(features, 1000), start=0):
#     redis_handler.set(f"initial_data:{idx}", batch, True, 1000)

# batch = redis_handler.get_batch("initial_data:0", 1000, 784)
# print(batch[0])

In [8]:
redis_handler = RedisHandler('host.docker.internal', 6379, 0)
file = MinioClient("host.docker.internal:9000", "admin", "admin123").get_object("my-bucket", "mnist.csv")
data = np.genfromtxt(io.StringIO(file.read().decode('utf-8')), delimiter=',', skip_header=1)
features = data[:, :-1]
labels = data[:, -1]
pipe = redis_handler.pipeline()
for idx, label in enumerate(labels):
    pipe.hset("images_label", str(idx), int(label))
pipe.execute()

[0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
