In [6]:
from elasticsearch import Elasticsearch, helpers
from pathlib import Path
import time
import gcomc_reader

es = Elasticsearch("http://es:9200")
filename_identifier = 'LST'
product = 'LST'
source_path = "../source"

In [7]:
product_index = product.lower()
# es.indices.delete(product_index) #indexを作り直す場合 

if not es.indices.exists(index=product_index):
    es.indices.create(index=product_index, body={
        "mappings": {
            "properties": {
                "granule_id": {
                  "type": "keyword"
                },
                "location": {
                  "type": "geo_point"
                },
                "value": {
                    "type": "float"
                },
                "observation_datetime": {
                  "type": "date"
                },
                "qa_flags": {
                    "type": "short"
                }
            }
        }
    })

In [8]:
process_index = 'process'
# es.indices.delete(process_index) #indexを作り直す場合 

if not es.indices.exists(index=process_index):
    es.indices.create(index=process_index, body={
        "mappings": {
            "properties": {
                "location": {
                  "type": "geo_shape"
                },
                "start_datetime": {
                  "type": "date"
                },
                "end_datetime": {
                  "type": "date"
                },
                "processed_datetime": {
                  "type": "date"
                },
                "good": {
                    "type": "boolean"
                },
                "algorithm_version": {
                    "type": "keyword"
                },
                "product": {
                    "type": "keyword"
                },
                "done": {
                    "type": "boolean"
                }
            }
        }
    })

In [9]:
p = Path(source_path)

for file_path in p.glob("*{}*.h5".format(filename_identifier)):
    r = gcomc_reader.Tile(str(file_path))

    processed = es.search(index=process_index, body={"query":{"bool":{"must":[{"term":{"_id":r.granule_id}},{"term":{"done":True}}]}}})
    if processed['hits']['total']['value'] == 0 and r.resolution == 'Q':
        print("start: {}".format(r.granule_id))
        start = time.time()
        es.index(index=process_index, id=r.granule_id, body={
            "location": {
                "type" : "polygon",
                "coordinates" : [
                  [r.lower_left, r.lower_right, r.upper_right, r.upper_left, r.lower_left]
                ]
            },
            "start_datetime": r.start_time,
            "end_datetime": r.end_time,
            "processed_datetime": r.processed,
            "good": r.is_good(),
            "algorithm_version": r.algorithm_version,
            "product": r.product,
            "done": False
        })

        if r.is_good() == True:
            items = []
            for line in range(r.lines):
                for pixel in range(r.pixels):
                    try:
                        item = r.get_point(line, pixel)
                        if item['value'] is not None and item['observation_datetime'] is not None:
                            item['granule_id'] = r.granule_id
                            id = '{}_{}_{}'.format(r.granule_id, line, pixel)
                            items.append({'_op_type': 'index', '_index':product_index, '_id':id, '_source':item})

                            if len(items)>10000:
                                helpers.bulk(es, items)
                                items = []
                    except:
                        pass

            if len(items)>0:
                helpers.bulk(es, items)
        es.update(index=process_index, id=r.granule_id, body={"doc": {"done": True}})
        print("done: {}[s]".format(time.time() - start))
    else:
        print("passed: {}".format(r.granule_id))

True
passed: GC1SG1_20200925A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200917D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200919D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200917A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200919A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200920D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200920A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200923A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200923D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200922A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200922D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200921D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200921A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200918D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200916D01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200918A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200916A01D_T0221_L2SG_LST_Q_2000
True
passed: GC1SG1_20200915A01D_T0221_L2SG_LST_Q_2000
True
passe

In [10]:
es.count(index=product_index)

{'count': 179817666,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}

In [11]:
es.count(index=process_index)

{'count': 21,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}

In [12]:
print("created points: {}".format(es.count(index=product_index)))
print("processed files: {}".format(es.count(index=process_index)))

created points: {'count': 179817666, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
processed files: {'count': 21, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
