From 55d97508c2813e90d85532e804e93ab7786bbadf Mon Sep 17 00:00:00 2001
From: Aleksey Vlasenko
Date: Wed, 18 Sep 2019 17:41:22 -0700
Subject: [PATCH] More changes for BigQuery connector (#490)

* Fixing Dockerfile

* Returning dataset in the form of a dictionary from the BigQuery connector

* Adding NULL field support to the BigQuery connector

* Python style tweak

* More style tweaks

* Style tweaks, coming from Google account
---
 tensorflow_io/bigquery/kernels/bigquery_lib.h |  5 +++++
 .../bigquery/python/ops/bigquery_api.py       | 18 ++++++++++++++++--
 tools/dev/Dockerfile                          |  2 +-
 3 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/tensorflow_io/bigquery/kernels/bigquery_lib.h b/tensorflow_io/bigquery/kernels/bigquery_lib.h
index 52589a432..a1d600ccb 100644
--- a/tensorflow_io/bigquery/kernels/bigquery_lib.h
+++ b/tensorflow_io/bigquery/kernels/bigquery_lib.h
@@ -184,6 +184,9 @@ class BigQueryReaderDatasetIterator : public DatasetIterator<Dataset> {
       case avro::AVRO_ENUM:
         dtype = DT_STRING;
         break;
+      case avro::AVRO_NULL:
+        dtype = output_types[i];
+        break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
                                        field.type());
@@ -250,6 +253,8 @@
           ((*out_tensors)[i]).scalar<string>()() =
               field.value<avro::GenericEnum>().symbol();
           break;
+        case avro::AVRO_NULL:  // Null value: leave the output tensor at its default.
+          break;
         default:
           return errors::InvalidArgument("unsupported data type: ",
                                          field.type());
diff --git a/tensorflow_io/bigquery/python/ops/bigquery_api.py b/tensorflow_io/bigquery/python/ops/bigquery_api.py
index a6400373a..918f70404 100644
--- a/tensorflow_io/bigquery/python/ops/bigquery_api.py
+++ b/tensorflow_io/bigquery/python/ops/bigquery_api.py
@@ -27,6 +27,9 @@
 from __future__ import division
 from __future__ import print_function
 
+import collections
+from operator import itemgetter
+
 from tensorflow.python.data.experimental.ops import interleave_ops
 from tensorflow.python.data.ops import dataset_ops
 from tensorflow.python.framework import dtypes
@@ -223,8 +226,19 @@ class _BigQueryDataset(dataset_ops.DatasetSource):
 
   def __init__(self, client_resource, selected_fields, output_types,
                avro_schema, stream):
-    self._element_spec = tuple(
-        tensor_spec.TensorSpec([], dtype) for dtype in output_types)
+
+    # selected_fields and corresponding output_types have to be sorted because
+    # of b/141251314
+    sorted_fields_with_types = sorted(
+        zip(selected_fields, output_types),
+        key=itemgetter(0))
+    selected_fields, output_types = list(zip(*sorted_fields_with_types))
+    selected_fields = list(selected_fields)
+    output_types = list(output_types)
+
+    self._element_spec = collections.OrderedDict(zip(
+        selected_fields,
+        (tensor_spec.TensorSpec([], dtype) for dtype in output_types)))
 
     variant_tensor = _bigquery_so.big_query_dataset(
         client=client_resource,
diff --git a/tools/dev/Dockerfile b/tools/dev/Dockerfile
index 663a49bf0..f5e2d68bb 100644
--- a/tools/dev/Dockerfile
+++ b/tools/dev/Dockerfile
@@ -49,7 +49,7 @@
 RUN /bin/bash -c "source activate tfio-dev && python -m pip install \
     pyarrow==${ARROW_VERSION} \
     pandas \
     fastavro \
-    gast==0.2.2
+    gast==0.2.2 \
     ${PIP_ADD_PACKAGES} \
     "
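
A standalone sketch of the element-spec construction introduced in
_BigQueryDataset.__init__ above. The field names and dtypes are made up for
illustration, and the public tf.TensorSpec stands in for the internal
tensor_spec.TensorSpec:

    import collections
    from operator import itemgetter

    import tensorflow as tf

    selected_fields = ["state", "age", "name"]
    output_types = [tf.string, tf.int64, tf.string]

    # Sort fields and their dtypes together so the element order matches
    # what the kernel returns (b/141251314).
    pairs = sorted(zip(selected_fields, output_types), key=itemgetter(0))
    selected_fields, output_types = (list(t) for t in zip(*pairs))

    element_spec = collections.OrderedDict(
        zip(selected_fields,
            (tf.TensorSpec([], dtype) for dtype in output_types)))
    print(element_spec)
    # OrderedDict([('age',   TensorSpec(shape=(), dtype=tf.int64,  name=None)),
    #              ('name',  TensorSpec(shape=(), dtype=tf.string, name=None)),
    #              ('state', TensorSpec(shape=(), dtype=tf.string, name=None))])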
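
With dict-shaped elements, an end-to-end read would look roughly like the
sketch below. The parent/project/dataset/table names are placeholders, and the
exact read_session signature may vary between tensorflow-io releases:

    import tensorflow as tf
    from tensorflow_io.bigquery import BigQueryClient

    client = BigQueryClient()
    session = client.read_session(
        "projects/my-project",        # parent (placeholder)
        "bigquery-public-data",       # project_id
        "usa_1910_current",           # table_id
        "usa_names",                  # dataset_id
        selected_fields=["name", "number"],
        output_types=[tf.string, tf.int64],
        requested_streams=2)

    for row in session.parallel_read_rows().take(2):
        # Each element is now an OrderedDict keyed by (sorted) field name
        # instead of a positional tuple.
        print(row["name"].numpy(), row["number"].numpy())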
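
Note on the AVRO_NULL cases: the read path writes nothing for a null field, so
the output tensor presumably keeps its default value (empty string for
DT_STRING, zero for numeric types) instead of the read failing with
"unsupported data type". If nulls must be distinguished from genuine zeros,
that mapping has to happen downstream; a hypothetical post-processing step,
reusing the session from the previous sketch:

    # Remap the default 0 in "number" (assumed here to mean "null") to -1.
    def mark_missing(row):
        row = dict(row)
        row["number"] = tf.where(
            tf.equal(row["number"], 0),
            tf.constant(-1, dtype=tf.int64),
            row["number"])
        return row

    rows = session.parallel_read_rows().map(mark_missing)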