This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

559 deterministic update distribution #569

Closed
wants to merge 3 commits from the 559_deterministic_update_distribution branch

Conversation

@replay
Contributor

replay commented Mar 17, 2017

Changes how the Cassandra Idx decides whether the LastUpdate property of an index entry needs to be updated or not.
The decision is now based on a hash that takes a given metric's Id as input, so if multiple nodes receive the same metric point they should all reach the same conclusion.

fixes #559 (Use deterministic method to distribute across fuzziness window)
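
Roughly, the decision looks like this (a simplified sketch; the constants and the names updateFuzziness / nextUpdateDue are illustrative, not necessarily what this branch uses):

package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

const (
	updateInterval  = 4 * time.Hour // minimum age before an index entry is rewritten
	updateFuzziness = 1 * time.Hour // window over which rewrites are spread
)

// nextUpdateDue reports whether the index entry for the given metric Id should
// be rewritten. Since the offset is derived from the Id alone, every node that
// sees the same metric reaches the same conclusion.
func nextUpdateDue(id string, lastUpdate, now int64) bool {
	h := fnv.New32a()
	h.Write([]byte(id))
	offset := int64(h.Sum32()) % int64(updateFuzziness/time.Second)
	return now-lastUpdate >= int64(updateInterval/time.Second)+offset
}

func main() {
	now := time.Now().Unix()
	fmt.Println(nextUpdateDue("1.0123456789abcdef0123456789abcdef", now-5*3600, now))
}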

@Dieterbe
Contributor

so if multiple nodes receive the same metric point they should all reach the same conclusion.

is this an important or beneficial characteristic? why?

@replay
Contributor Author

replay commented Mar 17, 2017

Because we decide, based on the last update time, whether a metric is active or not.
Afaik there have been cases where we load balanced between multiple nodes and got back different sets of metrics, because a metric was considered active on some nodes but not on others.
Iirc awoods was talking about that.

@woodsaj
Member

woodsaj commented Mar 18, 2017

I just had an idea.

The logic we have in place here is to prevent us from trying to update all metricDefs at the same time, as this would result in ingestion blocking while waiting for writes to cassandra to complete.

Why don't we just simplify this and use a static updateInterval, but use a non-blocking write to the writeQueue. If we successfully add the metricDef to the writeQueue then we update the lastUpdate timestamp. If we can't write to the writeQueue because it is full, then we just move on. The next time the metric is received we will try to add it to the writeQueue again. This will lead to the updates of all MetricDefs being spaced out over the window.

e.g.:

type writeMode int

const (
	noWrite writeMode = iota
	nonBlockingWrite
	blockingWrite
)

cassandraWriteMode := noWrite

if inMemory {
	if existing.Partition == partition {
		// def is already in the index with the right partition: only refresh
		// it in cassandra once it is older than updateInterval, and do so
		// without blocking ingestion.
		if existing.LastUpdate+int64(updateInterval/time.Second) < time.Now().Unix() {
			cassandraWriteMode = nonBlockingWrite
		}
	} else {
		// the partition changed, so this must be persisted: block until queued.
		cassandraWriteMode = blockingWrite
	}
} else {
	// new def: must be persisted, block until queued.
	cassandraWriteMode = blockingWrite
}

if cassandraWriteMode != noWrite {
	def := schema.MetricDefinitionFromMetricData(data)
	def.Partition = partition
	if updateCassIdx {
		if cassandraWriteMode == nonBlockingWrite {
			select {
			case c.writeQueue <- writeReq{recvTime: time.Now(), def: def}:
				log.Debug("cassandra-idx updating def in index.")
			default:
				// could not add to the writeQueue, we will retry this metric
				// next time we see it.
				return
			}
		} else {
			c.writeQueue <- writeReq{recvTime: time.Now(), def: def}
			log.Debug("cassandra-idx updating def in index.")
		}
	}
	c.MemoryIdx.AddOrUpdateDef(def)
	stat.Value(time.Since(pre))
}

@Dieterbe
Contributor

Dieterbe commented Mar 20, 2017

We have to take a step back. What problem are we actually trying to solve?
AFAICT:

  1. we want the code to be deterministic so we can unit test it. Or at least @replay wants this. Personally I think that if the code is easy to reason about (which the current one-line logic is), then this is not very important
  2. we want the code to be deterministic so that multiple nodes will save at the same time for any given metric id. This would be a nice-to-have: we will change our deployment so that all instances in the read path update their indexes constantly, but others may experience the problem where different read nodes apply different filtering for render requests.
  3. whatever we do, we want to retain the property of not updating all metricdefs at once.

The current code is easy to reason about but non-deterministic. The proposed formula makes it deterministic. @woodsaj's latest suggestion makes it non-deterministic again, so I don't understand the point.

Also, there's a lot of code being written/changed that all touches this area (the latest stuff in master + #568 which is high prio + #570 which is high prio). I told @replay he could tackle this under the assumption that it would be a trivial patch, but if this is going to get complicated then we should wait and focus on the other stuff first.

@replay
Contributor Author

replay commented Mar 20, 2017

@woodsaj regarding your suggestion:
With the solution currently implemented in this PR, the writes are going to be more or less evenly distributed over time.
Assuming there's a spike in writes at each full minute/hour, wouldn't your proposed solution be more likely to max out the write throughput around those spikes, and hence slow the reads down by more than the solution currently in this PR? If we intentionally max out the write throughput we might run into issues with read timeouts again; distributing the writes over time would help to reduce those issues.
Furthermore, there's no guarantee that an update gets written. If we run into a scenario where, for example, somebody sends a whole bunch of metrics at once and always in the same order at each interval, then the ones at the end of that order might always be skipped while the ones at the beginning would always get written.

@woodsaj
Member

woodsaj commented Mar 20, 2017

Computing the hash for every metric received is expensive. We can probably improve ingestion latency by 10% (and therefore increase the maximum ingestion rate) by using the non-blocking channel writes I used above. There is definitely room for improvement to handle the highlighted edge cases.
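
(For reference, the per-metric hash cost could be checked with a quick micro-benchmark along these lines; illustrative only, not part of this PR:)

// idx_hash_test.go (hypothetical): cost of hashing one metric Id with fnv.
package idx

import (
	"hash/fnv"
	"testing"
)

func BenchmarkHashMetricId(b *testing.B) {
	id := "1.0123456789abcdef0123456789abcdef" // orgid.md5-style id
	for i := 0; i < b.N; i++ {
		h := fnv.New32a()
		h.Write([]byte(id))
		_ = h.Sum32()
	}
}

(run with: go test -bench HashMetricId)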

Given there are a number of issues with the index, we should do a larger re-design/refactor to address them all.

  • prevent idx updates from blocking ingestion
  • reduce overhead of handling index updates
  • keep more precise lastUpdate times in the index so we can more aggressively filter queries and not hit cassandra for series that have no data.
  • don't try to write all updates to cassandra at once
  • only primary nodes should write to cassandra

@replay
Contributor Author

replay commented Mar 20, 2017

Yeah, that's a valid concern; I haven't benchmarked the hashing.
We could also easily cache the hash by associating its result with the schema.MetricDefinition objects or their Id. That would get us the benefits of having the hash while still preventing it from slowing down ingestion.
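
Something along these lines, for illustration (type and field names are made up; in practice the cached offset would live next to LastUpdate on whatever the memory index stores per Id):

package memory

import (
	"hash/fnv"
	"time"
)

// idxEntry is a hypothetical per-metric record in the memory index.
type idxEntry struct {
	LastUpdate   int64 // unix seconds, as in the existing index
	updateOffset int64 // cached once, derived from hashing the metric Id
}

func newIdxEntry(id string, lastUpdate int64, fuzziness time.Duration) idxEntry {
	h := fnv.New32a()
	h.Write([]byte(id))
	return idxEntry{
		LastUpdate:   lastUpdate,
		updateOffset: int64(h.Sum32()) % int64(fuzziness/time.Second),
	}
}

// updateDue needs no hashing on the hot path; it only compares integers.
func (e idxEntry) updateDue(now int64, updateInterval time.Duration) bool {
	return now-e.LastUpdate >= int64(updateInterval/time.Second)+e.updateOffset
}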

@replay replay force-pushed the 559_deterministic_update_distribution branch from 6d9979a to d0f2c0d on March 20, 2017 20:19
@replay replay force-pushed the 559_deterministic_update_distribution branch from d0f2c0d to 4a48ab2 on March 20, 2017 20:33
@woodsaj woodsaj mentioned this pull request Mar 21, 2017
@Dieterbe Dieterbe closed this Mar 28, 2017
@Dieterbe Dieterbe deleted the 559_deterministic_update_distribution branch September 18, 2018 09:25