Skip to content

Commit

Permalink
dagwriter-parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
jbenet committed Oct 30, 2014
1 parent 71ac117 commit b16b5b9
Showing 1 changed file with 52 additions and 21 deletions.
73 changes: 52 additions & 21 deletions unixfs/io/dagwriter.go
@@ -1,6 +1,8 @@
package io

import (
"sync"

"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
Expand Down Expand Up @@ -46,51 +48,66 @@ func (dw *DagWriter) startSplitter() {
mbf := new(ft.MultiBlock)
root := new(dag.Node)

// concurrent writing to disk
indirectNodes := make(chan *dag.Node)
var wg sync.WaitGroup

// function to consume the nodesToWrite channel
writeNodes := func() {
defer wg.Done()
for {
node, more := <-indirectNodes
if !more {
return
}
log.Info("dagwriter worker writing")
dw.writeNode(node, pin.Indirect)
log.Info("dagwriter worker writing done")
}
}

// spin off 10 worker goroutines.
for i := 0; i < 100; i++ {
wg.Add(1)
go writeNodes()
}

for blkData := range blkchan {
if dw.seterr != nil {
return
}

// Store the block size in the root node
mbf.AddBlockSize(uint64(len(blkData)))
node := &dag.Node{Data: ft.WrapData(blkData)}
nk, err := dw.dagserv.Add(node)
if dw.Pinner != nil {
dw.Pinner.PinWithMode(nk, pin.Indirect)
}
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to dagservice: %s", err)
return
}
indirectNodes <- node

// Add a link to this node without storing a reference to the memory
err = root.AddNodeLinkClean("", node)
err := root.AddNodeLinkClean("", node)
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to root node: %s", err)
log.Criticalf("Got error adding created node to root node: %s", err)
return
}
}
close(indirectNodes)

// Generate the root node data
mbf.Data = first
data, err := mbf.GetBytes()
if err != nil {
dw.seterr = err
log.Critical("Failed generating bytes for multiblock file: %s", err)
log.Criticalf("Failed generating bytes for multiblock file: %s", err)
return
}
root.Data = data

// Add root node to the dagservice
rootk, err := dw.dagserv.Add(root)
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to dagservice: %s", err)
return
}
if dw.Pinner != nil {
dw.Pinner.PinWithMode(rootk, pin.Recursive)
}
dw.writeNode(root, pin.Recursive)
dw.node = root
wg.Wait()
dw.done <- struct{}{}
log.Info("dagwriter done")
}

func (dw *DagWriter) Write(b []byte) (int, error) {
Expand All @@ -101,6 +118,20 @@ func (dw *DagWriter) Write(b []byte) (int, error) {
return len(b), nil
}

func (dw *DagWriter) writeNode(nd *dag.Node, pinMode pin.PinMode) {
nk, err := dw.dagserv.Add(nd)
if err != nil {
dw.seterr = err
log.Criticalf("Got error adding created node to dagservice: %s", err)
return
}

if dw.Pinner != nil {
dw.Pinner.PinWithMode(nk, pinMode)
}
return
}

// Close the splitters input channel and wait for it to finish
// Must be called to finish up splitting, otherwise split method
// will never halt
Expand Down

0 comments on commit b16b5b9

Please sign in to comment.