Commit

Merge pull request #138 from nlevitt/novel-quotas
crawl level and host level limits on *novel* (not deduplicated) bytes and urls
adam-miller committed Jan 14, 2016
2 parents b828969 + 9e4c7f0 commit bf2a887
Showing 17 changed files with 1,313 additions and 164 deletions.
@@ -0,0 +1,135 @@
/*
* This file is part of the Heritrix web crawler (crawler.archive.org).
*
* Licensed to the Internet Archive (IA) by one or more individual
* contributors.
*
* The IA licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.archive.crawler.prefetch;

import java.util.HashMap;
import java.util.Map;

import org.archive.modules.CrawlURI;
import org.archive.modules.FetchChain;
import org.archive.modules.ProcessResult;
import org.archive.modules.Processor;
import org.archive.modules.fetcher.FetchStats;
import org.archive.modules.fetcher.FetchStatusCodes;
import org.archive.modules.net.CrawlHost;
import org.archive.modules.net.ServerCache;
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.net.InternetDomainName;

/**
* Enforces quotas on a host. Should be configured early in {@link FetchChain} in
* crawler-beans.cxml. Supports quotas on any of the fields tracked in
* {@link FetchStats}.
*
* @see QuotaEnforcer
* @see SourceQuotaEnforcer
* @contributor nlevitt
*/
public class HostQuotaEnforcer extends Processor {

    protected ServerCache serverCache;
    public ServerCache getServerCache() {
        return this.serverCache;
    }
    @Autowired
    public void setServerCache(ServerCache serverCache) {
        this.serverCache = serverCache;
    }

    protected String host;
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }

    protected boolean applyToSubdomains = false;
    public boolean getApplyToSubdomains() {
        return applyToSubdomains;
    }
    /**
     * Whether to apply the quotas to each subdomain of {@link #host}
     * (separately, not cumulatively).
     */
    public void setApplyToSubdomains(boolean applyToSubdomains) {
        this.applyToSubdomains = applyToSubdomains;
    }

    protected Map<String,Long> quotas = new HashMap<String, Long>();
    public Map<String, Long> getQuotas() {
        return quotas;
    }
    /**
     * Keys can be any of the {@link FetchStats} keys.
     */
    public void setQuotas(Map<String, Long> quotas) {
        this.quotas = quotas;
    }

    @Override
    protected boolean shouldProcess(CrawlURI curi) {
        String uriHostname = serverCache.getHostFor(curi.getUURI()).getHostName();
        if (getApplyToSubdomains() && InternetDomainName.isValid(host) && InternetDomainName.isValid(uriHostname)) {
            InternetDomainName h = InternetDomainName.from(host);
            InternetDomainName uriHostOrAncestor = InternetDomainName.from(uriHostname);
            while (true) {
                if (uriHostOrAncestor.equals(h)) {
                    return true;
                }
                if (uriHostOrAncestor.hasParent()) {
                    uriHostOrAncestor = uriHostOrAncestor.parent();
                } else {
                    break;
                }
            }

            return false;
        } else {
            return serverCache.getHostFor(curi.getUURI()) == serverCache.getHostFor(host);
        }

    }

    @Override
    protected void innerProcess(CrawlURI curi) throws InterruptedException {
        throw new AssertionError();
    }

    @Override
    protected ProcessResult innerProcessResult(CrawlURI curi) throws InterruptedException {
        if (!shouldProcess(curi)) {
            return ProcessResult.PROCEED;
        }

        final CrawlHost host = serverCache.getHostFor(curi.getUURI());

        for (String k: quotas.keySet()) {
            if (host.getSubstats().get(k) >= quotas.get(k)) {
                curi.getAnnotations().add("hostQuota:" + k);
                curi.setFetchStatus(FetchStatusCodes.S_BLOCKED_BY_QUOTA);
                return ProcessResult.FINISH;
            }
        }

        return ProcessResult.PROCEED;
    }

}
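For context, a minimal usage sketch (not part of the commit): in a real crawl this processor is wired into the fetch chain via crawler-beans.cxml and its ServerCache is injected by Spring, but the quota map is shaped as below. The key "novelBytes" is assumed here to be one of the FetchStats substats keys; substitute whichever tracked field you want to cap.

import java.util.HashMap;
import java.util.Map;

import org.archive.crawler.prefetch.HostQuotaEnforcer;

public class HostQuotaSketch {
    static HostQuotaEnforcer exampleEnforcer() {
        HostQuotaEnforcer enforcer = new HostQuotaEnforcer();
        // serverCache is normally @Autowired by Spring; omitted in this sketch.
        enforcer.setHost("example.org");
        enforcer.setApplyToSubdomains(true); // also covers foo.example.org, bar.example.org, ...

        Map<String, Long> quotas = new HashMap<String, Long>();
        // "novelBytes" is an assumed FetchStats key: block further fetches from this
        // host once ~1 GiB of novel (not deduplicated) content has been downloaded.
        quotas.put("novelBytes", 1024L * 1024L * 1024L);
        enforcer.setQuotas(quotas);
        return enforcer;
    }
}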
@@ -0,0 +1,92 @@
/*
* This file is part of the Heritrix web crawler (crawler.archive.org).
*
* Licensed to the Internet Archive (IA) by one or more individual
* contributors.
*
* The IA licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.archive.modules.postprocessor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.CrawlStatus;
import org.archive.modules.CrawlURI;
import org.archive.modules.Processor;
import org.archive.modules.writer.WARCWriterProcessor;
import org.springframework.beans.factory.annotation.Autowired;

public class WARCLimitEnforcer extends Processor {

    private final static Logger log =
            Logger.getLogger(WARCLimitEnforcer.class.getName());

    protected Map<String, Map<String, Long>> limits = new HashMap<String, Map<String, Long>>();
    /**
     * Should match structure of {@link WARCWriterProcessor#getStats()}
     * @param limits
     */
    public void setLimits(Map<String, Map<String, Long>> limits) {
        this.limits = limits;
    }
    public Map<String, Map<String, Long>> getLimits() {
        return limits;
    }

    protected WARCWriterProcessor warcWriter;
    @Autowired
    public void setWarcWriter(WARCWriterProcessor warcWriter) {
        this.warcWriter = warcWriter;
    }
    public WARCWriterProcessor getWarcWriter() {
        return warcWriter;
    }

    protected CrawlController controller;
    public CrawlController getCrawlController() {
        return this.controller;
    }
    @Autowired
    public void setCrawlController(CrawlController controller) {
        this.controller = controller;
    }

    @Override
    protected boolean shouldProcess(CrawlURI uri) {
        return true;
    }

    @Override
    protected void innerProcess(CrawlURI uri) throws InterruptedException {
        for (String j: limits.keySet()) {
            for (String k: limits.get(j).keySet()) {
                Long limit = limits.get(j).get(k);

                Map<String, AtomicLong> valueBucket = warcWriter.getStats().get(j);
                if (valueBucket != null) {
                    AtomicLong value = valueBucket.get(k);
                    if (value != null
                            && value.get() >= limit) {
                        log.info("stopping crawl because warcwriter stats['" + j + "']['" + k + "']=" + value.get() + " exceeds limit " + limit);
                        controller.requestCrawlStop(CrawlStatus.FINISHED_WRITE_LIMIT);
                    }
                }
            }
        }
    }

}
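A quick sketch (assumption-laden, not from the commit) of how the nested limits map lines up with WARCWriterProcessor.getStats(): the outer key selects a stats group and the inner key a counter within that group. The names "totals" and "totalBytes" below are illustrative guesses; use whatever keys your WARCWriterProcessor actually reports.

import java.util.HashMap;
import java.util.Map;

import org.archive.modules.postprocessor.WARCLimitEnforcer;

public class WarcLimitSketch {
    static WARCLimitEnforcer exampleEnforcer() {
        // Inner map: counter name -> limit; outer map: stats group -> inner map.
        // The key names ("totals", "totalBytes") are assumptions for illustration.
        Map<String, Long> totalsGroup = new HashMap<String, Long>();
        totalsGroup.put("totalBytes", 1000000000000L); // request a crawl stop after ~1 TB of WARC output

        Map<String, Map<String, Long>> limits = new HashMap<String, Map<String, Long>>();
        limits.put("totals", totalsGroup);

        WARCLimitEnforcer enforcer = new WARCLimitEnforcer();
        enforcer.setLimits(limits);
        // warcWriter and crawlController are normally @Autowired by Spring; omitted here.
        return enforcer;
    }
}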
@@ -228,9 +228,6 @@ protected Map<String, Object> parseHbaseResult(CrawlURI curi, Result hbaseResult
    public void store(CrawlURI curi) {
        if (!curi.hasContentDigestHistory()
                || curi.getContentDigestHistory().isEmpty()) {
            logger.warning("not saving empty content digest history (do you "
                    + " have a ContentDigestHistoryLoader in your disposition"
                    + " chain?) - " + curi);
            return;
        }
        if (logger.isLoggable(Level.FINER)) {
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.archive.crawler.framework;

import org.archive.crawler.event.StatSnapshotEvent;
@@ -47,6 +47,59 @@ public void setMaxBytesDownload(long maxBytesDownload) {
        this.maxBytesDownload = maxBytesDownload;
    }

    protected long maxNovelBytes = 0L;
    public long getMaxNovelBytes() {
        return maxNovelBytes;
    }
    /**
     * Maximum number of novel (not deduplicated) bytes to download. Once this
     * number is exceeded the crawler will stop. A value of zero means no upper
     * limit.
     */
    public void setMaxNovelBytes(long maxNovelBytes) {
        this.maxNovelBytes = maxNovelBytes;
    }

    protected long maxNovelUrls = 0L;
    public long getMaxNovelUrls() {
        return maxNovelUrls;
    }
    /**
     * Maximum number of novel (not deduplicated) urls to download. Once this
     * number is exceeded the crawler will stop. A value of zero means no upper
     * limit.
     */
    public void setMaxNovelUrls(long maxNovelUrls) {
        this.maxNovelUrls = maxNovelUrls;
    }

    protected long maxWarcNovelUrls = 0L;
    public long getMaxWarcNovelUrls() {
        return maxWarcNovelUrls;
    }
    /**
     * Maximum number of novel (not deduplicated) urls to write to WARC
     * response or resource records. Once this number is exceeded the crawler
     * will stop. A value of zero means no upper limit.
     */
    public void setMaxWarcNovelUrls(long maxWarcNovelUrls) {
        this.maxWarcNovelUrls = maxWarcNovelUrls;
    }

    protected long maxWarcNovelBytes = 0L;
    public long getMaxWarcNovelBytes() {
        return maxWarcNovelBytes;
    }
    /**
     * Maximum number of novel (not deduplicated) bytes to write to WARC
     * response or resource records. Once this number is exceeded the crawler
     * will stop. A value of zero means no upper limit.
     */
    public void setMaxWarcNovelBytes(long maxWarcNovelBytes) {
        this.maxWarcNovelBytes = maxWarcNovelBytes;
    }

    /**
     * Maximum number of documents to download. Once this number is exceeded the
     * crawler will stop. A value of zero means no upper limit.
@@ -79,25 +132,34 @@ public CrawlController getCrawlController() {
    public void setCrawlController(CrawlController controller) {
        this.controller = controller;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if(event instanceof StatSnapshotEvent) {
            CrawlStatSnapshot snapshot = ((StatSnapshotEvent)event).getSnapshot();
            checkForLimitsExceeded(snapshot);
        }
    }

    protected void checkForLimitsExceeded(CrawlStatSnapshot snapshot) {
        if (maxBytesDownload > 0 && snapshot.bytesProcessed >= maxBytesDownload) {
            controller.requestCrawlStop(CrawlStatus.FINISHED_DATA_LIMIT);
        } else if (maxNovelBytes > 0 && snapshot.novelBytes >= maxNovelBytes) {
            controller.requestCrawlStop(CrawlStatus.FINISHED_DATA_LIMIT);
        } else if (maxWarcNovelBytes > 0 && snapshot.warcNovelBytes >= maxWarcNovelBytes) {
            controller.requestCrawlStop(CrawlStatus.FINISHED_DATA_LIMIT);
        } else if (maxDocumentsDownload > 0
                && snapshot.downloadedUriCount >= maxDocumentsDownload) {
            controller.requestCrawlStop(CrawlStatus.FINISHED_DOCUMENT_LIMIT);
        } else if (maxNovelUrls > 0
                && snapshot.novelUriCount >= maxNovelUrls) {
            controller.requestCrawlStop(CrawlStatus.FINISHED_DOCUMENT_LIMIT);
        } else if (maxWarcNovelUrls > 0
                && snapshot.warcNovelUriCount >= maxWarcNovelUrls) {
            controller.requestCrawlStop(CrawlStatus.FINISHED_DOCUMENT_LIMIT);
        } else if (maxTimeSeconds > 0
                && snapshot.elapsedMilliseconds >= maxTimeSeconds * 1000) {
            controller.requestCrawlStop(CrawlStatus.FINISHED_TIME_LIMIT);
        }
    }

}
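As a hedged illustration of the new crawl-wide knobs (assuming the enclosing class is Heritrix's CrawlLimitEnforcer; its declaration sits outside the hunks shown here), the limits might be set as below. In a real crawl these are properties on the crawl-limiter bean in crawler-beans.cxml rather than calls made in Java code.

import org.archive.crawler.framework.CrawlLimitEnforcer;

public class CrawlLimitSketch {
    static CrawlLimitEnforcer exampleEnforcer() {
        CrawlLimitEnforcer enforcer = new CrawlLimitEnforcer();
        enforcer.setMaxNovelBytes(500L * 1024L * 1024L * 1024L); // stop after ~500 GiB of novel content
        enforcer.setMaxNovelUrls(10000000L);                     // or after 10 million novel urls
        enforcer.setMaxWarcNovelUrls(0L);                        // 0 = no limit (the default)
        enforcer.setMaxWarcNovelBytes(0L);
        return enforcer;
    }
}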
