Added support for AutoTunePigLiJob (#201)
* Added support for AutoTunePigLiJob
* Build failure fix: negative output directory removed
arpang authored and pranayyerra committed Mar 5, 2018
1 parent 6870d7e commit c0ff31d
Showing 18 changed files with 419 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CONTRIBUTORS.md
@@ -23,6 +23,9 @@ Vaughan.

### Contributors

The following were contributed by Arpan Agrawal. Thanks, Arpan!
* `Added AutoTunePigLiJob job type support.`

The following were contributed by Jun "Tony" Zhou. Thanks, Tony!
* `Added Python Spark support for runSparkJob task.`
* `Added support for submitting python Spark applications with Spark job type. Corresponding tests are added.`
4 changes: 4 additions & 0 deletions VERSIONS.md
@@ -17,6 +17,10 @@ the License.
Note that the LinkedIn build system occasionally requires that we skip a
version bump, so you will see a few skipped version numbers in the list below.

0.14.4

* Add AutoTunePigLiJob job type support

0.14.3

* Introduce YamlCompiler, YamlWorkflow, YamlJob, and YamlProject for Flow 2.0
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,2 +1,2 @@
org.gradle.daemon=true
version=0.14.3
version=0.14.4
164 changes: 162 additions & 2 deletions li-hadoop-plugin-test/build.gradle
@@ -17,8 +17,29 @@ task forceGradlew(type: Exec) {
task initTestCases() {
  dependsOn ":li-hadoop-plugin:build"
  dependsOn forceGradlew
  description = "Hadoop DSL task to prepare the test cases"
  group = "Hadoop DSL Tests"
  description = "Li Hadoop DSL task to prepare the test cases"
  group = "Li Hadoop DSL Tests"

  doFirst {
    file("${project.projectDir}/jobs").deleteDir()
    file("${project.projectDir}/jobs").mkdir()
    file("${project.projectDir}/output").deleteDir()
    file("${project.projectDir}/output").mkdir()
    file("${project.projectDir}/output/positive").mkdir()
  }
}

// Enumerate the test case files.
def testFiles = []

def testCases = project.fileTree([
  dir: "${project.projectDir}",
  include: "src/main/gradle/**/*.gradle"
])

testCases.each { File file ->
  String relPath = file.getAbsolutePath().replace("${project.projectDir}/src/main/gradle/", "")
  testFiles.add(relPath)
}

// Helper function to determine the location of the li-hadoop-plugin jars. LinkedIn internal builds
@@ -33,8 +54,147 @@ def buildPluginDir(Project project) {
  return project.hasProperty("overridePluginTestDir") ? project.overridePluginTestDir : project.pluginTestDir
}

// Get rid of messages about starting the Gradle daemon.
// Get rid of LinkedIn-specific messages about CIA instrumentation.
// Get rid of LinkedIn-specific messages about posting task-result start and end.
// Get rid of the trailing line "Total time: 4.544 secs" (the time varies) that appears at the end of the output.
def cleanBuildOutput(String buildOutput) {
  String[] linesToRemove = [
    "Starting a new Gradle Daemon for this build \\(subsequent builds will be faster\\).\n",
    "Will post to CIA for parent task uuid [a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}\n",
    "posting task-result-start for li-hadoop-plugin-test:test_[a-zA-Z0-9]*\n",
    "posting task-result-end [a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12} for li-hadoop-plugin-test:test_[a-zA-Z0-9]*\n",
    "Download (.*)\n",
    "cache artifact-at-repository.bin (.*)\n",
    "cache artifact-at-url.bin (.*)\n",
    "Starting a Gradle Daemon, (.*), use --status for details\n",
    "BUILD SUCCESSFUL in (.*)\n",
    "[0-9] actionable task: [0-9] executed\n"
  ] as String[]

  for (String lineToRemove : linesToRemove) {
    buildOutput = buildOutput.replaceFirst(lineToRemove, "")
  }

  int endIndex = buildOutput.lastIndexOf("Total time:")
  return (endIndex == -1) ? buildOutput : buildOutput.substring(0, endIndex)
}
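To illustrate the cleaning, a small sketch (the sample output below is hypothetical, but the daemon banner and timing line match the patterns stripped above):

// Illustration only: hypothetical captured output run through cleanBuildOutput().
String sample = "Starting a new Gradle Daemon for this build (subsequent builds will be faster).\n" +
    ":li-hadoop-plugin-test:test_workflows\n" +
    "Hadoop DSL static checker PASSED\n" +
    "Total time: 4.544 secs\n"

// The daemon banner is stripped by regex; everything from "Total time:" onward is dropped.
assert cleanBuildOutput(sample) == ":li-hadoop-plugin-test:test_workflows\nHadoop DSL static checker PASSED\n"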

// Determine the base file name for a test case file from its relative path name.
def findBaseFileName(String fileName) {
  return fileName.replace("positive/", "").replace("negative/", "").replace(".gradle", "")
}
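For example, a relative test case path reduces to its bare name:

assert findBaseFileName("positive/workflows.gradle") == "workflows"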

// Create a task for each test case.
testFiles.each { fileName ->
  String baseFileName = findBaseFileName(fileName)
  String pluginDir = buildPluginDir(project)

  project.tasks.create("test_${baseFileName}") {
    description = "Li Hadoop DSL test case for the file ${fileName}"
    group = "Li Hadoop DSL Tests"

    doFirst {
      logger.lifecycle("Running test for the file ${fileName}")
    }
    doLast {
      project.pluginTestDir = "${pluginDir}"
      apply from: "src/main/gradle/${fileName}"
      project.tasks["buildAzkabanFlows"].execute()
    }
  }
}

// Create a wrapper task that runs each test case and captures its output.
testFiles.each { fileName ->
  String baseFileName = findBaseFileName(fileName)
  String baseDirName = fileName.replace("${baseFileName}.gradle", "").replace("/", "")
  String outputFile = baseDirName.isEmpty() ? "${baseFileName}.out" : "${baseDirName}/${baseFileName}.out"

  // In LinkedIn internal builds, the build directory is in a different place, so pass its
  // location along to the test case task.
  String pluginDir = buildPluginDir(project)

  project.tasks.create(name: "run_${baseFileName}", type: Exec, dependsOn: initTestCases) {
    description = "Runs the Li Hadoop DSL test case for the file ${fileName} and captures the output"
    group = "Li Hadoop DSL Tests"

    // If a test that is supposed to pass fails, we want to see the stacktrace.
    if ("positive" == baseDirName) {
      commandLine "${project.projectDir}/../gradlew", "test_${baseFileName}", "-PoverridePluginTestDir=${pluginDir}", "--stacktrace"
    } else {
      commandLine "${project.projectDir}/../gradlew", "test_${baseFileName}", "-PoverridePluginTestDir=${pluginDir}"
    }

    ignoreExitValue true

    // Store the output instead of printing it to the console.
    // errorOutput = new ByteArrayOutputStream()
    standardOutput = new ByteArrayOutputStream()

    doLast {
      String buildOutput = standardOutput.toString()
      logger.lifecycle(buildOutput)

      new File("${project.projectDir}/output/${outputFile}").withWriter { out ->
        out.write(cleanBuildOutput(buildOutput))
      }

      // If a test case gives the wrong result, stop right there with an exception.
      if ("positive" == baseDirName && execResult.getExitValue() != 0) {
        throw new Exception("Positive test case ${baseFileName} failed")
      }

      if ("negative" == baseDirName && execResult.getExitValue() == 0) {
        throw new Exception("Negative test case ${baseFileName} passed")
      }
    }
  }
}

// Set up a master task that runs all the test case wrappers.
task runDslTestCases(dependsOn: initTestCases) {
  description = "Runs the Li Hadoop DSL test cases"
  group = "Li Hadoop DSL Tests"

  testFiles.each { fileName ->
    String baseFileName = findBaseFileName(fileName)
    dependsOn "run_${baseFileName}"
  }
}

// Create a task that compares the generated files against the set of known good job files.
task compareKnownJobFiles(type: Exec, dependsOn: runDslTestCases) {
  description = "Compares the known good job files to the compiled job files"
  group = "Li Hadoop DSL Tests"
  commandLine "diff", "-r", "${project.projectDir}/expectedJobs", "${project.projectDir}/jobs"
}

// Create a task that compares the console output against the known console output.
task compareKnownOutputFiles(type: Exec, dependsOn: runDslTestCases) {
  description = "Compares the known good output to the actual output"
  group = "Li Hadoop DSL Tests"
  commandLine "diff", "-r", "${project.projectDir}/expectedOutput", "${project.projectDir}/output"
}

// Create a master task that runs the test cases and compares the results.
task runTestCasesAndCompareOutput {
  dependsOn compareKnownJobFiles
  dependsOn compareKnownOutputFiles
  description = "Master task to run all Li Hadoop DSL tests and compare the actual results against the known results"
  group = "Li Hadoop DSL Tests"
}

// Make the test task depend on the master task to run the test cases and compare the results.
test.dependsOn runTestCasesAndCompareOutput

// Test for including the sources zip and scmMetadata in the Hadoop zip and building the CRT zip.
task runHadoopZipTest(type: GradleBuild, dependsOn: initTestCases) {
  dependsOn runTestCasesAndCompareOutput
  description = "Runs the Li Hadoop DSL test case for building Hadoop zips"
  group = "Li Hadoop DSL Tests"

  String liPluginDir = buildLiPluginDir(project)
  String pluginDir = buildPluginDir(project)
@@ -0,0 +1,3 @@
# This file generated from the Hadoop DSL. Do not edit by hand.
type=noop
dependencies=countByCountryFlow_countByCountry
@@ -0,0 +1,11 @@
# This file generated from the Hadoop DSL. Do not edit by hand.
type=hadoopJava
enable_tuning=true
job.class=com.linkedin.jobtype.HadoopTuneInPigJob
optimizationMetric=RESOURCE
output_path=home/hadoop-starter-kit/hello-pig-azkaban/count_by_country
param.output_path=home/hadoop-starter-kit/hello-pig-azkaban/count_by_country
param.profile_data=/data/databases/Identity/Profile/#LATEST
pig.additional.jars=hello-pig-udf.jar
pig.script=src/main/pig/count_by_country.pig
profile_data=/data/databases/Identity/Profile/#LATEST
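As a reading aid, these properties line up with the DSL declarations in the workflows.gradle test case added below; optimizationMetric=RESOURCE appears to be a default supplied by the job type rather than something set in the DSL. Roughly:

autoTunePigLiJob('countByCountry') {                           // type=hadoopJava, enable_tuning=true,
                                                               // job.class=com.linkedin.jobtype.HadoopTuneInPigJob
  uses 'src/main/pig/count_by_country.pig'                     // pig.script=src/main/pig/count_by_country.pig
  reads files: ['profile_data': profileData]                   // profile_data=... plus param.profile_data=...
  writes files: ['output_path': '...']                         // output_path=... plus param.output_path=...
  set properties: ['pig.additional.jars': 'hello-pig-udf.jar'] // pig.additional.jars=hello-pig-udf.jar
}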
4 changes: 4 additions & 0 deletions li-hadoop-plugin-test/expectedOutput/positive/workflows.out
@@ -0,0 +1,4 @@
:li-hadoop-plugin-test:test_workflows
Running test for the file positive/workflows.gradle
Hadoop DSL static checker PASSED

@@ -0,0 +1,3 @@
# This file generated from the Hadoop DSL. Do not edit by hand.
type=noop
dependencies=countByCountryFlow_countByCountry
@@ -0,0 +1,11 @@
# This file generated from the Hadoop DSL. Do not edit by hand.
type=hadoopJava
enable_tuning=true
job.class=com.linkedin.jobtype.HadoopTuneInPigJob
optimizationMetric=RESOURCE
output_path=home/hadoop-starter-kit/hello-pig-azkaban/count_by_country
param.output_path=home/hadoop-starter-kit/hello-pig-azkaban/count_by_country
param.profile_data=/data/databases/Identity/Profile/#LATEST
pig.additional.jars=hello-pig-udf.jar
pig.script=src/main/pig/count_by_country.pig
profile_data=/data/databases/Identity/Profile/#LATEST
4 changes: 4 additions & 0 deletions li-hadoop-plugin-test/output/positive/workflows.out
@@ -0,0 +1,4 @@
:li-hadoop-plugin-test:test_workflows
Running test for the file positive/workflows.gradle
Hadoop DSL static checker PASSED

29 changes: 29 additions & 0 deletions li-hadoop-plugin-test/src/main/gradle/positive/workflows.gradle
@@ -0,0 +1,29 @@
buildscript {
  dependencies {
    classpath files(
      "${project.liPluginTestDir}/li-hadoop-plugin-${project.version}.jar",
      "${project.liPluginTestDir}/li-hadoop-plugin-${project.version}-SNAPSHOT.jar",
      "${project.pluginTestDir}/hadoop-plugin-${project.version}.jar",
      "${project.pluginTestDir}/hadoop-plugin-${project.version}-SNAPSHOT.jar")
  }
}

apply plugin: com.linkedin.gradle.lihadoop.LiHadoopPlugin

def profileData = '/data/databases/Identity/Profile/#LATEST'

hadoop {
  buildPath "jobs"

  workflow('countByCountryFlow') {
    autoTunePigLiJob('countByCountry') {
      uses 'src/main/pig/count_by_country.pig'
      reads files: ['profile_data': profileData]
      writes files: ['output_path': "home/hadoop-starter-kit/hello-pig-azkaban/count_by_country"]
      set properties: ['pig.additional.jars': "hello-pig-udf.jar"]
    }

    targets 'countByCountry'
  }
}
@@ -17,7 +17,8 @@ package com.linkedin.gradle.lihadoopdsl;

import com.linkedin.gradle.hadoopdsl.HadoopDslExtension;
import com.linkedin.gradle.hadoopdsl.HadoopDslMethod;
import com.linkedin.gradle.hadoopdsl.NamedScope;
import com.linkedin.gradle.lihadoopdsl.lijob.AutoTunePigLiJob;
import com.linkedin.gradle.lihadoopdsl.lijob.LiPigBangBangJob;
import com.linkedin.gradle.lihadoopdsl.lijob.PigLiJob;
import org.gradle.api.Project;
@@ -47,7 +48,7 @@ class LiHadoopDslExtension extends HadoopDslExtension implements LiNamedScopeContainer
  }

  /**
   * DSL LiPigBangBangJob method creates a LiPigBangBangJob in workflow scope with the given name
   * DSL liPigBangBangJob method creates a LiPigBangBangJob in workflow scope with the given name
   * and configuration.
   *
   * @param name The job name
@@ -74,4 +75,17 @@ class LiHadoopDslExtension extends HadoopDslExtension implements LiNamedScopeContainer
  PigLiJob pigLiJob(String name, @DelegatesTo(PigLiJob) Closure configure) {
    return ((PigLiJob)configureJob(((LiHadoopDslFactory)factory).makePigLiJob(name), configure));
  }

  /**
   * DSL autoTunePigLiJob method creates an AutoTunePigLiJob in workflow scope with the given name
   * and configuration.
   *
   * @param name The job name
   * @param configure The configuration closure
   * @return The new job
   */
  @HadoopDslMethod
  AutoTunePigLiJob autoTunePigLiJob(String name, @DelegatesTo(AutoTunePigLiJob) Closure configure) {
    return ((AutoTunePigLiJob)configureJob(((LiHadoopDslFactory)factory).makeAutoTunePigLiJob(name), configure));
  }
}
@@ -18,7 +18,8 @@ package com.linkedin.gradle.lihadoopdsl;
import com.linkedin.gradle.hadoopdsl.HadoopDslFactory;
import com.linkedin.gradle.hadoopdsl.Namespace;
import com.linkedin.gradle.hadoopdsl.Workflow;
import com.linkedin.gradle.hadoopdsl.NamedScope;
import com.linkedin.gradle.lihadoopdsl.lijob.AutoTunePigLiJob;
import com.linkedin.gradle.lihadoopdsl.lijob.LiPigBangBangJob;
import com.linkedin.gradle.lihadoopdsl.lijob.PigLiJob;
import org.gradle.api.Project;
@@ -88,4 +89,14 @@ class LiHadoopDslFactory extends HadoopDslFactory {
  PigLiJob makePigLiJob(String name) {
    return new PigLiJob(name);
  }

  /**
   * Factory method to build a LinkedIn-specific AutoTunePigLiJob.
   *
   * @param name The job name
   * @return The job
   */
  AutoTunePigLiJob makeAutoTunePigLiJob(String name) {
    return new AutoTunePigLiJob(name);
  }
}
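Because job construction is routed through the factory, a subclass can substitute its own job implementation without touching the DSL methods. A minimal hypothetical sketch (CustomDslFactory and CustomAutoTunePigLiJob are illustrative names, not part of this commit):

class CustomAutoTunePigLiJob extends AutoTunePigLiJob {
  CustomAutoTunePigLiJob(String name) {
    super(name);
  }
}

class CustomDslFactory extends LiHadoopDslFactory {
  // DSL methods that call makeAutoTunePigLiJob now receive the custom subclass.
  @Override
  AutoTunePigLiJob makeAutoTunePigLiJob(String name) {
    return new CustomAutoTunePigLiJob(name);
  }
}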
@@ -18,6 +18,7 @@ package com.linkedin.gradle.lihadoopdsl;
import com.linkedin.gradle.hadoopdsl.HadoopDslFactory;
import com.linkedin.gradle.hadoopdsl.HadoopDslMethod;
import com.linkedin.gradle.hadoopdsl.HadoopDslPlugin;
import com.linkedin.gradle.lihadoopdsl.lijob.AutoTunePigLiJob;
import com.linkedin.gradle.lihadoopdsl.lijob.LiPigBangBangJob;
import com.linkedin.gradle.lihadoopdsl.lijob.PigLiJob;
import org.gradle.api.Project;
@@ -42,6 +43,7 @@ class LiHadoopDslPlugin extends HadoopDslPlugin implements LiNamedScopeContainer
    super.apply(project);
    project.extensions.add("liPigBangBangJob", this.&liPigBangBangJob);
    project.extensions.add("pigLiJob", this.&pigLiJob);
    project.extensions.add("autoTunePigLiJob", this.&autoTunePigLiJob);
  }

  /**
@@ -56,7 +58,7 @@ class LiHadoopDslPlugin extends HadoopDslPlugin implements LiNamedScopeContainer
  }

  /**
   * DSL LiPigBangBangJob method creates a LiPigBangBangJob in scope with the given name
   * DSL liPigBangBangJob method creates a LiPigBangBangJob in scope with the given name
   * and configuration
   * @param name The Job name
   * @param configure The configuration closure
@@ -83,4 +85,17 @@ class LiHadoopDslPlugin extends HadoopDslPlugin implements LiNamedScopeContainer
    LiHadoopDslFactory liFactory = (LiHadoopDslFactory)factory;
    return ((PigLiJob)configureJob(liFactory.makePigLiJob(name), configure));
  }

  /**
   * DSL autoTunePigLiJob method creates an AutoTunePigLiJob in scope with the given name
   * and configuration
   * @param name The Job name
   * @param configure The configuration closure
   * @return The new job
   */
  @HadoopDslMethod
  AutoTunePigLiJob autoTunePigLiJob(String name, @DelegatesTo(AutoTunePigLiJob) Closure configure) {
    LiHadoopDslFactory liFactory = (LiHadoopDslFactory)factory;
    return ((AutoTunePigLiJob)configureJob(liFactory.makeAutoTunePigLiJob(name), configure));
  }
}
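Because autoTunePigLiJob is registered through project.extensions.add, it can also be called at the top level of a build script, outside a hadoop { ... } block. A minimal sketch (the job name and script path are hypothetical):

apply plugin: com.linkedin.gradle.lihadoop.LiHadoopPlugin

// Job declared in the plugin's global scope rather than inside a workflow.
autoTunePigLiJob('tunedCount') {
  uses 'src/main/pig/my_script.pig'  // hypothetical path
}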
