Skip to content

Commit

Permalink
added export to elasticsearch. The export dump can easily be imported to
Browse files Browse the repository at this point in the history
elasticsearch using the command
curl -XPOST localhost:9200/collection1/yacy/_bulk --data-binary
@yacy_dump_XXX.flatjson
  • Loading branch information
Orbiter committed Mar 31, 2017
1 parent 39ffa42 commit 69081bc
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 4 deletions.
3 changes: 2 additions & 1 deletion htroot/IndexExport_p.html
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ <h2>Index Export</h2>
<dd>
<dl>
<dt>Full Data Records:</dt>
<dd><input type="radio" name="format" value="full-solr" checked="checked" /> XML (Rich and full-text Solr data, one document per line in one large xml file, can be processed with shell tools, can be imported with DATA/SURROGATE/in/)<br />
<dd><input type="radio" name="format" value="full-solr" /> XML (Rich and full-text Solr data, one document per line in one large xml file, can be processed with shell tools, can be imported with DATA/SURROGATE/in/)<br />
<input type="radio" name="format" value="full-elasticsearch" checked="checked" /> JSON (Rich and full-text Elasticsearch data, one document per line in one flat JSON file, can be bulk-imported to elasticsearch with the command "curl -XPOST localhost:9200/collection1/yacy/_bulk --data-binary @yacy_dump_XXX.flatjson")<br />
<input type="radio" name="format" value="full-rss" /> XML (RSS)</dd>
<dt>Full URL List:</dt>
<dd><input type="radio" name="format" value="url-text" /> Plain Text List (URLs only)<br />
Expand Down
1 change: 1 addition & 0 deletions htroot/IndexExport_p.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public static serverObjects respond(@SuppressWarnings("unused") final RequestHea
if (fname.endsWith("html")) format = Fulltext.ExportFormat.html;
if (fname.endsWith("rss")) format = Fulltext.ExportFormat.rss;
if (fname.endsWith("solr")) format = Fulltext.ExportFormat.solr;
if (fname.endsWith("elasticsearch")) format = Fulltext.ExportFormat.elasticsearch;

final String filter = post.get("exportfilter", ".*");
final String query = post.get("exportquery", "*:*");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* FlatJSONResponseWriter
* Copyright 2017 by Michael Peter Christen
* First released 30.03.2017 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/

package net.yacy.cora.federate.solr.responsewriter;

import java.io.IOException;
import java.io.Writer;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.XML;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.QueryResponseWriter;
import org.apache.solr.response.ResultContext;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.schema.TextField;
import org.apache.solr.search.DocIterator;
import org.apache.solr.search.DocList;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.DateFormatUtil;
import org.json.simple.JSONArray;

import net.yacy.cora.federate.solr.SolrType;
import net.yacy.cora.util.JSONObject;

public class FlatJSONResponseWriter implements QueryResponseWriter {

private static final char lb = '\n';

private boolean elasticsearchBulkRequest;

public FlatJSONResponseWriter(boolean elasticsearchBulkRequest) {
this.elasticsearchBulkRequest = elasticsearchBulkRequest;
}

@Override
public String getContentType(SolrQueryRequest arg0, SolrQueryResponse arg1) {
return "application/json; charset=UTF-8";
}

@Override
public void init(@SuppressWarnings("rawtypes") NamedList arg0) {
}

@Override
public void write(final Writer writer, final SolrQueryRequest request, final SolrQueryResponse rsp) throws IOException {
NamedList<?> values = rsp.getValues();
DocList response = ((ResultContext) values.get("response")).docs;
writeDocs(writer, request, response);
}

private static final void writeDocs(final Writer writer, final SolrQueryRequest request, final DocList response) throws IOException {
boolean includeScore = false;
final int sz = response.size();
SolrIndexSearcher searcher = request.getSearcher();
DocIterator iterator = response.iterator();
includeScore = includeScore && response.hasScores();
IndexSchema schema = request.getSchema();
for (int i = 0; i < sz; i++) {
int id = iterator.nextDoc();
Document doc = searcher.doc(id);
writeDoc(writer, schema, null, doc.getFields(), (includeScore ? iterator.score() : 0.0f), includeScore);
}
}

private static final void writeDoc(final Writer writer, final IndexSchema schema, final String name, final List<IndexableField> fields, final float score, final boolean includeScore) throws IOException {
JSONObject json = new JSONObject(true);

int sz = fields.size();
int fidx1 = 0, fidx2 = 0;
while (fidx1 < sz) {
IndexableField value = fields.get(fidx1);
String fieldName = value.name();
fidx2 = fidx1 + 1;
while (fidx2 < sz && fieldName.equals(fields.get(fidx2).name())) {
fidx2++;
}
SchemaField sf = schema == null ? null : schema.getFieldOrNull(fieldName);
if (sf == null) {
sf = new SchemaField(fieldName, new TextField());
}
FieldType type = sf.getType();
if (fidx1 + 1 == fidx2) {
if (sf.multiValued()) {
JSONArray a = new JSONArray();
json.put(fieldName, a);
JSONObject j = new JSONObject();
String sv = value.stringValue();
setValue(j, type.getTypeName(), "x", sv); //sf.write(this, null, f1);
a.add(j.get("x"));
} else {
setValue(json, type.getTypeName(), value.name(), value.stringValue());
}
} else {
JSONArray a = new JSONArray();
json.put(fieldName, a);
for (int i = fidx1; i < fidx2; i++) {
String sv = fields.get(i).stringValue();
JSONObject j = new JSONObject();
setValue(j, type.getTypeName(), "x", sv); //sf.write(this, null, f1);
a.add(j.get("x"));
}
}
fidx1 = fidx2;
}
writer.write(json.toString());
writer.write(lb);
}

private static void setValue(final JSONObject json, final String typeName, final String name, final String value) {
if (typeName.equals(SolrType.text_general.printName()) ||
typeName.equals(SolrType.string.printName()) ||
typeName.equals(SolrType.text_en_splitting_tight.printName())) {
json.put(name, value);
} else if (typeName.equals(SolrType.bool.printName())) {
json.put(name, "F".equals(value) ? false : true);
} else if (typeName.equals(SolrType.num_integer.printName())) {
json.put(name, Long.parseLong(value));
} else if (typeName.equals(SolrType.num_long.printName())) {
json.put(name, Long.parseLong(value));
} else if (typeName.equals(SolrType.date.printName())) {
json.put(name, DateFormatUtil.formatExternal(new Date(Long.parseLong(value))));
} else if (typeName.equals(SolrType.num_float.printName())) {
json.put(name, Double.parseDouble(value));
} else if (typeName.equals(SolrType.num_double.printName())) {
json.put(name, Double.parseDouble(value));
}
}

public static final void writeDoc(final Writer writer, final SolrDocument doc) throws IOException {
JSONObject json = new JSONObject(true);
final Map<String, Object> fields = doc.getFieldValueMap();
for (String key: fields.keySet()) {
if (key == null) continue;
Object value = doc.get(key);
if (value == null) {
} else if (value instanceof Collection<?>) {
JSONArray a = new JSONArray();
json.put(key, a);
for (Object o: ((Collection<?>) value)) {
a.add(o);
}
} else {
json.put(key, value);
}
}
writer.write(json.toString());
writer.write(lb);
}
}
10 changes: 7 additions & 3 deletions source/net/yacy/search/index/Fulltext.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import net.yacy.cora.federate.solr.instance.RemoteInstance;
import net.yacy.cora.federate.solr.instance.ShardInstance;
import net.yacy.cora.federate.solr.responsewriter.EnhancedXMLResponseWriter;
import net.yacy.cora.federate.solr.responsewriter.FlatJSONResponseWriter;
import net.yacy.cora.protocol.HeaderFramework;
import net.yacy.cora.sorting.ReversibleScoreMap;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue;
Expand Down Expand Up @@ -650,7 +651,7 @@ public void rebootSolr() {
}

public static enum ExportFormat {
text("txt"), html("html"), rss("rss"), solr("xml");
text("txt"), html("html"), rss("rss"), solr("xml"), elasticsearch("flatjson");
private final String ext;
private ExportFormat(String ext) {this.ext = ext;}
public String getExt() {return this.ext;}
Expand Down Expand Up @@ -801,15 +802,18 @@ public void run() {
this.count++;
}
} else {
if (this.format == ExportFormat.solr || (this.text && this.format == ExportFormat.text)) {
if (this.format == ExportFormat.solr || this.format == ExportFormat.elasticsearch || (this.text && this.format == ExportFormat.text)) {
BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1, true);
SolrDocument doc;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
String url = getStringFrom(doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
if (this.pattern != null && !this.pattern.matcher(url).matches()) continue;
CRIgnoreWriter sw = new CRIgnoreWriter();
if (this.text) sw.write((String) doc.getFieldValue(CollectionSchema.text_t.getSolrFieldName())); else EnhancedXMLResponseWriter.writeDoc(sw, doc);
if (this.text) sw.write((String) doc.getFieldValue(CollectionSchema.text_t.getSolrFieldName()));
if (this.format == ExportFormat.solr) EnhancedXMLResponseWriter.writeDoc(sw, doc);
if (this.format == ExportFormat.elasticsearch) FlatJSONResponseWriter.writeDoc(sw, doc);
sw.close();
if (this.format == ExportFormat.elasticsearch) pw.println("{\"index\":{}}");
String d = sw.toString();
pw.println(d);
this.count++;
Expand Down

0 comments on commit 69081bc

Please sign in to comment.