Skip to content

Commit

Permalink
Fix crash and deadlocks in the support for recursive logging (#7127)
Browse files Browse the repository at this point in the history
The code has been slightly refactored to remove an incorrect usage
of std::futures in a queue, where it was possible to try to wait on the
same std::future twice, which would lead to a crash.

Even after fixing the crash, it was possible that threads dealing
with recursive logging would change order and wait on a std::future
of another thread, causing a deadlock.

Finally the deferred log relaying wasn't always used when it should've,
due to a confusion between boolean values.
A enum class has been used instead to be more expressive about
the relaying mode asked.
  • Loading branch information
Smjert committed May 30, 2021
1 parent acfa52b commit 3541ef9
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 45 deletions.
2 changes: 1 addition & 1 deletion osquery/dispatcher/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void SchedulerRunner::maybeReloadSchedule(uint64_t time_step) {
void SchedulerRunner::maybeFlushLogs(uint64_t time_step) {
// GLog is not re-entrant, so logs must be flushed in a dedicated thread.
if ((time_step % 3) == 0) {
relayStatusLogs(true);
relayStatusLogs(LoggerRelayMode::Async);
}
}

Expand Down
7 changes: 3 additions & 4 deletions osquery/logger/data_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

namespace osquery {

enum class LoggerRelayMode { Sync, Async };

/// Set the verbose mode, changes Glog's sinking logic and will affect plugins.
void setVerboseLevel();

Expand Down Expand Up @@ -120,14 +122,11 @@ Status logSnapshotQuery(const QueryLogItem& item);
* Extensions, the registry, configuration, and optional config/logger plugins
* are all protected as a monitored worker.
*/
void relayStatusLogs(bool async = false);
void relayStatusLogs(LoggerRelayMode relay_mode = LoggerRelayMode::Sync);

/// Inspect the number of internal-buffered status log lines.
size_t queuedStatuses();

/// Inspect the number of active internal status log sender threads.
size_t queuedSenders();

/**
* @brief Write a log line to the OS system log.
*
Expand Down
57 changes: 22 additions & 35 deletions osquery/logger/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <algorithm>
#include <future>
#include <optional>
#include <queue>
#include <thread>

Expand Down Expand Up @@ -153,10 +154,6 @@ class BufferedLogSink : public google::LogSink, private boost::noncopyable {
/// Retrieve the list of enabled plugins that should have logs forwarded.
const std::vector<std::string>& enabledPlugins() const;

public:
/// Queue of sender functions that relay status logs to all plugins.
std::queue<std::future<void>> senders;

public:
BufferedLogSink(BufferedLogSink const&) = delete;
void operator=(BufferedLogSink const&) = delete;
Expand Down Expand Up @@ -187,8 +184,8 @@ class BufferedLogSink : public google::LogSink, private boost::noncopyable {
/// Mutex protecting accesses to buffered status logs.
Mutex kBufferedLogSinkLogs;

/// Mutex protecting queued status log futures.
Mutex kBufferedLogSinkSenders;
/// Used to wait on the thread that defers relaying the buffered status logs
thread_local std::optional<std::future<void>> kOptBufferedLogSinkSender;

static void serializeIntermediateLog(const std::vector<StatusLogLine>& log,
PluginRequest& request) {
Expand Down Expand Up @@ -292,7 +289,7 @@ void initLogger(const std::string& name) {
if (forward) {
// Begin forwarding after all plugins have been set up.
BufferedLogSink::get().enable();
relayStatusLogs(true);
relayStatusLogs(LoggerRelayMode::Sync);
}
}

Expand Down Expand Up @@ -331,27 +328,26 @@ void BufferedLogSink::send(google::LogSeverity severity,

// The daemon will relay according to the schedule.
if (enabled_ && !isDaemon()) {
relayStatusLogs(FLAGS_logger_status_sync);
relayStatusLogs(FLAGS_logger_status_sync ? LoggerRelayMode::Sync
: LoggerRelayMode::Async);
}
}

void BufferedLogSink::WaitTillSent() {
std::future<void> first;

{
WriteLock lock(kBufferedLogSinkSenders);
if (senders.empty()) {
return;
if (kOptBufferedLogSinkSender.has_value()) {
if (!isPlatform(PlatformType::TYPE_WINDOWS)) {
kOptBufferedLogSinkSender->wait();
} else {
/* We cannot wait indefinitely because glog doesn't use read/write locks
on Windows. When we are in a recursive logging situation, there's a
thread that is waiting here for a new thread it launched to finish its
logging, and it does so while holding an exclusive lock inside glog
(sink_mutex_), instead of in read mode only. The new thread needs to be
able to acquire the same lock to log the message though,
so unless this thread yields, we end up in a deadlock. */
kOptBufferedLogSinkSender->wait_for(std::chrono::microseconds(100));
}
first = std::move(senders.back());
senders.pop();
}

if (!isPlatform(PlatformType::TYPE_WINDOWS)) {
first.wait();
} else {
// Windows is locking by scheduling an async on the main thread.
first.wait_for(std::chrono::microseconds(100));
kOptBufferedLogSinkSender.reset();
}
}

Expand Down Expand Up @@ -483,12 +479,7 @@ size_t queuedStatuses() {
return BufferedLogSink::get().dump().size();
}

size_t queuedSenders() {
ReadLock lock(kBufferedLogSinkSenders);
return BufferedLogSink::get().senders.size();
}

void relayStatusLogs(bool async) {
void relayStatusLogs(LoggerRelayMode relay_mode) {
if (FLAGS_disable_logging || !databaseInitialized()) {
// The logger plugins may not be setUp if logging is disabled.
// If the database is not setUp, or is in a reset, status logs continue
Expand Down Expand Up @@ -533,16 +524,12 @@ void relayStatusLogs(bool async) {
}
});

if (async) {
if (relay_mode == LoggerRelayMode::Sync) {
sender();
} else {
std::packaged_task<void()> task(std::move(sender));
auto result = task.get_future();
kOptBufferedLogSinkSender = task.get_future();
std::thread(std::move(task)).detach();

// Lock accesses to the sender queue.
WriteLock lock(kBufferedLogSinkSenders);
BufferedLogSink::get().senders.push(std::move(result));
}
}

Expand Down
7 changes: 3 additions & 4 deletions osquery/logger/tests/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ class RecursiveLoggerPlugin : public LoggerPlugin {
}

public:
size_t statuses{0};
std::atomic<size_t> statuses{0};
};

TEST_F(LoggerTests, test_recursion) {
Expand Down Expand Up @@ -482,14 +482,13 @@ TEST_F(LoggerTests, test_recursion) {
EXPECT_EQ(3U, plugin->statuses);

// All of recursive log lines will sink during the next call.
relayStatusLogs(true);
relayStatusLogs(LoggerRelayMode::Sync);
EXPECT_EQ(4U, plugin->statuses);
relayStatusLogs(true);
relayStatusLogs(LoggerRelayMode::Sync);
EXPECT_EQ(5U, plugin->statuses);
setToolType(tool_type);

EXPECT_EQ(0U, queuedStatuses());
EXPECT_EQ(0U, queuedSenders());

// Make sure the test file does not create a filesystem log.
// This will happen if the logtostderr is not set.
Expand Down
2 changes: 1 addition & 1 deletion plugins/logger/tests/filesystem_logger_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ TEST_F(FilesystemLoggerTests, test_log_status) {
lines = osquery::split(content, "\n").size();
EXPECT_EQ(6U, lines);

relayStatusLogs(true);
relayStatusLogs(LoggerRelayMode::Async);
content.clear();
readFile(status_path, content);
lines = osquery::split(content, "\n").size();
Expand Down

0 comments on commit 3541ef9

Please sign in to comment.