This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

fix: handle shard updates that create subshards of subshards #47

Merged 1 commit · Mar 26, 2019
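For context: a UnixFS HAMT-sharded directory prefixes every link name with two hex characters derived from the hash of the entry name. When a bucket overflows, the colliding entries move into a sub-shard whose link name is exactly that two-character prefix, and the process recurses, so an entry can end up in a sub-shard of a sub-shard; that is the case this PR teaches `add-link.js` to handle. The sketch below is illustrative only: `prefixChain` is an invented helper, and sha256 stands in for the murmur3 hashing that hamt-sharding actually uses, so that it runs without dependencies.

```js
const crypto = require('crypto')

// Invented helper: one byte of the hashed name per HAMT level (fanout
// 256), rendered as two uppercase hex characters in the same way as
// toPrefix() in hamt-utils.js.
const prefixChain = (fileName, levels) => {
  const digest = crypto.createHash('sha256').update(fileName).digest()

  return Array.from(digest.slice(0, levels))
    .map(byte => byte.toString(16).toUpperCase().padStart(2, '0'))
}

// An entry that collides at the first two levels lives two shards deep:
// root shard -> prefixes[0] sub-shard -> prefixes[1] sub-shard -> file
console.log(prefixChain('example.txt', 3)) // e.g. [ '5B', 'F3', '9C' ]
```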
299 changes: 173 additions & 126 deletions src/core/utils/add-link.js
@@ -6,14 +6,16 @@ const {
 } = require('ipld-dag-pb')
 const CID = require('cids')
 const waterfall = require('async/waterfall')
-const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
-const series = require('async/series')
 const whilst = require('async/whilst')
 const log = require('debug')('ipfs:mfs:core:utils:add-link')
 const UnixFS = require('ipfs-unixfs')
+const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
 const {
   generatePath,
-  updateHamtDirectory
+  updateHamtDirectory,
+  recreateHamtLevel,
+  createShard,
+  toPrefix,
+  addLinksToHamtBucket
 } = require('./hamt-utils')
 
 const defaultOptions = {
@@ -125,140 +127,185 @@ const addToDirectory = (context, options, callback) => {
 }
 
 const addToShardedDirectory = (context, options, callback) => {
-  return waterfall([
-    (cb) => generatePath(context, options.name, options.parent, cb),
-    ({ rootBucket, path }, cb) => {
-      updateShard(context, path.reverse(), {
-        name: options.name,
-        cid: options.cid,
-        size: options.size
-      }, 0, options, (err, result = {}) => cb(err, { rootBucket, ...result }))
-    },
-    ({ rootBucket, node }, cb) => updateHamtDirectory(context, node.links, rootBucket, options, cb)
-  ], callback)
-}
-
-const updateShard = (context, positions, child, index, options, callback) => {
-  const {
-    bucket,
-    prefix,
-    node
-  } = positions[index]
-
-  const link = node.links
-    .find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`)
-
-  return waterfall([
-    (cb) => {
-      if (link && link.name.length > 2) {
-        log(`Converting existing file ${link.name} into sub-shard for ${child.name}`)
-
-        return waterfall([
-          (done) => createShard(context, [{
-            name: link.name.substring(2),
-            size: link.size,
-            multihash: link.cid.buffer
-          }, {
-            name: child.name,
-            size: child.size,
-            multihash: child.cid.buffer
-          }], {}, done),
-          ({ node: { links: [ shard ] } }, done) => {
-            let position = 0
-
-            // step through the shard until we find the newly created sub-shard
-            return whilst(
-              () => position < positions.length - 1,
-              (next) => {
-                const shardPrefix = positions[position].prefix
-
-                log(`Prefix at position ${position} is ${shardPrefix} - shard.name ${shard.name}`)
-
-                if (shard.name.substring(0, 2) !== shardPrefix) {
-                  return next(new Error(`Unexpected prefix ${shard.name} !== ${shardPrefix}, position ${position}`))
-                }
-
-                position++
-
-                context.ipld.get(shard.cid, (err, result) => {
-                  if (err) {
-                    return next(err)
-                  }
-
-                  if (position < positions.length) {
-                    const nextPrefix = positions[position].prefix
-                    const nextShard = result.value.links.find(link => link.name.substring(0, 2) === nextPrefix)
-
-                    if (nextShard) {
-                      shard = nextShard
-                    }
-                  }
-
-                  next(err, { cid: result && result.cid, node: result && result.value })
-                })
-              },
-              done
-            )
-          },
-          (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
-        ], cb)
-      }
-
-      if (link && link.name.length === 2) {
-        log(`Descending into sub-shard ${link.name} for ${child.name}`)
-
-        return waterfall([
-          (cb) => updateShard(context, positions, child, index + 1, options, cb),
-          (result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
-        ], cb)
-      }
-
-      log(`Adding or replacing file`, prefix + child.name)
-      updateShardParent(context, bucket, node, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
-    }
-  ], callback)
-}
-
-const createShard = (context, contents, options, callback) => {
-  const shard = new DirSharded({
-    root: true,
-    dir: true,
-    parent: null,
-    parentKey: null,
-    path: '',
-    dirty: true,
-    flat: false,
-
-    ...options
-  })
-
-  const operations = contents.map(contents => {
-    return (cb) => {
-      shard.put(contents.name, {
-        size: contents.size,
-        multihash: contents.multihash
-      }, cb)
-    }
-  })
-
-  return series(
-    operations,
-    (err) => {
-      if (err) {
-        return callback(err)
-      }
-
-      shard.flush('', context.ipld, null, callback)
-    }
-  )
-}
-
-const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => {
-  waterfall([
-    (done) => DAGNode.rmLink(parent, name, done),
-    (parent, done) => DAGNode.addLink(parent, new DAGLink(prefix, node.size, cid), done),
-    (parent, done) => updateHamtDirectory(context, parent.links, bucket, options, done)
-  ], callback)
-}
+  return addFileToShardedDirectory(context, options, (err, result) => {
+    if (err) {
+      return callback(err)
+    }
+
+    const {
+      shard, path
+    } = result
+
+    shard.flush('', context.ipld, null, async (err, result) => {
+      if (err) {
+        return callback(err)
+      }
+
+      // we have written out the shard, but only one sub-shard will have been written so replace it in the original shard
+      const oldLink = options.parent.links
+        .find(link => link.name.substring(0, 2) === path[0].prefix)
+
+      const newLink = result.node.links
+        .find(link => link.name.substring(0, 2) === path[0].prefix)
+
+      waterfall([
+        (done) => {
+          if (!oldLink) {
+            return done(null, options.parent)
+          }
+
+          DAGNode.rmLink(options.parent, oldLink.name, done)
+        },
+        (parent, done) => DAGNode.addLink(parent, newLink, done),
+        (parent, done) => updateHamtDirectory(context, parent.links, path[0].bucket, options, done)
+      ], callback)
+    })
+  })
+}
+
+const addFileToShardedDirectory = (context, options, callback) => {
+  const file = {
+    name: options.name,
+    cid: options.cid,
+    size: options.size
+  }
+
+  // start at the root bucket and descend, loading nodes as we go
+  recreateHamtLevel(options.parent.links, null, null, null, async (err, rootBucket) => {
+    if (err) {
+      return callback(err)
+    }
+
+    const shard = new DirSharded({
+      root: true,
+      dir: true,
+      parent: null,
+      parentKey: null,
+      path: '',
+      dirty: true,
+      flat: false
+    })
+    shard._bucket = rootBucket
+
+    // load subshards until the bucket & position no longer change
+    const position = await rootBucket._findNewBucketAndPos(file.name)
+    const path = toBucketPath(position)
+    path[0].node = options.parent
+    let index = 0
+
+    whilst(
+      () => index < path.length,
+      (next) => {
+        let segment = path[index]
+        index++
+        let node = segment.node
+
+        let link = node.links
+          .find(link => link.name.substring(0, 2) === segment.prefix)
+
+        if (!link) {
+          // prefix is new, file will be added to the current bucket
+          log(`Link ${segment.prefix}${file.name} will be added`)
+          index = path.length
+          return next(null, shard)
+        }
+
+        if (link.name === `${segment.prefix}${file.name}`) {
+          // file already existed, file will be added to the current bucket
+          log(`Link ${segment.prefix}${file.name} will be replaced`)
+          index = path.length
+          return next(null, shard)
+        }
+
+        if (link.name.length > 2) {
+          // another file had the same prefix, will be replaced with a subshard
+          log(`Link ${link.name} will be replaced with a subshard`)
+          index = path.length
+          return next(null, shard)
+        }
+
+        // load sub-shard
+        log(`Found subshard ${segment.prefix}`)
+        context.ipld.get(link.cid, (err, result) => {
+          if (err) {
+            return next(err)
+          }
+
+          // subshard hasn't been loaded, descend to the next level of the HAMT
+          if (!path[index]) {
+            log(`Loaded new subshard ${segment.prefix}`)
+            const node = result.value
+
+            return recreateHamtLevel(node.links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), async (err) => {
+              if (err) {
+                return next(err)
+              }
+
+              const position = await rootBucket._findNewBucketAndPos(file.name)
+
+              path.push({
+                bucket: position.bucket,
+                prefix: toPrefix(position.pos),
+                node: node
+              })
+
+              return next(null, shard)
+            })
+          }
+
+          const nextSegment = path[index]
+
+          // add next level's worth of links to bucket
+          addLinksToHamtBucket(result.value.links, nextSegment.bucket, rootBucket, (error) => {
+            nextSegment.node = result.value
+
+            next(error, shard)
+          })
+        })
+      },
+      (err, shard) => {
+        if (err) {
+          return callback(err)
+        }
+
+        // finally add the new file into the shard
+        shard.put(file.name, {
+          size: file.size,
+          multihash: file.cid.buffer
+        }, (err) => {
+          callback(err, {
+            shard, path
+          })
+        })
+      }
+    )
+  })
+}
+
+const toBucketPath = (position) => {
+  let bucket = position.bucket
+  let positionInBucket = position.pos
+  let path = [{
+    bucket,
+    prefix: toPrefix(positionInBucket)
+  }]
+
+  bucket = position.bucket._parent
+  positionInBucket = position.bucket._posAtParent
+
+  while (bucket) {
+    path.push({
+      bucket,
+      prefix: toPrefix(positionInBucket)
+    })
+
+    positionInBucket = bucket._posAtParent
+    bucket = bucket._parent
+  }
+
+  path.reverse()
+
+  return path
+}
 
 module.exports = addLink
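To make the new traversal easier to follow, here is a sketch of what `toBucketPath()` returns, assuming `toBucketPath` and `toPrefix` from the diff above are in scope. The buckets are hand-built stand-ins that model only the two private hamt-sharding `Bucket` fields the helper reads (`_parent` and `_posAtParent`); in the real code `position` comes from `rootBucket._findNewBucketAndPos(file.name)`.

```js
// Fake bucket chain root -> mid -> leaf, i.e. two collisions deep
const root = { _parent: null, _posAtParent: 0 }
const mid = { _parent: root, _posAtParent: 0x5B } // mid occupies slot 0x5B of root
const leaf = { _parent: mid, _posAtParent: 0xF3 } // leaf occupies slot 0xF3 of mid

// shape of a _findNewBucketAndPos() result: the bucket the new entry
// belongs in plus its position inside that bucket
const position = { bucket: leaf, pos: 0x9C }

// toBucketPath() walks the _parent links and reverses, yielding the
// root-to-leaf segments that the whilst() loop above descends through:
// [ { bucket: root, prefix: '5B' },
//   { bucket: mid, prefix: 'F3' },
//   { bucket: leaf, prefix: '9C' } ]
console.log(toBucketPath(position))
```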
43 changes: 41 additions & 2 deletions src/core/utils/hamt-utils.js
@@ -5,6 +5,7 @@ const {
 } = require('ipld-dag-pb')
 const waterfall = require('async/waterfall')
 const whilst = require('async/whilst')
+const series = require('async/series')
 const Bucket = require('hamt-sharding/src/bucket')
 const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
 const log = require('debug')('ipfs:mfs:core:utils:hamt-utils')
@@ -63,7 +64,10 @@ const addLinksToHamtBucket = (links, bucket, rootBucket, callback) => {
         return Promise.resolve()
       }
 
-      return (rootBucket || bucket).put(link.name.substring(2), true)
+      return (rootBucket || bucket).put(link.name.substring(2), {
+        size: link.size,
+        multihash: link.cid
+      })
     })
   )
     .then(() => callback(null, bucket), callback)
@@ -180,10 +184,45 @@ const generatePath = (context, fileName, rootNode, callback) => {
   })
 }
 
+const createShard = (context, contents, options, callback) => {
+  const shard = new DirSharded({
+    root: true,
+    dir: true,
+    parent: null,
+    parentKey: null,
+    path: '',
+    dirty: true,
+    flat: false,
+
+    ...options
+  })
+
+  const operations = contents.map(contents => {
+    return (cb) => {
+      shard.put(contents.name, {
+        size: contents.size,
+        multihash: contents.multihash
+      }, cb)
+    }
+  })
+
+  return series(
+    operations,
+    (err) => {
+      if (err) {
+        return callback(err)
+      }
+
+      shard.flush('', context.ipld, null, callback)
+    }
+  )
+}
+
 module.exports = {
   generatePath,
   updateHamtDirectory,
   recreateHamtLevel,
   addLinksToHamtBucket,
-  toPrefix
+  toPrefix,
+  createShard
 }
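For reference, a hedged usage sketch of the relocated `createShard` helper, mirroring how the old `add-link.js` converted a colliding file into a sub-shard. `context` (with an `ipld` instance), `existingLink` (a `DAGLink` in the parent shard), `newFile` (`{ name, size, cid }`) and `callback` are assumed to be in scope, and the `node` shape of the flush result follows its use in the code removed above.

```js
// Put both colliding entries into a fresh shard and flush it to IPLD
createShard(context, [{
  name: existingLink.name.substring(2), // drop the two-char bucket prefix
  size: existingLink.size,
  multihash: existingLink.cid.buffer
}, {
  name: newFile.name,
  size: newFile.size,
  multihash: newFile.cid.buffer
}], {}, (err, result) => {
  if (err) {
    return callback(err)
  }

  // both entries share a bucket prefix, so the flushed root's single
  // link is the newly created sub-shard
  log(`Created sub-shard ${result.node.links[0].name}`)
  callback(null, result)
})
```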