In [1]:
import pandas as pd
import numpy as np
import os
import threading
import sys
import time
from utils import utils
import copy
from matplotlib import pyplot as plt
import numpy as np
import random
# Seed both global RNGs so the generated workloads are reproducible.
random.seed(1234)
np.random.seed(1234)

# Minute labels "0".."720" (as strings) for a 12-hour trace; the raw grid
# steps by 60000 (presumably milliseconds), so each step is one minute.
unique_timestamp = [str(step_ms // 60000) for step_ms in range(0, 43260000, 60000)]


In [2]:
def generate_msname_metric_df(df_, msname_list_, timestamps=None):
    """Split ``df_`` into per-service, per-timestamp ``providerRPC_MCR`` slices.

    Args:
        df_: DataFrame with at least ``msname``, ``timestamp`` and ``metric``
            columns. ``timestamp`` is compared for equality against the entries
            of ``timestamps`` (NOTE(review): the default labels are strings, so
            this assumes the column holds strings too).
        msname_list_: service names to extract.
        timestamps: timestamp labels to slice by, in order. Defaults to the
            module-level ``unique_timestamp`` list.

    Returns:
        dict mapping each msname to a list of DataFrames, one per timestamp in
        ``timestamps`` order, each holding only that service's
        ``providerRPC_MCR`` rows at that timestamp. A timestamp with no
        matching rows yields an empty DataFrame.
    """
    if timestamps is None:
        timestamps = unique_timestamp
    # The metric filter does not depend on the service or the timestamp, so
    # apply it once up front instead of once per (msname, timestamp) pair.
    mcr_df_ = df_[df_["metric"] == "providerRPC_MCR"]
    msname_provider_mcr_ = dict()
    for msname_ in msname_list_:
        msname_df_ = mcr_df_[mcr_df_["msname"] == msname_]
        msname_provider_mcr_[msname_] = [
            msname_df_[msname_df_["timestamp"] == ts] for ts in timestamps
        ]
    return msname_provider_mcr_


def get_statistics(li_):
    """Summarise a non-empty list of numbers.

    Returns a 19-element list in this exact order::

        num, sum, avg, std, min, max, max/min, max/p1,
        p0.1, p1, p5, p10, p25, p50, p75, p90, p95, p99, p99.9

    The two ratio entries are ``inf`` when their denominator is 0. Callers
    that append ``[msname, timestamp] + get_statistics(...)`` as a DataFrame
    row therefore need a 21-column frame.

    Raises:
        ValueError: if ``li_`` is empty.
    """
    if len(li_) == 0:
        raise ValueError("get_statistics() requires at least one value")
    lo = min(li_)
    hi = max(li_)
    # One vectorised percentile call instead of a separate call per quantile
    # (p1 used to be computed three times).
    pcts = np.percentile(li_, [0.1, 1, 5, 10, 25, 50, 75, 90, 95, 99, 99.9])
    p1 = pcts[1]
    stats = [
        len(li_),
        sum(li_),
        np.average(li_),
        np.std(li_),
        lo,
        hi,
        float('inf') if lo == 0 else hi / lo,
        float('inf') if p1 == 0 else hi / p1,
    ]
    stats.extend(list(pcts))
    return stats

def generate_stat_df(msname_provider_mcr_, msname_, output_df_, timestamps=None):
    """Append one statistics row per timestamp for ``msname_`` to ``output_df_``.

    Args:
        msname_provider_mcr_: dict in the shape produced by
            ``generate_msname_metric_df``; ``msname_provider_mcr_[msname_][i]``
            is the DataFrame slice for the i-th timestamp and must have a
            ``value`` column.
        msname_: service whose slices are summarised.
        output_df_: DataFrame mutated in place. Each appended row is
            ``[msname_, str(timestamp)] + get_statistics(values)``, so its
            columns must match that length.
        timestamps: timestamp labels aligned index-by-index with the slices.
            Defaults to the module-level ``unique_timestamp``.

    Timestamps whose slice has no values other than -1 are skipped.
    """
    if timestamps is None:
        timestamps = unique_timestamp
    slices_ = msname_provider_mcr_[msname_]
    for i in range(len(timestamps)):
        raw_values = slices_[i]["value"].values.tolist()
        # -1 appears to be a missing-data sentinel (NOTE(review): confirm
        # against the trace schema). Drop every occurrence, not only the
        # first, so no sentinel leaks into min/avg/percentiles.
        values = [v for v in raw_values if v != -1]
        if not values:
            continue
        row = [msname_, str(timestamps[i])] + get_statistics(values)
        output_df_.loc[len(output_df_.index)] = row


def detect_microburst(call_rate, window_size, burst_degree):
    """Find samples that spike above a multiple of the trailing-window mean.

    Starting at index ``window_size``, sample ``idx`` is a burst when
    ``call_rate[idx] > mean(call_rate[idx - window_size:idx]) * burst_degree``.
    After a burst, the next ``window_size`` samples are not examined.

    Returns:
        (number_of_bursts, list_of_burst_indices)
    """
    spike_indices = []
    cooldown = 0
    for idx in range(window_size, len(call_rate)):
        if cooldown != 0:
            cooldown -= 1
            continue
        baseline = sum(call_rate[idx - window_size:idx]) / window_size
        if call_rate[idx] > baseline * burst_degree:
            spike_indices.append(idx)
            cooldown = window_size
    return len(spike_indices), spike_indices

def series_string_to_list(series):
    """Parse a bracketed, comma-separated number list string into floats.

    Example: ``"[1, 2.5, 3]"`` -> ``[1.0, 2.5, 3.0]``.

    Surrounding whitespace is ignored. An empty list (``"[]"``, ``"[ ]"``)
    yields ``[]`` instead of failing on ``float('')``.

    Raises:
        ValueError: if an element is not a valid float.
    """
    inner = series.strip()[1:-1].strip()  # drop the enclosing brackets
    if not inner:
        return []
    return [float(tok) for tok in inner.split(",")]


def read_call_rate(file_path):
    """Read a call-rate trace file.

    The first line is the service name. Each following non-blank line is
    parsed as one float call-rate sample.

    Returns:
        (msname, call_rate): the service name without its line ending, and
        the list of samples.

    Raises:
        IndexError: if the file is empty.
        ValueError: if a sample line is not a valid float.
    """
    # ``with`` guarantees the handle is closed even if reading fails.
    with open(file_path, 'r') as file1:
        lines = file1.read().splitlines()
    # splitlines() strips line endings, so msname carries no trailing "\n".
    msname = lines[0]
    # Skip blank lines (e.g. a trailing empty line), which float() rejects.
    call_rate = [float(line) for line in lines[1:] if line.strip()]
    return msname, call_rate
    
########################################################################################## 
    
class WorkloadGenerator:
    """Generate request inter-arrival times (in milliseconds) for a fixed rate.

    Args:
        req_per_sec: target request rate, in requests per second.
        total_sec: length of the generated window, in seconds.

    ``total_num_req`` is ``req_per_sec * total_sec`` truncated to an int.
    """

    def __init__(self, req_per_sec, total_sec):
        self.request_per_sec = req_per_sec
        self.total_seconds = total_sec
        self.total_num_req = int(self.request_per_sec * self.total_seconds)

    def exponential_distribution(self):
        """Return ``total_num_req`` exponentially distributed intervals (ms).

        Intervals are drawn with mean ``1 / request_per_sec`` seconds from the
        global ``np.random`` state, then rescaled so they sum to
        ``total_seconds * 1000`` ms. Returns ``[]`` when the window holds no
        whole request (``total_num_req == 0``) instead of dividing by zero.
        """
        if self.total_num_req == 0:
            return []
        # numpy's scale parameter is the inverse of the rate parameter lambda.
        scale_ = 1 / self.request_per_sec
        exp_dist = np.random.exponential(scale=scale_, size=self.total_num_req)
        total_millisecond = self.total_seconds * 1000
        # NOTE: builtin sum is sequential; exp_dist.sum() uses pairwise
        # summation and would change low-order bits of seeded output.
        weight = total_millisecond / sum(exp_dist)
        # Normalise to the window length and convert seconds -> ms in one step.
        return list(exp_dist * weight)

    def constant_distribution(self):
        """Return ``total_num_req`` equal intervals (ms) that fill the window.

        Returns ``[]`` when ``total_num_req == 0`` instead of dividing by zero.
        """
        if self.total_num_req == 0:
            return []
        interval_ms = (self.total_seconds / self.total_num_req) * 1000
        return [interval_ms] * self.total_num_req
    
def interval_to_arrival(req_intv):
    """Convert inter-arrival intervals into absolute arrival times.

    Element k of the result is the running total
    ``req_intv[0] + ... + req_intv[k]``, in the same units as the input.
    An empty input gives an empty list.
    """
    arrivals = []
    for gap in req_intv:
        arrivals.append(arrivals[-1] + gap if arrivals else gap)
    return arrivals

def generate_workload_from_alibaba_trace(rpm, base_rps):
    """Turn a per-minute request-count trace into request inter-arrival times.

    The trace is converted to requests/second and rescaled so that its median
    equals ``base_rps``. Each minute is then filled with exponentially
    distributed intervals (via ``WorkloadGenerator``) summing to 60000 ms.

    Minutes whose scaled rate gives fewer than one whole request produce no
    intervals. Their 60000 ms of idle time is added to the next generated
    interval, so later minutes keep their position on the timeline.

    Args:
        rpm: sequence of requests-per-minute values, one per minute.
        base_rps: requests/second that the median minute is scaled to.

    Returns:
        Flat list of inter-arrival intervals in milliseconds.

    Raises:
        ValueError: if the median of the trace is 0 (it cannot be rescaled).
    """
    def normalize(rps_list, base_rps):
        # Scale by the median (50th percentile) rate.
        median_rps = np.percentile(rps_list, 50)
        if median_rps == 0:
            raise ValueError("median request rate is 0; cannot normalise to base_rps")
        norm_weight = base_rps / median_rps
        return [x * norm_weight for x in rps_list]

    seconds_per_window = 60
    rps_list = [x / seconds_per_window for x in rpm]
    norm_rps_list = normalize(rps_list, base_rps)
    request_interval = list()
    idle_ms = 0.0
    for rps in norm_rps_list:
        wrk = WorkloadGenerator(req_per_sec=rps, total_sec=seconds_per_window)
        if wrk.total_num_req == 0:
            # Nothing to schedule this minute; carry its duration forward.
            idle_ms += seconds_per_window * 1000
            continue
        intervals = wrk.exponential_distribution()
        intervals[0] += idle_ms
        idle_ms = 0.0
        request_interval += intervals
    return request_interval

In [3]:
svc_per_line_df = pd.read_csv("svc_per_line.csv")

# Count burstiness: parse each service's call-rate list string (column "li",
# per-minute counts) and detect micro-bursts in it.
window_size = 5    # samples (minutes) in the trailing baseline window
burst_degree = 2   # burst = sample exceeds burst_degree x trailing-window mean
TRACE_HOURS = 12   # the trace covers 12 hours of per-minute samples

num_burst = list()
burst_timestamp = list()
for li_str in svc_per_line_df["li"]:
    burst_cnt, burst_ts = detect_microburst(series_string_to_list(li_str), window_size, burst_degree)
    num_burst.append(burst_cnt)
    burst_timestamp.append(burst_ts)
svc_per_line_df["num_burst"] = num_burst
svc_per_line_df["burst_timestamp"] = burst_timestamp
svc_per_line_df = (
    svc_per_line_df
    .sort_values(by=["num_burst"], ascending=False)
    .reset_index(drop=True)
)

# Burst statistics (see the printed output below for current values).
total_num_burst = svc_per_line_df['num_burst'].sum()
bursty_svc_df = svc_per_line_df[svc_per_line_df["num_burst"] > 0]
num_bursty_svc = len(bursty_svc_df)
num_all_svc = len(svc_per_line_df)
# With no services (or no bursty services) these averages are undefined;
# report NaN explicitly rather than depend on how 0/0 behaves for the scalar type.
avg_burst_per_svc = total_num_burst / num_all_svc if num_all_svc else float('nan')
avg_burst_per_bursty_svc = total_num_burst / num_bursty_svc if num_bursty_svc else float('nan')
print("num_all_svc: ", num_all_svc)
print("num_bursty_svc: ", num_bursty_svc)
print("total_num_burst: ", total_num_burst)
print("average num of burst per all service during 12 hours: {}".format(avg_burst_per_svc))
print("average num of burst per bursty service during 12 hours: {}".format(avg_burst_per_bursty_svc))
print("average num of burst per all service per hour: {}".format(avg_burst_per_svc/TRACE_HOURS))
print("average num of burst per bursty service per hour: {}".format(avg_burst_per_bursty_svc/TRACE_HOURS))

num_all_svc:  1043
num_bursty_svc:  285
total_num_burst:  3079
average num of burst per all service during 12 hours: 2.9520613614573348
average num of burst per bursty service during 12 hours: 10.803508771929824
average num of burst per all service per hour: 0.2460051134547779
average num of burst per bursty service per hour: 0.9002923976608187


In [4]:


msname_8 = "6d9c26b9"
BASE_RPS = 50  # requests/second that the trace's median minute is scaled to

# Look up this service's per-minute call-rate list; fail with a clear message
# instead of an opaque IndexError when the service is absent.
matches = svc_per_line_df.loc[svc_per_line_df["msname-8"] == msname_8, "li"]
if matches.empty:
    raise KeyError("service {} not found in svc_per_line.csv".format(msname_8))
rpm = series_string_to_list(matches.iloc[0])
request_interval = generate_workload_from_alibaba_trace(rpm, BASE_RPS)
request_arrival = interval_to_arrival(request_interval)

ts = time.time()
# ``with`` closes the file even if a write fails.
with open("request_arrival_time_" + msname_8 + ".txt", 'w') as file1:
    file1.writelines(str(arr_time) + "\n" for arr_time in request_arrival)
print("request arrival file write: {}".format(time.time() - ts))


request arrival file write: 3.6544203758239746
