Skip to content

Commit

Permalink
stash,filetree: improve the GC for FileTree FS
Browse files Browse the repository at this point in the history
  • Loading branch information
tsileo committed Mar 22, 2019
1 parent b77aacb commit 07d12d2
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 36 deletions.
33 changes: 33 additions & 0 deletions lua/stash_gc.lua
Expand Up @@ -3,6 +3,39 @@ local kvstore = require('kvstore')
local blobstore = require('blobstore')
local node = require('node')

function premark_kv (key, version)
local h = kvstore.get_meta_blob(key, version)
if h ~= nil then
local _, ref, _ = kvstore.get(key, version)
if ref ~= '' then
premark(ref)
end
premark(h)
end
end
_G.premark_kv = premark_kv

function premark_filetree_node (ref)
local data = blobstore.get(ref)
local cnode = node.decode(data)
if cnode.t == 'dir' then
if cnode.r then
for _, childRef in ipairs(cnode.r) do
premark_filetree_node(childRef)
end
end
else
if cnode.r then
for _, contentRef in ipairs(cnode.r) do
premark(contentRef[2])
end
end
end
-- only mark the final ref once all the "data" blobs has been saved
premark(ref)
end
_G.premark_filetree_node = premark_filetree_node

-- Setup the `mark_kv` and `mark_filetree` global helper for the GC API
function mark_kv (key, version)
local h = kvstore.get_meta_blob(key, version)
Expand Down
24 changes: 14 additions & 10 deletions pkg/filetree/filetree.go
Expand Up @@ -132,9 +132,10 @@ type Snapshot struct {
}

type FS struct {
Name string `json:"-"`
Ref string `json:"ref"`
AsOf int64 `json:"-"`
Name string `json:"-"`
Ref string `json:"ref"`
AsOf int64 `json:"-"`
Revision int64 `json:"-"`

ft *FileTree
}
Expand Down Expand Up @@ -712,6 +713,7 @@ func (ft *FileTree) FS(ctx context.Context, name, prefixFmt string, newState boo
case nil:
// Set the existing ref
fs.Ref = kv.HexHash()
fs.Revision = kv.Version
case vkv.ErrNotFound:
// XXX(tsileo): should the `ErrNotFound` be returned here?
default:
Expand All @@ -724,6 +726,7 @@ func (ft *FileTree) FS(ctx context.Context, name, prefixFmt string, newState boo
case nil:
if len(kvv.Versions) > 0 {
fs.Ref = kvv.Versions[0].HexHash()
fs.Revision = kvv.Versions[0].Version
}

case vkv.ErrNotFound:
Expand Down Expand Up @@ -1211,6 +1214,7 @@ func (ft *FileTree) fsHandler() func(http.ResponseWriter, *http.Request) {
}

w.Header().Set("ETag", node.Hash)
w.Header().Set("BlobStash-FileTree-Revision", strconv.FormatInt(fs.Revision, 10))

// Handle HEAD request
if r.Method == "HEAD" {
Expand Down Expand Up @@ -1601,7 +1605,12 @@ func (ft *FileTree) treeBlobsHandler() func(http.ResponseWriter, *http.Request)
panic(fmt.Errorf("Unknown type \"%s\"", refType))
}

tree, err := ft.TreeBlobs(ctx, fs)
node, _, _, err := fs.Path(ctx, "/", 1, false, 0)
if err != nil {
panic(fmt.Errorf("failed to get path: %v", err))
}

tree, err := ft.TreeBlobs(ctx, node)
if err != nil {
panic(err)
}
Expand All @@ -1612,14 +1621,9 @@ func (ft *FileTree) treeBlobsHandler() func(http.ResponseWriter, *http.Request)
}
}

func (ft *FileTree) TreeBlobs(ctx context.Context, fs *FS) ([]string, error) {
func (ft *FileTree) TreeBlobs(ctx context.Context, node *Node) ([]string, error) {
// FIXME(tsileo): take a FS, a fix the path arg
out := []string{}
node, _, _, err := fs.Path(ctx, "/", 1, false, 0)
if err != nil {
return nil, fmt.Errorf("failed to get path: %v", err)
}

if err := ft.IterTree(ctx, node, func(n *Node, p string) error {
out = append(out, n.Meta.Hash)

Expand Down
18 changes: 16 additions & 2 deletions pkg/filetree/fs/ngfs/ngfs.go
Expand Up @@ -359,8 +359,16 @@ func (fs *FS) GC() error {

gcScript := fmt.Sprintf(`
local kvstore = require('kvstore')
-- FS vkv ref
local key = "_filetree:fs:%s"
-- Do the "premark" step, to tell the GC which blobs are already in the root blobstore
local last_sync_version = "%d"
local _, last_ref, _ = kvstore.get(key, last_sync_version)
premark_kv(key, last_sync_version)
premark_filetree_node(last_ref)
-- Now, do the actual GC mark (and premarked blobs will be skipped)
local version = "%d"
local _, ref, _ = kvstore.get(key, version)
Expand All @@ -369,7 +377,7 @@ mark_kv(key, version)
-- mark the whole tree
mark_filetree_node(ref)
`, fs.ref, fs.lastRevision)
`, fs.ref, fs.lastSyncRev, fs.lastRevision)

// FIXME(tsileo): make the stash name configurable
resp, err := fs.clientUtil.PostMsgpack(
Expand Down Expand Up @@ -434,6 +442,10 @@ func (fs *FS) getNodeAsOf(path string, depth int, asOf int64) (*node, error) {
}

node.AsOf = asOf
node.Revision, err = strconv.ParseInt(resp.Header.Get("BlobStash-FileTree-Revision"), 10, 0)
if err != nil {
return nil, err
}
// fmt.Printf("getNode(%s) = %v\n", fs.remotePath(path), node)

return node, nil
Expand All @@ -455,6 +467,7 @@ func (cfs *FS) Root() (fs.Node, error) {
return nil, err
}
cfs.root.Add("current", ftRoot)
fmt.Printf("cfs=%+v\n", cfs)

// magic dir that list all versions, YYYY-MM-DDTHH:MM:SS
cfs.root.Add("versions", &versionsDir{cfs})
Expand Down Expand Up @@ -621,6 +634,7 @@ func (d *dir) preloadFTRoot() error {
}
}

d.fs.lastSyncRev = n.Revision
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/filetree/fs/ngfs/node.go
Expand Up @@ -21,7 +21,8 @@ type node struct {
Info map[string]interface{} `json:"info,omitempty" msgpack:"i,omitempty"`

// Set by the FS
AsOf int64 `json:"-" msgpack:"-"`
AsOf int64 `json:"-" msgpack:"-"`
Revision int64 `json:"-" msgpack:"-"`
}

// mode returns the node file mode
Expand Down
2 changes: 1 addition & 1 deletion pkg/luascripts/files.go
Expand Up @@ -5,6 +5,6 @@ package luascripts
var files = map[string]string{
"docstore_query.lua": "-- Python-like string.split implementation http://lua-users.org/wiki/SplitJoin\nfunction string:split(sSeparator, nMax, bRegexp)\n assert(sSeparator ~= '')\n assert(nMax == nil or nMax >= 1)\n\n local aRecord = {}\n\n if self:len() > 0 then\n local bPlain = not bRegexp\n nMax = nMax or -1\n\n local nField, nStart = 1, 1\n local nFirst,nLast = self:find(sSeparator, nStart, bPlain)\n while nFirst and nMax ~= 0 do\n aRecord[nField] = self:sub(nStart, nFirst-1)\n nField = nField+1\n nStart = nLast+1\n nFirst,nLast = self:find(sSeparator, nStart, bPlain)\n nMax = nMax-1\n end\n aRecord[nField] = self:sub(nStart)\n end\n\n return aRecord\nend\nfunction get_path (doc, q)\n q = q:gsub('%[%d', '.%1')\n local parts = q:split('.')\n p = doc\n for _, part in ipairs(parts) do\n if type(p) ~= 'table' then\n return nil\n end\n if part:sub(1, 1) == '[' then\n part = part:sub(2, 2)\n end\n if tonumber(part) ~= nil then\n p = p[tonumber(part)]\n else\n p = p[part]\n end\n if p == nil then\n return nil\n end\n end\n return p\nend\n_G.get_path = get_path\nfunction in_list (doc, path, value, q)\n local p = get_path(doc, path)\n if type(p) ~= 'table' then\n return false\n end\n for _, item in ipairs(p) do\n if q == nil then\n if item == value then return true end\n else\n if get_path(item, q) == value then return true end\n end\n end\n return false\nend\n_G.in_list = in_list\n\nfunction match (doc, path, op, value)\n p = get_path(doc, path)\n if type(p) ~= type(value) then return false end\n if op == 'EQ' then\n return p == value\n elseif op == 'NE' then\n return p ~= value\n elseif op == 'GT' then\n return p > value\n elseif op == 'GE' then\n return p >= value\n elseif op == 'LT' then\n return p < value\n elseif op == 'LE' then\n return p <= value\n end\n return false\nend\n_G.match = match\n",
"filetree_expr_search.lua": "-- Used as a \"match func\" when searching within a FileTree tree\nreturn function(node, contents)\n if {{.expr}} then return true else return false end\nend\n",
"stash_gc.lua": "local msgpack = require('msgpack')\nlocal kvstore = require('kvstore')\nlocal blobstore = require('blobstore')\nlocal node = require('node')\n \n-- Setup the `mark_kv` and `mark_filetree` global helper for the GC API\nfunction mark_kv (key, version)\n local h = kvstore.get_meta_blob(key, version)\n if h ~= nil then\n local _, ref, _ = kvstore.get(key, version)\n if ref ~= '' then\n mark(ref)\n end\n mark(h)\n end\n end\n _G.mark_kv = mark_kv\n\nfunction mark_filetree_node (ref)\n local data = blobstore.get(ref)\n local cnode = node.decode(data)\n if cnode.t == 'dir' then\n if cnode.r then\n for _, childRef in ipairs(cnode.r) do\n mark_filetree_node(childRef)\n end\n end\n else\n if cnode.r then\n for _, contentRef in ipairs(cnode.r) do\n mark(contentRef[2])\n end\n end\n end\n -- only mark the final ref once all the \"data\" blobs has been saved\n mark(ref)\nend\n_G.mark_filetree_node = mark_filetree_node\n",
"stash_gc.lua": "local msgpack = require('msgpack')\nlocal kvstore = require('kvstore')\nlocal blobstore = require('blobstore')\nlocal node = require('node')\n \nfunction premark_kv (key, version)\n local h = kvstore.get_meta_blob(key, version)\n if h ~= nil then\n local _, ref, _ = kvstore.get(key, version)\n if ref ~= '' then\n premark(ref)\n end\n premark(h)\n end\n end\n _G.premark_kv = premark_kv\n\nfunction premark_filetree_node (ref)\n local data = blobstore.get(ref)\n local cnode = node.decode(data)\n if cnode.t == 'dir' then\n if cnode.r then\n for _, childRef in ipairs(cnode.r) do\n premark_filetree_node(childRef)\n end\n end\n else\n if cnode.r then\n for _, contentRef in ipairs(cnode.r) do\n premark(contentRef[2])\n end\n end\n end\n -- only mark the final ref once all the \"data\" blobs has been saved\n premark(ref)\nend\n_G.premark_filetree_node = premark_filetree_node\n \n-- Setup the `mark_kv` and `mark_filetree` global helper for the GC API\nfunction mark_kv (key, version)\n local h = kvstore.get_meta_blob(key, version)\n if h ~= nil then\n local _, ref, _ = kvstore.get(key, version)\n if ref ~= '' then\n mark(ref)\n end\n mark(h)\n end\n end\n _G.mark_kv = mark_kv\n\nfunction mark_filetree_node (ref)\n local data = blobstore.get(ref)\n local cnode = node.decode(data)\n if cnode.t == 'dir' then\n if cnode.r then\n for _, childRef in ipairs(cnode.r) do\n mark_filetree_node(childRef)\n end\n end\n else\n if cnode.r then\n for _, contentRef in ipairs(cnode.r) do\n mark(contentRef[2])\n end\n end\n end\n -- only mark the final ref once all the \"data\" blobs has been saved\n mark(ref)\nend\n_G.mark_filetree_node = mark_filetree_node\n",
"test.lua": "return function()\n return {{.expr}}\nend\n",
}
2 changes: 1 addition & 1 deletion pkg/stash/api/api.go
Expand Up @@ -115,7 +115,7 @@ func (s *StashAPI) dataContextGCHandler() func(http.ResponseWriter, *http.Reques
}
fmt.Printf("\n\nGC imput: %+v\n\n", out)
if err := s.stash.DoAndDestroy(ctx, name, func(ctx context.Context, dc store.DataContext) error {
blobs, size, err := gc.GC(ctx, s.hub, s.stash, dc, out.Script)
blobs, size, err := gc.GC(ctx, s.hub, s.stash, dc, out.Script, map[string]struct{}{})
fmt.Printf("GC err=%v, output: %d blobs, %s\n\n", err, blobs, humanize.Bytes(size))
return err

Expand Down
20 changes: 13 additions & 7 deletions pkg/stash/gc/gc.go
Expand Up @@ -19,7 +19,7 @@ import (
"a4.io/blobstash/pkg/stash/store"
)

func GC(ctx context.Context, h *hub.Hub, s *stash.Stash, dc store.DataContext, script string) (int, uint64, error) {
func GC(ctx context.Context, h *hub.Hub, s *stash.Stash, dc store.DataContext, script string, existingRefs map[string]struct{}) (int, uint64, error) {

// TODO(tsileo): take a logger
refs := map[string]struct{}{}
Expand All @@ -28,23 +28,32 @@ func GC(ctx context.Context, h *hub.Hub, s *stash.Stash, dc store.DataContext, s
L := lua.NewState()
var skipped int

// premark(<blob hash>) notify the GC that this blob is already in the root blobstore explicitely (to speedup huge GC)
premark := func(L *lua.LState) int {
// TODO(tsileo): debug logging here to help troubleshot GC issues
ref := L.ToString(1)
if _, ok := existingRefs[ref]; !ok {
existingRefs[ref] = struct{}{}
}
return 0
}

// mark(<blob hash>) is the lowest-level func, it "mark"s a blob to be copied to the root blobstore
mark := func(L *lua.LState) int {
// TODO(tsileo): debug logging here to help troubleshot GC issues
ref := L.ToString(1)
if dc.Cache().Contains(ref) {
if _, ok := existingRefs[ref]; ok {
skipped++
// Skip the blob as it already in the root blob store
return 0
}

if _, ok := refs[ref]; !ok {
refs[ref] = struct{}{}
orderedRefs = append(orderedRefs, ref)
}
return 0
}

L.SetGlobal("premark", L.NewFunction(premark))
L.SetGlobal("mark", L.NewFunction(mark))
L.PreloadModule("json", loadJSON)
L.PreloadModule("msgpack", loadMsgpack)
Expand Down Expand Up @@ -85,9 +94,6 @@ func GC(ctx context.Context, h *hub.Hub, s *stash.Stash, dc store.DataContext, s
blobsCnt++
totalSize += uint64(len(data))
}

// Add the blob to the "mark cache" (that will prevent the extra "stat/exist" check
dc.Cache().Add(ref, struct{}{})
}
fmt.Printf("LRU cache skipped %d blobs, refs=%d blobs, saved %d blobs\n", skipped, len(orderedRefs), blobsCnt)

Expand Down
2 changes: 1 addition & 1 deletion pkg/stash/gc/gc_test.go
Expand Up @@ -94,7 +94,7 @@ func TestDataContextMerge(t *testing.T) {
t.Errorf("root blobstore should be empty")
}

if _, _, err := GC(ctxutil.WithNamespace(context.Background(), "tmp"), nil, s, tmpDataContext, "mark_kv('hello', 10)"); err != nil {
if _, _, err := GC(ctxutil.WithNamespace(context.Background(), "tmp"), nil, s, tmpDataContext, "mark_kv('hello', 10)", map[string]struct{}{}); err != nil {
panic(err)
}

Expand Down
11 changes: 0 additions & 11 deletions pkg/stash/stash.go
Expand Up @@ -8,7 +8,6 @@ import (
"path/filepath"
"sync"

lru "github.com/hashicorp/golang-lru"
log "github.com/inconshreveable/log15"

"a4.io/blobstash/pkg/blob"
Expand All @@ -29,7 +28,6 @@ type dataContext struct {
hub *hub.Hub
meta *meta.Meta
log log.Logger
cache *lru.Cache
dir string
root bool
closed bool
Expand All @@ -55,10 +53,6 @@ func (dc *dataContext) Closed() bool {
return dc.closed
}

func (dc *dataContext) Cache() *lru.Cache {
return dc.cache
}

func (dc *dataContext) Merge(ctx context.Context) error {
if dc.root {
return nil
Expand Down Expand Up @@ -196,10 +190,6 @@ func (s *Stash) NewDataContext(name string) (*dataContext, error) {
KvStore: kvsDst,
ReadSrc: s.rootDataContext.kvs,
}
cache, err := lru.New(2 << 18) // 500k items (will store marked blobs)
if err != nil {
return nil, err
}
dataCtx := &dataContext{
log: l,
meta: m,
Expand All @@ -209,7 +199,6 @@ func (s *Stash) NewDataContext(name string) (*dataContext, error) {
kvsProxy: kvs,
bsProxy: bs,
dir: path,
cache: cache,
}
s.contexes[name] = dataCtx
return dataCtx, nil
Expand Down
2 changes: 0 additions & 2 deletions pkg/stash/store/store.go
Expand Up @@ -11,7 +11,6 @@ import (
"a4.io/blobstash/pkg/blob"
"a4.io/blobstash/pkg/blobstore"
"a4.io/blobstash/pkg/vkv"
lru "github.com/hashicorp/golang-lru"
)

var sepCandidates = []string{":", "&", "*", "^", "#", ".", "-", "_", "+", "=", "%", "@", "!"}
Expand Down Expand Up @@ -73,7 +72,6 @@ type DataContext interface {
BlobStoreProxy() BlobStore
KvStoreProxy() KvStore
Merge(context.Context) error
Cache() *lru.Cache
Close() error
Closed() bool
Destroy() error
Expand Down

0 comments on commit 07d12d2

Please sign in to comment.