forked from snickers/snickers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.go
131 lines (113 loc) · 3.22 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package pipeline
import (
"net/url"
"os"
"path"
"code.cloudfoundry.org/lager"
"github.com/flavioribeiro/gonfig"
"github.com/snickers/snickers/db"
"github.com/snickers/snickers/downloaders"
"github.com/snickers/snickers/encoders"
"github.com/snickers/snickers/helpers"
"github.com/snickers/snickers/types"
"github.com/snickers/snickers/uploaders"
)
// StartJob starts the job
func StartJob(logger lager.Logger, config gonfig.Gonfig, dbInstance db.Storage, job types.Job) {
log := logger.Session("start-job", lager.Data{
"id": job.ID,
"status": job.Status,
"source": job.Source,
"destination": job.Destination,
})
defer log.Info("finished")
log.Info("setup")
newJob, err := SetupJob(job.ID, dbInstance, config)
job = *newJob
if err != nil {
log.Error("setup-job failed", err)
return
}
log.Info("downloading")
downloadFunc := downloaders.GetDownloadFunc(job.Source)
if err := downloadFunc(log, config, dbInstance, job.ID); err != nil {
log.Error("download failed", err)
job.Status = types.JobError
job.Details = err.Error()
dbInstance.UpdateJob(job.ID, job)
return
}
log.Info("encoding")
encodeFunc := encoders.GetEncodeFunc(job)
if err := encodeFunc(logger, dbInstance, job.ID); err != nil {
log.Error("encode failed", err)
job.Status = types.JobError
job.Details = err.Error()
dbInstance.UpdateJob(job.ID, job)
return
}
log.Info("uploading")
uploadFunc := uploaders.GetUploadFunc(job.Destination)
if err := uploadFunc(logger, dbInstance, job.ID); err != nil {
log.Error("upload failed", err)
job.Status = types.JobError
job.Details = err.Error()
dbInstance.UpdateJob(job.ID, job)
return
}
log.Info("erasing temporary files")
if err := CleanSwap(dbInstance, job.ID); err != nil {
log.Error("erasing temporary files failed", err)
}
job.Status = types.JobFinished
dbInstance.UpdateJob(job.ID, job)
}
// CleanSwap removes LocalSource and LocalDestination
// files/directories.
func CleanSwap(dbInstance db.Storage, jobID string) error {
job, err := dbInstance.RetrieveJob(jobID)
if err != nil {
return err
}
err = os.RemoveAll(job.LocalSource)
if err != nil {
return err
}
err = os.RemoveAll(job.LocalDestination)
return err
}
// SetupJob is responsible for set the initial state for a given
// job before starting. It sets local source and destination
// paths and the final destination as well.
func SetupJob(jobID string, dbInstance db.Storage, config gonfig.Gonfig) (*types.Job, error) {
job, err := dbInstance.RetrieveJob(jobID)
if err != nil {
return nil, err
}
localSource, err := helpers.GetLocalSourcePath(config, job.ID)
if err != nil {
return nil, err
}
job.LocalSource = localSource + path.Base(job.Source)
job.LocalDestination, err = helpers.GetLocalDestination(config, dbInstance, jobID)
if err != nil {
return nil, err
}
u, err := url.Parse(job.Destination)
if err != nil {
return nil, err
}
outputFilename, err := helpers.GetOutputFilename(dbInstance, jobID)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, outputFilename)
job.Destination = u.String()
job.Status = types.JobDownloading
job.Progress = "0%"
job, err = dbInstance.UpdateJob(job.ID, job)
if err != nil {
return nil, err
}
return &job, nil
}