Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster namespace support #119

Merged
merged 5 commits into from Aug 15, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,77 @@
/*
* Copyright 2017 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pinterest.rocksplicator.controller;

/**
* Storing cluster metadata (not including detailed segment/shard info)
*
* @author shu (shu@pinterest.com)
*/
public class Cluster {
public String namespace;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

public String name;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also can we add some comments on what is namespace and what is name in the cluster?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


public Cluster(final String namespace, final String name) {
this.namespace = namespace;
this.name = name;
}


public String getNamespace() {
return namespace;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

public Cluster setNamespace(String namespace) {
this.namespace = namespace;
return this;
}

public String getName() {
return name;
}

public Cluster setName(String name) {
this.name = name;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Cluster cluster = (Cluster) o;

if (namespace != null ? !namespace.equals(cluster.namespace) : cluster.namespace != null)
return false;
return name != null ? name.equals(cluster.name) : cluster.name == null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra line

}

@Override
public int hashCode() {
int result = namespace != null ? namespace.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}

@Override
public String toString() {
return namespace + "/" + name;
}
}
Expand Up @@ -16,7 +16,6 @@

package com.pinterest.rocksplicator.controller;

import java.sql.Timestamp;
import java.util.Date;

/**
Expand All @@ -29,7 +28,7 @@ public class Task extends TaskBase {

public long id;
public int state;
public String clusterName;
public Cluster cluster;
public Date createdAt;
public Date runAfter;
public Date lastAliveAt;
Expand All @@ -46,8 +45,8 @@ public Task setState(int state) {
return this;
}

public Task setClusterName(String clusterName) {
this.clusterName = clusterName;
public Task setCluster(Cluster cluster) {
this.cluster = cluster;
return this;
}

Expand Down Expand Up @@ -103,7 +102,7 @@ public String toString() {
"id=" + id +
", state=" + state +
", name='" + name + '\'' +
", clusterName='" + clusterName + '\'' +
", cluster='" + cluster.namespace + "/" + cluster.name + '\'' +
", createdAt=" + createdAt +
", runAfter=" + runAfter +
", lastAliveAt=" + lastAliveAt +
Expand Down
Expand Up @@ -28,24 +28,24 @@ public interface TaskQueue {

/**
* Create a new cluster in the task queue.
* @param clusterName
* @param cluster
* @return false on error
*/
default boolean createCluster(final String clusterName) {
default boolean createCluster(final Cluster cluster) {
return true;
}

/**
* Enqueue a task.
* @param taskBase entity of the task to enqueue
* @param clusterName Which cluster is this task for
* @param cluster Which cluster is this task for
* @param runDelaySeconds the task should be delayed for this many of seconds to run. If <= 0, no
* delay required
* @return false on error
*/
default boolean enqueueTask(final TaskBase taskBase,
final String clusterName,
final int runDelaySeconds) {
final Cluster cluster,
final int runDelaySeconds) {
return true;
}

Expand Down Expand Up @@ -134,7 +134,7 @@ default boolean finishTaskAndEnqueuePendingTask(final long id,
* @param cluster which cluster to lock
* @return false on error
*/
default boolean lockCluster(final String cluster) {
default boolean lockCluster(final Cluster cluster) {
return true;
}

Expand All @@ -143,7 +143,7 @@ default boolean lockCluster(final String cluster) {
* @param cluster which cluster to unlock
* @return false on error
*/
default boolean unlockCluster(final String cluster) {
default boolean unlockCluster(final Cluster cluster) {
return true;
}

Expand All @@ -153,7 +153,7 @@ default boolean unlockCluster(final String cluster) {
* @param cluster which cluster to remove
* @return false on error
*/
default boolean removeCluster(final String cluster) {
default boolean removeCluster(final Cluster cluster) {
return true;
}

Expand Down Expand Up @@ -188,12 +188,12 @@ default boolean keepTaskAlive(final long id) {
* Return all tasks in the state and associated with clusterName.
* If clusterName is null, all clusters are included.
* If state is null, all states are included.
* @param clusterName which cluster to peek tasks for
* @param cluster which cluster to peek tasks for
* @param state peek tasks in this state only
* @return the list of tasks found
*/
default List<Task> peekTasks(final String clusterName,
final Integer state) {
default List<Task> peekTasks(final Cluster cluster,
final Integer state) {
return new ArrayList<>();
}

Expand All @@ -219,7 +219,7 @@ default boolean removeTask(long id) {
* Return all clusters managed by this task queue.
* @return a set of cluster names
*/
default Set<String> getAllClusters() {
default Set<Cluster> getAllClusters() {
return Collections.emptySet();
}
}
Expand Up @@ -17,6 +17,7 @@
package com.pinterest.rocksplicator.controller.bean;


import com.pinterest.rocksplicator.controller.Cluster;
import org.hibernate.validator.constraints.NotEmpty;

import java.util.ArrayList;
Expand All @@ -29,19 +30,18 @@
public class ClusterBean {

@NotEmpty
/** name of the cluster */
private String name;
private Cluster cluster;

/** list of segments in this cluster */
private List<SegmentBean> segments = Collections.emptyList();

public String getName() {
return name;
public ClusterBean setCluster(Cluster cluster) {
this.cluster = cluster;
return this;
}

public ClusterBean setName(String name) {
this.name = name;
return this;
public Cluster getCluster() {
return cluster;
}

public List<SegmentBean> getSegments() {
Expand All @@ -55,7 +55,7 @@ public ClusterBean setSegments(List<SegmentBean> segments) {

@Override
public String toString() {
return name;
return cluster.toString();
}

}
Expand Up @@ -16,6 +16,7 @@

package com.pinterest.rocksplicator.controller.bean;

import com.pinterest.rocksplicator.controller.Cluster;
import org.hibernate.validator.constraints.NotEmpty;

import java.util.ArrayList;
Expand All @@ -30,16 +31,16 @@
public class ConsistentHashRingsBean {

@NotEmpty
private String name;
private Cluster cluster;

private List<ConsistentHashRingBean> consistentHashRings = Collections.emptyList();

public String getName() {
return name;
public Cluster getCluster() {
return cluster;
}

public ConsistentHashRingsBean setName(String name) {
this.name = name;
public ConsistentHashRingsBean setCluster(Cluster cluster) {
this.cluster = cluster;
return this;
}

Expand All @@ -55,7 +56,7 @@ public ConsistentHashRingsBean setConsistentHashRings(

@Override
public String toString() {
return name;
return cluster.toString();
}

}
Expand Up @@ -16,6 +16,7 @@

package com.pinterest.rocksplicator.controller.config;

import com.pinterest.rocksplicator.controller.Cluster;
import com.pinterest.rocksplicator.controller.bean.ClusterBean;
import com.pinterest.rocksplicator.controller.bean.HostBean;
import com.pinterest.rocksplicator.controller.bean.Role;
Expand Down Expand Up @@ -53,12 +54,13 @@ private ConfigParser() {
/**
* Convert cluster config data into a {@link ClusterBean}.
*
* @param clusterName name of the cluster
* @param cluster
* @param content binary config data
* @return ClusterBean or null if parsing failed
*/
@SuppressWarnings("unchecked")
public static ClusterBean parseClusterConfig(String clusterName, byte[] content) {
public static ClusterBean parseClusterConfig(Cluster cluster,
byte[] content) {
try {
Map<String, Object> segmentMap =
OBJECT_MAPPER.readValue(new String(content, UTF_8), HashMap.class);
Expand Down Expand Up @@ -87,7 +89,7 @@ public static ClusterBean parseClusterConfig(String clusterName, byte[] content)
segment.setHosts(hosts);
segments.add(segment);
}
return new ClusterBean().setName(clusterName).setSegments(segments);
return new ClusterBean().setCluster(cluster).setSegments(segments);
} catch (IOException | IllegalArgumentException e) {
LOG.error("Failed to parse cluster config.", e);
return null;
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.pinterest.rocksplicator.controller.config;

import com.pinterest.rocksplicator.controller.Cluster;
import com.pinterest.rocksplicator.controller.bean.ConsistentHashRingBean;
import com.pinterest.rocksplicator.controller.bean.ConsistentHashRingsBean;

Expand Down Expand Up @@ -78,7 +79,7 @@ private ConsistentHashRingsConfigParser() {}

@SuppressWarnings("unchecked")
public static ConsistentHashRingsBean parseConsistentHashRingsConfig(
String clusterName, byte[] content) {
Cluster cluster, byte[] content) {
try {
Map<String, Object> ringsMap = OBJECT_MAPPER.readValue(
new String(content, UTF_8), HashMap.class);
Expand All @@ -100,7 +101,9 @@ public static ConsistentHashRingsBean parseConsistentHashRingsConfig(
.setName(entry.getKey());
rings.add(consistentHashRingBean);
}
return new ConsistentHashRingsBean().setConsistentHashRings(rings).setName(clusterName);
return new ConsistentHashRingsBean()
.setConsistentHashRings(rings)
.setCluster(cluster);
} catch (IOException e) {
LOG.error("Failed to parse consistent hash ring config.", e);
return null;
Expand Down