Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Implement basic round robin routing strategy for client as well as so…

…me additional cleanups
  • Loading branch information...
commit e647ee14dfc7d994453af704d925254c988b2c74 1 parent 7abc530
@readams readams authored
View
8 bindings/cpp/include/voldemort/ClientConfig.h
@@ -96,6 +96,14 @@ class ClientConfig
* @return the value
*/
int getMaxThreads();
+
+ /**
+ * Get the number of milliseconds a node will be banned before we
+ * try again to connect to it following a failure.
+ *
+ * @return the value
+ */
+ int getNodeBannageMs();
private:
/** Internal implementation details for ClientConfig */
View
24 bindings/cpp/src/ClientConfig.cpp
@@ -25,14 +25,13 @@ class ClientConfigImpl {
public:
int maxConnectionsPerNode;
int maxTotalConnections;
- int maxThreads;
- int maxQueuedRequests;
- long threadIdleMs;
+ // int maxThreads;
+ // int maxQueuedRequests;
+ // long threadIdleMs;
long connectionTimeoutMs;
long socketTimeoutMs;
long routingTimeoutMs;
- long defaultNodeBannageMs;
- int socketBufferSize;
+ long nodeBannageMs;
std::list<std::string>* bootstrapUrls;
};
@@ -40,14 +39,13 @@ ClientConfig::ClientConfig() {
pimpl_ = new ClientConfigImpl();
pimpl_->maxConnectionsPerNode = 0;
pimpl_->maxTotalConnections = 500;
- pimpl_->maxThreads = 5;
- pimpl_->maxQueuedRequests = 500;
- pimpl_->threadIdleMs = 100000;
+ // pimpl_->maxThreads = 5;
+ // pimpl_->maxQueuedRequests = 500;
+ // pimpl_->threadIdleMs = 100000;
pimpl_->connectionTimeoutMs = 500;
pimpl_->socketTimeoutMs = 5000;
pimpl_->routingTimeoutMs = 15000;
- pimpl_->defaultNodeBannageMs = 30000;
- pimpl_->socketBufferSize = 64 * 1024;
+ pimpl_->nodeBannageMs = 30000;
pimpl_->bootstrapUrls = NULL;
}
@@ -84,8 +82,14 @@ long ClientConfig::getSocketTimeoutMs() {
return pimpl_->socketTimeoutMs;
}
+#if 0
int ClientConfig::getMaxThreads() {
return pimpl_->maxThreads;
}
+#endif
+
+int ClientConfig::getNodeBannageMs() {
+ return pimpl_->nodeBannageMs;
+}
} /* namespace Voldemort */
View
27 bindings/cpp/src/Cluster.cpp
@@ -39,22 +39,6 @@ enum STATE {
STATE_PARTITIONS
};
-Node::Node(int id,
- std::string& host,
- int httpPort,
- int socketPort,
- int adminPort,
- shared_ptr<std::list<int> >& partitions)
- : id_(id), host_(host), httpPort_(httpPort), socketPort_(socketPort),
- adminPort_(adminPort), partitions_(partitions) {
-
-}
-
-Node::Node()
- : id_(-1), httpPort_(0), socketPort_(0),
- adminPort_(0), partitions_(new std::list<int>) {
-}
-
void Cluster::startElement(void* data, const XML_Char*el, const XML_Char **attr) {
Cluster* cl = (Cluster*) data;
@@ -202,4 +186,15 @@ boost::shared_ptr<Node>& Cluster::getNodeById(int nodeId) {
throw VoldemortException("Invalid node ID: " + nodeId);
}
+std::ostream& operator<<(std::ostream& output, const Cluster& cluster) {
+ output << "Cluster('" << cluster.name << "', [";
+ std::map<int, boost::shared_ptr<Node> >::const_iterator it;
+ for (it = cluster.nodesById.begin(); it != cluster.nodesById.end(); ++it) {
+ if (it != cluster.nodesById.begin()) output << ", ";
+ output << *(it->second);
+ }
+ output << "])";
+ return output;
+}
+
} /* namespace Voldemort */
View
5 bindings/cpp/src/Makefile.am
@@ -30,11 +30,14 @@ libvoldemort_la_SOURCES = \
include/ProtocolBuffersRequestFormat.h \
include/Connection.h \
include/ConnectionPool.h \
+ include/Node.h \
include/Cluster.h \
include/RoutedStore.h \
include/InconsistencyResolvingStore.h \
include/VectorClockInconsistencyResolver.h \
include/TimeBasedInconsistencyResolver.h \
+ include/RoutingStrategy.h \
+ include/RoundRobinRoutingStrategy.h \
DefaultStoreClient.cpp \
VersionedValue.cpp \
VectorClock.cpp \
@@ -46,11 +49,13 @@ libvoldemort_la_SOURCES = \
ProtocolBuffersRequestFormat.cpp \
Connection.cpp \
ConnectionPool.cpp \
+ Node.cpp \
Cluster.cpp \
RoutedStore.cpp \
InconsistencyResolvingStore.cpp \
VectorClockInconsistencyResolver.cpp \
TimeBasedInconsistencyResolver.cpp \
+ RoundRobinRoutingStrategy.cpp \
voldemort-client.pb.cc \
voldemort-client.pb.h
View
39 bindings/cpp/src/Node.cpp
@@ -1,6 +1,6 @@
/* -*- C++ -*-; c-basic-offset: 4; indent-tabs-mode: nil */
/*
- * Implementation for Cluster class.
+ * Implementation for Node class.
*
* Copyright (c) 2009 Webroot Software, Inc.
*
@@ -17,10 +17,8 @@
* the License.
*/
-#include "Cluster.h"
-#include <voldemort/VoldemortException.h>
-#include <iostream>
-#include <string.h>
+#include "Node.h"
+#include <sys/time.h>
namespace Voldemort {
@@ -33,13 +31,40 @@ Node::Node(int id,
int adminPort,
shared_ptr<std::list<int> >& partitions)
: id_(id), host_(host), httpPort_(httpPort), socketPort_(socketPort),
- adminPort_(adminPort), partitions_(partitions) {
+ adminPort_(adminPort), partitions_(partitions),
+ isAvailable_(true), lastChecked_(0) {
}
Node::Node()
: id_(-1), httpPort_(0), socketPort_(0),
- adminPort_(0), partitions_(new std::list<int>) {
+ adminPort_(0), partitions_(new std::list<int>),
+ isAvailable_(true), lastChecked_(0) {
+}
+
+void Node::setAvailable(bool avail) {
+ isAvailable_ = true;
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ lastChecked_ = (uint64_t)tv.tv_sec*1000 + (uint64_t)tv.tv_usec/1000;
+}
+
+uint64_t Node::getMsSinceLastChecked() {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ uint64_t time = (uint64_t)tv.tv_sec*1000 + (uint64_t)tv.tv_usec/1000;
+ return (time - lastChecked_);
+}
+
+bool Node::isAvailable(uint64_t timeout) {
+ return (isAvailable_ ||
+ getMsSinceLastChecked() > timeout);
+}
+
+std::ostream& operator<<(std::ostream& output, const Node& node) {
+ output << "Node" << node.id_;
+ return output;
}
} /* namespace Voldemort */
View
4 bindings/cpp/src/ProtocolBuffersRequestFormat.cpp
@@ -72,9 +72,9 @@ static VectorClock* readVectorClock(const voldemort::VectorClock* vvc) {
return new VectorClock(&entries, (uint64_t)vvc->timestamp());
}
+/* TODO map error types to various exception objects derived
+ from VoldemortException */
static void throwException(const voldemort::Error& error) {
- /* XXX - TODO map error types to various exception objects derived
- from VoldemortException */
switch(error.error_code()) {
case 2:
throw InsufficientOperationalNodesException(error.error_message());
View
49 bindings/cpp/src/RoundRobinRoutingStrategy.cpp
@@ -0,0 +1,49 @@
+/* -*- C++ -*-; c-basic-offset: 4; indent-tabs-mode: nil */
+/*
+ * Implementation for SocketStoreClientFactory class.
+ *
+ * Copyright (c) 2009 Webroot Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+#include "RoundRobinRoutingStrategy.h"
+
+namespace Voldemort {
+
+shared_ptr<std::list<shared_ptr<Node> > >
+RoundRobinRoutingStrategy::routeRequest(const std::string& key) {
+ shared_ptr<std::list<shared_ptr<Node> > >
+ prefList(new std::list<shared_ptr<Node> >());
+
+ int clusterSize = cluster->getNodeMap()->size();
+ int i = 0;
+ while (i < clusterSize) {
+ if (startIterator == cluster->getNodeMap()->end()) {
+ startIterator = cluster->getNodeMap()->begin();
+ }
+
+ Node* node = startIterator->second.get();
+ if (node->isAvailable(clientConfig->getNodeBannageMs())) {
+ prefList->push_back(startIterator->second);
+ ++startIterator;
+ break;
+ }
+ ++startIterator;
+ }
+ return prefList;
+}
+
+using namespace boost;
+
+} /* namespace Voldemort */
View
151 bindings/cpp/src/RoutedStore.cpp
@@ -18,12 +18,14 @@
*/
#include "RoutedStore.h"
-#include "voldemort/VoldemortException.h"
-
+#include "voldemort/UnreachableStoreException.h"
+#include "voldemort/InsufficientOperationalNodesException.h"
+#include <iostream>
namespace Voldemort {
using namespace boost;
+using namespace std;
static const bool REPAIR_READS = true;
@@ -31,8 +33,10 @@ RoutedStore::RoutedStore(const std::string& storeName,
shared_ptr<ClientConfig>& config,
shared_ptr<Cluster>& clust,
shared_ptr<std::map<int, shared_ptr<Store> > >& map,
- shared_ptr<threadpool::pool> pool)
- : name(storeName), cluster(clust), clusterMap(map), threadPool(pool) {
+ shared_ptr<threadpool::pool>& pool,
+ shared_ptr<RoutingStrategy>& routingStrat)
+ : name(storeName), cluster(clust), clusterMap(map), threadPool(pool),
+ routingStrategy(routingStrat) {
}
@@ -40,22 +44,143 @@ RoutedStore::~RoutedStore() {
close();
}
+static bool doGetFromStore(const std::string& key,
+ std::list<VersionedValue>** result,
+ Node* node,
+ Store* store) {
+ *result = NULL;
+ try {
+ *result = store->get(key);
+ node->setAvailable(true);
+ return true;
+ } catch (UnreachableStoreException& e) {
+ node->setAvailable(false);
+ }
+ return false;
+}
+
std::list<VersionedValue>* RoutedStore::get(const std::string& key) {
- std::map<int, shared_ptr<Store> >::const_iterator it =
- clusterMap->begin();
- return it->second->get(key);
+ std::list<VersionedValue>* result = NULL;
+ bool status = false;
+ {
+ /* Start by routing to the preferred list one at a time. */
+ RoutingStrategy::prefListp prefList = routingStrategy->routeRequest(key);
+ RoutingStrategy::prefList::const_iterator it;
+ for (it = prefList->begin(); it != prefList->end(); ++it) {
+ status = doGetFromStore(key, &result,
+ it->get(),
+ (*clusterMap)[(*it)->getId()].get());
+ if (status) return result;
+ }
+ }
+ {
+ /* If that fails just try every node in the cluster */
+ const Cluster::nodeMap* nm = cluster->getNodeMap();
+ Cluster::nodeMap::const_iterator it;
+ for (it = nm->begin(); it != nm->end(); ++it) {
+ if (it->second.get()->isAvailable(clientConfig->getNodeBannageMs())) {
+ status = doGetFromStore(key, &result,
+ it->second.get(),
+ (*clusterMap)[it->first].get());
+ }
+ if (status) return result;
+ }
+ }
+
+ throw InsufficientOperationalNodesException("Could not reach any "
+ "node for operation");
+}
+
+static bool doPutFromStore(const std::string& key,
+ const VersionedValue& value,
+ Node* node,
+ Store* store) {
+ try {
+ store->put(key, value);
+ node->setAvailable(true);
+ return true;
+ } catch (UnreachableStoreException& e) {
+ node->setAvailable(false);
+ }
+ return false;
}
void RoutedStore::put(const std::string& key, const VersionedValue& value) {
- std::map<int, shared_ptr<Store> >::const_iterator it =
- clusterMap->begin();
- return it->second->put(key, value);
+ bool status = false;
+ {
+ /* Start by routing to the preferred list one at a time. */
+ RoutingStrategy::prefListp prefList = routingStrategy->routeRequest(key);
+ RoutingStrategy::prefList::const_iterator it;
+ for (it = prefList->begin(); it != prefList->end(); ++it) {
+ status = doPutFromStore(key, value,
+ it->get(),
+ (*clusterMap)[(*it)->getId()].get());
+ if (status) return;
+ }
+ }
+ {
+ /* If that fails just try every node in the cluster */
+ const Cluster::nodeMap* nm = cluster->getNodeMap();
+ Cluster::nodeMap::const_iterator it;
+ for (it = nm->begin(); it != nm->end(); ++it) {
+ if (it->second.get()->isAvailable(clientConfig->getNodeBannageMs())) {
+ status = doPutFromStore(key, value,
+ it->second.get(),
+ (*clusterMap)[it->first].get());
+ }
+ if (status) return;
+ }
+ }
+
+ throw InsufficientOperationalNodesException("Could not reach any "
+ "node for operation");
+}
+
+static bool doDeleteFromStore(const std::string& key,
+ const Version& version,
+ bool* result,
+ Node* node,
+ Store* store) {
+ try {
+ *result = store->deleteKey(key, version);
+ node->setAvailable(true);
+ return true;
+ } catch (UnreachableStoreException& e) {
+ node->setAvailable(false);
+ }
+ return false;
}
bool RoutedStore::deleteKey(const std::string& key, const Version& version) {
- std::map<int, shared_ptr<Store> >::const_iterator it =
- clusterMap->begin();
- return it->second->deleteKey(key, version);
+ bool status = false;
+ bool result = false;
+ {
+ /* Start by routing to the preferred list one at a time. */
+ RoutingStrategy::prefListp prefList = routingStrategy->routeRequest(key);
+ RoutingStrategy::prefList::const_iterator it;
+ for (it = prefList->begin(); it != prefList->end(); ++it) {
+ status = doDeleteFromStore(key, version, &result,
+ it->get(),
+ (*clusterMap)[(*it)->getId()].get());
+ if (status) return result;
+ }
+ }
+ {
+ /* If that fails just try every node in the cluster */
+ const Cluster::nodeMap* nm = cluster->getNodeMap();
+ Cluster::nodeMap::const_iterator it;
+ for (it = nm->begin(); it != nm->end(); ++it) {
+ if (it->second.get()->isAvailable(clientConfig->getNodeBannageMs())) {
+ status = doDeleteFromStore(key, version, &result,
+ it->second.get(),
+ (*clusterMap)[it->first].get());
+ }
+ if (status) return result;
+ }
+ }
+
+ throw InsufficientOperationalNodesException("Could not reach any "
+ "node for operation");
}
const std::string* RoutedStore::getName() {
View
78 bindings/cpp/src/SocketStore.cpp
@@ -19,7 +19,7 @@
#include "SocketStore.h"
#include "ConnectionPool.h"
-#include "voldemort/VoldemortException.h"
+#include "voldemort/UnreachableStoreException.h"
#include <sstream>
#include <boost/bind.hpp>
@@ -47,43 +47,57 @@ SocketStore::~SocketStore() {
}
std::list<VersionedValue>* SocketStore::get(const std::string& key) {
- ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
- std::iostream& sstream = conn->get_io_stream();
-
- request->writeGetRequest(&sstream,
- &name,
- &key,
- reroute);
- sstream.flush();
- return request->readGetResponse(&sstream);
+ try {
+ ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
+ std::iostream& sstream = conn->get_io_stream();
+
+ request->writeGetRequest(&sstream,
+ &name,
+ &key,
+ reroute);
+ sstream.flush();
+ return request->readGetResponse(&sstream);
+ } catch (VoldemortException& e) {
+ throw UnreachableStoreException(std::string("Failure to get ") + host +
+ std::string(": ") + e.what());
+ }
}
void SocketStore::put(const std::string& key, const VersionedValue& value) {
- ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
- std::iostream& sstream = conn->get_io_stream();
-
- request->writePutRequest(&sstream,
- &name,
- &key,
- value.getValue(),
- dynamic_cast<const VectorClock*>(value.getVersion()),
- reroute);
- sstream.flush();
- request->readPutResponse(&sstream);
-
+ try {
+ ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
+ std::iostream& sstream = conn->get_io_stream();
+
+ request->writePutRequest(&sstream,
+ &name,
+ &key,
+ value.getValue(),
+ dynamic_cast<const VectorClock*>(value.getVersion()),
+ reroute);
+ sstream.flush();
+ request->readPutResponse(&sstream);
+ } catch (VoldemortException& e) {
+ throw UnreachableStoreException(std::string("Failure to put ") + host +
+ std::string(": ") + e.what());
+ }
}
bool SocketStore::deleteKey(const std::string& key, const Version& version) {
- ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
- std::iostream& sstream = conn->get_io_stream();
-
- request->writeDeleteRequest(&sstream,
- &name,
- &key,
- dynamic_cast<const VectorClock*>(&version),
- reroute);
- sstream.flush();
- return request->readDeleteResponse(&sstream);
+ try {
+ ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
+ std::iostream& sstream = conn->get_io_stream();
+
+ request->writeDeleteRequest(&sstream,
+ &name,
+ &key,
+ dynamic_cast<const VectorClock*>(&version),
+ reroute);
+ sstream.flush();
+ return request->readDeleteResponse(&sstream);
+ } catch (VoldemortException& e) {
+ throw UnreachableStoreException(std::string("Failure to delete ") + host +
+ std::string(": ") + e.what());
+ }
}
const std::string* SocketStore::getName() {
View
15 bindings/cpp/src/SocketStoreClientFactory.cpp
@@ -27,6 +27,7 @@
#include "InconsistencyResolvingStore.h"
#include "TimeBasedInconsistencyResolver.h"
#include "VectorClockInconsistencyResolver.h"
+#include "RoundRobinRoutingStrategy.h"
#include <iostream>
#include <exception>
@@ -101,8 +102,8 @@ Store* SocketStoreClientFactoryImpl::getStore(const string& storeName,
}
#define THROW_BOOTSTRAP \
- throw VoldemortException("Invalid bootstrap URL " + url + \
- ": Expected tcp://host:port");
+ throw BootstrapFailureException("Invalid bootstrap URL " + url + \
+ ": Expected tcp://host:port");
#define EXPECT_C(char, next) \
if (tolower(*it) != char) \
THROW_BOOTSTRAP; \
@@ -190,6 +191,7 @@ VersionedValue SocketStoreClientFactoryImpl::bootstrapMetadata(const string& key
return vvs->front();
}
} catch (std::exception& e) {
+ /* XXX - TODO Need a real logging mechanism */
cerr << "Warning: Could not bootstrap '" << *it << "': "
<< e.what() << endl;
}
@@ -244,13 +246,18 @@ Store* SocketStoreClientFactory::getRawStore(std::string& storeName,
//VersionedValue storevv = pimpl_->bootstrapMetadata(STORES_KEY);
//const std::string* storesXml = storevv.getValue();
+ // Routed store
+ shared_ptr<RoutingStrategy>
+ routingStrategy(new RoundRobinRoutingStrategy(pimpl_->config,
+ cluster));
shared_ptr<Store> routedStore(new RoutedStore(storeName,
pimpl_->config,
cluster,
clusterMap,
- pimpl_->threadPool));
+ pimpl_->threadPool,
+ routingStrategy));
+
- /* XXX - TODO Add inconsistency resolver */
InconsistencyResolvingStore* conStore = new InconsistencyResolvingStore(routedStore);
try {
shared_ptr<InconsistencyResolver>
View
90 bindings/cpp/src/include/Cluster.h
@@ -21,6 +21,8 @@
#ifndef CLUSTER_H
#define CLUSTER_H
+#include "Node.h"
+
#include <string>
#include <map>
#include <list>
@@ -33,78 +35,6 @@ namespace Voldemort {
using namespace boost;
class Cluster;
-
-/**
- * Represent a Voldemort node configuration
- */
-class Node
-{
-public:
- /**
- * Construct a new Node object
- *
- * @param id the node ID
- * @param host the hostname for the node
- * @param httpPort the HTTP port for the node
- * @param socketPort the socket port for the node
- * @param adminPort the admin port for the node
- * @param partitions the list of partitions hosted on the node
- */
- Node(int id,
- std::string& host,
- int httpPort,
- int socketPort,
- int adminPort,
- shared_ptr<std::list<int> >& partitions);
- Node();
- ~Node() {}
-
- /**
- * Get the Node ID for this node
- *
- * @return the node ID
- */
- int getId() { return id_; }
-
- /**
- * Get the host name for this node
- *
- * @return the host name
- */
- const std::string& getHost() { return host_; }
-
- /**
- * Get the HTTP port for this node
- *
- * @return the HTTP port
- */
- int getHttpPort() { return httpPort_; }
-
- /**
- * Get the socket port for this node
- *
- * @return the socket port
- */
- int getSocketPort() { return socketPort_; }
-
- /**
- * Get the admin port for this node
- *
- * @return the admin port
- */
- int getAdminPort() { return adminPort_; }
-
-
-private:
- friend class Cluster;
-
- int id_;
- std::string host_;
- int httpPort_;
- int socketPort_;
- int adminPort_;
- shared_ptr<std::list<int> > partitions_;
-};
/**
* Represent a Voldemort cluster configuration
@@ -136,13 +66,27 @@ class Cluster
int getNodeCount() { return nodesById.size(); }
/**
+ * Type used for node map
+ */
+ typedef std::map<int, boost::shared_ptr<Node> > nodeMap;
+
+ /**
* Get the node map
*
* @return the node map
*/
- const std::map<int, boost::shared_ptr<Node> >* getNodeMap()
+ const nodeMap* getNodeMap()
{ return &nodesById; }
+ /**
+ * Stream insertion operator for cluster
+ *
+ * @param output the stream
+ * @param cluster the cluster object
+ * @return the stream
+ */
+ friend std::ostream& operator<<(std::ostream& output, const Cluster& cluster);
+
private:
static void startElement(void* data, const XML_Char*el, const XML_Char **attr);
static void endElement(void* data, const XML_Char *el);
View
52 bindings/cpp/src/include/Node.h
@@ -94,6 +94,54 @@ class Node
*/
int getAdminPort() { return adminPort_; }
+ /**
+ * Return whether the node is current available in the node list.
+ *
+ * @return true if the node is available
+ */
+ bool isAvailable() { return isAvailable_; }
+
+ /**
+ * Return whether the node is current available in the node list,
+ * or if its unavailable but the timeout period specified has
+ * elapsed.
+ *
+ * @return true if the node is available
+ */
+ bool isAvailable(uint64_t timeout);
+
+ /**
+ * Set whether the node is available
+ *
+ * @param avail true if the node is available, false otherwise
+ */
+ void setAvailable(bool avail);
+
+ /**
+ * Get the system time in milliseconds when this node was last
+ * checked.
+ *
+ * @return the time in milliseconds
+ */
+ uint64_t getLastChecked() { return lastChecked_; }
+
+ /**
+ * Get the number of milliseconds since the last time this node
+ * was checked for availability.
+ *
+ * @return the number of milliseconds since we last checked for
+ * this node.
+ */
+ uint64_t getMsSinceLastChecked();
+
+ /**
+ * Stream insertion operator for cluster
+ *
+ * @param output the stream
+ * @param node the node object
+ * @return the stream
+ */
+ friend std::ostream& operator<<(std::ostream& output, const Node& node);
private:
friend class Cluster;
@@ -104,6 +152,10 @@ class Node
int socketPort_;
int adminPort_;
shared_ptr<std::list<int> > partitions_;
+
+ /* Node status */
+ bool isAvailable_;
+ uint64_t lastChecked_;
};
View
58 bindings/cpp/src/include/RoundRobinRoutingStrategy.h
@@ -0,0 +1,58 @@
+/* -*- C++ -*-; c-basic-offset: 4; indent-tabs-mode: nil */
+/*!
+ * @file RoundRobinRoutingStrategy.h
+ * @brief Interface definition file for RoundRobinRoutingStrategy
+ */
+/* Copyright (c) 2009 Webroot Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+#ifndef ROUNDROBINROUTINGSTRATEGY_H
+#define ROUNDROBINROUTINGSTRATEGY_H
+
+#include "RoutingStrategy.h"
+
+namespace Voldemort {
+
+using namespace boost;
+
+/**
+ * Round robin routing strategy. Will route requests to
+ */
+class RoundRobinRoutingStrategy : public RoutingStrategy
+{
+public:
+ /**
+ * Construct a new round robin routing strategy object
+ *
+ * @param config the @ref ClientConfig object
+ * @param clust the @ref Cluster object
+ */
+ RoundRobinRoutingStrategy(shared_ptr<ClientConfig>& config,
+ shared_ptr<Cluster>& clust)
+ : RoutingStrategy(config, clust),
+ startIterator(cluster->getNodeMap()->begin()) { }
+
+ // RoutingStrategy interface
+ virtual ~RoundRobinRoutingStrategy() { }
+ virtual shared_ptr<std::list<shared_ptr<Node> > >
+ routeRequest(const std::string& key);
+
+private:
+ Cluster::nodeMap::const_iterator startIterator;
+};
+
+} /* namespace Voldemort */
+
+#endif /* ROUNDROBINROUTINGSTRATEGY_H */
View
6 bindings/cpp/src/include/RoutedStore.h
@@ -24,6 +24,7 @@
#include <voldemort/Store.h>
#include <voldemort/ClientConfig.h>
#include "Cluster.h"
+#include "RoutingStrategy.h"
#include <map>
#include <boost/shared_ptr.hpp>
@@ -49,12 +50,14 @@ class RoutedStore: public Store
* routed store
* @param map a mapping from node ID to Store used for routing
* @param pool the thread pool to use
+ * @param routingStrat the routing strategy to use
*/
RoutedStore(const std::string& storeName,
shared_ptr<ClientConfig>& config,
shared_ptr<Cluster>& clust,
shared_ptr<std::map<int, shared_ptr<Store> > >& map,
- shared_ptr<threadpool::pool> pool);
+ shared_ptr<threadpool::pool>& pool,
+ shared_ptr<RoutingStrategy>& routingStrat);
virtual ~RoutedStore();
// Store interface
@@ -72,6 +75,7 @@ class RoutedStore: public Store
shared_ptr<ClientConfig> clientConfig;
shared_ptr<std::map<int, shared_ptr<Store> > > clusterMap;
shared_ptr<threadpool::pool> threadPool;
+ shared_ptr<RoutingStrategy> routingStrategy;
};
} /* namespace Voldemort */
View
84 bindings/cpp/src/include/RoutingStrategy.h
@@ -0,0 +1,84 @@
+/* -*- C++ -*-; c-basic-offset: 4; indent-tabs-mode: nil */
+/*!
+ * @file RoutingStrategy.h
+ * @brief Interface definition file for RoutingStrategy
+ */
+/* Copyright (c) 2009 Webroot Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+#ifndef ROUTINGSTRATEGY_H
+#define ROUTINGSTRATEGY_H
+
+#include <voldemort/ClientConfig.h>
+#include "Cluster.h"
+#include <map>
+
+#include <boost/shared_ptr.hpp>
+
+namespace Voldemort {
+
+using namespace boost;
+
+/**
+ * Interface for methods to for routing requests to Voldemort nodes.
+ */
+class RoutingStrategy
+{
+public:
+ /**
+ * Construct a new routing strategy object
+ *
+ * @param config the @ref ClientConfig object
+ * @param clust the @ref Cluster object
+ */
+ RoutingStrategy(shared_ptr<ClientConfig>& config,
+ shared_ptr<Cluster>& clust)
+ : cluster(clust), clientConfig(config) { }
+
+ virtual ~RoutingStrategy() { }
+
+ /**
+ * Preferred node list
+ */
+ typedef std::list<shared_ptr<Node> > prefList;
+
+ /**
+ * Shared pointer to a @ref prefList
+ */
+ typedef shared_ptr<std::list<shared_ptr<Node> > > prefListp;
+
+ /**
+ * Route the request and return the preferred node list
+ *
+ * @param key the key
+ * @return The node list
+ */
+ virtual prefListp routeRequest(const std::string& key) = 0;
+
+protected:
+ /**
+ * The @ref Cluster object for this cluster
+ */
+ shared_ptr<Cluster> cluster;
+
+ /**
+ * The @ref ClientConfig object for this instance
+ */
+ shared_ptr<ClientConfig> clientConfig;
+};
+
+} /* namespace Voldemort */
+
+#endif /* ROUTINGSTRATEGY_H */
View
10 bindings/cpp/src/include/VectorClock.h
@@ -100,7 +100,15 @@ class VectorClock: public Version
*/
uint64_t getTimestamp();
- friend std::ostream& operator<<(std::ostream&, const VectorClock& );
+ /**
+ * Stream insertion operator for cluster
+ *
+ * @param output the stream
+ * @param vc the @ref VectorClock object
+ * @return the stream
+ */
+ friend std::ostream& operator<<(std::ostream& output,
+ const VectorClock& vc);
private:
// Disable copy constructor
Please sign in to comment.
Something went wrong with that request. Please try again.