Permalink
Browse files

Improve Union Station error reporting.

  • Loading branch information...
1 parent cc3510b commit f59e2bbfcfb08a6182bf2eb011131a4ac9d6b842 @FooBarWidget FooBarWidget committed May 2, 2013
Showing with 62 additions and 7 deletions.
  1. +4 −1 ext/common/agents/LoggingAgent/LoggingServer.h
  2. +58 −6 ext/common/agents/LoggingAgent/RemoteSender.h
@@ -1129,7 +1129,10 @@ class LoggingServer: public EventedMessageServer {
TransactionMap::const_iterator end = transactions.end();
stream << "Number of clients : " << getClients().size() << "\n";
- stream << "RemoteSender queue: " << remoteSender.queued() << " items\n";
+ stream << "\n";
+
+ stream << "RemoteSender:\n";
+ remoteSender.inspect(stream);
stream << "\n";
LogSinkCache::const_iterator sit;
@@ -33,12 +33,14 @@
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
#include <oxt/thread.hpp>
#include <string>
#include <list>
#include <Logging.h>
#include <StaticString.h>
+#include <Utils.h>
#include <Utils/BlockingQueue.h>
#include <Utils/SystemTime.h>
#include <Utils/ScopeGuard.h>
@@ -164,6 +166,10 @@ class RemoteSender {
}
curl_slist_free_all(headers);
}
+
// Returns this server's address formatted as "<ip>:<port>", the same
// form that ping() writes to its debug log. inspect() calls this to
// list the servers that are currently available.
+ string name() const {
+ return ip + ":" + toString(port);
+ }
bool ping() {
P_DEBUG("Pinging Union Station gateway " << ip << ":" << port);
@@ -258,12 +264,13 @@ class RemoteSender {
BlockingQueue<Item> queue;
oxt::thread *thr;
+ mutable boost::mutex syncher;
list<ServerPtr> servers;
time_t nextCheckupTime;
+ unsigned int packetsSent, packetsDropped;
void threadMain() {
ScopeGuard guard(boost::bind(&RemoteSender::freeThreadData, this));
- nextCheckupTime = 0;
while (true) {
Item item;
@@ -292,21 +299,22 @@ class RemoteSender {
}
// Returns true while nextCheckupTime still has the value 0 that the
// constructor assigns to it. The read is done while holding syncher.
bool firstStarted() const {
+ lock_guard<boost::mutex> l(syncher);
return nextCheckupTime == 0;
}
void recheckServers() {
- P_DEBUG("Rechecking Union Station gateway servers (" << gatewayAddress << ")...");
+ P_INFO("Rechecking Union Station gateway servers (" << gatewayAddress << ")...");
vector<string> ips;
vector<string>::const_iterator it;
+ list<ServerPtr> servers;
string hostName;
bool someServersAreDown = false;
ips = resolveHostname(gatewayAddress, gatewayPort);
- P_DEBUG(ips.size() << " Union Station gateway servers found");
+ P_INFO(ips.size() << " Union Station gateway servers found");
- servers.clear();
for (it = ips.begin(); it != ips.end(); it++) {
ServerPtr server = make_shared<Server>(*it, gatewayAddress, gatewayPort,
certificate, &proxyInfo);
@@ -316,7 +324,7 @@ class RemoteSender {
someServersAreDown = true;
}
}
- P_DEBUG(servers.size() << " Union Station gateway servers are up");
+ P_INFO(servers.size() << " Union Station gateway servers are up");
if (servers.empty()) {
scheduleNextCheckup(5 * 60);
@@ -325,9 +333,13 @@ class RemoteSender {
} else {
scheduleNextCheckup(3 * 60 * 60);
}
+
+ lock_guard<boost::mutex> l(syncher);
+ this->servers = servers;
}
// Empties the servers list while holding syncher. threadMain() binds
// this method into a ScopeGuard, so it is presumably run when that
// function exits (TODO confirm against ScopeGuard's contract).
void freeThreadData() {
+ lock_guard<boost::mutex> l(syncher);
servers.clear(); // Invoke destructors inside this thread.
}
@@ -345,6 +357,7 @@ class RemoteSender {
}
unsigned int msecUntilNextCheckup() const {
+ lock_guard<boost::mutex> l(syncher);
time_t now = SystemTime::get();
if (now >= nextCheckupTime) {
return 0;
@@ -354,23 +367,31 @@ class RemoteSender {
}
// Returns true once the time reported by SystemTime::get() has reached
// nextCheckupTime. The comparison is done while holding syncher.
bool timeForCheckup() const {
+ lock_guard<boost::mutex> l(syncher);
return SystemTime::get() >= nextCheckupTime;
}
void sendOut(const Item &item) {
+ unique_lock<boost::mutex> l(syncher);
bool sent = false;
bool someServersWentDown = false;
while (!sent && !servers.empty()) {
// Pick first available server and put it on the back of the list
// for round-robin load balancing.
ServerPtr server = servers.front();
- servers.pop_front();
+ l.unlock();
if (server->send(item)) {
+ l.lock();
+ servers.pop_front();
servers.push_back(server);
sent = true;
+ packetsSent++;
} else {
+ l.lock();
+ servers.pop_front();
someServersWentDown = true;
+ packetsDropped++;
}
}
@@ -386,6 +407,13 @@ class RemoteSender {
* effectively dropped until after the next checkup has detected
* servers that are up.
*/
+ if (!sent) {
+ P_WARN("Dropping Union Station packet because no servers are available: "
+ "key=" << item.unionStationKey <<
+ ", node=" << item.nodeName <<
+ ", category=" << item.category <<
+ ", compressedDataSize=" << item.data.size());
+ }
}
bool compress(const StaticString data[], unsigned int count, string &output) {
@@ -443,6 +471,9 @@ class RemoteSender {
throw RuntimeException("Invalid Union Station proxy address \"" +
proxyAddress + "\": " + e.what());
}
+ nextCheckupTime = 0;
+ packetsSent = 0;
+ packetsDropped = 0;
thr = new oxt::thread(
boost::bind(&RemoteSender::threadMain, this),
"RemoteSender thread",
@@ -494,12 +525,33 @@ class RemoteSender {
if (!queue.tryAdd(item)) {
P_WARN("The Union Station gateway isn't responding quickly enough; dropping packet.");
+ lock_guard<boost::mutex> l(syncher);
+ packetsDropped++;
}
}
// Returns the number of items currently held in the queue, as reported
// by queue.size(). Does not take syncher.
unsigned int queued() const {
return queue.size();
}
+
// Writes a multi-line, human-readable status report to `stream`:
//   - the number of servers in the list and each server's name(),
//   - the number of items in the queue,
//   - the packetsSent and packetsDropped counters,
//   - when the next server checkup is due.
// `stream` may be any type that supports operator<< for the values
// written here. syncher is held for the whole report so that the
// servers list, the counters and nextCheckupTime are read together.
+ template<typename Stream>
+ void inspect(Stream &stream) const {
+ lock_guard<boost::mutex> l(syncher);
+ stream << " Available servers (" << servers.size() << "): ";
// NOTE(review): `foreach` is presumably an alias for BOOST_FOREACH
// (boost/foreach.hpp is included). The loop variable is declared by
// value, so each iteration copies the ServerPtr -- confirm whether a
// const reference was intended.
+ foreach (const ServerPtr server, servers) {
+ stream << server->name() << " ";
+ }
+ stream << "\n";
+ stream << " Items in queue: " << queue.size() << "\n";
+ stream << " Packet sent out so far: " << packetsSent << "\n";
+ stream << " Packet dropped out so far: " << packetsDropped << "\n";
+ stream << " Next server checkup time: ";
// A nextCheckupTime of 0 is the initial value set by the constructor,
// i.e. no checkup has been scheduled yet.
+ if (nextCheckupTime == 0) {
+ stream << "not yet scheduled, waiting for first packet\n";
+ } else {
// NOTE(review): the "in ..." wording assumes nextCheckupTime lies in
// the future; verify how distanceOfTimeInWords() renders a time that
// has already passed.
+ stream << "in " << distanceOfTimeInWords(nextCheckupTime) << "\n";
+ }
+ }
};

0 comments on commit f59e2bb

Please sign in to comment.