Skip to content

Commit

Permalink
New class DynamicStreamWriter - useful when length of underlying outp…
Browse files Browse the repository at this point in the history
…ut stream is not known in advance.
  • Loading branch information
azeno committed Feb 10, 2014
1 parent 7c6d28f commit d905a10
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 6 deletions.
138 changes: 138 additions & 0 deletions common/src/core/Utils/Streams/DynamicStreamWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace VVVV.Utils.Streams
{
// Sets the length of the underlying stream to the number of items
// written to it on disposal.
// Useful when the number of items to write is not known in advance.
public class DynamicStreamWriter<T> : IStreamWriter<T>
{
private readonly IOutStream<T> FStream;
private readonly int FStreamLength;
private IStreamWriter<T> FStreamWriter;
private MemoryIOStream<T> FMemoryStream;
private MemoryIOStream<T>.StreamWriter FMemoryStreamWriter;
private int FPosition;

public DynamicStreamWriter(IOutStream<T> stream)
{
FStream = stream;
FStreamLength = stream.Length;
FStreamWriter = stream.GetWriter();
}

private MemoryIOStream<T>.StreamWriter MemoryStreamWriter
{
get
{
if (FMemoryStreamWriter == null)
{
var length = FPosition - FStreamLength;
FMemoryStream = new MemoryIOStream<T>(length, length, true);
FMemoryStreamWriter = FMemoryStream.GetWriter();
FMemoryStreamWriter.Position = length;
}
return FMemoryStreamWriter;
}
}

public void Write(T value, int stride = 1)
{
if (FPosition >= FStreamLength)
MemoryStreamWriter.Write(value, stride);
else
FStreamWriter.Write(value, stride);
FPosition += stride;
}

public int Write(T[] buffer, int index, int length, int stride = 1)
{
int slicesWritten;
if (FPosition >= FStreamLength)
{
slicesWritten = MemoryStreamWriter.Write(buffer, index, length, stride);
FPosition += slicesWritten * stride;
}
else
{
slicesWritten = FStreamWriter.Write(buffer, index, length, stride);
FPosition += slicesWritten * stride;
if (slicesWritten < length)
{
// Corner case
index += slicesWritten;
length -= slicesWritten;
slicesWritten += Write(buffer, index, length, stride);
}
}
return slicesWritten;
}

public void Reset()
{
FPosition = 0;
if (FMemoryStream != null)
{
FMemoryStreamWriter.Dispose();
FMemoryStreamWriter = null;
FMemoryStream = null;
}
}

public bool Eos
{
get { return false; }
}

public int Position
{
get
{
return FPosition;
}
set
{
FPosition = value;
}
}

public int Length
{
get
{
if (FMemoryStream != null)
return FStream.Length + FMemoryStream.Length;
else
return FStream.Length;
}
}

public void Dispose()
{
if (FPosition > FStreamLength)
{
FMemoryStreamWriter.Dispose();
FMemoryStreamWriter = null;
FStream.Length += FMemoryStream.Length;
var buffer = MemoryPool<T>.GetArray();
try
{
FStreamWriter.Write(FMemoryStream, buffer);
}
finally
{
MemoryPool<T>.PutArray(buffer);
}
FStreamWriter.Dispose();
FMemoryStream = null;
}
else
{
FStream.Length = FPosition;
}
}
}
}
39 changes: 33 additions & 6 deletions common/src/core/Utils/Streams/MemoryIOStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,18 @@ public int Write(T[] buffer, int index, int length, int stride = 1)
{
FStream.IsChanged = true;

int slicesToWrite = StreamUtils.GetNumSlicesAhead(this, index, length, stride);
int slicesToWrite;
if (FStream.CanExpand)
{
var remainingSpace = Length - Position;
var requestedSpace = length * stride;
var neededSpace = requestedSpace - remainingSpace;
if (neededSpace > 0)
FStream.Length += neededSpace;
slicesToWrite = length;
}
else
slicesToWrite = StreamUtils.GetNumSlicesAhead(this, index, length, stride);

switch (stride)
{
Expand Down Expand Up @@ -195,20 +206,34 @@ public int Write(T[] buffer, int index, int length, int stride = 1)
private int FLength;
private int FCapacity;
protected int FChangeCount;

public MemoryIOStream(int initialCapacity = 0)
: this(new T[initialCapacity], 0, false)
{
}

public MemoryIOStream(int initialCapacity = 0, int length = 0, bool canExpand = false)
: this(new T[initialCapacity], length, canExpand)
{
}

public MemoryIOStream(bool canExpand)
: this(0, 0, canExpand)
{
FCapacity = initialCapacity;
FBuffer = new T[initialCapacity];
IsChanged = true;
}

public MemoryIOStream(T[] buffer)
: this(buffer, buffer.Length, false)
{
}

private MemoryIOStream(T[] buffer, int length, bool canExpand)
{
FCapacity = buffer.Length;
FBuffer = buffer;
FLength = buffer.Length;
FLength = length;
IsChanged = true;
CanExpand = canExpand;
}

public virtual bool Sync()
Expand All @@ -228,6 +253,8 @@ public bool IsChanged
get { return FChangeCount > 0; }
set { if (value) FChangeCount++; else FChangeCount = 0; }
}

public bool CanExpand { get; private set; }

public int Length
{
Expand Down
5 changes: 5 additions & 0 deletions common/src/core/Utils/Streams/StreamUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ public static BufferedStreamWriter<T> GetBufferedWriter<T>(this IOutStream<T> st
{
return new BufferedStreamWriter<T>(stream);
}

public static DynamicStreamWriter<T> GetDynamicWriter<T>(this IOutStream<T> stream)
{
return new DynamicStreamWriter<T>(stream);
}

public static int GetNumSlicesAhead(IStreamer streamer, int index, int length, int stride)
{
Expand Down
1 change: 1 addition & 0 deletions common/src/core/Utils/Utils.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
<Compile Include="SharedMemory\Win32Native.cs" />
<Compile Include="Skeleton\ISkeleton.cs" />
<Compile Include="Streams\BufferedStreamWriter.cs" />
<Compile Include="Streams\DynamicStreamWriter.cs" />
<Compile Include="Streams\ReverseStream.cs" />
<Compile Include="Streams\CyclicStream.cs" />
<Compile Include="Streams\RangeStream.cs" />
Expand Down

0 comments on commit d905a10

Please sign in to comment.