Skip to content

Commit

Permalink
Merge pull request #137 from psliwa/master
Browse files Browse the repository at this point in the history
New strategy: "column"
  • Loading branch information
jprante committed Dec 10, 2013
2 parents 8333293 + c237d3c commit 93127d5
Show file tree
Hide file tree
Showing 45 changed files with 1,524 additions and 3 deletions.
29 changes: 28 additions & 1 deletion pom.xml
Expand Up @@ -262,7 +262,6 @@
</reporting>

<profiles>

<profile>
<id>derby</id>
<activation>
Expand Down Expand Up @@ -413,6 +412,34 @@
</plugins>
</build>
</profile>

<profile>
<id>column-strategy</id>
<dependencies>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.9.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<derby.system.home>${project.build.directory}/derby</derby.system.home>
<derby.stream.error.file>${project.build.directory}/derby.log</derby.stream.error.file>
</systemPropertyVariables>
<suiteXmlFiles>
<suiteXmlFile>${basedir}/src/test/resources/column-testsuite-derby.xml</suiteXmlFile>
</suiteXmlFiles>
</configuration>
</plugin>
</plugins>
</build>
</profile>

</profiles>

Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/xbib/elasticsearch/river/jdbc/JDBCRiver.java
Expand Up @@ -66,6 +66,10 @@ public class JDBCRiver extends AbstractRiverComponent implements River {
private final String acksql;
private final List<? super Object> acksqlparams;
private final boolean autocommit;
private final String columnUpdatedAt;
private final String columnCreatedAt;
private final String columnDeletedAt;
private final boolean columnEscape;
private final int fetchsize;
private final int maxrows;
private final int maxretries;
Expand Down Expand Up @@ -110,6 +114,10 @@ public JDBCRiver(RiverName riverName, RiverSettings riverSettings,
rounding = XContentMapValues.nodeStringValue(sourceSettings.get("rounding"), null);
scale = XContentMapValues.nodeIntegerValue(sourceSettings.get("scale"), 2);
autocommit = XContentMapValues.nodeBooleanValue(sourceSettings.get("autocommit"), Boolean.FALSE);
columnCreatedAt = XContentMapValues.nodeStringValue(sourceSettings.get("column_created_at"), "created_at");
columnUpdatedAt = XContentMapValues.nodeStringValue(sourceSettings.get("column_updated_at"), "updated_at");
columnDeletedAt = XContentMapValues.nodeStringValue(sourceSettings.get("column_deleted_at"), null);
columnEscape = XContentMapValues.nodeBooleanValue(sourceSettings.get("column_escape"), true);
fetchsize = url.startsWith("jdbc:mysql") ? Integer.MIN_VALUE :
XContentMapValues.nodeIntegerValue(sourceSettings.get("fetchsize"), 10);
maxrows = XContentMapValues.nodeIntegerValue(sourceSettings.get("max_rows"), 0);
Expand Down Expand Up @@ -166,6 +174,10 @@ public JDBCRiver(RiverName riverName, RiverSettings riverSettings,
.pollAckStatement(acksql)
.pollAckStatementParams(acksqlparams)
.autocommit(autocommit)
.columnCreatedAt(columnCreatedAt)
.columnUpdatedAt(columnUpdatedAt)
.columnDeletedAt(columnDeletedAt)
.columnEscape(columnEscape)
.maxRows(maxrows)
.fetchSize(fetchsize)
.retries(maxretries)
Expand Down
@@ -0,0 +1,131 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.xbib.elasticsearch.river.jdbc.strategy.column;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexMissingException;
import org.xbib.elasticsearch.river.jdbc.JDBCRiver;
import static org.xbib.elasticsearch.river.jdbc.RiverFlow.ID_INFO_RIVER_INDEX;
import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverFlow;

/**
* River flow implementation for the 'column' strategy
*
* @author Piotr Śliwa <piotr.sliwa@zineinc.com>
*/
public class ColumnRiverFlow extends SimpleRiverFlow {

private final ESLogger logger = ESLoggerFactory.getLogger(ColumnRiverFlow.class.getName());

static final String LAST_RUN_TIME = "last_run_time";
static final String CURRENT_RUN_STARTED_TIME = "current_run_started_time";

@Override
public String strategy() {
return "column";
}

@Override
public void move() {
try {
TimeValue lastRunTime = readLastRunTimeFromCustomInfo();
TimeValue currentTime = new TimeValue(new java.util.Date().getTime());

writeTimesToJdbcSettings(lastRunTime, currentTime);

context.riverSource().fetch();

writeCustomInfo(currentTime.millis());
} catch (Exception e) {
logger.error(e.getMessage(), e);
abort = true;
}
}

private TimeValue readLastRunTimeFromCustomInfo() throws IOException {
try {
GetResponse response = client().prepareGet(context.riverIndexName(), context.riverName(), ID_INFO_RIVER_INDEX).execute().actionGet();

if(response != null && response.isExists()) {
Map jdbcState = (Map) response.getSourceAsMap().get("jdbc");

if(jdbcState != null) {
Number lastRunTime = (Number) jdbcState.get(LAST_RUN_TIME);

if(lastRunTime != null) {
return new TimeValue(lastRunTime.longValue());
}
} else {
throw new IOException("can't retrieve previously persisted state from " + context.riverIndexName() + "/" + context.riverName());
}
}
} catch(IndexMissingException e) {
logger.warn("river state missing: {}/{}/{}", context.riverIndexName(), context.riverName(), ID_INFO_RIVER_INDEX);
}

return null;
}

private Client client() {
return context.riverMouth().client();
}

private void writeCustomInfo(long lastRunAt) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();

builder
.startObject()
.startObject("jdbc")
.field(LAST_RUN_TIME, lastRunAt)
.endObject()
.endObject();

client().prepareBulk()
.add(Requests.indexRequest(context.riverIndexName())
.type(context.riverName())
.id(ID_INFO_RIVER_INDEX)
.source(builder)
)
.execute()
.actionGet();
}

private void writeTimesToJdbcSettings(TimeValue lastRunTime, TimeValue currentTime) {
Map<String, Object> jdbcSettings = (Map<String, Object>) context.riverSettings().get(JDBCRiver.TYPE);

if(jdbcSettings == null) {
jdbcSettings = new HashMap<String, Object>();
context.riverSettings().put(JDBCRiver.TYPE, jdbcSettings);
}

jdbcSettings.put(LAST_RUN_TIME, lastRunTime);
jdbcSettings.put(CURRENT_RUN_STARTED_TIME, currentTime);
}
}
@@ -0,0 +1,36 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.xbib.elasticsearch.river.jdbc.strategy.column;

import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverMouth;

/**
* River mouth implementation for the 'column' strategy
*
* @author Piotr Śliwa <piotr.sliwa@zineinc.com>
*/
public class ColumnRiverMouth extends SimpleRiverMouth {

@Override
public String strategy() {
return "column";
}

}

0 comments on commit 93127d5

Please sign in to comment.