Skip to content

Commit

Permalink
[GOBBLIN-1052] Create a spec consumer path if it does not exist in FS …
Browse files Browse the repository at this point in the history
Closes apache#2892 from sv2000/specConsumer
  • Loading branch information
sv2000 authored and jhsenjaliya committed Mar 24, 2020
1 parent 4fdb863 commit 75cf21b
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 66 deletions.
Expand Up @@ -17,15 +17,14 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;

import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
Expand All @@ -38,7 +37,6 @@
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;

Expand All @@ -55,88 +53,73 @@ public class FsJobConfigurationManager extends JobConfigurationManager {
private static final long DEFAULT_JOB_SPEC_REFRESH_INTERVAL = 60;

private final long refreshIntervalInSeconds;

private final ScheduledExecutorService fetchJobSpecExecutor;
private final Optional<MutableJobCatalog> jobCatalogOptional;
private final SpecConsumer specConsumer;

protected final SpecConsumer _specConsumer;

private final ClassAliasResolver<SpecConsumer> aliasResolver;

private final Optional<MutableJobCatalog> _jobCatalogOptional;

public FsJobConfigurationManager(EventBus eventBus, Config config) {
this(eventBus, config, null);
public FsJobConfigurationManager(EventBus eventBus, Config config, FileSystem fs) {
this(eventBus, config, null, fs);
}

public FsJobConfigurationManager(EventBus eventBus, Config config, MutableJobCatalog jobCatalog) {
public FsJobConfigurationManager(EventBus eventBus, Config config, MutableJobCatalog jobCatalog, FileSystem fs) {
super(eventBus, config);
this._jobCatalogOptional = jobCatalog != null ? Optional.of(jobCatalog) : Optional.absent();
this.jobCatalogOptional = jobCatalog != null ? Optional.of(jobCatalog) : Optional.absent();
this.refreshIntervalInSeconds = ConfigUtils.getLong(config, GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL,
DEFAULT_JOB_SPEC_REFRESH_INTERVAL);

this.fetchJobSpecExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchJobSpecExecutor")));

this.aliasResolver = new ClassAliasResolver<>(SpecConsumer.class);
try {
String specConsumerClassName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY,
GobblinClusterConfigurationKeys.DEFAULT_SPEC_CONSUMER_CLASS);
log.info("Using SpecConsumer class name/alias " + specConsumerClassName);
this._specConsumer = (SpecConsumer) ConstructorUtils
.invokeConstructor(Class.forName(this.aliasResolver.resolve(specConsumerClassName)), config);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
| ClassNotFoundException e) {
throw new RuntimeException(e);
}
this.specConsumer = new FsSpecConsumer(fs, config);
}

protected void startUp() throws Exception{
protected void startUp() throws Exception {
super.startUp();
// Schedule the job config fetch task
this.fetchJobSpecExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
fetchJobSpecs();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to fetch job specs", e);
throw new RuntimeException("Failed to fetch specs", e);
} catch (Exception e) {
//Log error and swallow exception to allow executor service to continue scheduling the thread
log.error("Failed to fetch job specs due to: ", e);
}
}
}, 0, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
}

void fetchJobSpecs() throws ExecutionException, InterruptedException {
List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
(List<Pair<SpecExecutor.Verb, JobSpec>>) this._specConsumer.changedSpecs().get();
(List<Pair<SpecExecutor.Verb, JobSpec>>) this.specConsumer.changedSpecs().get();

log.info("Fetched {} job specs", jobSpecs.size());
for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
JobSpec jobSpec = entry.getValue();
SpecExecutor.Verb verb = entry.getKey();
if (verb.equals(SpecExecutor.Verb.ADD)) {
// Handle addition
if (this._jobCatalogOptional.isPresent()) {
this._jobCatalogOptional.get().put(jobSpec);
if (this.jobCatalogOptional.isPresent()) {
this.jobCatalogOptional.get().put(jobSpec);
}
postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
} else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
//Handle update.
if (this._jobCatalogOptional.isPresent()) {
this._jobCatalogOptional.get().put(jobSpec);
if (this.jobCatalogOptional.isPresent()) {
this.jobCatalogOptional.get().put(jobSpec);
}
postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
} else if (verb.equals(SpecExecutor.Verb.DELETE)) {
// Handle delete
if (this._jobCatalogOptional.isPresent()) {
this._jobCatalogOptional.get().remove(jobSpec.getUri());
if (this.jobCatalogOptional.isPresent()) {
this.jobCatalogOptional.get().remove(jobSpec.getUri());
}
postDeleteJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
}

try {
//Acknowledge the successful consumption of the JobSpec back to the SpecConsumer, so that the
//SpecConsumer can delete the JobSpec.
this._specConsumer.commit(jobSpec);
this.specConsumer.commit(jobSpec);
} catch (IOException e) {
log.error("Error when committing to FsSpecConsumer: ", e);
}
Expand Down
Expand Up @@ -157,7 +157,7 @@ public class GobblinClusterConfigurationKeys {
public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;

public static final String HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflowListingTimeoutSeconds";
public static final long DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = 300;
public static final long DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = 60;

public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX + "bootup.clean.dist.jobs";
public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false;
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -398,16 +397,15 @@ private JobConfigurationManager buildJobConfigurationManager(Config config) {

private JobConfigurationManager create(Config config) {
try {
List<Object> argumentList = (this.jobCatalog != null)? ImmutableList.of(this.eventBus, config, this.jobCatalog) :
ImmutableList.of(this.eventBus, config);
List<Object> argumentList = (this.jobCatalog != null)? ImmutableList.of(this.eventBus, config, this.jobCatalog, this.fs) :
ImmutableList.of(this.eventBus, config, this.fs);
if (config.hasPath(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)) {
return (JobConfigurationManager) GobblinConstructorUtils.invokeFirstConstructor(Class.forName(
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)), argumentList);
return (JobConfigurationManager) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)), argumentList.toArray(new Object[argumentList.size()]));
} else {
return new JobConfigurationManager(this.eventBus, config);
}
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
Expand Down
Expand Up @@ -377,7 +377,7 @@ private void cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
Collections.singletonList(deleteJobArrival.getJobName()));
Retryer<Map<String, String>> retryer = RetryerBuilder.<Map<String, String>>newBuilder()
.retryIfException()
.withStopStrategy(StopStrategies.stopAfterAttempt(1))
.withStopStrategy(StopStrategies.stopAfterAttempt(5))
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.helixWorkflowListingTimeoutMillis, TimeUnit.MILLISECONDS)).build();
Map<String, String> jobNameToWorkflowIdMap;
try {
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
Expand Down Expand Up @@ -277,7 +278,7 @@ private static void deleteStoppedHelixJob(HelixManager helixManager, String work
}

/**
* Returns the Helix Workflow Ids given {@link Iterable} of Gobblin job names. The method returns a
* Returns the currently running Helix Workflow Ids given an {@link Iterable} of Gobblin job names. The method returns a
* {@link java.util.Map} from Gobblin job name to the corresponding Helix Workflow Id. This method iterates
* over all Helix workflows, and obtains the jobs of each workflow from its jobDag.
*
Expand All @@ -293,6 +294,10 @@ public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixM
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
for (String workflow : workflowConfigMap.keySet()) {
WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
continue;
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
for (String helixJob : helixJobs) {
Iterator<TaskConfig> taskConfigIterator = taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
Expand Down
Expand Up @@ -32,6 +32,7 @@

import com.google.common.eventbus.EventBus;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
Expand All @@ -50,6 +51,8 @@
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.util.filters.HiddenFilter;


@Slf4j
public class FsJobConfigurationManagerTest {
Expand Down Expand Up @@ -96,16 +99,15 @@ public void setUp() throws IOException {

Config config = ConfigFactory.empty()
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir))
.withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
.withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString))
.withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef(1));

this._jobCatalog = new NonObservingFSJobCatalog(config);
((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();

jobConfigurationManager = new FsJobConfigurationManager(eventBus, config, this._jobCatalog);
jobConfigurationManager = new FsJobConfigurationManager(eventBus, config, this._jobCatalog, this.fs);

_specProducer = new FsSpecProducer(config);
_specProducer = new FsSpecProducer(this.fs, config);
}

private void addJobSpec(String jobSpecName, String version, String verb)
Expand Down Expand Up @@ -149,7 +151,7 @@ public void testFetchJobSpecs() throws ExecutionException, InterruptedException,

//Ensure the JobSpec is deleted from the FsSpecConsumer path.
Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath, new HiddenFilter()).length, 0);

//Ensure NewJobConfigArrivalEvent is posted to EventBus
Assert.assertEquals(newJobConfigArrivalEventCount, 1);
Expand All @@ -166,7 +168,7 @@ public void testFetchJobSpecs() throws ExecutionException, InterruptedException,
Assert.assertTrue(jobSpec.getVersion().equals(version2));

//Ensure the updated JobSpec is deleted from the FsSpecConsumer path.
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath, new HiddenFilter()).length, 0);

//Ensure UpdateJobConfigArrivalEvent is posted to EventBus
Assert.assertEquals(newJobConfigArrivalEventCount, 1);
Expand All @@ -179,7 +181,7 @@ public void testFetchJobSpecs() throws ExecutionException, InterruptedException,
this.jobConfigurationManager.fetchJobSpecs();

//Ensure the JobSpec is deleted from the FsSpecConsumer path.
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath, new HiddenFilter()).length, 0);
this._jobCatalog.getJobSpec(new URI(jobSpecUriString));

//Ensure DeleteJobConfigArrivalEvent is posted to EventBus
Expand All @@ -198,13 +200,14 @@ public void testException()

//Add wait to ensure that fetchJobSpecExecutor thread is scheduled at least once.
Thread.sleep(2000);
Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
int numInvocations = Mockito.mockingDetails(jobConfigurationManager).getInvocations().size();
Mockito.verify(jobConfigurationManager, Mockito.atLeast(1)).fetchJobSpecs();

Thread.sleep(2000);
//Verify that there are no new invocations of fetchJobSpecs()
Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
//Ensure that the JobConfigurationManager Service is not running.
Assert.assertFalse(jobConfigurationManager.isRunning());
//Verify that there are new invocations of fetchJobSpecs()
Mockito.verify(jobConfigurationManager, Mockito.atLeast(numInvocations + 1)).fetchJobSpecs();
//Ensure that the JobConfigurationManager Service is running.
Assert.assertTrue(!jobConfigurationManager.state().equals(Service.State.FAILED) && !jobConfigurationManager.state().equals(Service.State.TERMINATED));
}

@AfterClass
Expand Down
Expand Up @@ -24,6 +24,9 @@
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -52,7 +55,8 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite

public IntegrationJobRestartViaSpecSuite() throws IOException {
super();
this._specProducer = new FsSpecProducer(ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
FileSystem fs = FileSystem.getLocal(new Configuration());
this._specProducer = new FsSpecProducer(fs, ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
}

private Config getJobConfig() throws IOException {
Expand Down Expand Up @@ -80,7 +84,6 @@ public Config getManagerConfig() {
Config managerConfig = super.getManagerConfig();
managerConfig = managerConfig.withValue(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY,
ConfigValueFactory.fromAnyRef(FsJobConfigurationManager.class.getName()))
.withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
.withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef(1L))
.withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR));
return managerConfig;
Expand Down
Expand Up @@ -40,10 +40,13 @@

import com.typesafe.config.Config;

import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.filters.HiddenFilter;


@Slf4j
public class FsSpecConsumer implements SpecConsumer<Spec> {
Expand All @@ -56,9 +59,16 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {


public FsSpecConsumer(Config config) {
this(null, config);
}

public FsSpecConsumer(@Nullable FileSystem fs, Config config) {
this.specDirPath = new Path(config.getString(SPEC_PATH_KEY));
try {
this.fs = this.specDirPath.getFileSystem(new Configuration());
this.fs = (fs == null) ? FileSystem.get(new Configuration()) : fs;
if (!this.fs.exists(specDirPath)) {
this.fs.mkdirs(specDirPath);
}
} catch (IOException e) {
throw new RuntimeException("Unable to detect spec directory file system: " + e, e);
}
Expand All @@ -72,11 +82,12 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
FileStatus[] fileStatuses;
try {
fileStatuses = this.fs.listStatus(this.specDirPath);
fileStatuses = this.fs.listStatus(this.specDirPath, new HiddenFilter());
} catch (IOException e) {
log.error("Error when listing files at path: {}", this.specDirPath.toString(), e);
return null;
}
log.info("Found {} files at path {}", fileStatuses.length, this.specDirPath.toString());

//Sort the {@link JobSpec}s in increasing order of their modification times.
//This is done so that the {JobSpec}s can be handled in FIFO order by the
Expand Down Expand Up @@ -118,6 +129,7 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);

JobSpec jobSpec = jobSpecBuilder.build();
log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString());
specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
}
Expand All @@ -130,7 +142,10 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
public void commit(Spec spec) throws IOException {
Path path = this.specToPathMap.get(spec.getUri());
if (path != null) {
log.debug("Calling delete on path: {}", path.toString());
this.fs.delete(path, false);
} else {
log.error("No path found for job: {}", spec.getUri().toString());
}
}
}

0 comments on commit 75cf21b

Please sign in to comment.