Permalink
Browse files

Merge remote branch 'sanmi/master'

  • Loading branch information...
2 parents ca5c640 + 3231a36 commit 6450c8051ea97008ae62c14960fa73dda579ee05 @brucerobbins brucerobbins committed Jan 25, 2011
View
8 pom.xml
@@ -107,10 +107,10 @@
<version>1.0.1</version>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- <scope>test</scope>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.4</version>
+ <scope>test</scope>
</dependency>
</dependencies>
<build>
View
25 src/main/java/io/s4/processor/AbstractPE.java
@@ -4,7 +4,7 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -22,14 +22,14 @@
import io.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
import io.s4.persist.Persister;
import io.s4.schema.Schema;
-import io.s4.schema.SchemaContainer;
import io.s4.schema.Schema.Property;
+import io.s4.schema.SchemaContainer;
import io.s4.util.clock.Clock;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.log4j.Logger;
@@ -74,6 +74,7 @@ public String getName() {
private int outputsBeforePause = -1;
private long pauseTimeInMillis;
private boolean logPauses = false;
+ private String initMethod = null;
public void setSaveKeyRecord(boolean saveKeyRecord) {
this.saveKeyRecord = saveKeyRecord;
@@ -98,6 +99,19 @@ public void setS4Clock(Clock s4Clock) {
}
}
+ /**
+ * The name of a method to be used as an initializer. The method will be
+ * called after the object is cloned from the prototype PE.
+ */
+ public void setInitMethod(String initMethod)
+ {
+ this.initMethod = initMethod;
+ }
+
+ public String getInitMethod() {
+ return this.initMethod;
+ }
+
public Clock getS4Clock() {
return s4Clock;
}
@@ -349,7 +363,8 @@ private void initFrequency() {
*/
public Object clone() {
try {
- return super.clone();
+ Object clone = super.clone();
+ return clone;
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
@@ -445,5 +460,5 @@ public void run() {
} // end if lookup table is not null
}
}
- }
+ }
}
View
2 src/main/java/io/s4/processor/ProcessingElement.java
@@ -29,4 +29,6 @@
public Object clone();
public String getId();
+
+ public String getInitMethod();
}
View
6 src/main/java/io/s4/processor/PrototypeWrapper.java
@@ -63,6 +63,12 @@ public ProcessingElement getPE(String keyValue) {
pe = (ProcessingElement) lookupTable.get(keyValue);
if (pe == null) {
pe = (ProcessingElement) prototype.clone();
+ //invoke the initialization method if it has been specified
+ if (pe.getInitMethod() != null) {
+ Method initMethod = pe.getClass().getMethod(pe.getInitMethod(), new Class[0]);
+ initMethod.invoke(pe, (new Object[0]));
+ }
+
}
// update the last update time on the entry
lookupTable.set(keyValue, pe, prototype.getTtl());
View
34 src/test/java/io/s4/processor/MockPE.java
@@ -0,0 +1,34 @@
+package io.s4.processor;
+
+/**
+ * Mock PE for asserting correct behavior of AbstractPE
+ */
+public class MockPE extends AbstractPE {
+
+ private int initializeCount = 0;
+
+ public void testInitialize() {
+ initializeCount++;
+ }
+
+ public void processEvent(Object obj) {
+ }
+
+ @Override
+ public String getId() {
+ return null;
+ }
+
+ @Override
+ public void output() {
+
+ }
+
+ /**
+ * @return the initializeCount
+ */
+ public int getInitializeCount() {
+ return initializeCount;
+ }
+
+}
View
31 src/test/java/io/s4/processor/TestPrototypeWrapper.java
@@ -0,0 +1,31 @@
+package io.s4.processor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import io.s4.util.clock.WallClock;
+
+import org.junit.Test;
+
+public class TestPrototypeWrapper
+{
+
+ /**
+ * Verifies ability to set an initialize method that will be called when
+ * a new PE is instantiated
+ */
+ @Test
+ public void testCloneAndInitialize() {
+ MockPE prototype = new MockPE();
+ prototype.setInitMethod("testInitialize");
+
+ PrototypeWrapper prototypeWrapper = new PrototypeWrapper(prototype, new WallClock());
+
+ assertEquals(0, prototype.getInitializeCount());
+ MockPE instance = (MockPE)prototypeWrapper.getPE("asd");
+ assertNotNull(instance);
+
+ assertEquals(0, prototype.getInitializeCount());
+ assertEquals(1, instance.getInitializeCount());
+ }
+
+}
View
44 src/test/resources/log4j.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="R" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="target/s4_core.log" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %c %p (%F:%L) %m%n"/>
+ </layout>
+ </appender>
+ <appender name="S" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="target/s4_core.mon" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %c %p (%F:%L) %m%n"/>
+ </layout>
+ </appender>
+ <logger name="com.yahoo" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="R"/>
+ </logger>
+ <logger name="s4" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="R"/>
+ </logger>
+ <logger name="zk" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="R"/>
+ </logger>
+ <logger name="dispatcher" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="R"/>
+ </logger>
+ <logger name="monitor">
+ <level value="info"/>
+ <appender-ref ref="S"/>
+ </logger>
+ <root>
+ <priority value ="info" />
+ <appender-ref ref="R" />
+ </root>
+</log4j:configuration>
+
+
+
+

0 comments on commit 6450c80

Please sign in to comment.