Skip to content

Commit

Permalink
fix(jenkins): Fixing missed build events on concurrent builds (#161)
Browse files Browse the repository at this point in the history
- Removed the use of parallelStream while processing builds
- parallelStream may lead to a concurrency issue with redis read/write for close enough builds
- Refactored method for readability
  • Loading branch information
jeyrschabu committed May 3, 2017
1 parent 3b7d1c4 commit 74bcff8
Showing 1 changed file with 98 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import com.netflix.spinnaker.igor.IgorConfigurationProperties
import com.netflix.spinnaker.igor.history.EchoService
import com.netflix.spinnaker.igor.history.model.BuildContent
import com.netflix.spinnaker.igor.history.model.BuildEvent
import com.netflix.spinnaker.igor.jenkins.client.model.Build
import com.netflix.spinnaker.igor.jenkins.client.model.Project
import com.netflix.spinnaker.igor.jenkins.service.JenkinsService
import com.netflix.spinnaker.igor.model.BuildServiceProvider
import com.netflix.spinnaker.igor.polling.PollingMonitor
import com.netflix.spinnaker.igor.service.BuildMasters
Expand All @@ -40,6 +42,7 @@ import rx.schedulers.Schedulers

import javax.annotation.PreDestroy
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

/**
* Monitors new jenkins builds
Expand Down Expand Up @@ -139,117 +142,126 @@ class JenkinsBuildMonitor implements PollingMonitor {
*/

List<Map> changedBuilds(String master) {

log.info('Checking for new builds for ' + master)
List<Map> results = []

lastPoll = System.currentTimeMillis()
long thisPoll = lastPoll
try {
List<String> cachedBuilds = cache.getJobNames(master)

def startTime = System.currentTimeMillis()
List<Project> builds = buildMasters.map[master].projects?.list

JenkinsService buildService = buildMasters.map[master] as JenkinsService
List<Project> builds = buildService.projects?.list
log.info("finding new builds in ${master} : ${builds.size()} items")

List<String> buildNames = builds*.name

Observable.from(cachedBuilds).filter { String name ->
!(name in buildNames)
}.subscribe(
{ String jobName ->
log.info "Removing ${master}:${jobName}"
cache.remove(master, jobName)
},
{ log.error("Error: ${it.message}") }
)

builds.parallelStream().forEach(
{ Project project ->
try {
boolean addToCache = false
Map cachedBuild = null
log.debug "processing build : ${project?.name} : building? ${project?.lastBuild?.building}"
if (!project?.lastBuild) {
log.debug "no builds found for ${project.name}, skipping"
} else if (cachedBuilds.contains(project.name)) {
cachedBuild = cache.getLastBuild(master, project.name)
cache.setLastBuild(master, project.name, project.lastBuild.number, project.lastBuild.building)
if ((project.lastBuild.building != cachedBuild.lastBuildBuilding) ||
(project.lastBuild.number != Integer.valueOf(cachedBuild.lastBuildLabel))) {

addToCache = true

log.info "Build changed: ${master}: ${project.name} : ${project.lastBuild.number} : ${project.lastBuild.building}"

if (echoService) {
int currentBuild = project.lastBuild.number
int lastBuild = Integer.valueOf(cachedBuild.lastBuildLabel)

log.info "sending build events for builds between ${lastBuild} and ${currentBuild}"

try {
buildMasters.map[master].getBuilds(project.name).list.sort {
it.number
}.each { build ->
if (build.number >= lastBuild && build.number < currentBuild) {
try {
Project oldProject = new Project(name: project.name, lastBuild: build)
if (build.number != lastBuild
|| (build.number == lastBuild && cachedBuild.lastBuildBuilding != build.building)) {
postEvent(echoService, cachedBuilds, oldProject, master)
}
} catch (e) {
log.error("An error occurred fetching ${master}:${project.name}:${build.number}", e)
}
}
}
} catch (e) {
log.error("failed getting builds for ${master}", e)
}
}
refreshCache(builds, cachedBuilds, master, cache)

for (Project project : builds) {
try {
Map cachedBuild = null
log.debug "processing build : ${project?.name} : building? ${project?.lastBuild?.building}"
if (!project?.lastBuild) {
log.debug "no builds found for ${project.name}, skipping"
} else if (cachedBuilds.contains(project.name)) {
cachedBuild = cache.getLastBuild(master, project.name)
cache.setLastBuild(master, project.name, project.lastBuild.number, project.lastBuild.building)

if ((project.lastBuild.building != cachedBuild.lastBuildBuilding) || (project.lastBuild.number != cachedBuild.lastBuildLabel)) {
log.info "Build changed: ${master}: ${project.name} : ${project.lastBuild.number} : ${project.lastBuild.building}"
List<BuildEvent> eventsForIntermediateBuilds = getEventsForBuildsBetweenLastKnownAndCurrentBuild(buildService, project, cachedBuild, master)

if (cachedBuilds && echoService) {
eventsForIntermediateBuilds.forEach { event ->
echoService.postEvent(event);
}
} else {
log.info "New Build: ${master}: ${project.name} : ${project.lastBuild.number} : " +
"${project.lastBuild.result}"
addToCache = true
cache.setLastBuild(master, project.name, project.lastBuild.number, project.lastBuild.building)
}
if (addToCache) {
project.lastBuild.result = project?.lastBuild?.result ?: project.lastBuild.building ? BUILD_IN_PROGRESS : ""
log.debug "setting result to ${project.lastBuild.result}"
postEvent(echoService, cachedBuilds, project, master)
results << [previous: cachedBuild, current: project]
}
} catch (e) {
log.error("fail processing build : ${project?.name}", e)

updateBuildResult(project)
postEvent(echoService, cachedBuilds, project, master)
results << [previous: cachedBuild, current: project]
}
} else {
log.info "New Build: ${master}: ${project.name} : ${project.lastBuild.number} : " + "${project.lastBuild.result}"
cache.setLastBuild(master, project.name, project.lastBuild.number, project.lastBuild.building)

updateBuildResult(project)
postEvent(echoService, cachedBuilds, project, master)
results << [previous: cachedBuild, current: project]
}
)
} catch (e) {
log.error("fail processing build : ${project?.name}", e)
}
}

log.info("Took ${System.currentTimeMillis() - startTime}ms to retrieve projects (master: ${master})")

} catch (e) {
log.error("failed to update master $master", e)
}

results
}

static void postEvent(EchoService echoService, List<String> cachedBuildsForMaster, Project project, String master) {
if (!cachedBuildsForMaster) {
// avoid publishing an event if this master has no indexed builds (protects against a flushed redis)
return
}
private static void updateBuildResult(Project project) {
project.lastBuild.result = project?.lastBuild?.result ?: project.lastBuild.building ? BUILD_IN_PROGRESS : ""
log.info "setting result to ${project.lastBuild.result}"
}

if (!echoService) {
// avoid publishing an event if echo is disabled
return
private static void postEvent(EchoService echoService, List<String> cachedBuilds, Project project, String master) {
if (cachedBuilds && echoService) {
echoService.postEvent(
new BuildEvent(content: new BuildContent(project: project, master: master))
)
}
}

echoService.postEvent(
new BuildEvent(content: new BuildContent(project: project, master: master))
private static void refreshCache(List<Project> builds, List<String> cachedBuilds, String master, JenkinsCache cache) {
List<String> buildNames = builds*.name
Observable.from(cachedBuilds).filter { String name ->
!(name in buildNames)
}.subscribe(
{ String jobName ->
log.info "Removing ${master}:${jobName}"
cache.remove(master, jobName)
},
{ log.error("Error: ${it.message}") }
)
}

private static List<BuildEvent> getEventsForBuildsBetweenLastKnownAndCurrentBuild(JenkinsService buildService,
Project project,
Map cachedBuild,
String master) {
int currentBuildNumber = project.lastBuild.number
int lastBuildNumber = cachedBuild.lastBuildLabel
List<BuildEvent> buildsEvents = []

log.info "looking for builds between ${lastBuildNumber} and ${currentBuildNumber}"

try {
List<Build> projectBuilds = buildService.getBuilds(project.name).list
projectBuilds.sort{ it.number }

buildsEvents = projectBuilds
.stream()
.filter { build -> isIntermediateBuild(build, cachedBuild, lastBuildNumber, currentBuildNumber) }
.map { build ->
Project oldProject = new Project(name: project.name, lastBuild: build)
return new BuildEvent(content: new BuildContent(project: oldProject, master: master))
}.collect(Collectors.toList())

} catch (e) {
log.error("An error occurred fetching ${master}:${project.name}", e)
}

return buildsEvents
}

private static boolean isIntermediateBuild(Build build, Map cachedBuild, Integer lastBuildNumber, Integer currentBuildNumber) {
if (build.number >= lastBuildNumber && build.number < currentBuildNumber) {
if (build.number != lastBuildNumber || (build.number == lastBuildNumber && cachedBuild.lastBuildBuilding != build.building)) {
return true;
}
}

return false;
}
}

0 comments on commit 74bcff8

Please sign in to comment.