## References
- https://janakiev.com/blog/python-filesystem-analysis/

- dependencies:
    - optional (possible future extension): pip install persist-queue[extra] ( https://github.com/peter-wangxu/persist-queue )
    - pip install rq ( https://python-rq.org/ ) 
    
- related work
    - https://github.com/cedadev/facet-scanner
    - https://github.com/cedadev/ceda-elasticsearch-tools

# Elasticsearch ingester

## Work Plan: 

- implement project readers in pool_tools
   - every reader returns Elasticsearch ingest items
- implement directory walker, generating ingest requests
   - an ingest request is the input to a project reader
- track index status information in the index itself
- use http://python-rq.org/ to queue ingest items
- use parallel workers to run Elasticsearch ingest items
   - each worker maintains an Elasticsearch endpoint


In [1]:
# for development: auto reload of packages
%load_ext autoreload

In [2]:
import os
from pindex import pool_tools

In [3]:
# Root of the CMIP6 data pool; enable the line matching the machine in use.
prefix = "/home/stephan/sshik1017/CMIP6/data/CMIP6/"   # mounted via sshfs
# prefix = "/work/ik1017/"                             # on mistral

# Sub-tree (relative to prefix) that serves as a small test data set.
ipsl_test = "CMIP/IPSL/IPSL-CM6A-LR/1pctCO2/r1i1p1f1/day"

# os.path.join instead of "+": the resulting path stays valid whether or not
# prefix ends with a slash.
pool_tools.index("cmip6", os.path.join(prefix, ipsl_test))

Project handler initialized: /home/stephan/sshik1017/CMIP6/data/CMIP6/CMIP/IPSL/IPSL-CM6A-LR/1pctCO2/r1i1p1f1/day
EL connected
Elastic search endpoint:  [{'host': 'localhost', 'port': 9200}]
<Elasticsearch([{'host': 'localhost', 'port': 9200}])>
-- removed existing mapping
-- mapping created
opening:  /home/stephan/sshik1017/CMIP6/data/CMIP6/CMIP/IPSL/IPSL-CM6A-LR/1pctCO2/r1i1p1f1/day/rlus/gr/v20180727/rlus_day_IPSL-CM6A-LR_1pctCO2_r1i1p1f1_gr_18500101-19991231.nc
{'rlus_day_IPSL-CM6A-LR_1pctCO2_r1i1p1f1_gr_18500101-19991231.nc': {'variable': 'rlus', 'table': 'day', 'model': 'IPSL-CM6A-LR', 'experiment': '1pctCO2', 'member': 'r1i1p1f1', 'grid': 'gr', 'time': '18500101-19991231', 'file_name': 'rlus_day_IPSL-CM6A-LR_1pctCO2_r1i1p1f1_gr_18500101-19991231.nc', 'project': 'cmip6', 'dataset_id': '/home/stephan/sshik1017/CMIP6/data/CMIP6/CMIP/IPSL/IPSL-CM6A-LR/1pctCO2/r1i1p1f1/day/rlus/gr/v20180727', 'stime': '18500101', 'etime': '19991231', 'tracking_id': 'hdl:21.14100/d9eb0c49-4ec5-4091-88f

In [2]:
from elasticsearch import Elasticsearch
# Endpoint description of the local Elasticsearch node.
epar = [dict(host='localhost', port=9200)]
es = Elasticsearch(epar)

# Drop the "cmip6" index; 400/404 responses are ignored, so the cell can be
# re-run even when the index is already gone.
es.indices.delete(
    index="cmip6",
    ignore=[400, 404],
)

{'acknowledged': True}

In [8]:
from elasticsearch import Elasticsearch
def connect_elasticsearch(host='localhost', port=9200):
    """Create an Elasticsearch client and report whether the node answers.

    Args:
        host: Host name of the Elasticsearch node (default ``'localhost'``).
        port: Port of the Elasticsearch node (default ``9200``).

    Returns:
        The ``Elasticsearch`` client. It is returned even when the ping
        fails, so callers have to watch for the printed error message.
    """
    client = Elasticsearch([{'host': host, 'port': port}])
    # ping() only drives the status message; the client is handed back either way
    if client.ping():
        print('EL connected')
    else:
        print('ERROR: EL not connected')
    return client

In [9]:
# create the client used by the cells below (prints the connection status)
es = connect_elasticsearch()

EL connected


In [19]:
from pindex import config
from pprint import pprint
# Start from a clean slate: drop a previously created "cmip6" index.
# The index name is passed by keyword; positional API arguments are not
# accepted by newer elasticsearch-py releases.
if es.indices.exists(index='cmip6'):
    es.indices.delete(index='cmip6')

# index settings: facet -> field type mapping taken from pindex.config
fcmip6 = config.settings.FACETS['cmip6']
settings = {'mappings': {'properties': fcmip6}}
pprint(settings)

# create index
# NOTE(review): ignore=400 presumably also hides mapping errors, not only
# "index already exists" — confirm this is intended.
es.indices.create(index='cmip6', ignore=400, body=settings)

{'mappings': {'properties': {'activity': {'type': 'keyword'},
                             'contact': {'type': 'text'},
                             'dataset_id': {'type': 'text'},
                             'etime': {'type': 'integer'},
                             'experiment': {'type': 'keyword'},
                             'grid': {'type': 'keyword'},
                             'institution': {'type': 'keyword'},
                             'lfile': {'type': 'keyword'},
                             'member': {'type': 'keyword'},
                             'mip_era': {'type': 'keyword'},
                             'model': {'type': 'keyword'},
                             'project': {'type': 'keyword'},
                             'st_atime': {'type': 'float'},
                             'st_ctime': {'type': 'float'},
                             'st_mtime': {'type': 'float'},
                             'st_size': {'type': 'text'},
                             'stime

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'cmip6'}

In [9]:
# Recreate the "cmip6" index with an explicit, hand-written field mapping.
# The index name is passed by keyword; positional API arguments are not
# accepted by newer elasticsearch-py releases.
if es.indices.exists(index='cmip6'):
    es.indices.delete(index='cmip6')

# index settings
settings = {
    "mappings": {
        "properties": {
            # "mip_era" (underscore) is the facet name used in
            # config.settings.FACETS['cmip6']; it was spelled "mip-era" here
            "mip_era": {"type": "keyword"},
            "activity": {"type": "keyword"},
            "tracking_id": {"type": "keyword"},
            "contact": {"type": "keyword"},
            "st_size": {"type": "float"},
            "st_atime": {"type": "float"},
            # NOTE(review): st_mtime/st_ctime are keywords while st_atime is a
            # float — confirm which type is intended
            "st_mtime": {"type": "keyword"},
            "st_ctime": {"type": "keyword"},
            "file_name": {"type": "keyword"},
            "project": {"type": "keyword"},
            "dataset_id": {"type": "text"},
            "stime": {"type": "integer"},
            "etime": {"type": "integer"},
            "institution": {"type": "keyword"},
            "model": {"type": "keyword"},
            "experiment": {"type": "keyword"},
            "member": {"type": "keyword"},
            "variable": {"type": "keyword"},
            "grid": {"type": "keyword"},
            "version": {"type": "keyword"},
            "lfile": {"type": "keyword"},
            "time": {"type": "text"},
        }
    }
}

# create index
es.indices.create(index='cmip6', ignore=400, body=settings)


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'cmip6'}

In [None]:
# scratch calculation; presumably the number of seconds in three hours — TODO confirm what it is used for
60 * 60 * 3


In [None]:
from persistqueue import Queue
# Smoke test of persist-queue: create a queue under the path "mypath",
# enqueue three items and fetch the first one.
q = Queue("mypath")
for entry in ('a', 'b', 'c'):
    q.put(entry)
q.get()

In [None]:
# fetch the next item from the queue q created in the previous cell
q.get()

In [None]:
import os
# List the immediate sub-directories of the data volume.
# scandir() is used as a context manager so the directory handle is released
# right away, also when an error interrupts the iteration.
with os.scandir('/run/media/stephan/Volume/data') as entries:
    dirpaths = [entry.path for entry in entries if entry.is_dir()]
dirpaths

In [None]:
import os
# inspect the os.stat() result of one entry on the data volume
tst = os.stat('/run/media/stephan/Volume/data/hus')
print(tst)

In [None]:
import os

proj = "cmip6"
sh = pool_tools.SHelper(proj=proj, start_dir="/run/media/stephan/Volume/data")
print("SHelper initialized:", sh.start_dir)
es = sh.get_es()
print(es)

# Running counter over all worker results; it doubles as the document id.
i = 0

for root, dirs, files in os.walk(sh.start_dir):
    if not files:
        # directories without files are skipped entirely
        continue
    print("=============================: ", root, dirs, files)
    # the worker result is iterated as a mapping whose values are the
    # dictionaries to be indexed
    res = sh.worker(root, dirs, files)
    # update elastic search
    for i_dict in res.values():
        i += 1
        print("index update: ", i)
        if i_dict:
            # empty dictionaries still consume an id but are not indexed
            index_res = es.index(index=proj, id=i, body=i_dict)
    print("============================")


In [None]:
## bulk update
# Converted from a pasted Python 2 interpreter transcript into a runnable
# Python 3 cell: prompts removed, next(k) instead of k.next(),
# string.ascii_letters instead of string.letters, missing imports added.
import itertools
import string

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

# k is a generator expression that produces a series of dictionaries
# containing test data. The test data are just letter permutations created
# with itertools.permutations.
#
# We then reference k as the iterator that's consumed by the
# elasticsearch.helpers.bulk method.
# The '_type': 'foo' entry of the transcript is dropped: mapping types are no
# longer supported by current Elasticsearch versions.
k = ({'_index': 'test', 'letters': ''.join(letters)}
     for letters in itertools.permutations(string.ascii_letters, 2))

# calling next(k) shows examples (while consuming the generator, of course);
# each dict contains the target index and the data (at minimum)
print(next(k))   # {'_index': 'test', 'letters': 'ab'}
print(next(k))   # {'_index': 'test', 'letters': 'ac'}

# create our test index
es.indices.create(index='test')

# 52 * 51 permutations minus the two consumed above -> expected (2650, [])
print(helpers.bulk(es, k))

# check to make sure we got what we expected...
es.count(index='test')

In [4]:
# Example search bodies for the "cmip6" index.

# full-text match on variable, model and time range
q_string = {
    "query": {
        "bool": {
            "must": [
                {"match": {"variable": "tas"}},
                {"match": {"model": "GFDL-CM4"}},
                {"match": {"time": '015101-025012'}},
            ]
        }
    }
}

# match on start time and model; this query used to be bound to r_string and
# was silently replaced by the next assignment, so it gets its own name
stime_string = {
    "query": {
        "bool": {
            "must": [
                {"match": {"stime": "055101"}},
                {"match": {"model": "GFDL-CM4"}},
            ]
        }
    }
}

# exact filter on the end time
r_string = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"etime": '19991231'}},
            ]
        }
    }
}

# same end-time filter; "filter" has to live inside a "bool" query — placed
# directly under "query" (as before) it is not a valid query clause
t_string = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"etime": '19991231'}},
            ]
        }
    }
}

# exact lookup of a single file by its tracking id
tr_id_string = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"tracking_id": "hdl:21.14100/bb6bcc55-8d61-4622-b85f-818b37f5dd2b"}},
            ]
        }
    }
}



In [6]:
# Import only the PrettyPrinter class: a plain "import pprint" rebinds the
# name pprint to the module and breaks the pprint(settings) call of the
# index-creation cell when that cell is run again.
from pprint import PrettyPrinter

pp = PrettyPrinter(indent=4)
# run the tracking-id query against the "cmip6" index and show the hits
res = es.search(index="cmip6", body=tr_id_string)
pp.pprint(res['hits']['hits'])

NameError: name 'es' is not defined

In [None]:
import getpass
import os

import redis

# Never keep the password in the notebook: read it from the environment and
# fall back to an interactive prompt when REDIS_PASSWORD is not set.
# NOTE(review): the password that used to be hard-coded here should be
# treated as leaked and changed on the server.
redis_password = os.environ.get("REDIS_PASSWORD") or getpass.getpass("Redis password: ")
r = redis.Redis(password=redis_password)


In [None]:
# round-trip a test key through the Redis connection r: store 'bar' under
# the key 'foo', read it back and print what came back
r.set('foo', 'bar')
value = r.get('foo')
print(value)

In [None]:
# call save() on the Redis client — presumably asks the server to write its data set to disk; confirm against the redis-py docs
r.save()