Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace pthread's rwlock with a FIFO-queueing read-write lock #14839

Merged
merged 5 commits into from
Sep 13, 2022

Conversation

vikman90
Copy link
Member

@vikman90 vikman90 commented Sep 9, 2022

Related issue
#4592

This PR aims to introduce a FIFO-queueing read-write lock shared library, and replace the pthread's rwlock objects with this new structure.

Rationale

Some modules in Wazuh, such as Remoted and Logcollector, are made up of several threads that access memory in different ways:

  • Remoted:
    • Many threads decrypt messages from agents (read mode).
    • One thread reloads file client.keys and updates the memory (write mode).
  • Logcollector:
    • Many threads collect data from files, logging services, and commands (read mode).
    • One thread reloads files (write mode).

In order to prevent data access serialization in reading mode, those modules use read-write locks from the POSIX threads library. This implementation gives preference to readers, which may lead to writer starvation when those modules are highly loaded as there always be many readers working (see #2761).

We solved that problem using the following call, which sets the preference for writers:

pthread_rwlockattr_setkind_np(PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);

That function is non-portable (as the _np suffix indicates), and is available in the GNU C Library. However, this makes it impossible to compile with the musl library, thus porting Wazuh to platforms like Alpine Linux (avoiding writer starvation).

Proof of concept

Code

main.c
// Read-write lock starvation PoC
//
// Compile with -pthread
// Optional flag: -D PREVENT_STARVATION

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdbool.h>
#include <unistd.h>
#include <pthread.h>

#define BUFFER_LEN 64
#define N_READERS 80
#define WRITER_DELAY_SEC 1
#define READER_DELAY_NSEC 10000

#define LOCK_READ(rwlock, block) pthread_rwlock_rdlock(rwlock); block; pthread_rwlock_unlock(rwlock);
#define LOCK_WRITE(rwlock, block) pthread_rwlock_wrlock(rwlock); block; pthread_rwlock_unlock(rwlock);

static pthread_rwlock_t buffer_rwlock;

static void init_rwlock()
{
    pthread_rwlockattr_t attr;
    pthread_rwlockattr_init(&attr);

#ifdef PREVENT_STARVATION
    pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
#endif

    errno = pthread_rwlock_init(&buffer_rwlock, &attr);

    if (errno != 0)
    {
        perror("pthread_rwlock_init");
        abort();
    }

    pthread_rwlockattr_destroy(&attr);
}

static void * reader(void * p)
{
    char * buffer = (char *)p;
    char target[BUFFER_LEN];
    struct timespec delay = {
        .tv_sec = 0,
        .tv_nsec = READER_DELAY_NSEC,
    };

    while (true)
    {
        LOCK_READ(&buffer_rwlock,
        {
            memcpy(target, buffer, BUFFER_LEN);
            nanosleep(&delay, NULL);
        })
    }

    return NULL;
}

static void * writer(void * p)
{
    char * buffer = (char *)p;
    int i = 0;

    while (true)
    {
        LOCK_WRITE(&buffer_rwlock,
        {
            snprintf(buffer, BUFFER_LEN, "%d", i++);
            printf("Writer %s\n", buffer);
        })

        sleep(WRITER_DELAY_SEC);
    }

    return NULL;
}

int main()
{
    pthread_t thread;
    char buffer[BUFFER_LEN];
    int i;

    init_rwlock();

    for (i = 0; i < N_READERS; i++)
    {
        errno = pthread_create(&thread, NULL, reader, (void *)buffer);

        if (errno != 0)
        {
            perror("pthread_start(reader)");
            abort();
        }
    }

    errno = pthread_create(&thread, NULL, writer, (void *)buffer);

    if (errno != 0)
    {
        perror("pthread_start(writer)");
        abort();
    }

    pthread_exit(NULL);
}

Test (starvation)

gcc -O2 -pthread -o rwtest_fail main.c
./rwtest_fail

There is no output: the writer is starved.

Test (prevent starvation)

gcc -O2 -DPREVENT_STARVATION -pthread -o rwtest_cur main.c
./rwtest_cur
Writer 0
Writer 1
Writer 2
(...)

The program prints precisely one line per second, indicating that the writer is working as expected.

Proposed fix

Enclose calls to lock the read-writer locks either in read or write mode, pretending this behavior:

  • Threads acquire the lock in the order they arrive, with no preference for readers or writers.
  • If a reader tries to acquire the lock while it's locked in reader mode, then that reader also takes the lock.

This way, we implement a structure rwlock_t with its respective functions and replace the usage of pthread_rwlock_t with this new library in:

  • Logcollector.
  • Remoted.

We have decided to omit to refactor the structure OSHash due to its complexity and scope (Analysisd).

Proof of concept (1)

Code

rwlock.h
// Read-write lock library (proposal 1)

#ifndef RWLOCK_H
#define RWLOCK_h

#include <pthread.h>

#define RWLOCK_LOCK_READ(rwlock, block) rwlock_lock_read(rwlock); block; rwlock_unlock(rwlock);
#define RWLOCK_LOCK_WRITE(rwlock, block) rwlock_lock_write(rwlock); block; rwlock_unlock(rwlock);

typedef struct {
    pthread_mutex_t mutex;
    pthread_rwlock_t rwlock;
} rwlock_t;

void rwlock_init(rwlock_t * rwlock);
void rwlock_lock_read(rwlock_t * rwlock);
void rwlock_lock_write(rwlock_t * rwlock);
void rwlock_unlock(rwlock_t * rwlock);
void rwlock_destroy(rwlock_t * rwlock);

#endif
rwlock.c
// Read-write lock library (proposal 1)

#include "rwlock.h"

void rwlock_init(rwlock_t * rwlock) {
    pthread_mutex_init(&rwlock->mutex, NULL);
    pthread_rwlock_init(&rwlock->rwlock, NULL);
}

void rwlock_lock_read(rwlock_t * rwlock) {
    pthread_mutex_lock(&rwlock->mutex);
    pthread_rwlock_rdlock(&rwlock->rwlock);
    pthread_mutex_unlock(&rwlock->mutex);
}

void rwlock_lock_write(rwlock_t * rwlock) {
    pthread_mutex_lock(&rwlock->mutex);
    pthread_rwlock_wrlock(&rwlock->rwlock);
    pthread_mutex_unlock(&rwlock->mutex);
}

void rwlock_unlock(rwlock_t * rwlock) {
    pthread_rwlock_unlock(&rwlock->rwlock);
}

void rwlock_destroy(rwlock_t * rwlock) {
    pthread_mutex_destroy(&rwlock->mutex);
    pthread_rwlock_destroy(&rwlock->rwlock);
}
main.c
// Read-write lock starvation PoC (proposal 1)
//
// Compile with -pthread

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdbool.h>
#include <unistd.h>
#include <pthread.h>
#include "rwlock.h"

#define BUFFER_LEN 64
#define N_READERS 80
#define WRITER_DELAY_SEC 1
#define READER_DELAY_NSEC 10000

static rwlock_t buffer_rwlock;

static void * reader(void * p) {
    char * buffer = (char *)p;
    char target[BUFFER_LEN];
    struct timespec delay = {
        .tv_sec = 0,
        .tv_nsec = READER_DELAY_NSEC,
    };

    while (true) {
        RWLOCK_LOCK_READ(&buffer_rwlock, {
            memcpy(target, buffer, BUFFER_LEN);
            nanosleep(&delay, NULL);
        })
    }

    return NULL;
}

static void * writer(void * p) {
    char * buffer = (char *)p;
    int i = 0;

    while (true) {
        RWLOCK_LOCK_WRITE(&buffer_rwlock, {
            snprintf(buffer, BUFFER_LEN, "%d", i++);
            printf("Writer %s\n", buffer);
        })

        sleep(WRITER_DELAY_SEC);
    }

    return NULL;
}

int main() {
    pthread_t thread;
    char buffer[BUFFER_LEN];
    int i;

    rwlock_init(&buffer_rwlock);

    for (i = 0; i < N_READERS; i++) {
        errno = pthread_create(&thread, NULL, reader, (void *)buffer);

        if (errno != 0) {
            perror("pthread_start(reader)");
            abort();
        }
    }

    errno = pthread_create(&thread, NULL, writer, (void *)buffer);

    if (errno != 0) {
        perror("pthread_start(writer)");
        abort();
    }

    pthread_exit(NULL);
}

Test

gcc -O2 -pthread -o rwtest_1 *.c
./rwtest_1
Writer 0
Writer 1
Writer 2
(...)

Proof of concept (2)

This is a variant with a custom implementation of the read-write lock, using a counter and a condition variable.

rwlock.h
// Read-write lock library (proposal 2)

#ifndef RWLOCK_H
#define RWLOCK_h

#include <pthread.h>

#define RWLOCK_LOCK_READ(rwlock, block) rwlock_lock_read(rwlock); block; rwlock_unlock(rwlock);
#define RWLOCK_LOCK_WRITE(rwlock, block) rwlock_lock_write(rwlock); block; rwlock_unlock(rwlock);

typedef struct {
    pthread_mutex_t enter_lock;
    pthread_mutex_t data_lock;
    pthread_cond_t unlocked;
    long data;
} rwlock_t;

void rwlock_init(rwlock_t * rwlock);
void rwlock_lock_read(rwlock_t * rwlock);
void rwlock_lock_write(rwlock_t * rwlock);
void rwlock_unlock(rwlock_t * rwlock);
void rwlock_destroy(rwlock_t * rwlock);

#endif
rwlock.c
// Read-write lock library (proposal 2)

#include "rwlock.h"

void rwlock_init(rwlock_t * rwlock) {
    pthread_mutex_init(&rwlock->enter_lock, NULL);
    pthread_mutex_init(&rwlock->data_lock, NULL);
    pthread_cond_init(&rwlock->unlocked, NULL);
    rwlock->data = 0;
}

void rwlock_lock_read(rwlock_t * rwlock) {
    pthread_mutex_lock(&rwlock->enter_lock);
    pthread_mutex_lock(&rwlock->data_lock);

    while (rwlock->data == -1) {
        pthread_cond_wait(&rwlock->unlocked, &rwlock->data_lock);
    }

    rwlock->data++;

    pthread_mutex_unlock(&rwlock->data_lock);
    pthread_mutex_unlock(&rwlock->enter_lock);
}

void rwlock_lock_write(rwlock_t * rwlock) {
    pthread_mutex_lock(&rwlock->enter_lock);
    pthread_mutex_lock(&rwlock->data_lock);

    while (rwlock->data != 0) {
        pthread_cond_wait(&rwlock->unlocked, &rwlock->data_lock);
    }

    rwlock->data = -1;

    pthread_mutex_unlock(&rwlock->data_lock);
    pthread_mutex_unlock(&rwlock->enter_lock);
}

void rwlock_unlock(rwlock_t * rwlock) {
    pthread_mutex_lock(&rwlock->data_lock);

    if (rwlock->data > 0) {
        // Locked for readers: decrement
        rwlock->data--;
    } else {
        // Locked for a writer: unset
        rwlock->data = 0;
    }

    pthread_cond_signal(&rwlock->unlocked);
    pthread_mutex_unlock(&rwlock->data_lock);
}

void rwlock_destroy(rwlock_t * rwlock) {
    pthread_mutex_destroy(&rwlock->enter_lock);
    pthread_mutex_destroy(&rwlock->data_lock);
    pthread_cond_destroy(&rwlock->unlocked);
}
main.c
// Read-write lock starvation PoC (proposal 2)
//
// Compile with -pthread

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdbool.h>
#include <unistd.h>
#include <pthread.h>
#include "rwlock.h"

#define BUFFER_LEN 64
#define N_READERS 80
#define WRITER_DELAY_SEC 1
#define READER_DELAY_NSEC 10000

static rwlock_t buffer_rwlock;

static void * reader(void * p) {
    char * buffer = (char *)p;
    char target[BUFFER_LEN];
    struct timespec delay = {
        .tv_sec = 0,
        .tv_nsec = READER_DELAY_NSEC,
    };

    while (true) {
        RWLOCK_LOCK_READ(&buffer_rwlock, {
            memcpy(target, buffer, BUFFER_LEN);
            nanosleep(&delay, NULL);
        })
    }

    return NULL;
}

static void * writer(void * p) {
    char * buffer = (char *)p;
    int i = 0;

    while (true) {
        RWLOCK_LOCK_WRITE(&buffer_rwlock, {
            snprintf(buffer, BUFFER_LEN, "%d", i++);
            printf("Writer %s\n", buffer);
        })

        sleep(WRITER_DELAY_SEC);
    }

    return NULL;
}

int main() {
    pthread_t thread;
    char buffer[BUFFER_LEN];
    int i;

    rwlock_init(&buffer_rwlock);

    for (i = 0; i < N_READERS; i++) {
        errno = pthread_create(&thread, NULL, reader, (void *)buffer);

        if (errno != 0) {
            perror("pthread_start(reader)");
            abort();
        }
    }

    errno = pthread_create(&thread, NULL, writer, (void *)buffer);

    if (errno != 0) {
        perror("pthread_start(writer)");
        abort();
    }

    pthread_exit(NULL);
}

Test

gcc -O2 -pthread -o rwtest_2 *.c
./rwtest_2
Writer 0
Writer 1
Writer 2
(...)

Footprint test

We ran the three implementations, having the following CPU usage:

Implementation CPU Overhead
rwtest_cur (current) 657.3 %
rwtest_1 (proposal 1) 660.0 % 0.41 %
rwtest_2 (proposal 2) 752.3 % 14.5 %

Having that both fixes meet the functional expectations, and the first one adds an imperceptible overhead, we decided to apply the first fix.

Tests

  • Functional and stress test with PoC.
  • Port the PoC test to the CMocka tests.
  • Memory test with AddressSanitizer on PoC.
  • Memory test with AddressSanitizer on CMocka test.
  • Race condition test with ThreadSanitizer on PoC.

@vikman90 vikman90 added type/enhancement New feature or request module/logcollector module/remote module/shared This issue refers to shared code or independent of a particular component. type/enhancement/usability labels Sep 9, 2022
@vikman90 vikman90 self-assigned this Sep 9, 2022
w_rwlock_unlock(&can_read_rwlock);
RWLOCK_LOCK_WRITE(&can_read_rwlock, {
_can_read = value;
});
}

int can_read() {

int ret;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to declare this variable, because it can return _can_read directly?

Copy link
Contributor

@FrancoRivero FrancoRivero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@vikman90 vikman90 merged commit 1213fe9 into 4592-wazuh-agent-package-for-alpine-linux Sep 13, 2022
@vikman90 vikman90 deleted the 4592-rwlock branch September 13, 2022 09:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
module/logcollector module/remote module/shared This issue refers to shared code or independent of a particular component. type/enhancement/usability type/enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants