/
objengine.go
144 lines (128 loc) · 5.05 KB
/
objengine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright (c) 2015 Rackspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package objectserver
import (
"errors"
"flag"
"fmt"
"io"
"net/http"
"sync"
"github.com/troubling/hummingbird/common/conf"
"github.com/troubling/hummingbird/common/ring"
"github.com/troubling/hummingbird/common/srv"
)
// DriveFullError can be returned by Object.SetData and Object.Delete if the disk is too full for the operation.
// It is a package-level sentinel created with errors.New, so it can be compared by identity.
var DriveFullError = errors.New("Drive Full")
// Object is the interface for interacting with a single object; instances are created by ObjectEngine.New.
type Object interface {
// Exists determines whether or not there is an object to serve. Deleted objects do not exist, even if there is a tombstone.
Exists() bool
// Quarantine removes the file's data, presumably after determining it's been corrupted.
Quarantine() error
// Metadata returns the object's metadata. Will be nil if the object doesn't exist.
Metadata() map[string]string
// ContentLength returns the object's content-length.
ContentLength() int64
// CopyRange copies a range of data from the object to the writer.
// NOTE(review): the two int64 arguments presumably delimit the range (start and end offsets) and the int64 result is presumably the number of bytes copied — confirm against the implementations.
CopyRange(io.Writer, int64, int64) (int64, error)
// Copy copies an object's entire contents to the writer(s).
// NOTE(review): the int64 result is presumably the number of bytes copied — confirm against the implementations.
Copy(...io.Writer) (int64, error)
// SetData sets the data for the object, given the size (if known). It returns a writer and an error if any.
// The error can be DriveFullError if the disk is too full for the operation.
SetData(size int64) (io.Writer, error)
// Commit saves the new object data that was started with SetData.
Commit(metadata map[string]string) error
// CommitMetadata updates the object's metadata.
CommitMetadata(metadata map[string]string) error
// Delete deletes the object.
// The error can be DriveFullError if the disk is too full for the operation.
Delete(metadata map[string]string) error
// Close releases any resources held by the Object instance.
Close() error
// Repr returns a representation of the object, used for logging.
Repr() string
}
// ObjectStabilizer is an Object that can additionally be stabilized and replicated.
type ObjectStabilizer interface {
Object
// Stabilize stabilizes the object: move it to a stable location, erasure code it, do nothing, etc.
// NOTE(review): the role of the *ring.Device argument is not documented in this file — confirm against the implementations.
Stabilize(*ring.Device) error
// Replicate takes a PriorityRepJob; presumably it replicates the object as described by that job — confirm against the implementations.
Replicate(PriorityRepJob) error
}
// ReplicationDevice is the per-device replication handle returned by ObjectEngine.GetReplicationDevice.
// NOTE(review): the methods are undocumented in this file; the notes below are inferred from names and signatures only — confirm against the implementations.
type ReplicationDevice interface {
// Scan presumably performs a single replication pass over the device.
Scan()
// ScanLoop presumably runs scans repeatedly until cancelled.
ScanLoop()
// Key presumably returns a string identifying the device; the exact format is not visible here.
Key() string
// Cancel presumably stops an in-progress Scan or ScanLoop.
Cancel()
// PriorityReplicate receives the priority replication job pri together with the http.ResponseWriter w.
PriorityReplicate(w http.ResponseWriter, pri PriorityRepJob)
// UpdateStat presumably records the named statistic with the given value or delta.
UpdateStat(string, int64)
// Type presumably returns a string naming the kind of replication device.
Type() string
}
// ObjectEngine is the type you have to give hummingbird to create a new object engine.
type ObjectEngine interface {
// New creates a new instance of the Object, for interacting with a single object.
// NOTE(review): the meaning of vars, needData and asyncWG is not documented in this file — confirm against the implementations before relying on it.
New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
// GetReplicationDevice returns a ReplicationDevice given the ring oring, the device dev and the Replicator r.
// TODO: the Replicator parameter should be something narrower. It mostly needs the logger, the stat updater and the certs, not the whole object; maybe an interface that provides just those things.
GetReplicationDevice(oring ring.Ring, dev *ring.Device, r *Replicator) (ReplicationDevice, error)
}
// NurseryObjectEngine is an ObjectEngine that can also enumerate the objects that need to be stabilized or replicated.
// NOTE(review): both methods presumably send ObjectStabilizers on c and stop when cancel is closed — this file does not show it; confirm against the implementations.
type NurseryObjectEngine interface {
ObjectEngine
// GetObjectsToStabilize takes a device name, an ObjectStabilizer channel c and a cancel channel.
GetObjectsToStabilize(device string, c chan ObjectStabilizer, cancel chan struct{})
// GetObjectsToReplicate takes a priority replication job, an ObjectStabilizer channel c and a cancel channel.
GetObjectsToReplicate(prirep PriorityRepJob, c chan ObjectStabilizer, cancel chan struct{})
}
// PolicyHandlerRegistrator is implemented by types that register their own HTTP handlers.
type PolicyHandlerRegistrator interface {
// RegisterHandlers is given addRoute, a callback accepting an HTTP method, a path and the handler to attach.
// NOTE(review): presumably called once while the server's routes are being set up — confirm against the caller.
RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}
// ObjectEngineConstructor is a function that, given configs and flags, returns an ObjectEngine.
// It receives the server config, the policy the engine is being built for, and the command-line flag set.
type ObjectEngineConstructor func(conf.Config, *conf.Policy, *flag.FlagSet) (ObjectEngine, error)
// engineFactoryEntry pairs an engine name with the constructor registered for it.
type engineFactoryEntry struct {
name string // name the engine was registered under
constructor ObjectEngineConstructor // constructor handed back by FindEngine for that name
}
// engineFactories is the package-level registry of object engines.
// RegisterObjectEngine adds entries to it and FindEngine searches it by name.
// No synchronization around it is visible in this file.
var engineFactories = []engineFactoryEntry{}
// RegisterObjectEngine lets you tell hummingbird about a new object engine.
//
// If an engine is already registered under name, its constructor is replaced
// with newEngine; otherwise a new entry is appended to the registry.
// No locking is done here.
func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor) {
	// Index into the slice rather than ranging by value: a range value is a
	// copy of the element, so assigning to it would not update the registry.
	for i := range engineFactories {
		if engineFactories[i].name == name {
			engineFactories[i].constructor = newEngine
			return
		}
	}
	engineFactories = append(engineFactories, engineFactoryEntry{name, newEngine})
}
// FindEngine looks up the constructor registered under name.
//
// It returns the constructor of the first registry entry whose name matches,
// or a nil constructor and a "Not found" error when there is none.
func FindEngine(name string) (ObjectEngineConstructor, error) {
	for i := 0; i < len(engineFactories); i++ {
		entry := engineFactories[i]
		if entry.name != name {
			continue
		}
		return entry.constructor, nil
	}
	return nil, errors.New("Not found")
}
// buildEngines instantiates one ObjectEngine per policy.
//
// It loads the policies from cnf, looks up the constructor registered for each
// policy's Type with FindEngine, and stores the constructed engine in the
// result map under the policy's Index. On any failure the map built so far is
// returned together with the error.
func buildEngines(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (map[int]ObjectEngine, error) {
	engines := make(map[int]ObjectEngine)

	policies, err := cnf.GetPolicies()
	if err != nil {
		return engines, err
	}

	for _, policy := range policies {
		construct, findErr := FindEngine(policy.Type)
		if findErr != nil {
			return engines, fmt.Errorf("Unable to find object engine type %s: %v", policy.Type, findErr)
		}

		// The constructor's result is stored directly into the map, so the
		// entry is written even when the constructor reports an error.
		var buildErr error
		engines[policy.Index], buildErr = construct(serverconf, policy, flags)
		if buildErr != nil {
			return engines, fmt.Errorf("Error instantiating object engine type %s: %v", policy.Type, buildErr)
		}
	}

	return engines, nil
}