/
blob_s3_storage.go
87 lines (73 loc) · 1.77 KB
/
blob_s3_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package infrastructure
import (
"bytes"
"context"
"fmt"
"github.com/alexandria-oss/core/exception"
"github.com/go-kit/kit/log"
"github.com/maestre3d/alexandria/blob-service/internal/domain"
"gocloud.dev/blob"
"gocloud.dev/gcerrors"
"io"
"sync"
)
type BlobS3Storage struct {
logger log.Logger
mu *sync.Mutex
}
func NewBlobS3Storage(logger log.Logger) *BlobS3Storage {
return &BlobS3Storage{
logger: logger,
mu: new(sync.Mutex),
}
}
func (s *BlobS3Storage) Store(ctx context.Context, blobRef *domain.Blob) error {
s.mu.Lock()
defer s.mu.Unlock()
bucket, err := blob.OpenBucket(ctx, fmt.Sprintf("s3://%s?region=%s", domain.StorageDomain, domain.StorageRegion))
if err != nil {
return err
}
defer bucket.Close()
bucket = blob.PrefixedBucket(bucket, domain.StoragePath+"/"+blobRef.Service+"/")
ctxR, cancel := context.WithCancel(ctx)
defer cancel()
w, err := bucket.NewWriter(ctxR, blobRef.Name, nil)
if err != nil {
return err
}
buf := bytes.NewBuffer(nil)
_, err = io.Copy(buf, blobRef.Content)
if err != nil {
return err
}
_, err = w.Write(buf.Bytes())
if err != nil {
return err
}
closeErr := w.Close()
if closeErr != nil {
return closeErr
}
return nil
}
func (s *BlobS3Storage) Delete(ctx context.Context, key, service string) error {
s.mu.Lock()
defer s.mu.Unlock()
bucket, err := blob.OpenBucket(ctx, fmt.Sprintf("s3://%s?region=%s", domain.StorageDomain, domain.StorageRegion))
if err != nil {
return err
}
defer bucket.Close()
bucket = blob.PrefixedBucket(bucket, domain.StoragePath+"/"+service+"/")
ctxR, cancel := context.WithCancel(ctx)
defer cancel()
err = bucket.Delete(ctxR, key)
if err != nil {
if gcerrors.Code(err) == gcerrors.NotFound {
return exception.EntityNotFound
}
return err
}
return nil
}