Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Checking in FunctionExecution quick start example.

  • Loading branch information...
commit 8b43260089a0939121563ddb715f6c82a4409da8 1 parent f846cfd
wxlund authored
View
31 vfgf-quickstart/src/main/java/quickstart/FunctionExecutionPeer1App.java
@@ -0,0 +1,31 @@
+package quickstart;
+
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import quickstart.function.execution.FunctionExecutionPeer1;
+
+public class FunctionExecutionPeer1App {
+
+ private static final String[] CONFIGS = new String[] { "function-execution-peer-app-context.xml" };
+
+ public static void main(String[] args) {
+
+ try {
+
+ String[] res = (args != null && args.length > 0 ? args : CONFIGS);
+ AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
+ res);
+ // shutdown the context along with the VM
+ ctx.registerShutdownHook();
+ FunctionExecutionPeer1 bean = ctx
+ .getBean(FunctionExecutionPeer1.class);
+ bean.run();
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
View
31 vfgf-quickstart/src/main/java/quickstart/FunctionExecutionPeer2App.java
@@ -0,0 +1,31 @@
+package quickstart;
+
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import quickstart.function.execution.FunctionExecutionPeer2;
+
+public class FunctionExecutionPeer2App {
+
+ private static final String[] CONFIGS = new String[] { "function-execution-peer-app-context.xml" };
+
+ public static void main(String[] args) {
+
+ try {
+
+ String[] res = (args != null && args.length > 0 ? args : CONFIGS);
+ AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
+ res);
+ // shutdown the context along with the VM
+ ctx.registerShutdownHook();
+ FunctionExecutionPeer2 bean = ctx
+ .getBean(FunctionExecutionPeer2.class);
+ bean.run();
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
View
52 vfgf-quickstart/src/main/java/quickstart/MultiGetFunction.java
@@ -0,0 +1,52 @@
+package quickstart;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Application Function to retrieve values for multiple keys in a region
+ *
+ * @author Gemstone Systems Inc
+ * @since 6.0
+ *
+ */
+public class MultiGetFunction extends FunctionAdapter {
+
+ public void execute(FunctionContext fc) {
+ if(fc instanceof RegionFunctionContext){
+ RegionFunctionContext context = (RegionFunctionContext)fc;
+ Set keys = context.getFilter();
+ Set keysTillSecondLast = new HashSet();
+ int setSize = keys.size();
+ Iterator keysIterator = keys.iterator();
+ for (int i = 0; i < (setSize - 1); i++) {
+ keysTillSecondLast.add(keysIterator.next());
+ }
+ for (Object k : keysTillSecondLast) {
+ context.getResultSender().sendResult(
+ (Serializable)PartitionRegionHelper.getLocalDataForContext(context)
+ .get(k));
+ }
+ Object lastResult = keysIterator.next();
+ context.getResultSender().lastResult(
+ (Serializable)PartitionRegionHelper.getLocalDataForContext(context)
+ .get(lastResult));
+ }else {
+ fc.getResultSender().lastResult(Runtime.getRuntime().freeMemory()/(1024*1024));
+ }
+ }
+
+ public String getId() {
+ return getClass().getName();
+ }
+}
+
View
93 vfgf-quickstart/src/main/java/quickstart/MyArrayListResultCollector.java
@@ -0,0 +1,93 @@
+package quickstart;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.DistributedMember;
+
+/**
+ *
+ * MyArrayListResultCollector gathers result from all the function execution
+ * nodes.<br>
+ * Using a custom ResultCollector a user can sort/aggrgate the result. This
+ * implementation stores the result in a List. The size of the list will be same
+ * as the no of nodes on which a function got executed
+ *
+ * @author Gemstone Systems Inc
+ * @since 6.0
+ *
+ */
+public class MyArrayListResultCollector implements
+ ResultCollector<Serializable,Serializable> {
+
+ final ArrayList<Serializable> result = new ArrayList<Serializable>();
+
+ /**
+ * Adds a single function execution result from a remote node to the
+ * ResultCollector
+ *
+ * @param resultOfSingleExecution
+ * @param memberID
+ */
+ public void addResult(DistributedMember memberID,
+ Serializable resultOfSingleExecution) {
+ this.result.add(resultOfSingleExecution);
+ }
+
+ /**
+ * Waits if necessary for the computation to complete, and then retrieves its
+ * result.<br>
+ * If {@link Function#hasResult()} is false, upon calling
+ * {@link ResultCollector#getResult()} throws {@link FunctionException}.
+ *
+ * @return the Serializable computed result
+ * @throws FunctionException
+ * if something goes wrong while retrieving the result
+ */
+ public Serializable getResult() throws FunctionException {
+ return this.result;
+ }
+
+ /**
+ * Waits if necessary for at most the given time for the computation to
+ * complete, and then retrieves its result, if available. <br>
+ * If {@link Function#hasResult()} is false, upon calling
+ * {@link ResultCollector#getResult()} throws {@link FunctionException}.
+ *
+ * @param timeout
+ * the maximum time to wait
+ * @param unit
+ * the time unit of the timeout argument
+ * @return Serializable computed result
+ * @throws FunctionException
+ * if something goes wrong while retrieving the result
+ */
+ public Serializable getResult(long timeout, TimeUnit unit)
+ throws FunctionException, InterruptedException {
+ return this.result;
+ }
+
+ /**
+ * GemFire will invoke this method before re-executing function (in case of
+ * Function Execution HA) This is to clear the previous execution results from
+ * the result collector
+ *
+ * @since 6.3
+ */
+ public void clearResults() {
+ result.clear();
+ }
+
+ /**
+ * Call back provided to caller, which is called after function execution is
+ * complete and caller can retrieve results using
+ * {@link ResultCollector#getResult()}
+ *
+ */
+ public void endResults() {}
+
+}
View
59 vfgf-quickstart/src/main/java/quickstart/function/execution/FunctionExecutionPeer1.java
@@ -0,0 +1,59 @@
+package quickstart.function.execution;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+
+import javax.annotation.Resource;
+
+import org.springframework.data.gemfire.GemfireTemplate;
+import org.springframework.stereotype.Component;
+
+import quickstart.MultiGetFunction;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+
+/**
+ * This is the peer to which FunctionExecutionPeer2 connects for function execution.
+ * This peer executes the function execution request and returns the results to
+ * the requesting peer
+ *
+ * @author GemStone Systems, Inc.
+ * @since 6.0
+ */
+@Component
+public class FunctionExecutionPeer1 {
+
+ @Resource(name = "gemfireTemplate1")
+ private GemfireTemplate exampleRegion;
+ private final BufferedReader stdinReader;
+
+ public FunctionExecutionPeer1() {
+ this.stdinReader = new BufferedReader(new InputStreamReader(System.in));
+ }
+
+ public void run() throws Exception {
+
+ writeToStdout("Peer to which other peer sends request for function Execution");
+ writeToStdout("Connecting to the distributed system and creating the cache... ");
+
+ // Get the exampleRegion
+ writeToStdout("Example region \"" + exampleRegion.getRegion().getFullPath()
+ + "\" created in cache.");
+
+ writeToStdout("Registering the function MultiGetFunction on Peer");
+ MultiGetFunction function = new MultiGetFunction();
+ FunctionService.registerFunction(function);
+
+ writeToStdout("Please start Other Peer And Then Press Enter to continue.");
+ stdinReader.readLine();
+
+ System.out.println("Closing the cache and disconnecting.");
+ }
+
+ private static void writeToStdout(String msg) {
+ // System.out.print("[FunctionExecutionPeer1] ");
+ System.out.println(msg);
+ }
+
+}
View
102 vfgf-quickstart/src/main/java/quickstart/function/execution/FunctionExecutionPeer2.java
@@ -0,0 +1,102 @@
+package quickstart.function.execution;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Resource;
+
+import org.springframework.data.gemfire.GemfireTemplate;
+import org.springframework.stereotype.Component;
+
+import quickstart.MultiGetFunction;
+import quickstart.MyArrayListResultCollector;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+
+/**
+ * In this example of peer-to-peer function execution, one peer sends a request
+ * for function execution to another peer. FunctionExecutionPeer2 creates a region,
+ * populates the region and sends a function execution request to FunctionExecutionPeer1
+ * while simultaneously executing the function on its own region.
+ * It collects the result from its own execution as well as from FunctionExecutionPeer1.
+ * Please refer to the quickstart guide for instructions on how to run this
+ * example.
+ *
+ * @author GemStone Systems, Inc.
+ * @since 6.0
+ */
+@Component
+public class FunctionExecutionPeer2 {
+ @Resource(name = "gemfireTemplate2")
+ private GemfireTemplate exampleRegion;
+ private final BufferedReader stdinReader;
+
+ public FunctionExecutionPeer2() {
+ this.stdinReader = new BufferedReader(new InputStreamReader(System.in));
+ }
+
+ public static void main(String[] args) throws Exception {
+ new FunctionExecutionPeer2().run();
+ }
+
+ public void run() throws Exception {
+
+ writeToStdout("Peer sending function Execution request to other peer as well as executing function on its own region");
+
+ writeToStdout("Connecting to the distributed system and creating the cache... ");
+
+ // Get the exampleRegion
+ writeToStdout("Example region \"" + exampleRegion.getRegion().getFullPath()
+ + "\" created in cache.");
+
+ // Populate the region
+ for (int i = 0; i < 20; i++) {
+ exampleRegion.put("KEY_" + i, "VALUE_" + i);
+ }
+ writeToStdout("Example region \"" + exampleRegion.getRegion().getFullPath()
+ + "\" is populated.");
+
+ writeToStdout("Press Enter to continue.");
+ stdinReader.readLine();
+
+ writeToStdout("Executing Function : MultiGetFunction on region \""
+ + exampleRegion.getRegion().getFullPath()
+ + "\" with filter size " + 3 + " and with MyArrayListResultCollector.");
+ MultiGetFunction function = new MultiGetFunction();
+ FunctionService.registerFunction(function);
+
+ writeToStdout("Press Enter to continue.");
+ stdinReader.readLine();
+
+ Set<String> keysForGet = new HashSet<String>();
+ keysForGet.add("KEY_4");
+ keysForGet.add("KEY_9");
+ keysForGet.add("KEY_7");
+ Execution execution = FunctionService.onRegion(exampleRegion.getRegion()).withFilter(
+ keysForGet).withArgs(Boolean.TRUE).withCollector(
+ new MyArrayListResultCollector());
+ ResultCollector rc = execution.execute(function);
+
+ writeToStdout("Function executed successfully. Now getting the result");
+
+ List result = (List)rc.getResult();
+ writeToStdout("Got result with size " + result.size() + ".");
+ writeToStdout("Press Enter to continue.");
+ stdinReader.readLine();
+
+ System.out.println("Closing the cache and disconnecting.");
+ }
+
+ private static void writeToStdout(String msg) {
+ System.out.println(msg);
+ }
+
+}
View
14 vfgf-quickstart/src/main/resources/function-execution-peer-app-context.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:p="http://www.springframework.org/schema/p"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+
+ <import resource="function-execution-peer-cache-context.xml"/>
+
+ <!-- find beans by scanning the classpath -->
+ <context:component-scan base-package="quickstart.function.execution" />
+
+</beans>
View
29 vfgf-quickstart/src/main/resources/function-execution-peer-cache-context.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
+ xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
+ xmlns:gfe="http://www.springframework.org/schema/gemfire"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
+ http://www.springframework.org/schema/gemfire http://www.springframework.org/schema/gemfire/spring-gemfire.xsd
+ http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
+
+
+ <!-- GemFire cache bean -->
+ <gfe:cache id="peer1"/>
+ <gfe:cache id="peer2"/>
+
+ <gfe:partitioned-region id="peer1Region" name="exampleRegion" local-max-memory="50"
+ copies="0" total-buckets="13" cache-ref="peer1"/>
+
+ <gfe:partitioned-region id="peer2Region" name="exampleRegion" local-max-memory="50"
+ copies="0" total-buckets="13" cache-ref="peer2"/>
+
+ <gfe:transaction-manager id="tx-manager" cache-ref="peer1" />
+
+ <bean id="gemfireTemplate1" class="org.springframework.data.gemfire.GemfireTemplate" p:region-ref="peer1Region"/>
+ <bean id="gemfireTemplate2" class="org.springframework.data.gemfire.GemfireTemplate" p:region-ref="peer2Region"/>
+
+ <tx:annotation-driven transaction-manager="tx-manager" />
+
+</beans>

6 comments on commit 8b43260

@dturanski
Collaborator

@wxlund @droberts18 I tagged the master branch as 'original' so you can get back to it. Going forward, I can reuse this code but I'm making some significant changes intended to improve the developer experience. Also I'm coordinating with the GemFire team who are revamping the core product samples as well. Some of the changes here include 1) separating each sample into a sub project so the code base is very clear. 2) Simplifying the code and configuration to make it easier to read, emphasizing features over good program style. 3) Focus primarily on the Spring Gemfire programming model vs GemFire topologies, etc. So starting locators, etc. will be out of scope, at least initially. Let me know if you're ok with this approach.

@wxlund
Collaborator
@wxlund
Collaborator
@dturanski
Collaborator

@wxlund .Thanks for you comments. I saw Graeme's demo at Spring One a couple of years ago it was awesome. I'll take a look. We may have different objectives. For a customer demo, you want to have the full Gemfire install on the local machine, show locators, gfsh, etc. You likely have an audience with both dev and ops. For a lone developer downloading sample code, building it and trying it out, none of that is necessary to get started. You can do any kind of client-server demo connecting directly to the server. For example, I can easily demo continuous query in eclipse. Same thing for integration testing after the sale is made. You can use spring profiles to support a local config - cache server with no locators - and a deployed config, remote server and locators.

@dturanski
Collaborator

Pretty awesome demo for a variety of reasons. It's not hard to set up Run configurations to start a Cache server (or locators for that matter). I use that myself quite a bit. I didn't see any evidence of a locator in that demo although he did have to set a GEMFIRE env variable to point to the local install. I'm guessing that was there already and not part of the plugin download.

@droberts18
Collaborator

I started with the lightweight approach. When I was converting the examples I was annoyed that it told me to install Gemfire. What was converted to use Spring Gemfire didn't require me to do that anymore. That is why I created CacheServerApp class.

I think down the road when getting production ready guys will use locators. Having that experience from STS will help.

Please sign in to comment.
Something went wrong with that request. Please try again.