Skip to content

Commit

Permalink
fixup test failing
Browse files Browse the repository at this point in the history
  • Loading branch information
robfrank committed Feb 1, 2016
1 parent e3ca726 commit 5da169a
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 249 deletions.
11 changes: 11 additions & 0 deletions etl/pom.xml
Expand Up @@ -51,6 +51,17 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${surefire.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand Down
Expand Up @@ -29,6 +29,8 @@
import com.orientechnologies.orient.core.sql.filter.OSQLPredicate;
import com.orientechnologies.orient.etl.OETLProcessor.LOG_LEVELS;

import java.util.concurrent.TimeUnit;

/**
* ETL abstract component.
*/
Expand Down
Expand Up @@ -22,16 +22,31 @@
import com.orientechnologies.orient.core.record.impl.ODocument;

/**
* ETL basic component.
* ETL basic component. Each ETL component must implement this interface.
*/
public interface OETLComponent {

/**
* @return
*/
ODocument getConfiguration();

void configure(OETLProcessor iProcessor, ODocument iConfiguration, OCommandContext iSettings);
/**
* Called by the @OETLProcessor
* @param processor
* @param configuration
* @param context
*/
void configure(OETLProcessor processor, ODocument configuration, OCommandContext context);

void begin();

void end();

/**
* Return the symbolic name of the component
*
* @return the name of the component
*/
String getName();
}
Expand Up @@ -18,6 +18,10 @@

package com.orientechnologies.orient.etl;

import static com.orientechnologies.orient.etl.OETLProcessor.LOG_LEVELS.*;

import java.util.List;

import com.orientechnologies.common.concur.ONeedRetryException;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.log.OLogManager;
Expand All @@ -29,34 +33,24 @@
import com.orientechnologies.orient.etl.transformer.OTransformer;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;

import java.util.List;

import static com.orientechnologies.orient.etl.OETLProcessor.LOG_LEVELS.DEBUG;
import static com.orientechnologies.orient.etl.OETLProcessor.LOG_LEVELS.ERROR;
import static com.orientechnologies.orient.etl.OETLProcessor.LOG_LEVELS.INFO;

/**
* ETL pipeline: sequence of OTransformer and a OLoader.
*
* @author Luca Garulli (l.garulli-at-orientechnologies.com)
*/
public class OETLPipeline {
protected final OETLProcessor processor;
protected final List<OTransformer> transformers;
protected final OLoader loader;
protected final OCommandContext context;
protected final LOG_LEVELS logLevel;
protected final int maxRetries;
protected boolean haltOnError;
protected ODatabaseDocumentTx db;
protected OrientBaseGraph graph;

public OETLPipeline(final OETLProcessor processor,
final List<OTransformer> transformers,
final OLoader loader,
final LOG_LEVELS logLevel,
final int maxRetries,
final boolean haltOnError) {
protected final OETLProcessor processor;
protected final List<OTransformer> transformers;
protected final OLoader loader;
protected final OCommandContext context;
protected final LOG_LEVELS logLevel;
protected final int maxRetries;
protected boolean haltOnError;
protected ODatabaseDocumentTx db;
protected OrientBaseGraph graph;

public OETLPipeline(final OETLProcessor processor, final List<OTransformer> transformers, final OLoader loader,
final LOG_LEVELS logLevel, final int maxRetries, final boolean haltOnError) {
this.processor = processor;
this.transformers = transformers;
this.loader = loader;
Expand Down Expand Up @@ -118,10 +112,10 @@ protected Object execute(final OExtractedItem source) {
if (current == null) {
if (logLevel == DEBUG) {
OLogManager.instance().warn(this, "Transformer [%s] returned null, skip rest of pipeline execution", t);
break;
break;
}
}
}
}
if (current != null)
// LOAD
loader.load(current, context);
Expand All @@ -142,8 +136,9 @@ protected Object execute(final OExtractedItem source) {
processor.out(ERROR, "Error in Pipeline execution: %s", e);
processor.getStats().incrementErrors();

if (!haltOnError)
if (!haltOnError) {
return null;
}

loader.rollback();
throw OException.wrapException(new OETLProcessHaltedException("Halt"), e);
Expand Down

0 comments on commit 5da169a

Please sign in to comment.