Skip to content

Commit

Permalink
Rent byte arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
saul committed Feb 25, 2024
1 parent 909c58a commit 4494e34
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 76 deletions.
71 changes: 35 additions & 36 deletions src/DemoFile/DemoParser.FullPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ public partial class DemoParser
public int CompareTo(FullPacketPosition other) => Tick.CompareTo(other.Tick);
}

/// Key ticks occur every 60 seconds
/// Full packets occur every 60 seconds
private const int FullPacketInterval = 64 * 60;

// todo: reset on StartReadingAsync
private readonly List<FullPacketPosition> _fullPacketPositions = new(64);
private DemoTick _readFullPacketTick = DemoTick.PreRecord;
private int _fullPacketTickOffset;
Expand All @@ -38,61 +37,67 @@ private bool TryFindFullPacketBefore(DemoTick demoTick, out FullPacketPosition f
return false;
}

public async ValueTask SeekToTickAsync(DemoTick tick, CancellationToken cancellationToken)
/// <summary>
/// Seek to a specific tick within the demo file. Tick can be in the future or the past.
/// This works by first seeking to the nearest <see cref="CDemoFullPacket"/> before <paramref name="targetTick"/>,
/// decoding the full stringtable and entity snapshot, then reading tick-by-tick to <paramref name="targetTick"/>.
/// </summary>
/// <param name="targetTick">Tick to seek to.</param>
/// <param name="cancellationToken">Cancellation token for cancelling the seek.</param>
/// <exception cref="InvalidOperationException">Tick is invalid, or attempting to seek while reading commands.</exception>
/// <exception cref="EndOfStreamException">EOF before reaching <paramref name="targetTick"/>.</exception>
/// <remarks>
/// Seeking is not allowed while reading commands. See <see cref="IsReading"/>.
/// </remarks>
public async ValueTask SeekToTickAsync(DemoTick targetTick, CancellationToken cancellationToken)
{
// todo: throw if currently in a MoveNextAsync
if (IsReading)
throw new InvalidOperationException($"Cannot seek to tick while reading commands");

if (TickCount < DemoTick.Zero)
{
throw new InvalidOperationException($"Cannot seek to tick {tick}");
}
throw new InvalidOperationException($"Cannot seek to tick {targetTick}");

if (TickCount != default && tick > TickCount)
{
throw new InvalidOperationException($"Cannot seek to tick {tick}. The demo only contains data for {TickCount} ticks");
}
if (TickCount != default && targetTick > TickCount)
throw new InvalidOperationException($"Cannot seek to tick {targetTick}. The demo only contains data for {TickCount} ticks");

var hasFullPacket = TryFindFullPacketBefore(tick, out var fullPacket);
if (tick < CurrentDemoTick)
var hasFullPacket = TryFindFullPacketBefore(targetTick, out var fullPacket);
if (targetTick < CurrentDemoTick)
{
if (!hasFullPacket)
{
throw new InvalidOperationException($"Cannot seek backwards to tick {tick}. No {nameof(CDemoFullPacket)} has been read");
}
throw new InvalidOperationException($"Cannot seek backwards to tick {targetTick}. No {nameof(CDemoFullPacket)} has been read");

// Seeking backwards. Jump back to the key tick to read the snapshot
// Seeking backwards. Jump back to the full packet to read the snapshot
(CurrentDemoTick, _stream.Position) = fullPacket;
}
else
{
var deltaTicks = fullPacket.Tick - CurrentDemoTick;

// Only read the key tick if the jump is far enough ahead
// Only read the full packet if the jump is far enough ahead
if (hasFullPacket && deltaTicks.Value >= FullPacketInterval)
{
(CurrentDemoTick, _stream.Position) = fullPacket;
}
}

// Keep reading commands until we reach the key tick
_readFullPacketTick = new DemoTick(tick.Value / FullPacketInterval * FullPacketInterval + _fullPacketTickOffset);
// Keep reading commands until we reach the full packet
_readFullPacketTick = new DemoTick(targetTick.Value / FullPacketInterval * FullPacketInterval + _fullPacketTickOffset);
SkipToTick(_readFullPacketTick);

// Advance ticks until we get to the target tick
while (CurrentDemoTick < tick)
while (CurrentDemoTick < targetTick)
{
var cmd = ReadCommandHeader();

// We've arrived at the target tick
if (CurrentDemoTick == tick)
if (CurrentDemoTick == targetTick)
{
_stream.Position = _commandStartPosition;
break;
}

if (!await MoveNextCoreAsync(cmd.Command, cmd.Size, cancellationToken).ConfigureAwait(false))
if (!await MoveNextCoreAsync(cmd.Command, cmd.IsCompressed, cmd.Size, cancellationToken).ConfigureAwait(false))
{
throw new EndOfStreamException($"Reached EOF at tick {CurrentDemoTick} while seeking to tick {tick}");
throw new EndOfStreamException($"Reached EOF at tick {CurrentDemoTick} while seeking to tick {targetTick}");
}
}
}
Expand All @@ -102,17 +107,16 @@ private void SkipToTick(DemoTick targetTick)
while (CurrentDemoTick < targetTick)
{
var cmd = ReadCommandHeader();
var actualCmd = cmd.Command & ~(uint) EDemoCommands.DemIsCompressed;

// If we're at the target tick, jump back to the start of the command
if (CurrentDemoTick == targetTick && actualCmd == (uint)EDemoCommands.DemFullPacket)
if (CurrentDemoTick == targetTick && cmd.Command == EDemoCommands.DemFullPacket)
{
_stream.Position = _commandStartPosition;
break;
}

// Record fullpackets even when seeking to improve seeking performance
if (actualCmd == (uint) EDemoCommands.DemFullPacket)
if (cmd.Command == EDemoCommands.DemFullPacket)
{
RecordFullPacket();
}
Expand All @@ -134,23 +138,18 @@ private void RecordFullPacket()

// Some demos have fullpackets at tick 0, some at tick 1.
_fullPacketTickOffset = CurrentDemoTick.Value % FullPacketInterval;
Debug.Assert(_fullPacketTickOffset == 0 || _fullPacketTickOffset == 1, "Unexpected key tick offset");
Debug.Assert(_fullPacketTickOffset == 0 || _fullPacketTickOffset == 1, "Unexpected full packet tick offset");
}

private void OnDemoFullPacket(CDemoFullPacket fullPacket)
{
if (CurrentDemoTick < DemoTick.Zero)
return;

RecordFullPacket();

// We only care about full packets if we're seeking
// We only want to read full packets if we're seeking to it
if (CurrentDemoTick == _readFullPacketTick)
{
var stringTables = fullPacket.StringTable;
for (var tableIdx = 0; tableIdx < stringTables.Tables.Count; tableIdx++)
foreach (var snapshot in fullPacket.StringTable.Tables)
{
var snapshot = stringTables.Tables[tableIdx];
var stringTable = _stringTables[snapshot.TableName];
stringTable.ReplaceWith(snapshot.Items);
}
Expand Down
88 changes: 48 additions & 40 deletions src/DemoFile/DemoParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@ namespace DemoFile;

public sealed partial class DemoParser

Check warning on line 10 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser'

Check warning on line 10 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser'
{
private readonly ArrayPool<byte> _bytePool = ArrayPool<byte>.Create();
private readonly PriorityQueue<ITickTimer, int> _demoTickTimers = new();
private readonly PriorityQueue<QueuedPacket, (int, int)> _packetQueue = new(128);
private readonly PriorityQueue<ITickTimer, uint> _serverTickTimers = new();
private readonly Source1GameEvents _source1GameEvents;

private long _commandStartPosition;
private DemoEvents _demoEvents;
private EntityEvents _entityEvents;
private GameEvents _gameEvents;
private PacketEvents _packetEvents;
private Stream _stream;
private UserMessageEvents _userMessageEvents;
private long _commandStartPosition;

/// <summary>
/// Event fired when the current demo command has finished (e.g, just before <see cref="MoveNextAsync"/> returns).
/// Reset to <c>null</c> just before it is invoked.
/// </summary>
public Action? OnCommandFinish;

/// <summary>
/// Event fired every time a demo command is parsed during <see cref="ReadAllAsync(System.IO.Stream)"/>.
Expand All @@ -30,12 +37,6 @@ public sealed partial class DemoParser
/// </remarks>
public Action<DemoProgressEvent>? OnProgress;

/// <summary>
/// Event fired when the current demo command has finished (e.g, just before <see cref="MoveNextAsync"/> returns).
/// Reset to <c>null</c> just before it is invoked.
/// </summary>
public Action? OnCommandFinish;

public DemoParser()

Check warning on line 40 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.DemoParser()'

Check warning on line 40 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.DemoParser()'
{
_source1GameEvents = new Source1GameEvents(this);
Expand Down Expand Up @@ -63,6 +64,12 @@ public DemoParser()
_gameEvents.Source1LegacyGameEvent += @event => Source1GameEvents.ParseSource1GameEvent(this, @event);
}

/// <summary>
/// Flag indicate whether the parser is currently reading a command.
/// During reading, seeking (e.g. with <see cref="SeekToTickAsync"/>) is not possible.
/// </summary>
public bool IsReading { get; private set; }

public ref DemoEvents DemoEvents => ref _demoEvents;

Check warning on line 73 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.DemoEvents'

Check warning on line 73 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.DemoEvents'
public ref GameEvents GameEvents => ref _gameEvents;

Check warning on line 74 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.GameEvents'

Check warning on line 74 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.GameEvents'
public ref PacketEvents PacketEvents => ref _packetEvents;

Check warning on line 75 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.PacketEvents'

Check warning on line 75 in src/DemoFile/DemoParser.cs

View workflow job for this annotation

GitHub Actions / Build

Missing XML comment for publicly visible type or member 'DemoParser.PacketEvents'
Expand Down Expand Up @@ -100,7 +107,6 @@ private void OnDemoFileInfo(CDemoFileInfo fileInfo)

private void OnDemoPacket(CDemoPacket msg)
{
var arrayPool = ArrayPool<byte>.Shared;
var buffer = new BitBuffer(msg.Data.Span);

// Read all messages from the buffer. Messages are packed serially as
Expand All @@ -111,8 +117,8 @@ private void OnDemoPacket(CDemoPacket msg)
var msgType = (int) buffer.ReadUBitVar();
var size = (int) buffer.ReadUVarInt32();

var rentedBuffer = arrayPool.Rent(size);
var msgBuf = ((Span<byte>) rentedBuffer)[..size];
var rentedBuffer = _bytePool.Rent(size);
var msgBuf = rentedBuffer.AsSpan(..size);
buffer.ReadBytes(msgBuf);

// Queue packets to be read in a specific order.
Expand All @@ -131,11 +137,11 @@ private void OnDemoPacket(CDemoPacket msg)
{
}

arrayPool.Return(queued.RentedBuf);
_bytePool.Return(queued.RentedBuf);
}
}

private static int ReadDemoSize(byte[] bytes)
private static int ReadDemoSize(Span<byte> bytes)
{
ReadOnlySpan<int> values = MemoryMarshal.Cast<byte, int>(bytes);
return values[0];
Expand All @@ -156,8 +162,12 @@ public async ValueTask StartReadingAsync(Stream stream, CancellationToken cancel
_fullPacketPositions.Clear();
_stream = stream;

ValidateMagic(await ReadExactBytesAsync(8, cancellationToken).ConfigureAwait(false));
var sizeBytes = ReadDemoSize(await ReadExactBytesAsync(8, cancellationToken).ConfigureAwait(false));
var rented = _bytePool.Rent(16);
var buf = rented.AsMemory(..16);
await _stream.ReadExactlyAsync(buf, cancellationToken).ConfigureAwait(false);
ValidateMagic(buf.Span[..8]);
var sizeBytes = ReadDemoSize(buf.Span[8..]);
_bytePool.Return(rented);

// `sizeBytes` represents the number of bytes remaining in the demo,
// from this point (i.e. 16 bytes into the file).
Expand Down Expand Up @@ -227,16 +237,19 @@ public async ValueTask ReadAllAsync(Stream stream, CancellationToken cancellatio
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private (uint Command, uint Size) ReadCommandHeader()
private (EDemoCommands Command, bool IsCompressed, int Size) ReadCommandHeader()
{
_commandStartPosition = _stream.Position;
var command = _stream.ReadUVarInt32();
var tick = (int) _stream.ReadUVarInt32();
var size = _stream.ReadUVarInt32();
var size = (int) _stream.ReadUVarInt32();

CurrentDemoTick = new DemoTick(tick);

return (Command: command, Size: size);
var isCompressed = (command & (uint) EDemoCommands.DemIsCompressed) != 0;
var msgType = (EDemoCommands)(command & ~(uint) EDemoCommands.DemIsCompressed);

return (Command: msgType, IsCompressed: isCompressed, Size: size);
}

/// <summary>
Expand All @@ -247,36 +260,35 @@ public async ValueTask ReadAllAsync(Stream stream, CancellationToken cancellatio
public ValueTask<bool> MoveNextAsync(CancellationToken cancellationToken)
{
var cmd = ReadCommandHeader();
return MoveNextCoreAsync(cmd.Command, cmd.Size, cancellationToken);
return MoveNextCoreAsync(cmd.Command, cmd.IsCompressed, cmd.Size, cancellationToken);
}

private async ValueTask<bool> MoveNextCoreAsync(uint command, uint size, CancellationToken cancellationToken)
private async ValueTask<bool> MoveNextCoreAsync(EDemoCommands msgType, bool isCompressed, int size, CancellationToken cancellationToken)
{
var msgType = (EDemoCommands)(command & ~(uint)EDemoCommands.DemIsCompressed);
if (msgType is < 0 or >= EDemoCommands.DemMax)
throw new InvalidDemoException($"Unexpected demo command: {command}");
IsReading = true;

var isCompressed = (command & (uint)EDemoCommands.DemIsCompressed)
== (uint)EDemoCommands.DemIsCompressed;
if (msgType is < 0 or >= EDemoCommands.DemMax)
throw new InvalidDemoException($"Unexpected demo command: {msgType}");

while (_demoTickTimers.TryPeek(out var timer, out var timerTick) && timerTick <= CurrentDemoTick.Value)
{
_demoTickTimers.Dequeue();
timer.Invoke();
}

// todo: read into pooled array
var buf = await ReadExactBytesAsync((int)size, cancellationToken).ConfigureAwait(false);
var rented = _bytePool.Rent(size);
var buf = rented.AsMemory(..size);
await _stream.ReadExactlyAsync(buf, cancellationToken).ConfigureAwait(false);

bool canContinue;
if (isCompressed)
{
using var decompressed = Snappy.DecompressToMemory(buf);
using var decompressed = Snappy.DecompressToMemory(buf.Span);
canContinue = _demoEvents.ReadDemoCommand(msgType, decompressed.Memory.Span);
}
else
{
canContinue = _demoEvents.ReadDemoCommand(msgType, buf);
canContinue = _demoEvents.ReadDemoCommand(msgType, buf.Span);
}

if (OnCommandFinish is { } onCommandFinish)
Expand All @@ -286,19 +298,24 @@ private async ValueTask<bool> MoveNextCoreAsync(uint command, uint size, Cancell
onCommandFinish();
}

IsReading = false;
_bytePool.Return(rented);
return canContinue;
}

private async ValueTask ReadFileInfo(CancellationToken cancellationToken)
{
var cmd = ReadCommandHeader();
Debug.Assert(cmd.Command == (uint)EDemoCommands.DemFileInfo);
Debug.Assert(cmd.Command == EDemoCommands.DemFileInfo);

// Always treat DemoFileInfo as being at 'pre-record'
CurrentDemoTick = DemoTick.PreRecord;

var buf = await ReadExactBytesAsync((int)cmd.Size, cancellationToken).ConfigureAwait(false);
DemoEvents.DemoFileInfo?.Invoke(CDemoFileInfo.Parser.ParseFrom(buf));
var rented = _bytePool.Rent(cmd.Size);
var buf = rented.AsMemory(..cmd.Size);
await _stream.ReadExactlyAsync(buf, cancellationToken).ConfigureAwait(false);
DemoEvents.DemoFileInfo?.Invoke(CDemoFileInfo.Parser.ParseFrom(buf.Span));
_bytePool.Return(rented);
}

private static void ValidateMagic(ReadOnlySpan<byte> magic)
Expand All @@ -310,15 +327,6 @@ private static void ValidateMagic(ReadOnlySpan<byte> magic)
}
}

private async ValueTask<byte[]> ReadExactBytesAsync(
int length,
CancellationToken cancellationToken)
{
var result = new byte[length];
await _stream.ReadExactlyAsync(result, 0, length, cancellationToken).ConfigureAwait(false);
return result;
}

/// <summary>
/// Schedule a callback at demo tick <paramref name="tick"/>.
/// The callback will be fired at the start of the demo tick.
Expand Down

0 comments on commit 4494e34

Please sign in to comment.