-
Notifications
You must be signed in to change notification settings - Fork 55
/
pmem-state.go
184 lines (153 loc) · 5.27 KB
/
pmem-state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
Copyright 2019 Intel Corporation.
SPDX-License-Identifier: Apache-2.0
*/
package pmemstate
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"github.com/pkg/errors"
"k8s.io/klog"
)
// GetAllFunc is the callback function used by StateManager.GetAll().
// It is called with the entry ID, once for each entry found in the state.
// The fileState implementation stops iterating as soon as the callback
// returns false.
type GetAllFunc func(id string) bool
// StateManager manages the driver's persistent state, i.e. volume information.
type StateManager interface {
// Create creates an entry in the state with the given id and data.
Create(id string, data interface{}) error
// Delete deletes the entry found with the given id from the state.
Delete(id string) error
// Get retrieves the entry data into the location pointed to by dataPtr.
Get(id string, dataPtr interface{}) error
// GetAll retrieves all entries found in the state. The foreach function is
// called with the id of every entry found, after that entry's data has been
// filled into dataPtr. The same dataPtr is reused for every entry, so the
// caller has to copy the data if needed.
GetAll(dataPtr interface{}, foreach GetAllFunc) error
}
// fileState persists the state information into files, one <id>.json file
// per entry inside the location directory.
// It is supposed to be used by nodes to persist their state.
type fileState struct {
// location is the directory in which the <id>.json entry files are kept.
location string
// lock is the read-write lock guarding the entry files: Create and Delete
// take it for writing, readFileData takes it for reading.
lock sync.RWMutex
// stateDirLock is held while the state directory itself is listed
// (GetAll) or synced (syncStateDir).
stateDirLock sync.Mutex
}
// Compile-time assertion that *fileState implements StateManager.
var _ StateManager = &fileState{}
// NewFileState returns a StateManager that keeps its entries as files inside
// the given directory. The directory is created when it does not exist yet.
// An error is returned when the directory cannot be inspected or created, or
// when the path exists but is not a directory.
func NewFileState(directory string) (StateManager, error) {
	err := ensureLocation(directory)
	if err != nil {
		return nil, err
	}

	state := &fileState{location: directory}
	return state, nil
}
// Create JSON-encodes data into a new file named <id>.json inside the state
// directory and then syncs the directory. It fails if a file for the id
// already exists. When encoding fails, the partially written file is removed
// again (best effort) before the error is returned.
func (fs *fileState) Create(id string, data interface{}) error {
	fs.lock.Lock()
	defer fs.lock.Unlock()

	target := path.Join(fs.location, id+".json")

	// O_EXCL refuses to reuse an existing file; O_SYNC requests synchronous writes.
	const openFlags = os.O_WRONLY | os.O_SYNC | os.O_CREATE | os.O_EXCL
	out, openErr := os.OpenFile(target, openFlags, 0600)
	if openErr != nil {
		return errors.Wrapf(openErr, "file-state: failed to create metadata storage file %s", target)
	}

	encodeErr := json.NewEncoder(out).Encode(data)
	if encodeErr == nil {
		if closeErr := out.Close(); closeErr != nil {
			return errors.Wrapf(closeErr, "file-state: failed to close metadata storage file %s", target)
		}
		return fs.syncStateDir()
	}

	// Encoding failed: drop the file entry before reporting the error.
	out.Close() //nolint: errcheck, gosec
	if rmErr := os.Remove(target); rmErr != nil {
		klog.Warningf("file-state: fail to remove file %s: %s", target, rmErr.Error())
	}
	return errors.Wrap(encodeErr, "file-state: failed to encode metadata")
}
// Delete removes the metadata file <id>.json saved for the given volume id
// and then syncs the state directory. Deleting an id for which no file
// exists is not treated as an error. Any other removal failure, or a failure
// to sync the directory, is returned.
func (fs *fileState) Delete(id string) error {
	fs.lock.Lock()
	defer fs.lock.Unlock()

	file := path.Join(fs.location, id+".json")
	// os.Remove reports failures as *os.PathError, so the returned error never
	// compares equal to os.ErrNotExist; os.IsNotExist looks inside the wrapper.
	if err := os.Remove(file); err != nil && !os.IsNotExist(err) {
		return errors.Wrapf(err, "file-state: failed to delete file %s", file)
	}

	return fs.syncStateDir()
}
// Get decodes the metadata stored in <id>.json for the given volume id into
// the location pointed to by dataPtr.
func (fs *fileState) Get(id string, dataPtr interface{}) error {
	entryFile := path.Join(fs.location, id+".json")
	return fs.readFileData(entryFile, dataPtr)
}
// GetAll walks over every .json file found in the fileState.location
// directory. Each file is decoded into dataPtr and f is then invoked with the
// entry id (the file name without its .json suffix). Iteration ends early,
// without error, as soon as f returns false; a read or decode failure aborts
// the walk and is returned.
func (fs *fileState) GetAll(dataPtr interface{}, f GetAllFunc) error {
	const suffix = ".json"

	// Only the directory listing itself is done under the directory lock.
	fs.stateDirLock.Lock()
	entries, listErr := ioutil.ReadDir(fs.location)
	fs.stateDirLock.Unlock()
	if listErr != nil {
		return errors.Wrapf(listErr, "file-state: failed to read metadata from %s", fs.location)
	}

	for _, entry := range entries {
		name := entry.Name()
		if !strings.HasSuffix(name, suffix) {
			// Not a state entry.
			continue
		}

		if err := fs.readFileData(path.Join(fs.location, name), dataPtr); err != nil {
			return err
		}

		if keepGoing := f(strings.TrimSuffix(name, suffix)); !keepGoing {
			break
		}
	}

	return nil
}
// ensureLocation makes sure directory can be used as the state location.
// An existing directory is accepted as is, a missing one is created with
// mode 0750 (parents are not created), and an existing non-directory is
// rejected. Any other os.Stat failure is returned unchanged.
func ensureLocation(directory string) error {
	st, statErr := os.Stat(directory)
	switch {
	case statErr == nil:
		if st.IsDir() {
			return nil
		}
		return fmt.Errorf("State location(%s) must be a directory", directory)
	case os.IsNotExist(statErr):
		return os.Mkdir(directory, 0750)
	default:
		return statErr
	}
}
// readFileData opens file read-only and JSON-decodes its content into the
// location pointed to by dataPtr. The read lock is held for the whole call.
func (fs *fileState) readFileData(file string, dataPtr interface{}) error {
	fs.lock.RLock()
	defer fs.lock.RUnlock()

	in, openErr := os.OpenFile(file, os.O_RDONLY|os.O_SYNC, 0) //nolint: gosec
	if openErr != nil {
		return errors.Wrapf(openErr, "file-state: failed to open file %s", file)
	}
	defer in.Close() //nolint: errcheck

	decodeErr := json.NewDecoder(in).Decode(dataPtr)
	if decodeErr == nil {
		return nil
	}
	return errors.Wrapf(decodeErr, "file-state: failed to decode metadata from file %s", file)
}
// syncStateDir opens the state directory, calls Sync on it and closes it
// again, all while holding stateDirLock. The first failing step (open, sync
// or close) is returned as a wrapped error.
func (fs *fileState) syncStateDir() error {
	fs.stateDirLock.Lock()
	defer fs.stateDirLock.Unlock()

	dir, err := os.Open(fs.location)
	if err != nil {
		return errors.Wrap(err, "file-state: failed to open state directory for syncing")
	}

	if err = dir.Sync(); err != nil {
		// The sync error is the one worth reporting; a close failure here is ignored.
		dir.Close() //nolint: errcheck
		return errors.Wrap(err, "file-state: fsync failure on state directroy")
	}

	if err = dir.Close(); err != nil {
		return errors.Wrap(err, "file-state: failed to close state directory after sync")
	}
	return nil
}