Merged
2 changes: 1 addition & 1 deletion .travis/after-success.sh

@@ -14,7 +14,7 @@
 # limitations under the License.
 # ==============================================================================

-if [[ ( ${TRAVIS_BRANCH} == "master" ) && ( ${TRAVIS_EVENT_TYPE} != "pull_request" ) ]]; then
+if [[ (( ${TRAVIS_BRANCH} == "master" ) || ( ${TRAVIS_BRANCH} == "R0.81" )) && ( ${TRAVIS_EVENT_TYPE} != "pull_request" ) ]]; then

     twine upload wheelhouse/tensorflow_io_nightly-*.whl
1 change: 1 addition & 0 deletions tensorflow_io/bigquery/kernels/bigquery_kernels.cc

@@ -149,6 +149,7 @@ class BigQueryReadSessionOp : public OpKernel {
     createReadSessionRequest.mutable_read_options()->set_row_restriction(
         row_restriction_);
     createReadSessionRequest.set_requested_streams(requested_streams_);
+    createReadSessionRequest.set_sharding_strategy(apiv1beta1::ShardingStrategy::BALANCED);
     createReadSessionRequest.set_format(apiv1beta1::DataFormat::AVRO);
     VLOG(3) << "createReadSessionRequest: "
             << createReadSessionRequest.DebugString();
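The added set_sharding_strategy call asks the BigQuery storage API to balance rows across the requested read streams rather than use its default sharding. For context, a minimal Python sketch of the call path this kernel backs, based on the updated test below (treat the exact values as illustrative, not part of this change):

# Illustrative only: creating a read session the way the updated test does.
# With the kernel change above, the CreateReadSession request now carries
# ShardingStrategy::BALANCED, so rows are spread evenly across the two
# requested streams.
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient

client = BigQueryClient()
read_session = client.read_session(
    "projects/test_parent",   # parent
    "test_project_id",        # GCP project id
    "test_table",             # table
    "test_dataset",           # dataset
    ["string", "boolean", "int", "long", "float", "double"],
    [dtypes.string, dtypes.bool, dtypes.int32, dtypes.int64,
     dtypes.float32, dtypes.float64],
    requested_streams=2)
dataset = read_session.parallel_read_rows()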
24 changes: 24 additions & 0 deletions tensorflow_io/bigquery/kernels/bigquery_lib.h

@@ -228,6 +228,30 @@ class BigQueryReaderDatasetIterator : public DatasetIterator<Dataset> {
             field.value<avro::GenericEnum>().symbol();
         break;
       case avro::AVRO_NULL:
+        switch(output_types[i]) {
+          case DT_BOOL:
+            ((*out_tensors)[i]).scalar<bool>()() = false;
+            break;
+          case DT_INT32:
+            ((*out_tensors)[i]).scalar<int32>()() = 0;
+            break;
+          case DT_INT64:
+            ((*out_tensors)[i]).scalar<int64>()() = 0l;
+            break;
+          case DT_FLOAT:
+            ((*out_tensors)[i]).scalar<float>()() = 0.0f;
+            break;
+          case DT_DOUBLE:
+            ((*out_tensors)[i]).scalar<double>()() = 0.0;
+            break;
+          case DT_STRING:
+            ((*out_tensors)[i]).scalar<string>()() = "";
+            break;
+          default:
+            return errors::InvalidArgument(
+                "unsupported data type against AVRO_NULL: ",
+                output_types[i]);
+        }
         break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
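Previously the AVRO_NULL case fell straight through to the bare break; visible as context above, leaving the output scalar unwritten; the new inner switch writes a type-appropriate default instead. A Python rendering of that mapping (illustration only, mirroring the C++ switch and the DEFAULT_VALUES the tests assert below):

# Illustration only: the per-dtype defaults the C++ switch above writes when
# an Avro value is null. Mirrors DEFAULT_VALUES in tests/test_bigquery_eager.py.
from tensorflow.python.framework import dtypes

_AVRO_NULL_DEFAULTS = {
    dtypes.bool: False,     # DT_BOOL
    dtypes.int32: 0,        # DT_INT32
    dtypes.int64: 0,        # DT_INT64
    dtypes.float32: 0.0,    # DT_FLOAT
    dtypes.float64: 0.0,    # DT_DOUBLE
    dtypes.string: "",      # DT_STRING
}

def default_for(dtype):
  """Default scalar used for AVRO_NULL; unsupported dtypes raise, as in C++."""
  if dtype not in _AVRO_NULL_DEFAULTS:
    raise ValueError("unsupported data type against AVRO_NULL: %s" % dtype)
  return _AVRO_NULL_DEFAULTS[dtype]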
2 changes: 1 addition & 1 deletion tensorflow_io/core/python/ops/io_info.py

@@ -18,4 +18,4 @@
 from __future__ import print_function

 package = 'tensorflow>=1.15.0,<1.16.0'
-version = '0.8.0'
+version = '0.8.1'
132 changes: 103 additions & 29 deletions tests/test_bigquery_eager.py

@@ -58,7 +58,7 @@ def __init__(self, avro_schema, avro_serialized_rows_per_stream,
         self, self._grpc_server)
     port = self._grpc_server.add_insecure_port("localhost:0")
     self._endpoint = "localhost:" + str(port)
-    print ("started server on :" + self._endpoint)
+    print("started a fake server on :" + self._endpoint)

   def start(self):
     self._grpc_server.start()
@@ -74,7 +74,7 @@ def _build_stream_name(self, stream_index):
         self._table_id + "/" + str(stream_index)

   def CreateReadSession(self, request, context):
-    print ("called CreateReadSession on server")
+    print("called CreateReadSession on a fake server")
     self._project_id = request.table_reference.project_id
     self._table_id = request.table_reference.table_id
     self._dataset_id = request.table_reference.dataset_id
@@ -89,10 +89,10 @@ def CreateReadSession(self, request, context):
     return response

   def ReadRows(self, request, context):
-    print ("called ReadRows on server")
+    print("called ReadRows on a fake server")
     response = storage_pb2.ReadRowsResponse()
     stream_index = self._streams.index(request.read_position.stream.name)
-    if stream_index >= 0 and stream_index < len(
+    if 0 <= stream_index < len(
         self._avro_serialized_rows_per_stream):
       response.avro_rows.serialized_binary_rows = \
           self._avro_serialized_rows_per_stream[stream_index]
@@ -104,53 +104,107 @@ def ReadRows(self, request, context):
 class BigqueryOpsTest(test.TestCase):
   """Tests for BigQuery adapter."""

-  GCP_PROJECT_ID = "bigquery-public-data"
-  DATASET_ID = "usa_names"
-  TABLE_ID = "usa_1910_current"
-  PARENT = "projects/some_parent"
+  GCP_PROJECT_ID = "test_project_id"
+  DATASET_ID = "test_dataset"
+  TABLE_ID = "test_table"
+  PARENT = "projects/test_parent"

   AVRO_SCHEMA = """
   {
     "type": "record",
     "name": "__root__",
     "fields": [
       {
-        "name": "state",
+        "name": "string",
         "type": [
           "null",
           "string"
         ],
-        "doc": "2-digit state code"
+        "doc": "nullable string"
       },
       {
-        "name": "name",
+        "name": "boolean",
         "type": [
           "null",
-          "string"
+          "boolean"
         ],
+        "doc": "nullable boolean"
+      },
+      {
+        "name": "int",
+        "type": [
+          "null",
+          "int"
+        ],
-        "doc": "Given name of a person at birth"
+        "doc": "nullable int"
       },
       {
-        "name": "number",
+        "name": "long",
         "type": [
           "null",
           "long"
         ],
-        "doc": "Number of occurrences of the name"
+        "doc": "nullable long"
+      },
+      {
+        "name": "float",
+        "type": [
+          "null",
+          "float"
+        ],
+        "doc": "nullable float"
+      },
+      {
+        "name": "double",
+        "type": [
+          "null",
+          "double"
+        ],
+        "doc": "nullable double"
       }
     ]
   }"""

-  STREAM_1_ROWS = [{
-      "state": "wa",
-      "name": "Andrew",
-      "number": 1
-  }, {
-      "state": "wa",
-      "name": "Eva",
-      "number": 2
-  }]
-  STREAM_2_ROWS = [{"state": "ny", "name": "Emma", "number": 10}]
+  STREAM_1_ROWS = [
+      {
+          "string": "string1",
+          "boolean": True,
+          "int": 10,
+          "long": 100,
+          "float": 1000.0,
+          "double": 10000.0
+      },
+      {
+          "string": "string2",
+          "boolean": False,
+          "int": 12,
+          "long": 102,
+          "float": 1002.0,
+          "double": 10002.0
+      }
+  ]
+  STREAM_2_ROWS = [
+      {
+          "string": "string2",
+          "boolean": True,
+          "int": 20,
+          "long": 200,
+          "float": 2000.0,
+          "double": 20000.0
+      },
+      {
+          # Empty record, all values are null
+      }
+  ]
+
+  DEFAULT_VALUES = {
+      'boolean': False,
+      'double': 0.0,
+      'float': 0.0,
+      'int': 0,
+      'long': 0,
+      'string': ''
+  }

   @staticmethod
   def _serialize_to_avro(rows, schema):
@@ -233,8 +287,16 @@ def test_read_rows(self):
         self.PARENT,
         self.GCP_PROJECT_ID,
         self.TABLE_ID,
-        self.DATASET_ID, ["state", "name", "number"],
-        [dtypes.string, dtypes.string, dtypes.int64],
+        self.DATASET_ID,
+        ["string", "boolean", "int", "long", "float", "double"],
+        [
+            dtypes.string,
+            dtypes.bool,
+            dtypes.int32,
+            dtypes.int64,
+            dtypes.float32,
+            dtypes.float64
+        ],
         requested_streams=2)

     streams_list = read_session.get_streams()
@@ -252,6 +314,8 @@ def test_read_rows(self):
     itr2 = iter(dataset2)
     self.assertEqual(self.STREAM_2_ROWS[0],
                      self._normalize_dictionary(itr2.get_next()))
+    self.assertEqual(self.DEFAULT_VALUES,
+                     self._normalize_dictionary(itr2.get_next()))
     with self.assertRaises(errors.OutOfRangeError):
       itr2.get_next()

@@ -262,8 +326,16 @@ def test_parallel_read_rows(self):
         self.PARENT,
         self.GCP_PROJECT_ID,
         self.TABLE_ID,
-        self.DATASET_ID, ["state", "name", "number"],
-        [dtypes.string, dtypes.string, dtypes.int64],
+        self.DATASET_ID,
+        ["string", "boolean", "int", "long", "float", "double"],
+        [
+            dtypes.string,
+            dtypes.bool,
+            dtypes.int32,
+            dtypes.int64,
+            dtypes.float32,
+            dtypes.float64
+        ],
         requested_streams=2)

     dataset = read_session.parallel_read_rows()
@@ -274,6 +346,8 @@ def test_parallel_read_rows(self):
                      self._normalize_dictionary(itr.get_next()))
     self.assertEqual(self.STREAM_1_ROWS[1],
                      self._normalize_dictionary(itr.get_next()))
+    self.assertEqual(self.DEFAULT_VALUES,
+                     self._normalize_dictionary(itr.get_next()))

 if __name__ == "__main__":
   test.main()
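The body of the test's _serialize_to_avro helper is collapsed in this diff. For readers reconstructing the fake server, a hypothetical sketch of what such a helper typically looks like with the avro package (the function name, and whether the schema parser is spelled avro.schema.Parse or avro.schema.parse, are assumptions; avro-python3 uses Parse):

# Hypothetical sketch, not the PR's actual helper: encode the test rows into
# one Avro binary blob that the fake ReadRows server returns per stream.
# Fields missing from a row (the empty record in STREAM_2_ROWS) encode as the
# "null" union branch, which is what exercises the new AVRO_NULL handling.
import io

import avro.schema
from avro.io import BinaryEncoder, DatumWriter

def serialize_to_avro(rows, schema_json):
  schema = avro.schema.Parse(schema_json)  # avro-python3 spelling
  writer = DatumWriter(schema)
  buf = io.BytesIO()
  encoder = BinaryEncoder(buf)
  for row in rows:
    writer.write(row, encoder)  # dict keys map to record fields
  return buf.getvalue()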