Skip to content

Commit

Permalink
fix(dsfs): remove dataset field computing deadlock
Browse files Browse the repository at this point in the history
the dsfs.computeFieldsFile `Read` and `handleRows` methods were both contending for
the same mutex lock, which would deadlock the program. Fix this by using a TrackedReader
to count bytes instead of local state, which drops the need for taking the lock on calls
to Read. Should yield a microscopic performance bump as well
  • Loading branch information
b5 committed Dec 9, 2020
1 parent 2412a40 commit ef615a9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 15 deletions.
1 change: 0 additions & 1 deletion base/dsfs/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func commitFileAddFunc(privKey crypto.PrivKey, pub event.Publisher) addWriteFile
log.Debugf("ensureCommitTitleAndMessage: %s", err)
return nil, fmt.Errorf("error saving: %w", err)
}

}

replaceComponentsWithRefs(ds, added, wfs.body.FullPath())
Expand Down
19 changes: 7 additions & 12 deletions base/dsfs/compute_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ type computeFieldsFile struct {

pipeReader *io.PipeReader
pipeWriter *io.PipeWriter
teeReader io.Reader
teeReader *dsio.TrackedReader
done chan error

batches int
bytesRead int
batches int
}

var (
Expand Down Expand Up @@ -89,7 +88,7 @@ func newComputeFieldsFile(
bodyAct: BodyDefault,
pipeReader: pr,
pipeWriter: pw,
teeReader: tr,
teeReader: dsio.NewTrackedReader(tr),
done: make(chan error),
}

Expand Down Expand Up @@ -125,10 +124,6 @@ func (cff *computeFieldsFile) NextFile() (qfs.File, error) {
func (cff *computeFieldsFile) Read(p []byte) (n int, err error) {
n, err = cff.teeReader.Read(p)

cff.Lock()
defer cff.Unlock()
cff.bytesRead += n

if err != nil && err.Error() == "EOF" {
cff.pipeWriter.Close()
}
Expand Down Expand Up @@ -291,11 +286,11 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context, pub event.Publishe

cff.Lock()
defer cff.Unlock()
log.Debugf("determined structure values. ErrCount=%d Entries=%d Depth=%d Length=%d", valErrorCount, entries, depth, cff.bytesRead)
log.Debugw("determined structure values", "errCount", valErrorCount, "entries", entries, "depth", depth, "bytecount", cff.teeReader.BytesRead())
cff.ds.Structure.ErrCount = valErrorCount
cff.ds.Structure.Entries = entries
cff.ds.Structure.Depth = depth + 1 // need to add one for the original enclosure
cff.ds.Structure.Length = cff.bytesRead
cff.ds.Structure.Length = cff.teeReader.BytesRead()

// as we're using a manual setup on the EntryReader we also need
// to manually close the accumulator to finalize results before write
Expand All @@ -314,8 +309,8 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context, pub event.Publishe
}
}

log.Debugf("done handling structured entries")
cff.done <- nil
log.Debugf("done handling structured entries")
}()

return
Expand All @@ -325,7 +320,7 @@ func (cff *computeFieldsFile) flushBatch(ctx context.Context, buf *dsio.EntryBuf
log.Debugf("flushing batch %d", cff.batches)
cff.batches++

if cff.diffMessageBuf != nil && cff.bytesRead > BodySizeSmallEnoughToDiff {
if cff.diffMessageBuf != nil && cff.teeReader.BytesRead() > BodySizeSmallEnoughToDiff {
log.Debugf("removing diffMessage data buffer. bytesRead exceeds %d bytes", BodySizeSmallEnoughToDiff)
cff.diffMessageBuf.Close()
cff.diffMessageBuf = nil
Expand Down
4 changes: 2 additions & 2 deletions base/dsfs/structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func structureFileAddFunc(destFS qfs.Filesystem) addWriteFileFunc {
ds.Structure.DropTransientValues()

if wfs.body == nil {
log.Debugf("body is nil, using json structure file")
wfs.structure, err = JSONFile(PackageFileStructure.Filename(), ds.Structure)
return err
}

hook := func(ctx context.Context, f qfs.File, added map[string]string) (io.Reader, error) {
if processingFile, ok := wfs.body.(doneProcessingFile); ok {
err := <-processingFile.DoneProcessing()
if err != nil {
if err := <-processingFile.DoneProcessing(); err != nil {
return nil, err
}
}
Expand Down

0 comments on commit ef615a9

Please sign in to comment.