diff --git a/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/AbstractProcessBuilderTasklet.java b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/AbstractProcessBuilderTasklet.java index 985404bbd..e7f46e3a8 100644 --- a/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/AbstractProcessBuilderTasklet.java +++ b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/AbstractProcessBuilderTasklet.java @@ -61,8 +61,6 @@ public abstract class AbstractProcessBuilderTasklet implements Tasklet, Environm protected final Logger logger = LoggerFactory.getLogger(this.getClass()); - private static final String XD_CONFIG_HOME = "xd.config.home"; - protected ConfigurableEnvironment environment; /** @@ -84,11 +82,17 @@ public abstract class AbstractProcessBuilderTasklet implements Tasklet, Environm private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper(); + private List environmentProviders = new ArrayList(); + @Override public void setEnvironment(Environment environment) { this.environment = (ConfigurableEnvironment) environment; } + public void addEnvironmentProvider(EnvironmentProvider environmentProvider) { + this.environmentProviders.add(environmentProvider); + } + @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { @@ -102,8 +106,6 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon List command = createCommand(); - String commandClassPath = createClassPath(this.getClass()); - File out = File.createTempFile(commandName + "-", ".out"); stepExecution.getExecutionContext().putString(commandName.toLowerCase() + ".system.out", out.getAbsolutePath()); logger.info(commandName + " system.out: " + out.getAbsolutePath()); @@ -111,7 +113,9 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon stepExecution.getExecutionContext().putString(commandName.toLowerCase() + ".system.err", err.getAbsolutePath()); ProcessBuilder pb = new ProcessBuilder(command).redirectOutput(out).redirectError(err); Map env = pb.environment(); - env.put("CLASSPATH", commandClassPath); + for (EnvironmentProvider envProvider : environmentProviders) { + envProvider.setEnvironment(env); + } String msg = commandDescription + " is being launched"; stepExecution.getExecutionContext().putString(commandName.toLowerCase() + ".command", commandDisplayString.trim()); List commandOut = new ArrayList(); @@ -243,7 +247,7 @@ public ExitStatus afterStep(StepExecution stepExecution) { protected abstract boolean isStoppable(); - protected abstract List createCommand(); + protected abstract List createCommand() throws Exception; protected abstract String getCommandDisplayString(); @@ -251,50 +255,6 @@ public ExitStatus afterStep(StepExecution stepExecution) { protected abstract String getCommandDescription(); - protected String createClassPath(Class taskletClass) { - URLClassLoader serverClassLoader; - URLClassLoader taskletClassLoader; - try { - serverClassLoader = (URLClassLoader) Class.forName("org.springframework.xd.dirt.core.Job").getClassLoader(); - taskletClassLoader = (URLClassLoader) taskletClass.getClassLoader(); - } - catch (Exception e) { - throw new IllegalStateException("Unable to determine classpath from ClassLoader.", e); - } - if (serverClassLoader == null) { - throw new IllegalStateException("Unable to access ClassLoader for " + taskletClass + "."); - } - if (taskletClassLoader == null) { - throw new IllegalStateException("Unable to access Context ClassLoader."); - } - List classPath = new ArrayList(); - String configHome = environment.getProperty(XD_CONFIG_HOME); - if (StringUtils.hasText(configHome)) { - classPath.add(configHome); - } - for (URL url : serverClassLoader.getURLs()) { - String file = url.getFile().split("\\!/", 2)[0]; - if (file.endsWith(".jar")) { - classPath.add(file); - } - } - for (URL url : taskletClassLoader.getURLs()) { - String file = url.getFile().split("\\!/", 2)[0]; - if (file.endsWith(".jar") && !classPath.contains(file)) { - classPath.add(file); - } - } - StringBuilder classPathBuilder = new StringBuilder(); - String separator = System.getProperty("path.separator"); - for (String url : classPath) { - if (classPathBuilder.length() > 0) { - classPathBuilder.append(separator); - } - classPathBuilder.append(url); - } - return classPathBuilder.toString(); - } - protected List getProcessOutput(File f) { List lines = new ArrayList(); if (f == null) { diff --git a/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/ClasspathEnvironmentProvider.java b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/ClasspathEnvironmentProvider.java new file mode 100644 index 000000000..f3064a28a --- /dev/null +++ b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/ClasspathEnvironmentProvider.java @@ -0,0 +1,94 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.step.tasklet.x; + +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.util.StringUtils; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + */ +public class ClasspathEnvironmentProvider implements EnvironmentProvider { + + private static final String XD_CONFIG_HOME = "xd.config.home"; + + ConfigurableEnvironment environment; + + Class taskletClass; + + + public ClasspathEnvironmentProvider(ConfigurableEnvironment environment, Class taskletClass) { + this.environment = environment; + this.taskletClass = taskletClass; + } + + @Override + public void setEnvironment(Map env) { + String classPath = createClassPath(); + env.put("CLASSPATH", classPath); + } + + protected String createClassPath() { + URLClassLoader serverClassLoader; + URLClassLoader taskletClassLoader; + try { + serverClassLoader = (URLClassLoader) Class.forName("org.springframework.xd.dirt.core.Job").getClassLoader(); + taskletClassLoader = (URLClassLoader) taskletClass.getClassLoader(); + } + catch (Exception e) { + throw new IllegalStateException("Unable to determine classpath from ClassLoader.", e); + } + if (serverClassLoader == null) { + throw new IllegalStateException("Unable to access ClassLoader for " + taskletClass + "."); + } + if (taskletClassLoader == null) { + throw new IllegalStateException("Unable to access Context ClassLoader."); + } + List classPath = new ArrayList(); + String configHome = environment.getProperty(XD_CONFIG_HOME); + if (StringUtils.hasText(configHome)) { + classPath.add(configHome); + } + for (URL url : serverClassLoader.getURLs()) { + String file = url.getFile().split("\\!/", 2)[0]; + if (file.endsWith(".jar")) { + classPath.add(file); + } + } + for (URL url : taskletClassLoader.getURLs()) { + String file = url.getFile().split("\\!/", 2)[0]; + if (file.endsWith(".jar") && !classPath.contains(file)) { + classPath.add(file); + } + } + StringBuilder classPathBuilder = new StringBuilder(); + String separator = System.getProperty("path.separator"); + for (String url : classPath) { + if (classPathBuilder.length() > 0) { + classPathBuilder.append(separator); + } + classPathBuilder.append(url); + } + return classPathBuilder.toString(); + } + +} diff --git a/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/EnvironmentProvider.java b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/EnvironmentProvider.java new file mode 100644 index 000000000..ff6ff5e36 --- /dev/null +++ b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/step/tasklet/x/EnvironmentProvider.java @@ -0,0 +1,27 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.step.tasklet.x; + +import java.util.Map; + +/** + */ +public interface EnvironmentProvider { + + void setEnvironment(Map env); + +} diff --git a/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadEnvironmentProvider.java b/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadEnvironmentProvider.java new file mode 100644 index 000000000..242505d14 --- /dev/null +++ b/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadEnvironmentProvider.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.gpload; + +import org.springframework.batch.step.tasklet.x.EnvironmentProvider; + +import java.util.Map; + +/** + */ +public class GploadEnvironmentProvider implements EnvironmentProvider { + + String gploadHome; + + public GploadEnvironmentProvider(String gploadHome) { + this.gploadHome = gploadHome; + } + + @Override + public void setEnvironment(Map env) { + env.put("GPHOME_LOADERS", gploadHome); + } +} diff --git a/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadOptionsMetadata.java b/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadOptionsMetadata.java new file mode 100644 index 000000000..83332ffad --- /dev/null +++ b/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadOptionsMetadata.java @@ -0,0 +1,64 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.gpload; + +import org.hibernate.validator.constraints.NotBlank; +import org.springframework.xd.module.options.spi.ModuleOption; + +/** + * Module options for Gpload batch job module. + * + * @author Thomas Risberg + */ +public class GploadOptionsMetadata { + + private String gploadHome; + + private String controlFile; + + private String options; + + + @NotBlank + public String getGploadHome() { + return gploadHome; + } + + @ModuleOption("the gpload home location") + public void setGploadHome(String gploadHome) { + this.gploadHome = gploadHome; + } + + public String getControlFile() { + return controlFile; + } + + @ModuleOption("path to the gpload control file") + public void setControlFile(String controlFile) { + this.controlFile = controlFile; + } + + public String getOptions() { + return options; + } + + @ModuleOption("the gpload options to use") + public void setOptions(String options) { + this.options = options; + } + +} diff --git a/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadTasklet.java b/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadTasklet.java new file mode 100644 index 000000000..03f3b025c --- /dev/null +++ b/extensions/spring-xd-extension-gpload/src/main/java/org/springframework/xd/gpload/GploadTasklet.java @@ -0,0 +1,207 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.gpload; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecutionException; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.step.tasklet.x.AbstractProcessBuilderTasklet; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.StringUtils; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Tasklet used for running gpload tool. + * + * Note: This this class is not thread-safe. + * + * @since 1.2 + * @author Thomas Risberg + */ +public class GploadTasklet extends AbstractProcessBuilderTasklet implements InitializingBean { + + private static final String GPLOAD_COMMAND = "gpload"; + private static final String CONTROL_FILE_NODE_GPLOAD = "GPLOAD"; + private static final String CONTROL_FILE_NODE_INPUT = "INPUT"; + private static final String CONTROL_FILE_NODE_SOURCE = "SOURCE"; + private static final String CONTROL_FILE_NODE_FILE = "FILE"; + + private String gploadHome; + + private String controlFile; + + private String options; + + private String inputSourceFile; + + private String controlFileToUse; + + + public String getGploadHome() { + return gploadHome; + } + + public void setGploadHome(String gploadHome) { + this.gploadHome = gploadHome; + } + + public String getControlFile() { + return controlFile; + } + + public void setControlFile(String controlFile) { + this.controlFile = controlFile; + } + + public String getOptions() { + return options; + } + + public void setOptions(String options) { + this.options = options; + } + + public String getInputSourceFile() { + return inputSourceFile; + } + + public void setInputSourceFile(String inputSourceFile) { + this.inputSourceFile = inputSourceFile; + } + + @Override + protected boolean isStoppable() { + return false; + } + + @Override + protected List createCommand() throws Exception { + List command = new ArrayList(); + command.add(getCommandString()); + if (StringUtils.hasText(controlFile)) { + if (StringUtils.hasText(inputSourceFile)) { + try { + controlFileToUse = createControlFile(inputSourceFile, controlFile); + } catch (IOException e) { + throw new JobExecutionException("Error while creating control file", e); + } + } + else { + controlFileToUse = controlFile; + } + command.add("-f"); + command.add(controlFileToUse); + } + if (StringUtils.hasText(options)) { + command.add(options); + } + return command; + } + + @Override + protected String getCommandDisplayString() { + return getCommandString(); + } + + @Override + protected String getCommandName() { + return "gpload"; + } + + @Override + protected String getCommandDescription() { + return "gpload testing"; + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + stepExecution.getExecutionContext().put(getCommandName() + ".controlFile", controlFileToUse); + return super.afterStep(stepExecution); + } + + @Override + public void afterPropertiesSet() throws Exception { + if (!StringUtils.hasText(gploadHome)) { + throw new IllegalArgumentException("Missing gploadHome property, it is mandatory for running gpload command"); + } + addEnvironmentProvider(new GploadEnvironmentProvider(gploadHome)); + } + + private String getCommandString() { + return gploadHome + "/bin/" + GPLOAD_COMMAND; + } + + private String createControlFile(String inputSourceFile, String controlFile) throws IOException { + InputStream in = new FileInputStream(new File(controlFile)); + Yaml yaml = new Yaml(); + Object data = yaml.load(in); + in.close(); + replaceInputSourceFile(data, inputSourceFile); + String output = yaml.dump(data); + File out = File.createTempFile("gpload-", ".yml"); + String outFile = out.getAbsolutePath(); + FileWriter f = new FileWriter(out); + f.write(output); + f.close(); + return outFile; + } + + protected static void replaceInputSourceFile(Object data, String inputSourceFile) { + if (data != null && data instanceof Map) { + Object gpload = ((Map)data).get(CONTROL_FILE_NODE_GPLOAD); + if (gpload != null && gpload instanceof Map) { + Object input = ((Map)gpload).get(CONTROL_FILE_NODE_INPUT); + if (input == null) { + input = new ArrayList(); + ((Map)gpload).put(CONTROL_FILE_NODE_INPUT, input); + } + if (input instanceof List) { + boolean hasSource = false; + for (Object o : (List)input) { + if (o instanceof Map && ((Map)o).containsKey(CONTROL_FILE_NODE_SOURCE)) { + hasSource = true; + } + } + if (!hasSource) { + Map tmp = new LinkedHashMap<>(); + tmp.put(CONTROL_FILE_NODE_SOURCE, new LinkedHashMap()); + ((List)input).add(tmp); + } + for (Object o : (List)input) { + if (o instanceof Map && ((Map)o).containsKey(CONTROL_FILE_NODE_SOURCE)) { + Object source = ((Map)o).get(CONTROL_FILE_NODE_SOURCE); + if (source != null && source instanceof Map) { + ((Map)source).put(CONTROL_FILE_NODE_FILE, Arrays.asList(inputSourceFile)); + } + } + } + } + } + } + } +} diff --git a/extensions/spring-xd-extension-sqoop/src/main/java/org/springframework/xd/sqoop/SqoopTasklet.java b/extensions/spring-xd-extension-sqoop/src/main/java/org/springframework/xd/sqoop/SqoopTasklet.java index 15725c7d2..bbfeedf0e 100644 --- a/extensions/spring-xd-extension-sqoop/src/main/java/org/springframework/xd/sqoop/SqoopTasklet.java +++ b/extensions/spring-xd-extension-sqoop/src/main/java/org/springframework/xd/sqoop/SqoopTasklet.java @@ -16,6 +16,7 @@ package org.springframework.xd.sqoop; +import org.springframework.batch.step.tasklet.x.ClasspathEnvironmentProvider; import org.springframework.beans.factory.InitializingBean; import org.springframework.batch.step.tasklet.x.AbstractProcessBuilderTasklet; import org.springframework.core.env.EnumerablePropertySource; @@ -101,5 +102,6 @@ public void afterPropertiesSet() throws Exception { if (arguments == null || arguments.length < 1) { throw new IllegalArgumentException("Missing arguments and/or configuration options for Sqoop"); } + addEnvironmentProvider(new ClasspathEnvironmentProvider(environment, this.getClass())); } } diff --git a/gradle/build-extensions.gradle b/gradle/build-extensions.gradle index 4b41abad0..712a8d630 100644 --- a/gradle/build-extensions.gradle +++ b/gradle/build-extensions.gradle @@ -237,3 +237,14 @@ project('spring-xd-extension-sqoop') { provided "org.springframework.data:spring-data-hadoop-batch:${springDataHadoopBase}" } } + +project('spring-xd-extension-gpload') { + description = 'Spring XD GPLoad extension' + dependencies { + compile project(':spring-xd-module-spi') + compile project(':spring-xd-extension-batch') + compile project(':spring-xd-extension-jdbc') + provided "org.springframework.batch:spring-batch-core" + compile "org.yaml:snakeyaml" + } +} diff --git a/gradle/build-modules.gradle b/gradle/build-modules.gradle index a1f9834fc..c07e36487 100644 --- a/gradle/build-modules.gradle +++ b/gradle/build-modules.gradle @@ -398,3 +398,9 @@ project('modules.job.sqoop') { runtime(project(":spring-xd-extension-sqoop")) } } + +project('modules.job.gpload') { + dependencies { + runtime(project(":spring-xd-extension-gpload")) + } +} diff --git a/modules/job/gpload/config/gpload.properties b/modules/job/gpload/config/gpload.properties new file mode 100644 index 000000000..6221b014a --- /dev/null +++ b/modules/job/gpload/config/gpload.properties @@ -0,0 +1 @@ +options_class=org.springframework.xd.gpload.GploadOptionsMetadata diff --git a/modules/job/gpload/config/gpload.xml b/modules/job/gpload/config/gpload.xml new file mode 100644 index 000000000..c1ded2158 --- /dev/null +++ b/modules/job/gpload/config/gpload.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + +