Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ray-project/ray into serv…
Browse files Browse the repository at this point in the history
…e-get-deployment
  • Loading branch information
edoakes committed Mar 29, 2021
2 parents c8f5227 + e79d4cf commit d37e0b2
Show file tree
Hide file tree
Showing 71 changed files with 1,375 additions and 867 deletions.
21 changes: 0 additions & 21 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,6 @@ cc_library(
],
)

cc_binary(
name = "plasma_store_server",
srcs = [
"src/ray/plasma/store_exec.cc",
],
copts = PLASMA_COPTS,
visibility = ["//visibility:public"],
deps = [
":plasma_store_server_lib",
],
)

FLATC_ARGS = [
"--gen-object-api",
"--gen-mutable",
Expand Down Expand Up @@ -679,7 +667,6 @@ cc_test(
name = "core_worker_test",
srcs = ["src/ray/core_worker/test/core_worker_test.cc"],
args = [
"$(location //:plasma_store_server)",
"$(location raylet)",
"$(location mock_worker)",
"$(location gcs_server)",
Expand All @@ -692,7 +679,6 @@ cc_test(
"//:gcs_server",
"//:libray_redis_module.so",
"//:mock_worker",
"//:plasma_store_server",
"//:raylet",
"//:redis-cli",
"//:redis-server",
Expand Down Expand Up @@ -1876,12 +1862,6 @@ copy_to_workspace(
dstdir = "python/ray/core/src/ray/gcs",
)

copy_to_workspace(
name = "cp_plasma_store_server",
srcs = [":plasma_store_server"],
dstdir = "python/ray/core/src/plasma",
)

genrule(
name = "install_py_proto",
srcs = [
Expand Down Expand Up @@ -1913,7 +1893,6 @@ genrule(
":cp_libray_redis_module",
":cp_raylet",
":cp_gcs_server",
":cp_plasma_store_server",
],
outs = ["ray_pkg.out"],
cmd = """
Expand Down
2 changes: 1 addition & 1 deletion ci/travis/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ test_python() {
test_cpp() {
bazel build --config=ci //cpp:all
# shellcheck disable=SC2046
bazel test --config=ci $(./scripts/bazel_export_options) //cpp:all --build_tests_only
bazel test --config=ci $(./scripts/bazel_export_options) --test_strategy=exclusive //cpp:all --build_tests_only
# run the cpp example
bazel run //cpp/example:example

Expand Down
38 changes: 38 additions & 0 deletions cpp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ cc_library(
"//:ray_util",
"@boost//:asio",
"@boost//:callable_traits",
"@boost//:dll",
"@boost//:thread",
"@com_google_absl//absl/synchronization",
"@msgpack",
Expand Down Expand Up @@ -123,3 +124,40 @@ cc_binary(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "ray_remote_cluster_test",
testonly = 0,
srcs = glob([
"src/ray/test/ray_remote_cluster/*.cc",
]),
args = [
"$(location ray_remote_cluster_test.so)",
],
copts = COPTS,
data = [
"ray_cpp_pkg",
"ray_remote_cluster_test.so",
],
linkstatic = True,
deps = [
"ray_api",
"@com_github_gflags_gflags//:gflags",
"@com_google_googletest//:gtest_main",
],
)

cc_binary(
name = "ray_remote_cluster_test.so",
srcs = glob([
"src/ray/test/ray_remote_cluster/*.cc",
]),
copts = COPTS,
linkopts = ["-shared"],
linkstatic = True,
deps = [
"ray_api",
"@com_github_gflags_gflags//:gflags",
"@com_google_googletest//:gtest_main",
],
)
14 changes: 7 additions & 7 deletions cpp/example/example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ int main(int argc, char **argv) {
std::cout << "task_result1 = " << task_result1 << std::endl;

/// common task with args
task_obj = Ray::Task(Plus1, 5).Remote();
task_obj = Ray::Task(Plus1).Remote(5);
int task_result2 = *(Ray::Get(task_obj));
std::cout << "task_result2 = " << task_result2 << std::endl;

Expand All @@ -92,15 +92,15 @@ int main(int argc, char **argv) {

/// general function remote call(args passed by value)
auto r0 = Ray::Task(Return1).Remote();
auto r2 = Ray::Task(Plus, 3, 22).Remote();
auto r2 = Ray::Task(Plus).Remote(3, 22);
int task_result3 = *(Ray::Get(r2));
std::cout << "task_result3 = " << task_result3 << std::endl;

/// general function remote call(args passed by reference)
auto r3 = Ray::Task(Return1).Remote();
auto r4 = Ray::Task(Plus1, r3).Remote();
auto r5 = Ray::Task(Plus, r4, r3).Remote();
auto r6 = Ray::Task(Plus, r4, 10).Remote();
auto r4 = Ray::Task(Plus1).Remote(r3);
auto r5 = Ray::Task(Plus).Remote(r4, r3);
auto r6 = Ray::Task(Plus).Remote(r4, 10);
int task_result4 = *(Ray::Get(r6));
int task_result5 = *(Ray::Get(r5));
std::cout << "task_result4 = " << task_result4 << ", task_result5 = " << task_result5
Expand All @@ -118,8 +118,8 @@ int main(int argc, char **argv) {
auto r12 = actor5.Task(&Counter::Add, r11).Remote();
auto r13 = actor5.Task(&Counter::Add, r10).Remote();
auto r14 = actor5.Task(&Counter::Add, r13).Remote();
auto r15 = Ray::Task(Plus, r0, r11).Remote();
auto r16 = Ray::Task(Plus1, r15).Remote();
auto r15 = Ray::Task(Plus).Remote(r0, r11);
auto r16 = Ray::Task(Plus1).Remote(r15);
int result12 = *(Ray::Get(r12));
int result14 = *(Ray::Get(r14));
int result11 = *(Ray::Get(r11));
Expand Down
41 changes: 20 additions & 21 deletions cpp/include/ray/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class Ray {
/// \param[in] func The function to be remote executed.
/// \param[in] args The function arguments passed by a value or ObjectRef.
/// \return TaskCaller.
template <typename F, typename... Args>
static TaskCaller<boost::callable_traits::return_type_t<F>> Task(F func, Args... args);
template <typename F>
static TaskCaller<boost::callable_traits::return_type_t<F>> Task(F func);

/// Generic version of creating an actor
/// It is used for creating an actor, such as: ActorCreator<Counter> creator =
Expand All @@ -98,10 +98,8 @@ class Ray {
private:
static std::once_flag is_inited_;

template <typename ReturnType, typename FuncType, typename ExecFuncType,
typename... ArgTypes>
static TaskCaller<ReturnType> TaskInternal(FuncType &func, ExecFuncType &exec_func,
ArgTypes &... args);
template <typename ReturnType, typename FuncType>
static TaskCaller<ReturnType> TaskInternal(FuncType &func);

template <typename ActorType, typename FuncType, typename ExecFuncType,
typename... ArgTypes>
Expand Down Expand Up @@ -172,17 +170,19 @@ inline WaitResult Ray::Wait(const std::vector<ObjectID> &ids, int num_objects,
return ray::internal::RayRuntime()->Wait(ids, num_objects, timeout_ms);
}

template <typename ReturnType, typename FuncType, typename ExecFuncType,
typename... ArgTypes>
inline TaskCaller<ReturnType> Ray::TaskInternal(FuncType &func, ExecFuncType &exec_func,
ArgTypes &... args) {
std::vector<std::unique_ptr<::ray::TaskArg>> task_args;
Arguments::WrapArgs(&task_args, args...);
RemoteFunctionPtrHolder ptr;
template <typename ReturnType, typename FuncType>
inline TaskCaller<ReturnType> Ray::TaskInternal(FuncType &func) {
RemoteFunctionPtrHolder ptr{};
ptr.function_pointer = reinterpret_cast<uintptr_t>(func);
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
return TaskCaller<ReturnType>(ray::internal::RayRuntime().get(), ptr,
std::move(task_args));
if (ray::api::RayConfig::GetInstance()->use_ray_remote) {
auto function_name = ray::internal::FunctionManager::Instance().GetFunctionName(func);
if (function_name.empty()) {
throw RayException(
"Function not found. Please use RAY_REMOTE to register this function.");
}
ptr.function_name = std::move(function_name);
}
return TaskCaller<ReturnType>(ray::internal::RayRuntime().get(), ptr);
}

template <typename ActorType, typename FuncType, typename ExecFuncType,
Expand All @@ -200,12 +200,11 @@ inline ActorCreator<ActorType> Ray::CreateActorInternal(FuncType &create_func,
}

/// Normal task.
template <typename F, typename... Args>
TaskCaller<boost::callable_traits::return_type_t<F>> Ray::Task(F func, Args... args) {
template <typename F>
TaskCaller<boost::callable_traits::return_type_t<F>> Ray::Task(F func) {
using ReturnType = boost::callable_traits::return_type_t<F>;
return TaskInternal<ReturnType>(
func, NormalExecFunction<ReturnType, typename FilterArgType<Args>::type...>,
args...);

return TaskInternal<ReturnType>(func);
}

/// Generic version of creating an actor.
Expand Down
34 changes: 28 additions & 6 deletions cpp/include/ray/api/exec_funcs.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
#pragma once

#include <ray/api/arguments.h>
#include <ray/api/function_manager.h>
#include <ray/api/serializer.h>
#include <msgpack.hpp>
#include "absl/utility/utility.h"
#include "ray/core.h"

namespace ray {

namespace api {
/// The following execution functions are wrappers of remote functions.
/// Execution functions make remote functions executable in distributed system.
Expand All @@ -26,10 +31,10 @@ namespace api {
/// ActorExecFunction the wrapper of actor member function.

template <typename ReturnType, typename CastReturnType, typename... OtherArgTypes>
std::shared_ptr<msgpack::sbuffer> ExecuteNormalFunction(
uintptr_t base_addr, size_t func_offset,
const std::vector<std::shared_ptr<RayObject>> &args_buffer, TaskType task_type,
std::shared_ptr<OtherArgTypes> &&... args) {
absl::enable_if_t<!std::is_void<ReturnType>::value, std::shared_ptr<msgpack::sbuffer>>
ExecuteNormalFunction(uintptr_t base_addr, size_t func_offset,
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
TaskType task_type, std::shared_ptr<OtherArgTypes> &&... args) {
int arg_index = 0;
Arguments::UnwrapArgs(args_buffer, arg_index, &args...);

Expand All @@ -43,6 +48,14 @@ std::shared_ptr<msgpack::sbuffer> ExecuteNormalFunction(
Serializer::Serialize((CastReturnType)(return_value)));
}

template <typename ReturnType, typename CastReturnType, typename... OtherArgTypes>
absl::enable_if_t<std::is_void<ReturnType>::value> ExecuteNormalFunction(
uintptr_t base_addr, size_t func_offset,
const std::vector<std::shared_ptr<RayObject>> &args_buffer, TaskType task_type,
std::shared_ptr<OtherArgTypes> &&... args) {
// TODO: Will support void functions for old api later.
}

template <typename ReturnType, typename ActorType, typename... OtherArgTypes>
std::shared_ptr<msgpack::sbuffer> ExecuteActorFunction(
uintptr_t base_addr, size_t func_offset,
Expand All @@ -69,10 +82,19 @@ std::shared_ptr<msgpack::sbuffer> ExecuteActorFunction(
}

template <typename ReturnType, typename... Args>
std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
absl::enable_if_t<!std::is_void<ReturnType>::value, std::shared_ptr<msgpack::sbuffer>>
NormalExecFunction(uintptr_t base_addr, size_t func_offset,
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
return ExecuteNormalFunction<ReturnType, ReturnType, Args...>(
base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK,
std::shared_ptr<Args>{}...);
}

template <typename ReturnType, typename... Args>
absl::enable_if_t<std::is_void<ReturnType>::value> NormalExecFunction(
uintptr_t base_addr, size_t func_offset,
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
return ExecuteNormalFunction<ReturnType, ReturnType, Args...>(
ExecuteNormalFunction<ReturnType, ReturnType, Args...>(
base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK,
std::shared_ptr<Args>{}...);
}
Expand Down
Loading

0 comments on commit d37e0b2

Please sign in to comment.