Skip to content

Commit

Permalink
cas: add image locking.
Browse files Browse the repository at this point in the history
This patch adds image locking like explained in rkt#460.
An image is composed by the ACI file in the blob store and it's imageManifest
in the imageManifest store.
In future also the tree store will be part of an image.

This means that a lock on an image covers multiple elements.
For this reason the keyLock functions are used to take a lock.

To avoid deadlocks the image locks shouldn't be taken inside the db access
code.
  • Loading branch information
sgotti committed Mar 6, 2015
1 parent 5821a7b commit 4618dea
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions cas/cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/coreos/rocket/Godeps/_workspace/src/github.com/appc/spec/aci"
"github.com/coreos/rocket/Godeps/_workspace/src/github.com/appc/spec/schema"
"github.com/coreos/rocket/pkg/lock"

"github.com/coreos/rocket/Godeps/_workspace/src/github.com/peterbourgon/diskv"
)
Expand Down Expand Up @@ -59,24 +60,33 @@ var diskvStores = [...]string{

// Store encapsulates a content-addressable-storage for storing ACIs on disk.
type Store struct {
base string
stores []*diskv.Diskv
db *DB
base string
stores []*diskv.Diskv
db *DB
diskvLockDir string
}

func NewStore(base string) (*Store, error) {
casDir := filepath.Join(base, "cas")

ds := &Store{
base: base,
stores: make([]*diskv.Diskv, len(diskvStores)),
}

ds.diskvLockDir = filepath.Join(casDir, "diskvlocks")
err := os.MkdirAll(ds.diskvLockDir, defaultPathPerm)
if err != nil {
return nil, err
}

for i, p := range diskvStores {
ds.stores[i] = diskv.New(diskv.Options{
BasePath: filepath.Join(base, "cas", p),
BasePath: filepath.Join(casDir, p),
Transform: blockTransform,
})
}
db, err := NewDB(filepath.Join(base, "cas", "db"))
db, err := NewDB(filepath.Join(casDir, "db"))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,6 +178,16 @@ func (ds Store) ResolveKey(key string) (string, error) {
}

func (ds Store) ReadStream(key string) (io.ReadCloser, error) {
key, err := ds.ResolveKey(key)
if err != nil {
return nil, fmt.Errorf("error resolving key: %v", err)
}
keyLock, err := lock.SharedKeyLock(ds.diskvLockDir, key)
if err != nil {
return nil, fmt.Errorf("error locking image: %v", err)
}
defer keyLock.Close()

return ds.stores[blobType].ReadStream(key, false)
}

Expand Down Expand Up @@ -218,6 +238,13 @@ func (ds Store) WriteACI(r io.Reader) (string, error) {

// Import the uncompressed image into the store at the real key
key := HashToKey(h)

keyLock, err := lock.ExclusiveKeyLock(ds.diskvLockDir, key)
if err != nil {
return "", fmt.Errorf("error locking image: %v", err)
}
defer keyLock.Close()

if err = ds.stores[blobType].Import(fh.Name(), key, true); err != nil {
return "", fmt.Errorf("error importing image: %v", err)
}
Expand Down Expand Up @@ -268,6 +295,16 @@ func (ds Store) WriteRemote(remote *Remote) error {

// Get the ImageManifest with the specified key.
func (ds Store) GetImageManifest(key string) (*schema.ImageManifest, error) {
key, err := ds.ResolveKey(key)
if err != nil {
return nil, fmt.Errorf("error resolving key: %v", err)
}
keyLock, err := lock.SharedKeyLock(ds.diskvLockDir, key)
if err != nil {
return nil, fmt.Errorf("error locking image: %v", err)
}
defer keyLock.Close()

imj, err := ds.stores[imageManifestType].Read(key)
if err != nil {
return nil, fmt.Errorf("error retrieving image manifest: %v", err)
Expand Down

0 comments on commit 4618dea

Please sign in to comment.