In [24]:
from ortools.linear_solver import pywraplp
from dataclasses import dataclass
import pandas as pd   
from datetime import datetime
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import threading
import random
import concurrent
from packaging import version

# The DaemonSet attachment step uses DataFrame.merge(how='cross'), which was added in pandas 1.2.
# Fail fast instead of printing a warning and crashing much later in the run.
if version.parse(pd.__version__) < version.parse("1.2.0"):
    raise RuntimeError(f"Pandas 1.2 required, found {pd.__version__}")

## Load Pods


In [137]:
pods_data_src = pd.read_csv("DEV_10_dec - pods.csv")

pods_data_src['req_cpu_milli_core'] = pd.to_numeric(pods_data_src['req_cpu_milli_core'])

# Keep the numeric byte count as well: the node-group cell filters on req_mem_byte > 0,
# which would fail if the column had been read as text.
pods_data_src['req_mem_byte'] = pd.to_numeric(pods_data_src['req_mem_byte'])

# Memory request in decimal megabytes (10^6 bytes).
pods_data_src['req_mem_mb'] = pods_data_src['req_mem_byte'] / 1000000

# Set zero requests to a small positive value.
pods_data_src.loc[pods_data_src.req_mem_mb == 0, 'req_mem_mb'] = 0.001
pods_data_src.loc[pods_data_src.req_cpu_milli_core == 0, 'req_cpu_milli_core'] = 1

# Pods with no owner_name fall back to their own pod_name.
# Assign the column explicitly: a chained `column.fillna(..., inplace=True)` does not write back
# to the frame under pandas Copy-on-Write.
pods_data_src['owner_name'] = pods_data_src['owner_name'].fillna(pods_data_src['pod_name'])



## Load Nodes

In [138]:
import re

#regex to clean out any text after the number
def clean_data(value):
    """Keep only the leading integer of a text cell, e.g. "4 vCPUs" -> "4".

    Surrounding whitespace is stripped before matching. A string that does not start with a digit
    is returned (stripped) without further change. Non-string values, such as numbers already
    parsed by read_csv or NaN for empty cells, are returned unchanged. The caller's
    pd.to_numeric(..., errors='coerce') can then deal with them instead of re.sub raising TypeError.
    """
    if not isinstance(value, str):
        return value
    # DOTALL so any text after the number is dropped even if it spans several lines.
    return re.sub(r'^([0-9]+).*', r'\1', value.strip(), flags=re.DOTALL)


In [139]:
# Instance catalogue reduced to the type names plus numeric capacity and price.
# cpu: vCPUs (leading integer only, via clean_data) * 1000, i.e. millicores.
# memory: Memory column * 1000 -- presumably GiB scaled to roughly MB to match req_mem_mb; TODO confirm units.
# cost: 'Linux Reserved cost' as a number.
# Anything that does not parse becomes NaN (errors='coerce').
nodes_data_src = (
    pd.read_csv("instances.csv")
    .assign(
        cpu=lambda frame: pd.to_numeric(frame['vCPUs'].apply(clean_data), errors='coerce') * 1000,
        memory=lambda frame: pd.to_numeric(frame['Memory'], errors='coerce') * 1000,
        cost=lambda frame: pd.to_numeric(frame['Linux Reserved cost'], errors='coerce'),
    )
    [['API Name', 'Name', 'cpu', 'memory', 'cost']]
)





In [140]:
#Create a data model
#
def create_data_model(cpu, memory, pods):
    """Create the data for the example."""
    data = {}    
    data['req_cpu'] = pods['req_cpu_milli_core'].tolist()
    data['req_memory'] = pods['req_mem_mb'].tolist()
    data['items'] = list(range(len( pods)))
    data['bins'] = data['items']
    data['cpu_capacity'] = cpu
    data['memory_capacity'] = memory
    return data

In [141]:
def create_solver(data):
    """Build the SCIP bin-packing model for one node shape.

    Args:
        data: dict as produced by create_data_model. Reads 'items', 'bins', 'req_cpu', 'req_memory',
            'cpu_capacity' and 'memory_capacity'.

    Returns:
        (solver, x, y), where x[(i, j)] is the 0/1 variable "pod i is placed on bin j" and y[j] is the
        0/1 variable "bin j is used". No objective is set here; solve() sets it.

    Raises:
        RuntimeError: if OR-Tools cannot create a SCIP solver (CreateSolver returns None).
    """
    # Create the mip solver with the SCIP backend.
    solver = pywraplp.Solver.CreateSolver('SCIP')
    if solver is None:
        raise RuntimeError("SCIP backend is not available in this OR-Tools build")
    #solver.SetNumThreads(6)
    #solver.SetTimeLimit(30000)

    items = data['items']
    bins = data['bins']

    # x[i, j] = 1 if item i is packed in bin j (created item-major, as before).
    x = {(i, j): solver.IntVar(0, 1, 'x_%i_%i' % (i, j)) for i in items for j in bins}

    # y[j] = 1 if bin j is used.
    y = {j: solver.IntVar(0, 1, 'y[%i]' % j) for j in bins}

    # Each item must be in exactly one bin.
    # solver.Sum builds one flat sum; Python's sum() nests one expression per term.
    for i in items:
        solver.Add(solver.Sum([x[i, j] for j in bins]) == 1)

    # The amount packed in each bin cannot exceed its capacity; an unused bin (y[j] = 0) has none.
    for j in bins:
        solver.Add(
            solver.Sum([x[i, j] * data['req_cpu'][i] for i in items]) <= y[j] * data['cpu_capacity'])

    for j in bins:
        solver.Add(
            solver.Sum([x[i, j] * data['req_memory'][i] for i in items]) <= y[j] * data['memory_capacity'])

    # Symmetry breaking: all bins have the same capacity, so any packing can be relabelled to use
    # bins 0..k-1. Requiring y[j] >= y[j+1] removes the equivalent permutations without changing
    # the optimal bin count.
    for j_prev, j_next in zip(bins, bins[1:]):
        solver.Add(y[j_prev] >= y[j_next])

    return solver, x, y

In [142]:
def solve(solver, data, x, y, pods):
    """Minimise the number of used bins and turn the solution into a placement table.

    Args:
        solver, x, y: model returned by create_solver(data).
        data: dict from create_data_model; 'items' and 'bins' are iterated.
        pods: the pods DataFrame the model was built from (row positions match data['items']).

    Returns:
        DataFrame with the pods' columns plus 'node_name' ('node<j>' for bin j), one row per placed pod,
        keeping the pods' original index. If the solver does not report OPTIMAL, a message is printed
        and an empty frame with the same columns is returned.
    """
    solver.Minimize(solver.Sum([y[j] for j in data['bins']]))
    status = solver.Solve()

    # Same columns as a real placement, but no rows.
    empty_solution = pods.iloc[0:0].assign(node_name="")

    if status != pywraplp.Solver.OPTIMAL:
        print('The problem does not have an optimal solution.')
        return empty_solution

    bin_placements = []
    for j in data['bins']:
        # MIP values are floats that may be 1 - eps / eps, so threshold at 0.5 instead of == 1 / > 0.
        if y[j].solution_value() < 0.5:
            continue
        members = [i for i in data['items'] if x[i, j].solution_value() > 0.5]
        if members:
            bin_placements.append(pods.iloc[members].assign(node_name='node' + str(j)))

    if not bin_placements:
        return empty_solution
    # One concat instead of DataFrame.append per pod (quadratic, removed in pandas 2.0).
    return pd.concat(bin_placements)

In [143]:
def get_solution(node_group, curr_node, pods_data, solutions_cache=None):
    """Pack pods_data onto nodes of type curr_node and summarise the result as a single row.

    Args:
        node_group: label copied into the result row and into every placement row.
        curr_node: one row of the nodes table; 'API Name', cpu, memory and cost are read.
        pods_data: pods to place (the columns read by create_data_model, passed through to solve).
        solutions_cache: frame of earlier result rows (same columns as the row returned here).
            If a row with the same cpu and memory exists, its pod_placement is reused instead of
            running the solver again. Defaults to the global `solutions` frame, as before.

    Returns:
        One-row DataFrame with the cache's columns, holding in order: node_group, node type name,
        cpu, memory, number of nodes used, node cost * number of nodes, and the pod placement frame.
    """
    cache = solutions if solutions_cache is None else solutions_cache

    same_shape = cache[(cache.cpu == curr_node.cpu) & (cache.memory == curr_node.memory)]
    if len(same_shape) == 0:
        data = create_data_model(curr_node.cpu, curr_node.memory, pods_data)
        solver, x, y = create_solver(data)
        pod_placement = solve(solver, data, x, y, pods_data)
    else:
        # The model depends only on the capacities and the pods, so an identically sized node type
        # solved earlier gives the same packing. Copy it so the per-type columns set below do not
        # overwrite the cached frame.
        pod_placement = same_shape['pod_placement'].iloc[0].copy()

    pod_placement['node_type'] = curr_node['API Name']
    pod_placement['node_cpu'] = curr_node.cpu
    pod_placement['node_memory'] = curr_node.memory
    pod_placement['node_group'] = node_group

    num_nodes = len(pod_placement.node_name.unique())
    record = [node_group, curr_node['API Name'], curr_node['cpu'], curr_node['memory'],
              num_nodes, curr_node.cost * num_nodes, pod_placement]

    # Fill an object array element by element. np.array(record) would try to expand the
    # pod_placement DataFrame into the array, which NumPy >= 1.24 rejects as an inhomogeneous shape.
    row = np.empty((1, len(record)), dtype=object)
    for k, value in enumerate(record):
        row[0, k] = value
    return pd.DataFrame(row, columns=cache.columns)

In [None]:
from multiprocessing.pool import ThreadPool

all_solutions = pd.DataFrame(columns = ["node_group", "name", "cpu", "memory", "num_nodes", "cost", "pod_placement"])

for node_group in pods_data_src.node_group.unique():

    print(f"Starting nodegroup {node_group}... {datetime.now().strftime('%D %H:%M:%S')}")

    in_group = pods_data_src.node_group == node_group
    is_daemonset = pods_data_src.owner_kind == 'DaemonSet'

    # Workload pods to bin-pack. DaemonSet pods are left out here; they are handled below as
    # per-node overhead and attached to every node of each placement.
    pods_data = pods_data_src[in_group & ~is_daemonset &
                              (pods_data_src.req_cpu_milli_core > 0) &
                              (pods_data_src.req_mem_byte > 0)][['namespace', 'owner_name', 'req_mem_mb', 'req_cpu_milli_core']]

    # One row per DaemonSet (owner_name, namespace) with its mean request across its pods.
    daemonset = (pods_data_src[in_group & is_daemonset]
                 .groupby(["owner_name", "namespace"])
                 .agg({'req_cpu_milli_core': 'mean', 'req_mem_mb': 'mean'}))

    overhead = {'cpu': daemonset.req_cpu_milli_core.sum(), 'memory': daemonset.req_mem_mb.sum()}

    # Subtract the DaemonSet overhead first, then keep node types whose remaining capacity fits the
    # largest pod. Filtering on raw capacity let through types whose packing was infeasible.
    nodes_data = nodes_data_src.assign(cpu = nodes_data_src.cpu - overhead['cpu'],
                                       memory = nodes_data_src.memory - overhead['memory'])
    nodes_data = nodes_data[(nodes_data.cpu >= pods_data.req_cpu_milli_core.max()) &
                            (nodes_data.memory >= pods_data.req_mem_mb.max())]

    print(f"Nodegroup {node_group}. Detected {len(nodes_data)} suitable nodes out of {len(nodes_data_src)} total")

    # get_solution looks up the global `solutions` to reuse placements of identically sized node
    # types, so the frame is extended after every call. Every suitable type is solved, including the first.
    solutions = pd.DataFrame(columns = all_solutions.columns)
    for i in range(len(nodes_data)):
        solution = get_solution(node_group, nodes_data.iloc[i], pods_data)
        solutions = pd.concat([solutions, solution], ignore_index = True)

    # Add the DaemonSet pods to every node of every candidate placement (all rows, including the last).
    # The new frames are written back as a whole column. Assigning through the row Series from
    # solutions.iloc[i] only updates the frame if that row happens to be a view, which Copy-on-Write
    # does not allow.
    daemonset_rows = daemonset.reset_index()
    placements_with_daemonsets = np.empty(len(solutions), dtype=object)
    for i in range(len(solutions)):
        placement = solutions['pod_placement'].iloc[i]
        all_nodes = placement.groupby(['node_group', 'node_name', 'node_type', 'node_cpu', 'node_memory']).size().\
                reset_index().iloc[:, 0:5]
        daemonset_placement = all_nodes.merge(daemonset_rows, how='cross')
        placements_with_daemonsets[i] = pd.concat([placement, daemonset_placement])
    solutions['pod_placement'] = placements_with_daemonsets

    solutions['node_group'] = node_group

    all_solutions = pd.concat([all_solutions, solutions])


print("All solutions found")



In [127]:
# Export the candidate node types cheapest first, then keep only priced placements for ranking.
# NOTE(review): `solutions` holds only the last node group processed by the loop; use
# `all_solutions` here if a cross-group export was intended.
# ascending must be a real bool: the former string "False" was truthy (so the sort was ascending
# anyway), and newer pandas rejects non-bool values.
solutions[["node_group", "name", "cpu", "memory", "num_nodes", "cost"]].sort_values(by = "cost",
                    ascending = True).reset_index(drop = True).to_csv("solutions.csv", index=False)

# Drop rows with cost 0 (no pods placed) or NaN (unparseable price), cheapest first, so row 0 is the best choice.
solutions = solutions[solutions.cost > 0].sort_values(by = "cost", axis=0,
                    ascending = True).reset_index(drop = True)


In [128]:
# Report the first row of `solutions` (the previous cell sorts it cheapest-first) and export its placement.
if len(solutions) == 0:
    print("No solution with a positive cost was found")
else:
    best_solution = solutions.iloc[0]
    num_nodes = len(best_solution.pod_placement.node_name.unique())

    print("Best solution: \n")
    # Adjacent string literals keep the output on one line without the source indentation that a
    # backslash continuation inside the f-string would embed.
    print(f"Node: {best_solution['name']}, cpu: {best_solution.cpu}, "
          f"memory: {best_solution.memory}, hourly cost: {best_solution.cost}, number of nodes: {num_nodes}")

    best_solution.pod_placement.to_csv('best_solution.csv')


Best solution: 

Node: r5ad.xlarge, cpu: 3588,          memory: 31408.603136, hourly cost: 0.165, number of nodes: 1


In [131]:
# Stack the pod placements of every candidate solution across all node groups.
# The previous version seeded the result from `solutions` (last group only) and then skipped
# all_solutions row 0; this takes every row of all_solutions exactly once.
placement_frames = list(all_solutions['pod_placement'])
all_placements = pd.concat(placement_frames) if placement_frames else pd.DataFrame()

all_placements.to_csv("all_solutions.csv")

In [107]:
# Node groups present in the combined placements.
all_placements['node_group'].unique()

array([1, 3, 4, 5, 0, 2, 9, 7, 6, 8], dtype=int64)

## Stats
Solve time for 20 nodes at different solver thread counts (all about the same):
- 6 threads: ~1 hour
- 3 threads: ~1 hour
- 12 threads: ~1 hour


Solve time by number of pods:
- 200 pods: 7 s
- 400 pods: 64 s
- 600 pods: 9 min
- 800 pods: 36 min
- 1000 pods: not recorded
- 2000 pods: stopped after roughly 18 hours (duration uncertain)


On solver parallelism, see [OR-Tools issue #1656](https://github.com/google/or-tools/issues/1656).