New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISPN-6893 Scala removal from server/core module #4449
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package org.infinispan.server.core; | ||
|
||
import org.infinispan.commons.util.concurrent.ConcurrentHashSet; | ||
|
||
import java.util.Set; | ||
|
||
/** | ||
* Abstract class providing stock implementations for {@link CacheIgnoreAware} so all that is required is to extend | ||
* this class. | ||
* @author gustavonalle | ||
* @author wburns | ||
* @since 9.0 | ||
*/ | ||
public class AbstractCacheIgnoreAware implements CacheIgnoreAware { | ||
|
||
private Set<String> ignoredCaches = new ConcurrentHashSet<>(); | ||
|
||
public void setIgnoredCaches(Set<String> cacheNames) { | ||
ignoredCaches.clear(); | ||
cacheNames.forEach(ignoredCaches::add); | ||
} | ||
|
||
public void unignore(String cacheName) { | ||
ignoredCaches.remove(cacheName); | ||
} | ||
|
||
public void ignoreCache(String cacheName) { | ||
ignoredCaches.add(cacheName); | ||
} | ||
|
||
public boolean isCacheIgnored(String cacheName) { | ||
return ignoredCaches.contains(cacheName); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
package org.infinispan.server.core; | ||
|
||
import java.net.InetSocketAddress; | ||
import javax.management.DynamicMBean; | ||
import javax.management.MBeanServer; | ||
import javax.management.ObjectName; | ||
|
||
import org.infinispan.commons.CacheException; | ||
import org.infinispan.commons.logging.LogFactory; | ||
import org.infinispan.configuration.global.GlobalConfiguration; | ||
import org.infinispan.factories.components.ManageableComponentMetadata; | ||
import org.infinispan.jmx.JmxUtil; | ||
import org.infinispan.jmx.ResourceDMBean; | ||
import org.infinispan.manager.EmbeddedCacheManager; | ||
import org.infinispan.server.core.configuration.ProtocolServerConfiguration; | ||
import org.infinispan.server.core.logging.Log; | ||
import org.infinispan.server.core.transport.NettyTransport; | ||
|
||
/** | ||
* A common protocol server dealing with common property parameter validation and assignment and transport lifecycle. | ||
* | ||
* @author Galder Zamarreño | ||
* @author wburns | ||
* @since 4.1 | ||
*/ | ||
public abstract class AbstractProtocolServer<A extends ProtocolServerConfiguration> extends AbstractCacheIgnoreAware | ||
implements ProtocolServer<A> { | ||
|
||
private final Log log = LogFactory.getLog(getClass(), Log.class); | ||
|
||
private final String protocolName; | ||
|
||
protected NettyTransport transport; | ||
protected EmbeddedCacheManager cacheManager; | ||
protected A configuration; | ||
private ObjectName transportObjName; | ||
private MBeanServer mbeanServer; | ||
|
||
protected AbstractProtocolServer(String protocolName) { | ||
this.protocolName = protocolName; | ||
} | ||
|
||
protected void startInternal(A configuration, EmbeddedCacheManager cacheManager) { | ||
this.configuration = configuration; | ||
this.cacheManager = cacheManager; | ||
|
||
if (log.isDebugEnabled()) { | ||
log.debugf("Starting server with configuration: %s", configuration); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just how it was before, tbh it probably doesn't really matter since this is only ever invoked once. The debug enabled is probably just a tiny fraction faster when it isn't enabled. |
||
} | ||
|
||
// Start default cache | ||
startDefaultCache(); | ||
|
||
startTransport(); | ||
} | ||
|
||
@Override | ||
public final void start(A configuration, EmbeddedCacheManager cacheManager) { | ||
try { | ||
configuration.ignoredCaches().forEach(this::ignoreCache); | ||
startInternal(configuration, cacheManager); | ||
} catch (RuntimeException t) { | ||
stop(); | ||
throw t; | ||
} | ||
} | ||
|
||
protected void startTransport() { | ||
InetSocketAddress address = new InetSocketAddress(configuration.host(), configuration.port()); | ||
transport = new NettyTransport(address, configuration, getQualifiedName(), cacheManager); | ||
transport.initializeHandler(getInitializer()); | ||
|
||
// Register transport MBean regardless | ||
registerTransportMBean(); | ||
|
||
transport.start(); | ||
} | ||
|
||
protected void registerTransportMBean() { | ||
GlobalConfiguration globalCfg = cacheManager.getCacheManagerConfiguration(); | ||
mbeanServer = JmxUtil.lookupMBeanServer(globalCfg); | ||
String groupName = String.format("type=Server,name=%s", getQualifiedName()); | ||
String jmxDomain = JmxUtil.buildJmxDomain(globalCfg, mbeanServer, groupName); | ||
|
||
// Pick up metadata from the component metadata repository | ||
ManageableComponentMetadata meta = LifecycleCallbacks.componentMetadataRepo | ||
.findComponentMetadata(transport.getClass()).toManageableComponentMetadata(); | ||
try { | ||
// And use this metadata when registering the transport as a dynamic MBean | ||
DynamicMBean dynamicMBean = new ResourceDMBean(transport, meta); | ||
|
||
transportObjName = new ObjectName(String.format("%s:%s,component=%s", jmxDomain, groupName, | ||
meta.getJmxObjectName())); | ||
JmxUtil.registerMBean(dynamicMBean, transportObjName, mbeanServer); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
protected void unregisterTransportMBean() throws Exception { | ||
if (mbeanServer != null && transportObjName != null) { | ||
// Unregister mbean(s) | ||
JmxUtil.unregisterMBean(transportObjName, mbeanServer); | ||
} | ||
} | ||
|
||
protected String getQualifiedName() { | ||
return protocolName + (configuration.name().length() > 0 ? "-" : "") + configuration.name(); | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
boolean isDebug = log.isDebugEnabled(); | ||
if (isDebug && configuration != null) | ||
log.debugf("Stopping server listening in %s:%d", configuration.host(), configuration.port()); | ||
|
||
if (transport != null) | ||
transport.stop(); | ||
|
||
try { | ||
unregisterTransportMBean(); | ||
} catch (Exception e) { | ||
throw new CacheException(e); | ||
} | ||
|
||
if (isDebug) | ||
log.debug("Server stopped"); | ||
} | ||
|
||
public EmbeddedCacheManager getCacheManager() { | ||
return cacheManager; | ||
} | ||
|
||
public String getHost() { | ||
return configuration.host(); | ||
} | ||
|
||
public int getPort() { | ||
return configuration.port(); | ||
} | ||
|
||
@Override | ||
public A getConfiguration() { | ||
return configuration; | ||
} | ||
|
||
protected void startDefaultCache() { | ||
cacheManager.getCache(configuration.defaultCacheName()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package org.infinispan.server.core; | ||
|
||
import java.util.Set; | ||
|
||
/** | ||
* Defines an interface to be used when a cache is to be ignored by a server implementation. Any implementation should | ||
* be thread safe and allow for concurrent methods to be invoked. | ||
* @author gustavonalle | ||
* @author wburns | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please add something meaningful to this javadoc? |
||
* @since 9.0 | ||
*/ | ||
public interface CacheIgnoreAware { | ||
|
||
/** | ||
* Replaces all ignored caches with the set provided | ||
* @param cacheNames the set of caches to now ignore | ||
*/ | ||
void setIgnoredCaches(Set<String> cacheNames); | ||
|
||
/** | ||
* No longer ignore the given cache if it was before | ||
* @param cacheName the cache to now not ignore | ||
*/ | ||
void unignore(String cacheName); | ||
|
||
/** | ||
* Ignores a given cache if it wasn't before | ||
* @param cacheName the cache to ignore | ||
*/ | ||
void ignoreCache(String cacheName); | ||
|
||
/** | ||
* Queries whether the cache is ignored | ||
* @param cacheName the cache to see if it is ignored | ||
* @return whether or not the cache is ignored | ||
*/ | ||
boolean isCacheIgnored(String cacheName); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package org.infinispan.server.core; | ||
|
||
/** | ||
* Externalizer ids used by Server module {@link org.infinispan.commons.marshall.AdvancedExternalizer} implementations. | ||
* TODO: update URL below | ||
* Information about the valid id range can be found <a href="http://community.jboss.org/docs/DOC-16198">here</a> | ||
* | ||
* @author Galder Zamarreño | ||
* @author wburns | ||
* @since 9.0 | ||
*/ | ||
public final class ExternalizerIds { | ||
private ExternalizerIds() { } | ||
|
||
public static final int SERVER_ENTRY_VERSION = 1100; | ||
public static final int MEMCACHED_METADATA = 1101; | ||
public static final int TOPOLOGY_ADDRESS = 1102; | ||
public static final int TOPOLOGY_VIEW = 1103; | ||
public static final int SERVER_ADDRESS = 1104; | ||
public static final int MIME_METADATA = 1105; | ||
public static final int BINARY_FILTER = 1106; | ||
public static final int BINARY_CONVERTER = 1107; | ||
public static final int KEY_VALUE_VERSION_CONVERTER = 1108; | ||
public static final int BINARY_FILTER_CONVERTER = 1109; | ||
public static final int KEY_VALUE_WITH_PREVIOUS_CONVERTER = 1110; | ||
public static final int ITERATION_FILTER = 1111; | ||
public static final int QUERY_ITERATION_FILTER = 1112; | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package org.infinispan.server.core; | ||
|
||
import org.infinispan.configuration.cache.Configuration; | ||
import org.infinispan.configuration.global.GlobalConfiguration; | ||
import org.infinispan.factories.components.ComponentMetadataRepo; | ||
import org.infinispan.factories.ComponentRegistry; | ||
import org.infinispan.factories.GlobalComponentRegistry; | ||
import org.infinispan.lifecycle.AbstractModuleLifecycle; | ||
|
||
/** | ||
* Module lifecycle callbacks implementation that enables module specific | ||
* {@link org.infinispan.commons.marshall.AdvancedExternalizer} implementations to be registered. | ||
* | ||
* @author Galder Zamarreño | ||
* @since 5.0 | ||
*/ | ||
public class LifecycleCallbacks extends AbstractModuleLifecycle { | ||
|
||
static ComponentMetadataRepo componentMetadataRepo; | ||
|
||
@Override | ||
public void cacheManagerStarting(GlobalComponentRegistry gcr, GlobalConfiguration globalConfiguration) { | ||
componentMetadataRepo = gcr.getComponentMetadataRepo(); | ||
} | ||
|
||
@Override | ||
public void cacheStarting(ComponentRegistry cr, Configuration configuration, String cacheName) { | ||
configuration.storeAsBinary().enabled(false); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,48 +1,50 @@ | ||
package org.infinispan.server.core | ||
package org.infinispan.server.core; | ||
|
||
import org.infinispan.manager.EmbeddedCacheManager | ||
import org.infinispan.server.core.configuration.ProtocolServerConfiguration | ||
import io.netty.channel.{Channel, ChannelInitializer, ChannelInboundHandler, ChannelOutboundHandler} | ||
import io.netty.channel.Channel; | ||
import io.netty.channel.ChannelInboundHandler; | ||
import io.netty.channel.ChannelInitializer; | ||
import io.netty.channel.ChannelOutboundHandler; | ||
import org.infinispan.manager.EmbeddedCacheManager; | ||
import org.infinispan.server.core.configuration.ProtocolServerConfiguration; | ||
|
||
/** | ||
* Represents a protocol compliant server. | ||
* | ||
* @author Galder Zamarreño | ||
* @since 4.1 | ||
* @author wburns | ||
* @since 9.0 | ||
*/ | ||
trait ProtocolServer extends CacheIgnoreAware { | ||
type SuitableConfiguration <: ProtocolServerConfiguration | ||
|
||
public interface ProtocolServer<C extends ProtocolServerConfiguration> extends CacheIgnoreAware { | ||
/** | ||
* Starts the server backed by the given cache manager and with the corresponding configuration. | ||
*/ | ||
def start(configuration: SuitableConfiguration, cacheManager: EmbeddedCacheManager) | ||
void start(C configuration, EmbeddedCacheManager cacheManager); | ||
|
||
/** | ||
* Stops the server | ||
*/ | ||
def stop | ||
void stop(); | ||
|
||
/** | ||
* Gets the encoder for this protocol server. The encoder is responsible for writing back common header responses | ||
* back to client. This method can return null if the server has no encoder. You can find an example of the server | ||
* that has no encoder in the Memcached server. | ||
*/ | ||
def getEncoder: ChannelOutboundHandler | ||
ChannelOutboundHandler getEncoder(); | ||
|
||
/** | ||
* Gets the decoder for this protocol server. The decoder is responsible for reading client requests. | ||
* This method cannot return null. | ||
*/ | ||
def getDecoder: ChannelInboundHandler | ||
ChannelInboundHandler getDecoder(); | ||
|
||
/** | ||
* Returns the configuration used to start this server | ||
*/ | ||
def getConfiguration: SuitableConfiguration | ||
C getConfiguration(); | ||
|
||
/** | ||
* Returns a pipeline factory | ||
*/ | ||
def getInitializer: ChannelInitializer[Channel] | ||
ChannelInitializer<Channel> getInitializer(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add something meaningful to this javadoc?