In [1]:
import json
import sys
from datetime import datetime
from pathlib import Path

import pandas as pd
from elasticsearch import Elasticsearch, exceptions as es_exceptions
from elasticsearch.helpers import scan

I like to keep the Elasticsearch connection configuration in a JSON file (`config.json`, read from the working directory).
It should look like this:
```json
{
    "ES_HOST": "atlas-kibana.mwt2.org",
    "ES_USER": "ivukotic",
    "ES_PASS": "xxxxxxxx"
}
```

In [None]:
# Load the Elasticsearch connection settings from config.json into `config`.
# The connection cell reads the keys ES_HOST, ES_USER and ES_PASS from it.
with open('config.json') as json_data:
    config = json.loads(json_data.read())

In [None]:
# Index that contains the data; an alias works here as well.
ind = 'xc-gstream'

# Binary size multipliers: each one is 1024 times the previous.
KB = 1 << 10
MB = KB << 10
GB = MB << 10
TB = GB << 10

Actually connecting to ES.

In [None]:
# Build the Elasticsearch client: HTTPS on port 9200 of the configured host,
# basic auth with the configured user/password, 60 s request timeout.
es = Elasticsearch(
    hosts=[dict(host=config['ES_HOST'], scheme='https', port=9200)],
    basic_auth=(config['ES_USER'], config['ES_PASS']),
    request_timeout=60,
)

# Stop right here when the ping fails; nothing below works without a client.
if not es.ping():
    print('no connection to ES.')
    sys.exit(1)
print('connected to ES.')

Just to limit time range...

In [None]:
# Time window for the query, as 'YYYY-MM-DD HH:MM' strings.
# start_int is also reused later to name the output file.
start_int = '2022-06-01 00:00'
stop_int = '2022-07-01 00:00'

# Parse both boundaries with the same format in one go.
start_dt, stop_dt = (
    datetime.strptime(stamp, '%Y-%m-%d %H:%M')
    for stamp in (start_int, stop_int)
)
print('start:', start_dt, '\nstop: ', stop_dt)

An example query. 
\_source determines which columns are returned. Without it, all columns are returned (possibly slow).


In [None]:
# Search body used for the scan over the index.
#
# "_source" lists the only fields Elasticsearch will send back for each hit.
# It must contain every field the processing loop reads from res['_source'];
# 'site' is included because the loop reads it to filter documents, and a
# field missing from this list is simply absent from the returned hit.
#
# The range filter selects documents with start_dt < @timestamp <= stop_dt,
# expressed as epoch milliseconds.
# NOTE(review): start_dt/stop_dt are naive datetimes, so .timestamp()
# interprets them in this machine's local timezone - confirm whether UTC
# boundaries were intended.
query = {
    "_source": [
        'host', 'site', 'access_cnt', 'lfn', 'size',
        'b_bypass', 'b_hit', 'b_miss', 'n_blks', 'n_blks_done',
    ],
    "query": {
        "range": {
            "@timestamp": {
                "gt": int(start_dt.timestamp() * 1000),
                "lte": int(stop_dt.timestamp() * 1000)
            }
        }
    }
}

Here the data is actually read, additionally filtered, and placed in lists that will be turned into a pandas dataframe.

In [None]:
# Column-oriented accumulator: one list per output column. Every accepted
# document appends exactly one value to each list, so all lists stay the
# same length.
data = {
    'access': [], 'site': [], 'host': [], 'lfn': [], 'scope': [], 'fn': [],
    'b_hit': [], 'b_miss': [], 'b_bypass': [], 'fsize': [], 'fill': [],
}

docs_read = 0     # every hit returned by the scan
docs_skipped = 0  # hits dropped on purpose (site 'UC-AF')
docs_bad = 0      # hits without 'host' or with an lfn that cannot be parsed

scroll = scan(client=es, index=ind, query=query, timeout="5m")

for res in scroll:
    docs_read += 1
    if docs_read % 50000 == 0:
        print('docs read', docs_read)

    src = res['_source']

    if 'host' not in src:
        print('bad document:', res)
        docs_bad += 1
        continue

    site = src['site']
    if site == 'UC-AF':
        docs_skipped += 1
        continue

    # The lfn is parsed from its tail: .../<scope>/<2 chars>/<2 chars>/<file>,
    # optionally preceded by a 'user' component.
    lfn = src['lfn']
    parts = lfn.split('/')
    # The length check comes first so that a short lfn is reported as
    # problematic instead of raising IndexError on parts[-2]..parts[-4].
    if len(parts) < 4 or len(parts[-2]) != 2 or len(parts[-3]) != 2:
        print('problematic lfn:', lfn)
        docs_bad += 1
        continue

    scope = parts[-4]
    # parts[-5] only exists for lfns with at least five components.
    if len(parts) >= 5 and parts[-5] == 'user':
        scope = 'user.' + scope

    # Fraction of blocks done; undefined (NaN) when the document reports
    # no blocks, instead of aborting the scan with ZeroDivisionError.
    n_blks = src['n_blks']
    fill = src['n_blks_done'] / n_blks if n_blks else float('nan')

    # Build the full record before touching `data`: if any field lookup
    # fails, nothing has been appended and the columns stay aligned.
    row = {
        'access': src['access_cnt'],
        'site': site,
        'host': src['host'],
        'lfn': lfn,
        'scope': scope,
        'fn': parts[-1],
        'b_hit': src['b_hit'],
        'b_miss': src['b_miss'],
        'b_bypass': src['b_bypass'],
        'fsize': src['size'],
        'fill': fill,
    }
    for column, value in row.items():
        data[column].append(value)

print('read:', docs_read, 'bad:', docs_bad, 'skipped:', docs_skipped)

Making and saving pandas dataframe.

In [None]:
# Turn the column lists into a DataFrame and save it as parquet under data/,
# with the start date of the time window in the file name.
df = pd.DataFrame.from_dict(data)

out_path = Path('data') / f'xcache_{start_int.split()[0]}.parquet'
# Create data/ if it is missing, so the save cannot fail on a fresh checkout
# after the whole scan has already run.
out_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(out_path)

df.head()