diff --git a/elastic_datashader/elastic.py b/elastic_datashader/elastic.py
index 4c19bc1..d06e1ca 100644
--- a/elastic_datashader/elastic.py
+++ b/elastic_datashader/elastic.py
@@ -158,18 +158,18 @@ def convert_nm_to_ellipse_units(distance: float, units: str) -> float:
     # NB. assume "majmin_m" if any others
     return distance * 1852
 
-def get_field_type(elastic_hosts: str,headers: Optional[str],params: Dict[str, Any],field:str,idx: str) -> str:
+def get_field_type(elastic_hosts: str, headers: Optional[str], params: Dict[str, Any], field: str, idx: str) -> str:
     user = params.get("user")
     x_opaque_id = params.get("x-opaque-id")
     es = Elasticsearch(
         elastic_hosts.split(","),
         verify_certs=False,
         timeout=900,
-        headers=get_es_headers(headers, user,x_opaque_id),
+        headers=get_es_headers(headers, user, x_opaque_id),
     )
-    mappings = es.indices.get_field_mapping(fields=field,index=idx)
-    #{'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
-    index = list(mappings.keys())[0] #if index is my_index* it comes back as my_index
+    mappings = es.indices.get_field_mapping(fields=field, index=idx)
+    # {'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
+    index = list(mappings.keys())[0] # if index is my_index* it comes back as my_index
     return mappings[index]['mappings'][field]['mapping'][field]['type']
 
 def get_search_base(
@@ -199,7 +199,7 @@ def get_search_base(
         elastic_hosts.split(","),
         verify_certs=False,
         timeout=900,
-        headers=get_es_headers(headers, user,x_opaque_id),
+        headers=get_es_headers(headers, user, x_opaque_id),
     )
 
     # Create base search
@@ -219,13 +219,13 @@ def get_search_base(
     if time_range and time_range[timestamp_field]:
         base_s = base_s.filter("range", **time_range)
 
-    #filter the ellipse search range in the data base query so the legen matches the tiles
-    if params.get('render_mode',"") =="ellipses":
-        units = convert_nm_to_ellipse_units(params['search_nautical_miles'],params['ellipse_units'])
-        search_range = {params["ellipse_major"]:{"lte":units}}
-        base_s = base_s.filter("range",**search_range)
-        search_range = {params["ellipse_minor"]:{"lte":units}}
-        base_s = base_s.filter("range",**search_range)
+    # filter the ellipse search range in the data base query so the legend matches the tiles
+    if params.get('render_mode', "") == "ellipses":
+        units = convert_nm_to_ellipse_units(params['search_nautical_miles'], params['ellipse_units'])
+        search_range = {params["ellipse_major"]: {"lte": units}}
+        base_s = base_s.filter("range", **search_range)
+        search_range = {params["ellipse_minor"]: {"lte": units}}
+        base_s = base_s.filter("range", **search_range)
 
     # Add lucene query
     if lucene_query:
@@ -355,14 +355,14 @@ def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
             filter_key = f.get("meta", {}).get("key")
             if f.get("meta", {}).get("negate"):
                 if filter_key == "query":
-                    filter_dict["must_not"].append( { "bool": f.get(filter_key).get("bool") } )
+                    filter_dict["must_not"].append({"bool": f.get(filter_key).get("bool")})
                 else:
-                    filter_dict["must_not"].append( { filter_key: f.get(filter_key) } )
+                    filter_dict["must_not"].append({filter_key: f.get(filter_key)})
             else:
                 if filter_key == "query":
-                    filter_dict["filter"].append( { "bool": f.get(filter_key).get("bool") } )
+                    filter_dict["filter"].append({"bool": f.get(filter_key).get("bool")})
                 else:
-                    filter_dict["filter"].append( { filter_key: f.get(filter_key) } )
+                    filter_dict["filter"].append({filter_key: f.get(filter_key)})
         else:
             raise ValueError("unsupported filter type {}".format(f.get("meta").get("type"))) # pylint: disable=C0209
 
@@ -389,7 +389,7 @@ def load_datashader_headers(header_file_path_str: Optional[str]) -> Dict[Any, An
 
     return loaded_yaml
 
-def get_es_headers(request_headers=None, user=None,x_opaque_id=None):
+def get_es_headers(request_headers=None, user=None, x_opaque_id=None):
     """
 
     :param request_headers:
@@ -420,15 +420,17 @@ def get_es_headers(request_headers=None, user=None,x_opaque_id=None):
     return result
 
 def parse_duration_interval(interval):
-    durations = {"days":"d",
-                 "minutes":"m",
-                 "hours":"h",
-                 "weeks":"w",
-                 "months":"M",
-                 #"quarter":"q", dateutil.relativedelta doesn't handle quarters
-                 "years":"y"}
+    durations = {
+        "days": "d",
+        "minutes": "m",
+        "hours": "h",
+        "weeks": "w",
+        "months": "M",
+        # "quarter": "q", dateutil.relativedelta doesn't handle quarters
+        "years": "y",
+    }
     kwargs = {}
-    for key,value in durations.items():
+    for key, value in durations.items():
         if interval[len(interval)-1] == value:
             kwargs[key] = int(interval[0:len(interval)-1])
     return relativedelta(**kwargs)
@@ -524,11 +526,11 @@ def geotile_bucket_to_lonlat(bucket):
     if hasattr(bucket, "centroid"):
         lon = bucket.centroid.location.lon
         lat = bucket.centroid.location.lat
-    elif hasattr(bucket.key,'grids'):
-        z, x, y = [ int(x) for x in bucket.key.grids.split("/") ]
+    elif hasattr(bucket.key, 'grids'):
+        z, x, y = [int(x) for x in bucket.key.grids.split("/")]
         lon, lat = mu.center(x, y, z)
     else:
-        z, x, y = [ int(x) for x in bucket.key.split("/") ]
+        z, x, y = [int(x) for x in bucket.key.split("/")]
         lon, lat = mu.center(x, y, z)
 
     return lon, lat
@@ -571,7 +573,7 @@ def get_nested_field_from_hit(hit, field_parts: List[str], default=None):
     raise ValueError("field must be provided")
 
 def chunk_iter(iterable, chunk_size):
-    chunks = [ None ] * chunk_size
+    chunks = [None] * chunk_size
     i = -1
     for i, v in enumerate(iterable):
         idx = (i % chunk_size)
@@ -581,14 +583,14 @@ def chunk_iter(iterable, chunk_size):
         chunks[idx] = v
 
     if i >= 0:
-        last_written_idx =( i % chunk_size)
+        last_written_idx = (i % chunk_size)
         yield (False, chunks[0:last_written_idx+1])
 
-def bucket_noop(bucket,search):
+def bucket_noop(bucket, search): # pylint: disable=unused-argument
     return bucket
 
 class Scan:
-    def __init__(self, searches, inner_aggs=None,field=None,precision=None, size=10, timeout=None,bucket_callback=bucket_noop):
+    def __init__(self, searches, inner_aggs=None, field=None, precision=None, size=10, timeout=None, bucket_callback=bucket_noop):
         self.field = field
         self.precision = precision
         self.searches = searches
@@ -616,21 +618,21 @@ def execute(self):
         self.total_took = 0
         self.aborted = False
 
-        def run_search(s,**kwargs):
+        def run_search(s, **kwargs):
            _timeout_at = kwargs.pop("timeout_at", None)
            if _timeout_at:
                _time_remaining = _timeout_at - int(time.time())
                s = s.params(timeout=f"{_time_remaining}s")
            if self.field and self.precision:
-                s.aggs.bucket("comp", "geotile_grid", field=self.field,precision=self.precision,size=self.size)
-            #logger.info(json.dumps(s.to_dict(),indent=2,default=str))
+                s.aggs.bucket("comp", "geotile_grid", field=self.field, precision=self.precision, size=self.size)
+            # logger.info(json.dumps(s.to_dict(), indent=2, default=str))
            return s.execute()
 
         timeout_at = None
         if self.timeout:
             timeout_at = int(time.time()) + self.timeout
         for search in self.searches:
-            response = run_search(search,timeout_at=timeout_at)
+            response = run_search(search, timeout_at=timeout_at)
             self.num_searches += 1
             self.total_took += response.took
             self.total_shards += response._shards.total # pylint: disable=W0212
@@ -638,7 +640,7 @@ def run_search(s,**kwargs):
             self.total_successful += response._shards.successful # pylint: disable=W0212
             self.total_failed += response._shards.failed # pylint: disable=W0212
             for b in response.aggregations.comp.buckets:
-                b = self.bucket_callback(b,self)
+                b = self.bucket_callback(b, self)
                 yield b
 
 
@@ -735,19 +737,19 @@ def get_tile_categories(base_s, x, y, z, geopoint_field, category_field, size):
     cat_s = cat_s.params(size=0)
     cat_s = cat_s.filter("geo_bounding_box", **{geopoint_field: bb_dict})
     cat_s.aggs.bucket("categories", "terms", field=category_field, size=size)
-    cat_s.aggs.bucket("missing", "filter", bool={ "must_not" : { "exists": { "field": category_field } } })
+    cat_s.aggs.bucket("missing", "filter", bool={"must_not" : {"exists": {"field": category_field}}})
     response = cat_s.execute()
     if hasattr(response.aggregations, "categories"):
         for category in response.aggregations.categories:
             # this if prevents bools from using 0/1 instead of true/false
             if hasattr(category, "key_as_string"):
-                category_filters[str(category.key)] = { "term": {category_field: category.key_as_string} }
+                category_filters[str(category.key)] = {"term": {category_field: category.key_as_string}}
             else:
-                category_filters[str(category.key)] = { "term": {category_field: category.key} }
+                category_filters[str(category.key)] = {"term": {category_field: category.key}}
             category_legend[str(category.key)] = category.doc_count
         category_legend["Other"] = response.aggregations.categories.sum_other_doc_count
 
     if hasattr(response.aggregations, "missing") and response.aggregations.missing.doc_count > 0:
-        category_filters["N/A"] = { "bool": { "must_not" : { "exists": { "field": category_field } } } }
+        category_filters["N/A"] = {"bool": {"must_not" : {"exists": {"field": category_field}}}}
         category_legend["N/A"] = response.aggregations.missing.doc_count
 
     return category_filters, category_legend
diff --git a/elastic_datashader/parameters.py b/elastic_datashader/parameters.py
index b7b50eb..e878ff1 100644
--- a/elastic_datashader/parameters.py
+++ b/elastic_datashader/parameters.py
@@ -50,10 +50,10 @@ def create_default_params() -> Dict[str, Any]:
         "track_connection": None,
         "use_centroid": False,
         "user": None,
-        "bucket_min":0,
-        "bucket_max":1,
-        "timeOverlap":False,
-        "timeOverlapSize":"auto"
+        "bucket_min": 0,
+        "bucket_max": 1,
+        "timeOverlap": False,
+        "timeOverlapSize": "auto"
     }
 
 
@@ -293,10 +293,10 @@ def extract_parameters(headers: Dict[Any, Any], query_params: Dict[Any, Any]) ->
     params["geopoint_field"] = query_params.get("geopoint_field", params["geopoint_field"])
     params["timestamp_field"] = query_params.get("timestamp_field", params["timestamp_field"])
     params.update(get_time_bounds(now, from_time, to_time))
-    params["bucket_min"] = float(query_params.get("bucket_min",0))
-    params["bucket_max"] = float(query_params.get("bucket_max",1))
-    params["timeOverlap"] = query_params.get("timeOverlap","false") == "true"
-    params["timeOverlapSize"] = query_params.get("timeOverlapSize","auto")
+    params["bucket_min"] = float(query_params.get("bucket_min", 0))
+    params["bucket_max"] = float(query_params.get("bucket_max", 1))
+    params["timeOverlap"] = query_params.get("timeOverlap", "false") == "true"
+    params["timeOverlapSize"] = query_params.get("timeOverlapSize", "auto")
     params["debug"] = (query_params.get("debug", False) == 'true')
 
     if params["geopoint_field"] is None:
@@ -346,7 +346,7 @@ def generate_global_params(headers, params, idx):
         if category_type == "number":
             bounds_s.aggs.metric("field_stats", "stats", field=category_field)
 
-    field_type = get_field_type(config.elastic_hosts, headers, params,geopoint_field, idx)
+    field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
     # Execute and process search
     if len(list(bounds_s.aggs)) > 0 and field_type != "geo_shape":
         logger.info(bounds_s.to_dict())
@@ -427,14 +427,14 @@ def generate_global_params(headers, params, idx):
 
         if category_field:
             max_value_s = copy.copy(base_s)
-            bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
-            bucket.metric("sum","sum",field=category_field,missing=0)
+            bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
+            bucket.metric("sum", "sum", field=category_field, missing=0)
             resp = max_value_s.execute()
             estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
             histogram_range = estimated_points_per_tile
         else:
             max_value_s = copy.copy(base_s)
-            max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
+            max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
             resp = max_value_s.execute()
             estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
             histogram_range = estimated_points_per_tile
@@ -471,14 +471,14 @@ def merge_generated_parameters(headers, params, idx, param_hash):
         timeout=120
     )
 
-    #See if the hash exists
+    # See if the hash exists
     try:
         doc = Document.get(id=layer_id, using=es, index=".datashader_layers")
     except NotFoundError:
         doc = None
 
     if not doc:
-        #if not, create the hash in the db but only if it does not already exist
+        # if not, create the hash in the db but only if it does not already exist
         try:
             doc = Document(
                 _id=layer_id,
@@ -493,13 +493,13 @@ def merge_generated_parameters(headers, params, idx, param_hash):
         except ConflictError:
             logger.debug("Hash document now exists, continuing")
 
-        #re-fetch to get sequence number correct
+        # re-fetch to get sequence number correct
         doc = Document.get(id=layer_id, using=es, index=".datashader_layers")
 
-    #Check for generator timeouts:
+    # Check for generator timeouts:
     if doc.to_dict().get("generated_params", {}).get("generation_start_time") and \
-            datetime.now(timezone.utc) > datetime.strptime(doc.to_dict().get("generated_params", {}).get("generation_start_time"),"%Y-%m-%dT%H:%M:%S.%f%z")+timedelta(seconds=5*60):
-        #Something caused the worker generating the params to time out so clear that entry
+            datetime.now(timezone.utc) > datetime.strptime(doc.to_dict().get("generated_params", {}).get("generation_start_time"), "%Y-%m-%dT%H:%M:%S.%f%z")+timedelta(seconds=5*60):
+        # Something caused the worker generating the params to time out so clear that entry
         try:
             doc.update(
                 using=es,
@@ -511,7 +511,7 @@ def merge_generated_parameters(headers, params, idx, param_hash):
         except ConflictError:
             logger.debug("Abandoned resetting parameters due to conflict, other process has completed.")
 
-    #Loop-check if the generated params are in missing/in-process/complete
+    # Loop-check if the generated params are in missing/in-process/complete
     timeout_at = datetime.now(timezone.utc)+timedelta(seconds=45)
 
     while doc.to_dict().get("generated_params", {}).get("complete", False) is False:
@@ -519,10 +519,10 @@ def merge_generated_parameters(headers, params, idx, param_hash):
             logger.info("Hit timeout waiting for generated parameters to be placed into database")
             break
 
-        #If missing, mark them as in generation
+        # If missing, mark them as in generation
         if not doc.to_dict().get("generated_params", None):
-            #Mark them as being generated but do so with concurrenty control
-            #https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
+            # Mark them as being generated but do so with concurrency control
+            # https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
             logger.info("Discovering generated parameters")
             generated_params = {
                 "complete": False,
@@ -543,12 +543,12 @@ def merge_generated_parameters(headers, params, idx, param_hash):
                 logger.debug("Abandoned generating parameters due to conflict, will wait for other process to complete.")
                 break
 
-            #Generate and save off parameters
+            # Generate and save off parameters
             logger.warning("Discovering generated params")
             generated_params.update(generate_global_params(headers, params, idx))
             generated_params["generation_complete_time"] = datetime.now(timezone.utc)
             generated_params["complete"] = True
-            #Store off generated params
+            # Store off generated params
             doc.update(
                 using=es,
                 index=".datashader_layers",
@@ -561,6 +561,6 @@ def merge_generated_parameters(headers, params, idx, param_hash):
             sleep(1)
             doc = Document.get(id=layer_id, using=es, index=".datashader_layers")
 
-    #We now have params so use them
+    # We now have params so use them
     params["generated_params"] = doc.to_dict().get("generated_params")
     return params
diff --git a/elastic_datashader/routers/data.py b/elastic_datashader/routers/data.py
index 29af934..c4147fb 100644
--- a/elastic_datashader/routers/data.py
+++ b/elastic_datashader/routers/data.py
@@ -32,7 +32,7 @@ async def get_data(idx: str, lat: float, lon: float, radius: float, request: Req
     lat = float(lat)
     lon = float(lon)
     radius = float(radius)
-    #Check for paging args
+    # Check for paging args
     from_arg = int(request.args.get("from", 0))
     size_arg = int(request.args.get("size", 100))
 
@@ -70,12 +70,12 @@ async def get_data(idx: str, lat: float, lon: float, radius: float, request: Req
 
     # Build and execute search
     base_s = get_search_base(config.elastic_hosts, request.headers, params, idx)
-    distance_filter_dict = {"distance": f"{radius}m", geopoint_field: {"lat":lat, "lon":lon}}
+    distance_filter_dict = {"distance": f"{radius}m", geopoint_field: {"lat": lat, "lon": lon}}
     base_s = base_s.filter("geo_distance", **distance_filter_dict)
-    distance_sort_dict = {geopoint_field:{"lat":lat, "lon":lon}, "order":"asc", "ignore_unmapped":True}
+    distance_sort_dict = {geopoint_field: {"lat": lat, "lon": lon}, "order": "asc", "ignore_unmapped": True}
     base_s = base_s.sort({"_geo_distance": distance_sort_dict})
     # Paginate
-    base_s = base_s[from_arg:from_arg+size_arg]
+    base_s = base_s[from_arg: from_arg+size_arg]
     search_resp = base_s.execute()
 
     hits = []
@@ -83,7 +83,7 @@ async def get_data(idx: str, lat: float, lon: float, radius: float, request: Req
 
     for hit in search_resp:
         if includes_list:
-            #Only include named fields
+            # Only include named fields
             named = {}
 
             for f in includes_list:
diff --git a/elastic_datashader/routers/legend.py b/elastic_datashader/routers/legend.py
index b880dc3..aad0470 100644
--- a/elastic_datashader/routers/legend.py
+++ b/elastic_datashader/routers/legend.py
@@ -21,17 +21,17 @@
 router = APIRouter()
 
 
-def lob(point,brng,distance):
+def lob(point, brng, distance):
 
-    R = 6371009 #Radius of the Earth this is the same as georgio
-    brng = math.radians(brng) #Bearing is degrees converted to radians.
-    d = distance #Distance in meters
+    R = 6371009 # Radius of the Earth this is the same as georgio
+    brng = math.radians(brng) # Bearing is degrees converted to radians.
+    d = distance # Distance in meters
 
-    lat1 = math.radians(point['lat']) #Current lat point converted to radians
-    lon1 = math.radians(point['lon']) #Current lon point converted to radians
+    lat1 = math.radians(point['lat']) # Current lat point converted to radians
+    lon1 = math.radians(point['lon']) # Current lon point converted to radians
 
-    lat2 = math.asin( math.sin(lat1)*math.cos(d/R) +
+    lat2 = math.asin(math.sin(lat1)*math.cos(d/R) +
         math.cos(lat1)*math.sin(d/R)*math.cos(brng))
 
     lon2 = lon1 + math.atan2(math.sin(brng)*math.sin(d/R)*math.cos(lat1),
@@ -39,11 +39,11 @@ def lob(point,brng,distance):
     lat2 = math.degrees(lat2)
     lon2 = math.degrees(lon2)
 
-    return {"lat":lat2,"lon":lon2}
+    return {"lat": lat2, "lon": lon2}
 
-def expand_bbox_by_meters(bbox,meters):
-    #top left line of bearing nw and bottom right lob se
-    return {"top_left":lob(bbox['top_left'],315,meters),"bottom_right":lob(bbox['bottom_right'],135,meters)}
+def expand_bbox_by_meters(bbox, meters):
+    # top left line of bearing nw and bottom right lob se
+    return {"top_left": lob(bbox['top_left'], 315, meters), "bottom_right": lob(bbox['bottom_right'], 135, meters)}
 
 
 def legend_response(data: str, error: Optional[Exception]=None, **kwargs) -> Response:
@@ -137,12 +137,12 @@ async def provide_legend(idx: str, field_name: str, request: Request): # pylint
             "lon": min(180.0, extent["maxLon"]),
         },
     }
-    if params.get('render_mode',"") == "ellipses":
-        #expand the bbox by half the search_nautical_miles or we cut off items on the edge
-        #this still isn't 100% accurate because the tiles are squares and our viewport is rectangular
-        #you can sometimes see a little tiny part of the ellipse and it isn't counted
+    if params.get('render_mode', "") == "ellipses":
+        # expand the bbox by half the search_nautical_miles or we cut off items on the edge
+        # this still isn't 100% accurate because the tiles are squares and our viewport is rectangular
+        # you can sometimes see a little tiny part of the ellipse and it isn't counted
         meters = params['search_nautical_miles'] * 1852
-        legend_bbox = expand_bbox_by_meters(legend_bbox,meters/2)
+        legend_bbox = expand_bbox_by_meters(legend_bbox, meters/2)
         logger.info("legend_bbox: %s", legend_bbox)
         base_s = base_s.filter("geo_bounding_box", **{geopoint_field: legend_bbox})
 
@@ -183,7 +183,7 @@ async def provide_legend(idx: str, field_name: str, request: Request): # pylint
     )
 
     for tile in tiles_iter:
-        #Query the database to get the categories for this tile
+        # Query the database to get the categories for this tile
         _, tile_legend = get_tile_categories(
             base_s,
             tile.x,
diff --git a/elastic_datashader/routers/tms.py b/elastic_datashader/routers/tms.py
index 28b72f3..a0251a1 100644
--- a/elastic_datashader/routers/tms.py
+++ b/elastic_datashader/routers/tms.py
@@ -187,13 +187,13 @@ def generate_tile_to_cache(idx: str, x: int, y: int, z: int, params, parameter_h
     # Then bail and let another request have a shot at it.
     try:
         x_opaque_id = str(uuid.uuid4())
-        headers = get_es_headers(request_headers=request.headers, user=params["user"],x_opaque_id=x_opaque_id)
+        headers = get_es_headers(request_headers=request.headers, user=params["user"], x_opaque_id=x_opaque_id)
         logger.debug("Loaded input headers %s", request.headers)
         logger.debug("Loaded elasticsearch headers %s", headers)
 
         # Get or generate extended parameters
         params = merge_generated_parameters(request.headers, params, idx, parameter_hash)
-        params = {**params,"x-opaque-id":x_opaque_id}
+        params = {**params, "x-opaque-id": x_opaque_id}
         base_tile_info = {
             'hash': parameter_hash,
             'idx': idx,
@@ -318,7 +318,7 @@ async def fetch_or_render_tile(already_waited: int, idx: str, x: int, y: int, z:
 
     # Generate the tile into the cache in the background.
     background_tasks.add_task(generate_tile_to_cache, idx, x, y, z, params, parameter_hash, request)
-    #lets hold the connection open for the already_waited time then check the cache again
+    # let's hold the connection open for the already_waited time then check the cache again
     timeout = time.time() + already_waited
     while time.time() < timeout:
         time.sleep(0.1)
diff --git a/elastic_datashader/tilegen.py b/elastic_datashader/tilegen.py
index 5a6b5f4..2eb3ce6 100644
--- a/elastic_datashader/tilegen.py
+++ b/elastic_datashader/tilegen.py
@@ -93,7 +93,7 @@ def get_track_field_names(params: Dict[str, Any]) -> TrackFieldNames:
         track_connection=track_connection,
     )
 
-def populated_field_names(field_names: Union[EllipseFieldNames,TrackFieldNames]) -> List[str]:
+def populated_field_names(field_names: Union[EllipseFieldNames, TrackFieldNames]) -> List[str]:
     field_names_as_lists = (v for v in asdict(field_names).values() if v is not None)
     return [".".join(field_name_as_list) for field_name_as_list in field_names_as_lists]
 
@@ -126,7 +126,7 @@ def normalize_ellipses_to_list(locs, majors, minors, angles):
         and isinstance(locs[0], float)
         and isinstance(locs[1], float)
     ):
-        locs = [locs] # eg. [[-73.986,40.7485]]
+        locs = [locs] # eg. [[-73.986, 40.7485]]
         majors = [majors]
         minors = [minors]
         angles = [angles]
@@ -146,7 +146,7 @@ def normalize_locations_to_list(locs):
         and isinstance(locs[0], float)
        and isinstance(locs[1], float)
     ):
-        locs = [locs] # eg. [[-73.986,40.7485]]
+        locs = [locs] # eg. [[-73.986, 40.7485]]
     else:
         # All other cases are single location
         locs = [locs]
@@ -175,7 +175,7 @@ def normalize_location(location) -> Optional[Location]:
         lat, lon = location.split(",", 1)
         return Location(lat=float(lat), lon=float(lon))
 
-    # sometimes the location is a two-element [lon,lat] list
+    # sometimes the location is a two-element [lon, lat] list
     if isinstance(location, (AttrList, list)):
         if len(location) != 2:
             logger.warning("skipping location with invalid list format %s", location)
@@ -539,7 +539,7 @@ def get_span_upper_bound(span_range: str, estimated_points_per_tile: Optional[in
         return math.log(1e308)
 
     assert estimated_points_per_tile is not None
-    return math.log(max(math.pow(estimated_points_per_tile,2), 2))
+    return math.log(max(math.pow(estimated_points_per_tile, 2), 2))
 
 def get_span_none(span_upper_bound: Optional[float]) -> Optional[List[float]]:
     if span_upper_bound is None:
@@ -673,14 +673,14 @@ def generate_nonaggregated_tile(
             )
         )
 
-        #Sort by category (if used) and then tracking value
+        # Sort by category (if used) and then tracking value
         if len(df) != 0:
             if category_field:
-                df.sort_values(["c","t"], inplace=True)
+                df.sort_values(["c", "t"], inplace=True)
             else:
                 df.sort_values(["t"], inplace=True)
 
-        #Now we need to iterate through the list so far and separate by different colors/distances
+        # Now we need to iterate through the list so far and separate by different colors/distances
         split_dicts = []
         start_points_dicts = []
         current_track = []
@@ -689,7 +689,7 @@ def generate_nonaggregated_tile(
         old_row = blank_row
         for _, row in df.iterrows():
             if old_row.get("c") != row.get("c"):
-                #new category, so insert space in the tracks dicts and add to the start dicts
+                # new category, so insert space in the tracks dicts and add to the start dicts
                 if track_distance > filter_meters:
                     split_dicts = split_dicts + current_track
                     split_dicts.append(blank_row)
@@ -702,7 +702,7 @@ def generate_nonaggregated_tile(
                     not np.isnan(old_row.get("y")) :
                 distance = np.sqrt(np.power(row.get("x")-old_row.get("x"), 2)+np.power(row.get("y")-old_row.get("y"), 2))
                 if distance > search_meters:
-                    #These points are too far apart, split them as different tracks if total track length is acceptable
+                    # These points are too far apart, split them as different tracks if total track length is acceptable
                     if track_distance > filter_meters:
                         split_dicts = split_dicts + current_track
                         split_dicts.append(blank_row)
@@ -714,7 +714,7 @@ def generate_nonaggregated_tile(
             current_track.append(dict(row))
             old_row = row
 
-        #last one is always an end-point if the track was long enough
+        # last one is always an end-point if the track was long enough
        if track_distance > filter_meters:
            split_dicts = split_dicts + current_track
            split_dicts.append(blank_row)
@@ -777,7 +777,7 @@ def generate_nonaggregated_tile(
            y_range=y_range,
        ).line(df, "x", "y", agg=rd.count_cat("c"))
 
-        #now for the points as well
+        # now for the points as well
        points_agg = None
 
        if df_points is not None:
@@ -824,12 +824,12 @@ def generate_nonaggregated_tile(
                 )
 
                 if (spread is not None) and (spread > 0):
-                    #Spread squares x3
+                    # Spread squares x3
                     points_img = tf.spread(points_img, spread*3, shape='square')
                 else:
                     points_img = tf.spread(points_img, 2, shape='square')
 
-                #Stack end markers onto the tracks
+                # Stack end markers onto the tracks
                 img = tf.stack(img, points_img)
 
         img = img.to_bytesio().read()
@@ -985,7 +985,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
     base_s = get_search_base(config.elastic_hosts, headers, params, idx)
     base_s = base_s[0:0]
     # Now find out how many documents
-    count_s = copy.copy(base_s)[0:0] #slice of array sets from/size since we are aggregating the data we don't need the hits
+    count_s = copy.copy(base_s)[0:0] # slice of array sets from/size since we are aggregating the data we don't need the hits
     count_s = count_s.filter("geo_bounding_box", **{geopoint_field: bb_dict})
     doc_cnt = count_s.count()
 
@@ -1083,7 +1083,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
                     "geo_centroid",
                     field=geopoint_field
                 )
-            inner_aggs = { "categories": inner_agg }
+            inner_aggs = {"categories": inner_agg}
         elif category_field and histogram_interval is not None:
             # Histogram Mode
             inner_agg_size = histogram_cnt
@@ -1100,7 +1100,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
                     "geo_centroid",
                     field=geopoint_field
                 )
-            inner_aggs = { "categories": inner_agg }
+            inner_aggs = {"categories": inner_agg}
         else:
             inner_agg_size = 1
             if use_centroid:
@@ -1113,7 +1113,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
     # the composite needs one bin for 'after_key'
     composite_agg_size = int(max_bins / inner_agg_size) - 1
 
-    field_type = get_field_type(config.elastic_hosts, headers, params,geopoint_field, idx)
+    field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
     partial_data = False # TODO can we get partial data?
     span = None
     if field_type == "geo_point":
@@ -1121,11 +1121,11 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
         estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
         if params['bucket_min']>0 or params['bucket_max']<1:
             if estimated_points_per_tile is None:
-                #this isn't good we need a real number so lets query the max aggregation ammount
+                # this isn't good, we need a real number, so let's query the max aggregation amount
                 max_value_s = copy.copy(base_s)
-                bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
+                bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
                 if category_field:
-                    bucket.metric("sum","sum",field=category_field,missing=0)
+                    bucket.metric("sum", "sum", field=category_field, missing=0)
                 resp = max_value_s.execute()
                 if category_field:
                     estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
@@ -1134,20 +1134,20 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
             min_bucket = math.floor(math.exp(math.log(estimated_points_per_tile)*params['bucket_min']))
             max_bucket = math.ceil(math.exp(math.log(estimated_points_per_tile)*params['bucket_max']))
 
-            geo_tile_grid.pipeline("selector","bucket_selector",buckets_path={"doc_count":"_count"},script=f"params.doc_count >= {min_bucket} && params.doc_count <= {max_bucket}")
+            geo_tile_grid.pipeline("selector", "bucket_selector", buckets_path={"doc_count": "_count"}, script=f"params.doc_count >= {min_bucket} && params.doc_count <= {max_bucket}")
         if inner_aggs is not None:
             for agg_name, agg in inner_aggs.items():
                 geo_tile_grid.aggs[agg_name] = agg
 
         tile_s.aggs["comp"] = geo_tile_grid
-        resp = Scan([tile_s],timeout=config.query_timeout_seconds)
+        resp = Scan([tile_s], timeout=config.query_timeout_seconds)
         # resp = ScanAggs(
         #     tile_s,
         #     {"grids": geo_tile_grid},
         #     inner_aggs,
         #     size=composite_agg_size,
         #     timeout=config.query_timeout_seconds
-        # ) #Dont use composite aggregator because you cannot use a bucket selector
+        # ) # Don't use composite aggregator because you cannot use a bucket selector
 
 
         df = pd.DataFrame(
@@ -1175,45 +1175,45 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
        geotile_precision = current_zoom+zoom
        searches = []
 
-        if params.get("generated_params", {}).get('complete',False):
+        if params.get("generated_params", {}).get('complete', False):
            estimated_points_per_tile = params["generated_params"]['global_doc_cnt']
-            span = [0,estimated_points_per_tile]
+            span = [0, estimated_points_per_tile]
            logger.info("USING GENERATED PARAMS")
        else:
            if category_field:
                max_value_s = copy.copy(base_s)
-                bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
-                bucket.metric("sum","sum",field=category_field,missing=0)
+                bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
+                bucket.metric("sum", "sum", field=category_field, missing=0)
                resp = max_value_s.execute()
                estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
-                span = [0,estimated_points_per_tile]
+                span = [0, estimated_points_per_tile]
            else:
                max_value_s = copy.copy(base_s)
-                max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
+                max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
                resp = max_value_s.execute()
                estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
-                span = [0,estimated_points_per_tile]
-            logger.info("EST Points: %s %s",estimated_points_per_tile,category_field)
+                span = [0, estimated_points_per_tile]
+            logger.info("EST Points: %s %s", estimated_points_per_tile, category_field)
 
        searches = []
-        composite_agg_size = 65536#max agg bucket size
+        composite_agg_size = 65536 # max agg bucket size
        subtile_bb_dict = create_bounding_box_for_tile(x, y, z)
        subtile_s = copy.copy(base_s)
        subtile_s = subtile_s[0:0]
        subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
-        bucket = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=composite_agg_size,bounds=subtile_bb_dict)
+        bucket = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=composite_agg_size, bounds=subtile_bb_dict)
        if category_field:
-            bucket.metric("sum","sum",field=category_field,missing=0)
+            bucket.metric("sum", "sum", field=category_field, missing=0)
        searches.append(subtile_s)
 
-        cmap = "bmy" #todo have front end pass the cmap for none categorical
+        cmap = "bmy" # todo have front end pass the cmap for non-categorical
 
-        # def calc_aggregation(bucket,search):
-        #     #get bounds from bucket.key
-        #     #do search for sum of values on category_field
+        # def calc_aggregation(bucket, search):
+        #     # get bounds from bucket.key
+        #     # do search for sum of values on category_field
        #     z, x, y = [ int(x) for x in bucket.key.split("/") ]
        #     bucket_bb_dict = create_bounding_box_for_tile(x, y, z)
        #     subtile_s = copy.copy(base_s)
-        #     subtile_s.aggs.bucket("sum","avg",field=category_field,missing=0)
+        #     subtile_s.aggs.bucket("sum", "avg", field=category_field, missing=0)
        #     subtile_s = subtile_s[0:0]
        #     subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: bucket_bb_dict})
        #     response = subtile_s.execute()
@@ -1223,39 +1223,39 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
        #     search.total_skipped += response._shards.skipped # pylint: disable=W0212
        #     search.total_successful += response._shards.successful # pylint: disable=W0212
        #     search.total_failed += response._shards.failed # pylint: disable=W0212
-        #     bucket.doc_count = response.aggregations.sum['value'] #replace with sum of category_field
+        #     bucket.doc_count = response.aggregations.sum['value'] # replace with sum of category_field
        #     return bucket
 
-        def remap_bucket(bucket,search):
+        def remap_bucket(bucket, search):
            # pylint: disable=unused-argument
-            #get bounds from bucket.key
-            #remap sub aggregation for sum of values to the doc count
+            # get bounds from bucket.key
+            # remap sub aggregation for sum of values to the doc count
            bucket.doc_count = bucket.sum['value']
            return bucket
 
        bucket_callback = None
        if category_field:
-            #bucket_callback = calc_aggregation #don't run a sub query. sub aggregation worked But we might want to leave this in for cross index searches
+            # bucket_callback = calc_aggregation # don't run a sub query. sub aggregation worked, but we might want to leave this in for cross index searches
            bucket_callback = remap_bucket
 
-        if params['timeOverlap']:#run scan using date intervals to check overlaps during the same time
+        if params['timeOverlap']: # run scan using date intervals to check overlaps during the same time
            subtile_bb_dict = create_bounding_box_for_tile(x, y, z)
            interval = params['timeOverlapSize']
-            logger.info("CREATING TIMEBUCKETS %s",interval)
-            searches = create_time_interval_searches(base_s,subtile_bb_dict,start_time,stop_time,timestamp_field,geopoint_field,geotile_precision,composite_agg_size,category_field,interval)
+            logger.info("CREATING TIMEBUCKETS %s", interval)
+            searches = create_time_interval_searches(base_s, subtile_bb_dict, start_time, stop_time, timestamp_field, geopoint_field, geotile_precision, composite_agg_size, category_field, interval)
 
-        resp = Scan(searches,timeout=config.query_timeout_seconds,bucket_callback=bucket_callback)
+        resp = Scan(searches, timeout=config.query_timeout_seconds, bucket_callback=bucket_callback)
        df = pd.DataFrame(
            convert_composite(
                resp.execute(),
-                False,#we don't need categorical, because ES doesn't support composite buckets for geo_shapes we calculate that with a secondary search in the bucket_callback
-                False,#we dont need filter_buckets, because ES doesn't support composite buckets for geo_shapes we calculate that with a secondary search in the bucket_callback
+                False, # we don't need categorical, because ES doesn't support composite buckets for geo_shapes we calculate that with a secondary search in the bucket_callback
+                False, # we don't need filter_buckets, because ES doesn't support composite buckets for geo_shapes we calculate that with a secondary search in the bucket_callback
                histogram_interval,
                category_type,
                category_format
            )
        )
        if len(df)/resp.num_searches == composite_agg_size:
-            logger.warning("clipping on tile %s",[x,y,z])
+            logger.warning("clipping on tile %s", [x, y, z])
        s2 = time.time()
        logger.info("ES took %s (%s) for %s with %s searches", (s2 - s1), resp.total_took, len(df), resp.num_searches)
@@ -1284,7 +1284,7 @@ def remap_bucket(bucket,search):
            # TODO it would be nice if datashader honored the category orders
            # in z-order, then we could make "Other" drawn underneath the less
            # promenent colors
-            categories = list( df["t"].unique() )
+            categories = list(df["t"].unique())
            metrics["categories"] = json.dumps(categories)
            try:
                categories.insert(0, categories.pop(categories.index("Other")))
@@ -1354,7 +1354,7 @@ def remap_bucket(bucket,search):
            if span is None:
                span = get_span_zero(span_upper_bound)
            logger.info("Span %s %s", span, span_range)
-            logger.info("aggs min:%s max:%s",float(agg.min()),float(agg.max()))
+            logger.info("aggs min:%s max:%s", float(agg.min()), float(agg.max()))
            img = tf.shade(agg, cmap=cc.palette[cmap], how="log", span=span)
 
            ###############################################################
@@ -1385,25 +1385,25 @@ def remap_bucket(bucket,search):
        raise
 
 
-def create_time_interval_searches(base_s,subtile_bb_dict,start_time,stop_time,timestamp_field,geopoint_field,geotile_precision,composite_agg_size,category_field,interval="auto"):
+def create_time_interval_searches(base_s, subtile_bb_dict, start_time, stop_time, timestamp_field, geopoint_field, geotile_precision, composite_agg_size, category_field, interval="auto"):
    stime = start_time
    searches = []
    if interval == "auto":
        delta = stop_time - start_time
        minutes = delta.total_seconds() /60
-        step = 1 #step through all the minutes in an hour to find a fit
+        step = 1 # step through all the minutes in an hour to find a fit
        while minutes/step > 546:
            step = step +1
        interval = str(step)+"m"
-    logger.info("Actual time bucket %s",interval)
+    logger.info("Actual time bucket %s", interval)
 
    while stime < stop_time:
        subtile_s = copy.copy(base_s)
        subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
        subtile_s = subtile_s[0:0]
        bucket_start_time = stime
        bucket_duration = parse_duration_interval(interval)
-        #logger.info(bucket_duration)
+        # logger.info(bucket_duration)
        bucket_stop_time = bucket_start_time+ bucket_duration
        bucket_stop_time = min(bucket_stop_time, stop_time)
        time_range = {timestamp_field: {}}
@@ -1411,9 +1411,9 @@ def create_time_interval_searches(base_s,subtile_bb_dict,start_time,stop_time,ti
        time_range[timestamp_field]["lte"] = bucket_stop_time
        stime = bucket_stop_time
        subtile_s = subtile_s.filter("range", **time_range)
-        bucket = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=composite_agg_size,bounds=subtile_bb_dict)
-        bucket.pipeline("selector","bucket_selector",buckets_path={"doc_count":"_count"},script="params.doc_count >= 2")
+        bucket = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=composite_agg_size, bounds=subtile_bb_dict)
+        bucket.pipeline("selector", "bucket_selector", buckets_path={"doc_count": "_count"}, script="params.doc_count >= 2")
        if category_field:
-            bucket.metric("sum","sum",field=category_field,missing=0)
+            bucket.metric("sum", "sum", field=category_field, missing=0)
        searches.append(subtile_s)
    return searches
diff --git a/pyproject.toml b/pyproject.toml
index a166e56..728742c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -78,6 +78,7 @@ setenv =
 
 deps =
     geopy
+    flake8
     pylint
     pytest
     pytest-cov
@@ -87,6 +88,7 @@ whitelist_externals =
     mkdir
 
 commands =
+    flake8 --select E201,E202,E231,E262,E265 elastic_datashader
     pylint elastic_datashader
     mkdir -p {toxinidir}/tms-cache
     pytest --cov-report term-missing --cov=elastic_datashader