3
3
4
4
using Microsoft . Agents . Builder . Errors ;
5
5
using Microsoft . Agents . Core ;
6
+ using Microsoft . Agents . Core . Errors ;
6
7
using Microsoft . Agents . Core . Models ;
7
8
using System ;
8
9
using System . Collections . Generic ;
@@ -40,13 +41,16 @@ internal class StreamingResponse : IStreamingResponse
40
41
{
41
42
public static readonly int DefaultEndStreamTimeout = ( int ) TimeSpan . FromMinutes ( 2 ) . TotalMilliseconds ;
42
43
44
+ private const string TeamsStreamCancelled = "ContentStreamNotAllowed" ;
45
+
43
46
private readonly TurnContext _context ;
44
47
private int _nextSequence = 1 ;
45
48
private bool _ended = false ;
46
49
private Timer _timer ;
47
50
private bool _messageUpdated = false ;
48
51
private bool _informativeSent = false ;
49
52
private bool _isTeamsChannel ;
53
+ private bool _cancelled ;
50
54
51
55
// Queue for outgoing activities
52
56
private readonly List < Func < IActivity > > _queue = [ ] ;
@@ -211,7 +215,7 @@ public async Task QueueInformativeUpdateAsync(string text, CancellationToken can
211
215
/// <exception cref="System.InvalidOperationException">Throws if the stream has already ended.</exception>
212
216
public void QueueTextChunk ( string text )
213
217
{
214
- if ( string . IsNullOrEmpty ( text ) )
218
+ if ( string . IsNullOrEmpty ( text ) || _cancelled )
215
219
{
216
220
return ;
217
221
}
@@ -264,7 +268,7 @@ public async Task EndStreamAsync(CancellationToken cancellationToken = default)
264
268
}
265
269
266
270
// Timer isn't running for non-streaming channels. Just send the Message buffer as a message.
267
- if ( ! string . IsNullOrWhiteSpace ( Message ) || FinalMessage != null )
271
+ if ( UpdatesSent ( ) > 0 || FinalMessage != null )
268
272
{
269
273
await _context . SendActivityAsync ( CreateFinalMessage ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
270
274
}
@@ -280,7 +284,7 @@ public async Task EndStreamAsync(CancellationToken cancellationToken = default)
280
284
281
285
_ended = true ;
282
286
283
- if ( UpdatesSent ( ) == 0 )
287
+ if ( UpdatesSent ( ) == 0 || _cancelled )
284
288
{
285
289
// nothing was queued. nothing to "end".
286
290
return ;
@@ -300,7 +304,7 @@ public async Task EndStreamAsync(CancellationToken cancellationToken = default)
300
304
}
301
305
}
302
306
303
- if ( ! string . IsNullOrEmpty ( Message ) || FinalMessage != null )
307
+ if ( UpdatesSent ( ) > 0 || FinalMessage != null )
304
308
{
305
309
await SendActivityAsync ( CreateFinalMessage ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
306
310
}
@@ -312,7 +316,7 @@ private IActivity CreateFinalMessage()
312
316
var activity = FinalMessage ?? new Activity ( ) ;
313
317
314
318
activity . Type = ActivityTypes . Message ;
315
- activity . Text = Message ; // Teams won't allow Activity.Text changes
319
+ activity . Text = Message ; // !string.IsNullOrEmpty(Message) ? Message : "No text was streamed"; // Teams won't allow Activity.Text changes or empty text
316
320
activity . Entities ??= [ ] ;
317
321
318
322
// make sure the supplied Activity doesn't have a streamInfo already.
@@ -325,7 +329,7 @@ private IActivity CreateFinalMessage()
325
329
}
326
330
}
327
331
328
- activity . Entities . Add ( new StreamInfo ( ) { StreamType = StreamTypes . Final } ) ;
332
+ activity . Entities . Add ( new StreamInfo ( ) { StreamType = StreamTypes . Final , StreamResult = ( string . IsNullOrEmpty ( Message ) ? StreamResults . Error : StreamResults . Success ) } ) ;
329
333
330
334
// Add in Generated by AI
331
335
if ( EnableGeneratedByAILabel == true )
@@ -367,6 +371,7 @@ public async Task ResetAsync(CancellationToken cancellationToken = default)
367
371
FinalMessage = null ;
368
372
_nextSequence = 1 ;
369
373
StreamId = null ;
374
+ _cancelled = false ;
370
375
}
371
376
}
372
377
@@ -527,11 +532,33 @@ private async Task SendActivityAsync(IActivity activity, CancellationToken cance
527
532
activity . GetStreamingEntity ( ) . StreamId = StreamId ;
528
533
}
529
534
530
- var response = await _context . SendActivityAsync ( activity , cancellationToken ) . ConfigureAwait ( false ) ;
531
-
532
- if ( string . IsNullOrEmpty ( StreamId ) )
535
+ try
536
+ {
537
+ var response = await _context . SendActivityAsync ( activity , cancellationToken ) . ConfigureAwait ( false ) ;
538
+ if ( string . IsNullOrEmpty ( StreamId ) )
539
+ {
540
+ StreamId = response . Id ;
541
+ }
542
+ }
543
+ catch ( Exception ex )
533
544
{
534
- StreamId = response . Id ;
545
+ // We are not rethrowing from here since this is likely being called
546
+ // from the Timer thread and will crash the app. A more elegant
547
+ // solution would be to get it back to the calling thread.
548
+
549
+ if ( ex is ErrorResponseException errorResponse )
550
+ {
551
+ if ( ! TeamsStreamCancelled . Equals ( errorResponse . Body . Error . Code , StringComparison . OrdinalIgnoreCase ) )
552
+ {
553
+ System . Diagnostics . Trace . WriteLine ( $ "Exception during StreamingResponse: { ex . Message } ") ;
554
+ }
555
+ }
556
+
557
+ lock ( this )
558
+ {
559
+ StopStream ( ) ;
560
+ _cancelled = true ;
561
+ }
535
562
}
536
563
}
537
564
}
0 commit comments