Skip to content

Commit

Permalink
Version 2 of joins
Browse files Browse the repository at this point in the history
  • Loading branch information
stampy88 committed Feb 29, 2012
1 parent ca85109 commit 829923c
Show file tree
Hide file tree
Showing 14 changed files with 474 additions and 371 deletions.
Expand Up @@ -9,6 +9,7 @@
import com.espertech.esper.client.EPStatement;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.lisapark.octopus.core.Input;
import org.lisapark.octopus.core.ProcessingModel;
import org.lisapark.octopus.core.ValidationException;
Expand All @@ -18,6 +19,7 @@
import org.lisapark.octopus.core.processor.CompiledProcessor;
import org.lisapark.octopus.core.processor.Processor;
import org.lisapark.octopus.core.processor.ProcessorInput;
import org.lisapark.octopus.core.processor.ProcessorJoin;
import org.lisapark.octopus.core.runtime.ProcessingRuntime;
import org.lisapark.octopus.core.runtime.ProcessorContext;
import org.lisapark.octopus.core.runtime.basic.BasicProcessorContext;
Expand Down Expand Up @@ -165,8 +167,7 @@ private Collection<CompiledProcessor<?>> compileProcessors(EPServiceProvider epS
}

EsperProcessorAdaptor runner = new EsperProcessorAdaptor(compiledProcessor, ctx, runtime);

stmt.setSubscriber(runner);
stmt.addListener(runner);

compiledProcessors.add(compiledProcessor);
} catch (ValidationException e) {
Expand Down Expand Up @@ -201,8 +202,7 @@ String getStatementForCompiledProcessor(CompiledProcessor<?> compiledProcessor)

List<ProcessorInput> inputs = compiledProcessor.getInputs();

// todo join inputs

Map<ProcessorInput, String> inputToAlias = Maps.newHashMap();
int aliasIndex = 0;
for (ProcessorInput input : inputs) {
if (selectClause.length() > 0) {
Expand All @@ -213,11 +213,35 @@ String getStatementForCompiledProcessor(CompiledProcessor<?> compiledProcessor)
String inputName = EsperUtils.getEventNameForSource(input.getSource());

String aliasName = "_" + aliasIndex++;
selectClause.append(aliasName).append(".*");
inputToAlias.put(input, aliasName);

selectClause.append(aliasName).append(".* as ").append(aliasName).append("_properties");
fromClause.append(inputName).append(".win:length(1) as ").append(aliasName);
}

return String.format("SELECT %s FROM %s", selectClause, fromClause);
StringBuilder whereClause = new StringBuilder();
List<ProcessorJoin> joins = compiledProcessor.getJoins();
for (ProcessorJoin join : joins) {
// some joins aren't required as they are on the same input
if (join.isRequired()) {
String firstAlias = inputToAlias.get(join.getFirstInput());
String secondAlias = inputToAlias.get(join.getSecondInput());

if (whereClause.length() > 0) {
whereClause.append("AND ");
}

whereClause.append(firstAlias).append('.').append(join.getFirstInputAttributeName());
whereClause.append(" = ");
whereClause.append(secondAlias).append('.').append(join.getSecondInputAttributeName());
}
}

if (whereClause.length() == 0) {
return String.format("SELECT %s FROM %s", selectClause, fromClause);
} else {
return String.format("SELECT %s FROM %s WHERE %s", selectClause, fromClause, whereClause);
}
}

String getStatementForCompiledSink(CompiledExternalSink compiledExternalSink) {
Expand Down
@@ -1,23 +1,27 @@
package org.lisapark.octopus.core.compiler.esper;

import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
import com.espertech.esper.event.map.MapEventBean;
import com.google.common.collect.Maps;
import org.lisapark.octopus.core.Input;
import org.lisapark.octopus.core.event.Event;
import org.lisapark.octopus.core.processor.CompiledProcessor;
import org.lisapark.octopus.core.processor.ProcessorInput;
import org.lisapark.octopus.core.processor.ProcessorJoin;
import org.lisapark.octopus.core.runtime.ProcessorContext;
import org.lisapark.octopus.util.Pair;
import org.lisapark.octopus.util.esper.EsperUtils;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

/**
* @author dave sinclair(david.sinclair@lisa-park.com)
*/
class EsperProcessorAdaptor {
class EsperProcessorAdaptor implements UpdateListener {
private final CompiledProcessor processor;
private final Pair<String, Integer>[] sourceIdToInputId;
private final Pair<String, ProcessorInput>[] sourceIdToInput;
private final String outputAttributeName;
private final String outputEventId;

Expand All @@ -30,52 +34,87 @@ class EsperProcessorAdaptor {
this.ctx = ctx;
this.runtime = runtime;

this.sourceIdToInputId = (Pair<String, Integer>[]) new Pair[processor.getInputs().size()];
this.sourceIdToInput = (Pair<String, ProcessorInput>[]) new Pair[processor.getInputs().size()];

int index = 0;
for (Input input : processor.getInputs()) {
for (ProcessorInput input : processor.getInputs()) {
String sourceId = EsperUtils.getEventNameForSource(input.getSource());
Integer inputId = input.getId();
sourceIdToInputId[index++] = Pair.newInstance(sourceId, inputId);
sourceIdToInput[index++] = Pair.newInstance(sourceId, input);
}

outputAttributeName = processor.getOutput().getAttributeName();
outputEventId = EsperUtils.getEventNameForSource(processor);
}

Pair<String, Integer>[] getSourceIdToInputId() {
return Arrays.copyOf(sourceIdToInputId, sourceIdToInputId.length);
}
@Override
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
if (isMapEvent(newEvents)) {
MapEventBean mapEvent = (MapEventBean) newEvents[0];

Map<Integer, Event> eventsByInputId = eventsByInputIdsFromMapEvent(mapEvent);

@SuppressWarnings("unchecked")
Object output = processor.processEvent(ctx, eventsByInputId);

if (output != null && outputAttributeName != null) {
// todo create new event based on old event - what about name collisions??

Event outputEvent = new Event(outputAttributeName, output);
outputEvent = outputEvent.unionWith(eventsByInputId.values());

String getOutputAttributeName() {
return outputAttributeName;
runtime.sendEvent(outputEvent.getData(), outputEventId);
}
}
}

String getOutputEventId() {
return outputEventId;
/**
* Returns true if the specified set of {@link EventBean}s is non-null and the first item of which is a
* {@link MapEventBean}
*
* @param newEvents to inspect
* @return true if the newEvents is non-null and first element is a MapEventBean
*/
private boolean isMapEvent(EventBean[] newEvents) {
return newEvents != null && newEvents.length > 0 && newEvents[0] instanceof MapEventBean;
}

public void update(Map<String, Object> eventFromInput_1) {
Event event = new Event(eventFromInput_1);
Map<Integer, Event> eventsByInputId = Maps.newHashMapWithExpectedSize(1);
eventsByInputId.put(sourceIdToInputId[0].getSecond(), event);
private Map<Integer, Event> eventsByInputIdsFromMapEvent(MapEventBean mapEvent) {
Collection<Object> mapEventBeans = mapEvent.getProperties().values();
Map<Integer, Event> eventsByInputId = Maps.newHashMapWithExpectedSize(mapEventBeans.size());

for (Object mapEventBeanObj : mapEventBeans) {
MapEventBean mapEventBean = (MapEventBean) mapEventBeanObj;

@SuppressWarnings("unchecked")
Object output = processor.processEvent(ctx, eventsByInputId);
ProcessorInput input = getInputForSourceId(mapEventBean.getEventType().getName());

if (output != null && outputAttributeName != null) {
// todo create new event based on old event - what about name collisions??
if (input != null) {
// put the event for the input
eventsByInputId.put(input.getId(), new Event(mapEventBean.getProperties()));

Event outputEvent = new Event(outputAttributeName, output);
outputEvent = outputEvent.unionWith(event);
// if the input is part of a join, BUT the join is not required we need to put the SAME event in for the
// other side of the join
ProcessorJoin join = processor.getJoinForInput(input);
if (join != null && !join.isRequired()) {
ProcessorInput otherInput = join.getOtherInput(input);

runtime.sendEvent(outputEvent.getData(), outputEventId);
eventsByInputId.put(otherInput.getId(), new Event(mapEventBean.getProperties()));
}
}
}

return eventsByInputId;
}

// todo multiple inputs?
private ProcessorInput getInputForSourceId(String sourceId) {
ProcessorInput input = null;

public void update(Event eventFromInput_1, Event eventFromInput_2) {
for (int index = 0; index < sourceIdToInput.length; ++index) {
if (sourceIdToInput[index].getFirst().equals(sourceId)) {
input = sourceIdToInput[index].getSecond();
break;
}
}

return input;
}
}
10 changes: 10 additions & 0 deletions src/main/java/org/lisapark/octopus/core/event/Event.java
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.Maps;
import org.lisapark.octopus.core.Persistable;

import java.util.Collection;
import java.util.Map;

/**
Expand All @@ -27,6 +28,15 @@ public Event unionWith(Event event) {
return new Event(newData);
}

public Event unionWith(Collection<Event> events) {
Map<String, Object> newData = Maps.newHashMap(data);
for (Event event : events) {
newData.putAll(event.getData());
}

return new Event(newData);
}

public Map<String, Object> getData() {
return data;
}
Expand Down
23 changes: 18 additions & 5 deletions src/main/java/org/lisapark/octopus/core/processor/Addition.java
Expand Up @@ -18,7 +18,7 @@
* @author dave sinclair(david.sinclair@lisa-park.com)
*/
@Persistable
public class Addition extends DualInputProcessor<Void> {
public class Addition extends Processor<Void> {
private static final String DEFAULT_NAME = "Addition";
private static final String DEFAULT_DESCRIPTION = "Add 2 operands";

Expand All @@ -41,6 +41,16 @@ protected Addition(Addition additionToCopy) {
super(additionToCopy);
}

public ProcessorInput getFirstInput() {
// there are two inputs for addition
return getInputs().get(0);
}

public ProcessorInput getSecondInput() {
// there are two inputs for addition
return getInputs().get(1);
}

@Override
public Addition newInstance() {
return new Addition(UUID.randomUUID(), this);
Expand Down Expand Up @@ -72,8 +82,13 @@ public static Addition newTemplate() {
Addition addition = new Addition(processorId, DEFAULT_NAME, DEFAULT_DESCRIPTION);

// two double inputs
addition.setFirstInput(ProcessorInput.doubleInputWithId(FIRST_INPUT_ID).name("First Operand").description("First operand for addition"));
addition.setSecondInput(ProcessorInput.doubleInputWithId(SECOND_INPUT_ID).name("Second Operand").description("Second operand for addition"));
ProcessorInput<Double> firstInput = ProcessorInput.doubleInputWithId(FIRST_INPUT_ID).name("First Operand").description("First operand for addition").build();
addition.addInput(firstInput);

ProcessorInput<Double> secondInput = ProcessorInput.doubleInputWithId(SECOND_INPUT_ID).name("Second Operand").description("Second operand for addition").build();
addition.addInput(secondInput);

addition.addJoin(firstInput, secondInput);

// double output
try {
Expand All @@ -86,8 +101,6 @@ public static Addition newTemplate() {
return addition;
}

// todo need a join

static class CompiledAddition extends CompiledProcessor<Void> {
private final String firstAttributeName;
private final String secondAttributeName;
Expand Down
Expand Up @@ -7,24 +7,32 @@
import java.util.Map;
import java.util.UUID;

import static com.google.common.base.Preconditions.checkArgument;

/**
* @author dave sinclair(david.sinclair@lisa-park.com)
*/
public abstract class CompiledProcessor<MEMORY_TYPE> {
private final List<ProcessorInput> inputs;
private final List<ProcessorJoin> joins;
private final ProcessorOutput output;
private final UUID id;

protected CompiledProcessor(Processor<MEMORY_TYPE> processor) {
this.id = processor.getId();
this.inputs = processor.getInputs();
this.joins = processor.getJoins();
this.output = processor.getOutput();
}

public UUID getId() {
return id;
}

public List<ProcessorJoin> getJoins() {
return joins;
}

public List<ProcessorInput> getInputs() {
return inputs;
}
Expand All @@ -33,5 +41,19 @@ public ProcessorOutput getOutput() {
return output;
}

public ProcessorJoin getJoinForInput(ProcessorInput input) {
checkArgument(input != null, "input cannot be null");

ProcessorJoin join = null;
for (ProcessorJoin candidate : joins) {
if (candidate.getFirstInput().equals(input) || candidate.getSecondInput().equals(input)) {
join = candidate;
break;
}
}

return join;
}

public abstract Object processEvent(ProcessorContext<MEMORY_TYPE> ctx, Map<Integer, Event> eventsByInputId);
}

0 comments on commit 829923c

Please sign in to comment.