-
Notifications
You must be signed in to change notification settings - Fork 407
/
torrent_archive.go
130 lines (118 loc) · 4.35 KB
/
torrent_archive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agentstorage
import (
"fmt"
"os"
"github.com/uber-go/tally"
"github.com/willf/bitset"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/lib/torrent/storage"
"github.com/uber/kraken/tracker/metainfoclient"
)
// TorrentArchive is capable of initializing torrents in the download directory
// and serving torrents from either the download or cache directory.
type TorrentArchive struct {
// stats is the metrics scope; it is used to time metainfo downloads.
stats tally.Scope
// cads is the store holding torrent files together with their metadata
// (torrent metainfo and per-piece status).
cads *store.CADownloadStore
// metaInfoClient downloads metainfo when none is found in cads.
metaInfoClient metainfoclient.Client
}
// NewTorrentArchive builds a TorrentArchive on top of the given store and
// metainfo client. The supplied stats scope is tagged with
// module=agenttorrentarchive before being stored on the archive.
func NewTorrentArchive(
	stats tally.Scope,
	cads *store.CADownloadStore,
	mic metainfoclient.Client) *TorrentArchive {

	moduleTags := map[string]string{"module": "agenttorrentarchive"}
	return &TorrentArchive{
		stats:          stats.Tagged(moduleTags),
		cads:           cads,
		metaInfoClient: mic,
	}
}
// Stat loads the torrent metainfo and the piece status metadata stored for
// digest d and returns them as a TorrentInfo whose bitfield has one bit set
// for every completed piece. Errors from either metadata lookup are returned
// unwrapped, so a not-exist error from the store reaches the caller
// unchanged. The namespace argument is ignored.
func (a *TorrentArchive) Stat(namespace string, d core.Digest) (*storage.TorrentInfo, error) {
	key := d.Hex()

	var meta metadata.TorrentMeta
	if err := a.cads.Any().GetMetadata(key, &meta); err != nil {
		return nil, err
	}

	var pieceMeta pieceStatusMetadata
	if err := a.cads.Any().GetMetadata(key, &pieceMeta); err != nil {
		return nil, err
	}

	// One bit per piece; only completed pieces are marked.
	completed := bitset.New(uint(len(pieceMeta.pieces)))
	for idx := range pieceMeta.pieces {
		if pieceMeta.pieces[idx].status != _complete {
			continue
		}
		completed.Set(uint(idx))
	}
	return storage.NewTorrentInfo(meta.MetaInfo, completed), nil
}
// CreateTorrent returns a Torrent for digest d. If metainfo is already stored
// for d it is used directly. Otherwise the metainfo is downloaded through the
// metainfo client under the given namespace, a download file is created, and
// the metainfo is stored. Returns storage.ErrNotFound when the metainfo
// client reports that no metainfo exists.
func (a *TorrentArchive) CreateTorrent(namespace string, d core.Digest) (storage.Torrent, error) {
	var meta metadata.TorrentMeta

	lookupErr := a.cads.Any().GetMetadata(d.Hex(), &meta)
	switch {
	case os.IsNotExist(lookupErr):
		// Nothing stored yet: fetch the metainfo and initialize the file.
		timer := a.stats.Timer("metainfo_download").Start()
		mi, dlErr := a.metaInfoClient.Download(namespace, d)
		if dlErr != nil {
			if dlErr != metainfoclient.ErrNotFound {
				return nil, fmt.Errorf("download metainfo: %s", dlErr)
			}
			return nil, storage.ErrNotFound
		}
		// The timer is only stopped once the download has succeeded.
		timer.Stop()

		// This step is racy, but tolerably so: another caller may beat us
		// to it, in which case the file store rejects the metainfo we used
		// to initialize the download file. The only metainfo field consumed
		// here is the file length, and the digest is derived from that, so
		// the outcome is the same regardless of who wins.
		if createErr := a.cads.CreateDownloadFile(mi.Digest().Hex(), mi.Length()); createErr != nil {
			tolerated := a.cads.InDownloadError(createErr) || a.cads.InCacheError(createErr)
			if !tolerated {
				return nil, fmt.Errorf("create download file: %s", createErr)
			}
		}

		meta.MetaInfo = mi
		if setErr := a.cads.Any().GetOrSetMetadata(d.Hex(), &meta); setErr != nil {
			return nil, fmt.Errorf("get or set metainfo: %s", setErr)
		}
	case lookupErr != nil:
		return nil, fmt.Errorf("get metainfo: %s", lookupErr)
	}

	torrent, initErr := NewTorrent(a.cads, meta.MetaInfo)
	if initErr != nil {
		return nil, fmt.Errorf("initialize torrent: %s", initErr)
	}
	return torrent, nil
}
// GetTorrent builds a Torrent from metainfo that is already stored for
// digest d; nothing is downloaded. The namespace argument is ignored.
func (a *TorrentArchive) GetTorrent(namespace string, d core.Digest) (storage.Torrent, error) {
	var meta metadata.TorrentMeta

	lookupErr := a.cads.Any().GetMetadata(d.Hex(), &meta)
	if lookupErr != nil {
		return nil, fmt.Errorf("get metainfo: %s", lookupErr)
	}

	torrent, initErr := NewTorrent(a.cads, meta.MetaInfo)
	if initErr != nil {
		return nil, fmt.Errorf("initialize torrent: %s", initErr)
	}
	return torrent, nil
}
// DeleteTorrent removes the file stored for digest d. A file that does not
// exist is not reported as an error; any other failure is returned as-is.
func (a *TorrentArchive) DeleteTorrent(d core.Digest) error {
	err := a.cads.Any().DeleteFile(d.Hex())
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return err
}