From 4972b29a5f57678dbb2fd314b1ddf9f639ae958f Mon Sep 17 00:00:00 2001 From: lfili Date: Fri, 20 Sep 2013 11:47:16 +0200 Subject: [PATCH 1/2] Cleaning code and HRCPP-9 --- include/infinispan/hotrod/Configuration.h | 24 ------------------- include/infinispan/hotrod/exceptions.h | 7 +++--- src/hotrod/api/Configuration.cpp | 13 ---------- src/hotrod/api/exceptions.cpp | 10 +++++--- .../impl/configuration/Configuration.cpp | 2 +- .../configuration/ConfigurationBuilder.cpp | 2 +- .../impl/operations/RetryOnFailureOperation.h | 4 +++- src/hotrod/impl/transport/tcp/Socket.cpp | 1 - .../transport/tcp/TcpTransportFactory.cpp | 5 ++-- src/hotrod/sys/BasicMarshaller.h | 2 +- 10 files changed, 19 insertions(+), 51 deletions(-) delete mode 100644 include/infinispan/hotrod/Configuration.h delete mode 100644 src/hotrod/api/Configuration.cpp diff --git a/include/infinispan/hotrod/Configuration.h b/include/infinispan/hotrod/Configuration.h deleted file mode 100644 index 9b898177..00000000 --- a/include/infinispan/hotrod/Configuration.h +++ /dev/null @@ -1,24 +0,0 @@ -#ifndef ISPN_HOTROD_CONFIGURATION_H -#define ISPN_HOTROD_CONFIGURATION_H - -#include - -namespace infinispan { -namespace hotrod { - -// TODO -class Configuration -{ - public: - // TODO: spostare (java ConfigurationProperties) - static const char* PROTOCOL_VERSION_12; - - const std::string & getProtocolVersion() const; - - private: - std::string protocolVersion; -}; - -}} // namespace - -#endif /* ISPN_HOTROD_CONFIGURATION_H */ diff --git a/include/infinispan/hotrod/exceptions.h b/include/infinispan/hotrod/exceptions.h index 1fac3d14..c0d5775e 100644 --- a/include/infinispan/hotrod/exceptions.h +++ b/include/infinispan/hotrod/exceptions.h @@ -4,7 +4,6 @@ #include "infinispan/hotrod/ImportExport.h" -#include "hotrod/impl/transport/tcp/InetSocketAddress.h" #include #include @@ -33,12 +32,14 @@ struct HR_EXTERN HotRodClientException : public Exception struct HR_EXTERN TransportException : public HotRodClientException { - transport::InetSocketAddress serverAddress; + std::string host; + int port; TransportException(const std::string& host, int port, const std::string&); ~TransportException() throw(); - const transport::InetSocketAddress& getServerAddress() const; + const std::string& getHost() const; + int getPort() const; }; struct HR_EXTERN InvalidResponseException : public HotRodClientException diff --git a/src/hotrod/api/Configuration.cpp b/src/hotrod/api/Configuration.cpp deleted file mode 100644 index 0faf3808..00000000 --- a/src/hotrod/api/Configuration.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include "infinispan/hotrod/Configuration.h" - -namespace infinispan { -namespace hotrod { - -const char* Configuration::PROTOCOL_VERSION_12 = "1.2"; - -const std::string& Configuration::getProtocolVersion() const -{ - return protocolVersion; -} - -}} /* namespace */ diff --git a/src/hotrod/api/exceptions.cpp b/src/hotrod/api/exceptions.cpp index 7852ad39..98be8c5a 100644 --- a/src/hotrod/api/exceptions.cpp +++ b/src/hotrod/api/exceptions.cpp @@ -11,12 +11,16 @@ const char* Exception::what() const throw() { return message.c_str(); } HotRodClientException::HotRodClientException(const std::string& msg) : Exception(msg) {} -TransportException::TransportException(const std::string& host, int port, const std::string& msg) -: HotRodClientException(msg), serverAddress(host, port) {} +TransportException::TransportException(const std::string& h, int p, const std::string& msg) +: HotRodClientException(msg), host(h), port(p) {} TransportException::~TransportException() throw() {} -const transport::InetSocketAddress& TransportException::getServerAddress() const { return serverAddress;} +const std::string& TransportException::getHost() const { return host;} +int TransportException::getPort() const { return port;} + + +//const transport::InetSocketAddress& TransportException::getServerAddress() const { return serverAddress;} InvalidResponseException::InvalidResponseException(const std::string& msg) : HotRodClientException(msg) {} diff --git a/src/hotrod/impl/configuration/Configuration.cpp b/src/hotrod/impl/configuration/Configuration.cpp index a2dcddc9..6cc29528 100644 --- a/src/hotrod/impl/configuration/Configuration.cpp +++ b/src/hotrod/impl/configuration/Configuration.cpp @@ -7,7 +7,7 @@ namespace hotrod { const char* Configuration::PROTOCOL_VERSION_12 = "1.2"; -Configuration::Configuration(std::string protocolVersion_, +Configuration::Configuration(std::string /*protocolVersion*/, const ConnectionPoolConfiguration& cpc, int connTimeout, bool forceReturnVal, diff --git a/src/hotrod/impl/configuration/ConfigurationBuilder.cpp b/src/hotrod/impl/configuration/ConfigurationBuilder.cpp index bd55b8a0..a8675936 100644 --- a/src/hotrod/impl/configuration/ConfigurationBuilder.cpp +++ b/src/hotrod/impl/configuration/ConfigurationBuilder.cpp @@ -147,7 +147,7 @@ Configuration ConfigurationBuilder::create() internalKeySizeEstimate); } -ConfigurationBuilder& ConfigurationBuilder::read(Configuration& bean) +ConfigurationBuilder& ConfigurationBuilder::read(Configuration& /*bean*/) { // FIXME: read pool, ssl and server configs internalProtocolVersion = bean.getProtocolVersion(); diff --git a/src/hotrod/impl/operations/RetryOnFailureOperation.h b/src/hotrod/impl/operations/RetryOnFailureOperation.h index 93ce2523..e4f9e8ec 100644 --- a/src/hotrod/impl/operations/RetryOnFailureOperation.h +++ b/src/hotrod/impl/operations/RetryOnFailureOperation.h @@ -5,6 +5,7 @@ #include "hotrod/impl/operations/HotRodOperation.h" #include "hotrod/impl/transport/TransportFactory.h" +#include "hotrod/impl/transport/tcp/InetSocketAddress.h" namespace infinispan { namespace hotrod { @@ -26,7 +27,8 @@ template class RetryOnFailureOperation : public HotRodOperation } catch(const TransportException& te) { // Invalidate transport since this exception means that this // instance is no longer usable and should be destroyed - transportFactory->invalidateTransport(te.getServerAddress(), transport); + transport::InetSocketAddress isa(te.getHost(),te.getPort()); + transportFactory->invalidateTransport(isa, transport); // TODO: error management //releaseTransport(transport); logErrorAndThrowExceptionIfNeeded(retryCount, te); diff --git a/src/hotrod/impl/transport/tcp/Socket.cpp b/src/hotrod/impl/transport/tcp/Socket.cpp index 46f819f0..ebb89dfa 100644 --- a/src/hotrod/impl/transport/tcp/Socket.cpp +++ b/src/hotrod/impl/transport/tcp/Socket.cpp @@ -21,7 +21,6 @@ Socket::Socket() : {} void Socket::connect(const std::string& host, int port) { - std::cout << "host " << host << " port " << port << std::endl; socket.connect(host, port); } diff --git a/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp b/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp index 0ab6ef0c..6db29848 100644 --- a/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp +++ b/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp @@ -125,9 +125,8 @@ void TcpTransportFactory::pingServers() { void TcpTransportFactory::updateTransportCount() { unsigned int maxActive = connectionPool->getMaxActive(); if (maxActive > 0) { - transportCount = (maxActive * servers.size() > maxActive) ? - maxActive * servers.size() : maxActive; - //to avoid int overflow when maxActive is very high! + transportCount = maxActive * servers.size(); + // TODO: in java code avoid int overflow when maxActive is very high! } else { transportCount = 10 * servers.size(); } diff --git a/src/hotrod/sys/BasicMarshaller.h b/src/hotrod/sys/BasicMarshaller.h index cf06e0dd..e4af510c 100644 --- a/src/hotrod/sys/BasicMarshaller.h +++ b/src/hotrod/sys/BasicMarshaller.h @@ -47,7 +47,7 @@ class BasicMarshaller : public infinispan::hotrod::Marshaller { for (int i = 0 ; i < 4 ; i++) { buf[3-i] = (char) ((s) >> (8*i)); } - b.set(buf, 4, &BasicMarshallerHelper::noRelease); + b.set(buf, 4); } int* unmarshall(const ScopedBuffer& b) { int result = 0; From 0ed1af087ff8d589e341327faa1b06626ebf9027 Mon Sep 17 00:00:00 2001 From: lfili Date: Wed, 25 Sep 2013 12:17:32 +0200 Subject: [PATCH 2/2] HRCPP-9 partial: resolved inclusion of InetSocketAddress.h + cleaning (Configuration issues in comments) HRCPP-6 multithread support, changed implementation with one instance of RemoteCacheImpl (like java) --- include/infinispan/hotrod/RemoteCache.h | 20 ++- include/infinispan/hotrod/RemoteCacheBase.h | 11 +- .../infinispan/hotrod/RemoteCacheManager.h | 19 +-- include/infinispan/hotrod/exceptions.h | 11 ++ src/hotrod/api/RemoteCacheBase.cpp | 41 ++---- src/hotrod/api/RemoteCacheManager.cpp | 17 ++- src/hotrod/api/exceptions.cpp | 4 + src/hotrod/impl/RemoteCacheImpl.cpp | 73 ++++++---- src/hotrod/impl/RemoteCacheImpl.h | 36 ++--- src/hotrod/impl/RemoteCacheManagerImpl.cpp | 78 +++++++--- src/hotrod/impl/RemoteCacheManagerImpl.h | 29 ++-- .../impl/configuration/Configuration.cpp | 2 +- .../configuration/ConfigurationBuilder.cpp | 2 +- .../impl/operations/AbstractKeyOperation.h | 2 +- .../operations/AbstractKeyValueOperation.h | 2 +- .../impl/operations/BulkGetKeysOperation.cpp | 2 +- .../impl/operations/BulkGetKeysOperation.h | 2 +- .../impl/operations/BulkGetOperation.cpp | 2 +- src/hotrod/impl/operations/BulkGetOperation.h | 2 +- src/hotrod/impl/operations/ClearOperation.cpp | 2 +- src/hotrod/impl/operations/ClearOperation.h | 2 +- .../impl/operations/ContainsKeyOperation.cpp | 2 +- .../impl/operations/ContainsKeyOperation.h | 2 +- .../operations/FaultTolerantPingOperation.cpp | 2 +- .../operations/FaultTolerantPingOperation.h | 2 +- src/hotrod/impl/operations/GetOperation.cpp | 2 +- src/hotrod/impl/operations/GetOperation.h | 2 +- .../operations/GetWithMetadataOperation.cpp | 2 +- .../operations/GetWithMetadataOperation.h | 2 +- .../impl/operations/OperationsFactory.cpp | 136 ++++++++---------- .../impl/operations/OperationsFactory.h | 6 +- .../impl/operations/PutIfAbsentOperation.cpp | 2 +- .../impl/operations/PutIfAbsentOperation.h | 2 +- src/hotrod/impl/operations/PutOperation.cpp | 2 +- src/hotrod/impl/operations/PutOperation.h | 2 +- .../RemoveIfUnmodifiedOperation.cpp | 2 +- .../operations/RemoveIfUnmodifiedOperation.h | 2 +- .../impl/operations/RemoveOperation.cpp | 2 +- src/hotrod/impl/operations/RemoveOperation.h | 2 +- .../ReplaceIfUnmodifiedOperation.cpp | 2 +- .../operations/ReplaceIfUnmodifiedOperation.h | 2 +- .../impl/operations/ReplaceOperation.cpp | 2 +- src/hotrod/impl/operations/ReplaceOperation.h | 2 +- .../impl/operations/RetryOnFailureOperation.h | 4 +- src/hotrod/impl/operations/StatsOperation.cpp | 2 +- src/hotrod/impl/operations/StatsOperation.h | 2 +- src/hotrod/impl/protocol/CodecFactory.cpp | 14 +- src/hotrod/impl/protocol/CodecFactory.h | 2 +- .../transport/tcp/GenericKeyedObjectPool.h | 30 ++-- .../tcp/GenericKeyedObjectPoolFactory.h | 11 +- .../tcp/PropsKeyedObjectPoolFactory.h | 2 +- .../transport/tcp/TcpTransportFactory.cpp | 60 +++++--- .../impl/transport/tcp/TcpTransportFactory.h | 10 +- 53 files changed, 400 insertions(+), 278 deletions(-) diff --git a/include/infinispan/hotrod/RemoteCache.h b/include/infinispan/hotrod/RemoteCache.h index ce32a677..1a6bf632 100644 --- a/include/infinispan/hotrod/RemoteCache.h +++ b/include/infinispan/hotrod/RemoteCache.h @@ -18,9 +18,6 @@ namespace hotrod { template class RemoteCache : private RemoteCacheBase { public: - RemoteCache(const RemoteCache &other): RemoteCacheBase(this, other), - keyMarshaller(other.keyMarshaller), valueMarshaller(other.valueMarshaller) {} - V* get(const K& key) { ScopedBuffer vbuf; base_get(&key, &vbuf); @@ -140,10 +137,25 @@ template class RemoteCache : private RemoteCacheBase return *this; } + RemoteCache(const RemoteCache &other) : + RemoteCacheBase(other), keyMarshaller(other.keyMarshaller), valueMarshaller(other.valueMarshaller) + { + setMarshallers(this, &keyMarshall, &valueMarshall, &keyUnmarshall, &valueUnmarshall); + } + + RemoteCache& operator=(const RemoteCache& other) { + RemoteCacheBase::operator=(other); + keyMarshaller = other.keyMarshaller; + valueMarshaller = other.valueMarshaller; + setMarshallers(this, &keyMarshall, &valueMarshall, &keyUnmarshall, &valueUnmarshall); + return *this; + } + private: - RemoteCache(const std::string& name) : RemoteCacheBase(name) { + RemoteCache() : RemoteCacheBase() { setMarshallers(this, &keyMarshall, &valueMarshall, &keyUnmarshall, &valueUnmarshall); } + // type-hiding and resurrecting support static void keyMarshall(void *thisp, const void* key, void* buf) { ((RemoteCache *) thisp)->keyMarshaller->marshall(*(const K *) key, *(ScopedBuffer *) buf); diff --git a/include/infinispan/hotrod/RemoteCacheBase.h b/include/infinispan/hotrod/RemoteCacheBase.h index 89207449..b2cd45bb 100644 --- a/include/infinispan/hotrod/RemoteCacheBase.h +++ b/include/infinispan/hotrod/RemoteCacheBase.h @@ -19,6 +19,8 @@ class OperationsFactory; } class RemoteCacheImpl; +class RemoteCacheManagerImpl; + typedef void (*MarshallHelperFn) (void*, const void*, void*); typedef void* (*UnmarshallHelperFn) (void*, const void*); @@ -42,16 +44,13 @@ class HR_EXTERN RemoteCacheBase void base_clear(); void base_withFlags(Flag flag); - void init(const std::string& name, operations::OperationsFactory* operationFactory); + void init(operations::OperationsFactory* operationFactory); - protected: - RemoteCacheBase(const std::string& name); - RemoteCacheBase(void *newRemoteCachePtr, const RemoteCacheBase &); + protected: + RemoteCacheBase(); void setMarshallers(void* rc, MarshallHelperFn kf, MarshallHelperFn vf, UnmarshallHelperFn ukf, UnmarshallHelperFn uvf); private: - RemoteCacheBase(const RemoteCacheBase &); - void *remoteCachePtr; MarshallHelperFn baseKeyMarshallFn; MarshallHelperFn baseValueMarshallFn; diff --git a/include/infinispan/hotrod/RemoteCacheManager.h b/include/infinispan/hotrod/RemoteCacheManager.h index c3ea6b1c..46bb5e24 100644 --- a/include/infinispan/hotrod/RemoteCacheManager.h +++ b/include/infinispan/hotrod/RemoteCacheManager.h @@ -29,24 +29,25 @@ class HR_EXTERN RemoteCacheManager : public Handle void start(); void stop(); bool isStarted(); + // TODO: change to std::map? const Configuration& getConfiguration(); template RemoteCache getCache( bool forceReturnValue = false) { - RemoteCache rcache(""); + RemoteCache rcache; initCache(rcache, forceReturnValue); rcache.keyMarshaller.reset(new sys::BasicMarshaller()); rcache.valueMarshaller.reset(new sys::BasicMarshaller()); return rcache; - } + } template RemoteCache getCache( const std::string& name, bool forceReturnValue = false) { - RemoteCache rcache(name); - initCache(rcache, forceReturnValue); + RemoteCache rcache; + initCache(rcache, name, forceReturnValue); rcache.keyMarshaller.reset(new sys::BasicMarshaller()); rcache.valueMarshaller.reset(new sys::BasicMarshaller()); return rcache; @@ -56,18 +57,19 @@ class HR_EXTERN RemoteCacheManager : public Handle HR_SHARED_PTR > km, HR_SHARED_PTR > vm, bool forceReturnValue = false) { - RemoteCache rcache(""); + RemoteCache rcache; initCache(rcache, forceReturnValue); rcache.keyMarshaller = km; rcache.valueMarshaller = vm; - return rcache; } + return rcache; + } template RemoteCache getCache( HR_SHARED_PTR > km, HR_SHARED_PTR > vm, const std::string& name, bool forceReturnValue = false) { - RemoteCache rcache(name); - initCache(rcache, forceReturnValue); + RemoteCache rcache; + initCache(rcache, name, forceReturnValue); rcache.keyMarshaller = km; rcache.valueMarshaller = vm; return rcache; @@ -75,6 +77,7 @@ class HR_EXTERN RemoteCacheManager : public Handle private: void initCache(RemoteCacheBase& cache, bool forceReturnValue); + void initCache(RemoteCacheBase& cache, const std::string& name, bool forceReturnValue); // not implemented RemoteCacheManager(const RemoteCacheManager&); diff --git a/include/infinispan/hotrod/exceptions.h b/include/infinispan/hotrod/exceptions.h index c0d5775e..02f7d942 100644 --- a/include/infinispan/hotrod/exceptions.h +++ b/include/infinispan/hotrod/exceptions.h @@ -57,6 +57,17 @@ struct HR_EXTERN InternalException : public HotRodClientException InternalException(const std::string&); }; +struct HR_EXTERN RemoteCacheManagerNotStartedException : public HotRodClientException +{ + RemoteCacheManagerNotStartedException(const std::string&); +}; + +// not existent in java code +struct HR_EXTERN RemoteCacheNotExistException : public HotRodClientException +{ + RemoteCacheNotExistException(const std::string&); +}; + }} // namespace #endif /* ISPN_HOTROD_EXCEPTIONS_H */ diff --git a/src/hotrod/api/RemoteCacheBase.cpp b/src/hotrod/api/RemoteCacheBase.cpp index 0accc272..808910ad 100644 --- a/src/hotrod/api/RemoteCacheBase.cpp +++ b/src/hotrod/api/RemoteCacheBase.cpp @@ -1,27 +1,16 @@ #include "infinispan/hotrod/RemoteCacheBase.h" +#include "infinispan/hotrod/RemoteCacheManager.h" #include "hotrod/impl/RemoteCacheImpl.h" -//#include "hotrod/impl/MetadataValueImpl.h" #include namespace infinispan { namespace hotrod { -RemoteCacheBase::RemoteCacheBase(const std::string& name) : - Handle(new RemoteCacheImpl(*this, name)) {} +RemoteCacheBase::RemoteCacheBase() : Handle(NULL) {} -RemoteCacheBase::RemoteCacheBase(void *newRemoteCachePtr, const RemoteCacheBase &other): - Handle(new RemoteCacheImpl(*this, other.impl->getName())), - remoteCachePtr(newRemoteCachePtr), - baseKeyMarshallFn(other.baseKeyMarshallFn), - baseValueMarshallFn(other.baseValueMarshallFn), - baseKeyUnmarshallFn(other.baseKeyUnmarshallFn), - baseValueUnmarshallFn(other.baseValueUnmarshallFn) { - impl->init(*other.impl); -} - void RemoteCacheBase::setMarshallers(void* rc, MarshallHelperFn kf, MarshallHelperFn vf, UnmarshallHelperFn ukf, UnmarshallHelperFn uvf) { remoteCachePtr = rc; baseKeyMarshallFn = kf; @@ -46,59 +35,59 @@ void* RemoteCacheBase::baseValueUnmarshall(const void* buf) { return baseValueUnmarshallFn(remoteCachePtr, buf); } -void RemoteCacheBase::init(const std::string& name, operations::OperationsFactory* operationFactory) { - impl->init(name, operationFactory); +void RemoteCacheBase::init(operations::OperationsFactory* operationFactory) { + impl->init(operationFactory); } void RemoteCacheBase::base_get(const void *key, void *buf) { - impl->get(key, buf); + impl->get(*this, key, buf); } void RemoteCacheBase::base_put(const void *key, const void *val, int64_t life, int64_t idle, void *buf) { - impl->put(key, val, life, idle, buf); + impl->put(*this, key, val, life, idle, buf); } void RemoteCacheBase::base_putIfAbsent(const void *key, const void *val, int64_t life, int64_t idle, void *buf) { - impl->putIfAbsent(key, val, life, idle, buf); + impl->putIfAbsent(*this, key, val, life, idle, buf); } void RemoteCacheBase::base_replace(const void *key, const void *val, int64_t life, int64_t idle, void *buf) { - impl->replace(key, val, life, idle, buf); + impl->replace(*this, key, val, life, idle, buf); } void RemoteCacheBase::base_remove(const void *key, void *rbuf) { - impl->remove(key, rbuf); + impl->remove(*this, key, rbuf); } void RemoteCacheBase::base_containsKey(const void *key, bool *res){ - impl->containsKey(key, res); + impl->containsKey(*this, key, res); } void RemoteCacheBase::base_replaceWithVersion(const void *key, const void *value, int64_t version, int64_t life, int64_t idle, bool *res) { - impl->replaceWithVersion(key, value, version, life, idle, res); + impl->replaceWithVersion(*this, key, value, version, life, idle, res); } void RemoteCacheBase::base_removeWithVersion(const void *key, int64_t version, bool *res) { - impl->removeWithVersion(key, version, res); + impl->removeWithVersion(*this, key, version, res); } void RemoteCacheBase::base_getWithMetadata( const void *key, void* vbuf, MetadataValue* metadata) { - impl->getWithMetadata(key, vbuf, metadata); + impl->getWithMetadata(*this, key, vbuf, metadata); } void RemoteCacheBase::base_getBulk(int size, std::map* mbuf) { - impl->getBulk(size, mbuf); + impl->getBulk(*this, size, mbuf); } void RemoteCacheBase::base_keySet(int scope, std::set* result) { - impl->keySet(scope, result); + impl->keySet(*this, scope, result); } void RemoteCacheBase::base_stats(std::map* stats) diff --git a/src/hotrod/api/RemoteCacheManager.cpp b/src/hotrod/api/RemoteCacheManager.cpp index 9117910c..623f7ef3 100644 --- a/src/hotrod/api/RemoteCacheManager.cpp +++ b/src/hotrod/api/RemoteCacheManager.cpp @@ -20,11 +20,22 @@ RemoteCacheManager::RemoteCacheManager(bool start_) RemoteCacheManager::RemoteCacheManager(const std::map& properties, bool start_) - : Handle(new RemoteCacheManagerImpl(properties,start_)) { } + : Handle(new RemoteCacheManagerImpl(properties, start_)) { } void RemoteCacheManager::initCache( - RemoteCacheBase& cache, bool forceReturnValue) { - impl->initCache(*cache.impl, forceReturnValue); + RemoteCacheBase& cache, bool forceReturnValue) +{ + cache.impl = impl->createRemoteCache(forceReturnValue); + if(!impl) + throw RemoteCacheNotExistException("cache doesn't exist"); +} + +void RemoteCacheManager::initCache( + RemoteCacheBase& cache, const std::string& name, bool forceReturnValue) +{ + cache.impl = impl->createRemoteCache(name, forceReturnValue); + if(!impl) + throw RemoteCacheNotExistException("cache doesn't exist"); } void RemoteCacheManager::start() { diff --git a/src/hotrod/api/exceptions.cpp b/src/hotrod/api/exceptions.cpp index 98be8c5a..5c294163 100644 --- a/src/hotrod/api/exceptions.cpp +++ b/src/hotrod/api/exceptions.cpp @@ -29,4 +29,8 @@ RemoteNodeSuspectException::RemoteNodeSuspectException(const std::string& msg) : InternalException::InternalException(const std::string& msg) : HotRodClientException(msg) {} +RemoteCacheManagerNotStartedException::RemoteCacheManagerNotStartedException(const std::string& msg) : HotRodClientException(msg) {} + +RemoteCacheNotExistException::RemoteCacheNotExistException(const std::string& msg) : HotRodClientException(msg) {} + }} /* namespace */ diff --git a/src/hotrod/impl/RemoteCacheImpl.cpp b/src/hotrod/impl/RemoteCacheImpl.cpp index 497c46b5..72cc4c68 100644 --- a/src/hotrod/impl/RemoteCacheImpl.cpp +++ b/src/hotrod/impl/RemoteCacheImpl.cpp @@ -1,5 +1,7 @@ #include "hotrod/sys/types.h" +#include "hotrod/sys/Msg.h" #include "hotrod/impl/RemoteCacheImpl.h" +#include "hotrod/impl/RemoteCacheManagerImpl.h" #include "hotrod/impl/operations/OperationsFactory.h" #include "hotrod/impl/operations/GetOperation.h" #include "hotrod/impl/operations/PutOperation.h" @@ -29,13 +31,14 @@ namespace hotrod { using namespace operations; using namespace transport; using namespace protocol; +using namespace sys; -RemoteCacheImpl::RemoteCacheImpl(RemoteCacheBase& base, const std::string& n) - :remoteCacheBase(base), name(n) -{} +RemoteCacheImpl::RemoteCacheImpl(RemoteCacheManagerImpl& rcm, const std::string& n) + :remoteCacheManager(rcm), name(n) {} -void RemoteCacheImpl::get(const void *k, void* b) { - ScopedBuffer kbuf; +void RemoteCacheImpl::get(RemoteCacheBase& remoteCacheBase, const void *k, void* b) { + assertRemoteCacheManagerIsStarted(); + ScopedBuffer kbuf; ScopedBuffer& vbuf(*(ScopedBuffer *)b); remoteCacheBase.baseKeyMarshall(k, &kbuf); hrbytes keyBytes(kbuf.getBytes(), kbuf.getLength()); @@ -45,7 +48,8 @@ void RemoteCacheImpl::get(const void *k, void* b) { bytes.releaseTo(vbuf); } -void RemoteCacheImpl::put(const void *k, const void* v, uint64_t life, uint64_t idle, void* b) { +void RemoteCacheImpl::put(RemoteCacheBase& remoteCacheBase, const void *k, const void* v, uint64_t life, uint64_t idle, void* b) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf, vbuf; ScopedBuffer& obuf(*(ScopedBuffer *)b); remoteCacheBase.baseKeyMarshall(k, &kbuf); @@ -59,7 +63,8 @@ void RemoteCacheImpl::put(const void *k, const void* v, uint64_t life, uint64_t bytes.releaseTo(obuf); } -void RemoteCacheImpl::putIfAbsent(const void *k, const void* v, uint64_t life, uint64_t idle, void* b) { +void RemoteCacheImpl::putIfAbsent(RemoteCacheBase& remoteCacheBase, const void *k, const void* v, uint64_t life, uint64_t idle, void* b) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf, vbuf; ScopedBuffer& obuf(*(ScopedBuffer *)b); remoteCacheBase.baseKeyMarshall(k, &kbuf); @@ -72,13 +77,13 @@ void RemoteCacheImpl::putIfAbsent(const void *k, const void* v, uint64_t life, u bytes.releaseTo(obuf); } - -void RemoteCacheImpl::ping() { +PingResult RemoteCacheImpl::ping() { hr_scoped_ptr op(operationsFactory->newFaultTolerantPingOperation()); - op->execute(); + return op->execute(); } -void RemoteCacheImpl::replace(const void *k, const void* v, uint64_t life, uint64_t idle, void* b) { +void RemoteCacheImpl::replace(RemoteCacheBase& remoteCacheBase, const void *k, const void* v, uint64_t life, uint64_t idle, void* b) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf, vbuf; ScopedBuffer& obuf(*(ScopedBuffer *)b); remoteCacheBase.baseKeyMarshall(k, &kbuf); @@ -91,7 +96,8 @@ void RemoteCacheImpl::replace(const void *k, const void* v, uint64_t life, uint6 bytes.releaseTo(obuf); } -void RemoteCacheImpl::remove(const void* k, void* b) { +void RemoteCacheImpl::remove(RemoteCacheBase& remoteCacheBase, const void* k, void* b) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf; ScopedBuffer& vbuf(*(ScopedBuffer *)b); remoteCacheBase.baseKeyMarshall(k, &kbuf); @@ -102,7 +108,8 @@ void RemoteCacheImpl::remove(const void* k, void* b) { bytes.releaseTo(vbuf); } -void RemoteCacheImpl::containsKey(const void* k, bool* r) { +void RemoteCacheImpl::containsKey(RemoteCacheBase& remoteCacheBase, const void* k, bool* r) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf; remoteCacheBase.baseKeyMarshall(k, &kbuf); hrbytes keyBytes(kbuf.getBytes(), kbuf.getLength()); @@ -111,9 +118,10 @@ void RemoteCacheImpl::containsKey(const void* k, bool* r) { *r = gco->execute(); } -void RemoteCacheImpl::replaceWithVersion( +void RemoteCacheImpl::replaceWithVersion(RemoteCacheBase& remoteCacheBase, const void* k, const void* v, uint64_t version, uint64_t life, uint64_t idle, bool* res) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf, vbuf; remoteCacheBase.baseKeyMarshall(k, &kbuf); remoteCacheBase.baseValueMarshall(v, &vbuf); @@ -125,7 +133,8 @@ void RemoteCacheImpl::replaceWithVersion( *res = response.isUpdated(); } -void RemoteCacheImpl::removeWithVersion(const void* k, uint64_t version, bool* res) { +void RemoteCacheImpl::removeWithVersion(RemoteCacheBase& remoteCacheBase, const void* k, uint64_t version, bool* res) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf; remoteCacheBase.baseKeyMarshall(k, &kbuf); hrbytes keyBytes(kbuf.getBytes(), kbuf.getLength()); @@ -135,9 +144,10 @@ void RemoteCacheImpl::removeWithVersion(const void* k, uint64_t version, bool* r *res = response.isUpdated(); } -void RemoteCacheImpl::getWithMetadata( +void RemoteCacheImpl::getWithMetadata(RemoteCacheBase& remoteCacheBase, const void *k, void* b, MetadataValue* metadata) { + assertRemoteCacheManagerIsStarted(); ScopedBuffer kbuf; ScopedBuffer& vbuf(*(ScopedBuffer *)b); remoteCacheBase.baseKeyMarshall(k, &kbuf); @@ -152,7 +162,8 @@ void RemoteCacheImpl::getWithMetadata( metadata->maxIdle = m.maxIdle; } -void RemoteCacheImpl::getBulk(int size, std::map* mbuf) { +void RemoteCacheImpl::getBulk(RemoteCacheBase& remoteCacheBase, int size, std::map* mbuf) { + assertRemoteCacheManagerIsStarted(); hr_scoped_ptr gco(operationsFactory->newBulkGetOperation(size)); std::map res = gco->execute(); for(std::map::iterator i = res.begin(); i != res.end(); i++) { @@ -165,7 +176,8 @@ void RemoteCacheImpl::getBulk(int size, std::map* mbuf) { } } -void RemoteCacheImpl::keySet(int scope, std::set* result) { +void RemoteCacheImpl::keySet(RemoteCacheBase& remoteCacheBase, int scope, std::set* result) { + assertRemoteCacheManagerIsStarted(); hr_scoped_ptr gco(operationsFactory->newBulkGetKeysOperation(scope)); std::set res = gco->execute(); for(std::set::const_iterator i = res.begin(); i != res.end(); i++) { @@ -177,11 +189,13 @@ void RemoteCacheImpl::keySet(int scope, std::set* result) { } void RemoteCacheImpl::stats(std::map* statistics) { + assertRemoteCacheManagerIsStarted(); hr_scoped_ptr gco(operationsFactory->newStatsOperation()); *statistics = gco->execute(); } void RemoteCacheImpl::clear() { + assertRemoteCacheManagerIsStarted(); hr_scoped_ptr gco(operationsFactory->newClearOperation()); gco->execute(); } @@ -190,19 +204,11 @@ const std::string& RemoteCacheImpl::getName() const { return name; } -void RemoteCacheImpl::init(const std::string& n, OperationsFactory* of) -{ - name = n; - operationsFactory = HR_SHARED_PTR(of); -} - -void RemoteCacheImpl::init(const RemoteCacheImpl &other) { - name = other.name; - operationsFactory = other.operationsFactory; +void RemoteCacheImpl::init(OperationsFactory* of) { + operationsFactory.reset(of); } -void RemoteCacheImpl::withFlags(Flag flags) -{ +void RemoteCacheImpl::withFlags(Flag flags) { operationsFactory->setFlags(flags); } @@ -215,4 +221,13 @@ void RemoteCacheImpl::applyDefaultExpirationFlags(uint64_t lifespan, uint64_t ma } } +void RemoteCacheImpl::assertRemoteCacheManagerIsStarted() { + if (!remoteCacheManager.isStarted()) { + // TODO: log + throw RemoteCacheManagerNotStartedException( + Msg() << "Cannot perform operations on a cache associated with an unstarted RemoteCacheManager. " + << "Use RemoteCacheManager.start before using the remote cache."); + } +} + }} /* namespace */ diff --git a/src/hotrod/impl/RemoteCacheImpl.h b/src/hotrod/impl/RemoteCacheImpl.h index 8c089c02..a3084003 100644 --- a/src/hotrod/impl/RemoteCacheImpl.h +++ b/src/hotrod/impl/RemoteCacheImpl.h @@ -7,6 +7,7 @@ #include "infinispan/hotrod/ScopedBuffer.h" #include "infinispan/hotrod/RemoteCacheBase.h" #include "hotrod/impl/MetadataValueImpl.h" +#include "hotrod/impl/operations/PingOperation.h" namespace infinispan { namespace hotrod { @@ -15,41 +16,42 @@ namespace operations { class OperationsFactory; } +class RemoteCacheManagerImpl; + class RemoteCacheImpl { public: - RemoteCacheImpl(RemoteCacheBase& base, const std::string& name); - void get(const void* key, void *buf); - void put(const void *key, const void* val, uint64_t life, uint64_t idle, void* b); - void putIfAbsent(const void *key, const void* val, uint64_t life, uint64_t idle, void* b); - void replace(const void *key, const void* val, uint64_t life, uint64_t idle, void* b); - void remove(const void* key, void* buf); - void containsKey(const void* key, bool* res); - void replaceWithVersion(const void* k, const void* v, uint64_t version, uint64_t life, uint64_t idle, bool* res); - void removeWithVersion(const void* k, uint64_t version, bool* res); - void getWithMetadata(const void *key, void* vbuf, MetadataValue* metadata); - void getBulk(int size, std::map* mbuf); - void keySet(int scope, std::set* result); + RemoteCacheImpl(RemoteCacheManagerImpl& rcm, const std::string& name); + void get(RemoteCacheBase& rcb, const void* key, void *buf); + void put(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle, void* b); + void putIfAbsent(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle, void* b); + void replace(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle, void* b); + void remove(RemoteCacheBase& rcb, const void* key, void* buf); + void containsKey(RemoteCacheBase& rcb, const void* key, bool* res); + void replaceWithVersion(RemoteCacheBase& rcb, const void* k, const void* v, uint64_t version, uint64_t life, uint64_t idle, bool* res); + void removeWithVersion(RemoteCacheBase& rcb, const void* k, uint64_t version, bool* res); + void getWithMetadata(RemoteCacheBase& rcb, const void *key, void* vbuf, MetadataValue* metadata); + void getBulk(RemoteCacheBase& rcb, int size, std::map* mbuf); + void keySet(RemoteCacheBase& rcb, int scope, std::set* result); void stats(std::map* stats); void clear(); - void ping(); + operations::PingResult ping(); - void init(const std::string& name, operations::OperationsFactory* operationsFactory); - void init(const RemoteCacheImpl &other); + void init(operations::OperationsFactory* operationsFactory); void withFlags(Flag flag); const std::string& getName() const; private: - RemoteCacheBase& remoteCacheBase; + RemoteCacheManagerImpl& remoteCacheManager; HR_SHARED_PTR operationsFactory; std::string name; void applyDefaultExpirationFlags(uint64_t lifespan, uint64_t maxIdle); - + void assertRemoteCacheManagerIsStarted(); }; }} // namespace infinispan::hotrod diff --git a/src/hotrod/impl/RemoteCacheManagerImpl.cpp b/src/hotrod/impl/RemoteCacheManagerImpl.cpp index 68193b87..e411582f 100644 --- a/src/hotrod/impl/RemoteCacheManagerImpl.cpp +++ b/src/hotrod/impl/RemoteCacheManagerImpl.cpp @@ -9,23 +9,25 @@ namespace infinispan { namespace hotrod { -//using namespace configuration; using namespace protocol; using namespace transport; using namespace operations; +using namespace sys; const std::string ISPN_CLIENT_HOTROD_SERVER_LIST("infinispan.client.hotrod.server_list"); +const std::string DefaultCacheName = ""; + RemoteCacheManagerImpl::RemoteCacheManagerImpl(bool start_) - : transportFactory(0), started(false), - configuration(ConfigurationBuilder().build()), codec(0), topologyId(0) + : started(false), topologyId(0), + configuration(ConfigurationBuilder().build()), codec(0) { if (start_) start(); } RemoteCacheManagerImpl::RemoteCacheManagerImpl(const std::map& properties, bool start_) - : transportFactory(0), started(false), - configuration(ConfigurationBuilder().build()), codec(0), topologyId(0) + : started(false), topologyId(0), + configuration(ConfigurationBuilder().build()), codec(0) { std::map::const_iterator server_prop; @@ -39,26 +41,32 @@ RemoteCacheManagerImpl::RemoteCacheManagerImpl(const std::map l(lock); + codec = CodecFactory::getCodec(configuration.getProtocolVersion().c_str()); + if (!started) { + transportFactory.reset(TransportFactory::newInstance()); transportFactory->start(*codec, configuration, topologyId); + for(std::map::iterator iter = cacheName2RemoteCache.begin(); + iter != cacheName2RemoteCache.end(); ++iter ) + { + startRemoteCache(*iter->second.first, iter->second.second); + } + started = true; } } void RemoteCacheManagerImpl::stop() { - if (isStarted()) { + ScopedLock l(lock); + if (started) { transportFactory->destroy(); - delete transportFactory; - transportFactory = NULL; - started = false; } } bool RemoteCacheManagerImpl::isStarted() { + ScopedLock l(lock); return started; } @@ -66,17 +74,55 @@ const Configuration& RemoteCacheManagerImpl::getConfiguration() { return configuration; } -void RemoteCacheManagerImpl::initCache( - RemoteCacheImpl& cache, bool forceReturnValue) +HR_SHARED_PTR RemoteCacheManagerImpl::createRemoteCache( + bool forceReturnValue) +{ + return createRemoteCache(DefaultCacheName,forceReturnValue); +} + +HR_SHARED_PTR RemoteCacheManagerImpl::createRemoteCache( + const std::string& name, bool forceReturnValue) +{ + ScopedLock l(lock); + std::map::iterator iter = cacheName2RemoteCache.find(name); + if (iter == cacheName2RemoteCache.end()) { + HR_SHARED_PTR rcache(new RemoteCacheImpl(*this,name)); + startRemoteCache(*rcache, forceReturnValue); + if (configuration.isPingOnStartup()) { + // If ping not successful assume that the cache does not exist + // Default cache is always started, so don't do for it + if (rcache->getName() != DefaultCacheName && ping(*rcache) == CACHE_DOES_NOT_EXIST) { + rcache.reset(); + return rcache; + } + } + // If ping on startup is disabled, or cache is defined in server + cacheName2RemoteCache[name] = RemoteCacheHolder(rcache, forceReturnValue); + return rcache; + } + + return iter->second.first; +} + +void RemoteCacheManagerImpl::startRemoteCache(RemoteCacheImpl& remoteCache, bool forceReturnValue) { OperationsFactory* operationsFactory = new OperationsFactory( transportFactory, - cache.getName(), + remoteCache.getName(), topologyId, forceReturnValue, *CodecFactory::getCodec(configuration.getProtocolVersion().c_str())); - cache.init(cache.getName(), operationsFactory); + remoteCache.init(operationsFactory); + +} + +PingResult RemoteCacheManagerImpl::ping(RemoteCacheImpl& remoteCache) { + if (!transportFactory) { + return FAIL; + } + + return remoteCache.ping(); } }} // namespace infinispan::hotrod diff --git a/src/hotrod/impl/RemoteCacheManagerImpl.h b/src/hotrod/impl/RemoteCacheManagerImpl.h index 3e50021b..96ad9034 100644 --- a/src/hotrod/impl/RemoteCacheManagerImpl.h +++ b/src/hotrod/impl/RemoteCacheManagerImpl.h @@ -1,14 +1,12 @@ #ifndef ISPN_HOTROD_REMOTECACHEMANAGERIMPL_H #define ISPN_HOTROD_REMOTECACHEMANAGERIMPL_H - - -#include "infinispan/hotrod/ImportExport.h" -#include "infinispan/hotrod/Handle.h" -#include "infinispan/hotrod/RemoteCache.h" +#include "hotrod/impl/RemoteCacheImpl.h" #include "hotrod/impl/configuration/Configuration.h" #include "hotrod/impl/protocol/Codec.h" #include "hotrod/impl/transport/TransportFactory.h" +#include "hotrod/impl/operations/PingOperation.h" +#include "hotrod/sys/Mutex.h" #include @@ -19,23 +17,30 @@ class RemoteCacheManagerImpl { public: RemoteCacheManagerImpl(bool start = true); - //RemoteCacheManagerImpl(const Configuration& configuration, bool start = true); RemoteCacheManagerImpl(const std::map& properties, bool start = true); - void start(); + HR_SHARED_PTR createRemoteCache(bool forceReturnValue); + HR_SHARED_PTR createRemoteCache(const std::string& name, bool forceReturnValue); + + void start(); void stop(); bool isStarted(); const Configuration& getConfiguration(); - void initCache(RemoteCacheImpl& cache, bool forceReturnValue); private: - transport::TransportFactory* transportFactory; - // TODO: volatile + sys::Mutex lock; bool started; + int64_t topologyId; Configuration configuration; protocol::Codec* codec; - // TODO: atomic, initialized to 1 - int64_t topologyId; + + typedef std::pair, bool> RemoteCacheHolder; + std::map cacheName2RemoteCache; + + operations::PingResult ping(RemoteCacheImpl& remoteCache); + HR_SHARED_PTR transportFactory; + + void startRemoteCache(RemoteCacheImpl& remoteCache, bool forceReturnValue); }; }} // namespace infinispan::hotrod diff --git a/src/hotrod/impl/configuration/Configuration.cpp b/src/hotrod/impl/configuration/Configuration.cpp index 6cc29528..26308e51 100644 --- a/src/hotrod/impl/configuration/Configuration.cpp +++ b/src/hotrod/impl/configuration/Configuration.cpp @@ -17,7 +17,7 @@ Configuration::Configuration(std::string /*protocolVersion*/, int socketTimeoutPar, SslConfiguration sslConfig, bool tcpNoDelayPar, - int vSizeEstimate) : protocolVersion(protocolVersion_), connectionPoolConfiguration(cpc), connectionTimeout(connTimeout), + int vSizeEstimate) : protocolVersion(PROTOCOL_VERSION_12), connectionPoolConfiguration(cpc), connectionTimeout(connTimeout), forceReturnValue(forceReturnVal), keySizeEstimate(kSizeEstimate), pingOnStartup(pingOnStartupPar), servers(serversConfiguration), socketTimeout(socketTimeoutPar), sslConfiguration(sslConfig),tcpNoDelay(tcpNoDelayPar),valueSizeEstimate(vSizeEstimate) { diff --git a/src/hotrod/impl/configuration/ConfigurationBuilder.cpp b/src/hotrod/impl/configuration/ConfigurationBuilder.cpp index a8675936..bd55b8a0 100644 --- a/src/hotrod/impl/configuration/ConfigurationBuilder.cpp +++ b/src/hotrod/impl/configuration/ConfigurationBuilder.cpp @@ -147,7 +147,7 @@ Configuration ConfigurationBuilder::create() internalKeySizeEstimate); } -ConfigurationBuilder& ConfigurationBuilder::read(Configuration& /*bean*/) +ConfigurationBuilder& ConfigurationBuilder::read(Configuration& bean) { // FIXME: read pool, ssl and server configs internalProtocolVersion = bean.getProtocolVersion(); diff --git a/src/hotrod/impl/operations/AbstractKeyOperation.h b/src/hotrod/impl/operations/AbstractKeyOperation.h index 88ca7556..739cf580 100644 --- a/src/hotrod/impl/operations/AbstractKeyOperation.h +++ b/src/hotrod/impl/operations/AbstractKeyOperation.h @@ -23,7 +23,7 @@ template class AbstractKeyOperation : public RetryOnFailureOperation AbstractKeyOperation( const protocol::Codec& _codec, - transport::TransportFactory* _transportFactory, + HR_SHARED_PTR _transportFactory, const hrbytes& _key, const hrbytes& _cacheName, uint32_t _topologyId, uint32_t _flags) : RetryOnFailureOperation( diff --git a/src/hotrod/impl/operations/AbstractKeyValueOperation.h b/src/hotrod/impl/operations/AbstractKeyValueOperation.h index a671c9e4..26c3bc5b 100644 --- a/src/hotrod/impl/operations/AbstractKeyValueOperation.h +++ b/src/hotrod/impl/operations/AbstractKeyValueOperation.h @@ -16,7 +16,7 @@ template class AbstractKeyValueOperation : public AbstractKeyOperation< protected: AbstractKeyValueOperation( const protocol::Codec& codec_, - transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key_, const hrbytes& cacheName_, uint32_t topologyId_, diff --git a/src/hotrod/impl/operations/BulkGetKeysOperation.cpp b/src/hotrod/impl/operations/BulkGetKeysOperation.cpp index 8c365ae9..38ecdb0a 100644 --- a/src/hotrod/impl/operations/BulkGetKeysOperation.cpp +++ b/src/hotrod/impl/operations/BulkGetKeysOperation.cpp @@ -14,7 +14,7 @@ using namespace infinispan::hotrod::transport; BulkGetKeysOperation::BulkGetKeysOperation( const Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_, diff --git a/src/hotrod/impl/operations/BulkGetKeysOperation.h b/src/hotrod/impl/operations/BulkGetKeysOperation.h index ccedb88e..0ab7c240 100644 --- a/src/hotrod/impl/operations/BulkGetKeysOperation.h +++ b/src/hotrod/impl/operations/BulkGetKeysOperation.h @@ -20,7 +20,7 @@ class BulkGetKeysOperation : public RetryOnFailureOperation > protected: BulkGetKeysOperation( const infinispan::hotrod::protocol::Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_, diff --git a/src/hotrod/impl/operations/BulkGetOperation.cpp b/src/hotrod/impl/operations/BulkGetOperation.cpp index ac642576..1e081624 100644 --- a/src/hotrod/impl/operations/BulkGetOperation.cpp +++ b/src/hotrod/impl/operations/BulkGetOperation.cpp @@ -14,7 +14,7 @@ using namespace infinispan::hotrod::transport; BulkGetOperation::BulkGetOperation( const Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_, diff --git a/src/hotrod/impl/operations/BulkGetOperation.h b/src/hotrod/impl/operations/BulkGetOperation.h index 5fa9336a..a87bd1b1 100644 --- a/src/hotrod/impl/operations/BulkGetOperation.h +++ b/src/hotrod/impl/operations/BulkGetOperation.h @@ -20,7 +20,7 @@ class BulkGetOperation : public RetryOnFailureOperation transportFactory, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_, diff --git a/src/hotrod/impl/operations/ClearOperation.cpp b/src/hotrod/impl/operations/ClearOperation.cpp index 69750ecd..5e9b8895 100644 --- a/src/hotrod/impl/operations/ClearOperation.cpp +++ b/src/hotrod/impl/operations/ClearOperation.cpp @@ -12,7 +12,7 @@ using namespace infinispan::hotrod::transport; ClearOperation::ClearOperation( const Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_) diff --git a/src/hotrod/impl/operations/ClearOperation.h b/src/hotrod/impl/operations/ClearOperation.h index 3d31e9cd..56635470 100644 --- a/src/hotrod/impl/operations/ClearOperation.h +++ b/src/hotrod/impl/operations/ClearOperation.h @@ -18,7 +18,7 @@ class ClearOperation : public RetryOnFailureOperation protected: ClearOperation( const infinispan::hotrod::protocol::Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_); diff --git a/src/hotrod/impl/operations/ContainsKeyOperation.cpp b/src/hotrod/impl/operations/ContainsKeyOperation.cpp index 7e31bd69..9c5a8438 100644 --- a/src/hotrod/impl/operations/ContainsKeyOperation.cpp +++ b/src/hotrod/impl/operations/ContainsKeyOperation.cpp @@ -13,7 +13,7 @@ using namespace infinispan::hotrod::transport; ContainsKeyOperation::ContainsKeyOperation( const Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key_, const hrbytes& cacheName_, uint32_t topologyId_, diff --git a/src/hotrod/impl/operations/ContainsKeyOperation.h b/src/hotrod/impl/operations/ContainsKeyOperation.h index 6d2f485a..de413d12 100644 --- a/src/hotrod/impl/operations/ContainsKeyOperation.h +++ b/src/hotrod/impl/operations/ContainsKeyOperation.h @@ -21,7 +21,7 @@ class ContainsKeyOperation : public AbstractKeyOperation private: ContainsKeyOperation( const infinispan::hotrod::protocol::Codec& codec, - infinispan::hotrod::transport::TransportFactory* transportFactory, + HR_SHARED_PTR transportFactory, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, uint32_t flags); diff --git a/src/hotrod/impl/operations/FaultTolerantPingOperation.cpp b/src/hotrod/impl/operations/FaultTolerantPingOperation.cpp index 90331737..e1957fe9 100644 --- a/src/hotrod/impl/operations/FaultTolerantPingOperation.cpp +++ b/src/hotrod/impl/operations/FaultTolerantPingOperation.cpp @@ -13,7 +13,7 @@ using namespace infinispan::hotrod::transport; FaultTolerantPingOperation::FaultTolerantPingOperation( const Codec& codec_, - transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_) diff --git a/src/hotrod/impl/operations/FaultTolerantPingOperation.h b/src/hotrod/impl/operations/FaultTolerantPingOperation.h index 40da3e84..7a3b4735 100644 --- a/src/hotrod/impl/operations/FaultTolerantPingOperation.h +++ b/src/hotrod/impl/operations/FaultTolerantPingOperation.h @@ -18,7 +18,7 @@ class FaultTolerantPingOperation : public RetryOnFailureOperation protected: FaultTolerantPingOperation( const protocol::Codec& codec_, - transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_); diff --git a/src/hotrod/impl/operations/GetOperation.cpp b/src/hotrod/impl/operations/GetOperation.cpp index afc2baeb..e7472b62 100644 --- a/src/hotrod/impl/operations/GetOperation.cpp +++ b/src/hotrod/impl/operations/GetOperation.cpp @@ -12,7 +12,7 @@ using infinispan::hotrod::protocol::Codec; using namespace infinispan::hotrod::transport; GetOperation::GetOperation( - const Codec& _codec, TransportFactory* _transportFactory, const hrbytes& _key, + const Codec& _codec, HR_SHARED_PTR _transportFactory, const hrbytes& _key, const hrbytes& _cacheName, uint32_t _topologyId, uint32_t _flags) : AbstractKeyOperation( _codec, _transportFactory, _key, _cacheName, _topologyId, _flags) diff --git a/src/hotrod/impl/operations/GetOperation.h b/src/hotrod/impl/operations/GetOperation.h index 31a37bb7..8e11cb6e 100644 --- a/src/hotrod/impl/operations/GetOperation.h +++ b/src/hotrod/impl/operations/GetOperation.h @@ -21,7 +21,7 @@ class GetOperation : public AbstractKeyOperation private: GetOperation( const infinispan::hotrod::protocol::Codec& codec, - infinispan::hotrod::transport::TransportFactory* transportFactory, + HR_SHARED_PTR transportFactory, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, uint32_t flags); diff --git a/src/hotrod/impl/operations/GetWithMetadataOperation.cpp b/src/hotrod/impl/operations/GetWithMetadataOperation.cpp index c2fd50a8..06fbc369 100644 --- a/src/hotrod/impl/operations/GetWithMetadataOperation.cpp +++ b/src/hotrod/impl/operations/GetWithMetadataOperation.cpp @@ -14,7 +14,7 @@ using namespace infinispan::hotrod::transport; GetWithMetadataOperation::GetWithMetadataOperation( const Codec& codec_, - TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key_, const hrbytes& cacheName_, uint32_t topologyId_, diff --git a/src/hotrod/impl/operations/GetWithMetadataOperation.h b/src/hotrod/impl/operations/GetWithMetadataOperation.h index e969c9b2..88e17afe 100644 --- a/src/hotrod/impl/operations/GetWithMetadataOperation.h +++ b/src/hotrod/impl/operations/GetWithMetadataOperation.h @@ -20,7 +20,7 @@ class GetWithMetadataOperation private: GetWithMetadataOperation( const infinispan::hotrod::protocol::Codec& codec, - infinispan::hotrod::transport::TransportFactory* transportFactory, + HR_SHARED_PTR transportFactory, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, uint32_t flags); diff --git a/src/hotrod/impl/operations/OperationsFactory.cpp b/src/hotrod/impl/operations/OperationsFactory.cpp index 02e28b71..0bcdef1e 100644 --- a/src/hotrod/impl/operations/OperationsFactory.cpp +++ b/src/hotrod/impl/operations/OperationsFactory.cpp @@ -25,140 +25,128 @@ namespace infinispan { namespace hotrod { +namespace operations { using namespace protocol; using namespace transport; -namespace operations { - OperationsFactory::OperationsFactory( - TransportFactory*& tf, const std::string& cn, - uint32_t tid, bool frv, const Codec& c) : - transportFactory(tf), codec(c), forceReturnValue(frv), topologyId(tid), flags() + HR_SHARED_PTR tf, const std::string& cn, + uint32_t tid, bool frv, const Codec& c) : + transportFactory(tf), codec(c), forceReturnValue(frv), topologyId(tid), flags() { - cacheNameBytes.reserve(cn.length()); - memcpy(cacheNameBytes.bytes(), cn.c_str(), cn.length()); + cacheNameBytes.reserve(cn.length()); + memcpy(cacheNameBytes.bytes(), cn.c_str(), cn.length()); } PingOperation* OperationsFactory::newPingOperation(Transport& transport) { - return new PingOperation(codec, topologyId, transport, cacheNameBytes); + return new PingOperation(codec, topologyId, transport, cacheNameBytes); } GetOperation* OperationsFactory::newGetKeyOperation(const hrbytes& key) { - return new GetOperation( - codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); + return new GetOperation( + codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); } PutOperation* OperationsFactory::newPutKeyValueOperation( - const hrbytes& key, const hrbytes& value, - uint32_t lifespanSecs, uint32_t maxIdleSecs) + const hrbytes& key, const hrbytes& value, + uint32_t lifespanSecs, uint32_t maxIdleSecs) { - return new PutOperation( - codec, transportFactory, key, cacheNameBytes, - topologyId, getFlags(), value, lifespanSecs, maxIdleSecs); + return new PutOperation( + codec, transportFactory, key, cacheNameBytes, + topologyId, getFlags(), value, lifespanSecs, maxIdleSecs); } PutIfAbsentOperation* OperationsFactory::newPutIfAbsentOperation( - const hrbytes& key, const hrbytes& value, - uint32_t lifespanSecs, uint32_t maxIdleSecs) + const hrbytes& key, const hrbytes& value, + uint32_t lifespanSecs, uint32_t maxIdleSecs) { - return new PutIfAbsentOperation( - codec, transportFactory, key, cacheNameBytes, - topologyId, getFlags(), value, lifespanSecs, maxIdleSecs); + return new PutIfAbsentOperation( + codec, transportFactory, key, cacheNameBytes, + topologyId, getFlags(), value, lifespanSecs, maxIdleSecs); } ReplaceOperation* OperationsFactory::newReplaceOperation( - const hrbytes& key, const hrbytes& value, - uint32_t lifespanSecs, uint32_t maxIdleSecs) + const hrbytes& key, const hrbytes& value, + uint32_t lifespanSecs, uint32_t maxIdleSecs) { - return new ReplaceOperation( - codec, transportFactory, key, cacheNameBytes, - topologyId, getFlags(), value, lifespanSecs, maxIdleSecs); + return new ReplaceOperation( + codec, transportFactory, key, cacheNameBytes, + topologyId, getFlags(), value, lifespanSecs, maxIdleSecs); } -RemoveOperation* OperationsFactory::newRemoveOperation(const hrbytes& key) -{ - return new RemoveOperation( - codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); +RemoveOperation* OperationsFactory::newRemoveOperation(const hrbytes& key) { + return new RemoveOperation( + codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); } -ContainsKeyOperation* OperationsFactory::newContainsKeyOperation(const hrbytes& key) -{ - return new ContainsKeyOperation( - codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); +ContainsKeyOperation* OperationsFactory::newContainsKeyOperation(const hrbytes& key) { + return new ContainsKeyOperation( + codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); } ReplaceIfUnmodifiedOperation* OperationsFactory::newReplaceIfUnmodifiedOperation( - const hrbytes& key, const hrbytes& value, - uint32_t lifespanSecs, uint32_t maxIdleSecs, int64_t version) + const hrbytes& key, const hrbytes& value, + uint32_t lifespanSecs, uint32_t maxIdleSecs, int64_t version) { - return new ReplaceIfUnmodifiedOperation( - codec, transportFactory, key, cacheNameBytes, - topologyId, getFlags(), value, lifespanSecs, maxIdleSecs, version); + return new ReplaceIfUnmodifiedOperation( + codec, transportFactory, key, cacheNameBytes, + topologyId, getFlags(), value, lifespanSecs, maxIdleSecs, version); } RemoveIfUnmodifiedOperation* OperationsFactory::newRemoveIfUnmodifiedOperation( const hrbytes& key, int64_t version) { - return new RemoveIfUnmodifiedOperation( - codec, transportFactory, key, cacheNameBytes, topologyId, getFlags(), version); + return new RemoveIfUnmodifiedOperation( + codec, transportFactory, key, cacheNameBytes, topologyId, getFlags(), version); } -GetWithMetadataOperation* OperationsFactory::newGetWithMetadataOperation(const hrbytes& key) -{ - return new GetWithMetadataOperation( - codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); +GetWithMetadataOperation* OperationsFactory::newGetWithMetadataOperation(const hrbytes& key) { + return new GetWithMetadataOperation( + codec, transportFactory, key, cacheNameBytes, topologyId, getFlags()); } -BulkGetOperation* OperationsFactory::newBulkGetOperation(int size) -{ - return new BulkGetOperation( - codec, transportFactory, cacheNameBytes, topologyId, getFlags(), size); +BulkGetOperation* OperationsFactory::newBulkGetOperation(int size) { + return new BulkGetOperation( + codec, transportFactory, cacheNameBytes, topologyId, getFlags(), size); } -BulkGetKeysOperation* OperationsFactory::newBulkGetKeysOperation(int scope) -{ - return new BulkGetKeysOperation( - codec, transportFactory, cacheNameBytes, topologyId, getFlags(), scope); +BulkGetKeysOperation* OperationsFactory::newBulkGetKeysOperation(int scope) { + return new BulkGetKeysOperation( + codec, transportFactory, cacheNameBytes, topologyId, getFlags(), scope); } -StatsOperation* OperationsFactory::newStatsOperation() -{ - return new StatsOperation( - codec, transportFactory, cacheNameBytes, topologyId, getFlags()); +StatsOperation* OperationsFactory::newStatsOperation() { + return new StatsOperation( + codec, transportFactory, cacheNameBytes, topologyId, getFlags()); } -ClearOperation* OperationsFactory::newClearOperation() -{ - return new ClearOperation( - codec, transportFactory, cacheNameBytes, topologyId, getFlags()); +ClearOperation* OperationsFactory::newClearOperation() { + return new ClearOperation( + codec, transportFactory, cacheNameBytes, topologyId, getFlags()); } -FaultTolerantPingOperation* OperationsFactory::newFaultTolerantPingOperation() -{ - return new FaultTolerantPingOperation( - codec, transportFactory, cacheNameBytes, topologyId, getFlags()); +FaultTolerantPingOperation* OperationsFactory::newFaultTolerantPingOperation() { + return new FaultTolerantPingOperation( + codec, transportFactory, cacheNameBytes, topologyId, getFlags()); } -uint32_t OperationsFactory::getFlags() -{ - uint32_t result = flags; +uint32_t OperationsFactory::getFlags() { + uint32_t result = flags; if (forceReturnValue) { result |= FORCE_RETURN_VALUE; - } - flags = 0; - return result; + } + flags = 0; + return result; } -void OperationsFactory::addFlags(uint32_t f) -{ +void OperationsFactory::addFlags(uint32_t f) { flags |= f; } -void OperationsFactory::setFlags(uint32_t f) -{ +void OperationsFactory::setFlags(uint32_t f) { flags = f; } diff --git a/src/hotrod/impl/operations/OperationsFactory.h b/src/hotrod/impl/operations/OperationsFactory.h index cd8d0134..b3494988 100644 --- a/src/hotrod/impl/operations/OperationsFactory.h +++ b/src/hotrod/impl/operations/OperationsFactory.h @@ -86,19 +86,17 @@ class OperationsFactory void setFlags(uint32_t flags); private: - infinispan::hotrod::transport::TransportFactory*& transportFactory; + HR_SHARED_PTR transportFactory; const infinispan::hotrod::protocol::Codec& codec; hrbytes cacheNameBytes; bool forceReturnValue; - // TODO: atomic uint32_t topologyId; - // TODO: thread local uint32_t flags; uint32_t getFlags(); OperationsFactory( - infinispan::hotrod::transport::TransportFactory*& transportFactory, + HR_SHARED_PTR transportFactory, const std::string& cacheName, uint32_t topologyId, bool forceReturnValue, const infinispan::hotrod::protocol::Codec& codec); diff --git a/src/hotrod/impl/operations/PutIfAbsentOperation.cpp b/src/hotrod/impl/operations/PutIfAbsentOperation.cpp index cc37782f..012d672a 100644 --- a/src/hotrod/impl/operations/PutIfAbsentOperation.cpp +++ b/src/hotrod/impl/operations/PutIfAbsentOperation.cpp @@ -11,7 +11,7 @@ using infinispan::hotrod::protocol::Codec; using namespace infinispan::hotrod::transport; PutIfAbsentOperation::PutIfAbsentOperation( - const Codec& _codec, TransportFactory* _transportFactory, const hrbytes& _key, + const Codec& _codec, HR_SHARED_PTR _transportFactory, const hrbytes& _key, const hrbytes& _cacheName, uint32_t _topologyId, uint32_t _flags, const hrbytes& _value, uint32_t _lifespan, uint32_t _maxIdle) : AbstractKeyValueOperation( diff --git a/src/hotrod/impl/operations/PutIfAbsentOperation.h b/src/hotrod/impl/operations/PutIfAbsentOperation.h index 485b1bc8..cd65a37d 100644 --- a/src/hotrod/impl/operations/PutIfAbsentOperation.h +++ b/src/hotrod/impl/operations/PutIfAbsentOperation.h @@ -20,7 +20,7 @@ class PutIfAbsentOperation : public AbstractKeyValueOperation private: PutIfAbsentOperation( const infinispan::hotrod::protocol::Codec& codec, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, diff --git a/src/hotrod/impl/operations/PutOperation.cpp b/src/hotrod/impl/operations/PutOperation.cpp index c1a661ac..1fab9a85 100644 --- a/src/hotrod/impl/operations/PutOperation.cpp +++ b/src/hotrod/impl/operations/PutOperation.cpp @@ -14,7 +14,7 @@ using namespace infinispan::hotrod::transport; PutOperation::PutOperation( const Codec& codec_, - TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key_, const hrbytes& cacheName_, uint32_t topologyId_, diff --git a/src/hotrod/impl/operations/PutOperation.h b/src/hotrod/impl/operations/PutOperation.h index ca157b25..058d6bbd 100644 --- a/src/hotrod/impl/operations/PutOperation.h +++ b/src/hotrod/impl/operations/PutOperation.h @@ -19,7 +19,7 @@ class PutOperation : public AbstractKeyValueOperation private: PutOperation( const infinispan::hotrod::protocol::Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key_, const hrbytes& cacheName_, uint32_t topologyId_, diff --git a/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.cpp b/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.cpp index 98cd5047..78edfeac 100644 --- a/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.cpp +++ b/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.cpp @@ -13,7 +13,7 @@ using namespace infinispan::hotrod::transport; RemoveIfUnmodifiedOperation::RemoveIfUnmodifiedOperation( const Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key_, const hrbytes& cacheName_, uint32_t topologyId_, diff --git a/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.h b/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.h index e9d1b012..d14cd391 100644 --- a/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.h +++ b/src/hotrod/impl/operations/RemoveIfUnmodifiedOperation.h @@ -23,7 +23,7 @@ class RemoveIfUnmodifiedOperation private: RemoveIfUnmodifiedOperation( const infinispan::hotrod::protocol::Codec& codec, - infinispan::hotrod::transport::TransportFactory* transportFactory, + HR_SHARED_PTR transportFactory, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, uint32_t flags, int64_t version); diff --git a/src/hotrod/impl/operations/RemoveOperation.cpp b/src/hotrod/impl/operations/RemoveOperation.cpp index 74b71890..3ebd7321 100644 --- a/src/hotrod/impl/operations/RemoveOperation.cpp +++ b/src/hotrod/impl/operations/RemoveOperation.cpp @@ -13,7 +13,7 @@ using namespace infinispan::hotrod::transport; RemoveOperation::RemoveOperation( const Codec& codec_, - TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& key_, const hrbytes& cacheName_, uint32_t topologyId_, diff --git a/src/hotrod/impl/operations/RemoveOperation.h b/src/hotrod/impl/operations/RemoveOperation.h index 5e4dced0..fe46a33e 100644 --- a/src/hotrod/impl/operations/RemoveOperation.h +++ b/src/hotrod/impl/operations/RemoveOperation.h @@ -21,7 +21,7 @@ class RemoveOperation : public AbstractKeyOperation private: RemoveOperation( const infinispan::hotrod::protocol::Codec& codec, - infinispan::hotrod::transport::TransportFactory* transportFactory, + HR_SHARED_PTR transportFactory, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, uint32_t flags); diff --git a/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.cpp b/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.cpp index ee7ed20d..7c8308bd 100644 --- a/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.cpp +++ b/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.cpp @@ -11,7 +11,7 @@ using infinispan::hotrod::protocol::Codec; using namespace infinispan::hotrod::transport; ReplaceIfUnmodifiedOperation::ReplaceIfUnmodifiedOperation( - const Codec& _codec, TransportFactory* _transportFactory, const hrbytes& _key, + const Codec& _codec, HR_SHARED_PTR _transportFactory, const hrbytes& _key, const hrbytes& _cacheName, uint32_t _topologyId, uint32_t _flags, const hrbytes& _value, uint32_t _lifespan, uint32_t _maxIdle, int64_t _version) : AbstractKeyValueOperation( diff --git a/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.h b/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.h index c772aee4..d6939951 100644 --- a/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.h +++ b/src/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.h @@ -19,7 +19,7 @@ class ReplaceIfUnmodifiedOperation private: ReplaceIfUnmodifiedOperation( const protocol::Codec& codec, - transport::TransportFactory* transportFactory, + HR_SHARED_PTR transportFactory, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, diff --git a/src/hotrod/impl/operations/ReplaceOperation.cpp b/src/hotrod/impl/operations/ReplaceOperation.cpp index 5c7eb976..e23b5c56 100644 --- a/src/hotrod/impl/operations/ReplaceOperation.cpp +++ b/src/hotrod/impl/operations/ReplaceOperation.cpp @@ -11,7 +11,7 @@ using infinispan::hotrod::protocol::Codec; using namespace infinispan::hotrod::transport; ReplaceOperation::ReplaceOperation( - const Codec& _codec, TransportFactory* _transportFactory, const hrbytes& _key, + const Codec& _codec, HR_SHARED_PTR _transportFactory, const hrbytes& _key, const hrbytes& _cacheName, uint32_t _topologyId, uint32_t _flags, const hrbytes& _value, uint32_t _lifespan, uint32_t _maxIdle) : AbstractKeyValueOperation( diff --git a/src/hotrod/impl/operations/ReplaceOperation.h b/src/hotrod/impl/operations/ReplaceOperation.h index d48c8dfd..81db0f18 100644 --- a/src/hotrod/impl/operations/ReplaceOperation.h +++ b/src/hotrod/impl/operations/ReplaceOperation.h @@ -20,7 +20,7 @@ class ReplaceOperation : public AbstractKeyValueOperation private: ReplaceOperation( const infinispan::hotrod::protocol::Codec& codec, - infinispan::hotrod::transport::TransportFactory* transportFactory, + HR_SHARED_PTR transportFactory, const hrbytes& key, const hrbytes& cacheName, uint32_t topologyId, diff --git a/src/hotrod/impl/operations/RetryOnFailureOperation.h b/src/hotrod/impl/operations/RetryOnFailureOperation.h index e4f9e8ec..b4e172e6 100644 --- a/src/hotrod/impl/operations/RetryOnFailureOperation.h +++ b/src/hotrod/impl/operations/RetryOnFailureOperation.h @@ -48,7 +48,7 @@ template class RetryOnFailureOperation : public HotRodOperation protected: RetryOnFailureOperation( const protocol::Codec& _codec, - transport::TransportFactory* _transportFactory, + HR_SHARED_PTR _transportFactory, const hrbytes& _cacheName, uint32_t _topologyId, uint32_t _flags) : HotRodOperation(_codec, _flags, _cacheName, _topologyId), transportFactory(_transportFactory) {} @@ -79,7 +79,7 @@ template class RetryOnFailureOperation : public HotRodOperation virtual ~RetryOnFailureOperation() {} - transport::TransportFactory* transportFactory; + HR_SHARED_PTR transportFactory; }; }}} // namespace infinispan::hotrod::operations diff --git a/src/hotrod/impl/operations/StatsOperation.cpp b/src/hotrod/impl/operations/StatsOperation.cpp index 5b05c679..52cdb7ae 100644 --- a/src/hotrod/impl/operations/StatsOperation.cpp +++ b/src/hotrod/impl/operations/StatsOperation.cpp @@ -14,7 +14,7 @@ using namespace infinispan::hotrod::transport; StatsOperation::StatsOperation( const Codec& codec_, - infinispan::hotrod::transport::TransportFactory* transportFactory_, + HR_SHARED_PTR transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_) diff --git a/src/hotrod/impl/operations/StatsOperation.h b/src/hotrod/impl/operations/StatsOperation.h index fc1d3bc5..564bf010 100644 --- a/src/hotrod/impl/operations/StatsOperation.h +++ b/src/hotrod/impl/operations/StatsOperation.h @@ -21,7 +21,7 @@ class StatsOperation : public RetryOnFailureOperation transportFactory_, const hrbytes& cacheName_, uint32_t topologyId_, uint32_t flags_); diff --git a/src/hotrod/impl/protocol/CodecFactory.cpp b/src/hotrod/impl/protocol/CodecFactory.cpp index 7da874b0..122a612e 100644 --- a/src/hotrod/impl/protocol/CodecFactory.cpp +++ b/src/hotrod/impl/protocol/CodecFactory.cpp @@ -3,6 +3,7 @@ #include "hotrod/impl/protocol/CodecFactory.h" #include "hotrod/impl/protocol/Codec12.h" #include "hotrod/impl/configuration/Configuration.h" +#include "hotrod/sys/RunOnce.h" #include @@ -10,19 +11,22 @@ namespace infinispan { namespace hotrod { namespace protocol { -bool CodecFactory::initialized = false; +using namespace sys; + std::map CodecFactory::codecMap; Codec* CodecFactory::getCodec(const char* version) { - if(!initialized) { - codecMap[Configuration::PROTOCOL_VERSION_12] = new Codec12(); - initialized = true; - } + RunOnce initCodecMap(&init); + initCodecMap.runOnce(); Codec* result = CodecFactory::codecMap[version]; return result; } +void CodecFactory::init() { + codecMap[Configuration::PROTOCOL_VERSION_12] = new Codec12(); +} + }}} // namespace diff --git a/src/hotrod/impl/protocol/CodecFactory.h b/src/hotrod/impl/protocol/CodecFactory.h index 3dc1854d..f89a50c1 100644 --- a/src/hotrod/impl/protocol/CodecFactory.h +++ b/src/hotrod/impl/protocol/CodecFactory.h @@ -19,8 +19,8 @@ class CodecFactory private: CodecFactory(); + static void init(); static std::map codecMap; - static bool initialized; }; }}} // namespace infinispan::hotrod::protocol diff --git a/src/hotrod/impl/transport/tcp/GenericKeyedObjectPool.h b/src/hotrod/impl/transport/tcp/GenericKeyedObjectPool.h index 90f624b5..bd9d8e9d 100644 --- a/src/hotrod/impl/transport/tcp/GenericKeyedObjectPool.h +++ b/src/hotrod/impl/transport/tcp/GenericKeyedObjectPool.h @@ -20,7 +20,7 @@ template class GenericKeyedObjectPool { public: GenericKeyedObjectPool( - KeyedPoolableObjectFactory& factory_, + HR_SHARED_PTR > factory_, int maxActive_, WhenExhaustedAction whenExhaustedAction_, long maxWait_, @@ -40,7 +40,8 @@ template class GenericKeyedObjectPool testOnReturn(testOnReturn_), testWhileIdle(testWhileIdle_), timeBetweenEvictionRunsMillis(timeBetweenEvictionRuns_), numTestsPerEvictionRun(numTestsPerEvictionRun_), - minEvictableIdleTimeMillis(minEvictableIdleTime_), lifo(lifo_) { } + minEvictableIdleTimeMillis(minEvictableIdleTime_), lifo(lifo_), closed(false) + { } // TODO: ALL IMPLEMENTATION @@ -59,49 +60,60 @@ template class GenericKeyedObjectPool } V& borrowObject(const K& key) { + sys::ScopedLock l(lock); + if(closed) throw HotRodClientException("POOL CLOSED"); V* val = poolMap[key]; if (!val) { - val = &factory.makeObject(key); + val = &factory->makeObject(key); poolMap[key] = val; } - factory.activateObject(key, *val); + factory->activateObject(key, *val); return *val; } void invalidateObject(const K& key, V* val) { + sys::ScopedLock l(lock); poolMap[key] = NULL; - factory.destroyObject(key, *val); + if(val!=NULL) + factory->destroyObject(key, *val); } void clear() { + sys::ScopedLock l(lock); for(typename std::map::iterator iter = poolMap.begin() ; iter != poolMap.end() ; ++iter) { clear(iter->first); } } void clear(const K& key) { + sys::ScopedLock l(lock); V* val = poolMap[key]; if (val) { - factory.destroyObject(key, *val); + factory->destroyObject(key, *val); } poolMap.erase(key); } void preparePool(const K& key) { + sys::ScopedLock l(lock); poolMap[key] = NULL; } void close() { + sys::ScopedLock l(lock); clear(); + closed = true; } void addObject(const K& key) { - V& val = factory.makeObject(key); + sys::ScopedLock l(lock); + V& val = factory->makeObject(key); poolMap[key] = &val; } private: - KeyedPoolableObjectFactory& factory; + HR_SHARED_PTR > factory; + sys::Mutex lock; std::map poolMap; int maxIdle; @@ -118,6 +130,8 @@ template class GenericKeyedObjectPool long minEvictableIdleTimeMillis; bool lifo; + bool closed; + }; }}} // namespace infinispan::hotrod::transport diff --git a/src/hotrod/impl/transport/tcp/GenericKeyedObjectPoolFactory.h b/src/hotrod/impl/transport/tcp/GenericKeyedObjectPoolFactory.h index 50be7ca4..801ea4df 100644 --- a/src/hotrod/impl/transport/tcp/GenericKeyedObjectPoolFactory.h +++ b/src/hotrod/impl/transport/tcp/GenericKeyedObjectPoolFactory.h @@ -14,7 +14,7 @@ template class GenericKeyedObjectPoolFactory { public: GenericKeyedObjectPoolFactory( - KeyedPoolableObjectFactory& factory_, + KeyedPoolableObjectFactory* factory_, int maxActive_, WhenExhaustedAction whenExhaustedAction_, long maxWait_, @@ -28,13 +28,16 @@ template class GenericKeyedObjectPoolFactory long minEvictableIdleTimeMillis_, bool testWhileIdle_, bool lifo_) - : factory(factory_), maxIdle(maxIdle_), maxActive(maxActive_), + : maxIdle(maxIdle_), maxActive(maxActive_), maxTotal(maxTotal_), minIdle(minIdle_), maxWait(maxWait_), whenExhaustedAction(whenExhaustedAction_), testOnBorrow(testOnBorrow_), testOnReturn(testOnReturn_), testWhileIdle(testWhileIdle_), timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis_), numTestsPerEvictionRun(numTestsPerEvictionRun_), - minEvictableIdleTimeMillis(minEvictableIdleTimeMillis_), lifo(lifo_) { } + minEvictableIdleTimeMillis(minEvictableIdleTimeMillis_), lifo(lifo_) + { + factory.reset(factory_); + } int getMaxIdle() { return maxIdle; } int getMaxActive() { return maxActive; } @@ -74,7 +77,7 @@ template class GenericKeyedObjectPoolFactory ~GenericKeyedObjectPoolFactory() {/*delete &factory;*/} // SEGMENTATION FAULT private: - KeyedPoolableObjectFactory& factory; + HR_SHARED_PTR > factory; int maxIdle; int maxActive; diff --git a/src/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.h b/src/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.h index fe30c831..f4a77261 100644 --- a/src/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.h +++ b/src/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.h @@ -16,7 +16,7 @@ template class PropsKeyedObjectPoolFactory : { public: PropsKeyedObjectPoolFactory( - KeyedPoolableObjectFactory& factory_, + KeyedPoolableObjectFactory* factory_, const ConnectionPoolConfiguration& configuration) : GenericKeyedObjectPoolFactory( factory_, diff --git a/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp b/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp index 6db29848..ea4f7cb5 100644 --- a/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp +++ b/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp @@ -12,10 +12,10 @@ namespace infinispan { namespace hotrod { +namespace transport { using protocol::Codec; - -namespace transport { +using namespace sys; TransportFactory* TransportFactory::newInstance() { return new TcpTransportFactory(); @@ -24,8 +24,7 @@ TransportFactory* TransportFactory::newInstance() { void TcpTransportFactory::start( Codec& codec, const Configuration& configuration, int64_t topologyId) { - // TODO: multithread (lock) - // TODO: consistent hash + ScopedLock l(lock); bool pingOnStartup = configuration.isPingOnStartup(); for (std::vector::const_iterator iter=configuration.getServersConfiguration().begin(); iter!=configuration.getServersConfiguration().end(); iter++) @@ -33,16 +32,17 @@ void TcpTransportFactory::start( servers.push_back(InetSocketAddress(iter->getHost(), iter->getPort())); } - balancer = RequestBalancingStrategy::newInstance(); + balancer.reset(RequestBalancingStrategy::newInstance()); tcpNoDelay = configuration.isTcpNoDelay(); soTimeout = configuration.getSocketTimeout(); connectTimeout = configuration.getConnectionTimeout(); // TODO: SSL configuration - PropsKeyedObjectPoolFactory poolFactory( - *new TransportObjectFactory(codec, *this, topologyId, pingOnStartup), - configuration.getConnectionPoolConfiguration()); + PropsKeyedObjectPoolFactory* poolFactory = + new PropsKeyedObjectPoolFactory( + new TransportObjectFactory(codec, *this, topologyId, pingOnStartup), + configuration.getConnectionPoolConfiguration()); createAndPreparePool(poolFactory); balancer->setServers(servers); @@ -54,8 +54,12 @@ void TcpTransportFactory::start( } Transport& TcpTransportFactory::getTransport() { - const InetSocketAddress& server = balancer->nextServer(); - return borrowTransportFromPool(server); + const InetSocketAddress* server = NULL; + { + ScopedLock l(lock); + server = &balancer->nextServer(); + } + return borrowTransportFromPool(*server); } Transport& TcpTransportFactory::getTransport(const hrbytes& /*key*/) { @@ -64,41 +68,47 @@ Transport& TcpTransportFactory::getTransport(const hrbytes& /*key*/) { } void TcpTransportFactory::releaseTransport(Transport& transport) { + GenericKeyedObjectPool* pool = getConnectionPool(); TcpTransport& tcpTransport = dynamic_cast(transport); if (!tcpTransport.isValid()) { - connectionPool->invalidateObject(tcpTransport.getServerAddress(), &tcpTransport); + pool->invalidateObject(tcpTransport.getServerAddress(), &tcpTransport); } else { - connectionPool->returnObject(tcpTransport.getServerAddress(), tcpTransport); + pool->returnObject(tcpTransport.getServerAddress(), tcpTransport); } } void TcpTransportFactory::invalidateTransport( const InetSocketAddress& serverAddress, Transport* transport) { - connectionPool->invalidateObject( + GenericKeyedObjectPool* pool = getConnectionPool(); + pool->invalidateObject( serverAddress, dynamic_cast(transport)); } bool TcpTransportFactory::isTcpNoDelay() { + ScopedLock l(lock); return tcpNoDelay; } int TcpTransportFactory::getTransportCount() { + ScopedLock l(lock); return transportCount; } int TcpTransportFactory::getSoTimeout() { + ScopedLock l(lock); return soTimeout; } int TcpTransportFactory::getConnectTimeout() { + ScopedLock l(lock); return connectTimeout; } void TcpTransportFactory::createAndPreparePool( - PropsKeyedObjectPoolFactory& poolFactory) + PropsKeyedObjectPoolFactory* poolFactory) { - connectionPool = poolFactory.createPool(); + connectionPool.reset(poolFactory->createPool()); for (std::vector::const_iterator i = servers.begin(); i != servers.end() ; ++i) { @@ -123,7 +133,8 @@ void TcpTransportFactory::pingServers() { } void TcpTransportFactory::updateTransportCount() { - unsigned int maxActive = connectionPool->getMaxActive(); + ScopedLock l(lock); + int64_t maxActive = connectionPool->getMaxActive(); if (maxActive > 0) { transportCount = maxActive * servers.size(); // TODO: in java code avoid int overflow when maxActive is very high! @@ -133,7 +144,8 @@ void TcpTransportFactory::updateTransportCount() { } void TcpTransportFactory::destroy() { - connectionPool->clear(); + ScopedLock l(lock); + connectionPool->clear(); connectionPool->close(); // TODO: clean connection pool /* @@ -143,17 +155,19 @@ void TcpTransportFactory::destroy() { log.warn("Exception while shutting down the connection pool.", e); } */ - delete connectionPool; - connectionPool = NULL; - delete balancer; - balancer = NULL; } Transport& TcpTransportFactory::borrowTransportFromPool( const InetSocketAddress& server) { - // TODO - return connectionPool->borrowObject(server); + GenericKeyedObjectPool* pool = getConnectionPool(); + return pool->borrowObject(server); +} + +GenericKeyedObjectPool* TcpTransportFactory::getConnectionPool() +{ + ScopedLock l(lock); + return connectionPool.get(); } }}} // namespace infinispan::hotrod::transport diff --git a/src/hotrod/impl/transport/tcp/TcpTransportFactory.h b/src/hotrod/impl/transport/tcp/TcpTransportFactory.h index f53c11f1..a60cdf29 100644 --- a/src/hotrod/impl/transport/tcp/TcpTransportFactory.h +++ b/src/hotrod/impl/transport/tcp/TcpTransportFactory.h @@ -6,6 +6,7 @@ #include "hotrod/impl/transport/Transport.h" #include "hotrod/impl/transport/TransportFactory.h" #include "hotrod/impl/transport/tcp/TcpTransport.h" +#include "hotrod/sys/Mutex.h" #include @@ -37,19 +38,22 @@ class TcpTransportFactory : public TransportFactory int getConnectTimeout(); private: + sys::Mutex lock; std::vector servers; bool tcpNoDelay; int soTimeout; int connectTimeout; int transportCount; - GenericKeyedObjectPool* connectionPool; - RequestBalancingStrategy* balancer; + + HR_SHARED_PTR > connectionPool; + HR_SHARED_PTR balancer; void createAndPreparePool( - PropsKeyedObjectPoolFactory& poolFactory); + PropsKeyedObjectPoolFactory* poolFactory); void updateTransportCount(); void pingServers(); Transport& borrowTransportFromPool(const InetSocketAddress& server); + GenericKeyedObjectPool* getConnectionPool(); }; }}} // namespace infinispan::hotrod::transport