diff --git a/CMakeLists.txt b/CMakeLists.txt index 150e16d9..50c24ca2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -563,6 +563,17 @@ set_target_properties(events PROPERTIES COMPILE_DEFINITIONS "${DLLEXPORT_STATIC} set_target_properties (events PROPERTIES COMPILE_FLAGS "${COMPILER_FLAGS} ${WARNING_FLAGS_NO_PEDANTIC} ${NO_UNUSED_FLAGS}") target_link_libraries (events hotrod hotrod_protobuf ${PROTOBUF_LIBRARY} ${platform_libs}) +add_executable (nearCacheTest test/NearCacheTest.cpp) +target_include_directories(nearCacheTest PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/test/query_proto" + "${INCLUDE_FILES_DIR}" + "${CMAKE_CURRENT_BINARY_DIR}" + "${PROTOBUF_INCLUDE_DIR}") +set_property(TARGET nearCacheTest PROPERTY CXX_STANDARD 11) +set_property(TARGET nearCacheTest PROPERTY CXX_STANDARD_REQUIRED ON) +set_target_properties(nearCacheTest PROPERTIES COMPILE_DEFINITIONS "${DLLEXPORT_STATIC}") +set_target_properties (nearCacheTest PROPERTIES COMPILE_FLAGS "${COMPILER_FLAGS} ${WARNING_FLAGS_NO_PEDANTIC} ${NO_UNUSED_FLAGS}") +target_link_libraries (nearCacheTest hotrod hotrod_protobuf ${PROTOBUF_LIBRARY} ${platform_libs}) + add_executable (simple-tls test/SimpleTLS.cpp) target_include_directories(simple-tls PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/test/query_proto" "${INCLUDE_FILES_DIR}" @@ -650,6 +661,7 @@ else (NOT ((EXISTS "${HOTROD_JBOSS_HOME}/bin/standalone.sh") AND (EXISTS "${HOTR add_test (queryTest queryTest) add_test (queryTest-static queryTest-static) add_test (events events) + add_test (nearCacheTest nearCacheTest) add_test (stop_server ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test/bin/server_ctl.py stop) add_test (start_ssl_server ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test/bin/server_ctl.py start ${JAVA_RUNTIME} ${HOTROD_JBOSS_HOME} standalone-hotrod-ssl.xml) add_test (probe_ssl_port ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test/bin/probe_port.py localhost 11222 60) diff --git a/include/infinispan/hotrod/Configuration.h b/include/infinispan/hotrod/Configuration.h index e1073201..be817a08 100644 --- a/include/infinispan/hotrod/Configuration.h +++ b/include/infinispan/hotrod/Configuration.h @@ -7,9 +7,10 @@ #include #include "infinispan/hotrod/portable.h" #include "infinispan/hotrod/ImportExport.h" -#include "ConnectionPoolConfiguration.h" -#include "ServerConfiguration.h" -#include "SslConfiguration.h" +#include "infinispan/hotrod/ConnectionPoolConfiguration.h" +#include "infinispan/hotrod/ServerConfiguration.h" +#include "infinispan/hotrod/SslConfiguration.h" +#include "infinispan/hotrod/NearCacheConfiguration.h" #include "infinispan/hotrod/FailOverRequestBalancingStrategy.h" #include "infinispan/hotrod/JBasicEventMarshaller.h" @@ -49,6 +50,7 @@ class Configuration bool _tcpNoDelay, int _valueSizeEstimate, int _maxRetries, + NearCacheConfiguration _nearCacheConfiguration, FailOverRequestBalancingStrategy::ProducerFn bsp=0, const event::EventMarshaller &eventMarshaller = event::JBasicEventMarshaller()): protocolVersion(_protocolVersion), protocolVersionPtr(), @@ -56,7 +58,7 @@ class Configuration connectionTimeout(_connectionTimeout), forceReturnValue(_forceReturnValue), keySizeEstimate(_keySizeEstimate), socketTimeout(_socketTimeout), sslConfiguration(_sslConfiguration),tcpNoDelay(_tcpNoDelay), - valueSizeEstimate(_valueSizeEstimate), maxRetries(_maxRetries), balancingStrategyProducer(bsp), + valueSizeEstimate(_valueSizeEstimate), maxRetries(_maxRetries), nearCacheConfiguration(_nearCacheConfiguration), balancingStrategyProducer(bsp), eventMarshaller(eventMarshaller) { std::map> tmpMap; @@ -173,6 +175,8 @@ class Configuration SslConfiguration& getSslConfiguration() { return sslConfiguration; } HR_EXTERN const event::EventMarshaller &getEventMarshaller() const; + const NearCacheConfiguration& getNearCacheConfiguration() const { return nearCacheConfiguration; } + private: portable::string protocolVersion; portable::local_ptr protocolVersionPtr; @@ -186,6 +190,7 @@ class Configuration bool tcpNoDelay; int valueSizeEstimate; int maxRetries; + const NearCacheConfiguration nearCacheConfiguration; FailOverRequestBalancingStrategy::ProducerFn balancingStrategyProducer; const event::EventMarshaller &eventMarshaller; diff --git a/include/infinispan/hotrod/ConfigurationBuilder.h b/include/infinispan/hotrod/ConfigurationBuilder.h index aca02309..38eeea2c 100644 --- a/include/infinispan/hotrod/ConfigurationBuilder.h +++ b/include/infinispan/hotrod/ConfigurationBuilder.h @@ -14,11 +14,13 @@ #include "ConnectionPoolConfigurationBuilder.h" #include "ServerConfigurationBuilder.h" #include "SslConfigurationBuilder.h" +#include "NearCacheConfiguration.h" using namespace infinispan::hotrod::event; namespace infinispan { namespace hotrod { + class ClusterConfigurationBuilder { public: @@ -32,6 +34,43 @@ class ClusterConfigurationBuilder std::vector& servers; ConfigurationBuilder &m_parent; }; + +/** + * Configuration classes for near cache + */ +class NearCacheConfigurationBuilder : public ConfigurationChildBuilder +{ +public: + NearCacheConfigurationBuilder(ConfigurationBuilder& _builder) : ConfigurationChildBuilder(_builder) {} + + int getMaxEntries() const { + return m_maxEntries; + } + + NearCacheConfigurationBuilder& maxEntries(int maxEntries = 0) { + this->m_maxEntries = maxEntries; + return *this; + } + + NearCacheMode getMode() const { + return m_mode; + } + + NearCacheConfigurationBuilder& mode(NearCacheMode mode = DISABLED) { + this->m_mode = mode; + return *this; + } + + NearCacheConfiguration create() + { + return NearCacheConfiguration(m_mode,m_maxEntries); + } + + private: + NearCacheMode m_mode=DISABLED; + int m_maxEntries=0; +}; + /** * ConfigurationBuilder used to generate immutable Configuration objects that are in turn * used to configure RemoteCacheManager instances. @@ -54,7 +93,8 @@ class ConfigurationBuilder __pragma(warning(suppress:4355)) // passing uninitialized 'this' connectionPoolConfigurationBuilder(*this), __pragma(warning(suppress:4355)) - sslConfigurationBuilder(*this) + sslConfigurationBuilder(*this), + nearCacheConfigurationBuilder(*this) {} void validate() {} @@ -178,6 +218,15 @@ class ConfigurationBuilder return sslConfigurationBuilder; } + /** + * Returns NearCacheConfigurationBuilder for near cache enabling and configuration. + * + *\return NearCacheConfigurationBuilder instance to be used for configuration + */ + NearCacheConfigurationBuilder& nearCache() { + return nearCacheConfigurationBuilder; + } + /** * Set tcpNoDelay for this ConfigurationBuilder. Default is true. * @@ -259,6 +308,7 @@ class ConfigurationBuilder m_tcpNoDelay, m_valueSizeEstimate, m_maxRetries, + nearCacheConfigurationBuilder.create(), m_balancingStrategyProducer, m_eventMarshaller); @@ -298,6 +348,7 @@ class ConfigurationBuilder ConnectionPoolConfigurationBuilder connectionPoolConfigurationBuilder; SslConfigurationBuilder sslConfigurationBuilder; JBasicEventMarshaller m_defaultEventMarshaller; + NearCacheConfigurationBuilder nearCacheConfigurationBuilder; EventMarshaller &m_eventMarshaller=m_defaultEventMarshaller; }; diff --git a/include/infinispan/hotrod/NearCacheConfiguration.h b/include/infinispan/hotrod/NearCacheConfiguration.h new file mode 100644 index 00000000..da37bac6 --- /dev/null +++ b/include/infinispan/hotrod/NearCacheConfiguration.h @@ -0,0 +1,47 @@ +/* + * NearCacheConfiguration.h + * + * Created on: Nov 29, 2016 + * Author: rigazilla + */ + +#ifndef INCLUDE_INFINISPAN_HOTROD_NEARCACHECONFIGURATION_H_ +#define INCLUDE_INFINISPAN_HOTROD_NEARCACHECONFIGURATION_H_ + + +#include "infinispan/hotrod/ImportExport.h" + +namespace infinispan { +namespace hotrod { + +enum NearCacheMode { DISABLED=0, INVALIDATED=1 }; + +class HR_EXTERN NearCacheConfiguration +{ +public: + NearCacheConfiguration(NearCacheMode mode=DISABLED, int maxEntries=0) : m_mode(mode), m_maxEntries(maxEntries) {} + + int getMaxEntries() const { + return m_maxEntries; + } + + void maxEntries(int maxEntries = 0) { + this->m_maxEntries = maxEntries; + } + + NearCacheMode getMode() const { + return m_mode; + } + + void mode(NearCacheMode mode = DISABLED) { + this->m_mode = mode; + } +private: + NearCacheMode m_mode=DISABLED; + int m_maxEntries=0; +}; +} +} + + +#endif /* INCLUDE_INFINISPAN_HOTROD_NEARCACHECONFIGURATION_H_ */ diff --git a/include/infinispan/hotrod/RemoteCache.h b/include/infinispan/hotrod/RemoteCache.h index 62c88b1c..64967906 100644 --- a/include/infinispan/hotrod/RemoteCache.h +++ b/include/infinispan/hotrod/RemoteCache.h @@ -32,7 +32,6 @@ namespace infinispan { namespace hotrod { - /** * Provides remote reference to a cache residing on a Hot Rod server/cluster. * @@ -909,11 +908,12 @@ template class RemoteCache : private RemoteCacheBase return *this; } - private: + protected: RemoteCache() : RemoteCacheBase() { setMarshallers(this, &keyMarshall, &valueMarshall, &keyUnmarshall, &valueUnmarshall); } + private: uint64_t toSeconds(uint64_t time, TimeUnit unit) { uint64_t result; switch (unit) { diff --git a/include/infinispan/hotrod/RemoteCacheBase.h b/include/infinispan/hotrod/RemoteCacheBase.h index 06346608..c66f3a67 100644 --- a/include/infinispan/hotrod/RemoteCacheBase.h +++ b/include/infinispan/hotrod/RemoteCacheBase.h @@ -88,6 +88,7 @@ class RemoteCacheBase friend class RemoteCacheManager; friend class RemoteCacheImpl; +friend class NearRemoteCacheImpl; friend class KeyUnmarshallerFtor; friend class ValueUnmarshallerFtor; template diff --git a/include/infinispan/hotrod/RemoteCacheManager.h b/include/infinispan/hotrod/RemoteCacheManager.h index 12ca33e1..4522ae87 100644 --- a/include/infinispan/hotrod/RemoteCacheManager.h +++ b/include/infinispan/hotrod/RemoteCacheManager.h @@ -113,10 +113,11 @@ class HR_EXTERN RemoteCacheManager const std::string key = forceReturnValue ? "/true" : "/false"; if (remoteCacheMap.find(key)==remoteCacheMap.end()) { - RemoteCache *pRc= new RemoteCache(); + RemoteCache *pRc; + pRc= new RemoteCache(); remoteCacheMap[key]= std::unique_ptr(pRc); RemoteCache *rcache=(RemoteCache *)remoteCacheMap[key].get(); - initCache(*rcache, forceReturnValue); + initCache(*rcache, forceReturnValue, getConfiguration().getNearCacheConfiguration()); rcache->keyMarshaller.reset(new BasicMarshaller(), &Marshaller::destroy); rcache->valueMarshaller.reset(new BasicMarshaller(), &Marshaller::destroy); return *rcache; @@ -191,14 +192,14 @@ class HR_EXTERN RemoteCacheManager remoteCacheMap[key] = std::unique_ptr < RemoteCacheBase > (pRc); RemoteCache *rcache = (RemoteCache *) remoteCacheMap[key].get(); - initCache(*rcache, forceReturnValue); + initCache(*rcache, forceReturnValue, getConfiguration().getNearCacheConfiguration()); rcache->keyMarshaller.reset(km, kd); rcache->valueMarshaller.reset(vm, vd); return *rcache; } RemoteCache *rcache = (RemoteCache *) remoteCacheMap[key].get(); - initCache(*rcache, forceReturnValue); + initCache(*rcache, forceReturnValue, getConfiguration().getNearCacheConfiguration()); rcache->keyMarshaller.reset(km, kd); rcache->valueMarshaller.reset(vm, vd); return *rcache; @@ -243,14 +244,14 @@ class HR_EXTERN RemoteCacheManager remoteCacheMap[key] = std::unique_ptr < RemoteCacheBase > (pRc); RemoteCache *rcache = (RemoteCache *) remoteCacheMap[key].get(); - initCache(*rcache, name.c_str(), forceReturnValue); + initCache(*rcache, name.c_str(), forceReturnValue, getConfiguration().getNearCacheConfiguration()); rcache->keyMarshaller.reset(km, kd); rcache->valueMarshaller.reset(vm, vd); return *rcache; } RemoteCache *rcache = (RemoteCache *) remoteCacheMap[key].get(); - initCache(*rcache, name.c_str(), forceReturnValue); + initCache(*rcache, name.c_str(), forceReturnValue, getConfiguration().getNearCacheConfiguration()); rcache->keyMarshaller.reset(km, kd); rcache->valueMarshaller.reset(vm, vd); return *rcache; @@ -295,8 +296,8 @@ class HR_EXTERN RemoteCacheManager void init(const portable::map& configuration, bool start); - void initCache(RemoteCacheBase& cache, bool forceReturnValue); - void initCache(RemoteCacheBase& cache, const char *name, bool forceReturnValue); + void initCache(RemoteCacheBase& cache, bool forceReturnValue, NearCacheConfiguration nc = NearCacheConfiguration()); + void initCache(RemoteCacheBase& cache, const char *name, bool forceReturnValue, NearCacheConfiguration nc = NearCacheConfiguration()); // not implemented RemoteCacheManager(const RemoteCacheManager&); diff --git a/jni/src/main/java/org/infinispan/client/hotrod/configuration/Configuration.java b/jni/src/main/java/org/infinispan/client/hotrod/configuration/Configuration.java index b2ae43f6..ad128968 100644 --- a/jni/src/main/java/org/infinispan/client/hotrod/configuration/Configuration.java +++ b/jni/src/main/java/org/infinispan/client/hotrod/configuration/Configuration.java @@ -10,6 +10,7 @@ import org.infinispan.client.hotrod.impl.transport.tcp.RequestBalancingStrategy; import org.infinispan.client.hotrod.configuration.ConnectionPoolConfiguration; import org.infinispan.client.hotrod.configuration.SslConfiguration; +import org.infinispan.client.hotrod.configuration.NearCacheConfiguration; import org.infinispan.commons.configuration.BuiltBy; import org.infinispan.commons.marshall.Marshaller; @@ -51,10 +52,10 @@ public Configuration(org.infinispan.client.hotrod.jni.Configuration jniConfigura Configuration(ExecutorFactoryConfiguration asyncExecutorFactory, Class balancingStrategy, ClassLoader classLoader, ConnectionPoolConfiguration connectionPool, int connectionTimeout, Class[] consistentHashImpl, boolean forceReturnValues, int keySizeEstimate, Class marshallerClass, boolean pingOnStartup, String protocolVersion, List servers, List failOverServers, int socketTimeout, SslConfiguration ssl, boolean tcpNoDelay, - Class transportFactory, int valueSizeEstimate, int maxRetries) { + Class transportFactory, int valueSizeEstimate, int maxRetries, NearCacheConfiguration nearCache) { this.jniConfiguration = new org.infinispan.client.hotrod.jni.Configuration(protocolVersion, connectionPool.getJniConnectionPoolConfiguration(), connectionTimeout, forceReturnValues, keySizeEstimate, null, socketTimeout, ssl.getJniSslConfiguration(), tcpNoDelay, - valueSizeEstimate, maxRetries); + valueSizeEstimate, maxRetries, nearCache.getJniNearCacheConfiguration()); this.asyncExecutorFactory = asyncExecutorFactory; this.balancingStrategy = balancingStrategy; this.classLoader = new WeakReference(classLoader); @@ -134,6 +135,10 @@ public SslConfiguration ssl() { return new SslConfiguration(this.jniConfiguration.getSslConfiguration()); } + public NearCacheConfiguration nearCache() { + return new NearCacheConfiguration(this.jniConfiguration.getNearCacheConfiguration()); + } + public boolean tcpNoDelay() { return this.jniConfiguration.isTcpNoDelay(); } diff --git a/jni/src/main/java/org/infinispan/client/hotrod/configuration/NearCacheConfiguration.java b/jni/src/main/java/org/infinispan/client/hotrod/configuration/NearCacheConfiguration.java new file mode 100644 index 00000000..690d4bfa --- /dev/null +++ b/jni/src/main/java/org/infinispan/client/hotrod/configuration/NearCacheConfiguration.java @@ -0,0 +1,48 @@ +package org.infinispan.client.hotrod.configuration; + + +/** + * NearCacheConfiguration. + * + * @author Vittorio Rigamonti + */ + +import org.infinispan.client.hotrod.jni.NearCacheMode; + +public class NearCacheConfiguration { + private final NearCacheMode mode; + private final int maxEntries; + private org.infinispan.client.hotrod.jni.NearCacheConfiguration jniNearCacheConfiguration; + + public org.infinispan.client.hotrod.jni.NearCacheConfiguration getJniNearCacheConfiguration() { + return jniNearCacheConfiguration; + } + + public NearCacheConfiguration(org.infinispan.client.hotrod.jni.NearCacheConfiguration jniNearCacheConfiguration) { + super(); + this.jniNearCacheConfiguration = jniNearCacheConfiguration; + mode=NearCacheMode.DISABLED; + maxEntries=0; + } + + NearCacheConfiguration(NearCacheMode mode, int maxEntries) { + this.mode=mode; + this.maxEntries=maxEntries; + this.jniNearCacheConfiguration = new org.infinispan.client.hotrod.jni.NearCacheConfiguration(NearCacheMode.DISABLED, 0); + } + + NearCacheMode getMode() + { + return mode; + } + + int getMaxEntries() + { + return maxEntries; + } + + @Override + public String toString() { + return "NearCacheConfiguration [mode=" + (mode==NearCacheMode.INVALIDATED ? "INVALIDATED" : "DISABLED") + ", maxEntries=" + maxEntries+"]"; + } +} diff --git a/jni/src/main/swig/java.i b/jni/src/main/swig/java.i index 61186ba7..b4c35247 100644 --- a/jni/src/main/swig/java.i +++ b/jni/src/main/swig/java.i @@ -16,6 +16,9 @@ %include "std_pair.i" +%include "enums.swg" +%javaconst(1); + //#define std::shared_ptr std::shared_ptr %{ @@ -28,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -55,6 +59,7 @@ using namespace infinispan::hotrod; %include "infinispan/hotrod/ConnectionPoolConfiguration.h" %include "infinispan/hotrod/ServerConfiguration.h" %include "infinispan/hotrod/SslConfiguration.h" +%include "infinispan/hotrod/NearCacheConfiguration.h" %immutable infinispan::hotrod::Configuration::PROTOCOL_VERSION_10; %immutable infinispan::hotrod::Configuration::PROTOCOL_VERSION_11; diff --git a/src/hotrod/api/RemoteCacheManager.cpp b/src/hotrod/api/RemoteCacheManager.cpp index a3a2ec72..59e0ac23 100644 --- a/src/hotrod/api/RemoteCacheManager.cpp +++ b/src/hotrod/api/RemoteCacheManager.cpp @@ -30,15 +30,15 @@ RemoteCacheManager::~RemoteCacheManager() { } void RemoteCacheManager::initCache( - RemoteCacheBase& cache, bool forceReturnValue) + RemoteCacheBase& cache, bool forceReturnValue, NearCacheConfiguration nc) { - cache.impl = IMPL->createRemoteCache(forceReturnValue); + cache.impl = IMPL->createRemoteCache(forceReturnValue, nc); } void RemoteCacheManager::initCache( - RemoteCacheBase& cache, const char *name, bool forceReturnValue) + RemoteCacheBase& cache, const char *name, bool forceReturnValue, NearCacheConfiguration nc) { - cache.impl = IMPL->createRemoteCache(std::string(name), forceReturnValue); + cache.impl = IMPL->createRemoteCache(std::string(name), forceReturnValue, nc); } void RemoteCacheManager::start() { diff --git a/src/hotrod/impl/CustomClientListener.h b/src/hotrod/impl/CustomClientListener.h new file mode 100644 index 00000000..c8be308d --- /dev/null +++ b/src/hotrod/impl/CustomClientListener.h @@ -0,0 +1,103 @@ +/* + * CustomClientListener.h + * + * Created on: Nov 29, 2016 + * Author: rigazilla + */ + +#ifndef SRC_HOTROD_IMPL_CUSTOMCLIENTLISTENER_H_ +#define SRC_HOTROD_IMPL_CUSTOMCLIENTLISTENER_H_ + +#include +#include +#include + +namespace infinispan { +namespace hotrod { + +namespace transport { +class Transport; +} + +namespace protocol { +class Codec20; +} +class RemoteCacheBase; +template +class RemoteCache; + +namespace event { + +class CustomClientListener : public ClientListener +{ +public: + void add_listener(std::function >)> callback) { + createdCallbacks.push_back(callback); + } + void add_listener(std::function >)> callback) { + expiredCallbacks.push_back(callback); + } + void add_listener(std::function >)> callback) { + modifiedCallbacks.push_back(callback); + } + void add_listener(std::function >)> callback) { + removedCallbacks.push_back(callback); + } + void add_listener(std::function callback) { + customCallbacks.push_back(callback); + } + + + + virtual void processEvent(ClientCacheEntryCreatedEvent> marshEv, std::vectorlistId, uint8_t isCustom) const + { + for (auto callable: createdCallbacks) + { + callable(marshEv); + } + } + + virtual void processEvent(ClientCacheEntryModifiedEvent> marshEv, std::vectorlistId, uint8_t isCustom) const + { + for (auto callable: modifiedCallbacks) + { + callable(marshEv); + } + } + + virtual void processEvent(ClientCacheEntryRemovedEvent> marshEv, std::vectorlistId, uint8_t isCustom) const + { + for (auto callable: removedCallbacks) + { + callable(marshEv); + } + } + + virtual void processEvent(ClientCacheEntryExpiredEvent> marshEv, std::vectorlistId, uint8_t isCustom) const + { + for (auto callable: expiredCallbacks) + { + callable(marshEv); + } + } + + virtual void processEvent(ClientCacheEntryCustomEvent ev, std::vectorlistId, uint8_t isCustom) const + { + for (auto callable: customCallbacks) + { + callable(ev); + } + } +private: + std::list>)>> createdCallbacks; + std::list>)>> expiredCallbacks; + std::list>)>> modifiedCallbacks; + std::list>)>> removedCallbacks; + std::list> customCallbacks; +}; + + + +}}} + +#endif /* SRC_HOTROD_IMPL_CUSTOMCLIENTLISTENER_H_ */ diff --git a/src/hotrod/impl/NearRemoteCacheImpl.h b/src/hotrod/impl/NearRemoteCacheImpl.h new file mode 100644 index 00000000..c0274fae --- /dev/null +++ b/src/hotrod/impl/NearRemoteCacheImpl.h @@ -0,0 +1,211 @@ +/* + * NearRemoteCache.h + * + * Created on: Nov 29, 2016 + * Author: rigazilla + */ + +#ifndef INCLUDE_INFINISPAN_HOTROD_NEARREMOTECACHE_H_ +#define INCLUDE_INFINISPAN_HOTROD_NEARREMOTECACHE_H_ +#include "hotrod/impl/RemoteCacheImpl.h" +#include "hotrod/impl/RemoteCacheManagerImpl.h" +#include "hotrod/impl/CustomClientListener.h" +#include +#include +#include +#include +#include + +namespace infinispan { +namespace hotrod { + +class NearRemoteCacheImpl: public RemoteCacheImpl { +public: + + NearRemoteCacheImpl(RemoteCacheManagerImpl& rcm, std::string cacheName, + const NearCacheConfiguration& conf) : + RemoteCacheImpl(rcm, cacheName), maxEntries(conf.getMaxEntries()), cl() { + } + + virtual void *get(RemoteCacheBase& rcb, const void* key) { + VersionedValue version; + return getWithVersion(rcb, key, &version); + } + + virtual void *put(RemoteCacheBase& rcb, const void *key, const void* val, + uint64_t life, uint64_t idle) { + std::vector kbuf; + rcb.baseKeyMarshall(key, kbuf); + removeElementFromMap(kbuf); + return RemoteCacheImpl::put(rcb, key, val, life, idle); + } + + virtual void *replace(RemoteCacheBase& rcb, const void *key, + const void* val, uint64_t life, uint64_t idle) { + std::vector kbuf; + rcb.baseKeyMarshall(key, kbuf); + removeElementFromMap(kbuf); + return RemoteCacheImpl::replace(rcb, key, val, life, idle); + } + + virtual bool replaceWithVersion(RemoteCacheBase& rcb, const void* k, + const void* v, uint64_t version, uint64_t life, uint64_t idle) { + bool replaced = RemoteCacheImpl::replaceWithVersion(rcb, k, v, version, + life, idle); + if (replaced) { + std::vector kbuf; + rcb.baseKeyMarshall(k, kbuf); + removeElementFromMap(kbuf); + } + return replaced; + } + + virtual void *remove(RemoteCacheBase& rcb, const void* key) { + std::vector kbuf; + rcb.baseKeyMarshall(key, kbuf); + removeElementFromMap(kbuf); + return RemoteCacheImpl::remove(rcb, key); + } + virtual bool removeWithVersion(RemoteCacheBase& rcb, const void* k, + uint64_t version) { + bool removed = RemoteCacheImpl::removeWithVersion(rcb, k, version); + if (removed) { + std::vector kbuf; + rcb.baseKeyMarshall(k, kbuf); + removeElementFromMap(kbuf); + } + return removed; + } + virtual void *getWithVersion(RemoteCacheBase& rcb, const void *key, + VersionedValue* version) { + std::vector kbuf, vbuf; + rcb.baseKeyMarshall(key, kbuf); + if (_nearMap.find(kbuf) == _nearMap.end()) { + void* value = RemoteCacheImpl::getWithVersion(rcb, key, version); + if (value) + { + VersionedValueImpl > valueForMap; + rcb.baseValueMarshall(value, vbuf); + valueForMap.setValue(vbuf); + addElementToMap(kbuf, valueForMap); + } + return value; + } + version->version = _nearMap[kbuf].getVersion(); + return rcb.baseValueUnmarshall(_nearMap[kbuf].getValue()); + } + virtual void clear() { + RemoteCacheImpl::clear(); + clearMap(); + } + virtual void init(operations::OperationsFactory* operationsFactory) { + RemoteCacheImpl::init(operationsFactory); + startListener(); + } +private: + unsigned int maxEntries; + std::map, VersionedValueImpl > > _nearMap; + std::deque > _nearFifo; + std::mutex _nearMutex; + std::vector > filterFactoryParams; + std::vector > converterFactoryParams; + event::CustomClientListener cl; + void addElementToMap(std::vector& key, + VersionedValueImpl>& value) { + std::lock_guard guard(_nearMutex); + if (maxEntries > 0) { + _nearFifo.push_back(key); + if (maxEntries > 0 && _nearMap.size() >= maxEntries) { + // Remove oldest element + while (!_nearFifo.empty() + && _nearMap.find(_nearFifo.front()) == _nearMap.end()) { + _nearFifo.pop_front(); + } + if (!_nearFifo.empty()) { + _nearMap.erase(_nearFifo.front()); + _nearFifo.pop_front(); + } + } + } + _nearMap[key] = value; + } + + void removeElementFromMap(std::vector& key) { + std::lock_guard guard(_nearMutex); + auto it = std::find(_nearFifo.begin(), _nearFifo.end(), key); + if (it != _nearFifo.end()) { + _nearFifo.erase(it); + _nearMap.erase(key); + } + } + + void clearMap() { + std::lock_guard guard(_nearMutex); + _nearFifo.clear(); + _nearMap.clear(); + } + + void invalidateCache() { + std::lock_guard guard(_nearMutex); + _nearMap.clear(); + } + void startListener() { + std::string convStr("___eager-key-value-version-converter"); + cl.converterFactoryName = std::vector(convStr.begin(), + convStr.end()); + cl.useRawData = true; + std::function f = + [this] (ClientCacheEntryCustomEvent ce) {listener(this->_nearMap, ce);}; + std::function < void() > failOverHandler = + [this] () { + this->invalidateCache(); + }; + cl.add_listener(f); + this->addClientListener(cl, filterFactoryParams, converterFactoryParams, + failOverHandler); + } + void listener( + std::map, VersionedValueImpl > > &map, + ClientCacheEntryCustomEvent &ce) { + // bytearray format is + const std::vector &v = ce.getEventData(); + if (v.size() == 0) + return; + unsigned int sizeKey = v[0]; + if (sizeKey == 0) + return; + auto i = v.begin() + 1; + auto f = v.begin() + 1 + sizeKey; + std::vector key(i, f); + unsigned int sizeValue; + std::vector value; + if (sizeKey + 1 < v.size()) { + sizeValue = *(v.data() + sizeKey + 1); + const char* i1 = v.data() + sizeKey + 2; + const char* f1 = v.data() + sizeKey + 2 + sizeValue; + value = std::vector(i1, f1); + } + else + { + // If no value it's an entry removed event + removeElementFromMap(key); + return; + } + unsigned long version = 0; + if (v.size() - sizeKey - sizeValue - 2 >= 8) { + for (unsigned int i = 0; i < 8; i++) { + version = (version << 8) + v[sizeKey + sizeValue + 2 + i]; + } + } + VersionedValueImpl > vv; + vv.setValue(value); + vv.setVersion(version); + addElementToMap(key, vv); + } + +}; + +} +} + +#endif /* INCLUDE_INFINISPAN_HOTROD_NEARREMOTECACHE_H_ */ diff --git a/src/hotrod/impl/RemoteCacheImpl.h b/src/hotrod/impl/RemoteCacheImpl.h index 0f23c2bc..d9745f0c 100644 --- a/src/hotrod/impl/RemoteCacheImpl.h +++ b/src/hotrod/impl/RemoteCacheImpl.h @@ -30,21 +30,21 @@ class RemoteCacheImpl: public portable::counted_object { public: RemoteCacheImpl(RemoteCacheManagerImpl& rcm, const std::string& name); - void *get(RemoteCacheBase& rcb, const void* key); - void *put(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle); + virtual void *get(RemoteCacheBase& rcb, const void* key); + virtual void *put(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle); void *putIfAbsent(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle); - void *replace(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle); - void *remove(RemoteCacheBase& rcb, const void* key); + virtual void *replace(RemoteCacheBase& rcb, const void *key, const void* val, uint64_t life, uint64_t idle); + virtual void *remove(RemoteCacheBase& rcb, const void* key); bool containsKey(RemoteCacheBase& rcb, const void* key); - bool replaceWithVersion(RemoteCacheBase& rcb, const void* k, const void* v, uint64_t version, uint64_t life, uint64_t idle); - bool removeWithVersion(RemoteCacheBase& rcb, const void* k, uint64_t version); + virtual bool replaceWithVersion(RemoteCacheBase& rcb, const void* k, const void* v, uint64_t version, uint64_t life, uint64_t idle); + virtual bool removeWithVersion(RemoteCacheBase& rcb, const void* k, uint64_t version); void *getWithMetadata(RemoteCacheBase& rcb, const void *key, MetadataValue* metadata); - void *getWithVersion(RemoteCacheBase& rcb, const void *key, VersionedValue* version); + virtual void *getWithVersion(RemoteCacheBase& rcb, const void *key, VersionedValue* version); void getBulk(RemoteCacheBase& rcb, portable::map &mbuf); void getBulk(RemoteCacheBase& rcb, int size, portable::map &mbuf); void keySet(RemoteCacheBase& rcb, int scope, portable::vector &result); void stats(portable::map &stats); - void clear(); + virtual void clear(); uint64_t size(); std::vector execute(std::vector cmdName, const std::map,std::vector>& args); QueryResponse query(const QueryRequest & qr); @@ -52,7 +52,7 @@ class RemoteCacheImpl: public portable::counted_object CacheTopologyInfo getCacheTopologyInfo(); void addClientListener(ClientListener&, const std::vector >, const std::vector >, const std::function &); void removeClientListener(ClientListener&); - void init(operations::OperationsFactory* operationsFactory); + virtual void init(operations::OperationsFactory* operationsFactory); void withFlags(Flag flag); @@ -64,7 +64,6 @@ class RemoteCacheImpl: public portable::counted_object private: RemoteCacheManagerImpl& remoteCacheManager; - std::shared_ptr operationsFactory; std::string name; diff --git a/src/hotrod/impl/RemoteCacheManagerImpl.cpp b/src/hotrod/impl/RemoteCacheManagerImpl.cpp index 772befce..7fc58556 100644 --- a/src/hotrod/impl/RemoteCacheManagerImpl.cpp +++ b/src/hotrod/impl/RemoteCacheManagerImpl.cpp @@ -1,3 +1,4 @@ +#include #include "infinispan/hotrod/ConfigurationBuilder.h" #include "hotrod/impl/RemoteCacheManagerImpl.h" #include "hotrod/impl/RemoteCacheImpl.h" @@ -88,13 +89,13 @@ const Configuration& RemoteCacheManagerImpl::getConfiguration() { } RemoteCacheImpl *RemoteCacheManagerImpl::createRemoteCache( - bool forceReturnValue) + bool forceReturnValue, NearCacheConfiguration nc) { - return createRemoteCache(DefaultCacheName,forceReturnValue); + return createRemoteCache(DefaultCacheName,forceReturnValue, nc); } RemoteCacheImpl *RemoteCacheManagerImpl::createRemoteCache( - const std::string& name, bool forceReturnValue) + const std::string& name, bool forceReturnValue, NearCacheConfiguration nc) { ScopedLock l(lock); std::map::iterator iter = cacheName2RemoteCache.find(name); @@ -103,7 +104,15 @@ RemoteCacheImpl *RemoteCacheManagerImpl::createRemoteCache( return iter->second.first.get(); } // Cache not found, creating... - RemoteCacheImpl *rcache = new RemoteCacheImpl(*this, name); + RemoteCacheImpl *rcache; + if (nc.getMode()==NearCacheMode::DISABLED) + { + rcache = new RemoteCacheImpl(*this, name); + } + else + { + rcache = new NearRemoteCacheImpl(*this, name, nc); + } try { startRemoteCache(*rcache, forceReturnValue); if (name != DefaultCacheName) { diff --git a/src/hotrod/impl/RemoteCacheManagerImpl.h b/src/hotrod/impl/RemoteCacheManagerImpl.h index 0cdfb2dd..f1517fc4 100644 --- a/src/hotrod/impl/RemoteCacheManagerImpl.h +++ b/src/hotrod/impl/RemoteCacheManagerImpl.h @@ -22,8 +22,8 @@ class RemoteCacheManagerImpl RemoteCacheManagerImpl(const std::map& properties, bool start_); // Deprecated RemoteCacheManagerImpl(const Configuration& configuration, bool start = true); - RemoteCacheImpl *createRemoteCache(bool forceReturnValue); - RemoteCacheImpl *createRemoteCache(const std::string& name, bool forceReturnValue); + RemoteCacheImpl *createRemoteCache(bool forceReturnValue, NearCacheConfiguration nc); + RemoteCacheImpl *createRemoteCache(const std::string& name, bool forceReturnValue, NearCacheConfiguration nc); void start(); void stop(); diff --git a/src/hotrod/impl/VersionedValueImpl.h b/src/hotrod/impl/VersionedValueImpl.h index 20d62339..38957655 100644 --- a/src/hotrod/impl/VersionedValueImpl.h +++ b/src/hotrod/impl/VersionedValueImpl.h @@ -13,6 +13,10 @@ template class VersionedValueImpl : public VersionedValue version = _version; } + unsigned long getVersion() { + return version; + } + V getValue() { return value; } diff --git a/test/NearCacheTest.cpp b/test/NearCacheTest.cpp new file mode 100644 index 00000000..b4b7854e --- /dev/null +++ b/test/NearCacheTest.cpp @@ -0,0 +1,61 @@ +#include "infinispan/hotrod/ConfigurationBuilder.h" +#include "infinispan/hotrod/RemoteCacheManager.h" +#include "infinispan/hotrod/RemoteCache.h" +#include "infinispan/hotrod/Version.h" + +#include "infinispan/hotrod/JBasicMarshaller.h" +#include "infinispan/hotrod/CacheClientListener.h" +#include "infinispan/hotrod/ClientEvent.h" + +#include +#include +#include +using namespace infinispan::hotrod; +using namespace infinispan::hotrod::event; + + +int main(int argc, char** argv) { + ConfigurationBuilder nearCacheBuilder; + nearCacheBuilder.addServer().host(argc > 1 ? argv[1] : "127.0.0.1").port(argc > 2 ? atoi(argv[2]) : 11222); + nearCacheBuilder.protocolVersion(Configuration::PROTOCOL_VERSION_24); + nearCacheBuilder.balancingStrategyProducer(nullptr); + nearCacheBuilder.nearCache().mode(NearCacheMode::INVALIDATED).maxEntries(10); + RemoteCacheManager nearCacheManager(nearCacheBuilder.build(), false); + JBasicMarshaller *km = new JBasicMarshaller(); + JBasicMarshaller *vm = new JBasicMarshaller(); + nearCacheManager.start(); + RemoteCache nearCache = nearCacheManager.getCache(km, + &Marshaller::destroy, + vm, + &Marshaller::destroy); + nearCache.clear(); + // Read stats to do some checks on hits and miss counter + std::map statsBegin= nearCache.stats(); + auto hitsBegin = std::stoi(statsBegin["hits"]); + auto missesBegin = std::stoi(statsBegin["misses"]); + // Only the first get goes to the remote cache and miss the value + // then all the gets are resolved nearly + nearCache.get("key1"); + nearCache.put("key1", "value1"); + std::string *rest = nearCache.get("key1"); + std::cout << "Got result from near cache:" << ((rest) ? *rest : "null") << std::endl; + nearCache.get("key1"); + std::map stats1= nearCache.stats(); + auto hits1 = std::stoi(stats1["hits"]); + auto misses1 = std::stoi(stats1["misses"]); + std::cout << "Remote misses is: " << misses1-missesBegin << "" << std::endl; + std::cout << "Remote hits is: " << hits1-hitsBegin << "" << std::endl; + for(int i=2; i <= 11; i++) + { // fill cache with 10 more entries (11 in total) + nearCache.put("key"+std::to_string(i),std::to_string(i)); + } + // now key1 one should not be near + nearCache.get("key1"); // remote get. Stored near (this delete key2 nearly) + nearCache.get("key2"); // remote get. Stored near (this delete key3 nearly) + nearCache.get("key1"); // near + std::map statsEnd= nearCache.stats(); + auto hitsEnd = std::stoi(statsEnd["hits"]); + auto missesEnd = std::stoi(statsEnd["misses"]); + std::cout << "Remote misses is: " << missesEnd-missesBegin << "" << std::endl; + std::cout << "Remote hits is: " << hitsEnd-hitsBegin << "" << std::endl; +}