From 37b4f10a36aab9895944096cdfd6f00ad0f83b9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Sun, 23 May 2021 21:23:21 +0200 Subject: [PATCH] Adjust ButteredStream to use Photos RefCounted. --- stream/vibe/stream/bufferedstream.d | 101 +++++++++++++++------------- 1 file changed, 54 insertions(+), 47 deletions(-) diff --git a/stream/vibe/stream/bufferedstream.d b/stream/vibe/stream/bufferedstream.d index 8969ecca70..9fd43c6d2f 100644 --- a/stream/vibe/stream/bufferedstream.d +++ b/stream/vibe/stream/bufferedstream.d @@ -6,11 +6,11 @@ */ module vibe.stream.bufferedstream; -import inbase.core.refcount; +import vibe.core.stream; import std.algorithm; import std.traits : Unqual; -import vibe.core.stream; +import std.typecons : RefCountedAutoInitialize, RefCounted, refCounted; /** Creates a new buffered stream wrapper. @@ -38,32 +38,39 @@ struct BufferedStream(S) { import std.experimental.allocator; import std.experimental.allocator.mallocator; - private final static class State { + private static struct State { S stream; ulong ptr; ulong size; size_t bufferSize; Buffer[] buffers; ulong accessCount; + ubyte[] buffermemory; - this(size_t buffer_size, size_t buffer_count) + this(size_t buffer_size, size_t buffer_count, S stream) { this.bufferSize = buffer_size; this.buffers = Mallocator.instance.makeArray!Buffer(buffer_count); - foreach (ref b; this.buffers) - b.memory = Mallocator.instance.makeArray!ubyte(buffer_size); + this.buffermemory = Mallocator.instance.makeArray!ubyte(buffer_count * buffer_size); + foreach (i, ref b; this.buffers) + b.memory = this.buffermemory[i * buffer_size .. (i+1) * buffer_size]; + this.size = stream.size; + this.stream = stream.move; } ~this() { + if (this.buffers is null) return; + if (this.stream.writable) flush(); - foreach (b; this.buffers) - Mallocator.instance.dispose(b.memory); + Mallocator.instance.dispose(this.buffermemory); Mallocator.instance.dispose(this.buffers); } + @disable this(this); + void flush() { foreach (i; 0 .. this.buffers.length) @@ -150,44 +157,43 @@ struct BufferedStream(S) { private { // makes the stream copyable and makes it small enough to be put into // a stream proxy - CountedRef!State m_state; + RefCounted!(State, RefCountedAutoInitialize.no) m_state; } private this(S stream, size_t buffer_size, size_t buffer_count) { - m_state = CountedRef!State.create(buffer_size, buffer_count); - m_state.stream = stream.move; - m_state.size = m_state.stream.size; + auto st = State(buffer_size, buffer_count, stream.move); + m_state = refCounted(st.move); } - @property bool empty() @blocking { return m_state.ptr >= m_state.size; } - @property ulong leastSize() @blocking { return m_state.size - m_state.ptr; } + @property bool empty() @blocking { return state.ptr >= state.size; } + @property ulong leastSize() @blocking { return state.size - state.ptr; } @property bool dataAvailableForRead() { return peek().length > 0; } - @property ulong size() const nothrow { return m_state.size; } - @property bool readable() const nothrow { return m_state.stream.readable; } - @property bool writable() const nothrow { return m_state.stream.writable; } + @property ulong size() const nothrow { return state.size; } + @property bool readable() const nothrow { return state.stream.readable; } + @property bool writable() const nothrow { return state.stream.writable; } - @property ref inout(S) underlying() inout { return m_state.stream; } + @property ref inout(S) underlying() inout { return state.stream; } static if (is(typeof(S.init.truncate(ulong.init)))) void truncate(ulong size) { sync(); - m_state.stream.truncate(size); - m_state.size = size; + state.stream.truncate(size); + state.size = size; } const(ubyte)[] peek() { - auto limit = (m_state.ptr / m_state.bufferSize + 1) * m_state.bufferSize; - auto dummy = () @trusted { return (cast(const(ubyte)*)null)[0 .. cast(size_t)(limit - m_state.ptr)]; } (); + auto limit = (state.ptr / state.bufferSize + 1) * state.bufferSize; + auto dummy = () @trusted { return (cast(const(ubyte)*)null)[0 .. cast(size_t)(limit - state.ptr)]; } (); const(ubyte)[] ret; - m_state.iterateChunks!(const(ubyte))(m_state.ptr, dummy, (offset, scope _, buf, buf_begin, buf_end) { + state.iterateChunks!(const(ubyte))(state.ptr, dummy, (offset, scope _, buf, buf_begin, buf_end) { if (buf >= 0) { - auto b = &m_state.buffers[buf]; + auto b = &state.buffers[buf]; ret = b.memory[buf_begin .. min(buf_end, b.fill)]; - m_state.touchBuffer(buf); + state.touchBuffer(buf); } return false; }); @@ -199,25 +205,25 @@ struct BufferedStream(S) { size_t nread = 0; // update size if a read past EOF is expected - if (m_state.ptr + dst.length > m_state.size) m_state.size = m_state.stream.size; + if (state.ptr + dst.length > state.size) state.size = state.stream.size; - m_state.iterateChunks!ubyte(m_state.ptr, dst, (offset, scope dst_chunk, buf, buf_begin, buf_end) { + state.iterateChunks!ubyte(state.ptr, dst, (offset, scope dst_chunk, buf, buf_begin, buf_end) { if (buf < 0) { if (mode == IOMode.immediate) return false; if (mode == IOMode.once && nread) return false; - buf = m_state.bufferChunk(offset / m_state.bufferSize); - } else m_state.touchBuffer(buf); + buf = state.bufferChunk(offset / state.bufferSize); + } else state.touchBuffer(buf); - if (m_state.buffers[buf].fill < buf_end) { + if (state.buffers[buf].fill < buf_end) { if (mode == IOMode.immediate) return false; if (mode == IOMode.once && nread) return false; - m_state.fillBuffer(buf, buf_end); + state.fillBuffer(buf, buf_end); } // the whole of dst_chunk is now in the buffer - assert(buf_begin % m_state.bufferSize == offset % m_state.bufferSize); + assert(buf_begin % state.bufferSize == offset % state.bufferSize); assert(dst_chunk.length <= buf_end - buf_begin); - dst_chunk[] = m_state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length]; + dst_chunk[] = state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length]; nread += dst_chunk.length; return true; @@ -226,7 +232,7 @@ struct BufferedStream(S) { if (mode == IOMode.all && dst.length != nread) throw new Exception("Reading past end of stream."); - m_state.ptr += nread; + state.ptr += nread; return nread; } @@ -237,30 +243,30 @@ struct BufferedStream(S) { @blocking { size_t nwritten = 0; - m_state.iterateChunks!(const(ubyte))(m_state.ptr, bytes, (offset, scope src_chunk, buf, buf_begin, buf_end) { + state.iterateChunks!(const(ubyte))(state.ptr, bytes, (offset, scope src_chunk, buf, buf_begin, buf_end) { if (buf < 0) { // write through if not buffered if (mode == IOMode.immediate) return false; if (mode == IOMode.once && nwritten) return false; - m_state.stream.seek(offset); - m_state.stream.write(src_chunk); + state.stream.seek(offset); + state.stream.write(src_chunk); } else { - auto b = &m_state.buffers[buf]; + auto b = &state.buffers[buf]; b.memory[buf_begin .. buf_begin + src_chunk.length] = src_chunk; b.fill = max(b.fill, buf_begin + src_chunk.length); b.dirty = true; } nwritten += src_chunk.length; - if (offset + src_chunk.length > m_state.size) - m_state.size = offset + src_chunk.length; + if (offset + src_chunk.length > state.size) + state.size = offset + src_chunk.length; return true; }); assert(mode != IOMode.all || nwritten == bytes.length); - m_state.ptr += nwritten; + state.ptr += nwritten; return nwritten; } @@ -268,7 +274,7 @@ struct BufferedStream(S) { void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); } void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } - void flush() @blocking { m_state.flush(); } + void flush() @blocking { state.flush(); } /** Flushes and releases all buffers and updates the buffer size. @@ -278,18 +284,19 @@ struct BufferedStream(S) { void sync() @blocking { flush(); - foreach (ref b; m_state.buffers) { + foreach (ref b; state.buffers) { b.chunk = ulong.max; b.fill = 0; } - m_state.size = m_state.stream.size; + state.size = state.stream.size; } void finalize() @blocking { flush(); } - void seek(ulong offset) { m_state.ptr = offset; } - ulong tell() nothrow { return m_state.ptr; } + void seek(ulong offset) { state.ptr = offset; } + ulong tell() nothrow { return state.ptr; } + private ref inout(State) state() @trusted nothrow return inout { return m_state.refCountedPayload; } } mixin validateRandomAccessStream!(BufferedStream!RandomAccessStream); @@ -459,7 +466,7 @@ unittest { } private struct Buffer { - ulong chunk = ulong.max; // chunk index (offset = chunk * m_state.bufferSize) + ulong chunk = ulong.max; // chunk index (offset = chunk * state.bufferSize) ubyte[] memory; ulong lastAccess; size_t fill;