Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

simultaneous git operations

  • Loading branch information...
commit 31d082188adf2cd10943e46925b291ceda13be32 1 parent bc0ca7b
@tuler authored
View
23 grails-app/conf/BootStrap.groovy
@@ -43,19 +43,22 @@ class BootStrap {
// dequeue scheduling
def dequeueInterval = grailsApplication.config.app.dequeueInterval
- def corePoolSize = grailsApplication.config.app.gitWorkers
- queueExecutor = Executors.newScheduledThreadPool(corePoolSize);
- queueExecutor.scheduleWithFixedDelay({
- persistenceInterceptor.init()
+ queueExecutor = Executors.newSingleThreadScheduledExecutor();
+ queueExecutor.scheduleAtFixedRate({
try {
- queueService.dequeueAndRun()
- } finally {
- persistenceInterceptor.flush()
- persistenceInterceptor.destroy()
+ persistenceInterceptor.init()
+ try {
+ queueService.dequeueAndRun()
+ } finally {
+ persistenceInterceptor.flush()
+ persistenceInterceptor.destroy()
+ }
+ } catch (e) {
+ log.error e
}
- }, 10, dequeueInterval, TimeUnit.SECONDS)
-
+ }, dequeueInterval, dequeueInterval, TimeUnit.SECONDS)
}
+
def destroy = {
if (pollingExecutor) {
pollingExecutor.shutdown()
View
2  grails-app/conf/Config.groovy
@@ -121,7 +121,7 @@ app {
pollingInterval = 0
// dequeue interval, in seconds
- dequeueInterval = 5
+ dequeueInterval = 1
// number of simultaneous git operations
gitWorkers = 3
View
8 grails-app/conf/spring/resources.groovy
@@ -1,8 +1,16 @@
import gittig.*
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
+
beans = {
// Repository location resolvers
nameLocationResolver(NameLocationResolver)
usernameLocationResolver(UsernameLocationResolver)
serviceLocationResolver(ServiceLocationResolver)
+
+ // git task executor
+ taskExecutor(ThreadPoolTaskExecutor) {
+ corePoolSize = application.config.app.gitWorkers
+ maxPoolSize = application.config.app.gitWorkers
+ }
}
View
3  grails-app/domain/gittig/HookJob.groovy
@@ -4,7 +4,7 @@ class HookJob {
String url
- HookJobStatus status = HookJobStatus.WAITING
+ HookJobStatus status = HookJobStatus.QUEUED
String error
@@ -29,6 +29,7 @@ class HookJob {
}
enum HookJobStatus {
+ QUEUED,
WAITING,
RUNNING,
DISCARDED,
View
1  grails-app/i18n/messages.properties
@@ -71,6 +71,7 @@ hookJob.url.label=Origin
hookJob.status.label=Status
hookJob.cancel.label=Cancel
+hookJob.status.QUEUED=Queued
hookJob.status.WAITING=Waiting
hookJob.status.RUNNING=Running
hookJob.status.DISCARDED=Discarded
View
84 grails-app/services/gittig/QueueService.groovy
@@ -1,5 +1,7 @@
package gittig
+import org.codehaus.groovy.grails.support.PersistenceContextInterceptor
+
class QueueService {
static transactional = false
@@ -8,6 +10,10 @@ class QueueService {
def grailsApplication
+ PersistenceContextInterceptor persistenceInterceptor
+
+ def taskExecutor
+
def list(params) {
// list completed/cancelled/discarded/error jobs from this date on
def queueTimeout = grailsApplication.config.app.queueTimeout
@@ -15,6 +21,7 @@ class QueueService {
HookJob.withCriteria {
or {
+ eq('status', HookJob.HookJobStatus.QUEUED)
eq('status', HookJob.HookJobStatus.WAITING)
eq('status', HookJob.HookJobStatus.RUNNING)
ge('dateCreated', date)
@@ -23,25 +30,25 @@ class QueueService {
}
def enqueue(url) {
- new HookJob(url: url).save(failOnError: true, flush: true)
+ new HookJob(url: url, status: HookJob.HookJobStatus.QUEUED).save(failOnError: true, flush: true)
}
def dequeue() {
HookJob.withNewSession {
- def waiting = HookJob.executeQuery "from HookJob j where j.status = ? and j.url not in (select jr.url from HookJob jr where jr.status = ?)",
- [HookJob.HookJobStatus.WAITING, HookJob.HookJobStatus.RUNNING]
- if (waiting) {
- def job = waiting.first()
+ def queued = HookJob.executeQuery "from HookJob j where j.status = ? and j.url not in (select jr.url from HookJob jr where jr.status = ? or jr.status = ?)",
+ [HookJob.HookJobStatus.QUEUED, HookJob.HookJobStatus.WAITING, HookJob.HookJobStatus.RUNNING]
+ if (queued) {
+ def job = queued.first()
log.info "Dequeuing job for ${job.url}"
- // initialize the progress and set to RUNNING status
+ // initialize the progress and set to WAITING status
job.progress = new HookJobProgress(job: job)
- job.status = HookJob.HookJobStatus.RUNNING
+ job.status = HookJob.HookJobStatus.WAITING
job.save(failOnError: true, flush: true)
- // discard other waiting jobs of the same url
+ // discard other queue jobs of the same url
def discard = HookJob.withCriteria {
- eq('status', HookJob.HookJobStatus.WAITING)
+ eq('status', HookJob.HookJobStatus.QUEUED)
eq('url', job.url)
}.each {
it.status = HookJob.HookJobStatus.DISCARDED
@@ -56,28 +63,45 @@ class QueueService {
def dequeueAndRun() {
def job = dequeue()
if (job) {
- log.info "Running job for ${job.url}"
+ // submit the job for background execution
+ def jobId = job.id
+ def url = job.url
+ taskExecutor.submit {
+ persistenceInterceptor.init()
+ try {
+ // set job to RUNNING state
+ HookJob.withNewSession {
+ def j = HookJob.get(jobId)
+ j.status = HookJob.HookJobStatus.RUNNING
+ j.save(failOnError: true, flush: true)
+ log.info "Running job for ${j.url}"
+ }
+
+ def status
+ def error
+ def result
+ try {
+ def hookJobProgressMonitor = new HookJobProgressMonitor(jobId)
+ result = gitService.cloneOrUpdate(url, hookJobProgressMonitor)
+ status = HookJob.HookJobStatus.COMPLETED
+ } catch (HookJobException e) {
+ status = HookJob.HookJobStatus.ERROR
+ error = e.message
+ }
- // run the job
- def status
- def error
- def result
- try {
- def hookJobProgressMonitor = new HookJobProgressMonitor(job.id)
- result = gitService.cloneOrUpdate(job.url, hookJobProgressMonitor)
- status = HookJob.HookJobStatus.COMPLETED
- } catch (HookJobException e) {
- status = HookJob.HookJobStatus.ERROR
- error = e.message
- }
-
- // save the job final state
- HookJob.withNewSession {
- def j = HookJob.get(job.id)
- j.status = status
- j.error = error
- j.result = result
- j.save(failOnError: true, flush: true)
+ // save the job final state
+ HookJob.withNewSession {
+ def j = HookJob.get(jobId)
+ j.status = status
+ j.error = error
+ j.result = result
+ j.save(failOnError: true, flush: true)
+ }
+
+ } finally {
+ persistenceInterceptor.flush()
+ persistenceInterceptor.destroy()
+ }
}
}
}
View
3  grails-app/views/queue/index.gsp
@@ -21,6 +21,9 @@
<td class="date" nowrap><g:formatDate date="${job.dateCreated}" type="datetime" /></td>
<td class="url" width="100%">${job.url}</td>
<td class="status">
+ <div class="QUEUED status" style="display: none;">
+ <span class="label"><g:message code="hookJob.status.QUEUED" /></span>
+ </div>
<div class="WAITING status" style="display: none;">
<span class="label"><g:message code="hookJob.status.WAITING" /></span>
</div>
View
2  web-app/js/queue.js
@@ -21,7 +21,7 @@
$('div.' + job.status, td).show();
// cancel button
- if (job.status == 'RUNNING' || job.status == 'WAITING') {
+ if (job.status == 'RUNNING' || job.status == 'QUEUED') {
// show the cancel button
$('td.action a', tr).show();
} else {
Please sign in to comment.
Something went wrong with that request. Please try again.