In [1]:
import subprocess
import time
import PrometheusMetricsCSV as prom
import pandas as pd
import numpy as np

In [2]:
class Environment:
    """Broker-autoscaling environment for a Docker-hosted Kafka cluster.

    The environment is driven entirely through side effects: shell scripts
    under ``Scripts/`` are invoked to create topics, start producers and
    consumers, and add or remove brokers, while observations are read from a
    ``prometheus.csv`` file refreshed via ``prom.writeMetricsCSV()``.

    Actions understood by :meth:`step`:
        0 -> scale up by one broker, 1 -> scale down by one broker,
        anything else -> do nothing.

    Reward modes understood by :meth:`reward`:
        0 -> broker-count cost only, 1 -> tiered SLA-violation cost only,
        anything else -> weighted blend of both, rounded to 4 decimals.
    """

    # Number of rows prometheus.csv must contain for a snapshot to be accepted.
    EXPECTED_METRIC_COUNT = 37
    # Seconds to pause before re-polling after an incomplete snapshot, so the
    # retry loop does not poll in a tight loop.
    METRICS_RETRY_DELAY = 1
    # Seconds to wait after an action before the cluster is observed.
    SETTLE_SECONDS = 20
    # Value of the 'name' column identifying the latency rows in the CSV.
    LATENCY_METRIC = 'quantile_total_time_per_sec:quantile_rate1m'

    # (overshoot-percent threshold, cost multiplier) pairs, checked top-down
    # with >=. The *_FLOOR multiplier applies to any overshoot that is > 0 but
    # below the lowest threshold.
    _COST_TIERS = ((200, 2), (150, 150 / 100), (100, 1), (50, 25 / 100))
    _COST_FLOOR = 10 / 100
    _BLEND_TIERS = ((200, 1), (150, 80 / 100), (100, 60 / 100), (50, 40 / 100))
    _BLEND_FLOOR = 20 / 100

    def __init__(self, numberOfTopics):
        """Bring the cluster up and start one producer/consumer per topic.

        Args:
            numberOfTopics: number of topics to create; the same count is
                used for producers and consumers.

        Side effects: runs ``docker-compose up -d`` inside ``Kafka-Docker``,
        sleeps 20 seconds in total, runs ``Scripts/CreateTopics.sh`` once per
        topic (blocking) and launches ``Scripts/Producers.sh`` and
        ``Scripts/Consumers.sh`` once per index in the background.
        """
        subprocess.call("cd Kafka-Docker && docker-compose up -d", shell=True)

        # Give the containers time to start before anything else happens.
        time.sleep(10)

        self.n = numberOfTopics
        self.numberOfTopics = self.n
        self.numberOfProducers = self.n
        self.numberOfConsumers = self.n

        self.minNumberOfBrokers = 3
        self.maxNumberOfBrokers = 7

        self.activeBrokers = 3

        # Arguments handed positionally to Scripts/ScaleUp.sh for the NEXT
        # broker to be added; all five move in lock-step on every scale event.
        # NOTE(review): presumably broker index, two listener ports, Kafka
        # broker id and a metrics-agent port -- confirm against the scripts.
        self.BROKER = 3
        self.PLISTENER = 19095
        self.ALISTENER = 9095
        self.KAFKA_ID = 4
        self.AGENT = 7074

        # Latest latencies read by state(); consumed by step() for the reward.
        self.producerLatency = 0
        self.consumerLatency = 0

        self.SLA = 300  # in ms, 1s = 1000ms
        time.sleep(10)

        # Topic creation is blocking so topics exist before clients start.
        for i in range(self.numberOfTopics):
            subprocess.call(["bash", "Scripts/CreateTopics.sh", str(i)])

        # Producers and consumers are launched in the background; their
        # process handles are not retained.
        for i in range(self.numberOfProducers):
            subprocess.Popen(["bash", "Scripts/Producers.sh", str(i)])

        for i in range(self.numberOfConsumers):
            subprocess.Popen(["bash", "Scripts/Consumers.sh", str(i)])

    def state(self):
        """Read a complete metrics snapshot and return it as the observation.

        Repeatedly refreshes and re-reads ``prometheus.csv`` until it holds
        exactly ``EXPECTED_METRIC_COUNT`` rows, pausing
        ``METRICS_RETRY_DELAY`` seconds between attempts. There is no upper
        bound on the number of attempts.

        Side effects: updates ``self.producerLatency`` and
        ``self.consumerLatency`` from the first ``LATENCY_METRIC`` row whose
        ``job`` is ``'Produce'`` / ``'FetchConsumer'`` respectively.

        Returns:
            numpy array of the CSV's ``value`` column rounded to 4 decimals.

        Raises:
            IndexError: if the snapshot has no matching producer or consumer
                latency row.
        """
        while True:
            prom.writeMetricsCSV()
            df = pd.read_csv("prometheus.csv", header=0)
            na = np.array(df['value'])
            if na.shape[0] == self.EXPECTED_METRIC_COUNT:
                break
            # Incomplete snapshot: wait briefly instead of re-polling at once.
            time.sleep(self.METRICS_RETRY_DELAY)

        is_latency = df['name'] == self.LATENCY_METRIC
        producer = df[is_latency & (df['job'] == 'Produce')]
        consumer = df[is_latency & (df['job'] == 'FetchConsumer')]
        self.producerLatency = producer.iloc[0]['value']
        self.consumerLatency = consumer.iloc[0]['value']
        return np.round(na, 4)

    def _sla_overshoot(self, producerLatency, consumerLatency):
        """Return by how many percent the summed latency exceeds ``self.SLA``."""
        total = (producerLatency + consumerLatency)
        return ((total - self.SLA) / self.SLA) * 100

    @staticmethod
    def _tiered_penalty(overshoot, tiers, floor_factor, unit_cost):
        """Map an SLA overshoot percentage onto a stepped penalty.

        Returns ``factor * unit_cost`` for the first tier whose threshold is
        reached, ``floor_factor * unit_cost`` for any smaller positive
        overshoot, and 0 when the SLA is met.
        """
        for threshold, factor in tiers:
            if overshoot >= threshold:
                return factor * unit_cost
        if overshoot > 0:
            return floor_factor * unit_cost
        return 0

    def reward(self, producerLatency, consumerLatency, function):
        """Compute the reward for the given latencies.

        Args:
            producerLatency: producer latency, in the same unit as ``SLA``.
            consumerLatency: consumer latency, in the same unit as ``SLA``.
            function: reward mode selector.
                0 -> ``-activeBrokers``.
                1 -> tiered SLA cost between 0 and -20.
                other -> ``0.3 * broker_term + 0.7 * sla_term`` where both
                    terms lie in [-1, 0], rounded to 4 decimals.

        Returns:
            The (non-positive) reward as a number.
        """
        if function == 0:
            return (-1 * self.activeBrokers)

        overshoot = self._sla_overshoot(producerLatency, consumerLatency)
        # Trace the latencies actually used for this reward (the arguments),
        # not whatever happens to be cached on the instance.
        print(f"im here:{producerLatency},{consumerLatency},{overshoot}")

        if function == 1:
            cost = -1 * 10
            return self._tiered_penalty(overshoot, self._COST_TIERS, self._COST_FLOOR, cost)

        # Broker usage scaled to [-1, 0]: 0 at the minimum, -1 at the maximum.
        normalized = -1*(self.activeBrokers - self.minNumberOfBrokers)/(self.maxNumberOfBrokers-self.minNumberOfBrokers)
        slaCost = self._tiered_penalty(overshoot, self._BLEND_TIERS, self._BLEND_FLOOR, -1)
        return np.round((30/100)*normalized + (70/100)*slaCost, 4)

    def _shift_broker_slots(self, delta):
        """Move every per-broker counter and the active count by ``delta``."""
        self.BROKER += delta
        self.PLISTENER += delta
        self.ALISTENER += delta
        self.KAFKA_ID += delta
        self.AGENT += delta

        self.activeBrokers += delta

    def _scale_up(self):
        """Start one more broker, then run the partitioner in ScaleUp mode."""
        print("We are scaling up!")
        subprocess.call(["bash", "Scripts/ScaleUp.sh", str(self.BROKER), str(self.PLISTENER), str(self.ALISTENER), str(self.KAFKA_ID), str(self.AGENT)])
        time.sleep(5)
        subprocess.call(["bash", "Scripts/Partitioner.sh", "ScaleUp"])
        self._shift_broker_slots(1)

    def _scale_down(self):
        """Run the partitioner in ScaleDown mode, then shut the last broker down."""
        print("we are scaling down!")
        # The counters point at the NEXT broker, hence the "- 1" for the
        # most recently added one.
        subprocess.call(["bash", "Scripts/Partitioner.sh", "ScaleDown", str(self.KAFKA_ID - 1)])
        subprocess.call(["bash", "Scripts/ControlledShutdown.sh", str(self.BROKER - 1)])
        self._shift_broker_slots(-1)

    def step(self, action, function):
        """Apply one scaling action, wait, then observe and score the cluster.

        Args:
            action: 0 to scale up, 1 to scale down, anything else to do
                nothing. Scaling requests beyond the min/max broker limits
                are refused and treated as a no-op.
            function: reward mode, forwarded to :meth:`reward`.

        Returns:
            ``(state, reward)`` where ``state`` is the array returned by
            :meth:`state` and ``reward`` is computed from the latencies that
            call just refreshed.
        """
        # We have 3 possible actions.
        if action == 0:
            if self.activeBrokers >= self.maxNumberOfBrokers:
                print("No scaling up. We have reached the maximum number of brokers!")
            else:
                self._scale_up()
        elif action == 1:
            if self.activeBrokers <= self.minNumberOfBrokers:
                print("No scaling down. The minimun number of brokers must be 3!")
            else:
                self._scale_down()
        else:
            print("No scaling action was taken!")

        # Every path, including refused actions, waits before observing.
        time.sleep(self.SETTLE_SECONDS)
        state = self.state()
        reward = self.reward(self.producerLatency, self.consumerLatency, function)
        return state, reward

    def reset(self):
        """Run ``Scripts/Restart.sh``.

        NOTE(review): ``activeBrokers`` and the BROKER/PLISTENER/ALISTENER/
        KAFKA_ID/AGENT counters are not restored here. If the script brings
        the cluster back to its initial broker count, these should be reset
        to their constructor values as well -- confirm against the script.
        """
        subprocess.call(["bash", "Scripts/Restart.sh"])

In [3]:
# env = Environment(4)

In [4]:
# env.reset()