From ea01832ba60ef41348140a8914d87c801d356592 Mon Sep 17 00:00:00 2001 From: shuzhang1989 Date: Wed, 9 Aug 2017 15:41:12 -0700 Subject: [PATCH 1/5] [WIP] Add cluster namespace support --- .../rocksplicator/controller/Cluster.java | 77 +++++++++++ .../rocksplicator/controller/Task.java | 9 +- .../rocksplicator/controller/TaskQueue.java | 24 ++-- .../controller/bean/ClusterBean.java | 16 +-- .../bean/ConsistentHashRingsBean.java | 13 +- .../controller/config/ConfigParser.java | 8 +- .../ConsistentHashRingsConfigParser.java | 7 +- .../controller/mysql/MySQLTaskQueue.java | 128 +++++++++--------- .../controller/mysql/entity/TagEntity.java | 20 ++- .../controller/mysql/entity/TagId.java | 16 +++ .../controller/mysql/entity/TaskEntity.java | 14 +- .../ConsistentHashRingsConfigParserTest.java | 7 +- .../controller/ControllerConfiguration.java | 10 +- .../controller/ControllerService.java | 2 +- .../controller/resource/Clusters.java | 107 ++++++++------- .../controller/resource/Tasks.java | 10 +- .../controller/WorkerConfig.java | 2 +- .../rocksplicator/controller/WorkerPool.java | 8 +- .../controller/tasks/AddHostTask.java | 5 +- .../controller/tasks/ChainedTask.java | 3 +- .../controller/tasks/ConfigCheckTask.java | 9 +- .../controller/tasks/Context.java | 7 +- .../controller/tasks/HealthCheckTask.java | 33 ++--- .../controller/tasks/LoadSSTTask.java | 11 +- .../controller/tasks/LocalAckTaskQueue.java | 23 ++-- .../controller/tasks/PromoteTask.java | 3 +- .../controller/tasks/RebalanceTask.java | 5 +- .../controller/tasks/RemoveHostTask.java | 5 +- .../rocksplicator/controller/util/ZKUtil.java | 44 ++---- controller/tools/mysql/create_tables.sql | 8 +- 30 files changed, 367 insertions(+), 267 deletions(-) create mode 100644 controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java create mode 100644 controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagId.java diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java new file mode 100644 index 00000000..9659fcac --- /dev/null +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pinterest.rocksplicator.controller; + +/** + * Storing cluster metadata (not including detailed segment/shard info) + * + * @author shu (shu@pinterest.com) + */ +public class Cluster { + public String namespace; + public String name; + + public Cluster(final String namespace, final String name) { + this.namespace = namespace; + this.name = name; + } + + + public String getNamespace() { + return namespace; + + } + + public Cluster setNamespace(String namespace) { + this.namespace = namespace; + return this; + } + + public String getName() { + return name; + } + + public Cluster setName(String name) { + this.name = name; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Cluster cluster = (Cluster) o; + + if (namespace != null ? !namespace.equals(cluster.namespace) : cluster.namespace != null) + return false; + return name != null ? name.equals(cluster.name) : cluster.name == null; + + } + + @Override + public int hashCode() { + int result = namespace != null ? namespace.hashCode() : 0; + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return namespace + "/" + name; + } +} diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java index 1019b610..fed8b707 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java @@ -16,7 +16,6 @@ package com.pinterest.rocksplicator.controller; -import java.sql.Timestamp; import java.util.Date; /** @@ -29,7 +28,7 @@ public class Task extends TaskBase { public long id; public int state; - public String clusterName; + public Cluster cluster; public Date createdAt; public Date runAfter; public Date lastAliveAt; @@ -46,8 +45,8 @@ public Task setState(int state) { return this; } - public Task setClusterName(String clusterName) { - this.clusterName = clusterName; + public Task setCluster(Cluster cluster) { + this.cluster = cluster; return this; } @@ -103,7 +102,7 @@ public String toString() { "id=" + id + ", state=" + state + ", name='" + name + '\'' + - ", clusterName='" + clusterName + '\'' + + ", cluster='" + cluster.namespace + "/" + cluster.name + '\'' + ", createdAt=" + createdAt + ", runAfter=" + runAfter + ", lastAliveAt=" + lastAliveAt + diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/TaskQueue.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/TaskQueue.java index 2aacf260..8d23bff3 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/TaskQueue.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/TaskQueue.java @@ -28,24 +28,24 @@ public interface TaskQueue { /** * Create a new cluster in the task queue. - * @param clusterName + * @param cluster * @return false on error */ - default boolean createCluster(final String clusterName) { + default boolean createCluster(final Cluster cluster) { return true; } /** * Enqueue a task. * @param taskBase entity of the task to enqueue - * @param clusterName Which cluster is this task for + * @param cluster Which cluster is this task for * @param runDelaySeconds the task should be delayed for this many of seconds to run. If <= 0, no * delay required * @return false on error */ default boolean enqueueTask(final TaskBase taskBase, - final String clusterName, - final int runDelaySeconds) { + final Cluster cluster, + final int runDelaySeconds) { return true; } @@ -134,7 +134,7 @@ default boolean finishTaskAndEnqueuePendingTask(final long id, * @param cluster which cluster to lock * @return false on error */ - default boolean lockCluster(final String cluster) { + default boolean lockCluster(final Cluster cluster) { return true; } @@ -143,7 +143,7 @@ default boolean lockCluster(final String cluster) { * @param cluster which cluster to unlock * @return false on error */ - default boolean unlockCluster(final String cluster) { + default boolean unlockCluster(final Cluster cluster) { return true; } @@ -153,7 +153,7 @@ default boolean unlockCluster(final String cluster) { * @param cluster which cluster to remove * @return false on error */ - default boolean removeCluster(final String cluster) { + default boolean removeCluster(final Cluster cluster) { return true; } @@ -188,12 +188,12 @@ default boolean keepTaskAlive(final long id) { * Return all tasks in the state and associated with clusterName. * If clusterName is null, all clusters are included. * If state is null, all states are included. - * @param clusterName which cluster to peek tasks for + * @param cluster which cluster to peek tasks for * @param state peek tasks in this state only * @return the list of tasks found */ - default List peekTasks(final String clusterName, - final Integer state) { + default List peekTasks(final Cluster cluster, + final Integer state) { return new ArrayList<>(); } @@ -219,7 +219,7 @@ default boolean removeTask(long id) { * Return all clusters managed by this task queue. * @return a set of cluster names */ - default Set getAllClusters() { + default Set getAllClusters() { return Collections.emptySet(); } } diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ClusterBean.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ClusterBean.java index 184c6f4b..c0eeda34 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ClusterBean.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ClusterBean.java @@ -17,6 +17,7 @@ package com.pinterest.rocksplicator.controller.bean; +import com.pinterest.rocksplicator.controller.Cluster; import org.hibernate.validator.constraints.NotEmpty; import java.util.ArrayList; @@ -29,19 +30,18 @@ public class ClusterBean { @NotEmpty - /** name of the cluster */ - private String name; + private Cluster cluster; /** list of segments in this cluster */ private List segments = Collections.emptyList(); - public String getName() { - return name; + public ClusterBean setCluster(Cluster cluster) { + this.cluster = cluster; + return this; } - public ClusterBean setName(String name) { - this.name = name; - return this; + public Cluster getCluster() { + return cluster; } public List getSegments() { @@ -55,7 +55,7 @@ public ClusterBean setSegments(List segments) { @Override public String toString() { - return name; + return cluster.toString(); } } diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ConsistentHashRingsBean.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ConsistentHashRingsBean.java index 73267ae5..6a90d6b5 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ConsistentHashRingsBean.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/bean/ConsistentHashRingsBean.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.bean; +import com.pinterest.rocksplicator.controller.Cluster; import org.hibernate.validator.constraints.NotEmpty; import java.util.ArrayList; @@ -30,16 +31,16 @@ public class ConsistentHashRingsBean { @NotEmpty - private String name; + private Cluster cluster; private List consistentHashRings = Collections.emptyList(); - public String getName() { - return name; + public Cluster getCluster() { + return cluster; } - public ConsistentHashRingsBean setName(String name) { - this.name = name; + public ConsistentHashRingsBean setCluster(Cluster cluster) { + this.cluster = cluster; return this; } @@ -55,7 +56,7 @@ public ConsistentHashRingsBean setConsistentHashRings( @Override public String toString() { - return name; + return cluster.toString(); } } diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConfigParser.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConfigParser.java index 877a52ae..f3d6b8f4 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConfigParser.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConfigParser.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.config; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.bean.ClusterBean; import com.pinterest.rocksplicator.controller.bean.HostBean; import com.pinterest.rocksplicator.controller.bean.Role; @@ -53,12 +54,13 @@ private ConfigParser() { /** * Convert cluster config data into a {@link ClusterBean}. * - * @param clusterName name of the cluster + * @param cluster * @param content binary config data * @return ClusterBean or null if parsing failed */ @SuppressWarnings("unchecked") - public static ClusterBean parseClusterConfig(String clusterName, byte[] content) { + public static ClusterBean parseClusterConfig(Cluster cluster, + byte[] content) { try { Map segmentMap = OBJECT_MAPPER.readValue(new String(content, UTF_8), HashMap.class); @@ -87,7 +89,7 @@ public static ClusterBean parseClusterConfig(String clusterName, byte[] content) segment.setHosts(hosts); segments.add(segment); } - return new ClusterBean().setName(clusterName).setSegments(segments); + return new ClusterBean().setCluster(cluster).setSegments(segments); } catch (IOException | IllegalArgumentException e) { LOG.error("Failed to parse cluster config.", e); return null; diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParser.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParser.java index 5254cca5..fff9dcea 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParser.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParser.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.config; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.bean.ConsistentHashRingBean; import com.pinterest.rocksplicator.controller.bean.ConsistentHashRingsBean; @@ -78,7 +79,7 @@ private ConsistentHashRingsConfigParser() {} @SuppressWarnings("unchecked") public static ConsistentHashRingsBean parseConsistentHashRingsConfig( - String clusterName, byte[] content) { + Cluster cluster, byte[] content) { try { Map ringsMap = OBJECT_MAPPER.readValue( new String(content, UTF_8), HashMap.class); @@ -100,7 +101,9 @@ public static ConsistentHashRingsBean parseConsistentHashRingsConfig( .setName(entry.getKey()); rings.add(consistentHashRingBean); } - return new ConsistentHashRingsBean().setConsistentHashRings(rings).setName(clusterName); + return new ConsistentHashRingsBean() + .setConsistentHashRings(rings) + .setCluster(cluster); } catch (IOException e) { LOG.error("Failed to parse consistent hash ring config.", e); return null; diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java index 5451dd91..20dc7bad 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java @@ -17,11 +17,13 @@ package com.pinterest.rocksplicator.controller.mysql; import com.google.common.collect.ImmutableMap; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.Task; import com.pinterest.rocksplicator.controller.TaskBase; import com.pinterest.rocksplicator.controller.TaskQueue; import com.pinterest.rocksplicator.controller.bean.TaskState; import com.pinterest.rocksplicator.controller.mysql.entity.TagEntity; +import com.pinterest.rocksplicator.controller.mysql.entity.TagId; import com.pinterest.rocksplicator.controller.mysql.entity.TaskEntity; import org.apache.commons.lang3.time.DateUtils; import org.slf4j.Logger; @@ -74,7 +76,6 @@ void beginTransaction() { } } - public MySQLTaskQueue(String jdbcUrl, String dbUser, String dbPassword) { this.entityManagerFactory = Persistence.createEntityManagerFactory( JDBC_CONFIGS.PERSISTENCE_UNIT_NAME, new ImmutableMap.Builder() @@ -91,7 +92,8 @@ static Task convertTaskEntityToTask(TaskEntity taskEntity) { return new Task() .setId(taskEntity.getId()) .setState(taskEntity.getState()) - .setClusterName(taskEntity.getCluster().getName()) + .setCluster(new Cluster(taskEntity.getCluster().getNamespace(), + taskEntity.getCluster().getName())) .setName(taskEntity.getName()) .setCreatedAt(taskEntity.getCreatedAt()) .setRunAfter(taskEntity.getRunAfter()) @@ -104,103 +106,105 @@ static Task convertTaskEntityToTask(TaskEntity taskEntity) { @Override - public boolean createCluster(final String clusterName) { - TagEntity cluster = getEntityManager().find(TagEntity.class, clusterName); - if (cluster != null) { - LOG.error("Cluster {} is already existed", clusterName); + public boolean createCluster(final Cluster cluster) { + TagEntity tagEntity = getEntityManager().find(TagEntity.class, new TagId(cluster)); + if (tagEntity != null) { + LOG.error("Cluster {} is already existed", cluster); return false; } - TagEntity newCluster = new TagEntity().setName(clusterName); + TagEntity newCluster = new TagEntity().setName(cluster.getName()) + .setNamespace(cluster.getNamespace()); beginTransaction(); getEntityManager().persist(newCluster); getEntityManager().getTransaction().commit(); return true; } + @Override - public boolean lockCluster(final String clusterName) { + public boolean lockCluster(final Cluster cluster) { beginTransaction(); - TagEntity cluster = getEntityManager().find( - TagEntity.class, clusterName, LockModeType.PESSIMISTIC_WRITE); + TagEntity tagEntity = getEntityManager().find( + TagEntity.class, new TagId(cluster), LockModeType.PESSIMISTIC_WRITE); try { - if (cluster == null) { - LOG.error("Cluster {} hasn't been created", clusterName); + if (tagEntity == null) { + LOG.error("Cluster {} hasn't been created", cluster); throw new MySQLTaskQueueException(); } - if (cluster.getLocks() == 1) { - LOG.error("Cluster {} is already locked, cannot double lock", clusterName); + if (tagEntity.getLocks() == 1) { + LOG.error("Cluster {} is already locked, cannot double lock", cluster); throw new MySQLTaskQueueException(); } } catch (MySQLTaskQueueException e) { getEntityManager().getTransaction().rollback(); return false; } - cluster.setLocks(1); - getEntityManager().persist(cluster); + tagEntity.setLocks(1); + getEntityManager().persist(tagEntity); getEntityManager().getTransaction().commit(); return true; } @Override - public boolean unlockCluster(final String clusterName) { + public boolean unlockCluster(final Cluster cluster) { beginTransaction(); - TagEntity cluster = getEntityManager().find( - TagEntity.class, clusterName, LockModeType.PESSIMISTIC_WRITE); + TagEntity tagEntity = getEntityManager().find( + TagEntity.class, new TagId(cluster), LockModeType.PESSIMISTIC_WRITE); if (cluster == null) { - LOG.error("Cluster {} hasn't been created", clusterName); + LOG.error("Cluster {} hasn't been created", cluster); getEntityManager().getTransaction().rollback(); return false; } - cluster.setLocks(0); - getEntityManager().persist(cluster); + tagEntity.setLocks(0); + getEntityManager().persist(tagEntity); getEntityManager().getTransaction().commit(); return true; } @Override - public boolean removeCluster(final String clusterName) { + public boolean removeCluster(final Cluster cluster) { beginTransaction(); - TagEntity cluster = getEntityManager().find( - TagEntity.class, clusterName, LockModeType.PESSIMISTIC_WRITE); + TagEntity tagEntity = getEntityManager().find( + TagEntity.class, new TagId(cluster), LockModeType.PESSIMISTIC_WRITE); try { - if (cluster == null) { - LOG.error("Cluster {} hasn't been created", clusterName); + if (tagEntity == null) { + LOG.error("Cluster {} hasn't been created", cluster); throw new MySQLTaskQueueException(); } - if (cluster.getLocks() == 1) { - LOG.error("Cluster {} is already locked, cannot remove.", clusterName); + if (tagEntity.getLocks() == 1) { + LOG.error("Cluster {} is already locked, cannot remove.", cluster); throw new MySQLTaskQueueException(); } } catch (MySQLTaskQueueException e) { getEntityManager().getTransaction().rollback(); return false; } - getEntityManager().remove(cluster); + getEntityManager().remove(tagEntity); getEntityManager().getTransaction().commit(); return true; } @Override - public Set getAllClusters() { + public Set getAllClusters() { Query query = getEntityManager().createNamedQuery("tag.findAll"); - List result = query.getResultList(); - Set clusterNames = new HashSet<>(); - result.stream().forEach(name -> { - clusterNames.add(name); + List result = query.getResultList(); + Set clusters = new HashSet<>(); + result.stream().forEach(fields -> { + clusters.add(new Cluster((String)fields[0], (String)fields[1])); }); - return clusterNames; + return clusters; } private TaskEntity enqueueTaskImpl(final TaskBase taskBase, - final String clusterName, + final Cluster cluster, final int runDelaySeconds, final TaskState state, final String claimedWorker) { - TagEntity cluster = getEntityManager().find( - TagEntity.class, clusterName, LockModeType.PESSIMISTIC_WRITE); + TagEntity tagEntity = getEntityManager().find( + TagEntity.class, new TagId(cluster), LockModeType.PESSIMISTIC_WRITE); if (cluster == null) { - LOG.error("Cluster {} is not created", clusterName); + LOG.error("Cluster {} is not created", cluster); getEntityManager().getTransaction().rollback(); return null; } @@ -208,7 +212,7 @@ private TaskEntity enqueueTaskImpl(final TaskBase taskBase, .setName(taskBase.name) .setPriority(taskBase.priority) .setBody(taskBase.body) - .setCluster(cluster) + .setCluster(tagEntity) .setLastAliveAt(new Date()) .setClaimedWorker(claimedWorker) .setState(state.intValue()) @@ -220,10 +224,10 @@ private TaskEntity enqueueTaskImpl(final TaskBase taskBase, @Override public boolean enqueueTask(final TaskBase taskBase, - final String clusterName, + final Cluster cluster, final int runDelaySeconds) { beginTransaction(); - TaskEntity taskEntity = enqueueTaskImpl(taskBase, clusterName, runDelaySeconds, TaskState + TaskEntity taskEntity = enqueueTaskImpl(taskBase, cluster, runDelaySeconds, TaskState .PENDING, null); if (taskEntity == null) { return false; @@ -255,7 +259,7 @@ public Task dequeueTask(final String worker) { return convertTaskEntityToTask(claimedTask); } - private String ackTask(final long id, + private Cluster ackTask(final long id, final String output, TaskState ackState, boolean unlockCluster) { @@ -278,23 +282,23 @@ private String ackTask(final long id, cluster.setLocks(0); getEntityManager().persist(cluster); } - return cluster.getName(); + return new Cluster(cluster.getNamespace(), cluster.getName()); } @Override public boolean finishTask(final long id, final String output) { beginTransaction(); - String clusterName = ackTask(id, output, TaskState.DONE, true); + Cluster cluster = ackTask(id, output, TaskState.DONE, true); getEntityManager().getTransaction().commit(); - return clusterName != null; + return cluster != null; } @Override public boolean failTask(final long id, final String reason) { beginTransaction(); - String clusterName = ackTask(id, reason, TaskState.FAILED, true); + Cluster cluster = ackTask(id, reason, TaskState.FAILED, true); getEntityManager().getTransaction().commit(); - return clusterName != null; + return cluster != null; } @Override @@ -340,11 +344,11 @@ public long finishTaskAndEnqueueRunningTask(final long id, final TaskBase newTaskBase, final String worker) { beginTransaction(); - String clusterName = ackTask(id, output, TaskState.DONE, false); - if (clusterName == null) { + Cluster cluster = ackTask(id, output, TaskState.DONE, false); + if (cluster == null) { return -1; } - TaskEntity entity = enqueueTaskImpl(newTaskBase, clusterName, 0, TaskState.RUNNING, worker); + TaskEntity entity = enqueueTaskImpl(newTaskBase, cluster, 0, TaskState.RUNNING, worker); getEntityManager().getTransaction().commit(); return entity.getId(); } @@ -355,11 +359,11 @@ private boolean ackTaskAndEnqueuePendingTask(final long id, final int runDelaySeconds, TaskState ackState) { beginTransaction(); - String clusterName = ackTask(id, output, ackState, true); - if (clusterName == null) { + Cluster cluster = ackTask(id, output, ackState, true); + if (cluster == null) { return false; } - TaskEntity entity = enqueueTaskImpl(newTaskBase, clusterName, runDelaySeconds, + TaskEntity entity = enqueueTaskImpl(newTaskBase, cluster, runDelaySeconds, TaskState.PENDING, null); if (entity == null) { return false; @@ -386,20 +390,22 @@ public boolean failTaskAndEnqueuePendingTask(final long id, } @Override - public List peekTasks(final String clusterName, final Integer state) { + public List peekTasks(final Cluster cluster, final Integer state) { Query query; - if (clusterName != null && state != null) { + if (cluster != null && state != null) { query = getEntityManager() .createNamedQuery("task.peekTasksFromClusterWithState") - .setParameter("state", state).setParameter("name", clusterName); - }else if (state != null) { + .setParameter("state", state) + .setParameter("namespace", cluster.namespace).setParameter("name", cluster.name); + } else if (state != null) { query = getEntityManager() .createNamedQuery("task.peekTasksWithState") .setParameter("state", state); - }else if (clusterName != null) { + } else if (cluster != null) { + // TODO query = getEntityManager() .createNamedQuery("task.peekTasksFromCluster") - .setParameter("name", clusterName); + .setParameter("name", cluster.name).setParameter("namespace", cluster.namespace); }else{ query = getEntityManager() .createNamedQuery("task.peekAllTasks"); diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagEntity.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagEntity.java index 5ad8e228..1bb8ca7d 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagEntity.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagEntity.java @@ -19,6 +19,7 @@ import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; +import javax.persistence.IdClass; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.Table; @@ -29,21 +30,27 @@ /** * MySQL tag table schema: + * namespace VARCHAR(128) NOT NULL, * name VARCHAR(128) NOT NULL, * locks TINYINT UNSIGNED NOT NULL, * created_at DATETIME NOT NULL, * owner VARCHAR(256), - * PRIMARY KEY (name) + * PRIMARY KEY (namespace, name) */ - @Entity(name = "tag") +@IdClass(TagId.class) @Table(name = "tag") @NamedQueries({ @NamedQuery(name = "tag.findAll", - query = "SELECT t.name FROM tag t"), + query = "SELECT t.namespace, t.name FROM tag t"), }) public class TagEntity { + @Id + @Column(name = "namespace") + @NotNull + private String namespace; + @Id @Column(name = "name") @NotNull @@ -65,6 +72,13 @@ public TagEntity() { this.createdAt = new Date(); } + public String getNamespace() { return namespace; } + + public TagEntity setNamespace(String namespace) { + this.namespace = namespace; + return this; + } + public String getName() { return name; } diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagId.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagId.java new file mode 100644 index 00000000..c1bd2c99 --- /dev/null +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TagId.java @@ -0,0 +1,16 @@ +package com.pinterest.rocksplicator.controller.mysql.entity; + +import com.pinterest.rocksplicator.controller.Cluster; + +/** + * Composite Primary key for Tag table + */ +public class TagId { + String namespace; + String name; + + public TagId(Cluster cluster) { + this.namespace = cluster.getNamespace(); + this.name = cluster.getName(); + } +} diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java index 7ca1436b..0b918cf3 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java @@ -22,6 +22,7 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; +import javax.persistence.JoinColumns; import javax.persistence.ManyToOne; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; @@ -37,6 +38,7 @@ * name VARCHAR(128), * priority TINYINT UNSIGNED NOT NULL, # 0 is the highest priority * state TINYINT UNSIGNED NOT NULL, # 0: Pending, 1: Running, 2: Done, 3: FAILED + * tag_namespace VARCHAR(128) NOT NULL, * tag_name VARCHAR(128) NOT NULL, * body TEXT NOT NULL, * created_at DATETIME NOT NULL, @@ -45,7 +47,7 @@ * last_alive_at DATETIME, * output TEXT, * PRIMARY KEY (id), - * FOREIGN KEY (tag_name) REFERENCES tag(name) ON UPDATE RESTRICT ON DELETE CASCADE + * FOREIGN KEY (tag_namespace, tag_name) REFERENCES tag(namespace, name) ON UPDATE RESTRICT ON DELETE CASCADE */ @Entity (name = "task") @Table (name = "task") @@ -58,12 +60,13 @@ @NamedQuery(name = "task.peekAllTasks", query = "SELECT t FROM task t INNER JOIN t.cluster c"), @NamedQuery(name = "task.peekTasksFromCluster", - query = "SELECT t FROM task t INNER JOIN t.cluster c WHERE c.name = :name"), + query = "SELECT t FROM task t INNER JOIN t.cluster c " + + "WHERE c.name = :name AND c.namespace = :namespace"), @NamedQuery(name = "task.peekTasksWithState", query = "SELECT t FROM task t WHERE t.state = :state"), @NamedQuery(name = "task.peekTasksFromClusterWithState", query = "SELECT t FROM task t INNER JOIN t.cluster c WHERE t.state = :state AND " + - "c.name = :name"), + "c.namespace = :namespace AND c.name = :name"), }) public class TaskEntity { @@ -83,7 +86,10 @@ public class TaskEntity { @NotNull private int state; - @JoinColumn(name="tag_name", referencedColumnName="name") + @JoinColumns({ + @JoinColumn(name="tag_namespace", referencedColumnName="namespace"), + @JoinColumn(name="tag_name", referencedColumnName="name") + }) @ManyToOne @NotNull private TagEntity cluster; diff --git a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParserTest.java b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParserTest.java index 572b0f89..fc6c8ab2 100644 --- a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParserTest.java +++ b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConsistentHashRingsConfigParserTest.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.config; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.bean.ConsistentHashRingBean; import com.pinterest.rocksplicator.controller.bean.ConsistentHashRingsBean; import com.pinterest.rocksplicator.controller.bean.HostBean; @@ -58,8 +59,8 @@ public void testParseConsistentHashRingsConfig() { "}"; ConsistentHashRingsBean consistentHashRingsBean = ConsistentHashRingsConfigParser.parseConsistentHashRingsConfig( - "cluster", config.getBytes()); - Assert.assertEquals(consistentHashRingsBean.getName(), "cluster"); + new Cluster("namespace", "cluster"), config.getBytes()); + Assert.assertEquals(consistentHashRingsBean.getCluster(), new Cluster("namespace", "cluster")); HashMap> expectedRings = new HashMap<>(); expectedRings.put("us-east-1a_0", Arrays.asList("127.1.1.1:8091", "127.1.1.2:8092")); expectedRings.put("us-east-1a_1", Arrays.asList("127.2.2.2:8093", "127.2.2.3:8094")); @@ -77,6 +78,4 @@ public void testParseConsistentHashRingsConfig() { Assert.assertEquals(hostsList, expectedRings.get(name)); } } - - } diff --git a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerConfiguration.java b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerConfiguration.java index 755fc1da..8ab772a2 100644 --- a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerConfiguration.java +++ b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerConfiguration.java @@ -32,7 +32,7 @@ public class ControllerConfiguration extends Configuration { private String zkCluster; @NotEmpty - private String zkPath = "/config/services/rocksdb/"; + private String defaultNamespace = "rocksdb"; @NotEmpty private String jdbcUrl; @@ -53,13 +53,13 @@ public void setZkHostsFile(String zkHostsFile) { } @JsonProperty - public String getZkPath() { - return zkPath; + public String getDefaultNamespace() { + return defaultNamespace; } @JsonProperty - public void setZkPath(String zkPath) { - this.zkPath = zkPath; + public void setDefaultNamespace(String defaultNamespace) { + this.defaultNamespace = defaultNamespace; } @JsonProperty diff --git a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerService.java b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerService.java index 2d9982f1..c0f6ffce 100644 --- a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerService.java +++ b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/ControllerService.java @@ -63,7 +63,7 @@ public void stop() throws Exception { }); environment.jersey().register( - new Clusters(configuration.getZkPath(), zkClient, taskQueue) + new Clusters(zkClient, taskQueue) ); environment.jersey().register(new Tasks(taskQueue)); diff --git a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java index 85e1fb15..914b5e66 100644 --- a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java +++ b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.resource; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.TaskBase; import com.pinterest.rocksplicator.controller.TaskQueue; import com.pinterest.rocksplicator.controller.bean.ClusterBean; @@ -59,18 +60,20 @@ public class Clusters { private static final Logger LOG = LoggerFactory.getLogger(Clusters.class); - private final String zkPath; + private static final String ZK_PREFIX = "/config/services/"; private final CuratorFramework zkClient; private final TaskQueue taskQueue; - public Clusters(String zkPath, - CuratorFramework zkClient, + public Clusters(CuratorFramework zkClient, TaskQueue taskQueue) { - this.zkPath = zkPath; this.zkClient = zkClient; this.taskQueue = taskQueue; } + private String getClusterZKPath(final String namespace, final String clusterName) { + return ZK_PREFIX + namespace + "/" + clusterName; + } + /** * Retrieves cluster information by cluster name. * @@ -78,12 +81,13 @@ public Clusters(String zkPath, * @return ClusterBean */ @GET - @Path("/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response get(@PathParam("clusterName") String clusterName) { + public Response get(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName) { final ClusterBean clusterBean; try { - clusterBean = checkExistenceAndGetClusterBean(clusterName); + clusterBean = checkExistenceAndGetClusterBean(namespace, clusterName); } catch (Exception e) { String message = String.format("Failed to read from zookeeper: %s", e); LOG.error(message); @@ -104,14 +108,14 @@ public Response get(@PathParam("clusterName") String clusterName) { @GET @Produces(MediaType.APPLICATION_JSON) public Response getAll(@QueryParam("verbose") Optional verbose) { - final Set clusters = taskQueue.getAllClusters(); + final Set clusters = taskQueue.getAllClusters(); if (!verbose.isPresent() || verbose.get().equals(Boolean.FALSE)) { return Utils.buildResponse(HttpStatus.OK_200, clusters); } final List clusterBeans = new ArrayList<>(clusters.size()); try { - for (String cluster : clusters) { - ClusterBean clusterBean = checkExistenceAndGetClusterBean(cluster); + for (Cluster cluster : clusters) { + ClusterBean clusterBean = checkExistenceAndGetClusterBean(cluster.namespace, cluster.name); if (clusterBean != null) { clusterBeans.add(clusterBean); } @@ -132,11 +136,13 @@ public Response getAll(@QueryParam("verbose") Optional verbose) { * @param clusterName name of the cluster */ @POST - @Path("/initialize/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/initialize/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response initialize(@PathParam("clusterName") String clusterName) { + public Response initialize(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName, + @QueryParam("zkPrefix") Optional zkPrefix) { // Create directly, we dont - if (taskQueue.createCluster(clusterName)) { + if (taskQueue.createCluster(new Cluster(namespace, clusterName))) { return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } else { String message = String.format("Cluster %s is already existed", clusterName); @@ -150,10 +156,11 @@ public Response initialize(@PathParam("clusterName") String clusterName) { * @param clusterName name of the cluster */ @POST - @Path("/remove/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/remove/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response remove(@PathParam("clusterName") String clusterName) { - if (taskQueue.removeCluster(clusterName)) { + public Response remove(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName) { + if (taskQueue.removeCluster(new Cluster(namespace, clusterName))) { return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } else { String message = String.format("Cluster %s is already locked, cannot remove", clusterName); @@ -172,11 +179,12 @@ public Response remove(@PathParam("clusterName") String clusterName) { * @param newHostOp (optional) new host to add, in the format of ip:port */ @POST - @Path("/replaceHost/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/replaceHost/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response replaceHost(@PathParam("clusterName") String clusterName, - @NotEmpty @QueryParam("oldHost") String oldHostString, - @QueryParam("newHost") Optional newHostOp) { + public Response replaceHost(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName, + @NotEmpty @QueryParam("oldHost") String oldHostString, + @QueryParam("newHost") Optional newHostOp) { //TODO(angxu) support adding random new host later if (!newHostOp.isPresent()) { @@ -203,7 +211,7 @@ public Response replaceHost(@PathParam("clusterName") String clusterName, //TODO(angxu) Add .retry(maxRetry) if necessary .getEntity(); - taskQueue.enqueueTask(task, clusterName, 0); + taskQueue.enqueueTask(task, new Cluster(namespace, clusterName), 0); return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } catch (JsonProcessingException e) { String message = "Cannot serialize parameters"; @@ -222,9 +230,10 @@ public Response replaceHost(@PathParam("clusterName") String clusterName, * @param rateLimit s3 download size limit in mb */ @POST - @Path("/loadData/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/loadData/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response loadData(@PathParam("clusterName") String clusterName, + public Response loadData(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName, @NotEmpty @QueryParam("segmentName") String segmentName, @NotEmpty @QueryParam("s3Bucket") String s3Bucket, @NotEmpty @QueryParam("s3Prefix") String s3Prefix, @@ -233,7 +242,7 @@ public Response loadData(@PathParam("clusterName") String clusterName, try { TaskBase task = new LoadSSTTask(segmentName, s3Bucket, s3Prefix, concurrency.orElse(20), rateLimit.orElse(64)).getEntity(); - taskQueue.enqueueTask(task, clusterName, 0); + taskQueue.enqueueTask(task, new Cluster(namespace, clusterName), 0); return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } catch (JsonProcessingException e) { String message = "Cannot serialize parameters"; @@ -250,10 +259,11 @@ public Response loadData(@PathParam("clusterName") String clusterName, * @return true if the given cluster is locked, false otherwise */ @POST - @Path("/lock/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/lock/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response lock(@PathParam("clusterName") String clusterName) { - if (taskQueue.lockCluster(clusterName)) { + public Response lock(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName) { + if (taskQueue.lockCluster(new Cluster(namespace, clusterName))) { return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } else { String message = String.format("Cluster %s is already locked, cannot double lock", clusterName); @@ -268,10 +278,11 @@ public Response lock(@PathParam("clusterName") String clusterName) { * @return true if the given cluster is unlocked, false otherwise */ @POST - @Path("/unlock/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/unlock/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response unlock(@PathParam("clusterName") String clusterName) { - if (taskQueue.unlockCluster(clusterName)) { + public Response unlock(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName) { + if (taskQueue.unlockCluster(new Cluster(namespace, clusterName))) { return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } else { String message = String.format("Cluster %s is not created yet", clusterName); @@ -286,13 +297,14 @@ public Response unlock(@PathParam("clusterName") String clusterName) { * @throws Exception */ @POST - @Path("/logging/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/logging/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response sendLogTask(@PathParam("clusterName") String clusterName, - @NotEmpty @QueryParam("message") String message) { + public Response sendLogTask(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName, + @NotEmpty @QueryParam("message") String message) { try { TaskBase task = new LoggingTask(message).getEntity(); - taskQueue.enqueueTask(task, clusterName, 0); + taskQueue.enqueueTask(task, new Cluster(namespace, clusterName), 0); return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } catch (JsonProcessingException e) { String errorMessage = "Cannot serialize parameters"; @@ -308,9 +320,10 @@ public Response sendLogTask(@PathParam("clusterName") String clusterName, * @param zkPrefix if not specified, use DEFAULT_ZK_PATH in WorkerConfig. */ @POST - @Path("/healthcheck/{clusterName : [a-zA-Z0-9\\-_]+}") + @Path("/healthcheck/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response healthcheck(@PathParam("clusterName") String clusterName, + public Response healthcheck(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName, @QueryParam("interval") Optional intervalSeconds, @QueryParam("zkPrefix") Optional zkPrefix) { try { @@ -320,7 +333,7 @@ public Response healthcheck(@PathParam("clusterName") String clusterName, } TaskBase healthCheckTask = new HealthCheckTask(param) .recur(intervalSeconds.orElse(0)).getEntity(); - taskQueue.enqueueTask(healthCheckTask, clusterName, 0); + taskQueue.enqueueTask(healthCheckTask, new Cluster(namespace, clusterName), 0); return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } catch (JsonProcessingException e) { String message = "Cannot serialize parameters"; @@ -337,16 +350,17 @@ public Response healthcheck(@PathParam("clusterName") String clusterName, * @return */ @POST - @Path("/configcheck/{clusterName: [a-zA-Z0-9\\-_]+}") + @Path("/configcheck/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName: [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) - public Response configCheck(@PathParam("clusterName") String clusterName, + public Response configCheck(@PathParam("namespace") String namespace, + @PathParam("clusterName") String clusterName, @QueryParam("interval") Optional intervalSeconds, @QueryParam("replicas") Optional numReplicas) { try { ConfigCheckTask.Param param = new ConfigCheckTask.Param().setNumReplicas(numReplicas.orElse(3)); TaskBase configCheckTask = new ConfigCheckTask(param).recur(intervalSeconds.orElse(0)).getEntity(); - taskQueue.enqueueTask(configCheckTask, clusterName, 0); + taskQueue.enqueueTask(configCheckTask, new Cluster(namespace, clusterName), 0); return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } catch (JsonProcessingException e) { String message = "Cannot serialize parameters: " + e.getMessage(); @@ -356,15 +370,16 @@ public Response configCheck(@PathParam("clusterName") String clusterName, } - private ClusterBean checkExistenceAndGetClusterBean(String clusterName) throws Exception { - if (zkClient.checkExists().forPath(zkPath + clusterName) == null) { - LOG.error("Znode {} doesn't exist.", zkPath + clusterName); + private ClusterBean checkExistenceAndGetClusterBean( + String namespace, String clusterName) throws Exception { + if (zkClient.checkExists().forPath(getClusterZKPath(namespace, clusterName)) == null) { + LOG.error("Znode {} doesn't exist.", getClusterZKPath(namespace, clusterName)); return null; } - byte[] data = zkClient.getData().forPath(zkPath + clusterName); - ClusterBean clusterBean = ConfigParser.parseClusterConfig(clusterName, data); + byte[] data = zkClient.getData().forPath(getClusterZKPath(namespace, clusterName)); + ClusterBean clusterBean = ConfigParser.parseClusterConfig(new Cluster(namespace, clusterName), data); if (clusterBean == null) { - LOG.error("Failed to parse config for cluster {}.", clusterName); + LOG.error("Failed to parse config for cluster {}/{}.", namespace, clusterName); } return clusterBean; } diff --git a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java index bd87b1ef..a2bea924 100644 --- a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java +++ b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.resource; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.Task; import com.pinterest.rocksplicator.controller.TaskQueue; import com.pinterest.rocksplicator.controller.bean.TaskState; @@ -67,7 +68,7 @@ public Response get(@PathParam("id") Long id) { } /** - * Retrieves all the tasks that match the given cluster name and/or + * Retrieves all the tasks that match the given cluster namespace/name and/or * the {@link TaskState state}. * * @param clusterName name of the cluster being queried @@ -76,9 +77,10 @@ public Response get(@PathParam("id") Long id) { */ @GET @Produces(MediaType.APPLICATION_JSON) - public Response findTasks(@QueryParam("clusterName") Optional clusterName, - @QueryParam("state") Optional state) { - List result = taskQueue.peekTasks(clusterName.orElse(null), + public Response findTasks(@QueryParam("namespace") Optional namespace, + @QueryParam("clusterName") Optional clusterName, + @QueryParam("state") Optional state) { + List result = taskQueue.peekTasks(new Cluster(namespace.get(), clusterName.get()), state.map(TaskState::intValue).orElse(null)); return Utils.buildResponse(HttpStatus.OK_200, result); } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerConfig.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerConfig.java index dabb5114..5934ed5a 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerConfig.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerConfig.java @@ -46,7 +46,7 @@ public final class WorkerConfig { private static final String SENDER_EMAIL_ADDRESS_KEY = "sender_email"; private static final String RECEIVER_EMAIL_ADDRESS_KEY = "receiver_email"; - private static final String DEFAULT_ZK_PATH = "/config/services/rocksdb/"; + private static final String DEFAULT_ZK_PATH = "/config/services/"; private static final String DEFAULT_ZK_ENDPOINTS = "observerzookeeper010:2181"; private static final String DEFAULT_ZK_HOSTS_FILE_PATH = "bin/zookeeper_hosts.conf"; private static final String DEFAULT_ZK_CLUSTER = "default"; diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java index 363b43b0..96b850b7 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java @@ -40,7 +40,7 @@ public final class WorkerPool { // TODO: graceful shutdown. private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class); private final Semaphore idleWorkersSemaphore; - private final ConcurrentHashMap runningTasks; + private final ConcurrentHashMap runningTasks; private final ExecutorService executorService; private final TaskQueue taskQueue; @@ -67,7 +67,7 @@ public synchronized boolean assignTask(Task task) throws Exception { Future future = executorService.submit(() -> { try { - final Context ctx = new Context(task.id, task.clusterName, taskQueue, + final Context ctx = new Context(task.id, task.cluster, taskQueue, WorkerConfig.getHostName() + ":" + Thread.currentThread().getName()); baseTask.process(ctx); LOG.info("Finished processing task {}.", task.name); @@ -77,12 +77,12 @@ public synchronized boolean assignTask(Task task) throws Exception { // completed by itself. Therefore, the result of this operation is ignored. taskQueue.failTask(task.id, t.getMessage()); } finally { - runningTasks.remove(task.clusterName); + runningTasks.remove(task.cluster); idleWorkersSemaphore.release(); } }); - runningTasks.put(task.clusterName, future); + runningTasks.put(task.cluster, future); return true; } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/AddHostTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/AddHostTask.java index a2658b9f..910d9a5b 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/AddHostTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/AddHostTask.java @@ -86,7 +86,8 @@ public AddHostTask(Param param) { @Override public void process(Context ctx) throws Exception { - final String clusterName = ctx.getCluster(); + // TODO(shu): unify the HDFS naming. + final String clusterName = ctx.getCluster().getNamespace() + "/" + ctx.getCluster().getName(); final String hdfsDir = getParameter().getHdfsDir(); final HostBean hostToAdd = getParameter().getHostToAdd(); final int rateLimitMbs = getParameter().getRateLimitMbs(); @@ -101,7 +102,7 @@ public void process(Context ctx) throws Exception { return; } - ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, clusterName); + ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, ctx.getCluster()); if (clusterBean == null) { ctx.getTaskQueue().failTask(ctx.getId(), "Failed to read cluster config from zookeeper."); return; diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ChainedTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ChainedTask.java index 1cf00d36..75f39d30 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ChainedTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ChainedTask.java @@ -49,7 +49,6 @@ public int getPriority() { @Override public void process(Context ctx) throws Exception { long id = ctx.getId(); - final String cluster = ctx.getCluster(); final String worker = ctx.getWorker(); final TaskQueue taskQueue = ctx.getTaskQueue(); @@ -69,7 +68,7 @@ public void process(Context ctx) throws Exception { tasks.push(chainedTask.getParameter().getT1()); } else { LocalAckTaskQueue lq = new LocalAckTaskQueue(taskQueue); - ctx = new Context(id, cluster, lq, worker); + ctx = new Context(id, ctx.getCluster(), lq, worker); try { task.process(ctx); } catch (Exception ex) { diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTask.java index c6ff9568..59c634ea 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTask.java @@ -49,8 +49,7 @@ public ConfigCheckTask(Param param) { @Override public void process(Context ctx) throws Exception { - final String clusterName = ctx.getCluster(); - ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, clusterName); + ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, ctx.getCluster()); if (clusterBean == null) { ctx.getTaskQueue().failTask(ctx.getId(), "Failed to read cluster config from zookeeper."); return; @@ -59,13 +58,13 @@ public void process(Context ctx) throws Exception { for (SegmentBean segmentBean : clusterBean.getSegments()) { checkSegment(segmentBean, getParameter().getNumReplicas()); } - ctx.getTaskQueue().finishTask(ctx.getId(), "Cluster " + clusterName + " has good config"); + ctx.getTaskQueue().finishTask(ctx.getId(), "Cluster " + ctx.getCluster() + " has good config"); } catch (Exception e) { String errorMessage = String.format("Cluster %s doesn't have good shard distribution, reason = %s,", - clusterName, e.getMessage()); + ctx.getCluster(), e.getMessage()); ctx.getTaskQueue().failTask(ctx.getId(), errorMessage); - emailSender.sendEmail("Config Check failed for " + clusterName, errorMessage); + emailSender.sendEmail("Config Check failed for " + ctx.getCluster(), errorMessage); } } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/Context.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/Context.java index 1f2a9bba..957c924d 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/Context.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/Context.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.tasks; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.TaskQueue; @@ -27,11 +28,11 @@ public class Context { private long id; - private String cluster; + private Cluster cluster; private String worker; private TaskQueue taskQueue; - public Context(long id, String cluster, TaskQueue taskQueue, String worker) { + public Context(long id, Cluster cluster, TaskQueue taskQueue, String worker) { this.id = id; this.cluster = cluster; this.worker = worker; @@ -42,7 +43,7 @@ public long getId() { return id; } - public String getCluster() { + public Cluster getCluster() { return cluster; } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTask.java index 9ef7e14e..2f17f8fa 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTask.java @@ -22,10 +22,7 @@ import com.pinterest.rocksplicator.controller.util.AdminClientFactory; import com.pinterest.rocksplicator.controller.util.EmailSender; import com.pinterest.rocksplicator.controller.util.ZKUtil; -import com.pinterest.rocksplicator.controller.WorkerConfig; - -import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.curator.framework.CuratorFramework; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -46,7 +43,7 @@ * * @author Ang Xu (angxu@pinterest.com) */ -public class HealthCheckTask extends AbstractTask { +public class HealthCheckTask extends AbstractTask { public static final Logger LOG = LoggerFactory.getLogger(HealthCheckTask.class); @@ -62,18 +59,16 @@ public class HealthCheckTask extends AbstractTask { /** * Construct a new HealthCheckTask with number of replicas equals to 3 */ - public HealthCheckTask() { this(new HealthCheckTask.Param());} + public HealthCheckTask() { this(new Parameter());} - public HealthCheckTask(HealthCheckTask.Param param) { + public HealthCheckTask(Parameter param) { super(param); } @Override public void process(Context ctx) { - final String clusterName = ctx.getCluster(); try { - ClusterBean clusterBean = ZKUtil.getClusterConfig( - zkClient, getParameter().getZkPrefix(), clusterName); + ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, ctx.getCluster()); if (clusterBean == null) { ctx.getTaskQueue().failTask(ctx.getId(), "Failed to read cluster config from zookeeper."); return; @@ -100,30 +95,18 @@ public void process(Context ctx) { if (!badHosts.isEmpty()) { String errorMessage = String.format("Unable to ping hosts: %s", badHosts); ctx.getTaskQueue().failTask(ctx.getId(),errorMessage); - emailSender.sendEmail("Healthcheck Failed for " + clusterName, errorMessage); + emailSender.sendEmail("Healthcheck Failed for " + ctx.getCluster(), errorMessage); return; } LOG.info("All hosts are good"); - String output = String.format("Cluster %s is healthy", clusterName); + String output = String.format("Cluster %s is healthy", ctx.getCluster()); ctx.getTaskQueue().finishTask(ctx.getId(), output); } catch (Exception ex) { String errorMessage = - String.format("Cluster %s is unhealthy, reason = %s", clusterName, ex.getMessage()); + String.format("Cluster %s is unhealthy, reason = %s", ctx.getCluster(), ex.getMessage()); ctx.getTaskQueue().failTask(ctx.getId(), errorMessage); - emailSender.sendEmail("Healthcheck Failed for " + clusterName, errorMessage); - } - } - - public static class Param extends Parameter { - @JsonProperty - private String zkPrefix = WorkerConfig.getZKPath(); - - public String getZkPrefix() { return zkPrefix; } - - public Param setZkPrefix(String zkPrefix) { - this.zkPrefix= zkPrefix; - return this; + emailSender.sendEmail("Healthcheck Failed for " + ctx.getCluster(), errorMessage); } } } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTask.java index fb3d4782..1c726b98 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTask.java @@ -75,12 +75,11 @@ public LoadSSTTask(Param param) { @Override public void process(Context ctx) throws Exception { - final String clusterName = ctx.getCluster(); final String segment = getParameter().getSegment(); - ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, clusterName); + ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, ctx.getCluster()); if (clusterBean == null) { - LOG.error("Failed to get config for cluster {}.", clusterName); + LOG.error("Failed to get config for cluster {}.", ctx.getCluster()); ctx.getTaskQueue().failTask(ctx.getId(), "Failed to read cluster config from zookeeper."); return; } @@ -90,7 +89,7 @@ public void process(Context ctx) throws Exception { .findAny() .orElse(null); if (segmentBean == null) { - String errMsg = String.format("Segment %s not in cluster %s.", segment, clusterName); + String errMsg = String.format("Segment %s not in cluster %s.", segment, ctx.getCluster()); LOG.error(errMsg); ctx.getTaskQueue().failTask(ctx.getId(), errMsg); return; @@ -106,14 +105,14 @@ public void process(Context ctx) throws Exception { doLoadSST(executor, segmentBean, Role.SLAVE); LOG.info("Second pass done."); } catch (InterruptedException | ExecutionException ex) { - LOG.error("Failed to load sst to cluster {}.", clusterName, ex); + LOG.error("Failed to load sst to cluster {}.", ctx.getCluster(), ex); ctx.getTaskQueue().failTask(ctx.getId(), "Failed to load sst, error=" + ex.getMessage()); return; } executor.shutdown(); executor.shutdownNow(); - ctx.getTaskQueue().finishTask(ctx.getId(), "Finished loading sst to " + clusterName); + ctx.getTaskQueue().finishTask(ctx.getId(), "Finished loading sst to " + ctx.getCluster()); } private void doLoadSST(ExecutorService executor, SegmentBean segment, Role role) diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LocalAckTaskQueue.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LocalAckTaskQueue.java index 9ac4281a..84b52dc3 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LocalAckTaskQueue.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/LocalAckTaskQueue.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.tasks; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.Task; import com.pinterest.rocksplicator.controller.TaskBase; import com.pinterest.rocksplicator.controller.TaskQueue; @@ -90,15 +91,15 @@ public boolean failTask(final long id, final String reason) { /** methods below simply delegates calls to {@code taskQueue} **/ @Override - public boolean createCluster(final String clusterName) { - return taskQueue.createCluster(clusterName); + public boolean createCluster(final Cluster cluster) { + return taskQueue.createCluster(cluster); } @Override public boolean enqueueTask(final TaskBase task, - final String clusterName, + final Cluster cluster, final int runDelaySeconds) { - return taskQueue.enqueueTask(task, clusterName, runDelaySeconds); + return taskQueue.enqueueTask(task, cluster, runDelaySeconds); } @Override @@ -123,17 +124,17 @@ public boolean finishTaskAndEnqueuePendingTask(final long id, } @Override - public boolean lockCluster(final String cluster) { + public boolean lockCluster(final Cluster cluster) { return taskQueue.lockCluster(cluster); } @Override - public boolean unlockCluster(final String cluster) { + public boolean unlockCluster(final Cluster cluster) { return taskQueue.unlockCluster(cluster); } @Override - public boolean removeCluster(final String cluster) { + public boolean removeCluster(final Cluster cluster) { return taskQueue.removeCluster(cluster); } @@ -153,9 +154,9 @@ public boolean keepTaskAlive(final long id) { } @Override - public List peekTasks(final String clusterName, - final Integer state) { - return taskQueue.peekTasks(clusterName, state); + public List peekTasks(final Cluster cluster, + final Integer state) { + return taskQueue.peekTasks(cluster, state); } @Override @@ -169,7 +170,7 @@ public boolean removeTask(long id) { } @Override - public Set getAllClusters() { + public Set getAllClusters() { return taskQueue.getAllClusters(); } } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/PromoteTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/PromoteTask.java index 09d9bc96..90bf761e 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/PromoteTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/PromoteTask.java @@ -65,8 +65,7 @@ public PromoteTask(Param parameter) { @Override public void process(Context ctx) throws Exception { - final String clusterName = ctx.getCluster(); - final ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, clusterName); + final ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, ctx.getCluster()); boolean updated = false; for (SegmentBean segment : clusterBean.getSegments()) { diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTask.java index 017e0de0..749b68f0 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTask.java @@ -65,8 +65,7 @@ public RebalanceTask(Param param) { @Override public void process(Context ctx) throws Exception { - final String clusterName = ctx.getCluster(); - final ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, clusterName); + final ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, ctx.getCluster()); for (SegmentBean segmentBean : clusterBean.getSegments()) { int numHosts = segmentBean.getHosts().size(); @@ -147,7 +146,7 @@ public void process(Context ctx) throws Exception { } } } - ctx.getTaskQueue().finishTask(ctx.getId(), "Successfully rebalanced cluster " + clusterName); + ctx.getTaskQueue().finishTask(ctx.getId(), "Successfully rebalanced cluster " + ctx.getCluster()); } /** diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RemoveHostTask.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RemoveHostTask.java index d1090773..e71a0370 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RemoveHostTask.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/tasks/RemoveHostTask.java @@ -71,7 +71,6 @@ public RemoveHostTask(Param param) { @Override public void process(Context ctx) throws Exception { - final String clusterName = ctx.getCluster(); final HostBean toRemove = getParameter().getHostToRemove(); final Admin.Client client = clientFactory.getClient(toRemove); @@ -91,9 +90,9 @@ public void process(Context ctx) throws Exception { } // 2) update cluster config to reflect the change - ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, clusterName); + ClusterBean clusterBean = ZKUtil.getClusterConfig(zkClient, ctx.getCluster()); if (clusterBean == null) { - LOG.error("Failed to get config for cluster {}.", clusterName); + LOG.error("Failed to get config for cluster {}.", ctx.getCluster()); ctx.getTaskQueue().failTask(ctx.getId(), "Failed to read cluster config from zookeeper."); return; } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java index fe01267f..5cfe8117 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java @@ -16,9 +16,10 @@ package com.pinterest.rocksplicator.controller.util; -import com.pinterest.rocksplicator.controller.WorkerConfig; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.bean.ClusterBean; import com.pinterest.rocksplicator.controller.config.ConfigParser; +import com.pinterest.rocksplicator.controller.WorkerConfig; import org.apache.curator.framework.CuratorFramework; @@ -30,48 +31,25 @@ public final class ZKUtil { private ZKUtil() { } - public static String getClusterConfigZKPath(String clusterName) { - return WorkerConfig.getZKPath() + clusterName; + public static String getClusterConfigZKPath(Cluster cluster) { + return WorkerConfig.getZKPath() + cluster.namespace + "/" + cluster.name; } /** * Get config for a given cluster from Zookeeper. * * @param zkClient zookeeper client to use - * @param clusterName name of the cluster + * @param cluster the cluster * @return serialized cluster config, or null if there is an error */ - public static ClusterBean getClusterConfig(CuratorFramework zkClient, String clusterName) - throws Exception { - if (zkClient.checkExists().forPath(getClusterConfigZKPath(clusterName)) == null) { + public static ClusterBean getClusterConfig(CuratorFramework zkClient, + Cluster cluster) throws Exception { + if (zkClient.checkExists().forPath(getClusterConfigZKPath(cluster)) == null) { return null; } - byte[] data = zkClient.getData().forPath(ZKUtil.getClusterConfigZKPath(clusterName)); - return ConfigParser.parseClusterConfig(clusterName, data); - } - - /** - * Get config for a given cluster from zookeeper if the zk prefix is different from default. - * - * @param zkClient zookeeper client to use - * @param zkPrefix the customized zk prefix - * @param clusterName - * @return serialized cluster config, or null if there is an error - */ - public static ClusterBean getClusterConfig( - CuratorFramework zkClient, String zkPrefix, String clusterName) throws Exception { - String zkPath; - if (zkPrefix.endsWith("/")) { - zkPath = zkPrefix + clusterName; - } else { - zkPath = zkPrefix + "/" + clusterName; - } - if (zkClient.checkExists().forPath(zkPath) == null) { - return null; - } - byte[] data = zkClient.getData().forPath(zkPath); - return ConfigParser.parseClusterConfig(clusterName, data); + byte[] data = zkClient.getData().forPath(ZKUtil.getClusterConfigZKPath(cluster)); + return ConfigParser.parseClusterConfig(cluster, data); } /** @@ -83,7 +61,7 @@ public static ClusterBean getClusterConfig( public static void updateClusterConfig(CuratorFramework zkClient, ClusterBean clusterBean) throws Exception { zkClient.setData().forPath( - getClusterConfigZKPath(clusterBean.getName()), + getClusterConfigZKPath(clusterBean.getCluster()), ConfigParser.serializeClusterConfig(clusterBean).getBytes() ); } diff --git a/controller/tools/mysql/create_tables.sql b/controller/tools/mysql/create_tables.sql index a3d46dd0..37c7c010 100755 --- a/controller/tools/mysql/create_tables.sql +++ b/controller/tools/mysql/create_tables.sql @@ -8,18 +8,20 @@ DROP TABLE IF EXISTS task; DROP TABLE IF EXISTS tag; CREATE TABLE IF NOT EXISTS tag ( + namespace VARCHAR(128) NOT NULL, name VARCHAR(128) NOT NULL, locks TINYINT UNSIGNED NOT NULL, created_at DATETIME NOT NULL, owner VARCHAR(256), - PRIMARY KEY (name) + PRIMARY KEY (namespace, name) ) ENGINE=INNODB; CREATE TABLE IF NOT EXISTS task ( id BIGINT AUTO_INCREMENT, name VARCHAR(128), priority TINYINT UNSIGNED NOT NULL, # 0 is the highest priority - state TINYINT UNSIGNED NOT NULL, # 0: Pending, 1: Running, 2: Done, 3: FAILED + state TINYINT UNSIGNED NOT NULL, # 0: Pending, 1: Running, 2: Done, 3: FAILED + tag_namespace VARCHAR(128) NOT NULL, tag_name VARCHAR(128) NOT NULL, body TEXT NOT NULL, created_at DATETIME NOT NULL, @@ -28,5 +30,5 @@ CREATE TABLE IF NOT EXISTS task ( last_alive_at DATETIME, output TEXT, PRIMARY KEY (id), - FOREIGN KEY (tag_name) REFERENCES tag(name) ON UPDATE RESTRICT ON DELETE CASCADE + FOREIGN KEY (tag_namespace, tag_name) REFERENCES tag(namespace, name) ON UPDATE RESTRICT ON DELETE CASCADE ) ENGINE=INNODB; From 334241e83412b7aa6cf25a5d27a85cac3abe264a Mon Sep 17 00:00:00 2001 From: shuzhang1989 Date: Fri, 11 Aug 2017 15:23:04 -0700 Subject: [PATCH 2/5] Make it compiled --- .../controller/config/ConfigParserTest.java | 20 +++-- .../mysql/MySQLTaskQueueIntegrationTest.java | 77 ++++++++++--------- .../controller/resource/Clusters.java | 26 ++++--- .../rocksplicator/controller/WorkerPool.java | 2 +- .../controller/DispatcherTest.java | 8 +- .../controller/FIFOTaskQueue.java | 10 +-- .../controller/WorkerPoolTest.java | 4 +- .../controller/tasks/ConfigCheckTaskTest.java | 26 +++---- .../controller/tasks/TaskBaseTest.java | 3 +- 9 files changed, 95 insertions(+), 81 deletions(-) diff --git a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConfigParserTest.java b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConfigParserTest.java index 22329da4..d0396a9a 100644 --- a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConfigParserTest.java +++ b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/config/ConfigParserTest.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.config; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.bean.ClusterBean; import com.pinterest.rocksplicator.controller.bean.HostBean; import com.pinterest.rocksplicator.controller.bean.Role; @@ -149,9 +150,10 @@ public void testParseClusterConfig() { " }" + "}"; - ClusterBean bean = ConfigParser.parseClusterConfig("test", config.getBytes()); + ClusterBean bean = ConfigParser.parseClusterConfig(new Cluster("test", "test"), config.getBytes()); Assert.assertNotNull(bean); - Assert.assertEquals(bean.getName(), "test"); + Assert.assertEquals(bean.getCluster().getName(), "test"); + Assert.assertEquals(bean.getCluster().getNamespace(), "test"); Assert.assertEquals(bean.getSegments().size(), 2); // user_pins @@ -211,9 +213,10 @@ public void testParseClusterConfig2() { " }" + "}"; - ClusterBean cluster = ConfigParser.parseClusterConfig("test2", config.getBytes()); + ClusterBean cluster = ConfigParser.parseClusterConfig(new Cluster("test", "test"), config.getBytes()); Assert.assertNotNull(cluster); - Assert.assertEquals(cluster.getName(), "test2"); + Assert.assertEquals(cluster.getCluster().getName(), "test"); + Assert.assertEquals(cluster.getCluster().getNamespace(), "test"); Assert.assertEquals(cluster.getSegments().size(), 1); SegmentBean userPins = findSegment(cluster.getSegments(), "user_pins").get(); @@ -249,7 +252,7 @@ public void testParseClusterConfig2() { @Test(dataProvider = "config") public void testSerializeClusterConfig(String originalConfig) throws JsonProcessingException { - ClusterBean deserialized= ConfigParser.parseClusterConfig("", originalConfig.getBytes()); + ClusterBean deserialized= ConfigParser.parseClusterConfig(new Cluster("test", "test"), originalConfig.getBytes()); String serializedConfig = ConfigParser.serializeClusterConfig(deserialized); assertConfigEquals(originalConfig, serializedConfig); } @@ -267,8 +270,10 @@ public static Optional findShard(List shards, int id) { } public static void assertConfigEquals(String conf1, String conf2) { - ClusterBean cluster1 = ConfigParser.parseClusterConfig("", conf1.getBytes()); - ClusterBean cluster2 = ConfigParser.parseClusterConfig("", conf2.getBytes()); + ClusterBean cluster1 = ConfigParser.parseClusterConfig( + new Cluster("test", "test"), conf1.getBytes()); + ClusterBean cluster2 = ConfigParser.parseClusterConfig( + new Cluster("test2", "test2"), conf2.getBytes()); Assert.assertNotNull(cluster1); Assert.assertNotNull(cluster2); @@ -287,6 +292,5 @@ public static void assertConfigEquals(String conf1, String conf2) { } } } - } } diff --git a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java index 80796145..fa5e234e 100644 --- a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java +++ b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java @@ -16,6 +16,7 @@ package com.pinterest.rocksplicator.controller.mysql; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.Task; import com.pinterest.rocksplicator.controller.TaskBase; import org.testng.Assert; @@ -35,6 +36,8 @@ public class MySQLTaskQueueIntegrationTest { private static final String TEST_CLUSTER_NAME = "integ_test"; + private static final String TEST_CLUSTER_NAMESPACE = "integ_test_rocksdb"; + private Cluster cluster = new Cluster(TEST_CLUSTER_NAMESPACE, TEST_CLUSTER_NAME); private MySQLTaskQueue queue; @BeforeMethod @@ -44,25 +47,25 @@ protected void checkMySQLRunning() { } catch (Exception e) { throw new SkipException("MySQL is not running correctly"); } - this.queue.createCluster(TEST_CLUSTER_NAME); + this.queue.createCluster(cluster); } @AfterMethod protected void CleanUp() { - this.queue.removeCluster(TEST_CLUSTER_NAME); + this.queue.removeCluster(cluster); } @Test public void testClusterTable() throws MySQLTaskQueue.MySQLTaskQueueException { - Set clusters = queue.getAllClusters(); + Set clusters = queue.getAllClusters(); Assert.assertEquals(1, clusters.size()); - Assert.assertTrue(clusters.contains(TEST_CLUSTER_NAME)); - Assert.assertTrue(queue.lockCluster(TEST_CLUSTER_NAME)); - Assert.assertFalse(queue.lockCluster(TEST_CLUSTER_NAME)); - Assert.assertFalse(queue.removeCluster(TEST_CLUSTER_NAME)); - Assert.assertTrue(queue.unlockCluster(TEST_CLUSTER_NAME)); - Assert.assertTrue(queue.removeCluster(TEST_CLUSTER_NAME)); - Assert.assertFalse(queue.removeCluster(TEST_CLUSTER_NAME)); + Assert.assertTrue(clusters.contains(cluster)); + Assert.assertTrue(queue.lockCluster(cluster)); + Assert.assertFalse(queue.lockCluster(cluster)); + Assert.assertFalse(queue.removeCluster(cluster)); + Assert.assertTrue(queue.unlockCluster(cluster)); + Assert.assertTrue(queue.removeCluster(cluster)); + Assert.assertFalse(queue.removeCluster(cluster)); } TaskBase createTaskBase(String name, int priority, String body) { @@ -76,22 +79,23 @@ TaskBase createTaskBase(String name, int priority, String body) { @Test public void testEnqueueTask() { Assert.assertTrue( - queue.enqueueTask(createTaskBase("test-task", 1, "test-task-body"), TEST_CLUSTER_NAME, 10)); + queue.enqueueTask(createTaskBase("test-task", 1, "test-task-body"), cluster, 10)); Assert.assertFalse( - queue.enqueueTask(createTaskBase("test-task", 1, "test-task-body"), "non-exist", 10)); + queue.enqueueTask(createTaskBase("test-task", 1, "test-task-body"), + new Cluster("nonexist", "nonexist"), 10)); } @Test public void testDequeueAckTask() throws InterruptedException { Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-p1", 1, "test-task-body-p1"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-p0", 0, "test-task-body-p0"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-p2", 2, "test-task-body-p2"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Thread.sleep(2000); Task task1 = queue.dequeueTask("worker"); Assert.assertEquals(task1.name, "test-task-p0"); @@ -111,7 +115,7 @@ public void testDequeueAckTask() throws InterruptedException { // Test run delay seconds Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-delayed", 0, "test-task-body-delayed"), - TEST_CLUSTER_NAME, 2)); + cluster, 2)); Thread.sleep(1000); Assert.assertEquals(queue.dequeueTask("worker"), null); Thread.sleep(3000); @@ -123,29 +127,28 @@ public void testDequeueAckTask() throws InterruptedException { @Test public void testResetZombieTasks() throws InterruptedException { Assert.assertTrue( - queue.enqueueTask(createTaskBase("test-task-zombie", 1, "test-task-body-p1"), - TEST_CLUSTER_NAME, 0)); + queue.enqueueTask(createTaskBase("test-task-zombie", 1, "test-task-body-p1"), cluster, 0)); Thread.sleep(2000); Task task = queue.dequeueTask("worker"); Assert.assertEquals(task.name, "test-task-zombie"); - Assert.assertFalse(queue.lockCluster(TEST_CLUSTER_NAME)); + Assert.assertFalse(queue.lockCluster(cluster)); // make the task zombie Thread.sleep(2000); Assert.assertEquals(queue.resetZombieTasks(1), 1); - Assert.assertTrue(queue.lockCluster(TEST_CLUSTER_NAME)); - Assert.assertTrue(queue.unlockCluster(TEST_CLUSTER_NAME)); + Assert.assertTrue(queue.lockCluster(cluster)); + Assert.assertTrue(queue.unlockCluster(cluster)); Task task2 = queue.dequeueTask("worker"); Assert.assertEquals(task2.name, "test-task-zombie"); Thread.sleep(1000); Assert.assertEquals(queue.resetZombieTasks(2), 0); - queue.unlockCluster(TEST_CLUSTER_NAME); + queue.unlockCluster(cluster); } @Test public void testKeepTasksAlive() throws InterruptedException { Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-alive", 1, "test-task-body-p1"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Thread.sleep(2000); Task task = queue.dequeueTask("worker"); Thread.sleep(2000); @@ -160,19 +163,19 @@ public void testKeepTasksAlive() throws InterruptedException { public void testFinishTaskAndEnqueueRunningTask() throws InterruptedException { Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-to-dequeue", 1, "test-task-body-p1"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Thread.sleep(2000); Task task = queue.dequeueTask("worker"); - Assert.assertFalse(queue.lockCluster(TEST_CLUSTER_NAME)); + Assert.assertFalse(queue.lockCluster(cluster)); TaskBase newTask = createTaskBase("new-running-task", 0, "body"); Assert.assertTrue( queue.finishTaskAndEnqueueRunningTask(task.id, "output", newTask, "worker") > 0); - Assert.assertFalse(queue.lockCluster(TEST_CLUSTER_NAME)); - task = queue.peekTasks(TEST_CLUSTER_NAME, 2).get(0); + Assert.assertFalse(queue.lockCluster(cluster)); + task = queue.peekTasks(cluster, 2).get(0); Assert.assertEquals(task.name, "test-task-to-dequeue"); - task = queue.peekTasks(TEST_CLUSTER_NAME, 1).get(0); + task = queue.peekTasks(cluster, 1).get(0); Assert.assertEquals(task.name, "new-running-task"); - Assert.assertTrue(queue.peekTasks(TEST_CLUSTER_NAME, 0).isEmpty()); + Assert.assertTrue(queue.peekTasks(cluster, 0).isEmpty()); queue.finishTask(task.id, "done"); } @@ -180,18 +183,18 @@ public void testFinishTaskAndEnqueueRunningTask() throws InterruptedException { public void testFinishTaskAndEnqueuePendingTask() throws InterruptedException { Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-to-dequeue", 1, "test-task-body-p1"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Thread.sleep(2000); Task task = queue.dequeueTask("worker"); - Assert.assertFalse(queue.lockCluster(TEST_CLUSTER_NAME)); + Assert.assertFalse(queue.lockCluster(cluster)); TaskBase newTask = createTaskBase("new-pending-task", 0, "body"); Assert.assertTrue(queue.finishTaskAndEnqueuePendingTask(task.id, "output", newTask, 0)); - Assert.assertTrue(queue.lockCluster(TEST_CLUSTER_NAME)); + Assert.assertTrue(queue.lockCluster(cluster)); Assert.assertEquals(queue.dequeueTask("worker"), null); - Assert.assertTrue(queue.unlockCluster(TEST_CLUSTER_NAME)); - task = queue.peekTasks(TEST_CLUSTER_NAME, 2).get(0); + Assert.assertTrue(queue.unlockCluster(cluster)); + task = queue.peekTasks(cluster, 2).get(0); Assert.assertEquals(task.name, "test-task-to-dequeue"); - task = queue.peekTasks(TEST_CLUSTER_NAME, 0).get(0); + task = queue.peekTasks(cluster, 0).get(0); Assert.assertEquals(task.name, "new-pending-task"); } @@ -199,11 +202,11 @@ public void testFinishTaskAndEnqueuePendingTask() throws InterruptedException { public void testRemoveFinishedTasks() throws InterruptedException { Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-p0", 0, "test-task-body-p0"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Thread.sleep(2000); Assert.assertTrue( queue.enqueueTask(createTaskBase("test-task-p2", 2, "test-task-body-p2"), - TEST_CLUSTER_NAME, 0)); + cluster, 0)); Thread.sleep(2000); Task task1 = queue.dequeueTask("worker"); Assert.assertTrue(queue.finishTask(task1.id, "")); diff --git a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java index 914b5e66..1faca1ea 100644 --- a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java +++ b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java @@ -77,6 +77,7 @@ private String getClusterZKPath(final String namespace, final String clusterName /** * Retrieves cluster information by cluster name. * + * @param namespace cluster namespace * @param clusterName name of the cluster * @return ClusterBean */ @@ -133,6 +134,7 @@ public Response getAll(@QueryParam("verbose") Optional verbose) { * Initializes a given cluster. This may include adding designated tag * in DB and/or writing shard config to zookeeper. * + * @param namespace cluster namespace * @param clusterName name of the cluster */ @POST @@ -153,6 +155,7 @@ public Response initialize(@PathParam("namespace") String namespace, /** * Remove tag in DB and shard config in zookeeper. * + * @param namespace cluster namespace * @param clusterName name of the cluster */ @POST @@ -174,8 +177,9 @@ public Response remove(@PathParam("namespace") String namespace, * Otherwise, controller will randomly pick one for the user. * * + * @param namespace cluster namespace * @param clusterName name of the cluster - * @param oldHost host to be replaced, in the format of ip:port + * @param oldHostString host to be replaced, in the format of ip:port * @param newHostOp (optional) new host to add, in the format of ip:port */ @POST @@ -222,6 +226,7 @@ public Response replaceHost(@PathParam("namespace") String namespace, /** * Loads sst files from s3 into a given cluster. * + * @param namespace cluster namespace * @param clusterName name of the cluster * @param segmentName name of the segment * @param s3Bucket S3 bucket name @@ -255,6 +260,7 @@ public Response loadData(@PathParam("namespace") String namespace, * operations on the same cluster. It is caller's responsibility to properly * release the lock via {@link #unlock(String)}. * + * @param namespace cluster namespace * @param clusterName name of the cluster to lock * @return true if the given cluster is locked, false otherwise */ @@ -274,6 +280,7 @@ public Response lock(@PathParam("namespace") String namespace, /** * Unlocks a given cluster. * + * @param namespace cluster namespace * @param clusterName name of the cluster to unlock * @return true if the given cluster is unlocked, false otherwise */ @@ -292,6 +299,8 @@ public Response unlock(@PathParam("namespace") String namespace, /** * Send a LoggingTask to worker. + * + * @param namespace cluster namespace * @param clusterName * @return * @throws Exception @@ -315,24 +324,19 @@ public Response sendLogTask(@PathParam("namespace") String namespace, /** * Send a healthcehck task to a cluster. + * + * @param namespace * @param clusterName * @param intervalSeconds If not specified, it's a one-off task, otherwise the task is repeatable. - * @param zkPrefix if not specified, use DEFAULT_ZK_PATH in WorkerConfig. */ @POST @Path("/healthcheck/{namespace: [a-zA-Z0-9\\-_]+}/{clusterName : [a-zA-Z0-9\\-_]+}") @Produces(MediaType.APPLICATION_JSON) public Response healthcheck(@PathParam("namespace") String namespace, @PathParam("clusterName") String clusterName, - @QueryParam("interval") Optional intervalSeconds, - @QueryParam("zkPrefix") Optional zkPrefix) { + @QueryParam("interval") Optional intervalSeconds) { try { - HealthCheckTask.Param param = new HealthCheckTask.Param(); - if (zkPrefix.isPresent()) { - param.setZkPrefix(zkPrefix.get()); - } - TaskBase healthCheckTask = new HealthCheckTask(param) - .recur(intervalSeconds.orElse(0)).getEntity(); + TaskBase healthCheckTask = new HealthCheckTask().recur(intervalSeconds.orElse(0)).getEntity(); taskQueue.enqueueTask(healthCheckTask, new Cluster(namespace, clusterName), 0); return Utils.buildResponse(HttpStatus.OK_200, ImmutableMap.of("data", true)); } catch (JsonProcessingException e) { @@ -344,6 +348,8 @@ public Response healthcheck(@PathParam("namespace") String namespace, /** * Send a configcheck task to a cluster. + * + * @param namespace cluster namespace * @param clusterName * @param intervalSeconds if not specified, it's a one-off task, otherwise the task is repeatable. * @param numReplicas the number of replicas per shard. If not speicfied, use default of 3. diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java index 96b850b7..eee2bce2 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/WorkerPool.java @@ -91,7 +91,7 @@ public synchronized boolean assignTask(Task task) throws Exception { * @param cluster the name of the cluster * @return */ - public synchronized boolean abortTask(String cluster) throws Exception { + public synchronized boolean abortTask(Cluster cluster) throws Exception { Future runningTask = runningTasks.get(cluster); if (runningTask == null) { LOG.error("No running task of cluster " + cluster); diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/DispatcherTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/DispatcherTest.java index 488579cb..f6811541 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/DispatcherTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/DispatcherTest.java @@ -49,7 +49,7 @@ private Task getSleepIncrementTaskFromQueue() throws JsonProcessingException { new SleepIncrementTask(sleepTimeMillis) .getEntity() ); - task.clusterName = nameCounter.toString(); + task.setCluster(new Cluster("rocksdb", nameCounter.toString())); nameCounter += 1; return task; } @@ -153,7 +153,7 @@ public long finishTaskAndEnqueueRunningTask(final long id, return super.finishTaskAndEnqueueRunningTask(id, output, newTask, worker); } }; - tq.enqueueTask(task, Integer.toString(++nameCounter), 0); + tq.enqueueTask(task, new Cluster("rocksdb", Integer.toString(++nameCounter)), 0); Semaphore idleWorkersSemaphore = new Semaphore(2); ThreadPoolExecutor threadPoolExecutor = @@ -202,7 +202,7 @@ public long finishTaskAndEnqueueRunningTask(final long id, return super.finishTaskAndEnqueueRunningTask(id, output, newTask, worker); } }; - tq.enqueueTask(task, Integer.toString(++nameCounter), 0); + tq.enqueueTask(task, new Cluster("rocksdb", Integer.toString(++nameCounter)), 0); Semaphore idleWorkersSemaphore = new Semaphore(2); ThreadPoolExecutor threadPoolExecutor = @@ -243,7 +243,7 @@ public long finishTaskAndEnqueueRunningTask(final long id, return super.finishTaskAndEnqueueRunningTask(id, output, newTask, worker); } }; - tq.enqueueTask(task, Integer.toString(++nameCounter), 0); + tq.enqueueTask(task, new Cluster("rocksdb", Integer.toString(++nameCounter)), 0); Semaphore idleWorkersSemaphore = new Semaphore(2); ThreadPoolExecutor threadPoolExecutor = diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/FIFOTaskQueue.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/FIFOTaskQueue.java index b72d067e..cd2010b7 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/FIFOTaskQueue.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/FIFOTaskQueue.java @@ -46,12 +46,12 @@ public String getResult(long id) { @Override public boolean enqueueTask(final TaskBase task, - final String clusterName, - final int runDelaySeconds) { + final Cluster cluster, + final int runDelaySeconds) { Task taskInternal = new Task(task); taskInternal.id = currentId.getAndIncrement(); - taskInternal.clusterName = clusterName; + taskInternal.cluster = cluster; taskInternal.runAfter = new Timestamp(System.currentTimeMillis() + runDelaySeconds * 1000); taskQueue.offer(taskInternal); return true; @@ -95,7 +95,7 @@ public boolean finishTaskAndEnqueuePendingTask(final long id, final TaskBase taskBase, final int runDelaySeconds) { result.putIfAbsent(id, output); - return enqueueTask(taskBase, "", runDelaySeconds); + return enqueueTask(taskBase, new Cluster("rocksdb", "test"), runDelaySeconds); } @Override @@ -104,7 +104,7 @@ public boolean failTaskAndEnqueuePendingTask(final long id, final TaskBase taskBase, final int runDelaySeconds) { result.putIfAbsent(id, output); - return enqueueTask(taskBase, "", runDelaySeconds); + return enqueueTask(taskBase, new Cluster("rocksdb", "test"), runDelaySeconds); } } diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/WorkerPoolTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/WorkerPoolTest.java index 5584e374..8cf50a09 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/WorkerPoolTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/WorkerPoolTest.java @@ -36,7 +36,7 @@ private Task getSleepIncrementTask() throws JsonProcessingException { new SleepIncrementTask(1000) .getEntity() ); - task.clusterName = nameCounter.toString(); + task.cluster = new Cluster("rocksdb", nameCounter.toString()); nameCounter += 1; return task; } @@ -103,7 +103,7 @@ public void testCancelSingleTask() throws Exception { Task task = getSleepIncrementTask(); workerPool.assignTask(task); Thread.sleep(10); - workerPool.abortTask(task.clusterName); + workerPool.abortTask(task.cluster); Thread.sleep(100); Assert.assertEquals(1, idleWorkersSemaphore.availablePermits()); } diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTaskTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTaskTest.java index d4e5fe34..696496a5 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTaskTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/ConfigCheckTaskTest.java @@ -34,7 +34,7 @@ public void testSuccessful() throws Exception { FIFOTaskQueue taskQueue = new FIFOTaskQueue(10); Context ctx = new Context(123, CLUSTER, taskQueue, null); configCheckTask.process(ctx); - Assert.assertEquals(taskQueue.getResult(123), "Cluster devtest has good config"); + Assert.assertEquals(taskQueue.getResult(123), "Cluster rocksdb/devtest has good config"); } @Test @@ -48,16 +48,16 @@ public void testMissingReplicas() throws Exception { " \"127.0.0.1:8092:us-east-1e\": [\"00000:S\", \"00001:S\", \"00002:M\"]" + " }" + "}"; - zkClient.createContainers(ZKUtil.getClusterConfigZKPath("badcluster")); - zkClient.setData().forPath(ZKUtil.getClusterConfigZKPath("badcluster"), + zkClient.createContainers(ZKUtil.getClusterConfigZKPath(CLUSTER)); + zkClient.setData().forPath(ZKUtil.getClusterConfigZKPath(CLUSTER), missingReplicaConfig.getBytes()); ConfigCheckTask configCheckTask = new ConfigCheckTask(3); injector.injectMembers(configCheckTask); FIFOTaskQueue taskQueue = new FIFOTaskQueue(10); - Context ctx = new Context(123, "badcluster", taskQueue, null); + Context ctx = new Context(123, CLUSTER, taskQueue, null); configCheckTask.process(ctx); Assert.assertEquals(taskQueue.getResult(123), - "Cluster badcluster doesn't have good shard distribution, " + + "Cluster rocksdb/devtest doesn't have good shard distribution, " + "reason = Incorrect number of replicas. Bad shards: {user_pins2=2},"); } @@ -72,16 +72,16 @@ public void testMissingMasters() throws Exception { " \"127.0.0.1:8092:us-east-1e\": [\"00000:S\", \"00001:S\", \"00002:M\"]" + " }" + "}"; - zkClient.createContainers(ZKUtil.getClusterConfigZKPath("badcluster")); - zkClient.setData().forPath(ZKUtil.getClusterConfigZKPath("badcluster"), + zkClient.createContainers(ZKUtil.getClusterConfigZKPath(CLUSTER)); + zkClient.setData().forPath(ZKUtil.getClusterConfigZKPath(CLUSTER), missingReplicaConfig.getBytes()); ConfigCheckTask configCheckTask = new ConfigCheckTask(3); injector.injectMembers(configCheckTask); FIFOTaskQueue taskQueue = new FIFOTaskQueue(10); - Context ctx = new Context(123, "badcluster", taskQueue, null); + Context ctx = new Context(123, CLUSTER, taskQueue, null); configCheckTask.process(ctx); Assert.assertEquals(taskQueue.getResult(123), - "Cluster badcluster doesn't have good shard distribution, " + + "Cluster rocksdb/devtest doesn't have good shard distribution, " + "reason = Missing masters for some shards: [user_pins0],"); } @@ -96,16 +96,16 @@ public void testIncorrectShardNumber() throws Exception { " \"127.0.0.1:8092:us-east-1e\": [\"00000:S\", \"00001:S\", \"00002:M\"]" + " }" + "}"; - zkClient.createContainers(ZKUtil.getClusterConfigZKPath("badcluster")); - zkClient.setData().forPath(ZKUtil.getClusterConfigZKPath("badcluster"), + zkClient.createContainers(ZKUtil.getClusterConfigZKPath(CLUSTER)); + zkClient.setData().forPath(ZKUtil.getClusterConfigZKPath(CLUSTER), missingReplicaConfig.getBytes()); ConfigCheckTask configCheckTask = new ConfigCheckTask(3); injector.injectMembers(configCheckTask); FIFOTaskQueue taskQueue = new FIFOTaskQueue(10); - Context ctx = new Context(123, "badcluster", taskQueue, null); + Context ctx = new Context(123, CLUSTER, taskQueue, null); configCheckTask.process(ctx); Assert.assertEquals(taskQueue.getResult(123), - "Cluster badcluster doesn't have good shard distribution, " + + "Cluster rocksdb/devtest doesn't have good shard distribution, " + "reason = Incorrect number of shards. Expected 3 but actually 4.,"); } } diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/TaskBaseTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/TaskBaseTest.java index 4a8dcc92..6c960a2c 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/TaskBaseTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/TaskBaseTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.when; import com.pinterest.rocksdb_admin.thrift.Admin; +import com.pinterest.rocksplicator.controller.Cluster; import com.pinterest.rocksplicator.controller.bean.HostBean; import com.pinterest.rocksplicator.controller.util.AdminClientFactory; import com.pinterest.rocksplicator.controller.util.EmailSender; @@ -43,7 +44,7 @@ * @author Ang Xu (angxu@pinterest.com) */ public abstract class TaskBaseTest { - protected static final String CLUSTER = "devtest"; + protected static final Cluster CLUSTER = new Cluster("rocksdb", "devtest"); protected static final String CONFIG = "{" + " \"user_pins\": {" + From 271954fa202b6cf52f0acc23fb5d2692cee77bbe Mon Sep 17 00:00:00 2001 From: shuzhang1989 Date: Fri, 11 Aug 2017 16:59:43 -0700 Subject: [PATCH 3/5] Fix tests --- .../rocksplicator/controller/mysql/MySQLTaskQueue.java | 2 +- .../rocksplicator/controller/mysql/entity/TaskEntity.java | 2 +- .../rocksplicator/controller/tasks/AddHostTaskTest.java | 2 +- .../rocksplicator/controller/tasks/HealthCheckTaskTest.java | 2 +- .../rocksplicator/controller/tasks/LoadSSTTaskTest.java | 4 ++-- .../rocksplicator/controller/tasks/RebalanceTaskTest.java | 2 +- controller/tools/mysql/create_tables.sql | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java index 20dc7bad..c08edbf2 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java @@ -203,7 +203,7 @@ private TaskEntity enqueueTaskImpl(final TaskBase taskBase, final String claimedWorker) { TagEntity tagEntity = getEntityManager().find( TagEntity.class, new TagId(cluster), LockModeType.PESSIMISTIC_WRITE); - if (cluster == null) { + if (tagEntity == null) { LOG.error("Cluster {} is not created", cluster); getEntityManager().getTransaction().rollback(); return null; diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java index 0b918cf3..2c22a77a 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java @@ -86,11 +86,11 @@ public class TaskEntity { @NotNull private int state; + @ManyToOne @JoinColumns({ @JoinColumn(name="tag_namespace", referencedColumnName="namespace"), @JoinColumn(name="tag_name", referencedColumnName="name") }) - @ManyToOne @NotNull private TagEntity cluster; diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/AddHostTaskTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/AddHostTaskTest.java index 3010f3d0..0dfe09e9 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/AddHostTaskTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/AddHostTaskTest.java @@ -89,7 +89,7 @@ public void testSuccessful() throws Exception { Assert.assertEquals(backupDBRequests.get(0).getDb_name(), "user_pins00001"); Assert.assertEquals(backupDBRequests.get(0).getLimit_mbs(), 100); Assert.assertTrue(backupDBRequests.get(0).getHdfs_backup_dir().startsWith( - "/hdfs/devtest/user_pins/00001/127.0.0.1/")); + "/hdfs/rocksdb/devtest/user_pins/00001/127.0.0.1/")); List restoreDBRequests = restoreDBCaptor.getAllValues(); Assert.assertEquals(restoreDBRequests.size(), 2); diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTaskTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTaskTest.java index 634c805d..f6ce8b64 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTaskTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/HealthCheckTaskTest.java @@ -38,7 +38,7 @@ public void testSuccessful() throws Exception { Context ctx = new Context(123, CLUSTER, taskQueue, null); t.process(ctx); - Assert.assertEquals(taskQueue.getResult(123), "Cluster devtest is healthy"); + Assert.assertEquals(taskQueue.getResult(123), "Cluster rocksdb/devtest is healthy"); } @Test diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTaskTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTaskTest.java index f0f3726e..f81cc3f4 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTaskTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/LoadSSTTaskTest.java @@ -52,7 +52,7 @@ public void testSuccessful() throws Exception { Context ctx = new Context(123, CLUSTER, taskQueue, null); task.process(ctx); - Assert.assertEquals(taskQueue.getResult(123), "Finished loading sst to devtest"); + Assert.assertEquals(taskQueue.getResult(123), "Finished loading sst to rocksdb/devtest"); Assert.assertEquals(clearDBCaptor.getAllValues().size(), 9); Assert.assertEquals(addS3SstCaptor.getAllValues().size(), 9); for (AddS3SstFilesToDBRequest request : addS3SstCaptor.getAllValues()) { @@ -74,7 +74,7 @@ public void testBadSegment() throws Exception { Context ctx = new Context(123, CLUSTER, taskQueue, null); task.process(ctx); - Assert.assertEquals(taskQueue.getResult(123), "Segment unknown not in cluster devtest."); + Assert.assertEquals(taskQueue.getResult(123), "Segment unknown not in cluster rocksdb/devtest."); } @Test diff --git a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTaskTest.java b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTaskTest.java index 25a53881..0bdf5c1e 100644 --- a/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTaskTest.java +++ b/controller/controller-worker/src/test/java/com/pinterest/rocksplicator/controller/tasks/RebalanceTaskTest.java @@ -81,7 +81,7 @@ public void testImbalancedCluster(String clusterConfig) throws Exception { Context ctx = new Context(123, CLUSTER, taskQueue, null); task.process(ctx); - Assert.assertEquals(taskQueue.getResult(123), "Successfully rebalanced cluster devtest"); + Assert.assertEquals(taskQueue.getResult(123), "Successfully rebalanced cluster rocksdb/devtest"); byte[] newConfigBytes = zkClient.getData().forPath(ZKUtil.getClusterConfigZKPath(CLUSTER)); ConfigParserTest.assertConfigEquals(new String(newConfigBytes), CONFIG); diff --git a/controller/tools/mysql/create_tables.sql b/controller/tools/mysql/create_tables.sql index 37c7c010..1ce34210 100755 --- a/controller/tools/mysql/create_tables.sql +++ b/controller/tools/mysql/create_tables.sql @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS tag ( CREATE TABLE IF NOT EXISTS task ( id BIGINT AUTO_INCREMENT, - name VARCHAR(128), + name VARCHAR(128) NOT NULL, priority TINYINT UNSIGNED NOT NULL, # 0 is the highest priority state TINYINT UNSIGNED NOT NULL, # 0: Pending, 1: Running, 2: Done, 3: FAILED tag_namespace VARCHAR(128) NOT NULL, From cf8b931fa78e6b89b1c06dab23ca80cba0acb270 Mon Sep 17 00:00:00 2001 From: shuzhang1989 Date: Tue, 15 Aug 2017 11:52:01 -0700 Subject: [PATCH 4/5] Private --- .gitignore | 1 + .../com/pinterest/rocksplicator/controller/Cluster.java | 7 ++----- .../java/com/pinterest/rocksplicator/controller/Task.java | 2 +- .../rocksplicator/controller/mysql/MySQLTaskQueue.java | 4 ++-- .../rocksplicator/controller/resource/Clusters.java | 2 +- .../pinterest/rocksplicator/controller/util/ZKUtil.java | 2 +- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 7a1e3735..a0d4c2c4 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ build .project .idea *.iml +.DS_store target/ cmake-build-debug/ diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java index 9659fcac..79317259 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java @@ -22,18 +22,16 @@ * @author shu (shu@pinterest.com) */ public class Cluster { - public String namespace; - public String name; + private String namespace; + private String name; public Cluster(final String namespace, final String name) { this.namespace = namespace; this.name = name; } - public String getNamespace() { return namespace; - } public Cluster setNamespace(String namespace) { @@ -60,7 +58,6 @@ public boolean equals(Object o) { if (namespace != null ? !namespace.equals(cluster.namespace) : cluster.namespace != null) return false; return name != null ? name.equals(cluster.name) : cluster.name == null; - } @Override diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java index fed8b707..a7dd40aa 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Task.java @@ -102,7 +102,7 @@ public String toString() { "id=" + id + ", state=" + state + ", name='" + name + '\'' + - ", cluster='" + cluster.namespace + "/" + cluster.name + '\'' + + ", cluster='" + cluster.getNamespace() + "/" + cluster.getName()+ '\'' + ", createdAt=" + createdAt + ", runAfter=" + runAfter + ", lastAliveAt=" + lastAliveAt + diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java index c08edbf2..b48ca00e 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java @@ -396,7 +396,7 @@ public List peekTasks(final Cluster cluster, final Integer state) { query = getEntityManager() .createNamedQuery("task.peekTasksFromClusterWithState") .setParameter("state", state) - .setParameter("namespace", cluster.namespace).setParameter("name", cluster.name); + .setParameter("namespace", cluster.getNamespace()).setParameter("name", cluster.getName()); } else if (state != null) { query = getEntityManager() .createNamedQuery("task.peekTasksWithState") @@ -405,7 +405,7 @@ public List peekTasks(final Cluster cluster, final Integer state) { // TODO query = getEntityManager() .createNamedQuery("task.peekTasksFromCluster") - .setParameter("name", cluster.name).setParameter("namespace", cluster.namespace); + .setParameter("name", cluster.getNamespace()).setParameter("namespace", cluster.getName()); }else{ query = getEntityManager() .createNamedQuery("task.peekAllTasks"); diff --git a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java index 1faca1ea..6b27a14d 100644 --- a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java +++ b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Clusters.java @@ -116,7 +116,7 @@ public Response getAll(@QueryParam("verbose") Optional verbose) { final List clusterBeans = new ArrayList<>(clusters.size()); try { for (Cluster cluster : clusters) { - ClusterBean clusterBean = checkExistenceAndGetClusterBean(cluster.namespace, cluster.name); + ClusterBean clusterBean = checkExistenceAndGetClusterBean(cluster.getNamespace(), cluster.getName()); if (clusterBean != null) { clusterBeans.add(clusterBean); } diff --git a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java index 5cfe8117..7059c875 100644 --- a/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java +++ b/controller/controller-worker/src/main/java/com/pinterest/rocksplicator/controller/util/ZKUtil.java @@ -32,7 +32,7 @@ private ZKUtil() { } public static String getClusterConfigZKPath(Cluster cluster) { - return WorkerConfig.getZKPath() + cluster.namespace + "/" + cluster.name; + return WorkerConfig.getZKPath() + cluster.getNamespace() + "/" + cluster.getName(); } /** From 61dcb748acca79715bbb3b9d8252eed9ddd96304 Mon Sep 17 00:00:00 2001 From: shuzhang1989 Date: Tue, 15 Aug 2017 13:37:34 -0700 Subject: [PATCH 5/5] Make peekTasks work with new cluster naming schema and added test --- .../rocksplicator/controller/Cluster.java | 3 ++ .../controller/mysql/MySQLTaskQueue.java | 27 ++++++++++------ .../controller/mysql/entity/TaskEntity.java | 6 ++++ .../mysql/MySQLTaskQueueIntegrationTest.java | 31 +++++++++++++++++++ .../controller/resource/Tasks.java | 6 +++- 5 files changed, 62 insertions(+), 11 deletions(-) diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java index 79317259..e047e22f 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/Cluster.java @@ -22,7 +22,10 @@ * @author shu (shu@pinterest.com) */ public class Cluster { + // Namespace is the type of the cluster. For example, realpin, rocksdb or scorpion private String namespace; + + // name is the detailed cluster name under the namespace, used for multi-cluster setup for services. private String name; public Cluster(final String namespace, final String name) { diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java index b48ca00e..3ae156c2 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueue.java @@ -392,23 +392,30 @@ public boolean failTaskAndEnqueuePendingTask(final long id, @Override public List peekTasks(final Cluster cluster, final Integer state) { Query query; - if (cluster != null && state != null) { + if (!cluster.getName().isEmpty() && !cluster.getNamespace().isEmpty() && state != null) { query = getEntityManager() .createNamedQuery("task.peekTasksFromClusterWithState") .setParameter("state", state) .setParameter("namespace", cluster.getNamespace()).setParameter("name", cluster.getName()); - } else if (state != null) { + } else if (!cluster.getNamespace().isEmpty() && cluster.getName().isEmpty() && state != null) { query = getEntityManager() - .createNamedQuery("task.peekTasksWithState") - .setParameter("state", state); - } else if (cluster != null) { - // TODO + .createNamedQuery("task.peekTasksWithStateFromNamespace") + .setParameter("state", state).setParameter("namespace", cluster.getNamespace()); + } else if (cluster.getNamespace().isEmpty() && cluster.getName().isEmpty() && state != null) { + query = getEntityManager() + .createNamedQuery("task.peekTasksWithState") + .setParameter("state", state); + } else if (!cluster.getName().isEmpty() && !cluster.getNamespace().isEmpty() && state == null) { + query = getEntityManager() + .createNamedQuery("task.peekTasksFromCluster") + .setParameter("namespace", cluster.getNamespace()).setParameter("name", cluster.getName()); + } else if (!cluster.getNamespace().isEmpty() && cluster.getName().isEmpty() && state == null) { query = getEntityManager() - .createNamedQuery("task.peekTasksFromCluster") - .setParameter("name", cluster.getNamespace()).setParameter("namespace", cluster.getName()); - }else{ + .createNamedQuery("task.peekTasksFromNamespace") + .setParameter("namespace", cluster.getNamespace()); + } else { query = getEntityManager() - .createNamedQuery("task.peekAllTasks"); + .createNamedQuery("task.peekAllTasks"); } List result = query.getResultList(); return result.stream().map( diff --git a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java index 2c22a77a..183bcbdf 100644 --- a/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java +++ b/controller/controller-common/src/main/java/com/pinterest/rocksplicator/controller/mysql/entity/TaskEntity.java @@ -59,11 +59,17 @@ query = "SELECT t FROM task t WHERE t.id = :id AND t.state = 1"), @NamedQuery(name = "task.peekAllTasks", query = "SELECT t FROM task t INNER JOIN t.cluster c"), + @NamedQuery(name = "task.peekTasksFromNamespace", + query = "SELECT t FROM task t INNER JOIN t.cluster c " + + "WHERE c.namespace = :namespace"), @NamedQuery(name = "task.peekTasksFromCluster", query = "SELECT t FROM task t INNER JOIN t.cluster c " + "WHERE c.name = :name AND c.namespace = :namespace"), @NamedQuery(name = "task.peekTasksWithState", query = "SELECT t FROM task t WHERE t.state = :state"), + @NamedQuery(name = "task.peekTasksWithStateFromNamespace", + query = "SELECT t FROM task t INNER JOIN t.cluster c WHERE t.state = :state AND " + + "c.namespace = :namespace"), @NamedQuery(name = "task.peekTasksFromClusterWithState", query = "SELECT t FROM task t INNER JOIN t.cluster c WHERE t.state = :state AND " + "c.namespace = :namespace AND c.name = :name"), diff --git a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java index fa5e234e..38cf510d 100644 --- a/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java +++ b/controller/controller-common/src/test/java/com/pinterest/rocksplicator/controller/mysql/MySQLTaskQueueIntegrationTest.java @@ -215,4 +215,35 @@ public void testRemoveFinishedTasks() throws InterruptedException { Assert.assertEquals(queue.removeFinishedTasks(3), 1); } + @Test + public void testPeekTasks() throws InterruptedException { + Cluster realpin_p2p = new Cluster("realpin", "p2p"); + Cluster realpin_pinnability = new Cluster("realpin", "pinnability"); + Cluster aperture = new Cluster("rocksdb", "aperture"); + this.queue.createCluster(realpin_p2p); + this.queue.createCluster(realpin_pinnability); + this.queue.createCluster(aperture); + Assert.assertTrue( + queue.enqueueTask(createTaskBase("realpin-p2p", 1, "realpin-p2p-body"), realpin_p2p, 0)); + Assert.assertTrue( + queue.enqueueTask(createTaskBase( + "realpin-pinnability", 1, "realpin-pinnability-body"), realpin_pinnability, 0)); + Assert.assertTrue( + queue.enqueueTask(createTaskBase("apreture", 1, "aperture-body"), aperture, 0)); + Assert.assertEquals(this.queue.peekTasks(realpin_p2p, 0).size(), 1); + Assert.assertEquals(this.queue.peekTasks(realpin_pinnability, 0).size(), 1); + Assert.assertEquals(this.queue.peekTasks(aperture, 0).size(), 1); + Assert.assertEquals(this.queue.peekTasks(realpin_p2p, null).size(), 1); + Assert.assertEquals(this.queue.peekTasks(realpin_pinnability, null).size(), 1); + Assert.assertEquals(this.queue.peekTasks(aperture, null).size(), 1); + Assert.assertEquals(this.queue.peekTasks(new Cluster("realpin", ""), 0).size(), 2); + Assert.assertEquals(this.queue.peekTasks(new Cluster("rocksdb", ""), 0).size(), 1); + Assert.assertEquals(this.queue.peekTasks(new Cluster("realpin", ""), null).size(), 2); + Assert.assertEquals(this.queue.peekTasks(new Cluster("rocksdb", ""), null).size(), 1); + + this.queue.removeCluster(realpin_p2p); + this.queue.removeCluster(realpin_pinnability); + this.queue.removeCluster(aperture); + } + } diff --git a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java index a2bea924..b8c86195 100644 --- a/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java +++ b/controller/controller-http/src/main/java/com/pinterest/rocksplicator/controller/resource/Tasks.java @@ -80,7 +80,11 @@ public Response get(@PathParam("id") Long id) { public Response findTasks(@QueryParam("namespace") Optional namespace, @QueryParam("clusterName") Optional clusterName, @QueryParam("state") Optional state) { - List result = taskQueue.peekTasks(new Cluster(namespace.get(), clusterName.get()), + if(!namespace.isPresent() && namespace.isPresent()) { + Utils.buildResponse(HttpStatus.NOT_FOUND_404, + ImmutableMap.of("message", "we don't allow empty namespace with non-empty cluster name")); + } + List result = taskQueue.peekTasks(new Cluster(namespace.orElse(""), clusterName.orElse("")), state.map(TaskState::intValue).orElse(null)); return Utils.buildResponse(HttpStatus.OK_200, result); }