Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added in API to manage replications on the server

  • Loading branch information...
commit 67794095a2d8065c36c0f74cab99afd373c7e323 1 parent 3fc2756
Anthony Payne authored
View
2  build.xml
@@ -2,7 +2,7 @@
<project name="couchdb4j" basedir="." default="jar">
<property name="version.major" value="0"/>
- <property name="version.minor" value="3.0-SVN"/>
+ <property name="version.minor" value="3.1"/>
<property name="java.src.dir" value="src/java"/>
<property name="lib.dir" value="lib/"/>
View
73 src/java/com/fourspaces/couchdb/CouchTask.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (c) 2011 Cummings Engineering Consultants, Inc. All Rights Reserved
+ */
+package com.fourspaces.couchdb;
+
+/**
+ * This class encapsulates the data for a task running on a couch server (as returned from the query to "_active_tasks")
+ *
+ * @author anthony.payne
+ *
+ */
+public class CouchTask {
+
+ /** Key of JSON field for the task type */
+ public static final String TASK_TYPE_KEY = "type";
+ /** Key of the JSON field for the task */
+ public static final String TASK_TASK_KEY = "task";
+ /** Key for the JSON field for the status of the task */
+ public static final String TASK_STATUS_KEY = "status";
+ /** Key for the JSON field for the pid field of the task */
+ public static final String TASK_PID_KEY = "pid";
+
+ /** Type of task */
+ protected String type;
+ /** Details of the task */
+ protected String task;
+ /** Status of the task */
+ protected String status;
+ /** PID of the task */
+ protected String pid;
+
+
+ /**
+ * @param type Type of task
+ * @param task Task details
+ * @param status Status of task
+ * @param pid PID of task
+ */
+ public CouchTask(final String type, final String task, final String status, final String pid) {
+ this.type = type;
+ this.task = task;
+ this.status = status;
+ this.pid = pid;
+ }
+
+ /**
+ * @return the type
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * @return the task
+ */
+ public String getTask() {
+ return task;
+ }
+
+ /**
+ * @return the status
+ */
+ public String getStatus() {
+ return status;
+ }
+
+ /**
+ * @return the pid
+ */
+ public String getPid() {
+ return pid;
+ }
+}
View
388 src/java/com/fourspaces/couchdb/ReplicationTask.java
@@ -0,0 +1,388 @@
+/**
+ * Copyright (c) 2011 Cummings Engineering Consultants, Inc. All Rights Reserved
+ */
+package com.fourspaces.couchdb;
+
+import java.net.InetAddress;
+import java.net.URL;
+
+import net.sf.json.JSONObject;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class encapsulates the data for a replication task running on a couch server
+ * (as returned from the query to "_active_tasks")
+ *
+ * Note: The "equals()" method is overidden for this class in order to easily compare two tasks. However the
+ * "hashCode()" method was not overidden, so this class should not be used inside hash tables.
+ *
+ * @author anthony.payne
+ *
+ */
+public class ReplicationTask extends CouchTask {
+
+ /**
+ * Logging instance for this class.
+ */
+ static Log log = LogFactory.getLog(Document.class);
+
+ /** Used to identify a running task as a replication task */
+ public static final String TASK_TYPE = "Replication";
+
+ /** Used to parse the string holding the task details */
+ private static final String DELIMITER = " ";
+
+ /** JSON key for the source field */
+ private static final String SOURCE_KEY = "source";
+ /** JSON key for the target field */
+ private static final String TARGET_KEY = "target";
+ /** JSON key for the create_target field */
+ private static final String CREATE_TARGET_KEY = "create_target";
+ /** JSON key for the continuous field */
+ private static final String CONTINUOUS_KEY = "continuous";
+ /** JSON key for the cancel field */
+ private static final String CANCEL_KEY = "cancel";
+
+ /** Source (DB) of the replication */
+ private ReplicationTarget source;
+
+ /** Target (DB) of the replication */
+ private ReplicationTarget destination;
+
+ /** Flag indicating if this is to be a continuous replication or not */
+ private boolean continuous;
+ /** Flag indicating if the target (DB) should be created if it does not exist */
+ private boolean createTarget;
+ /** Flag indicating if this task is to be canceled or not */
+ private boolean cancel;
+
+ /**
+ * Creates a replication task from the task details returned from a DB query about running tasks.
+ *
+ * @param task Task details
+ * @param status Status of task
+ * @param pid PID of task
+ */
+ public ReplicationTask(final String task, final String status, final String pid) {
+ super(TASK_TYPE, task, status, pid);
+
+ source = null;
+ destination = null;
+ continuous = false;
+ createTarget = false;
+ cancel = false;
+ }
+
+ /**
+ * Creates a replication task between the source and destination.
+ *
+ * @param source Source for the replication
+ * @param destination Target for the replication
+ */
+ public ReplicationTask(ReplicationTarget source, ReplicationTarget destination) {
+ super(TASK_TYPE, null, null, null);
+
+ this.source = source;
+ this.destination = destination;
+ }
+
+ /**
+ * Initializes the fields based on the data in the "task" string.
+ * @return
+ */
+ public boolean loadDetailsFromTask() {
+ if(task == null) {
+ return false;
+ }
+
+ String[] parts = task.split(DELIMITER);
+
+ if(parts.length < 4) {
+ log.error("Unable to parse replication task: " + task);
+ return false;
+ }
+
+ // [0] - Task ID
+ // [1] - source URL
+ // [2] - "->"
+ // [3] - destination URL
+
+ source = ReplicationTarget.fromUrl(parts[1]);
+ destination = ReplicationTarget.fromUrl(parts[3]);
+
+ if(source == null || destination == null) {
+ log.error("Unable to extract source and destination details from replication task: " + task);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @return the source
+ */
+ public ReplicationTarget getSource() {
+ return source;
+ }
+
+ /**
+ * @return the destination
+ */
+ public ReplicationTarget getDestination() {
+ return destination;
+ }
+
+ /**
+ * @return the continuous
+ */
+ public boolean isContinuous() {
+ return continuous;
+ }
+
+ /**
+ * @return The JSON object representing this replication task. Null is returned upon failure.
+ */
+ public JSONObject getCreateRequest() {
+ final JSONObject object = new JSONObject();
+ final String source = this.source.buildUrl();
+ final String destination = this.destination.buildUrl();
+ if(source == null || destination == null) {
+ log.error("Unable to build source or destination URL");
+ return null;
+ }
+ object.put(SOURCE_KEY, source);
+ object.put(TARGET_KEY, destination);
+
+ if(createTarget) {
+ object.put(CREATE_TARGET_KEY, Boolean.TRUE);
+ }
+
+ if(continuous) {
+ object.put(CONTINUOUS_KEY, Boolean.TRUE);
+ }
+
+ if(cancel) {
+ object.put(CANCEL_KEY, Boolean.TRUE);
+ }
+
+ return object;
+ }
+
+ /**
+ * sets the continuous flag.
+ */
+ public void setContinuous() {
+ continuous = true;
+ }
+
+ /**
+ * Sets the create target flag
+ */
+ public void setCreateTarget() {
+ createTarget = true;
+ }
+
+ /**
+ * Sets the cancel flag
+ */
+ public void setCancel() {
+ cancel = true;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(obj instanceof ReplicationTask) {
+ final ReplicationTask other = (ReplicationTask) obj;
+ if(source.equals(other.source) && destination.equals(other.destination)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Simple class for encapsulating the details about a source or destination of a replication task.
+ *
+ * Note: The "equals()" method is overidden for this class in order to easily compare two targets. However the
+ * "hashCode()" method was not overidden, so this class should not be used inside hash tables.
+ *
+ * @author anthony.payne
+ *
+ */
+ public static class ReplicationTarget {
+ private final static String FULL_URL_PREFIX = "http";
+ private final static String PATH_SEPARATOR = "/";
+ private final static int PORT_NOT_USED = -1;
+
+ /** The item in the database being replicated (could be a DB or a document) */
+ private String replicatedEntity;
+
+ /** The server */
+ private String server;
+
+ /** The Couch port */
+ private int port;
+
+ /** Indicates if this target is remote (true) or local (false) */
+ private boolean isRemote;
+
+ private ReplicationTarget() {
+
+ }
+
+ /**
+ * Constructor used for local targets
+ * @param replicatedEntry
+ */
+ public ReplicationTarget(final String replicatedEntry) {
+ this(replicatedEntry, null, PORT_NOT_USED);
+ }
+
+ /**
+ * Constructor used to specify the server and port along with the entity being replicated.
+ * @param replicatedEntry
+ * @param server
+ * @param port
+ */
+ public ReplicationTarget(final String replicatedEntry, final String server, final int port) {
+ this.replicatedEntity = replicatedEntry;
+ this.server = server;
+ this.port = port;
+ isRemote = (server != null);
+ }
+
+ /**
+ * Builds the URL for this replication source or destination
+ * @return String holding the URL
+ */
+ public String buildUrl() {
+ final StringBuffer buffer = new StringBuffer();
+
+ if(isRemote == false) {
+ buffer.append(replicatedEntity);
+ } else {
+ buffer.append(FULL_URL_PREFIX + "://" + server + ":" + port + "/" + replicatedEntity);
+ }
+
+ return buffer.toString();
+ }
+
+ /**
+ * Constructs a ReplicationTarget given a URL
+ * @param url URL from a replication task
+ * @return The replication target instance if successful; null if not
+ */
+ static private ReplicationTarget fromUrl(final String url) {
+ if(url.startsWith(FULL_URL_PREFIX) == false) {
+ final ReplicationTarget target = new ReplicationTarget();
+ target.isRemote = false;
+ target.port = PORT_NOT_USED;
+ target.replicatedEntity = url;
+ target.server = null;
+ return target;
+ }
+
+ try {
+ final URL asUrl = new URL(url);
+ final ReplicationTarget target = new ReplicationTarget();
+ target.server = asUrl.getHost();
+ target.port = asUrl.getPort();
+ target.replicatedEntity = asUrl.getPath();
+
+ if(target.replicatedEntity.startsWith(PATH_SEPARATOR)) {
+ target.replicatedEntity = target.replicatedEntity.substring(PATH_SEPARATOR.length());
+ }
+ if(target.replicatedEntity.endsWith(PATH_SEPARATOR)) {
+ target.replicatedEntity = target.replicatedEntity.substring(0, target.replicatedEntity.length() -
+ PATH_SEPARATOR.length());
+ }
+
+ target.isRemote = false;
+
+ if(target.server != null) {
+ final InetAddress tempAddress = InetAddress.getByName(target.server);
+ target.isRemote = !(tempAddress.isLoopbackAddress());
+ }
+
+ log.debug(target.toString());
+
+ return target;
+ } catch(final Exception e) {
+ log.debug("Failed to create target due to exception, " + e);
+ }
+ return null;
+ }
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("Host: ");
+ if(server != null) {
+ buffer.append(server + ", ");
+ } else {
+ buffer.append("Not set, ");
+ }
+ buffer.append("Port = " + port + ", ");
+ buffer.append("Path = " + replicatedEntity + ", isRemote = " + isRemote);
+ return buffer.toString();
+ }
+
+
+ /**
+ * @return the replicatedEntity
+ */
+ public String getReplicatedEntity() {
+ return replicatedEntity;
+ }
+
+
+ /**
+ * @return the sourceDatabaseServer
+ */
+ public String getServer() {
+ return server;
+ }
+
+
+ /**
+ * @return the sourceDatabasePort
+ */
+ public int getPort() {
+ return port;
+ }
+
+
+ /**
+ * @return the isRemote
+ */
+ public boolean isRemote() {
+ return isRemote;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(obj instanceof ReplicationTarget) {
+ final ReplicationTarget other = (ReplicationTarget) obj;
+
+ if(((replicatedEntity == null && other.replicatedEntity == null) ||
+ replicatedEntity.equals(other.replicatedEntity)) &&
+ ((server == null && other.server == null) || server.equals(other.server)) &&
+ port == other.port) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+}
View
62 src/java/com/fourspaces/couchdb/Session.java
@@ -23,6 +23,7 @@
import java.util.List;
import net.sf.json.JSONArray;
+import net.sf.json.JSONObject;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
@@ -469,4 +470,65 @@ protected String encodeParameter(String paramValue) {
throw new RuntimeException(e);
}
}
+
+ /**
+ * This method will retrieve a list of replication tasks that are currently running under the couch server this
+ * session is attached to.
+ *
+ * @return List of replication tasks running on the server.
+ */
+ public List<ReplicationTask> getReplicationTasks() {
+ final List<ReplicationTask> replicationTasks = new ArrayList<ReplicationTask>();
+ CouchResponse resp = get("_active_tasks");
+ JSONArray ar = resp.getBodyAsJSONArray();
+
+ for(int i = 0; i < ar.size(); i++) {
+ final JSONObject task = ar.getJSONObject(i);
+
+ if(ReplicationTask.TASK_TYPE.equals(task.getString(CouchTask.TASK_TYPE_KEY))) {
+ final ReplicationTask replicationTask = new ReplicationTask(task.getString(CouchTask.TASK_TASK_KEY),
+ task.getString(CouchTask.TASK_STATUS_KEY), task.getString(CouchTask.TASK_PID_KEY));
+
+ if(replicationTask.loadDetailsFromTask() == true) {
+ replicationTasks.add(replicationTask);
+ } else {
+ log.error("Unable to load replication task details from server response.");
+ }
+ } else {
+ log.trace("Ignoring non-replication task.");
+ }
+ }
+
+ log.trace("Found " + replicationTasks.size() + " replication tasks");
+
+ return replicationTasks;
+ }
+
+ /**
+ * This method will attempt to start the replication task on the couch server instance this session is attached to.
+ *
+ * @param task Task to start on the server
+ * @return True if the task was accepted by the couch server instance; False otherwise
+ */
+ public boolean postReplicationTask(final ReplicationTask task) {
+ final String postUrl = buildUrl("_replicate");
+
+ try {
+
+ log.trace("Post URL: " + postUrl);
+
+ final JSONObject replicateReq = task.getCreateRequest();
+
+ log.trace(replicateReq.toString());
+
+ CouchResponse resp = post("_replicate", replicateReq.toString());
+
+ return (resp.getErrorId() == null);
+ } catch(Exception e) {
+ System.out.println("Exception while attempting post" + e);
+ e.printStackTrace();
+
+ }
+ return false;
+ }
}
View
77 src/test/com/fourspaces/couchdb/test/ReplicationTaskTest.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (c) 2011 Cummings Engineering Consultants, Inc. All Rights Reserved
+ */
+package com.fourspaces.couchdb.test;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.fourspaces.couchdb.ReplicationTask;
+
+/**
+ * @author anthony.payne
+ *
+ */
+public class ReplicationTaskTest {
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ /**
+ * Test method for {@link com.fourspaces.couchdb.ReplicationTask#loadDetailsFromTask()}.
+ */
+ @Test
+ public void testLoadDetailsFromTask() {
+ final String task = "e9db21: shard_1_entity_db -> http://192.168.4.20:5984/shard_1_entity_db/";
+ final String status = "MR Processed source update #594";
+ final String pid = "<0.201.0>";
+
+ ReplicationTask repTask = new ReplicationTask(task, status, pid);
+
+ assertTrue(repTask.loadDetailsFromTask());
+
+ assertEquals(status, repTask.getStatus());
+ assertEquals(pid, repTask.getPid());
+ assertNotNull(repTask.getSource());
+ assertFalse(repTask.getSource().isRemote());
+ assertEquals("shard_1_entity_db", repTask.getSource().getReplicatedEntity());
+
+ assertNotNull(repTask.getDestination());
+ assertTrue(repTask.getDestination().isRemote());
+ assertEquals("192.168.4.20", repTask.getDestination().getServer());
+ assertEquals("5984", Integer.toString(repTask.getDestination().getPort()));
+ assertEquals("shard_1_entity_db", repTask.getDestination().getReplicatedEntity());
+
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.