Skip to content
This repository has been archived by the owner on May 22, 2024. It is now read-only.

Commit

Permalink
#963 - implemented shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
g4s8 committed Aug 4, 2018
1 parent 7d466a7 commit 33892d6
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 49 deletions.
42 changes: 40 additions & 2 deletions src/main/java/com/zerocracy/claims/ClaimsRoutine.java
Expand Up @@ -26,6 +26,7 @@
import com.jcabi.xml.XMLDocument;
import com.zerocracy.Farm;
import com.zerocracy.entry.ExtSqs;
import com.zerocracy.shutdown.ShutdownFarm;
import java.io.Closeable;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -90,10 +91,12 @@ public ClaimsRoutine(final Farm farm, final Proc<List<Message>> proc) {

/**
* Start routine.
*
* @param shutdown Shutdown hook
*/
public void start() {
public void start(final ShutdownFarm.Hook shutdown) {
this.service.scheduleWithFixedDelay(
new VerboseRunnable(this),
new VerboseRunnable(new ShutdownRunnable(this, shutdown)),
0L,
ClaimsRoutine.DELAY,
TimeUnit.SECONDS
Expand Down Expand Up @@ -148,4 +151,39 @@ public void run() {
public void close() {
this.service.shutdown();
}

/**
* Runnable decorator with shutdown hook check.
*/
private static final class ShutdownRunnable implements Runnable {

/**
* Origin runnable.
*/
private final Runnable origin;

/**
* Shutdown hook.
*/
private final ShutdownFarm.Hook shutdown;

/**
* Ctor.
*
* @param origin Origin runnable
* @param shutdown Shutdown hook
*/
ShutdownRunnable(final Runnable origin,
final ShutdownFarm.Hook shutdown) {
this.origin = origin;
this.shutdown = shutdown;
}

@Override
public void run() {
if (!this.shutdown.isStopping()) {
this.origin.run();
}
}
}
}
45 changes: 35 additions & 10 deletions src/main/java/com/zerocracy/claims/proc/AsyncProc.java
Expand Up @@ -20,9 +20,11 @@
import com.jcabi.log.Logger;
import com.jcabi.log.VerboseCallable;
import com.jcabi.log.VerboseThreads;
import com.zerocracy.shutdown.ShutdownFarm;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.cactoos.Proc;
import org.cactoos.scalar.And;

Expand All @@ -43,46 +45,69 @@ public final class AsyncProc implements Proc<List<Message>> {
*/
private final Proc<Message> origin;

/**
* Shutdown hook.
*/
private final ShutdownFarm.Hook shutdown;

/**
* Counter.
*/
private final AtomicInteger count;

/**
* Ctor.
*
* @param origin Origin proc
* @param shutdown Shutdown hook
*/
public AsyncProc(final Proc<Message> origin) {
this(Runtime.getRuntime().availableProcessors(), origin);
public AsyncProc(final Proc<Message> origin,
final ShutdownFarm.Hook shutdown) {
this(Runtime.getRuntime().availableProcessors(), origin, shutdown);
}

/**
* Ctor.
*
* @param threads Threads
* @param origin Origin proc
* @param shutdown Shutdown hook
*/
public AsyncProc(final int threads, final Proc<Message> origin) {
public AsyncProc(final int threads, final Proc<Message> origin,
final ShutdownFarm.Hook shutdown) {
this.service = Executors.newFixedThreadPool(
threads, new VerboseThreads(AsyncProc.class)
);
this.origin = origin;
this.shutdown = shutdown;
this.count = new AtomicInteger();
}

@Override
public void exec(final List<Message> input) {
this.service.submit(
new VerboseCallable<>(
() -> {
Logger.info(
this, "Processing %d messages",
input.size()
);
new And(this.origin, input).value();
try {
Logger.info(
this, "Processing %d messages",
input.size()
);
new And(this.origin, input).value();
} finally {
if (this.count.decrementAndGet() == 0
&& this.shutdown.isStopping()) {
this.shutdown.complete();
}
}
return null;
},
true, true
)
);
Logger.info(
this, "Submitted %d messages",
input.size()
this, "Submitted %d messages (count=%d)",
input.size(), this.count.incrementAndGet()
);
}
}
17 changes: 12 additions & 5 deletions src/main/java/com/zerocracy/entry/Main.java
Expand Up @@ -35,6 +35,7 @@
import com.zerocracy.radars.slack.SlackRadar;
import com.zerocracy.radars.slack.TkSlack;
import com.zerocracy.radars.viber.TkViber;
import com.zerocracy.shutdown.ShutdownFarm;
import com.zerocracy.tk.TkAlias;
import com.zerocracy.tk.TkApp;
import io.sentry.Sentry;
Expand All @@ -52,6 +53,7 @@
* Main entry point.
* @since 1.0
* @checkstyle ClassDataAbstractionCouplingCheck (500 lines)
* @checkstyle ClassFanOutComplexityCheck (500 lines)
*/
@SuppressWarnings("PMD.ExcessiveImports")
public final class Main {
Expand Down Expand Up @@ -115,10 +117,14 @@ public void exec() throws IOException {
);
}
Logger.info(this, "Farm is ready to start");
final ShutdownFarm.Hook shutdown = new ShutdownFarm.Hook();
try (
final Farm farm = new SmartFarm(
new S3Farm(new ExtBucket().value(), temp)
).value();
final Farm farm = new ShutdownFarm(
new SmartFarm(
new S3Farm(new ExtBucket().value(), temp)
).value(),
shutdown
);
final SlackRadar radar = new SlackRadar(farm);
final ClaimsRoutine claims = new ClaimsRoutine(
farm,
Expand All @@ -131,12 +137,13 @@ public void exec() throws IOException {
new BrigadeProc(farm)
)
)
)
),
shutdown
)
)
) {
new ExtMongobee(farm).apply();
claims.start();
claims.start(shutdown);
new AsyncFunc<>(
input -> {
new ExtTelegram(farm).value();
Expand Down
11 changes: 0 additions & 11 deletions src/main/java/com/zerocracy/farm/sync/SyncFarm.java
Expand Up @@ -130,20 +130,9 @@ public Iterable<Project> find(final String query) throws IOException {
@Override
public void close() throws IOException {
try {
this.freeze();
this.terminator.close();
} finally {
this.origin.close();
}
}

/**
* Freeze the farm. It will lock all projects so they will stop
* returning new items.
* @throws IOException If already called
* @checkstyle NonStaticMethodCheck (5 lines)
*/
private void freeze() throws IOException {
// not implemented, see SyncFarmTests
}
}
150 changes: 150 additions & 0 deletions src/main/java/com/zerocracy/shutdown/ShutdownFarm.java
@@ -0,0 +1,150 @@
/*
* Copyright (c) 2016-2018 Zerocracy
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to read
* the Software only. Permissions is hereby NOT GRANTED to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell copies of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.zerocracy.shutdown;

import com.zerocracy.Farm;
import com.zerocracy.Project;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* Farm which can shutdown the App.
*
* @since 1.0
*/
public final class ShutdownFarm implements Farm {

/**
* Origin farm.
*/
private final Farm origin;

/**
* Shutdown hook.
*/
private final ShutdownFarm.Hook hook;

/**
* Ctor.
*
* @param origin Origin farm
* @param hook Shutdown hook
*/
public ShutdownFarm(final Farm origin, final ShutdownFarm.Hook hook) {
this.origin = origin;
this.hook = hook;
}

@Override
public Iterable<Project> find(final String xpath) throws IOException {
return this.origin.find(xpath);
}

@Override
public void close() throws IOException {
try {
this.hook.shutdown();
} finally {
this.origin.close();
}
}

/**
* Shutdown hook.
*/
public static final class Hook {

/**
* Initial state.
*/
private static final String NONE = "none";

/**
* Shutdown in progress.
*/
private static final String STOPPING = "stopping";

/**
* Shutdown completed.
*/
private static final String STOPPED = "stopped";

/**
* Current state.
*/
private final AtomicReference<String> state;

/**
* Default ctor.
*/
public Hook() {
this(new AtomicReference<>(ShutdownFarm.Hook.NONE));
}

/**
* Primary ctor.
*
* @param state State
*/
Hook(final AtomicReference<String> state) {
this.state = state;
}

/**
* Request shutdown.
*/
public void shutdown() {
if (!this.state.compareAndSet(
ShutdownFarm.Hook.NONE, ShutdownFarm.Hook.STOPPING
)) {
throw new IllegalStateException(
String.format("Cant stop when %s", this.state.get())
);
}
while (!ShutdownFarm.Hook.STOPPED.equals(this.state.get())) {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
break;
}
}
}

/**
* Check shutdown in progress.
*
* @return TRUE if in progress
*/
public boolean isStopping() {
return !this.state.get().equals(ShutdownFarm.Hook.NONE);
}

/**
* Complete shutdown.
*/
public void complete() {
this.state.set(ShutdownFarm.Hook.STOPPED);
}

@Override
public String toString() {
return this.state.get();
}
}
}

0 comments on commit 33892d6

Please sign in to comment.