Skip to content

Commit

Permalink
Adjust ButteredStream to use Photos RefCounted.
Browse files Browse the repository at this point in the history
  • Loading branch information
s-ludwig committed May 23, 2021
1 parent b72ee21 commit 37b4f10
Showing 1 changed file with 54 additions and 47 deletions.
101 changes: 54 additions & 47 deletions stream/vibe/stream/bufferedstream.d
Expand Up @@ -6,11 +6,11 @@
*/
module vibe.stream.bufferedstream;

import inbase.core.refcount;
import vibe.core.stream;

import std.algorithm;
import std.traits : Unqual;
import vibe.core.stream;
import std.typecons : RefCountedAutoInitialize, RefCounted, refCounted;


/** Creates a new buffered stream wrapper.
Expand Down Expand Up @@ -38,32 +38,39 @@ struct BufferedStream(S) {
import std.experimental.allocator;
import std.experimental.allocator.mallocator;

private final static class State {
private static struct State {
S stream;
ulong ptr;
ulong size;
size_t bufferSize;
Buffer[] buffers;
ulong accessCount;
ubyte[] buffermemory;

this(size_t buffer_size, size_t buffer_count)
this(size_t buffer_size, size_t buffer_count, S stream)
{
this.bufferSize = buffer_size;
this.buffers = Mallocator.instance.makeArray!Buffer(buffer_count);
foreach (ref b; this.buffers)
b.memory = Mallocator.instance.makeArray!ubyte(buffer_size);
this.buffermemory = Mallocator.instance.makeArray!ubyte(buffer_count * buffer_size);
foreach (i, ref b; this.buffers)
b.memory = this.buffermemory[i * buffer_size .. (i+1) * buffer_size];
this.size = stream.size;
this.stream = stream.move;
}

~this()
{
if (this.buffers is null) return;

if (this.stream.writable)
flush();

foreach (b; this.buffers)
Mallocator.instance.dispose(b.memory);
Mallocator.instance.dispose(this.buffermemory);
Mallocator.instance.dispose(this.buffers);
}

@disable this(this);

void flush()
{
foreach (i; 0 .. this.buffers.length)
Expand Down Expand Up @@ -150,44 +157,43 @@ struct BufferedStream(S) {
private {
// makes the stream copyable and makes it small enough to be put into
// a stream proxy
CountedRef!State m_state;
RefCounted!(State, RefCountedAutoInitialize.no) m_state;
}

private this(S stream, size_t buffer_size, size_t buffer_count)
{
m_state = CountedRef!State.create(buffer_size, buffer_count);
m_state.stream = stream.move;
m_state.size = m_state.stream.size;
auto st = State(buffer_size, buffer_count, stream.move);
m_state = refCounted(st.move);
}

@property bool empty() @blocking { return m_state.ptr >= m_state.size; }
@property ulong leastSize() @blocking { return m_state.size - m_state.ptr; }
@property bool empty() @blocking { return state.ptr >= state.size; }
@property ulong leastSize() @blocking { return state.size - state.ptr; }
@property bool dataAvailableForRead() { return peek().length > 0; }
@property ulong size() const nothrow { return m_state.size; }
@property bool readable() const nothrow { return m_state.stream.readable; }
@property bool writable() const nothrow { return m_state.stream.writable; }
@property ulong size() const nothrow { return state.size; }
@property bool readable() const nothrow { return state.stream.readable; }
@property bool writable() const nothrow { return state.stream.writable; }

@property ref inout(S) underlying() inout { return m_state.stream; }
@property ref inout(S) underlying() inout { return state.stream; }

static if (is(typeof(S.init.truncate(ulong.init))))
void truncate(ulong size)
{
sync();
m_state.stream.truncate(size);
m_state.size = size;
state.stream.truncate(size);
state.size = size;
}

const(ubyte)[] peek()
{
auto limit = (m_state.ptr / m_state.bufferSize + 1) * m_state.bufferSize;
auto dummy = () @trusted { return (cast(const(ubyte)*)null)[0 .. cast(size_t)(limit - m_state.ptr)]; } ();
auto limit = (state.ptr / state.bufferSize + 1) * state.bufferSize;
auto dummy = () @trusted { return (cast(const(ubyte)*)null)[0 .. cast(size_t)(limit - state.ptr)]; } ();

const(ubyte)[] ret;
m_state.iterateChunks!(const(ubyte))(m_state.ptr, dummy, (offset, scope _, buf, buf_begin, buf_end) {
state.iterateChunks!(const(ubyte))(state.ptr, dummy, (offset, scope _, buf, buf_begin, buf_end) {
if (buf >= 0) {
auto b = &m_state.buffers[buf];
auto b = &state.buffers[buf];
ret = b.memory[buf_begin .. min(buf_end, b.fill)];
m_state.touchBuffer(buf);
state.touchBuffer(buf);
}
return false;
});
Expand All @@ -199,25 +205,25 @@ struct BufferedStream(S) {
size_t nread = 0;

// update size if a read past EOF is expected
if (m_state.ptr + dst.length > m_state.size) m_state.size = m_state.stream.size;
if (state.ptr + dst.length > state.size) state.size = state.stream.size;

m_state.iterateChunks!ubyte(m_state.ptr, dst, (offset, scope dst_chunk, buf, buf_begin, buf_end) {
state.iterateChunks!ubyte(state.ptr, dst, (offset, scope dst_chunk, buf, buf_begin, buf_end) {
if (buf < 0) {
if (mode == IOMode.immediate) return false;
if (mode == IOMode.once && nread) return false;
buf = m_state.bufferChunk(offset / m_state.bufferSize);
} else m_state.touchBuffer(buf);
buf = state.bufferChunk(offset / state.bufferSize);
} else state.touchBuffer(buf);

if (m_state.buffers[buf].fill < buf_end) {
if (state.buffers[buf].fill < buf_end) {
if (mode == IOMode.immediate) return false;
if (mode == IOMode.once && nread) return false;
m_state.fillBuffer(buf, buf_end);
state.fillBuffer(buf, buf_end);
}

// the whole of dst_chunk is now in the buffer
assert(buf_begin % m_state.bufferSize == offset % m_state.bufferSize);
assert(buf_begin % state.bufferSize == offset % state.bufferSize);
assert(dst_chunk.length <= buf_end - buf_begin);
dst_chunk[] = m_state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length];
dst_chunk[] = state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length];
nread += dst_chunk.length;

return true;
Expand All @@ -226,7 +232,7 @@ struct BufferedStream(S) {
if (mode == IOMode.all && dst.length != nread)
throw new Exception("Reading past end of stream.");

m_state.ptr += nread;
state.ptr += nread;

return nread;
}
Expand All @@ -237,38 +243,38 @@ struct BufferedStream(S) {
@blocking {
size_t nwritten = 0;

m_state.iterateChunks!(const(ubyte))(m_state.ptr, bytes, (offset, scope src_chunk, buf, buf_begin, buf_end) {
state.iterateChunks!(const(ubyte))(state.ptr, bytes, (offset, scope src_chunk, buf, buf_begin, buf_end) {
if (buf < 0) { // write through if not buffered
if (mode == IOMode.immediate) return false;
if (mode == IOMode.once && nwritten) return false;

m_state.stream.seek(offset);
m_state.stream.write(src_chunk);
state.stream.seek(offset);
state.stream.write(src_chunk);
} else {
auto b = &m_state.buffers[buf];
auto b = &state.buffers[buf];
b.memory[buf_begin .. buf_begin + src_chunk.length] = src_chunk;
b.fill = max(b.fill, buf_begin + src_chunk.length);
b.dirty = true;
}

nwritten += src_chunk.length;
if (offset + src_chunk.length > m_state.size)
m_state.size = offset + src_chunk.length;
if (offset + src_chunk.length > state.size)
state.size = offset + src_chunk.length;

return true;
});

assert(mode != IOMode.all || nwritten == bytes.length);

m_state.ptr += nwritten;
state.ptr += nwritten;

return nwritten;
}

void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

void flush() @blocking { m_state.flush(); }
void flush() @blocking { state.flush(); }

/** Flushes and releases all buffers and updates the buffer size.
Expand All @@ -278,18 +284,19 @@ struct BufferedStream(S) {
void sync()
@blocking {
flush();
foreach (ref b; m_state.buffers) {
foreach (ref b; state.buffers) {
b.chunk = ulong.max;
b.fill = 0;
}
m_state.size = m_state.stream.size;
state.size = state.stream.size;
}

void finalize() @blocking { flush(); }

void seek(ulong offset) { m_state.ptr = offset; }
ulong tell() nothrow { return m_state.ptr; }
void seek(ulong offset) { state.ptr = offset; }
ulong tell() nothrow { return state.ptr; }

private ref inout(State) state() @trusted nothrow return inout { return m_state.refCountedPayload; }
}

mixin validateRandomAccessStream!(BufferedStream!RandomAccessStream);
Expand Down Expand Up @@ -459,7 +466,7 @@ unittest {
}

private struct Buffer {
ulong chunk = ulong.max; // chunk index (offset = chunk * m_state.bufferSize)
ulong chunk = ulong.max; // chunk index (offset = chunk * state.bufferSize)
ubyte[] memory;
ulong lastAccess;
size_t fill;
Expand Down

0 comments on commit 37b4f10

Please sign in to comment.