More changes for BigQuery connector (#490)
* Fixing Dockerfile

* Returning dataset as a dictionary from the BigQuery connector

* Adding NULL field support to the BigQuery connector

* Python style tweak

* More style tweaks

* Style tweaks, coming from Google account
vlasenkoalexey authored and yongtang committed Sep 19, 2019
1 parent ea53711 commit 55d9750
Showing 3 changed files with 22 additions and 3 deletions.
5 changes: 5 additions & 0 deletions tensorflow_io/bigquery/kernels/bigquery_lib.h
@@ -184,6 +184,9 @@ class BigQueryReaderDatasetIterator : public DatasetIterator<Dataset> {
case avro::AVRO_ENUM:
dtype = DT_STRING;
break;
case avro::AVRO_NULL:
dtype = output_types[i];
break;
default:
return errors::InvalidArgument("unsupported data type: ",
field.type());
@@ -250,6 +253,8 @@ class BigQueryReaderDatasetIterator : public DatasetIterator<Dataset> {
((*out_tensors)[i]).scalar<string>()() =
field.value<avro::GenericEnum>().symbol();
break;
case avro::AVRO_NULL: // Fallthrough;
break;
default:
return errors::InvalidArgument("unsupported data type: ",
field.type());
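
For context, a rough Python-side sketch of what the AVRO_NULL handling enables. Before this commit, selecting a NULLABLE column could fail with the "unsupported data type" error above once a NULL cell was encountered; with these cases added, the row is read and the NULL cell is simply skipped. The project, dataset, table, and field names below are placeholders, and the read_session call is assumed from the connector's README of the time rather than taken from this diff.

    import tensorflow as tf
    from tensorflow_io.bigquery import BigQueryClient

    # Hypothetical table with a NULLABLE "age" column; all names are illustrative.
    client = BigQueryClient()
    read_session = client.read_session(
        "projects/my-project",                  # parent
        "my-project", "my_table", "my_dataset",
        selected_fields=["age", "name"],
        output_types=[tf.int64, tf.string],
        requested_streams=2)

    dataset = read_session.parallel_read_rows()
    # Rows where "age" is NULL no longer raise
    # errors::InvalidArgument("unsupported data type: ..."); the NULL cell is
    # skipped and the output scalar is left at whatever default it was
    # allocated with.
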
18 changes: 16 additions & 2 deletions tensorflow_io/bigquery/python/ops/bigquery_api.py
@@ -27,6 +27,9 @@
from __future__ import division
from __future__ import print_function

import collections
from operator import itemgetter

from tensorflow.python.data.experimental.ops import interleave_ops
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.framework import dtypes
@@ -223,8 +226,19 @@ class _BigQueryDataset(dataset_ops.DatasetSource):

def __init__(self, client_resource, selected_fields, output_types,
avro_schema, stream):
self._element_spec = tuple(
tensor_spec.TensorSpec([], dtype) for dtype in output_types)

# selected_fields and corresponding output_types have to be sorted because
# of b/141251314
sorted_fields_with_types = sorted(
zip(selected_fields, output_types),
key=itemgetter(0))
selected_fields, output_types = list(zip(*sorted_fields_with_types))
selected_fields = list(selected_fields)
output_types = list(output_types)

self._element_spec = collections.OrderedDict(zip(
selected_fields,
(tensor_spec.TensorSpec([], dtype) for dtype in output_types)))

variant_tensor = _bigquery_so.big_query_dataset(
client=client_resource,
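
A minimal sketch of what the new element spec means for callers, assuming dataset was built as in the sketch above: each element is now an ordered dictionary keyed by field name (with keys sorted alphabetically because of the b/141251314 workaround) instead of a plain tuple in selected_fields order.

    # Before this change: elements were tuples in selected_fields order.
    #   for age, name in dataset:
    #       ...

    # After this change: elements are OrderedDicts keyed by field name.
    for row in dataset.take(2):
        print(row["name"].numpy(), row["age"].numpy())
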
2 changes: 1 addition & 1 deletion tools/dev/Dockerfile
@@ -49,7 +49,7 @@ RUN /bin/bash -c "source activate tfio-dev && python -m pip install \
pyarrow==${ARROW_VERSION} \
pandas \
fastavro \
gast==0.2.2
gast==0.2.2 \
${PIP_ADD_PACKAGES} \
"

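The Dockerfile change is just the trailing backslash: without it the line continuation ends at gast==0.2.2, the quoted pip install command is cut short, and the following ${PIP_ADD_PACKAGES} line is left to be parsed as a standalone (and presumably invalid) Dockerfile instruction, breaking the dev image build.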
