Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warc writer chain #285

Merged
merged 18 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.Map;

import org.archive.crawler.restlet.XmlMarshaller;
import org.archive.modules.writer.WARCWriterProcessor;
import org.archive.modules.writer.BaseWARCWriterProcessor;
import org.archive.util.ArchiveUtils;

public class XmlCrawlSummaryReport extends Report {
Expand All @@ -28,7 +28,7 @@ public void write(PrintWriter writer, StatisticsTracker stats) {
CrawlStatSnapshot snapshot = stats.getLastSnapshot();

info.put("crawlName",
((WARCWriterProcessor) stats.appCtx.getBean("warcWriter")).getPrefix());
((BaseWARCWriterProcessor) stats.appCtx.getBean("warcWriter")).getPrefix());
info.put("crawlJobShortName",
stats.getCrawlController().getMetadata().getJobName());
info.put("scheduledDate", this.scheduledDate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package org.archive.modules.extractor;

import static org.archive.format.warc.WARCConstants.HEADER_KEY_CONCURRENT_TO;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.io.StringWriter;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -34,28 +37,52 @@

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.reporting.CrawlerLoggerModule;
import org.archive.format.warc.WARCConstants.WARCRecordType;
import org.archive.io.warc.WARCRecordInfo;
import org.archive.modules.CoreAttributeConstants;
import org.archive.modules.CrawlURI;
import org.archive.modules.warc.BaseWARCRecordBuilder;
import org.archive.modules.warc.WARCRecordBuilder;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.MimetypeUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.Lifecycle;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonStreamParser;
import com.google.gson.internal.Streams;
import com.google.gson.stream.JsonWriter;

/**
* Extracts links to media by running youtube-dl in a subprocess. Runs only on
* html.
*
* <p>
* Also implements {@link WARCRecordBuilder} to write youtube-dl json to the
* warc.
*
* <p>
* To use <code>ExtractorYoutubeDL</code>, add this top-level bean:
*
* <pre>
* &lt;bean id="extractorYoutubeDL" class="org.archive.modules.extractor.ExtractorYoutubeDL"/&gt;
* </pre>
*
* Then add <code>&lt;ref bean="extractorYoutubeDL"/&gt;</code> to the end of
* the fetch chain, and to the end of the warc writer chain.
*
* <p>
* Keeps a log of containing pages and media captured as a result of youtube-dl
* extraction. The format of the log is as follows:
*
* <pre>[timestamp] [media-http-status] [media-length] [media-mimetype] [media-digest] [media-timestamp] [media-url] [annotation] [containing-page-digest] [containing-page-timestamp] [containing-page-url] [seed-url]</pre>
* <pre>
* [timestamp] [media-http-status] [media-length] [media-mimetype] [media-digest] [media-timestamp] [media-url] [annotation] [containing-page-digest] [containing-page-timestamp] [containing-page-url] [seed-url]
* </pre>
*
* <p>
* For containing pages, all of the {@code media-*} fields have the value
Expand All @@ -71,14 +98,17 @@
*
* @author nlevitt
*/
public class ExtractorYoutubeDL extends Extractor implements Lifecycle {
public class ExtractorYoutubeDL extends Extractor
implements Lifecycle, WARCRecordBuilder {
private static Logger logger =
Logger.getLogger(ExtractorYoutubeDL.class.getName());

protected static final String YDL_CONTAINING_PAGE_DIGEST = "ydl-containing-page-digest";
protected static final String YDL_CONTAINING_PAGE_TIMESTAMP = "ydl-containing-page-timestamp";
protected static final String YDL_CONTAINING_PAGE_URI = "ydl-containing-page-uri";

protected static final int MAX_VIDEOS_PER_PAGE = 1000;

protected transient Logger ydlLogger = null;

protected CrawlerLoggerModule crawlerLoggerModule;
Expand Down Expand Up @@ -137,15 +167,21 @@ protected void extract(CrawlURI uri) {
logCapturedVideo(uri, ydlAnnotation);
}
} else {
List<JsonObject> ydlJsons = runYoutubeDL(uri);
if (ydlJsons != null && !ydlJsons.isEmpty()) {
for (JsonObject json: ydlJsons) {
JsonObject ydlJson = runYoutubeDL(uri);
if (ydlJson != null && ydlJson.has("entries")) {
JsonArray jsonEntries = ydlJson.getAsJsonArray("entries");
for (JsonElement e: jsonEntries) {
JsonObject json = (JsonObject) e;
if (json.get("url") != null) {
String videoUrl = json.get("url").getAsString();
addVideoOutlink(uri, json, videoUrl);
}
}
String annotation = "youtube-dl:" + ydlJsons.size();

// XXX this can be large, consider using a RecordingOutputStream
uri.getData().put("ydlJson", ydlJson);

String annotation = "youtube-dl:" + jsonEntries.size();
uri.getAnnotations().add(annotation);
logContainingPage(uri, annotation);
}
Expand Down Expand Up @@ -290,13 +326,7 @@ public String call() throws IOException {
return output;
}

/**
*
* @param uri
* @return list of json blobs returned by {@code youtube-dl --dump-json}, or
* empty list if no videos found, or failure
*/
protected List<JsonObject> runYoutubeDL(CrawlURI uri) {
protected JsonObject runYoutubeDL(CrawlURI uri) {
/*
* --format=best
*
Expand All @@ -305,7 +335,8 @@ protected List<JsonObject> runYoutubeDL(CrawlURI uri) {
* https://github.com/ytdl-org/youtube-dl/blob/master/README.md#format-selection
*/
ProcessBuilder pb = new ProcessBuilder("youtube-dl", "--ignore-config",
"--simulate", "--dump-json", "--format=best", uri.toString());
"--simulate", "--dump-single-json", "--format=best",
"--playlist-end=" + MAX_VIDEOS_PER_PAGE, uri.toString());
logger.fine("running " + pb.command());

Process proc = null;
Expand Down Expand Up @@ -344,11 +375,11 @@ protected List<JsonObject> runYoutubeDL(CrawlURI uri) {
proc.destroyForcibly();
}

List<JsonObject> ydlJsons = new ArrayList<JsonObject>();
JsonStreamParser parser = new JsonStreamParser(output.stdout);
JsonObject ydlJson = null;
try {
while (parser.hasNext()) {
ydlJsons.add((JsonObject) parser.next());
if (parser.hasNext()) {
ydlJson = (JsonObject) parser.next();
}
} catch (JsonParseException e) {
// sometimes we get no output at all from youtube-dl, which
Expand All @@ -361,7 +392,7 @@ protected List<JsonObject> runYoutubeDL(CrawlURI uri) {
return null;
}

return ydlJsons;
return ydlJson;
}

@Override
Expand Down Expand Up @@ -401,4 +432,40 @@ protected boolean shouldExtract(CrawlURI uri) {

return false;
}

/**
 * Decides whether a record should be built for the given uri.
 *
 * <p>
 * The answer is yes exactly when the uri's data carries an entry under
 * the {@code "ydlJson"} key, which is the key {@code extract} stores the
 * youtube-dl json under.
 *
 * @param curi the uri being considered
 * @return {@code true} if the uri has a {@code "ydlJson"} data entry
 */
@Override
public boolean shouldBuildRecord(CrawlURI curi) {
    final boolean hasYoutubeDlJson = curi.containsDataKey("ydlJson");
    return hasYoutubeDlJson;
}

/**
 * Builds a WARC {@code metadata} record whose payload is the youtube-dl
 * json stored in the uri's data under the {@code "ydlJson"} key.
 *
 * <p>
 * The record url is {@code "youtube-dl:"} followed by the uri, and the
 * record date is derived from the uri's fetch begin time. The json is
 * written with a one-space indent and encoded as UTF-8.
 *
 * @param curi the uri whose youtube-dl json is to be recorded
 * @param concurrentTo id of a related record; when non-null it is added,
 *            wrapped in angle brackets, as the
 *            {@code HEADER_KEY_CONCURRENT_TO} extra header
 * @return the populated record info, including content stream and
 *         content length
 * @throws IOException if writing or encoding the json fails
 */
@Override
public WARCRecordInfo buildRecord(CrawlURI curi, URI concurrentTo)
        throws IOException {
    final String fetchDate14 =
            ArchiveUtils.getLog14Date(curi.getFetchBeginTime());

    // Record envelope: type, id, optional concurrent-to link, url, date.
    WARCRecordInfo record = new WARCRecordInfo();
    record.setType(WARCRecordType.metadata);
    record.setRecordId(BaseWARCRecordBuilder.generateRecordID());
    if (concurrentTo != null) {
        String bracketedId = "<" + concurrentTo + ">";
        record.addExtraHeader(HEADER_KEY_CONCURRENT_TO, bracketedId);
    }
    record.setUrl("youtube-dl:" + curi);
    record.setCreate14DigitDate(fetchDate14);
    record.setMimetype("application/vnd.youtube-dl_formats+json;charset=utf-8");
    record.setEnforceLength(true);

    // Payload: serialize the stored json into an in-memory string.
    JsonObject storedJson = (JsonObject) curi.getData().get("ydlJson");
    StringWriter buffer = new StringWriter();
    JsonWriter out = new JsonWriter(buffer);
    out.setIndent(" ");
    Streams.write(storedJson, out);

    // The content length is the encoded byte count, not the character count.
    byte[] payload = buffer.toString().getBytes("UTF-8");
    record.setContentStream(new ByteArrayInputStream(payload));
    record.setContentLength((long) payload.length);

    return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
*/
package org.archive.modules.postprocessor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.ArrayList;
import java.util.List;

import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.CrawlStatus;
import org.archive.modules.CrawlURI;
import org.archive.modules.Processor;
import org.archive.modules.writer.WARCWriterProcessor;
import org.archive.modules.writer.BaseWARCWriterProcessor;
import org.springframework.beans.factory.annotation.Autowired;

public class WARCLimitEnforcer extends Processor {
Expand All @@ -38,7 +39,7 @@ public class WARCLimitEnforcer extends Processor {

protected Map<String, Map<String, Long>> limits = new HashMap<String, Map<String, Long>>();
/**
* Should match structure of {@link WARCWriterProcessor#getStats()}
* Should match structure of {@link BaseWARCWriterProcessor#getStats()}
* @param limits
*/
public void setLimits(Map<String, Map<String, Long>> limits) {
Expand All @@ -48,23 +49,23 @@ public Map<String, Map<String, Long>> getLimits() {
return limits;
}

protected WARCWriterProcessor warcWriter;
protected BaseWARCWriterProcessor warcWriter;
@Autowired
public void setWarcWriter(WARCWriterProcessor warcWriter) {
public void setWarcWriter(BaseWARCWriterProcessor warcWriter) {
this.warcWriter = warcWriter;
}
public WARCWriterProcessor getWarcWriter() {
public BaseWARCWriterProcessor getWarcWriter() {
return warcWriter;
}

{
setWarcWriters(new ArrayList<WARCWriterProcessor>());
setWarcWriters(new ArrayList<BaseWARCWriterProcessor>());
}
@SuppressWarnings("unchecked")
public List<WARCWriterProcessor> getWarcWriters() {
return (List<WARCWriterProcessor>) kp.get("warcWriters");
public List<BaseWARCWriterProcessor> getWarcWriters() {
return (List<BaseWARCWriterProcessor>) kp.get("warcWriters");
}
public void setWarcWriters(List<WARCWriterProcessor> warcWriters) {
public void setWarcWriters(List<BaseWARCWriterProcessor> warcWriters) {
kp.put("warcWriters", warcWriters);
}

Expand All @@ -91,7 +92,7 @@ protected void innerProcess(CrawlURI uri) throws InterruptedException {
AtomicLong value = null;
if(getWarcWriters() !=null && getWarcWriters().size()>0) {
value = new AtomicLong(0);
for (WARCWriterProcessor w: getWarcWriters()) {
for (BaseWARCWriterProcessor w: getWarcWriters()) {
Map<String, AtomicLong> valueBucket = w.getStats().get(j);
if(valueBucket != null) {
value.set(value.addAndGet(valueBucket.get(k).get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.archive.crawler.event.CrawlStateEvent;
import org.archive.modules.CrawlURI;
import org.archive.modules.writer.WARCWriterProcessor;
import org.archive.modules.writer.WARCWriterChainProcessor;
import org.archive.spring.HasKeyedProperties;
import org.archive.spring.KeyedProperties;
import org.archive.trough.TroughClient;
Expand All @@ -31,7 +31,7 @@
* <p>To use, define a {@code TroughContentDigestHistory} top-level bean in your
* crawler-beans.cxml, then add {@link ContentDigestHistoryLoader} and
* {@link ContentDigestHistoryStorer} to your fetch chain, sandwiching the
* {@link WARCWriterProcessor}. In other words, follow the directions at
* {@link WARCWriterChainProcessor}. In other words, follow the directions at
* <a href="https://github.com/internetarchive/heritrix3/wiki/Duplication%20Reduction%20Processors">https://github.com/internetarchive/heritrix3/wiki/Duplication%20Reduction%20Processors</a>
* but replace the {@link BdbContentDigestHistory} bean with a
* {@code TroughContentDigestHistory} bean.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ http://example.example/example

<!-- DISPOSITION CHAIN -->
<!-- first, processors are declared as top-level named beans -->
<bean id="warcWriter" class="org.archive.modules.writer.WARCWriterProcessor">
<bean id="warcWriter" class="org.archive.modules.writer.WARCWriterChainProcessor">
<!-- <property name="compress" value="true" /> -->
<!-- <property name="prefix" value="IAH" /> -->
<!-- <property name="maxFileSizeBytes" value="1000000000" /> -->
Expand All @@ -349,11 +349,21 @@ http://example.example/example
</list>
</property> -->
<!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
<!-- <property name="writeRequests" value="true" /> -->
<!-- <property name="writeMetadata" value="true" /> -->
<!-- <property name="writeRevisitForIdenticalDigests" value="true" /> -->
<!-- <property name="writeRevisitForNotModified" value="true" /> -->
<!-- <property name="startNewFilesOnCheckpoint" value="true" /> -->
<!--
<property name="chain">
<list>
<bean class="org.archive.modules.warc.DnsResponseRecordBuilder"/>
<bean class="org.archive.modules.warc.HttpResponseRecordBuilder"/>
<bean class="org.archive.modules.warc.WhoisResponseRecordBuilder"/>
<bean class="org.archive.modules.warc.FtpControlConversationRecordBuilder"/>
<bean class="org.archive.modules.warc.FtpResponseRecordBuilder"/>
<bean class="org.archive.modules.warc.RevisitRecordBuilder"/>
<bean class="org.archive.modules.warc.HttpRequestRecordBuilder"/>
<bean class="org.archive.modules.warc.MetadataRecordBuilder"/>
</list>
</property>
-->
</bean>
<bean id="candidates" class="org.archive.crawler.postprocessor.CandidatesProcessor">
<!-- <property name="seedsRedirectNewSeeds" value="true" /> -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class StatisticsSelfTest extends SelfTestBase {

@Override
protected String changeGlobalConfig(String config) {
String warcWriterConfig = " <bean id='warcWriter' class='org.archive.modules.writer.WARCWriterProcessor'/>\n";
String warcWriterConfig = " <bean id='warcWriter' class='org.archive.modules.writer.WARCWriterChainProcessor'/>\n";
config = config.replace("<!--@@MORE_EXTRACTORS@@-->", warcWriterConfig);
return super.changeGlobalConfig(config);
}
Expand Down
Loading