Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual partition assignment #95

Merged
merged 17 commits into from
Jul 16, 2015

Conversation

terciodemelo
Copy link

Automatic random generation of partition assignments limits the user control over them. At times it is necessary, or at least desirable to tweak the assignment and manually make decisions on which replica of which partition goes to. An example of such a situation is when only a small subset of topics takes most of data flow on the cluster, and you may wanna make sure they are separated, in different brokers.

A new view was created for these decisions, where the user can see both the current assignment and some broker metrics to aid on choices. Two metrics were also added to this view: Process CPU Load and System CPU Load.
screen shot 2015-07-06 at 11 06 44 pm

This view can be accessed through the route /clusters/:cluster/assignments/manual or by clicking the button "Manually Set Partition Assignmentns" in /clusters/:cluster/topics.

I hope more people find it useful!

EDIT 1: I also added two filters in this view: to filter topics and to filter metrics, in order to decrease the visual clutter on the page.

EDIT 2: Feedbacks are welcome =p

Tercio de Melo added 13 commits July 1, 2015 19:51
A screen created in order to manually define an arrangement of
partitions assignment among the availabe brokers. This may be
preferable than trusting the current automated automated way of
generating partitions assignments.
* Created new BVGetViews in ActorModel.scala, which is the "plural" of
BVGetView(id: Int).
* In class BrokerViewCacheActor created new prived method
allBrokerViews(): Seq[BVView] which is used by processActorRequest when
treating the BVGetViews query.
* In KafkaManager the getBrokersView(clusterName:String) query was
added to wrap the BVGetViews query.
The Manual Partitions Assignment view was expanded to include a short
version of brokers metrics to ease assignment decisions.
The very useful repeatWithIndex helper located in
https://gist.github.com/benoit-ponsero/4484313 has been added to the
project.
* conf/routes -> POST route added in /cluster/:c/assignments/manual to
controllers.ReassignPartitions.handleManualAssignment(c:String).

* New action handleManualAssignment(c:String) which takes a manual
reassignment form from request and applies it to
KafkaManager#manualPartitionAssignments(c:String,
    assignment:List[(String,List[(Int, List[Int])])]) which takes the
assignment from the submitted form and sends to actor
CMManualPartitionAssignments below KMClusterCommandRequest.

* New actor CMManualPartitionAssignments which take an assignment for a
particular cluster and handles the update to zookeeper so that
runPartitionAssignment may later establish it.

* View topic/manualMultipleAssignments.scala.html was fully adapted to
conform the changes made.
Previously there was a bug that if a broker X had a number of topics
and after a reassignment it didn't have any topic anymore, it would
keep appearing to have the same topics (partitions replicas) as before
because the map brokertopicPartitions wasn't updated for such brokers
in the method updateView():Unit in BrokerViewActor.scala. It has now
been fixed by performing the right mapping for such brokers.
* Error feedback screen when there's some wrong data in the submitted
reassignment form or when validation goes wrong.
* Manual reassignment form validation added, this validation verifies
if two or more replicas of the same partition are being assigned to the
same broker.
* Minor visual improvements in Manual Partitions Assignment screen.
* Refactoring inside the method handleManualAssignment(String), which
contained to much duplicated code, which was wrapped in the function
responseScreen(String, IndexedSeq[ApiError] \/ Unit) inside the same
method.
Long story short, after allowing the user to manually set partitions
assignment among the brokers, the need to better analyze the brokers
arise because one of the motivations to manually assign partitions is
to understand some of the broker's low level stats such as CPU load
and memory available, and thus improve the distributions of partitions
among the brokers by taking into consideration such informations.
That being said, kafka's MBeans are not the only ones being used now;
java.lang OperatingSystem MBean is used to get Operating System metrics
through JMX.

* New class OSMetric has been created to keep the Operating System
stats.
* Class BrokerMetrics now also wraps a OSMetric object.
* New KafkaMetrics method getOSMetric(MBeanServerConnection) was
created to fetch OS stats through JMX.
* New View app/views/common/expandedBrokerMetrics.scala.html was
created, based on brokerMetrics.scala.html, to display OS stats and to
only display the Mean of other metrics, the 1, 5, or 15 minutes
threshold are not displayed in this view.

TODO: Implement a filtering, possibly with AngularJS to better filter the
metrics and topics seen, and decrease the visual clutter.
Two filters were added to Manual Partitions Assignment: one to filter
topics, and the other to filter metrics thus reducing the visual
clutter of the page.

A new button was also added: Suggest Partition Assignment, which must
be implemented yet. When clicking on it, a suggestion of configuration
taking into consideration the metrics available must be set on the form.
Two OS-level metrics were removed: Free Physical Memory Size and Free
Swap Space Size, both turned out to not be too useful when designing a
manual partition assignment.
Moreover, I removed from the interface elements meant to be used for
suggestions of assignment. They were removed for separation of
concerns, they must come in further contributions, not this one.
@patelh
Copy link
Collaborator

patelh commented Jul 16, 2015

Looks cool. Will have to try it out and will let you know.

private def allBrokerViews(): Seq[BVView] = {
var bvs = mutable.MutableList[BVView]()
for (key <- brokerTopicPartitions.keySet.toSeq.sorted) {
val bv = brokerTopicPartitions.get(key).map { bv =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the logic used here and BVGetView into a method to reduce duplication. Maybe a new method like

produceBViewWithBrokerClusterState(bv: BVView) : BVView = {
  val bcs = for {
            metrics <- bv.metrics
            cbm <- combinedBrokerMetric
          } yield {
            val perMessages = if(cbm.messagesInPerSec.oneMinuteRate > 0) {
              BigDecimal(metrics.messagesInPerSec.oneMinuteRate / cbm.messagesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
            } else ZERO
            val perIncoming = if(cbm.bytesInPerSec.oneMinuteRate > 0) {
              BigDecimal(metrics.bytesInPerSec.oneMinuteRate / cbm.bytesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
            } else ZERO
            val perOutgoing = if(cbm.bytesOutPerSec.oneMinuteRate > 0) {
              BigDecimal(metrics.bytesOutPerSec.oneMinuteRate / cbm.bytesOutPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
            } else ZERO
            BrokerClusterStats(perMessages, perIncoming, perOutgoing)
          }
  if(bcs.isDefined) {
    bv.copy(stats=bcs)
  } else {
    bv
  }
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I'll do it right now

Both the method allBrokerViews() and the actor request processing
for BVGetView had similar body code, this has now been enhanced by
wrapping this particular code fragment in
produceBViewWithBrokerClusterState(bv: BVView) method
case CMManualPartitionAssignments(assignments) =>
implicit val ec = longRunningExecutionContext
val result = Future {
modify {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks to be duplicating 279-295, can we move to a method so we can reuse in both places.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Tercio de Melo added 3 commits July 16, 2015 02:33
Both actors command process for CMGeneratePartitionAssignments and
CMManualPartitionAssignments updates the current broker assignment,
previous making use of similar codes, i.e., code duplication, which is
now wrapped inside the updateAssignmentInZk method.
Added unit test for CMManualPartitionAssignments in
test/kafka/manager/TestClusterManagerActor.scala
@patelh
Copy link
Collaborator

patelh commented Jul 16, 2015

lgtm

patelh added a commit that referenced this pull request Jul 16, 2015
@patelh patelh merged commit 0a5ad79 into yahoo:master Jul 16, 2015
@terciodemelo terciodemelo deleted the manual-partition-assignment branch July 16, 2015 20:52
@yazgoo
Copy link
Contributor

yazgoo commented Oct 1, 2015

Thanks @terciodemelo. I could not get it to work though.
When clicking on Save Partition Assignment, nothing happens.
Also, I think it would be awesome to be able to export the reassignment JSON.

@terciodemelo
Copy link
Author

@yazgoo After saving the partition assignment you must run them in the topics list view. I also think that it would be great some kind of assignment exporting feature - I can imagine a couple of uses with cluster visualization - but it isn't possible for now, since the feature was temporarily removed from master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants