86 changes: 44 additions & 42 deletions elastic_datashader/elastic.py
@@ -158,18 +158,18 @@ def convert_nm_to_ellipse_units(distance: float, units: str) -> float:
# NB. assume "majmin_m" if any others
return distance * 1852

def get_field_type(elastic_hosts: str,headers: Optional[str],params: Dict[str, Any],field:str,idx: str) -> str:
def get_field_type(elastic_hosts: str, headers: Optional[str], params: Dict[str, Any], field: str, idx: str) -> str:
user = params.get("user")
x_opaque_id = params.get("x-opaque-id")
es = Elasticsearch(
elastic_hosts.split(","),
verify_certs=False,
timeout=900,
headers=get_es_headers(headers, user,x_opaque_id),
headers=get_es_headers(headers, user, x_opaque_id),
)
mappings = es.indices.get_field_mapping(fields=field,index=idx)
#{'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
index = list(mappings.keys())[0] #if index is my_index* it comes back as my_index
mappings = es.indices.get_field_mapping(fields=field, index=idx)
# {'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
index = list(mappings.keys())[0] # if index is my_index* it comes back as my_index
return mappings[index]['mappings'][field]['mapping'][field]['type']

def get_search_base(
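For reference, a minimal sketch of how the get_field_mapping response shown in the comment above is unpacked; the index and field names come from that example comment and are illustrative only.

# Illustrative only: unpacking a get_field_mapping response shaped like the
# example comment above (index/field names are hypothetical).
mappings = {
    "foot_prints": {
        "mappings": {
            "foot_print": {
                "full_name": "foot_print",
                "mapping": {"foot_print": {"type": "geo_shape"}},
            }
        }
    }
}
field = "foot_print"
index = list(mappings.keys())[0]  # a pattern like "foot_prints*" resolves to the concrete index name
field_type = mappings[index]["mappings"][field]["mapping"][field]["type"]
assert field_type == "geo_shape"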
@@ -199,7 +199,7 @@ def get_search_base(
elastic_hosts.split(","),
verify_certs=False,
timeout=900,
headers=get_es_headers(headers, user,x_opaque_id),
headers=get_es_headers(headers, user, x_opaque_id),
)

# Create base search
@@ -219,13 +219,13 @@ def get_search_base(
if time_range and time_range[timestamp_field]:
base_s = base_s.filter("range", **time_range)

#filter the ellipse search range in the data base query so the legend matches the tiles
if params.get('render_mode',"") =="ellipses":
units = convert_nm_to_ellipse_units(params['search_nautical_miles'],params['ellipse_units'])
search_range = {params["ellipse_major"]:{"lte":units}}
base_s = base_s.filter("range",**search_range)
search_range = {params["ellipse_minor"]:{"lte":units}}
base_s = base_s.filter("range",**search_range)
# filter the ellipse search range in the data base query so the legend matches the tiles
if params.get('render_mode', "") =="ellipses":
units = convert_nm_to_ellipse_units(params['search_nautical_miles'], params['ellipse_units'])
search_range = {params["ellipse_major"]: {"lte": units}}
base_s = base_s.filter("range", **search_range)
search_range = {params["ellipse_minor"]: {"lte": units}}
base_s = base_s.filter("range", **search_range)

# Add lucene query
if lucene_query:
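For context, the two ellipse filters above are plain range clauses; a rough sketch of what they add to the query, assuming hypothetical ellipse fields "major" and "minor" stored in meters and a 50-nautical-mile search radius.

# Illustrative only: roughly what the two filters above contribute to the bool
# query; field names and the 50 nm radius are hypothetical (1 nm = 1852 m).
units = 50 * 1852  # convert_nm_to_ellipse_units(50, "majmin_m")
ellipse_filters = [
    {"range": {"major": {"lte": units}}},
    {"range": {"minor": {"lte": units}}},
]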
@@ -355,14 +355,14 @@ def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
filter_key = f.get("meta", {}).get("key")
if f.get("meta", {}).get("negate"):
if filter_key == "query":
filter_dict["must_not"].append( { "bool": f.get(filter_key).get("bool") } )
filter_dict["must_not"].append({"bool": f.get(filter_key).get("bool")})
else:
filter_dict["must_not"].append( { filter_key: f.get(filter_key) } )
filter_dict["must_not"].append({filter_key: f.get(filter_key)})
else:
if filter_key == "query":
filter_dict["filter"].append( { "bool": f.get(filter_key).get("bool") } )
filter_dict["filter"].append({"bool": f.get(filter_key).get("bool")})
else:
filter_dict["filter"].append( { filter_key: f.get(filter_key) } )
filter_dict["filter"].append({filter_key: f.get(filter_key)})

else:
raise ValueError("unsupported filter type {}".format(f.get("meta").get("type"))) # pylint: disable=C0209
@@ -389,7 +389,7 @@ def load_datashader_headers(header_file_path_str: Optional[str]) -> Dict[Any, An

return loaded_yaml

def get_es_headers(request_headers=None, user=None,x_opaque_id=None):
def get_es_headers(request_headers=None, user=None, x_opaque_id=None):
"""

:param request_headers:
@@ -420,15 +420,17 @@ def get_es_headers(request_headers=None, user=None,x_opaque_id=None):
return result

def parse_duration_interval(interval):
durations = {"days":"d",
"minutes":"m",
"hours":"h",
"weeks":"w",
"months":"M",
#"quarter":"q", dateutil.relativedelta doesn't handle quarters
"years":"y"}
durations = {
"days": "d",
"minutes": "m",
"hours": "h",
"weeks": "w",
"months": "M",
# "quarter": "q", dateutil.relativedelta doesn't handle quarters
"years": "y",
}
kwargs = {}
for key,value in durations.items():
for key, value in durations.items():
if interval[len(interval)-1] == value:
kwargs[key] = int(interval[0:len(interval)-1])
return relativedelta(**kwargs)
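A brief illustration of parse_duration_interval, assuming the suffix table above is exhaustive; only the final character of the interval string is inspected.

# Illustrative usage of parse_duration_interval defined above:
#   "30d" -> relativedelta(days=+30), "6h" -> relativedelta(hours=+6),
#   "3M"  -> relativedelta(months=+3)
from dateutil.relativedelta import relativedelta

assert parse_duration_interval("30d") == relativedelta(days=30)
assert parse_duration_interval("3M") == relativedelta(months=3)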
@@ -524,11 +526,11 @@ def geotile_bucket_to_lonlat(bucket):
if hasattr(bucket, "centroid"):
lon = bucket.centroid.location.lon
lat = bucket.centroid.location.lat
elif hasattr(bucket.key,'grids'):
z, x, y = [ int(x) for x in bucket.key.grids.split("/") ]
elif hasattr(bucket.key, 'grids'):
z, x, y = [int(x) for x in bucket.key.grids.split("/")]
lon, lat = mu.center(x, y, z)
else:
z, x, y = [ int(x) for x in bucket.key.split("/") ]
z, x, y = [int(x) for x in bucket.key.split("/")]
lon, lat = mu.center(x, y, z)
return lon, lat
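For reference, a self-contained sketch of the "z/x/y" key to tile-center conversion that mu.center performs above, using standard Web Mercator tile math; this is a stand-in for the helper, not its actual implementation.

import math

def geotile_center(key: str):
    # Stand-in sketch for mu.center(x, y, z): center lon/lat of a Web Mercator tile.
    z, x, y = (int(part) for part in key.split("/"))
    n = 2 ** z
    lon = (x + 0.5) / n * 360.0 - 180.0
    lat = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 0.5) / n))))
    return lon, lat

print(geotile_center("2/1/1"))  # approximately (-45.0, 40.98)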

@@ -571,7 +573,7 @@ def get_nested_field_from_hit(hit, field_parts: List[str], default=None):
raise ValueError("field must be provided")

def chunk_iter(iterable, chunk_size):
chunks = [ None ] * chunk_size
chunks = [None] * chunk_size
i = -1
for i, v in enumerate(iterable):
idx = (i % chunk_size)
@@ -581,14 +583,14 @@ def chunk_iter(iterable, chunk_size):
chunks[idx] = v

if i >= 0:
last_written_idx =( i % chunk_size)
last_written_idx = (i % chunk_size)
yield (False, chunks[0:last_written_idx+1])
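A hedged usage sketch of chunk_iter: from the visible lines, each yielded item is a (more_chunks_follow, chunk) pair and the final, possibly partial chunk carries the flag False; the collapsed lines above are assumed to yield the full chunks with the flag True.

# Assumed behavior of chunk_iter given the visible portion above (illustrative only):
for more, chunk in chunk_iter(range(7), 3):
    print(more, chunk)
# expected under that assumption:
#   True  [0, 1, 2]
#   True  [3, 4, 5]
#   False [6]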

def bucket_noop(bucket,search):
def bucket_noop(bucket, search):
# pylint: disable=unused-argument
return bucket
class Scan:
def __init__(self, searches, inner_aggs=None,field=None,precision=None, size=10, timeout=None,bucket_callback=bucket_noop):
def __init__(self, searches, inner_aggs=None, field=None, precision=None, size=10, timeout=None, bucket_callback=bucket_noop):
self.field = field
self.precision = precision
self.searches = searches
@@ -616,29 +618,29 @@ def execute(self):
self.total_took = 0
self.aborted = False

def run_search(s,**kwargs):
def run_search(s, **kwargs):
_timeout_at = kwargs.pop("timeout_at", None)
if _timeout_at:
_time_remaining = _timeout_at - int(time.time())
s = s.params(timeout=f"{_time_remaining}s")
if self.field and self.precision:
s.aggs.bucket("comp", "geotile_grid", field=self.field,precision=self.precision,size=self.size)
#logger.info(json.dumps(s.to_dict(),indent=2,default=str))
s.aggs.bucket("comp", "geotile_grid", field=self.field, precision=self.precision, size=self.size)
# logger.info(json.dumps(s.to_dict(), indent=2, default=str))
return s.execute()

timeout_at = None
if self.timeout:
timeout_at = int(time.time()) + self.timeout
for search in self.searches:
response = run_search(search,timeout_at=timeout_at)
response = run_search(search, timeout_at=timeout_at)
self.num_searches += 1
self.total_took += response.took
self.total_shards += response._shards.total # pylint: disable=W0212
self.total_skipped += response._shards.skipped # pylint: disable=W0212
self.total_successful += response._shards.successful # pylint: disable=W0212
self.total_failed += response._shards.failed # pylint: disable=W0212
for b in response.aggregations.comp.buckets:
b = self.bucket_callback(b,self)
b = self.bucket_callback(b, self)
yield b
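A rough usage sketch of the Scan helper, based on the constructor and the execute() generator shown above; the Search object, index, and field names are hypothetical, and es is assumed to be an Elasticsearch client created as elsewhere in this module.

# Illustrative only: driving Scan with an elasticsearch_dsl Search.
from elasticsearch_dsl import Search

searches = [Search(using=es, index="my-geo-index")]  # hypothetical index name
scan = Scan(searches, field="location", precision=8, size=10000, timeout=30)
for bucket in scan.execute():
    lon, lat = geotile_bucket_to_lonlat(bucket)
    print(lon, lat, bucket.doc_count)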


@@ -735,19 +737,19 @@ def get_tile_categories(base_s, x, y, z, geopoint_field, category_field, size):
cat_s = cat_s.params(size=0)
cat_s = cat_s.filter("geo_bounding_box", **{geopoint_field: bb_dict})
cat_s.aggs.bucket("categories", "terms", field=category_field, size=size)
cat_s.aggs.bucket("missing", "filter", bool={ "must_not" : { "exists": { "field": category_field } } })
cat_s.aggs.bucket("missing", "filter", bool={"must_not" : {"exists": {"field": category_field}}})
response = cat_s.execute()
if hasattr(response.aggregations, "categories"):
for category in response.aggregations.categories:
# this if prevents bools from using 0/1 instead of true/false
if hasattr(category, "key_as_string"):
category_filters[str(category.key)] = { "term": {category_field: category.key_as_string} }
category_filters[str(category.key)] = {"term": {category_field: category.key_as_string}}
else:
category_filters[str(category.key)] = { "term": {category_field: category.key} }
category_filters[str(category.key)] = {"term": {category_field: category.key}}
category_legend[str(category.key)] = category.doc_count
category_legend["Other"] = response.aggregations.categories.sum_other_doc_count
if hasattr(response.aggregations, "missing") and response.aggregations.missing.doc_count > 0:
category_filters["N/A"] = { "bool": { "must_not" : { "exists": { "field": category_field } } } }
category_filters["N/A"] = {"bool": {"must_not" : {"exists": {"field": category_field}}}}
category_legend["N/A"] = response.aggregations.missing.doc_count

return category_filters, category_legend
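For context, a sketch of the shapes of the two values returned by get_tile_categories; the category field name, keys, and counts are illustrative.

# Illustrative shapes of the get_tile_categories return values; the field name
# "vessel_type" and the category keys/counts are hypothetical.
category_filters = {
    "ship": {"term": {"vessel_type": "ship"}},
    "buoy": {"term": {"vessel_type": "buoy"}},
    "N/A": {"bool": {"must_not": {"exists": {"field": "vessel_type"}}}},
}
category_legend = {"ship": 1204, "buoy": 87, "Other": 12, "N/A": 3}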
50 changes: 25 additions & 25 deletions elastic_datashader/parameters.py
@@ -50,10 +50,10 @@ def create_default_params() -> Dict[str, Any]:
"track_connection": None,
"use_centroid": False,
"user": None,
"bucket_min":0,
"bucket_max":1,
"timeOverlap":False,
"timeOverlapSize":"auto"
"bucket_min": 0,
"bucket_max": 1,
"timeOverlap": False,
"timeOverlapSize": "auto"
}


@@ -293,10 +293,10 @@ def extract_parameters(headers: Dict[Any, Any], query_params: Dict[Any, Any]) ->
params["geopoint_field"] = query_params.get("geopoint_field", params["geopoint_field"])
params["timestamp_field"] = query_params.get("timestamp_field", params["timestamp_field"])
params.update(get_time_bounds(now, from_time, to_time))
params["bucket_min"] = float(query_params.get("bucket_min",0))
params["bucket_max"] = float(query_params.get("bucket_max",1))
params["timeOverlap"] = query_params.get("timeOverlap","false") == "true"
params["timeOverlapSize"] = query_params.get("timeOverlapSize","auto")
params["bucket_min"] = float(query_params.get("bucket_min", 0))
params["bucket_max"] = float(query_params.get("bucket_max", 1))
params["timeOverlap"] = query_params.get("timeOverlap", "false") == "true"
params["timeOverlapSize"] = query_params.get("timeOverlapSize", "auto")
params["debug"] = (query_params.get("debug", False) == 'true')

if params["geopoint_field"] is None:
@@ -346,7 +346,7 @@ def generate_global_params(headers, params, idx):
if category_type == "number":
bounds_s.aggs.metric("field_stats", "stats", field=category_field)

field_type = get_field_type(config.elastic_hosts, headers, params,geopoint_field, idx)
field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
# Execute and process search
if len(list(bounds_s.aggs)) > 0 and field_type != "geo_shape":
logger.info(bounds_s.to_dict())
@@ -427,14 +427,14 @@ def generate_global_params(headers, params, idx):

if category_field:
max_value_s = copy.copy(base_s)
bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
bucket.metric("sum","sum",field=category_field,missing=0)
bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
bucket.metric("sum", "sum", field=category_field, missing=0)
resp = max_value_s.execute()
estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
histogram_range = estimated_points_per_tile
else:
max_value_s = copy.copy(base_s)
max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
resp = max_value_s.execute()
estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
histogram_range = estimated_points_per_tile
@@ -471,14 +471,14 @@ def merge_generated_parameters(headers, params, idx, param_hash):
timeout=120
)

#See if the hash exists
# See if the hash exists
try:
doc = Document.get(id=layer_id, using=es, index=".datashader_layers")
except NotFoundError:
doc = None

if not doc:
#if not, create the hash in the db but only if it does not already exist
# if not, create the hash in the db but only if it does not already exist
try:
doc = Document(
_id=layer_id,
@@ -493,13 +493,13 @@ def merge_generated_parameters(headers, params, idx, param_hash):
except ConflictError:
logger.debug("Hash document now exists, continuing")

#re-fetch to get sequence number correct
# re-fetch to get sequence number correct
doc = Document.get(id=layer_id, using=es, index=".datashader_layers")

#Check for generator timeouts:
# Check for generator timeouts:
if doc.to_dict().get("generated_params", {}).get("generation_start_time") and \
datetime.now(timezone.utc) > datetime.strptime(doc.to_dict().get("generated_params", {}).get("generation_start_time"),"%Y-%m-%dT%H:%M:%S.%f%z")+timedelta(seconds=5*60):
#Something caused the worker generating the params to time out so clear that entry
datetime.now(timezone.utc) > datetime.strptime(doc.to_dict().get("generated_params", {}).get("generation_start_time"), "%Y-%m-%dT%H:%M:%S.%f%z")+timedelta(seconds=5*60):
# Something caused the worker generating the params to time out so clear that entry
try:
doc.update(
using=es,
@@ -511,18 +511,18 @@ def merge_generated_parameters(headers, params, idx, param_hash):
except ConflictError:
logger.debug("Abandoned resetting parameters due to conflict, other process has completed.")

#Loop-check if the generated params are in missing/in-process/complete
# Loop-check if the generated params are in missing/in-process/complete
timeout_at = datetime.now(timezone.utc)+timedelta(seconds=45)

while doc.to_dict().get("generated_params", {}).get("complete", False) is False:
if datetime.now(timezone.utc) > timeout_at:
logger.info("Hit timeout waiting for generated parameters to be placed into database")
break

#If missing, mark them as in generation
# If missing, mark them as in generation
if not doc.to_dict().get("generated_params", None):
#Mark them as being generated but do so with concurrency control
#https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
# Mark them as being generated but do so with concurrency control
# https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
logger.info("Discovering generated parameters")
generated_params = {
"complete": False,
@@ -543,12 +543,12 @@ def merge_generated_parameters(headers, params, idx, param_hash):
logger.debug("Abandoned generating parameters due to conflict, will wait for other process to complete.")
break

#Generate and save off parameters
# Generate and save off parameters
logger.warning("Discovering generated params")
generated_params.update(generate_global_params(headers, params, idx))
generated_params["generation_complete_time"] = datetime.now(timezone.utc)
generated_params["complete"] = True
#Store off generated params
# Store off generated params
doc.update(
using=es,
index=".datashader_layers",
@@ -561,6 +561,6 @@ def merge_generated_parameters(headers, params, idx, param_hash):
sleep(1)
doc = Document.get(id=layer_id, using=es, index=".datashader_layers")

#We now have params so use them
# We now have params so use them
params["generated_params"] = doc.to_dict().get("generated_params")
return params
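The update/ConflictError handling above is Elasticsearch optimistic concurrency control: write with retry_on_conflict=0, and if another worker gets there first, back off and reuse its result. A minimal sketch of the pattern, mirroring the code above; illustrative only.

# Minimal sketch of the optimistic-concurrency pattern used above (illustrative).
from elasticsearch.exceptions import ConflictError

try:
    doc.update(
        using=es,
        index=".datashader_layers",
        retry_on_conflict=0,  # fail fast on a version conflict instead of retrying
        refresh=True,
        generated_params={"complete": False},
    )
except ConflictError:
    # Another worker won the race; re-fetch and wait for its result.
    doc = Document.get(id=layer_id, using=es, index=".datashader_layers")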
10 changes: 5 additions & 5 deletions elastic_datashader/routers/data.py
@@ -32,7 +32,7 @@ async def get_data(idx: str, lat: float, lon: float, radius: float, request: Req
lat = float(lat)
lon = float(lon)
radius = float(radius)
#Check for paging args
# Check for paging args
from_arg = int(request.args.get("from", 0))
size_arg = int(request.args.get("size", 100))

@@ -70,20 +70,20 @@ async def get_data(idx: str, lat: float, lon: float, radius: float, request: Req

# Build and execute search
base_s = get_search_base(config.elastic_hosts, request.headers, params, idx)
distance_filter_dict = {"distance": f"{radius}m", geopoint_field: {"lat":lat, "lon":lon}}
distance_filter_dict = {"distance": f"{radius}m", geopoint_field: {"lat": lat, "lon": lon}}
base_s = base_s.filter("geo_distance", **distance_filter_dict)
distance_sort_dict = {geopoint_field:{"lat":lat, "lon":lon}, "order":"asc", "ignore_unmapped":True}
distance_sort_dict = {geopoint_field: {"lat": lat, "lon": lon}, "order": "asc", "ignore_unmapped": True}
base_s = base_s.sort({"_geo_distance": distance_sort_dict})
# Paginate
base_s = base_s[from_arg:from_arg+size_arg]
base_s = base_s[from_arg: from_arg+size_arg]

search_resp = base_s.execute()
hits = []
hit_count = 0

for hit in search_resp:
if includes_list:
#Only include named fields
# Only include named fields
named = {}

for f in includes_list:
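For reference, a rough sketch of the request body that the geo_distance filter, _geo_distance sort, and from/size slice above produce; the geopoint field name, coordinates, and radius are illustrative.

# Illustrative only: approximate query body built above, for a hypothetical
# geopoint field "location", a 1000 m radius, and the default paging of 0/100.
query_body = {
    "query": {"bool": {"filter": [
        {"geo_distance": {"distance": "1000.0m", "location": {"lat": 38.0, "lon": -76.0}}},
    ]}},
    "sort": [{"_geo_distance": {
        "location": {"lat": 38.0, "lon": -76.0},
        "order": "asc",
        "ignore_unmapped": True,
    }}],
    "from": 0,
    "size": 100,
}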