39 changes: 24 additions & 15 deletions tensorflow_io/bigquery.md
@@ -57,7 +57,7 @@ from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

GCP_PROJECT_ID = '<FILL_ME_IN>'
GCP_PROJECT_ID = "<FILL_ME_IN>"
DATASET_GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = "samples"
TABLE_ID = "wikipedia"
@@ -68,20 +68,29 @@ def main():
read_session = client.read_session(
"projects/" + GCP_PROJECT_ID,
DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
["title",
selected_fields=["title",
"id",
"num_characters",
"language",
"timestamp",
"wp_namespace",
"contributor_username"],
[dtypes.string,
output_types=[dtypes.string,
dtypes.int64,
dtypes.int64,
dtypes.string,
dtypes.int64,
dtypes.int64,
dtypes.string],
default_values=[
"",
0,
0,
"",
0,
0,
""
],
requested_streams=2,
row_restriction="num_characters > 1000",
data_format=BigQueryClient.DataFormat.AVRO)
@@ -98,8 +107,8 @@ def main():
print("row %d: %s" % (row_index, row))
row_index += 1

if __name__ == '__main__':
app.run(main)
if __name__ == "__main__":
main()

```
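
default_values is positional: entry i is the string fallback for selected_fields[i], it is parsed to output_types[i] by the dataset op and substituted whenever the corresponding Avro value is null. A minimal sketch of that pairing, illustrative only (plain Python, no tensorflow_io calls; the field names mirror the example above):

```python
# Illustrative pairing of each selected field with its positional default.
# These are the values emitted when the corresponding column is NULL.
selected_fields = ["title", "id", "num_characters", "language",
                   "timestamp", "wp_namespace", "contributor_username"]
default_values = ["", 0, 0, "", 0, 0, ""]

for field, default in zip(selected_fields, default_values):
    print("%s -> default %r" % (field, default))
```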

@@ -127,10 +136,10 @@ dataset = streams_ds.interleave(
The connector also supports reading BigQuery columns with repeated mode (each field contains an array of values of a primitive type: Integer, Float, Boolean, or String; RECORD is not supported). In this case, selected_fields needs to be a dictionary of the following form:

```python
{ "field_a_name": {"mode": BigQueryClient.FieldMode.REPEATED, output_type: dtypes.int64},
"field_b_name": {"mode": BigQueryClient.FieldMode.NULLABLE, output_type: dtypes.string},
{ "field_a_name": {"mode": BigQueryClient.FieldMode.REPEATED, "output_type": dtypes.int64},
"field_b_name": {"mode": BigQueryClient.FieldMode.NULLABLE, "output_type": dtypes.string, "default_value", "<default_value>"},
...
"field_x_name": {"mode": BigQueryClient.FieldMode.REQUIRED, output_type: dtypes.string}
"field_x_name": {"mode": BigQueryClient.FieldMode.REQUIRED, "output_type": dtypes.string}
}
```
"mode" is BigQuery column attribute concept, it can be 'repeated', 'nullable' or 'required' (enum BigQueryClient.FieldMode.REPEATED, NULLABLE, REQUIRED).The output field order is unrelated to the order of fields in
@@ -144,7 +153,7 @@ from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

GCP_PROJECT_ID = '<FILL_ME_IN>'
GCP_PROJECT_ID = "<FILL_ME_IN>"
DATASET_GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = "certain_dataset"
TABLE_ID = "certain_table_with_repeated_field"
@@ -156,10 +165,10 @@ def main():
"projects/" + GCP_PROJECT_ID,
DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
selected_fields={
"field_a_name": {"mode": BigQueryClient.FieldMode.REPEATED, output_type: dtypes.int64},
"field_b_name": {"mode": BigQueryClient.FieldMode.NULLABLE, output_type: dtypes.string},
"field_c_name": {"mode": BigQueryClient.FieldMode.REQUIRED, output_type: dtypes.string}
"field_d_name": {"mode": BigQueryClient.FieldMode.REPEATED, output_type: dtypes.string}
"field_a_name": {"mode": BigQueryClient.FieldMode.REPEATED, "output_type": dtypes.int64},
"field_b_name": {"mode": BigQueryClient.FieldMode.NULLABLE, "output_type": dtypes.string, "default_value": ""},
"field_c_name": {"mode": BigQueryClient.FieldMode.REQUIRED, "output_type": dtypes.string}
"field_d_name": {"mode": BigQueryClient.FieldMode.REPEATED, "output_type": dtypes.string}
}
requested_streams=2,
row_restriction="num_characters > 1000",
@@ -171,8 +180,8 @@ def main():
print("row %d: %s" % (row_index, row))
row_index += 1

if __name__ == '__main__':
app.run(main)
if __name__ == "__main__":
main()
```

Then each field of a repeated column becomes a rank-1 variable-length Tensor. If you want to
2 changes: 2 additions & 0 deletions tensorflow_io/core/BUILD
@@ -168,6 +168,7 @@ cc_library(
"@arrow",
"@avro",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/types:any",
"@com_google_googleapis//google/cloud/bigquery/storage/v1beta1:storage_cc_grpc",
"@local_config_tf//:libtensorflow_framework",
"@local_config_tf//:tf_header_lib",
@@ -190,6 +191,7 @@ cc_library(
"@com_google_absl//absl/algorithm",
"@com_google_absl//absl/container:fixed_array",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/types:any",
"@com_google_absl//absl/types:variant",
"@com_google_googleapis//google/cloud/bigquery/storage/v1beta1:storage_cc_grpc",
"@local_config_tf//:libtensorflow_framework",
46 changes: 44 additions & 2 deletions tensorflow_io/core/kernels/bigquery/bigquery_dataset_op.cc
@@ -16,6 +16,7 @@ limitations under the License.
#include <memory>
#include <vector>

#include "absl/types/any.h"
#include "arrow/buffer.h"
#include "arrow/ipc/api.h"
#include "tensorflow/core/framework/op_kernel.h"
@@ -30,6 +31,7 @@ class BigQueryDatasetOp : public DatasetOpKernel {
explicit BigQueryDatasetOp(OpKernelConstruction *ctx) : DatasetOpKernel(ctx) {
OP_REQUIRES_OK(ctx, ctx->GetAttr("selected_fields", &selected_fields_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("output_types", &output_types_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("default_values", &default_values_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("offset", &offset_));
string data_format_str;
OP_REQUIRES_OK(ctx, ctx->GetAttr("data_format", &data_format_str));
@@ -54,20 +56,53 @@ class BigQueryDatasetOp : public DatasetOpKernel {
output_shapes.reserve(num_outputs);
DataTypeVector output_types_vector;
output_types_vector.reserve(num_outputs);
typed_default_values_.reserve(num_outputs);
for (uint64 i = 0; i < num_outputs; ++i) {
output_shapes.push_back({});
output_types_vector.push_back(output_types_[i]);
const DataType &output_type = output_types_[i];
const string &default_value = default_values_[i];
switch (output_type) {
case DT_FLOAT:
typed_default_values_.push_back(absl::any(std::stof(default_value)));
break;
case DT_DOUBLE:
typed_default_values_.push_back(absl::any(std::stod(default_value)));
break;
case DT_INT32:
int32_t value_int32_t;
strings::safe_strto32(default_value, &value_int32_t);
typed_default_values_.push_back(absl::any(value_int32_t));
break;
case DT_INT64:
int64_t value_int64_t;
strings::safe_strto64(default_value, &value_int64_t);
typed_default_values_.push_back(absl::any(value_int64_t));
break;
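// Boolean defaults are compared against the literal string "True";
// any other value yields false.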
case DT_BOOL:
typed_default_values_.push_back(absl::any(default_value == "True"));
break;
case DT_STRING:
typed_default_values_.push_back(absl::any(default_value));
break;
default:
ctx->CtxFailure(
errors::InvalidArgument("Unsupported output_type:", output_type));
break;
}
}

*output = new Dataset(ctx, client_resource, output_types_vector,
std::move(output_shapes), std::move(stream),
std::move(schema), selected_fields_, output_types_,
offset_, data_format_);
typed_default_values_, offset_, data_format_);
}

private:
std::vector<string> selected_fields_;
std::vector<DataType> output_types_;
std::vector<string> default_values_;
std::vector<absl::any> typed_default_values_;
int64 offset_;
apiv1beta1::DataFormat data_format_;

Expand All @@ -79,7 +114,8 @@ class BigQueryDatasetOp : public DatasetOpKernel {
std::vector<PartialTensorShape> output_shapes,
string stream, string schema,
std::vector<string> selected_fields,
std::vector<DataType> output_types, int64 offset_,
std::vector<DataType> output_types,
std::vector<absl::any> typed_default_values, int64 offset_,
apiv1beta1::DataFormat data_format)
: DatasetBase(DatasetContext(ctx)),
client_resource_(client_resource),
Expand All @@ -88,6 +124,7 @@ class BigQueryDatasetOp : public DatasetOpKernel {
stream_(stream),
selected_fields_(selected_fields),
output_types_(output_types),
typed_default_values_(typed_default_values),
offset_(offset_),
avro_schema_(absl::make_unique<avro::ValidSchema>()),
data_format_(data_format) {
Expand Down Expand Up @@ -147,6 +184,10 @@ class BigQueryDatasetOp : public DatasetOpKernel {

const std::vector<DataType> &output_types() const { return output_types_; }

const std::vector<absl::any> &typed_default_values() const {
return typed_default_values_;
}

const std::unique_ptr<avro::ValidSchema> &avro_schema() const {
return avro_schema_;
}
Expand Down Expand Up @@ -180,6 +221,7 @@ class BigQueryDatasetOp : public DatasetOpKernel {
const string stream_;
const std::vector<string> selected_fields_;
const std::vector<DataType> output_types_;
const std::vector<absl::any> typed_default_values_;
const std::unique_ptr<avro::ValidSchema> avro_schema_;
const int64 offset_;
std::shared_ptr<::arrow::Schema> arrow_schema_;
38 changes: 24 additions & 14 deletions tensorflow_io/core/kernels/bigquery/bigquery_lib.h
@@ -26,6 +26,7 @@ limitations under the License.
#include <Windows.h>
#undef OPTIONAL
#endif
#include "absl/types/any.h"
#include "api/Compiler.hh"
#include "api/DataFile.hh"
#include "api/Decoder.hh"
@@ -127,7 +128,8 @@ class BigQueryReaderDatasetIteratorBase : public DatasetIterator<Dataset> {

auto status =
ReadRecord(ctx, out_tensors, this->dataset()->selected_fields(),
this->dataset()->output_types());
this->dataset()->output_types(),
this->dataset()->typed_default_values());
current_row_index_++;
return status;
}
@@ -181,10 +183,11 @@ class BigQueryReaderDatasetIteratorBase : public DatasetIterator<Dataset> {
}

virtual Status EnsureHasRow(bool *end_of_sequence) = 0;
virtual Status ReadRecord(IteratorContext *ctx,
std::vector<Tensor> *out_tensors,
const std::vector<string> &columns,
const std::vector<DataType> &output_types) = 0;
virtual Status ReadRecord(
IteratorContext *ctx, std::vector<Tensor> *out_tensors,
const std::vector<string> &columns,
const std::vector<DataType> &output_types,
const std::vector<absl::any> &typed_default_values) = 0;
int current_row_index_ = 0;
mutex mu_;
std::unique_ptr<::grpc::ClientContext> read_rows_context_ TF_GUARDED_BY(mu_);
@@ -245,15 +248,15 @@ class BigQueryReaderArrowDatasetIterator

Status ReadRecord(IteratorContext *ctx, std::vector<Tensor> *out_tensors,
const std::vector<string> &columns,
const std::vector<DataType> &output_types)
const std::vector<DataType> &output_types,
const std::vector<absl::any> &typed_default_values)
TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) override {
out_tensors->clear();
out_tensors->reserve(columns.size());

if (this->current_row_index_ == 0 && this->column_indices_.empty()) {
this->column_indices_.resize(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
DataType output_type = output_types[i];
auto column_name = this->record_batch_->column_name(i);
auto it = std::find(columns.begin(), columns.end(), column_name);
if (it == columns.end()) {
@@ -337,7 +340,8 @@ class BigQueryReaderAvroDatasetIterator

Status ReadRecord(IteratorContext *ctx, std::vector<Tensor> *out_tensors,
const std::vector<string> &columns,
const std::vector<DataType> &output_types)
const std::vector<DataType> &output_types,
const std::vector<absl::any> &typed_default_values)
TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) override {
avro::decode(*this->decoder_, *this->datum_);
if (this->datum_->type() != avro::AVRO_RECORD) {
@@ -521,22 +525,28 @@ class BigQueryReaderAvroDatasetIterator
case avro::AVRO_NULL:
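// A null Avro value: fall back to the caller-supplied typed default
// for this column instead of a hard-coded zero/empty value.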
switch (output_types[i]) {
case DT_BOOL:
((*out_tensors)[i]).scalar<bool>()() = false;
((*out_tensors)[i]).scalar<bool>()() =
absl::any_cast<bool>(typed_default_values[i]);
break;
case DT_INT32:
((*out_tensors)[i]).scalar<int32>()() = 0;
((*out_tensors)[i]).scalar<int32>()() =
absl::any_cast<int32_t>(typed_default_values[i]);
break;
case DT_INT64:
((*out_tensors)[i]).scalar<int64>()() = 0l;
((*out_tensors)[i]).scalar<int64>()() =
absl::any_cast<int64_t>(typed_default_values[i]);
break;
case DT_FLOAT:
((*out_tensors)[i]).scalar<float>()() = 0.0f;
((*out_tensors)[i]).scalar<float>()() =
absl::any_cast<float>(typed_default_values[i]);
break;
case DT_DOUBLE:
((*out_tensors)[i]).scalar<double>()() = 0.0;
((*out_tensors)[i]).scalar<double>()() =
absl::any_cast<double>(typed_default_values[i]);
break;
case DT_STRING:
((*out_tensors)[i]).scalar<tstring>()() = "";
((*out_tensors)[i]).scalar<tstring>()() =
absl::any_cast<string>(typed_default_values[i]);
break;
default:
return errors::InvalidArgument(
2 changes: 2 additions & 0 deletions tensorflow_io/core/ops/bigquery_ops.cc
@@ -32,6 +32,7 @@ REGISTER_OP("IO>BigQueryReadSession")
.Attr("dataset_id: string")
.Attr("selected_fields: list(string) >= 1")
.Attr("output_types: list(type) >= 1")
.Attr("default_values: list(string) >= 1")
.Attr("requested_streams: int")
.Attr("data_format: string")
.Attr("row_restriction: string = ''")
@@ -53,6 +54,7 @@ REGISTER_OP("IO>BigQueryDataset")
.Attr("data_format: string")
.Attr("selected_fields: list(string) >= 1")
.Attr("output_types: list(type) >= 1")
.Attr("default_values: list(string) >= 1")
.Output("handle: variant")
.SetIsStateful() // TODO(b/123753214): Source dataset ops must be marked
// stateful to inhibit constant folding.