Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 0 additions & 91 deletions src/ray/common/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,66 +118,6 @@ void TestSetupUtil::FlushRedisServer(int port) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

std::string TestSetupUtil::StartGcsServer(int port) {
std::string gcs_server_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "gcs_server" + ObjectID::FromRandom().Hex());
std::vector<std::string> cmdargs(
{TEST_GCS_SERVER_EXEC_PATH,
"--gcs_server_port=" + std::to_string(port),
"--config_list=" +
absl::Base64Escape(R"({"object_timeout_milliseconds": 2000})")});
cmdargs.push_back("--gcs_server_port=6379");
RAY_LOG(INFO) << "Start gcs server command: " << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true, gcs_server_socket_name + ".pid").second);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
RAY_LOG(INFO) << "GCS server started.";
return gcs_server_socket_name;
}

void TestSetupUtil::StopGcsServer(const std::string &gcs_server_socket_name) {
KillProcessBySocketName(gcs_server_socket_name);
}

std::string TestSetupUtil::StartRaylet(const std::string &node_ip_address,
const int &port,
const std::string &bootstrap_address,
const std::string &resource,
std::string *store_socket_name) {
std::string raylet_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "raylet" + ObjectID::FromRandom().Hex());
std::string plasma_store_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "store" + ObjectID::FromRandom().Hex());
std::string mock_worker_command = CreateCommandLine({TEST_MOCK_WORKER_EXEC_PATH,
plasma_store_socket_name,
raylet_socket_name,
std::to_string(port),
""});
RAY_LOG(INFO) << "MockWorkerCommand: " << mock_worker_command;
std::vector<std::string> cmdargs({TEST_RAYLET_EXEC_PATH,
"--raylet_socket_name=" + raylet_socket_name,
"--gcs-address=" + bootstrap_address,
"--store_socket_name=" + plasma_store_socket_name,
"--object_manager_port=0",
"--node_manager_port=" + std::to_string(port),
"--node_ip_address=" + node_ip_address,
"--min-worker-port=0",
"--max-worker-port=0",
"--maximum_startup_concurrency=10",
"--static_resource_list=" + resource,
"--python_worker_command=" + mock_worker_command,
"--object_store_memory=10000000"});

RAY_LOG(INFO) << "Raylet Start command: " << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true, raylet_socket_name + ".pid").second);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
*store_socket_name = plasma_store_socket_name;
return raylet_socket_name;
}

void TestSetupUtil::StopRaylet(const std::string &raylet_socket_name) {
KillProcessBySocketName(raylet_socket_name);
}

bool WaitReady(std::future<bool> future, const std::chrono::milliseconds &timeout_ms) {
auto status = future.wait_for(timeout_ms);
return status == std::future_status::ready && future.get();
Expand Down Expand Up @@ -210,29 +150,6 @@ void WaitForExpectedCount(std::atomic<int> &current_count,
EXPECT_TRUE(WaitForCondition(condition, timeout_ms));
}

void KillProcessBySocketName(std::string socket_name) {
std::string pidfile_path = socket_name + ".pid";
{
std::ifstream pidfile(pidfile_path, std::ios_base::in);
RAY_CHECK(pidfile.good());
pid_t pid = -1;
pidfile >> pid;
RAY_CHECK(pid != -1);
Process::FromPid(pid).Kill();
}
ASSERT_EQ(unlink(pidfile_path.c_str()), 0);
}

int KillAllExecutable(const std::string &executable) {
std::vector<std::string> cmdargs;
#ifdef _WIN32
cmdargs.insert(cmdargs.end(), {"taskkill", "/IM", executable});
#else
cmdargs.insert(cmdargs.end(), {"pkill", "-x", executable});
#endif
return Process::Call(cmdargs).value();
}

TaskID RandomTaskId() {
std::string data(TaskID::Size(), 0);
FillRandom(&data);
Expand Down Expand Up @@ -273,12 +190,4 @@ std::string TEST_REDIS_CLIENT_EXEC_PATH;
/// Ports of redis server.
std::vector<int> TEST_REDIS_SERVER_PORTS;

/// Path to gcs server executable binary.
std::string TEST_GCS_SERVER_EXEC_PATH;

/// Path to raylet executable binary.
std::string TEST_RAYLET_EXEC_PATH;
/// Path to mock worker executable binary. Required by raylet.
std::string TEST_MOCK_WORKER_EXEC_PATH;

} // namespace ray
34 changes: 1 addition & 33 deletions src/ray/common/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ static inline std::vector<rpc::ObjectReference> ObjectIdsToRefs(
class Buffer;
class RayObject;

// Magic argument to signal to mock_worker we should check message order.
static const int64_t SHOULD_CHECK_MESSAGE_ORDER = 123450000;

/// Wait until the future is ready, or timeout is reached.
///
/// \param[in] future The future to wait for.
Expand All @@ -66,14 +63,6 @@ void WaitForExpectedCount(std::atomic<int> &current_count,
int expected_count,
int timeout_ms = 60000);

/// Used to kill process whose pid is stored in `socket_name.id` file.
void KillProcessBySocketName(std::string socket_name);

/// Kills all processes with the given executable name (similar to killall).
/// Note: On Windows, this should include the file extension (e.g. ".exe"), if any.
/// This cannot be done automatically as doing so may be incorrect in some cases.
int KillAllExecutable(const std::string &executable_with_suffix);

// A helper function to return a random task id.
TaskID RandomTaskId();

Expand All @@ -92,38 +81,17 @@ extern std::string TEST_REDIS_CLIENT_EXEC_PATH;
/// Ports of redis server.
extern std::vector<int> TEST_REDIS_SERVER_PORTS;

/// Path to gcs server executable binary.
extern std::string TEST_GCS_SERVER_EXEC_PATH;

/// Path to raylet executable binary.
extern std::string TEST_RAYLET_EXEC_PATH;
/// Path to mock worker executable binary. Required by raylet.
extern std::string TEST_MOCK_WORKER_EXEC_PATH;

//--------------------------------------------------------------------------------
// COMPONENT MANAGEMENT CLASSES FOR TEST CASES
//--------------------------------------------------------------------------------
/// Test cases can use it to
/// 1. start/stop/flush redis server(s)
/// 2. start/stop object store
/// 3. start/stop gcs server
/// 4. start/stop raylet
/// 5. start/stop raylet monitor
/// Test cases can use it to start/stop/flush redis server(s).
class TestSetupUtil {
public:
static void StartUpRedisServers(const std::vector<int> &redis_server_ports,
bool save = false);
static void ShutDownRedisServers();
static void FlushAllRedisServers();

static std::string StartGcsServer(int port);
static void StopGcsServer(const std::string &gcs_server_socket_name);
static std::string StartRaylet(const std::string &node_ip_address,
const int &port,
const std::string &bootstrap_address,
const std::string &resource,
std::string *store_socket_name);
static void StopRaylet(const std::string &raylet_socket_name);
static void ExecuteRedisCmd(int port, std::vector<std::string> cmd);
static int StartUpRedisServer(int port, bool save = false);
static void ShutDownRedisServer(int port);
Expand Down