Add GRPCDataset to allow pulling data from a gRPC server such as numpy array #206
Conversation
BryanCutler left a comment:
@yongtang I only had time for a quick skim right now, but it looks really cool! Do you think I could reuse some of these parts to feed data from Python memory to the ArrowDataset?
I had a small concern about using a void* as one of the method arguments; could you elaborate on this?
rootpath,
os.path.relpath(os.path.join(rootname, filename), datapath))
print("setup.py - copy {} to {}".format(src, dst))
shutil.copyfile(src, dst)
Just wondering why this is being added now: was it grabbing some incorrect files?
Also, could you reuse the similar loop above and just nest it under a list of extensions? Something like:
for file_pattern in ["*.so", "*.py"]:
    for filename in fnmatch.filter(filenames, file_pattern):
        ...
Thanks @BryanCutler. The PR has been updated. Previously, we picked up .py files from the source tree and .so files from bazel-bin. However, gRPC involves Python code (.py) generated from the endpoint.proto file. Those files are generated as part of the build, so bazel places them in bazel-bin. setup.py therefore has to be updated to capture the .py files in bazel-bin as well.
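A rough sketch of the copy loop described here, assuming a setup.py that walks bazel-bin and copies matching files into a package staging directory; the directory names and staging layout are illustrative, not the exact code in this PR:

import fnmatch
import os
import shutil

datapath = "bazel-bin"        # where bazel places generated files (assumed name)
rootpath = "build/package"    # staging directory used by setup.py (assumed name)

# Copy both the compiled .so extensions and the gRPC-generated .py stubs
# (e.g. endpoint_pb2.py, endpoint_pb2_grpc.py) out of bazel-bin.
for rootname, _, filenames in os.walk(os.path.join(datapath, "tensorflow_io")):
    for file_pattern in ["*.so", "*.py"]:
        for filename in fnmatch.filter(filenames, file_pattern):
            src = os.path.join(rootname, filename)
            dst = os.path.join(rootpath, os.path.relpath(src, datapath))
            if not os.path.isdir(os.path.dirname(dst)):
                os.makedirs(os.path.dirname(dst))
            print("setup.py - copy {} to {}".format(src, dst))
            shutil.copyfile(src, dst)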
virtual ~DataInput() {}
virtual Status FromStream(io::InputStreamInterface& s) = 0;
virtual Status ReadRecord(io::InputStreamInterface& s, IteratorContext* ctx, std::unique_ptr<T>& state, int64 record_to_read, int64* record_read, std::vector<Tensor>* out_tensors) const = 0;
virtual Status ReadReferenceRecord(void* s, IteratorContext* ctx, std::unique_ptr<T>& state, int64 record_to_read, int64* record_read, std::vector<Tensor>* out_tensors) const = 0;
Would you mind explaining why void* is used here, and the change from references to pointers?
@BryanCutler Previously, references worked well because we were mostly dealing with file streams (io::InputStreamInterface& s). For gRPC, however, the input is not a file stream and does not return raw bytes (there is no ReadNBytes), so InputStreamInterface is irrelevant here. I changed to pointers so that nullptr can be passed for gRPC.
The main purpose is to reuse the batching logic, where we need to piece together chunks of a tensor.
Ok, thanks for the explanation. Having void* arguments on public APIs makes me a little nervous, though. Do you think making this a protected member would make sense?
@BryanCutler Thanks for the review. The PR has been updated. Now FileInput (and its declared subclasses) only needs to implement:
ReadRecord(io::InputStreamInterface* s, IteratorContext* ctx, ...)
and StreamInput only needs to implement:
Status ReadRecord(IteratorContext* ctx, ...)
so the void* is hidden and will not be touched by any ops.
Please take a look.
Sounds good, thanks!
Adding a reference to tensorflow/tensorflow#13530, which has a need for pandas input. That could be a follow-up PR, I think.
| "python/ops/endpoint_pb2_grpc.py", | ||
| ], | ||
| cmd = "python -m grpc_tools.protoc -Itensorflow_io/grpc --python_out=$(BINDIR)/tensorflow_io/grpc/python/ops/ --grpc_python_out=$(BINDIR)/tensorflow_io/grpc/python/ops/ $< ; touch $(BINDIR)/tensorflow_io/grpc/python/ops/__init__.py", | ||
| output_to_bindir = True, |
The flatbuffers build for Arrow had an option to put the generated files under a prefix directory, e.g. out_prefix = "cpp/src/arrow/ipc/". Is something like that possible here, so the files end up in the right place initially?
@BryanCutler Bazel deliberately uses a mount point to hide the source directory, so generated code cannot be exposed in the source tree. That means we have to copy the Python files from where they are exposed in bazel-bin (not from the source directory). The flatbuffers case is different: there the generated files are used for compilation (which bazel can see), not exposed in the final package like the Python files we have here.
Ok, I see. Thanks for the explanation.
@BryanCutler The PR has been updated. Please take a look.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
This fix provides a way to pull data from a gRPC server, as long as the server implements the protocol in endpoint.proto. The main purpose is to allow reading partial data from a numpy array in memory.

In the from_numpy method, the following is done:
- Create a gRPC server and expose the ReadRecord endpoint.
- Start the gRPC server on a random port on localhost.
- Pass the endpoint (<localhost:port>) to GRPCDataset.
- GRPCDataset pulls the data from the gRPC server.

Note that from_numpy is just a convenience method to set up a gRPC server. In theory, a gRPC server could be created by any other process and in any language; only the endpoint information is needed to create a GRPCDataset.

If the numpy array is huge, this method can be helpful because it is not necessary to save a numpy file and load it back with tf.data, or to pass the whole numpy array into a tensor.

This could open doors for other languages as well. For example, in the case of R, we could set up a gRPC server with R, pass R data frames in memory, and let GRPCDataset pull a chunk of the data at a time.

Note that we create the gRPC server locally, but it is possible to expose a gRPC server remotely so that the workload could be distributed.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
BryanCutler left a comment:
LGTM, I had just one more minor comment
endpoint = grpc_server.endpoint()
dtype = a.dtype
shape = list(a.shape)
batch = batch
Is this maybe a typo?
@BryanCutler Thanks! Just updated.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
Let's merge this one. I saw some discussion about Java, for which this PR should help.
Add GRPCDataset to allow pulling data from a gRPC server such as numpy array
This fix provides a way to pull data from a gRPC server, as long as the server implements the protocol in endpoint.proto.
The main purpose is to allow reading partial data from a numpy array in memory.
In the from_numpy method, the following is done:
- Create a gRPC server and expose the ReadRecord endpoint.
- Start the gRPC server on a random port on localhost.
- Pass the endpoint (<localhost:port>) to GRPCDataset.
- GRPCDataset pulls the data from the gRPC server.
(A minimal usage sketch follows at the end of this description.)
Note that from_numpy is just a convenience method to set up a gRPC server. In theory, a gRPC server could be created by any other process and in any language; only the endpoint information is needed to create a GRPCDataset.
If the numpy array is huge, this method can be helpful because it is not necessary to save a numpy file and load it back with tf.data, or to pass the whole numpy array into a tensor.
This could open doors for other languages as well. For example, in the case of R, we could set up a gRPC server with R, pass R data frames in memory, and let GRPCDataset pull a chunk of the data at a time.
Note that we create the gRPC server locally, but it is possible to expose a gRPC server remotely so that the workload could be distributed.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
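A minimal usage sketch of the flow described above. The import path, the exact from_numpy signature, and the batch argument name are assumptions inferred from the diff in this PR, not a definitive reference:

import numpy as np
from tensorflow_io.grpc import GRPCDataset  # import path is an assumption

# A large in-memory array that we do not want to serialize to disk first.
a = np.arange(100000, dtype=np.int64).reshape([-1, 10])

# from_numpy starts a local gRPC server exposing the ReadRecord endpoint
# defined in endpoint.proto, then builds a dataset pointed at the server's
# <localhost:port> endpoint, pulling chunks of the array on demand.
dataset = GRPCDataset.from_numpy(a, batch=32)

# Any server that implements endpoint.proto would work the same way; only
# the endpoint string (plus dtype and shape) is needed to build the dataset.

Whether from_numpy is a classmethod on GRPCDataset or a module-level helper is also an assumption; the key point is that the dataset only needs the endpoint, dtype, and shape, regardless of which process or language hosts the server.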