diff --git a/README.md b/README.md index 7100d7fb..f9173fdf 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Creating a JDBC river is easy: - install the plugin -- download a JDBC driver jar from your vendor's site (for example MySQL) and put the jar into the folder of the plugin `$ES_HOME/plugins/river-jdbc`. +- download a JDBC driver jar from your vendor's site (for example MySQL) and put the jar into the folder of the plugin `$ES_HOME/plugins/jdbc`. Assuming you have a table of name `orders`, you can issue this simple command from the command line @@ -159,7 +159,7 @@ Internet access (of course) 5. Add MySQL JDBC driver jar to JDBC river plugin directory and set access permission for .jar file (at least chmod 644) `cp mysql-connector-java-5.1.33-bin.jar $ES_HOME/plugins/jdbc/` - `chmod 644 $ES_HOME/plugins/jdbc/` + `chmod 644 $ES_HOME/plugins/jdbc/*` 6. Start elasticsearch from terminal window diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/cron/CronThreadPoolExecutor.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/cron/CronThreadPoolExecutor.java index a616df67..18feba73 100644 --- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/cron/CronThreadPoolExecutor.java +++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/cron/CronThreadPoolExecutor.java @@ -15,6 +15,9 @@ */ package org.xbib.elasticsearch.plugin.jdbc.cron; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; + import java.util.Date; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; @@ -29,6 +32,9 @@ * to calculate future execution times for scheduled tasks. */ public class CronThreadPoolExecutor extends ScheduledThreadPoolExecutor implements CronExecutorService { + + private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.CronThreadPoolExecutor"); + /** * Constructs a new CronThreadPoolExecutor. * @@ -74,11 +80,8 @@ public Future schedule(final Runnable task, final CronExpression expression) if (task == null) { throw new NullPointerException(); } - this.setCorePoolSize(this.getCorePoolSize() + 1); + setCorePoolSize(getCorePoolSize() + 1); Runnable scheduleTask = new Runnable() { - /** - * @see Runnable#run() - */ @Override public void run() { Date now = new Date(); @@ -92,13 +95,10 @@ public void run() { } time = expression.getNextValidTimeAfter(now); } - } catch (RejectedExecutionException e) { - // - } catch (CancellationException e) { - // } catch (InterruptedException e) { - // Thread.currentThread().interrupt(); + } catch (RejectedExecutionException | CancellationException e) { + logger.error(e.getMessage(), e); } } }; diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java index 622225e3..60650380 100644 --- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java +++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java @@ -186,7 +186,7 @@ private List> schedule(Thread thread) { Long seconds = settings.getAsTime("interval", TimeValue.timeValueSeconds(0)).seconds(); if (schedule != null && schedule.length > 0) { CronThreadPoolExecutor cronThreadPoolExecutor = - new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4)); + new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 1)); for (String cron : schedule) { futures.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron))); } diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java index 944dbd16..0650d90d 100644 --- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java +++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java @@ -189,7 +189,7 @@ private List> schedule(Thread thread) { List> futures = newLinkedList(); if (schedule != null && schedule.length > 0) { CronThreadPoolExecutor cronThreadPoolExecutor = - new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4)); + new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 1)); for (String cron : schedule) { futures.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron))); } diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/state/RiverState.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/state/RiverState.java index 1f7a270f..5ef1baef 100644 --- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/state/RiverState.java +++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/state/RiverState.java @@ -41,8 +41,6 @@ */ public class RiverState implements Streamable, ToXContent, Comparable { - private final static DateTime EMPTY_DATETIME = new DateTime(0L); - /** * The name of the river instance */ @@ -61,12 +59,12 @@ public class RiverState implements Streamable, ToXContent, Comparable getCustom() { return (Map) this.map.get("custom"); } - public boolean isAborted() { - return map.containsKey("aborted") ? (Boolean) map.get("aborted") : false; - } - public boolean isSuspended() { return map.containsKey("suspended") ? (Boolean) map.get("suspended") : false; } + public RiverState setLastStartDate(long lastStartDate) { + this.map.put("lastStartDate", lastStartDate); + return this; + } + + public long getLastStartDate() { + return (long)this.map.get("lastStartDate"); + } + + public RiverState setLastEndDate(long lastEndDate) { + this.map.put("lastEndDate", lastEndDate); + return this; + } + + public long getLastEndDate() { + return (long)this.map.get("lastEndDate"); + } + + public RiverState setLastExecutionStartDate(long lastExecutionStartDate) { + this.map.put("lastExecutionStartDate", lastExecutionStartDate); + return this; + } + + public long getLastExecutionStartDate() { + return (long)this.map.get("lastExecutionStartDate"); + } + + public RiverState setLastExecutionEndDate(long lastExecutionEndDate) { + this.map.put("lastExecutionEndDate", lastExecutionEndDate); + return this; + } + + public long getLastExecutionEndDate() { + return (long)this.map.get("lastExecutionEndDate"); + } + public RiverState fromXContent(XContentParser parser) throws IOException { DateTimeFormatter dateTimeFormatter = ISODateTimeFormat.dateOptionalTimeParser().withZone(DateTimeZone.UTC); Long startTimestamp = 0L; - Long begin = 0L; - Long end = 0L; + Long begin = null; + Long end = null; String name = null; String type = null; String currentFieldName = null; @@ -213,11 +223,11 @@ public RiverState fromXContent(XContentParser parser) throws IOException { break; case "last_active_begin": begin = parser.text() != null && !"null".equals(parser.text()) ? - dateTimeFormatter.parseMillis(parser.text()) : 0L; + dateTimeFormatter.parseMillis(parser.text()) : null; break; case "last_active_end": end = parser.text() != null && !"null".equals(parser.text()) ? - dateTimeFormatter.parseMillis(parser.text()) : 0L; + dateTimeFormatter.parseMillis(parser.text()) : null; break; } } else if (token == START_OBJECT) { @@ -228,7 +238,8 @@ public RiverState fromXContent(XContentParser parser) throws IOException { .setName(name) .setType(type) .setStarted(new DateTime(startTimestamp)) - .setLastActive(new DateTime(begin), new DateTime(end)) + .setLastActive(begin != null ? new DateTime(begin) : null, + end != null ? new DateTime(end) : null) .setMap(map); } @@ -250,9 +261,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void readFrom(StreamInput in) throws IOException { this.name = in.readOptionalString(); this.type = in.readOptionalString(); - this.started = new DateTime(in.readLong()); - this.begin = new DateTime(in.readLong()); - this.end = new DateTime(in.readLong()); + if (in.readBoolean()) { + this.started = new DateTime(in.readLong()); + } + if (in.readBoolean()) { + this.lastActiveBegin = new DateTime(in.readLong()); + } + if (in.readBoolean()) { + this.lastActiveEnd = new DateTime(in.readLong()); + } map = in.readMap(); } @@ -260,9 +277,24 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(name); out.writeOptionalString(type); - out.writeLong(started != null ? started.getMillis() : 0L); - out.writeLong(begin != null ? begin.getMillis() : 0L); - out.writeLong(end != null ? end.getMillis() : 0L); + if (started != null) { + out.writeBoolean(true); + out.writeLong(started.getMillis()); + } else { + out.writeBoolean(false); + } + if (lastActiveBegin != null) { + out.writeBoolean(true); + out.writeLong(lastActiveBegin.getMillis()); + } else { + out.writeBoolean(false); + } + if (lastActiveEnd != null) { + out.writeBoolean(true); + out.writeLong(lastActiveEnd.getMillis()); + } else { + out.writeBoolean(false); + } out.writeMap(map); } diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverContext.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverContext.java index 032a0903..482b8fab 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverContext.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverContext.java @@ -110,14 +110,6 @@ public class SimpleRiverContext implements RiverContext { private long lastRowCount; - private long lastStartDate; - - private long lastEndDate; - - private long lastExecutionStartDate; - - private long lastExecutionEndDate; - private Map columnNameMap; private Map lastRow = new HashMap(); @@ -337,39 +329,39 @@ public long getLastRowCount() { } public SimpleRiverContext setLastStartDate(long lastStartDate) { - this.lastStartDate = lastStartDate; + riverState.setLastStartDate(lastStartDate); return this; } public long getLastStartDate() { - return lastStartDate; + return riverState.getLastStartDate(); } public SimpleRiverContext setLastEndDate(long lastEndDate) { - this.lastEndDate = lastEndDate; + riverState.setLastEndDate(lastEndDate); return this; } public long getLastEndDate() { - return lastEndDate; + return riverState.getLastEndDate(); } public SimpleRiverContext setLastExecutionStartDate(long lastExecutionStartDate) { - this.lastExecutionStartDate = lastExecutionStartDate; + riverState.setLastExecutionStartDate(lastExecutionStartDate); return this; } public long getLastExecutionStartDate() { - return lastExecutionStartDate; + return riverState.getLastExecutionStartDate(); } public SimpleRiverContext setLastExecutionEndDate(long lastExecutionEndDate) { - this.lastExecutionEndDate = lastExecutionEndDate; + riverState.setLastExecutionEndDate(lastExecutionEndDate); return this; } public long getLastExecutionEndDate() { - return lastExecutionEndDate; + return riverState.getLastExecutionEndDate(); } public SimpleRiverContext setColumnNameMap(Map columnNameMap) { @@ -473,10 +465,10 @@ public Map asMap() { .field("shouldignorenull", shouldIgnoreNull) .field("lastResultSetMetadata", lastResultSetMetadata) .field("lastDatabaseMetadata", lastDatabaseMetadata) - .field("lastStartDate", lastStartDate) - .field("lastEndDate", lastEndDate) - .field("lastExecutionStartDate", lastExecutionStartDate) - .field("lastExecutionEndDate", lastExecutionEndDate) + .field("lastStartDate", riverState.getLastStartDate()) + .field("lastEndDate", riverState.getLastEndDate()) + .field("lastExecutionStartDate", riverState.getLastExecutionStartDate()) + .field("lastExecutionEndDate", riverState.getLastExecutionEndDate()) .field("columnNameMap", columnNameMap) .field("lastRow", lastRow) .field("sql", sql) diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java index 84e7fab4..0c950265 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java @@ -164,6 +164,7 @@ protected void beforeFetch(RC riverContext) throws Exception { RiverState riverState = riverStateResponse.getRiverState(); // if river state was not defined yet, define it now if (riverState == null) { + logger.debug("river state not found, creating new state"); riverState = new RiverState() .setName(riverName.getName()) .setType(riverName.getType()) @@ -253,16 +254,7 @@ protected void afterFetch(RiverContext riverContext) throws Exception { logger.warn("no river mouth"); return; } - try { - riverContext.getRiverMouth().afterFetch(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - try { - riverContext.getRiverSource().afterFetch(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } + // set activity RiverState riverState = riverContext.getRiverState() .setLastActive(riverContext.getRiverState().getLastActiveBegin(), new DateTime()); PostRiverStateRequestBuilder postRiverStateRequestBuilder = new PostRiverStateRequestBuilder(client.admin().cluster()) @@ -274,6 +266,16 @@ protected void afterFetch(RiverContext riverContext) throws Exception { logger.warn("post river state not acknowledged: {}/{}", riverName.getName(), riverName.getType()); } logger.debug("after fetch: state posted = {}", riverState); + try { + riverContext.getRiverMouth().afterFetch(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + try { + riverContext.getRiverSource().afterFetch(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } } protected RiverSource createRiverSource(Map params) { diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverMouth.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverMouth.java index 2fc2e3f5..51ba9be8 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverMouth.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverMouth.java @@ -42,11 +42,11 @@ * index name housekeeping (with replica/refresh), and metrics. It understands * _version, _routing, _timestamp, _parent, and _ttl metadata. */ -public class SimpleRiverMouth implements RiverMouth { +public class SimpleRiverMouth implements RiverMouth { private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverMouth"); - protected SimpleRiverContext context; + protected RC context; protected IngestFactory ingestFactory; @@ -72,12 +72,12 @@ public String strategy() { } @Override - public SimpleRiverMouth newInstance() { - return new SimpleRiverMouth(); + public SimpleRiverMouth newInstance() { + return new SimpleRiverMouth(); } @Override - public SimpleRiverMouth setRiverContext(SimpleRiverContext context) { + public SimpleRiverMouth setRiverContext(RC context) { this.context = context; return this; } diff --git a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/AbstractSimpleRiverTest.java b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/AbstractSimpleRiverTest.java index e7fb41b2..1020a04c 100644 --- a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/AbstractSimpleRiverTest.java +++ b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/AbstractSimpleRiverTest.java @@ -326,7 +326,8 @@ public static RiverState waitForActiveRiver(Client client, String riverName, Str GetRiverStateResponse riverStateResponse = client.admin().cluster() .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); RiverState riverState = riverStateResponse.getRiverState(); - long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; + long t0 = riverState != null && riverState.getLastActiveBegin() != null ? + riverState.getLastActiveBegin().getMillis() : 0L; logger.debug("waitForActiveRiver: now={} t0={} t0 0 && t0 == 0 && t0 < now) { @@ -354,8 +355,10 @@ public static RiverState waitForInactiveRiver(Client client, String riverName, S GetRiverStateResponse riverStateResponse = client.admin().cluster() .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); RiverState riverState = riverStateResponse.getRiverState(); - long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; - long t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L; + long t0 = riverState != null && riverState.getLastActiveBegin() != null ? + riverState.getLastActiveBegin().getMillis() : 0L; + long t1 = riverState != null && riverState.getLastActiveEnd() != null ? + riverState.getLastActiveEnd().getMillis() : 0L; logger.debug("waitForInactiveRiver: now={} t0 0 && t0 < now && t1 - t0 <= 0L) { @@ -363,8 +366,10 @@ public static RiverState waitForInactiveRiver(Client client, String riverName, S try { riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); riverState = riverStateResponse.getRiverState(); - t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; - t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L; + t0 = riverState != null && riverState.getLastActiveBegin() != null ? + riverState.getLastActiveBegin().getMillis() : 0L; + t1 = riverState != null && riverState.getLastActiveEnd() != null ? + riverState.getLastActiveEnd().getMillis() : 0L; } catch (IndexMissingException e) { logger.warn("index missing"); }