Updated src/naarad/metrics for flake8 fixes
Richard Hsu committed Apr 22, 2015
1 parent 1f4d39a commit fb59244
Showing 13 changed files with 527 additions and 385 deletions.
59 changes: 35 additions & 24 deletions src/naarad/metrics/cluster_metric.py
@@ -1,10 +1,20 @@
# coding=utf-8
"""
- © 2013 LinkedIn Corp. All rights reserved.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Copyright 2013 LinkedIn Corp. All rights reserved.
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
"""

from collections import defaultdict
import datetime
import gc
@@ -18,22 +28,23 @@

logger = logging.getLogger('naarad.metrics.cluster_metric')


class ClusterMetric(Metric):
"""
supporting the metric of Cluster, which aggregates the performance metrics of multiple hosts
"""

metrics = [] # all other non-aggregate metrics;
aggr_metrics = [] # metrics to be aggregated
- aggr_hosts = [] # hosts to be aggregated
+ aggr_hosts = []  # hosts to be aggregated

- def __init__ (self, section, aggregate_hosts, aggregate_metrics, metrics, output_directory, resource_path, label,
- ts_start, ts_end, rule_strings, important_sub_metrics, anomaly_detection_metrics, **other_options):
+ def __init__(self, section, aggregate_hosts, aggregate_metrics, metrics, output_directory, resource_path, label,
+ ts_start, ts_end, rule_strings, important_sub_metrics, anomaly_detection_metrics, **other_options):
self.metrics = metrics
self.aggr_metrics = aggregate_metrics.split()
self.aggr_hosts = aggregate_hosts.split()

- #Metric arguments take 'infile' and 'hostname', for ClusterMetric, they are invalid, so just provide empty strings.
+ # Metric arguments take 'infile' and 'hostname', for ClusterMetric, they are invalid, so just provide empty strings.
Metric.__init__(self, section, '', '', output_directory, resource_path, label, ts_start, ts_end, rule_strings,
important_sub_metrics, anomaly_detection_metrics)

@@ -59,60 +70,60 @@ def collect(self):
else: # no user input of aggregate functions
return True

- cur_column = '.'.join(fields[0].split('.')[1:]) #e.g. sda.await or all.percent-sys
+ cur_column = '.'.join(fields[0].split('.')[1:])  # e.g. sda.await or all.percent-sys

- #store data points of various aggregation functions
+ # Store data points of various aggregation functions
aggr_data = {}
- aggr_data['raw'] = [] #store all the raw values
- aggr_data['sum'] = defaultdict(float) #store the sum values for each timestamp
- aggr_data['count'] = defaultdict(int) #store the count of each timestamp (i.e. qps)
+ aggr_data['raw'] = []  # Store all the raw values
+ aggr_data['sum'] = defaultdict(float)  # Store the sum values for each timestamp
+ aggr_data['count'] = defaultdict(int)  # Store the count of each timestamp (i.e. qps)

- for metric in self.metrics: # loop the list to find from all metrics to merge
+ for metric in self.metrics:  # Loop the list to find from all metrics to merge
if metric.hostname in self.aggr_hosts and \
- cur_column in metric.csv_column_map.values():
+ cur_column in metric.csv_column_map.values():
file_csv = metric.get_csv(cur_column)
timestamp_format = None
with open(file_csv) as fh:
for line in fh:
aggr_data['raw'].append(line.rstrip())
words = line.split(",")
- ts = words[0].split('.')[0] #in case of sub-seconds; we only want the value of seconds;
+ ts = words[0].split('.')[0]  # In case of sub-seconds; we only want the value of seconds;
if not timestamp_format or timestamp_format == 'unknown':
timestamp_format = naarad.utils.detect_timestamp_format(ts)
if timestamp_format == 'unknown':
continue
ts = naarad.utils.get_standardized_timestamp(ts, timestamp_format)
aggr_data['sum'][ts] += float(words[1])
aggr_data['count'][ts] += 1
- #"raw" csv file
+ # "raw" csv file
if 'raw' in functions_aggr:
out_csv = self.get_csv(cur_column, 'raw')
self.csv_files.append(out_csv)
with open(out_csv, 'w') as fh:
fh.write("\n".join(sorted(aggr_data['raw'])))
- #"sum" csv file

+ # "sum" csv file
if 'sum' in functions_aggr:
out_csv = self.get_csv(cur_column, 'sum')
self.csv_files.append(out_csv)
with open(out_csv, 'w') as fh:
- for k,v in sorted(aggr_data['sum'].items()):
+ for (k, v) in sorted(aggr_data['sum'].items()):
fh.write(k + "," + str(v) + '\n')

# "avg" csv file
if 'avg' in functions_aggr:
out_csv = self.get_csv(cur_column, 'avg')
self.csv_files.append(out_csv)
with open(out_csv, 'w') as fh:
- for k,v in sorted(aggr_data['sum'].items()):
- fh.write(k + "," + str(v/aggr_data['count'][k]) + '\n')
+ for (k, v) in sorted(aggr_data['sum'].items()):
+ fh.write(k + "," + str(v / aggr_data['count'][k]) + '\n')

# "count" csv file (qps)
if 'count' in functions_aggr:
out_csv = self.get_csv(cur_column, 'count')
self.csv_files.append(out_csv)
with open(out_csv, 'w') as fh:
- for k,v in sorted(aggr_data['count'].items()):
+ for (k, v) in sorted(aggr_data['count'].items()):
fh.write(k + "," + str(v) + '\n')

gc.collect()
@@ -124,8 +135,8 @@ def get_csv(self, column, func):

def parse(self):
"""
- merge multiple hosts' csv into one csv file. This approach has the benefit of reusing calculate_stats(), but with the penalty of reading the single csv later for calculate_stats()
- However, since file cache will cache the newly written csv files, reading the csv file will not likely be a IO bottleneck.
+ Merge multiple hosts' csv into one csv file. This approach has the benefit of reusing calculate_stats(), but with the penalty of reading the single csv
+ later for calculate_stats(). However, since file cache will cache the newly written csv files, reading the csv file will not likely be a IO bottleneck.
"""

return True
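Aside from the style fixes, the aggregation that collect() performs reduces to grouping "timestamp,value" rows from each host's CSV by timestamp and then computing sum, count (qps), and avg = sum / count per timestamp. The following is a minimal standalone sketch of that reduction, not part of this commit; the sample rows are made up for illustration.

from collections import defaultdict

# Hypothetical rows in the "timestamp,value" format that collect() reads
# from each per-host CSV; two hosts report at the same second.
rows = [
    "2015-04-22 10:00:00,5.0",
    "2015-04-22 10:00:00,7.0",
    "2015-04-22 10:00:01,4.0",
]

aggr_sum = defaultdict(float)   # sum of values seen at each timestamp
aggr_count = defaultdict(int)   # number of samples at each timestamp (qps)

for line in rows:
    ts, value = line.strip().split(",")
    aggr_sum[ts] += float(value)
    aggr_count[ts] += 1

for ts in sorted(aggr_sum):
    avg = aggr_sum[ts] / aggr_count[ts]
    # e.g. "2015-04-22 10:00:00 sum=12.0 count=2 avg=6.0"
    print("%s sum=%s count=%d avg=%s" % (ts, aggr_sum[ts], aggr_count[ts], avg))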
