-
Notifications
You must be signed in to change notification settings - Fork 90
/
WaveFileStreamReader.cs
369 lines (316 loc) · 14.5 KB
/
WaveFileStreamReader.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
namespace Microsoft.Psi.Audio
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using Microsoft.Psi;
using Microsoft.Psi.Data;
/// <summary>
/// Reader that streams audio from a WAVE file.
/// </summary>
[StreamReader("WAVE File", ".wav")]
public sealed class WaveFileStreamReader : IStreamReader
{
/// <summary>
/// Name of audio stream.
/// </summary>
public const string AudioStreamName = "Audio";

/// <summary>
/// Default size of each data buffer in milliseconds.
/// </summary>
public const int DefaultAudioBufferSizeMs = 20;

// Source ID stamped on every Envelope; this reader exposes exactly one (audio) stream.
private const int AudioSourceId = 0;

// Metadata for the single audio stream, computed once in the constructor.
private readonly WaveAudioStreamMetadata audioStreamMetadata;

// Reader over the WAVE file; positioned within the data chunk while streaming.
private readonly BinaryReader waveFileReader;

// Format parsed from the WAVE file header.
private readonly WaveFormat waveFormat;

// Virtual start time; message times are synthesized relative to this.
private readonly DateTime startTime;

// Byte offset where the data chunk begins, and its total length in bytes.
private readonly long dataStart;
private readonly long dataLength;

// Callbacks registered via OpenStream / OpenStreamIndex; invoked from InvokeTargets.
private readonly List<Delegate> audioTargets = new List<Delegate>();
private readonly List<Delegate> audioIndexTargets = new List<Delegate>();

// Sequence ID assigned to the next message produced by Next().
private int sequenceId = 0;

// Scratch buffer into which audio chunks are read.
private byte[] buffer;

// Messages outside this interval are filtered out by MoveNext after a Seek.
private TimeInterval seekInterval = TimeInterval.Infinite;
/// <summary>
/// Initializes a new instance of the <see cref="WaveFileStreamReader"/> class.
/// </summary>
/// <param name="name">Name of the WAVE file.</param>
/// <param name="path">Path of the WAVE file.</param>
/// <remarks>
/// WAVE files carry no timestamps, so message times are synthesized starting from the
/// current UTC time at construction, using the default buffer size.
/// </remarks>
public WaveFileStreamReader(string name, string path)
    : this(name, path, DateTime.UtcNow, DefaultAudioBufferSizeMs)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="WaveFileStreamReader"/> class.
/// </summary>
/// <param name="name">Name of the WAVE file.</param>
/// <param name="path">Path of the WAVE file.</param>
/// <param name="startTime">Starting time for streams of data.</param>
/// <param name="audioBufferSizeMs">The size of each data buffer in milliseconds.</param>
internal WaveFileStreamReader(string name, string path, DateTime startTime, int audioBufferSizeMs = DefaultAudioBufferSizeMs)
{
    this.Name = name;
    this.Path = path;
    this.startTime = startTime;
    var file = System.IO.Path.Combine(path, name);

    // Report the size of the file on disk. The previous `file.Length` was the length of
    // the *path string* (file is a string), not the size of the file in bytes.
    this.Size = new FileInfo(file).Length;

    this.waveFileReader = new BinaryReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read));
    this.waveFormat = WaveFileHelper.ReadWaveFileHeader(this.waveFileReader);
    this.dataLength = WaveFileHelper.ReadWaveDataLength(this.waveFileReader);
    this.dataStart = this.waveFileReader.BaseStream.Position;

    // Nominal chunk size: the number of bytes spanning audioBufferSizeMs of audio.
    var bufferSize = (int)(this.waveFormat.AvgBytesPerSec * audioBufferSizeMs / 1000);
    this.buffer = new byte[bufferSize];

    // Compute originating times based on audio chunk start time + duration.
    var endTime = this.startTime.AddSeconds((double)this.dataLength / (double)this.waveFormat.AvgBytesPerSec);
    this.MessageOriginatingTimeInterval = this.MessageCreationTimeInterval = this.StreamTimeInterval = new TimeInterval(this.startTime, endTime);

    var messageCount = (long)Math.Ceiling((double)this.dataLength / bufferSize);
    this.audioStreamMetadata = new WaveAudioStreamMetadata(AudioStreamName, typeof(AudioBuffer).AssemblyQualifiedName, name, path, this.startTime, endTime, messageCount, (double)this.dataLength / messageCount, audioBufferSizeMs);
}
/// <inheritdoc />
// File name of the WAVE file, as supplied to the constructor.
public string Name { get; private set; }

/// <inheritdoc />
// Directory containing the WAVE file, as supplied to the constructor.
public string Path { get; private set; }
/// <inheritdoc />
/// <remarks>There is exactly one stream: the audio stream.</remarks>
public IEnumerable<IStreamMetadata> AvailableStreams
{
    get { return new IStreamMetadata[] { this.audioStreamMetadata }; }
}
/// <inheritdoc />
public TimeInterval MessageCreationTimeInterval { get; private set; }

/// <inheritdoc />
public TimeInterval MessageOriginatingTimeInterval { get; private set; }

/// <inheritdoc />
public TimeInterval StreamTimeInterval { get; private set; }

/// <inheritdoc/>
// Assigned once in the constructor.
public long? Size { get; }

/// <inheritdoc/>
// Only the single audio stream.
public int? StreamCount => 1;
/// <inheritdoc />
/// <remarks>Only the single audio stream name matches.</remarks>
public bool ContainsStream(string name) => string.Equals(name, AudioStreamName, StringComparison.Ordinal);
/// <inheritdoc />
/// <remarks>Disposing the reader also closes the underlying file stream.</remarks>
public void Dispose() => this.waveFileReader.Dispose();
/// <inheritdoc />
public IStreamMetadata GetStreamMetadata(string name)
{
    // The only available stream is the audio stream; any other name throws.
    ValidateStreamName(name);
    return this.audioStreamMetadata;
}
/// <inheritdoc />
/// <remarks>The only supported supplemental metadata is the <see cref="WaveFormat"/>.</remarks>
public T GetSupplementalMetadata<T>(string streamName)
{
    ValidateStreamName(streamName);
    return typeof(T) == typeof(WaveFormat)
        ? (T)(object)this.waveFormat
        : throw new NotSupportedException("The Audio stream supports only supplemental metadata of type WaveFormat.");
}
/// <inheritdoc />
/// <remarks>A WAVE file is always a static (non-live) source.</remarks>
public bool IsLive() => false;
/// <inheritdoc />
public bool MoveNext(out Envelope envelope)
{
    // Deliver the next message only when one exists and it lies within the seek interval.
    if (this.Next(out var audio, out envelope) && this.seekInterval.PointIsWithin(envelope.OriginatingTime))
    {
        this.InvokeTargets(audio, envelope);
        return true;
    }

    return false;
}
/// <inheritdoc />
/// <remarks>The new reader shares the same file and virtual start time but has independent position.</remarks>
public IStreamReader OpenNew() => new WaveFileStreamReader(this.Name, this.Path, this.startTime);
/// <inheritdoc />
public IStreamMetadata OpenStream<T>(string name, Action<T, Envelope> target, Func<T> allocator = null, Action<T> deallocator = null, Action<SerializationException> errorHandler = null)
{
    ValidateStreamName(name);

    if (target is null)
    {
        throw new ArgumentNullException(nameof(target));
    }

    if (!(allocator is null))
    {
        throw new NotSupportedException($"Allocators are not supported by {nameof(WaveFileStreamReader)} and must be null.");
    }

    // targets are later called when data is read by MoveNext or ReadAll (see InvokeTargets).
    this.audioTargets.Add(target);
    return this.audioStreamMetadata;
}
/// <inheritdoc />
public IStreamMetadata OpenStreamIndex<T>(string name, Action<Func<IStreamReader, T>, Envelope> target, Func<T> allocator = null)
{
    ValidateStreamName(name);

    if (target is null)
    {
        throw new ArgumentNullException(nameof(target));
    }

    if (!(allocator is null))
    {
        throw new NotSupportedException($"Allocators are not supported by {nameof(WaveFileStreamReader)} and must be null.");
    }

    // targets are later called when data is read by MoveNext or ReadAll (see InvokeTargets).
    this.audioIndexTargets.Add(target);
    return this.audioStreamMetadata;
}
/// <inheritdoc />
public void ReadAll(ReplayDescriptor descriptor, CancellationToken cancelationToken = default)
{
    // Position at the first message inside the descriptor's interval, then stream until
    // end-of-data or cancellation, dispatching only in-interval messages to targets.
    this.Seek(descriptor.Interval);
    while (!cancelationToken.IsCancellationRequested)
    {
        if (!this.Next(out var audio, out var envelope))
        {
            return;
        }

        if (descriptor.Interval.PointIsWithin(envelope.OriginatingTime))
        {
            this.InvokeTargets(audio, envelope);
        }
    }
}
/// <inheritdoc />
/// <remarks>
/// Performs a linear scan from the start of the data chunk until the first message whose
/// originating time is within <paramref name="interval"/>, then rewinds one message so
/// the next call to Next()/MoveNext() re-delivers it. <paramref name="useOriginatingTime"/>
/// is ignored; creation and originating times are identical in this reader (see Next()).
/// NOTE(review): sequenceId is not rewound alongside the stream position, so the
/// re-delivered message receives a later sequence ID than it did during the scan — confirm
/// this is intended.
/// </remarks>
public void Seek(TimeInterval interval, bool useOriginatingTime = false)
{
    this.seekInterval = interval;

    // Restart at the beginning of the data chunk and replay messages forward.
    this.waveFileReader.BaseStream.Position = this.dataStart;
    this.sequenceId = 0;
    var previousPosition = this.waveFileReader.BaseStream.Position;
    while (this.Next(out var _, out var envelope))
    {
        if (interval.PointIsWithin(envelope.OriginatingTime))
        {
            this.waveFileReader.BaseStream.Position = previousPosition; // rewind
            return;
        }

        previousPosition = this.waveFileReader.BaseStream.Position;
    }
}
/// <summary>
/// Validate that name corresponds to a supported stream.
/// </summary>
/// <param name="name">Stream name.</param>
private static void ValidateStreamName(string name)
{
    // The single audio stream is the only one supported; anything else is an error.
    if (!string.Equals(name, AudioStreamName, StringComparison.Ordinal))
    {
        throw new NotSupportedException($"Only '{AudioStreamName}' stream is supported.");
    }
}
/// <summary>
/// Read an audio buffer of data.
/// </summary>
/// <param name="position">Byte position.</param>
/// <param name="sequenceId">Message sequence ID.</param>
/// <returns>Audio buffer.</returns>
/// <remarks>
/// Called by the index Funcs handed to OpenStreamIndex targets (see InvokeTargets).
/// Repositions the underlying stream and overwrites the reader's current sequence ID,
/// so it perturbs any in-progress sequential read.
/// </remarks>
private AudioBuffer Read(long position, int sequenceId)
{
    this.waveFileReader.BaseStream.Position = position;
    this.sequenceId = sequenceId;

    // Next() returns false only when no data is readable at this position.
    if (!this.Next(out var audio, out var _))
    {
        throw new InvalidOperationException("Invalid position (out of bounds).");
    }

    return audio;
}
/// <summary>
/// Invoke target callbacks with currently read message information.
/// </summary>
/// <param name="audio">Current audio buffer.</param>
/// <param name="envelope">Current message envelope.</param>
/// <remarks>This method is called as the data is read when MoveNext() or ReadAll() are called.</remarks>
private void InvokeTargets(AudioBuffer audio, Envelope envelope)
{
    foreach (Delegate action in this.audioTargets)
    {
        action.DynamicInvoke(audio, envelope);
    }

    foreach (Delegate action in this.audioIndexTargets)
    {
        // Index targets are given the message Envelope and a Func by which to retrieve the message data.
        // This Func may be held as a kind of "index" later called to retrieve the data. It may be called,
        // given the current IStreamReader or a new `reader` instance against the same store.
        // The Func is a closure over the `position` and `sequenceId` information needed for retrieval
        // but these implementation details remain opaque to users of the reader.
        //
        // By the time this method runs, Next() has already advanced the stream position past the
        // current message and incremented the sequence ID; capture the values rewound to the start
        // of *this* message, otherwise the Func would retrieve the following message instead.
        var position = this.waveFileReader.BaseStream.Position - audio.Length;
        var sequenceId = this.sequenceId - 1;
        action.DynamicInvoke(new Func<IStreamReader, AudioBuffer>(reader => ((WaveFileStreamReader)reader).Read(position, sequenceId)), envelope);
    }
}
/// <summary>
/// Read the next audio buffer of data from the WAVE file.
/// </summary>
/// <param name="audio">Audio buffer to be populated.</param>
/// <param name="envelope">Message envelope to be populated.</param>
/// <returns>A bool indicating whether the end of available data has been reached.</returns>
private bool Next(out AudioBuffer audio, out Envelope envelope)
{
    // Bytes of the data chunk not yet consumed. Clamp at zero so that a position at or past
    // the end of the chunk (e.g. Read() called with an out-of-bounds position) reports
    // end-of-data instead of failing on a negative read count.
    var bytesRemaining = Math.Max(0L, this.dataLength - (this.waveFileReader.BaseStream.Position - this.dataStart));
    int nextBytesToRead = (int)Math.Min(this.buffer.Length, bytesRemaining);

    // Read next audio chunk
    int bytesRead = nextBytesToRead > 0 ? this.waveFileReader.Read(this.buffer, 0, nextBytesToRead) : 0;
    if (bytesRead == 0)
    {
        // Break on end of file
        audio = default;
        envelope = default;
        return false;
    }

    // For a short (final) chunk, deliver a right-sized copy while keeping this.buffer at its
    // nominal capacity. (Previously this.buffer itself was shrunk, which permanently reduced
    // the chunk size for all subsequent reads after a later Seek back to earlier data.)
    var data = this.buffer;
    if (bytesRead < this.buffer.Length)
    {
        data = new byte[bytesRead];
        Array.Copy(this.buffer, 0, data, 0, bytesRead);
    }

    // Synthesize the message time from the number of data-chunk bytes consumed so far;
    // creation and originating times are identical.
    var totalBytesRead = this.waveFileReader.BaseStream.Position - this.dataStart;
    DateTime time = this.startTime.AddSeconds((double)totalBytesRead / (double)this.waveFormat.AvgBytesPerSec);
    audio = new AudioBuffer(data, this.waveFormat);
    envelope = new Envelope(time, time, AudioSourceId, this.sequenceId++);
    return true;
}
/// <summary>
/// WAVE audio stream metadata.
/// </summary>
public class WaveAudioStreamMetadata : StreamMetadataBase
{
    /// <summary>
    /// Initializes a new instance of the <see cref="WaveAudioStreamMetadata"/> class.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <param name="typeName">Stream type name.</param>
    /// <param name="partitionName">Partition/file name.</param>
    /// <param name="partitionPath">Partition/file path.</param>
    /// <param name="first">First message time.</param>
    /// <param name="last">Last message time.</param>
    /// <param name="messageCount">Total message count.</param>
    /// <param name="averageMessageSize">Average message size (bytes).</param>
    /// <param name="averageLatencyMs">Average message latency (milliseconds).</param>
    /// <remarks>
    /// NOTE(review): the enclosing reader passes the audio buffer size in milliseconds as
    /// <paramref name="averageLatencyMs"/> — confirm this repurposing is intended.
    /// </remarks>
    internal WaveAudioStreamMetadata(string name, string typeName, string partitionName, string partitionPath, DateTime first, DateTime last, long messageCount, double averageMessageSize, double averageLatencyMs)
        : base(name, AudioSourceId, typeName, partitionName, partitionPath, first, last, messageCount, averageMessageSize, averageLatencyMs)
    {
    }
}
}
}