Skip to content

Commit

Permalink
Add support for hdfs tier audit
Browse files Browse the repository at this point in the history
  • Loading branch information
satish-mittal committed Feb 21, 2013
1 parent c7a7354 commit 5bbe920
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 31 deletions.
149 changes: 137 additions & 12 deletions src/scribe_audit.cpp
@@ -1,4 +1,3 @@
// Copyright (c) 2007-2008 Facebook
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,9 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
// @author Satish Mittal

#include "common.h"
Expand All @@ -36,6 +32,8 @@ unsigned char const magicBytes[] = {0xAB, 0xCD, 0xEF};
// NOTE(review): presumably the size in bytes of the
// <version><magic bytes><timestamp><message size> header that precedes the
// payload of every audited message - confirm against the header parsing code
const int headerLength = 16;
// minimum cut-off value of timestamp, in milliseconds since the Unix epoch
// (01-Jan-2013 00:00:00 UTC)
const unsigned long long minTimestamp = 1356998400000LL;
// constant string for file store tier name; written into the tier field of
// every serialized file audit message
static const string fileStoreTier = "hdfs";

AuditManager::AuditManager(const shared_ptr<StoreQueue> pAuditStore) {
auditStore = pAuditStore;
Expand Down Expand Up @@ -124,7 +122,8 @@ void AuditManager::auditMessage(const LogEntry& entry, bool received) {
}

void AuditManager::auditMessages(shared_ptr<logentry_vector_t>& messages,
const string& category, unsigned long offset, unsigned long count, bool received) {
unsigned long offset, unsigned long count, const string& category, bool received,
bool auditFileStore, const string& filename) {
// this store queue must be configured for audit topic
if (!auditStore->isAuditStore()) {
return;
Expand All @@ -139,6 +138,12 @@ void AuditManager::auditMessages(shared_ptr<logentry_vector_t>& messages,
// get the audit message entry in audit map for the given category
shared_ptr<audit_msg_t> audit_msg = getAuditMsg(category);

// if file audit is enabled, get file audit message entry for given filename
shared_ptr<file_audit_msg_t> file_audit_msg;
if (auditFileStore) {
file_audit_msg = getFileAuditMsg(filename, category);
}

for (unsigned long index = offset; index < offset + count; index++) {
// get the timestamp of message and update the appropriate counter in audit msg
unsigned long long tsKey = validateMessageAndGetTimestamp(*(messages->at(index)));
Expand All @@ -149,6 +154,13 @@ void AuditManager::auditMessages(shared_ptr<logentry_vector_t>& messages,

// update audit message counter for the given message
updateAuditMessageCounter(audit_msg, tsKey, received);

// if file audit is enabled and messages are sent, update file audit counters
// for given message
if (auditFileStore && file_audit_msg != NULL && file_audit_msg.get() != NULL
&& received == false) {
updateFileAuditMessageCounter(file_audit_msg, tsKey);
}
}

// finally, release the audit RW mutex
Expand Down Expand Up @@ -197,6 +209,36 @@ shared_ptr<audit_msg_t> AuditManager::getAuditMsg(const string& category) {
return audit_msg;
}

// Returns the file audit entry registered under the given filename, creating and
// registering a new one (tagged with the given category) when the map has none.
//
// Locking: when the first lookup does not yield an entry, this method calls
// auditRWMutex->release() followed by auditRWMutex->acquireWrite() and returns
// with the write lock still held. NOTE(review): this presumes the caller already
// holds auditRWMutex on entry and releases it afterwards - confirm against callers.
shared_ptr<file_audit_msg_t> AuditManager::getFileAuditMsg(const string& filename,
    const string& category) {
  // fast path: a usable entry is already registered for this file
  file_audit_map_t::iterator found = fileAuditMap.find(filename);
  if (found != fileAuditMap.end() && found->second.get() != NULL) {
    return found->second;
  }

  // slow path: trade the currently held lock for the write lock before the map
  // is modified
  auditRWMutex->release();
  auditRWMutex->acquireWrite();

  // the map may have changed while no lock was held, so look it up again
  found = fileAuditMap.find(filename);
  if (found != fileAuditMap.end()) {
    return found->second;
  }

  // still absent: build a zeroed, not-yet-closed entry for this file
  shared_ptr<file_audit_msg_t> created(new file_audit_msg_t);
  created->topic = category;
  created->filename = filename;
  created->fileClosed = false;
  created->receivedCount = 0;

  // register the new entry in the file audit map under its filename
  fileAuditMap[filename] = created;
  return created;
}

void AuditManager::updateAuditMessageCounter(shared_ptr<audit_msg_t>& audit_msg,
unsigned long long timestampKey, bool received) {
// acquire mutex to synchronize access to map and insert/increment counter
Expand All @@ -213,6 +255,33 @@ void AuditManager::updateAuditMessageCounter(shared_ptr<audit_msg_t>& audit_msg,
pthread_mutex_unlock(&(audit_msg->mutex));
}

// Counts one more message against the given timestamp key of a file audit entry
// and increments the entry's overall message total.
void AuditManager::updateFileAuditMessageCounter(shared_ptr<file_audit_msg_t>& file_audit_msg,
    unsigned long long timestampKey) {
  file_audit_msg_t& fileAudit = *file_audit_msg;

  // operator[] creates a zero-valued slot for a timestamp key seen for the first
  // time; the increment is carried out in unsigned long long before storing back
  const unsigned long long bumped = fileAudit.received[timestampKey] + 1ULL;
  fileAudit.received[timestampKey] = bumped;

  fileAudit.receivedCount += 1;
}

void AuditManager::auditFileClosed(const std::string& filename) {
// acquire read lock
RWGuard rwMonitor(*auditRWMutex);

// get the file audit message entry for given filename
shared_ptr<file_audit_msg_t> file_audit_msg;
file_audit_map_t::iterator file_audit_iter;
if ((file_audit_iter = fileAuditMap.find(filename)) != fileAuditMap.end()) {
file_audit_msg = file_audit_iter->second;
}

// file audit entry should be present in map if messages were writen to this file
if (file_audit_msg == NULL) {
return;
}

// set the fileClosed flag to true
file_audit_msg->fileClosed = true;
}

unsigned long long AuditManager::validateMessageAndGetTimestamp(const LogEntry& entry) {
// assuming that logEntry message is of the format:
// <version><magic bytes><timestamp><message size><message>
Expand All @@ -227,14 +296,12 @@ unsigned long long AuditManager::validateMessageAndGetTimestamp(const LogEntry&
// first validate the version byte
int version = (int)(data[0]);
if (version != 1) {
LOG_OPER("Audit: ERROR: version byte mismatch; expected [1] but received [%d]", version);
return 0;
}

// now validate magic bytes
if ((((unsigned char)data[1]) != magicBytes[0]) || (((unsigned char)data[2]) != magicBytes[1]) ||
(((unsigned char)data[3]) != magicBytes[2])) {
LOG_OPER("Audit: ERROR: magic bytes mismatch");
return 0;
}

Expand All @@ -261,8 +328,6 @@ unsigned long long AuditManager::validateMessageAndGetTimestamp(const LogEntry&
((int)(data[14] & 0xff) << 8) |
((int)(data[15] & 0xff));
if ((int)entry.message.length() != size + headerLength) {
LOG_OPER("Audit: ERROR: message length mismatch; expected [%d] but received [%d]",
size, (int)entry.message.length() - headerLength);
return 0;
}

Expand All @@ -278,10 +343,10 @@ void AuditManager::performAuditTask() {
// acquire write lock on auditRWMutex using RWGuard
RWGuard rwMonitor(*auditRWMutex, true);

// create a LogEntry instance from audit message per store and add it to message queue
try {
shared_ptr<audit_msg_t> audit_msg;
audit_map_t::iterator audit_iter;
// create a LogEntry instance from audit message per store and add it to message queue
for (audit_iter = auditMap.begin(); audit_iter != auditMap.end(); audit_iter++) {
audit_msg = audit_iter->second;
// skip auditing if received & sent maps are empty
Expand All @@ -306,9 +371,43 @@ void AuditManager::performAuditTask() {
audit_msg->sentCount = 0;
}
} catch (const std::exception& e) {
LOG_OPER("[Audit] Store thread failed to perform audit task. Error <%s>", e.what());
LOG_OPER("[Audit] Store thread failed to perform message audit task. Error <%s>", e.what());
} catch (...) {
LOG_OPER("[Audit] Store thread failed to perform audit task. Unexpected error");
LOG_OPER("[Audit] Store thread failed to perform message audit task. Unexpected error");
}

// create a LogEntry instance for each file audit message and add it to message queue
try {
shared_ptr<file_audit_msg_t> file_audit_msg;
file_audit_map_t::iterator file_audit_iter = fileAuditMap.begin();

while (file_audit_iter != fileAuditMap.end()) {
file_audit_msg = file_audit_iter->second;

// skip the entry if file is not closed yet
if (file_audit_msg->fileClosed == false) {
++file_audit_iter;
} else {
// skip auditing if received map is empty
if (file_audit_msg->received.size() != 0) {
LOG_OPER("[Audit] category [%s], file [%s], messages received [%llu]",
file_audit_msg->topic.c_str(), file_audit_msg->filename.c_str(),
file_audit_msg->receivedCount);

// create a LogEntry instance from audit msg
shared_ptr<LogEntry> entry = serializeFileAuditMsg(file_audit_msg, timeInMillis);

// add the LogEntry instance to store queue
auditStore->addMessage(entry);
}
// delete the entry from file audit map
fileAuditMap.erase(file_audit_iter++);
}
}
} catch (const std::exception& e) {
LOG_OPER("[Audit] Store thread failed to perform file audit task. Error <%s>", e.what());
} catch (...) {
LOG_OPER("[Audit] Store thread failed to perform file audit task. Unexpected error");
}
}

Expand Down Expand Up @@ -337,3 +436,29 @@ shared_ptr<LogEntry> AuditManager::serializeAuditMsg(shared_ptr<audit_msg_t>& au

return entry;
}

// Converts a file audit entry into a LogEntry addressed to the audit topic.
// The entry is copied into a Thrift AuditMessage, serialized in memory with the
// Thrift binary protocol, and the resulting bytes become the LogEntry message.
//
// file_audit_msg: entry whose topic, received map and filename are serialized
// timeInMillis:   value stored in the timestamp field of the audit message
shared_ptr<LogEntry> AuditManager::serializeFileAuditMsg(shared_ptr<file_audit_msg_t>& file_audit_msg,
    long timeInMillis) {
  const file_audit_msg_t& fileAudit = *file_audit_msg;

  // populate the Thrift AuditMessage; the tier is always the file store tier
  AuditMessage auditRecord;
  auditRecord.timestamp = timeInMillis;
  auditRecord.topic = fileAudit.topic;
  auditRecord.hostname = hostName;
  auditRecord.tier = fileStoreTier;
  auditRecord.windowSize = windowSize;
  auditRecord.received = fileAudit.received;
  auditRecord.filenames.push_back(fileAudit.filename);

  // serialize the audit message into an in-memory Thrift buffer
  shared_ptr<TMemoryBuffer> memoryBuffer(new TMemoryBuffer);
  shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol(memoryBuffer));
  auditRecord.write(binaryProtocol.get());

  // wrap the serialized payload in a LogEntry for the audit topic
  shared_ptr<LogEntry> logEntry(new LogEntry);
  logEntry->category = auditTopic;
  logEntry->message = memoryBuffer->getBufferAsString();
  return logEntry;
}
55 changes: 43 additions & 12 deletions src/scribe_audit.h
@@ -1,4 +1,3 @@
// Copyright (c) 2007-2008 Facebook
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,9 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
// @author Satish Mittal

#ifndef SCRIBE_AUDIT_H
Expand All @@ -23,6 +19,7 @@
#include "common.h"
#include "src/gen-cpp/audit_types.h"

// this struct holds audit info about msgs received/sent for a given topic
struct AuditMsg {
std::string topic;
std::map<long, long> received;
Expand All @@ -32,8 +29,22 @@ struct AuditMsg {
pthread_mutex_t mutex;
};

// this struct holds audit info about msgs written in a given hdfs file
struct FileAuditMsg {
  // Start every instance in a well-defined empty state. Without this, the
  // counter and the closed flag are indeterminate after `new FileAuditMsg`
  // until some caller remembers to assign them.
  FileAuditMsg() : receivedCount(0), fileClosed(false) {}

  // category (topic) of the messages written to the file
  std::string topic;
  // number of messages written to the file, keyed by message timestamp key
  std::map<long, long> received;
  // total number of messages counted for the file
  unsigned long long receivedCount;
  // set to true once the file has been reported as closed; audit messages are
  // generated only for closed files
  bool fileClosed;
  // name of the file; also the key of this entry in the file audit map
  std::string filename;
};

// short aliases for the audit structs, used throughout the audit manager
typedef AuditMsg audit_msg_t;
typedef FileAuditMsg file_audit_msg_t;

// this audit map has key as category and value as a shared pointer to the
// audit_msg_t instance of that category
typedef std::map<std::string, boost::shared_ptr<audit_msg_t> > audit_map_t;
// this audit map has key as filename and value as a shared pointer to the
// file_audit_msg_t instance of that file
typedef std::map<std::string, boost::shared_ptr<file_audit_msg_t> > file_audit_map_t;

class StoreQueue;

Expand All @@ -44,32 +55,52 @@ class AuditManager {
AuditManager(const boost::shared_ptr<StoreQueue> pAuditStore);
~AuditManager();

// this method allows various threads to audit a message when it is received/sent
// this method allows various threads to audit the event that a message is received/sent
void auditMessage(const scribe::thrift::LogEntry& entry, bool received);
void auditMessages(boost::shared_ptr<logentry_vector_t>& messages, const std::string& category,
unsigned long offset, unsigned long count, bool received);
// this method is called by audit store thread to periodically generate audit messages
// for all categories and add them to message queue.
// this method allows various threads to audit the event that a batch of messages is
// received/sent. Additionally, if scribe sends messages to a file store and the
// auditFileStore flag is set to true, this method audits, on behalf of the file store,
// the event that the file store received the messages.
void auditMessages(boost::shared_ptr<logentry_vector_t>& messages, unsigned long offset,
unsigned long count, const std::string& category, bool received, bool auditFileStore,
const std::string& filename);
// this method audits the event that the given file is closed. This would allow audit
// store thread to generate audit message for this file.
void auditFileClosed(const std::string& filename);
// this method is called by audit store thread periodically to generate audit messages
// for all categories/file stores and add them to message queue.
void performAuditTask();

private:
// get the audit message entry in audit map for the given category
boost::shared_ptr<audit_msg_t> getAuditMsg(const std::string& category);
// method to validate message headers. If message is valid it returns timestamp else 0.
// get the file audit message entry in file audit map for the given filename
boost::shared_ptr<file_audit_msg_t> getFileAuditMsg(const std::string& filename,
const std::string& category);
// method to validate message headers. If message header is valid, this method returns
// timestamp key else returns 0.
unsigned long long validateMessageAndGetTimestamp(const scribe::thrift::LogEntry& entry);
// update the audit message counter for the given message
void updateAuditMessageCounter(boost::shared_ptr<audit_msg_t>& audit_msg,
unsigned long long timestampKey, bool received);
unsigned long long timestampKey, bool received);
// update file audit message counter for the given message key. This method will be called
// only when messages are sent to a file store and file audit is enabled.
void updateFileAuditMessageCounter(boost::shared_ptr<file_audit_msg_t>& file_audit_msg,
unsigned long long timestampKey);
// serialize the audit message
logentry_ptr_t serializeAuditMsg(boost::shared_ptr<audit_msg_t>& audit_msg,
long timeInMillis);
long timeInMillis);
// serialize the file audit message
logentry_ptr_t serializeFileAuditMsg(boost::shared_ptr<file_audit_msg_t>& file_audit_msg,
long timeInMillis);

// audit configuration
boost::shared_ptr<StoreQueue> auditStore;
std::string hostName;
std::string tier;
long int windowSize;
audit_map_t auditMap;
file_audit_map_t fileAuditMap;
boost::shared_ptr<apache::thrift::concurrency::ReadWriteMutex> auditRWMutex;
};

Expand Down

0 comments on commit 5bbe920

Please sign in to comment.