Permalink
Browse files

Merge branch 'master' of git@github.com:voldemort/voldemort

  • Loading branch information...
2 parents 7033e0f + d6afc5b commit a8c71ea1686ac82fc58490bca00ff8f7f63592b0 @jkreps jkreps committed Jun 20, 2009
View
@@ -34,6 +34,11 @@ do
CLASSPATH=$CLASSPATH:$file
done
+for file in $base_dir/contrib/hadoop-store-builder/lib/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
CLASSPATH=$CLASSPATH:$base_dir/dist/resources
if [ -z $VOLD_OPTS ]; then
@@ -68,13 +68,29 @@ class ClientConfig
int getMaxConnectionsPerNode();
/**
- * Get the maximum number of allowed connections per node
+ * Set the maximum number of allowed connections per node
+ *
+ * @param val the value
+ * @return a pointer to this object useful for chaining
+ */
+ ClientConfig* setMaxConnectionsPerNode(int val);
+
+ /**
+ * Get the maximum number of allowed connections total
*
* @return the value
*/
int getMaxTotalConnections();
/**
+ * Set the maximum number of allowed connections total
+ *
+ * @param val the value
+ * @return a pointer to this object useful for chaining
+ */
+ ClientConfig* setMaxTotalConnections(int val);
+
+ /**
* Get the number of milliseconds to wait for a connection to
* start before timing out.
*
@@ -83,6 +99,15 @@ class ClientConfig
long getConnectionTimeoutMs();
/**
+ * Set the number of milliseconds to wait for a connection to
+ * start before timing out.
+ *
+ * @param val the value
+ * @return a pointer to this object useful for chaining
+ */
+ ClientConfig* setConnectionTimeoutMs(long val);
+
+ /**
* Get the number of milliseconds to wait for on reading or
* writing to a socket before timing out.
*
@@ -91,19 +116,30 @@ class ClientConfig
long getSocketTimeoutMs();
/**
- * Get the maximum number of client threads
+ * Set the number of milliseconds to wait for on reading or
+ * writing to a socket before timing out.
*
- * @return the value
+ * @param val the value
+ * @return a pointer to this object useful for chaining
*/
- int getMaxThreads();
+ ClientConfig* setSocketTimeoutMs(long val);
/**
* Get the number of milliseconds a node will be banned before we
* try again to connect to it following a failure.
*
* @return the value
*/
- int getNodeBannageMs();
+ long getNodeBannageMs();
+
+ /**
+ * Set the number of milliseconds a node will be banned before we
+ * try again to connect to it following a failure.
+ *
+ * @param val the value
+ * @return a pointer to this object useful for chaining
+ */
+ ClientConfig* setNodeBannageMs(long val);
private:
/** Internal implementation details for ClientConfig */
@@ -25,9 +25,6 @@ class ClientConfigImpl {
public:
int maxConnectionsPerNode;
int maxTotalConnections;
- // int maxThreads;
- // int maxQueuedRequests;
- // long threadIdleMs;
long connectionTimeoutMs;
long socketTimeoutMs;
long routingTimeoutMs;
@@ -39,9 +36,6 @@ ClientConfig::ClientConfig() {
pimpl_ = new ClientConfigImpl();
pimpl_->maxConnectionsPerNode = 0;
pimpl_->maxTotalConnections = 500;
- // pimpl_->maxThreads = 5;
- // pimpl_->maxQueuedRequests = 500;
- // pimpl_->threadIdleMs = 100000;
pimpl_->connectionTimeoutMs = 500;
pimpl_->socketTimeoutMs = 5000;
pimpl_->routingTimeoutMs = 15000;
@@ -70,26 +64,45 @@ int ClientConfig::getMaxConnectionsPerNode() {
return pimpl_->maxConnectionsPerNode;
}
+ClientConfig* ClientConfig::setMaxConnectionsPerNode(int val) {
+ pimpl_->maxConnectionsPerNode = val;
+ return this;
+}
+
int ClientConfig::getMaxTotalConnections() {
return pimpl_->maxTotalConnections;
}
+ClientConfig* ClientConfig::setMaxTotalConnections(int val) {
+ pimpl_->maxTotalConnections = val;
+ return this;
+}
+
long ClientConfig::getConnectionTimeoutMs() {
return pimpl_->connectionTimeoutMs;
}
+ClientConfig* ClientConfig::setConnectionTimeoutMs(long val) {
+ pimpl_->connectionTimeoutMs = val;
+ return this;
+}
+
long ClientConfig::getSocketTimeoutMs() {
return pimpl_->socketTimeoutMs;
}
-#if 0
-int ClientConfig::getMaxThreads() {
- return pimpl_->maxThreads;
+ClientConfig* ClientConfig::setSocketTimeoutMs(long val) {
+ pimpl_->socketTimeoutMs = val;
+ return this;
}
-#endif
-int ClientConfig::getNodeBannageMs() {
+long ClientConfig::getNodeBannageMs() {
return pimpl_->nodeBannageMs;
}
+ClientConfig* ClientConfig::setNodeBannageMs(long val) {
+ pimpl_->nodeBannageMs = val;
+ return this;
+}
+
} /* namespace Voldemort */
@@ -40,6 +40,15 @@ Connection::Connection(string& hostName,
: config(conf), host(hostName), port(portNum), io_service(),
resolver(io_service), timer(io_service), socket(io_service),
connbuf(NULL), connstream(NULL), active(false) {
+}
+
+Connection::~Connection() {
+ close();
+ if (connstream) delete connstream;
+ if (connbuf) delete connbuf;
+}
+
+void Connection::connect() {
// Start an asynchronous resolve to translate the server and service names
// into a list of endpoints.
tcp::resolver::query query(host, port);
@@ -48,13 +57,7 @@ Connection::Connection(string& hostName,
asio::placeholders::error,
asio::placeholders::iterator));
- wait_for_operation(conf->getConnectionTimeoutMs());
-}
-
-Connection::~Connection() {
- close();
- if (connstream) delete connstream;
- if (connbuf) delete connbuf;
+ wait_for_operation(config->getConnectionTimeoutMs());
}
void Connection::check_error(const system::error_code& err) {
@@ -64,10 +67,6 @@ void Connection::check_error(const system::error_code& err) {
}
}
-static void set_result(optional<system::error_code>* a, system::error_code b) {
- a->reset(b);
-}
-
void Connection::wait_for_operation(long millis)
{
op_timeout = false;
@@ -161,12 +160,14 @@ size_t Connection::read_some(char* buffer, size_t bufferLen) {
return (size_t)bytes;
}
+#if 0
void Connection::handle_data_op(const system::error_code& err,
size_t transferred) {
check_error(err);
bytesTransferred = transferred;
op_complete = true;
}
+#endif
size_t Connection::write(const char* buffer, size_t bufferLen) {
#if 0
@@ -45,52 +45,66 @@ shared_ptr<Connection>& ConnectionPool::checkout(string& host, int port) {
stringstream hostKey;
hostKey << host << ":" << port;
- host_entry_ptr& hep = pool[hostKey.str()];
+ shared_ptr<Connection>* connRet = NULL;
{
- lock_guard<mutex> guard(poolMutex);
+ // Ensure that we don't have too many connections in the pool
+ unique_lock<mutex> lock(poolMutex);
+
+ host_entry_ptr& hep = pool[hostKey.str()];
+ int& host_ready_count = ready_count[hostKey.str()];
if (!hep.get()) {
hep = host_entry_ptr(new host_entry());
+ host_ready_count = 0;
}
- // Search for an active connection to try
- host_entry::iterator heit;
- for (heit = hep->begin(); heit != hep->end(); ++heit) {
- if (heit->second.first == STATUS_READY) {
- heit->second.first = STATUS_CHECKED_OUT;
- return heit->second.second;
- }
+ while ((host_ready_count == 0) &&
+ (((clientConfig->getMaxConnectionsPerNode() > 0) &&
+ (hep->size() >= (size_t)clientConfig->getMaxConnectionsPerNode())) ||
+ ((clientConfig->getMaxTotalConnections() > 0) &&
+ (totalConnections >= clientConfig->getMaxTotalConnections())))) {
+ checkinCond.wait(lock);
}
- }
- // Fall back to creating a new connection
- {
- // Ensure that we don't have too many connections in the pool
- unique_lock<mutex> lock(poolMutex);
-
- while (((clientConfig->getMaxConnectionsPerNode() > 0) &&
- (hep->size() >= (size_t)clientConfig->getMaxConnectionsPerNode())) ||
- ((clientConfig->getMaxTotalConnections() > 0) &&
- (totalConnections >= clientConfig->getMaxTotalConnections()))) {
- checkinCond.wait(lock);
+ // Search for a ready connection to try
+ if (host_ready_count > 0) {
+ host_entry::iterator heit;
+ for (heit = hep->begin(); heit != hep->end(); ++heit) {
+ if (heit->second.first == STATUS_READY) {
+ heit->second.first = STATUS_CHECKED_OUT;
+ host_ready_count -= 1;
+ return heit->second.second;
+ }
+ }
}
+
+ // Otherwise fall back to creating a new connection
+
+ // Reserve a spot in the pool
+ stringstream portStr;
+ portStr << port;
+ std::string portString = portStr.str();
+ //cout << "Creating connection " << hostKey.str() << endl;
+ shared_ptr<Connection> conn(new Connection(host, portString,
+ clientConfig));
+ (*hep)[(size_t)conn.get()] = make_pair((int)STATUS_UNINIT, conn);
+ totalConnections += 1;
+ connRet = &(((*hep)[(size_t)conn.get()]).second);
}
- stringstream portStr;
- portStr << port;
- std::string portString = portStr.str();
- // cout << "Allocating new Connection " << hostKey.str() << endl;
+ // Connect to the remote host
+ try {
+ (*connRet)->connect();
+ } catch (...) {
+ checkin(*connRet);
+ throw;
+ }
- shared_ptr<Connection> conn(new Connection(host, portString,
- clientConfig));
- shared_ptr<Connection>* connRet = NULL;
{
- lock_guard<mutex> guard(poolMutex);
-
- (*hep)[(size_t)conn.get()] = make_pair((int)STATUS_CHECKED_OUT, conn);
- totalConnections += 1;
- connRet = &(((*hep)[(size_t)conn.get()]).second);
+ lock_guard<mutex> guard(poolMutex);
+ (*pool[hostKey.str()])[(size_t)connRet->get()].first =
+ (int)STATUS_CHECKED_OUT;
}
return *connRet;
@@ -113,7 +127,10 @@ void ConnectionPool::checkin(shared_ptr<Connection>& conn) {
if (ce.first != STATUS_CHECKED_OUT || !conn->is_active()) {
/* Something horrible has happened to our connection */
hep->erase((size_t)conn.get());
+ totalConnections -= 1;
+ //cout << "Destroying connection " << hostKey << endl;
} else {
+ ready_count[hostKey] += 1;
ce.first = STATUS_READY;
}
}
View
@@ -43,7 +43,7 @@ Node::Node()
}
void Node::setAvailable(bool avail) {
- isAvailable_ = true;
+ isAvailable_ = avail;
struct timeval tv;
gettimeofday(&tv, NULL);
@@ -59,7 +59,7 @@ uint64_t Node::getMsSinceLastChecked() {
bool Node::isAvailable(uint64_t timeout) {
return (isAvailable_ ||
- getMsSinceLastChecked() > timeout);
+ (getMsSinceLastChecked() > timeout));
}
std::ostream& operator<<(std::ostream& output, const Node& node) {
@@ -61,6 +61,8 @@ static void setupVectorClock(voldemort::VectorClock* vvc, const VectorClock* vc)
entry->set_node_id((*it).first);
entry->set_version((*it).second);
}
+
+ vvc->set_timestamp(vc->getTimestamp());
}
static VectorClock* readVectorClock(const voldemort::VectorClock* vvc) {
@@ -117,7 +117,7 @@ const std::list<std::pair<short, uint64_t> >* VectorClock::getEntries() const {
return versions;
}
-uint64_t VectorClock::getTimestamp() {
+uint64_t VectorClock::getTimestamp() const {
return timestamp;
}
@@ -55,6 +55,11 @@ class Connection {
~Connection();
/**
+ * Connect to the remote host
+ */
+ void connect();
+
+ /**
* Close the underlying connection. After calling this the
* Connection object cannot be used.
*/
@@ -117,8 +122,10 @@ class Connection {
void handle_resolve(const system::error_code& err,
tcp::resolver::iterator endpoint_iterator);
+#if 0
void handle_data_op(const system::error_code& err,
size_t transferred);
+#endif
void check_error(const system::error_code& err);
shared_ptr<ClientConfig> config;
Oops, something went wrong.

0 comments on commit a8c71ea

Please sign in to comment.