In [65]:
import pandas as pd
import math
import numpy as np
from datetime import datetime as dt
import warnings
warnings.filterwarnings("ignore")


In [66]:
def optimize(cluster_perc, MemRequired, NormalizedDataLoad, slices, presliced_flag, VMem, VCores, Nodes, CurrentInfraCalc,
             factors_path="./data/factors.parquet", factors=None, max_core_factor=3):
    """Enumerate candidate cluster/executor layouts and keep the feasible ones.

    Each row of the factors table is one candidate: a VM shape (``Cores_x``
    cores and ``Memory`` per node), a ``node_count`` and an executor layout
    (``Exec`` executors with ``Cores_y`` cores each). Capacity and usage
    columns are derived, and rows violating the four resource constraints
    below are dropped.

    Parameters
    ----------
    cluster_perc : float
        Fraction of each node's cores and memory that the job may use.
    MemRequired : number
        Per-core factor for worker memory:
        ``WorkerMemory = round(Cores_y * MemRequired)``.
    NormalizedDataLoad : number
        Per-core factor for executor memory:
        ``ExecutorMemory = round(Cores_y * NormalizedDataLoad)``
        (unless ``presliced_flag`` is set).
    slices : int
        Number of work slices; bounds total cores and sets ``MaxSerialSlices``.
    presliced_flag : bool
        If true, ``ExecutorMemory`` is fixed at 2.
    VMem, VCores, Nodes : int
        VM memory, VM cores and maximum node count of the current
        infrastructure; only used when ``CurrentInfraCalc`` is true.
    CurrentInfraCalc : bool
        Restrict candidates to ``Cores_x == VCores``, ``Memory == VMem`` and
        ``node_count <= Nodes``.
    factors_path : str, optional
        Parquet file holding the candidate rows; read only when ``factors``
        is None.
    factors : pandas.DataFrame, optional
        Pre-loaded candidate table with the same columns as the parquet file.
        It is not modified.
    max_core_factor : number, optional
        Candidates using more than ``max_core_factor * slices`` cores are
        pruned up front (default 3, as before).

    Returns
    -------
    pandas.DataFrame
        The feasible candidates with all derived columns. The elapsed
        wall-clock time is printed as a side effect.
    """
    start = dt.now()
    raw = pd.read_parquet(factors_path) if factors is None else factors
    # astype returns a new frame, so a caller-supplied `factors` is never mutated.
    df = raw.astype({"Cores_x": "int64", "Memory": "int64", "node_count": "int64",
                     "Exec": "int64", "Cores_y": "int64"})

    if CurrentInfraCalc:
        # One combined mask. Chaining df[m1][m2] re-aligns each mask to the
        # already-filtered index, which warns and fails on duplicate labels.
        df = df[(df.Cores_x == VCores) & (df.Memory == VMem) & (df.node_count <= Nodes)]

    df = df.assign(TotalCoresUsed=df.Exec * df.Cores_y)

    # Heuristic pruning of oversized layouts; the rationale for the factor is
    # undocumented (originally a hard-coded 3) - TODO confirm.
    # .copy() so the column assignments below act on an owned frame, not a slice.
    df = df[df.TotalCoresUsed <= max_core_factor * slices].copy()

    df["available_memory_per_node"] = df.Memory * cluster_perc
    df["available_memory"] = df.node_count * df.available_memory_per_node
    df["available_cores_per_node"] = np.floor(df.Cores_x * cluster_perc).astype("int64")
    df["available_cores"] = df.node_count * df.available_cores_per_node

    # Constraint 1: all executor cores fit in the cluster.
    # Constraint 2: a single executor's cores fit on one node.
    fits_cores = (df.TotalCoresUsed <= df.available_cores) & \
                 (df.Cores_y <= df.available_cores_per_node)
    df = df[fits_cores].copy()

    worker_memory = round(df.Cores_y * MemRequired, 0)
    memory_overhead = worker_memory + 1
    executor_memory = 2 if presliced_flag else round(df.Cores_y * NormalizedDataLoad, 0)

    # Columns are added in the same order as before, so the output layout is unchanged.
    df["TotalMemUsed"] = (memory_overhead + executor_memory) * df.Exec
    df["MaxExecPerNode"] = np.ceil(df.Exec / df.node_count).astype("int64")
    df["MaxSerialSlices"] = np.ceil(slices / df.TotalCoresUsed).astype("int64")
    df["WorkerMemory"] = worker_memory
    df["MemoryOverhead"] = memory_overhead
    df["ExecutorMemory"] = executor_memory
    df["TotalCoresUsed%"] = df.TotalCoresUsed / df.available_cores * 100

    # Constraint 3: total memory fits in the cluster (strict).
    # Constraint 4: the most heavily loaded node's executors fit in its memory (strict).
    per_executor_memory = df.ExecutorMemory + df.MemoryOverhead
    fits_memory = (df.TotalMemUsed < df.available_memory) & \
                  (df.MaxExecPerNode * per_executor_memory < df.available_memory_per_node)
    df = df[fits_memory].copy()

    df["TotalMemUsed%"] = df.TotalMemUsed / df.available_memory * 100

    runtime = (dt.now() - start).total_seconds()
    print(f"{runtime} seconds")
    return df


In [67]:
# Inputs for the sizing run (current-infrastructure scenario).
cluster_perc = 0.8
MemRequired = 10
NormalizedDataLoad = 3
slices = 7000
presliced_flag = False
VMem = 256
VCores = 32
Nodes = 4
CurrentInfraCalc = True

# Rank feasible layouts: fewest serial slice rounds first, then fewest
# executors, then lowest total memory (all ascending).
df1 = (
    optimize(
        cluster_perc=cluster_perc,
        MemRequired=MemRequired,
        NormalizedDataLoad=NormalizedDataLoad,
        slices=slices,
        presliced_flag=presliced_flag,
        VMem=VMem,
        VCores=VCores,
        Nodes=Nodes,
        CurrentInfraCalc=CurrentInfraCalc,
    )
    .sort_values(["MaxSerialSlices", "Exec", "TotalMemUsed"], ascending=True)
    .reset_index(drop=True)
)


7.217381 seconds


In [68]:
# Display the ranked feasible layouts from the previous cell (best-ranked first).
# Long frames are shown truncated (head and tail) by pandas' display settings.
df1

Unnamed: 0,Cores_x,Memory,node_count,Exec,Cores_y,TotalCoresUsed,available_memory_per_node,available_memory,available_cores_per_node,available_cores,TotalMemUsed,MaxExecPerNode,MaxSerialSlices,WorkerMemory,MemoryOverhead,ExecutorMemory,TotalCoresUsed%,TotalMemUsed%
0,32,256,4,4,15,60,204.8,819.2,25,100,784,1,117,150,151,45,60.000000,95.703125
1,32,256,4,12,5,60,204.8,819.2,25,100,792,3,117,50,51,15,60.000000,96.679688
2,32,256,4,20,3,60,204.8,819.2,25,100,800,5,117,30,31,9,60.000000,97.656250
3,32,256,4,19,3,57,204.8,819.2,25,100,760,5,123,30,31,9,57.000000,92.773438
4,32,256,4,4,14,56,204.8,819.2,25,100,732,1,125,140,141,42,56.000000,89.355469
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
435,32,256,4,2,1,2,204.8,819.2,25,100,28,1,3500,10,11,3,2.000000,3.417969
436,32,256,1,1,1,1,204.8,204.8,25,25,14,1,7000,10,11,3,4.000000,6.835938
437,32,256,2,1,1,1,204.8,409.6,25,50,14,1,7000,10,11,3,2.000000,3.417969
438,32,256,3,1,1,1,204.8,614.4,25,75,14,1,7000,10,11,3,1.333333,2.278646
