Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'remotes/dev/master'
- Loading branch information
Showing
16 changed files
with
1,098 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
101 changes: 101 additions & 0 deletions
101
src/Oro/Bundle/ContactBundle/Async/ContactPostImportProcessor.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
<?php | ||
|
||
namespace Oro\Bundle\ContactBundle\Async; | ||
|
||
use Oro\Bundle\ContactBundle\Handler\ContactEmailAddressHandler; | ||
use Oro\Bundle\EntityBundle\ORM\DatabaseExceptionHelper; | ||
use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface; | ||
use Oro\Component\MessageQueue\Job\JobStorage; | ||
use Oro\Component\MessageQueue\Transport\MessageInterface; | ||
use Oro\Component\MessageQueue\Transport\SessionInterface; | ||
use Psr\Log\LoggerInterface; | ||
|
||
/** | ||
* Perform actions after contacts import finished. | ||
* Actualize EmailAddress records - add new emails and remove not existing | ||
*/ | ||
class ContactPostImportProcessor implements MessageProcessorInterface | ||
{ | ||
/** | ||
* @var ContactEmailAddressHandler | ||
*/ | ||
private $contactEmailAddressHandler; | ||
|
||
/** | ||
* @var DatabaseExceptionHelper | ||
*/ | ||
private $databaseExceptionHelper; | ||
|
||
/** | ||
* @var JobStorage | ||
*/ | ||
private $jobStorage; | ||
|
||
/** | ||
* @var LoggerInterface | ||
*/ | ||
private $logger; | ||
|
||
/** | ||
* @param ContactEmailAddressHandler $contactEmailAddressHandler | ||
* @param DatabaseExceptionHelper $databaseExceptionHelper | ||
* @param JobStorage $jobStorage | ||
* @param LoggerInterface $logger | ||
*/ | ||
public function __construct( | ||
ContactEmailAddressHandler $contactEmailAddressHandler, | ||
DatabaseExceptionHelper $databaseExceptionHelper, | ||
JobStorage $jobStorage, | ||
LoggerInterface $logger | ||
) { | ||
$this->contactEmailAddressHandler = $contactEmailAddressHandler; | ||
$this->databaseExceptionHelper = $databaseExceptionHelper; | ||
$this->jobStorage = $jobStorage; | ||
$this->logger = $logger; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function process(MessageInterface $message, SessionInterface $session) | ||
{ | ||
$messageBody = json_decode($message->getBody(), JSON_OBJECT_AS_ARRAY); | ||
|
||
// Skip non import jobs. For example import validate | ||
if (empty($messageBody['process']) || $messageBody['process'] !== 'import') { | ||
return self::REJECT; | ||
} | ||
|
||
// Skip non contact import jobs | ||
$rootImportJob = $this->jobStorage->findJobById($messageBody['rootImportJobId']); | ||
if ($rootImportJob) { | ||
$importJobData = explode(':', $rootImportJob->getName()); | ||
if (empty($importJobData[2]) || strpos($importJobData[2], 'oro_contact') === false) { | ||
return self::REJECT; | ||
} | ||
} else { | ||
return self::REJECT; | ||
} | ||
|
||
try { | ||
$this->contactEmailAddressHandler->actualizeContactEmailAssociations(); | ||
} catch (\Exception $e) { | ||
$driverException = $this->databaseExceptionHelper->getDriverException($e); | ||
|
||
if ($driverException && $this->databaseExceptionHelper->isDeadlock($driverException)) { | ||
$this->logger->error( | ||
'Deadlock occurred during actualization of contact emails', | ||
[ | ||
'exception' => $e | ||
] | ||
); | ||
|
||
return self::REQUEUE; | ||
} | ||
|
||
throw $e; | ||
} | ||
|
||
return self::ACK; | ||
} | ||
} |
61 changes: 61 additions & 0 deletions
61
src/Oro/Bundle/ContactBundle/EventListener/ImportEventListener.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
<?php | ||
|
||
namespace Oro\Bundle\ContactBundle\EventListener; | ||
|
||
use Akeneo\Bundle\BatchBundle\Entity\JobExecution; | ||
use Akeneo\Bundle\BatchBundle\Event\JobExecutionEvent; | ||
use Oro\Bundle\ContactBundle\Entity\Contact; | ||
use Oro\Bundle\PlatformBundle\Manager\OptionalListenerManager; | ||
|
||
/** | ||
* Disable Email EntityListener to prevent creation of EmailAddress entities per each contact during import | ||
* EmailAddress records will be later actualized by ContactPostImportProcessor | ||
*/ | ||
class ImportEventListener | ||
{ | ||
/** | ||
* @var OptionalListenerManager | ||
*/ | ||
private $listenerManager; | ||
|
||
/** | ||
* @param OptionalListenerManager $listenerManager | ||
*/ | ||
public function __construct(OptionalListenerManager $listenerManager) | ||
{ | ||
$this->listenerManager = $listenerManager; | ||
} | ||
|
||
/** | ||
* @param JobExecutionEvent $jobExecutionEvent | ||
*/ | ||
public function onBeforeJobExecution(JobExecutionEvent $jobExecutionEvent) | ||
{ | ||
if ($this->isSupportedJob($jobExecutionEvent->getJobExecution())) { | ||
return; | ||
} | ||
|
||
$this->listenerManager->disableListener('oro_email.listener.entity_listener'); | ||
} | ||
|
||
/** | ||
* @param JobExecutionEvent $jobExecutionEvent | ||
*/ | ||
public function onAfterJobExecution(JobExecutionEvent $jobExecutionEvent) | ||
{ | ||
if ($this->isSupportedJob($jobExecutionEvent->getJobExecution())) { | ||
return; | ||
} | ||
|
||
$this->listenerManager->enableListener('oro_email.listener.entity_listener'); | ||
} | ||
|
||
/** | ||
* @param JobExecution $jobExecution | ||
* @return bool | ||
*/ | ||
protected function isSupportedJob($jobExecution): bool | ||
{ | ||
return $jobExecution->getExecutionContext()->get('entityName') !== Contact::class; | ||
} | ||
} |
205 changes: 205 additions & 0 deletions
205
src/Oro/Bundle/ContactBundle/Handler/ContactEmailAddressHandler.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
<?php | ||
|
||
namespace Oro\Bundle\ContactBundle\Handler; | ||
|
||
use Doctrine\Common\Persistence\ManagerRegistry; | ||
use Doctrine\Common\Persistence\ObjectManager; | ||
use Doctrine\ORM\EntityManager; | ||
use Doctrine\ORM\Query\Expr\Join; | ||
use Oro\Bundle\ContactBundle\Entity\Contact; | ||
use Oro\Bundle\ContactBundle\Entity\ContactEmail; | ||
use Oro\Bundle\EmailBundle\Entity\Email; | ||
use Oro\Bundle\EmailBundle\Entity\EmailRecipient; | ||
use Oro\Bundle\EmailBundle\Entity\Manager\EmailAddressManager; | ||
use Oro\Bundle\EntityBundle\ORM\InsertFromSelectQueryExecutor; | ||
use Oro\Component\DoctrineUtils\ORM\QueryBuilderUtil; | ||
|
||
/** | ||
* ContactEmailAddressHandler is responsible for actualization of EmailAddress records for ContactEmails. | ||
* First it removes records from EmailAddress for Contact emails that were removed for contact | ||
* Then it adds new Contact emails that are not present in EmailAddress | ||
*/ | ||
class ContactEmailAddressHandler | ||
{ | ||
/** | ||
* @var InsertFromSelectQueryExecutor | ||
*/ | ||
private $insertFromSelectQueryExecutor; | ||
|
||
/** | ||
* @var EmailAddressManager | ||
*/ | ||
private $emailAddressManager; | ||
|
||
/** | ||
* @var ManagerRegistry | ||
*/ | ||
private $registry; | ||
|
||
/** | ||
* @param InsertFromSelectQueryExecutor $insertFromSelectQueryExecutor | ||
* @param EmailAddressManager $emailAddressManager | ||
* @param ManagerRegistry $registry | ||
*/ | ||
public function __construct( | ||
InsertFromSelectQueryExecutor $insertFromSelectQueryExecutor, | ||
EmailAddressManager $emailAddressManager, | ||
ManagerRegistry $registry | ||
) { | ||
$this->insertFromSelectQueryExecutor = $insertFromSelectQueryExecutor; | ||
$this->emailAddressManager = $emailAddressManager; | ||
$this->registry = $registry; | ||
} | ||
|
||
/** | ||
* @return string | ||
*/ | ||
private function getEmailAddressClass() | ||
{ | ||
return $this->emailAddressManager->getEmailAddressProxyClass(); | ||
} | ||
|
||
/** | ||
* Actualize EmailAddress records based on ContactEmail entities | ||
*/ | ||
public function actualizeContactEmailAssociations() | ||
{ | ||
$this->deleteRemovedEmailAssociations(); | ||
$this->addContactEmailAssociations(); | ||
} | ||
|
||
/** | ||
* Add EmailAddress records based on ContactEmail entities | ||
*/ | ||
private function addContactEmailAssociations() | ||
{ | ||
$contactAssociationFieldName = $this->getContactAssociationFieldName(); | ||
if ($contactAssociationFieldName) { | ||
$qb = $this->getEntityManager()->createQueryBuilder(); | ||
$nowString = $this->getCurrentTimestampString(); | ||
|
||
$qb->from(ContactEmail::class, 'ce') | ||
->select( | ||
'ce.email', | ||
'MAX(IDENTITY(ce.owner))', | ||
(string)$qb->expr()->literal($nowString), | ||
(string)$qb->expr()->literal($nowString), | ||
'CASE WHEN ce.email IS NOT NULL THEN TRUE ELSE FALSE END' | ||
) | ||
->leftJoin($this->getEmailAddressClass(), 'ea', Join::WITH, $qb->expr()->eq('ce.email', 'ea.email')) | ||
->where($qb->expr()->isNull('ea.id')) | ||
->groupBy('ce.email'); | ||
|
||
$this->insertFromSelectQueryExecutor->execute( | ||
$this->getEmailAddressClass(), | ||
['email', $contactAssociationFieldName, 'created', 'updated', 'hasOwner'], | ||
$qb | ||
); | ||
} | ||
} | ||
|
||
/** | ||
* Delete EmailAddress records which contains emails that are not present in ContactEmail | ||
*/ | ||
private function deleteRemovedEmailAssociations() | ||
{ | ||
$idsToRemove = $this->getNonExistingEmailAssociationIds(); | ||
if ($idsToRemove) { | ||
$deleteQb = $this->getEntityManager() | ||
->createQueryBuilder() | ||
->delete($this->getEmailAddressClass(), 'ea'); | ||
$deleteQb->where($deleteQb->expr()->in('ea.id', ':deleteIds')) | ||
->setParameter('deleteIds', $idsToRemove) | ||
->getQuery() | ||
->execute(); | ||
} | ||
} | ||
|
||
/** | ||
* @return array | ||
*/ | ||
private function getNonExistingEmailAssociationIds(): array | ||
{ | ||
$contactAssociationFieldName = $this->getContactAssociationFieldName(); | ||
if ($contactAssociationFieldName) { | ||
// Skip email addresses that was already synced | ||
$emailDQL = $this->getEntityManager()->createQueryBuilder() | ||
->select('e.id') | ||
->from(Email::class, 'e') | ||
->where('e.fromEmailAddress = ea.id') | ||
->getDQL(); | ||
$emailRecipientDQL = $this->getEntityManager()->createQueryBuilder() | ||
->select('er.id') | ||
->from(EmailRecipient::class, 'er') | ||
->where('er.emailAddress = ea.id') | ||
->getDQL(); | ||
|
||
QueryBuilderUtil::checkIdentifier($contactAssociationFieldName); | ||
$toDeleteQb = $this->getEntityManager()->createQueryBuilder(); | ||
$toDeleteQb->select('ea.id') | ||
->from($this->getEmailAddressClass(), 'ea') | ||
->leftJoin( | ||
ContactEmail::class, | ||
'ce', | ||
Join::WITH, | ||
'ea.email = ce.email AND ce.owner = ea.' . $contactAssociationFieldName | ||
) | ||
->where($toDeleteQb->expr()->isNull('ce.id')) | ||
->andWhere($toDeleteQb->expr()->isNotNull('ea.' . $contactAssociationFieldName)) | ||
->andWhere( | ||
$toDeleteQb->expr()->not( | ||
$toDeleteQb->expr()->exists($emailDQL) | ||
) | ||
) | ||
->andWhere( | ||
$toDeleteQb->expr()->not( | ||
$toDeleteQb->expr()->exists($emailRecipientDQL) | ||
) | ||
); | ||
|
||
$result = $toDeleteQb->getQuery()->getScalarResult(); | ||
return array_map('current', $result); | ||
} | ||
|
||
return []; | ||
} | ||
|
||
/** | ||
* @return null|string | ||
*/ | ||
private function getContactAssociationFieldName() | ||
{ | ||
$em = $this->getEntityManager(); | ||
$metadata = $em->getClassMetadata($this->getEmailAddressClass()); | ||
$contactAssociations = $metadata->getAssociationsByTargetClass(Contact::class); | ||
$contactAssociation = reset($contactAssociations); | ||
|
||
if ($contactAssociation && !empty($contactAssociation['fieldName'])) { | ||
return $contactAssociation['fieldName']; | ||
} | ||
|
||
return null; | ||
} | ||
|
||
/** | ||
* @return EntityManager|ObjectManager | ||
*/ | ||
private function getEntityManager() | ||
{ | ||
return $this->registry->getManagerForClass(ContactEmail::class); | ||
} | ||
|
||
/** | ||
* @return string | ||
*/ | ||
private function getCurrentTimestampString(): string | ||
{ | ||
$dateFormat = $this->getEntityManager() | ||
->getConnection() | ||
->getDatabasePlatform() | ||
->getDateTimeFormatString(); | ||
$now = new \DateTime('now', new \DateTimeZone('UTC')); | ||
|
||
return $now->format($dateFormat); | ||
} | ||
} |
Oops, something went wrong.