In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd

In [3]:
import os
BASE_DIR = '/content/drive/MyDrive/anmproject/trace.zip (Unzipped Files)/trace/'
tracefiles = tuple(filter(lambda x: 'trace_' in x, os.listdir(BASE_DIR)))

datasets = [pd.read_csv(BASE_DIR + x) for x in tracefiles]
 

#JDBC and LOCAL both lose dsName, becoming the database as their servicename
for i, df in enumerate(datasets):
    if df['callType'].iloc[0] == 'JDBC':
      df['serviceName'] = df['dsName']
      df = df.drop(['dsName'], axis=1)
    elif df['callType'].iloc[0] == 'LOCAL':
      df = df.drop(['serviceName'], axis=1)
      df['serviceName'] = df['dsName']
      df = df.drop(['dsName'], axis=1)
    datasets[i] = df

# concat dfs into 1 df
traces = pd.concat(datasets)

del datasets

In [4]:
d = dict(tuple(traces.groupby('traceId')))

In [5]:
i = 0
for trace in d:
  if i % 10000 == 0:
    print(f'Currently at {i}')
  d[trace] = d[trace].sort_values('pid').sort_values('startTime')
  i+=1
del traces

Currently at 0
Currently at 10000
Currently at 20000
Currently at 30000
Currently at 40000
Currently at 50000
Currently at 60000
Currently at 70000
Currently at 80000
Currently at 90000
Currently at 100000
Currently at 110000
Currently at 120000
Currently at 130000
Currently at 140000
Currently at 150000
Currently at 160000
Currently at 170000
Currently at 180000
Currently at 190000
Currently at 200000
Currently at 210000
Currently at 220000
Currently at 230000
Currently at 240000
Currently at 250000
Currently at 260000
Currently at 270000
Currently at 280000
Currently at 290000
Currently at 300000
Currently at 310000
Currently at 320000
Currently at 330000
Currently at 340000
Currently at 350000
Currently at 360000
Currently at 370000
Currently at 380000
Currently at 390000
Currently at 400000
Currently at 410000
Currently at 420000
Currently at 430000
Currently at 440000
Currently at 450000
Currently at 460000
Currently at 470000
Currently at 480000
Currently at 490000
Currently at 5

In [83]:
from itertools import groupby

def trace_to_call_path(df):
    """
    df : Trace Pandas dataframe. The dataframe must be sorted and pre processed
    Pre processing includes dsName being removed from JDBC and LOCAL
    """
    
    # transform CSF serviceName
    ids = df[df['callType'] == 'CSF']['id']
    children_cmdb = df[df['pid'].isin(ids)]['cmdb_id']
    df.loc[df['callType']=='CSF', 'serviceName'] = list(children_cmdb)


    names = dict(zip(df.id, df.serviceName))
    names['None'] = 'Start'

    path = []
    durations = []
    def merge_fn(row):
        """ Makes a row become (pid, id) format while filtering same service calls"""
        v = (names[row['pid']], names[row['id']])
        if v[0] == v[1]:
            durations[-1] += row['elapsedTime']
            return
        else:
            durations.append(row['elapsedTime'])
        path.append(v)
    
    df.apply(merge_fn, axis=1) # apply horizontally
    l = path
    
    # reduce repeated duplicates and make sure their times are saved
    tmp = [(x[0], sum([1 for _ in x[1]])) for x in groupby(path)]
    times = list(map(lambda x: x[1], tmp))
    path = list(map(lambda x: x[0], tmp))

    call_path = set()
    # generate path, last sum is the sum of repeated times
    for index, p in enumerate(path):
        call_path.add((p[1], tuple(path[:index + 1]), sum(durations[index:times[index] + index])))
    return call_path

In [84]:
class TraceAnomaly:
  def __init__(self, paths_csv_file):
      import csv
      with open(paths_csv_file) as f:
          reader = csv.reader(f)
          # transform second position into a tuple instead of string
          data = sorted(list(map(lambda x: (x[0],eval(x[1])), list(reader))))
          
      # path : position
      self.paths = {data[i]: i for i in range(len(data))}


  def get_stv(self, dataframe):
      """dataframe should be a single trace already pre processed"""
      graph = list(trace_to_call_path(dataframe))
      
      # (s, path, time)
      indexes = tuple(map(lambda x: self.paths[x[:2]], graph))

      # id : val
      return dict(zip(indexes, tuple(map(lambda x: x[2], graph))))


In [85]:
model = TraceAnomaly('/content/drive/MyDrive/anmproject/trace_list.csv')

In [86]:
model.get_stv(d[list(d.keys())[0]])

{7: 336.0,
 32: 42.0,
 128: 44.0,
 442: 61.0,
 1306: 7.0,
 3910: 11.0,
 3911: 24.0,
 3912: 74.0,
 3913: 26.0,
 5726: 9.0,
 7490: 88.0,
 7581: 338.0,
 7727: 44.0,
 7728: 54.0,
 7729: 9.0,
 7730: 18.0,
 9288: 5.0,
 9290: 380.0}

In [87]:
def work(data, index):
  keys = list(d.keys())
  print(f'Keys are {len(keys)} long and data is {data}')
  for i in data:
    key = keys[i]
    res[index].append((key, model.get_stv(d[key])))

NUM_THREADS = 4
data = [range(i, len(d), NUM_THREADS) for i in range(NUM_THREADS)]

In [89]:
import threading
res = [[] for _ in range(NUM_THREADS)]
print(f'Res is {res}')
threads = [threading.Thread(target=work, args=(data[i], i)) for i in range(NUM_THREADS)]
for t in threads:
  t.start()

for t in threads:
  t.join()

Res is [[], [], [], []]
Keys are 730041 long and data is range(0, 730041, 4)
Keys are 730041 long and data is range(1, 730041, 4)
Keys are 730041 long and data is range(2, 730041, 4)
Keys are 730041 long and data is range(3, 730041, 4)


In [90]:
from functools import reduce
import csv

res = list(reduce(lambda x,y: x+y, res))
with open('/content/drive/MyDrive/anmproject/trace_data.csv', 'w+') as f:
  wr = csv.writer(f)
  wr.writerows(res)

In [91]:
val = [len(res[i]) for i in range(NUM_THREADS)]
f'{val} == {sum(val)}'

'[2, 2, 2, 2] == 8'