Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/plasma/eviction_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ int64_t EvictionState_choose_objects_to_evict(
break;
}
/* Find the object table entry for this object. */
object_table_entry *entry = plasma_store_info->objects[element->object_id];
ObjectTableEntry *entry = plasma_store_info->objects[element->object_id];
/* Update the cumulative bytes and the number of objects so far. */
num_bytes += (entry->info.data_size + entry->info.metadata_size);
num_objects += 1;
Expand Down
4 changes: 2 additions & 2 deletions src/plasma/plasma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ uint8_t *create_object_info_buffer(ObjectInfoT *object_info) {
return notification;
}

object_table_entry *get_object_table_entry(PlasmaStoreInfo *store_info,
ObjectID object_id) {
ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info,
ObjectID object_id) {
auto it = store_info->objects.find(object_id);
if (it == store_info->objects.end()) {
return NULL;
Expand Down
14 changes: 7 additions & 7 deletions src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ typedef enum {

/** This type is used by the Plasma store. It is here because it is exposed to
* the eviction policy. */
typedef struct {
struct ObjectTableEntry {
/** Object id of this object. */
ObjectID object_id;
/** Object info like size, creation time and owner. */
Expand All @@ -118,16 +118,16 @@ typedef struct {
object_state state;
/** The digest of the object. Used to see if two objects are the same. */
unsigned char digest[DIGEST_SIZE];
} object_table_entry;
};

/** The plasma store information that is exposed to the eviction policy. */
typedef struct {
struct PlasmaStoreInfo {
/** Objects that are in the Plasma store. */
std::unordered_map<ObjectID, object_table_entry *, UniqueIDHasher> objects;
std::unordered_map<ObjectID, ObjectTableEntry *, UniqueIDHasher> objects;
/** The amount of memory (in bytes) that we allow to be allocated in the
* store. */
int64_t memory_capacity;
} PlasmaStoreInfo;
};

/**
* Get an entry from the object table and return NULL if the object_id
Expand All @@ -138,8 +138,8 @@ typedef struct {
* @return The entry associated with the object_id or NULL if the object_id
* is not present.
*/
object_table_entry *get_object_table_entry(PlasmaStoreInfo *store_info,
ObjectID object_id);
ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info,
ObjectID object_id);

/**
* Print a warning if the status is less than zero. This should be used to check
Expand Down
88 changes: 34 additions & 54 deletions src/plasma/plasma_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
#include "event_loop.h"
#include "eviction_policy.h"
#include "io.h"
#include "uthash.h"
#include "utarray.h"
#include "plasma_protocol.h"
#include "plasma_store.h"
#include "plasma.h"
Expand All @@ -60,19 +58,11 @@ struct Client {
PlasmaStoreState *plasma_state;
};

/* This is used to define the queue of object notifications for plasma
* subscribers. */
UT_icd object_info_icd = {sizeof(uint8_t *), NULL, NULL, NULL};

typedef struct {
/** Client file descriptor. This is used as a key for the hash table. */
int subscriber_fd;
struct NotificationQueue {
/** The object notifications for clients. We notify the client about the
* objects in the order that the objects were sealed or deleted. */
std::deque<uint8_t *> *object_notifications;
/** Handle for the uthash table. */
UT_hash_handle hh;
} NotificationQueue;
std::deque<uint8_t *> object_notifications;
};

struct GetRequest {
GetRequest(Client *client, int num_object_ids, ObjectID object_ids[]);
Expand Down Expand Up @@ -106,8 +96,10 @@ struct PlasmaStoreState {

/** The pending notifications that have not been sent to subscribers because
* the socket send buffers were full. This is a hash table from client file
* descriptor to an array of object_ids to send to that client. */
NotificationQueue *pending_notifications;
* descriptor to an array of object_ids to send to that client.
* TODO(pcm): Consider putting this into the Client data structure and
* reorganize the code slightly. */
std::unordered_map<int, NotificationQueue> pending_notifications;
/** The plasma store information, including the object tables, that is exposed
* to the eviction policy. */
PlasmaStoreInfo *plasma_store_info;
Expand Down Expand Up @@ -142,7 +134,6 @@ GetRequest::GetRequest(Client *client,

PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory)
: loop(loop),
pending_notifications(NULL),
plasma_store_info(new PlasmaStoreInfo()),
eviction_state(EvictionState_init()),
builder(make_protocol_builder()) {
Expand All @@ -158,14 +149,13 @@ void PlasmaStoreState_free(PlasmaStoreState *state) {
for (const auto &it : state->plasma_store_info->objects) {
delete it.second;
}
NotificationQueue *queue, *temp_queue;
HASH_ITER(hh, state->pending_notifications, queue, temp_queue) {
for (int i = 0; i < queue->object_notifications->size(); ++i) {
uint8_t *notification = (uint8_t *) queue->object_notifications->at(i);
for (const auto &element : state->pending_notifications) {
auto object_notifications = element.second.object_notifications;
for (int i = 0; i < object_notifications.size(); ++i) {
uint8_t *notification = (uint8_t *) object_notifications.at(i);
uint8_t *data = notification;
free(data);
}
delete queue->object_notifications;
}
}

Expand All @@ -174,7 +164,7 @@ void push_notification(PlasmaStoreState *state,

/* If this client is not already using the object, add the client to the
* object's list of clients, otherwise do nothing. */
void add_client_to_object_clients(object_table_entry *entry,
void add_client_to_object_clients(ObjectTableEntry *entry,
Client *client_info) {
/* Check if this client is already using the object. */
for (int i = 0; i < entry->clients.size(); ++i) {
Expand Down Expand Up @@ -245,7 +235,7 @@ int create_object(Client *client_context,
get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
assert(fd != -1);

object_table_entry *entry = new object_table_entry();
ObjectTableEntry *entry = new ObjectTableEntry();
entry->object_id = obj_id;
entry->info.object_id = std::string((char *) &obj_id.id[0], sizeof(obj_id));
entry->info.data_size = data_size;
Expand Down Expand Up @@ -299,7 +289,7 @@ void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) {
delete get_req;
}

void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) {
void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) {
DCHECK(object != NULL);
DCHECK(entry != NULL);
DCHECK(entry->state == PLASMA_SEALED);
Expand Down Expand Up @@ -364,7 +354,7 @@ void update_object_get_requests(PlasmaStoreState *store_state,
int num_requests = get_requests.size();
for (int i = 0; i < num_requests; ++i) {
GetRequest *get_req = get_requests[index];
object_table_entry *entry =
ObjectTableEntry *entry =
get_object_table_entry(store_state->plasma_store_info, obj_id);
CHECK(entry != NULL);

Expand Down Expand Up @@ -411,7 +401,7 @@ void process_get_request(Client *client_context,

/* Check if this object is already present locally. If so, record that the
* object is being used and mark it as accounted for. */
object_table_entry *entry =
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, obj_id);
if (entry && entry->state == PLASMA_SEALED) {
/* Update the get request to take into account the present object. */
Expand Down Expand Up @@ -443,7 +433,7 @@ void process_get_request(Client *client_context,
}
}

int remove_client_from_object_clients(object_table_entry *entry,
int remove_client_from_object_clients(ObjectTableEntry *entry,
Client *client_info) {
/* Find the location of the client in the array. */
for (int i = 0; i < entry->clients.size(); ++i) {
Expand Down Expand Up @@ -473,7 +463,7 @@ int remove_client_from_object_clients(object_table_entry *entry,

void release_object(Client *client_context, ObjectID object_id) {
PlasmaStoreState *plasma_state = client_context->plasma_state;
object_table_entry *entry =
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
CHECK(entry != NULL);
/* Remove the client from the object's array of clients. */
Expand All @@ -483,7 +473,7 @@ void release_object(Client *client_context, ObjectID object_id) {
/* Check if an object is present. */
int contains_object(Client *client_context, ObjectID object_id) {
PlasmaStoreState *plasma_state = client_context->plasma_state;
object_table_entry *entry =
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
return entry && (entry->state == PLASMA_SEALED) ? OBJECT_FOUND
: OBJECT_NOT_FOUND;
Expand All @@ -495,7 +485,7 @@ void seal_object(Client *client_context,
unsigned char digest[]) {
LOG_DEBUG("sealing object"); // TODO(pcm): add ObjectID here
PlasmaStoreState *plasma_state = client_context->plasma_state;
object_table_entry *entry =
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
CHECK(entry != NULL);
CHECK(entry->state == PLASMA_CREATED);
Expand All @@ -514,7 +504,7 @@ void seal_object(Client *client_context,
* be called on objects that are returned by the eviction policy to evict. */
void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) {
LOG_DEBUG("deleting object");
object_table_entry *entry =
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
/* TODO(rkn): This should probably not fail, but should instead throw an
* error. Maybe we should also support deleting objects that have been created
Expand Down Expand Up @@ -551,12 +541,10 @@ void remove_objects(PlasmaStoreState *plasma_state,

void push_notification(PlasmaStoreState *plasma_state,
ObjectInfoT *object_info) {
NotificationQueue *queue, *temp_queue;
HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
for (auto &element : plasma_state->pending_notifications) {
uint8_t *notification = create_object_info_buffer(object_info);
queue->object_notifications->push_back(notification);
send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state,
0);
element.second.object_notifications.push_back(notification);
send_notifications(plasma_state->loop, element.first, plasma_state, 0);
/* The notification gets freed in send_notifications when the notification
* is sent over the socket. */
}
Expand All @@ -568,16 +556,14 @@ void send_notifications(event_loop *loop,
void *context,
int events) {
PlasmaStoreState *plasma_state = (PlasmaStoreState *) context;
NotificationQueue *queue;
HASH_FIND_INT(plasma_state->pending_notifications, &client_sock, queue);
CHECK(queue != NULL);
auto it = plasma_state->pending_notifications.find(client_sock);

int num_processed = 0;
bool closed = false;
/* Loop over the array of pending notifications and send as many of them as
* possible. */
for (int i = 0; i < queue->object_notifications->size(); ++i) {
uint8_t *notification = (uint8_t *) queue->object_notifications->at(i);
for (int i = 0; i < it->second.object_notifications.size(); ++i) {
uint8_t *notification = (uint8_t *) it->second.object_notifications.at(i);
/* Decode the length, which is the first bytes of the message. */
int64_t size = *((int64_t *) notification);

Expand Down Expand Up @@ -610,20 +596,18 @@ void send_notifications(event_loop *loop,
free(notification);
}
/* Remove the sent notifications from the array. */
queue->object_notifications->erase(
queue->object_notifications->begin(),
queue->object_notifications->begin() + num_processed);
it->second.object_notifications.erase(
it->second.object_notifications.begin(),
it->second.object_notifications.begin() + num_processed);

/* Stop sending notifications if the pipe was broken. */
if (closed) {
close(client_sock);
delete queue->object_notifications;
HASH_DEL(plasma_state->pending_notifications, queue);
free(queue);
plasma_state->pending_notifications.erase(client_sock);
}

/* If we have sent all notifications, remove the fd from the event loop. */
if (queue->object_notifications->empty()) {
if (it->second.object_notifications.empty()) {
event_loop_remove_file(loop, client_sock);
}
}
Expand All @@ -644,17 +628,13 @@ void subscribe_to_updates(Client *client_context, int conn) {
/* Create a new array to buffer notifications that can't be sent to the
* subscriber yet because the socket send buffer is full. TODO(rkn): the queue
* never gets freed. */
NotificationQueue *queue =
(NotificationQueue *) malloc(sizeof(NotificationQueue));
queue->subscriber_fd = fd;
queue->object_notifications = new std::deque<uint8_t *>();
HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);
NotificationQueue &queue = plasma_state->pending_notifications[fd];

/* Push notifications to the new subscriber about existing objects. */
for (const auto &entry : plasma_state->plasma_store_info->objects) {
push_notification(plasma_state, &entry.second->info);
}
send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, 0);
send_notifications(plasma_state->loop, fd, plasma_state, 0);
}

void process_message(event_loop *loop,
Expand Down