Permalink
Browse files

mmvdump: make reader concurrent

  • Loading branch information...
1 parent 31ad2c9 commit 0be976ea0d8b3bbeec4e125813b5a5efaa6df91e @suyash suyash committed Jul 27, 2016
Showing with 158 additions and 36 deletions.
  1. +155 −33 mmvdump/mmvdump.go
  2. +3 −3 mmvdump/mmvdump_test.go
View
@@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"math"
+ "sync"
"unsafe"
)
@@ -101,65 +102,165 @@ func readTocs(data []byte, count int32) ([]*Toc, error) {
}
func readInstances(data []byte, offset uint64, count int32) (map[uint64]*Instance, error) {
+ var wg sync.WaitGroup
+ wg.Add(int(count))
+
instances := make(map[uint64]*Instance)
+
+ var (
+ instance *Instance
+ err error
+ m sync.Mutex
+ )
+
for i := int32(0); i < count; i, offset = i+1, offset+InstanceLength {
- instance, err := readInstance(data, offset)
- if err != nil {
- return nil, err
- }
- instances[offset] = instance
+ go func(offset uint64) {
+ instance, err = readInstance(data, offset)
+ if err == nil {
+ m.Lock()
+ instances[offset] = instance
+ m.Unlock()
+ }
+ wg.Done()
+ }(offset)
+ }
+
+ wg.Wait()
+
+ if err != nil {
+ return nil, err
}
return instances, nil
}
func readInstanceDomains(data []byte, offset uint64, count int32) (map[uint64]*InstanceDomain, error) {
+ var wg sync.WaitGroup
+ wg.Add(int(count))
+
indoms := make(map[uint64]*InstanceDomain)
+
+ var (
+ indom *InstanceDomain
+ err error
+ m sync.Mutex
+ )
+
for i := int32(0); i < count; i, offset = i+1, offset+InstanceDomainLength {
- indom, err := readInstanceDomain(data, offset)
- if err != nil {
- return nil, err
- }
- indoms[offset] = indom
+ go func(offset uint64) {
+ indom, err = readInstanceDomain(data, offset)
+ if err == nil {
+ m.Lock()
+ indoms[offset] = indom
+ m.Unlock()
+ }
+ wg.Done()
+ }(offset)
+ }
+
+ wg.Wait()
+
+ if err != nil {
+ return nil, err
}
return indoms, nil
}
func readMetrics(data []byte, offset uint64, count int32) (map[uint64]*Metric, error) {
+ var wg sync.WaitGroup
+ wg.Add(int(count))
+
metrics := make(map[uint64]*Metric)
+
+ var (
+ metric *Metric
+ err error
+ m sync.Mutex
+ )
+
for i := int32(0); i < count; i, offset = i+1, offset+MetricLength {
- metric, err := readMetric(data, offset)
- if err != nil {
- return nil, err
- }
- metrics[offset] = metric
+ go func(offset uint64) {
+ metric, err = readMetric(data, offset)
+ if err == nil {
+ m.Lock()
+ metrics[offset] = metric
+ m.Unlock()
+ }
+ wg.Done()
+ }(offset)
+ }
+
+ wg.Wait()
+
+ if err != nil {
+ return nil, err
}
return metrics, nil
}
func readValues(data []byte, offset uint64, count int32) (map[uint64]*Value, error) {
+ var wg sync.WaitGroup
+ wg.Add(int(count))
+
values := make(map[uint64]*Value)
+
+ var (
+ value *Value
+ err error
+ m sync.Mutex
+ )
+
for i := int32(0); i < count; i, offset = i+1, offset+ValueLength {
- value, err := readValue(data, offset)
- if err != nil {
- return nil, err
- }
- values[offset] = value
+ go func(offset uint64) {
+ value, err = readValue(data, offset)
+ if err == nil {
+ m.Lock()
+ values[offset] = value
+ m.Unlock()
+ }
+ wg.Done()
+ }(offset)
+ }
+
+ wg.Wait()
+
+ if err != nil {
+ return nil, err
}
return values, nil
}
func readStrings(data []byte, offset uint64, count int32) (map[uint64]*String, error) {
+ var wg sync.WaitGroup
+ wg.Add(int(count))
+
strings := make(map[uint64]*String)
+
+ var (
+ str *String
+ err error
+ m sync.Mutex
+ )
+
for i := int32(0); i < count; i, offset = i+1, offset+StringLength {
- str, err := readString(data, offset)
- if err != nil {
- return nil, err
- }
- strings[offset] = str
+ go func(offset uint64) {
+ str, err = readString(data, offset)
+ if err == nil {
+ m.Lock()
+ strings[offset] = str
+ m.Unlock()
+ }
+ wg.Done()
+ }(offset)
+ }
+
+ wg.Wait()
+
+ if err != nil {
+ return nil, err
}
return strings, nil
@@ -186,24 +287,45 @@ func Dump(data []byte) (
return nil, nil, nil, nil, nil, nil, nil, err
}
+ var wg sync.WaitGroup
+ wg.Add(len(tocs))
+
for _, toc := range tocs {
switch toc.Type {
case TocInstances:
- instances, err = readInstances(data, toc.Offset, toc.Count)
+ go func(offset uint64, count int32) {
+ instances, err = readInstances(data, offset, count)
+ wg.Done()
+ }(toc.Offset, toc.Count)
case TocIndoms:
- indoms, err = readInstanceDomains(data, toc.Offset, toc.Count)
+ go func(offset uint64, count int32) {
+ indoms, err = readInstanceDomains(data, offset, count)
+ wg.Done()
+ }(toc.Offset, toc.Count)
case TocMetrics:
- metrics, err = readMetrics(data, toc.Offset, toc.Count)
+ go func(offset uint64, count int32) {
+ metrics, err = readMetrics(data, offset, count)
+ wg.Done()
+ }(toc.Offset, toc.Count)
case TocValues:
- values, err = readValues(data, toc.Offset, toc.Count)
+ go func(offset uint64, count int32) {
+ values, err = readValues(data, offset, count)
+ wg.Done()
+ }(toc.Offset, toc.Count)
case TocStrings:
- strings, err = readStrings(data, toc.Offset, toc.Count)
- }
- if err != nil {
- return nil, nil, nil, nil, nil, nil, nil, err
+ go func(offset uint64, count int32) {
+ strings, err = readStrings(data, offset, count)
+ wg.Done()
+ }(toc.Offset, toc.Count)
}
}
+ wg.Wait()
+
+ if err != nil {
+ return nil, nil, nil, nil, nil, nil, nil, err
+ }
+
return
}
View
@@ -55,14 +55,14 @@ func TestMmvDump1(t *testing.T) {
}
if len(metrics) != 1 {
- t.Errorf("expected number of strings %d, got %d", 1, len(metrics))
+ t.Errorf("expected number of metrics %d, got %d", 1, len(metrics))
}
if len(values) != 1 {
- t.Errorf("expected number of strings %d, got %d", 1, len(values))
+ t.Errorf("expected number of values %d, got %d", 1, len(values))
}
if len(instances) != 0 {
- t.Errorf("expected number of strings %d, got %d", 0, len(instances))
+ t.Errorf("expected number of instances %d, got %d", 0, len(instances))
}
}

0 comments on commit 0be976e

Please sign in to comment.