Permalink
Browse files

Implement surrogate import from Warc archives (as first option handle

warc = Web ARChive File Format.
Warc files with extension .warc or compressed warc.gz can be placed in the
DATA/surrogate/in and contained responses are imported to the index.
The used library is stream based so we can easily extend it later to use
and load warc's from the net.
  • Loading branch information...
reger24 committed Mar 30, 2017
1 parent 5b5b9d5 commit 510f11d3745e14841420781376b733fd248d51f3
@@ -99,5 +99,9 @@
<classpathentry kind="lib" path="lib/imageio-bmp-3.3.1.jar"/>
<classpathentry kind="lib" path="lib/jsonic-1.2.0.jar"/>
<classpathentry kind="lib" path="lib/langdetect.jar"/>
<classpathentry kind="lib" path="lib/jwat-common-1.0.4.jar"/>
<classpathentry kind="lib" path="lib/jwat-gzip-1.0.4.jar"/>
<classpathentry kind="lib" path="lib/jwat-archive-common-1.0.4.jar"/>
<classpathentry kind="lib" path="lib/jwat-warc-1.0.4.jar"/>
<classpathentry kind="output" path="gen"/>
</classpath>
@@ -217,6 +217,10 @@
<pathelement location="${lib}/jsch-0.1.54.jar" />
<pathelement location="${lib}/json-simple-1.1.1.jar" />
<pathelement location="${lib}/jsoup-1.10.2.jar" />
<pathelement location="${lib}/jwat-common-1.0.4.jar" />
<pathelement location="${lib}/jwat-gzip-1.0.4.jar" />
<pathelement location="${lib}/jwat-archive-common-1.0.4.jar" />
<pathelement location="${lib}/jwat-warc-1.0.4.jar" />
<pathelement location="${lib}/log4j-over-slf4j-1.7.24.jar" />
<pathelement location="${lib}/lucene-analyzers-common-5.5.3.jar" />
<pathelement location="${lib}/lucene-analyzers-phonetic-5.5.3.jar" />
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
20 pom.xml
@@ -476,6 +476,26 @@
<version>1.10.2</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.jwat</groupId>
<artifactId>jwat-common</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.jwat</groupId>
<artifactId>jwat-gzip</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.jwat</groupId>
<artifactId>jwat-warc</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.jwat</groupId>
<artifactId>jwat-archive-common</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>com.youcruit.com.cybozu.labs</groupId>
<artifactId>langdetect</artifactId>
@@ -0,0 +1,138 @@
/**
* WarcImporter.java
* (C) 2017 by reger24; https://github.com/reger24
*
* This is a part of YaCy, a peer-to-peer based web search engine
*
* LICENSE
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.document.importer;
import java.io.IOException;
import java.io.InputStream;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.protocol.HeaderFramework;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.cora.util.ByteBuffer;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.retrieval.Response;
import net.yacy.document.TextParser;
import net.yacy.search.Switchboard;
import net.yacy.server.http.ChunkedInputStream;
import org.jwat.common.HeaderLine;
import org.jwat.common.HttpHeader;
import org.jwat.warc.WarcConstants;
import org.jwat.warc.WarcReader;
import org.jwat.warc.WarcReaderFactory;
import org.jwat.warc.WarcRecord;
/**
* Web Archive file format reader to process the warc archive content (responses)
*
* Warc format specification ISO 28500
* https://archive.org/details/WARCISO28500Version1Latestdraft
* http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf
*
* http://archive-access.sourceforge.net/warc/warc_file_format-0.9.html
* http://archive-access.sourceforge.net/warc/
*/
public class WarcImporter {
/**
* Reads a Warc file and adds all contained responses to the index.
* The reader automatically handles plain or gzip'd warc files
*
* @param f inputstream for the warc file
* @throws IOException
*/
public void indexWarcRecords(InputStream f) throws IOException {
byte[] content;
int cnt = 0;
WarcReader localwarcReader = WarcReaderFactory.getReader(f);
WarcRecord wrec = localwarcReader.getNextRecord();
while (wrec != null) {
HeaderLine hl = wrec.getHeader(WarcConstants.FN_WARC_TYPE);
if (hl != null && hl.value.equals(WarcConstants.RT_RESPONSE)) { // filter responses
hl = wrec.getHeader(WarcConstants.FN_WARC_TARGET_URI);
DigestURL location = new DigestURL(hl.value);
HttpHeader http = wrec.getHttpHeader();
if (http != null && http.statusCode == 200) { // process http response header OK (status 200)
if (TextParser.supportsMime(http.contentType) == null) { // check availability of parser
InputStream istream = wrec.getPayloadContent();
hl = http.getHeader(HeaderFramework.TRANSFER_ENCODING);
if (hl != null && hl.value.contains("chunked")) {
// because chunked stream.read doesn't read source fully, make sure all chunks are read
istream = new ChunkedInputStream(istream);
final ByteBuffer bbuffer = new ByteBuffer();
int c;
while ((c = istream.read()) >= 0) {
bbuffer.append(c);
}
content = bbuffer.getBytes();
} else {
content = new byte[(int) http.getPayloadLength()];
istream.read(content, 0, content.length);
}
istream.close();
RequestHeader requestHeader = new RequestHeader();
ResponseHeader responseHeader = new ResponseHeader(http.statusCode);
for (HeaderLine hx : http.getHeaderList()) { // include all original response headers for parser
responseHeader.put(hx.name, hx.value);
}
final Request request = new Request(
null,
location,
requestHeader.referer() == null ? null : requestHeader.referer().hash(),
"warc",
responseHeader.lastModified(),
Switchboard.getSwitchboard().crawler.defaultRemoteProfile.handle(), // use remote profile (to index text & media, without writing to cache
0,
Switchboard.getSwitchboard().crawler.defaultRemoteProfile.timezoneOffset());
final Response response = new Response(
request,
requestHeader,
responseHeader,
Switchboard.getSwitchboard().crawler.defaultRemoteProfile,
false,
content
);
Switchboard.getSwitchboard().toIndexer(response);
cnt++;
}
}
}
wrec = localwarcReader.getNextRecord();
}
localwarcReader.close();
ConcurrentLog.info("WarcImporter", "Indexed " + cnt + " documents");
}
}
@@ -164,6 +164,7 @@
import net.yacy.document.content.DCEntry;
import net.yacy.document.content.SurrogateReader;
import net.yacy.document.importer.OAIListFriendsLoader;
import net.yacy.document.importer.WarcImporter;
import net.yacy.document.parser.audioTagParser;
import net.yacy.document.parser.pdfParser;
import net.yacy.document.parser.html.Evaluation;
@@ -2002,6 +2003,16 @@ public boolean processSurrogate(final String s) {
if (zis != null) try {zis.close();} catch (final IOException e) {}
}
return moved;
} else if (s.endsWith(".warc") || s.endsWith(".warc.gz")) {
try {
InputStream is = new BufferedInputStream(new FileInputStream(infile));
WarcImporter wri = new WarcImporter();
wri.indexWarcRecords(is);
moved = infile.renameTo(outfile);
} catch (IOException ex) {
log.warn("IO Error processing warc file " + infile);
}
return moved;
}
InputStream is = null;
try {
@@ -2162,7 +2173,9 @@ public boolean surrogateProcess() {
if ( surrogate.endsWith(".xml")
|| surrogate.endsWith(".xml.gz")
|| surrogate.endsWith(".xml.zip") ) {
|| surrogate.endsWith(".xml.zip")
|| surrogate.endsWith(".warc")
|| surrogate.endsWith(".warc.gz") ) {
// read the surrogate file and store entry in index
if ( processSurrogate(surrogate) ) {
return true;

2 comments on commit 510f11d

@luccioman

This comment has been minimized.

Member

luccioman replied May 19, 2017

Hi @reger24 , in the WarcImporter you create Requests with a null initiator parameter.
This makes the YaCy Switchboard think the request was made by the YaCy proxy (see Response.processCase()), so it uses shallIndexCacheForProxy() (which has much more restrictive rules, as reported by LA_FORGE on the forum http://forum.yacy-websuche.de/viewtopic.php?f=5&t=5990) instead of shallIndexCacheForCrawler()...
Was it your intention to consider warc import as proxy load? Shouldn't we rather set an initiator value in the Request object, at least to consider in the case of Warc Import the event origin as EventOrigin.GLOBAL_CRAWLING or EventOrigin.SURROGATES or a new one?

@reger24

This comment has been minimized.

Member

reger24 replied May 21, 2017

No, it is/was not intended to get in conflict with the proxy. So I should have used the surrogate profile (but I intended to also index media). I don't think it's worthwhile to create a new EventOrigin etc. for warc.
So the choice seems to be to either set mypeer as initiator or use the surrogate profile... I guess the latter is closer to the matter (will change it).
Thanks for pointing it out.

Please sign in to comment.