Skip to content

Commit

Permalink
NEXT-6133 - Use multi insert queue for search keyword indexing. Added…
Browse files Browse the repository at this point in the history
… fallback to prepared statement in case of deadlock exceptions
  • Loading branch information
OliverSkroblin committed Apr 24, 2020
1 parent 6fb13a2 commit 9da6473
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 27 deletions.
Expand Up @@ -45,6 +45,8 @@ public function __construct(Connection $connection, PriceRounding $priceRounding

public function update(array $ids, Context $context): void
{
$ids = array_unique(array_filter($ids));

if (empty($ids)) {
return;
}
Expand Down
Expand Up @@ -3,9 +3,11 @@
namespace Shopware\Core\Content\Product\DataAbstractionLayer;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\DeadlockException;
use Shopware\Core\Defaults;
use Shopware\Core\Framework\Context;
use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\FetchModeHelper;
use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\MultiInsertQueryQueue;
use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\RetryableQuery;
use Shopware\Core\Framework\Uuid\Uuid;

Expand Down Expand Up @@ -38,19 +40,11 @@ public function update(array $ids, Context $context): void
$this->connection->prepare('UPDATE product SET category_tree = :tree WHERE id = :id AND version_id = :version')
);

$insert = new RetryableQuery(
$this->connection->prepare('
INSERT IGNORE INTO product_category_tree
(`product_id`, `product_version_id`, `category_id`, `category_version_id`)
VALUES
(:product_id, :product_version_id, :category_id, :category_version_id)
')
);

$delete = new RetryableQuery(
$this->connection->prepare('DELETE FROM `product_category_tree` WHERE `product_id` = :id AND `product_version_id` = :version')
);

$inserts = [];
foreach ($categories as $productId => $mapping) {
$productId = Uuid::fromHexToBytes($productId);

Expand All @@ -72,14 +66,42 @@ public function update(array $ids, Context $context): void
}

foreach ($categoryIds as $id) {
$params = [
$inserts[] = [
'product_id' => $productId,
'product_version_id' => $versionId,
'category_id' => Uuid::fromHexToBytes($id),
'category_version_id' => $liveVersionId,
];
}
}

$this->insertTree($inserts);
}

$insert->execute($params);
private function insertTree(array $inserts): void
{
if (empty($inserts)) {
return;
}

try {
$queue = new MultiInsertQueryQueue($this->connection, 250);
foreach ($inserts as $insert) {
$queue->addInsert('product_category_tree', $insert);
}
$queue->execute();
} catch (DeadlockException $e) {
$query = new RetryableQuery(
$this->connection->prepare('
INSERT IGNORE INTO product_category_tree
(`product_id`, `product_version_id`, `category_id`, `category_version_id`)
VALUES
(:product_id, :product_version_id, :category_id, :category_version_id)
')
);

foreach ($inserts as $insert) {
$query->execute($insert);
}
}
}
Expand Down
Expand Up @@ -3,11 +3,15 @@
namespace Shopware\Core\Content\Product\DataAbstractionLayer;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\DeadlockException;
use Shopware\Core\Content\Product\Aggregate\ProductKeywordDictionary\ProductKeywordDictionaryDefinition;
use Shopware\Core\Content\Product\Aggregate\ProductSearchKeyword\ProductSearchKeywordDefinition;
use Shopware\Core\Content\Product\ProductEntity;
use Shopware\Core\Content\Product\SearchKeyword\ProductSearchKeywordAnalyzerInterface;
use Shopware\Core\Defaults;
use Shopware\Core\Framework\Api\Context\SystemSource;
use Shopware\Core\Framework\Context;
use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\MultiInsertQueryQueue;
use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\RetryableQuery;
use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
Expand Down Expand Up @@ -83,27 +87,19 @@ private function updateLanguage(array $ids, Context $context): void

$now = (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT);

$keywordInsert = new RetryableQuery(
$this->connection->prepare('
INSERT IGNORE INTO `product_search_keyword` (`id`, `version_id`, `product_version_id`, `language_id`, `product_id`, `keyword`, `ranking`, `created_at`)
VALUES (:id, :version_id, :product_version_id, :language_id, :product_id, :keyword, :ranking, :created_at)
')
);

$dictionaryInsert = new RetryableQuery(
$this->connection->prepare('INSERT IGNORE INTO `product_keyword_dictionary` (`id`, `language_id`, `keyword`) VALUES (:id, :language_id, :keyword)')
);

$this->delete($ids, $context->getLanguageId(), $context->getVersionId());

$keywords = [];
$dictionary = [];

/** @var ProductEntity $product */
foreach ($products as $product) {
$analyzed = $this->analyzer->analyze($product, $context);

$productId = Uuid::fromHexToBytes($product->getId());

foreach ($analyzed as $keyword) {
$keywordInsert->execute([
$keywords[] = [
'id' => Uuid::randomBytes(),
'version_id' => $versionId,
'product_version_id' => $versionId,
Expand All @@ -112,15 +108,18 @@ private function updateLanguage(array $ids, Context $context): void
'keyword' => $keyword->getKeyword(),
'ranking' => $keyword->getRanking(),
'created_at' => $now,
]);

$dictionaryInsert->execute([
];
$dictionary[] = [
'id' => Uuid::randomBytes(),
'language_id' => $languageId,
'keyword' => $keyword->getKeyword(),
]);
];
}
}

$this->insertKeywords($keywords);

$this->insertDictionary($dictionary);
}

private function delete(array $ids, string $languageId, string $versionId): void
Expand All @@ -141,4 +140,51 @@ private function delete(array $ids, string $languageId, string $versionId): void
);
});
}

private function insertKeywords(array $keywords): void
{
$queue = new MultiInsertQueryQueue($this->connection, 50);
foreach ($keywords as $insert) {
$queue->addInsert(ProductSearchKeywordDefinition::ENTITY_NAME, $insert);
}

// try batch insert
try {
$queue->execute();
} catch (DeadlockException $e) {
// catch deadlock exception and retry with single insert
$query = new RetryableQuery(
$this->connection->prepare('
INSERT IGNORE INTO `product_search_keyword` (`id`, `version_id`, `product_version_id`, `language_id`, `product_id`, `keyword`, `ranking`, `created_at`)
VALUES (:id, :version_id, :product_version_id, :language_id, :product_id, :keyword, :ranking, :created_at)
')
);

foreach ($keywords as $keyword) {
$query->execute($keyword);
}
}
}

private function insertDictionary(array $dictionary): void
{
$queue = new MultiInsertQueryQueue($this->connection, 50, false, true);
foreach ($dictionary as $insert) {
$queue->addInsert(ProductKeywordDictionaryDefinition::ENTITY_NAME, $insert);
}

// try batch insert
try {
$queue->execute();
} catch (DeadlockException $e) {
// catch deadlock exception and retry with single insert
$query = new RetryableQuery(
$this->connection->prepare('INSERT IGNORE INTO `product_keyword_dictionary` (`id`, `language_id`, `keyword`) VALUES (:id, :language_id, :keyword)')
);

foreach ($dictionary as $insert) {
$query->execute($insert);
}
}
}
}

0 comments on commit 9da6473

Please sign in to comment.