diff --git a/vendor/jx_elasticsearch/es52/bulk_aggs.py b/vendor/jx_elasticsearch/es52/bulk_aggs.py index 2ebe0d078..f0edb5fea 100644 --- a/vendor/jx_elasticsearch/es52/bulk_aggs.py +++ b/vendor/jx_elasticsearch/es52/bulk_aggs.py @@ -18,7 +18,7 @@ from jx_elasticsearch.es52.aggs import build_es_query from jx_elasticsearch.es52.format import format_list_from_groupby from mo_dots import listwrap, unwrap, Null, wrap, coalesce, unwraplist -from mo_files import TempFile, URL, mimetype, File +from mo_files import TempFile, URL, mimetype from mo_future import first, is_text from mo_logs import Log, Except from mo_math.randoms import Random @@ -98,12 +98,12 @@ def es_bulkaggsop(esq, frum, query): Log.error("Requesting more than {{num}} partitions", num=num_partitions) acc, decoders, es_query = build_es_query(selects, query_path, schema, query) - filename = Random.base64(32, extra="-_") + ".json" + guid = Random.base64(32, extra="-_") Thread.run( - "extract to " + filename, + "extract to " + guid+".json", extractor, - filename, + guid, num_partitions, esq, query, @@ -117,7 +117,8 @@ def es_bulkaggsop(esq, frum, query): output = wrap( { - "url": URL_PREFIX / filename, + "url": URL_PREFIX / guid+".json", + "status": URL_PREFIX / guid+".status.json", "meta": { "format": "list", "timing": {"cardinality_check": cardinality_check.duration}, @@ -131,7 +132,7 @@ def es_bulkaggsop(esq, frum, query): def extractor( - filename, + guid, num_partitions, esq, query, @@ -150,7 +151,7 @@ def extractor( try: write_status( - filename, + guid, { "status": "starting", "chunks": num_partitions, @@ -192,7 +193,7 @@ def extractor( break else: write_status( - filename, + guid, { "status": "working", "chunk": i, @@ -207,9 +208,9 @@ def extractor( break output.write(b"\n]\n") - upload(filename, temp_file) + upload(guid+".json", temp_file) write_status( - filename, + guid, { "ok": True, "status": "done", @@ -223,7 +224,7 @@ def extractor( except Exception as e: e = Except.wrap(e) write_status( - filename, + guid, { "ok": False, "status": "error", @@ -254,9 +255,9 @@ def upload(filename, temp_file): ) -def write_status(filename, status): +def write_status(guid, status): try: - filename = File(filename).set_extension("status.json").filename + filename = guid + ".status.json" with Timer("upload status to S3 {{file}}", param={"file": filename}): try: connection = Connection(S3_CONFIG).connection