diff --git a/src/plasma/eviction_policy.cc b/src/plasma/eviction_policy.cc index da33ef447338..d25cc39cf055 100644 --- a/src/plasma/eviction_policy.cc +++ b/src/plasma/eviction_policy.cc @@ -122,7 +122,7 @@ int64_t EvictionState_choose_objects_to_evict( break; } /* Find the object table entry for this object. */ - object_table_entry *entry = plasma_store_info->objects[element->object_id]; + ObjectTableEntry *entry = plasma_store_info->objects[element->object_id]; /* Update the cumulative bytes and the number of objects so far. */ num_bytes += (entry->info.data_size + entry->info.metadata_size); num_objects += 1; diff --git a/src/plasma/plasma.cc b/src/plasma/plasma.cc index 820d47c787ef..51cd0b55c3ca 100644 --- a/src/plasma/plasma.cc +++ b/src/plasma/plasma.cc @@ -40,8 +40,8 @@ uint8_t *create_object_info_buffer(ObjectInfoT *object_info) { return notification; } -object_table_entry *get_object_table_entry(PlasmaStoreInfo *store_info, - ObjectID object_id) { +ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info, + ObjectID object_id) { auto it = store_info->objects.find(object_id); if (it == store_info->objects.end()) { return NULL; diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 5ea28a02984a..5fd31683c955 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -97,7 +97,7 @@ typedef enum { /** This type is used by the Plasma store. It is here because it is exposed to * the eviction policy. */ -typedef struct { +struct ObjectTableEntry { /** Object id of this object. */ ObjectID object_id; /** Object info like size, creation time and owner. */ @@ -118,16 +118,16 @@ typedef struct { object_state state; /** The digest of the object. Used to see if two objects are the same. */ unsigned char digest[DIGEST_SIZE]; -} object_table_entry; +}; /** The plasma store information that is exposed to the eviction policy. */ -typedef struct { +struct PlasmaStoreInfo { /** Objects that are in the Plasma store. */ - std::unordered_map objects; + std::unordered_map objects; /** The amount of memory (in bytes) that we allow to be allocated in the * store. */ int64_t memory_capacity; -} PlasmaStoreInfo; +}; /** * Get an entry from the object table and return NULL if the object_id @@ -138,8 +138,8 @@ typedef struct { * @return The entry associated with the object_id or NULL if the object_id * is not present. */ -object_table_entry *get_object_table_entry(PlasmaStoreInfo *store_info, - ObjectID object_id); +ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info, + ObjectID object_id); /** * Print a warning if the status is less than zero. This should be used to check diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index 9fe627e178fd..2512dcad72ec 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -35,8 +35,6 @@ #include "event_loop.h" #include "eviction_policy.h" #include "io.h" -#include "uthash.h" -#include "utarray.h" #include "plasma_protocol.h" #include "plasma_store.h" #include "plasma.h" @@ -60,19 +58,11 @@ struct Client { PlasmaStoreState *plasma_state; }; -/* This is used to define the queue of object notifications for plasma - * subscribers. */ -UT_icd object_info_icd = {sizeof(uint8_t *), NULL, NULL, NULL}; - -typedef struct { - /** Client file descriptor. This is used as a key for the hash table. */ - int subscriber_fd; +struct NotificationQueue { /** The object notifications for clients. We notify the client about the * objects in the order that the objects were sealed or deleted. */ - std::deque *object_notifications; - /** Handle for the uthash table. */ - UT_hash_handle hh; -} NotificationQueue; + std::deque object_notifications; +}; struct GetRequest { GetRequest(Client *client, int num_object_ids, ObjectID object_ids[]); @@ -106,8 +96,10 @@ struct PlasmaStoreState { /** The pending notifications that have not been sent to subscribers because * the socket send buffers were full. This is a hash table from client file - * descriptor to an array of object_ids to send to that client. */ - NotificationQueue *pending_notifications; + * descriptor to an array of object_ids to send to that client. + * TODO(pcm): Consider putting this into the Client data structure and + * reorganize the code slightly. */ + std::unordered_map pending_notifications; /** The plasma store information, including the object tables, that is exposed * to the eviction policy. */ PlasmaStoreInfo *plasma_store_info; @@ -142,7 +134,6 @@ GetRequest::GetRequest(Client *client, PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory) : loop(loop), - pending_notifications(NULL), plasma_store_info(new PlasmaStoreInfo()), eviction_state(EvictionState_init()), builder(make_protocol_builder()) { @@ -158,14 +149,13 @@ void PlasmaStoreState_free(PlasmaStoreState *state) { for (const auto &it : state->plasma_store_info->objects) { delete it.second; } - NotificationQueue *queue, *temp_queue; - HASH_ITER(hh, state->pending_notifications, queue, temp_queue) { - for (int i = 0; i < queue->object_notifications->size(); ++i) { - uint8_t *notification = (uint8_t *) queue->object_notifications->at(i); + for (const auto &element : state->pending_notifications) { + auto object_notifications = element.second.object_notifications; + for (int i = 0; i < object_notifications.size(); ++i) { + uint8_t *notification = (uint8_t *) object_notifications.at(i); uint8_t *data = notification; free(data); } - delete queue->object_notifications; } } @@ -174,7 +164,7 @@ void push_notification(PlasmaStoreState *state, /* If this client is not already using the object, add the client to the * object's list of clients, otherwise do nothing. */ -void add_client_to_object_clients(object_table_entry *entry, +void add_client_to_object_clients(ObjectTableEntry *entry, Client *client_info) { /* Check if this client is already using the object. */ for (int i = 0; i < entry->clients.size(); ++i) { @@ -245,7 +235,7 @@ int create_object(Client *client_context, get_malloc_mapinfo(pointer, &fd, &map_size, &offset); assert(fd != -1); - object_table_entry *entry = new object_table_entry(); + ObjectTableEntry *entry = new ObjectTableEntry(); entry->object_id = obj_id; entry->info.object_id = std::string((char *) &obj_id.id[0], sizeof(obj_id)); entry->info.data_size = data_size; @@ -299,7 +289,7 @@ void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) { delete get_req; } -void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) { +void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) { DCHECK(object != NULL); DCHECK(entry != NULL); DCHECK(entry->state == PLASMA_SEALED); @@ -364,7 +354,7 @@ void update_object_get_requests(PlasmaStoreState *store_state, int num_requests = get_requests.size(); for (int i = 0; i < num_requests; ++i) { GetRequest *get_req = get_requests[index]; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(store_state->plasma_store_info, obj_id); CHECK(entry != NULL); @@ -411,7 +401,7 @@ void process_get_request(Client *client_context, /* Check if this object is already present locally. If so, record that the * object is being used and mark it as accounted for. */ - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, obj_id); if (entry && entry->state == PLASMA_SEALED) { /* Update the get request to take into account the present object. */ @@ -443,7 +433,7 @@ void process_get_request(Client *client_context, } } -int remove_client_from_object_clients(object_table_entry *entry, +int remove_client_from_object_clients(ObjectTableEntry *entry, Client *client_info) { /* Find the location of the client in the array. */ for (int i = 0; i < entry->clients.size(); ++i) { @@ -473,7 +463,7 @@ int remove_client_from_object_clients(object_table_entry *entry, void release_object(Client *client_context, ObjectID object_id) { PlasmaStoreState *plasma_state = client_context->plasma_state; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); CHECK(entry != NULL); /* Remove the client from the object's array of clients. */ @@ -483,7 +473,7 @@ void release_object(Client *client_context, ObjectID object_id) { /* Check if an object is present. */ int contains_object(Client *client_context, ObjectID object_id) { PlasmaStoreState *plasma_state = client_context->plasma_state; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); return entry && (entry->state == PLASMA_SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND; @@ -495,7 +485,7 @@ void seal_object(Client *client_context, unsigned char digest[]) { LOG_DEBUG("sealing object"); // TODO(pcm): add ObjectID here PlasmaStoreState *plasma_state = client_context->plasma_state; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); CHECK(entry != NULL); CHECK(entry->state == PLASMA_CREATED); @@ -514,7 +504,7 @@ void seal_object(Client *client_context, * be called on objects that are returned by the eviction policy to evict. */ void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) { LOG_DEBUG("deleting object"); - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); /* TODO(rkn): This should probably not fail, but should instead throw an * error. Maybe we should also support deleting objects that have been created @@ -551,12 +541,10 @@ void remove_objects(PlasmaStoreState *plasma_state, void push_notification(PlasmaStoreState *plasma_state, ObjectInfoT *object_info) { - NotificationQueue *queue, *temp_queue; - HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) { + for (auto &element : plasma_state->pending_notifications) { uint8_t *notification = create_object_info_buffer(object_info); - queue->object_notifications->push_back(notification); - send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, - 0); + element.second.object_notifications.push_back(notification); + send_notifications(plasma_state->loop, element.first, plasma_state, 0); /* The notification gets freed in send_notifications when the notification * is sent over the socket. */ } @@ -568,16 +556,14 @@ void send_notifications(event_loop *loop, void *context, int events) { PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; - NotificationQueue *queue; - HASH_FIND_INT(plasma_state->pending_notifications, &client_sock, queue); - CHECK(queue != NULL); + auto it = plasma_state->pending_notifications.find(client_sock); int num_processed = 0; bool closed = false; /* Loop over the array of pending notifications and send as many of them as * possible. */ - for (int i = 0; i < queue->object_notifications->size(); ++i) { - uint8_t *notification = (uint8_t *) queue->object_notifications->at(i); + for (int i = 0; i < it->second.object_notifications.size(); ++i) { + uint8_t *notification = (uint8_t *) it->second.object_notifications.at(i); /* Decode the length, which is the first bytes of the message. */ int64_t size = *((int64_t *) notification); @@ -610,20 +596,18 @@ void send_notifications(event_loop *loop, free(notification); } /* Remove the sent notifications from the array. */ - queue->object_notifications->erase( - queue->object_notifications->begin(), - queue->object_notifications->begin() + num_processed); + it->second.object_notifications.erase( + it->second.object_notifications.begin(), + it->second.object_notifications.begin() + num_processed); /* Stop sending notifications if the pipe was broken. */ if (closed) { close(client_sock); - delete queue->object_notifications; - HASH_DEL(plasma_state->pending_notifications, queue); - free(queue); + plasma_state->pending_notifications.erase(client_sock); } /* If we have sent all notifications, remove the fd from the event loop. */ - if (queue->object_notifications->empty()) { + if (it->second.object_notifications.empty()) { event_loop_remove_file(loop, client_sock); } } @@ -644,17 +628,13 @@ void subscribe_to_updates(Client *client_context, int conn) { /* Create a new array to buffer notifications that can't be sent to the * subscriber yet because the socket send buffer is full. TODO(rkn): the queue * never gets freed. */ - NotificationQueue *queue = - (NotificationQueue *) malloc(sizeof(NotificationQueue)); - queue->subscriber_fd = fd; - queue->object_notifications = new std::deque(); - HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue); + NotificationQueue &queue = plasma_state->pending_notifications[fd]; /* Push notifications to the new subscriber about existing objects. */ for (const auto &entry : plasma_state->plasma_store_info->objects) { push_notification(plasma_state, &entry.second->info); } - send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, 0); + send_notifications(plasma_state->loop, fd, plasma_state, 0); } void process_message(event_loop *loop,