MP DS SS22 - Plots

In [None]:
# imports
import pandas as pd
import numpy as np
import os
import pathlib
from typing import Tuple, List
import sys
import re
import warnings
import matplotlib.pyplot as plt
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn import preprocessing
from joblib import dump, load
import shutil
from string import Template
warnings.filterwarnings("ignore")

In [None]:
# constants
# Column that holds the task name in every trace.
indexCol = "name"
# Trace columns that are converted to numbers, averaged and modelled.
numericCols = ['%{0}'.format("cpu"), 'rchar', 'wchar', "instructions", "peak_vmem"]
# indexCol currently equals "name"; de-duplicate (keeping order) so the column is listed only once.
relevantColumns = list(dict.fromkeys(numericCols + [indexCol, "name"]))
# Input and output locations, relative to the notebook.
inputPath = "./traces"
modelDir = "./models"
plotDir = "./plots"
templatesDir = "./templates"
preRenderedDir = "./preRenderedTemplates"
renderedDir = "./rendered"
# Name of the extra column that stores how often a task occurred in a trace.
executionName = "execution_count"
# Binary (1024-based) multipliers per unit suffix; "EB" (exabyte) replaces the former "EC" typo.
conversionDict = {
    "B": 1,
    "KB": 1024,
    "MB": 1024**2,
    "GB": 1024**3,
    "TB": 1024**4,
    "PB": 1024**5,
    "EB": 1024**6,
}

Create a pandas data frame from the contents of each generated trace file.

In [None]:
def parseFile(filename: str, filePath: str) -> Tuple[str, pd.DataFrame]:
    """Read the tab-separated trace ``filePath/filename``.

    Returns the file name together with the parsed table; every cell of the
    table is cast to ``str``.
    """
    tracePath = f"{filePath}/{filename}"
    traceFrame = pd.read_csv(tracePath, sep="\t")
    return filename, traceFrame.astype(str)

In [None]:
# Collect the trace files (named "<input size>.txt") and parse them in ascending numeric order,
# so that e.g. "1000.txt" comes after "200.txt".
# The list is deliberately not called `sorted`: that would shadow the builtin for every later cell.
traceFiles = [f for f in os.listdir(inputPath) if f.endswith(".txt") and not f.startswith('.')]
# Sort by the numeric stem but keep the real file names, so the files are opened exactly as stored.
traceFiles.sort(key=lambda f: int(f[:-len(".txt")]))
traceTuples = [parseFile(f, inputPath) for f in traceFiles]
print(traceTuples)

Create a table for each task with the relevant values in it. 

Additionally, create a lookup of the file names in which each file's index corresponds to its row index in the previously created table.

In [None]:
# Drops the first '(' of a task name together with everything that follows it.
def cleanName(name: str):
    """Return *name* cut off before its first '(' with trailing whitespace removed.

    A name without '(' or one that starts with '(' is returned unchanged.
    """
    head, paren, _ = name.partition('(')
    if paren and head:
        return head.rstrip()
    return name
    
# Returns the cleaned task names found in the index column of a trace.
def getTasks(data: pd.DataFrame):
    """Return the ``indexCol`` column of *data* with ``cleanName`` applied to every entry."""
    rawNames = data[indexCol]
    return rawNames.apply(cleanName)

# A leading number (decimal point or decimal comma) followed by an optional alphabetic unit suffix.
_VALUE_WITH_UNIT = re.compile(r'\s*(\d+(?:[.,]\d+)?)\s*([A-Za-z]*)')


# converts a single value using the conversion dict
def convert(item: str):
    """Parse a trace value such as ``"1.5 GB"`` and scale it by its unit.

    Args:
        item: Text that starts with a number, optionally followed by a unit
            suffix (e.g. ``"10"``, ``"85.3%"``, ``"2 KB"``).

    Returns:
        The number as ``float``, multiplied by the ``conversionDict`` factor
        of its unit. Values without a unit, or with a unit that is not in
        ``conversionDict``, are returned unscaled.

    Raises:
        ValueError: If *item* does not start with a number.
    """
    parsed = _VALUE_WITH_UNIT.match(item)
    if parsed is None:
        raise ValueError(f"Cannot extract a numeric value from {item!r}.")
    number, unit = parsed.groups()
    # Look the unit up exactly: a substring test would match "B" inside "KB", "MB", ...
    factor = conversionDict.get(unit.upper(), 1)
    return float(number.replace(",", ".")) * factor


# Applies convert function to every item in Series
def convertValues(cols: pd.Series):
    """Return a Series holding ``convert(value)`` for every value of *cols*."""
    return cols.apply(convert)


# Calculate the mean for multiple executions of a task
def calcMean(col: pd.Series):
    """Return the mean of *col*, rounded to two decimals.

    Numeric series (the output of ``convertValues``) are averaged directly.
    Text series are still supported: the leading number of every entry is
    extracted first.
    """
    if not pd.api.types.is_numeric_dtype(col):
        col = col.astype(str).str.extract(r'(\d+[.,]\d{1,2}|\d+)', expand=False).astype(float)
    return col.mean().round(decimals=2)

# Creates one table per task (one row per trace) plus a lookup from trace file name to row index.
def createDictionaries(tasks: pd.Series, traces: Tuple):
    """Build the per-task tables of mean values and the trace row lookup.

    Args:
        tasks: Cleaned task names; one table is created per entry.
        traces: Sequence of ``(fileName, traceFrame)`` pairs. The ``indexCol``
            column of every frame is cleaned in place with ``cleanName``.

    Returns:
        A tuple ``(dfList, rowLookup)``. ``dfList`` maps each task to a
        DataFrame with the columns ``numericCols + [executionName]`` and one
        row per trace, in the order of *traces*. ``rowLookup`` maps each file
        name to the index of its row. A task that does not occur in a trace
        gets a row of NaN means with an execution count of 0, so the rows stay
        aligned with ``rowLookup``.
    """
    columns = numericCols + [executionName]

    # Clean the task names and register the row index once per trace instead of once per task.
    rowLookup = {}
    for fileName, trace in traces:
        trace[indexCol] = trace[indexCol].apply(cleanName)
        rowLookup.setdefault(fileName, len(rowLookup))

    dfList = {}
    for task in tasks:
        rows = []
        for _, trace in traces:
            # Filter the trace and convert numeric values when needed.
            relevantTrace = trace.loc[trace[indexCol] == task, numericCols]
            if relevantTrace.empty:
                means = [float("nan")] * len(numericCols)
            else:
                means = relevantTrace.apply(convertValues).apply(calcMean).tolist()
            # Series.append was removed in pandas 2.0, so the row is assembled as a plain list.
            rows.append(means + [float(len(relevantTrace))])
        dfList[task] = pd.DataFrame(rows, columns=columns)
    return dfList, rowLookup

In [None]:
# Take the unique task names from the first trace, then build taskDict and rowLookup,
# which are used later to create the regression models and plot the values.
firstTrace = traceTuples[0][1]
tasks = getTasks(firstTrace).drop_duplicates().tolist()
taskDict, rowLookup = createDictionaries(tasks, traceTuples)
print(taskDict)
print(rowLookup)

The following cells will create the scatter plots from the dictionary. The rowLookup is used to store which value comes from which trace. The name of the trace should also be the input size.

For each task, one plot is created per column of its table (every entry of `numericCols` plus `execution_count`).

In [None]:

def cleanFileName(fileName):
    """Return *fileName* with every ".txt" occurrence removed."""
    return "".join(fileName.split(".txt"))

# Draws one scatter plot with its fitted regression curve and stores both the model and the figure.
def plotAndModel(x, y, xTitle, yTitle, title):
    """Plot *y* over *x*, fit a regression model to the points and persist both.

    The model is written to ``{modelDir}/{title}/{yTitle}.pkl`` and the figure
    to ``{plotDir}/{title}/{yTitle}.jpg``. Nothing is returned.
    """
    # Measured points.
    plt.title(title, loc='center')
    plt.scatter(x, y, c="r", alpha=0.5)
    plt.xlabel(xTitle)
    plt.ylabel(yTitle)

    # Fitted curve, evaluated at 100 evenly spaced points across the observed x range.
    fittedModel = createPolyModel(x, y)
    curveX = np.linspace(min(x), max(x), 100).reshape(-1, 1)
    plt.plot(curveX, fittedModel.predict(curveX))

    # Store the model.
    modelOutDir = f"{modelDir}/{title}"
    ensureDir(modelOutDir)
    dump(fittedModel, f"{modelOutDir}/{yTitle}.pkl")

    # Store the figure, then close it so the next plot starts from a clean figure.
    plotOutDir = f"{plotDir}/{title}"
    ensureDir(plotOutDir)
    plt.savefig(f"{plotOutDir}/{yTitle}.jpg", dpi=150, bbox_inches='tight')
    plt.close()

# Creates a dir (and any missing parents) unless it already exists.
def ensureDir(dir: str):
    """Make sure the directory *dir* exists.

    Args:
        dir: Path of the directory; missing parent directories are created too.

    Raises:
        FileExistsError: If *dir* exists but is not a directory.
        OSError: If the directory cannot be created.
    """
    # exist_ok=True avoids the race of a separate exists() check followed by makedirs().
    # Failures surface as OSError instead of an assert, which is stripped under `python -O`.
    os.makedirs(dir, exist_ok=True)

# Leaves behind an existing, empty directory.
def clearDir(dir: str):
    """Delete *dir* with all of its contents if it exists, then (re)create it."""
    if os.path.exists(dir):
        # Remove the whole tree; it is recreated empty right below.
        shutil.rmtree(dir)
    ensureDir(dir)

# creates and returns a fitted polynomial regression model.
def createPolyModel(x, y, degree: int = 5):
    """Fit a polynomial regression of *y* on *x*.

    Args:
        x: One-dimensional array-like of input values.
        y: Target values, one per entry of *x*.
        degree: Degree of the polynomial features; defaults to 5, the value
            that was previously hard-coded.

    Returns:
        The fitted pipeline (polynomial features -> standard scaler -> linear
        regression). It expects inputs of shape ``(n, 1)`` when predicting.
    """
    # np.asarray lets plain lists be passed as well; arrays are used as they are.
    features = np.asarray(x).reshape(-1, 1)
    polyreg = make_pipeline(PolynomialFeatures(degree), preprocessing.StandardScaler(), LinearRegression())
    polyreg.fit(features, y)
    return polyreg

# Creates one plot and one model per column of a task table.
def generateTaskPlots(traceDictionary: pd.DataFrame, title: str, rowLookup: List):
    """Call ``plotAndModel`` for every column of *traceDictionary*.

    *rowLookup* supplies the x values, the column supplies the y values and
    the name of the y axis, and *title* names the task.
    """
    inputSizes = np.asarray(rowLookup)
    for col, values in traceDictionary.items():
        plotAndModel(inputSizes, values.to_numpy(), "input size in KB", col, title)
    

In [None]:
# Turn the trace file names (the keys of rowLookup, in row order) into numeric input sizes.
rowLookup = [float(cleanFileName(row)) for row in rowLookup]
print(rowLookup)
# Start from empty output directories so no stale models or plots survive a re-run.
# The loop variable is not called `dir`, which would shadow the builtin for every later cell.
for outputDir in [modelDir, plotDir]:
    clearDir(outputDir)
# One set of plots and models per task.
for task, taskTable in taskDict.items():
    generateTaskPlots(taskTable.astype(float), task, rowLookup)

#### From here on the code should be split. The following code will assume the existence of previously trained models and simply generate a trace based on those models and a supplied template.

#### The following section creates a dictionary with the predicted values from each model for a given input size.

In [None]:
# Uses the models to create the trace for a single task
def createTemplateDictionary( inputSize: float, modelName: str):
    """Predict every modelled column of one task for *inputSize*.

    Args:
        inputSize: Input size that is passed to every model.
        modelName: Name of the task; its models are read from
            ``{modelDir}/{modelName}``.

    Returns:
        A new dictionary with one ``"{modelName}_{column}"`` entry per model,
        holding the prediction formatted with two decimals, plus a
        ``"{modelName}_name"`` entry holding *modelName* when at least one
        model was found. The result is built locally, so the function no
        longer depends on (or mutates) a global ``templateDictionary``.
    """
    modelPath = f"{modelDir}/{modelName}"
    # NOTE: joblib's load() unpickles the files - only point modelDir at trusted model files.
    # Only *.pkl files are loaded, so stray entries such as .DS_Store are ignored.
    models = [
        (f.replace(".pkl", ""), load(f"{modelPath}/{f}"))
        for f in os.listdir(modelPath)
        if f.endswith(".pkl")
    ]

    # The models expect a 2-D input of shape (1, 1); build it once for all of them.
    features = np.asarray(inputSize).reshape(-1, 1)
    entries = {}
    for col, model in models:
        entries[f"{modelName}_{col}"] = "%.2f" % model.predict(features)[0]
    if models:
        entries[f"{modelName}_name"] = modelName
    return entries
     
def plotModelAndPrediction( model, xTitle, yTitle, title):
    """Show the prediction curve of *model* for 100 evenly spaced inputs between 0 and 20000."""
    plt.title(title, loc='center')
    plt.xlabel(xTitle)
    plt.ylabel(yTitle)

    # The inputs are shaped (100, 1) before they are passed to predict().
    curveX = np.linspace(0, 20000, 100).reshape(-1, 1)
    curveY = model.predict(curveX)
    plt.plot(curveX, curveY)

    plt.show()
    plt.close()

In [None]:
# create a dictionary for all models that can later be used to fill in the template
inputSize = 20000
templateDictionary = {}
for f in os.listdir(modelDir):
    # Skip hidden entries (e.g. .DS_Store), as the other directory loops in this notebook do;
    # they are not model folders and cannot be listed.
    if f.startswith('.'):
        continue
    templateDictionary = {**templateDictionary, **createTemplateDictionary(inputSize, f)}
print(templateDictionary)

#### The previously created dictionary is used to insert the predicted values into a template.

In [None]:
# Make sure every directory used for template rendering exists.
for requiredDir in (templatesDir, preRenderedDir, renderedDir):
    ensureDir(requiredDir)

In order to create a template for a workflow, the tasks are used to generate a pre-rendered template that will then be rendered again to add the actual values. Pre-rendering saves time when creating the final template, especially for large workflows.

In [None]:
# Pre-render every template: fill it in once per task and store the results as one "[...]" list.
for f in os.listdir(templatesDir):
    # Hidden files are not templates.
    if f.startswith('.'):
        continue
    with open(f"{templatesDir}/{f}", "r") as ft:
        template = Template(ft.read())
    renderedTemplate = [template.substitute({"task" : task}) for task in tasks]
    with open(f"{preRenderedDir}/{f}", "w") as fr:
        delimiter = ","
        renderedTemplate = f"[{delimiter.join(renderedTemplate)}]"
        fr.write(renderedTemplate)

In [None]:
# Because the normal template class does not allow the identifier to begin with a numeric character it was necessary to change the idpattern.
# The identifier may now consist of ASCII letters, digits and underscores in any position,
# i.e. it may (but does not have to) begin with a numeric character.
class TraceTemplate(Template):
    # (?-i:...) switches case-insensitive matching off for this group; upper and lower case
    # letters are therefore listed explicitly.
    # NOTE(review): `*` also matches an empty identifier; `+` may have been intended - confirm.
    idpattern = r'(?-i:[_a-zA-Z0-9]*)'
    
# Go through the preRenderedDir folder, render the templates and then save them to the rendered folder.
# Important: if there are missing vars in the templateDictionary, an error will be thrown.
for f in os.listdir(preRenderedDir):
    # Hidden files are not templates.
    if f.startswith('.'):
        continue
    with open(f"{preRenderedDir}/{f}", "r") as ft:
        template = TraceTemplate(ft.read())
    renderedTemplate = template.substitute(templateDictionary)
    with open(f"{renderedDir}/{f}", "w") as fr:
        fr.write(renderedTemplate)