Skip to content

Commit

Permalink
fix osquery#4810 syslog pipe hang
Browse files Browse the repository at this point in the history
Refactored osquery#5259 to version 4.0
  • Loading branch information
ycooper committed Aug 27, 2019
1 parent a1bbfd8 commit 21048dc
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 35 deletions.
1 change: 1 addition & 0 deletions osquery/events/CMakeLists.txt
Expand Up @@ -80,6 +80,7 @@ function(generateOsqueryEvents)
linux/auditdnetlink.h
linux/auditeventpublisher.h
linux/inotify.h
linux/pipe_liner.h
linux/process_events.h
linux/process_file_events.h
linux/selinux_events.h
Expand Down
198 changes: 198 additions & 0 deletions osquery/events/linux/pipe_liner.h
@@ -0,0 +1,198 @@
#ifndef _PIPE_LINER_H_
#define _PIPE_LINER_H_

#include <fcntl.h>
#include <string.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstdint>
#include <string>
#include <vector>

// Default max syslog line is 1024 bytes, but it can be configured as high as
// 8192. LINE_BUF_MAX caps how many bytes of an unterminated line are retained
// between reads.

#define LINE_BUF_MAX 16384

/**
 * Callback interface that receives the lines extracted by PipeLiner.
 */
struct PipeLinerListener {
  /// Invoked once per complete line; the terminating '\n' is not included.
  virtual void onLine(std::string line) = 0;
  virtual ~PipeLinerListener() {}
};

/**
 * A non-blocking buffered line reader that uses select().
 *
 * Data is read from the descriptor in chunks, split on '\n', and each
 * complete line is handed to the listener. A trailing partial line is kept in
 * buf_ until a later chunk completes it.
 *
 * The object owns its file descriptor: it is closed by close() or by the
 * destructor, so copying is disabled to avoid closing the same descriptor
 * twice.
 */
struct PipeLiner {
  /**
   * @param listener  receiver of complete lines; may be null, in which case
   *                  lines are parsed and discarded.
   * @param chunksize size in bytes of the read buffer. One byte is reserved
   *                  for a NUL terminator, so values below 2 are raised to 2.
   */
  PipeLiner(PipeLinerListener* listener, uint32_t chunksize = 4096)
      : chunksize_(chunksize < 2 ? 2 : chunksize),
        path_(),
        fd_(-1),
        buf_(),
        listener_(listener),
        tmpbuf_(chunksize_) {
    buf_.reserve(static_cast<size_t>(chunksize_) * 2);
  }

  /// Releases the descriptor if it is still open.
  ~PipeLiner() {
    close();
  }

  PipeLiner(const PipeLiner&) = delete;
  PipeLiner& operator=(const PipeLiner&) = delete;

  /**
   * Open the pipe file read-only and non-blocking.
   *
   * A failed attempt leaves the object untouched so the call can be retried.
   * Once a path has been opened successfully, further calls are rejected.
   *
   * @returns true on error, false on success
   */
  bool open(std::string pipe_file_path) {
    if (!path_.empty() || fd_ >= 0) {
      return true;
    }

    const int fd = ::open(pipe_file_path.c_str(), O_RDONLY | O_NONBLOCK);
    if (fd < 0) {
      return true;
    }

    // 0 is a valid descriptor (e.g. when stdin was closed), so only negative
    // values are treated as failure.
    path_ = pipe_file_path;
    fd_ = fd;
    return false;
  }

  /**
   * Reads data and calls listener_->onLine() with any new lines.
   *
   * @returns -1 if the reader is not open, the descriptor cannot be used with
   *          select(), or select() failed; 0 on timeout waiting for data;
   *          1 if select() reported the descriptor readable (the subsequent
   *          read may still have produced no bytes, e.g. at end-of-file).
   */
  int update() {
    // FD_SET with a negative or too-large descriptor is undefined behaviour.
    if (fd_ < 0 || fd_ >= FD_SETSIZE) {
      return -1;
    }

    fd_set set;
    FD_ZERO(&set);
    FD_SET(fd_, &set);

    struct timeval timeout;
    timeout.tv_sec = static_cast<time_t>(timeout_sec_);
    timeout.tv_usec = static_cast<suseconds_t>(timeout_usec_);

    // is data available?
    const int rv = ::select(fd_ + 1, &set, nullptr, nullptr, &timeout);
    if (rv <= 0) {
      return rv; // -1: error, 0: timeout waiting for data
    }

    // Only iterate N times, rather than getting stuck when there is lots of
    // data. If chunksize is 4096 but a line is 8192 bytes, it will take a
    // couple of reads to get it all.
    const size_t want = tmpbuf_.size() - 1; // keep room for the terminator
    for (int i = 0; i < read_loop_count_; ++i) {
      const ssize_t avail = ::read(fd_, tmpbuf_.data(), want);
      if (avail > 0) {
        tmpbuf_[static_cast<size_t>(avail)] = 0; // add null terminator
        _onBuffer(tmpbuf_.data(), avail);
      }

      // A short read (including error or EOF) means nothing more is pending.
      if (avail < static_cast<ssize_t>(want)) {
        break;
      }
    }

    return 1;
  }

  /**
   * Close the file if opened. Safe to call repeatedly.
   */
  void close() {
    if (fd_ < 0) {
      return;
    }

    ::close(fd_);
    fd_ = -1;
  }

  /**
   * Process a chunk of data from the pipe.
   * It may or may not include a full line.
   *
   * @param tmpbuf pointer to the chunk; must not point into buf_.
   * @param avail  number of valid bytes at tmpbuf; values <= 0 are ignored.
   *
   * NOTE(review): when the buffered partial line plus the new chunk would
   * exceed LINE_BUF_MAX, the buffered prefix is dropped and the new chunk is
   * parsed from its start, so the tail of the oversized line is still
   * delivered as a line.
   */
  void _onBuffer(const char* tmpbuf, ssize_t avail) {
    if (tmpbuf == nullptr || avail <= 0) {
      return;
    }

    const size_t incoming = static_cast<size_t>(avail);
    const char* ptr = tmpbuf;
    size_t remaining = incoming;

    // Join with an existing partial line, if any.
    if (!buf_.empty()) {
      if (buf_.size() + incoming > static_cast<size_t>(LINE_BUF_MAX)) {
        // drop existing data
        buf_.clear();
      } else {
        buf_.insert(buf_.end(), tmpbuf, tmpbuf + incoming);

        // parse from buf_ instead of the caller's chunk
        ptr = buf_.data();
        remaining = buf_.size();
      }
    }

    // Emit every complete line in the working range.
    while (remaining > 0) {
      const char* eol =
          static_cast<const char*>(::memchr(ptr, '\n', remaining));
      if (eol == nullptr) {
        // no end of line: keep the tail for the next chunk
        _stash(const_cast<char*>(ptr), remaining);
        return;
      }

      const size_t len = static_cast<size_t>(eol - ptr);
      if (listener_ != nullptr) {
        listener_->onLine(std::string(ptr, len));
      }

      // skip the line and its '\n'
      ptr += len + 1;
      remaining -= len + 1;
    }

    // Everything was consumed - nothing left to carry over.
    buf_.clear();
  }

  /**
   * Have a partial line, store it in buf_ for now.
   *
   * @param ptr start of the partial line; may point inside buf_ itself.
   * @param len number of bytes to keep.
   */
  void _stash(char* ptr, size_t len) {
    if (ptr == nullptr || len == 0) {
      buf_.clear();
      return;
    }

    // Copy out first: ptr may alias buf_, and reserve() below may reallocate.
    const std::vector<char> pending(ptr, ptr + len);

    // Make sure the next chunk can be appended without another allocation.
    const size_t wanted = len + chunksize_;
    if (buf_.capacity() < wanted) {
      buf_.reserve(wanted);
    }

    buf_.assign(pending.begin(), pending.end());
  }

  uint32_t chunksize_;
  std::string path_;
  int fd_; // -1 when no descriptor is open
  std::vector<char> buf_; // used if partial lines remain from last read
  PipeLinerListener* listener_;
  uint32_t timeout_sec_{0};
  uint32_t timeout_usec_{500000}; // 500ms
  int read_loop_count_{3};
  std::vector<char> tmpbuf_; // buffer filled with read()
};

#endif // _PIPE_LINER_H_
57 changes: 26 additions & 31 deletions osquery/events/linux/syslog.cpp
Expand Up @@ -9,6 +9,7 @@
#include <fcntl.h>
#include <grp.h>
#include <sys/file.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
Expand Down Expand Up @@ -61,6 +62,11 @@ Status SyslogEventPublisher::setUp() {
return Status(1, "Publisher disabled via configuration");
}

// The pipe file should exist as part of install / config. We create
// pipe file here if it doesn't exist, but won't receive events until:
// - the /etc/rsyslog.d/90-osquery.conf file is in place
// - the syslog service is restarted

Status s;
if (!pathExists(FLAGS_syslog_pipe_path)) {
VLOG(1) << "Pipe does not exist: creating pipe " << FLAGS_syslog_pipe_path;
Expand All @@ -87,9 +93,7 @@ Status SyslogEventPublisher::setUp() {
// without blocking for a writer. We won't ever write to the pipe, but we
// don't want to block here and will instead block waiting for a read in the
// run() method
readStream_.open(FLAGS_syslog_pipe_path,
std::ifstream::in | std::ifstream::out);
if (!readStream_.good()) {
if (pipeReader_.open(FLAGS_syslog_pipe_path)) {
return Status(1,
"Error opening pipe for reading: " + FLAGS_syslog_pipe_path);
}
Expand Down Expand Up @@ -150,39 +154,30 @@ void SyslogEventPublisher::unlockPipe() {
}
}

Status SyslogEventPublisher::run() {
// This run function will be called by the event factory with ~100ms pause
// (see InterruptableRunnable::pause()) between runs. In case something goes
// weird and there is a huge amount of input, we limit how many logs we
// take in per run to avoid pegging the CPU.
for (size_t i = 0; i < FLAGS_syslog_rate_limit; ++i) {
if (readStream_.rdbuf()->in_avail() == 0) {
// If there is no pending data, we have flushed everything and can wait
// until the next time EventFactory calls run(). This also allows the
// thread to join when it is stopped by EventFactory.
return Status::success();
}
std::string line;
std::getline(readStream_, line);
auto ec = createEventContext();
Status status = populateEventContext(line, ec);
if (status.ok()) {
fire(ec);
if (errorCount_ > 0) {
--errorCount_;
}
} else {
LOG(ERROR) << status.getMessage() << " in line: " << line;
++errorCount_;
if (errorCount_ >= kErrorThreshold) {
return Status(1, "Too many errors in syslog parsing.");
}
/// PipeLinerListener callback: turns one syslog line into an event. A line
/// that fails to parse is logged and bumps errorCount_; a line that parses is
/// fired and lets errorCount_ decay by one.
void SyslogEventPublisher::onLine(std::string line) {
  auto ec = createEventContext();
  Status status = populateEventContext(line, ec);

  if (!status.ok()) {
    LOG(ERROR) << status.getMessage() << " in line: " << line;
    ++errorCount_;
    return;
  }

  fire(ec);
  if (errorCount_ > 0) {
    --errorCount_;
  }
}

/// Polls the pipe once (lines are delivered through onLine()) and reports
/// failure once the parse-error counter has reached kErrorThreshold.
Status SyslogEventPublisher::run() {
  pipeReader_.update();

  const bool tooManyErrors = (errorCount_ >= kErrorThreshold);
  return tooManyErrors ? Status(1, "Too many errors in syslog parsing.")
                       : Status::success();
}

/// Shuts the publisher down: closes the pipe reader, then calls unlockPipe().
void SyslogEventPublisher::tearDown() {
pipeReader_.close();
unlockPipe();
}

Expand Down Expand Up @@ -218,4 +213,4 @@ bool SyslogEventPublisher::shouldFire(const SyslogSubscriptionContextRef& sc,
const SyslogEventContextRef& ec) const {
return true;
}
}
} // namespace osquery
15 changes: 11 additions & 4 deletions osquery/events/linux/syslog.h
Expand Up @@ -15,6 +15,8 @@

#include <osquery/events.h>

#include "pipe_liner.h"

namespace osquery {

/**
Expand Down Expand Up @@ -53,7 +55,8 @@ using SyslogSubscriptionContextRef = std::shared_ptr<SyslogSubscriptionContext>;
* publisher will read from.
*/
class SyslogEventPublisher
: public EventPublisher<SyslogSubscriptionContext, SyslogEventContext> {
: public EventPublisher<SyslogSubscriptionContext, SyslogEventContext>,
public PipeLinerListener {
DECLARE_PUBLISHER("syslog");

public:
Expand All @@ -66,7 +69,11 @@ class SyslogEventPublisher
Status run() override;

public:
SyslogEventPublisher() : EventPublisher(), errorCount_(0), lockFd_(-1) {}
SyslogEventPublisher()
: EventPublisher(), pipeReader_(this), errorCount_(0), lockFd_(-1) {}

public:
virtual void onLine(std::string line) override;

private:
/// Apply normal subscription to event matching logic.
Expand Down Expand Up @@ -113,7 +120,7 @@ class SyslogEventPublisher
/**
* @brief Non-blocking line reader for the pipe.
*/
std::fstream readStream_;
PipeLiner pipeReader_;

/**
* @brief Counter used to shut down thread when too many errors occur.
Expand Down Expand Up @@ -201,4 +208,4 @@ class RsyslogCsvSeparator {
private:
bool last_;
};
}
} // namespace osquery
22 changes: 22 additions & 0 deletions osquery/events/tests/CMakeLists.txt
Expand Up @@ -10,6 +10,7 @@ function(osqueryEventsTestsMain)

if(DEFINED PLATFORM_LINUX)
generateOsqueryEventsTestsSyslogtestsTest()
generateOsqueryEventsTestsPipelinertestsTest()
generateOsqueryEventsTestsAudittestsTest()
generateOsqueryEventsTestsProcessfileeventstestsTest()
generateOsqueryEventsTestsInotifytestsTest()
Expand Down Expand Up @@ -88,6 +89,27 @@ function(generateOsqueryEventsTestsSyslogtestsTest)
)
endfunction()

# Defines the osquery_events_tests_pipelinertests-test executable, built from
# linux/pipe_liner_tests.cpp, and links it against the targets listed below.
function(generateOsqueryEventsTestsPipelinertestsTest)
add_osquery_executable(osquery_events_tests_pipelinertests-test linux/pipe_liner_tests.cpp)

target_link_libraries(osquery_events_tests_pipelinertests-test PRIVATE
global_cxx_settings
osquery_config_tests_testutils
osquery_core
osquery_core_sql
osquery_database
osquery_events
osquery_filesystem
osquery_remote_tests_remotetestutils
osquery_tables_system_systemtable
osquery_utils
osquery_utils_conversions
plugins_database_ephemeral
specs_tables
thirdparty_googletest
)
endfunction()

function(generateOsqueryEventsTestsAudittestsTest)
add_osquery_executable(osquery_events_tests_audittests-test linux/audit_tests.cpp)

Expand Down

0 comments on commit 21048dc

Please sign in to comment.