Skip to content

Commit

Permalink
Realtime: Delay firehose connection until job is started.
Browse files Browse the repository at this point in the history
Some firehoses (like the Kafka firehose) acquire input resources when they
connect, so it helps to delay this until after plumber.startJob() runs.
  • Loading branch information
gianm committed May 4, 2015
1 parent 8eb441e commit e69d82a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,6 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception

boolean normalExit = true;

// Set up firehose
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
final Firehose firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());

// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
// stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced.

Expand Down Expand Up @@ -280,12 +276,19 @@ public String getVersion(final Interval interval)

this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());

// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
Firehose firehose = null;

try {
plumber.startJob();

// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);

// Set up firehose
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());

// Time to read data!
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
Expand Down Expand Up @@ -338,6 +341,7 @@ public String getVersion(final Interval interval)
log.makeAlert(e, "Failed to finish realtime task").emit();
}
finally {
// firehose will be non-null since normalExit is true
CloseQuietly.close(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
Expand Down
64 changes: 32 additions & 32 deletions server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package io.druid.segment.realtime;


import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
Expand All @@ -33,7 +31,6 @@
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Processing;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
Expand All @@ -58,7 +55,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
*/
Expand All @@ -68,24 +64,20 @@ public class RealtimeManager implements QuerySegmentWalker

private final List<FireDepartment> fireDepartments;
private final QueryRunnerFactoryConglomerate conglomerate;
private ExecutorService executorService;

/**
* key=data source name,value=FireChiefs of all partition of that data source
*/
private final Map<String, List<FireChief>> chiefs;


@Inject
public RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject @Processing ExecutorService executorService
QueryRunnerFactoryConglomerate conglomerate
)
{
this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate;
this.executorService = executorService;

this.chiefs = Maps.newHashMap();
}
Expand All @@ -112,7 +104,6 @@ public void start() throws IOException
)
);
chief.setDaemon(true);
chief.init();
chief.start();
}
}
Expand Down Expand Up @@ -188,8 +179,8 @@ private class FireChief extends Thread implements Closeable
{
private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics;
private final RealtimeTuningConfig config;

private volatile RealtimeTuningConfig config = null;
private volatile Firehose firehose = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;
Expand All @@ -199,26 +190,42 @@ public FireChief(
)
{
this.fireDepartment = fireDepartment;

this.config = fireDepartment.getTuningConfig();
this.metrics = fireDepartment.getMetrics();
}

public void init() throws IOException
public Firehose initFirehose()
{
config = fireDepartment.getTuningConfig();
synchronized (this) {
if (firehose == null) {
try {
log.info("Calling the FireDepartment and getting a Firehose.");
firehose = fireDepartment.connect();
log.info("Firehose acquired!");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("Firehose already connected, skipping initFirehose().");
}

return firehose;
}
}

public Plumber initPlumber()
{
synchronized (this) {
try {
log.info("Calling the FireDepartment and getting a Firehose.");
firehose = fireDepartment.connect();
log.info("Firehose acquired!");
if (plumber == null) {
log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber();
log.info("We have our plumber!");
} else {
log.warn("Plumber already trained, skipping initPlumber().");
}
catch (IOException e) {
throw Throwables.propagate(e);
}

return plumber;
}
}

Expand All @@ -230,13 +237,15 @@ public FireDepartmentMetrics getMetrics()
@Override
public void run()
{
verifyState();

plumber = initPlumber();
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();

try {
plumber.startJob();

// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
firehose = initFirehose();

long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
InputRow inputRow = null;
Expand Down Expand Up @@ -310,15 +319,6 @@ public void run()
}
}

private void verifyState()
{
Preconditions.checkNotNull(config, "config is null, init() must be called first.");
Preconditions.checkNotNull(firehose, "firehose is null, init() must be called first.");
Preconditions.checkNotNull(plumber, "plumber is null, init() must be called first.");

log.info("FireChief[%s] state ok.", fireDepartment.getDataSchema().getDataSource());
}

public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public Plumber findPlumber(
tuningConfig
)
),
null,
null
);
}
Expand Down

0 comments on commit e69d82a

Please sign in to comment.