diff --git a/src/RepoFacade.ts b/src/RepoFacade.ts index 1bbaec14..489e6c60 100644 --- a/src/RepoFacade.ts +++ b/src/RepoFacade.ts @@ -5,4 +5,6 @@ export default interface RepoFacade { readonly updateProcessedMigration: (migration: ProcessedMigration) => Promise; readonly removeProcessedMigration: (key: string) => Promise; readonly clearMigrations: () => Promise; + readonly lockMigrations: () => Promise; + readonly unlockMigrations: () => Promise; } diff --git a/src/factory.test.ts b/src/factory.test.ts index 2430634c..43c893cb 100644 --- a/src/factory.test.ts +++ b/src/factory.test.ts @@ -4,8 +4,8 @@ sourceMapSupport.install(); import factoryTest from './factoryTest'; import ProcessedMigration from './utils/types/ProcessedMigration'; -// tslint:disable-next-line:no-let -let processedMigrations: ProcessedMigration[] = []; +let processedMigrations: ProcessedMigration[] = []; // tslint:disable-line:no-let +let hasLockedMigrations = false; // tslint:disable-line:no-let factoryTest({ clearMigrations: async () => { @@ -14,11 +14,20 @@ factoryTest({ getProcessedMigrations: async () => { return processedMigrations; }, + lockMigrations: async () => { + if (hasLockedMigrations) { + throw new Error(); + } + hasLockedMigrations = true; + }, removeProcessedMigration: async (key) => { processedMigrations = processedMigrations.filter((processedMigration) => { return processedMigration.key !== key; }); }, + unlockMigrations: async () => { + hasLockedMigrations = false; + }, updateProcessedMigration: async (migration) => { const unmatchedMigrations = processedMigrations.filter((processedMigration) => { return processedMigration.key !== migration.key; diff --git a/src/migrate/index.ts b/src/migrate/index.ts index b160676f..f6f502be 100644 --- a/src/migrate/index.ts +++ b/src/migrate/index.ts @@ -1,16 +1,19 @@ import { reduce } from 'bluebird'; import FacadeConfig from '../FacadeConfig'; import getUnprocessedKeys from '../utils/getUnprocessedKeys'; +import handleLocks from '../utils/handleLocks'; import migrateKey from '../utils/migrateKey'; import Signature from './Signature'; export default (config: FacadeConfig): Signature => { return async () => { - const unprocessedKeys = await getUnprocessedKeys(config); - const batchStart = new Date(); + await handleLocks(config, async () => { + const unprocessedKeys = await getUnprocessedKeys(config); + const batchStart = new Date(); - await Promise.resolve(reduce(unprocessedKeys, async (_result, key) => { - await migrateKey({ config, key, batchStart }); - }, Promise.resolve())); + await Promise.resolve(reduce(unprocessedKeys, async (_result, key) => { + await migrateKey({ config, key, batchStart }); + }, Promise.resolve())); + }); }; }; diff --git a/src/migrateByKey/index.ts b/src/migrateByKey/index.ts index 464f7a5b..e9ab5e95 100644 --- a/src/migrateByKey/index.ts +++ b/src/migrateByKey/index.ts @@ -1,16 +1,19 @@ import FacadeConfig from '../FacadeConfig'; import ProcessedMigrationError from '../utils/errors/ProcessedMigrationError'; +import handleLocks from '../utils/handleLocks'; import hasProcessedKey from '../utils/hasProcessedKey'; import migrateKey from '../utils/migrateKey'; import Signature from './Signature'; export default (config: FacadeConfig): Signature => { return async ({ key, force = false }) => { - const isProcessed = await hasProcessedKey(config, key); - if (isProcessed && !force) { - throw new ProcessedMigrationError(key); - } + await handleLocks(config, async () => { + const isProcessed = await hasProcessedKey(config, key); + if (isProcessed && !force) { + throw new ProcessedMigrationError(key); + } - await migrateKey({ config, key }); + await migrateKey({ config, key }); + }); }; }; diff --git a/src/rollback/index.ts b/src/rollback/index.ts index 50ac4427..3a066081 100644 --- a/src/rollback/index.ts +++ b/src/rollback/index.ts @@ -1,15 +1,18 @@ import { reduce } from 'bluebird'; import FacadeConfig from '../FacadeConfig'; import getLastBatchKeys from '../utils/getLastBatchKeys'; +import handleLocks from '../utils/handleLocks'; import rollbackKey from '../utils/rollbackKey'; import Signature from './Signature'; export default (config: FacadeConfig): Signature => { return async () => { - const lastBatchKeys = await getLastBatchKeys(config); + await handleLocks(config, async () => { + const lastBatchKeys = await getLastBatchKeys(config); - await Promise.resolve(reduce(lastBatchKeys, async (_result, key) => { - await rollbackKey({ config, key }); - }, Promise.resolve())); + await Promise.resolve(reduce(lastBatchKeys, async (_result, key) => { + await rollbackKey({ config, key }); + }, Promise.resolve())); + }); }; }; diff --git a/src/rollbackByKey/index.ts b/src/rollbackByKey/index.ts index f93b9092..5d312c8f 100644 --- a/src/rollbackByKey/index.ts +++ b/src/rollbackByKey/index.ts @@ -1,16 +1,19 @@ import FacadeConfig from '../FacadeConfig'; import UnprocessedMigrationError from '../utils/errors/UnprocessedMigrationError'; +import handleLocks from '../utils/handleLocks'; import hasProcessedKey from '../utils/hasProcessedKey'; import rollbackKey from '../utils/rollbackKey'; import Signature from './Signature'; export default (config: FacadeConfig): Signature => { return async ({ key, force = false }) => { - const isProcessed = await hasProcessedKey(config, key); - if (!isProcessed && !force) { - throw new UnprocessedMigrationError(key); - } + await handleLocks(config, async () => { + const isProcessed = await hasProcessedKey(config, key); + if (!isProcessed && !force) { + throw new UnprocessedMigrationError(key); + } - await rollbackKey({ config, key }); + await rollbackKey({ config, key }); + }); }; }; diff --git a/src/utils/handleLocks.ts b/src/utils/handleLocks.ts new file mode 100644 index 00000000..98041eac --- /dev/null +++ b/src/utils/handleLocks.ts @@ -0,0 +1,15 @@ +import FacadeConfig from '../FacadeConfig'; + +export default async (config: FacadeConfig, handler: () => Promise) => { + await config.repo.lockMigrations(); + config.log('Locked migrations'); + try { + await handler(); + config.log('Unlocked migrations after completion'); + await config.repo.unlockMigrations(); + } catch (err) { + config.log('Unlocked migrations after error'); + await config.repo.unlockMigrations(); + throw err; + } +};