Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

started on annotation-driven functions

  • Loading branch information...
commit 4da8c04af00711569f70e26fedbb85f74276d7e4 1 parent 467755f
David Turanski authored
Showing with 5,715 additions and 28 deletions.
  1. +1 −1  build.gradle
  2. +0 −1  gradle.properties
  3. +5 −2 maven.gradle
  4. +49 −0 src/main/java/org/springframework/data/gemfire/config/AnnotationDrivenBeanDefinitionParser.java
  5. +1 −0  src/main/java/org/springframework/data/gemfire/config/GemfireNamespaceHandler.java
  6. +27 −0 src/main/java/org/springframework/data/gemfire/function/FilterAware.java
  7. +139 −0 src/main/java/org/springframework/data/gemfire/function/FunctionExecution.java
  8. +23 −0 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionCallback.java
  9. +46 −0 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionOperations.java
  10. +250 −0 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionProxyFactoryBean.java
  11. +168 −0 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionTemplate.java
  12. +51 −0 src/main/java/org/springframework/data/gemfire/function/MemberFunctionExecution.java
  13. +50 −0 src/main/java/org/springframework/data/gemfire/function/MembersFunctionExecution.java
  14. +242 −0 src/main/java/org/springframework/data/gemfire/function/MethodInvokingFunction.java
  15. +194 −0 src/main/java/org/springframework/data/gemfire/function/PojoFunctionWrapper.java
  16. +58 −0 src/main/java/org/springframework/data/gemfire/function/RegionFunctionExecution.java
  17. +60 −0 src/main/java/org/springframework/data/gemfire/function/RemoteMethodInvocation.java
  18. +60 −0 src/main/java/org/springframework/data/gemfire/function/ServerFunctionExecution.java
  19. +60 −0 src/main/java/org/springframework/data/gemfire/function/ServersFunctionExecution.java
  20. +64 −0 src/main/java/org/springframework/data/gemfire/function/config/EnableGemfireFunctions.java
  21. +41 −0 src/main/java/org/springframework/data/gemfire/function/config/GemfireFunction.java
  22. +86 −0 src/main/java/org/springframework/data/gemfire/function/config/GemfireFunctionBeanPostProcessor.java
  23. +63 −0 src/main/java/org/springframework/data/gemfire/function/config/GemfireFunctionUtils.java
  24. +33 −0 src/main/java/org/springframework/data/gemfire/function/config/RegionData.java
  25. +4 −2 src/main/resources/META-INF/spring.schemas
  26. +50 −0 src/main/resources/org/springframework/data/gemfire/config/spring-data-gemfire-1.3.xsd
  27. +2,919 −0 src/main/resources/org/springframework/data/gemfire/config/spring-gemfire-1.3.xsd
  28. +28 −22 src/test/java/org/springframework/data/gemfire/ForkUtil.java
  29. +107 −0 src/test/java/org/springframework/data/gemfire/fork/FunctionCacheServerProcess.java
  30. +157 −0 src/test/java/org/springframework/data/gemfire/function/FunctionExecutionTests.java
  31. +135 −0 src/test/java/org/springframework/data/gemfire/function/GemfireFunctionProxyFactoryBeanTest.java
  32. +112 −0 src/test/java/org/springframework/data/gemfire/function/GemfireFunctionTemplateTests.java
  33. +194 −0 src/test/java/org/springframework/data/gemfire/function/MethodInvokingFunctionTests.java
  34. +90 −0 src/test/java/org/springframework/data/gemfire/function/config/AnnotationDrivenFunctionsTest.java
  35. +73 −0 src/test/java/org/springframework/data/gemfire/function/foo/Foo.java
  36. +32 −0 src/test/java/org/springframework/data/gemfire/function/foo/IFoo.java
  37. +29 −0 src/test/resources/org/springframework/data/gemfire/function/GemfireFunctionProxyFactoryBeanTest-context.xml
  38. +14 −0 src/test/resources/org/springframework/data/gemfire/function/config/AnnotationDrivenFunctionsTest-context.xml
View
2  build.gradle
@@ -50,7 +50,7 @@ dependencies {
// GemFire
compile("com.gemstone.gemfire:gemfire:$gemfireVersion")
runtime("antlr:antlr:2.7.7")
- //runtime("commons-modeler;commons-modeler:2.0.1")
+ runtime("commons-modeler:ßcommons-modeler:2.0.1")
// Testing
testCompile "junit:junit-dep:$junitVersion"
View
1  gradle.properties
@@ -8,7 +8,6 @@ slf4jVersion = 1.6.4
springVersion = 3.1.2.RELEASE
springDataCommonsVersion = 1.4.0.RC1
gemfireVersion = 7.0.Beta-SNAPSHOT
-gemfireVersion6x = 6.6.3
# Testing
junitVersion = 4.8.2
View
7 maven.gradle
@@ -27,8 +27,6 @@ def customizePom(pom, gradleProject) {
dep.scope == 'test'
}
- generatedPom.dependencies.find { dep -> dep.groupId == 'com.gemstone.gemfire' && dep.artifactId == 'gemfire' }.version = "$gemfireVersion6x"
-
// add all items necessary for maven central publication
generatedPom.project {
name = gradleProject.description
@@ -61,6 +59,11 @@ def customizePom(pom, gradleProject) {
name = 'David Turanski'
email = 'dturanski@vmware.com'
}
+ developer {
+ id = 'ogierke'
+ name = 'Oliver Gierke'
+ email = 'ogierke@vmware.com'
+ }
}
}
}
View
49 ...in/java/org/springframework/data/gemfire/config/AnnotationDrivenBeanDefinitionParser.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.config;
+
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
+import org.springframework.beans.factory.support.RootBeanDefinition;
+import org.springframework.beans.factory.xml.BeanDefinitionParser;
+import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.data.gemfire.function.config.GemfireFunctionBeanPostProcessor;
+import org.w3c.dom.Element;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
+
+ /* (non-Javadoc)
+ * @see org.springframework.beans.factory.xml.BeanDefinitionParser#parse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext)
+ */
+ @Override
+ public BeanDefinition parse(Element element, ParserContext parserContext) {
+ registerGemfireFunctionBeanPostProcessor(element, parserContext);
+ return null;
+ }
+
+ /**
+ *
+ */
+ private void registerGemfireFunctionBeanPostProcessor(Element element, ParserContext parserContext) {
+ Object source = parserContext.extractSource(element);
+ RootBeanDefinition beanDefinition = new RootBeanDefinition(GemfireFunctionBeanPostProcessor.class);
+ beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
+ beanDefinition.setSource(source);
+ BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, parserContext.getRegistry());
+ }
+
+}
View
1  src/main/java/org/springframework/data/gemfire/config/GemfireNamespaceHandler.java
@@ -67,5 +67,6 @@ public void init() {
registerBeanDefinitionParser("function-service", new FunctionServiceParser());
// V6 WAN parsers
registerBeanDefinitionParser("gateway-hub", new GatewayHubParser());
+ registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
}
}
View
27 src/main/java/org/springframework/data/gemfire/function/FilterAware.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.util.Set;
+/**
+ *
+ * @author David Turanski
+ *
+ */
+public interface FilterAware {
+ /*
+ * Return calling instance to support method chaining
+ */
+ public Object setFilter(Set<? extends Serializable> filter);
+}
View
139 src/main/java/org/springframework/data/gemfire/function/FunctionExecution.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+
+/**
+ * @author David Turanski
+ */
+
+public abstract class FunctionExecution<T> {
+ protected final Log logger = LogFactory.getLog(this.getClass());
+
+ private volatile ResultCollector<?, ?> collector;
+ private final Serializable[] args;
+ private Function function;
+ private final String functionId;
+ private long timeout;
+
+ public FunctionExecution(Function function, Serializable... args) {
+ Assert.notNull(function,"function cannot be null");
+ this.function = function;
+ this.functionId = function.getId();
+ this.args = args;
+ }
+
+ public FunctionExecution(String functionId, Serializable... args) {
+ Assert.isTrue(StringUtils.hasLength(functionId),"functionId cannot be null or empty");
+ this.functionId = functionId;
+ this.args = args;
+ }
+
+
+ public ResultCollector<?, ?> getCollector() {
+ return collector;
+ }
+
+ public Serializable[] getArgs() {
+ return args;
+ }
+
+ public String getFunctionId() {
+ return functionId;
+ }
+
+ public Function getFunction() {
+ return function;
+ }
+
+ public void setCollector(ResultCollector<?,?> collector) {
+ this.collector = collector;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public List<T> execute() {
+ Execution execution = this.getExecution();
+ if (getKeys() != null) {
+ execution = execution.withFilter(getKeys());
+ }
+ if (getCollector() != null) {
+ execution = execution.withCollector(getCollector());
+ }
+
+ ResultCollector<?,?> resultsCollector = null;
+
+ execution = execution.withArgs(getArgs());
+
+ if (isRegisteredFunction()){
+
+ resultsCollector = (ResultCollector<?,?>) execution.execute(functionId);
+ } else {
+ resultsCollector = (ResultCollector<?,?>) execution.execute(function);
+ }
+
+ if (this.timeout > 0 ){
+ try {
+ return (List<T>)resultsCollector.getResult(this.timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (FunctionException e) {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return (List<T>)resultsCollector.getResult();
+ }
+ }
+
+ public T executeAndExtract() {
+ return this.execute().get(0);
+ }
+
+ protected abstract Execution getExecution();
+
+ protected Set<?> getKeys() {
+ return null;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * @return
+ */
+ private boolean isRegisteredFunction() {
+ return function == null;
+ }
+
+}
View
23 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionCallback.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import com.gemstone.gemfire.cache.execute.Execution;
+
+/**
+ * @author David Turanski
+ *
+ */
+public interface GemfireFunctionCallback<T> {
+ public T doInGemfire( Execution execution );
+}
View
46 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionOperations.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.execute.Function;
+
+/**
+ * @author David Turanski
+ *
+ * @param <T>
+ */
+public interface GemfireFunctionOperations<T> {
+
+ public abstract List<T> executeOnRegion(Function function, String regionId, Serializable... args);
+
+ public abstract T executeOnRegionAndExtract(Function function, String regionId, Serializable... args);
+
+ public abstract List<T> executeOnRegion(Function function, String regionId, Set<?> keys, Serializable... args);
+
+ public abstract List<T> executeOnRegion(String functionId, String regionId, Serializable... args);
+
+ public abstract List<T> executeOnRegion(String functionId, String regionId, Set<?> keys, Serializable... args);
+
+ public abstract T executeOnRegion(String regionId, GemfireFunctionCallback<T> callback);
+
+ public abstract List<T> executeOnServers(Function function, Serializable... args);
+
+ public abstract List<T> executeOnServers(String functionId, Serializable... args);
+
+ public abstract T executeOnServers(GemfireFunctionCallback<T> callback);
+
+}
View
250 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionProxyFactoryBean.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.aop.framework.ProxyFactory;
+import org.springframework.aop.support.AopUtils;
+import org.springframework.beans.factory.BeanClassLoaderAware;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
+
+import com.gemstone.gemfire.cache.execute.FunctionException;
+
+/**
+ * Creates a Proxy to a delegate that is executed as a Gemfire remote function
+ * using {@link MethodInvokingFunction}. Also, adds the {@link FilterAware}
+ *
+ * interface to the proxy which is used to set a data filter when execution is
+ * performed on a partitioned region.
+ *
+ * @author David Turanski
+ *
+ */
+public class GemfireFunctionProxyFactoryBean implements FactoryBean<Object>, MethodInterceptor, BeanClassLoaderAware {
+ private volatile ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
+
+ private final Class<?> serviceInterface;
+
+ private volatile Object serviceProxy;
+
+ private volatile boolean initialized;
+
+ private final String delegateClassName;
+
+ private final ThreadLocal<Set<? extends Serializable>> filter;
+
+ private boolean methodInvokingFunctionRegistered;
+
+ private volatile String regionName;
+
+ private final GemfireFunctionOperations<?> gemfireFunctionOperations;
+
+ private static Log logger = LogFactory.getLog(GemfireFunctionProxyFactoryBean.class);
+
+ /**
+ *
+ * @param serviceInterface the proxy interface
+ * @param delegateClassName the name of the implementation class
+ * @param gemfireFunctionOperations a strategy interface, normally a
+ * {@link GemfireFunctionTemplate}
+ */
+ public GemfireFunctionProxyFactoryBean(Class<?> serviceInterface, String delegateClassName,
+ GemfireFunctionOperations<?> gemfireFunctionOperations) {
+ this.delegateClassName = delegateClassName;
+ Assert.notNull(serviceInterface, "'serviceInterface' must not be null");
+ Assert.isTrue(serviceInterface.isInterface(), "'serviceInterface' must be an interface");
+ this.serviceInterface = serviceInterface;
+
+ Assert.notNull(gemfireFunctionOperations);
+ this.gemfireFunctionOperations = gemfireFunctionOperations;
+ this.filter = new ThreadLocal<Set<? extends Serializable>>();
+ }
+
+ // @Override
+ public void setBeanClassLoader(ClassLoader classLoader) {
+ beanClassLoader = classLoader;
+ }
+
+ // @Override
+ @SuppressWarnings("unchecked")
+ public Object invoke(MethodInvocation invocation) throws Throwable {
+
+ if (AopUtils.isToStringMethod(invocation.getMethod())) {
+ return "Gemfire function proxy for service interface [" + this.serviceInterface + "]";
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("invoking method " + invocation.getMethod().getName());
+ }
+
+ if (isSetFilterMethod(invocation.getMethod())) {
+ setFilter((Set<? extends Serializable>) invocation.getArguments()[0]);
+ return getObject();
+ }
+
+ RemoteMethodInvocation remoteInvocation = new RemoteMethodInvocation(this.delegateClassName, invocation
+ .getMethod().getName(), convertArgsToSerializable(invocation.getArguments()));
+
+ List<?> results = null;
+
+ if (this.methodInvokingFunctionRegistered) {
+ if (this.regionName != null) {
+
+ results = this.gemfireFunctionOperations.executeOnRegion(MethodInvokingFunction.FUNCTION_ID,
+ this.regionName, this.getFilter(), remoteInvocation);
+ }
+ else {
+ if (this.getFilter() != null) {
+ logger.warn("No region is specified. Filter has no effect on a data independent function execution");
+ }
+ results = this.gemfireFunctionOperations.executeOnServers(MethodInvokingFunction.FUNCTION_ID,
+ remoteInvocation);
+ }
+
+ }
+ else {
+ if (this.regionName != null) {
+ results = this.gemfireFunctionOperations.executeOnRegion(new MethodInvokingFunction(), this.regionName,
+ this.getFilter(), remoteInvocation);
+ }
+ else {
+ if (this.getFilter() != null) {
+ logger.warn("No region is specified. Filter has no effect on a data independent function execution");
+ }
+ results = this.gemfireFunctionOperations.executeOnServers(new MethodInvokingFunction(),
+ remoteInvocation);
+ }
+ }
+
+ return extractResult(results, invocation.getMethod().getReturnType());
+ }
+
+ // @Override
+ public Object getObject() throws Exception {
+ if (this.serviceProxy == null) {
+ this.onInit();
+ Assert.notNull(this.serviceProxy, "failed to initialize proxy");
+ }
+ return this.serviceProxy;
+ }
+
+ // @Override
+ public Class<?> getObjectType() {
+ return (this.serviceInterface != null ? this.serviceInterface : null);
+ }
+
+ // @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ protected Set<?> getFilter() {
+ return filter.get();
+ }
+
+ protected void setFilter(Set<? extends Serializable> filter) {
+ this.filter.set(filter);
+ }
+
+ /**
+ * Set to true id {@link MethodInvokingFunction} is a registered function.
+ * If registered, invocations will not create and transport a new instance
+ * of the function to the cache server(s).
+ * @param methodInvokingFunctionRegistered
+ */
+ public void setMethodInvokingFunctionRegistered(boolean methodInvokingFunctionRegistered) {
+ this.methodInvokingFunctionRegistered = methodInvokingFunctionRegistered;
+ }
+
+ /**
+ *
+ * Optional region to use. If set, the function will execute onRegion, if
+ * null the function will execute onServers
+ *
+ * @param regionName
+ */
+ public void setRegionName(String regionName) {
+ this.regionName = regionName;
+ }
+
+ // TODO: Use something like cglib to implement setFilter() directly
+ // to eliminate the need to cast the proxy to FilterAware
+ protected void onInit() {
+ if (this.initialized) {
+ return;
+ }
+ ProxyFactory proxyFactory = new ProxyFactory(serviceInterface, this);
+ proxyFactory.addInterface(FilterAware.class);
+ this.serviceProxy = proxyFactory.getProxy(this.beanClassLoader);
+ this.initialized = true;
+ }
+
+ /*
+ * This tweek is needed to prevent an argument mismatch on the function
+ * invocation.
+ */
+ private Serializable[] convertArgsToSerializable(Object[] array) {
+ return Arrays.copyOf(array, array.length, Serializable[].class);
+ }
+
+ /*
+ * Match the result to the declared return type
+ */
+ private Object extractResult(List<?> results, Class<?> returnType) {
+ Object result = null;
+ if (List.class.isAssignableFrom(returnType)) {
+ result = results;
+ }
+ else {
+ int nonNullItems = 0;
+ for (Object obj : results) {
+ if (obj != null) {
+ if (++nonNullItems > 1) {
+ throw new FunctionException("multiple results found for single valued return type");
+ }
+ else {
+ result = obj;
+ }
+ }
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("returning result as " + result.getClass().getName());
+ }
+ return result;
+ }
+
+ private static boolean isSetFilterMethod(Method method) {
+ try {
+ Method setFilterMethod = FilterAware.class.getMethod("setFilter", Set.class);
+ return method.equals(setFilterMethod);
+ }
+ catch (SecurityException e) {
+ }
+ catch (NoSuchMethodException e) {
+ }
+ return false;
+ }
+
+}
View
168 src/main/java/org/springframework/data/gemfire/function/GemfireFunctionTemplate.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.util.Assert;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionService;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class GemfireFunctionTemplate<T> implements InitializingBean, GemfireFunctionOperations<T> {
+ /** Logger available to subclasses */
+ protected final Log log = LogFactory.getLog(getClass());
+ private RegionService cache;
+ private long timeout;
+
+
+ /**
+ *
+ * @param cache
+ */
+ public GemfireFunctionTemplate (RegionService cache) {
+ this.cache = cache;
+ afterPropertiesSet();
+ }
+
+
+ public void afterPropertiesSet() {
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnRegion(com.gemstone.gemfire.cache.execute.Function, java.lang.String, java.io.Serializable)
+ */
+ public List<T> executeOnRegion(Function function, String regionId, Serializable... args) {
+ Region<?,?> region = getRegion(regionId);
+ Assert.notNull(region,"Region '" + regionId + "' not found");
+ RegionFunctionExecution<T> execution = new RegionFunctionExecution<T>(region, function, args);
+ execution.setTimeout(this.timeout);
+ return execution.execute();
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnRegionAndExtract(com.gemstone.gemfire.cache.execute.Function, java.lang.String, java.io.Serializable)
+ */
+ public T executeOnRegionAndExtract(Function function, String regionId, Serializable... args) {
+ Region<?,?> region = getRegion(regionId);
+ Assert.notNull(region,"Region '" + regionId + "' not found");
+ RegionFunctionExecution<T> execution = new RegionFunctionExecution<T>(region, function, args);
+ execution.setTimeout(this.timeout);
+ return execution.executeAndExtract();
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnRegion(com.gemstone.gemfire.cache.execute.Function, java.lang.String, java.util.Set, java.io.Serializable)
+ */
+ public List<T> executeOnRegion(Function function, String regionId, Set<?> keys, Serializable... args) {
+ Region<?,?> region = getRegion(regionId);
+ Assert.notNull(region,"Region '" + regionId + "' not found");
+
+ RegionFunctionExecution<T> execution = new RegionFunctionExecution<T>(region, function, args);
+ execution.setKeys(keys);
+ execution.setTimeout(this.timeout);
+ return execution.execute();
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnRegion(java.lang.String, java.lang.String, java.io.Serializable)
+ */
+ public List<T> executeOnRegion(String functionId, String regionId, Serializable... args) {
+ Region<?,?> region = getRegion(regionId);
+ Assert.notNull(region,"Region '" + regionId + "' not found");
+
+ RegionFunctionExecution<T> execution = new RegionFunctionExecution<T>(region, functionId, args);
+ execution.setTimeout(this.timeout);
+ return execution.execute();
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnRegion(java.lang.String, java.lang.String, java.util.Set, java.io.Serializable)
+ */
+ public List<T> executeOnRegion(String functionId, String regionId, Set<?> keys, Serializable... args) {
+ Region<?,?> region = getRegion(regionId);
+ Assert.notNull(region,"Region '" + regionId + "' not found");
+
+ RegionFunctionExecution<T> execution = new RegionFunctionExecution<T>(region, functionId, args);
+ execution.setKeys(keys);
+ execution.setTimeout(this.timeout);
+ return execution.execute();
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnRegion(java.lang.String, org.springframework.data.gemfire.function.GemfireFunctionCallback)
+ */
+ public T executeOnRegion(String regionId, GemfireFunctionCallback<T> callback ) {
+ Region<?,?> region = getRegion(regionId);
+ Assert.notNull(region,"Region '" + regionId + "' not found");
+ Execution execution = FunctionService.onRegion(region);
+ return callback.doInGemfire(execution);
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnServers(com.gemstone.gemfire.cache.execute.Function, java.io.Serializable)
+ */
+ public List<T> executeOnServers(Function function, Serializable... args) {
+ ServersFunctionExecution<T> execution = new ServersFunctionExecution<T>(this.cache, function, args);
+ execution.setTimeout(this.timeout);
+ return execution.execute();
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnServers(java.lang.String, java.io.Serializable)
+ */
+ public List<T> executeOnServers(String functionId, Serializable... args) {
+ ServersFunctionExecution<T> execution = new ServersFunctionExecution<T>(this.cache, functionId, args);
+ execution.setTimeout(this.timeout);
+ return execution.execute();
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.GemfireFunctionOperations#executeOnServers(org.springframework.data.gemfire.function.GemfireFunctionCallback)
+ */
+ public T executeOnServers(GemfireFunctionCallback<T> callback ) {
+ Execution execution = FunctionService.onServers(this.cache);
+ return callback.doInGemfire(execution);
+ }
+
+
+ public Region<?,?> getRegion(String regionId) {
+ return this.cache.getRegion(regionId);
+ }
+
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+
+
+}
View
51 src/main/java/org/springframework/data/gemfire/function/MemberFunctionExecution.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class MemberFunctionExecution extends FunctionExecution<Object> {
+
+
+ private final DistributedSystem distributedSystem;
+
+ /**
+ * @param functionId
+ * @param args
+ */
+ public MemberFunctionExecution(DistributedSystem distributedSystem, Function function, Serializable... args) {
+ super(function, args);
+ this.distributedSystem = distributedSystem;
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.FunctionExecution#getExecution()
+ */
+ @Override
+ protected Execution getExecution() {
+ return FunctionService.onMember(this.distributedSystem, this.distributedSystem.getDistributedMember());
+ }
+
+
+
+
+}
View
50 src/main/java/org/springframework/data/gemfire/function/MembersFunctionExecution.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class MembersFunctionExecution extends FunctionExecution<Object> {
+
+
+ private final DistributedSystem distributedSystem;
+
+ /**
+ * @param functionId
+ * @param args
+ */
+ public MembersFunctionExecution(DistributedSystem distributedSystem, Function function, Serializable... args) {
+ super(function, args);
+ this.distributedSystem = distributedSystem;
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.FunctionExecution#getExecution()
+ */
+ @Override
+ protected Execution getExecution() {
+ return FunctionService.onMembers(this.distributedSystem);
+ }
+
+
+
+}
View
242 src/main/java/org/springframework/data/gemfire/function/MethodInvokingFunction.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.ReflectionUtils;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+
+/**
+ * Invokes a POJO's given method as a Gemfire remote function.
+ * If the POJO has a constructor that takes a Map, and the function context is Region, the
+ * region will be injected. The delegate class name, the method name, and the method arguments
+ * are part of a remote function invocation, therefore all arguments must be serializable.
+ * The delegate class must be the class path of the remote cache(s)
+ * @author David Turanski
+ *
+ */
+@SuppressWarnings("serial")
+public class MethodInvokingFunction implements Function {
+ public static final String FUNCTION_ID = MethodInvokingFunction.class.getName();
+ private static transient Log logger = LogFactory.getLog(MethodInvokingFunction.class);
+
+ private volatile boolean HA;
+ private volatile boolean optimizeForWrite;
+ private volatile boolean hasResult;
+ private String id;
+
+
+
+ public MethodInvokingFunction( ) {
+ this.HA = false;
+ this.hasResult = true;
+ this.optimizeForWrite = false;
+ }
+
+ //@Override
+ public String getId() {
+ return this.id;
+ }
+
+ //@Override
+ public boolean hasResult() {
+ return hasResult;
+ }
+
+ public void setHasResult(boolean hasResult) {
+ this.hasResult = hasResult;
+ }
+
+ //@Override
+ public boolean isHA() {
+ return HA;
+ }
+
+ public void setHA(boolean HA) {
+ this.HA = HA;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ //@Override
+ public boolean optimizeForWrite() {
+ return optimizeForWrite;
+ }
+
+
+ public void setOptimizeForWrite(boolean optimizeForWrite) {
+ this.optimizeForWrite = optimizeForWrite;
+ }
+
+ //@Override
+ public void execute(FunctionContext functionContext) {
+ Region<?,?> region = null;
+ RegionFunctionContext regionFunctionContext = (RegionFunctionContext)functionContext;
+
+ region = getRegionForContext(regionFunctionContext);
+
+ RemoteMethodInvocation invocation = null;
+
+ if (regionFunctionContext.getArguments().getClass().isArray()) {
+ invocation = (RemoteMethodInvocation)((Serializable[])regionFunctionContext.getArguments())[0];
+ } else {
+ invocation = (RemoteMethodInvocation)regionFunctionContext.getArguments();
+ }
+
+ regionFunctionContext.getFilter();
+
+ Object instance = createDelegateInstance(invocation.getClassName(), region);
+
+ Serializable result = invokeDelegateMethod(instance,invocation, region);
+
+ if (hasResult()){
+ sendResults(regionFunctionContext.getResultSender(),result);
+ }
+
+ }
+
+ protected final Serializable invokeDelegateMethod(Object instance, RemoteMethodInvocation invocation, Region<?,?> region) {
+
+
+ // Null parameters returns any method signature
+ Method method = ReflectionUtils.findMethod(instance.getClass(),invocation.getMethodName(),(Class<?>[])null);
+
+ if (method == null ) {
+ throw new FunctionException("cannot find method ["+ invocation.getMethodName() +
+ "] on type [" + instance.getClass().getName() + "]");
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("about to invoke method %s on class %s", invocation.getMethodName(),invocation.getClassName()));
+ for (Object arg: invocation.getArguments()) {
+ logger.debug("arg:"+ arg.getClass().getName() + " " + arg.toString());
+ }
+
+ }
+
+ return (Serializable)ReflectionUtils.invokeMethod(method, instance, (Object[]) invocation.getArguments());
+ }
+
+
+
+ protected final Object createDelegateInstance(String delegateClassName, Map<?,?> region) {
+ Object instance = null;
+ try {
+ Class<?> clazz = Class.forName(delegateClassName);
+ Constructor<?> defaultConstructor = ClassUtils.getConstructorIfAvailable(clazz, new Class<?>[] {});
+ if (region != null){
+ Constructor<?> mapConstructor = ClassUtils.getConstructorIfAvailable(clazz, Map.class);
+ if (mapConstructor != null){
+ instance = mapConstructor.newInstance(region);
+ }
+ }
+ if (instance == null) {
+ if (defaultConstructor == null) {
+ throw new FunctionException("Delegate type [" + delegateClassName + "] has no default constructor");
+ }
+ instance = defaultConstructor.newInstance();
+ }
+ } catch (ClassNotFoundException e) {
+ logger.error(e.getMessage(),e);
+ throw new FunctionException("Delegate type [" + delegateClassName + "] not found", e);
+ } catch (SecurityException e) {
+ logger.error(e.getMessage(),e);
+ throw new FunctionException(e);
+ } catch (IllegalArgumentException e) {
+ logger.error(e.getMessage(),e);
+ throw new FunctionException("Delegate constructor failed",e);
+ } catch (InstantiationException e) {
+ logger.error(e.getMessage(),e);
+ throw new FunctionException("Delegate constructor failed",e);
+ } catch (IllegalAccessException e) {
+ logger.error(e.getMessage(),e);
+ throw new FunctionException("Delegate constructor failed",e);
+ } catch (InvocationTargetException e) {
+ logger.error(e.getMessage(),e);
+ throw new FunctionException("Delegate constructor failed",e);
+ } catch (Throwable t){
+ logger.error(t.getMessage(),t);
+ throw new FunctionException("Delegate constructor failed",t);
+ }
+ return instance;
+ }
+
+ /*
+ * @param regionFunctionContext
+ * @return
+ */
+ private Region<?, ?> getRegionForContext(RegionFunctionContext regionFunctionContext) {
+
+ Region<?,?> region = regionFunctionContext.getDataSet();
+ if (PartitionRegionHelper.isPartitionedRegion(region)) {
+ if (logger.isDebugEnabled()){
+ logger.debug("this is a partitioned region - filtering local data for context");
+ }
+ region = PartitionRegionHelper.getLocalDataForContext(regionFunctionContext);
+ }
+ if (logger.isDebugEnabled()){
+ logger.debug("region contains " + region.size() + " items");
+ }
+ return region;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void sendResults(ResultSender<Object> resultSender, Serializable result) {
+ if (result == null){
+ resultSender.lastResult(null);
+ return;
+ }
+
+ Serializable lastItem = result;
+
+ List<Serializable> results = null;
+ if (ObjectUtils.isArray(result)){
+ results = Arrays.asList((Serializable[])result);
+ } else if (List.class.isAssignableFrom(result.getClass())) {
+ results = (List<Serializable>)result;
+ }
+
+ if (results != null){
+ int i = 0;
+ for (Serializable item: results){
+ if (i++ < results.size() - 1) {
+ resultSender.sendResult(item);
+ } else {
+ lastItem = item;
+ }
+ }
+ }
+ resultSender.lastResult(lastItem);
+ }
+}
+
View
194 src/main/java/org/springframework/data/gemfire/function/PojoFunctionWrapper.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.StringUtils;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.util.ArrayUtils;
+
+/**
+ * Invokes a POJO's given method as a Gemfire remote function.
+ * If the POJO has a constructor that takes a Map, and the function context is Region, the
+ * region will be injected. The delegate class name, the method name, and the method arguments
+ * are part of a remote function invocation, therefore all arguments must be serializable.
+ * The delegate class must be the class path of the remote cache(s)
+ * @author David Turanski
+ *
+ */
+@SuppressWarnings("serial")
+public class PojoFunctionWrapper implements Function {
+
+ private static transient Log logger = LogFactory.getLog(PojoFunctionWrapper.class);
+
+ private volatile boolean HA;
+ private volatile boolean optimizeForWrite;
+ private final boolean hasResult;
+ private final Object target;
+ private final Method method;
+ private final String id;
+ private volatile Integer regionParameterPosition;
+
+ public PojoFunctionWrapper(Object target, Method method, String id) {
+ this.id = StringUtils.hasText(id) ? id : ClassUtils.getQualifiedMethodName(method);
+ this.target = target;
+ this.method = method;
+
+ this.HA = false;
+
+ this.hasResult = !(method.getReturnType().equals(void.class));
+
+ this.optimizeForWrite = false;
+ }
+
+ //@Override
+ public String getId() {
+ return this.id;
+ }
+
+ //@Override
+ public boolean hasResult() {
+ return this.hasResult;
+ }
+
+ //@Override
+ public boolean isHA() {
+ return this.HA;
+ }
+
+ public void setHA(boolean HA) {
+ this.HA = HA;
+ }
+
+ //@Override
+ public boolean optimizeForWrite() {
+ return this.optimizeForWrite;
+ }
+
+ public void setOptimizeForWrite(boolean optimizeForWrite) {
+ this.optimizeForWrite = optimizeForWrite;
+ }
+
+ public void setRegionParameterPosition(int regionParameterPosition) {
+ this.regionParameterPosition = regionParameterPosition;
+ }
+
+ //@Override
+ public void execute(FunctionContext functionContext) {
+
+ Object[] args = (functionContext.getArguments().getClass().isArray()) ? (Object[]) functionContext
+ .getArguments() : new Object[] { functionContext.getArguments() };
+
+ Serializable result = null;
+
+ if (functionContext instanceof RegionFunctionContext) {
+ RegionFunctionContext regionFunctionContext = (RegionFunctionContext) functionContext;
+ Region<?, ?> region = getRegionForContext(regionFunctionContext);
+ //TODO: Not sure if filter is needed at this point
+ Set<?> filter = regionFunctionContext.getFilter();
+ //Insert the region into the associated position
+ if (this.regionParameterPosition != null) {
+ ArrayUtils.insert(args, regionParameterPosition, region);
+ }
+
+ } else {
+
+ }
+
+ result = invokeTargetMethod(args);
+
+ if (hasResult()) {
+ sendResults(functionContext.getResultSender(), result);
+ }
+ }
+
+
+ protected final Serializable invokeTargetMethod(Object[] args) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("about to invoke method %s on class %s as function %s", method.getName(), target
+ .getClass().getName(), this.id));
+
+ for (Object arg : args) {
+ logger.debug("arg:" + arg.getClass().getName() + " " + arg.toString());
+ }
+
+ }
+
+ return (Serializable) ReflectionUtils.invokeMethod(method, target, (Object[]) args);
+ }
+
+ /*
+ * @param regionFunctionContext
+ * @return
+ */
+ private Region<?, ?> getRegionForContext(RegionFunctionContext regionFunctionContext) {
+
+ Region<?, ?> region = regionFunctionContext.getDataSet();
+ if (PartitionRegionHelper.isPartitionedRegion(region)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("this is a partitioned region - filtering local data for context");
+ }
+ region = PartitionRegionHelper.getLocalDataForContext(regionFunctionContext);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("region contains " + region.size() + " items");
+ }
+ return region;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void sendResults(ResultSender<Object> resultSender, Serializable result) {
+ if (result == null) {
+ resultSender.lastResult(null);
+ return;
+ }
+
+ Serializable lastItem = result;
+
+ List<Serializable> results = null;
+ if (ObjectUtils.isArray(result)) {
+ results = Arrays.asList((Serializable[]) result);
+ } else if (List.class.isAssignableFrom(result.getClass())) {
+ results = (List<Serializable>) result;
+ }
+
+ if (results != null) {
+ int i = 0;
+ for (Serializable item : results) {
+ if (i++ < results.size() - 1) {
+ resultSender.sendResult(item);
+ } else {
+ lastItem = item;
+ }
+ }
+ }
+ resultSender.lastResult(lastItem);
+ }
+}
View
58 src/main/java/org/springframework/data/gemfire/function/RegionFunctionExecution.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class RegionFunctionExecution<T> extends FunctionExecution<T> {
+
+
+ private final Region<?, ?> region;
+ private volatile Set<?> keys;
+
+ public RegionFunctionExecution(Region<?, ?> region, Function function, Serializable... args) {
+ super(function, args);
+ this.region = region;
+ }
+
+ public RegionFunctionExecution(Region<?, ?> region, String functionId, Serializable... args) {
+ super(functionId, args);
+ this.region = region;
+ }
+
+ public void setKeys(Set<?> keys) {
+ this.keys = keys;
+ }
+
+ protected Set<?> getKeys() {
+ return this.keys;
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.FunctionExecution#getExecution()
+ */
+ @Override
+ protected Execution getExecution() {
+ return FunctionService.onRegion(region);
+ }
+}
View
60 src/main/java/org/springframework/data/gemfire/function/RemoteMethodInvocation.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+/**
+ * @author David Turanski
+ *
+ */
+@SuppressWarnings("serial")
+public class RemoteMethodInvocation implements Serializable {
+ private final String methodName;
+ private String className;
+ private final Serializable[] arguments;
+
+ public RemoteMethodInvocation(Class<?> clazz, String methodName, Serializable... args) {
+ this(methodName,args);
+ Assert.notNull(clazz , "class cannot be null");
+ this.className = clazz.getName();
+
+ }
+
+
+ public RemoteMethodInvocation(String className, String methodName, Serializable... args) {
+ this(methodName,args);
+ Assert.isTrue(StringUtils.hasLength(className.trim()) , "class cannot be null or empty");
+ this.className = className;
+ }
+
+ private RemoteMethodInvocation(String methodName, Serializable... args){
+ Assert.isTrue(StringUtils.hasLength(methodName.trim()), "methodName cannot be null or empty");
+ Assert.isTrue(StringUtils.hasLength(methodName.trim()), "methodName cannot be null or empty");
+ this.methodName = methodName;
+ this.arguments = args == null ? new Serializable[]{} : args;
+ }
+
+ public String getMethodName() {
+ return methodName;
+ }
+ public String getClassName() {
+ return className;
+ }
+ public Serializable[] getArguments() {
+ return arguments;
+ }
+}
View
60 src/main/java/org/springframework/data/gemfire/function/ServerFunctionExecution.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+
+import com.gemstone.gemfire.cache.RegionService;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class ServerFunctionExecution<T> extends FunctionExecution<T> {
+
+
+ private final RegionService regionService;
+
+ /**
+ *
+ * @param regionService e.g., Cache,Client, or GemFireCache
+ * @param function
+ * @param args
+ */
+ public ServerFunctionExecution(RegionService regionService, Function function, Serializable... args) {
+ super(function, args);
+ this.regionService = regionService;
+ }
+
+ /**
+ *
+ * @param regionService e.g., Cache,Client, or GemFireCache
+ * @param functionId
+ * @param args
+ */
+ public ServerFunctionExecution(RegionService regionService, String functionId, Serializable... args) {
+ super(functionId, args);
+ this.regionService = regionService;
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.FunctionExecution#getExecution()
+ */
+ @Override
+ protected Execution getExecution() {
+ return FunctionService.onServer(this.regionService);
+ }
+}
View
60 src/main/java/org/springframework/data/gemfire/function/ServersFunctionExecution.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import java.io.Serializable;
+
+import com.gemstone.gemfire.cache.RegionService;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class ServersFunctionExecution<T> extends FunctionExecution<T> {
+
+
+ private final RegionService regionService;
+
+ /**
+ *
+ * @param regionService e.g., Cache,Client, or GemFireCache
+ * @param function
+ * @param args
+ */
+ public ServersFunctionExecution(RegionService regionService, Function function, Serializable... args) {
+ super(function, args);
+ this.regionService = regionService;
+ }
+
+ /**
+ *
+ * @param regionService e.g., Cache,Client, or GemFireCache
+ * @param functionId
+ * @param args
+ */
+ public ServersFunctionExecution(RegionService regionService, String functionId, Serializable... args) {
+ super(functionId, args);
+ this.regionService = regionService;
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.gemfire.function.FunctionExecution#getExecution()
+ */
+ @Override
+ protected Execution getExecution() {
+ return FunctionService.onServers(this.regionService);
+ }
+}
View
64 src/main/java/org/springframework/data/gemfire/function/config/EnableGemfireFunctions.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function.config;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.springframework.context.annotation.ComponentScan.Filter;
+
+/**
+ * @author David Turanski
+ *
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Inherited
+public @interface EnableGemfireFunctions {
+ /**
+ * Alias for the {@link #basePackages()} attribute. Allows for more concise annotation declarations e.g.:
+ * {@code @EnableGemfireRepositories("org.my.pkg")} instead of
+ * {@code @EnableGemfireRepositories(basePackages="org.my.pkg")}.
+ */
+ String[] value() default {};
+
+ /**
+ * Base packages to scan for annotated components. {@link #value()} is an alias for (and mutually exclusive with) this
+ * attribute. Use {@link #basePackageClasses()} for a type-safe alternative to String-based package names.
+ */
+ String[] basePackages() default {};
+
+ /**
+ * Type-safe alternative to {@link #basePackages()} for specifying the packages to scan for annotated components. The
+ * package of each class specified will be scanned. Consider creating a special no-op marker class or interface in
+ * each package that serves no purpose other than being referenced by this attribute.
+ */
+ Class<?>[] basePackageClasses() default {};
+
+ /**
+ * Specifies which types are eligible for component scanning. Further narrows the set of candidate components from
+ * everything in {@link #basePackages()} to everything in the base packages that matches the given filter or filters.
+ */
+ Filter[] includeFilters() default {};
+
+ /**
+ * Specifies which types are not eligible for component scanning.
+ */
+ Filter[] excludeFilters() default {};
+
+}
View
41 src/main/java/org/springframework/data/gemfire/function/config/GemfireFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function.config;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author David Turanski
+ *
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD})
+public @interface GemfireFunction {
+ /**
+ * The name of the registered function. If not provided the fully qualified method name will be used,
+ * e.g., 'com.example.SomeClass.someMethod'
+ * @return the function id
+ */
+ String id() default "";
+ /**
+ * is the function HA - highly available
+ */
+ boolean HA() default false;
+ /**
+ * is the function optimized for write operations
+ */
+ boolean optimizeForWrite() default false;
+}
View
86 ...va/org/springframework/data/gemfire/function/config/GemfireFunctionBeanPostProcessor.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function.config;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Map;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.util.Assert;
+import org.springframework.util.ReflectionUtils;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class GemfireFunctionBeanPostProcessor implements BeanPostProcessor {
+
+ private static final String GEMFIRE_FUNCTION_ANNOTATION_NAME = GemfireFunction.class.getName();
+ private static final String REGION_DATA_ANNOTATION_NAME = RegionData.class.getName();
+
+ /* (non-Javadoc)
+ * @see org.springframework.beans.factory.config.BeanPostProcessor#postProcessBeforeInitialization(java.lang.Object, java.lang.String)
+ */
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ return bean;
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.beans.factory.config.BeanPostProcessor#postProcessAfterInitialization(java.lang.Object, java.lang.String)
+ */
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+
+ Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
+
+ for (Method method: methods) {
+ GemfireFunction annotation = AnnotationUtils.getAnnotation(method, GemfireFunction.class);
+ if (annotation != null) {
+ Assert.isTrue(Modifier.isPublic(method.getModifiers()),"The method " + method.getName()+ " annotated with" + GEMFIRE_FUNCTION_ANNOTATION_NAME+ " must be public");
+ Map<String,Object> attributes = AnnotationUtils.getAnnotationAttributes(annotation,false,true);
+
+ processParameterAnnotations(method);
+
+ GemfireFunctionUtils.registerFunctionForPojoMethod(bean, method, attributes, false);
+ }
+ }
+
+ return bean;
+ }
+
+ private void processParameterAnnotations(Method method) {
+ Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+ if (parameterAnnotations.length == 0) {
+ return;
+ }
+
+ Class<?>[] paramTypes = method.getParameterTypes();
+
+ for (int i=0; i< parameterAnnotations.length; i++) {
+ Annotation[] annotations = parameterAnnotations[i];
+ if (annotations.length > 0) {
+ System.out.println("found annotations for parameter in position " + i + " " + paramTypes[i] );
+ for (Annotation annotation:annotations) {
+ System.out.println("Annotation type:" + annotation.annotationType().getName());
+ }
+ }
+
+ }
+
+ }
+}
View
63 src/main/java/org/springframework/data/gemfire/function/config/GemfireFunctionUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function.config;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.data.gemfire.function.PojoFunctionWrapper;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+
+/**
+ * @author David Turanski
+ *
+ */
+public abstract class GemfireFunctionUtils {
+ private static Log log = LogFactory.getLog(GemfireFunctionUtils.class);
+
+ public static void registerFunctionForPojoMethod(Object target, Method method, Map<String,Object> attributes, boolean overwrite) {
+ String id = attributes.containsKey("id") ? (String)attributes.get("id") : "";
+
+ PojoFunctionWrapper function = new PojoFunctionWrapper(target,method, id);
+
+ if (attributes.containsKey("HA")) {
+ function.setHA((Boolean)attributes.get("HA"));
+ }
+ if (attributes.containsKey("optimizeForWrite")) {
+ function.setOptimizeForWrite((Boolean)attributes.get("optimizeForWrite"));
+ }
+
+ if (FunctionService.isRegistered(function.getId())) {
+ if (overwrite) {
+ if (log.isDebugEnabled()) {
+ log.debug("unregistering function definition " + function.getId());
+ }
+ FunctionService.unregisterFunction(function.getId());
+ }
+ }
+ if (!FunctionService.isRegistered(function.getId())){
+ FunctionService.registerFunction(function);
+ if (log.isDebugEnabled()) {
+ log.debug("registered function " + function.getId());
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("function already registered " + function.getId());
+ }
+ }
+ }
+}
View
33 src/main/java/org/springframework/data/gemfire/function/config/RegionData.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function.config;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ *
+ * Used to inject Region data into a function execution. The annotated parameter must be of type
+ * {@link Map}. The contents depends on the region configuration (for a partitioned region, this will
+ * contain only entries for the local partition)
+ * and any filters configured for the function context.
+ * @author David Turanski
+ *
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.PARAMETER})
+public @interface RegionData {
+ String value() default "";
+}
View
6 src/main/resources/META-INF/spring.schemas
@@ -1,6 +1,8 @@
http\://www.springframework.org/schema/gemfire/spring-gemfire-1.0.xsd=org/springframework/data/gemfire/config/spring-gemfire-1.0.xsd
http\://www.springframework.org/schema/gemfire/spring-gemfire-1.1.xsd=org/springframework/data/gemfire/config/spring-gemfire-1.1.xsd
http\://www.springframework.org/schema/gemfire/spring-gemfire-1.2.xsd=org/springframework/data/gemfire/config/spring-gemfire-1.2.xsd
-http\://www.springframework.org/schema/gemfire/spring-gemfire.xsd=org/springframework/data/gemfire/config/spring-gemfire-1.2.xsd
+http\://www.springframework.org/schema/gemfire/spring-gemfire-1.3.xsd=org/springframework/data/gemfire/config/spring-gemfire-1.3.xsd
+http\://www.springframework.org/schema/gemfire/spring-gemfire.xsd=org/springframework/data/gemfire/config/spring-gemfire-1.3.xsd
http\://www.springframework.org/schema/data/gemfire/spring-data-gemfire-1.2.xsd=org/springframework/data/gemfire/config/spring-data-gemfire-1.2.xsd
-http\://www.springframework.org/schema/data/gemfire/spring-data-gemfire.xsd=org/springframework/data/gemfire/config/spring-data-gemfire-1.2.xsd
+http\://www.springframework.org/schema/data/gemfire/spring-data-gemfire-1.3.xsd=org/springframework/data/gemfire/config/spring-data-gemfire-1.3.xsd
+http\://www.springframework.org/schema/data/gemfire/spring-data-gemfire.xsd=org/springframework/data/gemfire/config/spring-data-gemfire-1.3.xsd
View
50 src/main/resources/org/springframework/data/gemfire/config/spring-data-gemfire-1.3.xsd
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<xsd:schema xmlns="http://www.springframework.org/schema/data/gemfire" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans"
+xmlns:tool="http://www.springframework.org/schema/tool"
+xmlns:repository="http://www.springframework.org/schema/data/repository"
+targetNamespace="http://www.springframework.org/schema/data/gemfire" elementFormDefault="qualified" attributeFormDefault="unqualified" version="1.3">
+ <xsd:import namespace="http://www.springframework.org/schema/beans"/>
+ <xsd:import namespace="http://www.springframework.org/schema/tool"/>
+ <xsd:import namespace="http://www.springframework.org/schema/data/repository" schemaLocation="http://www.springframework.org/schema/data/repository/spring-repository.xsd"/>
+ <xsd:import namespace="http://www.springframework.org/schema/gemfire" schemaLocation="http://www.springframework.org/schema/gemfire/spring-gemfire.xsd"/>
+ <!-- -->
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Namespace support for the Spring Data GemFire Repositories.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <!-- -->
+ <!-- Repositories -->
+ <xsd:element name="repositories">
+ <xsd:complexType>
+ <xsd:complexContent>
+ <xsd:extension base="repository:repositories">
+ <xsd:attributeGroup ref="gemfire-repository-attributes"/>
+ <xsd:attributeGroup ref="repository:repository-attributes"/>
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+ </xsd:element>
+ <!-- -->
+ <xsd:attributeGroup name="gemfire-repository-attributes">
+ <xsd:attribute name="mapping-context-ref" type="mappingContextRef">
+ <xsd:annotation>
+ <xsd:documentation>
+ The reference to a MappingContext. If not set a default one will be created.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:attributeGroup>
+ <!-- -->
+ <xsd:simpleType name="mappingContextRef">
+ <xsd:annotation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:assignable-to type="org.springframework.data.gemfire.GemfireMappingContext"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ <xsd:union memberTypes="xsd:string"/>
+ </xsd:simpleType>
+ <!-- -->
+</xsd:schema>
View
2,919 src/main/resources/org/springframework/data/gemfire/config/spring-gemfire-1.3.xsd
2,919 additions, 0 deletions not shown
View
50 src/test/java/org/springframework/data/gemfire/ForkUtil.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2011-2012 the original author or authors.
+ * Copyright 2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,24 +35,27 @@
public static OutputStream cloneJVM(String argument) {
String cp = System.getProperty("java.class.path");
- String home = System.getProperty("java.home");
+ String home = System.getProperty("java.home");
+
+ Process proc = null;
String sp = System.getProperty("file.separator");
String java = home + sp + "bin" + sp + "java";
+ String argCp = " -cp " + cp;
String argClass = argument;
- String cmd = java + "-cp " + cp + " " + argClass;
- final Process proc;
+ String cmd = java + argCp + " " + argClass;
try {
- ProcessBuilder builder = new ProcessBuilder(java , "-cp", cp, argClass);
- builder.redirectErrorStream(true);
- proc = builder.start();
+ //ProcessBuilder builder = new ProcessBuilder(cmd, argCp, argClass);
+ //builder.redirectErrorStream(true);
+ proc = Runtime.getRuntime().exec(cmd);
} catch (IOException ioe) {
throw new IllegalStateException("Cannot start command " + cmd, ioe);
}
System.out.println("Started fork from command\n" + cmd);
-
- final BufferedReader br = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+ final Process p = proc;
+
+ final BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
final AtomicBoolean run = new AtomicBoolean(true);
Thread reader = new Thread(new Runnable() {
@@ -80,11 +83,11 @@ public void run() {
System.out.println("Stopping fork...");
run.set(false);
os = null;
- if (proc != null)
- proc.destroy();
+ if (p != null)
+ p.destroy();
try {
- proc.waitFor();
+ p.waitFor();
} catch (InterruptedException e) {
// ignore
}
@@ -95,9 +98,17 @@ public void run() {
os = proc.getOutputStream();
return os;
}
-
+
+ public static OutputStream cacheServer(Class<?> clazz) {
+ return startCacheServer(clazz.getName());
+ }
+
public static OutputStream cacheServer() {
- String className = "org.springframework.data.gemfire.fork.CacheServerProcess";
+ return startCacheServer("org.springframework.data.gemfire.fork.CacheServerProcess");
+ }
+
+ private static OutputStream startCacheServer(String className) {
+
if (controlFileExists(className)) {
deleteControlFile(className);
}
@@ -131,22 +142,17 @@ public static void sendSignal() {
}
public static boolean deleteControlFile(String name) {
- String path = getControlFilePath(name);
+ String path = TEMP_DIR + File.separator + name;
return new File(path).delete();
}
public static boolean createControlFile(String name) throws IOException {
- String path = getControlFilePath(name);
- System.out.println("creating " + path);
+ String path = TEMP_DIR + File.separator + name;
return new File(path).createNewFile();
}
public static boolean controlFileExists(String name) {
- String path = getControlFilePath(name);
+ String path = TEMP_DIR + File.separator + name;
return new File(path).exists();
}
-
- private static String getControlFilePath(String name) {
- return TEMP_DIR.endsWith(File.separator)? TEMP_DIR + name : TEMP_DIR + File.separator + name;
- }
}
View
107 src/test/java/org/springframework/data/gemfire/fork/FunctionCacheServerProcess.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.data.gemfire.fork;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Properties;
+
+
+import org.springframework.data.gemfire.ForkUtil;
+import org.springframework.data.gemfire.function.MethodInvokingFunction;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+
+/**
+ * @author Costin Leau
+ */
+public class FunctionCacheServerProcess {
+ static Region testRegion;
+ public static void main(String[] args) throws Exception {
+
+ Properties props = new Properties();
+ props.setProperty("name", "CqServer");
+ props.setProperty("log-level", "config");
+
+ System.out.println("\nConnecting to the distributed system and creating the cache.");
+ DistributedSystem ds = DistributedSystem.connect(props);
+ Cache cache = CacheFactory.create(ds);
+
+ // Create region.
+ AttributesFactory factory = new AttributesFactory();
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ testRegion = cache.createRegion("test-function", factory.create());
+ System.out.println("Test region, " + testRegion.getFullPath() + ", created in cache.");
+
+ // Start Cache Server.
+ CacheServer server = cache.addCacheServer();
+ server.setPort(40404);
+ server.setNotifyBySubscription(true);
+ server.start();
+ System.out.println("Server started");
+
+
+ testRegion.put("one", 1);
+ testRegion.put("two", 2);
+ testRegion.put("three", 3);
+
+ FunctionService.registerFunction(new ServerFunction());
+
+ FunctionService.registerFunction(new MethodInvokingFunction());
+
+ ForkUtil.createControlFile(FunctionCacheServerProcess.class.getName());
+
+ System.out.println("Waiting for shutdown");
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
+ bufferedReader.readLine();
+ }
+
+ static class ServerFunction extends FunctionAdapter {
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.execute.FunctionAdapter#execute(com.gemstone.gemfire.cache.execute.FunctionContext)
+ */
+ @Override
+ public void execute(FunctionContext functionContext) {
+ Object[] args = (Object[])functionContext.getArguments();
+ testRegion.put(args[0], args[1]);
+ functionContext.getResultSender().lastResult(null);
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.execute.FunctionAdapter#getId()
+ */
+ @Override
+ public String getId() {
+ return "serverFunction";
+ }
+
+ }
+
+
+}
View
157 src/test/java/org/springframework/data/gemfire/function/FunctionExecutionTests.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2002-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.springframework.data.gemfire.function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.springframework.data.gemfire.ForkUtil;
+import org.springframework.data.gemfire.fork.FunctionCacheServerProcess;
+import org.springframework.data.gemfire.function.foo.Foo;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolFactory;
+import com.gemstone.gemfire.cache.client.PoolManager;
+
+/**
+ * @author David Turanski
+ *
+ */
+public class FunctionExecutionTests {
+
+ private static ClientCache cache = null;
+
+ private static Pool pool = null;
+
+ private static Region<String, Integer> clientRegion = null;
+
+ @BeforeClass
+ public static void startUp() throws Exception {
+ ForkUtil.cacheServer(FunctionCacheServerProcess.class);
+
+ Properties props = new Properties();
+ props.put("mcast-port", "0");
+ props.put("name", "function-client");
+ props.put("log-level", "warning");
+
+ ClientCacheFactory ccf = new ClientCacheFactory(props);
+ ccf.setPoolSubscriptionEnabled(true);
+ cache = ccf.create();
+
+ PoolFactory pf = PoolManager.createFactory();
+ pf.addServer("localhost", 40404);
+ pf.setSubscriptionEnabled(true);
+ pool = pf.create("client");
+
+ ClientRegionFactory<String, Integer> crf = cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ crf.setPoolName("client");
+ clientRegion = crf.create("test-function");
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ ForkUtil.sendSignal();
+ if (clientRegion != null) {
+ clientRegion.destroyRegion();
+ }
+ if (pool != null) {
+ pool.destroy();
+ pool = null;
+ }
+
+ if (cache != null) {
+ cache.close();
+ }
+ cache = null;
+ }
+
+ @Test
+ public void testRegionExecution() {
+ RemoteMethodInvocation invocation = new RemoteMethodInvocation(Foo.class, "oneArg", "one");
+ RegionFunctionExecution<Integer> execution = new RegionFunctionExecution<Integer>(clientRegion,
+ new MethodInvokingFunction(), invocation);
+
+ int result = execution.executeAndExtract();
+ assertEquals(1, result);
+ }
+
+ @Test
+ public void testRegionExecutionWithRegisteredFunction() {
+ RemoteMethodInvocation invocation = new RemoteMethodInvocation(Foo.class, "oneArg", "one");
+ RegionFunctionExecution<Integer> execution = new RegionFunctionExecution<Integer>(clientRegion,
+ new MethodInvokingFunction().getId(), invocation);
+ int result = execution.executeAndExtract();
+ assertEquals(1, result);
+ }
+
+ // TODO: Filter only works on partitioned region. No effect here, but server
+ // won't start with a partitioned region. Probably because no locator
+ @Test
+ public void testRegionExecutionWithFilter() {
+ RemoteMethodInvocation invocation = new RemoteMethodInvocation(Foo.class, "oneArg", "one");
+ Set<String> keys = new HashSet<String>();
+ keys.add("two");
+ RegionFunctionExecution<Integer> execution = new RegionFunctionExecution<Integer>(clientRegion,
+ new MethodInvokingFunction().getId(), invocation);
+ execution.setKeys(keys);
+ Integer result = execution.executeAndExtract();
+ // assertEquals(null,result.get(0));
+ assertEquals(1, result.intValue());
+ }
+
+ @Test
+ public void testRegionExecutionForMap() {
+ RemoteMethodInvocation invocation = new RemoteMethodInvocation(Foo.class, "getMapWithNoArgs");