-
Notifications
You must be signed in to change notification settings - Fork 399
/
attribution.go
212 lines (181 loc) · 6.38 KB
/
attribution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"strings"
"sync"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/common/useragent"
"storj.io/common/uuid"
"storj.io/drpc/drpccache"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/console"
)
// ensureAttribution ensures that the bucketName has the partner information specified by keyInfo partner ID or the header user agent.
// PartnerID from keyInfo is a value associated with registered user and prevails over header user agent.
//
// Assumes that the user has permissions sufficient for authenticating.
func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.RequestHeader, keyInfo *console.APIKeyInfo, bucketName []byte) error {
if header == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
}
if len(header.UserAgent) == 0 && keyInfo.PartnerID.IsZero() {
return nil
}
if conncache := drpccache.FromContext(ctx); conncache != nil {
cache := conncache.LoadOrCreate(attributionCheckCacheKey{},
func() interface{} {
return &attributionCheckCache{}
}).(*attributionCheckCache)
if !cache.needsCheck(string(bucketName)) {
return nil
}
}
var err error
partnerID := keyInfo.PartnerID
if partnerID.IsZero() {
partnerID, err = endpoint.ResolvePartnerID(ctx, header)
if err != nil {
return err
}
if partnerID.IsZero() {
return nil
}
}
err = endpoint.tryUpdateBucketAttribution(ctx, header, keyInfo.ProjectID, bucketName, partnerID)
if errs2.IsRPC(err, rpcstatus.NotFound) || errs2.IsRPC(err, rpcstatus.AlreadyExists) {
return nil
}
return err
}
// ResolvePartnerID returns partnerIDBytes as parsed or UUID corresponding to header.UserAgent.
// returns empty uuid when neither is defined.
func (endpoint *Endpoint) ResolvePartnerID(ctx context.Context, header *pb.RequestHeader) (uuid.UUID, error) {
if header == nil {
return uuid.UUID{}, rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
}
if len(header.UserAgent) == 0 {
return uuid.UUID{}, nil
}
entries, err := useragent.ParseEntries(header.UserAgent)
if err != nil {
return uuid.UUID{}, rpcstatus.Errorf(rpcstatus.InvalidArgument, "invalid user agent %q: %v", string(header.UserAgent), err)
}
entries = removeUplinkUserAgent(entries)
// no user agent defined
if len(entries) == 0 {
return uuid.UUID{}, nil
}
// Use the first partner product entry as the PartnerID.
for _, entry := range entries {
if entry.Product != "" {
partner, err := endpoint.partners.ByUserAgent(ctx, entry.Product)
if err != nil || partner.UUID.IsZero() {
continue
}
return partner.UUID, nil
}
}
return uuid.UUID{}, nil
}
func removeUplinkUserAgent(entries []useragent.Entry) []useragent.Entry {
var xs []useragent.Entry
for i := 0; i < len(entries); i++ {
// If it's "uplink" then skip it.
if strings.EqualFold(entries[i].Product, uplinkProduct) {
// also skip any associated comments
for i+1 < len(entries) && entries[i+1].Comment != "" {
i++
}
continue
}
xs = append(xs, entries[i])
}
return xs
}
func (endpoint *Endpoint) tryUpdateBucketAttribution(ctx context.Context, header *pb.RequestHeader, projectID uuid.UUID, bucketName []byte, partnerID uuid.UUID) error {
if header == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
}
// check if attribution is set for given bucket
_, err := endpoint.attributions.Get(ctx, projectID, bucketName)
if err == nil {
// bucket has already an attribution, no need to update
return nil
}
if !attribution.ErrBucketNotAttributed.Has(err) {
// try only to set the attribution, when it's missing
endpoint.log.Error("error while getting attribution from DB", zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
empty, err := endpoint.metainfo.IsBucketEmpty(ctx, projectID, bucketName)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if !empty {
return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q is not empty, PartnerID %q cannot be attributed", bucketName, partnerID)
}
// checks if bucket exists before updates it or makes a new entry
bucket, err := endpoint.metainfo.GetBucket(ctx, bucketName, projectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return rpcstatus.Errorf(rpcstatus.NotFound, "bucket %q does not exist", bucketName)
}
endpoint.log.Error("error while getting bucket", zap.ByteString("bucketName", bucketName), zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution")
}
if !bucket.PartnerID.IsZero() {
return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q already has attribution, PartnerID %q cannot be attributed", bucketName, partnerID)
}
// update bucket information
bucket.PartnerID = partnerID
_, err = endpoint.metainfo.UpdateBucket(ctx, bucket)
if err != nil {
endpoint.log.Error("error while updating bucket", zap.ByteString("bucketName", bucketName), zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution")
}
// update attribution table
_, err = endpoint.attributions.Insert(ctx, &attribution.Info{
ProjectID: projectID,
BucketName: bucketName,
PartnerID: partnerID,
})
if err != nil {
endpoint.log.Error("error while inserting attribution to DB", zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return nil
}
// maxAttributionCacheSize determines how many buckets attributionCheckCache remembers.
const maxAttributionCacheSize = 10
// attributionCheckCacheKey is used as a key for the connection cache.
type attributionCheckCacheKey struct{}
// attributionCheckCache implements a basic lru cache, with a constant size.
type attributionCheckCache struct {
mu sync.Mutex
pos int
buckets []string
}
// needsCheck returns true when the bucket should be tested for setting the useragent.
func (cache *attributionCheckCache) needsCheck(bucket string) bool {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, b := range cache.buckets {
if b == bucket {
return false
}
}
if len(cache.buckets) >= maxAttributionCacheSize {
cache.pos = (cache.pos + 1) % len(cache.buckets)
cache.buckets[cache.pos] = bucket
} else {
cache.pos = len(cache.buckets)
cache.buckets = append(cache.buckets, bucket)
}
return true
}