Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LWLocks #25

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 20 additions & 36 deletions logerrors.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ typedef struct slow_log_info {
} SlowLogInfo;

typedef struct messages_buffer {
LWLock lock;
int current_interval_index;
pg_atomic_uint32 current_message_index;
pg_atomic_uint64 current_interval_index;
pg_atomic_uint64 current_message_index;
/* depends on messages per interval and max intervals count */
MessageInfo buffer[messages_per_interval * max_actual_intervals_count];
} MessagesBuffer;
Expand Down Expand Up @@ -134,7 +133,6 @@ global_variables_init()
char* excluded_errcode_str;
char excluded_errcodes_copy[error_codes_count * (len_sqlstate_str + 1)];
global_variables->intervals_count = intervals_count;
/* +5 because we don't want take lock on MessagesBuffer while pg_log_errors_stats is running */
global_variables->actual_intervals_count = intervals_count + 5;
global_variables->interval = interval;

Expand Down Expand Up @@ -182,25 +180,16 @@ static void
add_message(int err_code, Oid db_oid, Oid user_oid, int message_type_index) {
int index_to_write;
int current_message;
int current_interval;
if (global_variables == NULL)
return;
LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE);
current_message = pg_atomic_read_u32(&global_variables->messagesBuffer.current_message_index);
index_to_write = global_variables->messagesBuffer.current_interval_index * messages_per_interval
+ current_message;
if (current_message >= messages_per_interval) {
/* too many messages per one interval, save current instead of random message in interval */
srand(time(0));
index_to_write = global_variables->messagesBuffer.current_interval_index * messages_per_interval +
rand() % messages_per_interval;
}

current_interval = pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) % (uint64)global_variables->actual_intervals_count;
current_message = pg_atomic_fetch_add_u64(&global_variables->messagesBuffer.current_message_index, 1) % (uint64)messages_per_interval;
index_to_write = current_interval * messages_per_interval + current_message;
global_variables->messagesBuffer.buffer[index_to_write].db_oid = db_oid;
global_variables->messagesBuffer.buffer[index_to_write].user_oid = user_oid;
global_variables->messagesBuffer.buffer[index_to_write].error_code = err_code;
global_variables->messagesBuffer.buffer[index_to_write].message_type_index = message_type_index;
pg_atomic_write_u32(&global_variables->messagesBuffer.current_message_index, current_message + 1);
LWLockRelease(&global_variables->messagesBuffer.lock);
global_variables->messagesBuffer.buffer[index_to_write].error_code = err_code;
}

static char*
Expand Down Expand Up @@ -232,9 +221,9 @@ logerrors_init()
err_name = hash_search(error_names_hashtable, (void *) &key, HASH_ENTER, &found);
err_name->name = (char*)error_names[i];
}
pg_atomic_init_u32(&global_variables->messagesBuffer.current_message_index, 0);
pg_atomic_init_u64(&global_variables->messagesBuffer.current_message_index, 0);
pg_atomic_init_u64(&global_variables->messagesBuffer.current_interval_index, 0);
MemSet(&global_variables->total_count, 0, message_types_count);
LWLockInitialize(&global_variables->messagesBuffer.lock, LWLockNewTrancheId());
for (i = 0; i < message_types_count; ++i) {
pg_atomic_init_u32(&global_variables->total_count[i], 0);
}
Expand All @@ -251,24 +240,21 @@ static void
logerrors_update_info()
{
int i;
int current_index;
int prev_index;
int current_interval;
if (global_variables == NULL) {
return;
}
LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE);
prev_index = global_variables->messagesBuffer.current_interval_index;
global_variables->messagesBuffer.current_interval_index = (prev_index + 1)
% global_variables->actual_intervals_count;
current_index = global_variables->messagesBuffer.current_interval_index;
current_interval = (pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) + (uint64)1) %
(uint64)global_variables->actual_intervals_count;
for (i = 0; i < messages_per_interval; ++i) {
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].error_code = -1;
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].db_oid = -1;
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].user_oid = -1;
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].message_type_index = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].error_code = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].db_oid = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].user_oid = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].message_type_index = -1;
}
pg_atomic_write_u32(&global_variables->messagesBuffer.current_message_index, 0);
LWLockRelease(&global_variables->messagesBuffer.lock);
pg_atomic_write_u64(&global_variables->messagesBuffer.current_message_index, 0);
// no locking is required as this is the only place where the current_interval_index changes
pg_atomic_write_u64(&global_variables->messagesBuffer.current_interval_index, current_interval);
}

void
Expand Down Expand Up @@ -657,9 +643,7 @@ pg_log_errors_stats(PG_FUNCTION_ARGS)
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);

LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE);
current_interval_index = global_variables->messagesBuffer.current_interval_index;
LWLockRelease(&global_variables->messagesBuffer.lock);
current_interval_index = pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) % (uint64)global_variables->actual_intervals_count;
/* 'TOTAL' counters */
for (lvl_i = 0; lvl_i < message_types_count; ++lvl_i) {

Expand Down