/
accounting.go
232 lines (196 loc) · 6.19 KB
/
accounting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package core
import (
"math/big"
"sync"
"time"
ethcommon "github.com/ethereum/go-ethereum/common"
)
// Balance holds the credit balance for a broadcast session.
// It does not store an amount itself: it pairs an address and a ManifestID
// with the AddressBalances store that its methods delegate to.
type Balance struct {
	// addr is the ETH address passed to the store on every operation
	addr ethcommon.Address
	// manifestID selects the per-address entry the operations act on
	manifestID ManifestID
	// balances is the backing store that Credit and StageUpdate call into
	balances *AddressBalances
}
// NewBalance builds a Balance bound to the given address and ManifestID and
// backed by the supplied AddressBalances store.
func NewBalance(addr ethcommon.Address, manifestID ManifestID, balances *AddressBalances) *Balance {
	bal := new(Balance)
	bal.addr = addr
	bal.manifestID = manifestID
	bal.balances = balances
	return bal
}
// Credit adds amount to the balance that the backing store tracks for this
// session's address and ManifestID.
func (b *Balance) Credit(amount *big.Rat) {
	store, addr, id := b.balances, b.addr, b.manifestID
	store.Credit(addr, id, amount)
}
// StageUpdate prepares a balance update by reserving (zeroing and taking) the
// current balance for this session.
//
// It returns, in order:
//   - the number of tickets to send with a payment,
//   - the new credit represented by that payment (ticket count times ev),
//   - the existing credit, i.e. the balance that was just reserved.
//
// minCredit is the credit the caller wants to reach and ev is the value
// credited per ticket. The ticket count is the ceiling of
// (minCredit - existing credit) / ev.
func (b *Balance) StageUpdate(minCredit, ev *big.Rat) (int, *big.Rat, *big.Rat) {
	reserved := b.balances.Reserve(b.addr, b.manifestID)

	// The reserved credit already reaches the minimum: no tickets are needed
	// and the payment carries no new value.
	if reserved.Cmp(minCredit) >= 0 {
		return 0, big.NewRat(0, 1), reserved
	}

	shortfall := new(big.Rat).Sub(minCredit, reserved)
	ratio := new(big.Rat).Quo(shortfall, ev)

	var tickets *big.Int
	if ratio.IsInt() {
		tickets = ratio.Num()
	} else {
		// Div is Euclidean division and a Rat denominator is always positive,
		// so the quotient is the floor; adding one gives the ceiling, which
		// ensures the batch of tickets covers the entire shortfall.
		tickets = new(big.Int).Div(ratio.Num(), ratio.Denom())
		tickets.Add(tickets, big.NewInt(1))
	}

	n := tickets.Int64()
	ticketCount := new(big.Rat).SetInt64(n)
	newCredit := new(big.Rat).Mul(ticketCount, ev)
	return int(n), newCredit, reserved
}
// AddressBalances holds credit balances for ETH addresses.
// Each address maps to its own Balances instance, which balancesForAddr
// creates on first use.
type AddressBalances struct {
	// balances maps an ETH address to the per-stream balances of that address
	balances map[ethcommon.Address]*Balances
	// mtx is locked by balancesForAddr and StopCleanup while they touch the map
	mtx sync.Mutex
	// ttl is handed to NewBalances for every Balances instance created here
	ttl time.Duration
}
// NewAddressBalances returns an empty AddressBalances whose per-address
// Balances instances will be created with the given ttl.
func NewAddressBalances(ttl time.Duration) *AddressBalances {
	store := new(AddressBalances)
	store.ttl = ttl
	store.balances = make(map[ethcommon.Address]*Balances)
	return store
}
// Credit adds an amount to the balance for an address' ManifestID.
// The per-address Balances instance is created if it does not exist yet.
func (a *AddressBalances) Credit(addr ethcommon.Address, id ManifestID, amount *big.Rat) {
	perAddr := a.balancesForAddr(addr)
	perAddr.Credit(id, amount)
}
// Debit subtracts an amount from the balance for an address' ManifestID.
// The per-address Balances instance is created if it does not exist yet.
func (a *AddressBalances) Debit(addr ethcommon.Address, id ManifestID, amount *big.Rat) {
	perAddr := a.balancesForAddr(addr)
	perAddr.Debit(id, amount)
}
// Reserve zeros the balance for an address' ManifestID and returns the
// balance it held before being zeroed.
func (a *AddressBalances) Reserve(addr ethcommon.Address, id ManifestID) *big.Rat {
	perAddr := a.balancesForAddr(addr)
	reserved := perAddr.Reserve(id)
	return reserved
}
// Balance retrieves the current balance for an address' ManifestID.
// The result is whatever the per-address Balances reports for id, which is
// nil when no balance has been recorded for that ManifestID.
func (a *AddressBalances) Balance(addr ethcommon.Address, id ManifestID) *big.Rat {
	perAddr := a.balancesForAddr(addr)
	current := perAddr.Balance(id)
	return current
}
// StopCleanup stops the cleanup loop of every per-address Balances instance
// currently held, while holding the store's mutex.
func (a *AddressBalances) StopCleanup() {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	for addr := range a.balances {
		a.balances[addr].StopCleanup()
	}
}
// balancesForAddr returns the Balances instance for addr. On first use for an
// address it creates the instance with the store's ttl, starts its cleanup
// loop in a new goroutine and records it in the map.
func (a *AddressBalances) balancesForAddr(addr ethcommon.Address) *Balances {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	if existing, found := a.balances[addr]; found {
		return existing
	}

	created := NewBalances(a.ttl)
	go created.StartCleanup()
	a.balances[addr] = created
	return created
}
// Balances holds credit balances on a per-stream basis, keyed by ManifestID.
// Entries whose last update is older than ttl are removed by the cleanup loop
// that StartCleanup runs.
type Balances struct {
	// balances maps a ManifestID to its balance entry
	balances map[ManifestID]*balance
	// mtx is taken by the accessor methods (Credit, Debit, Reserve, Balance,
	// FixedPrice, SetFixedPrice) around their map access
	mtx sync.RWMutex
	// ttl is both the cleanup ticker interval and the maximum entry age
	ttl time.Duration
	// quit is closed by StopCleanup to end the StartCleanup loop
	quit chan struct{}
}
// balance is a single per-ManifestID entry stored in Balances.
type balance struct {
	lastUpdate time.Time // Time of the last Credit, Debit or SetFixedPrice; compared against ttl by cleanup
	amount *big.Rat // Balance represented as a big.Rat
	fixedPrice *big.Rat // Fixed price for the session; nil until SetFixedPrice is called
}
// NewBalances returns an empty Balances instance with the given ttl and a
// fresh quit channel. The cleanup loop is not started here; the caller runs
// StartCleanup.
func NewBalances(ttl time.Duration) *Balances {
	bs := new(Balances)
	bs.ttl = ttl
	bs.quit = make(chan struct{})
	bs.balances = make(map[ManifestID]*balance)
	return bs
}
// Credit adds an amount to the balance for a ManifestID, creating a zero
// balance first if none exists, and records the time of the update.
func (b *Balances) Credit(id ManifestID, amount *big.Rat) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	entry := b.balances[id]
	if entry == nil {
		entry = &balance{amount: big.NewRat(0, 1)}
		b.balances[id] = entry
	}
	// The stored Rat is updated in place.
	entry.amount.Add(entry.amount, amount)
	entry.lastUpdate = time.Now()
}
// Debit subtracts an amount from the balance for a ManifestID, creating a zero
// balance first if none exists, and records the time of the update. The
// balance is allowed to become negative.
func (b *Balances) Debit(id ManifestID, amount *big.Rat) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	entry := b.balances[id]
	if entry == nil {
		entry = &balance{amount: big.NewRat(0, 1)}
		b.balances[id] = entry
	}
	// The stored Rat is updated in place.
	entry.amount.Sub(entry.amount, amount)
	entry.lastUpdate = time.Now()
}
// Reserve zeros the balance for a ManifestID and returns the balance it held
// before being zeroed. An entry is created if none exists, in which case the
// returned balance is zero. The entry's last-update time is left unchanged.
func (b *Balances) Reserve(id ManifestID) *big.Rat {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	entry := b.balances[id]
	if entry == nil {
		entry = &balance{amount: big.NewRat(0, 1)}
		b.balances[id] = entry
	}

	// Hand the old Rat to the caller and install a fresh zero in its place.
	reserved := entry.amount
	entry.amount = big.NewRat(0, 1)
	return reserved
}
// Balance retrieves the current balance for a ManifestID.
// It returns nil when no entry exists for id.
//
// The returned value is a copy of the stored amount. Credit and Debit update
// the stored *big.Rat in place while holding the write lock, so handing out
// the internal pointer would let callers read it (after this method has
// released the read lock) concurrently with those writes.
func (b *Balances) Balance(id ManifestID) *big.Rat {
	b.mtx.RLock()
	defer b.mtx.RUnlock()

	entry := b.balances[id]
	if entry == nil || entry.amount == nil {
		return nil
	}
	return new(big.Rat).Set(entry.amount)
}
// FixedPrice retrieves the price fixed for the given session.
// It returns nil when no entry exists for id.
func (b *Balances) FixedPrice(id ManifestID) *big.Rat {
	b.mtx.RLock()
	defer b.mtx.RUnlock()

	if entry := b.balances[id]; entry != nil {
		return entry.fixedPrice
	}
	return nil
}
// SetFixedPrice sets the fixed price for the given session, creating an entry
// with a zero balance if none exists, and records the time of the update.
func (b *Balances) SetFixedPrice(id ManifestID, fixedPrice *big.Rat) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	entry := b.balances[id]
	if entry == nil {
		entry = &balance{amount: big.NewRat(0, 1)}
		b.balances[id] = entry
	}
	entry.fixedPrice = fixedPrice
	entry.lastUpdate = time.Now()
}
// cleanup removes every entry whose last update is older than ttl.
//
// The write lock is held for the whole pass: ranging over the map is itself a
// map read, so it must not run concurrently with the writes performed by
// Credit, Debit, Reserve and SetFixedPrice. Deleting entries while ranging
// over the same map is permitted in Go.
func (b *Balances) cleanup() {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	now := time.Now()
	for id, entry := range b.balances {
		if now.Sub(entry.lastUpdate) > b.ttl {
			delete(b.balances, id)
		}
	}
}
// StartCleanup is a state flushing method to clean up the balances mapping.
// It blocks, running cleanup once every ttl, until the quit channel is closed
// by StopCleanup; callers run it in its own goroutine.
//
// Note that time.NewTicker panics if ttl is not positive.
func (b *Balances) StartCleanup() {
	ticker := time.NewTicker(b.ttl)
	// Release the ticker's resources once the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			b.cleanup()
		case <-b.quit:
			return
		}
	}
}
// StopCleanup stops the cleanup loop for Balances by closing the quit channel.
// It is safe to call more than once: later calls are no-ops instead of
// panicking on a second close.
func (b *Balances) StopCleanup() {
	// The mutex makes the closed-check and the close one atomic step, so two
	// concurrent callers cannot both reach close.
	b.mtx.Lock()
	defer b.mtx.Unlock()

	select {
	case <-b.quit:
		// Nothing is ever sent on quit, so a successful receive means it has
		// already been closed.
	default:
		close(b.quit)
	}
}