# Extract jobs and traces data


In [1]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import pandas as pd
import numpy as np

# Elasticsearch client shared by the job and trace scans; requests time out after 60 s.
es = Elasticsearch(hosts=['atlas-kibana.mwt2.org:9200'], timeout=60)

### Select the time period

In [2]:
# Time window to extract, and the base name of the output file.
start_date = '2018-08-01 00:00:00'
end_date = '2018-08-10 00:00:00'

dataset = 'managed_AUG_1'

print("start:", start_date, "end:", end_date)

# Convert both bounds to whole-second epoch timestamps expressed in milliseconds.
# The strings carry no timezone; pandas interprets them as UTC.
start, end = (
    int(pd.Timestamp(bound).timestamp()) * 1000
    for bound in (start_date, end_date)
)

print("start:", start, "end:", end)

start: 2018-08-01 00:00:00 end: 2018-08-10 00:00:00
start: 1533081600000 end: 1533859200000


### Find wall time and core count for all jobs


In [3]:
# Both conditions must hold: the job is labelled 'managed' and its
# creationtime falls in the half-open window [start, end).
job_filters = [
    {'term': {'prodsourcelabel': 'managed'}},
    {'range': {'creationtime': {'gte': start, 'lt': end}}},
]

# Only the fields listed in "_source" are requested for each hit.
job_query = {
    "_source": ["actualcorecount", "wall_time", "pandaid", "inputfiletype", "creationtime","prodsourcelabel"],
    'query': {'bool': {'must': job_filters}},
}

# Stream every matching document from the "jobs" index and keep one
# [pandaid, wall_time, actualcorecount, inputfiletype] record per job.
scroll = scan(client=es, index="jobs", query=job_query)
count = 0
requests = []
for res in scroll:
    src = res['_source']
    requests.append(
        [src[field] for field in ('pandaid', 'wall_time', 'actualcorecount', 'inputfiletype')]
    )

    # Progress marker every 100000 documents (also printed for the first one).
    if count % 100000 == 0:
        print(count)
    # For a quick trial run, stop early:
    #     if count > 300:
    #         break
    count += 1


SyntaxError: invalid syntax (<ipython-input-3-171e1ac26f71>, line 18)

In [None]:
# Build the jobs table from the scanned records, ordered by their first
# field (the PanDA id), then give the positional columns readable names.
all_jobs = (
    pd.DataFrame(requests)
    .sort_values(by=0)
    .rename(columns={0: 'pandaid', 1: 'wall_time', 2: 'cores', 3: 'type'})
)

# Smallest and largest PanDA id among the selected jobs.
minpid, maxpid = (
    int(bound) for bound in (all_jobs['pandaid'].min(), all_jobs['pandaid'].max())
)
print("jobs:", len(all_jobs), "min", minpid, "max", maxpid)

### Select traces


In [None]:
# Query for the file-access traces of the selected jobs.
# A trace is kept when its pandaID lies in [minpid, maxpid], it has both a
# filename and a pandaID field, and its event is exactly 'get_sm'.
# Only the fields listed in "_source" are returned for each hit.
trace_query = {
    "_source": ["time_start", "time_end",  "event", "scope", "filename", "filesize", "pandaID"],
    'query': {
        'bool': {
            'must': [
                # The upper bound is inclusive ('lte'): maxpid is itself the id
                # of a selected job, so 'lt' would drop that job's traces.
                {'range': {'pandaID': {'gte': minpid, 'lte': maxpid}}},
                {'exists': {"field": "filename"}},
                {'exists': {"field": "pandaID"}},
                # Alternative event filters, kept for reference:
                # {'wildcard': {'event': 'get_sm*'}},
                {'term': {'event': 'get_sm'}}
                # {'term': {'event': 'get_sm_a'}},
                # {'term': {'event': 'download'}},
            ]
        }
    }
}

### Scan the traces index


In [None]:
# Stream every matching document from the "traces" index and keep one
# ["scope:filename", filesize, time_start, pandaID] record per trace.
scroll = scan(client=es, index="traces", query=trace_query)
count = 0
requests = []
for res in scroll:
    src = res['_source']
    file_id = src['scope'] + ':' + src['filename']
    requests.append([file_id, src['filesize'], src['time_start'], src['pandaID']])

    # Progress marker every 100000 documents (also printed for the first one).
    if count % 100000 == 0:
        print(count)
    # For a quick trial run, stop early:
    #     if count > 300:
    #         break
    count += 1

# Build the accesses table, ordered by the fourth field (the PanDA id),
# then give the positional columns readable names.
all_accesses = (
    pd.DataFrame(requests)
    .sort_values(by=3)
    .rename(columns={0: 'filename', 1: 'filesize', 2: 'transfer_start', 3: 'pandaid'})
)

In [None]:
# Distinct PanDA ids seen in the traces, and the percentage of selected jobs
# not covered by them (1 - unique ids / number of jobs).
# NOTE(review): traces are selected by id range only, so pids may contain ids
# that are not in all_jobs; the percentage is then only approximate.
pids = all_accesses['pandaid'].unique()
print(
    "Unique PandaIDs", len(pids),
    'traces loaded', len(requests),
    'jobs not accounted', (1 - len(pids) / len(all_jobs)) * 100, "%"
)


In [None]:
# Index both tables by PanDA id so they can be joined on it.
# The guards make this cell safe to re-run: an unconditional
# set_index('pandaid') fails with KeyError the second time, because the
# column has already been moved into the index.
if all_accesses.index.name != 'pandaid':
    all_accesses = all_accesses.set_index('pandaid', drop=True)
if all_jobs.index.name != 'pandaid':
    all_jobs = all_jobs.set_index('pandaid', drop=True)

# Inner join: keep only traces whose PanDA id belongs to a selected job.
# NOTE: the name `all` shadows the Python builtin; it is kept because the
# following cells read it.
all = all_accesses.join(all_jobs, how='inner')
all.describe()


In [None]:
# Preview the merged table, then write it to "<dataset>.h5" under the key
# "prod", overwriting any existing file (mode 'w') with compression level 1.
print(all.head())
output_path = dataset + '.h5'
all.to_hdf(output_path, key="prod", mode='w', complevel=1)

In [None]:
# Final row counts for the traces, jobs and merged tables.
for label, frame in (('traces:', all_accesses), ('jobs:', all_jobs), ('merged:', all)):
    print(label, len(frame))
print('Done.')