/
share.go
127 lines (111 loc) · 2.75 KB
/
share.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package sharing
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"github.com/google/uuid"
"github.com/mbrostami/goshare/api/grpc"
"github.com/mbrostami/goshare/api/grpc/pb"
"github.com/mbrostami/goshare/pkg/tracer"
"github.com/rs/zerolog/log"
"github.com/schollz/progressbar/v3"
"golang.org/x/sync/errgroup"
)
const (
KB = 1 << (10 * (iota + 1))
MB
GB
)
const MaxConcurrentShare = 0
const ChunkSize = 1 * MB
func (s *Service) Share(ctx context.Context, filePath string, uid uuid.UUID, relays []string, caPath string, withTLS, insecure bool) error {
ctx, span := tracer.NewSpan(ctx, "sender-service")
defer span.End()
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file: %v", err)
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
return err
}
connections := make([]*grpc.Client, len(relays))
for i, server := range relays {
c, err := grpc.NewClient(ctx, server, caPath, withTLS, insecure)
if err != nil {
return err
}
connections[i] = c
}
log.Debug().Msg("connection to relays was successful!")
for i, _ := range relays {
err = connections[i].ShareInit(ctx, uid, filepath.Base(filePath), fi.Size())
if err != nil {
return fmt.Errorf("couldn't initialize share %+v", err)
}
}
// chunkChannel := make(chan *pb.ShareRequest, fi.Size()%1024)
chunkChannel := make(chan *pb.ShareRequest, len(relays)*2) // 2 per server
eg, _ := errgroup.WithContext(ctx)
for i, _ := range relays {
index := i
eg.Go(func() error {
if err = connections[index].Share(ctx, uid, chunkChannel); err != nil {
return err
}
log.Debug().Msgf("sharing with relay %d finished!", index)
return nil
})
}
var breakLoop bool
go func() {
if err := eg.Wait(); err != nil {
breakLoop = true
}
}()
buf := make([]byte, ChunkSize) // TODO negotiate with receiver to set the chunk size
var seq int64
bar := progressbar.DefaultBytes(
fi.Size(),
"Uploading",
)
for {
if breakLoop {
break
}
seq++
n, err := file.Read(buf)
if err == io.EOF {
log.Debug().Msg("sending chunk to channel finished!")
// send nil to receiver so receiver knows it's done
chunkChannel <- &pb.ShareRequest{
Identifier: uid.String(),
SequenceNumber: -1,
}
break
}
if err != nil {
log.Error().Msgf("failed to read file: %v", err)
// send nil to receiver so receiver knows it's done
chunkChannel <- &pb.ShareRequest{
Identifier: uid.String(),
SequenceNumber: -1,
}
break
}
log.Debug().Msgf("sending chunk to channel seq: %d", seq)
r := pb.ShareRequest{
Identifier: uid.String(),
SequenceNumber: seq,
}
r.Data = make([]byte, n)
copy(r.Data, buf[:n])
bar.Write(r.Data)
chunkChannel <- &r
}
close(chunkChannel)
return eg.Wait()
}