implement model-based power estimator
Signed-off-by: Sunyanan Choochotkaew <sunyanan.choochotkaew1@ibm.com>
sunya-ch authored and root committed Aug 10, 2022
1 parent a51f9c7 commit 008580e
Showing 23 changed files with 592 additions and 1 deletion.
5 changes: 4 additions & 1 deletion .gitignore
@@ -19,4 +19,7 @@ Session.vim
/exporter

# test-bin
test-bin
test-bin

estimator/py/__pycache__
load_test
7 changes: 7 additions & 0 deletions Makefile
@@ -4,8 +4,10 @@ export TIMESTAMP ?=$(shell echo $(BIN_TIMESTAMP) | tr -d ':' | tr 'T' '-' | tr -
SOURCE_GIT_TAG :=$(shell git describe --tags --always --abbrev=7 --match 'v*')

SRC_ROOT :=$(shell pwd)
ESTIMATOR_SRC_ROOT :=$(shell pwd)/estimator

IMAGE_REPO :=quay.io/sustainable_computing_io/kepler
ESTIMATOR_IMAGE_REPO :=quay.io/sustainable_computing_io/kepler-estimator
IMAGE_VERSION := "latest"
OUTPUT_DIR :=_output
CROSS_BUILD_BINDIR :=$(OUTPUT_DIR)/bin
@@ -86,6 +88,11 @@ build-containerized-cross-build:
+$(MAKE) build-containerized-cross-build-linux-arm64
.PHONY: build-containerized-cross-build

build-estimator:
$(CTR_CMD) build -t $(ESTIMATOR_IMAGE_REPO):$(SOURCE_GIT_TAG) \
-f "$(ESTIMATOR_SRC_ROOT)"/Dockerfile estimator
$(CTR_CMD) tag $(ESTIMATOR_IMAGE_REPO):$(SOURCE_GIT_TAG) $(ESTIMATOR_IMAGE_REPO):$(IMAGE_VERSION)

# for testsuite
PWD=$(shell pwd)
ENVTEST_ASSETS_DIR=./test-bin
10 changes: 10 additions & 0 deletions estimator/Dockerfile
@@ -0,0 +1,10 @@
FROM python:3.7

WORKDIR /usr/src/app
RUN mkdir -p py
COPY py/estimator.py py/estimator.py
COPY model model
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

CMD ["python", "py/estimator.py"]
1 change: 1 addition & 0 deletions estimator/model/CorrRatio/metadata.json
@@ -0,0 +1 @@
{"model_name": "CorrRatio", "features": ["curr_cache_misses", "curr_cpu_cycles", "curr_cpu_instructions"], "abs_model": "", "model_class": "ratio", "model_file": "ratio.json", "fe_files": []}
22 changes: 22 additions & 0 deletions estimator/model/CorrRatio/ratio.json
@@ -0,0 +1,22 @@
{
"core_score": {
"curr_cache_misses": 0.260257,
"curr_cpu_cycles": 0.368648,
"curr_cpu_instructions": 0.371095
},
"dram_score": {
"curr_cache_misses": 0.319293,
"curr_cpu_cycles": 0.340407,
"curr_cpu_instructions": 0.340299
},
"gpu_score": {
"curr_cache_misses": 0,
"curr_cpu_cycles": 0,
"curr_cpu_instructions": 1
},
"other_score": {
"curr_cache_misses": 0,
"curr_cpu_cycles": 0,
"curr_cpu_instructions": 1
}
}
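Each component's score vector above sums to (approximately) 1, so the per-metric weighted ratios form a convex combination. A minimal sketch of how the RatioModel class in estimator.py (added below) applies the core_score weights; the two containers and the 10 W core power reading are made up for illustration:

core_score = {"curr_cache_misses": 0.260257,
              "curr_cpu_cycles": 0.368648,
              "curr_cpu_instructions": 0.371095}
containers = {
    "container-a": {"curr_cache_misses": 30, "curr_cpu_cycles": 70, "curr_cpu_instructions": 60},
    "container-b": {"curr_cache_misses": 70, "curr_cpu_cycles": 30, "curr_cpu_instructions": 40},
}
# each container's share of a metric, weighted by that metric's score
totals = {m: sum(c[m] for c in containers.values()) for m in core_score}
core_power = 10.0
for name, c in containers.items():
    ratio = sum(core_score[m] * c[m] / totals[m] for m in core_score)
    print(name, round(ratio * core_power, 2))  # the two shares sum to core_power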
1 change: 1 addition & 0 deletions estimator/model/GradientBoostingRegressor_10/metadata.json
@@ -0,0 +1 @@
{"model_name": "GradientBoostingRegressor_10", "features": ["curr_bytes_read", "curr_bytes_writes", "curr_cache_misses", "curr_cgroupfs_cpu_usage_us", "curr_cgroupfs_memory_usage_bytes", "curr_cgroupfs_system_cpu_usage_us", "curr_cgroupfs_user_cpu_usage_us", "curr_cpu_cycles", "curr_cpu_instructions", "curr_cpu_time"], "mae": 3.7872997125827212, "mse": 37.24224844019559, "mae_val": 8.12220183063894, "mse_val": 243.80501448783286, "abs_model": "Linear Regression_10", "model_class": "scikit", "model_file": "model.sav", "fe_files": []}
Binary file added estimator/model/GradientBoostingRegressor_10/model.sav
Binary file not shown.
Binary file added estimator/model/GradientBoostingRegressor_10/scaler.pkl
Binary file not shown.
1 change: 1 addition & 0 deletions estimator/model/Linear Regression_10/metadata.json
@@ -0,0 +1 @@
{"model_name": "Linear Regression_10", "features": ["curr_bytes_read", "curr_bytes_writes", "curr_cache_misses", "curr_cgroupfs_cpu_usage_us", "curr_cgroupfs_memory_usage_bytes", "curr_cgroupfs_system_cpu_usage_us", "curr_cgroupfs_user_cpu_usage_us", "curr_cpu_cycles", "curr_cpu_instructions", "curr_cpu_time"], "mae": 7.383054002296603, "mse": 134.6482207295629, "mae_val": 8.788559081019159, "mse_val": 244.1422191394082, "abs_model": "Linear Regression_10", "model_class": "scikit", "model_file": "model.sav", "fe_files": []}
Binary file added estimator/model/Linear Regression_10/model.sav
Binary file not shown.
Binary file added estimator/model/Linear Regression_10/scaler.pkl
Binary file not shown.
1 change: 1 addition & 0 deletions estimator/model/Polynomial Regression_10/metadata.json
@@ -0,0 +1 @@
{"model_name": "Polynomial Regression_10", "features": ["curr_bytes_read", "curr_bytes_writes", "curr_cache_misses", "curr_cgroupfs_cpu_usage_us", "curr_cgroupfs_memory_usage_bytes", "curr_cgroupfs_system_cpu_usage_us", "curr_cgroupfs_user_cpu_usage_us", "curr_cpu_cycles", "curr_cpu_instructions", "curr_cpu_time"], "mae": 5.849873578657095, "mse": 91.2052680114667, "mae_val": 8.388975669136238, "mse_val": 263.0132828537165, "abs_model": "Linear Regression_10", "model_class": "scikit", "model_file": "model.sav", "fe_files": ["poly_scaler.pkl"]}
Binary file added estimator/model/Polynomial Regression_10/model.sav
Binary file not shown.
Binary file added estimator/model/Polynomial Regression_10/poly_scaler.pkl
Binary file not shown.
Binary file added estimator/model/Polynomial Regression_10/scaler.pkl
Binary file not shown.
19 changes: 19 additions & 0 deletions estimator/py/client_load_tester.py
@@ -0,0 +1,19 @@
from estimator_client import Client
from estimator_test import generate_request, model_names
from estimator import SERVE_SOCKET
import numpy as np
import time
loads = range(10, 101, 10)
duration = 120

if __name__ == '__main__':
client = Client(SERVE_SOCKET)
for model_name in model_names:
for load in loads:
request_json = generate_request(model_name, load)
start_time = time.time()
client.make_request(request_json)
elapsed_time = time.time() - start_time
output = '{},{},{}'.format(model_name, load, elapsed_time)
print(output)
time.sleep(1)
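The generate_request helper and model_names list come from estimator_test, which is not shown in this excerpt. A request body that the server can decode into a PowerRequest (the field names follow the constructor in estimator.py below; the metric values and power readings are illustrative only) would look roughly like:

import json

request_json = json.dumps({
    "metrics": ["curr_cpu_cycles", "curr_cpu_instructions", "curr_cache_misses"],
    "values": [[3000, 2000, 50], [1000, 1500, 70]],  # one row per container
    "model_name": "CorrRatio",  # an empty string selects the best available model
    "core_power": [10.0],
    "dram_power": [2.0],
    "gpu_power": [0.0],
    "other_power": [1.0],
})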
290 changes: 290 additions & 0 deletions estimator/py/estimator.py
@@ -0,0 +1,290 @@
# estimator.py: model-based power estimation server
# loads the model, scaler, and metadata from the model folder ../model/<model name>
# applies the scaler and any feature-engineering steps, then predicts power
import pandas as pd
import numpy as np
import json
import pickle
import keras
import os

dirname = os.path.dirname(__file__)

MODEL_FOLDER = os.path.join(dirname, '../model')
METADATA_FILENAME = 'metadata.json'
SCALER_FILENAME = 'scaler.pkl'

SERVE_SOCKET = '/tmp/estimator.sock'

###############################################
# power request

class PowerRequest():
def __init__(self, metrics, values, model_name="", core_power=[], dram_power=[], gpu_power=[], other_power=[]):
self.model_name = model_name
self.datapoint = pd.DataFrame(values, columns=metrics)
self.core_power = core_power
self.dram_power = dram_power
self.gpu_power = gpu_power
self.other_power = other_power

###############################################
# load data

def _modelpath(model_name):
return "{}/{}/".format(MODEL_FOLDER, model_name)

def load_metadata(model_name):
metadata_file = _modelpath(model_name) + METADATA_FILENAME
try:
with open(metadata_file) as f:
metadata = json.load(f)
except Exception as e:
print(e)
return None
return metadata

def load_model_by_pickle(model_name, model_filename):
model_file = _modelpath(model_name) + model_filename
try:
with open(model_file, 'rb') as f:
model = pickle.load(f)
except Exception as e:
print(e)
return None
return model

def load_model_by_keras(model_name, model_filename):
    # resolve the file inside the model's folder, as the other loaders do
    model_file = _modelpath(model_name) + model_filename
    try:
        model = keras.models.load_model(model_file)
    except Exception as e:
        print(e)
        return None
    return model

def load_model_by_json(model_name, model_filename):
model_file = _modelpath(model_name) + model_filename
try:
with open(model_file) as f:
model = json.load(f)
except Exception as e:
print(e)
return None
return model
###############################################

###############################################
# define model
def transform_and_predict(model, request):
    msg = ""
    try:
        # select the model's feature columns, scale them, then apply any
        # feature-engineering transforms before predicting
        x_values = request.datapoint[model.features].values
        normalized_x = model.scaler.transform(x_values)
        for fe in model.fe_list:
            if fe is None:
                continue
            normalized_x = fe.transform(normalized_x)
        y = model.model.predict(normalized_x)
        y = list(y)
    except Exception as e:
        msg = '{}\n'.format(e)
        y = []
    return y, msg

class ScikitModel():
def __init__(self, model_name, model_file, features, fe_files):
self.name = model_name
self.features = features
self.scaler = load_model_by_pickle(model_name, SCALER_FILENAME)
self.model = load_model_by_pickle(model_name, model_file)
self.fe_list = []
for fe_filename in fe_files:
self.fe_list += [load_model_by_pickle(model_name, fe_filename)]

def get_power(self, request):
return transform_and_predict(self, request)

class KerasModel():
def __init__(self, model_name, model_file, features, fe_files):
self.name = model_name
self.features = features
self.scaler = load_model_by_pickle(model_name, SCALER_FILENAME)
self.model = load_model_by_keras(model_name, model_file)
self.fe_list = []
for fe_filename in fe_files:
self.fe_list += [load_model_by_pickle(model_name, fe_filename)]

def get_power(self, request):
return transform_and_predict(self, request)


class RatioModel():
def __init__(self, model_name, model_file, features, fe_files):
self.name = model_name
self.features = features
self.model = load_model_by_json(model_name, model_file)
self.power_components = ['core', 'dram', 'gpu', 'other']

    def get_power(self, request):
        msg = ""
        try:
            df = request.datapoint[self.features]
            if len(df) == 1:
                # a single datapoint receives the full measured power
                total_power = 0
                for component in self.power_components:
                    total_power += np.sum(getattr(request, '{}_power'.format(component)))
                return [total_power], msg
            # broadcast the per-metric workload sums to every row so each
            # datapoint can be expressed as a fraction of the total
            sum_wl_stat = pd.DataFrame([df.sum().values]*len(df), columns=df.columns, index=df.index)
            ratio_df = df.join(sum_wl_stat, rsuffix='_sum')
            output_df = pd.DataFrame()
            for component in self.power_components:
                # weight each metric's share by its correlation score, then
                # scale the summed shares by the measured component power
                for metric in self.features:
                    ratio_df[metric + '_{}_ratio'.format(component)] = ratio_df[metric]/ratio_df[metric + '_sum']*self.model['{}_score'.format(component)][metric]
                sum_ratio_df = ratio_df[[col for col in ratio_df if '{}_ratio'.format(component) in col]].sum(axis=1)
                total_power = np.sum(getattr(request, '{}_power'.format(component)))
                output_df[component] = sum_ratio_df*total_power
            y = list(output_df.sum(axis=1).values.squeeze())
        except Exception as e:
            msg = '{}\n'.format(e)
            y = []
        return y, msg

###############################################
# model wrapper

MODELCLASS = {
'scikit': ScikitModel,
'keras': KerasModel,
'ratio': RatioModel
}

class Model():
def __init__(self, model_class, model_name, model_file, features, fe_files=[], mae=None, mse=None, mae_val=None, mse_val=None, abs_model=None):
self.model_name = model_name
self.model = MODELCLASS[model_class](model_name, model_file, features, fe_files)
self.mae = mae
self.mse = mse
self.abs_model = abs_model

def get_power(self, request):
return self.model.get_power(request)

def init_model(model_name):
metadata = load_metadata(model_name)
if metadata is not None:
metadata_str = json.dumps(metadata)
try:
model = json.loads(metadata_str, object_hook = lambda d : Model(**d))
return model
except Exception as e:
print(e)
return None
return None
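# Note: each metadata.json above maps directly onto Model's keyword arguments,
# e.g. {"model_name": "CorrRatio", "features": [...], "model_class": "ratio",
# "model_file": "ratio.json", "fe_files": []} becomes
# Model(model_class='ratio', model_name='CorrRatio', ...).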


def load_all_models():
model_names = [f for f in os.listdir(MODEL_FOLDER) if not os.path.isfile(os.path.join(MODEL_FOLDER,f))]
print("Load models:", model_names)
items = []
for model_name in model_names:
model = init_model(model_name)
if model is not None:
item = {'name': model.model_name, 'model': model, 'mae': model.mae, 'mse': model.mse}
items += [item]
model_df = pd.DataFrame(items)
model_df = model_df.sort_values([DEFAULT_ERROR_KEY])
return model_df
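# After sorting, model_df.iloc[0] is the model with the lowest value of
# DEFAULT_ERROR_KEY ('mae' unless overridden with --err); handle_request
# below falls back to it when no model name is given.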

###############################################
# serve

import sys
import socket
import signal

DEFAULT_ERROR_KEY = 'mae'

def handle_request(model_df, data):
try:
power_request = json.loads(data, object_hook = lambda d : PowerRequest(**d))
except Exception as e:
        msg = 'failed to handle request: {}'.format(e)
return {"powers": [], "msg": msg}

if len(model_df) > 0:
best_available_model = model_df.iloc[0]['model']
else:
best_available_model = None
if power_request.model_name == "":
model = best_available_model
else:
selected = model_df[model_df['name']==power_request.model_name]
if len(selected) == 0:
            print('cannot find model: {}, using best available model'.format(power_request.model_name))
model = best_available_model
else:
model = selected.iloc[0]['model']
if model is not None:
print('Estimator model: ', model.model_name)
powers, msg = model.get_power(power_request)
return {"powers": powers, "msg": msg}
else:
return {"powers": [], "msg": "no model to apply"}

class EstimatorServer:
def __init__(self, socket_path):
self.socket_path = socket_path
self.model_df = load_all_models()

def start(self):
s = self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(self.socket_path)
s.listen(1)
try:
while True:
connection, address = s.accept()
self.accepted(connection, address)
finally:
os.remove(self.socket_path)
sys.stdout.write("close socket\n")

def accepted(self, connection, address):
data = b''
while True:
            chunk = connection.recv(1024).strip()
            data += chunk
            # recv returns b'' once the peer closes; a request always ends with '}'
            if not chunk or chunk.decode()[-1] == '}':
break
decoded_data = data.decode()
y = handle_request(self.model_df, decoded_data)
response = json.dumps(y)
connection.send(response.encode())

def clean_socket():
print("clean socket")
if os.path.exists(SERVE_SOCKET):
os.unlink(SERVE_SOCKET)

def sig_handler(signum, frame) -> None:
clean_socket()
sys.exit(1)

import argparse

if __name__ == '__main__':
clean_socket()
signal.signal(signal.SIGTERM, sig_handler)
try:
parser = argparse.ArgumentParser()
parser.add_argument('-e', '--err',
required=False,
type=str,
default='mae',
metavar="<error metric>",
help="Error metric for determining the model with minimum error value" )
args = parser.parse_args()
DEFAULT_ERROR_KEY = args.err
server = EstimatorServer(SERVE_SOCKET)
server.start()
finally:
clean_socket()
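The estimator_client module imported by client_load_tester.py is not shown in this diff. A minimal sketch of a compatible Client, assuming the framing used by EstimatorServer.accepted above (a request ends with '}' and a response fits in a single JSON payload):

import json
import socket

class Client:
    # hypothetical sketch of estimator_client.Client; not part of this commit
    def __init__(self, socket_path):
        self.socket_path = socket_path

    def make_request(self, request_json):
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        s.connect(self.socket_path)
        s.sendall(request_json.encode())
        data = s.recv(4096)  # responses here are small JSON objects
        s.close()
        return json.loads(data.decode())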
