fix: do not lose files when writing files into subshards that contain other subshards (#3936)

When writing a file into a HAMT shard, we hash the filename to figure out
where in the shard to place the file.

If the hash means we end up adding the file to an existing subshard that
itself contains another subshard, we must populate that other subshard's
children, otherwise they will not be present when we calculate the new CID
for the subshard and they will be lost.

Fixes #3921
achingbrain committed Nov 12, 2021
1 parent 15184bf commit 8a3ed19
Showing 6 changed files with 130 additions and 26 deletions.
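
For background on the layout the fix deals with: in a UnixFS HAMT shard, every link name begins with a two-character hex bucket position; leaf entries append the file name, while links to nested subshards consist of the prefix alone. The helper below is an illustrative sketch of that convention, not part of the commit, and its names are hypothetical:

// Illustrative sketch of the UnixFS HAMT link-naming convention this commit
// relies on: 'F5' links to a nested subshard, 'F5readme.txt' is the leaf
// entry for readme.txt in bucket 0xF5.
function classifyShardLinks (links) {
  const subShards = []
  const leaves = []

  for (const link of links) {
    const name = link.Name || ''

    if (name.length === 2) {
      // bare prefix: a nested subshard whose children live in another block
      subShards.push({ pos: parseInt(name, 16), cid: link.Hash })
    } else {
      // prefix + file name: a directory entry
      leaves.push({ pos: parseInt(name.substring(0, 2), 16), name: name.substring(2), cid: link.Hash })
    }
  }

  return { subShards, leaves }
}

Losing a file, in these terms, means rebuilding a subshard from its leaves while forgetting to descend into its subShards first.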
.github/workflows/test.yml: 3 additions & 3 deletions
@@ -347,7 +347,7 @@ jobs:
         deps: ipfs-core@$PWD/packages/ipfs-core/dist
       - name: ipfs browser service worker
         repo: https://github.com/ipfs-examples/js-ipfs-browser-service-worker.git
-        deps: ipfs-core@$PWD/packages/ipfs-core/dist,ipfs-message-port-client/dist@$PWD/packages/ipfs-message-port-client/dist,ipfs-message-port-protocol@$PWD/packages/ipfs-message-port-protocol/dist,ipfs-message-port-server@$PWD/packages/ipfs-message-port-server/dist
+        deps: ipfs-core@$PWD/packages/ipfs-core/dist,ipfs-message-port-client@$PWD/packages/ipfs-message-port-client/dist,ipfs-message-port-protocol@$PWD/packages/ipfs-message-port-protocol/dist,ipfs-message-port-server@$PWD/packages/ipfs-message-port-server/dist
       - name: ipfs browser sharing across tabs
         repo: https://github.com/ipfs-examples/js-ipfs-browser-sharing-node-across-tabs.git
         deps: ipfs-core@$PWD/packages/ipfs-core/dist,ipfs-message-port-client@$PWD/packages/ipfs-message-port-client/dist,ipfs-message-port-server@$PWD/packages/ipfs-message-port-server/dist
@@ -365,7 +365,7 @@ jobs:
         deps: ipfs-core@$PWD/packages/ipfs-core/dist
       - name: ipfs custom ipfs repo
         repo: https://github.com/ipfs-examples/js-ipfs-custom-ipfs-repo.git
-        deps: ipfs@$PWD/packages/ipfs/dist
+        deps: ipfs-core@$PWD/packages/ipfs-core/dist
       - name: ipfs custom ipld formats
         repo: https://github.com/ipfs-examples/js-ipfs-custom-ipld-formats.git
         deps: ipfs-core@$PWD/packages/ipfs-core/dist,ipfs-daemon@$PWD/packages/ipfs-daemon/dist,ipfs-http-client@$PWD/packages/ipfs-http-client/dist
@@ -380,7 +380,7 @@ jobs:
         deps: ipfs-http-client@$PWD/packages/ipfs-http-client/dist,ipfs@$PWD/packages/ipfs/dist
       - name: ipfs-http-client name api
         repo: https://github.com/ipfs-examples/js-ipfs-http-client-name-api.git
-        deps: ipfs@$PWD/packages/ipfs/dist,ipfs-http-client@$PWD/packages/ipfs-http-client/dist
+        deps: ipfs-http-client@$PWD/packages/ipfs-http-client/dist
       - name: ipfs-http-client upload file
         repo: https://github.com/ipfs-examples/js-ipfs-http-client-upload-file.git
         deps: ipfs@$PWD/packages/ipfs/dist,ipfs-http-client@$PWD/packages/ipfs-http-client/dist
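
These CI changes appear incidental to the shard fix: they correct the dependency overrides the example workflows install, removing a stray /dist from the ipfs-message-port-client package name and pointing two examples at the packages they actually consume.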
packages/interface-ipfs-core/src/files/write.js: 44 additions & 0 deletions
@@ -14,6 +14,7 @@ import { randomBytes, randomStream } from 'iso-random-stream'
 import all from 'it-all'
 import isShardAtPath from '../utils/is-shard-at-path.js'
 import * as raw from 'multiformats/codecs/raw'
+import map from 'it-map'
 
 /**
  * @typedef {import('ipfsd-ctl').Factory} Factory
@@ -903,6 +904,49 @@ export function testWrite (factory, options) {
           long: true
         }))).to.eventually.not.be.empty()
       })
+
+      it('writes a file to a sub-shard of a shard that contains another sub-shard', async () => {
+        const data = Uint8Array.from([0, 1, 2])
+
+        await ipfs.files.mkdir('/hamttest-mfs')
+
+        const files = [
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-1398.txt',
+          'vivanov-sliceart',
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-1230.txt',
+          'methodify',
+          'fis-msprd-style-loader_0_13_1',
+          'js-form',
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-1181.txt',
+          'node-gr',
+          'yanvoidmodule',
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-1899.txt',
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-372.txt',
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-1032.txt',
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-1293.txt',
+          'file-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000-766.txt'
+        ]
+
+        for (const path of files) {
+          await ipfs.files.write(`/hamttest-mfs/${path}`, data, {
+            shardSplitThreshold: 0,
+            create: true
+          })
+        }
+
+        const beforeFiles = await all(map(ipfs.files.ls('/hamttest-mfs'), (entry) => entry.name))
+
+        expect(beforeFiles).to.have.lengthOf(files.length)
+
+        await ipfs.files.write('/hamttest-mfs/supermodule_test', data, {
+          shardSplitThreshold: 0,
+          create: true
+        })
+
+        const afterFiles = await all(map(ipfs.files.ls('/hamttest-mfs'), (entry) => entry.name))
+
+        expect(afterFiles).to.have.lengthOf(beforeFiles.length + 1)
+      })
     })
   })
 }
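
The fixture names are chosen so that 'supermodule_test' hashes into a subshard that already contains another subshard, the exact situation from the commit message, and shardSplitThreshold: 0 makes the directory sharded from the first write. A sanity check one could add (hedged sketch: isShardAtPath is the helper already imported at the top of this file, used here in the assertion style assumed from the surrounding tests):

// confirm the fixture directory really became a HAMT shard before the write
// that used to lose files
await expect(isShardAtPath('/hamttest-mfs', ipfs)).to.eventually.be.true()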
packages/interface-ipfs-core/src/utils/dump-shard.js: 41 additions & 0 deletions (new file)
@@ -0,0 +1,41 @@
+import { UnixFS } from 'ipfs-unixfs'
+
+/**
+ * @param {string} path
+ * @param {import('ipfs-core-types').IPFS} ipfs
+ */
+export default async function dumpShard (path, ipfs) {
+  const stats = await ipfs.files.stat(path)
+  const { value: node } = await ipfs.dag.get(stats.cid)
+  const entry = UnixFS.unmarshal(node.Data)
+
+  if (entry.type !== 'hamt-sharded-directory') {
+    throw new Error('Not a shard')
+  }
+
+  await dumpSubShard(stats.cid, ipfs)
+}
+
+/**
+ * @param {import('multiformats/cid').CID} cid
+ * @param {import('ipfs-core-types').IPFS} ipfs
+ * @param {string} prefix
+ */
+async function dumpSubShard (cid, ipfs, prefix = '') {
+  const { value: node } = await ipfs.dag.get(cid)
+  const entry = UnixFS.unmarshal(node.Data)
+
+  if (entry.type !== 'hamt-sharded-directory') {
+    throw new Error('Not a shard')
+  }
+
+  for (const link of node.Links) {
+    const { value: subNode } = await ipfs.dag.get(link.Hash)
+    const subEntry = UnixFS.unmarshal(subNode.Data)
+    console.info(`${prefix}${link.Name}`, ' ', subEntry.type) // eslint-disable-line no-console
+
+    if (link.Name.length === 2) {
+      await dumpSubShard(link.Hash, ipfs, `${prefix}  `)
+    }
+  }
+}
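
dump-shard.js is a debugging aid rather than part of the fix. A usage sketch (the import path mirrors the other utils imports in this package; the output shape is inferred from the console.info call above, one line per link, indented per shard level):

// hypothetical usage from a test in src/files/
import dumpShard from '../utils/dump-shard.js'

await dumpShard('/hamttest-mfs', ipfs)
// prints something like:
//   08  hamt-sharded-directory
//     31vivanov-sliceart  file
//   1Amethodify  file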
packages/ipfs-core/src/components/files/utils/add-link.js: 2 additions & 2 deletions
@@ -339,7 +339,7 @@ const addFileToShardedDirectory = async (context, options) => {
     // subshard hasn't been loaded, descend to the next level of the HAMT
     if (!path[index]) {
       log(`Loaded new subshard ${segment.prefix}`)
-      await recreateHamtLevel(subShard.Links, rootBucket, segment.bucket, parseInt(segment.prefix, 16))
+      await recreateHamtLevel(context, subShard.Links, rootBucket, segment.bucket, parseInt(segment.prefix, 16))
 
       const position = await rootBucket._findNewBucketAndPos(file.name)
 
@@ -355,7 +355,7 @@
       const nextSegment = path[index]
 
       // add next levels worth of links to bucket
-      await addLinksToHamtBucket(subShard.Links, nextSegment.bucket, rootBucket)
+      await addLinksToHamtBucket(context, subShard.Links, nextSegment.bucket, rootBucket)
 
       nextSegment.node = subShard
     }
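
Both call sites change only to thread the MfsContext through; the helpers in hamt-utils.js below now need it to reach the repo's blockstore when they load nested subshards.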
packages/ipfs-core/src/components/files/utils/hamt-utils.js: 39 additions & 9 deletions
@@ -72,20 +72,21 @@ export const updateHamtDirectory = async (context, links, bucket, options) => {
 }
 
 /**
+ * @param {MfsContext} context
  * @param {PBLink[]} links
  * @param {Bucket<any>} rootBucket
  * @param {Bucket<any>} parentBucket
  * @param {number} positionAtParent
  */
-export const recreateHamtLevel = async (links, rootBucket, parentBucket, positionAtParent) => {
+export const recreateHamtLevel = async (context, links, rootBucket, parentBucket, positionAtParent) => {
   // recreate this level of the HAMT
   const bucket = new Bucket({
     hash: rootBucket._options.hash,
     bits: rootBucket._options.bits
   }, parentBucket, positionAtParent)
   parentBucket._putObjectAt(positionAtParent, bucket)
 
-  await addLinksToHamtBucket(links, bucket, rootBucket)
+  await addLinksToHamtBucket(context, links, bucket, rootBucket)
 
   return bucket
 }
@@ -99,28 +100,57 @@ export const recreateInitialHamtLevel = async (links) => {
     bits: hamtBucketBits
   })
 
-  await addLinksToHamtBucket(links, bucket, bucket)
+  // populate sub bucket but do not recurse as we do not want to pull whole shard in
+  await Promise.all(
+    links.map(async link => {
+      const linkName = (link.Name || '')
+
+      if (linkName.length === 2) {
+        const pos = parseInt(linkName, 16)
+
+        const subBucket = new Bucket({
+          hash: bucket._options.hash,
+          bits: bucket._options.bits
+        }, bucket, pos)
+        bucket._putObjectAt(pos, subBucket)
+
+        return Promise.resolve()
+      }
+
+      return bucket.put(linkName.substring(2), {
+        size: link.Tsize,
+        cid: link.Hash
+      })
+    })
+  )
 
   return bucket
 }
 
 /**
+ * @param {MfsContext} context
  * @param {PBLink[]} links
 * @param {Bucket<any>} bucket
 * @param {Bucket<any>} rootBucket
 */
-export const addLinksToHamtBucket = async (links, bucket, rootBucket) => {
+export const addLinksToHamtBucket = async (context, links, bucket, rootBucket) => {
   await Promise.all(
-    links.map(link => {
+    links.map(async link => {
       const linkName = (link.Name || '')
 
       if (linkName.length === 2) {
         log('Populating sub bucket', linkName)
         const pos = parseInt(linkName, 16)
+        const block = await context.repo.blocks.get(link.Hash)
+        const node = dagPB.decode(block)
 
-        bucket._putObjectAt(pos, new Bucket({
+        const subBucket = new Bucket({
           hash: rootBucket._options.hash,
           bits: rootBucket._options.bits
-        }, bucket, pos))
+        }, bucket, pos)
+        bucket._putObjectAt(pos, subBucket)
+
+        await addLinksToHamtBucket(context, node.Links, subBucket, rootBucket)
 
         return Promise.resolve()
       }
@@ -213,7 +243,7 @@ export const generatePath = async (context, fileName, rootNode) => {
     if (!path[i + 1]) {
       log(`Loaded new subshard ${segment.prefix}`)
 
-      await recreateHamtLevel(node.Links, rootBucket, segment.bucket, parseInt(segment.prefix, 16))
+      await recreateHamtLevel(context, node.Links, rootBucket, segment.bucket, parseInt(segment.prefix, 16))
       const position = await rootBucket._findNewBucketAndPos(fileName)
 
       // i--
@@ -229,7 +259,7 @@
     const nextSegment = path[i + 1]
 
     // add intermediate links to bucket
-    await addLinksToHamtBucket(node.Links, nextSegment.bucket, rootBucket)
+    await addLinksToHamtBucket(context, node.Links, nextSegment.bucket, rootBucket)
 
     nextSegment.node = node
   }
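
The heart of the fix is in addLinksToHamtBucket: on meeting a link whose name is a bare two-character prefix, it now fetches that subshard's block and recurses, so the in-memory buckets mirror the entire on-disk subtree before any CID is recalculated. Note the asymmetry: recreateInitialHamtLevel deliberately still creates empty stub buckets, per its comment, to avoid pulling the whole shard in at the root. Below is a standalone sketch of the recursive pattern using the same context.repo.blocks.get and dagPB.decode calls as the diff, but with the Bucket bookkeeping stripped out (simplified, assumed types; not the commit's code):

import * as dagPB from '@ipld/dag-pb'

// Walk a shard node and collect every leaf entry beneath it, descending
// into nested subshards so none of their children are dropped -- skipping
// that descent is exactly the bug this commit fixes.
async function collectShardLeaves (blockstore, node, leaves = []) {
  for (const link of node.Links) {
    if ((link.Name || '').length === 2) {
      // nested subshard: its children live in a separate block
      const child = dagPB.decode(await blockstore.get(link.Hash))
      await collectShardLeaves(blockstore, child, leaves)
    } else {
      leaves.push({
        name: (link.Name || '').substring(2),
        cid: link.Hash,
        size: link.Tsize
      })
    }
  }

  return leaves
}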
packages/ipfs-core/src/components/files/write.js: 1 addition & 12 deletions
@@ -3,7 +3,6 @@ import { importer } from 'ipfs-unixfs-importer'
 import {
   decode
 } from '@ipld/dag-pb'
-import { sha256, sha512 } from 'multiformats/hashes/sha2'
 import { createStat } from './stat.js'
 import { createMkdir } from './mkdir.js'
 import { addLink } from './utils/add-link.js'
@@ -293,17 +292,7 @@
     mtime = destination.unixfs.mtime
   }
 
-  let hasher
-  switch (options.hashAlg) {
-    case 'sha2-256':
-      hasher = sha256
-      break
-    case 'sha2-512':
-      hasher = sha512
-      break
-    default:
-      throw new Error(`TODO vmx 2021-03-31: Proper error message for unsupported hash algorithms like ${options.hashAlg}`)
-  }
+  const hasher = await context.hashers.getHasher(options.hashAlg)
 
   const result = await last(importer([{
     content: content,
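
The hardcoded sha2-256/sha2-512 switch (and its TODO error) is replaced by a lookup on the hasher registry the MFS context carries, so any registered multihash algorithm works. A minimal stand-in for that registry pattern (sketch only; these names are assumptions, not js-ipfs's actual implementation):

import { sha256, sha512 } from 'multiformats/hashes/sha2'

// multiformats hashers expose a .name such as 'sha2-256', so a registry
// can be keyed on it
const hashers = new Map([sha256, sha512].map(h => [h.name, h]))

async function getHasher (name) {
  const hasher = hashers.get(name)

  if (!hasher) {
    throw new Error(`Unknown hash algorithm: ${name}`)
  }

  return hasher
}

// usage mirroring the new line above
const hasher = await getHasher('sha2-512')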
