Skip to content

Commit

Permalink
Fix transaction support and add new Connection#transaction helper.
Browse files Browse the repository at this point in the history
This adds a new transaction helper that makes the use of transactions a
breeze. It uses a mix of transactions and transaction savepoints to
enable correctly nested transactions, while giving the user the ability
to only roll back the contents of specific transaction blocks.

This is a potential breakage for existing users that make use of
`Connection#beginTransaction`, `Connection#rollbackTransaction` or
`Connection#commitTransaction`, but these never worked correctly in the
first place.
  • Loading branch information
arthurschreiber committed Jan 27, 2015
1 parent 935d698 commit ae7dc1c
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 19 deletions.
107 changes: 89 additions & 18 deletions src/connection.coffee
Expand Up @@ -237,7 +237,7 @@ class Connection extends EventEmitter
@createDebug()
@createTokenStreamParser()

@transactions = []
@inTransaction = false
@transactionDescriptors = [new Buffer([0, 0, 0, 0, 0, 0, 0, 0])]

@transitionTo(@STATE.CONNECTING)
Expand Down Expand Up @@ -346,15 +346,30 @@ class Connection extends EventEmitter
@tokenStreamParser.on('packetSizeChange', (token) =>
@messageIo.packetSize(token.newValue)
)

# A new top-level transaction was started. This is not fired
# for nested transactions.
@tokenStreamParser.on('beginTransaction', (token) =>
@transactionDescriptors.push(token.newValue)
@inTransaction = true
)

# A top-level transaction was committed. This is not fired
# for nested transactions.
@tokenStreamParser.on('commitTransaction', (token) =>
@transactionDescriptors.pop()
@transactionDescriptors.length = 1
@inTransaction = false
)

# A top-level transaction was rolled back. This is not fired
# for nested transactions. This is also fired if a batch
# aborting error happened that caused a rollback.
@tokenStreamParser.on('rollbackTransaction', (token) =>
@transactionDescriptors.pop()
@transactionDescriptors.length = 1
# An outermost transaction was rolled back. Reset the transaction counter
@inTransaction = false
)

@tokenStreamParser.on('columnMetadata', (token) =>
if @request
if @config.options.useColumnNames
Expand Down Expand Up @@ -742,12 +757,9 @@ set xact_abort #{xact_abort}"""
@makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload(request, @currentTransactionDescriptor(), @config.options))

beginTransaction: (callback, name, isolationLevel) ->
name ||= ''
isolationLevel ||= @config.options.isolationLevel

transaction = new Transaction(name, isolationLevel)
@transactions.push(transaction)

transaction = new Transaction(name || '', isolationLevel)
if @config.options.tdsVersion < "7_2"
return @execSqlBatch new Request "SET TRANSACTION ISOLATION LEVEL #{transaction.isolationLevelToTSQL()};BEGIN TRAN #{transaction.name}", callback

Expand All @@ -757,30 +769,89 @@ set xact_abort #{xact_abort}"""

@makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.beginPayload(@currentTransactionDescriptor()))

commitTransaction: (callback) ->
if @transactions.length == 0
return callback RequestError('No transaction in progress', 'ENOTRNINPROG')

transaction = @transactions.pop()

commitTransaction: (callback, name) ->
transaction = new Transaction(name || '')
if @config.options.tdsVersion < "7_2"
return @execSqlBatch new Request "COMMIT TRAN #{transaction.name}", callback

request = new Request(undefined, callback)
@makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.commitPayload(@currentTransactionDescriptor()))

rollbackTransaction: (callback) ->
if @transactions.length == 0
return callback RequestError('No transaction in progress', 'ENOTRNINPROG')

transaction = @transactions.pop()
rollbackTransaction: (callback, name) ->
transaction = new Transaction(name || '')

if @config.options.tdsVersion < "7_2"
return @execSqlBatch new Request "ROLLBACK TRAN #{transaction.name}", callback

request = new Request(undefined, callback)
@makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.rollbackPayload(@currentTransactionDescriptor()))

saveTransaction: (callback, name) ->
transaction = new Transaction(name)

if @config.options.tdsVersion < "7_2"
return @execSqlBatch new Request "SAVE TRAN #{transaction.name}", callback

request = new Request(undefined, callback)
@makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.savePayload(@currentTransactionDescriptor()))

transaction: (cb, isolationLevel) ->
if typeof cb != 'function'
throw new TypeError('`cb` must be a function')

useSavepoint = @inTransaction
name = "_tedious_#{crypto.randomBytes(10).toString('hex')}"

txDone = (err, done) =>
args = []
args.push(arguments[i]) for i in [2..arguments.length]

if err
if @inTransaction
@rollbackTransaction((txErr) ->
args.unshift(txErr || err)
done.apply(null, args)
, name)
else
# We're no longer inside a transaction. This happens if the outermost transaction
# was rolled back, for one of the following reasons:
# * Connection#rollbackTransaction was called.
# * `ROLLBACK TRANSACTION` was executed.
# * the server rolled back the transaction, due to a batch aborting error.
#
# As the transaction was already rolled back, we only need to propagate
# the error through all callbacks.
process.nextTick ->
args.unshift(err)
done.apply(null, args)
else
if useSavepoint
process.nextTick ->
args.unshift(null)
done.apply(null, args)
else
@commitTransaction((txErr) ->
args.unshift(txErr)
done.apply(null, args)
, name)

if useSavepoint
@saveTransaction((err) =>
return cb(err) if err

if isolationLevel
@execSqlBatch new Request "SET transaction isolation level #{@getIsolationLevelText(isolationLevel)}", (err) ->
cb(err, txDone)
else
cb(null, txDone)
, name)
else
@beginTransaction((err) ->
return cb(err) if err

cb(null, txDone)
, name, isolationLevel)

makeRequest: (request, packetType, payload) ->
if @state != @STATE.LOGGED_IN
message = "Requests can only be made in the #{@STATE.LOGGED_IN.name} state, not the #{@state.name} state"
Expand Down
14 changes: 13 additions & 1 deletion src/transaction.coffee
Expand Up @@ -68,7 +68,19 @@ class Transaction
data: buffer.data
toString: =>
"Rollback Transaction: name=#{@name}"


savePayload: (txnDescriptor) ->
buffer = new WritableTrackingBuffer(100, 'ascii')
writeAllHeaders(buffer, txnDescriptor, @outstandingRequestCount)
buffer.writeUShort(OPERATION_TYPE.TM_SAVE_XACT)
buffer.writeUInt8(@name.length * 2)
buffer.writeString(@name, 'ucs2')

payload =
data: buffer.data
toString: =>
"Save Transaction: name=#{@name}"

isolationLevelToTSQL: ->
switch @isolationLevel
when ISOLATION_LEVEL.READ_UNCOMMITTED then return 'READ UNCOMMITTED'
Expand Down

0 comments on commit ae7dc1c

Please sign in to comment.