Broker truncation #2276
Conversation
Force-pushed from a725955 to dc896ce.
@@ -24,6 +24,12 @@ import (
// only occurs when the reader is at the end of all the data.
const DefaultPollInterval = 100 * time.Millisecond

// DefaultMaxTopicSize is the largest a topic can get before truncation.
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 10MB
//1GB
Thanks @jwilder -- the comment was a copy 'n' paste error.
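For reference, a minimal sketch of what the corrected constant block presumably looks like after the fix. The segment-size constant's name and its 10MB value are assumptions inferred from the copied comment, not taken from the PR.

```go
package sketch

// DefaultMaxTopicSize is the largest a topic can get before truncation (1GB).
const DefaultMaxTopicSize = 1024 * 1024 * 1024

// DefaultMaxSegmentSize is a hypothetical name for the per-segment limit the
// PR description says is also configurable; the 10MB value is an assumption.
const DefaultMaxSegmentSize = 10 * 1024 * 1024
```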
Force-pushed from 30da629 to f8a0925.
Thanks for the feedback @jwilder -- updated.
Force-pushed from 207b900 to 13dfdc6.
LGTM. I'm a little concerned about how frequently the topic directory gets read, though.
Force-pushed from f5f78f9 to 24196e7.
@jwilder -- I don't think it would make a huge impact, but it's not great, I agree. Take a look at the modified implementation. I think it's safe, since the topic is locked.
Force-pushed from 0309647 to 2413ad6.
return
}
nBytesDeleted += size
segments = segments[1 : len(segments)-1]
I think this is slicing an item off on both ends. Shouldn't it be segments[1:]?
Yeah, it should be. :-) That's what you get when you don't unit-test. :-)
Yeah, that looks better.
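A small self-contained sketch of the deletion loop with the corrected re-slice. Only `nBytesDeleted`, `size`, and `segments` come from the quoted diff; the `segment` type, its fields, and the surrounding function are stand-ins for illustration.

```go
package sketch

import "os"

// segment is a hypothetical stand-in for an on-disk segment record.
type segment struct {
	path string
	size int64
}

// truncateOldest removes whole segments from the front (oldest first) until
// at least bytesToDelete bytes are gone. The fix under discussion is the
// segments[1:] re-slice; segments[1:len(segments)-1] also silently dropped
// the newest segment from the working slice on every pass.
func truncateOldest(segments []segment, bytesToDelete int64) []segment {
	var nBytesDeleted int64
	for len(segments) > 1 && nBytesDeleted < bytesToDelete {
		s := segments[0]
		if err := os.Remove(s.path); err != nil {
			return segments
		}
		nBytesDeleted += s.size
		segments = segments[1:] // drop only the oldest segment
	}
	return segments
}
```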
Force-pushed from 2413ad6 to 8219f25.
OK, updated so there is only a single read of the topic directory. Since the topic is locked, the segments can't change, because writing to the topic also takes the lock.
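A sketch of that single-directory-read idea, assuming segments live as files directly under the topic's directory and that the caller already holds the topic lock; the function name and layout are illustrative, not the PR's code.

```go
package sketch

import (
	"io/ioutil"
	"path/filepath"
)

// listSegments reads the topic directory exactly once. Because the caller
// holds the topic lock, no segments can be added or written while the
// truncation pass runs. ioutil.ReadDir returns entries sorted by filename,
// so index-named segment files come back oldest-first.
func listSegments(topicDir string) ([]string, []int64, error) {
	fis, err := ioutil.ReadDir(topicDir)
	if err != nil {
		return nil, nil, err
	}
	var paths []string
	var sizes []int64
	for _, fi := range fis {
		if fi.IsDir() {
			continue
		}
		paths = append(paths, filepath.Join(topicDir, fi.Name()))
		sizes = append(sizes, fi.Size())
	}
	return paths, sizes, nil
}
```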
Force-pushed from d021b6f to b6d68bb.
Unit test same.
A segment must now have been replicated by at least 1 node before it can be deleted.
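A minimal sketch of that replication guard, assuming the broker tracks per-URL replicated indexes in the `indexByURL` map shown later in the diff; the pared-down `Topic` stand-in, the helper name, and the per-segment `segmentMaxIndex` parameter are illustrative.

```go
package sketch

import "net/url"

// Topic is a pared-down stand-in; only the field needed for the guard is shown.
type Topic struct {
	indexByURL map[url.URL]uint64
}

// canDeleteSegment reports whether a segment whose highest message index is
// segmentMaxIndex may be deleted: at least one data node must have replicated
// past that index.
func (t *Topic) canDeleteSegment(segmentMaxIndex uint64) bool {
	for _, idx := range t.indexByURL {
		if idx >= segmentMaxIndex {
			return true
		}
	}
	return false
}
```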
Force-pushed from b6d68bb to e96148c.
@@ -691,6 +744,13 @@ func (t *Topic) IndexForURL(u url.URL) uint64 {
return t.indexByURL[u]
}

// SetIndexForURL sets the replicated index for a given data URL.
func (t *Topic) SetIndexForURL(index uint64, u url.URL) {
Exported this so I could unit-test it easily.
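A sketch of the kind of unit test the export enables, assuming the test lives in the same `messaging` package and that a Topic can be built with just its indexByURL map for test purposes; only the SetIndexForURL/IndexForURL signatures come from the diff.

```go
package messaging

import (
	"net/url"
	"testing"
)

// TestTopic_SetIndexForURL sets a replicated index for a data URL and reads
// it back.
func TestTopic_SetIndexForURL(t *testing.T) {
	topic := &Topic{indexByURL: make(map[url.URL]uint64)}
	u := url.URL{Host: "localhost:8086"}

	topic.SetIndexForURL(42, u)
	if got := topic.IndexForURL(u); got != 42 {
		t.Fatalf("IndexForURL(%v) = %d, want 42", u, got)
	}
}
```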
If a data node requests a topic index that is earlier than what is present for a topic, tombstones allow the broker to know that the data node should be redirected to another node that has already replicated the topic's data. If no tombstone exists, the broker can simply restart replaying the topic data it has.
Force-pushed from e96148c to f591150.
return
}
}
}()
Can you move this to a named function so it's isolated? Also, the Open() is already kind of big.
Sure, I was on the fence about this.
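A sketch of the shape of that refactor: the inline go func() in Open becomes a call to a named method. The Broker fields, the interval field, and the method names here are illustrative stand-ins, not the PR's actual code.

```go
package sketch

import "time"

// Broker is a pared-down stand-in; only what the sketch needs is shown.
type Broker struct {
	closing            chan struct{}
	TruncationInterval time.Duration
}

func (b *Broker) truncateTopics() { /* check sizes and truncate over-large topics */ }

// Open launches the named loop instead of an inline closure, keeping the
// truncation logic out of an already large function.
func (b *Broker) Open(path string) error {
	// ... existing open logic elided ...
	go b.truncationLoop()
	return nil
}

// truncationLoop periodically truncates topics until the broker closes.
func (b *Broker) truncationLoop() {
	for {
		select {
		case <-b.closing:
			return
		case <-time.After(b.TruncationInterval):
			b.truncateTopics()
		}
	}
}
```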
Overall LGTM, mostly minor nits.
Thanks @benbjohnson -- maybe I'll just scrap the named returns. Might be clearer.
Force-pushed from b209afd to f6763ef.
Force-pushed from f6763ef to dab100a.
Named returns scrapped; simpler is better. Green build.
This change implements basic topic truncation on Brokers. It works by periodically checking each topic's size and, if the topic exceeds a certain threshold and its data has been replicated, deleting segments until the topic is back under the required size.
The change also exposes maximum topic and segment sizes via configuration, as I believe some people may want to change these numbers.
I decided to implement this via a goroutine, so that the function that checks topic sizes can simply walk the filesystem and not worry too much about efficiency or caching sizes in RAM. I think a design in which truncation is performed every few minutes is fine (it's how Apache Kafka works), and this approach seems simpler and more robust.
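A minimal sketch of the "just walk the filesystem" size check described above; the directory layout, function name, and error handling are assumptions for illustration.

```go
package sketch

import (
	"os"
	"path/filepath"
)

// topicSize sums the sizes of all files under a topic's directory. Because
// the check runs only every few minutes, re-walking the directory each time
// is cheap enough that no in-memory size cache is needed.
func topicSize(dir string) (int64, error) {
	var total int64
	err := filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !fi.IsDir() {
			total += fi.Size()
		}
		return nil
	})
	return total, err
}
```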
Remaining: