Skip to content

Commit

Permalink
Add new mode to wrap each migration in transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
garbles committed Dec 6, 2018
1 parent 5edf324 commit 543031e
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 56 deletions.
19 changes: 18 additions & 1 deletion src/commands/MigrationRevertCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,26 @@ export class MigrationRevertCommand {
logging: ["query", "error", "schema"]
});
connection = await createConnection(connectionOptions);

const options = {
transaction: argv["t"] === "false" ? false : true
transactionMode: "all" as "all" | "none" | "each",
};

switch (argv["t"]) {
case "all":
options.transactionMode = "all";
break;
case "none":
case "false":
options.transactionMode = "none";
break;
case "each":
options.transactionMode = "each";
break;
default:
// noop
}

await connection.undoLastMigration(options);
await connection.close();

Expand Down
18 changes: 17 additions & 1 deletion src/commands/MigrationRunCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,24 @@ export class MigrationRunCommand {
connection = await createConnection(connectionOptions);

const options = {
transaction: argv["t"] === "false" ? false : true
transactionMode: "all" as "all" | "none" | "each",
};

switch (argv["t"]) {
case "all":
options.transactionMode = "all";
break;
case "none":
case "false":
options.transactionMode = "none";
break;
case "each":
options.transactionMode = "each";
break;
default:
// noop
}

await connection.runMigrations(options);
await connection.close();
// exit process if no errors
Expand Down
14 changes: 6 additions & 8 deletions src/connection/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,13 @@ export class Connection {
* Runs all pending migrations.
* Can be used only after connection to the database is established.
*/
async runMigrations(options?: { transaction?: boolean }): Promise<Migration[]> {
async runMigrations(options?: { transactionMode?: 'all' | 'none' | 'each' }): Promise<Migration[]> {
if (!this.isConnected)
throw new CannotExecuteNotConnectedError(this.name);

const migrationExecutor = new MigrationExecutor(this);
if (options && options.transaction === false) {
migrationExecutor.transaction = false;
}
migrationExecutor.transactionMode = (options && options.transactionMode) || 'all';

const successMigrations = await migrationExecutor.executePendingMigrations();
return successMigrations;
}
Expand All @@ -288,15 +287,14 @@ export class Connection {
* Reverts last executed migration.
* Can be used only after connection to the database is established.
*/
async undoLastMigration(options?: { transaction?: boolean }): Promise<void> {
async undoLastMigration(options?: { transactionMode?: 'all' | 'none' | 'each' }): Promise<void> {

if (!this.isConnected)
throw new CannotExecuteNotConnectedError(this.name);

const migrationExecutor = new MigrationExecutor(this);
if (options && options.transaction === false) {
migrationExecutor.transaction = false;
}
migrationExecutor.transactionMode = (options && options.transactionMode) || 'all';

await migrationExecutor.undoLastMigration();
}

Expand Down
170 changes: 124 additions & 46 deletions src/migration/MigrationExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ export class MigrationExecutor {
// -------------------------------------------------------------------------

/**
* Indicates if migrations must be executed in a transaction.
* Indicates how migrations should be run in transactions.
* all: all migrations are run in a single transaction
* none: all migrations are run without a transaction
* each: each migration is run in a separate transaction
*/
transaction: boolean = true;
transactionMode: 'all' | 'none' | 'each' = 'all';

// -------------------------------------------------------------------------
// Private Properties
Expand Down Expand Up @@ -66,9 +69,6 @@ export class MigrationExecutor {
// get all user's migrations in the source code
const allMigrations = this.getMigrations();

// variable to store all migrations we did successefuly
const successMigrations: Migration[] = [];

// find all migrations that needs to be executed
const pendingMigrations = allMigrations.filter(migration => {
// check if we already have executed migration
Expand Down Expand Up @@ -100,47 +100,14 @@ export class MigrationExecutor {
this.connection.logger.logSchemaBuild(`${lastTimeExecutedMigration.name} is the last executed migration. It was executed on ${new Date(lastTimeExecutedMigration.timestamp).toString()}.`);
this.connection.logger.logSchemaBuild(`${pendingMigrations.length} migrations are new migrations that needs to be executed.`);

// start transaction if its not started yet
let transactionStartedByUs = false;
if (this.transaction && !queryRunner.isTransactionActive) {
await queryRunner.startTransaction();
transactionStartedByUs = true;
switch (this.transactionMode) {
case 'each':
return this.runEachMigrationInSeparateTransaction(queryRunner, pendingMigrations);
case 'all':
return this.runAllMigrationsInSingleTransaction(queryRunner, pendingMigrations);
case 'none':
return this.runAllMigrationsWithoutTransaction(queryRunner, pendingMigrations);
}

// run all pending migrations in a sequence
try {
await PromiseUtils.runInSequence(pendingMigrations, migration => {
return migration.instance!.up(queryRunner)
.then(() => { // now when migration is executed we need to insert record about it into the database
return this.insertExecutedMigration(queryRunner, migration);
})
.then(() => { // informative log about migration success
successMigrations.push(migration);
this.connection.logger.logSchemaBuild(`Migration ${migration.name} has been executed successfully.`);
});
});

// commit transaction if we started it
if (transactionStartedByUs)
await queryRunner.commitTransaction();

} catch (err) { // rollback transaction if we started it
if (transactionStartedByUs) {
try { // we throw original error even if rollback thrown an error
await queryRunner.rollbackTransaction();
} catch (rollbackError) { }
}

throw err;

} finally {

// if query runner was created by us then release it
if (!this.queryRunner)
await queryRunner.release();
}
return successMigrations;

}

/**
Expand Down Expand Up @@ -182,7 +149,7 @@ export class MigrationExecutor {

// start transaction if its not started yet
let transactionStartedByUs = false;
if (this.transaction && !queryRunner.isTransactionActive) {
if ((this.transactionMode !== 'none') && !queryRunner.isTransactionActive) {
await queryRunner.startTransaction();
transactionStartedByUs = true;
}
Expand Down Expand Up @@ -343,4 +310,115 @@ export class MigrationExecutor {
.execute();
}

protected async runEachMigrationInSeparateTransaction(queryRunner: QueryRunner, pendingMigrations: Migration[]) {
// variable to store all migrations we did successefuly
const successMigrations: Migration[] = [];

if (queryRunner.isTransactionActive) {
throw new Error('Trying to run each migration in separate transaction, but already in one. Try changing this option and run migrations again.');
}

// run all pending migrations in a sequence
try {
await PromiseUtils.runInSequence(pendingMigrations, async migration => {
await queryRunner.startTransaction();
return migration.instance!.up(queryRunner)
.then(async () => { // now when migration is executed we need to insert record about it into the database
await this.insertExecutedMigration(queryRunner, migration);
await queryRunner.commitTransaction();
})
.then(() => { // informative log about migration success
successMigrations.push(migration);
this.connection.logger.logSchemaBuild(`Migration ${migration.name} has been executed successfully.`);
});
});

} catch (err) { // rollback transaction if we started it
try { // we throw original error even if rollback thrown an error
if (queryRunner.isTransactionActive) {
await queryRunner.rollbackTransaction();
}
} catch (rollbackError) { }

throw err;

} finally {
// if query runner was created by us then release it
if (!this.queryRunner)
await queryRunner.release();
}
return successMigrations;
}

protected async runAllMigrationsInSingleTransaction(queryRunner: QueryRunner, pendingMigrations: Migration[]) {
// variable to store all migrations we did successefuly
const successMigrations: Migration[] = [];

// start transaction if its not started yet
let transactionStartedByUs = false;
if (!queryRunner.isTransactionActive) {
await queryRunner.startTransaction();
transactionStartedByUs = true;
}

// run all pending migrations in a sequence
try {
await PromiseUtils.runInSequence(pendingMigrations, migration => {
return migration.instance!.up(queryRunner)
.then(() => { // now when migration is executed we need to insert record about it into the database
return this.insertExecutedMigration(queryRunner, migration);
})
.then(() => { // informative log about migration success
successMigrations.push(migration);
this.connection.logger.logSchemaBuild(`Migration ${migration.name} has been executed successfully.`);
});
});

// commit transaction if we started it
if (transactionStartedByUs)
await queryRunner.commitTransaction();

} catch (err) { // rollback transaction if we started it
if (transactionStartedByUs) {
try { // we throw original error even if rollback thrown an error
await queryRunner.rollbackTransaction();
} catch (rollbackError) { }
}

throw err;

} finally {

// if query runner was created by us then release it
if (!this.queryRunner)
await queryRunner.release();
}
return successMigrations;
}

protected async runAllMigrationsWithoutTransaction(queryRunner: QueryRunner, pendingMigrations: Migration[]) {
// variable to store all migrations we did successefuly
const successMigrations: Migration[] = [];

// run all pending migrations in a sequence
try {
await PromiseUtils.runInSequence(pendingMigrations, migration => {
return migration.instance!.up(queryRunner)
.then(() => { // now when migration is executed we need to insert record about it into the database
return this.insertExecutedMigration(queryRunner, migration);
})
.then(() => { // informative log about migration success
successMigrations.push(migration);
this.connection.logger.logSchemaBuild(`Migration ${migration.name} has been executed successfully.`);
});
});
} finally {

// if query runner was created by us then release it
if (!this.queryRunner)
await queryRunner.release();
}
return successMigrations;
}

}

0 comments on commit 543031e

Please sign in to comment.