Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix build issue with KafkaDataset #17418

Merged
merged 10 commits into from
Mar 8, 2018
3 changes: 2 additions & 1 deletion tensorflow/contrib/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ cc_library(
"//tensorflow/contrib/coder:all_kernels",
"//tensorflow/contrib/cudnn_rnn:cudnn_rnn_kernels",
"//tensorflow/contrib/data/kernels:dataset_kernels",
"//tensorflow/contrib/kafka:dataset_kernels",
"//tensorflow/contrib/factorization/kernels:all_kernels",
"//tensorflow/contrib/input_pipeline:input_pipeline_ops_kernels",
"//tensorflow/contrib/layers:sparse_feature_cross_op_kernel",
Expand All @@ -147,7 +148,7 @@ cc_library(
"//tensorflow/contrib/factorization:all_ops",
"//tensorflow/contrib/framework:all_ops",
"//tensorflow/contrib/input_pipeline:input_pipeline_ops_op_lib",
"//tensorflow/contrib/kafka:kafka_ops_op_lib",
"//tensorflow/contrib/kafka:dataset_ops_op_lib",
"//tensorflow/contrib/layers:sparse_feature_cross_op_op_lib",
"//tensorflow/contrib/nccl:nccl_ops_op_lib",
"//tensorflow/contrib/nearest_neighbor:nearest_neighbor_ops_op_lib",
Expand Down
107 changes: 68 additions & 39 deletions tensorflow/contrib/kafka/BUILD
Original file line number Diff line number Diff line change
@@ -1,66 +1,93 @@
package(
default_visibility = ["//visibility:private"],
)
package(default_visibility = ["//tensorflow:internal"])

licenses(["notice"]) # Apache 2.0

exports_files(["LICENSE"])

load("//tensorflow:tensorflow.bzl", "tf_gen_op_libs")
load("//tensorflow:tensorflow.bzl", "tf_gen_op_wrapper_py")
load("//tensorflow:tensorflow.bzl", "tf_kernel_library")
load("//tensorflow:tensorflow.bzl", "tf_py_test")
load(
"//tensorflow:tensorflow.bzl",
"tf_gen_op_wrapper_py",
"tf_kernel_library",
"tf_custom_op_library",
"tf_custom_op_py_library",
"tf_gen_op_libs",
"tf_py_test",
)

tf_kernel_library(
name = "kafka_kernels",
py_library(
name = "kafka",
srcs = ["__init__.py"],
srcs_version = "PY2AND3",
deps = [
":dataset_ops",
],
)

tf_custom_op_library(
name = "_dataset_ops.so",
srcs = ["ops/dataset_ops.cc"],
deps = [":dataset_kernels"],
)

tf_gen_op_libs(
op_lib_names = ["dataset_ops"],
)

cc_library(
name = "dataset_kernels",
srcs = ["kernels/kafka_dataset_ops.cc"],
visibility = ["//visibility:public"],
deps = [
"//tensorflow/core:framework",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core/kernels:bounds_check_lib",
"//tensorflow/core/kernels:dataset",
"//tensorflow/core:framework_headers_lib",
"//third_party/eigen3",
"@kafka",
"@protobuf_archive//:protobuf_headers",
],
alwayslink = 1,
)

tf_gen_op_libs(
op_lib_names = ["kafka_ops"],
py_library(
name = "dataset_ops",
srcs = [
"python/ops/kafka_dataset_ops.py",
],
srcs_version = "PY2AND3",
deps = [
"//tensorflow/core:lib",
":kafka_op_loader",
"//tensorflow/python:dataset_ops_gen",
"//tensorflow/python:util",
"//tensorflow/python/data/ops:dataset_ops",
"//tensorflow/python/data/util:nest",
],
)

tf_gen_op_wrapper_py(
name = "gen_kafka_ops",
out = "python/ops/gen_kafka_ops.py",
require_shape_functions = True,
deps = [":kafka_ops_op_lib"],
name = "gen_dataset_ops",
out = "python/ops/gen_dataset_ops.py",
deps = ["//tensorflow/contrib/kafka:dataset_ops_op_lib"],
)

py_library(
name = "kafka",
srcs = [
"__init__.py",
"python/ops/kafka_dataset_ops.py",
tf_kernel_library(
name = "dataset_ops_kernels",
deps = [
":dataset_kernels",
"//tensorflow/core:framework",
],
alwayslink = 1,
)

tf_custom_op_py_library(
name = "kafka_op_loader",
srcs = ["python/ops/kafka_op_loader.py"],
dso = ["//tensorflow/contrib/kafka:_dataset_ops.so"],
kernels = [
":dataset_ops_kernels",
"//tensorflow/contrib/kafka:dataset_ops_op_lib",
],
srcs_version = "PY2AND3",
visibility = ["//visibility:public"],
deps = [
":gen_kafka_ops",
":gen_dataset_ops",
"//tensorflow/contrib/util:util_py",
"//tensorflow/python:array_ops",
"//tensorflow/python:control_flow_ops",
"//tensorflow/python:framework",
"//tensorflow/python:framework_for_generated_wrappers",
"//tensorflow/python:platform",
"//tensorflow/python:state_ops",
"//tensorflow/python:training",
"//tensorflow/python/data/ops:dataset_ops",
"//tensorflow/python/data/ops:iterator_ops",
"//tensorflow/python/data/ops:readers",
],
)

Expand Down Expand Up @@ -95,7 +122,9 @@ tf_py_test(
filegroup(
name = "all_files",
srcs = glob(
["**/*"],
include = [
"**/*",
],
exclude = [
"**/METADATA",
"**/OWNERS",
Expand Down
4 changes: 1 addition & 3 deletions tensorflow/contrib/kafka/kernels/kafka_dataset_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/kernels/dataset.h"

#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/dataset.h"

#include "src-cpp/rdkafkacpp.h"

Expand Down
9 changes: 5 additions & 4 deletions tensorflow/contrib/kafka/python/ops/kafka_dataset_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from __future__ import division
from __future__ import print_function

from tensorflow.contrib.kafka.python.ops import gen_kafka_ops
from tensorflow.python.data.ops.readers import Dataset
from tensorflow.contrib.kafka.python.ops import kafka_op_loader # pylint: disable=unused-import
from tensorflow.contrib.kafka.python.ops import gen_dataset_ops
from tensorflow.python.data.ops.dataset_ops import Dataset
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
Expand Down Expand Up @@ -58,8 +59,8 @@ def __init__(self,
timeout, dtype=dtypes.int64, name="timeout")

def _as_variant_tensor(self):
return gen_kafka_ops.kafka_dataset(self._topics, self._servers, self._group,
self._eof, self._timeout)
return gen_dataset_ops.kafka_dataset(self._topics, self._servers,
self._group, self._eof, self._timeout)

@property
def output_classes(self):
Expand Down
24 changes: 24 additions & 0 deletions tensorflow/contrib/kafka/python/ops/kafka_op_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python helper for loading kafka ops and kernels."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.contrib.util import loader
from tensorflow.python.platform import resource_loader

_dataset_ops = loader.load_op_library(
resource_loader.get_path_to_datafile("../../_dataset_ops.so"))
1 change: 1 addition & 0 deletions tensorflow/tools/pip_package/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ filegroup(
"@highwayhash//:LICENSE",
"@jemalloc//:COPYING",
"@jpeg//:LICENSE.md",
"@kafka//:LICENSE",
"@libxsmm_archive//:LICENSE",
"@lmdb//:LICENSE",
"@local_config_sycl//sycl:LICENSE.text",
Expand Down
13 changes: 9 additions & 4 deletions third_party/kafka/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,23 @@ cc_library(
],
hdrs = [
"config.h",
"src-cpp/rdkafkacpp.h",
"src-cpp/rdkafkacpp_int.h",
"src/lz4.c",
"src/snappy_compat.h",
],
defines = [
copts = [
"-Iexternal/kafka/src",
"-Iexternal/kafka/src-cpp",
],
includes = [
"src",
"src-cpp",
defines = [
],
linkopts = [
"-lpthread",
],
visibility = ["//visibility:public"],
deps = [
"@boringssl//:ssl",
"@zlib_archive//:zlib",
],
)