Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Make AnalyticsLogger more robust against errors. Instead of bailing o…

…ut, only print warnings.
  • Loading branch information...
commit 777c29f8f8fc8ee5596060a87a7a8eae0ce72e4c 1 parent 811b88c
@FooBarWidget FooBarWidget authored
Showing with 501 additions and 285 deletions.
  1. +481 −261 ext/common/Logging.h
  2. +20 −24 test/cxx/LoggingTest.cpp
View
742 ext/common/Logging.h
@@ -26,6 +26,8 @@
#define _PASSENGER_LOGGING_H_
#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <oxt/thread.hpp>
#include <oxt/system_calls.hpp>
#include <oxt/backtrace.hpp>
@@ -38,21 +40,23 @@
#include <pthread.h>
#include <string>
#include <map>
+#include <stdexcept>
#include <ostream>
#include <sstream>
#include <cstdio>
#include <ctime>
#include <cerrno>
-#include "RandomGenerator.h"
-#include "FileDescriptor.h"
-#include "MessageClient.h"
-#include "StaticString.h"
-#include "Exceptions.h"
-#include "Utils.h"
-#include "Utils/StrIntUtils.h"
-#include "Utils/MD5.h"
-#include "Utils/SystemTime.h"
+#include <RandomGenerator.h>
+#include <FileDescriptor.h>
+#include <StaticString.h>
+#include <Exceptions.h>
+#include <Utils.h>
+#include <Utils/ScopeGuard.h>
+#include <Utils/MessageIO.h>
+#include <Utils/StrIntUtils.h>
+#include <Utils/MD5.h>
+#include <Utils/SystemTime.h>
namespace Passenger {
@@ -152,51 +156,89 @@ void setDebugFile(const char *logFile = NULL);
/********** Analytics logging facilities *********/
-struct AnalyticsLoggerSharedData {
- boost::mutex lock;
- MessageClient client;
+// All access to the file descriptor must be synchronized through the lock.
+struct AnalyticsLoggerConnection {
+ mutable boost::mutex lock;
+ FileDescriptor fd;
- void disconnect(bool checkErrorResponse = false) {
- if (checkErrorResponse && client.connected()) {
- // Maybe the server sent us an error message and closed
- // the connection. Let's check.
- TRACE_POINT();
- vector<string> args;
- bool hasData = true;
-
- try {
- hasData = client.read(args);
- } catch (const SystemException &e) {
- if (e.code() != ECONNRESET) {
- throw;
- }
+ AnalyticsLoggerConnection(FileDescriptor _fd)
+ : fd(_fd)
+ { }
+
+ bool connected() const {
+ return fd != -1;
+ }
+
+ bool disconnect(string &errorResponse) {
+ if (!connected()) {
+ return false;
+ }
+
+ // The server might send an "error" array message
+ // just before disconnecting. Try to read it.
+ TRACE_POINT();
+ vector<string> response;
+ try {
+ while (true) {
+ unsigned long long timeout = 10000;
+ response = readArrayMessage(fd, &timeout);
}
-
- UPDATE_TRACE_POINT();
- client.disconnect();
- if (hasData) {
- if (args[0] == "error") {
- throw IOException("The logging server responded with an error: " + args[1]);
- } else {
- throw IOException("The logging server sent an unexpected reply.");
- }
+ } catch (const TimeoutException &) {
+ // This means that the last message isn't an array
+ // message or that the server didn't send it quickly
+ // enough. In any case, discard whatever previous
+ // array messages we were able to read because they're
+ // guaranteed not to be the error message we're expecting.
+ response.clear();
+ } catch (const SystemException &e) {
+ // We treat ECONNRESET the same as EOFException.
+ // Other errors are treated as TimeoutException.
+ if (e.code() != ECONNRESET) {
+ response.clear();
}
+ } catch (const EOFException &) {
+ // Do nothing. We've successfully read the last array message.
+ }
+
+ this_thread::disable_interruption di;
+ this_thread::disable_syscall_interruption dsi;
+ UPDATE_TRACE_POINT();
+ fd.close();
+
+ if (response.size() == 2 && response[0] == "error") {
+ errorResponse = response[1];
+ return true;
} else {
- client.disconnect();
+ return false;
}
}
+
+ void disconnect() {
+ fd.close();
+ }
};
-typedef shared_ptr<AnalyticsLoggerSharedData> AnalyticsLoggerSharedDataPtr;
+
+typedef shared_ptr<AnalyticsLoggerConnection> AnalyticsLoggerConnectionPtr;
+
+
+enum ExceptionHandlingMode {
+ PRINT,
+ THROW,
+ IGNORE
+};
+
class AnalyticsLog {
private:
static const int INT64_STR_BUFSIZE = 22; // Long enough for a 64-bit number.
-
- AnalyticsLoggerSharedDataPtr sharedData;
- string txnId;
- string groupName;
- string category;
- string unionStationKey;
+ static const unsigned long long IO_TIMEOUT = 5000000; // In microseconds.
+
+ const AnalyticsLoggerConnectionPtr connection;
+ const string txnId;
+ const string groupName;
+ const string category;
+ const string unionStationKey;
+ const ExceptionHandlingMode exceptionHandlingMode;
bool shouldFlushToDiskAfterClose;
/**
@@ -227,68 +269,135 @@ class AnalyticsLog {
return buffer + 1;
}
-public:
- AnalyticsLog() { }
-
- AnalyticsLog(const AnalyticsLoggerSharedDataPtr &sharedData, const string &txnId,
- const string &groupName, const string &category, const string &unionStationKey)
- {
- this->sharedData = sharedData;
- this->txnId = txnId;
- this->groupName = groupName;
- this->category = category;
- this->unionStationKey = unionStationKey;
- shouldFlushToDiskAfterClose = false;
+ template<typename ExceptionType>
+ void handleException(const ExceptionType &e) {
+ switch (exceptionHandlingMode) {
+ case THROW:
+ throw e;
+ case PRINT:
+ try {
+ const tracable_exception &te =
+ dynamic_cast<const tracable_exception &>(e);
+ P_WARN(te.what() << "\n" << te.backtrace());
+ } catch (const bad_cast &) {
+ P_WARN(e.what());
+ }
+ break;
+ default:
+ break;
+ }
}
+public:
+ AnalyticsLog()
+ : exceptionHandlingMode(PRINT)
+ { }
+
+ AnalyticsLog(const AnalyticsLoggerConnectionPtr &_connection,
+ const string &_txnId,
+ const string &_groupName,
+ const string &_category,
+ const string &_unionStationKey,
+ ExceptionHandlingMode _exceptionHandlingMode = PRINT)
+ : connection(_connection),
+ txnId(_txnId),
+ groupName(_groupName),
+ category(_category),
+ unionStationKey(_unionStationKey),
+ exceptionHandlingMode(_exceptionHandlingMode),
+ shouldFlushToDiskAfterClose(false)
+ { }
+
~AnalyticsLog() {
- if (sharedData != NULL) {
- lock_guard<boost::mutex> l(sharedData->lock);
- if (sharedData->client.connected()) {
- try {
- char timestamp[2 * sizeof(unsigned long long) + 1];
- integerToHexatri<unsigned long long>(SystemTime::getUsec(),
- timestamp);
- sharedData->client.write("closeTransaction",
- txnId.c_str(), timestamp, NULL);
- } catch (const SystemException &e) {
- if (e.code() == EPIPE || e.code() == ECONNRESET) {
- TRACE_POINT();
- sharedData->disconnect(true);
- } else {
- throw;
- }
- }
-
- if (shouldFlushToDiskAfterClose) {
- vector<string> args;
- sharedData->client.write("flush", NULL);
- sharedData->client.read(args);
- }
+ TRACE_POINT();
+ if (connection == NULL) {
+ return;
+ }
+ lock_guard<boost::mutex> l(connection->lock);
+ if (!connection->connected()) {
+ return;
+ }
+
+ char timestamp[2 * sizeof(unsigned long long) + 1];
+ integerToHexatri<unsigned long long>(SystemTime::getUsec(),
+ timestamp);
+
+ UPDATE_TRACE_POINT();
+ ScopeGuard guard(boost::bind(&AnalyticsLoggerConnection::disconnect,
+ connection.get()));
+ try {
+ unsigned long long timeout = IO_TIMEOUT;
+ writeArrayMessage(connection->fd, &timeout,
+ "closeTransaction",
+ txnId.c_str(),
+ timestamp,
+ NULL);
+
+ if (shouldFlushToDiskAfterClose) {
+ UPDATE_TRACE_POINT();
+ timeout = IO_TIMEOUT;
+ writeArrayMessage(connection->fd, &timeout,
+ "flush", NULL);
+ readArrayMessage(connection->fd, &timeout);
+ }
+ guard.clear();
+ } catch (const SystemException &e) {
+ string errorResponse;
+
+ UPDATE_TRACE_POINT();
+ guard.clear();
+ if (connection->disconnect(errorResponse)) {
+ handleException(IOException(
+ string("Logging agent disconnected with error: ") +
+ e.what()));
+ } else {
+ handleException(e);
}
}
}
void message(const StaticString &text) {
- if (sharedData != NULL) {
- lock_guard<boost::mutex> l(sharedData->lock);
- if (sharedData->client.connected()) {
- char timestamp[2 * sizeof(unsigned long long) + 1];
- integerToHexatri<unsigned long long>(SystemTime::getUsec(), timestamp);
- sharedData->client.write("log", txnId.c_str(),
- timestamp, NULL);
- sharedData->client.writeScalar(text);
+ TRACE_POINT();
+ if (connection == NULL) {
+ return;
+ }
+ lock_guard<boost::mutex> l(connection->lock);
+ if (!connection->connected()) {
+ return;
+ }
+
+ char timestamp[2 * sizeof(unsigned long long) + 1];
+ integerToHexatri<unsigned long long>(SystemTime::getUsec(), timestamp);
+
+ UPDATE_TRACE_POINT();
+ ScopeGuard guard(boost::bind(&AnalyticsLoggerConnection::disconnect,
+ connection.get()));
+ try {
+ unsigned long long timeout = IO_TIMEOUT;
+ writeArrayMessage(connection->fd, &timeout,
+ "log",
+ txnId.c_str(),
+ timestamp,
+ NULL);
+ writeScalarMessage(connection->fd, text, &timeout);
+ guard.clear();
+ } catch (const std::exception &e) {
+ string errorResponse;
+
+ UPDATE_TRACE_POINT();
+ guard.clear();
+ if (connection->disconnect(errorResponse)) {
+ handleException(IOException(
+ string("Logging agent disconnected with error: ") +
+ e.what()));
+ } else {
+ handleException(e);
}
}
}
void abort(const StaticString &text) {
- if (sharedData != NULL) {
- lock_guard<boost::mutex> l(sharedData->lock);
- if (sharedData->client.connected()) {
- message("ABORT");
- }
- }
+ message("ABORT");
}
void flushToDiskAfterClose(bool value) {
@@ -296,7 +405,7 @@ class AnalyticsLog {
}
bool isNull() const {
- return sharedData == NULL;
+ return connection == NULL;
}
string getTxnId() const {
@@ -318,9 +427,10 @@ class AnalyticsLog {
typedef shared_ptr<AnalyticsLog> AnalyticsLogPtr;
-class AnalyticsScopeLog {
+
+class AnalyticsScopeLog: public boost::noncopyable {
private:
- AnalyticsLog *log;
+ AnalyticsLog * const log;
enum {
NAME,
GRANULAR
@@ -346,8 +456,9 @@ class AnalyticsScopeLog {
}
public:
- AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *name) {
- this->log = log.get();
+ AnalyticsScopeLog(const AnalyticsLogPtr &_log, const char *name)
+ : log(_log.get())
+ {
type = NAME;
data.name = name;
ok = false;
@@ -373,16 +484,18 @@ class AnalyticsScopeLog {
}
}
- AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *beginMessage,
- const char *endMessage, const char *abortMessage = NULL
- ) {
- this->log = log.get();
- if (log != NULL) {
+ AnalyticsScopeLog(const AnalyticsLogPtr &_log,
+ const char *beginMessage,
+ const char *endMessage,
+ const char *abortMessage = NULL)
+ : log(_log.get())
+ {
+ if (_log != NULL) {
type = GRANULAR;
data.granular.endMessage = endMessage;
data.granular.abortMessage = abortMessage;
ok = abortMessage == NULL;
- log->message(beginMessage);
+ _log->message(beginMessage);
}
}
@@ -429,36 +542,37 @@ class AnalyticsScopeLog {
}
};
+
class AnalyticsLogger {
private:
- /** A special lock type for AnalyticsLoggerSharedData that also
+ /** A special lock type for AnalyticsLoggerConnection that also
* keeps a smart pointer to the data structure so that the mutex
* is not destroyed prematurely.
*/
- struct SharedDataLock {
- AnalyticsLoggerSharedDataPtr sharedData;
+ struct ConnectionLock {
+ AnalyticsLoggerConnectionPtr connection;
bool locked;
- SharedDataLock(const AnalyticsLoggerSharedDataPtr &d)
- : sharedData(d)
+ ConnectionLock(const AnalyticsLoggerConnectionPtr &c)
+ : connection(c)
{
- d->lock.lock();
+ c->lock.lock();
locked = true;
}
- ~SharedDataLock() {
+ ~ConnectionLock() {
if (locked) {
- sharedData->lock.unlock();
+ connection->lock.unlock();
}
}
- void reset(const AnalyticsLoggerSharedDataPtr &d, bool lockNow = true) {
+ void reset(const AnalyticsLoggerConnectionPtr &c, bool lockNow = true) {
if (locked) {
- sharedData->lock.unlock();
+ connection->lock.unlock();
}
- sharedData = d;
+ connection = c;
if (lockNow) {
- sharedData->lock.lock();
+ connection->lock.lock();
locked = true;
} else {
locked = false;
@@ -467,39 +581,84 @@ class AnalyticsLogger {
void lock() {
assert(!locked);
- sharedData->lock.lock();
+ connection->lock.lock();
locked = true;
}
};
- static const int RETRY_SLEEP = 200000; // microseconds
-
- string serverAddress;
- string username;
- string password;
- string nodeName;
+ const string serverAddress;
+ const string username;
+ const string password;
+ const string nodeName;
RandomGenerator randomGenerator;
- /** Lock protecting the fields that follow, but not the contents of the shared data. */
+ /** Lock protecting the fields that follow, but not the
+ * contents of the connection object.
+ */
mutable boost::mutex lock;
unsigned int maxConnectTries;
unsigned long long reconnectTimeout;
unsigned long long nextReconnectTime;
- /** @invariant sharedData != NULL */
- AnalyticsLoggerSharedDataPtr sharedData;
+ /** Normally never NULL, except when constructed with the default constructor
+ * or if serverName is empty. In those cases the AnalyticsLogger object is
+ * considered unusable.
+ */
+ AnalyticsLoggerConnectionPtr connection;
+
+ static string determineNodeName(const string &givenNodeName) {
+ if (givenNodeName.empty()) {
+ return getHostName();
+ } else {
+ return givenNodeName;
+ }
+ }
+
+ static bool isNetworkError(int code) {
+ return code == EPIPE || code == ECONNREFUSED || code == ECONNRESET
+ || code == EHOSTUNREACH || code == ENETDOWN || code == ENETUNREACH
+ || code == ETIMEDOUT;
+ }
bool connected() const {
- return sharedData->client.connected();
+ return connection->connected();
}
void connect() {
TRACE_POINT();
+ FileDescriptor fd;
vector<string> args;
+ unsigned long long timeout = 15000000;
+
+ fd = connectToServer(serverAddress);
+ if (!readArrayMessage(fd, args, &timeout)) {
+ throw IOException("The logging agent closed the connection before sending a version identifier.");
+ }
+ if (args.size() != 2 || args[0] != "version") {
+ throw IOException("The logging agent server didn't sent a valid version identifier.");
+ }
+ if (args[1] != "1") {
+ string message = string("Unsupported logging agent protocol version ") +
+ args[1] + ".";
+ throw IOException(message);
+ }
+
+ UPDATE_TRACE_POINT();
+ writeScalarMessage(fd, username, &timeout);
+ writeScalarMessage(fd, password, &timeout);
+
+ UPDATE_TRACE_POINT();
+ if (!readArrayMessage(fd, args, &timeout)) {
+ throw IOException("The logging agent did not send an authentication response.");
+ } else if (args.size() != 1) {
+ throw IOException("The authentication response that the logging agent sent is not valid.");
+ } else if (args[0] != "ok") {
+ throw SecurityException("The logging agent server denied authentication: " + args[0]);
+ }
- sharedData->client.connect(serverAddress, username, password);
- sharedData->client.write("init", nodeName.c_str(), NULL);
- if (!sharedData->client.read(args)) {
+ UPDATE_TRACE_POINT();
+ writeArrayMessage(fd, &timeout, "init", nodeName.c_str(), NULL);
+ if (!readArrayMessage(fd, args, &timeout)) {
throw SystemException("Cannot connect to logging server", ECONNREFUSED);
} else if (args.size() != 1) {
throw IOException("Logging server returned an invalid reply for the 'init' command");
@@ -509,58 +668,48 @@ class AnalyticsLogger {
throw IOException("Logging server returned an invalid reply for the 'init' command");
}
- // Upon a write() error we want to attempt to read() the error
- // message before closing the socket.
- sharedData->client.setAutoDisconnect(false);
- }
-
- void disconnect(bool checkErrorResponse = false) {
- sharedData->disconnect(checkErrorResponse);
- // We create a new SharedData here so that existing AnalyticsLog
- // objects still refer to the old client object and don't interfere
- // with any newly-established connections.
- sharedData.reset(new AnalyticsLoggerSharedData());
- }
-
- bool isNetworkError(int code) const {
- return code == EPIPE || code == ECONNREFUSED || code == ECONNRESET
- || code == EHOSTUNREACH || code == ENETDOWN || code == ENETUNREACH
- || code == ETIMEDOUT;
+ connection = make_shared<AnalyticsLoggerConnection>(fd);
}
public:
AnalyticsLogger() { }
- AnalyticsLogger(const string &serverAddress, const string &username,
- const string &password, const string &nodeName = "")
+ AnalyticsLogger(const string &_serverAddress, const string &_username,
+ const string &_password, const string &_nodeName = "")
+ : serverAddress(_serverAddress),
+ username(_username),
+ password(_password),
+ nodeName(determineNodeName(_nodeName))
{
- this->serverAddress = serverAddress;
- this->username = username;
- this->password = password;
- if (nodeName.empty()) {
- this->nodeName = getHostName();
- } else {
- this->nodeName = nodeName;
- }
if (!serverAddress.empty()) {
- sharedData.reset(new AnalyticsLoggerSharedData());
+ connection = make_shared<AnalyticsLoggerConnection>(FileDescriptor());
}
if (isLocalSocketAddress(serverAddress)) {
maxConnectTries = 10;
} else {
maxConnectTries = 1;
}
- maxConnectTries = 10;
reconnectTimeout = 1000000;
nextReconnectTime = 0;
}
- AnalyticsLogPtr newTransaction(const string &groupName, const string &category = "requests",
+ template<typename T>
+ static bool instanceof(const std::exception &e) {
+ try {
+ dynamic_cast<const T &>(e);
+ return true;
+ } catch (const bad_cast &) {
+ return false;
+ }
+ }
+
+ AnalyticsLogPtr newTransaction(const string &groupName,
+ const string &category = "requests",
const string &unionStationKey = string(),
const string &filters = string())
{
if (serverAddress.empty()) {
- return ptr(new AnalyticsLog());
+ return make_shared<AnalyticsLog>();
}
unsigned long long timestamp = SystemTime::getUsec();
@@ -593,129 +742,200 @@ class AnalyticsLogger {
integerToHexatri<unsigned long long>(timestamp, timestampStr);
unique_lock<boost::mutex> l(lock);
- SharedDataLock sl(sharedData);
+ if (SystemTime::getUsec() < nextReconnectTime) {
+ return make_shared<AnalyticsLog>();
+ }
+ ConnectionLock cl(connection);
- if (SystemTime::getUsec() >= nextReconnectTime) {
- unsigned int tryCount = 0;
-
- while (tryCount < maxConnectTries) {
- try {
- if (!connected()) {
- TRACE_POINT();
- connect();
- }
- sharedData->client.write("openTransaction",
- txnId,
- groupName.c_str(),
- "",
- category.c_str(),
- timestampStr,
- unionStationKey.c_str(),
- "true",
- "true",
- filters.c_str(),
- NULL);
-
- vector<string> args;
- sharedData->client.read(args);
- if (args.size() == 2 && args[0] == "error") {
- disconnect();
- throw IOException("The logging server responded with an error: " + args[1]);
- } else if (args.empty() || args[0] != "ok") {
- disconnect();
- throw IOException("The logging server sent an unexpected reply.");
- }
-
- return ptr(new AnalyticsLog(sharedData,
- string(txnId, end - txnId),
- groupName, category,
- unionStationKey));
- } catch (const SystemException &e) {
- TRACE_POINT();
- if (e.code() == ENOENT || isNetworkError(e.code())) {
- tryCount++;
- disconnect(true);
- sl.reset(sharedData, false);
- l.unlock();
- if (tryCount < maxConnectTries) {
- syscalls::usleep(RETRY_SLEEP);
- }
- l.lock();
- sl.lock();
- } else {
- disconnect();
- throw;
- }
+ if (!connected()) {
+ TRACE_POINT();
+ try {
+ connect();
+ cl.reset(connection);
+ } catch (const TimeoutException &) {
+ P_WARN("Timeout trying to connect to the logging agent at " << serverAddress << "; " <<
+ "will reconnect in " << reconnectTimeout / 1000000 << " second(s).");
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+ } catch (const tracable_exception &e) {
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ if (instanceof<IOException>(e) || instanceof<SystemException>(e)) {
+ P_WARN("Cannot connect to the logging agent at " << serverAddress <<
+ " (" << e.what() << "); will reconnect in " <<
+ reconnectTimeout / 1000000 << " second(s).");
+ return make_shared<AnalyticsLog>();
+ } else {
+ throw;
}
+ }
+ }
+
+ ScopeGuard guard(boost::bind(
+ &AnalyticsLoggerConnection::disconnect,
+ connection.get()));
+ try {
+ unsigned long long timeout = 15000000;
+
+ writeArrayMessage(connection->fd, &timeout,
+ "openTransaction",
+ txnId,
+ groupName.c_str(),
+ "",
+ category.c_str(),
+ timestampStr,
+ unionStationKey.c_str(),
+ "true",
+ "true",
+ filters.c_str(),
+ NULL);
+
+ vector<string> args;
+ if (!readArrayMessage(connection->fd, args, &timeout)) {
+ P_WARN("The logging agent at " << serverAddress <<
+ " closed the connection (no error message given);" <<
+ " will reconnect in " << reconnectTimeout / 1000000 <<
+ " second(s).");
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+ } else if (args.size() == 2 && args[0] == "error") {
+ P_WARN("The logging agent at " << serverAddress <<
+ " closed the connection (error message: " << args[1] <<
+ "); will reconnect in " << reconnectTimeout / 1000000 <<
+ " second(s).");
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+ } else if (args.empty() || args[0] != "ok") {
+ P_WARN("The logging agent at " << serverAddress <<
+ " sent an unexpected reply;" <<
+ " will reconnect in " << reconnectTimeout / 1000000 <<
+ " second(s).");
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+ }
+
+ guard.clear();
+ return make_shared<AnalyticsLog>(connection,
+ string(txnId, end - txnId),
+ groupName, category,
+ unionStationKey);
+
+ } catch (const TimeoutException &) {
+ P_WARN("Timeout trying to communicate with the logging agent at " << serverAddress << "; " <<
+ "will reconnect in " << reconnectTimeout / 1000000 << " second(s).");
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+
+ } catch (const SystemException &e) {
+ if (e.code() == ENOENT || isNetworkError(e.code())) {
+ string errorResponse;
- // Failed to connect.
- P_WARN("Cannot connect to the logging agent (" << serverAddress << "); " <<
- "retrying in " << reconnectTimeout / 1000000 << " second(s).");
+ guard.clear();
+ if (connection->disconnect(errorResponse)) {
+ P_WARN("The logging agent at " << serverAddress <<
+ " closed the connection (error message: " << errorResponse <<
+ "); will reconnect in " << reconnectTimeout / 1000000 <<
+ " second(s).");
+ } else {
+ P_WARN("The logging agent at " << serverAddress <<
+ " closed the connection (no error message given);" <<
+ " will reconnect in " << reconnectTimeout / 1000000 <<
+ " second(s).");
+ }
nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+ } else {
+ throw;
}
}
- return ptr(new AnalyticsLog());
}
AnalyticsLogPtr continueTransaction(const string &txnId, const string &groupName,
const string &category = "requests", const string &unionStationKey = string())
{
if (serverAddress.empty() || txnId.empty()) {
- return ptr(new AnalyticsLog());
+ return make_shared<AnalyticsLog>();
}
char timestampStr[2 * sizeof(unsigned long long) + 1];
integerToHexatri<unsigned long long>(SystemTime::getUsec(), timestampStr);
unique_lock<boost::mutex> l(lock);
- SharedDataLock sl(sharedData);
+ if (SystemTime::getUsec() < nextReconnectTime) {
+ return make_shared<AnalyticsLog>();
+ }
+ ConnectionLock cl(connection);
- if (SystemTime::getUsec() >= nextReconnectTime) {
- unsigned int tryCount = 0;
-
- while (tryCount < maxConnectTries) {
- try {
- if (!connected()) {
- TRACE_POINT();
- connect();
- }
- sharedData->client.write("openTransaction",
- txnId.c_str(),
- groupName.c_str(),
- "",
- category.c_str(),
- timestampStr,
- unionStationKey.c_str(),
- "true",
- NULL);
- return ptr(new AnalyticsLog(sharedData,
- txnId, groupName, category,
- unionStationKey));
- } catch (const SystemException &e) {
- TRACE_POINT();
- if (e.code() == EPIPE || isNetworkError(e.code())) {
- tryCount++;
- disconnect(true);
- sl.reset(sharedData, false);
- l.unlock();
- if (tryCount < maxConnectTries) {
- syscalls::usleep(RETRY_SLEEP);
- }
- l.lock();
- sl.lock();
- } else {
- disconnect();
- throw;
- }
+ if (!connected()) {
+ TRACE_POINT();
+ try {
+ connect();
+ cl.reset(connection);
+ } catch (const TimeoutException &) {
+ P_WARN("Timeout trying to connect to the logging agent at " << serverAddress << "; " <<
+ "will reconnect in " << reconnectTimeout / 1000000 << " second(s).");
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+ } catch (const tracable_exception &e) {
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ if (instanceof<IOException>(e) || instanceof<SystemException>(e)) {
+ P_WARN("Cannot connect to the logging agent at " << serverAddress <<
+ " (" << e.what() << "); will reconnect in " <<
+ reconnectTimeout / 1000000 << " second(s).");
+ return make_shared<AnalyticsLog>();
+ } else {
+ throw;
}
}
+ }
+
+ ScopeGuard guard(boost::bind(
+ &AnalyticsLoggerConnection::disconnect,
+ connection.get()));
+ try {
+ unsigned long long timeout = 15000000;
+ writeArrayMessage(connection->fd, &timeout,
+ "openTransaction",
+ txnId.c_str(),
+ groupName.c_str(),
+ "",
+ category.c_str(),
+ timestampStr,
+ unionStationKey.c_str(),
+ "true",
+ NULL);
+ guard.clear();
+ return make_shared<AnalyticsLog>(connection,
+ txnId, groupName, category,
+ unionStationKey);
- // Failed to connect.
- P_WARN("Cannot connect to the logging agent (" << serverAddress << "); " <<
- "retrying in " << reconnectTimeout / 1000000 << " second(s).");
+ } catch (const TimeoutException &) {
+ P_WARN("Timeout trying to communicate with the logging agent at " << serverAddress << "; " <<
+ "will reconnect in " << reconnectTimeout / 1000000 << " second(s).");
nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+
+ } catch (const SystemException &e) {
+ if (e.code() == ENOENT || isNetworkError(e.code())) {
+ string errorResponse;
+
+ guard.clear();
+ if (connection->disconnect(errorResponse)) {
+ P_WARN("The logging agent at " << serverAddress <<
+ " closed the connection (error message: " << errorResponse <<
+ "); will reconnect in " << reconnectTimeout / 1000000 <<
+ " second(s).");
+ } else {
+ P_WARN("The logging agent at " << serverAddress <<
+ " closed the connection (no error message given);" <<
+ " will reconnect in " << reconnectTimeout / 1000000 <<
+ " second(s).");
+ }
+ nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
+ return make_shared<AnalyticsLog>();
+ } else {
+ throw;
+ }
}
- return ptr(new AnalyticsLog());
}
void setMaxConnectTries(unsigned int value) {
@@ -746,8 +966,8 @@ class AnalyticsLogger {
FileDescriptor getConnection() const {
lock_guard<boost::mutex> l(lock);
- lock_guard<boost::mutex> l2(sharedData->lock);
- return sharedData->client.getConnection();
+ lock_guard<boost::mutex> l2(connection->lock);
+ return connection->fd;
}
/**
View
44 test/cxx/LoggingTest.cpp
@@ -152,7 +152,7 @@ namespace tut {
log->getGroupName(), log->getCategory());
log2->message("message 2");
log2->flushToDiskAfterClose(true);
-
+
log.reset();
log2.reset();
@@ -268,24 +268,6 @@ namespace tut {
ensure_equals(data, "localhost");
}
- TEST_METHOD(10) {
- // newTransaction() reestablishes the connection to the logging
- // server if the logging server crashed and was restarted
- SystemTime::forceAll(TODAY);
-
- logger->newTransaction("foobar");
- stopLoggingServer();
- startLoggingServer();
-
- AnalyticsLogPtr log = logger->newTransaction("foobar");
- log->message("hello");
- log->flushToDiskAfterClose(true);
- log.reset();
-
- string data = readAll(loggingDir + "/1/" FOOBAR_LOCALHOST_PREFIX "/requests/2010/01/13/12/log.txt");
- ensure("(1)", data.find("hello\n") != string::npos);
- }
-
TEST_METHOD(11) {
// newTransaction() does not reconnect to the server for a short
// period of time if connecting failed
@@ -305,22 +287,36 @@ namespace tut {
}
TEST_METHOD(12) {
- // continueTransaction() reestablishes the connection to the logging
- // server if the logging server crashed and was restarted
+ // If the logging server crashed and was restarted then
+ // newTransaction() and continueTransaction() print a warning and return
+ // a null log object. One of the next newTransaction()/continueTransaction()
+ // calls will reestablish the connection when the connection timeout
+ // has passed.
SystemTime::forceAll(TODAY);
+ AnalyticsLogPtr log, log2;
- AnalyticsLogPtr log = logger->newTransaction("foobar");
+ log = logger->newTransaction("foobar");
logger2->continueTransaction(log->getTxnId(), "foobar");
stopLoggingServer();
startLoggingServer();
- AnalyticsLogPtr log2 = logger2->continueTransaction(log->getTxnId(), "foobar");
+ log = logger->newTransaction("foobar");
+ ensure("(1)", log->isNull());
+ log2 = logger2->continueTransaction("some-id", "foobar");
+ ensure("(2)", log2->isNull());
+
+ SystemTime::forceAll(TODAY + 60000000);
+ log = logger->newTransaction("foobar");
+ ensure("(3)", !log->isNull());
+ log2 = logger2->continueTransaction(log->getTxnId(), "foobar");
+ ensure("(4)", !log2->isNull());
log2->message("hello");
log2->flushToDiskAfterClose(true);
+ log.reset();
log2.reset();
string data = readAll(loggingDir + "/1/" FOOBAR_LOCALHOST_PREFIX "/requests/2010/01/13/12/log.txt");
- ensure("(1)", data.find("hello\n") != string::npos);
+ ensure("(5)", data.find("hello\n") != string::npos);
}
TEST_METHOD(13) {
Please sign in to comment.
Something went wrong with that request. Please try again.