diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 90552266fcf7..e925497d7b37 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -6,6 +6,8 @@ include(${CMAKE_CURRENT_LIST_DIR}/cmake/Common.cmake) add_subdirectory(redis_module) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") + include_directories(thirdparty/ae) add_custom_target( @@ -14,26 +16,26 @@ add_custom_target( WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/thirdparty/hiredis) add_library(common STATIC - event_loop.c - common.c - task.c - io.c - net.c - logging.c - state/redis.c - state/table.c - state/object_table.c - state/task_table.c - state/db_client_table.c - state/actor_notification_table.c - state/local_scheduler_table.c + event_loop.cc + common.cc + task.cc + io.cc + net.cc + logging.cc + state/redis.cc + state/table.cc + state/object_table.cc + state/task_table.cc + state/db_client_table.cc + state/actor_notification_table.cc + state/local_scheduler_table.cc thirdparty/ae/ae.c thirdparty/sha256.c) target_link_libraries(common "${CMAKE_CURRENT_LIST_DIR}/thirdparty/hiredis/libhiredis.a") function(define_test test_name library) - add_executable(${test_name} test/${test_name}.c ${ARGN}) + add_executable(${test_name} test/${test_name}.cc ${ARGN}) add_dependencies(${test_name} hiredis flatbuffers_ep) target_link_libraries(${test_name} common ${FLATBUFFERS_STATIC_LIB} ${library}) target_compile_options(${test_name} PUBLIC "-DPLASMA_TEST -DLOCAL_SCHEDULER_TEST -DCOMMON_TEST -DRAY_COMMON_LOG_LEVEL=4 -DRAY_TIMEOUT=50") diff --git a/src/common/common.c b/src/common/common.cc similarity index 100% rename from src/common/common.c rename to src/common/common.cc diff --git a/src/common/event_loop.c b/src/common/event_loop.cc similarity index 100% rename from src/common/event_loop.c rename to src/common/event_loop.cc diff --git a/src/common/event_loop.h b/src/common/event_loop.h index e6671acb90d3..90e10a9725cc 100644 --- a/src/common/event_loop.h +++ b/src/common/event_loop.h @@ -3,6 +3,7 @@ #include +extern "C" { #ifdef _WIN32 /* Quirks mean that Windows version needs to be included differently */ #include @@ -10,6 +11,7 @@ #else #include "ae/ae.h" #endif +} /* Unique timer ID that will be generated when the timer is added to the * event loop. Will not be reused later on in another call diff --git a/src/common/io.c b/src/common/io.cc similarity index 99% rename from src/common/io.c rename to src/common/io.cc index 5b8d9aaecd15..58a4f72f99e9 100644 --- a/src/common/io.c +++ b/src/common/io.cc @@ -348,7 +348,7 @@ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer) { return 0; } -void write_log_message(int fd, char *message) { +void write_log_message(int fd, const char *message) { /* Account for the \0 at the end of the string. */ write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message); } diff --git a/src/common/io.h b/src/common/io.h index bcc0ec257217..46bfcb3943cd 100644 --- a/src/common/io.h +++ b/src/common/io.h @@ -174,7 +174,7 @@ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer); /** * Write a null-terminated string to a file descriptor. */ -void write_log_message(int fd, char *message); +void write_log_message(int fd, const char *message); void write_formatted_log_message(int fd, const char *format, ...); diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.cc similarity index 99% rename from src/common/lib/python/common_extension.c rename to src/common/lib/python/common_extension.cc index 94368d32df73..3faa2edcf5b6 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.cc @@ -96,7 +96,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) { } PyTask *result = PyObject_New(PyTask, &PyTaskType); result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType); - result->spec = malloc(size); + result->spec = (task_spec *) malloc(size); memcpy(result->spec, data, size); /* TODO(pcm): Better error checking once we use flatbuffers. */ if (size != task_spec_size(result->spec)) { diff --git a/src/common/logging.c b/src/common/logging.cc similarity index 100% rename from src/common/logging.c rename to src/common/logging.cc diff --git a/src/common/net.c b/src/common/net.cc similarity index 100% rename from src/common/net.c rename to src/common/net.cc diff --git a/src/common/state/actor_notification_table.c b/src/common/state/actor_notification_table.cc similarity index 83% rename from src/common/state/actor_notification_table.c rename to src/common/state/actor_notification_table.cc index 3579d03e3937..d405d62634cf 100644 --- a/src/common/state/actor_notification_table.c +++ b/src/common/state/actor_notification_table.cc @@ -7,7 +7,8 @@ void actor_notification_table_subscribe( void *subscribe_context, RetryInfo *retry) { ActorNotificationTableSubscribeData *sub_data = - malloc(sizeof(ActorNotificationTableSubscribeData)); + (ActorNotificationTableSubscribeData *) malloc( + sizeof(ActorNotificationTableSubscribeData)); sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; diff --git a/src/common/state/db_client_table.c b/src/common/state/db_client_table.cc similarity index 70% rename from src/common/state/db_client_table.c rename to src/common/state/db_client_table.cc index 7712e705057f..7414badedde1 100644 --- a/src/common/state/db_client_table.c +++ b/src/common/state/db_client_table.cc @@ -9,11 +9,11 @@ void db_client_table_subscribe( db_client_table_done_callback done_callback, void *user_context) { DBClientTableSubscribeData *sub_data = - malloc(sizeof(DBClientTableSubscribeData)); + (DBClientTableSubscribeData *) malloc(sizeof(DBClientTableSubscribeData)); sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; init_table_callback(db_handle, NIL_ID, __func__, sub_data, retry, - done_callback, redis_db_client_table_subscribe, - user_context); + (table_done_callback) done_callback, + redis_db_client_table_subscribe, user_context); } diff --git a/src/common/state/local_scheduler_table.c b/src/common/state/local_scheduler_table.cc similarity index 81% rename from src/common/state/local_scheduler_table.c rename to src/common/state/local_scheduler_table.cc index f27d918c1c39..f49df56479ce 100644 --- a/src/common/state/local_scheduler_table.c +++ b/src/common/state/local_scheduler_table.cc @@ -7,7 +7,8 @@ void local_scheduler_table_subscribe( void *subscribe_context, RetryInfo *retry) { LocalSchedulerTableSubscribeData *sub_data = - malloc(sizeof(LocalSchedulerTableSubscribeData)); + (LocalSchedulerTableSubscribeData *) malloc( + sizeof(LocalSchedulerTableSubscribeData)); sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; @@ -19,7 +20,8 @@ void local_scheduler_table_send_info(DBHandle *db_handle, LocalSchedulerInfo *info, RetryInfo *retry) { LocalSchedulerTableSendInfoData *data = - malloc(sizeof(LocalSchedulerTableSendInfoData)); + (LocalSchedulerTableSendInfoData *) malloc( + sizeof(LocalSchedulerTableSendInfoData)); data->info = *info; init_table_callback(db_handle, NIL_ID, __func__, data, retry, NULL, diff --git a/src/common/state/object_table.c b/src/common/state/object_table.cc similarity index 70% rename from src/common/state/object_table.c rename to src/common/state/object_table.cc index a09414a29db3..091a1c3638ee 100644 --- a/src/common/state/object_table.c +++ b/src/common/state/object_table.cc @@ -9,7 +9,8 @@ void object_table_lookup(DBHandle *db_handle, void *user_context) { CHECK(db_handle != NULL); init_table_callback(db_handle, object_id, __func__, NULL, retry, - done_callback, redis_object_table_lookup, user_context); + (table_done_callback) done_callback, + redis_object_table_lookup, user_context); } void object_table_add(DBHandle *db_handle, @@ -21,11 +22,13 @@ void object_table_add(DBHandle *db_handle, void *user_context) { CHECK(db_handle != NULL); - ObjectTableAddData *info = malloc(sizeof(ObjectTableAddData)); + ObjectTableAddData *info = + (ObjectTableAddData *) malloc(sizeof(ObjectTableAddData)); info->object_size = object_size; memcpy(&info->digest[0], digest, DIGEST_SIZE); init_table_callback(db_handle, object_id, __func__, info, retry, - done_callback, redis_object_table_add, user_context); + (table_done_callback) done_callback, + redis_object_table_add, user_context); } void object_table_remove(DBHandle *db_handle, @@ -38,11 +41,12 @@ void object_table_remove(DBHandle *db_handle, /* Copy the client ID, if one was provided. */ DBClientID *client_id_copy = NULL; if (client_id != NULL) { - client_id_copy = malloc(sizeof(DBClientID)); + client_id_copy = (DBClientID *) malloc(sizeof(DBClientID)); *client_id_copy = *client_id; } init_table_callback(db_handle, object_id, __func__, client_id_copy, retry, - done_callback, redis_object_table_remove, user_context); + (table_done_callback) done_callback, + redis_object_table_remove, user_context); } void object_table_subscribe_to_notifications( @@ -54,14 +58,16 @@ void object_table_subscribe_to_notifications( object_table_lookup_done_callback done_callback, void *user_context) { CHECK(db_handle != NULL); - ObjectTableSubscribeData *sub_data = malloc(sizeof(ObjectTableSubscribeData)); + ObjectTableSubscribeData *sub_data = + (ObjectTableSubscribeData *) malloc(sizeof(ObjectTableSubscribeData)); sub_data->object_available_callback = object_available_callback; sub_data->subscribe_context = subscribe_context; sub_data->subscribe_all = subscribe_all; - init_table_callback( - db_handle, NIL_OBJECT_ID, __func__, sub_data, retry, done_callback, - redis_object_table_subscribe_to_notifications, user_context); + init_table_callback(db_handle, NIL_OBJECT_ID, __func__, sub_data, retry, + (table_done_callback) done_callback, + redis_object_table_subscribe_to_notifications, + user_context); } void object_table_request_notifications(DBHandle *db_handle, @@ -71,8 +77,9 @@ void object_table_request_notifications(DBHandle *db_handle, CHECK(db_handle != NULL); CHECK(num_object_ids > 0); ObjectTableRequestNotificationsData *data = - malloc(sizeof(ObjectTableRequestNotificationsData) + - num_object_ids * sizeof(ObjectID)); + (ObjectTableRequestNotificationsData *) malloc( + sizeof(ObjectTableRequestNotificationsData) + + num_object_ids * sizeof(ObjectID)); data->num_object_ids = num_object_ids; memcpy(data->object_ids, object_ids, num_object_ids * sizeof(ObjectID)); @@ -86,12 +93,14 @@ void object_info_subscribe(DBHandle *db_handle, RetryInfo *retry, object_info_done_callback done_callback, void *user_context) { - ObjectInfoSubscribeData *sub_data = malloc(sizeof(ObjectInfoSubscribeData)); + ObjectInfoSubscribeData *sub_data = + (ObjectInfoSubscribeData *) malloc(sizeof(ObjectInfoSubscribeData)); sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; init_table_callback(db_handle, NIL_OBJECT_ID, __func__, sub_data, retry, - done_callback, redis_object_info_subscribe, user_context); + (table_done_callback) done_callback, + redis_object_info_subscribe, user_context); } void result_table_add(DBHandle *db_handle, @@ -100,10 +109,11 @@ void result_table_add(DBHandle *db_handle, RetryInfo *retry, result_table_done_callback done_callback, void *user_context) { - TaskID *task_id_copy = malloc(sizeof(TaskID)); + TaskID *task_id_copy = (TaskID *) malloc(sizeof(TaskID)); memcpy(task_id_copy, task_id_arg.id, sizeof(*task_id_copy)); init_table_callback(db_handle, object_id, __func__, task_id_copy, retry, - done_callback, redis_result_table_add, user_context); + (table_done_callback) done_callback, + redis_result_table_add, user_context); } void result_table_lookup(DBHandle *db_handle, @@ -112,5 +122,6 @@ void result_table_lookup(DBHandle *db_handle, result_table_lookup_callback done_callback, void *user_context) { init_table_callback(db_handle, object_id, __func__, NULL, retry, - done_callback, redis_result_table_lookup, user_context); + (table_done_callback) done_callback, + redis_result_table_lookup, user_context); } diff --git a/src/common/state/redis.c b/src/common/state/redis.cc similarity index 99% rename from src/common/state/redis.c rename to src/common/state/redis.cc index 8eeb363395bd..952764e4f12e 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.cc @@ -4,9 +4,13 @@ #include #include #include + +extern "C" { /* Including hiredis here is necessary on Windows for typedefs used in ae.h. */ #include "hiredis/hiredis.h" #include "hiredis/adapters/ae.h" +} + #include "utstring.h" #include "common.h" diff --git a/src/common/state/table.c b/src/common/state/table.cc similarity index 98% rename from src/common/state/table.c rename to src/common/state/table.cc index ed404f0c2a1d..835a0a433251 100644 --- a/src/common/state/table.c +++ b/src/common/state/table.cc @@ -24,7 +24,8 @@ TableCallbackData *init_table_callback(DBHandle *db_handle, } CHECK(retry); /* Allocate and initialize callback data structure for object table */ - TableCallbackData *callback_data = malloc(sizeof(TableCallbackData)); + TableCallbackData *callback_data = + (TableCallbackData *) malloc(sizeof(TableCallbackData)); CHECKM(callback_data != NULL, "Memory allocation error!") callback_data->id = id; callback_data->label = label; diff --git a/src/common/state/task_table.c b/src/common/state/task_table.cc similarity index 75% rename from src/common/state/task_table.c rename to src/common/state/task_table.cc index 9a842320fc90..8fdb6d0a022c 100644 --- a/src/common/state/task_table.c +++ b/src/common/state/task_table.cc @@ -6,10 +6,11 @@ void task_table_get_task(DBHandle *db_handle, TaskID task_id, RetryInfo *retry, - task_table_get_callback done_callback, + task_table_get_callback get_callback, void *user_context) { - init_table_callback(db_handle, task_id, __func__, NULL, retry, done_callback, - redis_task_table_get_task, user_context); + init_table_callback(db_handle, task_id, __func__, NULL, retry, + (void *) get_callback, redis_task_table_get_task, + user_context); } void task_table_add_task(DBHandle *db_handle, @@ -18,7 +19,8 @@ void task_table_add_task(DBHandle *db_handle, task_table_done_callback done_callback, void *user_context) { init_table_callback(db_handle, Task_task_id(task), __func__, task, retry, - done_callback, redis_task_table_add_task, user_context); + (table_done_callback) done_callback, + redis_task_table_add_task, user_context); } void task_table_update(DBHandle *db_handle, @@ -27,7 +29,8 @@ void task_table_update(DBHandle *db_handle, task_table_done_callback done_callback, void *user_context) { init_table_callback(db_handle, Task_task_id(task), __func__, task, retry, - done_callback, redis_task_table_update, user_context); + (table_done_callback) done_callback, + redis_task_table_update, user_context); } void task_table_test_and_update(DBHandle *db_handle, @@ -38,14 +41,14 @@ void task_table_test_and_update(DBHandle *db_handle, task_table_get_callback done_callback, void *user_context) { TaskTableTestAndUpdateData *update_data = - malloc(sizeof(TaskTableTestAndUpdateData)); + (TaskTableTestAndUpdateData *) malloc(sizeof(TaskTableTestAndUpdateData)); update_data->test_state_bitmask = test_state_bitmask; update_data->update_state = update_state; /* Update the task entry's local scheduler with this client's ID. */ update_data->local_scheduler_id = db_handle->client; init_table_callback(db_handle, task_id, __func__, update_data, retry, - done_callback, redis_task_table_test_and_update, - user_context); + (table_done_callback) done_callback, + redis_task_table_test_and_update, user_context); } /* TODO(swang): A corresponding task_table_unsubscribe. */ @@ -57,12 +60,14 @@ void task_table_subscribe(DBHandle *db_handle, RetryInfo *retry, task_table_done_callback done_callback, void *user_context) { - TaskTableSubscribeData *sub_data = malloc(sizeof(TaskTableSubscribeData)); + TaskTableSubscribeData *sub_data = + (TaskTableSubscribeData *) malloc(sizeof(TaskTableSubscribeData)); sub_data->local_scheduler_id = local_scheduler_id; sub_data->state_filter = state_filter; sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; init_table_callback(db_handle, local_scheduler_id, __func__, sub_data, retry, - done_callback, redis_task_table_subscribe, user_context); + (table_done_callback) done_callback, + redis_task_table_subscribe, user_context); } diff --git a/src/common/state/task_table.h b/src/common/state/task_table.h index 658924f34fcf..f09d9994cf75 100644 --- a/src/common/state/task_table.h +++ b/src/common/state/task_table.h @@ -42,7 +42,7 @@ typedef void (*task_table_get_callback)(Task *task, void *user_context); void task_table_get_task(DBHandle *db, TaskID task_id, RetryInfo *retry, - task_table_get_callback done_callback, + task_table_get_callback get_callback, void *user_context); /** diff --git a/src/common/task.c b/src/common/task.cc similarity index 99% rename from src/common/task.c rename to src/common/task.cc index 384f1de233cf..9b7f8bd2a47e 100644 --- a/src/common/task.c +++ b/src/common/task.cc @@ -2,7 +2,9 @@ #include #include +extern "C" { #include "sha256.h" +} #include "utarray.h" #include "task.h" diff --git a/src/common/test/common_tests.c b/src/common/test/common_tests.cc similarity index 100% rename from src/common/test/common_tests.c rename to src/common/test/common_tests.cc diff --git a/src/common/test/db_tests.c b/src/common/test/db_tests.cc similarity index 98% rename from src/common/test/db_tests.c rename to src/common/test/db_tests.cc index d45f4cdd30c0..fb07b9f263c6 100644 --- a/src/common/test/db_tests.c +++ b/src/common/test/db_tests.cc @@ -114,14 +114,14 @@ Task *task_table_test_task; void task_table_test_fail_callback(UniqueID id, void *context, void *user_data) { - event_loop *loop = user_data; + event_loop *loop = (event_loop *) user_data; event_loop_stop(loop); } int64_t task_table_delayed_add_task(event_loop *loop, int64_t id, void *context) { - DBHandle *db = context; + DBHandle *db = (DBHandle *) context; RetryInfo retry = { .num_retries = NUM_RETRIES, .timeout = TIMEOUT, @@ -138,7 +138,7 @@ void task_table_test_callback(Task *callback_task, void *user_data) { CHECK(Task_size(callback_task) == Task_size(task_table_test_task)); CHECK(memcmp(callback_task, task_table_test_task, Task_size(callback_task)) == 0); - event_loop *loop = user_data; + event_loop *loop = (event_loop *) user_data; event_loop_stop(loop); } diff --git a/src/common/test/io_tests.c b/src/common/test/io_tests.cc similarity index 95% rename from src/common/test/io_tests.c rename to src/common/test/io_tests.cc index b50569a4f657..887579e0d3b6 100644 --- a/src/common/test/io_tests.c +++ b/src/common/test/io_tests.cc @@ -15,8 +15,8 @@ TEST ipc_socket_test(void) { int socket_fd = bind_ipc_sock(socket_pathname, true); ASSERT(socket_fd >= 0); - char *test_string = "hello world"; - char *test_bytes = "another string"; + const char *test_string = "hello world"; + const char *test_bytes = "another string"; pid_t pid = fork(); if (pid == 0) { close(socket_fd); @@ -60,7 +60,7 @@ TEST long_ipc_socket_test(void) { for (int i = 0; i < 10000; i++) { utstring_printf(test_string, "hello world "); } - char *test_bytes = "another string"; + const char *test_bytes = "another string"; pid_t pid = fork(); if (pid == 0) { close(socket_fd); diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.cc similarity index 98% rename from src/common/test/object_table_tests.c rename to src/common/test/object_table_tests.cc index 8ab6ae8210bf..24bd63eab26a 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.cc @@ -46,7 +46,7 @@ void new_object_lookup_callback(ObjectID object_id, void *user_context) { .timeout = 100, .fail_callback = new_object_fail_callback, }; - DBHandle *db = user_context; + DBHandle *db = (DBHandle *) user_context; result_table_lookup(db, new_object_id, &retry, new_object_done_callback, NULL); } @@ -57,7 +57,7 @@ void new_object_task_callback(TaskID task_id, void *user_context) { .timeout = 100, .fail_callback = new_object_fail_callback, }; - DBHandle *db = user_context; + DBHandle *db = (DBHandle *) user_context; result_table_add(db, new_object_id, new_object_task_id, &retry, new_object_lookup_callback, (void *) db); } @@ -246,7 +246,7 @@ TEST subscribe_timeout_test(void) { int64_t reconnect_context_callback(event_loop *loop, int64_t timer_id, void *context) { - DBHandle *db = context; + DBHandle *db = (DBHandle *) context; /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ redisAsyncFree(db->context); redisFree(db->sync_context); @@ -304,7 +304,7 @@ void add_lookup_done_callback(ObjectID object_id, } void add_lookup_callback(ObjectID object_id, void *user_context) { - DBHandle *db = user_context; + DBHandle *db = (DBHandle *) user_context; RetryInfo retry = { .num_retries = 5, .timeout = 100, @@ -352,7 +352,7 @@ void add_remove_lookup_done_callback(ObjectID object_id, } void add_remove_lookup_callback(ObjectID object_id, void *user_context) { - DBHandle *db = user_context; + DBHandle *db = (DBHandle *) user_context; RetryInfo retry = { .num_retries = 5, .timeout = 100, @@ -363,7 +363,7 @@ void add_remove_lookup_callback(ObjectID object_id, void *user_context) { } void add_remove_callback(ObjectID object_id, void *user_context) { - DBHandle *db = user_context; + DBHandle *db = (DBHandle *) user_context; RetryInfo retry = { .num_retries = 5, .timeout = 100, @@ -406,7 +406,7 @@ int subscribe_retry_succeeded = 0; int64_t reconnect_sub_context_callback(event_loop *loop, int64_t timer_id, void *context) { - DBHandle *db = context; + DBHandle *db = (DBHandle *) context; /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ redisAsyncFree(db->sub_context); redisAsyncFree(db->context); @@ -777,7 +777,8 @@ void subscribe_object_available_later_object_available_callback( TEST subscribe_object_available_later_test(void) { int64_t data_size = 0xF1F0; subscribe_object_present_context_t *myctx = - malloc(sizeof(subscribe_object_present_context_t)); + (subscribe_object_present_context_t *) malloc( + sizeof(subscribe_object_present_context_t)); myctx->teststr = subscribe_object_available_later_context; myctx->data_size = data_size; diff --git a/src/common/test/redis_tests.c b/src/common/test/redis_tests.cc similarity index 94% rename from src/common/test/redis_tests.c rename to src/common/test/redis_tests.cc index dbcdada923a8..92cdc382953b 100644 --- a/src/common/test/redis_tests.c +++ b/src/common/test/redis_tests.cc @@ -27,7 +27,8 @@ void async_redis_socket_test_callback(redisAsyncContext *ac, void *privdata) { async_redis_socket_test_callback_called = 1; redisContext *context = redisConnect("127.0.0.1", 6379); - redisReply *reply = redisCommand(context, test_get_format, test_key); + redisReply *reply = + (redisReply *) redisCommand(context, test_get_format, test_key); redisFree(context); CHECK(reply != NULL); if (strcmp(reply->str, test_value)) { @@ -55,10 +56,9 @@ TEST redis_socket_test(void) { close(socket_fd); unlink(socket_pathname); - redisReply *reply; - reply = redisCommand(context, cmd, 0, 0); + redisReply *reply = (redisReply *) redisCommand(context, cmd, 0, 0); freeReplyObject(reply); - reply = redisCommand(context, "GET %s", test_key); + reply = (redisReply *) redisCommand(context, "GET %s", test_key); ASSERT(reply != NULL); ASSERT_STR_EQ(reply->str, test_value); freeReplyObject(reply); @@ -69,7 +69,7 @@ TEST redis_socket_test(void) { } void redis_read_callback(event_loop *loop, int fd, void *context, int events) { - DBHandle *db = context; + DBHandle *db = (DBHandle *) context; char *cmd = read_log_message(fd); redisAsyncCommand(db->context, async_redis_socket_test_callback, NULL, cmd); free(cmd); @@ -137,7 +137,7 @@ int logging_test_callback_called = 0; void logging_test_callback(redisAsyncContext *ac, void *r, void *privdata) { logging_test_callback_called = 1; redisContext *context = redisConnect("127.0.0.1", 6379); - redisReply *reply = redisCommand(context, "KEYS %s", "log:*"); + redisReply *reply = (redisReply *) redisCommand(context, "KEYS %s", "log:*"); redisFree(context); CHECK(reply != NULL); CHECK(reply->elements > 0); @@ -148,7 +148,7 @@ void logging_read_callback(event_loop *loop, int fd, void *context, int events) { - DBHandle *conn = context; + DBHandle *conn = (DBHandle *) context; char *cmd = read_log_message(fd); redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd, (char *) conn->client.id, sizeof(conn->client.id)); diff --git a/src/common/test/task_table_tests.c b/src/common/test/task_table_tests.cc similarity index 99% rename from src/common/test/task_table_tests.c rename to src/common/test/task_table_tests.cc index 92c8553e7ee0..bd7626d2a45e 100644 --- a/src/common/test/task_table_tests.c +++ b/src/common/test/task_table_tests.cc @@ -81,7 +81,7 @@ void add_success_callback(TaskID task_id, void *context) { add_success = 1; CHECK(TaskID_equal(task_id, Task_task_id(add_lookup_task))); - DBHandle *db = context; + DBHandle *db = (DBHandle *) context; RetryInfo retry = { .num_retries = 5, .timeout = 1000, @@ -200,7 +200,7 @@ TEST publish_timeout_test(void) { int64_t reconnect_db_callback(event_loop *loop, int64_t timer_id, void *context) { - DBHandle *db = context; + DBHandle *db = (DBHandle *) context; /* Reconnect to redis. */ redisAsyncFree(db->sub_context); db->sub_context = redisAsyncConnect("127.0.0.1", 6379); diff --git a/src/common/test/task_tests.c b/src/common/test/task_tests.cc similarity index 100% rename from src/common/test/task_tests.c rename to src/common/test/task_tests.cc diff --git a/src/global_scheduler/CMakeLists.txt b/src/global_scheduler/CMakeLists.txt index f4a2e84966fb..cd5d8cbb91bc 100644 --- a/src/global_scheduler/CMakeLists.txt +++ b/src/global_scheduler/CMakeLists.txt @@ -4,5 +4,5 @@ project(global_scheduler) include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) -add_executable(global_scheduler global_scheduler.c global_scheduler_algorithm.c) +add_executable(global_scheduler global_scheduler.cc global_scheduler_algorithm.cc) target_link_libraries(global_scheduler common ${HIREDIS_LIB}) diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.cc similarity index 98% rename from src/global_scheduler/global_scheduler.c rename to src/global_scheduler/global_scheduler.cc index 3a7378f25fcf..ab3b5beb40a6 100644 --- a/src/global_scheduler/global_scheduler.c +++ b/src/global_scheduler/global_scheduler.cc @@ -64,7 +64,8 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state, GlobalSchedulerState *GlobalSchedulerState_init(event_loop *loop, const char *redis_addr, int redis_port) { - GlobalSchedulerState *state = malloc(sizeof(GlobalSchedulerState)); + GlobalSchedulerState *state = + (GlobalSchedulerState *) malloc(sizeof(GlobalSchedulerState)); /* Must initialize state to 0. Sets hashmap head(s) to NULL. */ memset(state, 0, sizeof(GlobalSchedulerState)); state->db = @@ -184,7 +185,7 @@ void process_new_db_client(DBClientID db_client_id, /* Add plasma_manager ip:port -> local_scheduler_db_client_id association to * state. */ AuxAddressEntry *plasma_local_scheduler_entry = - calloc(1, sizeof(AuxAddressEntry)); + (AuxAddressEntry *) calloc(1, sizeof(AuxAddressEntry)); plasma_local_scheduler_entry->aux_address = strdup(aux_address); plasma_local_scheduler_entry->local_scheduler_db_client_id = db_client_id; HASH_ADD_KEYPTR(plasma_local_scheduler_hh, @@ -319,7 +320,7 @@ void local_scheduler_table_handler(DBClientID client_id, } int task_cleanup_handler(event_loop *loop, timer_id id, void *context) { - GlobalSchedulerState *state = context; + GlobalSchedulerState *state = (GlobalSchedulerState *) context; /* Loop over the pending tasks and resubmit them. */ int64_t num_pending_tasks = utarray_len(state->pending_tasks); for (int64_t i = num_pending_tasks - 1; i >= 0; --i) { diff --git a/src/global_scheduler/global_scheduler_algorithm.c b/src/global_scheduler/global_scheduler_algorithm.cc similarity index 98% rename from src/global_scheduler/global_scheduler_algorithm.c rename to src/global_scheduler/global_scheduler_algorithm.cc index 3ddccc1db9d9..852a2b61d5d6 100644 --- a/src/global_scheduler/global_scheduler_algorithm.c +++ b/src/global_scheduler/global_scheduler_algorithm.cc @@ -8,7 +8,7 @@ GlobalSchedulerPolicyState *GlobalSchedulerPolicyState_init(void) { GlobalSchedulerPolicyState *policy_state = - malloc(sizeof(GlobalSchedulerPolicyState)); + (GlobalSchedulerPolicyState *) malloc(sizeof(GlobalSchedulerPolicyState)); policy_state->round_robin_index = 0; int num_weight_elem = @@ -131,7 +131,7 @@ ObjectSizeEntry *create_object_size_hashmap(GlobalSchedulerState *state, HASH_FIND_STR(object_size_table, object_location, s); if (NULL == s) { /* This location not yet known, so add this object location. */ - s = calloc(1, sizeof(ObjectSizeEntry)); + s = (ObjectSizeEntry *) calloc(1, sizeof(ObjectSizeEntry)); s->object_location = object_location; HASH_ADD_KEYPTR(hh, object_size_table, s->object_location, strlen(s->object_location), s); diff --git a/src/local_scheduler/CMakeLists.txt b/src/local_scheduler/CMakeLists.txt index 22c0660776e7..578b4627db3d 100644 --- a/src/local_scheduler/CMakeLists.txt +++ b/src/local_scheduler/CMakeLists.txt @@ -5,6 +5,8 @@ project(local_scheduler) # Recursively include common include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) +add_definitions(-fPIC) + if(APPLE) SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") endif(APPLE) @@ -22,8 +24,8 @@ include_directories("${CMAKE_CURRENT_LIST_DIR}/../") include_directories("${CMAKE_CURRENT_LIST_DIR}/../plasma/") add_library(local_scheduler_library SHARED - local_scheduler_extension.c - ../common/lib/python/common_extension.c) + local_scheduler_extension.cc + ../common/lib/python/common_extension.cc) get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME) if(APPLE) @@ -31,14 +33,14 @@ if(APPLE) POST_BUILD COMMAND ${CMAKE_INSTALL_NAME_TOOL} -change ${PYTHON_SHARED_LIBRARY} ${PYTHON_LIBRARIES} liblocal_scheduler_library.so) endif(APPLE) -add_library(local_scheduler_client STATIC local_scheduler_client.c) +add_library(local_scheduler_client STATIC local_scheduler_client.cc) target_link_libraries(local_scheduler_library local_scheduler_client ${COMMON_LIB} ${PYTHON_LIBRARIES}) -add_executable(local_scheduler local_scheduler.c local_scheduler_algorithm.c) +add_executable(local_scheduler local_scheduler.cc local_scheduler_algorithm.cc) target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} plasma_lib) -add_executable(local_scheduler_tests test/local_scheduler_tests.c local_scheduler.c local_scheduler_algorithm.c ) +add_executable(local_scheduler_tests test/local_scheduler_tests.cc local_scheduler.cc local_scheduler_algorithm.cc) target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} plasma_lib) target_compile_options(local_scheduler_tests PUBLIC "-DLOCAL_SCHEDULER_TEST") diff --git a/src/local_scheduler/local_scheduler.c b/src/local_scheduler/local_scheduler.cc similarity index 96% rename from src/local_scheduler/local_scheduler.c rename to src/local_scheduler/local_scheduler.cc index a588a8b21b1b..1bab3082eef4 100644 --- a/src/local_scheduler/local_scheduler.c +++ b/src/local_scheduler/local_scheduler.cc @@ -240,7 +240,7 @@ void start_worker(LocalSchedulerState *state, ActorID actor_id) { for (; state->config.start_worker_command[num_args] != NULL; ++num_args) { } const char **start_actor_worker_command = - malloc((num_args + 3) * sizeof(const char *)); + (const char **) malloc((num_args + 3) * sizeof(const char *)); for (int i = 0; i < num_args; ++i) { start_actor_worker_command[i] = state->config.start_worker_command[i]; } @@ -278,7 +278,8 @@ const char **parse_command(const char *command) { free(command_copy); /* Allocate a NULL-terminated array for the tokens. */ - const char **command_args = malloc((num_args + 1) * sizeof(const char *)); + const char **command_args = + (const char **) malloc((num_args + 1) * sizeof(const char *)); command_args[num_args] = NULL; /* Fill in the token array. */ @@ -309,7 +310,8 @@ LocalSchedulerState *LocalSchedulerState_init( const double static_resource_conf[], const char *start_worker_command, int num_workers) { - LocalSchedulerState *state = malloc(sizeof(LocalSchedulerState)); + LocalSchedulerState *state = + (LocalSchedulerState *) malloc(sizeof(LocalSchedulerState)); /* Set the configuration struct for the local scheduler. */ if (start_worker_command != NULL) { state->config.start_worker_command = parse_command(start_worker_command); @@ -342,7 +344,7 @@ LocalSchedulerState *LocalSchedulerState_init( utstring_printf(num_gpus, "%f", static_resource_conf[1]); if (plasma_manager_address != NULL) { num_args = 8; - db_connect_args = malloc(sizeof(char *) * num_args); + db_connect_args = (const char **) malloc(sizeof(char *) * num_args); db_connect_args[0] = "local_scheduler_socket_name"; db_connect_args[1] = local_scheduler_socket_name; db_connect_args[2] = "num_cpus"; @@ -353,7 +355,7 @@ LocalSchedulerState *LocalSchedulerState_init( db_connect_args[7] = plasma_manager_address; } else { num_args = 6; - db_connect_args = malloc(sizeof(char *) * num_args); + db_connect_args = (const char **) malloc(sizeof(char *) * num_args); db_connect_args[0] = "local_scheduler_socket_name"; db_connect_args[1] = local_scheduler_socket_name; db_connect_args[2] = "num_cpus"; @@ -463,7 +465,7 @@ void process_plasma_notification(event_loop *loop, int client_sock, void *context, int events) { - LocalSchedulerState *state = context; + LocalSchedulerState *state = (LocalSchedulerState *) context; /* Read the notification from Plasma. */ ObjectInfo object_info; int error = @@ -494,7 +496,7 @@ void reconstruct_task_update_callback(Task *task, void *user_context) { } /* Otherwise, the test-and-set succeeded, so resubmit the task for execution * to ensure that reconstruction will happen. */ - LocalSchedulerState *state = user_context; + LocalSchedulerState *state = (LocalSchedulerState *) user_context; task_spec *spec = Task_task_spec(task); /* If the task is an actor task, then we currently do not reconstruct it. * TODO(rkn): Handle this better. */ @@ -517,7 +519,7 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id, * put. */ CHECKM(!IS_NIL_ID(task_id), "No task information found for object during reconstruction"); - LocalSchedulerState *state = user_context; + LocalSchedulerState *state = (LocalSchedulerState *) user_context; /* If there are no other instances of the task running, it's safe for us to * claim responsibility for reconstruction. */ task_table_test_and_update(state->db, task_id, @@ -541,7 +543,7 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id, "entry yet)"); return; } - LocalSchedulerState *state = user_context; + LocalSchedulerState *state = (LocalSchedulerState *) user_context; /* If the task failed to finish, it's safe for us to claim responsibility for * reconstruction. */ task_table_test_and_update(state->db, task_id, TASK_STATUS_LOST, @@ -557,7 +559,7 @@ void reconstruct_object_lookup_callback(ObjectID reconstruct_object_id, /* Only continue reconstruction if we find that the object doesn't exist on * any nodes. NOTE: This codepath is not responsible for checking if the * object table entry is up-to-date. */ - LocalSchedulerState *state = user_context; + LocalSchedulerState *state = (LocalSchedulerState *) user_context; /* Look up the task that created the object in the result table. */ if (manager_count == 0) { /* If the object was created and later evicted, we reconstruct the object @@ -589,7 +591,7 @@ void process_message(event_loop *loop, int client_sock, void *context, int events) { - LocalSchedulerClient *worker = context; + LocalSchedulerClient *worker = (LocalSchedulerClient *) context; LocalSchedulerState *state = worker->local_scheduler_state; int64_t type; @@ -631,10 +633,10 @@ void process_message(event_loop *loop, int64_t value_length; memcpy(&value_length, &message[offset], sizeof(value_length)); offset += sizeof(value_length); - uint8_t *key = malloc(key_length); + uint8_t *key = (uint8_t *) malloc(key_length); memcpy(key, &message[offset], key_length); offset += key_length; - uint8_t *value = malloc(value_length); + uint8_t *value = (uint8_t *) malloc(value_length); memcpy(value, &message[offset], value_length); offset += value_length; CHECK(offset == length); @@ -762,11 +764,12 @@ void new_client_connection(event_loop *loop, int listener_sock, void *context, int events) { - LocalSchedulerState *state = context; + LocalSchedulerState *state = (LocalSchedulerState *) context; int new_socket = accept_client(listener_sock); /* Create a struct for this worker. This will be freed when we free the local * scheduler state. */ - LocalSchedulerClient *worker = malloc(sizeof(LocalSchedulerClient)); + LocalSchedulerClient *worker = + (LocalSchedulerClient *) malloc(sizeof(LocalSchedulerClient)); worker->sock = new_socket; worker->task_in_progress = NULL; worker->is_blocked = false; @@ -820,7 +823,7 @@ void handle_task_scheduled_callback(Task *original_task, void *user_context) { void handle_actor_creation_callback(ActorInfo info, void *context) { ActorID actor_id = info.actor_id; DBClientID local_scheduler_id = info.local_scheduler_id; - LocalSchedulerState *state = context; + LocalSchedulerState *state = (LocalSchedulerState *) context; /* Make sure the actor entry is not already present in the actor map table. * TODO(rkn): We will need to remove this check to handle the case where the * corresponding publish is retried and the case in which a task that creates @@ -831,7 +834,7 @@ void handle_actor_creation_callback(ActorInfo info, void *context) { /* Create a new entry and add it to the actor mapping table. TODO(rkn): * Currently this is never removed (except when the local scheduler state is * deleted). */ - entry = malloc(sizeof(actor_map_entry)); + entry = (actor_map_entry *) malloc(sizeof(actor_map_entry)); entry->actor_id = actor_id; entry->local_scheduler_id = local_scheduler_id; HASH_ADD(hh, state->actor_mapping, actor_id, sizeof(entry->actor_id), entry); @@ -846,7 +849,7 @@ void handle_actor_creation_callback(ActorInfo info, void *context) { } int heartbeat_handler(event_loop *loop, timer_id id, void *context) { - LocalSchedulerState *state = context; + LocalSchedulerState *state = (LocalSchedulerState *) context; SchedulingAlgorithmState *algorithm_state = state->algorithm_state; LocalSchedulerInfo info; /* Ask the scheduling algorithm to fill out the scheduler info struct. */ diff --git a/src/local_scheduler/local_scheduler_algorithm.c b/src/local_scheduler/local_scheduler_algorithm.cc similarity index 99% rename from src/local_scheduler/local_scheduler_algorithm.c rename to src/local_scheduler/local_scheduler_algorithm.cc index 75760a363b62..dd1cf4c39b2f 100644 --- a/src/local_scheduler/local_scheduler_algorithm.c +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -109,7 +109,7 @@ struct SchedulingAlgorithmState { SchedulingAlgorithmState *SchedulingAlgorithmState_init(void) { SchedulingAlgorithmState *algorithm_state = - malloc(sizeof(SchedulingAlgorithmState)); + (SchedulingAlgorithmState *) malloc(sizeof(SchedulingAlgorithmState)); /* Initialize an empty hash map for the cache of local available objects. */ algorithm_state->local_objects = NULL; /* Initialize the hash table of objects being fetched. */ @@ -220,7 +220,7 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id, LocalSchedulerClient *worker) { /* This will be freed when the actor is removed in remove_actor. */ - LocalActorInfo *entry = malloc(sizeof(LocalActorInfo)); + LocalActorInfo *entry = (LocalActorInfo *) malloc(sizeof(LocalActorInfo)); entry->actor_id = actor_id; entry->task_counter = 0; /* Initialize the doubly-linked list to NULL. */ @@ -341,7 +341,7 @@ void add_task_to_actor_queue(LocalSchedulerState *state, CHECK(task_counter >= entry->task_counter); /* Create a new task queue entry. */ - task_queue_entry *elt = malloc(sizeof(task_queue_entry)); + task_queue_entry *elt = (task_queue_entry *) malloc(sizeof(task_queue_entry)); elt->spec = (task_spec *) malloc(task_spec_size(spec)); memcpy(elt->spec, spec, task_spec_size(spec)); /* Add the task spec to the actor's task queue in a manner that preserves the @@ -457,7 +457,7 @@ void fetch_missing_dependency(LocalSchedulerState *state, * hash table of locally available objects in handle_object_available when * the object becomes available locally. It will get freed if the object is * subsequently removed locally. */ - entry = malloc(sizeof(object_entry)); + entry = (object_entry *) malloc(sizeof(object_entry)); entry->object_id = obj_id; utarray_new(entry->dependent_tasks, &task_queue_entry_icd); HASH_ADD(hh, algorithm_state->remote_objects, object_id, @@ -529,7 +529,7 @@ bool can_run(SchedulingAlgorithmState *algorithm_state, task_spec *task) { /* TODO(rkn): This method will need to be changed to call reconstruct. */ /* TODO(swang): This method is not covered by any valgrind tests. */ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { - LocalSchedulerState *state = context; + LocalSchedulerState *state = (LocalSchedulerState *) context; /* Only try the fetches if we are connected to the object store manager. */ if (!plasma_manager_is_connected(state->plasma_conn)) { LOG_INFO("Local scheduler is not connected to a object store manager"); @@ -538,7 +538,7 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { /* Allocate a buffer to hold all the object IDs for active fetch requests. */ int num_object_ids = HASH_COUNT(state->algorithm_state->remote_objects); - ObjectID *object_ids = malloc(num_object_ids * sizeof(ObjectID)); + ObjectID *object_ids = (ObjectID *) malloc(num_object_ids * sizeof(ObjectID)); /* Fill out the request with the object IDs for active fetches. */ object_entry *fetch_request, *tmp; @@ -646,7 +646,7 @@ task_queue_entry *queue_task(LocalSchedulerState *state, bool from_global_scheduler) { /* Copy the spec and add it to the task queue. The allocated spec will be * freed when it is assigned to a worker. */ - task_queue_entry *elt = malloc(sizeof(task_queue_entry)); + task_queue_entry *elt = (task_queue_entry *) malloc(sizeof(task_queue_entry)); elt->spec = (task_spec *) malloc(task_spec_size(spec)); memcpy(elt->spec, spec, task_spec_size(spec)); DL_APPEND((*task_queue), elt); diff --git a/src/local_scheduler/local_scheduler_client.c b/src/local_scheduler/local_scheduler_client.cc similarity index 100% rename from src/local_scheduler/local_scheduler_client.c rename to src/local_scheduler/local_scheduler_client.cc diff --git a/src/local_scheduler/local_scheduler_extension.c b/src/local_scheduler/local_scheduler_extension.cc similarity index 100% rename from src/local_scheduler/local_scheduler_extension.c rename to src/local_scheduler/local_scheduler_extension.cc diff --git a/src/local_scheduler/local_scheduler_shared.h b/src/local_scheduler/local_scheduler_shared.h index cf117b64bfac..ba86efdb37e8 100644 --- a/src/local_scheduler/local_scheduler_shared.h +++ b/src/local_scheduler/local_scheduler_shared.h @@ -29,9 +29,9 @@ enum local_scheduler_message_type { }; /* These are needed to define the UT_arrays. */ -UT_icd task_ptr_icd; -UT_icd workers_icd; -UT_icd pid_t_icd; +extern UT_icd task_ptr_icd; +extern UT_icd workers_icd; +extern UT_icd pid_t_icd; /** This struct is used to register a new worker with the local scheduler. * It is shipped as part of local_scheduler_connect */ diff --git a/src/local_scheduler/test/local_scheduler_tests.c b/src/local_scheduler/test/local_scheduler_tests.cc similarity index 96% rename from src/local_scheduler/test/local_scheduler_tests.c rename to src/local_scheduler/test/local_scheduler_tests.cc index 9c33a28abae6..0f3773ff30fc 100644 --- a/src/local_scheduler/test/local_scheduler_tests.c +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -58,7 +58,8 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, int redis_port = 6379; const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS, DEFAULT_NUM_GPUS}; - LocalSchedulerMock *mock = malloc(sizeof(LocalSchedulerMock)); + LocalSchedulerMock *mock = + (LocalSchedulerMock *) malloc(sizeof(LocalSchedulerMock)); memset(mock, 0, sizeof(LocalSchedulerMock)); mock->loop = event_loop_create(); /* Bind to the local scheduler port and initialize the local scheduler. */ @@ -95,7 +96,8 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, /* Connect a local scheduler client. */ mock->num_local_scheduler_conns = num_mock_workers; - mock->conns = malloc(sizeof(LocalSchedulerConnection *) * num_mock_workers); + mock->conns = (LocalSchedulerConnection **) malloc( + sizeof(LocalSchedulerConnection *) * num_mock_workers); for (int i = 0; i < num_mock_workers; ++i) { mock->conns[i] = LocalSchedulerConnection_init( utstring_body(local_scheduler_socket_name), NIL_ACTOR_ID); @@ -160,12 +162,13 @@ TEST object_reconstruction_test(void) { * simulate it having been created and evicted. */ const char *client_id = "clientid"; redisContext *context = redisConnect("127.0.0.1", 6379); - redisReply *reply = redisCommand(context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", - return_id.id, sizeof(return_id.id), 1, - NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); + redisReply *reply = (redisReply *) redisCommand( + context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.id, + sizeof(return_id.id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); freeReplyObject(reply); - reply = redisCommand(context, "RAY.OBJECT_TABLE_REMOVE %b %s", return_id.id, - sizeof(return_id.id), client_id); + reply = (redisReply *) redisCommand(context, "RAY.OBJECT_TABLE_REMOVE %b %s", + return_id.id, sizeof(return_id.id), + client_id); freeReplyObject(reply); redisFree(context); @@ -245,12 +248,13 @@ TEST object_reconstruction_recursive_test(void) { redisContext *context = redisConnect("127.0.0.1", 6379); for (int i = 0; i < NUM_TASKS; ++i) { ObjectID return_id = task_return(specs[i], 0); - redisReply *reply = redisCommand( + redisReply *reply = (redisReply *) redisCommand( context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.id, sizeof(return_id.id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); freeReplyObject(reply); - reply = redisCommand(context, "RAY.OBJECT_TABLE_REMOVE %b %s", return_id.id, - sizeof(return_id.id), client_id); + reply = (redisReply *) redisCommand( + context, "RAY.OBJECT_TABLE_REMOVE %b %s", return_id.id, + sizeof(return_id.id), client_id); freeReplyObject(reply); } redisFree(context); @@ -335,7 +339,7 @@ task_spec *object_reconstruction_suppression_spec; void object_reconstruction_suppression_callback(ObjectID object_id, void *user_context) { /* Submit the task after adding the object to the object table. */ - LocalSchedulerConnection *worker = user_context; + LocalSchedulerConnection *worker = (LocalSchedulerConnection *) user_context; local_scheduler_submit(worker, object_reconstruction_suppression_spec); } diff --git a/src/numbuf/CMakeLists.txt b/src/numbuf/CMakeLists.txt index bfef07f42088..297463461dac 100644 --- a/src/numbuf/CMakeLists.txt +++ b/src/numbuf/CMakeLists.txt @@ -47,7 +47,7 @@ if(HAS_PLASMA) include_directories("${CMAKE_CURRENT_LIST_DIR}/../plasma") include_directories("${CMAKE_CURRENT_LIST_DIR}/../common") include_directories("${CMAKE_CURRENT_LIST_DIR}/../common/thirdparty") - set(COMMON_EXTENSION ../common/lib/python/common_extension.c) + set(COMMON_EXTENSION ../common/lib/python/common_extension.cc) endif() add_definitions(-fPIC) diff --git a/src/numbuf/python/src/pynumbuf/numbuf.cc b/src/numbuf/python/src/pynumbuf/numbuf.cc index 63b052b3dcec..8a694d050bea 100644 --- a/src/numbuf/python/src/pynumbuf/numbuf.cc +++ b/src/numbuf/python/src/pynumbuf/numbuf.cc @@ -15,13 +15,17 @@ #include "memory.h" #ifdef HAS_PLASMA -extern "C" { #include "plasma_client.h" #include "plasma_protocol.h" -} +extern "C" { PyObject* NumbufPlasmaOutOfMemoryError; PyObject* NumbufPlasmaObjectExistsError; +} + +#include "common_extension.h" +#include "plasma_extension.h" + #endif using namespace arrow; @@ -193,9 +197,6 @@ static PyObject* register_callbacks(PyObject* self, PyObject* args) { #ifdef HAS_PLASMA -#include "common_extension.h" -#include "plasma_extension.h" - /** * Release the object when its associated PyCapsule goes out of scope. * diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt index 080fb5524bab..b7ee8a9a28a0 100644 --- a/src/plasma/CMakeLists.txt +++ b/src/plasma/CMakeLists.txt @@ -41,11 +41,11 @@ include_directories("${CMAKE_CURRENT_LIST_DIR}/") include_directories("${CMAKE_CURRENT_LIST_DIR}/../") add_library(plasma SHARED - plasma.c - plasma_extension.c - ../common/lib/python/common_extension.c + plasma.cc + plasma_extension.cc + ../common/lib/python/common_extension.cc plasma_protocol.cc - plasma_client.c + plasma_client.cc thirdparty/xxhash.c fling.c) @@ -70,10 +70,10 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") set_source_files_properties(thirdparty/dlmalloc.c PROPERTIES COMPILE_FLAGS -Wno-all) add_executable(plasma_store - plasma_store.c - plasma.c + plasma_store.cc + plasma.cc plasma_protocol.cc - eviction_policy.c + eviction_policy.cc fling.c malloc.c) @@ -82,8 +82,8 @@ add_dependencies(plasma_store hiredis gen_plasma_fbs) target_link_libraries(plasma_store common ${FLATBUFFERS_STATIC_LIB}) add_library(plasma_lib STATIC - plasma_client.c - plasma.c + plasma_client.cc + plasma.cc plasma_protocol.cc fling.c thirdparty/xxhash.c) @@ -94,15 +94,15 @@ add_dependencies(plasma_lib gen_plasma_fbs) add_dependencies(plasma protocol_fbs) add_executable(plasma_manager - plasma_manager.c) + plasma_manager.cc) target_link_libraries(plasma_manager common plasma_lib ${FLATBUFFERS_STATIC_LIB}) -add_library(plasma_client SHARED plasma_client.c) +add_library(plasma_client SHARED plasma_client.cc) target_link_libraries(plasma_client ${FLATBUFFERS_STATIC_LIB}) target_link_libraries(plasma_client common plasma_lib ${FLATBUFFERS_STATIC_LIB}) define_test(client_tests plasma_lib) -define_test(manager_tests plasma_lib plasma_manager.c) +define_test(manager_tests plasma_lib plasma_manager.cc) define_test(serialization_tests plasma_lib) diff --git a/src/plasma/eviction_policy.c b/src/plasma/eviction_policy.cc similarity index 94% rename from src/plasma/eviction_policy.c rename to src/plasma/eviction_policy.cc index bb322d22de62..d82f843e69f1 100644 --- a/src/plasma/eviction_policy.c +++ b/src/plasma/eviction_policy.cc @@ -4,14 +4,14 @@ /** An element representing a released object in a doubly-linked list. This is * used to implement an LRU cache. */ -typedef struct released_object { +typedef struct ReleasedObject { /** The object_id of the released object. */ ObjectID object_id; /** Needed for the doubly-linked list macros. */ - struct released_object *prev; + struct ReleasedObject *prev; /** Needed for the doubly-linked list macros. */ - struct released_object *next; -} released_object; + struct ReleasedObject *next; +} ReleasedObject; /** This type is used to define a hash table mapping the object ID of a released * object to its location in the doubly-linked list of released objects. */ @@ -20,7 +20,7 @@ typedef struct { ObjectID object_id; /** A pointer to the corresponding entry for this object in the doubly-linked * list of released objects. */ - released_object *released_object; + ReleasedObject *released_object; /** Handle for the uthash table. */ UT_hash_handle handle; } released_object_entry; @@ -31,7 +31,7 @@ struct EvictionState { int64_t memory_used; /** A doubly-linked list of the released objects in order from least recently * released to most recently released. */ - released_object *released_objects; + ReleasedObject *released_objects; /** A hash table mapping the object ID of a released object to its location in * the doubly linked list of released objects. */ released_object_entry *released_object_table; @@ -42,7 +42,7 @@ struct EvictionState { UT_icd released_objects_entry_icd = {sizeof(ObjectID), NULL, NULL, NULL}; EvictionState *EvictionState_init(void) { - EvictionState *state = malloc(sizeof(EvictionState)); + EvictionState *state = (EvictionState *) malloc(sizeof(EvictionState)); state->memory_used = 0; state->released_objects = NULL; state->released_object_table = NULL; @@ -51,7 +51,7 @@ EvictionState *EvictionState_init(void) { void EvictionState_free(EvictionState *s) { /* Delete each element in the doubly-linked list. */ - released_object *element, *temp; + ReleasedObject *element, *temp; DL_FOREACH_SAFE(s->released_objects, element, temp) { DL_DELETE(s->released_objects, element); free(element); @@ -69,7 +69,8 @@ void EvictionState_free(EvictionState *s) { void add_object_to_lru_cache(EvictionState *eviction_state, ObjectID object_id) { /* Add the object ID to the doubly-linked list. */ - released_object *linked_list_entry = malloc(sizeof(released_object)); + ReleasedObject *linked_list_entry = + (ReleasedObject *) malloc(sizeof(ReleasedObject)); linked_list_entry->object_id = object_id; DL_APPEND(eviction_state->released_objects, linked_list_entry); /* Check that the object ID is not already in the hash table. */ @@ -78,7 +79,8 @@ void add_object_to_lru_cache(EvictionState *eviction_state, sizeof(object_id), hash_table_entry); CHECK(hash_table_entry == NULL); /* Add the object ID to the hash table. */ - hash_table_entry = malloc(sizeof(released_object_entry)); + hash_table_entry = + (released_object_entry *) malloc(sizeof(released_object_entry)); hash_table_entry->object_id = object_id; hash_table_entry->released_object = linked_list_entry; HASH_ADD(handle, eviction_state->released_object_table, object_id, @@ -114,7 +116,7 @@ int64_t EvictionState_choose_objects_to_evict( int64_t num_bytes = 0; /* Figure out how many objects need to be evicted in order to recover a * sufficient number of bytes. */ - released_object *element, *temp; + ReleasedObject *element, *temp; DL_FOREACH_SAFE(eviction_state->released_objects, element, temp) { if (num_bytes >= num_bytes_required) { break; diff --git a/src/plasma/plasma.c b/src/plasma/plasma.cc similarity index 100% rename from src/plasma/plasma.c rename to src/plasma/plasma.cc diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.cc similarity index 97% rename from src/plasma/plasma_client.c rename to src/plasma/plasma_client.cc index 8c57488f49e4..66ae58dc58af 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.cc @@ -25,15 +25,19 @@ #include "plasma.h" #include "plasma_protocol.h" #include "plasma_client.h" -#include "fling.h" #include "uthash.h" #include "utlist.h" #include "sha256.h" +extern "C" { + +#include "fling.h" + #define XXH_STATIC_LINKING_ONLY #include "xxhash.h" #define XXH64_DEFAULT_SEED 0 +} typedef struct { /** Key that uniquely identifies the memory mapped file. In practice, we @@ -140,13 +144,13 @@ uint8_t *lookup_or_mmap(PlasmaConnection *conn, close(fd); return entry->pointer; } else { - uint8_t *result = - mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + uint8_t *result = (uint8_t *) mmap(NULL, map_size, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); if (result == MAP_FAILED) { LOG_FATAL("mmap failed"); } close(fd); - entry = malloc(sizeof(client_mmap_table_entry)); + entry = (client_mmap_table_entry *) malloc(sizeof(client_mmap_table_entry)); entry->key = store_fd_val; entry->pointer = result; entry->length = map_size; @@ -177,7 +181,7 @@ void increment_object_count(PlasmaConnection *conn, if (object_entry == NULL) { /* Add this object ID to the hash table of object IDs in use. The * corresponding call to free happens in plasma_release. */ - object_entry = malloc(sizeof(object_in_use_entry)); + object_entry = (object_in_use_entry *) malloc(sizeof(object_in_use_entry)); object_entry->object_id = object_id; object_entry->object = *object; object_entry->count = 0; @@ -305,8 +309,10 @@ void plasma_get(PlasmaConnection *conn, num_objects, timeout_ms) >= 0); uint8_t *reply_data = plasma_receive(conn->store_conn, MessageType_PlasmaGetReply); - ObjectID *received_obj_ids = malloc(num_objects * sizeof(ObjectID)); - PlasmaObject *object_data = malloc(num_objects * sizeof(PlasmaObject)); + ObjectID *received_obj_ids = + (ObjectID *) malloc(num_objects * sizeof(ObjectID)); + PlasmaObject *object_data = + (PlasmaObject *) malloc(num_objects * sizeof(PlasmaObject)); PlasmaObject *object; plasma_read_GetReply(reply_data, received_obj_ids, object_data, num_objects); free(reply_data); @@ -413,7 +419,8 @@ void plasma_perform_release(PlasmaConnection *conn, ObjectID object_id) { void plasma_release(PlasmaConnection *conn, ObjectID obj_id) { /* Add the new object to the release history. The corresponding call to free * will occur in plasma_perform_release or in plasma_disconnect. */ - pending_release *pending_release_entry = malloc(sizeof(pending_release)); + pending_release *pending_release_entry = + (pending_release *) malloc(sizeof(pending_release)); pending_release_entry->object_id = obj_id; DL_APPEND(conn->release_history, pending_release_entry); conn->release_history_length += 1; @@ -555,7 +562,8 @@ PlasmaConnection *plasma_connect(const char *store_socket_name, const char *manager_socket_name, int release_delay) { /* Initialize the store connection struct */ - PlasmaConnection *result = malloc(sizeof(PlasmaConnection)); + PlasmaConnection *result = + (PlasmaConnection *) malloc(sizeof(PlasmaConnection)); result->store_conn = connect_ipc_sock_retry(store_socket_name, -1, -1); if (manager_socket_name != NULL) { result->manager_conn = connect_ipc_sock_retry(manager_socket_name, -1, -1); diff --git a/src/plasma/plasma_extension.c b/src/plasma/plasma_extension.cc similarity index 97% rename from src/plasma/plasma_extension.c rename to src/plasma/plasma_extension.cc index 377b54335a34..9d41c71cd01c 100644 --- a/src/plasma/plasma_extension.c +++ b/src/plasma/plasma_extension.cc @@ -79,7 +79,7 @@ PyObject *PyPlasma_create(PyObject *self, PyObject *args) { CHECK(error_code == PlasmaError_OK); #if PY_MAJOR_VERSION >= 3 - return PyMemoryView_FromMemory((void *) data, (Py_ssize_t) size, PyBUF_WRITE); + return PyMemoryView_FromMemory((char *) data, (Py_ssize_t) size, PyBUF_WRITE); #else return PyBuffer_FromReadWriteMemory((void *) data, (Py_ssize_t) size); #endif @@ -154,11 +154,11 @@ PyObject *PyPlasma_get(PyObject *self, PyObject *args) { #if PY_MAJOR_VERSION >= 3 PyTuple_SetItem( t, 0, PyMemoryView_FromMemory( - (void *) object_buffers[i].data, + (char *) object_buffers[i].data, (Py_ssize_t) object_buffers[i].data_size, PyBUF_READ)); PyTuple_SetItem( t, 1, PyMemoryView_FromMemory( - (void *) object_buffers[i].metadata, + (char *) object_buffers[i].metadata, (Py_ssize_t) object_buffers[i].metadata_size, PyBUF_READ)); #else PyTuple_SetItem( @@ -207,7 +207,7 @@ PyObject *PyPlasma_fetch(PyObject *self, PyObject *args) { return NULL; } Py_ssize_t n = PyList_Size(object_id_list); - ObjectID *object_ids = malloc(sizeof(ObjectID) * n); + ObjectID *object_ids = (ObjectID *) malloc(sizeof(ObjectID) * n); for (int i = 0; i < n; ++i) { PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); } @@ -249,7 +249,8 @@ PyObject *PyPlasma_wait(PyObject *self, PyObject *args) { return NULL; } - ObjectRequest *object_requests = malloc(sizeof(ObjectRequest) * n); + ObjectRequest *object_requests = + (ObjectRequest *) malloc(sizeof(ObjectRequest) * n); for (int i = 0; i < n; ++i) { CHECK(PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_requests[i].object_id) == 1); diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.cc similarity index 97% rename from src/plasma/plasma_manager.c rename to src/plasma/plasma_manager.cc index 8873b78f6fcf..4aaad9663b26 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.cc @@ -299,7 +299,8 @@ void add_wait_request_for_object(plasma_manager_state *manager_state, * new object_wait_requests struct for this object ID and add it to the hash * table. */ if (object_wait_reqs == NULL) { - object_wait_reqs = malloc(sizeof(object_wait_requests)); + object_wait_reqs = + (object_wait_requests *) malloc(sizeof(object_wait_requests)); object_wait_reqs->object_id = object_id; utarray_new(object_wait_reqs->wait_requests, &wait_request_icd); HASH_ADD(hh, *object_wait_requests_table_ptr, object_id, @@ -423,7 +424,7 @@ void update_object_wait_requests(plasma_manager_state *manager_state, fetch_request *create_fetch_request(plasma_manager_state *manager_state, ObjectID object_id) { - fetch_request *fetch_req = malloc(sizeof(fetch_request)); + fetch_request *fetch_req = (fetch_request *) malloc(sizeof(fetch_request)); fetch_req->object_id = object_id; fetch_req->manager_count = 0; fetch_req->manager_vector = NULL; @@ -450,7 +451,8 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, int manager_port, const char *db_addr, int db_port) { - plasma_manager_state *state = malloc(sizeof(plasma_manager_state)); + plasma_manager_state *state = + (plasma_manager_state *) malloc(sizeof(plasma_manager_state)); state->loop = event_loop_create(); state->plasma_conn = plasma_connect(store_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY); @@ -465,7 +467,8 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, utstring_printf(manager_address_str, "%s:%d", manager_addr, manager_port); int num_args = 6; - const char **db_connect_args = malloc(sizeof(char *) * num_args); + const char **db_connect_args = + (const char **) malloc(sizeof(char *) * num_args); db_connect_args[0] = "store_socket_name"; db_connect_args[1] = store_socket_name; db_connect_args[2] = "manager_socket_name"; @@ -705,7 +708,7 @@ ClientConnection *get_manager_connection(plasma_manager_state *state, /* TODO(swang): Handle the case when connection to this manager was * unsuccessful. */ CHECK(fd >= 0); - manager_conn = malloc(sizeof(ClientConnection)); + manager_conn = (ClientConnection *) malloc(sizeof(ClientConnection)); manager_conn->fd = fd; manager_conn->manager_state = state; manager_conn->transfer_queue = NULL; @@ -768,7 +771,8 @@ void process_transfer_request(event_loop *loop, counter += 1; } while (obj_buffer.data_size == -1); DCHECK(obj_buffer.metadata == obj_buffer.data + obj_buffer.data_size); - plasma_request_buffer *buf = malloc(sizeof(plasma_request_buffer)); + plasma_request_buffer *buf = + (plasma_request_buffer *) malloc(sizeof(plasma_request_buffer)); buf->type = MessageType_PlasmaDataReply; buf->object_id = obj_id; /* We treat buf->data as a pointer to the concatenated data and metadata, so @@ -799,7 +803,8 @@ void process_data_request(event_loop *loop, int64_t data_size, int64_t metadata_size, ClientConnection *conn) { - plasma_request_buffer *buf = malloc(sizeof(plasma_request_buffer)); + plasma_request_buffer *buf = + (plasma_request_buffer *) malloc(sizeof(plasma_request_buffer)); buf->object_id = object_id; buf->data_size = data_size; buf->metadata_size = metadata_size; @@ -867,7 +872,7 @@ void request_transfer_from(plasma_manager_state *manager_state, } plasma_request_buffer *transfer_request = - malloc(sizeof(plasma_request_buffer)); + (plasma_request_buffer *) malloc(sizeof(plasma_request_buffer)); transfer_request->type = MessageType_PlasmaDataRequest; transfer_request->object_id = fetch_req->object_id; @@ -885,7 +890,7 @@ void request_transfer_from(plasma_manager_state *manager_state, } int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { - plasma_manager_state *manager_state = context; + plasma_manager_state *manager_state = (plasma_manager_state *) context; /* Loop over the fetch requests and reissue the requests. */ fetch_request *fetch_req, *tmp; HASH_ITER(hh, manager_state->fetch_requests, fetch_req, tmp) { @@ -938,12 +943,12 @@ void request_transfer(ObjectID object_id, } /* Update the manager vector. */ fetch_req->manager_count = manager_count; - fetch_req->manager_vector = malloc(manager_count * sizeof(char *)); + fetch_req->manager_vector = (char **) malloc(manager_count * sizeof(char *)); fetch_req->next_manager = 0; memset(fetch_req->manager_vector, 0, manager_count * sizeof(char *)); for (int i = 0; i < manager_count; ++i) { int len = strlen(manager_vector[i]); - fetch_req->manager_vector[i] = malloc(len + 1); + fetch_req->manager_vector[i] = (char *) malloc(len + 1); strncpy(fetch_req->manager_vector[i], manager_vector[i], len); fetch_req->manager_vector[i][len] = '\0'; } @@ -1015,7 +1020,8 @@ void process_fetch_requests(ClientConnection *client_conn, int num_object_ids_to_request = 0; /* This is allocating more space than necessary, but we do not know the exact * number of object IDs to request notifications for yet. */ - ObjectID *object_ids_to_request = malloc(num_object_ids * sizeof(ObjectID)); + ObjectID *object_ids_to_request = + (ObjectID *) malloc(num_object_ids * sizeof(ObjectID)); for (int i = 0; i < num_object_ids; ++i) { ObjectID obj_id = object_ids[i]; @@ -1056,7 +1062,7 @@ void process_fetch_requests(ClientConnection *client_conn, } int wait_timeout_handler(event_loop *loop, timer_id id, void *context) { - wait_request *wait_req = context; + wait_request *wait_req = (wait_request *) context; return_from_wait(wait_req->client_conn->manager_state, wait_req); return EVENT_LOOP_TIMER_DONE; } @@ -1070,13 +1076,13 @@ void process_wait_request(ClientConnection *client_conn, plasma_manager_state *manager_state = client_conn->manager_state; /* Create a wait request for this object. */ - wait_request *wait_req = malloc(sizeof(wait_request)); + wait_request *wait_req = (wait_request *) malloc(sizeof(wait_request)); memset(wait_req, 0, sizeof(wait_request)); wait_req->client_conn = client_conn; wait_req->timer = -1; wait_req->num_object_requests = num_object_requests; wait_req->object_requests = - malloc(num_object_requests * sizeof(ObjectRequest)); + (ObjectRequest *) malloc(num_object_requests * sizeof(ObjectRequest)); for (int i = 0; i < num_object_requests; ++i) { wait_req->object_requests[i].object_id = object_requests[i].object_id; wait_req->object_requests[i].type = object_requests[i].type; @@ -1089,7 +1095,7 @@ void process_wait_request(ClientConnection *client_conn, /* This is allocating more space than necessary, but we do not know the exact * number of object IDs to request notifications for yet. */ ObjectID *object_ids_to_request = - malloc(num_object_requests * sizeof(ObjectID)); + (ObjectID *) malloc(num_object_requests * sizeof(ObjectID)); for (int i = 0; i < num_object_requests; ++i) { ObjectID obj_id = object_requests[i].object_id; @@ -1278,7 +1284,7 @@ void process_object_notification(event_loop *loop, int client_sock, void *context, int events) { - plasma_manager_state *state = context; + plasma_manager_state *state = (plasma_manager_state *) context; ObjectInfo object_info; /* Read the notification from Plasma. */ int error = @@ -1333,7 +1339,8 @@ void process_message(event_loop *loop, case MessageType_PlasmaFetchRequest: { LOG_DEBUG("Processing fetch remote"); int64_t num_objects = plasma_read_FetchRequest_num_objects(data); - ObjectID *object_ids_to_fetch = malloc(num_objects * sizeof(ObjectID)); + ObjectID *object_ids_to_fetch = + (ObjectID *) malloc(num_objects * sizeof(ObjectID)); /* TODO(pcm): process_fetch_requests allocates an array of num_objects * object_ids too so these should be shared in the future. */ plasma_read_FetchRequest(data, object_ids_to_fetch, num_objects); @@ -1344,7 +1351,7 @@ void process_message(event_loop *loop, LOG_DEBUG("Processing wait"); int num_object_ids = plasma_read_WaitRequest_num_object_ids(data); ObjectRequest *object_requests = - malloc(num_object_ids * sizeof(ObjectRequest)); + (ObjectRequest *) malloc(num_object_ids * sizeof(ObjectRequest)); int64_t timeout_ms; int num_ready_objects; plasma_read_WaitRequest(data, &object_requests[0], num_object_ids, @@ -1386,7 +1393,8 @@ ClientConnection *ClientConnection_init(event_loop *loop, int events) { int new_socket = accept_client(listener_sock); /* Create a new data connection context per client. */ - ClientConnection *conn = malloc(sizeof(ClientConnection)); + ClientConnection *conn = + (ClientConnection *) malloc(sizeof(ClientConnection)); conn->manager_state = (plasma_manager_state *) context; conn->cursor = 0; conn->transfer_queue = NULL; diff --git a/src/plasma/plasma_protocol.cc b/src/plasma/plasma_protocol.cc index fe2e5b8c168d..cb3c8dc7efb4 100644 --- a/src/plasma/plasma_protocol.cc +++ b/src/plasma/plasma_protocol.cc @@ -1,11 +1,8 @@ #include "flatbuffers/flatbuffers.h" #include "format/plasma_generated.h" -extern "C" { - #include "plasma_protocol.h" #include "io.h" -} /** * Convert an object ID to a flatbuffer string. @@ -45,8 +42,6 @@ to_flat(flatbuffers::FlatBufferBuilder &fbb, return fbb.CreateVector(results); } -extern "C" { - #define FLATBUFFER_BUILDER_DEFAULT_SIZE 1024 protocol_builder *make_protocol_builder(void) { @@ -652,5 +647,3 @@ void plasma_read_DataReply(uint8_t *data, *object_size = (int64_t) message->object_size(); *metadata_size = (int64_t) message->metadata_size(); } - -} /* extern "C" */ diff --git a/src/plasma/plasma_protocol.h b/src/plasma/plasma_protocol.h index ee4533fa0a36..0e254bc0c6ec 100644 --- a/src/plasma/plasma_protocol.h +++ b/src/plasma/plasma_protocol.h @@ -1,55 +1,13 @@ #ifndef PLASMA_PROTOCOL_H #define PLASMA_PROTOCOL_H +#include "format/plasma_generated.h" + #include "common.h" #include "plasma.h" -/* This is temporary to facilitate the transition between flatcc and google - * flatbuffers. */ - -typedef int32_t MessageType_enum_t; -#define MessageType_PlasmaCreateRequest ((MessageType_enum_t) 1L) -#define MessageType_PlasmaCreateReply ((MessageType_enum_t) 2L) -#define MessageType_PlasmaSealRequest ((MessageType_enum_t) 3L) -#define MessageType_PlasmaSealReply ((MessageType_enum_t) 4L) -#define MessageType_PlasmaGetRequest ((MessageType_enum_t) 5L) -#define MessageType_PlasmaGetReply ((MessageType_enum_t) 6L) -#define MessageType_PlasmaReleaseRequest ((MessageType_enum_t) 7L) -#define MessageType_PlasmaReleaseReply ((MessageType_enum_t) 8L) -#define MessageType_PlasmaDeleteRequest ((MessageType_enum_t) 9L) -#define MessageType_PlasmaDeleteReply ((MessageType_enum_t) 10L) -#define MessageType_PlasmaStatusRequest ((MessageType_enum_t) 11L) -#define MessageType_PlasmaStatusReply ((MessageType_enum_t) 12L) -#define MessageType_PlasmaContainsRequest ((MessageType_enum_t) 13L) -#define MessageType_PlasmaContainsReply ((MessageType_enum_t) 14L) -#define MessageType_PlasmaConnectRequest ((MessageType_enum_t) 15L) -#define MessageType_PlasmaConnectReply ((MessageType_enum_t) 16L) -#define MessageType_PlasmaEvictRequest ((MessageType_enum_t) 17L) -#define MessageType_PlasmaEvictReply ((MessageType_enum_t) 18L) -#define MessageType_PlasmaFetchRequest ((MessageType_enum_t) 19L) -#define MessageType_PlasmaWaitRequest ((MessageType_enum_t) 20L) -#define MessageType_PlasmaWaitReply ((MessageType_enum_t) 21L) -#define MessageType_PlasmaSubscribeRequest ((MessageType_enum_t) 22L) -#define MessageType_PlasmaUnsubscribeRequest ((MessageType_enum_t) 23L) -#define MessageType_PlasmaDataRequest ((MessageType_enum_t) 24L) -#define MessageType_PlasmaDataReply ((MessageType_enum_t) 25L) - -typedef int32_t PlasmaError_enum_t; -#define PlasmaError_OK ((PlasmaError_enum_t) 0L) -#define PlasmaError_ObjectExists ((PlasmaError_enum_t) 1L) -#define PlasmaError_ObjectNonexistent ((PlasmaError_enum_t) 2L) -#define PlasmaError_OutOfMemory ((PlasmaError_enum_t) 3L) - -typedef int32_t ObjectStatus_enum_t; -#define ObjectStatus_Local ((ObjectStatus_enum_t) 1L) -#define ObjectStatus_Remote ((ObjectStatus_enum_t) 2L) -#define ObjectStatus_Nonexistent ((ObjectStatus_enum_t) 3L) -#define ObjectStatus_Transfer ((ObjectStatus_enum_t) 4L) - typedef void protocol_builder; -/* End temporary. */ - /* An argument to a function that a return value gets written to. */ #define OUT diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.cc similarity index 96% rename from src/plasma/plasma_store.c rename to src/plasma/plasma_store.cc index 7ca6dd178b9e..f493a97b36d6 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.cc @@ -31,14 +31,16 @@ #include "io.h" #include "uthash.h" #include "utarray.h" -#include "fling.h" -#include "malloc.h" #include "plasma_protocol.h" #include "plasma_store.h" #include "plasma.h" +extern "C" { +#include "fling.h" +#include "malloc.h" void *dlmalloc(size_t); void dlfree(void *); +} /** Contains all information that is associated with a Plasma store client. */ struct Client { @@ -126,12 +128,14 @@ UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; PlasmaStoreState *PlasmaStoreState_init(event_loop *loop, int64_t system_memory) { - PlasmaStoreState *state = malloc(sizeof(PlasmaStoreState)); + PlasmaStoreState *state = + (PlasmaStoreState *) malloc(sizeof(PlasmaStoreState)); state->loop = loop; state->object_get_requests = NULL; state->pending_notifications = NULL; /* Initialize the plasma store info. */ - state->plasma_store_info = malloc(sizeof(PlasmaStoreInfo)); + state->plasma_store_info = + (PlasmaStoreInfo *) malloc(sizeof(PlasmaStoreInfo)); state->plasma_store_info->objects = NULL; state->plasma_store_info->memory_capacity = system_memory; /* Initialize the eviction state. */ @@ -202,14 +206,14 @@ int create_object(Client *client_context, return PlasmaError_OutOfMemory; } /* Allocate space for the new object */ - uint8_t *pointer = dlmalloc(data_size + metadata_size); + uint8_t *pointer = (uint8_t *) dlmalloc(data_size + metadata_size); int fd; int64_t map_size; ptrdiff_t offset; get_malloc_mapinfo(pointer, &fd, &map_size, &offset); assert(fd != -1); - entry = malloc(sizeof(object_table_entry)); + entry = (object_table_entry *) malloc(sizeof(object_table_entry)); memset(entry, 0, sizeof(object_table_entry)); memcpy(&entry->object_id, &obj_id, sizeof(entry->object_id)); entry->info.obj_id = obj_id; @@ -250,7 +254,7 @@ void add_get_request_for_object(PlasmaStoreState *store_state, * new ObjectGetRequests struct for this object ID and add it to the hash * table. */ if (object_get_reqs == NULL) { - object_get_reqs = malloc(sizeof(ObjectGetRequests)); + object_get_reqs = (ObjectGetRequests *) malloc(sizeof(ObjectGetRequests)); object_get_reqs->object_id = object_id; utarray_new(object_get_reqs->get_requests, &get_request_icd); HASH_ADD(hh, store_state->object_get_requests, object_id, @@ -413,7 +417,7 @@ void update_object_get_requests(PlasmaStoreState *store_state, } int get_timeout_handler(event_loop *loop, timer_id id, void *context) { - GetRequest *get_req = context; + GetRequest *get_req = (GetRequest *) context; return_from_get(get_req->client->plasma_state, get_req); return EVENT_LOOP_TIMER_DONE; } @@ -425,13 +429,14 @@ void process_get_request(Client *client_context, PlasmaStoreState *plasma_state = client_context->plasma_state; /* Create a get request for this object. */ - GetRequest *get_req = malloc(sizeof(GetRequest)); + GetRequest *get_req = (GetRequest *) malloc(sizeof(GetRequest)); memset(get_req, 0, sizeof(GetRequest)); get_req->client = client_context; get_req->timer = -1; get_req->num_object_ids = num_object_ids; - get_req->object_ids = malloc(num_object_ids * sizeof(ObjectID)); - get_req->objects = malloc(num_object_ids * sizeof(PlasmaObject)); + get_req->object_ids = (ObjectID *) malloc(num_object_ids * sizeof(ObjectID)); + get_req->objects = + (PlasmaObject *) malloc(num_object_ids * sizeof(PlasmaObject)); for (int i = 0; i < num_object_ids; ++i) { get_req->object_ids[i] = object_ids[i]; } @@ -570,7 +575,12 @@ void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) { utarray_free(entry->clients); free(entry); /* Inform all subscribers that the object has been deleted. */ - ObjectInfo notification = {.obj_id = object_id, .is_deletion = true}; + ObjectInfo notification; + /* We memset the struct here because we have to initialize the full struct. + * However, we do not use most of the fields. */ + memset(¬ification, 0, sizeof(notification)); + notification.obj_id = object_id; + notification.is_deletion = true; push_notification(plasma_state, ¬ification); } @@ -602,7 +612,7 @@ void send_notifications(event_loop *loop, int client_sock, void *context, int events) { - PlasmaStoreState *plasma_state = context; + PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; NotificationQueue *queue; HASH_FIND_INT(plasma_state->pending_notifications, &client_sock, queue); CHECK(queue != NULL); @@ -679,7 +689,7 @@ void process_message(event_loop *loop, int client_sock, void *context, int events) { - Client *client_context = context; + Client *client_context = (Client *) context; PlasmaStoreState *state = client_context->plasma_state; int64_t type; read_buffer(client_sock, &type, state->input_buffer); @@ -710,7 +720,8 @@ void process_message(event_loop *loop, } break; case MessageType_PlasmaGetRequest: { num_objects = plasma_read_GetRequest_num_objects(input); - ObjectID *object_ids_to_get = malloc(num_objects * sizeof(ObjectID)); + ObjectID *object_ids_to_get = + (ObjectID *) malloc(num_objects * sizeof(ObjectID)); int64_t timeout_ms; plasma_read_GetRequest(input, object_ids_to_get, &timeout_ms, num_objects); /* TODO(pcm): The array object_ids_to_get could be reused in @@ -791,7 +802,7 @@ void new_client_connection(event_loop *loop, int listener_sock, void *context, int events) { - PlasmaStoreState *plasma_state = context; + PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; int new_socket = accept_client(listener_sock); /* Create a new client object. This will also be used as the context to use * for events on this client's socket. TODO(rkn): free this somewhere. */ diff --git a/src/plasma/test/client_tests.c b/src/plasma/test/client_tests.cc similarity index 100% rename from src/plasma/test/client_tests.c rename to src/plasma/test/client_tests.cc diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.cc similarity index 93% rename from src/plasma/test/manager_tests.c rename to src/plasma/test/manager_tests.cc index 8120ed805a35..2bee92920fae 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.cc @@ -58,7 +58,7 @@ typedef struct { } plasma_mock; plasma_mock *init_plasma_mock(plasma_mock *remote_mock) { - plasma_mock *mock = malloc(sizeof(plasma_mock)); + plasma_mock *mock = (plasma_mock *) malloc(sizeof(plasma_mock)); /* Start listening on all the ports and initiate the local plasma manager. */ mock->port = bind_inet_sock_retry(&mock->manager_remote_fd); mock->local_store = connect_ipc_sock_retry(plasma_store_socket_name, 5, 100); @@ -121,7 +121,7 @@ void destroy_plasma_mock(plasma_mock *mock) { TEST request_transfer_test(void) { plasma_mock *local_mock = init_plasma_mock(NULL); plasma_mock *remote_mock = init_plasma_mock(local_mock); - const char **manager_vector = malloc(sizeof(char *)); + const char **manager_vector = (const char **) malloc(sizeof(char *)); UT_string *addr = NULL; utstring_new(addr); utstring_printf(addr, "127.0.0.1:%d", remote_mock->port); @@ -164,7 +164,7 @@ TEST request_transfer_retry_test(void) { plasma_mock *local_mock = init_plasma_mock(NULL); plasma_mock *remote_mock1 = init_plasma_mock(local_mock); plasma_mock *remote_mock2 = init_plasma_mock(local_mock); - const char **manager_vector = malloc(sizeof(char *) * 2); + const char **manager_vector = (const char **) malloc(sizeof(char *) * 2); UT_string *addr0 = NULL; utstring_new(addr0); utstring_printf(addr0, "127.0.0.1:%d", remote_mock1->port); @@ -217,20 +217,18 @@ TEST read_write_object_chunk_test(void) { const char *data = "Hello world!"; const int data_size = strlen(data) + 1; const int metadata_size = 0; - plasma_request_buffer remote_buf = { - .type = MessageType_PlasmaDataReply, - .object_id = oid, - .data = (uint8_t *) data, - .data_size = data_size, - .metadata = (uint8_t *) data + data_size, - .metadata_size = metadata_size, - }; - plasma_request_buffer local_buf = { - .object_id = oid, - .data_size = data_size, - .metadata_size = metadata_size, - .data = malloc(data_size), - }; + plasma_request_buffer remote_buf; + remote_buf.type = MessageType_PlasmaDataReply; + remote_buf.object_id = oid; + remote_buf.data = (uint8_t *) data; + remote_buf.data_size = data_size; + remote_buf.metadata = (uint8_t *) data + data_size; + remote_buf.metadata_size = metadata_size; + plasma_request_buffer local_buf; + local_buf.object_id = oid; + local_buf.data_size = data_size; + local_buf.metadata_size = metadata_size; + local_buf.data = (uint8_t *) malloc(data_size); /* The test: * - Write the object data from the remote manager to the local. * - Read the object data on the local manager. diff --git a/src/plasma/test/serialization_tests.c b/src/plasma/test/serialization_tests.cc similarity index 99% rename from src/plasma/test/serialization_tests.c rename to src/plasma/test/serialization_tests.cc index bdab9461f181..3bc74c77df58 100644 --- a/src/plasma/test/serialization_tests.c +++ b/src/plasma/test/serialization_tests.cc @@ -19,9 +19,9 @@ protocol_builder *g_B; * @return File descriptor of the file. */ int create_temp_file(void) { - static char template[] = "/tmp/tempfileXXXXXX"; + static char temp[] = "/tmp/tempfileXXXXXX"; char file_name[32]; - strncpy(file_name, template, 32); + strncpy(file_name, temp, 32); return mkstemp(file_name); }