/
upload_manager.go
130 lines (102 loc) · 2.57 KB
/
upload_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package services
import (
"cloud.google.com/go/storage"
"context"
log "github.com/sirupsen/logrus"
"io"
"os"
"path/filepath"
"runtime"
"strings"
)
type VideoUpload struct {
Paths []string
VideoPath string
OutputBucket string
Errors []string
}
func NewVideoUpload() *VideoUpload {
return &VideoUpload{}
}
func (vu *VideoUpload) UploadObject(objectPath string, client *storage.Client, ctx context.Context) error {
path := strings.Split(objectPath, os.Getenv("localStoragePath")+"/")
f, err := os.Open(objectPath)
if err != nil {
return err
}
defer f.Close()
wc := client.Bucket(vu.OutputBucket).Object(path[1]).NewWriter(ctx)
// storage ACL all users role reader
//wc.ACL = []storage.ACLRule{{Entity: storage.AllUsers, Role: storage.RoleReader}}
wc.ACL = []storage.ACLRule{{Entity: storage.AllUsers, Role: storage.RoleReader}}
if _, err = io.Copy(wc, f); err != nil {
return err
}
if err := wc.Close(); err != nil {
return err
}
return nil
}
func (vu *VideoUpload) loadPaths() error {
err := filepath.Walk(vu.VideoPath, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
vu.Paths = append(vu.Paths, path)
}
return nil
})
if err != nil {
return err
}
return nil
}
func (vu *VideoUpload) ProcessUpload(concurrency int, doneUpload chan string) error {
// channel with buffered size of Number of CPU
in := make(chan int, runtime.NumCPU())
returnChannel := make(chan string)
// load all paths
err := vu.loadPaths()
if err != nil {
return nil
}
uploadClient, ctx, err := getClientUpload()
if err != nil {
return err
}
// concurrency define number of workers
for process := 0; process < concurrency; process++ {
go vu.uploadWorker(in, returnChannel, uploadClient, ctx)
}
go func() {
for x := 0; x < len(vu.Paths); x++ {
in <- x
}
close(in)
}()
for r := range returnChannel {
if r != "" {
doneUpload <- r
break
}
}
return nil
}
func (vu *VideoUpload) uploadWorker(in chan int, returnChannel chan string, uploadClient *storage.Client, ctx context.Context) {
for x := range in {
err := vu.UploadObject(vu.Paths[x], uploadClient, ctx)
if err != nil {
vu.Errors = append(vu.Errors, vu.Paths[x])
log.Printf("error during the upload: %v. Error: %v", vu.Paths[x], err)
returnChannel <- err.Error()
}
returnChannel <- ""
}
returnChannel <- "upload completed"
}
func getClientUpload() (*storage.Client, context.Context, error) {
ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
return nil, nil, err
}
return client, ctx, nil
}