diff --git a/src/DemoFile/DemoParser.FullPacket.cs b/src/DemoFile/DemoParser.FullPacket.cs index 88b2c2c..f43b5dd 100644 --- a/src/DemoFile/DemoParser.FullPacket.cs +++ b/src/DemoFile/DemoParser.FullPacket.cs @@ -10,10 +10,9 @@ public partial class DemoParser public int CompareTo(FullPacketPosition other) => Tick.CompareTo(other.Tick); } - /// Key ticks occur every 60 seconds + /// Full packets occur every 60 seconds private const int FullPacketInterval = 64 * 60; - // todo: reset on StartReadingAsync private readonly List _fullPacketPositions = new(64); private DemoTick _readFullPacketTick = DemoTick.PreRecord; private int _fullPacketTickOffset; @@ -38,61 +37,67 @@ private bool TryFindFullPacketBefore(DemoTick demoTick, out FullPacketPosition f return false; } - public async ValueTask SeekToTickAsync(DemoTick tick, CancellationToken cancellationToken) + /// + /// Seek to a specific tick within the demo file. Tick can be in the future or the past. + /// This works by first seeking to the nearest before , + /// decoding the full stringtable and entity snapshot, then reading tick-by-tick to . + /// + /// Tick to seek to. + /// Cancellation token for cancelling the seek. + /// Tick is invalid, or attempting to seek while reading commands. + /// EOF before reaching . + /// + /// Seeking is not allowed while reading commands. See . + /// + public async ValueTask SeekToTickAsync(DemoTick targetTick, CancellationToken cancellationToken) { - // todo: throw if currently in a MoveNextAsync + if (IsReading) + throw new InvalidOperationException($"Cannot seek to tick while reading commands"); if (TickCount < DemoTick.Zero) - { - throw new InvalidOperationException($"Cannot seek to tick {tick}"); - } + throw new InvalidOperationException($"Cannot seek to tick {targetTick}"); - if (TickCount != default && tick > TickCount) - { - throw new InvalidOperationException($"Cannot seek to tick {tick}. The demo only contains data for {TickCount} ticks"); - } + if (TickCount != default && targetTick > TickCount) + throw new InvalidOperationException($"Cannot seek to tick {targetTick}. The demo only contains data for {TickCount} ticks"); - var hasFullPacket = TryFindFullPacketBefore(tick, out var fullPacket); - if (tick < CurrentDemoTick) + var hasFullPacket = TryFindFullPacketBefore(targetTick, out var fullPacket); + if (targetTick < CurrentDemoTick) { if (!hasFullPacket) - { - throw new InvalidOperationException($"Cannot seek backwards to tick {tick}. No {nameof(CDemoFullPacket)} has been read"); - } + throw new InvalidOperationException($"Cannot seek backwards to tick {targetTick}. No {nameof(CDemoFullPacket)} has been read"); - // Seeking backwards. Jump back to the key tick to read the snapshot + // Seeking backwards. Jump back to the full packet to read the snapshot (CurrentDemoTick, _stream.Position) = fullPacket; } else { var deltaTicks = fullPacket.Tick - CurrentDemoTick; - // Only read the key tick if the jump is far enough ahead + // Only read the full packet if the jump is far enough ahead if (hasFullPacket && deltaTicks.Value >= FullPacketInterval) { (CurrentDemoTick, _stream.Position) = fullPacket; } } - // Keep reading commands until we reach the key tick - _readFullPacketTick = new DemoTick(tick.Value / FullPacketInterval * FullPacketInterval + _fullPacketTickOffset); + // Keep reading commands until we reach the full packet + _readFullPacketTick = new DemoTick(targetTick.Value / FullPacketInterval * FullPacketInterval + _fullPacketTickOffset); SkipToTick(_readFullPacketTick); // Advance ticks until we get to the target tick - while (CurrentDemoTick < tick) + while (CurrentDemoTick < targetTick) { var cmd = ReadCommandHeader(); - // We've arrived at the target tick - if (CurrentDemoTick == tick) + if (CurrentDemoTick == targetTick) { _stream.Position = _commandStartPosition; break; } - if (!await MoveNextCoreAsync(cmd.Command, cmd.Size, cancellationToken).ConfigureAwait(false)) + if (!await MoveNextCoreAsync(cmd.Command, cmd.IsCompressed, cmd.Size, cancellationToken).ConfigureAwait(false)) { - throw new EndOfStreamException($"Reached EOF at tick {CurrentDemoTick} while seeking to tick {tick}"); + throw new EndOfStreamException($"Reached EOF at tick {CurrentDemoTick} while seeking to tick {targetTick}"); } } } @@ -102,17 +107,16 @@ private void SkipToTick(DemoTick targetTick) while (CurrentDemoTick < targetTick) { var cmd = ReadCommandHeader(); - var actualCmd = cmd.Command & ~(uint) EDemoCommands.DemIsCompressed; // If we're at the target tick, jump back to the start of the command - if (CurrentDemoTick == targetTick && actualCmd == (uint)EDemoCommands.DemFullPacket) + if (CurrentDemoTick == targetTick && cmd.Command == EDemoCommands.DemFullPacket) { _stream.Position = _commandStartPosition; break; } // Record fullpackets even when seeking to improve seeking performance - if (actualCmd == (uint) EDemoCommands.DemFullPacket) + if (cmd.Command == EDemoCommands.DemFullPacket) { RecordFullPacket(); } @@ -134,23 +138,18 @@ private void RecordFullPacket() // Some demos have fullpackets at tick 0, some at tick 1. _fullPacketTickOffset = CurrentDemoTick.Value % FullPacketInterval; - Debug.Assert(_fullPacketTickOffset == 0 || _fullPacketTickOffset == 1, "Unexpected key tick offset"); + Debug.Assert(_fullPacketTickOffset == 0 || _fullPacketTickOffset == 1, "Unexpected full packet tick offset"); } private void OnDemoFullPacket(CDemoFullPacket fullPacket) { - if (CurrentDemoTick < DemoTick.Zero) - return; - RecordFullPacket(); - // We only care about full packets if we're seeking + // We only want to read full packets if we're seeking to it if (CurrentDemoTick == _readFullPacketTick) { - var stringTables = fullPacket.StringTable; - for (var tableIdx = 0; tableIdx < stringTables.Tables.Count; tableIdx++) + foreach (var snapshot in fullPacket.StringTable.Tables) { - var snapshot = stringTables.Tables[tableIdx]; var stringTable = _stringTables[snapshot.TableName]; stringTable.ReplaceWith(snapshot.Items); } diff --git a/src/DemoFile/DemoParser.cs b/src/DemoFile/DemoParser.cs index 78e6dff..4532566 100644 --- a/src/DemoFile/DemoParser.cs +++ b/src/DemoFile/DemoParser.cs @@ -9,18 +9,25 @@ namespace DemoFile; public sealed partial class DemoParser { + private readonly ArrayPool _bytePool = ArrayPool.Create(); private readonly PriorityQueue _demoTickTimers = new(); private readonly PriorityQueue _packetQueue = new(128); private readonly PriorityQueue _serverTickTimers = new(); private readonly Source1GameEvents _source1GameEvents; + private long _commandStartPosition; private DemoEvents _demoEvents; private EntityEvents _entityEvents; private GameEvents _gameEvents; private PacketEvents _packetEvents; private Stream _stream; private UserMessageEvents _userMessageEvents; - private long _commandStartPosition; + + /// + /// Event fired when the current demo command has finished (e.g, just before returns). + /// Reset to null just before it is invoked. + /// + public Action? OnCommandFinish; /// /// Event fired every time a demo command is parsed during . @@ -30,12 +37,6 @@ public sealed partial class DemoParser /// public Action? OnProgress; - /// - /// Event fired when the current demo command has finished (e.g, just before returns). - /// Reset to null just before it is invoked. - /// - public Action? OnCommandFinish; - public DemoParser() { _source1GameEvents = new Source1GameEvents(this); @@ -63,6 +64,12 @@ public DemoParser() _gameEvents.Source1LegacyGameEvent += @event => Source1GameEvents.ParseSource1GameEvent(this, @event); } + /// + /// Flag indicate whether the parser is currently reading a command. + /// During reading, seeking (e.g. with ) is not possible. + /// + public bool IsReading { get; private set; } + public ref DemoEvents DemoEvents => ref _demoEvents; public ref GameEvents GameEvents => ref _gameEvents; public ref PacketEvents PacketEvents => ref _packetEvents; @@ -100,7 +107,6 @@ private void OnDemoFileInfo(CDemoFileInfo fileInfo) private void OnDemoPacket(CDemoPacket msg) { - var arrayPool = ArrayPool.Shared; var buffer = new BitBuffer(msg.Data.Span); // Read all messages from the buffer. Messages are packed serially as @@ -111,8 +117,8 @@ private void OnDemoPacket(CDemoPacket msg) var msgType = (int) buffer.ReadUBitVar(); var size = (int) buffer.ReadUVarInt32(); - var rentedBuffer = arrayPool.Rent(size); - var msgBuf = ((Span) rentedBuffer)[..size]; + var rentedBuffer = _bytePool.Rent(size); + var msgBuf = rentedBuffer.AsSpan(..size); buffer.ReadBytes(msgBuf); // Queue packets to be read in a specific order. @@ -131,11 +137,11 @@ private void OnDemoPacket(CDemoPacket msg) { } - arrayPool.Return(queued.RentedBuf); + _bytePool.Return(queued.RentedBuf); } } - private static int ReadDemoSize(byte[] bytes) + private static int ReadDemoSize(Span bytes) { ReadOnlySpan values = MemoryMarshal.Cast(bytes); return values[0]; @@ -156,8 +162,12 @@ public async ValueTask StartReadingAsync(Stream stream, CancellationToken cancel _fullPacketPositions.Clear(); _stream = stream; - ValidateMagic(await ReadExactBytesAsync(8, cancellationToken).ConfigureAwait(false)); - var sizeBytes = ReadDemoSize(await ReadExactBytesAsync(8, cancellationToken).ConfigureAwait(false)); + var rented = _bytePool.Rent(16); + var buf = rented.AsMemory(..16); + await _stream.ReadExactlyAsync(buf, cancellationToken).ConfigureAwait(false); + ValidateMagic(buf.Span[..8]); + var sizeBytes = ReadDemoSize(buf.Span[8..]); + _bytePool.Return(rented); // `sizeBytes` represents the number of bytes remaining in the demo, // from this point (i.e. 16 bytes into the file). @@ -227,16 +237,19 @@ public async ValueTask ReadAllAsync(Stream stream, CancellationToken cancellatio } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private (uint Command, uint Size) ReadCommandHeader() + private (EDemoCommands Command, bool IsCompressed, int Size) ReadCommandHeader() { _commandStartPosition = _stream.Position; var command = _stream.ReadUVarInt32(); var tick = (int) _stream.ReadUVarInt32(); - var size = _stream.ReadUVarInt32(); + var size = (int) _stream.ReadUVarInt32(); CurrentDemoTick = new DemoTick(tick); - return (Command: command, Size: size); + var isCompressed = (command & (uint) EDemoCommands.DemIsCompressed) != 0; + var msgType = (EDemoCommands)(command & ~(uint) EDemoCommands.DemIsCompressed); + + return (Command: msgType, IsCompressed: isCompressed, Size: size); } /// @@ -247,17 +260,15 @@ public async ValueTask ReadAllAsync(Stream stream, CancellationToken cancellatio public ValueTask MoveNextAsync(CancellationToken cancellationToken) { var cmd = ReadCommandHeader(); - return MoveNextCoreAsync(cmd.Command, cmd.Size, cancellationToken); + return MoveNextCoreAsync(cmd.Command, cmd.IsCompressed, cmd.Size, cancellationToken); } - private async ValueTask MoveNextCoreAsync(uint command, uint size, CancellationToken cancellationToken) + private async ValueTask MoveNextCoreAsync(EDemoCommands msgType, bool isCompressed, int size, CancellationToken cancellationToken) { - var msgType = (EDemoCommands)(command & ~(uint)EDemoCommands.DemIsCompressed); - if (msgType is < 0 or >= EDemoCommands.DemMax) - throw new InvalidDemoException($"Unexpected demo command: {command}"); + IsReading = true; - var isCompressed = (command & (uint)EDemoCommands.DemIsCompressed) - == (uint)EDemoCommands.DemIsCompressed; + if (msgType is < 0 or >= EDemoCommands.DemMax) + throw new InvalidDemoException($"Unexpected demo command: {msgType}"); while (_demoTickTimers.TryPeek(out var timer, out var timerTick) && timerTick <= CurrentDemoTick.Value) { @@ -265,18 +276,19 @@ private async ValueTask MoveNextCoreAsync(uint command, uint size, Cancell timer.Invoke(); } - // todo: read into pooled array - var buf = await ReadExactBytesAsync((int)size, cancellationToken).ConfigureAwait(false); + var rented = _bytePool.Rent(size); + var buf = rented.AsMemory(..size); + await _stream.ReadExactlyAsync(buf, cancellationToken).ConfigureAwait(false); bool canContinue; if (isCompressed) { - using var decompressed = Snappy.DecompressToMemory(buf); + using var decompressed = Snappy.DecompressToMemory(buf.Span); canContinue = _demoEvents.ReadDemoCommand(msgType, decompressed.Memory.Span); } else { - canContinue = _demoEvents.ReadDemoCommand(msgType, buf); + canContinue = _demoEvents.ReadDemoCommand(msgType, buf.Span); } if (OnCommandFinish is { } onCommandFinish) @@ -286,19 +298,24 @@ private async ValueTask MoveNextCoreAsync(uint command, uint size, Cancell onCommandFinish(); } + IsReading = false; + _bytePool.Return(rented); return canContinue; } private async ValueTask ReadFileInfo(CancellationToken cancellationToken) { var cmd = ReadCommandHeader(); - Debug.Assert(cmd.Command == (uint)EDemoCommands.DemFileInfo); + Debug.Assert(cmd.Command == EDemoCommands.DemFileInfo); // Always treat DemoFileInfo as being at 'pre-record' CurrentDemoTick = DemoTick.PreRecord; - var buf = await ReadExactBytesAsync((int)cmd.Size, cancellationToken).ConfigureAwait(false); - DemoEvents.DemoFileInfo?.Invoke(CDemoFileInfo.Parser.ParseFrom(buf)); + var rented = _bytePool.Rent(cmd.Size); + var buf = rented.AsMemory(..cmd.Size); + await _stream.ReadExactlyAsync(buf, cancellationToken).ConfigureAwait(false); + DemoEvents.DemoFileInfo?.Invoke(CDemoFileInfo.Parser.ParseFrom(buf.Span)); + _bytePool.Return(rented); } private static void ValidateMagic(ReadOnlySpan magic) @@ -310,15 +327,6 @@ private static void ValidateMagic(ReadOnlySpan magic) } } - private async ValueTask ReadExactBytesAsync( - int length, - CancellationToken cancellationToken) - { - var result = new byte[length]; - await _stream.ReadExactlyAsync(result, 0, length, cancellationToken).ConfigureAwait(false); - return result; - } - /// /// Schedule a callback at demo tick . /// The callback will be fired at the start of the demo tick.