Permalink
Browse files

Add support for protocol negotiation

  • Loading branch information...
1 parent 71c29d0 commit 6f9d1cfb64beb60c4e4063c673ef1551393460d5 @readams readams committed Jun 22, 2009
@@ -34,12 +34,13 @@ using namespace std;
using namespace boost;
using asio::ip::tcp;
-Connection::Connection(string& hostName,
- string& portNum,
+Connection::Connection(const string& hostName,
+ const string& portNum,
+ const string& negString,
shared_ptr<ClientConfig>& conf)
- : config(conf), host(hostName), port(portNum), io_service(),
- resolver(io_service), timer(io_service), socket(io_service),
- connbuf(NULL), connstream(NULL), active(false) {
+ : config(conf), host(hostName), port(portNum), negotiationString(negString),
+ io_service(), resolver(io_service), timer(io_service),
+ socket(io_service), connbuf(NULL), connstream(NULL), active(false) {
}
Connection::~Connection() {
@@ -123,6 +124,19 @@ void Connection::handle_connect(const system::error_code& err,
tv.tv_usec = (to - tv.tv_sec*1000) * 1000;
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+
+ /* Negotiate protocol for the open connection */
+ write(negotiationString.c_str(), negotiationString.length());
+ char res_buffer[2];
+ size_t got = 0;
+ while (got < 2) {
+ got += read_some(res_buffer, 2-got);
+ }
+ if (res_buffer[0] != 'o' || res_buffer[1] != 'k') {
+ throw UnreachableStoreException("Failed to negotiate "
+ "protocol with server");
+ }
+
} else if (endpoint_iterator != tcp::resolver::iterator()) {
// The connection failed. Try the next endpoint in the list.
socket.close();
@@ -41,7 +41,8 @@ ConnectionPool::ConnectionPool(shared_ptr<ClientConfig>& config)
}
-shared_ptr<Connection>& ConnectionPool::checkout(string& host, int port) {
+shared_ptr<Connection>& ConnectionPool::checkout(const string& host, int port,
+ const string& negString) {
stringstream hostKey;
hostKey << host << ":" << port;
@@ -86,7 +87,7 @@ shared_ptr<Connection>& ConnectionPool::checkout(string& host, int port) {
portStr << port;
std::string portString = portStr.str();
//cout << "Creating connection " << hostKey.str() << endl;
- shared_ptr<Connection> conn(new Connection(host, portString,
+ shared_ptr<Connection> conn(new Connection(host, portString, negString,
clientConfig));
(*hep)[(size_t)conn.get()] = make_pair((int)STATUS_UNINIT, conn);
totalConnections += 1;
@@ -226,4 +226,9 @@ bool ProtocolBuffersRequestFormat::readDeleteResponse(std::istream* inputStream)
return res.success();
}
+const std::string& ProtocolBuffersRequestFormat::getNegotiationString() {
+ static const std::string PROTOCOL_BUFFERS_NEG_STRING("pb0");
+ return PROTOCOL_BUFFERS_NEG_STRING;
+}
+
} /* namespace Voldemort */
@@ -48,7 +48,10 @@ SocketStore::~SocketStore() {
std::list<VersionedValue>* SocketStore::get(const std::string& key) {
try {
- ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
+ ConnectionPoolSentinel
+ conn(connPool->checkout(host, port,
+ request->getNegotiationString()),
+ connPool);
std::iostream& sstream = conn->get_io_stream();
request->writeGetRequest(&sstream,
@@ -65,7 +68,10 @@ std::list<VersionedValue>* SocketStore::get(const std::string& key) {
void SocketStore::put(const std::string& key, const VersionedValue& value) {
try {
- ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
+ ConnectionPoolSentinel
+ conn(connPool->checkout(host, port,
+ request->getNegotiationString()),
+ connPool);
std::iostream& sstream = conn->get_io_stream();
request->writePutRequest(&sstream,
@@ -84,7 +90,10 @@ void SocketStore::put(const std::string& key, const VersionedValue& value) {
bool SocketStore::deleteKey(const std::string& key, const Version& version) {
try {
- ConnectionPoolSentinel conn(connPool->checkout(host, port), connPool);
+ ConnectionPoolSentinel
+ conn(connPool->checkout(host, port,
+ request->getNegotiationString()),
+ connPool);
std::iostream& sstream = conn->get_io_stream();
request->writeDeleteRequest(&sstream,
@@ -243,4 +243,9 @@ bool VoldemortNativeRequestFormat::readDeleteResponse(std::istream* inputStream)
throw VoldemortException("Not implemented");
}
+const std::string& VoldemortNativeRequestFormat::getNegotiationString() {
+ static const std::string VOLDEMORT_NEG_STRING("vp0");
+ return VOLDEMORT_NEG_STRING;
+}
+
} /* namespace Voldemort */
@@ -48,9 +48,11 @@ class Connection {
*
* @param hostName the host to connect to
* @param portNum the port to connect to
+ * @param negString the protocol negotiation string
* @param conf the client config object */
- Connection(std::string& hostName,
- std::string& portNum,
+ Connection(const std::string& hostName,
+ const std::string& portNum,
+ const std::string& negString,
shared_ptr<ClientConfig>& conf);
~Connection();
@@ -132,6 +134,7 @@ class Connection {
std::string host;
std::string port;
+ std::string negotiationString;
asio::io_service io_service;
tcp::resolver resolver;
@@ -57,9 +57,11 @@ class ConnectionPool
*
* @param host the host name
* @param port the port number
+ * @param negString the protocol negotiation string
* @return a shared pointer to the connection
*/
- shared_ptr<Connection>& checkout(string& host, int port);
+ shared_ptr<Connection>& checkout(const string& host, int port,
+ const string& negString);
/**
* Get a connection for the given host and port. If the maximum
@@ -68,6 +68,7 @@ class ProtocolBuffersRequestFormat: public RequestFormat
const VectorClock* version,
bool shouldReroute);
virtual bool readDeleteResponse(std::istream* inputStream);
+ virtual const std::string& getNegotiationString();
};
} /* namespace Voldemort */
@@ -44,8 +44,10 @@ class RequestFormat
* of these will be necessarily implemented by the C++ client.
*/
enum RequestFormatType {
- /** The Voldemort native protocol */
+ /** The Version 0 native protocol */
VOLDEMORT,
+ /** The Version 1 native protocol */
+ VOLDEMORT_V1,
/** Protocol buffers */
PROTOCOL_BUFFERS,
/** Admin request handler protocol */
@@ -153,6 +155,13 @@ class RequestFormat
* @return a newly-alloced RequestFormat object
*/
static RequestFormat* newRequestFormat(RequestFormatType type);
+
+ /**
+ * Get the protocol negotiation string for the request format
+ *
+ * @return the string for negotiating the protocol with the server
+ */
+ virtual const std::string& getNegotiationString() = 0;
};
} /* namespace Voldemort */
@@ -63,6 +63,7 @@ class VoldemortNativeRequestFormat: public RequestFormat
const VectorClock* version,
bool shouldReroute);
virtual bool readDeleteResponse(std::istream* inputStream);
+ virtual const std::string& getNegotiationString();
};
} /* namespace Voldemort */

0 comments on commit 6f9d1cf

Please sign in to comment.