Skip to content

Commit

Permalink
Cosmetic renames and a simple helper for writing to arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
zah committed Aug 7, 2019
1 parent 9c5c6bd commit 6bbe1c2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 20 deletions.
3 changes: 2 additions & 1 deletion faststreams/input_stream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ proc init*(T: type ByteStream,
# TODO: the result should use `from mem` once it's supported
result.head = unsafeAddr mem[0]
result.bufferEnd = result.head.shift mem.len
result.bufferEndPos = mem.len
result.reader = reader
result.asyncReader = asyncReader

Expand All @@ -73,7 +74,7 @@ proc init*(T: type BufferedStream,

proc endPos*(s: InputStreamVar): int =
# TODO This needs to use a VTable for `system.File` based streams
s.bufferEndPos
return s.bufferEndPos

proc syncRead(s: var ByteStream): bool =
let bytesRead = s.reader(s.bufferStart, s.bufferSize)
Expand Down
36 changes: 20 additions & 16 deletions faststreams/output_stream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,19 @@ const
# The goal is to make perfect page-aligned allocations
defaultPageSize = 4096 - allocatorMetadata - 1 # 1 byte for the null terminator

proc createWriteCursor*[R, T](x: var array[R, T]): WriteCursor =
let startAddr = cast[ptr byte](addr x[0])
WriteCursor(head: startAddr, bufferEnd: shift(startAddr, sizeof x))

template canExtendOutput(s: OutputStreamVar): bool =
# Streams writing to pre-allocated existing buffers cannot be grown
s.pageSize > 0
s != nil and s.pageSize > 0

template isExternalCursor(c: var WriteCursor): bool =
# Is this the original stream cursor or is it one created by a "delayed write"
addr(c) != addr(c.stream.cursor)

func remainingBytesToWrite*(c: var WriteCursor): int {.inline.} =
func runway*(c: var WriteCursor): int {.inline.} =
distance(c.head, c.bufferEnd)

proc flipPage(s: OutputStreamVar) =
Expand Down Expand Up @@ -120,7 +124,7 @@ proc init*(T: type OutputStream,
result.endPos = len

proc pos*(s: OutputStreamVar): int =
s.endPos - s.cursor.remainingBytesToWrite
s.endPos - s.cursor.runway

proc safeWritePage(s: OutputStreamVar, data: openarray[byte]) {.inline.} =
if data.len > 0: s.vtable.writePage(s, data)
Expand All @@ -133,7 +137,7 @@ proc writePages(s: OutputStreamVar, skipLast = 0) =
proc writePartialPage(s: OutputStreamVar, page: var OutputPage) =
assert s.vtable != nil
let
unwrittenBytes = s.cursor.remainingBytesToWrite
unwrittenBytes = s.cursor.runway
pageEndPos = s.pageSize - unwrittenBytes - 1
pageStartPos = page.startOffset

Expand Down Expand Up @@ -245,7 +249,7 @@ proc newStringFromBytes(input: ptr byte, inputLen: int): string =

proc handleLongAppend*(c: var WriteCursor, bytes: openarray[byte]) =
var
pageRemaining = c.remainingBytesToWrite
pageRemaining = c.runway
inputPos = unsafeAddr bytes[0]
inputLen = bytes.len
stream = c.stream
Expand Down Expand Up @@ -349,7 +353,7 @@ proc append*(c: var WriteCursor, bytes: openarray[byte]) {.inline.} =
# short enough to fit in the current page. We'll keep buffering until the
# page is full:
let
pageRemaining = c.remainingBytesToWrite
pageRemaining = c.runway
inputLen = bytes.len

if inputLen <= pageRemaining:
Expand Down Expand Up @@ -383,7 +387,7 @@ template appendMemCopy*(s: OutputStreamVar, value: auto) =
proc getOutput*(s: OutputStreamVar, T: type string): string =
doAssert s.vtable == nil and s.extCursorsCount == 0 and s.pageSize > 0

s.pages[s.pages.len - 1].buffer.setLen(s.pageSize - s.cursor.remainingBytesToWrite)
s.pages[s.pages.len - 1].buffer.setLen(s.pageSize - s.cursor.runway)

if s.pages.len == 1 and s.pages[0].startOffset == 0:
result.swap s.pages[0].buffer
Expand Down Expand Up @@ -416,7 +420,7 @@ proc createCursor(s: OutputStreamVar, size: int): WriteCursor =
s.cursor.head = result.bufferEnd

proc delayFixedSizeWrite*(s: OutputStreamVar, size: Natural): WriteCursor =
let remainingBytesInPage = s.cursor.remainingBytesToWrite
let remainingBytesInPage = s.cursor.runway
if size <= remainingBytesInPage:
result = s.createCursor(size)
else:
Expand All @@ -438,28 +442,28 @@ proc delayFixedSizeWrite*(s: OutputStreamVar, size: Natural): WriteCursor =

proc delayVarSizeWrite*(s: OutputStreamVar, maxSize: Natural): VarSizeWriteCursor =
doAssert maxSize < s.pageSize
s.finishPageEarly s.cursor.remainingBytesToWrite
s.finishPageEarly s.cursor.runway
VarSizeWriteCursor s.createCursor(maxSize)

proc dispose*(cursor: var WriteCursor) =
proc finalize*(cursor: var WriteCursor) =
doAssert cursor.stream.extCursorsCount > 0
dec cursor.stream.extCursorsCount

proc endWrite*(cursor: var WriteCursor, data: openarray[byte]) =
doAssert data.len == cursor.remainingBytesToWrite
proc writeAndFinalize*(cursor: var WriteCursor, data: openarray[byte]) =
doAssert data.len == cursor.runway
copyMem(cursor.head, unsafeAddr data[0], data.len)
dispose cursor
finalize cursor

proc endWrite*(c: var VarSizeWriteCursor, data: openarray[byte]) =
proc writeAndFinalize*(c: var VarSizeWriteCursor, data: openarray[byte]) =
template cursor: auto = WriteCursor(c)

for page in mitems(cursor.stream.pages):
if unsafeAddr(page.buffer[0]) == cursor.head:
let overestimatedBytes = cursor.remainingBytesToWrite - data.len
let overestimatedBytes = cursor.runway - data.len
doAssert overestimatedBytes >= 0
page.startOffset = overestimatedBytes
copyMem(cursor.head.shift(overestimatedBytes), unsafeAddr data[0], data.len)
dispose cursor
finalize cursor
return

doAssert false
Expand Down
6 changes: 3 additions & 3 deletions tests/test_output_stream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ suite "output stream":
totalBytesWritten += count
check memStream.pos - cursorStart == totalBytesWritten

cursor.endWrite delayedWriteContent
cursor.writeAndFinalize delayedWriteContent

checkOutputsMatch()

Expand Down Expand Up @@ -116,7 +116,7 @@ suite "output stream":
delayedWrites[i].written += toWrite

if remaining - toWrite == 0:
dispose delayedWrites[i].cursor
finalize delayedWrites[i].cursor
if i != delayedWrites.len - 1:
swap(delayedWrites[i], delayedWrites[^1])
delayedWrites.setLen(delayedWrites.len - 1)
Expand All @@ -141,7 +141,7 @@ suite "output stream":
for dw in mitems(delayedWrites):
let remaining = dw.content.len - dw.written
dw.cursor.append dw.content[dw.written ..< dw.written + remaining]
dispose dw.cursor
finalize dw.cursor

# The final outputs are the same
check altOutput == memStream.getOutput
Expand Down

0 comments on commit 6bbe1c2

Please sign in to comment.