-
Notifications
You must be signed in to change notification settings - Fork 250
/
upload-datasource.go
162 lines (143 loc) · 5.45 KB
/
upload-datasource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package importer
import (
"io"
"net/url"
"path/filepath"
"k8s.io/klog"
"kubevirt.io/containerized-data-importer/pkg/util"
)
// UploadDataSource contains all the information need to upload data into a data volume.
// Sequence of phases:
// 1a. ProcessingPhaseInfo -> ProcessingPhaseTransferScratch (In Info phase the format readers are configured) In case the readers don't contain a raw file.
// 1b. ProcessingPhaseInfo -> ProcessingPhaseTransferDataFile, in the case the readers contain a raw file.
// 2a. ProcessingPhaseTransferScratch -> ProcessingPhaseProcess
// 2b. ProcessingPhaseTransferDataFile -> ProcessingPhaseResize
// 3. ProcessingPhaseProcess -> ProcessingPhaseConvert
type UploadDataSource struct {
// Data strean
stream io.ReadCloser
// stack of readers
readers *FormatReaders
// url to a file in scratch space.
url *url.URL
}
// NewUploadDataSource creates a new instance of an UploadDataSource
func NewUploadDataSource(stream io.ReadCloser) *UploadDataSource {
return &UploadDataSource{
stream: stream,
}
}
// Info is called to get initial information about the data.
func (ud *UploadDataSource) Info() (ProcessingPhase, error) {
var err error
// Hardcoded to only accept kubevirt content type.
ud.readers, err = NewFormatReaders(ud.stream, uint64(0))
if err != nil {
klog.Errorf("Error creating readers: %v", err)
return ProcessingPhaseError, err
}
if !ud.readers.Convert {
// Uploading a raw file, we can write that directly to the target.
return ProcessingPhaseTransferDataFile, nil
}
return ProcessingPhaseTransferScratch, nil
}
// Transfer is called to transfer the data from the source to the passed in path.
func (ud *UploadDataSource) Transfer(path string) (ProcessingPhase, error) {
if util.GetAvailableSpace(path) <= int64(0) {
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
file := filepath.Join(path, tempFile)
err := util.StreamDataToFile(ud.readers.TopReader(), file)
if err != nil {
return ProcessingPhaseError, err
}
// If we successfully wrote to the file, then the parse will succeed.
ud.url, _ = url.Parse(file)
return ProcessingPhaseProcess, nil
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (ud *UploadDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
err := util.StreamDataToFile(ud.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
return ProcessingPhaseResize, nil
}
// Process is called to do any special processing before giving the url to the data back to the processor
func (ud *UploadDataSource) Process() (ProcessingPhase, error) {
return ProcessingPhaseConvert, nil
}
// GetURL returns the url that the data processor can use when converting the data.
func (ud *UploadDataSource) GetURL() *url.URL {
return ud.url
}
// Close closes any readers or other open resources.
func (ud *UploadDataSource) Close() error {
if ud.stream != nil {
return ud.stream.Close()
}
return nil
}
// AsyncUploadDataSource is an asynchronouse version of an upload data source, that returns finished phase instead
// of going to post upload processing phases.
type AsyncUploadDataSource struct {
uploadDataSource UploadDataSource
// Next Phase indicates what the next Processing Phase should be after the transfer completes.
ResumePhase ProcessingPhase
}
// NewAsyncUploadDataSource creates a new instance of an UploadDataSource
func NewAsyncUploadDataSource(stream io.ReadCloser) *AsyncUploadDataSource {
return &AsyncUploadDataSource{
uploadDataSource: UploadDataSource{
stream: stream,
},
ResumePhase: ProcessingPhaseInfo,
}
}
// Info is called to get initial information about the data.
func (aud *AsyncUploadDataSource) Info() (ProcessingPhase, error) {
return aud.uploadDataSource.Info()
}
// Transfer is called to transfer the data from the source to the passed in path.
func (aud *AsyncUploadDataSource) Transfer(path string) (ProcessingPhase, error) {
if util.GetAvailableSpace(path) <= int64(0) {
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
file := filepath.Join(path, tempFile)
err := util.StreamDataToFile(aud.uploadDataSource.readers.TopReader(), file)
if err != nil {
return ProcessingPhaseError, err
}
// If we successfully wrote to the file, then the parse will succeed.
aud.uploadDataSource.url, _ = url.Parse(file)
aud.ResumePhase = ProcessingPhaseProcess
return ProcessingPhasePause, nil
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (aud *AsyncUploadDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
err := util.StreamDataToFile(aud.uploadDataSource.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
aud.ResumePhase = ProcessingPhaseResize
return ProcessingPhasePause, nil
}
// Process is called to do any special processing before giving the url to the data back to the processor
func (aud *AsyncUploadDataSource) Process() (ProcessingPhase, error) {
return ProcessingPhaseConvert, nil
}
// Close closes any readers or other open resources.
func (aud *AsyncUploadDataSource) Close() error {
return aud.uploadDataSource.Close()
}
// GetURL returns the url that the data processor can use when converting the data.
func (aud *AsyncUploadDataSource) GetURL() *url.URL {
return aud.uploadDataSource.GetURL()
}
// GetResumePhase returns the next phase to process when resuming
func (aud *AsyncUploadDataSource) GetResumePhase() ProcessingPhase {
return aud.ResumePhase
}