Skip to content

Commit

Permalink
code cleanup: initialization of component moved on different method, …
Browse files Browse the repository at this point in the history
…avoid use of null for check in Orient loader
  • Loading branch information
robfrank committed Nov 27, 2015
1 parent 0fdf1f3 commit bf4b272
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 83 deletions.
184 changes: 108 additions & 76 deletions etl/src/main/java/com/orientechnologies/orient/etl/OETLProcessor.java
Expand Up @@ -133,7 +133,7 @@ public static OETLProcessor parseConfigAndParameters(String[] args) {
}
}

// override with args passes by command line
// override with args passed by command line
for (final String arg : args) {
if (arg.charAt(0) == '-') {
final String[] parts = arg.substring(1).split("=");
Expand Down Expand Up @@ -178,73 +178,125 @@ public OETLProcessor parse(final Collection<ODocument> iBeginBlocks, final ODocu
init();

try {
String name;

// BEGIN BLOCKS
beginBlocks = new ArrayList<OBlock>();
if (iBeginBlocks != null)
for (ODocument block : iBeginBlocks) {
name = block.fieldNames()[0];
final OBlock b = factory.getBlock(name);
beginBlocks.add(b);
configureComponent(b, (ODocument) block.field(name), iContext);
//Execution is necessary to resolve let blocks and provide resolved variables to other components
b.execute();
}
configureBeginBlocks(iBeginBlocks, iContext);

if (iSource != null) {
// SOURCE
name = iSource.fieldNames()[0];
source = factory.getSource(name);
configureComponent(source, (ODocument) iSource.field(name), iContext);
} else {
source = factory.getSource("input");
}
configureSource(iSource, iContext);

// EXTRACTOR
name = iExtractor.fieldNames()[0];
extractor = factory.getExtractor(name);
configureComponent(extractor, (ODocument) iExtractor.field(name), iContext);

if (iLoader != null) {
// LOADER
name = iLoader.fieldNames()[0];
loader = factory.getLoader(name);
configureComponent(loader, (ODocument) iLoader.field(name), iContext);
} else {
loader = factory.getLoader("output");
}
configureExtractors(iExtractor, iContext);

// TRANSFORMERS
transformers = new ArrayList<OTransformer>();
if (iTransformers != null) {
for (ODocument t : iTransformers) {
name = t.fieldNames()[0];
final OTransformer tr = factory.getTransformer(name);
transformers.add(tr);
configureComponent(tr, (ODocument) t.field(name), iContext);
}
}
configureLoader(iLoader, iContext);

// END BLOCKS
endBlocks = new ArrayList<OBlock>();
if (iEndBlocks != null) {
for (ODocument block : iEndBlocks) {
name = block.fieldNames()[0];
final OBlock b = factory.getBlock(name);
endBlocks.add(b);
configureComponent(b, (ODocument) block.field(name), iContext);
}
}
configureTransformers(iTransformers, iContext);

// analyzeFlow();
configureEndBlocks(iEndBlocks, iContext);

//isn't working right now
// analyzeFlow();

} catch (Exception e) {
throw OException.wrapException(new OConfigurationException("Error on creating ETL processor"), e);
}
return this;
}

protected void init() {
final String cfgLog = (String) context.getVariable("log");
if (cfgLog != null)
logLevel = LOG_LEVELS.valueOf(cfgLog.toUpperCase());

final Boolean cfgHaltOnError = (Boolean) context.getVariable("haltOnError");
if (cfgHaltOnError != null)
haltOnError = cfgHaltOnError;

final Object parallelSetting = context.getVariable("parallel");
if (parallelSetting != null)
parallel = (Boolean) parallelSetting;

if (parallel) {
final int cores = Runtime.getRuntime().availableProcessors();
threads = new Thread[cores];
for (int i = 0; i < cores; ++i) {
threads[i] = new Thread("OrientDB ETL Pipeline-" + i);
}
}
}

private void configureEndBlocks(Collection<ODocument> iEndBlocks, OCommandContext iContext)
throws IllegalAccessException, InstantiationException {
String name;// END BLOCKS
endBlocks = new ArrayList<OBlock>();
if (iEndBlocks != null) {
for (ODocument block : iEndBlocks) {
name = block.fieldNames()[0];
final OBlock b = factory.getBlock(name);
endBlocks.add(b);
configureComponent(b, (ODocument) block.field(name), iContext);
}
}
}

private void configureTransformers(Collection<ODocument> iTransformers, OCommandContext iContext)
throws IllegalAccessException, InstantiationException {
String name;// TRANSFORMERS
transformers = new ArrayList<OTransformer>();
if (iTransformers != null) {
for (ODocument t : iTransformers) {
name = t.fieldNames()[0];
final OTransformer tr = factory.getTransformer(name);
transformers.add(tr);
configureComponent(tr, (ODocument) t.field(name), iContext);
}
}
}

private void configureLoader(ODocument iLoader, OCommandContext iContext) throws IllegalAccessException, InstantiationException {
String name;
if (iLoader != null) {
// LOADER
name = iLoader.fieldNames()[0];
loader = factory.getLoader(name);
configureComponent(loader, (ODocument) iLoader.field(name), iContext);
} else {
loader = factory.getLoader("output");
}
}

private void configureExtractors(ODocument iExtractor, OCommandContext iContext)
throws IllegalAccessException, InstantiationException {
String name;// EXTRACTOR
name = iExtractor.fieldNames()[0];
extractor = factory.getExtractor(name);
configureComponent(extractor, (ODocument) iExtractor.field(name), iContext);
}

private void configureSource(ODocument iSource, OCommandContext iContext) throws IllegalAccessException, InstantiationException {
String name;
if (iSource != null) {
// SOURCE
name = iSource.fieldNames()[0];
source = factory.getSource(name);
configureComponent(source, (ODocument) iSource.field(name), iContext);
} else {
source = factory.getSource("input");
}
}

private void configureBeginBlocks(Collection<ODocument> iBeginBlocks, OCommandContext iContext)
throws IllegalAccessException, InstantiationException {
String name;// BEGIN BLOCKS
beginBlocks = new ArrayList<OBlock>();
if (iBeginBlocks != null) {
for (ODocument block : iBeginBlocks) {
name = block.fieldNames()[0];
final OBlock b = factory.getBlock(name);
beginBlocks.add(b);
configureComponent(b, (ODocument) block.field(name), iContext);
//Execution is necessary to resolve let blocks and provide resolved variables to other components
b.execute();
}
}
}

public OETLComponentFactory getFactory() {
return factory;
}
Expand Down Expand Up @@ -575,27 +627,7 @@ protected void checkTypeCompatibility(final OETLComponent iCurrentComponent, fin

}

protected void init() {
final String cfgLog = (String) context.getVariable("log");
if (cfgLog != null)
logLevel = LOG_LEVELS.valueOf(cfgLog.toUpperCase());

final Boolean cfgHaltOnError = (Boolean) context.getVariable("haltOnError");
if (cfgHaltOnError != null)
haltOnError = cfgHaltOnError;

final Object parallelSetting = context.getVariable("parallel");
if (parallelSetting != null)
parallel = (Boolean) parallelSetting;

if (parallel) {
final int cores = Runtime.getRuntime().availableProcessors();
threads = new Thread[cores];
for (int i = 0; i < cores; ++i) {
threads[i] = new Thread("OrientDB ETL Pipeline-" + i);
}
}
}

public enum LOG_LEVELS {
NONE, ERROR, INFO, DEBUG
Expand Down
Expand Up @@ -45,7 +45,7 @@ public ODocument getConfiguration() {
+ "{quote:{optional:true,description:'String character delimiter. Use \"\" to do not use any delimitator'}},"
+ "{ignoreEmptyLines:{optional:true,description:'Ignore empty lines',type:'boolean'}},"
+ "{skipFrom:{optional:true,description:'Line number where start to skip',type:'int'}},"
+ "{skipTo:{optional:true,description:'Line number where skip ends',type:'int'}}"
+ "{skipTo:{optional:true,description:'Line number where skip ends',type:'int'}},"
+ "{predefinedFormat:{optional:true,description:'Name of standard csv format (from Apache commons-csv): DEFAULT, EXCEL, MYSQL, RFC4180, TDF',type:'String'}}"
+ "],input:['String'],output:'ODocument'}");
}
Expand Down
Expand Up @@ -65,7 +65,7 @@ public class OOrientDBLoader extends OAbstractLoader implements OLoader {
protected long batchCounter = 0;
protected DB_TYPE dbType = DOCUMENT;
protected boolean wal = true;
protected Boolean txUseLog = null;
protected boolean txUseLog = false;

protected enum DB_TYPE {
DOCUMENT, GRAPH
Expand All @@ -83,7 +83,6 @@ public void load(final Object input, OCommandContext context) {
if (dbType == DOCUMENT) {
if (input instanceof ODocument) {
final ODocument doc = (ODocument) input;
final ODatabaseDocumentTx documentDatabase = pipeline.getDocumentDatabase();
final OClass cls;
if (className != null)
cls = getOrCreateClass(className, null);
Expand Down Expand Up @@ -192,8 +191,7 @@ public void load(final Object input, OCommandContext context) {

private void beginTransaction(final ODatabaseDocumentTx db) {
db.begin();
if (txUseLog != null)
db.getTransaction().setUsingLog(txUseLog);
db.getTransaction().setUsingLog(txUseLog);
}

@Override
Expand Down Expand Up @@ -268,8 +266,8 @@ public void configure(final OETLProcessor iProcessor, final ODocument iConfigura
}
}

if (!wal)
OGlobalConfiguration.USE_WAL.setValue(wal);
//use wal or not
OGlobalConfiguration.USE_WAL.setValue(wal);

switch (dbType) {
case DOCUMENT:
Expand Down

0 comments on commit bf4b272

Please sign in to comment.