Skip to content

Commit

Permalink
PoC: Set statuses prior to invoking listener
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Boasson <eb@ilities.com>
  • Loading branch information
eboasson committed Aug 19, 2021
1 parent 10eb888 commit 47cccd8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
11 changes: 8 additions & 3 deletions src/core/ddsc/src/dds__entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ DDS_EXPORT inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) {
return e->m_kind;
}

DDS_EXPORT void dds_entity_observers_signal (dds_entity *observed, uint32_t status);

DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status);

union dds_status_union {
Expand All @@ -102,20 +104,23 @@ union dds_status_union {
static void status_cb_##name_ (dds_##entity_kind_ * const e, const status_cb_data_t *data, bool enabled) \
{ \
struct dds_listener const * const listener = &e->m_entity.m_listener; \
const status_mask_t status_mask = (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID); \
const bool invoke = (listener->on_##name_ != 0) && enabled; \
union dds_status_union lst; \
update_##name_ (&e->m_##name_##_status, invoke ? &lst.name_ : NULL, data); \
bool signal = dds_entity_status_set (&e->m_entity, status_mask); \
if (invoke) { \
dds_entity_status_reset (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \
e->m_entity.m_cb_pending_count++; \
e->m_entity.m_cb_count++; \
ddsrt_mutex_unlock (&e->m_entity.m_observers_lock); \
listener->on_##name_ (e->m_entity.m_hdllink.hdl, lst.name_, listener->on_##name_##_arg); \
ddsrt_mutex_lock (&e->m_entity.m_observers_lock); \
e->m_entity.m_cb_count--; \
e->m_entity.m_cb_pending_count--; \
} else if (enabled) { \
dds_entity_status_set (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \
signal = signal && (ddsrt_atomic_ld32 (&e->m_entity.m_status.m_status_and_mask) & status_mask); \
} \
if (signal) { \
dds_entity_observers_signal (&e->m_entity, status_mask); \
} \
}

Expand Down
10 changes: 4 additions & 6 deletions src/core/ddsc/src/dds_entity.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ static int compare_instance_handle (const void *va, const void *vb)

const ddsrt_avl_treedef_t dds_entity_children_td = DDSRT_AVL_TREEDEF_INITIALIZER (offsetof (struct dds_entity, m_avlnode_child), offsetof (struct dds_entity, m_iid), compare_instance_handle, 0);

static void dds_entity_observers_signal (dds_entity *observed, uint32_t status);
static void dds_entity_observers_signal_delete (dds_entity *observed);

static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate);
Expand Down Expand Up @@ -1421,7 +1420,7 @@ dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_waitset *
return rc;
}

static void dds_entity_observers_signal (dds_entity *observed, uint32_t status)
void dds_entity_observers_signal (dds_entity *observed, uint32_t status)
{
for (dds_entity_observer *idx = observed->m_observers; idx; idx = idx->m_next)
idx->m_cb (idx->m_observer, observed->m_hdllink.hdl, status);
Expand All @@ -1448,19 +1447,18 @@ void dds_entity_status_signal (dds_entity *e, uint32_t status)
ddsrt_mutex_unlock (&e->m_observers_lock);
}

void dds_entity_status_set (dds_entity *e, status_mask_t status)
bool dds_entity_status_set (dds_entity *e, status_mask_t status)
{
assert (entity_has_status (e));
uint32_t old, delta, new;
do {
old = ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask);
delta = ((uint32_t) status & (old >> SAM_ENABLED_SHIFT));
if (delta == 0)
return;
return false;
new = old | delta;
} while (!ddsrt_atomic_cas32 (&e->m_status.m_status_and_mask, old, new));
if (delta)
dds_entity_observers_signal (e, status);
return (delta != 0);
}

dds_entity_t dds_get_topic (dds_entity_t entity)
Expand Down
22 changes: 16 additions & 6 deletions src/core/ddsc/src/dds_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ void dds_reader_data_available_cb (struct dds_reader *rd)

struct dds_listener const * const lst = &rd->m_entity.m_listener;
dds_entity * const sub = rd->m_entity.m_parent;

bool rd_signal = dds_entity_status_set (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
bool sub_signal = dds_entity_status_set (sub, DDS_DATA_ON_READERS_STATUS);

if (lst->on_data_on_readers)
{
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
Expand Down Expand Up @@ -201,17 +205,23 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
lst->on_data_available (rd->m_entity.m_hdllink.hdl, lst->on_data_available_arg);
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
}
else

rd->m_entity.m_cb_count--;
rd->m_entity.m_cb_pending_count--;

if (sub_signal)
{
dds_entity_status_set (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
ddsrt_mutex_lock (&sub->m_observers_lock);
dds_entity_status_set (sub, DDS_DATA_ON_READERS_STATUS);
if (ddsrt_atomic_ld32 (&sub->m_status.m_status_and_mask) & DDS_DATA_ON_READERS_STATUS)
dds_entity_observers_signal (sub, DDS_DATA_ON_READERS_STATUS);
ddsrt_mutex_unlock (&sub->m_observers_lock);
}

rd->m_entity.m_cb_count--;
rd->m_entity.m_cb_pending_count--;

if (rd_signal)
{
if (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & DDS_DATA_AVAILABLE_STATUS)
dds_entity_observers_signal (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
}
ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond);
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
}
Expand Down

0 comments on commit 47cccd8

Please sign in to comment.