Skip to content

Commit

Permalink
[Streaming] Streaming data transfer and python integration (#6185)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang authored and raulchen committed Dec 10, 2019
1 parent c1d4ab8 commit 6272907
Show file tree
Hide file tree
Showing 93 changed files with 8,385 additions and 1,431 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -130,6 +130,7 @@ scripts/nodes.txt

# Pytest Cache
**/.pytest_cache
**/.cache
.benchmarks

# Vscode
Expand All @@ -145,6 +146,9 @@ java/**/.classpath
java/**/.project
java/runtime/native_dependencies/

# streaming/python
streaming/python/generated/

# python virtual env
venv

Expand Down
17 changes: 16 additions & 1 deletion .travis.yml
Expand Up @@ -34,6 +34,21 @@ matrix:
- if [ $RAY_CI_JAVA_AFFECTED != "1" ]; then exit; fi
- ./java/test.sh

- os: linux
  # Streaming CI job: builds Ray and runs the streaming C++ and Python tests.
  env: BAZEL_PYTHON_VERSION=PY3 PYTHON=3.5 PYTHONWARNINGS=ignore TESTSUITE=streaming
  install:
    # Run once so the computed RAY_CI_* values show up in the build log, then
    # eval the emitted `export ...` lines into this shell.
    - python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py
    - eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py`
    # Nothing to build when the change does not touch streaming python.
    - if [ $RAY_CI_STREAMING_PYTHON_AFFECTED != "1" ]; then exit; fi
    - ./ci/suppress_output ./ci/travis/install-bazel.sh
    - ./ci/suppress_output ./ci/travis/install-dependencies.sh
    - export PATH="$HOME/miniconda/bin:$PATH"
    - ./ci/suppress_output ./ci/travis/install-ray.sh
  script:
    # Streaming cpp test.
    - if [ $RAY_CI_STREAMING_CPP_AFFECTED == "1" ]; then ./ci/suppress_output bash streaming/src/test/run_streaming_queue_test.sh; fi
    # Streaming python test. The variable must be expanded with `$`; comparing
    # the bare name against "1" is always false, so the tests would never run.
    - if [ $RAY_CI_STREAMING_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=5 --timeout=300 python/ray/streaming/tests/; fi

- os: linux
env: LINT=1 PYTHONWARNINGS=ignore
before_install:
Expand All @@ -51,7 +66,7 @@ matrix:
- sphinx-build -W -b html -d _build/doctrees source _build/html
- cd ..
# Run Python linting, ignore dict vs {} (C408), others are defaults
- flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605
- flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,streaming/python/generated,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605
- ./ci/travis/format.sh --all
# Make sure that the README is formatted properly.
- cd python
Expand Down
67 changes: 66 additions & 1 deletion BUILD.bazel
Expand Up @@ -7,6 +7,28 @@ load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library")
load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library")
load("@rules_proto_grpc//python:defs.bzl", "python_grpc_compile")
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
load("//bazel:ray.bzl", "if_linux_x86_64")

# Platform config_settings, publicly visible so that select() expressions in
# any package can key on them (e.g. "//:macos", "//:windows",
# "//:linux_x86_64").

# Matches builds whose --cpu value is "x64_windows".
config_setting(
name = "windows",
values = {"cpu": "x64_windows"},
visibility = ["//visibility:public"],
)

# Matches macOS builds: both the apple platform type and the cpu value must
# match.
config_setting(
name = "macos",
values = {
"apple_platform_type": "macos",
"cpu": "darwin",
},
visibility = ["//visibility:public"],
)

# Matches builds whose --cpu value is "k8"; used by if_linux_x86_64() in
# //bazel:ray.bzl.
config_setting(
name = "linux_x86_64",
values = {"cpu": "k8"},
visibility = ["//visibility:public"],
)

# TODO(mehrdadn): (How to) support dynamic linking?
PROPAGATED_WINDOWS_DEFINES = ["RAY_STATIC"]
Expand Down Expand Up @@ -219,6 +241,7 @@ cc_library(
includes = [
"@boost//:asio",
],
visibility = ["//visibility:public"],
deps = [
":common_cc_proto",
":gcs_cc_proto",
Expand Down Expand Up @@ -327,6 +350,7 @@ cc_library(
"-lpthread",
],
}),
visibility = ["//streaming:__subpackages__"],
deps = [
":common_cc_proto",
":gcs",
Expand Down Expand Up @@ -373,6 +397,7 @@ cc_library(
"src/ray/core_worker/transport/*.h",
]),
copts = COPTS,
visibility = ["//visibility:public"],
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
Expand Down Expand Up @@ -659,6 +684,7 @@ cc_library(
includes = [
"src",
],
visibility = ["//visibility:public"],
deps = [
":sha256",
"@com_github_google_glog//:glog",
Expand Down Expand Up @@ -782,15 +808,51 @@ pyx_library(
name = "_raylet",
srcs = glob([
"python/ray/__init__.py",
"python/ray/_raylet.pxd",
"python/ray/_raylet.pyx",
"python/ray/includes/*.pxd",
"python/ray/includes/*.pxi",
]),
copts = COPTS,
# Export ray ABI symbols, which can then be used by _streaming.so.
# We need to dlopen this lib with RTLD_GLOBAL to use ABI in this
# shared lib, see python/ray/__init__.py.
cc_kwargs = {
"linkstatic": 1,
# see https://github.com/tensorflow/tensorflow/blob/r2.1/tensorflow/lite/BUILD#L444
"linkopts": select({
"//:macos": [
"-Wl,-exported_symbols_list,$(location //:src/ray/ray_exported_symbols.lds)",
],
"//:windows": [],
"//conditions:default": [
"-Wl,--version-script,$(location //:src/ray/ray_version_script.lds)",
],
}),
},
copts = COPTS + if_linux_x86_64(["-fno-gnu-unique"]),
deps = [
"//:core_worker_lib",
"//:raylet_lib",
"//:serialization_cc_proto",
"//:src/ray/ray_exported_symbols.lds",
"//:src/ray/ray_version_script.lds",
],
)

# Cython extension for ray streaming. Besides its own .pyx/.pxd/.pxi files it
# lists the core ray Cython declarations (_raylet.pxd and python/ray/includes)
# among its sources, and it links against //streaming:streaming_lib.
pyx_library(
name = "_streaming",
srcs = glob([
"python/ray/streaming/_streaming.pyx",
"python/ray/__init__.py",
"python/ray/_raylet.pxd",
"python/ray/includes/*.pxd",
"python/ray/includes/*.pxi",
"python/ray/streaming/__init__.pxd",
"python/ray/streaming/includes/*.pxd",
"python/ray/streaming/includes/*.pxi",
]),
deps = [
"//streaming:streaming_lib",
],
)

Expand Down Expand Up @@ -922,6 +984,7 @@ genrule(
name = "ray_pkg",
srcs = [
"python/ray/_raylet.so",
"python/ray/streaming/_streaming.so",
"//:python_sources",
"//:all_py_proto",
"//:redis-server",
Expand All @@ -930,12 +993,14 @@ genrule(
"//:raylet",
"//:raylet_monitor",
"@plasma//:plasma_store_server",
"//streaming:copy_streaming_py_proto",
],
outs = ["ray_pkg.out"],
cmd = """
set -x &&
WORK_DIR=$$(pwd) &&
cp -f $(location python/ray/_raylet.so) "$$WORK_DIR/python/ray" &&
cp -f $(location python/ray/streaming/_streaming.so) $$WORK_DIR/python/ray/streaming &&
mkdir -p "$$WORK_DIR/python/ray/core/src/ray/thirdparty/redis/src/" &&
cp -f $(location //:redis-server) "$$WORK_DIR/python/ray/core/src/ray/thirdparty/redis/src/" &&
cp -f $(location //:redis-cli) "$$WORK_DIR/python/ray/core/src/ray/thirdparty/redis/src/" &&
Expand Down
6 changes: 6 additions & 0 deletions bazel/ray.bzl
Expand Up @@ -64,3 +64,9 @@ def define_java_module(
"{auto_gen_header}": "<!-- This file is auto-generated by Bazel from pom_template.xml, do not modify it. -->",
},
)

def if_linux_x86_64(a):
    """Returns `a` when the `//:linux_x86_64` config_setting matches, else `[]`.

    Args:
      a: The value to use on a matching build (callers pass a list, e.g. of
        extra copts).

    Returns:
      A `select()` that resolves to `a` or to an empty list.
    """
    branches = {
        "//:linux_x86_64": a,
        "//conditions:default": [],
    }
    return select(branches)
3 changes: 2 additions & 1 deletion ci/travis/bazel-format.sh
Expand Up @@ -44,6 +44,7 @@ while [[ $# > 0 ]]; do
done

pushd $ROOT_DIR/../..
BAZEL_FILES="bazel/BUILD bazel/BUILD.plasma bazel/ray.bzl BUILD.bazel WORKSPACE"
BAZEL_FILES="bazel/BUILD bazel/BUILD.plasma bazel/ray.bzl BUILD.bazel
streaming/BUILD.bazel WORKSPACE"
buildifier -mode=$RUN_TYPE -diff_command="diff -u" $BAZEL_FILES
popd
16 changes: 16 additions & 0 deletions ci/travis/determine_tests_to_run.py
Expand Up @@ -38,6 +38,8 @@ def list_changed_files(commit_range):
RAY_CI_PYTHON_AFFECTED = 0
RAY_CI_LINUX_WHEELS_AFFECTED = 0
RAY_CI_MACOS_WHEELS_AFFECTED = 0
RAY_CI_STREAMING_CPP_AFFECTED = 0
RAY_CI_STREAMING_PYTHON_AFFECTED = 0

if os.environ["TRAVIS_EVENT_TYPE"] == "pull_request":

Expand Down Expand Up @@ -71,6 +73,7 @@ def list_changed_files(commit_range):
RAY_CI_PYTHON_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
elif changed_file.startswith("java/"):
RAY_CI_JAVA_AFFECTED = 1
elif any(
Expand All @@ -86,6 +89,13 @@ def list_changed_files(commit_range):
RAY_CI_PYTHON_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
RAY_CI_STREAMING_CPP_AFFECTED = 1
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
elif changed_file.startswith("streaming/src"):
RAY_CI_STREAMING_CPP_AFFECTED = 1
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
elif changed_file.startswith("streaming/python"):
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
else:
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
Expand All @@ -94,6 +104,7 @@ def list_changed_files(commit_range):
RAY_CI_PYTHON_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
RAY_CI_STREAMING_CPP_AFFECTED = 1
else:
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
Expand All @@ -102,6 +113,7 @@ def list_changed_files(commit_range):
RAY_CI_PYTHON_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
RAY_CI_STREAMING_CPP_AFFECTED = 1

# Log the modified environment variables visible in console.
for output_stream in [sys.stdout, sys.stderr]:
Expand All @@ -116,3 +128,7 @@ def list_changed_files(commit_range):
.format(RAY_CI_LINUX_WHEELS_AFFECTED))
_print("export RAY_CI_MACOS_WHEELS_AFFECTED={}"
.format(RAY_CI_MACOS_WHEELS_AFFECTED))
_print("export RAY_CI_STREAMING_CPP_AFFECTED={}"
.format(RAY_CI_STREAMING_CPP_AFFECTED))
_print("export RAY_CI_STREAMING_PYTHON_AFFECTED={}"
.format(RAY_CI_STREAMING_PYTHON_AFFECTED))
4 changes: 2 additions & 2 deletions ci/travis/format.sh
Expand Up @@ -79,14 +79,14 @@ format_changed() {
yapf --in-place "${YAPF_EXCLUDES[@]}" "${YAPF_FLAGS[@]}"
if which flake8 >/dev/null; then
git diff --name-only --diff-filter=ACRM "$MERGEBASE" -- '*.py' | xargs -P 5 \
flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605
flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,streaming/python/generated,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605
fi
fi

if ! git diff --diff-filter=ACRM --quiet --exit-code "$MERGEBASE" -- '*.pyx' '*.pxd' '*.pxi' &>/dev/null; then
if which flake8 >/dev/null; then
git diff --name-only --diff-filter=ACRM "$MERGEBASE" -- '*.pyx' '*.pxd' '*.pxi' | xargs -P 5 \
flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E211,E225,E226,E227,E24,E704,E999,W503,W504,W605
flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,streaming/python/generated,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E211,E225,E226,E227,E24,E704,E999,W503,W504,W605
fi
fi

Expand Down
9 changes: 9 additions & 0 deletions python/ray/__init__.py
Expand Up @@ -3,6 +3,7 @@
from __future__ import print_function

import os
from os.path import dirname
import sys

# MUST add pickle5 to the import path because it will be imported by some
Expand All @@ -19,6 +20,14 @@
os.path.abspath(os.path.dirname(__file__)), "pickle5_files")
sys.path.insert(0, pickle5_path)

# Load _raylet.so with RTLD_GLOBAL so that the ray ABI symbols it exports are
# visible to other shared libraries that depend on them, such as
# _streaming.so. See the `_raylet` target in BUILD.bazel.
so_path = os.path.join(dirname(__file__), "_raylet.so")
# Only preload when the extension actually exists next to this file.
if os.path.exists(so_path):
import ctypes
from ctypes import CDLL
CDLL(so_path, ctypes.RTLD_GLOBAL)

# MUST import ray._raylet before pyarrow to initialize some global variables.
# It seems the library related to memory allocation in pyarrow will destroy the
# initialization of grpc if we import pyarrow at first.
Expand Down
70 changes: 70 additions & 0 deletions python/ray/_raylet.pxd
@@ -0,0 +1,70 @@
# cython: profile=False
# distutils: language = c++
# cython: embedsignature = True
# cython: language_level = 3

from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from libcpp.memory cimport (
shared_ptr,
unique_ptr
)

from ray.includes.common cimport (
CBuffer,
CRayObject
)
from ray.includes.libcoreworker cimport CCoreWorker
from ray.includes.unique_ids cimport (
CObjectID,
CActorID
)

# Declaration of the Buffer extension type: it stores a shared_ptr to a native
# CBuffer together with one shape value and one strides value.
cdef class Buffer:
cdef:
shared_ptr[CBuffer] buffer
Py_ssize_t shape
Py_ssize_t strides

# Static C-level factory taking a native buffer; presumably builds a Buffer
# around it -- the definition is not in this file, confirm in _raylet.pyx.
@staticmethod
cdef make(const shared_ptr[CBuffer]& buffer)

# Common base extension type for the ID classes declared in this file
# (ObjectID and ActorID both derive from it).
cdef class BaseID:
# To avoid the error of "Python int too large to convert to C ssize_t",
# here `cdef size_t` is required.
cdef size_t hash(self)

# ObjectID extension type; carries a native CObjectID value in `data`.
cdef class ObjectID(BaseID):
cdef:
CObjectID data
# NOTE(review): presumably a Python-level reference kept alongside the ID;
# its exact purpose is not visible here -- confirm in _raylet.pyx.
object buffer_ref
# Flag indicating whether or not this object ID was added to the set
# of active IDs in the core worker so we know whether we should clean
# it up.
c_bool in_core_worker

# C-level accessor whose declared return type is the native CObjectID.
cdef CObjectID native(self)

# ActorID extension type; carries a native CActorID value in `data`.
cdef class ActorID(BaseID):
cdef CActorID data

# C-level accessor whose declared return type is the native CActorID.
cdef CActorID native(self)

# Re-declares the hash() method that BaseID also declares.
cdef size_t hash(self)

# CoreWorker extension type; holds the native CCoreWorker through a
# unique_ptr plus two Python-level attributes.
cdef class CoreWorker:
cdef:
unique_ptr[CCoreWorker] core_worker
# NOTE(review): by their names these hold a thread and an event loop for
# async work -- not established in this file, confirm in _raylet.pyx.
object async_thread
object async_event_loop

# C-level method declarations; implementations are not in this file.
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectID object_id,
CObjectID *c_object_id, shared_ptr[CBuffer] *data)
# TODO: handle noreturn better
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)

# C-level helper declaration: takes a Python list and returns a
# c_vector[c_string] (the implementation is not in this file).
cdef c_vector[c_string] string_vector_from_list(list string_list)

0 comments on commit 6272907

Please sign in to comment.