Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per Stream Session Limits #1598

Merged
merged 10 commits into from
May 15, 2024
4 changes: 4 additions & 0 deletions aeron-client/src/main/c/collections/aeron_int64_counter_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ extern int aeron_int64_counter_map_get_and_dec(aeron_int64_counter_map_t *map, c
extern void aeron_int64_counter_map_for_each(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_for_each_func_t func, void *clientd);

extern void aeron_int64_counter_map_remove_if(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_predicate_func_t func, void *clientd);


31 changes: 31 additions & 0 deletions aeron-client/src/main/c/collections/aeron_int64_counter_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ inline int aeron_int64_counter_map_get_and_dec(aeron_int64_counter_map_t *map, c
}

typedef void (*aeron_int64_counter_map_for_each_func_t)(void *clientd, int64_t key, int64_t value);
typedef bool (*aeron_int64_counter_map_predicate_func_t)(void *clientd, int64_t key, int64_t value);

inline void aeron_int64_counter_map_for_each(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_for_each_func_t func, void *clientd)
Expand All @@ -354,4 +355,34 @@ inline void aeron_int64_counter_map_for_each(
}
}

inline void aeron_int64_counter_map_remove_if(
aeron_int64_counter_map_t *map, aeron_int64_counter_map_predicate_func_t func, void *clientd)
{
size_t i = 0;
size_t remaining = map->size;

while (i < map->entries_length && remaining > 0)
{
bool is_removed = false;
int64_t key = map->entries[i];
int64_t value = map->entries[i + 1];
if (map->initial_value != value)
{
if (func(clientd, key, value))
{
is_removed = (map->initial_value != aeron_int64_counter_map_remove(map, key));
}
}

if (is_removed)
{
--remaining;
}
else
{
i += 2;
}
}
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

#ifndef AERON_INT32_TO_PTR_HASH_MAP_H
#define AERON_INT32_TO_PTR_HASH_MAP_H
#ifndef AERON_INT64_TO_PTR_HASH_MAP_H
#define AERON_INT64_TO_PTR_HASH_MAP_H

#include <errno.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,38 @@ TEST_F(Int64CounterMapTest, shouldForEachNonEmptyMap)

ASSERT_EQ(called, 1u);
}

TEST_F(Int64CounterMapTest, shouldRemoveIfValueMatches)
{
const int64_t initialValue = -2;
ASSERT_EQ(aeron_int64_counter_map_init(&m_map, initialValue, 8, AERON_MAP_DEFAULT_LOAD_FACTOR), 0);

for (int64_t i = 0; i < 10; i++)
{
int64_t value = i / 2;
aeron_int64_counter_map_put(&m_map, i, value, nullptr);
}

const int64_t value_to_remove = 3;
aeron_int64_counter_map_remove_if(
&m_map,
[](void *clientd, int64_t key, int64_t value)
{
int64_t client_v = *(int64_t *)clientd;
return client_v == value;
},
(void *)&value_to_remove);

for (int64_t i = 0; i < 10; i++)
{
int64_t value = i / 2;
if (value_to_remove == value)
{
EXPECT_EQ(initialValue, aeron_int64_counter_map_get(&m_map, i));
}
else
{
EXPECT_EQ(value, aeron_int64_counter_map_get(&m_map, i));
}
}
}
Loading