Remove all influx code paths
MushuEE committed Jul 8, 2020
1 parent b13c387 commit ce52daa
Showing 2 changed files with 3 additions and 95 deletions.
metrics/BUILD.bazel (2 changes: 0 additions & 2 deletions)
@@ -39,7 +39,6 @@ py_binary(
         requirement("certifi"),
         requirement("chardet"),
         requirement("idna"),
-        requirement("influxdb"),
         requirement("python-dateutil"),
         requirement("pytz"),
         requirement("ruamel.yaml"),
@@ -68,7 +67,6 @@ py_test(
         requirement("certifi"),
         requirement("chardet"),
         requirement("idna"),
-        requirement("influxdb"),
         requirement("python-dateutil"),
         requirement("pytz"),
         requirement("ruamel.yaml"),
metrics/bigquery.py (96 changes: 3 additions & 93 deletions)
@@ -28,7 +28,6 @@
 import time
 import traceback
 
-import influxdb
 import requests
 import ruamel.yaml as yaml
 
@@ -66,15 +65,14 @@ def do_jq(jq_filter, data_filename, out_filename, jq_bin='jq'):
 
 
 class BigQuerier:
-    def __init__(self, project, bucket_path, backfill_days, influx_client):
+    def __init__(self, project, bucket_path, backfill_days):
         if not project:
             raise ValueError('project', project)
         self.project = project
         if not bucket_path:
             print('Not uploading results, no bucket specified.', file=sys.stderr)
         self.prefix = bucket_path
 
-        self.influx = influx_client
         self.backfill_days = backfill_days
 
     def do_query(self, query, out_filename):
@@ -98,47 +96,23 @@ def jq_upload(self, config, data_filename):
         self.copy(filtered, os.path.join(config['metric'], filtered))
         self.copy(filtered, latest)
 
-    def influx_upload(self, config, data_filename):
-        """Uses jq to extract InfluxDB time series points then uploads to DB."""
-        points = '%s-data-points.json' % config['metric']
-        jq_point = config.get('measurements', {}).get('jq', None)
-        if not jq_point:
-            return
-        do_jq(jq_point, data_filename, points)
-        with open(points) as points_file:
-            try:
-                points = json.load(points_file)
-            except ValueError:
-                print("No influxdb points to upload.\n", file=sys.stderr)
-                return
-        if not self.influx:
-            print((
-                'Skipping influxdb upload of metric %s, no db configured.\n'
-                % config['metric']
-            ), file=sys.stderr)
-            return
-        points = [ints_to_floats(point) for point in points]
-        self.influx.write_points(points, time_precision='s', batch_size=100)
-
     def run_metric(self, config):
         """Runs query and filters results, uploading data to GCS."""
         raw = 'raw-%s.json' % time.strftime('%Y-%m-%d')
 
         self.update_query(config)
         self.do_query(config['query'], raw)
         self.copy(raw, os.path.join(config['metric'], raw))
 
         consumer_error = False
-        for consumer in [self.jq_upload, self.influx_upload]:
+        for consumer in [self.jq_upload]:
             try:
                 consumer(config, raw)
             except (
                     ValueError,
                     KeyError,
                     IOError,
                     requests.exceptions.ConnectionError,
-                    influxdb.client.InfluxDBClientError,
-                    influxdb.client.InfluxDBServerError,
             ):
                 print(traceback.format_exc(), file=sys.stderr)
                 consumer_error = True
Expand All @@ -152,72 +126,13 @@ def copy(self, src, dest):
dest = os.path.join(self.prefix, dest)
check(['gsutil', '-h', 'Cache-Control:max-age=60', 'cp', src, dest])

def update_query(self, config):
"""Modifies config['query'] based on the metric configuration."""

# Currently the only modification that is supported is injecting the
# timestamp of the most recent influxdb data for a given metric.
# (For backfilling)
measure = config.get('measurements', {}).get('backfill')
if not measure:
return
if self.influx:
# To get the last data point timestamp we must also fetch a field.
# So first find a field that we can query if the metric exists.
points = self.influx.query('show field keys from %s limit 1' % measure)
points = list(points.get_points())

field = points and points[0].get('fieldKey')
last_time = None
if field:
results = self.influx.query(
'select last(%s), time from %s limit 1' % (field, measure)
)
last_time = next(results.get_points(), {}).get('time')
if last_time:
# format time properly
last_time = time.strptime(last_time, '%Y-%m-%dT%H:%M:%SZ')
last_time = calendar.timegm(last_time)
if not last_time:
last_time = int(time.time() - (60*60*24*self.backfill_days))
else:
# InfluxDB is not enabled so skip backfill so use default
last_time = int(time.time() - (60*60*24)*self.backfill_days)

# replace tag with formatted time
config['query'] = config['query'].replace('<LAST_DATA_TIME>', str(last_time))


def all_configs(search='**.yaml'):
"""Returns config files in the metrics dir."""
return glob.glob(os.path.join(
os.path.dirname(__file__), 'configs', search))


def make_influx_client():
"""Make an InfluxDB client from config at path $VELODROME_INFLUXDB_CONFIG"""
if 'VELODROME_INFLUXDB_CONFIG' not in os.environ:
return None

with open(os.environ['VELODROME_INFLUXDB_CONFIG']) as config_file:
config = json.load(config_file)

def check_config(field):
if not field in config:
raise ValueError('DB client config needs field \'%s\'' % field)
check_config('host')
check_config('port')
check_config('user')
check_config('password')
return influxdb.InfluxDBClient(
host=config['host'],
port=config['port'],
username=config['user'],
password=config['password'],
database='metrics',
)


def ints_to_floats(point):
for key, val in point.items():
if key == 'time':
@@ -231,7 +146,7 @@ def ints_to_floats(point):
 
 def main(configs, project, bucket_path, backfill_days):
     """Loads metric config files and runs each metric."""
-    queryer = BigQuerier(project, bucket_path, backfill_days, make_influx_client())
+    queryer = BigQuerier(project, bucket_path, backfill_days)
 
     # the 'bq show' command is called as a hack to dodge the config prompts that bq presents
     # the first time it is run. A newline is passed to stdin to skip the prompt for default project
@@ -273,11 +188,6 @@ def main(configs, project, bucket_path, backfill_days):
     PARSER.add_argument(
         '--bucket',
         help='Upload results to the specified gcs bucket.')
-    PARSER.add_argument(
-        '--backfill-days',
-        default=30,
-        type=int,
-        help='Number of days to backfill influxdb data.')
 
     ARGS = PARSER.parse_args()
     main(ARGS.config, ARGS.project, ARGS.bucket, ARGS.backfill_days)
