Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
*/
public abstract class AbstractFunctionalTest extends BasicJavaClientREST {

protected final static String OPTIC_USER = "opticUser";
protected final static String OPTIC_USER_PASSWORD = "0pt1c";
protected final static String DB_NAME = "java-functest";

protected static DatabaseClient client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.marklogic.client.datamovement.functionaltests;
package com.marklogic.client.fastfunctest;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -23,6 +23,7 @@
import com.marklogic.client.admin.TransformExtensionsManager;
import com.marklogic.client.datamovement.*;
import com.marklogic.client.datamovement.ApplyTransformListener.ApplyResult;
import com.marklogic.client.datamovement.functionaltests.WriteHostBatcherTest;
import com.marklogic.client.document.DocumentPage;
import com.marklogic.client.document.DocumentRecord;
import com.marklogic.client.document.ServerTransform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,54 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.marklogic.client.datamovement.functionaltests;
package com.marklogic.client.fastfunctest;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.*;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.DeleteListener;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.UrisToWriterListener;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.functionaltests.WriteHostBatcherTest;
import com.marklogic.client.document.DocumentPage;
import com.marklogic.client.document.DocumentRecord;
import com.marklogic.client.functionaltest.BasicJavaClientREST;
import com.marklogic.client.io.*;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.FileHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.query.StructuredQueryBuilder;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;

import java.io.*;
import java.util.*;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class DeleteListenerTest extends BasicJavaClientREST {
public class DeleteListenerTest extends AbstractFunctionalTest {

private static String dbName = "DeleteListener";
private static DataMovementManager dmManager = null;
private static final String TEST_DIR_PREFIX = "/WriteHostBatcher-testdata/";

private static DatabaseClient dbClient;
private static String user = "admin";
private static int port = 8000;
private static String password = "admin";
private static String server = "App-Services";
private static JacksonHandle jacksonHandle;
private static StringHandle stringHandle;
private static FileHandle fileHandle;
Expand All @@ -54,37 +71,10 @@ public class DeleteListenerTest extends BasicJavaClientREST {
private static File fileJson;
private static JsonNode jsonNode;
private static final String query1 = "fn:count(fn:doc())";
private static String[] hostNames;
private static int forestCount = 1;

@BeforeAll
public static void setUpBeforeClass() throws Exception {
loadGradleProperties();
server = getRestAppServerName();
port = getRestAppServerPort();

hostNames = getHosts();
createDB(dbName);
Thread.currentThread().sleep(500L);
//Ensure DB has at-least one forest
createForestonHost(dbName + "-" + forestCount, dbName, hostNames[0]);
forestCount++;
for (String forestHost : hostNames) {
for(int i = 0; i < new Random().nextInt(3); i++) {
createForestonHost(dbName + "-" + forestCount, dbName, forestHost);
forestCount++;
}
Thread.currentThread().sleep(500L);
}
// Create App Server if needed.
createRESTServerWithDB(server, port);

assocRESTServer(server, dbName, port);
if (IsSecurityEnabled()) {
enableSecurityOnRESTServer(server, dbName);
}

dbClient = getDatabaseClient(user, password, getConnType());
dbClient = newDatabaseClientBuilder().build();
dmManager = dbClient.newDataMovementManager();

// JacksonHandle
Expand All @@ -105,21 +95,10 @@ public static void setUpBeforeClass() throws Exception {
fileHandle.setFormat(Format.JSON);
}

@AfterAll
public static void tearDownAfterClass() throws Exception {
associateRESTServerWithDB(server, "Documents");
for (int i = 0; i < forestCount -1; i++) {
System.out.println(dbName + "-" + (i + 1));
detachForest(dbName, dbName + "-" + (i + 1));
deleteForest(dbName + "-" + (i + 1));
}

deleteDB(dbName);
}

@BeforeEach
public void setUp() throws Exception {
Thread.currentThread().sleep(1000L);
deleteDocuments(client);

WriteBatcher ihb2 = dmManager.newWriteBatcher();
ihb2.withBatchSize(27).withThreadCount(10);
dmManager.startJob(ihb2);
Expand All @@ -132,11 +111,6 @@ public void setUp() throws Exception {
assertEquals(2000, dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
}

@AfterEach
public void tearDown() throws Exception {
clearDB(port);
}

@Test
public void massDeleteSingleThread() throws Exception {
HashSet<String> urisList = new HashSet<>();
Expand All @@ -162,7 +136,6 @@ public void massDeleteSingleThread() throws Exception {
queryBatcher.awaitCompletion();
dmManager.stopJob(ticket);

Thread.currentThread().sleep(2000L);
assertEquals(2000, urisList.size());

AtomicInteger successDocs = new AtomicInteger();
Expand Down Expand Up @@ -241,32 +214,31 @@ public void massDeleteMultipleThreads() throws Exception {
}

@Test
public void massDeleteConsistentSnapShot() throws Exception {
Map<String, String> props = new HashMap<String, String>();
props.put("merge-timestamp", "-6000000000");
changeProperty(props, "/manage/v2/databases/" + dbName + "/properties");
Thread.currentThread().sleep(5000L);

QueryBatcher queryBatcher = dmManager.newQueryBatcher(
new StructuredQueryBuilder().collection("DeleteListener"))
.withBatchSize(7)
.withConsistentSnapshot()
.withThreadCount(5)
.onUrisReady(new DeleteListener())
.onQueryFailure(throwable -> {
System.out.println("Exceptions thrown from callback onQueryFailure");
throwable.printStackTrace();

});
public void massDeleteConsistentSnapShot() {
setMergeTimestamp(DB_NAME, "-600000000");

try {
QueryBatcher queryBatcher = dmManager.newQueryBatcher(
new StructuredQueryBuilder().collection("DeleteListener"))
.withBatchSize(7)
.withConsistentSnapshot()
.withThreadCount(5)
.onUrisReady(new DeleteListener())
.onQueryFailure(throwable -> {
System.out.println("Exceptions thrown from callback onQueryFailure");
throwable.printStackTrace();

});

JobTicket ticket = dmManager.startJob(queryBatcher);
queryBatcher.awaitCompletion();
dmManager.stopJob(ticket);

assertEquals(0, dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
} finally {
setMergeTimestamp(DB_NAME, "0");
}

JobTicket ticket = dmManager.startJob(queryBatcher);
queryBatcher.awaitCompletion();
dmManager.stopJob(ticket);

props.put("merge-timestamp", "0");
changeProperty(props, "/manage/v2/databases/" + dbName + "/properties");
// if ( failures2.length() > 0 ) fail(failures2.toString());
assertEquals(0, dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
}

@Test
Expand Down Expand Up @@ -303,84 +275,85 @@ public void deleteNonExistentDoc() throws Exception {
// ISSUE 94
@Test
public void deleteServerFile() throws Exception {

Map<String, String> props = new HashMap<String, String>();
props.put("merge-timestamp", "-6000000000");
changeProperty(props, "/manage/v2/databases/" + dbName + "/properties");
Thread.currentThread().sleep(5000L);

class MyRunnable implements Runnable {
@Override
public void run() {
for (int j = 1999; j >= 200; j--) {
dbClient.newDocumentManager().delete("/local/json-" + j);
}
}
}
Thread t1;
t1 = new Thread(new MyRunnable());

Set<String> urisList = Collections.synchronizedSet(new HashSet<>());

QueryBatcher queryBatcher = dmManager.newQueryBatcher(
new StructuredQueryBuilder().collection("DeleteListener"))
.withBatchSize(11)
.withThreadCount(4)
.withConsistentSnapshot()
.onUrisReady(batch -> {
for (String s : batch.getItems()) {
urisList.add(s);
}
})
.onQueryFailure(throwable -> {
System.out.println("Exceptions thrown from callback onQueryFailure");
throwable.printStackTrace();

});

t1.start();
JobTicket ticket = dmManager.startJob(queryBatcher);

queryBatcher.awaitCompletion();
t1.join();
dmManager.stopJob(ticket);

System.out.println("URI's size " + urisList.size());
AtomicInteger successDocs = new AtomicInteger();
Set<String> uris2 = Collections.synchronizedSet(new HashSet<>());
StringBuffer failures2 = new StringBuffer();

QueryBatcher deleteBatcher = dmManager.newQueryBatcher(urisList.iterator())
.withBatchSize(13)
.withThreadCount(5)
.onUrisReady(new DeleteListener())
.onUrisReady(batch -> successDocs.addAndGet(batch.getItems().length))
.onUrisReady(batch -> uris2.addAll(Arrays.asList(batch.getItems())))
.onQueryFailure(throwable -> {
throwable.printStackTrace();
failures2.append("ERROR:[" + throwable + "]\n");
});

JobTicket delTicket = dmManager.startJob(deleteBatcher);
deleteBatcher.awaitCompletion();
dmManager.stopJob(delTicket);

if (failures2.length() > 0)
fail(failures2.toString());

assertEquals(0, dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());

DocumentPage page = dbClient.newDocumentManager().read("/local/json-1998");
JacksonHandle dh = new JacksonHandle();
while (page.hasNext()) {
DocumentRecord rec = page.next();
rec.getContent(dh);
System.out.println("Results are: " + dh.get().get("k1").asText());

}

props.put("merge-timestamp", "0");
changeProperty(props, "/manage/v2/databases/" + dbName + "/properties");
// if (true) return;

setMergeTimestamp(DB_NAME, "-6000000000");

try {
// No idea what this is supposed to be doing, as the docs it's deleting don't ever exist.
// j initially was 1999 but that was causing the test to take about 20s, so lowered it.
class MyRunnable implements Runnable {
@Override
public void run() {
for (int j = 300; j >= 200; j--) {
dbClient.newDocumentManager().delete("/local/json-" + j);
}
}
}
Thread t1;
t1 = new Thread(new MyRunnable());

Set<String> urisList = Collections.synchronizedSet(new HashSet<>());

QueryBatcher queryBatcher = dmManager.newQueryBatcher(
new StructuredQueryBuilder().collection("DeleteListener"))
.withBatchSize(11)
.withThreadCount(4)
.withConsistentSnapshot()
.onUrisReady(batch -> {
for (String s : batch.getItems()) {
urisList.add(s);
}
})
.onQueryFailure(throwable -> {
System.out.println("Exceptions thrown from callback onQueryFailure");
throwable.printStackTrace();

});

t1.start();
JobTicket ticket = dmManager.startJob(queryBatcher);

queryBatcher.awaitCompletion();
t1.join();
dmManager.stopJob(ticket);

System.out.println("URI's size " + urisList.size());
AtomicInteger successDocs = new AtomicInteger();
Set<String> uris2 = Collections.synchronizedSet(new HashSet<>());
StringBuffer failures2 = new StringBuffer();

QueryBatcher deleteBatcher = dmManager.newQueryBatcher(urisList.iterator())
.withBatchSize(13)
.withThreadCount(5)
.onUrisReady(new DeleteListener())
.onUrisReady(batch -> successDocs.addAndGet(batch.getItems().length))
.onUrisReady(batch -> uris2.addAll(Arrays.asList(batch.getItems())))
.onQueryFailure(throwable -> {
throwable.printStackTrace();
failures2.append("ERROR:[" + throwable + "]\n");
});

JobTicket delTicket = dmManager.startJob(deleteBatcher);
deleteBatcher.awaitCompletion();
dmManager.stopJob(delTicket);

if (failures2.length() > 0)
fail(failures2.toString());

assertEquals(0, dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());

DocumentPage page = dbClient.newDocumentManager().read("/local/json-1998");
JacksonHandle dh = new JacksonHandle();
while (page.hasNext()) {
DocumentRecord rec = page.next();
rec.getContent(dh);
System.out.println("Results are: " + dh.get().get("k1").asText());

}
} finally {
setMergeTimestamp(DB_NAME, "0");
}
}

@Test
Expand Down
Loading