# 导入ES数据

In [1]:
##功能拆解
#python连接Elasticsearch
#查询Elasticsearch打印结果
#导出所有结果数据
#将所有结果写入csv文件

In [3]:
from elasticsearch import Elasticsearch
from itertools import groupby
from matplotlib import pyplot as plt
from tqdm import tqdm_notebook as tqdm
from influxdb import DataFrameClient
import datetime
import time
import pickle
import pandas as pd
import numpy as np
import pandas as pd
np.set_printoptions(suppress=True)
pd.set_option('display.float_format', lambda x: '%.0f' % x) #避免科学技术法
%matplotlib inline

In [4]:
def search_all(index, body):
    rsp = es.search(index=index, body=dict(**body, size=1000), scroll='1m',request_timeout=30)
    total = rsp['hits']['total']
    print(total)
    scroll_id = rsp['_scroll_id']
    scroll_size = total
    with tqdm(total=total) as pbar:
        rets = []
        while scroll_size > 0:
            _rsp = es.scroll(scroll_id=scroll_id, scroll='1m')
            scroll_id = _rsp['_scroll_id']
            scroll_size = len(_rsp['hits']['hits'])
            total -= scroll_size
            rets.extend(parse(_rsp['hits']['hits']))
            pbar.update(scroll_size)
        
    return rets

def parse(response):
    try:
        return list(map(
            lambda x: {
                'trace_id': x['_source']['traceId'],
                'timestamp': x['_source']['timestamp'],
                'latency': x['_source']['duration'],
                'http_status': x['_source']['tags']['http.status_code'],
                'request_parent_id': x['_source']['parentId'] if 'parentId' in x['_source'] else 'None',
                'request_id': x['_source']['id'],
                'source': x['_source']['localEndpoint']['serviceName'],
                'http_name': x['_source']['name'],
                'target': x['_source']['name'].split('.')[0] + '.default'
            },
            response,
        ))
    except KeyError:
        print('error:', response)

        
def dump_index(index):
    path = './event_title.txt'
    rets = search_all(
            index=index, 
            body={
                  "query": {
                    "range": {
                      "timestamp_millis": {
                        #"gte": 1565347635000,
                        #"lte": 1565350242000
                        "gte": 1567084200000,
                        "lte": 1567085100000
                      }
                    }
                  }
                },
        
    )
    with open(path, 'wb+') as f:
        pickle.dump(rets, f)
    return rets
def load_path(path):
    with open(path, 'rb') as f:
        return pickle.load(f)


In [5]:
es = Elasticsearch(host='192.168.115.84', port=9200)
requests_8_29 = dump_index("zipkin:span-2019-08-29")
data_8_29=load_path('./event_title.txt')

40748


HBox(children=(IntProgress(value=0, max=40748), HTML(value='')))




In [67]:
df = pd.DataFrame(data_8_29, columns=['trace_id',
                                      'timestamp',
                                      'latency',
                                      'http_status',
                                      'request_parent_id',
                                      'request_id',
                                      'source',
                                      'http_name',
                                      'target'] )
#df.to_csv('./event_title.csv')
df = df.drop(columns=['request_parent_id','request_id','http_name'])
df['source']=df['source'].apply(lambda x : ('.'+x).strip('default').strip('.'))
df['target']=df['target'].apply(lambda x : ('.'+x).strip('default').strip('.'))
df

Unnamed: 0,trace_id,timestamp,latency,http_status,source,target
0,3e4c92af33630580eca1ec210550f66f,1567084433090689,399404,200,ts-travel-service,ts-ticketinfo-service
1,6d1bb6ea8133577502127e3401bbec41,1567084434031048,10673,200,ts-travel-service,ts-travel-service
2,6d1bb6ea8133577502127e3401bbec41,1567084434067402,4362,200,ts-travel-service,ts-route-service
3,6d1bb6ea8133577502127e3401bbec41,1567084434078578,3502,200,ts-travel-service,ts-route-service
4,6d1bb6ea8133577502127e3401bbec41,1567084429023205,430160,200,ts-ticketinfo-service,ts-ticketinfo-service
5,e5509cc13e65c4f7dd466bd545db0467,1567084429814289,6089,200,ts-travel-service,ts-train-service
6,094ac42cc35dc46b5cdf8558894cb4ba,1567084428913277,11953,200,ts-seat-service,ts-travel-service
7,e5509cc13e65c4f7dd466bd545db0467,1567084429792736,10030,200,ts-seat-service,ts-travel-service
8,e5509cc13e65c4f7dd466bd545db0467,1567084429557669,195677,200,ts-ticketinfo-service,ts-basic-service
9,e5509cc13e65c4f7dd466bd545db0467,1567084427654258,27955,200,ts-price-service,ts-price-service


# 获取influxdb的数据

In [117]:
#> SELECT * FROM "h2o_feet" WHERE time >= '2015-08-17T23:48:00Z' AND time <= '2015-08-18T00:30:00Z'
#print('数据库中的名称'+str(client.get_list_database()))
#print('数据库中的表'+str(client.query('show measurements;')))

In [73]:
def get_db(svc):
    client = DataFrameClient('192.168.115.31',34002,'root','','aiops_metric')#初始化
    query = 'select * from "{}" where time >= 1567055400000000000 and time<= 1567056300000000000'.format(svc)#time>now()-1h;
    result = dict(client.query(query,chunked=False))
    result = result[svc].reset_index().rename(columns={'index':'timestamp'})
    result['timestamp'] = result.apply(lambda x : x['timestamp'].timestamp(),axis=1)
    result['service'] = svc
    result['timestamp'] = result['timestamp']+28800
    return result
#因为容器数目不一致，需要转换名称
def dflist_2_dblist(df):
    db_list=[]
    df_list=list(set(df['target']))
    for i in df_list:
        db_list.append('metric.'+i)
    return db_list
#读取fluxdb所有需要的数据为一个大表
def all_db(df):
    db_list=dflist_2_dblist(df)
    df=get_db(db_list[0])
    for i in db_list[1:]:
        df = pd.concat((df, get_db(i)))
    df['service']=df['service'].apply(lambda x:(x+'.').strip('metric').strip('.'))
    df = df.fillna(0)
    #cpu使用率，service，timestamp，内存使用率，内存使用量，文件系统写入速率，文件系统读取速率，网络发送速率，网络接受速率
    df.columns = ['cpu_use', 'target', 'timestamp', 'mem_use_percent', 'mem_use_amount', 
                         'file_write_rate',  'file_read_rate', 'net_send_rate', 'file_recieve_rate']
    return df

In [74]:
db_data=all_db(df)
db_data

of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.




Unnamed: 0,cpu_use,target,timestamp,mem_use_percent,mem_use_amount,file_write_rate,file_read_rate,net_send_rate,file_recieve_rate
0,21,ts-travel-service,1567084200,0,1039515648,0,0,133302,43417
1,24,ts-travel-service,1567084260,0,1039568896,0,0,151717,49692
2,29,ts-travel-service,1567084320,0,1040400384,0,0,181574,59484
3,27,ts-travel-service,1567084380,0,1040429056,0,0,161577,52905
4,21,ts-travel-service,1567084440,0,1040379904,0,0,154591,50885
5,28,ts-travel-service,1567084500,0,1040924672,0,0,174548,57405
6,27,ts-travel-service,1567084560,0,1041010688,0,0,164718,54519
7,27,ts-travel-service,1567084620,0,1040584704,0,0,169133,55575
8,26,ts-travel-service,1567084680,0,1040723968,0,0,171663,56244
9,25,ts-travel-service,1567084740,0,1039548416,0,0,128738,42483


# 整合

In [75]:
time_start=1567084200


def time_to_little(data):
    n = (data - time_start * 1000000) // 60000000  
    return 1567084200 + n * 60

def es_time_to_little(df):
    df['timestamp'] = df['timestamp'].apply(time_to_little)#利用字典取分段函数
    df = df[~df['source'].isin(['istio-ingressgateway'])]
    return df

def zhenghe(df,db_data):
    df1 = df.copy()
    es_z = es_time_to_little(df1)
    result = pd.merge(es_z, db_data, on=['timestamp','target'])
    result['timestamp'] = df['timestamp']
    return result

In [76]:
zh=zhenghe(df,db_data)
zh

Unnamed: 0,trace_id,timestamp,latency,http_status,source,target,cpu_use,mem_use_percent,mem_use_amount,file_write_rate,file_read_rate,net_send_rate,file_recieve_rate
0,3e4c92af33630580eca1ec210550f66f,1567084433090689,399404,200,ts-travel-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
1,6d1bb6ea8133577502127e3401bbec41,1567084434031048,430160,200,ts-ticketinfo-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
2,3e4c92af33630580eca1ec210550f66f,1567084434067402,897216,200,ts-ticketinfo-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
3,e5509cc13e65c4f7dd466bd545db0467,1567084434078578,293898,200,ts-travel2-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
4,e5509cc13e65c4f7dd466bd545db0467,1567084429023205,199451,200,ts-travel-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
5,6d1bb6ea8133577502127e3401bbec41,1567084429814289,303742,200,ts-ticketinfo-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
6,e5509cc13e65c4f7dd466bd545db0467,1567084428913277,200645,200,ts-travel2-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
7,e5509cc13e65c4f7dd466bd545db0467,1567084429792736,2204431,200,ts-travel-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
8,094ac42cc35dc46b5cdf8558894cb4ba,1567084429557669,2455034,200,ts-ticketinfo-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876
9,89a7ff1bbd0281b4460d9e2e27e47838,1567084427654258,2787051,200,ts-ticketinfo-service,ts-ticketinfo-service,20,0,1019973632,0,0,119859,44876


# 转换为invocation

In [77]:
# 每个trace_id按照endtime降序进行排序，并转换为csv文件
#index需要咋csv中手动删除
def trans_csv(df):
    df['endtime']=df['timestamp']+df['latency']
    grouped = df.groupby('trace_id').apply(lambda x: x.sort_values('endtime', ascending=False))
    grouped['s_t']=grouped['source'].str.cat(grouped['target'],sep='->')
    grouped=grouped.drop(columns=['target','source','trace_id'])
    grouped.fillna(0)
    grouped.to_csv('grouped.csv') 
    #return grouped

#以traceid为单位进行整合
def trans(df):
    for i in df.columns[1:]:
        df[i]=str(list(df[i]))
    df=df.drop_duplicates(['trace_id'])
    return df

#输入csv文件名
def trans_last(zh):
    trans_csv(zh)
    df=pd.read_csv('grouped.csv').drop(columns='Unnamed: 1')
    df1 = df.groupby('trace_id').apply(trans).drop(columns='trace_id')
    df1.to_csv('grouped1.csv')
    df2 = pd.read_csv('grouped1.csv').drop(columns='Unnamed: 1')
    return df2
    

In [78]:
dfn = trans_last(zh)

In [79]:
dfn.head(10).to_dict(orient='records')

[{'trace_id': '00084bf4e31a5a323e2b69e8e2368ec6',
  'timestamp': '[1567084542556010, 1567084517210059, 1567084467728282]',
  'latency': '[12180, 5057, 6323]',
  'http_status': '[200, 200, 200]',
  'cpu_use': '[20.50201196070783, 30.90605216059944, 30.90605216059944]',
  'mem_use_percent': '[0.4042396545410156, 0.40468597412109375, 0.40468597412109375]',
  'mem_use_amount': '[999620608, 986808320, 986808320]',
  'file_write_rate': '[0.0, 0.0, 0.0]',
  'file_read_rate': '[0.0, 0.0, 0.0]',
  'net_send_rate': '[58651.99764598183, 103372.21845062985, 103372.21845062985]',
  'file_recieve_rate': '[43193.50901283812, 70098.86286490156, 70098.86286490156]',
  'endtime': '[1567084542568190, 1567084517215116, 1567084467734605]',
  's_t': "['ts-order-service->ts-order-service', 'ts-station-service->ts-station-service', 'ts-order-service->ts-station-service']"},
 {'trace_id': '00340510346cd70a0ccc9d66eeff4443',
  'timestamp': '[1567084546041882]',
  'latency': '[2760]',
  'http_status': '[200]',
 