Skip to content
This repository has been archived by the owner on Sep 30, 2021. It is now read-only.

Commit

Permalink
Use a GUID for the extract filename and include a status URL in the response
Browse files Browse the repository at this point in the history
  • Loading branch information
klahnakoski committed Dec 16, 2019
1 parent 254eb1b commit 6bcb4ed
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions vendor/jx_elasticsearch/es52/bulk_aggs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from jx_elasticsearch.es52.aggs import build_es_query
from jx_elasticsearch.es52.format import format_list_from_groupby
from mo_dots import listwrap, unwrap, Null, wrap, coalesce, unwraplist
from mo_files import TempFile, URL, mimetype, File
from mo_files import TempFile, URL, mimetype
from mo_future import first, is_text
from mo_logs import Log, Except
from mo_math.randoms import Random
Expand Down Expand Up @@ -98,12 +98,12 @@ def es_bulkaggsop(esq, frum, query):
Log.error("Requesting more than {{num}} partitions", num=num_partitions)

acc, decoders, es_query = build_es_query(selects, query_path, schema, query)
filename = Random.base64(32, extra="-_") + ".json"
guid = Random.base64(32, extra="-_")

Thread.run(
"extract to " + filename,
"extract to " + guid+".json",
extractor,
filename,
guid,
num_partitions,
esq,
query,
Expand All @@ -117,7 +117,8 @@ def es_bulkaggsop(esq, frum, query):

output = wrap(
{
"url": URL_PREFIX / filename,
"url": URL_PREFIX / guid+".json",
"status": URL_PREFIX / guid+".status.json",
"meta": {
"format": "list",
"timing": {"cardinality_check": cardinality_check.duration},
Expand All @@ -131,7 +132,7 @@ def es_bulkaggsop(esq, frum, query):


def extractor(
filename,
guid,
num_partitions,
esq,
query,
Expand All @@ -150,7 +151,7 @@ def extractor(

try:
write_status(
filename,
guid,
{
"status": "starting",
"chunks": num_partitions,
Expand Down Expand Up @@ -192,7 +193,7 @@ def extractor(
break
else:
write_status(
filename,
guid,
{
"status": "working",
"chunk": i,
Expand All @@ -207,9 +208,9 @@ def extractor(
break
output.write(b"\n]\n")

upload(filename, temp_file)
upload(guid+".json", temp_file)
write_status(
filename,
guid,
{
"ok": True,
"status": "done",
Expand All @@ -223,7 +224,7 @@ def extractor(
except Exception as e:
e = Except.wrap(e)
write_status(
filename,
guid,
{
"ok": False,
"status": "error",
Expand Down Expand Up @@ -254,9 +255,9 @@ def upload(filename, temp_file):
)


def write_status(filename, status):
def write_status(guid, status):
try:
filename = File(filename).set_extension("status.json").filename
filename = guid + ".status.json"
with Timer("upload status to S3 {{file}}", param={"file": filename}):
try:
connection = Connection(S3_CONFIG).connection
Expand Down

0 comments on commit 6bcb4ed

Please sign in to comment.