Add benchmark logger that does stream upload to bigquery. (#4210)
* Move the benchmark_uploader to new location.

* Update benchmark logger to streaming upload.

* Fix lint and unit test error.

* delint.

* Update the benchmark uploader test.

Skip the import of benchmark_uploader when bigquery is not installed.

* Merge the 2 classes of benchmark uploader into 1.

* Address review comments.

* delint.

* Execute bigquery upload in a separate thread (see the sketch after this list).

* Change to use python six.moves for importing.

* Address review comments and delint.

* Address review comment.

Add a comment about the potential performance impact for models on CPU.

* Fix random failure on py3.

* Fix the order of flag saver to avoid the randomness.

The test broke when benchmark_logger_type was set first and validated
while benchmark_log_dir was not yet set.
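
A rough sketch of the streaming-upload-in-a-thread pattern described above; this is not the logger's actual implementation, but it uses the BigQueryUploader API added in this commit:

import threading

from official.benchmark import benchmark_uploader


def upload_run_in_background(dataset_name, table_name, run_id, run_json):
  """Stream the benchmark run info to BigQuery without blocking training."""
  uploader = benchmark_uploader.BigQueryUploader()
  thread = threading.Thread(
      target=uploader.upload_benchmark_run_json,
      args=(dataset_name, table_name, run_id, run_json))
  thread.daemon = True  # Don't keep the process alive just for the upload.
  thread.start()
  return thread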
qlzh727 committed May 11, 2018
1 parent 80178fc commit 0270cac
Showing 10 changed files with 450 additions and 116 deletions.
Empty file added official/benchmark/__init__.py
official/benchmark/benchmark_uploader.py
@@ -25,30 +25,19 @@
from __future__ import print_function

import json
-import os
-import sys
-import uuid

from google.cloud import bigquery

-# pylint: disable=g-bad-import-order
-from absl import app as absl_app
-from absl import flags
import tensorflow as tf
-# pylint: enable=g-bad-import-order

-from official.utils.flags import core as flags_core
-from official.utils.logs import logger


class BigQueryUploader(object):
-  """Upload the benchmark and metric info to BigQuery."""
+  """Upload the benchmark and metric info from JSON input to BigQuery."""

-  def __init__(self, logging_dir, gcp_project=None, credentials=None):
+  def __init__(self, gcp_project=None, credentials=None):
    """Initialize BigQueryUploader with the proper settings.

    Args:
-      logging_dir: string, logging directory that contains the benchmark log.
      gcp_project: string, the name of the GCP project that the log will be
        uploaded to. The default project name will be detected from local
        environment if no value is provided.
@@ -58,11 +47,11 @@ def __init__(self, logging_dir, gcp_project=None, credentials=None):
        google.oauth2.service_account.Credentials to load credentials from a
        local file for the case that the test is run outside of GCP.
    """
-    self._logging_dir = logging_dir
    self._bq_client = bigquery.Client(
        project=gcp_project, credentials=credentials)

-  def upload_benchmark_run(self, dataset_name, table_name, run_id):
+  def upload_benchmark_run_json(
+      self, dataset_name, table_name, run_id, run_json):
"""Upload benchmark run information to Bigquery.
Args:
@@ -72,19 +61,13 @@ def upload_benchmark_run(self, dataset_name, table_name, run_id):
        the data will be uploaded.
      run_id: string, a unique ID that will be attached to the data, usually
        this is a UUID4 format.
+      run_json: dict, the JSON data that contains the benchmark run info.
    """
-    expected_file = os.path.join(
-        self._logging_dir, logger.BENCHMARK_RUN_LOG_FILE_NAME)
-    with tf.gfile.GFile(expected_file) as f:
-      benchmark_json = json.load(f)
-      benchmark_json["model_id"] = run_id
-      table_ref = self._bq_client.dataset(dataset_name).table(table_name)
-      errors = self._bq_client.insert_rows_json(table_ref, [benchmark_json])
-      if errors:
-        tf.logging.error(
-            "Failed to upload benchmark info to bigquery: {}".format(errors))
-
-  def upload_metric(self, dataset_name, table_name, run_id):
+    run_json["model_id"] = run_id
+    self._upload_json(dataset_name, table_name, [run_json])
+
+  def upload_benchmark_metric_json(
+      self, dataset_name, table_name, run_id, metric_json_list):
"""Upload metric information to Bigquery.
Args:
@@ -95,39 +78,57 @@ def upload_metric(self, dataset_name, table_name, run_id):
        benchmark_run table.
      run_id: string, a unique ID that will be attached to the data, usually
        this is a UUID4 format. This should be the same as the benchmark run_id.
+      metric_json_list: list, a list of JSON objects that record the metric
+        info.
    """
+    for m in metric_json_list:
+      m["run_id"] = run_id
+    self._upload_json(dataset_name, table_name, metric_json_list)

+  def upload_benchmark_run_file(
+      self, dataset_name, table_name, run_id, run_json_file):
+    """Upload benchmark run information to Bigquery from input json file.
+
+    Args:
+      dataset_name: string, the name of bigquery dataset where the data will be
+        uploaded.
+      table_name: string, the name of bigquery table under the dataset where
+        the data will be uploaded.
+      run_id: string, a unique ID that will be attached to the data, usually
+        this is a UUID4 format.
+      run_json_file: string, the file path that contains the run JSON data.
+    """
+    with tf.gfile.GFile(run_json_file) as f:
+      benchmark_json = json.load(f)
+      self.upload_benchmark_run_json(
+          dataset_name, table_name, run_id, benchmark_json)
+
+  def upload_metric_file(
+      self, dataset_name, table_name, run_id, metric_json_file):
+    """Upload metric information to Bigquery from input json file.
+
+    Args:
+      dataset_name: string, the name of bigquery dataset where the data will be
+        uploaded.
+      table_name: string, the name of bigquery table under the dataset where
+        the metric data will be uploaded. This is different from the
+        benchmark_run table.
+      run_id: string, a unique ID that will be attached to the data, usually
+        this is a UUID4 format. This should be the same as the benchmark run_id.
+      metric_json_file: string, the file path that contains the metric JSON
+        data.
+    """
-    expected_file = os.path.join(
-        self._logging_dir, logger.METRIC_LOG_FILE_NAME)
-    with tf.gfile.GFile(expected_file) as f:
-      lines = f.readlines()
+    with tf.gfile.GFile(metric_json_file) as f:
      metrics = []
-      for line in filter(lambda l: l.strip(), lines):
-        metric = json.loads(line)
-        metric["run_id"] = run_id
-        metrics.append(metric)
-      table_ref = self._bq_client.dataset(dataset_name).table(table_name)
-      errors = self._bq_client.insert_rows_json(table_ref, metrics)
-      if errors:
-        tf.logging.error(
-            "Failed to upload benchmark info to bigquery: {}".format(errors))


-def main(_):
-  if not flags.FLAGS.benchmark_log_dir:
-    print("Usage: benchmark_uploader.py --benchmark_log_dir=/some/dir")
-    sys.exit(1)
-
-  uploader = BigQueryUploader(
-      flags.FLAGS.benchmark_log_dir,
-      gcp_project=flags.FLAGS.gcp_project)
-  run_id = str(uuid.uuid4())
-  uploader.upload_benchmark_run(
-      flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_run_table, run_id)
-  uploader.upload_metric(
-      flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_metric_table, run_id)
-
-
-if __name__ == "__main__":
-  flags_core.define_benchmark()
-  flags.adopt_module_key_flags(flags_core)
-  absl_app.run(main=main)
+      for line in f:
+        metrics.append(json.loads(line.strip()))
+    self.upload_benchmark_metric_json(
+        dataset_name, table_name, run_id, metrics)
+
+  def _upload_json(self, dataset_name, table_name, json_list):
+    # Find the unique table reference based on dataset and table name, so that
+    # the data can be inserted to it.
+    table_ref = self._bq_client.dataset(dataset_name).table(table_name)
+    errors = self._bq_client.insert_rows_json(table_ref, json_list)
+    if errors:
+      tf.logging.error(
+          "Failed to upload benchmark info to bigquery: {}".format(errors))
62 changes: 62 additions & 0 deletions official/benchmark/benchmark_uploader_main.py
@@ -0,0 +1,62 @@
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Binary to upload benchmark generated by BenchmarkLogger to remote repo.
This library require google cloud bigquery lib as dependency, which can be
installed with:
> pip install --upgrade google-cloud-bigquery
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import sys
import uuid

from absl import app as absl_app
from absl import flags

from official.benchmark import benchmark_uploader
from official.utils.flags import core as flags_core
from official.utils.logs import logger


def main(_):
  if not flags.FLAGS.benchmark_log_dir:
    print("Usage: benchmark_uploader_main.py --benchmark_log_dir=/some/dir")
    sys.exit(1)

  uploader = benchmark_uploader.BigQueryUploader(
      gcp_project=flags.FLAGS.gcp_project)
  run_id = str(uuid.uuid4())
  run_json_file = os.path.join(
      flags.FLAGS.benchmark_log_dir, logger.BENCHMARK_RUN_LOG_FILE_NAME)
  metric_json_file = os.path.join(
      flags.FLAGS.benchmark_log_dir, logger.METRIC_LOG_FILE_NAME)

  uploader.upload_benchmark_run_file(
      flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_run_table, run_id,
      run_json_file)
  uploader.upload_metric_file(
      flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_metric_table, run_id,
      metric_json_file)


if __name__ == "__main__":
  flags_core.define_benchmark()
  flags.adopt_module_key_flags(flags_core)
  absl_app.run(main=main)
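
With the benchmark flags defined in official/utils/flags/_benchmark.py, a typical invocation of this binary would look like the following; the flag values are hypothetical:
  > python benchmark_uploader_main.py --benchmark_log_dir=/tmp/benchmark --gcp_project=my-project --bigquery_data_set=test_benchmark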
107 changes: 107 additions & 0 deletions official/benchmark/benchmark_uploader_test.py
@@ -0,0 +1,107 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Tests for benchmark_uploader."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import os
import tempfile
import unittest
from mock import MagicMock
from mock import patch

import tensorflow as tf # pylint: disable=g-bad-import-order

try:
  from google.cloud import bigquery
  from official.benchmark import benchmark_uploader
except ImportError:
  bigquery = None
  benchmark_uploader = None


@unittest.skipIf(bigquery is None, 'Bigquery dependency is not installed.')
class BigQueryUploaderTest(tf.test.TestCase):

  @patch.object(bigquery, 'Client')
  def setUp(self, mock_bigquery):
    self.mock_client = mock_bigquery.return_value
    self.mock_dataset = MagicMock(name="dataset")
    self.mock_table = MagicMock(name="table")
    self.mock_client.dataset.return_value = self.mock_dataset
    self.mock_dataset.table.return_value = self.mock_table
    self.mock_client.insert_rows_json.return_value = []

    self.benchmark_uploader = benchmark_uploader.BigQueryUploader()
    self.benchmark_uploader._bq_client = self.mock_client

    self.log_dir = tempfile.mkdtemp(dir=self.get_temp_dir())
    with open(os.path.join(self.log_dir, 'metric.log'), 'a') as f:
      json.dump({'name': 'accuracy', 'value': 1.0}, f)
      f.write("\n")
      json.dump({'name': 'loss', 'value': 0.5}, f)
      f.write("\n")
    with open(os.path.join(self.log_dir, 'run.log'), 'w') as f:
      json.dump({'model_name': 'value'}, f)

  def tearDown(self):
    tf.gfile.DeleteRecursively(self.get_temp_dir())

  def test_upload_benchmark_run_json(self):
    self.benchmark_uploader.upload_benchmark_run_json(
        'dataset', 'table', 'run_id', {'model_name': 'value'})

    self.mock_client.insert_rows_json.assert_called_once_with(
        self.mock_table, [{'model_name': 'value', 'model_id': 'run_id'}])

  def test_upload_benchmark_metric_json(self):
    metric_json_list = [
        {'name': 'accuracy', 'value': 1.0},
        {'name': 'loss', 'value': 0.5}
    ]
    expected_params = [
        {'run_id': 'run_id', 'name': 'accuracy', 'value': 1.0},
        {'run_id': 'run_id', 'name': 'loss', 'value': 0.5}
    ]
    self.benchmark_uploader.upload_benchmark_metric_json(
        'dataset', 'table', 'run_id', metric_json_list)
    self.mock_client.insert_rows_json.assert_called_once_with(
        self.mock_table, expected_params)

  def test_upload_benchmark_run_file(self):
    self.benchmark_uploader.upload_benchmark_run_file(
        'dataset', 'table', 'run_id', os.path.join(self.log_dir, 'run.log'))

    self.mock_client.insert_rows_json.assert_called_once_with(
        self.mock_table, [{'model_name': 'value', 'model_id': 'run_id'}])

  def test_upload_metric_file(self):
    self.benchmark_uploader.upload_metric_file(
        'dataset', 'table', 'run_id',
        os.path.join(self.log_dir, 'metric.log'))
    expected_params = [
        {'run_id': 'run_id', 'name': 'accuracy', 'value': 1.0},
        {'run_id': 'run_id', 'name': 'loss', 'value': 0.5}
    ]
    self.mock_client.insert_rows_json.assert_called_once_with(
        self.mock_table, expected_params)


if __name__ == '__main__':
  tf.test.main()
5 changes: 2 additions & 3 deletions official/resnet/resnet_run_loop.py
@@ -395,13 +395,12 @@ def resnet_main(
      'synthetic_data': flags_obj.use_synthetic_data,
      'train_epochs': flags_obj.train_epochs,
  }
-  benchmark_logger = logger.config_benchmark_logger(flags_obj.benchmark_log_dir)
+  benchmark_logger = logger.config_benchmark_logger(flags_obj)
  benchmark_logger.log_run_info('resnet', dataset_name, run_params)

  train_hooks = hooks_helper.get_train_hooks(
      flags_obj.hooks,
-      batch_size=flags_obj.batch_size,
-      benchmark_log_dir=flags_obj.benchmark_log_dir)
+      batch_size=flags_obj.batch_size)

  def input_fn_train():
    return input_function(
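The one-argument call above implies that config_benchmark_logger now reads the whole flag object and selects a logger implementation from the new benchmark_logger_type flag defined below. A minimal hypothetical sketch of that dispatch, with stand-in classes since the real loggers live in official/utils/logs/logger.py:

class BaseBenchmarkLogger(object):
  """Stand-in for the default logger, which logs to STDOUT."""


class BenchmarkFileLogger(BaseBenchmarkLogger):
  """Stand-in for the logger that writes JSON files under a log dir."""

  def __init__(self, logging_dir):
    self._logging_dir = logging_dir


def config_benchmark_logger(flags_obj):
  """Hypothetical dispatch on the --benchmark_logger_type flag."""
  if flags_obj.benchmark_logger_type == "BenchmarkFileLogger":
    return BenchmarkFileLogger(flags_obj.benchmark_log_dir)
  # The BenchmarkBigQueryLogger branch (streaming upload) is omitted here.
  return BaseBenchmarkLogger()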
20 changes: 19 additions & 1 deletion official/utils/flags/_benchmark.py
@@ -36,6 +36,14 @@ def define_benchmark(benchmark_log_dir=True, bigquery_uploader=True):

  key_flags = []

+  flags.DEFINE_enum(
+      name="benchmark_logger_type", default="BaseBenchmarkLogger",
+      enum_values=["BaseBenchmarkLogger", "BenchmarkFileLogger",
+                   "BenchmarkBigQueryLogger"],
+      help=help_wrap("The type of benchmark logger to use. Defaults to using "
+                     "BaseBenchmarkLogger which logs to STDOUT. Different "
+                     "loggers will require other flags to be able to work."))
+
  if benchmark_log_dir:
    flags.DEFINE_string(
        name="benchmark_log_dir", short_name="bld", default=None,
@@ -64,4 +72,14 @@ def define_benchmark(benchmark_log_dir=True, bigquery_uploader=True):
help=help_wrap("The Bigquery table name where the benchmark metric "
"information will be uploaded."))

return key_flags
@flags.multi_flags_validator(
["benchmark_logger_type", "benchmark_log_dir"],
message="--benchmark_logger_type=BenchmarkFileLogger will require "
"--benchmark_log_dir being set")
def _check_benchmark_log_dir(flags_dict):
benchmark_logger_type = flags_dict["benchmark_logger_type"]
if benchmark_logger_type == "BenchmarkFileLogger":
return flags_dict["benchmark_log_dir"]
return True

return key_flags
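
A quick sketch of how the validator above behaves at flag-parsing time; with absl, a failing multi-flag validator surfaces as flags.IllegalFlagValueError (the argv values are hypothetical):

from absl import flags

from official.utils.flags import core as flags_core

flags_core.define_benchmark()
try:
  # benchmark_log_dir is left unset, so the validator should reject this.
  flags.FLAGS(["prog", "--benchmark_logger_type=BenchmarkFileLogger"])
except flags.IllegalFlagValueError as e:
  print("Flag validation failed as expected: {}".format(e))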
