Permalink
Browse files

1) Moved id out of event type

2) Moved Input Output to top level
  • Loading branch information...
1 parent 93692e7 commit 0e863d5716eabd9316c9f4e5b6d70d04f93b3cb1 @stampy88 committed Jan 2, 2012
@@ -1,8 +1,8 @@
-package org.matrixlab.octopus.core.processor;
+package org.matrixlab.octopus.core;
-import org.matrixlab.octopus.core.ValidationException;
import org.matrixlab.octopus.core.event.Attribute;
import org.matrixlab.octopus.core.event.EventType;
+import org.matrixlab.octopus.core.processor.ProcessorComponent;
import org.matrixlab.octopus.core.source.Source;
import static com.google.common.base.Preconditions.checkArgument;
@@ -79,6 +79,28 @@ public Source getSource() {
return this;
}
+ public void setSourceAttribute(String attributeName) throws ValidationException {
+ if (this.source == null) {
+ throw new ValidationException("Cannot set the source before setting the source attribute");
+ }
+
+ EventType sourceType = source.getOutputEventType();
+
+ Attribute sourceAttribute = sourceType.getAttributeByName(attributeName);
+
+ if (sourceAttribute == null) {
+ throw new ValidationException(String.format("Source does not contain an attribute named '%s'", attributeName));
+ }
+
+ if (!sourceAttribute.isCompatibleWith(getType())) {
+ throw new ValidationException(
+ String.format("The attribute '%s' is not compatible with the input '%s'", sourceAttribute,
+ getName())
+ );
+ }
+ this.sourceAttribute = sourceAttribute;
+ }
+
public void setSourceAttribute(Attribute sourceAttribute) throws ValidationException {
if (this.source == null) {
throw new ValidationException("Cannot set the source before setting the source attribute");
@@ -1,6 +1,7 @@
-package org.matrixlab.octopus.core.processor;
+package org.matrixlab.octopus.core;
import org.matrixlab.octopus.core.event.Attribute;
+import org.matrixlab.octopus.core.processor.ProcessorComponent;
/**
* @author dave sinclair(david.sinclair@lisa-park.com)
@@ -9,19 +10,27 @@
private final Attribute<T> attribute;
+ //private final EventType eventType;
+
private Output(Builder<T> builder) {
super(builder.id, builder.name, builder.description);
this.attribute = builder.attribute;
+
+// this.eventType = new EventType();
+// if(attribute != null) {
+// this.eventType.addAttribute(attribute);
+// }
}
private Output(Output<T> existingOutput, ReproductionMode mode) {
super(existingOutput);
if (mode == ReproductionMode.NEW_INSTANCE) {
this.attribute = existingOutput.attribute.newInstance();
-
+ // this.eventType = existingOutput.eventType.newInstance();
} else {
this.attribute = existingOutput.attribute.copyOf();
+ // this.eventType = existingOutput.eventType.copyOf();
}
}
@@ -33,7 +42,7 @@ public String getAttributeName() {
return attribute.getName();
}
- Attribute<T> getAttribute() {
+ public Attribute<T> getAttribute() {
return attribute;
}
@@ -2,18 +2,19 @@
import com.espertech.esper.client.*;
import com.google.common.collect.Lists;
+import org.matrixlab.octopus.core.Input;
import org.matrixlab.octopus.core.ProcessingModel;
import org.matrixlab.octopus.core.ValidationException;
import org.matrixlab.octopus.core.event.EventType;
import org.matrixlab.octopus.core.memory.Memory;
import org.matrixlab.octopus.core.memory.heap.HeapMemoryProvider;
import org.matrixlab.octopus.core.processor.CompiledProcessor;
-import org.matrixlab.octopus.core.processor.Input;
import org.matrixlab.octopus.core.processor.Processor;
import org.matrixlab.octopus.core.runtime.ProcessingRuntime;
import org.matrixlab.octopus.core.runtime.esper.EsperRuntime;
import org.matrixlab.octopus.core.source.external.CompiledExternalSource;
import org.matrixlab.octopus.core.source.external.ExternalSource;
+import org.matrixlab.octopus.util.esper.EsperUtils;
import java.util.*;
@@ -43,7 +44,7 @@ void registerEventTypesForModel(Configuration configuration, ProcessingModel mod
EventType eventType = externalSource.getOutputEventType();
configuration.addEventType(
- EsperUtils.getEventNameForEventType(eventType),
+ EsperUtils.getEventNameForSource(externalSource),
eventType.getEventDefinition()
);
}
@@ -54,7 +55,7 @@ void registerEventTypesForModel(Configuration configuration, ProcessingModel mod
EventType eventType = processor.getOutputEventType();
configuration.addEventType(
- EsperUtils.getEventNameForEventType(eventType),
+ EsperUtils.getEventNameForSource(processor),
eventType.getEventDefinition()
);
}
@@ -97,8 +98,7 @@ public ProcessingRuntime compile(ProcessingModel model) {
stmt.setSubscriber(runner);
// todo this is temporary think we want the output to have the id possibly
- EventType outputEventType = processor.getOutputEventType();
- String outputEventName = EsperUtils.getEventNameForEventType(outputEventType);
+ String outputEventName = EsperUtils.getEventNameForSource(processor);
System.out.println("Compiler " + outputEventName);
String debugStmt = String.format("select * from %s", outputEventName);
stmt = admin.createEPL(debugStmt);
@@ -136,7 +136,7 @@ String getStatementForCompiledProcessor(CompiledProcessor<?> compiledProcessor)
fromClause.append(", ");
}
- String inputName = EsperUtils.getEventNameForEventType(input.getSource().getOutputEventType());
+ String inputName = EsperUtils.getEventNameForSource(input.getSource());
String aliasName = "_" + aliasIndex++;
selectClause.append(aliasName).append(".*");
@@ -2,11 +2,12 @@
import com.espertech.esper.client.EPRuntime;
import com.google.common.collect.Maps;
+import org.matrixlab.octopus.core.Input;
import org.matrixlab.octopus.core.event.Event;
import org.matrixlab.octopus.core.memory.Memory;
import org.matrixlab.octopus.core.processor.CompiledProcessor;
-import org.matrixlab.octopus.core.processor.Input;
import org.matrixlab.octopus.util.Pair;
+import org.matrixlab.octopus.util.esper.EsperUtils;
import java.util.Arrays;
import java.util.Map;
@@ -33,14 +34,14 @@
int index = 0;
for (Input input : processor.getInputs()) {
- String sourceId = EsperUtils.getEventNameForEventType(input.getSource().getOutputEventType());
+ String sourceId = EsperUtils.getEventNameForSource(input.getSource());
Integer inputId = input.getId();
sourceIdToInputId[index++] = Pair.newInstance(sourceId, inputId);
}
if (processor.generatesOutput()) {
outputAttributeName = processor.getOutput().getAttributeName();
- outputEventId = EsperUtils.getEventNameForEventType(processor.getOutputEventType());
+ outputEventId = EsperUtils.getEventNameForSource(processor);
} else {
outputAttributeName = null;
@@ -1,23 +0,0 @@
-package org.matrixlab.octopus.core.compiler.esper;
-
-import org.matrixlab.octopus.core.event.EventType;
-
-/**
- * @author dave sinclair(david.sinclair@lisa-park.com)
- */
-abstract class EsperUtils {
-
- static String getEventNameForEventType(EventType eventType) {
- StringBuilder eventName = new StringBuilder("_");
-
- String idAsString = eventType.getId().toString();
- for (int i = 0; i < idAsString.length(); ++i) {
- if (idAsString.charAt(i) != '-') {
- eventName.append(idAsString.charAt(i));
- }
- }
-
- return eventName.toString();
- }
-
-}
@@ -47,6 +47,7 @@ public Float getAttributeAsFloat(String attributeName) {
}
public Double getAttributeAsDouble(String attributeName) {
+ // todo return null??
return ((Number) data.get(attributeName)).doubleValue();
}
@@ -8,34 +8,24 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
/**
+ * An {@link EventType} is the definition of an {@link Event} that describes some or all of the attributes a event
+ * will have.
+ *
* @author dave sinclair(david.sinclair@lisa-park.com)
*/
public class EventType implements Reproducible {
- private final UUID id;
private final List<Attribute> attributes = Lists.newLinkedList();
- public EventType(UUID id) {
- this.id = id;
+ public EventType() {
}
private EventType(EventType copyFromEventType) {
- this.id = copyFromEventType.id;
this.attributes.addAll(copyFromEventType.attributes);
}
- private EventType(UUID id, EventType copyFromEventType) {
- this.id = id;
- this.attributes.addAll(copyFromEventType.attributes);
- }
-
- public UUID getId() {
- return id;
- }
-
public EventType unionWith(EventType eventType) {
attributes.addAll(eventType.attributes);
@@ -90,18 +80,17 @@ public boolean containsAttribute(Attribute attribute) {
@Override
public String toString() {
return "EventType{" +
- "id='" + id + '\'' +
- ", attributes=" + attributes +
+ "attributes=" + attributes +
'}';
}
@Override
public EventType newInstance() {
- return new EventType(UUID.randomUUID(), this);
+ return new EventType(this);
}
@Override
- public Reproducible copyOf() {
+ public EventType copyOf() {
return new EventType(this);
}
}
@@ -1,5 +1,7 @@
package org.matrixlab.octopus.core.processor;
+import org.matrixlab.octopus.core.Input;
+import org.matrixlab.octopus.core.Output;
import org.matrixlab.octopus.core.ValidationException;
import org.matrixlab.octopus.core.event.Event;
import org.matrixlab.octopus.core.memory.Memory;
@@ -70,7 +72,7 @@ public Addition copyOf() {
/**
* Returns a new {@link Addition} processor configured with all the appropriate
- * {@link org.matrixlab.octopus.core.processor.parameter.Parameter}s, {@link Input}s and {@link Output}.
+ * {@link org.matrixlab.octopus.core.processor.parameter.Parameter}s, {@link org.matrixlab.octopus.core.Input}s and {@link org.matrixlab.octopus.core.Output}.
*
* @return new {@link Addition}
*/
@@ -1,11 +1,14 @@
package org.matrixlab.octopus.core.processor;
+import org.matrixlab.octopus.core.Input;
+import org.matrixlab.octopus.core.Output;
import org.matrixlab.octopus.core.event.Event;
import org.matrixlab.octopus.core.event.EventType;
import org.matrixlab.octopus.core.memory.Memory;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
/**
* @author dave sinclair(david.sinclair@lisa-park.com)
@@ -14,13 +17,19 @@
private final List<Input> inputs;
private final Output output;
private final EventType outputEventType;
+ private final UUID id;
protected CompiledProcessor(Processor<MEMORY_TYPE> processor) {
+ this.id = processor.getId();
this.inputs = processor.getInputs();
this.output = processor.getOutput();
this.outputEventType = processor.getOutputEventType();
}
+ public UUID getId() {
+ return id;
+ }
+
public List<Input> getInputs() {
return inputs;
}
@@ -3,6 +3,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.matrixlab.octopus.core.AbstractNode;
+import org.matrixlab.octopus.core.Input;
+import org.matrixlab.octopus.core.Output;
import org.matrixlab.octopus.core.ValidationException;
import org.matrixlab.octopus.core.event.EventType;
import org.matrixlab.octopus.core.memory.Memory;
@@ -20,8 +22,8 @@
* that affect the behavior of the processor.
*
* @author dave sinclair(david.sinclair@lisa-park.com)
- * @see org.matrixlab.octopus.core.processor.Input
- * @see org.matrixlab.octopus.core.processor.Output
+ * @see org.matrixlab.octopus.core.Input
+ * @see org.matrixlab.octopus.core.Output
* @see org.matrixlab.octopus.core.processor.parameter.Parameter
*/
public abstract class Processor<MEMORY_TYPE> extends AbstractNode implements Source, Sink {
@@ -102,8 +104,7 @@ protected void setOutput(Output.Builder output) {
protected void setOutput(Output output) {
this.output = output;
- // the event type id is the same as this processor's id
- this.outputEventType = new EventType(getId());
+ this.outputEventType = new EventType();
this.outputEventType.addAttribute(output.getAttribute());
}
@@ -1,5 +1,7 @@
package org.matrixlab.octopus.core.processor;
+import org.matrixlab.octopus.core.Input;
+import org.matrixlab.octopus.core.Output;
import org.matrixlab.octopus.core.ValidationException;
import org.matrixlab.octopus.core.event.Event;
import org.matrixlab.octopus.core.memory.Memory;
@@ -142,8 +144,11 @@ public Object processEvent(Memory<Double> memory, Map<Integer, Event> eventsByIn
// sma only has a single event
Event event = eventsByInputId.get(INPUT_ID);
- double newItem = event.getAttributeAsDouble(inputAttributeName);
+ Double newItem = event.getAttributeAsDouble(inputAttributeName);
+ if (newItem == null) {
+ newItem = 0D;
+ }
memory.add(newItem);
double total = 0;
@@ -1,13 +1,13 @@
package org.matrixlab.octopus.core.runtime;
import org.matrixlab.octopus.core.event.Event;
-import org.matrixlab.octopus.core.event.EventType;
+import org.matrixlab.octopus.core.source.Source;
/**
* @author dave sinclair(david.sinclair@lisa-park.com)
*/
public interface ProcessingRuntime {
void start();
- void sendEvent(Event event, EventType eventType);
+ void sendEventFromSource(Event event, Source source);
}
Oops, something went wrong.

0 comments on commit 0e863d5

Please sign in to comment.