Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add AbstractPE.expire()

  • Loading branch information...
commit b11da4f4ebf242d99626a8395f344ae41520ba70 1 parent 737fa28
Bruce Robbins authored
4 s4-core/src/main/java/io/s4/persist/ConMapPersister.java
View
@@ -110,6 +110,10 @@ public void setAsynch(String key, Object value, int period) {
}
public void set(String key, Object value, int period) {
+ if (value == null) {
+ cache.remove(key);
+ return;
+ }
persistCount.getAndIncrement();
CacheEntry ce = new CacheEntry();
ce.value = value;
5 s4-core/src/main/java/io/s4/persist/HashMapPersister.java
View
@@ -108,6 +108,11 @@ public void setAsynch(String key, Object value, int period) {
}
public void set(String key, Object value, int period) {
+ if (value == null) {
+ cache.remove(key);
+ return;
+ }
+
synchronized (this) {
persistCount++;
}
12 s4-core/src/main/java/io/s4/processor/AbstractPE.java
View
@@ -102,6 +102,7 @@ public String getName() {
transient private boolean logPauses = false;
private String id;
transient protected SchemaContainer schemaContainer = new SchemaContainer();
+ transient private PrototypeWrapper prototypeWrapper;
transient private boolean recoveryAttempted = false;
// true if state may have changed
@@ -172,6 +173,10 @@ public Clock getClock() {
return clock;
}
+ public void setPrototypeWrapper(PrototypeWrapper prototypeWrapper) {
+ this.prototypeWrapper = prototypeWrapper;
+ }
+
public AbstractPE() {
OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
Class<?> overloadDispatcherClass = oldg.generate();
@@ -649,6 +654,13 @@ public void processEvent(RecoveryEvent recoveryEvent) {
isCheckpointingEvent = true;
recover();
}
+
+ /**
+ * This method expires the current PE.
+ **/
+ protected void expire() {
+ this.prototypeWrapper.expire(this.keyValueString);
+ }
class PeriodicInvoker implements Runnable {
17 s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
View
@@ -20,7 +20,6 @@
import io.s4.persist.Persister;
import io.s4.util.clock.Clock;
-import java.lang.reflect.Method;
import java.util.List;
import org.apache.log4j.Logger;
@@ -47,11 +46,8 @@ public PrototypeWrapper(AbstractPE prototype, Clock s4Clock) {
((ConMapPersister) lookupTable).setSelfClean(true);
((ConMapPersister) lookupTable).init();
// set the persister in prototype
- Method method = prototype.getClass().getMethod("setLookupTable",
- Persister.class);
- method.invoke(prototype, lookupTable);
- } catch (NoSuchMethodException e) {
- // this is expected
+ prototype.setLookupTable(lookupTable);
+ prototype.setPrototypeWrapper(this);
} catch (Exception e) {
// this is not expected
Logger.getLogger("s4")
@@ -109,6 +105,15 @@ public AbstractPE lookupPE(String keyValue) {
return pe;
}
+
+ public void expire(String keyValue) {
+ try {
+ lookupTable.set(keyValue, null, 0);
+ } catch (Exception e) {
+ logger.error("exception when removing pe for key:" + keyValue, e);
+ }
+
+ }
public int getPECount() {
return lookupTable.keySet().size();
Please sign in to comment.
Something went wrong with that request. Please try again.