Skip to content

Commit

Permalink
clarify release/shutdown methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jprante committed Feb 28, 2015
1 parent ff9b16e commit 318e95e
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 33 deletions.
27 changes: 3 additions & 24 deletions pom.xml
Expand Up @@ -7,7 +7,7 @@

<groupId>org.xbib.elasticsearch.plugin</groupId>
<artifactId>elasticsearch-river-jdbc</artifactId>
<version>1.4.0.10</version>
<version>1.4.0.11</version>

<packaging>jar</packaging>

Expand Down Expand Up @@ -166,7 +166,7 @@
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<version>2.18.1</version>
<configuration>
<properties>
<property>
Expand Down Expand Up @@ -297,7 +297,7 @@
</plugin>
<plugin>
<artifactId>maven-surefire-report-plugin</artifactId>
<version>2.17</version>
<version>2.18.1</version>
</plugin>
</plugins>
</reporting>
Expand Down Expand Up @@ -585,27 +585,6 @@
</distributionManagement>
</profile>

<profile>
<id>doclint-java8-disable</id>
<activation>
<os>
<name>mac os x</name>
</os>
<jdk>[1.8,</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<javadocExecutable>/Library/Java/JavaVirtualMachines/jdk1.8.0.jdk/Contents/Home/bin/javadoc</javadocExecutable>
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
</plugin>
</plugins>
</build>
</profile>

</profiles>
</project>
Expand Up @@ -70,7 +70,7 @@ public RiverRunnable(RiverFlow riverFlow, List<Map<String, Object>> input) {
}

/**
* Before the pipelines are executed, put the river defintions on the queue
* Before the pipelines are executed, put the river definitions on the queue
*
* @throws IOException
*/
Expand Down
Expand Up @@ -164,6 +164,12 @@ public void close() {
logger.debug("interrupting river thread");
riverThread.interrupt();
}
logger.debug("shutting down river flow");
try {
riverFlow.shutdown();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info("river closed [{}/{}]", riverName.getType(), riverName.getName());
}

Expand Down
Expand Up @@ -233,4 +233,6 @@ public interface RiverContext {
*/
RiverContext release();

RiverContext shutdown();

}
Expand Up @@ -100,7 +100,7 @@ public interface RiverFlow<RC extends RiverContext> {
*
* @param cause the cause why metrics are logged
*/
void logMetrics(RiverContext riverContext, String cause);
void logMetrics(RC riverContext, String cause);

/**
* Execute this river flow
Expand Down Expand Up @@ -143,4 +143,6 @@ public interface RiverFlow<RC extends RiverContext> {

boolean isSuspensionThreadEnabled();

void shutdown() throws Exception;

}
Expand Up @@ -166,6 +166,8 @@ public interface RiverMouth<RC extends RiverContext> {
*/
void flush() throws IOException;

void release() throws Exception;

/**
* Shutdown river mouth and release all resources, e.g. bulk processor and client
*/
Expand Down
Expand Up @@ -350,11 +350,17 @@ public interface RiverSource<RC extends RiverContext> {
*/
void resume() throws Exception;

/**
* Release resources of source
* @throws Exception
*/
void release() throws Exception;

/**
* Shutdown source
*
* @throws IOException
*/
void shutdown() throws IOException;
void shutdown() throws IOException, SQLException;

}
Expand Up @@ -429,21 +429,43 @@ public boolean shouldTreatBinaryAsString() {
return shouldTreatBinaryAsString;
}

@Override
public SimpleRiverContext release() {
try {
if (mouth != null) {
mouth.release();
mouth = null;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
if (source != null) {
source.release();
source = null;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return this;
}

@Override
public SimpleRiverContext shutdown() {
try {
if (mouth != null) {
mouth.shutdown();
mouth = null;
}
} catch (IOException e) {
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
if (source != null) {
source.shutdown();
source = null;
}
} catch (IOException e) {
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return this;
Expand Down
Expand Up @@ -68,6 +68,8 @@ public class SimpleRiverFlow<RC extends RiverContext> implements RiverFlow<RC> {

private RiverName riverName;

private RC riverContext;

private Settings settings;

private IngestFactory ingestFactory;
Expand All @@ -90,7 +92,8 @@ public SimpleRiverFlow<RC> newInstance() {

@Override
public RC newRiverContext() {
return (RC) new SimpleRiverContext();
this.riverContext = (RC) new SimpleRiverContext();
return riverContext;
}

@Override
Expand Down Expand Up @@ -379,7 +382,7 @@ protected RC fillRiverContext(RC riverContext, RiverState state,
}

@Override
public void logMetrics(RiverContext riverContext, String cause) {
public void logMetrics(RC riverContext, String cause) {
MeterMetric metric = getMetric();
if (metric == null) {
return;
Expand Down Expand Up @@ -419,4 +422,8 @@ public void logMetrics(RiverContext riverContext, String cause) {
);
}

public void shutdown() throws Exception {
riverContext.shutdown();
}

}
Expand Up @@ -132,6 +132,15 @@ public synchronized void afterFetch() throws IOException {
}
}

@Override
public synchronized void release() {
try {
flush();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}

@Override
public synchronized void shutdown() {
try {
Expand Down
Expand Up @@ -357,11 +357,11 @@ public void fetch() throws SQLException, IOException {
@Override
public void afterFetch() throws Exception {
context.setLastEndDate(new DateTime().getMillis());
shutdown();
release();
}

@Override
public void shutdown() {
public void release() throws SQLException {
closeReading();
logger.debug("read connection closed");
readConnection = null;
Expand All @@ -370,6 +370,11 @@ public void shutdown() {
writeConnection = null;
}

@Override
public void shutdown() throws SQLException {
release();
}

/**
* Execute SQL query command without parameter binding.
*
Expand Down
Expand Up @@ -153,4 +153,8 @@ public RiverContext release() {
return this;
}

@Override
public RiverContext shutdown() {
return this;
}
}
Expand Up @@ -118,6 +118,10 @@ public String getId() {
public void flush() throws IOException {
}

@Override
public void release() throws IOException {
}

@Override
public void shutdown() throws IOException {
}
Expand Down
Expand Up @@ -198,6 +198,10 @@ public void suspend() throws Exception {
public void resume() throws Exception {
}

@Override
public void release() throws IOException {
}

@Override
public void shutdown() throws IOException {
}
Expand Down

0 comments on commit 318e95e

Please sign in to comment.