Skip to content
This repository has been archived by the owner on Sep 30, 2021. It is now read-only.

Commit

Permalink
record column usage
Browse files Browse the repository at this point in the history
  • Loading branch information
klahnakoski committed Dec 17, 2019
1 parent 6bcb4ed commit 4a85a86
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 6 deletions.
13 changes: 12 additions & 1 deletion tests/config/app_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,23 @@
{
"log_type": "console"
},
{
"log_type": "ses",
"from_address": "klahnakoski@mozilla.com",
"to_address": "klahnakoski@mozilla.com",
"subject": "[ALERT][DEV] Problem in ActiveData Frontend6",
"$ref": "file://~/private.json#aws_credentials"
},
{
"log_type": "elasticsearch",
"host": "http://localhost",
"port": 9200,
"index": "debug-activedata",
"type": "activedata"
"type": "activedata",
"rollover": {
"interval": "3month",
"max":"year"
}
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion vendor/jx_elasticsearch/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# `jx_elasticsearch`

This library implements [JSON Query Expressions]() atop an Elasticsearch.
This library implements [JSON Query Expressions]() atop an Elasticsearch instance.


## Contribution
Expand Down
4 changes: 4 additions & 0 deletions vendor/jx_elasticsearch/es52/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from jx_elasticsearch.es52.bulk_aggs import is_bulkaggsop, es_bulkaggsop
from jx_elasticsearch.es52.deep import es_deepop, is_deepop
from jx_elasticsearch.es52.setop import es_setop, is_setop
from jx_elasticsearch.es52.stats import QueryStats
from jx_elasticsearch.es52.util import aggregates, temper_limit
from jx_elasticsearch.meta import ElasticsearchMetadata, Table
from jx_python import jx
Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(

self._ensure_max_result_window_set(name)
self.settings.type = self.es.settings.type
self.stats = QueryStats(self.es.cluster)

columns = self.snowflake.columns # ABSOLUTE COLUMNS
is_typed = any(c.es_column == EXISTS_TYPE for c in columns)
Expand Down Expand Up @@ -181,6 +183,8 @@ def query(self, _query):
try:
query = QueryOp.wrap(_query, container=self, namespace=self.namespace)

self.stats.record(query)

for s in listwrap(query.select):
if s.aggregate != None and not aggregates.get(s.aggregate):
Log.error(
Expand Down
8 changes: 4 additions & 4 deletions vendor/jx_elasticsearch/es52/bulk_aggs.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def es_bulkaggsop(esq, frum, query):
guid = Random.base64(32, extra="-_")

Thread.run(
"extract to " + guid+".json",
"extract to " + guid + ".json",
extractor,
guid,
num_partitions,
Expand All @@ -117,8 +117,8 @@ def es_bulkaggsop(esq, frum, query):

output = wrap(
{
"url": URL_PREFIX / guid+".json",
"status": URL_PREFIX / guid+".status.json",
"url": URL_PREFIX / (guid + ".json"),
"status": URL_PREFIX / (guid + ".status.json"),
"meta": {
"format": "list",
"timing": {"cardinality_check": cardinality_check.duration},
Expand Down Expand Up @@ -208,7 +208,7 @@ def extractor(
break
output.write(b"\n]\n")

upload(guid+".json", temp_file)
upload(guid + ".json", temp_file)
write_status(
guid,
{
Expand Down
74 changes: 74 additions & 0 deletions vendor/jx_elasticsearch/es52/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http:# mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import, division, unicode_literals

from jx_base.expressions import Expression
from jx_elasticsearch.meta import Table
from mo_dots import listwrap, set_default
from mo_future import is_text
from mo_logs import Log
from mo_times import Date

DEBUG = True

COMMON = {}


class QueryStats(object):
def __new__(cls, cluster):
existing = COMMON.get(id(cluster))
if not existing:
existing = COMMON[id(cluster)] = object.__new__(cls)
return existing

def __init__(self, cluster):
if hasattr(self, "index"):
return

self.index = cluster.get_or_create_index(
index="meta.stats", typed=False, schema=SCHEMA
)
self.todo = self.index.threaded_queue(max_size=100, period=60)

def record(self, query):
try:
vars_record = get_stats(query)
except Exception as e:
Log.warning("problem processing query stats", cause=e)
self.todo.extend({"value": v} for v in vars_record)


def get_stats(query):
frum = query.frum
if isinstance(frum, Table):
vars_record = {"table": frum.name}
elif is_text(frum):
vars_record = {"table": frum}
else:
vars_record = get_stats(frum)
now = Date.now()
vars_record['timestamp']=now

output = []
for clause in ["select", "edges", "groupby", "window", "sort"]:
vars_record["mode"] = clause
for expr in listwrap(getattr(query, clause)):
if isinstance(expr.value, Expression):
for v in expr.value.vars():
output.append(set_default({"column": v.var}, vars_record))
for v in query.where.vars():
output.append(set_default({"column": v.var, "mode": "where"}, vars_record))
return output


SCHEMA = {
"settings": {"index.number_of_shards": 1, "index.number_of_replicas": 2},
"mappings": {"stats": {"properties": {}}},
}
3 changes: 3 additions & 0 deletions vendor/pyLibrary/env/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ def search(self, query, timeout=None, retry=None):
)

def threaded_queue(self, batch_size=None, max_size=None, period=None, silent=False):
"""
USE THIS TO AVOID WAITING
"""

def errors(e, _buffer): # HANDLE ERRORS FROM extend()
if e.cause.cause:
Expand Down

0 comments on commit 4a85a86

Please sign in to comment.