Skip to content
This repository has been archived by the owner on Jan 18, 2022. It is now read-only.

Commit

Permalink
Extract the AsynchronousMailProcessService
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitalii Samolovskikh committed Sep 28, 2013
1 parent e2b85bf commit 992c06a
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ class AsynchronousMailJob {
def group = "AsynchronousMail"

// Dependency injection
def asynchronousMailPersistenceService
AsynchronousMailProcessService asynchronousMailProcessService

def execute() {
log.trace('Enter to execute method.')
def startDate = System.currentTimeMillis()

def startDate = new Date().time
asynchronousMailPersistenceService.findAndSendEmails()
def endDate = new Date().time
log.debug("Execution time = ${endDate - startDate}")
asynchronousMailProcessService.findAndSendEmails()

log.trace("Exit from execute method.");
def endDate = System.currentTimeMillis()
log.trace("Exit from execute method. Execution time = ${endDate - startDate}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@ class ExpiredMessagesCollectorJob {
def concurrent = false
def group = "AsynchronousMail"

AsynchronousMailPersistenceService asynchronousMailPersistenceService

def execute() {
log.trace('Enter to execute method.')
int count = 0
AsynchronousMailMessage.withTransaction {
count = AsynchronousMailMessage.executeUpdate(
"update AsynchronousMailMessage amm set amm.status=:es where amm.endDate<:date and (amm.status=:cs or amm.status=:as)",
["es": MessageStatus.EXPIRED, "date": new Date(), "cs": MessageStatus.CREATED, "as": MessageStatus.ATTEMPTED]
)
}
log.trace("${count} expired messages was updated.")
asynchronousMailPersistenceService.updateExpiredMessages()
log.trace('Exit from execute method.')
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package grails.plugin.asyncmail

import org.springframework.mail.*
import groovyx.gpars.GParsPool

class AsynchronousMailPersistenceService {
def grailsApplication
def asynchronousMailSendService

private AsynchronousMailMessage save(AsynchronousMailMessage message, boolean flush = false) {
return message.save(flush: flush)
Expand All @@ -15,6 +12,10 @@ class AsynchronousMailPersistenceService {
message.delete()
}

AsynchronousMailMessage getMessage(long id){
return AsynchronousMailMessage.get(id)
}

List<Long> selectMessagesIdsForSend(){
return AsynchronousMailMessage.withCriteria {
Date now = new Date()
Expand All @@ -35,73 +36,14 @@ class AsynchronousMailPersistenceService {
}
}

public void findAndSendEmails() {
// Get messages from DB
def messagesIds = this.selectMessagesIdsForSend()

Integer gparsPoolSize = grailsApplication.config.asynchronous.mail.gparsPoolSize

// Send each message and save new status
try {
GParsPool.withPool(gparsPoolSize) {
messagesIds.eachParallel {Long messageId ->
AsynchronousMailMessage.withNewSession { session ->
this.processEmailMessage(messageId)
}
}
}
} catch (Exception e) {
log.error('Abort mail sent.', e)
}
}

private void processEmailMessage(Long messageId) {
boolean useFlushOnSave = grailsApplication.config.asynchronous.mail.useFlushOnSave

def message = AsynchronousMailMessage.get(messageId)
log.trace("Found a message: " + message.toString())

Date now = new Date()
Date attemptDate = new Date(now.getTime() - message.attemptInterval)
if (
message.status == MessageStatus.CREATED
|| (message.status == MessageStatus.ATTEMPTED && message.lastAttemptDate.before(attemptDate))
) {
message.lastAttemptDate = now
message.attemptsCount++

// It guarantee that e-mail can't be sent more than 1 time
message.status = MessageStatus.ERROR
this.save(message, useFlushOnSave)

// Attempt to send
try {
log.trace("Attempt to send the message with id=${message.id}.")
asynchronousMailSendService.send(message)
message.sentDate = now
message.status = MessageStatus.SENT
log.trace("The message with id=${message.id} was sent successfully.")
} catch (MailException e) {
log.warn("Attempt to send the message with id=${message.id} was failed.", e)
if (message.attemptsCount < message.maxAttemptsCount &&
!(e instanceof MailParseException || e instanceof MailPreparationException)
) {
message.status = MessageStatus.ATTEMPTED
}

if (e instanceof MailAuthenticationException) {
throw e
}
} finally {
this.save(message, useFlushOnSave)
}

// Delete message if it is sent successfully and can be deleted
if (message.status == MessageStatus.SENT && message.markDelete) {
long id = message.id
this.delete(message);
log.trace("The message with id=${id} was deleted.")
}
void updateExpiredMessages(){
int count = 0
AsynchronousMailMessage.withTransaction {
count = AsynchronousMailMessage.executeUpdate(
"update AsynchronousMailMessage amm set amm.status=:es where amm.endDate<:date and (amm.status=:cs or amm.status=:as)",
["es": MessageStatus.EXPIRED, "date": new Date(), "cs": MessageStatus.CREATED, "as": MessageStatus.ATTEMPTED]
)
}
log.trace("${count} expired messages was updated.")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package grails.plugin.asyncmail

import org.springframework.mail.*
import groovyx.gpars.GParsPool

class AsynchronousMailProcessService {
def grailsApplication

def asynchronousMailPersistenceService
def asynchronousMailSendService

public void findAndSendEmails() {
// Get messages from DB
def messagesIds = asynchronousMailPersistenceService.selectMessagesIdsForSend()

Integer gparsPoolSize = grailsApplication.config.asynchronous.mail.gparsPoolSize

// Send each message and save new status
try {
GParsPool.withPool(gparsPoolSize) {
messagesIds.eachParallel {Long messageId ->
AsynchronousMailMessage.withNewSession { session ->
processEmailMessage(messageId)
}
}
}
} catch (Exception e) {
log.error('Abort mail sent.', e)
}
}

void processEmailMessage(long messageId) {
boolean useFlushOnSave = grailsApplication.config.asynchronous.mail.useFlushOnSave

def message = asynchronousMailPersistenceService.getMessage(messageId)
log.trace("Found a message: " + message.toString())

Date now = new Date()
Date attemptDate = new Date(now.getTime() - message.attemptInterval)
if (
message.status == MessageStatus.CREATED
|| (message.status == MessageStatus.ATTEMPTED && message.lastAttemptDate.before(attemptDate))
) {
message.lastAttemptDate = now
message.attemptsCount++

// It guarantee that e-mail can't be sent more than 1 time
message.status = MessageStatus.ERROR
asynchronousMailPersistenceService.save(message, useFlushOnSave)

// Attempt to send
try {
log.trace("Attempt to send the message with id=${message.id}.")
asynchronousMailSendService.send(message)
message.sentDate = now
message.status = MessageStatus.SENT
log.trace("The message with id=${message.id} was sent successfully.")
} catch (MailException e) {
log.warn("Attempt to send the message with id=${message.id} was failed.", e)
if (message.attemptsCount < message.maxAttemptsCount &&
!(e instanceof MailParseException || e instanceof MailPreparationException)
) {
message.status = MessageStatus.ATTEMPTED
}

if (e instanceof MailAuthenticationException) {
throw e
}
} finally {
asynchronousMailPersistenceService.save(message, useFlushOnSave)
}

// Delete message if it is sent successfully and can be deleted
if (message.status == MessageStatus.SENT && message.markDelete) {
long id = message.id
asynchronousMailPersistenceService.delete(message);
log.trace("The message with id=${id} was deleted.")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,21 @@ class AsynchronousMailPersistenceServiceTests extends GroovyTestCase {
Assert.assertNotNull(asynchronousMailPersistenceService.save(message, true))
asynchronousMailPersistenceService.delete(message)
}

void testUpdateExpiredMessages(){
def message = new AsynchronousMailMessage(
from: 'John Smith <john@example.com>',
to: ['Mary Smith <mary@example.com>'],
subject: 'Subject',
text: 'Text',
beginDate: new Date(System.currentTimeMillis()-2),
endDate: new Date(System.currentTimeMillis()-1)
)

Assert.assertNotNull(asynchronousMailPersistenceService.save(message, true))
Assert.assertEquals(0, asynchronousMailPersistenceService.selectMessagesIdsForSend()?.size())
asynchronousMailPersistenceService.updateExpiredMessages()
message.refresh()
assert message.getStatus() == MessageStatus.EXPIRED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package grails.plugin.asyncmail

import grails.test.mixin.*
import org.junit.*

/**
* See the API for {@link grails.test.mixin.services.ServiceUnitTestMixin} for usage instructions
*/
@TestFor(AsynchronousMailProcessService)
class AsynchronousMailProcessServiceTests {
AsynchronousMailProcessService asynchronousMailProcessService
def asynchronousMailPersistenceService

@Before
void init(){
asynchronousMailProcessService = new AsynchronousMailProcessService()
asynchronousMailPersistenceService = new AsynchronousMailPersistenceServiceMock()
asynchronousMailProcessService.asynchronousMailPersistenceService = asynchronousMailPersistenceService
asynchronousMailProcessService.asynchronousMailSendService = new AsynchronousMailSendServiceMock()

grailsApplication.config.asynchronous.mail.useFlushOnSave = true
asynchronousMailProcessService.grailsApplication = grailsApplication
}

void testProcessEmailMessage() {
def message = new AsynchronousMailMessage(
from: 'John Smith <john@example.com>',
to: ['Mary Smith <mary@example.com>'],
subject: 'Subject',
text: 'Text'
)

asynchronousMailPersistenceService.save(message)
asynchronousMailProcessService.processEmailMessage(1)
assert message.status == MessageStatus.SENT
assert message.sentDate !=null
}
}

class AsynchronousMailPersistenceServiceMock{
def message

void save(message, boolean flush = true){
this.message = message
}

def getMessage(id){
return message
}
}

class AsynchronousMailSendServiceMock{
void send(message){
// Nothing!
}
}

0 comments on commit 992c06a

Please sign in to comment.