# Proactive autoscaler testing

In [None]:
from asyncio import sleep
from prometheus_client import Summary, Counter, start_http_server, Gauge
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY, CollectorRegistry
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import time
from tensorflow.keras.models import load_model
import math
from sklearn.preprocessing import MinMaxScaler



from kubernetes import client, config
    # Constants
from kubernetes.client.rest import ApiException
from pprint import pprint

#INTERVAL=input("enter the time interval in seconds for next prediction ")
output_data = [0,0,0,0,0,0,0,0,0,0]
output_from_prom = [0,0,0,0,0,0,0,0,0,0]
results=''
while True:

    # Step 1: Query the data from Prometheus
    query = 'haproxy_frontend_http_responses_total'

    
    #end_time = datetime.utcnow()
    #start_time = end_time - timedelta(minutes=10) # last 10 data points
    #step = '15s' # adjust this based on your data resolution
    #url = 'http://localhost:9091/query_range?query={}&start={}&end={}&step={}'.format(query, start_time.isoformat(), end_time.isoformat(), step)
    #response = requests.get(url)
    url='http://localhost:9091/api/v1/query?query=haproxy_frontend_http_responses_total'
    response = requests.get(url)
    #print(response)
    if response.status_code == 200:
        data = response.json()['data']['result']
        #print(data[0])
    else:
        raise PrometheusException("Failed to query Prometheus: {}".format(response.content))
    
    
    time.sleep(10)
    
    cur_req_val = int(data[2]['value'][1])
    print(cur_req_val)
    
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    results+=','+str(current_time)
    results+=','+str(cur_req_val)
    

    if len(output_data) == 0:
        output_data.append(cur_req_val)
    else:
         
        output_data.append(cur_req_val-output_from_prom[-1])
        output_data.pop(0)
        
        output_from_prom.append(cur_req_val)
        output_from_prom.pop(0)

    #print(output_data,output_from_prom)
    
    #Step 2: Preprocess the data
  

    df = pd.DataFrame(output_data)
    input_data = df.values # convert to NumPy array
    #print(input_data)

    scaler = MinMaxScaler(feature_range=(0, 1))
    df_input_data = scaler.fit_transform(input_data)


    input_data_reshaped = np.array(df_input_data).reshape((1,10,1))


    #print(input_data_reshaped)



    # Step 3: Load the pre-trained modelbi_lstmwith_paper_specs.h5
    model = load_model('bi_lstmwith_paper_specs.h5')

    # Step 4: Make predictions using the loaded model
    #print("input to model",input_data_reshaped)
    predictions = model.predict(input_data_reshaped)



    testPredict = scaler.inverse_transform(predictions)
    predicted_workload=math.ceil(testPredict)
    predicted_workload+= cur_req_val
    print("predicted workload(in rps):",predicted_workload)
    
    # step 5:actual pods scaling
    NAMESPACE = "default"
    DEPLOYMENT_NAME = "loadtestv2"
    CPU_THRESHOLD = 80
    RRS = 0.1  # Remove 10% of the surplus pods in case of scaling in
    POD_NAME = "loadtestv2-969c8b8bb-6m2f9"
    CONTAINER_NAME="loadtestv2"
    INTERVAL = 60.0  # seconds
    pods_minimum=3
   


    # Initialize Kubernetes client
    config.load_kube_config()
    app_api = client.AppsV1Api()
    api=client.CoreV1Api()
    custom_obj_api = client.CustomObjectsApi()

    #hpa_api=client.AutoscalingV2Api()



    # Helper function to get CPU and memory resources of a pod
    def get_pod_resources():
        api_response = api.read_namespaced_pod(name=POD_NAME, namespace=NAMESPACE)
        cpu_capacity =api_response.spec.containers[0].resources.limits["cpu"]
        memory_capacity = api_response.spec.containers[0].resources.limits["memory"]
        return cpu_capacity, memory_capacity

    # Function to calculate maximum workload
    def calculate_max_workload():
        cpu_capacity, memory_capacity = get_pod_resources()
        print( cpu_capacity, memory_capacity)
        cpu_cores = int(cpu_capacity[:-1])# remove the 'm' suffix
        memory_mb = int(memory_capacity[:-2])# remove the 'Mi' suffix
        cpu_utilization = 1  # assume maximum CPU utilization
        memory_usage_per_unit = 1  # assume constant memory usage per unit of workload
        max_workload_cpu = cpu_cores * cpu_utilization
        max_workload_memory = memory_mb / memory_usage_per_unit
        #max_workload = min(max_workload_cpu, max_workload_memory)
        max_workload =max_workload_cpu

        metrics = api.read_namespaced_pod_status(name=POD_NAME, namespace=NAMESPACE)
        resource = custom_obj_api.list_namespaced_custom_object(group="metrics.k8s.io",version="v1beta1", namespace=NAMESPACE, plural="pods")
        #pprint(resource)
        avg_cpu_usage = 0
        avg_mem_usage = 0
        count=0
        for pod in resource['items']:
            for container in pod['containers']:
                if container['name']==CONTAINER_NAME:
                    count+=1
                    #print(container['usage'])
                    cpu_usage = container['usage']["cpu"]
                    mem_usage = container['usage']["memory"]
                    if cpu_usage!='0':
                        cpu_usage = float(cpu_usage[:-1])# remove the 'n' suffix
                    else:
                        cpu_usage= float(cpu_usage)
                    if mem_usage!='0':
                        mem_usage = float(mem_usage[:-2])# remove the 'Ki' suffix
                    else:
                        mem_usage=float(mem_usage)
                    #convert Ki to Mi and n to m
                    mem_usage*=0.000976562
                    cpu_usage*=0.000001
                    avg_cpu_usage+=cpu_usage
                    avg_mem_usage+=mem_usage
                    
                     
                
        avg_cpu_usage/=count
        avg_mem_usage/=count
        print(avg_cpu_usage,avg_mem_usage)
        #curr_bottlenecked_resource_utilization=min(cpu_usage,mem_usage)#need this value in 'm' if cpu is considered or in Mi if memory is a bottleneck
        curr_bottlenecked_resource_utilization=cpu_usage
        if  curr_bottlenecked_resource_utilization==0:
            curr_bottlenecked_resource_utilization=1
        curr_rps= cur_req_val 
        if curr_rps==0:
            curr_rps=1
                    #can fetch the current rps from prometheus
        print("curr rps",curr_rps)
#         max_workload_per_pod=(max_workload*curr_rps)/curr_bottlenecked_resource_utilization # in rps 
        max_workload_per_pod=99999999999
        if(abs(max_workload-curr_bottlenecked_resource_utilization)<=2):
            max_workload_per_pod=curr_rps
            
        return max_workload_per_pod


    # Helper function to get the current number of replicas
    def get_replica_count():
        
        try:
            deployment = app_api.read_namespaced_deployment(name=DEPLOYMENT_NAME, namespace=NAMESPACE)
            #pprint(deployment)
        except ApiException as e:
            print("Exception when calling AppsV1Api->read_namespaced_deployment: %s\n" % e)
        
        #deployment = app_api.read_namespaced_deployment(name=DEPLOYMENT_NAME, namespace=NAMESPACE)
        return deployment.spec.replicas

    # Helper function to scale up
    def scale_up(replicas):
        app_api.patch_namespaced_deployment_scale(
            name=DEPLOYMENT_NAME,
            namespace=NAMESPACE,
            body={"spec": {"replicas": int(replicas)}}
        )
        print(f"Scaled up to {int(replicas)} replicas")

    # Helper function to scale down
    def scale_down(replicas):
        app_api.patch_namespaced_deployment_scale(
            name=DEPLOYMENT_NAME,
            namespace=NAMESPACE,
            body={"spec": {"replicas": int(replicas)}}
        )
        print(f"Scaled down to {int(replicas)} replicas")

    # Main loop
    if True:
        max_workload_per_pod = calculate_max_workload() 
        print("max workload per pod: ", max_workload_per_pod)
        predicted_future_pods=predicted_workload/max_workload_per_pod
        predicted_future_pods=int(predicted_future_pods)
        #predicted future pods should be in whole integer number not float
        print("predicted future pods: ",predicted_future_pods)
        current_pods=get_replica_count()

 

        if current_pods+predicted_future_pods>current_pods:
            results+=','+str(int(predicted_future_pods+current_pods))
            scale_up(predicted_future_pods+current_pods)
        elif predicted_future_pods<current_pods:
            #print("future pods",future_pods)
            future_pods=max(predicted_future_pods,pods_minimum)
            pods_surplus=(current_pods-future_pods)*RRS
            future_pods_for_scaling_down=current_pods-pods_surplus
            results+=','+str(int(future_pods_for_scaling_down))
            scale_down(future_pods_for_scaling_down)
        else:
            print("No scaling required")
        #time.sleep(INTERVAL)
#     savetxt('results.csv', data, delimiter=',')
    results+='\n'
    with open('results.csv','a') as fd:
        fd.write(results)
    results=''
    
    
    
    
   
    
    
   

2024-01-08 15:23:43.116092: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-01-08 15:23:43.139447: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


0


2024-01-08 15:23:54.033753: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_UNKNOWN: unknown error
2024-01-08 15:23:54.033782: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:168] retrieving CUDA diagnostic information for host: csemtechgpu
2024-01-08 15:23:54.033788: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:175] hostname: csemtechgpu
2024-01-08 15:23:54.033908: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:199] libcuda reported version is: 535.129.3
2024-01-08 15:23:54.033923: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:203] kernel reported version is: 525.60.13
2024-01-08 15:23:54.033927: E tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:312] kernel version 525.60.13 does not match DSO version 535.129.3 -- cannot find working devices in this configuration


predicted workload(in rps): 1
500m 512Mi
0.0046570000000000005 17.316397384000002
curr rps 1
max workload per pod:  99999999999
predicted future pods:  0
Scaled down to 3 replicas
0
predicted workload(in rps): 1
500m 512Mi
0.0066103333333333335 17.316397384000002
curr rps 1
max workload per pod:  99999999999
predicted future pods:  0
Scaled down to 3 replicas
0
predicted workload(in rps): 1
500m 512Mi
0.0066103333333333335 17.316397384000002
curr rps 1
max workload per pod:  99999999999
predicted future pods:  0
Scaled down to 3 replicas
0
predicted workload(in rps): 1
500m 512Mi
0.005085 17.316397384000002
curr rps 1
max workload per pod:  99999999999
predicted future pods:  0
Scaled down to 3 replicas
0
predicted workload(in rps): 1
500m 512Mi
0.007461 17.316397384000002
curr rps 1
max workload per pod:  99999999999
predicted future pods:  0
Scaled down to 3 replicas
0
predicted workload(in rps): 1
500m 512Mi
0.007461 17.316397384000002
curr rps 1
max workload per pod:  99999999999
p

2571118
predicted workload(in rps): 2805444
500m 512Mi
495.38359136363636 81.011677272
curr rps 2571118
max workload per pod:  2571118
predicted future pods:  1
Scaled up to 12 replicas
2571118
predicted workload(in rps): 2705579
500m 512Mi
495.0016119 82.3331609704
curr rps 2571118
max workload per pod:  2571118
predicted future pods:  1
Scaled up to 13 replicas
2840982
predicted workload(in rps): 3093598
500m 512Mi
480.38137863636365 80.53440478909091
curr rps 2840982
max workload per pod:  2840982
predicted future pods:  1
Scaled up to 14 replicas
3107759
predicted workload(in rps): 3364883
500m 512Mi
483.4766772307693 78.9611975526154
curr rps 3107759
max workload per pod:  3107759
predicted future pods:  1
Scaled up to 15 replicas
3107759
predicted workload(in rps): 3252222
500m 512Mi
496.96305185714294 78.50470310914285
curr rps 3107759
max workload per pod:  3107759
predicted future pods:  1
Scaled up to 16 replicas
3476234
predicted workload(in rps): 3782990
500m 512Mi
496.9630

# HPA  Testing

In [None]:
from kubernetes import client, config
from asyncio import sleep
from prometheus_client import Summary, Counter, start_http_server, Gauge
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY, CollectorRegistry
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import time
from tensorflow.keras.models import load_model
import math
from sklearn.preprocessing import MinMaxScaler

def get_running_pods(namespace, deployment_name):
    # Load kubeconfig file (or use in-cluster config if running inside a pod)
    config.load_kube_config()

    # Create Kubernetes API client
    api_instance = client.AppsV1Api()
    api=client.CoreV1Api()
    


    try:
        # Get the deployment object
        deployment = api_instance.read_namespaced_deployment(deployment_name, namespace)

        # Get the number of replicas (desired replicas)
        desired_replicas = deployment.spec.replicas

        # Get the number of currently running pods
        label_selector = f"app={deployment_name}"
        running_pods = api.list_namespaced_pod(
            namespace,
            label_selector=label_selector,
        )

        # Print the results
        print(f"Deployment: {deployment_name}")
        print(f"Desired Replicas: {desired_replicas}")
        print(f"Running Pods: {len(running_pods.items)}")

    except Exception as e:
        print(f"Error: {e}")
              
    return desired_replicas

results=''              
while(True):
    # Step 1: Query the data from Prometheus
    query = 'haproxy_frontend_http_responses_total'
    url='http://localhost:9091/api/v1/query?query=haproxy_frontend_http_responses_total'
    response = requests.get(url)
    #print(response)
    if response.status_code == 200:
        data = response.json()['data']['result']
        #print(data[0])
    else:
        raise PrometheusException("Failed to query Prometheus: {}".format(response.content))
    
    
    time.sleep(10)
    
    cur_req_val = int(data[2]['value'][1])
    print(cur_req_val)
    
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    results+=','+str(current_time)
    results+=','+str(cur_req_val)
    results+=','+str(get_running_pods("default", "loadtestv2"))
    results+='\n'
    with open('hpa_results.csv','a') as fd:
        fd.write(results)
    results=''
    time.sleep(10)


2024-01-08 15:38:56.267264: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-01-08 15:38:56.291068: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


0
Deployment: loadtestv2
Desired Replicas: 5
Running Pods: 5
0
Deployment: loadtestv2
Desired Replicas: 5
Running Pods: 5
102526
Deployment: loadtestv2
Desired Replicas: 5
Running Pods: 5
239597
Deployment: loadtestv2
Desired Replicas: 7
Running Pods: 7
526075
Deployment: loadtestv2
Desired Replicas: 7
Running Pods: 7
681773
Deployment: loadtestv2
Desired Replicas: 10
Running Pods: 7
1024130
Deployment: loadtestv2
Desired Replicas: 10
Running Pods: 10
1200679
Deployment: loadtestv2
Desired Replicas: 10
Running Pods: 10
1394645
Deployment: loadtestv2
Desired Replicas: 14
Running Pods: 14
1815428
Deployment: loadtestv2
Desired Replicas: 14
Running Pods: 14
2001189
Deployment: loadtestv2
Desired Replicas: 14
Running Pods: 14
2256506
Deployment: loadtestv2
Desired Replicas: 17
Running Pods: 17
2842718
Deployment: loadtestv2
Desired Replicas: 20
Running Pods: 20
3174001
Deployment: loadtestv2
Desired Replicas: 20
Running Pods: 20
3879112
Deployment: loadtestv2
Desired Replicas: 27
Running P

# Dev environment(Proactive autoscaler)

In [23]:
from asyncio import sleep
from prometheus_client import Summary, Counter, start_http_server, Gauge
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY, CollectorRegistry
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import time
from tensorflow.keras.models import load_model
import math
from sklearn.preprocessing import MinMaxScaler

import pprint

from kubernetes import client, config
    # Constants
from kubernetes.client.rest import ApiException
from pprint import pprint
predicted_workload=5254337
if(True):
    # step 5:actual pods scaling
    NAMESPACE = "default"
    DEPLOYMENT_NAME = "loadtestv2"
    CPU_THRESHOLD = 80
    RRS = 0.1  # Remove 10% of the surplus pods in case of scaling in
    POD_NAME = "loadtestv2-969c8b8bb-bqk69"
    INTERVAL = 60.0  # seconds
    pods_minimum=3
   


    # Initialize Kubernetes client
    config.load_kube_config()
    app_api = client.AppsV1Api()
    api=client.CoreV1Api()
    custom_obj_api = client.CustomObjectsApi()

    

    #hpa_api=client.AutoscalingV2Api()



    # Helper function to get CPU and memory resources of a pod
    def get_pod_resources():
        api_response = api.read_namespaced_pod(name=POD_NAME, namespace=NAMESPACE)
        cpu_capacity =api_response.spec.containers[0].resources.limits["cpu"]
        memory_capacity = api_response.spec.containers[0].resources.limits["memory"]
        return cpu_capacity, memory_capacity

    # Function to calculate maximum workload
    def calculate_max_workload():
        cpu_capacity, memory_capacity = get_pod_resources()
        print( cpu_capacity, memory_capacity)
        cpu_cores = int(cpu_capacity[:-1])# remove the 'm' suffix
        memory_mb = int(memory_capacity[:-2])# remove the 'Mi' suffix
        cpu_utilization = 1  # assume maximum CPU utilization
        memory_usage_per_unit = 1  # assume constant memory usage per unit of workload
        max_workload_cpu = cpu_cores * cpu_utilization
        max_workload_memory = memory_mb / memory_usage_per_unit
        #max_workload = min(max_workload_cpu, max_workload_memory)
        max_workload =max_workload_cpu

        metrics = api.read_namespaced_pod_status(name=POD_NAME, namespace=NAMESPACE)
        resource = custom_obj_api.list_namespaced_custom_object(group="metrics.k8s.io",version="v1beta1", namespace=NAMESPACE, plural="pods")
        #pprint(resource)
        for pod in resource['items']:
               if pod['metadata']['name']==POD_NAME:
                    print(pod['containers'][0]['usage'])
                    cpu_usage = pod['containers'][0]['usage']["cpu"]
                    mem_usage=pod['containers'][0]['usage']["memory"]
                    if cpu_usage!='0':
                        cpu_usage = int(cpu_usage[:-1])# remove the 'n' suffix
                    else:
                        cpu_usage= int(cpu_usage)
                    if mem_usage!='0':
                        mem_usage = int(mem_usage[:-2])# remove the 'Ki' suffix
                    else:
                        mem_usage=int(mem_usage)
                    #convert Ki to Mi and n to m
                    mem_usage*=0.000976562
                    cpu_usage*=0.000001    
                    print(cpu_usage,mem_usage)
      
        #curr_bottlenecked_resource_utilization=min(cpu_usage,mem_usage)#need this value in 'm' if cpu is considered or in Mi if memory is a bottleneck
        curr_bottlenecked_resource_utilization=cpu_usage
        if  curr_bottlenecked_resource_utilization==0:
            curr_bottlenecked_resource_utilization=1
        curr_rps= cur_req_val 
        if curr_rps==0:
            curr_rps=1
                    #can fetch the current rps from prometheus
        print("curr rps",curr_rps)
#         max_workload_per_pod=(max_workload*curr_rps)/curr_bottlenecked_resource_utilization # in rps 
        max_workload_per_pod=99999999999
        if(abs(max_workload-curr_bottlenecked_resource_utilization)<=2):
            max_workload_per_pod=curr_rps
            
        return max_workload_per_pod



    # Helper function to get the current number of replicas
    def get_replica_count():
        
        try:
            deployment = app_api.read_namespaced_deployment(name=DEPLOYMENT_NAME, namespace=NAMESPACE)
            #pprint(deployment)
        except ApiException as e:
            print("Exception when calling AppsV1Api->read_namespaced_deployment: %s\n" % e)
        
        #deployment = app_api.read_namespaced_deployment(name=DEPLOYMENT_NAME, namespace=NAMESPACE)
        return deployment.spec.replicas

    # Helper function to scale up
    def scale_up(replicas):
        app_api.patch_namespaced_deployment_scale(
            name=DEPLOYMENT_NAME,
            namespace=NAMESPACE,
            body={"spec": {"replicas": int(replicas)}}
        )
        print(f"Scaled up to {int(replicas)} replicas")

    # Helper function to scale down
    def scale_down(replicas):
        app_api.patch_namespaced_deployment_scale(
            name=DEPLOYMENT_NAME,
            namespace=NAMESPACE,
            body={"spec": {"replicas": int(replicas)}}
        )
        print(f"Scaled down to {int(replicas)} replicas")

    # Main loop
    if True:
        max_workload_per_pod = calculate_max_workload()
        print("max workload per pod: ", max_workload_per_pod)
        predicted_future_pods=predicted_workload/max_workload_per_pod
        print("predicted future pods: ",predicted_future_pods)
        current_pods=get_replica_count()
 
        

        if predicted_future_pods>current_pods:
            results+=','+str(int(predicted_future_pods))
            scale_up(predicted_future_pods)
        elif predicted_future_pods<current_pods:
            #print("future pods",future_pods)
            future_pods=max(predicted_future_pods,pods_minimum)
            pods_surplus=(current_pods-future_pods)*RRS
            future_pods_for_scaling_down=current_pods-pods_surplus
            results+=','+str(int(future_pods_for_scaling_down))
            scale_down(future_pods_for_scaling_down)
        else:
            print("No scaling required")
        #time.sleep(INTERVAL)
#     savetxt('results.csv', data, delimiter=',')
    
    
    
    
   
    
    
   

500m 512Mi
{'cpu': '8441n', 'memory': '15724Ki'}
0.008440999999999999 15.355460888000001
max workload per pod:  31098211112.42744
predicted future pods:  0.00016895946139809523


NameError: name 'results' is not defined

# Load testing

In [1]:
import requests
import concurrent.futures

def make_request(url):
    try:
        response = requests.get(url)
        # You can customize this based on your application's response
        if response.status_code == 200:
            return True
        else:
            return False
    except Exception as e:
        print(f"Error: {e}")
        return False

def test_concurrent_requests(url, num_requests):
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as executor:
        # Use a list to store the futures
        futures = [executor.submit(make_request, url) for _ in range(num_requests)]

        # Wait for all futures to complete
        concurrent.futures.wait(futures)

        # Get the results
        results = [future.result() for future in futures]

    # Count successful requests
    successful_requests = sum(results)
    print(f"Successful requests: {successful_requests}/{num_requests}")

if __name__ == "__main__":
    # Set your endpoint and the number of concurrent requests
    endpoint_url = "http://localhost:9999/hello"
    num_concurrent_requests = 100000

    test_concurrent_requests(endpoint_url, num_concurrent_requests)


Successful requests: 100000/100000
