Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix write path lock contention #6168

Merged
merged 4 commits into from
Mar 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
- [#6121](https://github.com/influxdata/influxdb/issues/6121): Fix panic: slice index out of bounds in TSM index
- [#6140](https://github.com/influxdata/influxdb/issues/6140): Ensure Shard engine not accessed when closed.
- [#6110](https://github.com/influxdata/influxdb/issues/6110): Fix for 0.9 upgrade path when using RPM
- [#6131](https://github.com/influxdata/influxdb/issues/6061): Fix write throughput regression with large number of measurments

## v0.11.0 [2016-03-22]

Expand Down
48 changes: 14 additions & 34 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,15 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
ss := d.Series(k)
if ss != nil {
d.mu.Lock()
ss.AssignShard(shardID)
d.mu.Unlock()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a lot of read contention on Series.shardIDs? If there's not then it would be more efficient to use a sync.Mutex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to leave this as a sync.RWMutex because I've had to change almost all of our uses of sync.Mutex to sync.RWMutex due to lock contention issues that appear under different workloads.


// TagsForSeries returns the tag map for the passed in series
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
d.mu.RLock()
defer d.mu.RUnlock()

ss := d.series[key]
if ss == nil {
return nil
Expand Down Expand Up @@ -375,8 +374,6 @@ func (d *DatabaseIndex) DropMeasurement(name string) {
delete(d.series, s.Key)
}

m.drop()

d.statMap.Add(statDatabaseSeries, int64(-len(m.seriesByID)))
d.statMap.Add(statDatabaseMeasurements, -1)
}
Expand Down Expand Up @@ -418,8 +415,6 @@ type Measurement struct {
measurement *Measurement
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
seriesIDs SeriesIDs // sorted list of series IDs in this measurement

statMap *expvar.Map
}

// NewMeasurement allocates and initializes a new Measurement.
Expand All @@ -432,12 +427,6 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
seriesByID: make(map[uint64]*Series),
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
seriesIDs: make(SeriesIDs, 0),

statMap: influxdb.NewStatistics(
fmt.Sprintf("measurement:%s.%s", name, idx.name),
"measurement",
map[string]string{"database": idx.name, "measurement": name},
),
}
}

Expand Down Expand Up @@ -530,7 +519,6 @@ func (m *Measurement) AddSeries(s *Series) bool {
valueMap[v] = ids
}

m.statMap.Add(statMeasurementSeries, 1)
return true
}

Expand Down Expand Up @@ -578,17 +566,9 @@ func (m *Measurement) DropSeries(seriesID uint64) {
}
}

m.statMap.Add(statMeasurementSeries, -1)

return
}

// drop handles any cleanup for when a measurement is dropped.
// Currently only cleans up stats.
func (m *Measurement) drop() {
m.statMap.Add(statMeasurementSeries, int64(-len(m.seriesIDs)))
}

// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr, error) {
Expand Down Expand Up @@ -1311,9 +1291,9 @@ func (a Measurements) union(other Measurements) Measurements {

// Series belong to a Measurement and represent unique time series in a database
type Series struct {
Key string
Tags map[string]string

mu sync.RWMutex
Key string
Tags map[string]string
id uint64
measurement *Measurement
shardIDs map[uint64]bool // shards that have this series defined
Expand All @@ -1329,11 +1309,16 @@ func NewSeries(key string, tags map[string]string) *Series {
}

func (s *Series) AssignShard(shardID uint64) {
s.mu.Lock()
s.shardIDs[shardID] = true
s.mu.Unlock()
}

// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()

var pb internal.Series
pb.Key = &s.Key
for k, v := range s.Tags {
Expand All @@ -1346,6 +1331,9 @@ func (s *Series) MarshalBinary() ([]byte, error) {

// UnmarshalBinary decodes the object from a binary format.
func (s *Series) UnmarshalBinary(buf []byte) error {
s.mu.Lock()
defer s.mu.Unlock()

var pb internal.Series
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
Expand All @@ -1360,17 +1348,9 @@ func (s *Series) UnmarshalBinary(buf []byte) error {

// InitializeShards initializes the list of shards.
func (s *Series) InitializeShards() {
s.mu.Lock()
s.shardIDs = make(map[uint64]bool)
}

// match returns true if all tags match the series' tags.
func (s *Series) match(tags map[string]string) bool {
for k, v := range tags {
if s.Tags[k] != v {
return false
}
}
return true
s.mu.Unlock()
}

// SeriesIDs is a convenience type for sorting, checking equality, and doing
Expand Down