Skip to content

Commit

Permalink
Collapse the data from multiple received packets (in one socket data …
Browse files Browse the repository at this point in the history
…event) in to one buffer.

This appears to provide a very slight performance improvement. Probably due to fewer
read-beyond-end-of-buffer exceptions being thrown.
  • Loading branch information
pekim committed Jul 8, 2012
1 parent e0608fa commit ed0d73e
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 54 deletions.
28 changes: 14 additions & 14 deletions src/connection.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class Connection extends EventEmitter
@transitionTo(@STATE.FINAL)
connectTimeout: ->
@transitionTo(@STATE.FINAL)
packet: (packet) ->
@addToMessageBuffer(packet)
data: (data) ->
@addToMessageBuffer(data)
message: ->
@processPreLoginResponse()
@sendLogin7Packet()
Expand All @@ -60,8 +60,8 @@ class Connection extends EventEmitter
@transitionTo(@STATE.FINAL)
connectTimeout: ->
@transitionTo(@STATE.FINAL)
packet: (packet) ->
@sendPacketToTokenStreamParser(packet)
data: (data) ->
@sendDataToTokenStreamParser(data)
loggedIn: ->
@transitionTo(@STATE.LOGGED_IN_SENDING_INITIAL_SQL)
loginFailed: ->
Expand All @@ -73,8 +73,8 @@ class Connection extends EventEmitter
enter: ->
@sendInitialSql()
events:
packet: (packet) ->
@sendPacketToTokenStreamParser(packet)
data: (data) ->
@sendDataToTokenStreamParser(data)
message: (error) ->
@transitionTo(@STATE.LOGGED_IN)
@processedInitialSql()
Expand All @@ -88,8 +88,8 @@ class Connection extends EventEmitter
events:
socketError: (error) ->
@transitionTo(@STATE.FINAL)
packet: (packet) ->
@sendPacketToTokenStreamParser(packet)
data: (data) ->
@sendDataToTokenStreamParser(data)
message: ->
@transitionTo(@STATE.LOGGED_IN)

Expand Down Expand Up @@ -236,8 +236,8 @@ class Connection extends EventEmitter
@socket.on('end', @socketClose)

@messageIo = new MessageIO(@socket, @config.options.packetSize, @debug)
@messageIo.on('packet', (packet) =>
@dispatchEvent('packet', packet)
@messageIo.on('data', (data) =>
@dispatchEvent('data', data)
)
@messageIo.on('message', =>
@dispatchEvent('message')
Expand Down Expand Up @@ -301,8 +301,8 @@ class Connection extends EventEmitter
emptyMessageBuffer: ->
@messageBuffer = new Buffer(0)

addToMessageBuffer: (packet) ->
@messageBuffer = @messageBuffer.concat(packet.data())
addToMessageBuffer: (data) ->
@messageBuffer = @messageBuffer.concat(data)

processPreLoginResponse: ->
preloginPayload = new PreloginPayload(@messageBuffer)
Expand All @@ -324,8 +324,8 @@ class Connection extends EventEmitter
payload.toString(' ')
)

sendPacketToTokenStreamParser: (packet) ->
@tokenStreamParser.addBuffer(packet.data())
sendDataToTokenStreamParser: (data) ->
@tokenStreamParser.addBuffer(data)

sendInitialSql: ->
payload = new SqlBatchPayload('set textsize ' + @config.options.textsize)
Expand Down
12 changes: 10 additions & 2 deletions src/message-io.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@ class MessageIO extends EventEmitter
else
@packetBuffer = data

packetsData = []
endOfMessage = false

while isPacketComplete(@packetBuffer)
length = packetLength(@packetBuffer)
packet = new Packet(@packetBuffer.slice(0, length))
@logPacket('Received', packet);

@emit('packet', packet)
packetsData.push(packet.data())
if (packet.isLast())
@emit('message')
endOfMessage = true

@packetBuffer = @packetBuffer.slice(length)

if packetsData.length > 0
@emit('data', Buffer.concat(packetsData))
if endOfMessage
@emit('message')

packetSize: (packetSize) ->
if arguments.length > 0
@debug.log("Packet size changed from #{@_packetSize} to #{packetSize}")
Expand Down
29 changes: 26 additions & 3 deletions test/performance/big-select.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,40 @@ async = require('async')
getConfig = ->
JSON.parse(fs.readFileSync(process.env.HOME + '/.tedious/test-connection.json', 'utf8')).config

exports.lotsOfRows = (test) ->
test.expect(2)
exports.smallRows = (test) ->
rows = 50000

createTableSql = 'create table #many_rows (id int, first_name varchar(20), last_name varchar(20))'
insertRowSql = """
insert into #many_rows (id, first_name, last_name) values(@count, 'MyFirstName', 'YourLastName')
"""

createInsertSelect(test, rows, createTableSql, insertRowSql)

exports.mediumRows = (test) ->
rows = 2000

medium = ''
for i in [1..8000]
medium += 'x'

createTableSql = 'create table #many_rows (id int, first_name varchar(20), last_name varchar(20), medium varchar(8000))'
insertRowSql = """
insert into #many_rows (id, first_name, last_name, medium) values(@count, 'MyFirstName', 'YourLastName', '#{medium}')
"""

createInsertSelect(test, rows, createTableSql, insertRowSql)

createInsertSelect = (test, rows, createTableSql, insertRowSql) ->
test.expect(2)

insertRowsSql = """
declare @count int
set @count = #{rows}
while @count > 0
begin
insert into #many_rows (id, first_name, last_name) values(@count, 'MyFirstName', 'YourLastName')
#{insertRowSql}
set @count = @count - 1
end
"""
Expand Down Expand Up @@ -50,6 +72,7 @@ exports.lotsOfRows = (test) ->
durationMillis = Date.now() - start
console.log "Took #{durationMillis / 1000}s"
console.log "#{rows / (durationMillis / 1000)} rows/sec"
console.log "#{(rows * insertRowSql.length) / (durationMillis / 1000)} bytes/sec"

callback(err)
)
Expand Down
56 changes: 21 additions & 35 deletions test/unit/message-io-test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,14 @@ exports.sendOneLongerThanPacket = (test) ->
io.sendMessage(packetType, payload)

exports.receiveOnePacket = (test) ->
test.expect(2)
test.expect(1)

payload = new Buffer([1, 2, 3])
connection = new Connection()

io = new MessageIO(connection, packetSize, new Debug())
io.on('packet', (packet) ->
test.strictEqual(packet.type(), packetType)
test.ok(packet.data().equals(payload))
io.on('data', (data) ->
test.ok(data.equals(payload))
)
io.on('message', ->
test.done()
Expand All @@ -93,15 +92,14 @@ exports.receiveOnePacket = (test) ->
connection.emit('data', packet.buffer)

exports.receiveOnePacketInTwoChunks = (test) ->
test.expect(2)
test.expect(1)

payload = new Buffer([1, 2, 3])
connection = new Connection()

io = new MessageIO(connection, packetSize, new Debug())
io.on('packet', (packet) ->
test.strictEqual(packet.type(), packetType)
test.ok(packet.data().equals(payload))
io.on('data', (data) ->
test.ok(data.equals(payload))
)
io.on('message', ->
test.done()
Expand All @@ -114,7 +112,7 @@ exports.receiveOnePacketInTwoChunks = (test) ->
connection.emit('data', packet.buffer.slice(4))

exports.receiveTwoPackets = (test) ->
test.expect(4)
test.expect(2)

payload = new Buffer([1, 2, 3])
payload1 = payload.slice(0, 2)
Expand All @@ -124,16 +122,14 @@ exports.receiveTwoPackets = (test) ->
receivedPacketCount = 0

io = new MessageIO(connection, packetSize, new Debug())
io.on('packet', (packet) ->
io.on('data', (data) ->
receivedPacketCount++

test.strictEqual(packet.type(), packetType)

switch receivedPacketCount
when 1
test.ok(packet.data().equals(payload1))
test.ok(data.equals(payload1))
when 2
test.ok(packet.data().equals(payload2))
test.ok(data.equals(payload2))
)
io.on('message', ->
test.done()
Expand All @@ -149,7 +145,7 @@ exports.receiveTwoPackets = (test) ->
connection.emit('data', packet.buffer)

exports.receiveTwoPacketsWithChunkSpanningPackets = (test) ->
test.expect(4)
test.expect(2)

payload = new Buffer([1, 2, 3, 4])
payload1 = payload.slice(0, 2)
Expand All @@ -159,16 +155,14 @@ exports.receiveTwoPacketsWithChunkSpanningPackets = (test) ->
receivedPacketCount = 0

io = new MessageIO(connection, packetSize, new Debug())
io.on('packet', (packet) ->
io.on('data', (data) ->
receivedPacketCount++

test.strictEqual(packet.type(), packetType)

switch receivedPacketCount
when 1
test.ok(packet.data().equals(payload1))
test.ok(data.equals(payload1))
when 2
test.ok(packet.data().equals(payload2))
test.ok(data.equals(payload2))
)
io.on('message', ->
test.done()
Expand All @@ -186,31 +180,23 @@ exports.receiveTwoPacketsWithChunkSpanningPackets = (test) ->
connection.emit('data', packet2.buffer.slice(4))

exports.receiveMultiplePacketsWithMoreThanOnePacketFromOneChunk = (test) ->
test.expect(6)
test.expect(1)

payload = new Buffer([1, 2, 3, 4, 5, 6])
payload1 = payload.slice(0, 2)
payload2 = payload.slice(2, 4)
payload3 = payload.slice(4, 6)

connection = new Connection()
receivedPacketCount = 0
receivedData = new Buffer(0)

io = new MessageIO(connection, packetSize, new Debug())
io.on('packet', (packet) ->
receivedPacketCount++

test.strictEqual(packet.type(), packetType)

switch receivedPacketCount
when 1
test.ok(packet.data().equals(payload1))
when 2
test.ok(packet.data().equals(payload2))
when 3
test.ok(packet.data().equals(payload3))
io.on('data', (data) ->
receivedData = receivedData.concat(data)
)

io.on('message', ->
test.deepEqual(payload, receivedData)
test.done()
)

Expand All @@ -223,7 +209,7 @@ exports.receiveMultiplePacketsWithMoreThanOnePacketFromOneChunk = (test) ->
packet3 = new Packet(packetType)
packet3.last(true)
packet3.addData(payload.slice(4, 6))

allData = new Buffer(packet1.buffer.concat(packet2.buffer, packet3.buffer))
data1 = allData.slice(0, 5)
data2 = allData.slice(5)
Expand Down

0 comments on commit ed0d73e

Please sign in to comment.