From c6f15c402a9fa0ddb412661c244eee44aa6859c9 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 23 Apr 2017 23:56:41 -0700 Subject: [PATCH 1/7] temp commit --- src/plasma/plasma_store.cc | 48 +++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index 9fe627e178fd..ace261cc5eab 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -64,7 +64,7 @@ struct Client { * subscribers. */ UT_icd object_info_icd = {sizeof(uint8_t *), NULL, NULL, NULL}; -typedef struct { +struct NotificationItem{ /** Client file descriptor. This is used as a key for the hash table. */ int subscriber_fd; /** The object notifications for clients. We notify the client about the @@ -72,7 +72,7 @@ typedef struct { std::deque *object_notifications; /** Handle for the uthash table. */ UT_hash_handle hh; -} NotificationQueue; +}; struct GetRequest { GetRequest(Client *client, int num_object_ids, ObjectID object_ids[]); @@ -106,8 +106,10 @@ struct PlasmaStoreState { /** The pending notifications that have not been sent to subscribers because * the socket send buffers were full. This is a hash table from client file - * descriptor to an array of object_ids to send to that client. */ - NotificationQueue *pending_notifications; + * descriptor to an array of object_ids to send to that client. + * TODO(pcm): Consider putting this into the Client data structure and + * reorganize the code slightly. */ + std::unordered_map pending_notifications; /** The plasma store information, including the object tables, that is exposed * to the eviction policy. */ PlasmaStoreInfo *plasma_store_info; @@ -142,7 +144,6 @@ GetRequest::GetRequest(Client *client, PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory) : loop(loop), - pending_notifications(NULL), plasma_store_info(new PlasmaStoreInfo()), eviction_state(EvictionState_init()), builder(make_protocol_builder()) { @@ -158,14 +159,14 @@ void PlasmaStoreState_free(PlasmaStoreState *state) { for (const auto &it : state->plasma_store_info->objects) { delete it.second; } - NotificationQueue *queue, *temp_queue; - HASH_ITER(hh, state->pending_notifications, queue, temp_queue) { - for (int i = 0; i < queue->object_notifications->size(); ++i) { - uint8_t *notification = (uint8_t *) queue->object_notifications->at(i); + for (const auto &it : state->pending_notifications) { + auto object_notifications = it.second->object_notifications; + for (int i = 0; i < object_notifications->size(); ++i) { + uint8_t *notification = (uint8_t *) object_notifications->at(i); uint8_t *data = notification; free(data); } - delete queue->object_notifications; + delete object_notifications; } } @@ -551,12 +552,10 @@ void remove_objects(PlasmaStoreState *plasma_state, void push_notification(PlasmaStoreState *plasma_state, ObjectInfoT *object_info) { - NotificationQueue *queue, *temp_queue; - HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) { + for (const auto &it : plasma_state->pending_notifications) { uint8_t *notification = create_object_info_buffer(object_info); - queue->object_notifications->push_back(notification); - send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, - 0); + it.second->object_notifications->push_back(notification); + send_notifications(plasma_state->loop, it.first, plasma_state, 0); /* The notification gets freed in send_notifications when the notification * is sent over the socket. */ } @@ -568,16 +567,14 @@ void send_notifications(event_loop *loop, void *context, int events) { PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; - NotificationQueue *queue; - HASH_FIND_INT(plasma_state->pending_notifications, &client_sock, queue); - CHECK(queue != NULL); + NotificationItem *item = plasma_state->pending_notifications[client_sock]; int num_processed = 0; bool closed = false; /* Loop over the array of pending notifications and send as many of them as * possible. */ - for (int i = 0; i < queue->object_notifications->size(); ++i) { - uint8_t *notification = (uint8_t *) queue->object_notifications->at(i); + for (int i = 0; i < item->object_notifications->size(); ++i) { + uint8_t *notification = (uint8_t *) item->object_notifications->at(i); /* Decode the length, which is the first bytes of the message. */ int64_t size = *((int64_t *) notification); @@ -610,16 +607,15 @@ void send_notifications(event_loop *loop, free(notification); } /* Remove the sent notifications from the array. */ - queue->object_notifications->erase( - queue->object_notifications->begin(), - queue->object_notifications->begin() + num_processed); + item->object_notifications->erase( + item->object_notifications->begin(), + item->object_notifications->begin() + num_processed); /* Stop sending notifications if the pipe was broken. */ if (closed) { close(client_sock); - delete queue->object_notifications; - HASH_DEL(plasma_state->pending_notifications, queue); - free(queue); + delete item->object_notifications; + plasma_state->pending_notifications.erase(client_sock); } /* If we have sent all notifications, remove the fd from the event loop. */ From 7d8286df401d1318176ce63dc076566ad7928739 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 24 Apr 2017 01:26:58 -0700 Subject: [PATCH 2/7] converted more plasma notifications --- src/plasma/plasma_store.cc | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index ace261cc5eab..2df691b758b6 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -160,7 +160,7 @@ void PlasmaStoreState_free(PlasmaStoreState *state) { delete it.second; } for (const auto &it : state->pending_notifications) { - auto object_notifications = it.second->object_notifications; + auto object_notifications = it.second.object_notifications; for (int i = 0; i < object_notifications->size(); ++i) { uint8_t *notification = (uint8_t *) object_notifications->at(i); uint8_t *data = notification; @@ -554,7 +554,7 @@ void push_notification(PlasmaStoreState *plasma_state, ObjectInfoT *object_info) { for (const auto &it : plasma_state->pending_notifications) { uint8_t *notification = create_object_info_buffer(object_info); - it.second->object_notifications->push_back(notification); + it.second.object_notifications->push_back(notification); send_notifications(plasma_state->loop, it.first, plasma_state, 0); /* The notification gets freed in send_notifications when the notification * is sent over the socket. */ @@ -567,7 +567,7 @@ void send_notifications(event_loop *loop, void *context, int events) { PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; - NotificationItem *item = plasma_state->pending_notifications[client_sock]; + NotificationItem *item = &plasma_state->pending_notifications[client_sock]; int num_processed = 0; bool closed = false; @@ -619,7 +619,7 @@ void send_notifications(event_loop *loop, } /* If we have sent all notifications, remove the fd from the event loop. */ - if (queue->object_notifications->empty()) { + if (item->object_notifications->empty()) { event_loop_remove_file(loop, client_sock); } } @@ -640,17 +640,15 @@ void subscribe_to_updates(Client *client_context, int conn) { /* Create a new array to buffer notifications that can't be sent to the * subscriber yet because the socket send buffer is full. TODO(rkn): the queue * never gets freed. */ - NotificationQueue *queue = - (NotificationQueue *) malloc(sizeof(NotificationQueue)); - queue->subscriber_fd = fd; - queue->object_notifications = new std::deque(); - HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue); + NotificationItem &item = plasma_state->pending_notifications[fd]; + item.subscriber_fd = fd; + item.object_notifications = new std::deque(); /* Push notifications to the new subscriber about existing objects. */ for (const auto &entry : plasma_state->plasma_store_info->objects) { push_notification(plasma_state, &entry.second->info); } - send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, 0); + send_notifications(plasma_state->loop, fd, plasma_state, 0); } void process_message(event_loop *loop, From e5de101a6b500652cf2b19363574cce8437a44fb Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 24 Apr 2017 01:39:10 -0700 Subject: [PATCH 3/7] cleanup --- src/plasma/plasma_store.cc | 36 +++++++++++------------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index 2df691b758b6..c1694b5b694a 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -35,8 +35,6 @@ #include "event_loop.h" #include "eviction_policy.h" #include "io.h" -#include "uthash.h" -#include "utarray.h" #include "plasma_protocol.h" #include "plasma_store.h" #include "plasma.h" @@ -60,18 +58,10 @@ struct Client { PlasmaStoreState *plasma_state; }; -/* This is used to define the queue of object notifications for plasma - * subscribers. */ -UT_icd object_info_icd = {sizeof(uint8_t *), NULL, NULL, NULL}; - struct NotificationItem{ - /** Client file descriptor. This is used as a key for the hash table. */ - int subscriber_fd; /** The object notifications for clients. We notify the client about the * objects in the order that the objects were sealed or deleted. */ - std::deque *object_notifications; - /** Handle for the uthash table. */ - UT_hash_handle hh; + std::deque object_notifications; }; struct GetRequest { @@ -161,12 +151,11 @@ void PlasmaStoreState_free(PlasmaStoreState *state) { } for (const auto &it : state->pending_notifications) { auto object_notifications = it.second.object_notifications; - for (int i = 0; i < object_notifications->size(); ++i) { - uint8_t *notification = (uint8_t *) object_notifications->at(i); + for (int i = 0; i < object_notifications.size(); ++i) { + uint8_t *notification = (uint8_t *) object_notifications.at(i); uint8_t *data = notification; free(data); } - delete object_notifications; } } @@ -552,9 +541,9 @@ void remove_objects(PlasmaStoreState *plasma_state, void push_notification(PlasmaStoreState *plasma_state, ObjectInfoT *object_info) { - for (const auto &it : plasma_state->pending_notifications) { + for (auto &it : plasma_state->pending_notifications) { uint8_t *notification = create_object_info_buffer(object_info); - it.second.object_notifications->push_back(notification); + it.second.object_notifications.push_back(notification); send_notifications(plasma_state->loop, it.first, plasma_state, 0); /* The notification gets freed in send_notifications when the notification * is sent over the socket. */ @@ -573,8 +562,8 @@ void send_notifications(event_loop *loop, bool closed = false; /* Loop over the array of pending notifications and send as many of them as * possible. */ - for (int i = 0; i < item->object_notifications->size(); ++i) { - uint8_t *notification = (uint8_t *) item->object_notifications->at(i); + for (int i = 0; i < item->object_notifications.size(); ++i) { + uint8_t *notification = (uint8_t *) item->object_notifications.at(i); /* Decode the length, which is the first bytes of the message. */ int64_t size = *((int64_t *) notification); @@ -607,19 +596,18 @@ void send_notifications(event_loop *loop, free(notification); } /* Remove the sent notifications from the array. */ - item->object_notifications->erase( - item->object_notifications->begin(), - item->object_notifications->begin() + num_processed); + item->object_notifications.erase( + item->object_notifications.begin(), + item->object_notifications.begin() + num_processed); /* Stop sending notifications if the pipe was broken. */ if (closed) { close(client_sock); - delete item->object_notifications; plasma_state->pending_notifications.erase(client_sock); } /* If we have sent all notifications, remove the fd from the event loop. */ - if (item->object_notifications->empty()) { + if (item->object_notifications.empty()) { event_loop_remove_file(loop, client_sock); } } @@ -641,8 +629,6 @@ void subscribe_to_updates(Client *client_context, int conn) { * subscriber yet because the socket send buffer is full. TODO(rkn): the queue * never gets freed. */ NotificationItem &item = plasma_state->pending_notifications[fd]; - item.subscriber_fd = fd; - item.object_notifications = new std::deque(); /* Push notifications to the new subscriber about existing objects. */ for (const auto &entry : plasma_state->plasma_store_info->objects) { From e47c0bb818a7374c3b0e62718aed9ce91b0bd7de Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 24 Apr 2017 02:23:25 -0700 Subject: [PATCH 4/7] rename --- src/plasma/eviction_policy.cc | 2 +- src/plasma/plasma.cc | 4 ++-- src/plasma/plasma.h | 14 +++++++------- src/plasma/plasma_store.cc | 20 ++++++++++---------- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/plasma/eviction_policy.cc b/src/plasma/eviction_policy.cc index da33ef447338..d25cc39cf055 100644 --- a/src/plasma/eviction_policy.cc +++ b/src/plasma/eviction_policy.cc @@ -122,7 +122,7 @@ int64_t EvictionState_choose_objects_to_evict( break; } /* Find the object table entry for this object. */ - object_table_entry *entry = plasma_store_info->objects[element->object_id]; + ObjectTableEntry *entry = plasma_store_info->objects[element->object_id]; /* Update the cumulative bytes and the number of objects so far. */ num_bytes += (entry->info.data_size + entry->info.metadata_size); num_objects += 1; diff --git a/src/plasma/plasma.cc b/src/plasma/plasma.cc index 820d47c787ef..51cd0b55c3ca 100644 --- a/src/plasma/plasma.cc +++ b/src/plasma/plasma.cc @@ -40,8 +40,8 @@ uint8_t *create_object_info_buffer(ObjectInfoT *object_info) { return notification; } -object_table_entry *get_object_table_entry(PlasmaStoreInfo *store_info, - ObjectID object_id) { +ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info, + ObjectID object_id) { auto it = store_info->objects.find(object_id); if (it == store_info->objects.end()) { return NULL; diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 5ea28a02984a..5fd31683c955 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -97,7 +97,7 @@ typedef enum { /** This type is used by the Plasma store. It is here because it is exposed to * the eviction policy. */ -typedef struct { +struct ObjectTableEntry { /** Object id of this object. */ ObjectID object_id; /** Object info like size, creation time and owner. */ @@ -118,16 +118,16 @@ typedef struct { object_state state; /** The digest of the object. Used to see if two objects are the same. */ unsigned char digest[DIGEST_SIZE]; -} object_table_entry; +}; /** The plasma store information that is exposed to the eviction policy. */ -typedef struct { +struct PlasmaStoreInfo { /** Objects that are in the Plasma store. */ - std::unordered_map objects; + std::unordered_map objects; /** The amount of memory (in bytes) that we allow to be allocated in the * store. */ int64_t memory_capacity; -} PlasmaStoreInfo; +}; /** * Get an entry from the object table and return NULL if the object_id @@ -138,8 +138,8 @@ typedef struct { * @return The entry associated with the object_id or NULL if the object_id * is not present. */ -object_table_entry *get_object_table_entry(PlasmaStoreInfo *store_info, - ObjectID object_id); +ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info, + ObjectID object_id); /** * Print a warning if the status is less than zero. This should be used to check diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index c1694b5b694a..3fd1b4d110aa 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -164,7 +164,7 @@ void push_notification(PlasmaStoreState *state, /* If this client is not already using the object, add the client to the * object's list of clients, otherwise do nothing. */ -void add_client_to_object_clients(object_table_entry *entry, +void add_client_to_object_clients(ObjectTableEntry *entry, Client *client_info) { /* Check if this client is already using the object. */ for (int i = 0; i < entry->clients.size(); ++i) { @@ -235,7 +235,7 @@ int create_object(Client *client_context, get_malloc_mapinfo(pointer, &fd, &map_size, &offset); assert(fd != -1); - object_table_entry *entry = new object_table_entry(); + ObjectTableEntry *entry = new ObjectTableEntry(); entry->object_id = obj_id; entry->info.object_id = std::string((char *) &obj_id.id[0], sizeof(obj_id)); entry->info.data_size = data_size; @@ -289,7 +289,7 @@ void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) { delete get_req; } -void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) { +void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) { DCHECK(object != NULL); DCHECK(entry != NULL); DCHECK(entry->state == PLASMA_SEALED); @@ -354,7 +354,7 @@ void update_object_get_requests(PlasmaStoreState *store_state, int num_requests = get_requests.size(); for (int i = 0; i < num_requests; ++i) { GetRequest *get_req = get_requests[index]; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(store_state->plasma_store_info, obj_id); CHECK(entry != NULL); @@ -401,7 +401,7 @@ void process_get_request(Client *client_context, /* Check if this object is already present locally. If so, record that the * object is being used and mark it as accounted for. */ - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, obj_id); if (entry && entry->state == PLASMA_SEALED) { /* Update the get request to take into account the present object. */ @@ -433,7 +433,7 @@ void process_get_request(Client *client_context, } } -int remove_client_from_object_clients(object_table_entry *entry, +int remove_client_from_object_clients(ObjectTableEntry *entry, Client *client_info) { /* Find the location of the client in the array. */ for (int i = 0; i < entry->clients.size(); ++i) { @@ -463,7 +463,7 @@ int remove_client_from_object_clients(object_table_entry *entry, void release_object(Client *client_context, ObjectID object_id) { PlasmaStoreState *plasma_state = client_context->plasma_state; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); CHECK(entry != NULL); /* Remove the client from the object's array of clients. */ @@ -473,7 +473,7 @@ void release_object(Client *client_context, ObjectID object_id) { /* Check if an object is present. */ int contains_object(Client *client_context, ObjectID object_id) { PlasmaStoreState *plasma_state = client_context->plasma_state; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); return entry && (entry->state == PLASMA_SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND; @@ -485,7 +485,7 @@ void seal_object(Client *client_context, unsigned char digest[]) { LOG_DEBUG("sealing object"); // TODO(pcm): add ObjectID here PlasmaStoreState *plasma_state = client_context->plasma_state; - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); CHECK(entry != NULL); CHECK(entry->state == PLASMA_CREATED); @@ -504,7 +504,7 @@ void seal_object(Client *client_context, * be called on objects that are returned by the eviction policy to evict. */ void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) { LOG_DEBUG("deleting object"); - object_table_entry *entry = + ObjectTableEntry *entry = get_object_table_entry(plasma_state->plasma_store_info, object_id); /* TODO(rkn): This should probably not fail, but should instead throw an * error. Maybe we should also support deleting objects that have been created From 4ac7db37ff3d76fbdccbb209fb2a09c5d71dbe77 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 24 Apr 2017 10:50:18 -0700 Subject: [PATCH 5/7] linting --- src/plasma/plasma_store.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index 3fd1b4d110aa..ea0ab2c21d17 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -58,7 +58,7 @@ struct Client { PlasmaStoreState *plasma_state; }; -struct NotificationItem{ +struct NotificationItem { /** The object notifications for clients. We notify the client about the * objects in the order that the objects were sealed or deleted. */ std::deque object_notifications; From ed079840976e8c441d182538301535cf56e1a946 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 24 Apr 2017 13:50:30 -0700 Subject: [PATCH 6/7] fixes --- src/plasma/plasma_store.cc | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index ea0ab2c21d17..01227e17554e 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -58,7 +58,7 @@ struct Client { PlasmaStoreState *plasma_state; }; -struct NotificationItem { +struct NotificationQueue { /** The object notifications for clients. We notify the client about the * objects in the order that the objects were sealed or deleted. */ std::deque object_notifications; @@ -98,8 +98,8 @@ struct PlasmaStoreState { * the socket send buffers were full. This is a hash table from client file * descriptor to an array of object_ids to send to that client. * TODO(pcm): Consider putting this into the Client data structure and - * reorganize the code slightly. */ - std::unordered_map pending_notifications; + * reorganize the code slightly. */ + std::unordered_map pending_notifications; /** The plasma store information, including the object tables, that is exposed * to the eviction policy. */ PlasmaStoreInfo *plasma_store_info; @@ -149,8 +149,8 @@ void PlasmaStoreState_free(PlasmaStoreState *state) { for (const auto &it : state->plasma_store_info->objects) { delete it.second; } - for (const auto &it : state->pending_notifications) { - auto object_notifications = it.second.object_notifications; + for (const auto &element : state->pending_notifications) { + auto object_notifications = element.second.object_notifications; for (int i = 0; i < object_notifications.size(); ++i) { uint8_t *notification = (uint8_t *) object_notifications.at(i); uint8_t *data = notification; @@ -541,10 +541,10 @@ void remove_objects(PlasmaStoreState *plasma_state, void push_notification(PlasmaStoreState *plasma_state, ObjectInfoT *object_info) { - for (auto &it : plasma_state->pending_notifications) { + for (auto &element : plasma_state->pending_notifications) { uint8_t *notification = create_object_info_buffer(object_info); - it.second.object_notifications.push_back(notification); - send_notifications(plasma_state->loop, it.first, plasma_state, 0); + element.second.object_notifications.push_back(notification); + send_notifications(plasma_state->loop, element.first, plasma_state, 0); /* The notification gets freed in send_notifications when the notification * is sent over the socket. */ } @@ -556,14 +556,14 @@ void send_notifications(event_loop *loop, void *context, int events) { PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; - NotificationItem *item = &plasma_state->pending_notifications[client_sock]; + NotificationQueue *queue = &plasma_state->pending_notifications[client_sock]; int num_processed = 0; bool closed = false; /* Loop over the array of pending notifications and send as many of them as * possible. */ - for (int i = 0; i < item->object_notifications.size(); ++i) { - uint8_t *notification = (uint8_t *) item->object_notifications.at(i); + for (int i = 0; i < queue->object_notifications.size(); ++i) { + uint8_t *notification = (uint8_t *) queue->object_notifications.at(i); /* Decode the length, which is the first bytes of the message. */ int64_t size = *((int64_t *) notification); @@ -596,9 +596,9 @@ void send_notifications(event_loop *loop, free(notification); } /* Remove the sent notifications from the array. */ - item->object_notifications.erase( - item->object_notifications.begin(), - item->object_notifications.begin() + num_processed); + queue->object_notifications.erase( + queue->object_notifications.begin(), + queue->object_notifications.begin() + num_processed); /* Stop sending notifications if the pipe was broken. */ if (closed) { @@ -607,7 +607,7 @@ void send_notifications(event_loop *loop, } /* If we have sent all notifications, remove the fd from the event loop. */ - if (item->object_notifications.empty()) { + if (queue->object_notifications.empty()) { event_loop_remove_file(loop, client_sock); } } @@ -628,7 +628,7 @@ void subscribe_to_updates(Client *client_context, int conn) { /* Create a new array to buffer notifications that can't be sent to the * subscriber yet because the socket send buffer is full. TODO(rkn): the queue * never gets freed. */ - NotificationItem &item = plasma_state->pending_notifications[fd]; + NotificationQueue &queue = plasma_state->pending_notifications[fd]; /* Push notifications to the new subscriber about existing objects. */ for (const auto &entry : plasma_state->plasma_store_info->objects) { From 15ef2a730bae771ec03ff49e60e751276499c803 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 24 Apr 2017 13:59:27 -0700 Subject: [PATCH 7/7] fixes --- src/plasma/plasma_store.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index 01227e17554e..2512dcad72ec 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -556,14 +556,14 @@ void send_notifications(event_loop *loop, void *context, int events) { PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; - NotificationQueue *queue = &plasma_state->pending_notifications[client_sock]; + auto it = plasma_state->pending_notifications.find(client_sock); int num_processed = 0; bool closed = false; /* Loop over the array of pending notifications and send as many of them as * possible. */ - for (int i = 0; i < queue->object_notifications.size(); ++i) { - uint8_t *notification = (uint8_t *) queue->object_notifications.at(i); + for (int i = 0; i < it->second.object_notifications.size(); ++i) { + uint8_t *notification = (uint8_t *) it->second.object_notifications.at(i); /* Decode the length, which is the first bytes of the message. */ int64_t size = *((int64_t *) notification); @@ -596,9 +596,9 @@ void send_notifications(event_loop *loop, free(notification); } /* Remove the sent notifications from the array. */ - queue->object_notifications.erase( - queue->object_notifications.begin(), - queue->object_notifications.begin() + num_processed); + it->second.object_notifications.erase( + it->second.object_notifications.begin(), + it->second.object_notifications.begin() + num_processed); /* Stop sending notifications if the pipe was broken. */ if (closed) { @@ -607,7 +607,7 @@ void send_notifications(event_loop *loop, } /* If we have sent all notifications, remove the fd from the event loop. */ - if (queue->object_notifications.empty()) { + if (it->second.object_notifications.empty()) { event_loop_remove_file(loop, client_sock); } }