Skip to content

Commit

Permalink
retry logstoragerequest load from db fixes #3089
Browse files Browse the repository at this point in the history
  • Loading branch information
gschueler committed Mar 22, 2018
1 parent cb79ef8 commit d5576a9
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 22 deletions.
Expand Up @@ -248,6 +248,7 @@ class LogFileStorageService implements InitializingBean,ApplicationContextAware{

failedRequests.remove(requestId)
failures.remove(requestId)
long retryMax = 30000;

LogFileStorageRequest.withNewSession {
Execution execution = Execution.get(execId)
Expand All @@ -262,8 +263,11 @@ class LogFileStorageService implements InitializingBean,ApplicationContextAware{
if (!success && failuremap && failuremap.size() > 1 || !failuremap[filetype]) {
def ftype = failuremap.keySet().findAll { it != null && it != 'null' }.join(',')

LogFileStorageRequest request = LogFileStorageRequest.get(requestId)
if (request.filetype != ftype || request.completed != success) {
LogFileStorageRequest request = retryLoad(requestId, retryMax)
if (!request) {
log.error("Storage request [ID#${task.id}]: Error updating: not found for id $requestId")
success = false
} else if (request.filetype != ftype || request.completed != success) {
int retryC = 5
boolean saveDone = false
Exception saveError
Expand All @@ -283,7 +287,7 @@ class LogFileStorageService implements InitializingBean,ApplicationContextAware{
retryC--
}
if (!saveDone) {
log.error("Error updating LogFileStorageRequest: $saveError", saveError)
log.error("Storage request [ID#${task.id}]: Error updating: $saveError", saveError)
}
}

Expand All @@ -305,16 +309,15 @@ class LogFileStorageService implements InitializingBean,ApplicationContextAware{
//if policy, remove the request from db
executorService.execute {
//use executorService to run within hibernate session
LogFileStorageRequest request = LogFileStorageRequest.get(requestId)
while(!request){
Thread.sleep(500)
request = LogFileStorageRequest.get(requestId)
LogFileStorageRequest request = retryLoad(requestId, retryMax)
if (!request) {
log.error("Storage request [ID#${task.id}]: Error deleting: not found for id $requestId")
} else {
request.delete(flush: true)
log.debug("Storage request [ID#${task.id}] cancelled.")
}
request.delete(flush:true)

log.debug("Storage request [ID#${task.id}] cancelled.")
failedRequests.add(requestId)
}
failedRequests.add(requestId)
}else{
log.error("Storage request [ID#${task.id}] FAILED ${retry} attempts, giving up")

Expand All @@ -326,23 +329,35 @@ class LogFileStorageService implements InitializingBean,ApplicationContextAware{
//use executorService to run within hibernate session
executorService.execute {
log.debug("executorService saving storage request status...")
LogFileStorageRequest request = LogFileStorageRequest.get(requestId)
while(!request){
Thread.sleep(500)
request = LogFileStorageRequest.get(requestId)
if(request){
log.debug("Loaded LogFileStorageRequest ${requestId} [ID#${task.id}] after retry")
}
}
request.completed = success
request.save(flush: true)
LogFileStorageRequest request = retryLoad(requestId, retryMax)
if (!request) {
log.error("Storage request [ID#${task.id}]: Error saving: not found for id $requestId")
} else if (request) {
log.debug("Loaded LogFileStorageRequest ${requestId} [ID#${task.id}] after retry")

request.completed = success
request.save(flush: true)

log.debug("Storage request [ID#${task.id}] complete.")
log.debug("Storage request [ID#${task.id}] complete.")
}
getStorageSuccessCounter()?.inc()
}
}
}

LogFileStorageRequest retryLoad(long requestId, long retryMaxMs) {
long start = System.currentTimeMillis()
LogFileStorageRequest request = LogFileStorageRequest.get(requestId)
while (!request) {
Thread.sleep(500)
request = LogFileStorageRequest.get(requestId)
if ((System.currentTimeMillis() - start) > retryMaxMs) {
break;
}
}
request
}

/**
* Run retrieval request task with no retries, executes in the logFileTaskExecutor threadpool
* @param task
Expand Down
Expand Up @@ -516,6 +516,16 @@ class LogFileStorageServiceTests {
assertEquals(1,task.count)
assertFalse(svc.executorService.executeCalled)
assertEquals(0,svc.getCurrentRequests().size())

def request
LogFileStorageRequest.withSession { session ->
session.flush()
request=LogFileStorageRequest.get(task.requestId)
request.refresh()
}

assertEquals(false, request.completed)
assertEquals('rdlog', request.filetype)
}
/**
* failure of one filetype should set request filetype to the failed type(s)
Expand Down Expand Up @@ -683,6 +693,16 @@ class LogFileStorageServiceTests {
assertEquals(1,task.count)
assertTrue(svc.executorService.executeCalled)
assertTrue(!svc.failedRequestIds.contains(task.requestId))

LogFileStorageRequest request
LogFileStorageRequest.withSession { session ->
session.flush()
request=LogFileStorageRequest.get(task.requestId)
request.refresh()
}

assertNotNull(request)
assertTrue(request.completed)
}
void testRunStorageRequestFailure(){
grailsApplication.config.clear()
Expand Down Expand Up @@ -735,6 +755,15 @@ class LogFileStorageServiceTests {
assertEquals(1,task.count)
assertTrue(svc.executorService.executeCalled)

def request
LogFileStorageRequest.withSession { session ->
session.flush()
request=LogFileStorageRequest.get(task.requestId)
request?.refresh()
}

assertNull(request)

}
void testRunStorageRequestFailureWithRetry(){
grailsApplication.config.clear()
Expand Down Expand Up @@ -807,6 +836,7 @@ class LogFileStorageServiceTests {
emock.execute={Closure cls->
emock.executeCalled=true
assertNotNull(cls)
cls.call()
}
service.frameworkService = fmock.createMock()
service.pluginService = pmock.createMock()
Expand Down

0 comments on commit d5576a9

Please sign in to comment.