Skip to content

Commit

Permalink
Buffer payloads for PreLogin and Login7 responses, until the complete…
Browse files Browse the repository at this point in the history
… message has been received..
  • Loading branch information
pekim committed Oct 26, 2011
1 parent 4038c89 commit b30c938
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 35 deletions.
82 changes: 50 additions & 32 deletions lib/connection.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ Login7Payload = require('./login7-payload')
MessageIO = require('./message-io')
Socket = require('net').Socket

# s3.2.1
STATE =
INITIAL: 0,
SENT_PRELOGIN: 1,
SENT_LOGIN7: 2,
LOGGED_IN: 3,
SENT_CLIENT_REQUEST: 4,
SENT_ATTENTION: 5,
FINAL: 6


class Connection extends EventEmitter
Expand All @@ -17,13 +23,15 @@ class Connection extends EventEmitter
@options.port ||= 1433
@options.timeout ||= 10 * 1000

@state = STATE.INITIAL

@debug = new Debug(@options.debug)
@debug.on('debug', (message) =>
if !@closed
if @state != STATE.FINAL
@emit('debug', message)
)

@closed = false
@messagePayloadBuffer = new Buffer(0)

@connection = new Socket({})
@connection.setTimeout(options.timeout)
Expand All @@ -43,50 +51,54 @@ class Connection extends EventEmitter
@packetBuffer = new Buffer(0)

eventClose: (hadError) =>
console.log('close', hadError)
@emit('closed')
@debug.log("connection close, hadError:#{hadError}")

eventConnect: =>
console.log('connect')
@debug.log('connected')
@sendPreLoginPacket()

eventEnd: =>
console.log('end')
@debug.log('end')

eventError: (exception) =>
@debug.log(exception)
@emit('fatal', exception)
@fatalError(exception)
@connection.destroy()

eventTimeout: =>
@debug.log('timeout')
@emit('fatal', 'timeout')
@fatalError('timeout')
@connection.destroy()

eventPacket: (packet) =>
switch @state
when STATE.SENT_PRELOGIN
@processPreloginResponsePacket(packet)
@buildMessage(packet, @processPreloginResponse)
when STATE.SENT_LOGIN7
@processLogin7ResponsePacket(packet)

processPreloginResponsePacket: (packet) ->
if packet.isLast()
preloginPayload = new PreloginPayload(packet.data())
@debug.payload(preloginPayload.toString(' '))

@sendLogin7Packet()
else
# Naive, but reasonable, assumption not valid.
fatalError("Expected only a single packet response from PRELOGIN message")

processLogin7ResponsePacket: (packet) ->
if packet.isLast()
console.log('LOGIN7 response')
console.log(packet.data())
@activeRequest.callback(undefined, true)
else
# Naive, but reasonable, assumption not valid.
fatalError("Expected only a single packet response from LOGIN7 message")
@buildMessage(packet, @processLogin7Response)
else
@fatalError("Unexpected packet, type #{packet.type()}")

# Accumulates packet payloads into a buffer until all of the packets
# for a message have been received.
#
# Only used during some states, when we want to process the complete message.
# For other states the payloads are processed for each packet as they arrive.
buildMessage: (packet, payloadProcessFunction) ->
@messagePayloadBuffer = new Buffer(@messagePayloadBuffer.concat(packet.data()))

if (packet.isLast())
payloadProcessFunction.call(@)
@messagePayloadBuffer = new Buffer(0)

processPreloginResponse: ->
preloginPayload = new PreloginPayload(@messagePayloadBuffer)
@debug.payload(preloginPayload.toString(' '))

@sendLogin7Packet()

processLogin7Response: ->
#console.log("LOGIN7 response #{@messagePayloadBuffer}")
@activeRequest.callback(undefined, true)

startRequest: (requestName, callback) =>
@activeRequest =
Expand All @@ -100,6 +112,7 @@ class Connection extends EventEmitter
sendPreLoginPacket: ->
payload = new PreloginPayload()
@messageIo.sendMessage(TYPE.PRELOGIN, payload.data)
@debug.payload(payload.toString(' '))
@state = STATE.SENT_PRELOGIN

sendLogin7Packet: ->
Expand All @@ -114,7 +127,12 @@ class Connection extends EventEmitter
@state = STATE.SENT_LOGIN7

fatalError: (message) ->
# TODO close connection, and emit fatal error to client.
console.log("FATAL ERROR #{message}")
@debug.log("FATAL ERROR: #{message}")
@close()
@emit('fatal', message)

close: ->
@connection.end()
@state = STATE.FINAL

module.exports = Connection
6 changes: 3 additions & 3 deletions test/integration/connection-test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ exports.connect = (test) ->
)

connection.on('debug', (message) ->
console.log(message);
console.log(message)
)

exports.badServer = (test) ->
Expand All @@ -28,7 +28,7 @@ exports.badServer = (test) ->
)

connection.on('debug', (message) ->
#console.log(message);
#console.log(message)
)

exports.badPort = (test) ->
Expand All @@ -42,7 +42,7 @@ exports.badPort = (test) ->
)

connection.on('debug', (message) ->
#console.log(message);
#console.log(message)
)


Expand Down

0 comments on commit b30c938

Please sign in to comment.