-
Notifications
You must be signed in to change notification settings - Fork 0
/
file.go
103 lines (85 loc) · 2.06 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package server
import (
"bufio"
"context"
"fmt"
"io"
api "github.com/ugent-library/biblio-backoffice/api/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const fileBufSize = 524288
func (s *server) GetFile(req *api.GetFileRequest, stream api.Biblio_GetFileServer) error {
b, err := s.services.FileStore.Get(stream.Context(), req.Sha256)
if err != nil {
return fmt.Errorf("cannot open file: %w", err)
}
defer b.Close()
r := bufio.NewReader(b)
buf := make([]byte, fileBufSize)
for {
n, err := r.Read(buf)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("cannot read chunk to buffer: %w", err)
}
req := &api.GetFileResponse{
Chunk: buf[:n],
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("cannot send chunk to client: %w", err)
}
}
return nil
}
func (s *server) ExistsFile(ctx context.Context, req *api.ExistsFileRequest) (*api.ExistsFileResponse, error) {
exists, err := s.services.FileStore.Exists(ctx, req.Sha256)
return &api.ExistsFileResponse{
Exists: exists,
}, err
}
func (s *server) AddFile(stream api.Biblio_AddFileServer) error {
var (
sha256 string
fileStoreErr error
)
pr, pw := io.Pipe()
waitc := make(chan struct{})
go func() {
sha256, fileStoreErr = s.services.FileStore.Add(stream.Context(), pr, "")
close(waitc)
}()
recv:
for {
select {
case <-waitc:
break recv
default:
req, err := stream.Recv()
if err == io.EOF {
break recv
}
if err != nil {
return status.Errorf(codes.Internal, "failed to read stream: %s", err)
}
if _, err := pw.Write(req.Chunk); err != nil {
return status.Errorf(codes.Internal, "failed to write file chunk: %s", err)
}
}
}
pw.Close()
<-waitc
if fileStoreErr != nil {
return status.Errorf(codes.Internal, "failed to write to stream: %v", fileStoreErr)
}
if err := stream.SendAndClose(&api.AddFileResponse{
Response: &api.AddFileResponse_Sha256{
Sha256: sha256,
},
}); err != nil {
return status.Errorf(codes.Internal, "failed to write to stream: %v", err)
}
return nil
}