Skip to content

Commit

Permalink
Extend env_default.h and env_facebook.h to respect implementation dif…
Browse files Browse the repository at this point in the history
…ferences

between facebook and the open-source community.

Summary:
1. put configuration classes in env_*.h in scribe namespace;
2. add scribe::concurrency for concurrency mechanisms;
3. make scribe_server.cpp: scribeHandler::scribeHandlerLock configurable;
4. add scribe::clock for time utilities;

Test Plan:
just compile it

DiffCamp Revision: 118366
Reviewed By: groys
Commenters: jsong
CC: agiardullo, zshao, jsong, groys, yliang, datafreeway-dev@lists
Revert Plan:
OK

git-svn-id: svn+ssh://tubbs/svnapps/fbomb/branches/scribe-os/fbcode/scribe@28661 2248de34-8caa-4a3c-bc55-5e52d9d7b73a
  • Loading branch information
yliang authored and groys committed Jun 7, 2010
1 parent b07d15e commit 2123b73
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 58 deletions.
69 changes: 35 additions & 34 deletions src/dynamic_bucket_updater.cpp
Expand Up @@ -50,30 +50,30 @@ bool DynamicBucketUpdater::getHost(const string& category,
pconf->getInt("timeout", timeout);

if (!service.empty()) {
success = DynamicBucketUpdater::getHost(g_Handler.get(),
category,
ttl,
(uint32_t)bid,
host,
port,
service,
serviceOptions,
timeout, timeout, timeout);
success = DynamicBucketUpdater::getHost(g_Handler.get(),
category,
ttl,
(uint32_t)bid,
host,
port,
service,
serviceOptions,
timeout, timeout, timeout);
} else {
success = DynamicBucketUpdater::getHost(g_Handler.get(),
category,
ttl,
(uint32_t)bid,
host,
port,
updaterHost,
atoi(updaterPort.c_str()),
timeout, timeout, timeout);
category,
ttl,
(uint32_t)bid,
host,
port,
updaterHost,
atoi(updaterPort.c_str()),
timeout, timeout, timeout);
}

if (!success) {
LOG_OPER("[%s] dynamic bucket updater failed: bid=%ld",
category.c_str(), bid);
category.c_str(), bid);
}
return success;
}
Expand Down Expand Up @@ -171,22 +171,23 @@ bool DynamicBucketUpdater::getHost(facebook::fb303::FacebookBase *fbBase,
uint32_t sendTimeout,
uint32_t recvTimeout) {
server_vector_t servers;
bool success = network_config::getService(serviceName,
serviceOptions,
servers);

// Cannot open if we couldn't find any servers
if (!success || servers.empty()) {
LOG_OPER("[%s] Failed to get servers from Service [%s] for dynamic bucket updater",
category.c_str(), serviceName.c_str());

return false;
}

// randomly pick one from the service
int which = rand() % servers.size();
string updateHost = servers[which].first;
uint32_t updatePort = servers[which].second;
bool success = scribe::network_config::getService(serviceName,
serviceOptions,
servers);

// Cannot open if we couldn't find any servers
if (!success || servers.empty()) {
LOG_OPER("[%s] Failed to get servers from Service [%s] "
"for dynamic bucket updater",
category.c_str(), serviceName.c_str());

return false;
}

// randomly pick one from the service
int which = rand() % servers.size();
string updateHost = servers[which].first;
uint32_t updatePort = servers[which].second;
return DynamicBucketUpdater::getHost(fbBase, category, ttl, bid,
host, port,
updateHost, updatePort,
Expand Down
40 changes: 40 additions & 0 deletions src/env_default.h
Expand Up @@ -42,6 +42,8 @@
fprintf(stderr,"[%s] " #format_string " \n", dbgtime,##__VA_ARGS__); \
}

namespace scribe {

/*
* Network based configuration and directory service
*/
Expand All @@ -57,6 +59,42 @@ class network_config {
}
};

/*
* Concurrency mechanisms
*/

class concurrency {
public:
// returns a new instance of read/write mutex.
// you can choose different implementations based on your needs.
static boost::shared_ptr<apache::thrift::concurrency::ReadWriteMutex>
createReadWriteMutex() {
using apache::thrift::concurrency::ReadWriteMutex;

return boost::shared_ptr<ReadWriteMutex>(new ReadWriteMutex());
}
};

/*
* Time functions
*/

class clock {
public:
static unsigned long nowInMsec() {
// There is a minor race condition between the 2 calls below,
// but the chance is really small.

// Get current time in timeval
struct timeval tv;
gettimeofday(&tv, NULL);

// Get current time in sec
time_t sec = time(NULL);

return ((unsigned long)sec) * 1000 + (tv.tv_usec / 1000);
}
};

/*
* Hash functions
Expand Down Expand Up @@ -86,4 +124,6 @@ class strhash {
}
};

} // !namespace scribe

#endif // SCRIBE_ENV
17 changes: 9 additions & 8 deletions src/scribe_server.cpp
Expand Up @@ -169,6 +169,7 @@ scribeHandler::scribeHandler(unsigned long int server_port, const std::string& c
maxQueueSize(DEFAULT_MAX_QUEUE_SIZE),
newThreadPerCategory(true) {
time(&lastMsgTime);
scribeHandlerLock = scribe::concurrency::createReadWriteMutex();
}

scribeHandler::~scribeHandler() {
Expand All @@ -179,7 +180,7 @@ scribeHandler::~scribeHandler() {
// Returns the handler status, but overwrites it with WARNING if it's
// ALIVE and at least one store has a nonempty status.
fb_status scribeHandler::getStatus() {
RWGuard monitor(scribeHandlerLock);
RWGuard monitor(*scribeHandlerLock);
Guard status_monitor(statusLock);

fb_status return_status(status);
Expand Down Expand Up @@ -209,7 +210,7 @@ void scribeHandler::setStatus(fb_status new_status) {
// Returns the handler status details if non-empty,
// otherwise the first non-empty store status found
void scribeHandler::getStatusDetails(std::string& _return) {
RWGuard monitor(scribeHandlerLock);
RWGuard monitor(*scribeHandlerLock);
Guard status_monitor(statusLock);

_return = statusDetails;
Expand Down Expand Up @@ -427,7 +428,7 @@ void scribeHandler::addMessage(
ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
ResultCode result = TRY_LATER;

scribeHandlerLock.acquireRead();
scribeHandlerLock->acquireRead();
if(status == STOPPING) {
result = TRY_LATER;
goto end;
Expand Down Expand Up @@ -460,8 +461,8 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
// Try creating a new store for this category if we didn't find one
if (store_list == NULL) {
// Need write lock to create a new category
scribeHandlerLock.release();
scribeHandlerLock.acquireWrite();
scribeHandlerLock->release();
scribeHandlerLock->acquireWrite();

// This may cause some duplicate messages if some messages in this batch
// were already added to queues
Expand Down Expand Up @@ -492,7 +493,7 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
result = OK;

end:
scribeHandlerLock.release();
scribeHandlerLock->release();
return result;
}

Expand Down Expand Up @@ -542,14 +543,14 @@ void scribeHandler::stopStores() {
}

void scribeHandler::shutdown() {
RWGuard monitor(scribeHandlerLock, true);
RWGuard monitor(*scribeHandlerLock, true);
stopStores();
// calling stop to allow thrift to clean up client states and exit
server->stop();
}

void scribeHandler::reinitialize() {
RWGuard monitor(scribeHandlerLock, true);
RWGuard monitor(*scribeHandlerLock, true);

// reinitialize() will re-read the config file and re-configure the stores.
// This is done without shutting down the Thrift server, so this will not
Expand Down
3 changes: 2 additions & 1 deletion src/scribe_server.h
Expand Up @@ -105,7 +105,8 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
* A single mutex is fine since it only needs to be locked in write mode
* during start/stop/reinitialize or when we need to create a new category.
*/
apache::thrift::concurrency::ReadWriteMutex scribeHandlerLock;
boost::shared_ptr<apache::thrift::concurrency::ReadWriteMutex>
scribeHandlerLock;

// disallow empty construction, copy, and assignment
scribeHandler();
Expand Down
27 changes: 12 additions & 15 deletions src/store.cpp
Expand Up @@ -1771,8 +1771,8 @@ void NetworkStore::configure(pStoreConf configuration, pStoreConf parent) {
configmod = NULL;
} else {
// set remote host port
string host;
uint32_t port;
string host;
uint32_t port;
if (configmod->getHostFunc(categoryHandled, storeConf.get(), host, port)) {
remoteHost = host;
remotePort = port;
Expand All @@ -1797,8 +1797,8 @@ void NetworkStore::periodicCheck() {
// if it is different from the current configuration
// then close and open again
LOG_OPER("[%s] dynamic configred network store destination changed. old value:<%s:%lu>, new value:<%s:%lu>",
categoryHandled.c_str(), remoteHost.c_str(), remotePort,
host.c_str(), (long unsigned)port);
categoryHandled.c_str(), remoteHost.c_str(), remotePort,
host.c_str(), (long unsigned)port);
remoteHost = host;
remotePort = port;
close();
Expand All @@ -1821,8 +1821,8 @@ bool NetworkStore::open() {
if (lastServiceCheck <= (time_t) (now - serviceCacheTimeout)) {
lastServiceCheck = now;

success =
network_config::getService(serviceName, serviceOptions, servers);
success = scribe::network_config::getService(serviceName, serviceOptions,
servers);
}

// Cannot open if we couldn't find any servers
Expand Down Expand Up @@ -2088,21 +2088,18 @@ void BucketStore::createBuckets(pStoreConf configuration) {
unsigned long i;

if (configuration->getString("bucket_subdir", tmp_string)) {
error_msg =
"cannot have bucket_subdir when defining multiple buckets";
goto handle_error;
error_msg = "cannot have bucket_subdir when defining multiple buckets";
goto handle_error;
}

if (configuration->getString("bucket_offset", tmp_string)) {
error_msg =
"cannot have bucket_offset when defining multiple buckets";
goto handle_error;
error_msg = "cannot have bucket_offset when defining multiple buckets";
goto handle_error;
}

if (configuration->getString("failure_bucket", tmp_string)) {
error_msg =
"cannot have failure_bucket when defining multiple buckets";
goto handle_error;
error_msg = "cannot have failure_bucket when defining multiple buckets";
goto handle_error;
}

// Configure stores named 'bucket0, bucket1, bucket2, ... bucket{numBuckets}
Expand Down

0 comments on commit 2123b73

Please sign in to comment.