In [81]:
import re
import datetime
import csv

import numpy as np
import pandas as pd

from elasticsearch import Elasticsearch, helpers

pd.options.display.max_colwidth = 100
pd.set_option('display.max_rows', 300)

In [2]:
es = Elasticsearch([{'host': 'hcc-metrics.unl.edu', 'port': 9200}])

In [3]:
indices = es.cat.indices(index="cms-*", h="index", request_timeout=600).split('\n')
indices = sorted(indices)
indices = [x for x in indices if x != '']

In [4]:
def time_filter(indices, days=0, until=0):
    if days == 0:
        return ["cms-*"]
    today = datetime.date.today()
    filtered = []
    datefmt = '%Y-%m-%d'
    for i in indices:
        day = re.sub(r'cms-', '', i).rstrip()
        day = datetime.datetime.strptime(day, datefmt).date()
        diff = today - day
        if until <= diff.days < days + until:
            filtered.append(i.rstrip())
    return filtered

In [5]:
# Change second argument to use a different time range for ES queries
ind = time_filter(indices, 90, 0)
ind = ','.join(ind)

In [10]:
s = {
    "size": 10000,
    "_source": ["CMS_JobType", "WMAgent_RequestName", "WMAgent_SubTaskName", "WMAgent_TaskType"],
    "query": {
        "bool": {
            "filter": {
                "exists": {"field": "WMAgent_RequestName"}
            }
        }
    },
    "aggs": {
        "req": {
            "terms": {
                "field": "WMAgent_RequestName",
                "size": 0,
            },
            "aggs": {
                "task": {
                    "terms": {
                        "field": "WMAgent_SubTaskName",
                        "size": 0,
                    },
                    "aggs": {
                        "ttype": {
                            "terms": {
                                "field": "WMAgent_TaskType",
                                "size": 0
                            },
                            "aggs": {
                                "jtype": {
                                    "terms": {
                                        "field": "CMS_JobType",
                                        "size": 0
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
res = es.search(index=ind, body=s, request_timeout=1200)

In [12]:
with open('wmagent.csv', 'w') as f:
    writer = csv.writer(f)
    writer.writerow(['req', 'task', 'ttype', 'jtype', 'njobs'])
    for b1 in res['aggregations']['req']['buckets']:
        req = b1['key']
        for b2 in b1['task']['buckets']:
            task = b2['key']
            for b3 in b2['ttype']['buckets']:
                ttype = b3['key']
                for b4 in b3['jtype']['buckets']:
                    jtype = b4['key']
                    njobs = b4['doc_count']
                writer.writerow([req, task, ttype, jtype, njobs])

In [16]:
df = pd.read_csv('wmagent.csv')

In [66]:
def tasktype(row):
    req = row['req']
    task = row['task']
    ttype = row['ttype']
    jtype = row['jtype'].lower()
    ttype = ttype.lower()

    if "cleanupunmerged" in ttype:
        tt = "Cleanup"
    elif "dataprocessing" in ttype:
        tt = "DataProcessing"
    elif "merge" in ttype:
        tt = "Merge"
    elif "logcollect" in ttype:
        tt = 'LogCollect'
    elif "express" in ttype:
        tt = "Express"
    elif "reco" in ttype and "promptreco" in task:
        tt = "PromptReco"
    elif "repack" in ttype:
        tt = "Repack"
    elif "alcaskim" in ttype:
        tt = "AlcaSkim"
    elif ("miniaod" in req.lower()) and (ttype == "steponeproc"):
        tt = "MINIAOD"
    elif ttype == "steponeproc":
        tt = 'DIGIRECO'
    elif ttype == "steptwoproc":
        tt = 'RECO'
    elif ttype == "stepthreeproc":
        tt = 'MINIAODSIM'
    elif "miniaodv" in ttype:
        tt = 'MINIAOD'
    elif ("gs-" in req.lower()) and ttype.endswith("_0"):
        tt = 'GENSIM'
    elif ttype.endswith("_0"):
        tt = 'DIGI'
    elif "digifull" in ttype:
        tt = 'DIGIFULL'
    elif "digihi" in ttype:
        tt = 'DIGIHI'
    elif ttype.endswith("_1"):
        tt = 'RECO'
    elif "recoprod" in ttype:
        tt = 'RECOPROD'
    elif "recohi" in ttype:
        tt = 'RECOHI'
    elif "recofull" in ttype:
        tt = 'RECOFULL'
    elif ttype == "montecarlofromgen":
        tt = 'GENSIM'
    elif "gensim" in ttype:
        tt = 'GENSIM'
    elif (ttype == 'production') and ("gs-" in req.lower()):
        tt = 'GENSIM'
    elif (ttype == 'production') and ("lhe-" in req.lower()):
        tt = 'LHE'
    elif (ttype == 'production') and ("premix-" in req.lower()):
        tt = 'PREMIX'
    elif "premix" in ttype:
        tt = 'PREMIX'
    elif "validation" in req.lower():
        tt = "Validation"
    elif "relval" in req.lower():
        tt = "RelVal"
    elif "dqm" in ttype:
        tt = "DQM"
    elif "skim" in ttype:
        tt = "Skim"
    elif ttype == "production":
        tt = "GENSIM"
    else:
        tt = ttype
    return tt
    

In [216]:
def tasktype2(row):
    req = row['req'].lower()
    task = row['task'].lower()
    ttype = row['ttype'].lower()
    jtype = row['jtype'].lower()

    if 'cleanupunmerged' in ttype:
        tt = 'Cleanup'
    elif 'logcollect' in ttype:
        tt = 'LogCollect'
    elif ttype == 'repack':
        tt = 'Repack'
    elif ttype.startswith('skim'):
        tt = 'Skim'
    elif 'harvest' in ttype:
        tt = 'Harvesting'
    elif 'merge' in ttype:
        tt = 'Merge'
    elif ttype.startswith('express'):
        tt = 'Express'
    elif ttype == 'alcaskim':
        tt = 'Alcaskim'
    elif 'gensim' in ttype:
        tt = 'GENSIM'
    elif ttype == 'montecarlofromgen':
        tt = 'GENSIM'
    elif ttype == 'gen':
        tt = 'GEN'
    elif ttype == 'reco':
        tt = 'RECO'
    elif ttype == 'dataprocessing':
        tt = 'RECO'
    elif ttype.startswith("reco"):
        tt = 'DIGI'
    elif ttype.endswith("_0"):
        tt = 'DIGI'
    elif ttype.endswith("_1"):
        tt = 'DIGIRECO'
    elif ttype.startswith("digi"):
        tt = 'DIGI'
    elif ttype == 'steponeproc':
        tt = 'DIGI'
    elif ttype == 'steptwoproc':
        tt = 'DIGIRECO'
    elif ttype == 'stepthreeproc':
        tt = 'DIGIRECOMINIAOD'
    elif 'miniaod' in ttype:
        tt = 'MINIAOD'
    elif ttype == 'production':
        tt = 'GENSIM'
    elif 'hlt' in ttype:
        tt = 'RECO'
    elif "validation" in task:
        tt = "Validation"
    elif "relval" in task:
        tt = "RelVal"
    elif "fastsim" in task:
        tt = "FastSim"
    elif "premix" in task:
        tt = "Premix"
    elif "minbias" in task:
        tt = "MinBias"
    else:
        tt = "Other"

    return tt
    

In [217]:
df['tasktype'] = df.apply(lambda row: tasktype2(row), axis=1)

In [218]:
grouped = df.groupby(['tasktype'], as_index=False)
a = grouped.agg({'njobs': 'sum'})

In [219]:
a.sort_values('njobs', ascending=False)

Unnamed: 0,tasktype,njobs
8,GENSIM,16848132
12,Merge,3805694
2,DIGI,3432400
16,RECO,3418075
10,LogCollect,1832861
1,Cleanup,1110288
5,Express,601932
3,DIGIRECO,524678
18,Repack,506466
0,Alcaskim,235585


Unfortunately several task types escape classification, namely a large number of 'production' jobs, 'steptwoproc', 'stepthreeproc', 'digiup15', etc.

It is my guess that 'steptwoproc' is a DIGI (or better a ReDigi), looking at the WmAgent code.

Similarly, 'dataprocessing' should be RECO.

In [20]:
print df[df.ttype == 'steponeproc'][:10]

                                                                         req  \
110        pdmvserv_tsg-runiisummer16dr80-00016_00028_v0__161103_145128_8040   
151        pdmvserv_tsg-runiisummer16dr80-00003_00030_v1__161103_162133_1286   
237         pdmvserv_tsg-runiisummer16dr80-00004_00013_v0__161029_004151_538   
524  pdmvserv_btv-runiisummer16dr80premix-00010_00006_v0__161106_201423_2069   
637        pdmvserv_btv-runiisummer16dr80-00011_00029_v1__161103_161302_8989   
674  pdmvserv_top-runiisummer16dr80premix-00010_00015_v0__161106_205404_3279   
677        pdmvserv_btv-runiisummer16dr80-00001_00029_v1__161103_161122_2049   
713        pdmvserv_hig-runiispring16dr80-01830_00657_v0__160905_161208_8622   
721  diego_top-runiisummer16dr80-backfill-00001_00001_v0__160823_175035_3427   
725        pdmvserv_sus-runiisummer16dr80-00001_00029_v1__161103_161739_5368   

                                                                                     task  \
110        /pdmvserv_tsg-r