Skip to content

Commit

Permalink
Added a surrogate import process for exported Solr dumps.
Browse files Browse the repository at this point in the history
Just drop your Solr dump file into DATA/SURROGATES/in/ and it will be
imported automatically!
  • Loading branch information
Orbiter committed May 30, 2015
1 parent b775372 commit b43811d
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 88 deletions.
7 changes: 3 additions & 4 deletions htroot/api/ymarks/import_ymark.java
Expand Up @@ -30,7 +30,6 @@
import net.yacy.data.ymark.YMarkUtil;
import net.yacy.data.ymark.YMarkXBELImporter;
import net.yacy.document.Parser.Failure;
import net.yacy.document.content.SurrogateReader;
import net.yacy.kelondro.blob.Tables;
import net.yacy.kelondro.workflow.InstantBusyThread;
import net.yacy.search.Switchboard;
Expand All @@ -39,8 +38,6 @@

import org.xml.sax.SAXException;



public class import_ymark {

public static serverObjects respond(final RequestHeader header, final serverObjects post, final serverSwitch env) {
Expand Down Expand Up @@ -92,9 +89,10 @@ public static serverObjects respond(final RequestHeader header, final serverObje
final byte[] bytes = UTF8.getBytes(post.get("bmkfile$file"));
stream = new ByteArrayInputStream(bytes);
if(post.get("importer").equals("surro") && stream != null) {
/**
SurrogateReader surrogateReader;
try {
surrogateReader = new SurrogateReader(stream, queueSize);
surrogateReader = new SurrogateReader(stream, queueSize, sb.crawlStacker, sb.index.fulltext().getDefaultConfiguration());
} catch (final IOException e) {
//TODO: display an error message
ConcurrentLog.logException(e);
Expand All @@ -106,6 +104,7 @@ public static serverObjects respond(final RequestHeader header, final serverObje
putBookmark(sb, bmk_user, bmk, autoTaggingQueue, autotag, empty, indexing, medialink);
}
prop.put("status", "1");
*/
} else {
MonitoredReader reader = null;
try {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.util.NamedList;

import net.yacy.cora.storage.Configuration;
import net.yacy.cora.util.ConcurrentLog;
Expand Down
138 changes: 93 additions & 45 deletions source/net/yacy/document/content/SurrogateReader.java
Expand Up @@ -24,26 +24,32 @@

package net.yacy.document.content;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PushbackInputStream;
import java.io.Reader;
import java.io.StringReader;
import java.net.MalformedURLException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;

import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;


import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.crawler.CrawlStacker;
import net.yacy.search.schema.CollectionConfiguration;

import org.apache.solr.client.solrj.impl.XMLResponseParser;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
Expand All @@ -63,16 +69,19 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
" xmlns:geo=\"http://www.w3.org/2003/01/geo/wgs84_pos#\">";
public final static String SURROGATES_MAIN_ELEMENT_CLOSE =
"</" + SURROGATES_MAIN_ELEMENT_NAME + ">";
public final static SolrInputDocument POISON_DOCUMENT = new SolrInputDocument();

// class variables
private final StringBuilder buffer;
private boolean parsingValue;
private DCEntry surrogate;
private DCEntry dcEntry;
private String elementName;
private final BlockingQueue<DCEntry> surrogates;
private final BlockingQueue<SolrInputDocument> surrogates;
private SAXParser saxParser;
private final InputSource inputSource;
private final InputStream inputStream;
private final PushbackInputStream inputStream;
private final CrawlStacker crawlStacker;
private final CollectionConfiguration configuration;

private static final ThreadLocal<SAXParser> tlSax = new ThreadLocal<SAXParser>();
private static SAXParser getParser() throws SAXException {
Expand All @@ -87,13 +96,19 @@ private static SAXParser getParser() throws SAXException {
}
return parser;
}

/**
 * Convenience constructor for callers that hold a plain {@link InputStream}.
 * The stream is wrapped in a {@link PushbackInputStream} with a pushback
 * capacity of 200 bytes and then handed to the PushbackInputStream-based
 * constructor together with the remaining arguments.
 *
 * @param stream the raw surrogate input
 * @param queueSize passed through unchanged to the delegate constructor
 * @param crawlStacker passed through unchanged to the delegate constructor
 * @param configuration passed through unchanged to the delegate constructor
 * @throws IOException declared because the delegate constructor declares it
 */
public SurrogateReader(final InputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException {
// 200 bytes of pushback capacity; NOTE(review): presumably sized so the 100-byte head probe in isSolrDump() can be unread - confirm
this(new PushbackInputStream(stream, 200), queueSize, crawlStacker, configuration);
}

public SurrogateReader(final InputStream stream, int queueSize) throws IOException {
public SurrogateReader(final PushbackInputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException {
this.crawlStacker = crawlStacker;
this.configuration = configuration;
this.buffer = new StringBuilder(300);
this.parsingValue = false;
this.surrogate = null;
this.dcEntry = null;
this.elementName = null;
this.surrogates = new ArrayBlockingQueue<DCEntry>(queueSize);
this.surrogates = new ArrayBlockingQueue<>(queueSize);

Reader reader = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
this.inputSource = new InputSource(reader);
Expand All @@ -110,8 +125,37 @@ public SurrogateReader(final InputStream stream, int queueSize) throws IOExcepti

@Override
public void run() {
// test the syntax of the stream by reading parts of the beginning
try {
this.saxParser.parse(this.inputSource, this);
if (isSolrDump()) {
BufferedReader br = new BufferedReader(new InputStreamReader(this.inputStream, "UTF-8"));
String line;
while ((line = br.readLine()) != null) {
if (!line.startsWith("<doc>")) continue;
try {
NamedList<Object> nl = new XMLResponseParser().processResponse(new StringReader("<result>" + line + "</result>")); //
SolrDocument doc = (SolrDocument) nl.iterator().next().getValue();

// check if url is in accepted domain
String u = (String) doc.getFieldValue("sku");
if (u != null) {
try {
DigestURL url = new DigestURL(u);
final String urlRejectReason = this.crawlStacker.urlInAcceptedDomain(url);
if ( urlRejectReason == null ) {
// convert DCEntry to SolrInputDocument
this.surrogates.put(this.configuration.toSolrInputDocument(doc));
}
} catch (MalformedURLException e) {
}
}
} catch (Throwable ee) {
// bad line
}
}
} else {
this.saxParser.parse(this.inputSource, this);
}
} catch (final SAXParseException e) {
ConcurrentLog.logException(e);
} catch (final SAXException e) {
Expand All @@ -120,7 +164,7 @@ public void run() {
ConcurrentLog.logException(e);
} finally {
try {
this.surrogates.put(DCEntry.poison);
this.surrogates.put(POISON_DOCUMENT);
} catch (final InterruptedException e1) {
ConcurrentLog.logException(e1);
}
Expand All @@ -132,12 +176,35 @@ public void run() {
}
}

/**
 * Peeks at the first bytes of the input stream to decide whether it looks
 * like an exported Solr dump, i.e. whether the head contains both a
 * {@code <response>} and a {@code <result>} tag.
 * <p>
 * Every byte consumed by the probe is pushed back onto the stream before this
 * method returns, regardless of the outcome, so that whichever parser runs
 * afterwards sees the stream from its very first byte.
 *
 * @return {@code true} if the head of the stream carries both Solr markers;
 *         {@code false} if it does not, if fewer than 60 bytes are available,
 *         or if reading or pushing back the head fails
 */
private boolean isSolrDump() {
    final byte[] head = new byte[100];
    int len = 0; // number of bytes actually consumed from the stream
    boolean solrDump = false;
    try {
        if (this.inputStream.available() < 60) return false;
        // a single read() may deliver fewer bytes than requested, so keep
        // reading until the probe buffer is full or the stream ends
        while (len < head.length) {
            final int r = this.inputStream.read(head, len, head.length - len);
            if (r < 0) break;
            len += r;
        }
        // any unfilled tail of the buffer is still zero and cannot produce a marker match
        final String s = UTF8.String(head);
        solrDump = s.contains("<response>") && s.contains("<result>");
    } catch (final IOException e) {
        ConcurrentLog.logException(e);
    } finally {
        if (len > 0) {
            try {
                // push back exactly what was read, also when the stream is NOT a
                // Solr dump: otherwise the SAX parser would miss the document head
                this.inputStream.unread(head, 0, len);
            } catch (final IOException e) {
                ConcurrentLog.logException(e);
                // the stream head is lost, so do not claim a usable Solr dump
                solrDump = false;
            }
        }
    }
    return solrDump;
}

@Override
public void startElement(final String uri, final String name, String tag, final Attributes atts) throws SAXException {
if (tag == null) return;
tag = tag.toLowerCase();
if ("record".equals(tag) || "document".equals(tag) || "doc".equals(tag)) {
this.surrogate = new DCEntry();
this.dcEntry = new DCEntry();
} else if ("element".equals(tag) || "str".equals(tag) || "int".equals(tag) || "bool".equals(tag) || "long".equals(tag)) {
this.elementName = atts.getValue("name");
this.parsingValue = true;
Expand All @@ -158,12 +225,17 @@ public void endElement(final String uri, final String name, String tag) {
if ("record".equals(tag) || "document".equals(tag) || "doc".equals(tag)) {
//System.out.println("A Title: " + this.surrogate.title());
try {
this.surrogates.put(this.surrogate);
// check if url is in accepted domain
final String urlRejectReason = this.crawlStacker.urlInAcceptedDomain(this.dcEntry.getIdentifier(true));
if ( urlRejectReason == null ) {
// convert DCEntry to SolrInputDocument
this.surrogates.put(this.configuration.toSolrInputDocument(this.dcEntry));
}
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
} finally {
//System.out.println("B Title: " + this.surrogate.title());
this.surrogate = null;
this.dcEntry = null;
this.buffer.setLength(0);
this.parsingValue = false;
}
Expand All @@ -173,22 +245,22 @@ public void endElement(final String uri, final String name, String tag) {
} else if ("str".equals(tag) || "int".equals(tag) || "bool".equals(tag) || "long".equals(tag)){
final String value = buffer.toString().trim();
if (this.elementName != null) {
this.surrogate.getMap().put(this.elementName, new String[]{value});
this.dcEntry.getMap().put(this.elementName, new String[]{value});
}
this.buffer.setLength(0);
this.parsingValue = false;
} else if ("value".equals(tag)) {
//System.out.println("BUFFER-SIZE=" + buffer.length());
final String value = buffer.toString().trim();
if (this.elementName != null) {
this.surrogate.getMap().put(this.elementName, new String[]{value});
this.dcEntry.getMap().put(this.elementName, new String[]{value});
}
this.buffer.setLength(0);
this.parsingValue = false;
} else if (tag.startsWith("dc:") || tag.startsWith("geo:") || tag.startsWith("md:")) {
final String value = buffer.toString().trim();
if (this.elementName != null && tag.equals(this.elementName)) {
Map<String,String[]> map = this.surrogate.getMap();
Map<String,String[]> map = this.dcEntry.getMap();
String[] oldcontent = map.get(this.elementName);
if (oldcontent == null || oldcontent.length == 0) {
map.put(this.elementName, new String[]{value});
Expand All @@ -211,37 +283,13 @@ public void characters(final char ch[], final int start, final int length) {
}
}

public DCEntry take() {
/**
 * Removes and returns the next document from the internal queue, waiting
 * until one is available.
 *
 * @return the next queued document, or {@code null} if the waiting thread
 *         was interrupted (the interruption is logged)
 */
public SolrInputDocument take() {
    SolrInputDocument next = null;
    try {
        next = this.surrogates.take();
    } catch (final InterruptedException interrupted) {
        ConcurrentLog.logException(interrupted);
    }
    return next;
}

public static void main(String[] args) {
File f = new File(args[0]);
SurrogateReader sr;
try {
InputStream is = new BufferedInputStream(new FileInputStream(f));
if (f.getName().endsWith(".gz")) is = new GZIPInputStream(is);
sr = new SurrogateReader(is, 1);

Thread t = new Thread(sr, "Surrogate-Reader " + f.getAbsolutePath());
t.start();
DCEntry s;
while ((s = sr.take()) != DCEntry.poison) {
System.out.println("Title: " + s.getTitle());
System.out.println("Date: " + s.getDate());
System.out.println("Creator: " + s.getCreator());
System.out.println("Publisher: " + s.getPublisher());
System.out.println("URL: " + s.getIdentifier(true));
System.out.println("Language: " + s.getLanguage());
System.out.println("Body: " + s.getDescriptions().toString());
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
}

}
40 changes: 4 additions & 36 deletions source/net/yacy/search/Switchboard.java
Expand Up @@ -155,7 +155,6 @@
import net.yacy.document.Parser;
import net.yacy.document.TextParser;
import net.yacy.document.Parser.Failure;
import net.yacy.document.content.DCEntry;
import net.yacy.document.content.SurrogateReader;
import net.yacy.document.importer.OAIListFriendsLoader;
import net.yacy.document.parser.audioTagParser;
Expand Down Expand Up @@ -1984,46 +1983,15 @@ public boolean processSurrogate(final String s) {
}

public void processSurrogate(final InputStream is, final String name) throws IOException {
final SurrogateReader reader = new SurrogateReader(is, 100);
final SurrogateReader reader = new SurrogateReader(is, 100, this.crawlStacker, this.index.fulltext().getDefaultConfiguration());
final Thread readerThread = new Thread(reader, name);
readerThread.start();
DCEntry surrogate;
Response response;
while ( (surrogate = reader.take()) != DCEntry.poison ) {
SolrInputDocument surrogate;
while ((surrogate = reader.take()) != SurrogateReader.POISON_DOCUMENT ) {
// check if url is in accepted domain
assert surrogate != null;
assert this.crawlStacker != null;
final String urlRejectReason =
this.crawlStacker.urlInAcceptedDomain(surrogate.getIdentifier(true));
if ( urlRejectReason != null ) {
this.log.warn("Rejected URL '"
+ surrogate.getIdentifier(true)
+ "': "
+ urlRejectReason);
continue;
}

if (surrogate.get("text_t") == null) {
// create a queue entry
final Document document = surrogate.document();
final Request request =
new Request(
ASCII.getBytes(this.peers.mySeed().hash),
surrogate.getIdentifier(true),
null,
"",
surrogate.getDate(),
this.crawler.defaultSurrogateProfile.handle(),
0,
this.crawler.defaultSurrogateProfile.timezoneOffset());
response = new Response(request, null, null, this.crawler.defaultSurrogateProfile, false, null);
final IndexingQueueEntry queueEntry =
new IndexingQueueEntry(response, new Document[] {document}, null);

this.indexingCondensementProcessor.enQueue(queueEntry);
} else {
this.index.putDocument(this.index.fulltext().getDefaultConfiguration().toSolrInputDocument(surrogate));
}
this.index.putDocument(surrogate);
if (shallTerminate()) break;
}
}
Expand Down
2 changes: 0 additions & 2 deletions source/net/yacy/search/index/Fulltext.java
Expand Up @@ -27,7 +27,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.MalformedURLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -78,7 +77,6 @@
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.SolrInfoMBean;
import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.lucene.util.Version;

public final class Fulltext {
Expand Down

0 comments on commit b43811d

Please sign in to comment.