syncer.go

package get

import (
    "compress/gzip"
    "crypto"
    "encoding/xml"
    "io"
    "log"

    "github.com/moio/minima/util"
)

// common

// XMLLocation maps a <location> tag in repodata/repomd.xml or repodata/<ID>-primary.xml.gz
type XMLLocation struct {
    Href string `xml:"href,attr"`
}

// repodata/repomd.xml

// XMLRepomd maps a <repomd> tag in repodata/repomd.xml
type XMLRepomd struct {
    Data []XMLData `xml:"data"`
}

// XMLData maps a <data> tag in repodata/repomd.xml
type XMLData struct {
    Type     string      `xml:"type,attr"`
    Location XMLLocation `xml:"location"`
}
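
// For reference, an abridged, hypothetical repodata/repomd.xml that the types
// above would decode (hrefs and types are made up for illustration):
//
//   <repomd xmlns="http://linux.duke.edu/metadata/repo">
//     <data type="primary">
//       <location href="repodata/abc123-primary.xml.gz"/>
//     </data>
//     <data type="filelists">
//       <location href="repodata/def456-filelists.xml.gz"/>
//     </data>
//   </repomd>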

// repodata/<ID>-primary.xml.gz

// XMLMetaData maps a <metadata> tag in repodata/<ID>-primary.xml.gz
type XMLMetaData struct {
    Packages []XMLPackage `xml:"package"`
}

// XMLPackage maps a <package> tag in repodata/<ID>-primary.xml.gz
type XMLPackage struct {
    Arch     string      `xml:"arch"`
    Location XMLLocation `xml:"location"`
    Checksum XMLChecksum `xml:"checksum"`
}

// XMLChecksum maps a <checksum> tag in repodata/<ID>-primary.xml.gz
type XMLChecksum struct {
    Type     string `xml:"type,attr"`
    Checksum string `xml:",cdata"`
}
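
// Similarly, an abridged, hypothetical excerpt of the decompressed
// <ID>-primary.xml.gz contents matching the types above (all values made up):
//
//   <metadata xmlns="http://linux.duke.edu/metadata/common" packages="1">
//     <package type="rpm">
//       <arch>x86_64</arch>
//       <checksum type="sha256" pkgid="YES">0123abcd...</checksum>
//       <location href="x86_64/foo-1.0-1.x86_64.rpm"/>
//     </package>
//   </metadata>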

var hashMap = map[string]crypto.Hash{
    "sha":    crypto.SHA1,
    "sha1":   crypto.SHA1,
    "sha256": crypto.SHA256,
}

const repomdPath = "repodata/repomd.xml"

// Syncer syncs repos from an HTTP source to a Storage
type Syncer struct {
    // URL of the repo this syncer syncs
    Url     string
    archs   map[string]bool
    storage Storage
}

// NewSyncer creates a new Syncer
func NewSyncer(url string, archs map[string]bool, storage Storage) *Syncer {
    return &Syncer{url, archs, storage}
}
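
// A minimal usage sketch, assuming some Storage implementation from elsewhere
// in this package is at hand (the URL and arch filter are made up):
//
//   syncer := NewSyncer("http://example.com/repo", map[string]bool{"x86_64": true}, storage)
//   if err := syncer.StoreRepo(); err != nil {
//       log.Fatal(err)
//   }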

// StoreRepo stores an HTTP repo in a Storage, automatically retrying in case of recoverable errors
func (r *Syncer) StoreRepo() (err error) {
    checksumMap := r.readChecksumMap()
    for i := 0; i < 10; i++ {
        err = r.storeRepo(checksumMap)
        if err == nil {
            return
        }

        uerr, unexpectedStatusCode := err.(*UnexpectedStatusCodeError)
        if unexpectedStatusCode {
            if uerr.StatusCode == 404 {
                log.Printf("Got 404, presumably temporarily, retrying...\n")
                continue
            }
            return err
        }

        _, checksumError := err.(*util.ChecksumError)
        if checksumError {
            log.Printf("Checksum did not match, presumably the repo was published while syncing, retrying...\n")
        } else {
            return err
        }
    }
    log.Printf("Too many temporary errors, aborting...\n")
    return err
}

// storeRepo stores an HTTP repo in a Storage
func (r *Syncer) storeRepo(checksumMap map[string]XMLChecksum) (err error) {
    packagesToDownload, packagesToRecycle, err := r.processMetadata(checksumMap)
    if err != nil {
        return
    }

    downloadCount := len(packagesToDownload)
    log.Printf("Downloading %v packages...\n", downloadCount)
    for _, pack := range packagesToDownload {
        err = r.downloadStoreApply(pack.Location.Href, pack.Checksum.Checksum, hashMap[pack.Checksum.Type], util.Nop)
        if err != nil {
            return err
        }
    }

    recycleCount := len(packagesToRecycle)
    log.Printf("Recycling %v packages...\n", recycleCount)
    for _, pack := range packagesToRecycle {
        err = r.storage.Recycle(pack.Location.Href)
        if err != nil {
            return
        }
    }

    log.Printf("Committing changes...\n")
    err = r.storage.Commit()
    if err != nil {
        return
    }
    return
}

// downloadStore downloads a repo-relative path into a file
func (r *Syncer) downloadStore(path string) error {
    return r.downloadStoreApply(path, "", 0, util.Nop)
}

// downloadStoreApply downloads a repo-relative path into a file, while applying a ReaderConsumer
func (r *Syncer) downloadStoreApply(path string, checksum string, hash crypto.Hash, f util.ReaderConsumer) error {
    log.Printf("Downloading %v...", path)
    body, err := ReadURL(r.Url + "/" + path)
    if err != nil {
        return err
    }
    return util.Compose(r.storage.StoringMapper(path, checksum, hash), f)(body)
}
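
// A sketch of a custom consumer, for illustration only. Judging from the
// closures passed in processMetadata and processPrimary below,
// util.ReaderConsumer is assumed to be compatible with
// func(io.ReadCloser) error; the path is hypothetical:
//
//   err := r.downloadStoreApply("some/path", "", 0, func(body io.ReadCloser) (err error) {
//       _, err = io.Copy(io.Discard, body) // e.g. just drain the stream
//       return
//   })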

// processMetadata stores the repo metadata and returns a list of package file
// paths to download
func (r *Syncer) processMetadata(checksumMap map[string]XMLChecksum) (packagesToDownload []XMLPackage, packagesToRecycle []XMLPackage, err error) {
    err = r.downloadStoreApply(repomdPath, "", 0, func(reader io.ReadCloser) (err error) {
        decoder := xml.NewDecoder(reader)
        var repomd XMLRepomd
        err = decoder.Decode(&repomd)
        if err != nil {
            return
        }

        data := repomd.Data
        for i := 0; i < len(data); i++ {
            metadataPath := data[i].Location.Href
            if data[i].Type == "primary" {
                packagesToDownload, packagesToRecycle, err = r.processPrimary(metadataPath, checksumMap)
            } else {
                err = r.downloadStore(metadataPath)
            }
            if err != nil {
                return
            }
        }
        return
    })
    if err != nil {
        return
    }

    err = r.downloadStore(repomdPath + ".asc")
    if err != nil {
        uerr, unexpectedStatusCode := err.(*UnexpectedStatusCodeError)
        if unexpectedStatusCode && uerr.StatusCode == 404 {
            log.Printf("Got 404, ignoring...")
            err = nil
        } else {
            return
        }
    }

    err = r.downloadStore(repomdPath + ".key")
    if err != nil {
        uerr, unexpectedStatusCode := err.(*UnexpectedStatusCodeError)
        if unexpectedStatusCode && uerr.StatusCode == 404 {
            log.Printf("Got 404, ignoring...")
            err = nil
        } else {
            return
        }
    }
    return
}

// readMetaData decodes a gzip-compressed primary metadata stream
func (r *Syncer) readMetaData(reader io.Reader) (primary XMLMetaData, err error) {
    gzReader, err := gzip.NewReader(reader)
    if err != nil {
        return
    }
    defer gzReader.Close()

    decoder := xml.NewDecoder(gzReader)
    err = decoder.Decode(&primary)
    return
}

// readChecksumMap reads the checksums of previously synced packages from
// storage, keyed by package location
func (r *Syncer) readChecksumMap() (checksumMap map[string]XMLChecksum) {
    checksumMap = make(map[string]XMLChecksum)
    repomdReader, err := r.storage.NewReader(repomdPath)
    if err != nil {
        if err == ErrFileNotFound {
            log.Println("First-time sync started")
        } else {
            log.Println(err.Error())
            log.Println("Error while reading previously-downloaded metadata. Starting sync from scratch")
        }
        return
    }
    defer repomdReader.Close()

    decoder := xml.NewDecoder(repomdReader)
    var repomd XMLRepomd
    err = decoder.Decode(&repomd)
    if err != nil {
        log.Println(err.Error())
        log.Println("Error while parsing previously-downloaded metadata. Starting sync from scratch")
        return
    }

    data := repomd.Data
    for i := 0; i < len(data); i++ {
        metadataPath := data[i].Location.Href
        if data[i].Type == "primary" {
            primaryReader, err := r.storage.NewReader(metadataPath)
            if err != nil {
                return
            }
            primary, err := r.readMetaData(primaryReader)
            if err != nil {
                return
            }
            for _, pack := range primary.Packages {
                checksumMap[pack.Location.Href] = pack.Checksum
            }
        }
    }
    return
}

// processPrimary stores the primary XML metadata file and returns a list of
// package file paths to download
func (r *Syncer) processPrimary(path string, checksumMap map[string]XMLChecksum) (packagesToDownload []XMLPackage, packagesToRecycle []XMLPackage, err error) {
    err = r.downloadStoreApply(path, "", 0, func(reader io.ReadCloser) (err error) {
        primary, err := r.readMetaData(reader)
        if err != nil {
            return
        }

        allArchs := len(r.archs) == 0
        for _, pack := range primary.Packages {
            if allArchs || pack.Arch == "noarch" || r.archs[pack.Arch] {
                previousChecksum, ok := checksumMap[pack.Location.Href]
                switch {
                case !ok:
                    log.Printf("...package '%v' not found, will be downloaded\n", pack.Location.Href)
                    packagesToDownload = append(packagesToDownload, pack)
                case previousChecksum.Type == pack.Checksum.Type && previousChecksum.Checksum == pack.Checksum.Checksum:
                    log.Printf("...package '%v' is up-to-date already, will be recycled\n", pack.Location.Href)
                    packagesToRecycle = append(packagesToRecycle, pack)
                default:
                    log.Printf("...package '%v' does not have the expected checksum, will be redownloaded\n", pack.Location.Href)
                    packagesToDownload = append(packagesToDownload, pack)
                }
            }
        }
        return
    })
    return
}