Warc writer chain #285

Merged · 18 commits · Jan 14, 2020
5 changes: 0 additions & 5 deletions contrib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@
<artifactId>rethinkdb-driver</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
XmlCrawlSummaryReport.java

@@ -6,7 +6,7 @@
 import java.util.Map;
 
 import org.archive.crawler.restlet.XmlMarshaller;
-import org.archive.modules.writer.WARCWriterProcessor;
+import org.archive.modules.writer.BaseWARCWriterProcessor;
 import org.archive.util.ArchiveUtils;
 
 public class XmlCrawlSummaryReport extends Report {
@@ -28,7 +28,7 @@ public void write(PrintWriter writer, StatisticsTracker stats) {
         CrawlStatSnapshot snapshot = stats.getLastSnapshot();
 
         info.put("crawlName",
-                ((WARCWriterProcessor) stats.appCtx.getBean("warcWriter")).getPrefix());
+                ((BaseWARCWriterProcessor) stats.appCtx.getBean("warcWriter")).getPrefix());
         info.put("crawlJobShortName",
                 stats.getCrawlController().getMetadata().getJobName());
         info.put("scheduledDate", this.scheduledDate);
ExtractorYoutubeDL.java

@@ -19,11 +19,14 @@
 
 package org.archive.modules.extractor;
 
+import static org.archive.format.warc.WARCConstants.HEADER_KEY_CONCURRENT_TO;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
-import java.util.ArrayList;
-import java.util.List;
+import java.net.URI;
+import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -34,28 +37,47 @@
 
 import org.apache.commons.httpclient.URIException;
 import org.archive.crawler.reporting.CrawlerLoggerModule;
+import org.archive.format.warc.WARCConstants.WARCRecordType;
+import org.archive.io.warc.WARCRecordInfo;
 import org.archive.modules.CoreAttributeConstants;
 import org.archive.modules.CrawlURI;
+import org.archive.modules.warc.BaseWARCRecordBuilder;
+import org.archive.modules.warc.WARCRecordBuilder;
 import org.archive.net.UURI;
 import org.archive.net.UURIFactory;
 import org.archive.util.ArchiveUtils;
 import org.archive.util.MimetypeUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.Lifecycle;
 
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonStreamParser;
-
 /**
  * Extracts links to media by running youtube-dl in a subprocess. Runs only on
  * html.
  *
+ * <p>
+ * Also implements {@link WARCRecordBuilder} to write youtube-dl json to the
+ * warc.
+ *
+ * <p>
+ * To use <code>ExtractorYoutubeDL</code>, add this top-level bean:
+ *
+ * <pre>
+ * &lt;bean id="extractorYoutubeDL" class="org.archive.modules.extractor.ExtractorYoutubeDL"/&gt;
+ * </pre>
+ *
+ * Then add <code>&lt;ref bean="extractorYoutubeDL"/&gt;</code> to the end of the
+ * fetch chain, and to the end of the warc writer chain.
+ *
+ * <p>
  * Keeps a log of containing pages and media captured as a result of youtube-dl
  * extraction. The format of the log is as follows:
  *
- * <pre>[timestamp] [media-http-status] [media-length] [media-mimetype] [media-digest] [media-timestamp] [media-url] [annotation] [containing-page-digest] [containing-page-timestamp] [containing-page-url] [seed-url]</pre>
+ * <pre>
+ * [timestamp] [media-http-status] [media-length] [media-mimetype] [media-digest] [media-timestamp] [media-url] [annotation] [containing-page-digest] [containing-page-timestamp] [containing-page-url] [seed-url]
+ * </pre>
  *
+ * <p>
  * For containing pages, all of the {@code media-*} fields have the value
@@ -71,14 +93,17 @@
  *
  * @author nlevitt
  */
-public class ExtractorYoutubeDL extends Extractor implements Lifecycle {
+public class ExtractorYoutubeDL extends Extractor
+        implements Lifecycle, WARCRecordBuilder {
     private static Logger logger =
             Logger.getLogger(ExtractorYoutubeDL.class.getName());
 
     protected static final String YDL_CONTAINING_PAGE_DIGEST = "ydl-containing-page-digest";
     protected static final String YDL_CONTAINING_PAGE_TIMESTAMP = "ydl-containing-page-timestamp";
     protected static final String YDL_CONTAINING_PAGE_URI = "ydl-containing-page-uri";
 
+    protected static final int MAX_VIDEOS_PER_PAGE = 1000;
+
     protected transient Logger ydlLogger = null;
 
     protected CrawlerLoggerModule crawlerLoggerModule;
@@ -137,22 +162,50 @@ protected void extract(CrawlURI uri) {
                 logCapturedVideo(uri, ydlAnnotation);
             }
         } else {
-            List<JsonObject> ydlJsons = runYoutubeDL(uri);
-            if (ydlJsons != null && !ydlJsons.isEmpty()) {
-                for (JsonObject json: ydlJsons) {
-                    if (json.get("url") != null) {
-                        String videoUrl = json.get("url").getAsString();
-                        addVideoOutlink(uri, json, videoUrl);
+            JSONObject ydlJson = runYoutubeDL(uri);
+            if (ydlJson != null && (ydlJson.has("entries") || ydlJson.has("url"))) {
+                JSONArray jsonEntries;
+                if (ydlJson.has("entries")) {
+                    jsonEntries = ydlJson.getJSONArray("entries");
+                } else {
+                    jsonEntries = new JSONArray(Arrays.asList(ydlJson));
+                }
+
+                for (int i = 0; i < jsonEntries.length(); i++) {
+                    JSONObject jsonO = (JSONObject) jsonEntries.get(i);
+
+                    // media url
+                    if (!jsonO.isNull("url")) {
+                        String videoUrl = jsonO.getString("url");
+                        addVideoOutlink(uri, jsonO, videoUrl);
+                    }
+
+                    // make sure we extract watch page links from youtube playlists,
+                    // and equivalent for other sites
+                    if (!jsonO.isNull("webpage_url")) {
+                        String webpageUrl = jsonO.getString("webpage_url");
+                        try {
+                            UURI dest = UURIFactory.getInstance(uri.getUURI(), webpageUrl);
+                            CrawlURI link = uri.createCrawlURI(dest, LinkContext.NAVLINK_MISC,
+                                    Hop.NAVLINK);
+                            uri.getOutLinks().add(link);
+                        } catch (URIException e1) {
+                            logUriError(e1, uri.getUURI(), webpageUrl);
+                        }
                     }
                 }
-                String annotation = "youtube-dl:" + ydlJsons.size();
+
+                // XXX this can be large, consider using a RecordingOutputStream
+                uri.getData().put("ydlJson", ydlJson);
+
+                String annotation = "youtube-dl:" + jsonEntries.length();
                 uri.getAnnotations().add(annotation);
                 logContainingPage(uri, annotation);
             }
         }
     }
 
-    protected void addVideoOutlink(CrawlURI uri, JsonObject json,
+    protected void addVideoOutlink(CrawlURI uri, JSONObject jsonO,
             String videoUrl) {
         try {
             UURI dest = UURIFactory.getInstance(uri.getUURI(), videoUrl);
@@ -161,9 +214,9 @@ protected void addVideoOutlink(CrawlURI uri, JsonObject json,
 
             // annotation
             String annotation = "youtube-dl:1/1";
-            if (!json.get("playlist_index").isJsonNull()) {
-                annotation = "youtube-dl:" + json.get("playlist_index") + "/"
-                        + json.get("n_entries");
+            if (!jsonO.isNull("playlist_index")) {
+                annotation = "youtube-dl:" + jsonO.get("playlist_index") + "/"
+                        + jsonO.get("n_entries");
             }
             link.getAnnotations().add(annotation);
 
@@ -290,13 +343,7 @@ public String call() throws IOException {
             return output;
         }
 
-    /**
-     *
-     * @param uri
-     * @return list of json blobs returned by {@code youtube-dl --dump-json}, or
-     *         empty list if no videos found, or failure
-     */
-    protected List<JsonObject> runYoutubeDL(CrawlURI uri) {
+    protected JSONObject runYoutubeDL(CrawlURI uri) {
         /*
          * --format=best
          *
@@ -305,7 +352,8 @@ protected List<JsonObject> runYoutubeDL(CrawlURI uri) {
          * https://github.com/ytdl-org/youtube-dl/blob/master/README.md#format-selection
          */
         ProcessBuilder pb = new ProcessBuilder("youtube-dl", "--ignore-config",
-                "--simulate", "--dump-json", "--format=best", uri.toString());
+                "--simulate", "--dump-single-json", "--format=best",
+                "--playlist-end=" + MAX_VIDEOS_PER_PAGE, uri.toString());
         logger.fine("running " + pb.command());
 
         Process proc = null;
@@ -344,13 +392,10 @@ protected List<JsonObject> runYoutubeDL(CrawlURI uri) {
             proc.destroyForcibly();
         }
 
-        List<JsonObject> ydlJsons = new ArrayList<JsonObject>();
-        JsonStreamParser parser = new JsonStreamParser(output.stdout);
         try {
-            while (parser.hasNext()) {
-                ydlJsons.add((JsonObject) parser.next());
-            }
-        } catch (JsonParseException e) {
+            JSONObject ydlJson = new JSONObject(output.stdout);
+            return ydlJson;
+        } catch (JSONException e) {
             // sometimes we get no output at all from youtube-dl, which
             // manifests as a JSONException (formerly gson's JsonIOException)
             logger.log(Level.FINE,
@@ -360,8 +405,6 @@
                     e);
             return null;
         }
-
-        return ydlJsons;
     }
 
     @Override
@@ -401,4 +444,37 @@ protected boolean shouldExtract(CrawlURI uri) {
 
         return false;
     }
+
+    @Override
+    public boolean shouldBuildRecord(CrawlURI curi) {
+        return curi.containsDataKey("ydlJson");
+    }
+
+    @Override
+    public WARCRecordInfo buildRecord(CrawlURI curi, URI concurrentTo)
+            throws IOException {
+        final String timestamp =
+                ArchiveUtils.getLog14Date(curi.getFetchBeginTime());
+
+        WARCRecordInfo recordInfo = new WARCRecordInfo();
+        recordInfo.setType(WARCRecordType.metadata);
+        recordInfo.setRecordId(BaseWARCRecordBuilder.generateRecordID());
+        if (concurrentTo != null) {
+            recordInfo.addExtraHeader(HEADER_KEY_CONCURRENT_TO,
+                    "<" + concurrentTo + ">");
+        }
+        recordInfo.setUrl("youtube-dl:" + curi);
+        recordInfo.setCreate14DigitDate(timestamp);
+        recordInfo.setMimetype("application/vnd.youtube-dl_formats+json;charset=utf-8");
+        recordInfo.setEnforceLength(true);
+
+        JSONObject ydlJson = (JSONObject) curi.getData().get("ydlJson");
+        String ydlJsonString = ydlJson.toString(1);
+
+        byte[] b = ydlJsonString.getBytes("UTF-8");
+        recordInfo.setContentStream(new ByteArrayInputStream(b));
+        recordInfo.setContentLength((long) b.length);
+
+        return recordInfo;
+    }
 }
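Taken together, the two halves of this change are: extract() stashes the parsed youtube-dl json on the CrawlURI under the "ydlJson" key, and shouldBuildRecord()/buildRecord() later turn that into a WARC metadata record when the warc writer chain runs. Per the class javadoc, the extractor must be referenced from both the fetch chain and the warc writer chain. A minimal crawler-beans.cxml sketch of that wiring; the surrounding bean ids (fetchProcessors, warcWriter) and the chain property name are assumptions based on a stock Heritrix configuration, not shown in this diff:

```xml
<bean id="extractorYoutubeDL" class="org.archive.modules.extractor.ExtractorYoutubeDL"/>

<!-- fetch chain: run the extractor after the usual extractors -->
<bean id="fetchProcessors" class="org.archive.modules.FetchChain">
  <property name="processors">
    <list>
      <!-- ... fetchDns, fetchHttp, extractorHtml, ... -->
      <ref bean="extractorYoutubeDL"/>
    </list>
  </property>
</bean>

<!-- warc writer chain: let the extractor emit its json metadata record -->
<bean id="warcWriter" class="org.archive.modules.writer.WARCWriterChainProcessor">
  <property name="chain">
    <list>
      <!-- ... the default WARCRecordBuilder beans ... -->
      <ref bean="extractorYoutubeDL"/>
    </list>
  </property>
</bean>
```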
WARCLimitEnforcer.java

@@ -18,17 +18,18 @@
  */
 package org.archive.modules.postprocessor;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.archive.crawler.framework.CrawlController;
 import org.archive.crawler.framework.CrawlStatus;
 import org.archive.modules.CrawlURI;
 import org.archive.modules.Processor;
-import org.archive.modules.writer.WARCWriterProcessor;
+import org.archive.modules.writer.BaseWARCWriterProcessor;
 import org.springframework.beans.factory.annotation.Autowired;
 
@@ -38,7 +39,7 @@ public class WARCLimitEnforcer extends Processor {
 
     protected Map<String, Map<String, Long>> limits = new HashMap<String, Map<String, Long>>();
     /**
-     * Should match structure of {@link WARCWriterProcessor#getStats()}
+     * Should match structure of {@link BaseWARCWriterProcessor#getStats()}
      * @param limits
      */
     public void setLimits(Map<String, Map<String, Long>> limits) {
@@ -48,23 +49,23 @@ public Map<String, Map<String, Long>> getLimits() {
         return limits;
     }
 
-    protected WARCWriterProcessor warcWriter;
+    protected BaseWARCWriterProcessor warcWriter;
     @Autowired
-    public void setWarcWriter(WARCWriterProcessor warcWriter) {
+    public void setWarcWriter(BaseWARCWriterProcessor warcWriter) {
         this.warcWriter = warcWriter;
     }
-    public WARCWriterProcessor getWarcWriter() {
+    public BaseWARCWriterProcessor getWarcWriter() {
         return warcWriter;
     }
 
     {
-        setWarcWriters(new ArrayList<WARCWriterProcessor>());
+        setWarcWriters(new ArrayList<BaseWARCWriterProcessor>());
     }
     @SuppressWarnings("unchecked")
-    public List<WARCWriterProcessor> getWarcWriters() {
-        return (List<WARCWriterProcessor>) kp.get("warcWriters");
+    public List<BaseWARCWriterProcessor> getWarcWriters() {
+        return (List<BaseWARCWriterProcessor>) kp.get("warcWriters");
     }
-    public void setWarcWriters(List<WARCWriterProcessor> warcWriters) {
+    public void setWarcWriters(List<BaseWARCWriterProcessor> warcWriters) {
         kp.put("warcWriters", warcWriters);
     }
 
@@ -91,7 +92,7 @@ protected void innerProcess(CrawlURI uri) throws InterruptedException {
                     AtomicLong value = null;
                     if(getWarcWriters() !=null && getWarcWriters().size()>0) {
                         value = new AtomicLong(0);
-                        for (WARCWriterProcessor w: getWarcWriters()) {
+                        for (BaseWARCWriterProcessor w: getWarcWriters()) {
                             Map<String, AtomicLong> valueBucket = w.getStats().get(j);
                             if(valueBucket != null) {
                                 value.set(value.addAndGet(valueBucket.get(k).get()));
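The limits map handed to setLimits() mirrors the nested structure of the per-writer stats: the outer key names a stats bucket, the inner key a counter within that bucket, and the crawl is stopped once the summed writer stats exceed the configured value. A hypothetical crawler-beans.cxml sketch; the bucket and counter names ("totals", "numRecords") are assumptions about the getStats() layout, not confirmed by this diff:

```xml
<bean id="warcLimitEnforcer" class="org.archive.modules.postprocessor.WARCLimitEnforcer">
  <property name="limits">
    <map>
      <!-- stop the crawl once the writers report a million records in total -->
      <entry key="totals">
        <map>
          <entry key="numRecords" value="1000000"/>
        </map>
      </entry>
    </map>
  </property>
</bean>
```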
TroughContentDigestHistory.java

@@ -17,7 +17,7 @@
 
 import org.archive.crawler.event.CrawlStateEvent;
 import org.archive.modules.CrawlURI;
-import org.archive.modules.writer.WARCWriterProcessor;
+import org.archive.modules.writer.WARCWriterChainProcessor;
 import org.archive.spring.HasKeyedProperties;
 import org.archive.spring.KeyedProperties;
 import org.archive.trough.TroughClient;
@@ -31,7 +31,7 @@
  * <p>To use, define a {@code TroughContentDigestHistory} top-level bean in your
  * crawler-beans.cxml, then add {@link ContentDigestHistoryLoader} and
  * {@link ContentDigestHistoryStorer} to your fetch chain, sandwiching the
- * {@link WARCWriterProcessor}. In other words, follow the directions at
+ * {@link WARCWriterChainProcessor}. In other words, follow the directions at
  * <a href="https://github.com/internetarchive/heritrix3/wiki/Duplication%20Reduction%20Processors">https://github.com/internetarchive/heritrix3/wiki/Duplication%20Reduction%20Processors</a>
  * but replace the {@link BdbContentDigestHistory} bean with a
  * {@code TroughContentDigestHistory} bean.
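A minimal sketch of the sandwich this javadoc describes, with the digest-history loader running before the warc writer chain and the storer after it. The bean ids and the org.archive.modules.recrawl package for TroughContentDigestHistory are assumptions, and its trough connection properties are not shown in this diff:

```xml
<!-- assumed package; connection settings omitted -->
<bean id="contentDigestHistory" class="org.archive.modules.recrawl.TroughContentDigestHistory"/>

<bean id="contentDigestHistoryLoader" class="org.archive.modules.recrawl.ContentDigestHistoryLoader"/>
<bean id="contentDigestHistoryStorer" class="org.archive.modules.recrawl.ContentDigestHistoryStorer"/>

<!-- processor chain order: loader, then the warc writer chain, then storer -->
<ref bean="contentDigestHistoryLoader"/>
<ref bean="warcWriter"/> <!-- a WARCWriterChainProcessor -->
<ref bean="contentDigestHistoryStorer"/>
```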