Skip to content

Commit

Permalink
Stop migration pipeline on failure.
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher Lorke committed Oct 20, 2018
1 parent ea34350 commit d1ff41d
Show file tree
Hide file tree
Showing 21 changed files with 288 additions and 21 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ Namespace: Triadev\EsMigration\Business\Events
| MigrationStepDone | Migration step is done |

## Roadmap
- stop migration
- restart migration
- restart migration step
- create/delete templates
- shrink index
- split index
Expand Down
6 changes: 4 additions & 2 deletions src/Business/Repository/ElasticsearchClients.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public function __construct()
* @param string $scheme
* @param null|string $user
* @param null|string $password
* @param int $retries
*/
public function add(
string $esClientKey,
string $host,
int $port,
string $scheme,
?string $user = null,
?string $password = null
?string $password = null,
int $retries = 1
) {
$this->elasticsearchClients[$esClientKey] = $this->clientBuilder->setHosts([
[
Expand All @@ -48,7 +50,7 @@ public function add(
'user' => $user,
'pass' => $password
]
])->build();
])->setRetries($retries)->build();
}

/**
Expand Down
11 changes: 9 additions & 2 deletions src/Business/Repository/ElasticsearchMigrationStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public function create(
int $migrationId,
string $type,
array $params = [],
int $priority = 1
int $priority = 1,
bool $stopOnFailure = true
): \Triadev\EsMigration\Models\Entity\ElasticsearchMigrationStep {
$dbMigration = new \Triadev\EsMigration\Models\Entity\ElasticsearchMigrationStep();

Expand All @@ -26,6 +27,7 @@ public function create(
$dbMigration->status = MigrationStatus::MIGRATION_STATUS_WAIT;
$dbMigration->params = json_encode($params);
$dbMigration->priority = $priority;
$dbMigration->stop_on_failure = $stopOnFailure;

$dbMigration->saveOrFail();

Expand All @@ -39,7 +41,8 @@ public function update(
int $migrationStepId,
int $status,
?string $error = null,
?int $priority = null
?int $priority = null,
?bool $stopOnFailure = null
): \Triadev\EsMigration\Models\Entity\ElasticsearchMigrationStep {
$entity = $this->find($migrationStepId);
if (!$entity) {
Expand All @@ -55,6 +58,10 @@ public function update(
$entity->priority = $priority;
}

if (is_bool($stopOnFailure)) {
$entity->stop_on_failure = $stopOnFailure;
}

$entity->saveOrFail();

$this->dispatchStatus($entity);
Expand Down
2 changes: 2 additions & 0 deletions src/Business/Service/MigrationSteps.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public function getMigrationSteps(string $migration, bool $withoutDoneSteps = tr
$migrationStepEntity->status,
$migrationStepEntity->error,
json_decode($migrationStepEntity->params, true),
$migrationStepEntity->priority,
$migrationStepEntity->stop_on_failure,
new \DateTime($migrationStepEntity->created_at),
new \DateTime($migrationStepEntity->updated_at)
);
Expand Down
8 changes: 7 additions & 1 deletion src/Contract/ElasticsearchMigrationContract.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ public function createMigration(string $migration) : bool;
* @param string $type
* @param array $params
* @param int $priority
* @param bool $stopOnFailure
* @return bool
*/
public function addMigrationStep(
string $migration,
string $type,
array $params = [],
int $priority = 1
int $priority = 1,
bool $stopOnFailure = true
) : bool;

/**
Expand All @@ -37,10 +39,14 @@ public function addMigrationStep(
* @return array [
* 'migration' => STRING,
* 'status' => STRING,
* 'error' => STRING|NULL,
* 'steps' => [
* 'type' => STRING,
* 'status' => INTEGER,
* 'error' => STRING|NULL,
* 'params' => ARRAY,
* 'priority' => INTEGER,
* 'stop_on_failure' => BOOLEAN,
* 'created_at' => DATETIME,
* 'updated_at' => DATETIME
* ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ interface ElasticsearchMigrationStepContract
* @param string $type
* @param array $params
* @param int $priority
* @param bool $stopOnFailure
* @return ElasticsearchMigrationStep
*
* @throws \Throwable
Expand All @@ -21,7 +22,8 @@ public function create(
int $migrationId,
string $type,
array $params = [],
int $priority = 1
int $priority = 1,
bool $stopOnFailure = true
) : ElasticsearchMigrationStep;

/**
Expand All @@ -31,6 +33,7 @@ public function create(
* @param int $status
* @param string|null $error
* @param int|null $priority
* @param bool|null $stopOnFailure
* @return ElasticsearchMigrationStep
*
* @throws MigrationsNotExist
Expand All @@ -40,7 +43,8 @@ public function update(
int $migrationStepId,
int $status,
?string $error = null,
?int $priority = null
?int $priority = null,
?bool $stopOnFailure = null
) : ElasticsearchMigrationStep;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ public function up()
Schema::create('triadev_elasticsearch_migration', function (Blueprint $table) {
$table->bigIncrements('id');
$table->string('migration');
$table->integer('status');
$table->integer('status')->default(
\Triadev\EsMigration\Business\Mapper\MigrationStatus::MIGRATION_STATUS_WAIT
);
$table->text('error')->nullable();
$table->timestamps();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ public function up()
$table->bigIncrements('id');
$table->bigInteger('migration_id');
$table->string('type');
$table->integer('status');
$table->integer('status')->default(
\Triadev\EsMigration\Business\Mapper\MigrationStatus::MIGRATION_STATUS_WAIT
);
$table->text('error')->nullable();
$table->text('params');
$table->integer('priority');
$table->integer('priority')->default(1);
$table->boolean('stop_on_failure')->default(true);
$table->timestamps();

$table->foreign('migration_id')->references('id')->on('triadev_elasticsearch_migration');
Expand Down
42 changes: 31 additions & 11 deletions src/ElasticsearchMigration.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public function addMigrationStep(
string $migration,
string $type,
array $params = [],
int $priority = 1
int $priority = 1,
bool $stopOnFailure = true
) : bool {
if (!(new MigrationTypes())->isMigrationTypeValid($type)) {
return false;
Expand All @@ -67,7 +68,8 @@ public function addMigrationStep(
$migration->id,
$type,
$params,
$priority
$priority,
$stopOnFailure
);

return true;
Expand All @@ -87,22 +89,34 @@ public function getMigrationStatus(string $migration) : array
$migrationSteps = [];

$status = null;
$error = null;

if ($migrationEntity = $this->migrationRepository->find($migration)) {
$status = $migrationEntity->status;
$error = $migrationEntity->error;

foreach ($migrationEntity->migrationSteps()->cursor() as $migrationStep) {
/** @var ElasticsearchMigrationStep $migrationStep */
$migrationSteps[] = array_except($migrationStep->toArray(), [
$migrationStepData = array_except($migrationStep->toArray(), [
'id',
'migration_id'
]);

$migrationStepData['status'] = (int)$migrationStepData['status'];
$migrationStepData['params'] = json_decode($migrationStepData['params'], true);
$migrationStepData['priority'] = (int)$migrationStepData['priority'];
$migrationStepData['stop_on_failure'] = (bool)$migrationStepData['stop_on_failure'];
$migrationStepData['created_at'] = new \DateTime($migrationStepData['created_at']);
$migrationStepData['updated_at'] = new \DateTime($migrationStepData['updated_at']);

$migrationSteps[] = $migrationStepData;
}
}

return [
'migration' => $migration,
'status' => $status,
'status' => (int)$status,
'error' => $error,
'steps' => $migrationSteps
];
}
Expand All @@ -112,14 +126,14 @@ public function getMigrationStatus(string $migration) : array
*/
public function startMigration(string $migration, ElasticsearchClients $elasticsearchClients)
{
$this->checkIfMigrationAlreadyRunning($migration);
$this->checkIfMigrationAlreadyDone($migration);

try {
$migrationSteps = $this->migrationStepService->getMigrationSteps($migration, true);

if (!empty($migrationSteps)) {
$this->migrationRepository->createOrUpdate($migration, MigrationStatus::MIGRATION_STATUS_RUNNING);

foreach ($migrationSteps as $migrationStep) {
$this->startMigrationStep($migrationStep, $elasticsearchClients);
}
Expand All @@ -132,12 +146,10 @@ public function startMigration(string $migration, ElasticsearchClients $elastics
MigrationStatus::MIGRATION_STATUS_ERROR,
$e->getMessage()
);

throw $e;
}
}

private function checkIfMigrationAlreadyRunning(string $migration)
private function checkIfMigrationAlreadyDone(string $migration)
{
$migrationEntity = $this->migrationRepository->find($migration);

Expand All @@ -162,7 +174,15 @@ private function startMigrationStep(

$this->migrationStepRepository->update($migrationStep->getId(), MigrationStatus::MIGRATION_STATUS_DONE);
} catch (\Exception $e) {
$this->migrationStepRepository->update($migrationStep->getId(), MigrationStatus::MIGRATION_STATUS_ERROR);
$this->migrationStepRepository->update(
$migrationStep->getId(),
MigrationStatus::MIGRATION_STATUS_ERROR,
$e->getMessage()
);

if ($migrationStep->isStopOnFailure()) {
throw $e;
}
}
}
}
1 change: 1 addition & 0 deletions src/Models/Entity/ElasticsearchMigrationStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* @property string|null $error
* @property string $params
* @property integer $priority
* @property bool $stop_on_failure
* @property string $created_at
* @property string $updated_at
*/
Expand Down
28 changes: 28 additions & 0 deletions src/Models/MigrationStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ class MigrationStep
/** @var array */
private $params;

/** @var int */
private $priority;

/** @var bool */
private $stopOnFailure;

/** @var \DateTime */
private $createdAt;

Expand All @@ -31,6 +37,8 @@ class MigrationStep
* @param int $status
* @param null|string $error
* @param array $params
* @param int $priority
* @param bool $stopOnFailure
* @param \DateTime $createdAt
* @param \DateTime $updatedAt
*/
Expand All @@ -40,6 +48,8 @@ public function __construct(
int $status,
?string $error,
array $params,
int $priority,
bool $stopOnFailure,
\DateTime $createdAt,
\DateTime $updatedAt
) {
Expand All @@ -48,6 +58,8 @@ public function __construct(
$this->status = $status;
$this->error = $error;
$this->params = $params;
$this->priority = $priority;
$this->stopOnFailure = $stopOnFailure;
$this->createdAt = $createdAt;
$this->updatedAt = $updatedAt;
}
Expand Down Expand Up @@ -107,4 +119,20 @@ public function getUpdatedAt(): \DateTime
{
return $this->updatedAt;
}

/**
* @return int
*/
public function getPriority(): int
{
return $this->priority;
}

/**
* @return bool
*/
public function isStopOnFailure(): bool
{
return $this->stopOnFailure;
}
}
6 changes: 6 additions & 0 deletions tests/integration/Business/Migration/CreateIndexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public function it_throws_an_validation_exception()
MigrationStatus::MIGRATION_STATUS_WAIT,
null,
[],
1,
true,
new \DateTime(),
new \DateTime()
));
Expand All @@ -59,6 +61,8 @@ public function it_fails_if_index_already_exist()
MigrationStatus::MIGRATION_STATUS_WAIT,
null,
$this->getValidPayload(),
1,
true,
new \DateTime(),
new \DateTime()
));
Expand All @@ -81,6 +85,8 @@ public function it_runs_migration()
MigrationStatus::MIGRATION_STATUS_WAIT,
null,
$this->getValidPayload(),
1,
true,
new \DateTime(),
new \DateTime()
));
Expand Down
Loading

0 comments on commit d1ff41d

Please sign in to comment.