-
Notifications
You must be signed in to change notification settings - Fork 15
/
LogDecorator.cs
140 lines (125 loc) · 7.44 KB
/
LogDecorator.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using NStore.Core.Logging;
namespace NStore.Core.Persistence
{
    /// <summary>
    /// <see cref="IPersistence"/> decorator that writes a debug log entry before and after
    /// delegating each operation to the wrapped persistence.
    /// </summary>
    /// <remarks>
    /// The "End" entry is written only when the wrapped call completes without throwing;
    /// exceptions raised by the wrapped persistence propagate to the caller unchanged.
    /// </remarks>
    public class LogDecorator : IPersistence
    {
        private readonly IPersistence _persistence;
        private readonly INStoreLogger _logger;

        /// <summary>
        /// Creates a decorator around <paramref name="persistence"/>.
        /// </summary>
        /// <param name="persistence">The persistence every call is forwarded to.</param>
        /// <param name="inStoreLoggerFactory">Factory used to create the logger for this decorator.</param>
        /// <param name="categoryName">Category passed to the logger factory; defaults to "Persistence".</param>
        /// <exception cref="System.ArgumentNullException">
        /// <paramref name="persistence"/> or <paramref name="inStoreLoggerFactory"/> is null.
        /// </exception>
        public LogDecorator(IPersistence persistence, INStoreLoggerFactory inStoreLoggerFactory, string categoryName = "Persistence")
        {
            // Fail fast at construction instead of with a NullReferenceException on first use.
            if (persistence == null)
            {
                throw new System.ArgumentNullException(nameof(persistence));
            }

            if (inStoreLoggerFactory == null)
            {
                throw new System.ArgumentNullException(nameof(inStoreLoggerFactory));
            }

            _persistence = persistence;
            _logger = inStoreLoggerFactory.CreateLogger(categoryName);
        }

        /// <summary>Forwards the <c>SupportsFillers</c> value of the wrapped persistence.</summary>
        public bool SupportsFillers => _persistence.SupportsFillers;

        /// <summary>
        /// Logs, then forwards a forward read of a partition to the wrapped persistence.
        /// </summary>
        public async Task ReadForwardAsync(
            string partitionId,
            long fromLowerIndexInclusive,
            ISubscription subscription,
            long toUpperIndexInclusive,
            int limit,
            CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadPartitionForward(Partition {PartitionId}, from: {from})", partitionId, fromLowerIndexInclusive);
            await _persistence.ReadForwardAsync(partitionId, fromLowerIndexInclusive, subscription, toUpperIndexInclusive, limit, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End ReadPartitionForward(Partition {PartitionId}, from: {from})", partitionId, fromLowerIndexInclusive);
        }

        /// <summary>
        /// Logs, then forwards a multi-partition forward read to the wrapped persistence.
        /// </summary>
        /// <remarks>
        /// <paramref name="partitionIdsList"/> is enumerated at most once by this method: it is
        /// snapshotted into a list unless it is already a materialized collection.
        /// </remarks>
        public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
            IEnumerable<string> partitionIdsList,
            long fromLowerPositionInclusive,
            ISubscription subscription,
            long toUpperPositionInclusive,
            CancellationToken cancellationToken)
        {
            // The ids are needed for the log text and by the wrapped persistence; materialize a lazy
            // sequence once so it is not re-evaluated (with possibly different results) on each use.
            IEnumerable<string> partitionIds = (partitionIdsList as IReadOnlyCollection<string>) ?? new List<string>(partitionIdsList);
            string joinedPartitionIds = string.Join(",", partitionIds);

            _logger.LogDebug("Start ReadForwardMultiplePartitionsByGlobalPositionAsync(Partition {PartitionId}, from: {from})", joinedPartitionIds, fromLowerPositionInclusive);
            await _persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync(partitionIds, fromLowerPositionInclusive, subscription, toUpperPositionInclusive, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End ReadForwardMultiplePartitionsByGlobalPositionAsync(Partition {PartitionId}, from: {from})", joinedPartitionIds, fromLowerPositionInclusive);
        }

        /// <summary>
        /// Logs, then forwards a backward read of a partition to the wrapped persistence.
        /// </summary>
        public async Task ReadBackwardAsync(
            string partitionId,
            long fromUpperIndexInclusive,
            ISubscription subscription,
            long toLowerIndexInclusive,
            int limit,
            CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadPartitionBackward(Partition {PartitionId}, from: {from})", partitionId, fromUpperIndexInclusive);
            await _persistence.ReadBackwardAsync(partitionId, fromUpperIndexInclusive, subscription, toLowerIndexInclusive, limit, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End ReadPartitionBackward(Partition {PartitionId}, from: {from})", partitionId, fromUpperIndexInclusive);
        }

        /// <summary>
        /// Logs, then forwards a single-chunk backward read to the wrapped persistence.
        /// </summary>
        /// <returns>The chunk returned by the wrapped persistence.</returns>
        public async Task<IChunk> ReadSingleBackwardAsync(string partitionId, long fromUpperIndexInclusive, CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadLast(partitionId:{partitionId}, to:{to})", partitionId, fromUpperIndexInclusive);
            var result = await _persistence.ReadSingleBackwardAsync(partitionId, fromUpperIndexInclusive, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End ReadLast(partitionId:{partitionId}, to:{to})", partitionId, fromUpperIndexInclusive);
            return result;
        }

        /// <summary>
        /// Logs, then forwards a read across all partitions to the wrapped persistence.
        /// </summary>
        public async Task ReadAllAsync(
            long fromPositionInclusive,
            ISubscription subscription,
            int limit,
            CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadAllAsync(from:{from}, limit:{limit})", fromPositionInclusive, limit);
            await _persistence.ReadAllAsync(fromPositionInclusive, subscription, limit, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("end ReadAllAsync(from:{from}, limit:{limit})", fromPositionInclusive, limit);
        }

        /// <summary>
        /// Logs, then forwards the last-position query to the wrapped persistence.
        /// </summary>
        /// <returns>The position returned by the wrapped persistence.</returns>
        public async Task<long> ReadLastPositionAsync(CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadLastPosition()");
            var result = await _persistence.ReadLastPositionAsync(cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("end ReadLastPosition()");
            return result;
        }

        /// <summary>
        /// Logs, then forwards an append to the wrapped persistence.
        /// </summary>
        /// <returns>The chunk returned by the wrapped persistence (may be null).</returns>
        public async Task<IChunk> AppendAsync(
            string partitionId,
            long index,
            object payload,
            string operationId,
            CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start PersistAsync(partition: \"{partitionId}\", index: {index}, op: \"{op}\")", partitionId, index, operationId);
            var result = await _persistence.AppendAsync(partitionId, index, payload, operationId, cancellationToken).ConfigureAwait(false);
            // result may be null, hence the null-conditional access when logging the position.
            _logger.LogDebug("End PersistAsync(partition: \"{partitionId}\", index: {index}, op: \"{op}\") => position: {Position}", partitionId, index, operationId, result?.Position);
            return result;
        }

        /// <summary>
        /// Logs, then forwards a chunk replacement to the wrapped persistence.
        /// </summary>
        /// <returns>The chunk returned by the wrapped persistence.</returns>
        public async Task<IChunk> ReplaceOneAsync(long position, string partitionId, long index, object payload,
            string operationId,
            CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start RewriteAsync({position}, {partitionId}, {index})", position, partitionId, index);
            var chunk = await _persistence.ReplaceOneAsync(position, partitionId, index, payload, operationId, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End RewriteAsync({position}, {partitionId}, {index})", position, partitionId, index);
            return chunk;
        }

        /// <summary>
        /// Logs, then forwards a read of the chunk at <paramref name="position"/> to the wrapped persistence.
        /// </summary>
        /// <returns>The chunk returned by the wrapped persistence.</returns>
        public async Task<IChunk> ReadOneAsync(long position, CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadOneAsync({position})", position);
            var chunk = await _persistence.ReadOneAsync(position, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End ReadOneAsync({position})", position);
            return chunk;
        }

        /// <summary>
        /// Logs, then forwards a ranged delete on a partition to the wrapped persistence.
        /// </summary>
        public async Task DeleteAsync(
            string partitionId,
            long fromLowerIndexInclusive,
            long toUpperIndexInclusive,
            CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start DeleteAsync({partitionId}, {from}, {to})", partitionId, fromLowerIndexInclusive, toUpperIndexInclusive);
            await _persistence.DeleteAsync(partitionId, fromLowerIndexInclusive, toUpperIndexInclusive, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End DeleteAsync({partitionId}, {from}, {to})", partitionId, fromLowerIndexInclusive, toUpperIndexInclusive);
        }

        /// <summary>
        /// Logs, then forwards a lookup by operation id within a partition to the wrapped persistence.
        /// </summary>
        /// <returns>The chunk returned by the wrapped persistence.</returns>
        public async Task<IChunk> ReadByOperationIdAsync(string partitionId, string operationId, CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadByOperationIdAsync(partitionId:{partitionId}, op:{op})", partitionId, operationId);
            var chunk = await _persistence.ReadByOperationIdAsync(partitionId, operationId, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End ReadByOperationIdAsync(partitionId:{partitionId}, op:{op})", partitionId, operationId);
            return chunk;
        }

        /// <summary>
        /// Logs, then forwards a read of all chunks with the given operation id to the wrapped persistence.
        /// </summary>
        public async Task ReadAllByOperationIdAsync(string operationId, ISubscription subscription, CancellationToken cancellationToken)
        {
            _logger.LogDebug("Start ReadAllByOperationIdAsync(op:{op})", operationId);
            await _persistence.ReadAllByOperationIdAsync(operationId, subscription, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("End ReadAllByOperationIdAsync(op:{op})", operationId);
        }
    }
}