Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bdb frontier access #289

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<dependency>
<groupId>com.sleepycat</groupId>
<artifactId>je</artifactId>
<version>4.1.6</version>
<version>7.5.11</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
Expand Down
53 changes: 53 additions & 0 deletions engine/src/main/java/org/archive/crawler/framework/CrawlJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,25 @@
package org.archive.crawler.framework;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Handler;
Expand All @@ -51,13 +58,15 @@
import org.apache.commons.lang.StringUtils;
import org.archive.crawler.event.CrawlStateEvent;
import org.archive.crawler.framework.CrawlController.StopCompleteEvent;
import org.archive.crawler.frontier.WorkQueue;
import org.archive.crawler.reporting.AlertThreadGroup;
import org.archive.crawler.reporting.CrawlStatSnapshot;
import org.archive.crawler.reporting.StatisticsTracker;
import org.archive.spring.ConfigPath;
import org.archive.spring.ConfigPathConfigurer;
import org.archive.spring.PathSharingContext;
import org.archive.util.ArchiveUtils;
import org.archive.util.ObjectIdentityCache;
import org.archive.util.TextUtils;
import org.joda.time.DateTime;
import org.springframework.beans.BeanWrapperImpl;
Expand Down Expand Up @@ -970,4 +979,48 @@ public String getJobStatusDescription() {
return "Finished: "+getCrawlController().getCrawlExitStatus();
}
}

protected Semaphore exportLock = new Semaphore(1);

public long exportPendingUris() {
CrawlController cc = getCrawlController();
if (cc==null) {
return -1L;
}
if (!cc.isPaused()) {
cc.requestCrawlPause();
return -2L;
}
Frontier f = cc.getFrontier();
if (f == null) {
return -3L;
}
long pendingUrisCount = 0L;
boolean bLocked = exportLock.tryAcquire();
if (bLocked) {
try {
File outFile = new File(getJobDir(), "pendingUris.txt");
if (outFile.exists()) {
outFile.delete();
}
FileOutputStream out = new FileOutputStream(outFile);
OutputStreamWriter outStreamWriter = new OutputStreamWriter(out, StandardCharsets.UTF_8);
PrintWriter writer = new PrintWriter(new BufferedWriter(outStreamWriter, 65536));
pendingUrisCount = f.exportPendingUris(writer);
writer.close();
outStreamWriter.close();
out.close();
}
catch (IOException e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
}
finally {
exportLock.release();
}
}
else {
return -4L;
}
return pendingUrisCount;
}
}//EOC
14 changes: 14 additions & 0 deletions engine/src/main/java/org/archive/crawler/framework/Frontier.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import javax.management.openmbean.CompositeData;

import org.archive.crawler.frontier.FrontierJournal;
import org.archive.crawler.reporting.StatisticsTracker;
import org.archive.crawler.frontier.WorkQueue;
import org.archive.modules.CrawlURI;
import org.archive.modules.deciderules.DecideRule;
import org.archive.modules.fetcher.FetchStats;
import org.archive.util.IdentityCacheable;
import org.archive.util.ObjectIdentityCache;
import org.archive.util.Reporter;
import org.json.JSONException;
import org.springframework.context.Lifecycle;
Expand Down Expand Up @@ -522,4 +527,13 @@ public enum State {
* conditions need to be free to call this 'just in case'.
*/
public void endDisposition();

public long exportPendingUris(PrintWriter writer);

public ObjectIdentityCache<WorkQueue> getAllQueues();

public BlockingQueue<String> getReadyClassQueues();

public Set<WorkQueue> getInProcessQueues();

}
27 changes: 27 additions & 0 deletions engine/src/main/java/org/archive/crawler/frontier/BdbFrontier.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.io.PrintWriter;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -42,6 +44,7 @@
import org.archive.checkpointing.Checkpointable;
import org.archive.modules.CrawlURI;
import org.archive.util.ArchiveUtils;
import org.archive.util.ObjectIdentityCache;
import org.archive.util.Supplier;
import org.json.JSONArray;
import org.json.JSONException;
Expand Down Expand Up @@ -468,4 +471,28 @@ protected void consistencyMarkup(
queueSummaries.put(key, val);
}
}

@Override
public long exportPendingUris(PrintWriter writer) {
if (pendingUris == null) {
return -5L;
}
return pendingUris.exportPendingUris(writer);
}

@Override
public ObjectIdentityCache<WorkQueue> getAllQueues() {
return allQueues;
}

@Override
public BlockingQueue<String> getReadyClassQueues() {
return readyClassQueues;
}

@Override
public Set<WorkQueue> getInProcessQueues() {
return inProcessQueues;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.archive.crawler.frontier;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.util.ArrayList;
Expand Down Expand Up @@ -558,4 +559,30 @@ protected void forAllPendingDo(Closure c) throws DatabaseException {
}
cursor.close();
}

/**
* Run through all uris in the pending uris database and write them to the writer.
* @param writer destination writer for writting all the uris
* @return number of uris written to the writer
*/
public long exportPendingUris(PrintWriter writer) {
if (this.pendingUrisDB == null) {
return -6L;
}
sync();
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
long uris = 0L;
Cursor cursor = pendingUrisDB.openCursor(null, null);
while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) {
if (value.getData().length == 0) {
continue;
}
CrawlURI item = (CrawlURI) crawlUriBinding.entryToObject(value);
writer.println(item.toString());
++uris;
}
cursor.close();
return uris;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import javax.management.openmbean.CompositeData;

Expand All @@ -32,6 +34,7 @@
import org.archive.crawler.framework.Frontier;
import org.archive.crawler.framework.Frontier.FrontierGroup;
import org.archive.crawler.frontier.FrontierJournal;
import org.archive.crawler.frontier.WorkQueue;
import org.archive.modules.CoreAttributeConstants;
import org.archive.modules.CrawlURI;
import org.archive.modules.ProcessResult;
Expand Down Expand Up @@ -289,6 +292,22 @@ public void beginDisposition(CrawlURI curi) {
@Override
public void endDisposition() {
}
@Override
public long exportPendingUris(PrintWriter writer) {
return 0;
}
@Override
public ObjectIdentityCache<WorkQueue> getAllQueues() {
return null;
}
@Override
public BlockingQueue<String> getReadyClassQueues() {
return null;
}
@Override
public Set<WorkQueue> getInProcessQueues() {
return null;
}
}

// separate methods to make it easier to know what failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -301,12 +302,14 @@ public void testConcurrentLoadNoDomainCookieLimitBreach() throws IOException, In
bdbCookieStore().clear();
basicCookieStore().clear();
final Random rand = new Random();

final AtomicBoolean keepRunning = new AtomicBoolean(true);

Runnable runnable = new Runnable() {
@Override
public void run() {
try {
while (!Thread.interrupted()) {
while (keepRunning.get()) {
BasicClientCookie cookie = new BasicClientCookie(UUID.randomUUID().toString(), UUID.randomUUID().toString());
cookie.setDomain("d" + rand.nextInt() + ".example.com");
bdbCookieStore().addCookie(cookie);
Expand All @@ -326,10 +329,9 @@ public void run() {
}

Thread.sleep(5000);

for (int i = 0; i < threads.length; i++) {
threads[i].interrupt();
}

// Shutdown the threads:
keepRunning.set(false);
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
Expand Down
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ http://maven.apache.org/guides/mini/guide-m1-m2.html
<id>builds.archive.org,maven2</id>
<url>http://builds.archive.org/maven2</url>
</repository>
<repository>
<id>oracleReleases</id>
<name>Oracle Released Java Packages</name>
<url>http://download.oracle.com/maven</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots />
</repository>
</repositories>

<prerequisites>
Expand Down