-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
BlockingCosmosDbChangefeed.cs
117 lines (96 loc) · 4.83 KB
/
BlockingCosmosDbChangefeed.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Azure.Documents.Client;
using Demo.AspNetCore.Changefeed.Services.Abstractions;
namespace Demo.AspNetCore.Changefeed.Services.CosmosDB
{
/// <summary>
/// <see cref="IChangefeed{T}"/> implementation which reads the change feed of a Cosmos DB collection
/// by polling every partition key range and blocking the calling thread while it waits.
/// </summary>
/// <typeparam name="T">The type the changed documents are materialized as.</typeparam>
internal class BlockingCosmosDbChangefeed<T> : IChangefeed<T>
{
    // Pre-completed tasks, so MoveNextAsync does not allocate a new Task on every call.
    private static readonly Task<bool> _taskFromTrue = Task.FromResult(true);
    private static readonly Task<bool> _taskFromFalse = Task.FromResult(false);

    // Point in time used as ChangeFeedOptions.StartTime. Captured in UTC so the value is
    // unambiguous regardless of the host time zone or daylight saving transitions.
    // Being static, it is captured once per closed generic type, not per instance.
    private static readonly DateTime _startTime = DateTime.UtcNow;

    private readonly DocumentClient _documentClient;
    private readonly TimeSpan _feedPollDelay;
    private readonly Uri _collectionUri;

    // Last continuation token seen for every partition key range, keyed by the range id.
    private readonly Dictionary<string, string> _collectionPartitionKeyRangesCheckpoints = new Dictionary<string, string>();

    // Created lazily by the first MoveNextAsync call.
    private IEnumerator<T> _changefeedEnumerator;

    /// <summary>
    /// Gets or sets the most recent changed document produced by <see cref="MoveNextAsync"/>.
    /// Holds <c>default(T)</c> until the first successful move.
    /// </summary>
    public T CurrentNewValue { get; set; } = default(T);

    /// <summary>
    /// Creates a change feed over the given collection.
    /// </summary>
    /// <param name="documentClient">Client used to query the collection.</param>
    /// <param name="collectionUri">URI of the collection whose changes are read.</param>
    /// <param name="feedPollDelay">How long to wait between two polls of all partition key ranges.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="documentClient"/> or <paramref name="collectionUri"/> is null.
    /// </exception>
    public BlockingCosmosDbChangefeed(DocumentClient documentClient, Uri collectionUri, TimeSpan feedPollDelay)
    {
        if (documentClient == null)
        {
            throw new ArgumentNullException(nameof(documentClient));
        }

        if (collectionUri == null)
        {
            throw new ArgumentNullException(nameof(collectionUri));
        }

        _documentClient = documentClient;
        _collectionUri = collectionUri;
        _feedPollDelay = feedPollDelay;
    }

    /// <summary>
    /// Advances to the next changed document, blocking the calling thread until one is available
    /// or the feed ends because of cancellation.
    /// </summary>
    /// <param name="cancelToken">
    /// Token which stops the feed. Only the token passed to the FIRST call is observed: it is
    /// captured by the underlying enumerator when that enumerator is created.
    /// </param>
    /// <returns>
    /// An already completed task: <c>true</c> when <see cref="CurrentNewValue"/> has been updated,
    /// <c>false</c> when the feed has ended.
    /// </returns>
    public Task<bool> MoveNextAsync(CancellationToken cancelToken = default(CancellationToken))
    {
        if (_changefeedEnumerator == null)
        {
            _changefeedEnumerator = GetChangefeed(cancelToken).GetEnumerator();
        }

        if (_changefeedEnumerator.MoveNext())
        {
            CurrentNewValue = _changefeedEnumerator.Current;
            return _taskFromTrue;
        }

        return _taskFromFalse;
    }

    /// <summary>
    /// Reads all partition key ranges of the collection, following continuation tokens
    /// until the service reports no further page.
    /// </summary>
    private async Task<List<PartitionKeyRange>> GetCollectionPartitionKeyRanges()
    {
        List<PartitionKeyRange> collectionPartitionKeyRanges = new List<PartitionKeyRange>();
        string collectionPartitionKeyRangesResponseContinuation = null;

        do
        {
            FeedResponse<PartitionKeyRange> collectionPartitionKeyRangesResponse = await _documentClient.ReadPartitionKeyRangeFeedAsync(_collectionUri, new FeedOptions
            {
                RequestContinuation = collectionPartitionKeyRangesResponseContinuation
            });

            collectionPartitionKeyRanges.AddRange(collectionPartitionKeyRangesResponse);
            collectionPartitionKeyRangesResponseContinuation = collectionPartitionKeyRangesResponse.ResponseContinuation;
        }
        while (collectionPartitionKeyRangesResponseContinuation != null);

        return collectionPartitionKeyRanges;
    }

    /// <summary>
    /// Lazily yields changed documents: polls every partition key range, yields what each one
    /// returns, waits for the poll delay and starts over, until cancellation is requested.
    /// </summary>
    private IEnumerable<T> GetChangefeed(CancellationToken cancelToken)
    {
        while (!cancelToken.IsCancellationRequested)
        {
            // The ranges are re-read on every poll.
            foreach (PartitionKeyRange collectionPartitionKeyRange in GetCollectionPartitionKeyRanges().Result)
            {
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                // Stays null when the range has not been read yet, i.e. no continuation is sent.
                string collectionPartitionKeyRangeCheckpoint = null;
                _collectionPartitionKeyRangesCheckpoints.TryGetValue(collectionPartitionKeyRange.Id, out collectionPartitionKeyRangeCheckpoint);

                IDocumentQuery<Document> collectionChangeFeedQuery = _documentClient.CreateDocumentChangeFeedQuery(_collectionUri, new ChangeFeedOptions
                {
                    PartitionKeyRangeId = collectionPartitionKeyRange.Id,
                    RequestContinuation = collectionPartitionKeyRangeCheckpoint,
                    MaxItemCount = -1,
                    StartTime = _startTime
                });

                while (collectionChangeFeedQuery.HasMoreResults && !cancelToken.IsCancellationRequested)
                {
                    FeedResponse<T> collectionChangeFeedResponse = collectionChangeFeedQuery.ExecuteNextAsync<T>(cancelToken).Result;

                    // Remember where this range stopped so the next poll passes it as RequestContinuation.
                    _collectionPartitionKeyRangesCheckpoints[collectionPartitionKeyRange.Id] = collectionChangeFeedResponse.ResponseContinuation;

                    foreach (T changedDocument in collectionChangeFeedResponse)
                    {
                        if (cancelToken.IsCancellationRequested)
                        {
                            break;
                        }

                        yield return changedDocument;
                    }
                }
            }

            if (!cancelToken.IsCancellationRequested)
            {
                WaitForNextPoll(cancelToken);
            }
        }
    }

    /// <summary>
    /// Blocks for the configured poll delay, returning early (without throwing) when
    /// cancellation is requested, so that the feed ends the same way as at every other
    /// cancellation check instead of surfacing a cancellation exception to the caller.
    /// </summary>
    private void WaitForNextPoll(CancellationToken cancelToken)
    {
        try
        {
            // GetAwaiter().GetResult() rethrows the cancellation exception itself
            // rather than wrapping it in an AggregateException as Wait() would.
            Task.Delay(_feedPollDelay, cancelToken).GetAwaiter().GetResult();
        }
        catch (OperationCanceledException)
        {
            // Cancelled while waiting: the polling loop re-checks the token and exits.
        }
    }
}
}