In [3]:
import os
import gc
import pickle
import pandas as pd
import numpy as np
import sys
from os import listdir
from os.path import isfile, join
from keras.models import Model, Sequential
from keras.layers import LSTM, Dense, Embedding
from keras.utils import multi_gpu_model
from sklearn.metrics import precision_recall_fscore_support as score
import multiprocessing
from joblib import Parallel, delayed

path                 = "."

In [None]:
# FIX 1 of 3 : set the following vars
path                 = "." #path to dir where test pickles reside and h5 file is
modelfingerprint     = "kubernetes_train_[80-20]_giga_long_mltithrdedgentr_PikTranSav_Mdl_[J_Filternet_tstep1]_epch_75_kube_binar_crsentr_adm_binar_acc.py" # name of py file
modelweightsfilename = "kubernetes_train_[80-20]_giga_long_mltithrdedgentr_PikTranSav_Mdl_[J_Filternet_tstep1]_epch_75_kube_binar_crsentr_adm_binar_acc.h5" # name of h5 file

# FIX 2 of 3: update the input_length as per timesteps
input_length    = 1 # 1 or 2                        # X_final.shape[1]
input_dim       = 3541                              # X_final.shape[2]
output_dim      = 1

pickgpus = "0,1"
os.environ["CUDA_VISIBLE_DEVICES"] = pickgpus

def create_model(input_dim, input_length, output_dim):
    print ('Creating model...')
    # FIX 3 of 3: copy everything from here till 'return parallel_model'
    model = Sequential()
    model.add(LSTM(2000, input_shape=(input_length,input_dim),return_sequences=True))
    model.add(LSTM(2000, input_shape=(input_length,input_dim),return_sequences=True))
    model.add(LSTM(1000, input_shape=(input_length,input_dim),return_sequences=True))
    model.add(LSTM(2000, input_shape=(input_length,input_dim),return_sequences=True))
    model.add(LSTM(2000))
    model.add(Dense(output_dim, activation='sigmoid'))
    ###
    print('Initiating parallel GPU model')
    parallel_model = multi_gpu_model(model, gpus=1+pickgpus.count(","))
    print ('Compiling...')
    parallel_model.compile(loss='binary_crossentropy',optimizer='adam',metrics=['binary_accuracy'])
    return parallel_model

In [None]:
# STEP 1
model   = create_model(input_dim, input_length, output_dim)
# STEP 2
print('Loading Model weights before Testing')
model.load_weights( join(path, modelweightsfilename) )

In [None]:
# warm up
warmup=True

In [None]:
test_files = [f for f in listdir(path) if isfile(join(path, f))]
test_files = [x for x in test_files if 'dict_jobID_' in x and 'FROM_giga_takeoutdf_w_jobgroup.pickle' in x ]
test_files.sort()

In [None]:
import itertools

def chunked(it, size):
    it = iter(it)
    while True:
        p = tuple(itertools.islice(it, size))
        if not p:
            break
        yield p

In [None]:
# parameters
from threading import Lock
lock               = Lock()

def h(jobID_Xtest_Ytest):
    global model
    global lock
    
    Y_true = []
    Y_pred = []
    jobID, X_test, Y_test     = jobID_Xtest_Ytest[0], jobID_Xtest_Ytest[1][0], jobID_Xtest_Ytest[1][1]         
    try:
        lock.acquire()
        y_pred_this_file = model.predict(X_test)
    finally:
        lock.release()
    
    Y_pred.extend([ np.rint(jj[0]) for jj in y_pred_this_file ])
    Y_true.extend([ xj[0] for xj in Y_test ])
    
    #test 1
    if not all(x == Y_true[0] for x in Y_true):
        print('All Y_true labels for jobID %d are not same \n' % jobID)
    
    return [jobID, Y_true, Y_pred]

In [None]:
stage_results = []
import random

for k in range(len(test_files)):
    # Load { jobID -> X_test, Y_test }:
    print( "Loading test file #%d - %s" % (k, test_files[k]) )
    with open( join(path, test_files[k]) , 'rb' ) as handle:
        global_dict = pickle.load(handle)
    print( "Loaded test file #%d" % (len(global_dict))       )
    
    # warm up
    if(warmup):
        print(model.predict(random.choice(list(global_dict.items()))[1][0]))
        warmup=False
        
    start = 1
    for chunk in chunked(global_dict.items(), 48*5):
        print("New Batch started : %d to %d " % (start, start+len(chunk)-1))
        results = Parallel(n_jobs=48, verbose=1, backend="threading")(map(delayed(h), [x for x in chunk] ))
        stage_results.extend(results)
        del results
        start += len(chunk)
    del global_dict
    gc.collect()

In [None]:
len(stage_results)

with open('stage_results_list_of_jobid_x_y_test.pickle', 'wb') as handle:
    pickle.dump(stage_results, handle, protocol=pickle.HIGHEST_PROTOCOL)

# Load predicted 'heartbeat' level results into memory

In [4]:
with open('stage_results_list_of_jobid_x_y_test.pickle', 'rb') as handle:
    stage_results = pickle.load(handle)

In [28]:
def process(listelement_parameters):
    listelement = listelement_parameters[0]
    parameters  = listelement_parameters[1]
    
    # each input: listelement is [jobID, Y_true, Y_pred]
    # make job level prediction, based on parameters
    decision_threshold = parameters[0]
    minsamplestopick   = parameters[1]
    maxsamplestopick   = parameters[2]
    
    ###
    jobID  = listelement[0]
    Y_true = listelement[1]
    Y_pred = listelement[2]
    
    truelabel = Y_true[0]
    
    # perform job-level prediction in a while loop:
    firstN           = min( minsamplestopick, len(Y_pred) )
    localsamplefrac  = (firstN*1.0) / len(Y_pred)
    localsamplefracN = localsamplefrac
    predictedlabel   = -1
    
    ##### Method: take majority
    while predictedlabel == -1 and firstN <= min(maxsamplestopick, len(Y_pred)):
        #
        localsamplefrac = (firstN*1.0) / len(Y_pred)
        
        ############################################################
        take            = Y_pred[:firstN]
        ############################################################
        
        if take.count(1.0) > int(decision_threshold * len(take)):
            predictedlabel = 1
            break
            
        if take.count(0.0) > int(decision_threshold * len(take)):
            predictedlabel = 0
            break
        
        firstN          =  firstN + 1
    
    # create a python list that will be return
    
    totaljobduration   = 1.0
    predictionduration = -1.0
    return [jobID, totaljobduration, predictionduration, 
            localsamplefrac, len(Y_true), decision_threshold, 
            truelabel, predictedlabel]

    ###################################

# Set parameters 1

In [29]:
%%time
# parameters   = [ decision_threshold, minsamplestopick, maxsamplestopick ]
parameters     = [ 0.95              , 3               ,20]
inputList      = [ [x, parameters] for x in stage_results ]
global_results = Parallel(n_jobs=48*100, verbose=1, backend="threading")(map(delayed(process), inputList))

[Parallel(n_jobs=4800)]: Done 200 tasks      | elapsed:    1.9s
[Parallel(n_jobs=4800)]: Done 1650 tasks      | elapsed:    2.0s
[Parallel(n_jobs=4800)]: Done 3200 tasks      | elapsed:    2.2s
[Parallel(n_jobs=4800)]: Done 4850 tasks      | elapsed:    2.4s
[Parallel(n_jobs=4800)]: Done 6600 tasks      | elapsed:    2.6s
[Parallel(n_jobs=4800)]: Done 8450 tasks      | elapsed:    2.8s
[Parallel(n_jobs=4800)]: Done 10400 tasks      | elapsed:    3.1s
[Parallel(n_jobs=4800)]: Done 12450 tasks      | elapsed:    3.2s
[Parallel(n_jobs=4800)]: Done 14600 tasks      | elapsed:    3.6s
[Parallel(n_jobs=4800)]: Done 16850 tasks      | elapsed:    3.9s
[Parallel(n_jobs=4800)]: Done 19200 tasks      | elapsed:    4.3s
[Parallel(n_jobs=4800)]: Done 21650 tasks      | elapsed:    5.9s
[Parallel(n_jobs=4800)]: Done 24200 tasks      | elapsed:    6.4s
[Parallel(n_jobs=4800)]: Done 26850 tasks      | elapsed:    7.4s
[Parallel(n_jobs=4800)]: Done 29600 tasks      | elapsed:    7.9s
[Parallel(n_jobs=

CPU times: user 1min 40s, sys: 1min 33s, total: 3min 14s
Wall time: 1min 50s


In [30]:
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

In [31]:
# Populate totalduration and predictionduration
with open('giga_takeoutdf_w_jobgroup.pickle', 'rb') as f:
    takeoutdf = pickle.load(f)

In [32]:
def fill_durations(row):
    # Input format:
    # [jobID, totaljobduration, predictionduration,localsamplefrac, len(Y_true), decision_threshold,truelabel, predictedlabel]
    
    jobID      = row[0]
    firstN     = int(row[3]*row[4])

    # Sort by timestamp
    getsamples = takeoutdf[takeoutdf.JobID == jobID]
    df         = getsamples.sort_values(by=['HeartBeatTime']) #not inplace
    hlist      = df['HeartBeatTime'].tolist()
    
    starttime     = hlist[0]
    predictedtime = hlist[firstN-1]
    endtime       = hlist[row[4]-1]
    
    row[1]= (endtime-starttime).total_seconds()       #totaljobduration
    row[2]= (predictedtime-starttime).total_seconds() #predictionduration
    
    return jobID

In [33]:
# update times in the results
collector = Parallel(n_jobs=48*5, verbose=1, backend="threading")(map(delayed(fill_durations), global_results ))

[Parallel(n_jobs=240)]: Done 320 tasks      | elapsed:    5.9s
[Parallel(n_jobs=240)]: Done 770 tasks      | elapsed:    6.9s
[Parallel(n_jobs=240)]: Done 1320 tasks      | elapsed:    8.0s
[Parallel(n_jobs=240)]: Done 1970 tasks      | elapsed:    9.3s
[Parallel(n_jobs=240)]: Done 2720 tasks      | elapsed:   10.9s
[Parallel(n_jobs=240)]: Done 3570 tasks      | elapsed:   12.8s
[Parallel(n_jobs=240)]: Done 4520 tasks      | elapsed:   14.7s
[Parallel(n_jobs=240)]: Done 5570 tasks      | elapsed:   16.7s
[Parallel(n_jobs=240)]: Done 6720 tasks      | elapsed:   19.9s
[Parallel(n_jobs=240)]: Done 7970 tasks      | elapsed:   22.2s
[Parallel(n_jobs=240)]: Done 9320 tasks      | elapsed:   24.9s
[Parallel(n_jobs=240)]: Done 10770 tasks      | elapsed:   28.0s
[Parallel(n_jobs=240)]: Done 12320 tasks      | elapsed:   32.0s
[Parallel(n_jobs=240)]: Done 13970 tasks      | elapsed:   35.5s
[Parallel(n_jobs=240)]: Done 15720 tasks      | elapsed:   38.4s
[Parallel(n_jobs=240)]: Done 17570 tas

In [34]:
# Process results:

Y_true_job      = []
Y_predicted_job = []
averagefrac     = []
nodecision      = []
averagetime     = []
totaltestsamples= 0

for r in global_results:
    ###
    totaltestsamples += r[4]
    ###
    
    if r[-1] != -1:
        Y_true_job.append(r[-2])
        Y_predicted_job.append(r[-1])
        averagefrac.append(r[3])      #based on no of samples used for prediction
        
        if r[1] !=0:    #totaljobduration - total jobduration
            averagetime.append(r[2]/r[1]) #based on time of prediction
        elif r[2] == 0: #predictionduration - job duration at time of prediction
            averagetime.append(1)
        else:
            print('Error: predictionduration=%.2f, totaljobduration=%.2f ' % (r[2], r[1]))
    else:
        nodecision.append(r)

# [jobID, totaljobduration, predictionduration,localsamplefrac, len(Y_true), decision_threshold,truelabel, predictedlabel]

In [35]:
print('Len of Y_true : ', len(Y_true_job))
print('Len of Y_pred : ', len(Y_predicted_job))

print("Overall accuracy on test set: ", np.mean(np.equal(Y_true_job, Y_predicted_job)))

print("Calculating Score: ")
precision, recall, fscore, support = score(Y_true_job, Y_predicted_job)
print('precision    : {}'.format(precision))
print('recall       : {}'.format(recall))
print('fscore       : {}'.format(fscore))
print('support      : {}'.format(support))

print("Confusion matrix:")
print(pd.crosstab(pd.Series(Y_true_job), pd.Series(Y_predicted_job), rownames=['True'], colnames=['Predicted'], margins=True))
print('*********Test Results End**************')

Len of Y_true :  308448
Len of Y_pred :  308448
Overall accuracy on test set:  0.8493814192343604
Calculating Score: 
precision    : [0.82966041 0.87564062]
recall       : [0.89881919 0.79426526]
fscore       : [0.86285623 0.83297021]
support      : [162600 145848]
Confusion matrix:
Predicted       0       1     All
True                             
0          146148   16452  162600
1           30006  115842  145848
All        176154  132294  308448
*********Test Results End**************


In [36]:
np.mean(averagefrac), np.mean(averagetime)

(0.5439598882165064, 0.5039671829113419)

In [37]:
len(nodecision), len(Y_true_job)+len(nodecision),  len(nodecision)/(len(Y_true_job)+len(nodecision))

(12282, 320730, 0.038293892058740994)

In [38]:
len(averagetime)

308448

In [39]:
totaltestsamples #for verification (should be 3,517,991)

3517991

In [None]:
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

In [None]:
# Verification

for x in 