@@ -25,30 +25,19 @@
from __future__ import print_function

import json
import os
import sys
import uuid

from google.cloud import bigquery

# pylint: disable=g-bad-import-order
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order

from official.utils.flags import core as flags_core
from official.utils.logs import logger


class BigQueryUploader(object):
"""Upload the benchmark and metric info to BigQuery."""
"""Upload the benchmark and metric info from JSON input to BigQuery. """

def __init__(self, logging_dir, gcp_project=None, credentials=None):
def __init__(self, gcp_project=None, credentials=None):
"""Initialized BigQueryUploader with proper setting.

Args:
logging_dir: string, logging directory that contains the benchmark log.
gcp_project: string, the name of the GCP project that the log will be
uploaded to. The default project name will be detected from local
environment if no value is provided.
@@ -58,11 +47,11 @@ def __init__(self, logging_dir, gcp_project=None, credentials=None):
google.oauth2.service_account.Credentials to load credentials from a local
file for the case that the test is run outside of GCP.
"""
self._logging_dir = logging_dir
self._bq_client = bigquery.Client(
project=gcp_project, credentials=credentials)

def upload_benchmark_run(self, dataset_name, table_name, run_id):
def upload_benchmark_run_json(
self, dataset_name, table_name, run_id, run_json):
"""Upload benchmark run information to Bigquery.

Args:
@@ -72,19 +61,13 @@ def upload_benchmark_run(self, dataset_name, table_name, run_id):
the data will be uploaded.
run_id: string, a unique ID that will be attached to the data, usually
this is a UUID4 format.
run_json: dict, the JSON data that contains the benchmark run info.
Contributor:
The naming of this is confusing -- is it JSON or a Python dict? If the latter, why is it called json?

Member Author:
In Python there isn't a specific JSON type; Python uses a dict to represent JSON data, with some restrictions on the types of the keys and values in the dict.

Contributor:
I think we should absolutely not refer to dicts as JSON. Some dicts can be converted to JSON, but I tend to think that json in a variable name implies that the variable refers to the serialized string.

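To illustrate the distinction discussed in this thread, a minimal sketch (hypothetical values): in Python the in-memory value is a dict, while JSON proper is the serialized string produced from it.

import json

run_info = {"model_name": "resnet", "batch_size": 32}  # a Python dict
run_json_str = json.dumps(run_info)  # a JSON string such as '{"model_name": "resnet", ...}'
assert json.loads(run_json_str) == run_info  # parsing the string gives back an equal dict
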
"""
expected_file = os.path.join(
self._logging_dir, logger.BENCHMARK_RUN_LOG_FILE_NAME)
with tf.gfile.GFile(expected_file) as f:
benchmark_json = json.load(f)
benchmark_json["model_id"] = run_id
table_ref = self._bq_client.dataset(dataset_name).table(table_name)
errors = self._bq_client.insert_rows_json(table_ref, [benchmark_json])
if errors:
tf.logging.error(
"Failed to upload benchmark info to bigquery: {}".format(errors))

def upload_metric(self, dataset_name, table_name, run_id):
run_json["model_id"] = run_id
self._upload_json(dataset_name, table_name, [run_json])

def upload_benchmark_metric_json(
self, dataset_name, table_name, run_id, metric_json_list):
"""Upload metric information to Bigquery.

Args:
@@ -95,39 +78,57 @@ def upload_metric(self, dataset_name, table_name, run_id):
benchmark_run table.
run_id: string, a unique ID that will be attached to the data, usually
this is a UUID4 format. This should be the same as the benchmark run_id.
metric_json_list: list, a list of JSON objects that record the metric info.
"""
for m in metric_json_list:
m["run_id"] = run_id
self._upload_json(dataset_name, table_name, metric_json_list)

def upload_benchmark_run_file(
self, dataset_name, table_name, run_id, run_json_file):
"""Upload benchmark run information to Bigquery from input json file.

Args:
dataset_name: string, the name of bigquery dataset where the data will be
uploaded.
table_name: string, the name of bigquery table under the dataset where
the data will be uploaded.
run_id: string, a unique ID that will be attached to the data, usually
this is a UUID4 format.
run_json_file: string, the file path that contains the run JSON data.
"""
with tf.gfile.GFile(run_json_file) as f:
benchmark_json = json.load(f)
self.upload_benchmark_run_json(
dataset_name, table_name, run_id, benchmark_json)

def upload_metric_file(
self, dataset_name, table_name, run_id, metric_json_file):
"""Upload metric information to Bigquery from input json file.

Args:
dataset_name: string, the name of bigquery dataset where the data will be
uploaded.
table_name: string, the name of bigquery table under the dataset where
the metric data will be uploaded. This is different from the
benchmark_run table.
run_id: string, a unique ID that will be attached to the data, usually
this is a UUID4 format. This should be the same as the benchmark run_id.
metric_json_file: string, the file path that contains the metric JSON
data.
"""
expected_file = os.path.join(
self._logging_dir, logger.METRIC_LOG_FILE_NAME)
with tf.gfile.GFile(expected_file) as f:
lines = f.readlines()
with tf.gfile.GFile(metric_json_file) as f:
metrics = []
for line in filter(lambda l: l.strip(), lines):
metric = json.loads(line)
metric["run_id"] = run_id
metrics.append(metric)
table_ref = self._bq_client.dataset(dataset_name).table(table_name)
errors = self._bq_client.insert_rows_json(table_ref, metrics)
if errors:
tf.logging.error(
"Failed to upload benchmark info to bigquery: {}".format(errors))


def main(_):
if not flags.FLAGS.benchmark_log_dir:
print("Usage: benchmark_uploader.py --benchmark_log_dir=/some/dir")
sys.exit(1)

uploader = BigQueryUploader(
flags.FLAGS.benchmark_log_dir,
gcp_project=flags.FLAGS.gcp_project)
run_id = str(uuid.uuid4())
uploader.upload_benchmark_run(
flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_run_table, run_id)
uploader.upload_metric(
flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_metric_table, run_id)


if __name__ == "__main__":
flags_core.define_benchmark()
flags.adopt_module_key_flags(flags_core)
absl_app.run(main=main)
for line in f:
metrics.append(json.loads(line.strip()))
self.upload_benchmark_metric_json(
dataset_name, table_name, run_id, metrics)

def _upload_json(self, dataset_name, table_name, json_list):
# Find the unique table reference based on dataset and table name, so that
# the data can be inserted to it.
table_ref = self._bq_client.dataset(dataset_name).table(table_name)
errors = self._bq_client.insert_rows_json(table_ref, json_list)
if errors:
tf.logging.error(
"Failed to upload benchmark info to bigquery: {}".format(errors))
62 changes: 62 additions & 0 deletions official/benchmark/benchmark_uploader_main.py
@@ -0,0 +1,62 @@
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Binary to upload benchmark generated by BenchmarkLogger to remote repo.

This library require google cloud bigquery lib as dependency, which can be
installed with:
> pip install --upgrade google-cloud-bigquery
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import sys
import uuid

from absl import app as absl_app
from absl import flags

from official.benchmark import benchmark_uploader
from official.utils.flags import core as flags_core
from official.utils.logs import logger

def main(_):
if not flags.FLAGS.benchmark_log_dir:
print("Usage: benchmark_uploader.py --benchmark_log_dir=/some/dir")
sys.exit(1)

uploader = benchmark_uploader.BigQueryUploader(
gcp_project=flags.FLAGS.gcp_project)
run_id = str(uuid.uuid4())
run_json_file = os.path.join(
flags.FLAGS.benchmark_log_dir, logger.BENCHMARK_RUN_LOG_FILE_NAME)
metric_json_file = os.path.join(
flags.FLAGS.benchmark_log_dir, logger.METRIC_LOG_FILE_NAME)

uploader.upload_benchmark_run_file(
flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_run_table, run_id,
run_json_file)
uploader.upload_metric_file(
flags.FLAGS.bigquery_data_set, flags.FLAGS.bigquery_metric_table, run_id,
metric_json_file)


if __name__ == "__main__":
flags_core.define_benchmark()
flags.adopt_module_key_flags(flags_core)
absl_app.run(main=main)
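
For reference, a hypothetical invocation of this binary (flag names come from the code above; the project, dataset, and table names are placeholders):

> python benchmark_uploader_main.py --benchmark_log_dir=/tmp/benchmark_log \
    --gcp_project=my-project --bigquery_data_set=test_dataset \
    --bigquery_run_table=benchmark_run --bigquery_metric_table=benchmark_metric
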
107 changes: 107 additions & 0 deletions official/benchmark/benchmark_uploader_test.py
@@ -0,0 +1,107 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Tests for benchmark_uploader."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import os
import tempfile
import unittest
from mock import MagicMock
from mock import patch

import tensorflow as tf # pylint: disable=g-bad-import-order

try:
from google.cloud import bigquery
from official.benchmark import benchmark_uploader
except ImportError:
bigquery = None
benchmark_uploader = None


@unittest.skipIf(bigquery is None, 'Bigquery dependency is not installed.')
class BigQueryUploaderTest(tf.test.TestCase):

@patch.object(bigquery, 'Client')
def setUp(self, mock_bigquery):
self.mock_client = mock_bigquery.return_value
self.mock_dataset = MagicMock(name="dataset")
self.mock_table = MagicMock(name="table")
self.mock_client.dataset.return_value = self.mock_dataset
self.mock_dataset.table.return_value = self.mock_table
self.mock_client.insert_rows_json.return_value = []

self.benchmark_uploader = benchmark_uploader.BigQueryUploader()
self.benchmark_uploader._bq_client = self.mock_client

self.log_dir = tempfile.mkdtemp(dir=self.get_temp_dir())
with open(os.path.join(self.log_dir, 'metric.log'), 'a') as f:
json.dump({'name': 'accuracy', 'value': 1.0}, f)
f.write("\n")
json.dump({'name': 'loss', 'value': 0.5}, f)
f.write("\n")
with open(os.path.join(self.log_dir, 'run.log'), 'w') as f:
json.dump({'model_name': 'value'}, f)

def tearDown(self):
tf.gfile.DeleteRecursively(self.get_temp_dir())

def test_upload_benchmark_run_json(self):
self.benchmark_uploader.upload_benchmark_run_json(
'dataset', 'table', 'run_id', {'model_name': 'value'})

self.mock_client.insert_rows_json.assert_called_once_with(
self.mock_table, [{'model_name': 'value', 'model_id': 'run_id'}])

def test_upload_benchmark_metric_json(self):
metric_json_list = [
{'name': 'accuracy', 'value': 1.0},
{'name': 'loss', 'value': 0.5}
]
expected_params = [
{'run_id': 'run_id', 'name': 'accuracy', 'value': 1.0},
{'run_id': 'run_id', 'name': 'loss', 'value': 0.5}
]
self.benchmark_uploader.upload_benchmark_metric_json(
'dataset', 'table', 'run_id', metric_json_list)
self.mock_client.insert_rows_json.assert_called_once_with(
self.mock_table, expected_params)

def test_upload_benchmark_run_file(self):
self.benchmark_uploader.upload_benchmark_run_file(
'dataset', 'table', 'run_id', os.path.join(self.log_dir, 'run.log'))

self.mock_client.insert_rows_json.assert_called_once_with(
self.mock_table, [{'model_name': 'value', 'model_id': 'run_id'}])

def test_upload_metric_file(self):
self.benchmark_uploader.upload_metric_file(
'dataset', 'table', 'run_id',
os.path.join(self.log_dir, 'metric.log'))
expected_params = [
{'run_id': 'run_id', 'name': 'accuracy', 'value': 1.0},
{'run_id': 'run_id', 'name': 'loss', 'value': 0.5}
]
self.mock_client.insert_rows_json.assert_called_once_with(
self.mock_table, expected_params)


if __name__ == '__main__':
tf.test.main()
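
For reference, the metric.log written in setUp above is newline-delimited JSON, one metric record per line, which is the format upload_metric_file parses line by line:

{"name": "accuracy", "value": 1.0}
{"name": "loss", "value": 0.5}
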
5 changes: 2 additions & 3 deletions official/resnet/resnet_run_loop.py
@@ -393,13 +393,12 @@ def resnet_main(
'synthetic_data': flags_obj.use_synthetic_data,
'train_epochs': flags_obj.train_epochs,
}
benchmark_logger = logger.config_benchmark_logger(flags_obj.benchmark_log_dir)
benchmark_logger = logger.config_benchmark_logger(flags_obj)
benchmark_logger.log_run_info('resnet', dataset_name, run_params)

train_hooks = hooks_helper.get_train_hooks(
flags_obj.hooks,
batch_size=flags_obj.batch_size,
benchmark_log_dir=flags_obj.benchmark_log_dir)
batch_size=flags_obj.batch_size)

def input_fn_train():
return input_function(
20 changes: 19 additions & 1 deletion official/utils/flags/_benchmark.py
@@ -36,6 +36,14 @@ def define_benchmark(benchmark_log_dir=True, bigquery_uploader=True):

key_flags = []

flags.DEFINE_enum(
name="benchmark_logger_type", default="BaseBenchmarkLogger",
enum_values=["BaseBenchmarkLogger", "BenchmarkFileLogger",
"BenchmarkBigQueryLogger"],
help=help_wrap("The type of benchmark logger to use. Defaults to using "
"BaseBenchmarkLogger which logs to STDOUT. Different "
"loggers will require other flags to be able to work."))

if benchmark_log_dir:
flags.DEFINE_string(
name="benchmark_log_dir", short_name="bld", default=None,
@@ -64,4 +72,14 @@ def define_benchmark(benchmark_log_dir=True, bigquery_uploader=True):
help=help_wrap("The Bigquery table name where the benchmark metric "
"information will be uploaded."))

return key_flags
@flags.multi_flags_validator(
["benchmark_logger_type", "benchmark_log_dir"],
message="--benchmark_logger_type=BenchmarkFileLogger will require "
"--benchmark_log_dir being set")
def _check_benchmark_log_dir(flags_dict):
benchmark_logger_type = flags_dict["benchmark_logger_type"]
if benchmark_logger_type == "BenchmarkFileLogger":
return flags_dict["benchmark_log_dir"]
return True

return key_flags
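
To make the validator's contract concrete, a minimal standalone sketch of the check it performs (hypothetical flag dicts, without the absl wiring):

def _check_benchmark_log_dir(flags_dict):
  # Only BenchmarkFileLogger requires a log directory; the other logger types do not.
  if flags_dict["benchmark_logger_type"] == "BenchmarkFileLogger":
    return bool(flags_dict["benchmark_log_dir"])
  return True

assert _check_benchmark_log_dir(
    {"benchmark_logger_type": "BaseBenchmarkLogger", "benchmark_log_dir": None})
assert not _check_benchmark_log_dir(
    {"benchmark_logger_type": "BenchmarkFileLogger", "benchmark_log_dir": None})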