removed synchronization and concurrency in Fulltext class, concurrent deletions are now handled in ConcurrentUpdateSolrConnector
Orbiter committed May 11, 2013
1 parent f965d04 commit b24d1d1
Showing 4 changed files with 71 additions and 105 deletions.
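The ConcurrentUpdateSolrConnector named in the commit message is not part of this diff, so the sketch below only illustrates the general pattern the message describes: a connector-level wrapper that accepts delete requests from any caller thread, queues them, and applies them from a single worker, which is why the callers changed below can drop their own Thread objects, synchronized blocks, and `concurrent` flags. All names here (AsyncDeletingConnector, DeleteTask, drainLoop) are hypothetical and are not YaCy's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a connector facade that serializes deletions on one
// worker thread, so callers can submit deletes concurrently without doing
// their own locking or thread management.
public class AsyncDeletingConnector {

    // a delete request is either a query string or a batch of document ids
    private static final class DeleteTask {
        final String query;      // non-null for delete-by-query
        final List<String> ids;  // non-null for delete-by-ids
        DeleteTask(String query, List<String> ids) { this.query = query; this.ids = ids; }
    }

    private static final DeleteTask POISON = new DeleteTask(null, null);

    private final BlockingQueue<DeleteTask> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    public AsyncDeletingConnector() {
        this.worker = new Thread(this::drainLoop, "async-delete-worker");
        this.worker.start();
    }

    // called from any thread; returns immediately
    public void deleteByQuery(String query) {
        this.queue.offer(new DeleteTask(query, null));
    }

    public void deleteByIds(List<String> ids) {
        this.queue.offer(new DeleteTask(null, new ArrayList<>(ids)));
    }

    public void close() throws InterruptedException {
        this.queue.offer(POISON);
        this.worker.join();
    }

    private void drainLoop() {
        try {
            DeleteTask task;
            while ((task = this.queue.take()) != POISON) {
                // exactly one thread talks to the index, so the callers no
                // longer need synchronized blocks (the role Fulltext used to play)
                if (task.query != null) {
                    System.out.println("deleteByQuery: " + task.query);
                } else {
                    System.out.println("deleteByIds: " + task.ids);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncDeletingConnector c = new AsyncDeletingConnector();
        c.deleteByQuery("host_s:\"example.org\"");
        c.deleteByIds(List.of("id1", "id2"));
        c.close();
    }
}
```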
2 changes: 1 addition & 1 deletion htroot/CrawlResults.java
@@ -124,7 +124,7 @@ public static serverObjects respond(final RequestHeader header, serverObjects po
if (post.containsKey("deletedomain")) {
final String domain = post.get("domain", null);
if (domain != null) {
sb.index.fulltext().deleteDomainHostname(domain, null, false);
sb.index.fulltext().deleteDomainHostname(domain, null);
ResultURLs.deleteDomain(tabletype, domain);
}
}
4 changes: 2 additions & 2 deletions htroot/Crawler_p.java
@@ -301,7 +301,7 @@ public static serverObjects respond(final RequestHeader header, final serverObje
siteFilter = CrawlProfile.siteFilter(rootURLs);
if (deleteold) {
for (DigestURI u: rootURLs) {
sb.index.fulltext().deleteDomainHashpart(u.hosthash(), deleteageDate, rootURLs.size() > 1);
sb.index.fulltext().deleteDomainHashpart(u.hosthash(), deleteageDate);
}
}
} else if (subPath) {
@@ -310,7 +310,7 @@ public static serverObjects respond(final RequestHeader header, final serverObje
for (DigestURI u: rootURLs) {
String basepath = u.toNormalform(true);
if (!basepath.endsWith("/")) {int p = basepath.lastIndexOf("/"); if (p > 0) basepath = basepath.substring(0, p + 1);}
int count = sb.index.fulltext().remove(basepath, deleteageDate, rootURLs.size() > 1);
int count = sb.index.fulltext().remove(basepath, deleteageDate);
if (count > 0) Log.logInfo("Crawler_p", "deleted " + count + " documents for host " + u.getHost());
}
}
2 changes: 1 addition & 1 deletion htroot/IndexControlURLs_p.java
@@ -279,7 +279,7 @@ public static serverObjects respond(@SuppressWarnings("unused") final RequestHea

if (post.containsKey("deletedomain")) {
final String domain = post.get("domain");
segment.fulltext().deleteDomainHostname(domain, null, false);
segment.fulltext().deleteDomainHostname(domain, null);
// trigger the loading of the table
post.put("statistics", "");
}
168 changes: 67 additions & 101 deletions source/net/yacy/search/index/Fulltext.java
@@ -417,12 +417,7 @@ public void putDocument(final SolrInputDocument doc) throws IOException {
//Date sdDate = (Date) connector.getFieldById(id, CollectionSchema.last_modified.getSolrFieldName());
//Date docDate = null;
//if (sdDate == null || (docDate = SchemaConfiguration.getDate(doc, CollectionSchema.last_modified)) == null || sdDate.before(docDate)) {
if (this.collectionConfiguration.contains(CollectionSchema.ip_s)) {
// ip_s needs a dns lookup which causes blockings during search here
connector.add(doc);
} else synchronized (this.solrInstances) {
connector.add(doc);
}
connector.add(doc);
//}
} catch (SolrException e) {
throw new IOException(e.getMessage(), e);
@@ -546,7 +541,7 @@ public void putMetadataLater(final URIMetadataRow entry) throws IOException {
* @param freshdate either NULL or a date in the past which is the limit for deletion. Only documents older than this date are deleted
* @throws IOException
*/
public void deleteDomainHashpart(final String hosthash, Date freshdate, boolean concurrent) {
public void deleteDomainHashpart(final String hosthash, Date freshdate) {
// first collect all url hashes that belong to the domain
assert hosthash.length() == 6;
final String collection1Query = CollectionSchema.host_id_s.getSolrFieldName() + ":\"" + hosthash + "\"" +
@@ -559,54 +554,43 @@ public void deleteDomainHashpart(final String hosthash, Date freshdate, boolean
(" AND " + WebgraphSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") :
""
);
Thread t = new Thread() {
public void run() {
// delete in solr
synchronized (Fulltext.this.solrInstances) {
try {Fulltext.this.getDefaultConnector().deleteByQuery(collection1Query);} catch (IOException e) {}
try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {}
}

// delete in old metadata structure
if (Fulltext.this.urlIndexFile != null) {
final ArrayList<String> l = new ArrayList<String>();
synchronized (this) {
CloneableIterator<byte[]> i;
try {
i = Fulltext.this.urlIndexFile.keys(true, null);
String hash;
while (i != null && i.hasNext()) {
hash = ASCII.String(i.next());
if (hosthash.equals(hash.substring(6))) l.add(hash);
}

// then delete the urls using this list
for (final String h: l) Fulltext.this.urlIndexFile.delete(ASCII.getBytes(h));
} catch (IOException e) {}
}
// delete in solr
try {Fulltext.this.getDefaultConnector().deleteByQuery(collection1Query);} catch (IOException e) {}
try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {}

// delete in old metadata structure
if (Fulltext.this.urlIndexFile != null) {
final ArrayList<String> l = new ArrayList<String>();
CloneableIterator<byte[]> i;
try {
i = Fulltext.this.urlIndexFile.keys(true, null);
String hash;
while (i != null && i.hasNext()) {
hash = ASCII.String(i.next());
if (hosthash.equals(hash.substring(6))) l.add(hash);
}

// finally remove the line with statistics
if (Fulltext.this.statsDump != null) {
final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator();
HostStat hs;
while (hsi.hasNext()) {
hs = hsi.next();
if (hs.hosthash.equals(hosthash)) {
hsi.remove();
break;
}
}

// then delete the urls using this list
for (final String h: l) Fulltext.this.urlIndexFile.delete(ASCII.getBytes(h));
} catch (IOException e) {}
}

// finally remove the line with statistics
if (Fulltext.this.statsDump != null) {
final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator();
HostStat hs;
while (hsi.hasNext()) {
hs = hsi.next();
if (hs.hosthash.equals(hosthash)) {
hsi.remove();
break;
}
}
};
if (concurrent) t.start(); else {
t.run();
Fulltext.this.commit(true);
}
}

public void deleteDomainHostname(final String hostname, Date freshdate, boolean concurrent) {
public void deleteDomainHostname(final String hostname, Date freshdate) {
// first collect all url hashes that belong to the domain
final String collectionQuery =
CollectionSchema.host_s.getSolrFieldName() + ":\"" + hostname + "\"" +
@@ -615,35 +599,27 @@ public void deleteDomainHostname(final String hostname, Date freshdate, boolean
""
);
final String webgraphQuery =
WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + hostname + "\"" +
((freshdate != null && freshdate.before(new Date())) ?
(" AND " + WebgraphSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") :
""
);
Thread t = new Thread() {
public void run() {
// delete in solr
synchronized (Fulltext.this.solrInstances) {
try {Fulltext.this.getDefaultConnector().deleteByQuery(collectionQuery);} catch (IOException e) {}
try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {}
}
// finally remove the line with statistics
if (Fulltext.this.statsDump != null) {
final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator();
HostStat hs;
while (hsi.hasNext()) {
hs = hsi.next();
if (hs.hostname.equals(hostname)) {
hsi.remove();
break;
}
}
WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + hostname + "\"" +
((freshdate != null && freshdate.before(new Date())) ?
(" AND " + WebgraphSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") :
""
);

// delete in solr
try {Fulltext.this.getDefaultConnector().deleteByQuery(collectionQuery);} catch (IOException e) {}
try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {}

// finally remove the line with statistics
if (Fulltext.this.statsDump != null) {
final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator();
HostStat hs;
while (hsi.hasNext()) {
hs = hsi.next();
if (hs.hostname.equals(hostname)) {
hsi.remove();
break;
}
}
};
if (concurrent) t.start(); else {
t.run();
Fulltext.this.commit(true);
}
}

@@ -653,30 +629,25 @@ public void run() {
* @param freshdate either NULL or a date in the past which is the limit for deletion. Only documents older than this date are deleted
* @param concurrently if true, then the method returnes immediately and runs concurrently
*/
public int remove(final String basepath, Date freshdate, final boolean concurrently) {
public int remove(final String basepath, Date freshdate) {
DigestURI uri;
try {uri = new DigestURI(basepath);} catch (MalformedURLException e) {return 0;}
final String host = uri.getHost();
final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" +
((freshdate != null && freshdate.before(new Date())) ? (" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : "");
final AtomicInteger count = new AtomicInteger(0);
Thread t = new Thread(){
public void run() {
final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, -1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
try {
SolrDocument doc;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
if (u.startsWith(basepath)) {
remove(ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())));
count.incrementAndGet();
}
}
if (count.get() > 0) Fulltext.this.commit(true);
} catch (InterruptedException e) {}
final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, -1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
try {
SolrDocument doc;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
if (u.startsWith(basepath)) {
remove(ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())));
count.incrementAndGet();
}
}
};
if (concurrently) t.start(); else t.run();
if (count.get() > 0) Fulltext.this.commit(true);
} catch (InterruptedException e) {}
return count.get();
}

@@ -688,11 +659,8 @@ public void run() {
public void remove(final Collection<String> deleteIDs) {
if (deleteIDs == null || deleteIDs.size() == 0) return;
try {
synchronized (Fulltext.this.solrInstances) {
this.getDefaultConnector().deleteByIds(deleteIDs);
this.getWebgraphConnector().deleteByIds(deleteIDs);
this.commit(true);
}
this.getDefaultConnector().deleteByIds(deleteIDs);
this.getWebgraphConnector().deleteByIds(deleteIDs);
} catch (final Throwable e) {
Log.logException(e);
}
@@ -708,10 +676,8 @@ public boolean remove(final byte[] urlHash) {
if (urlHash == null) return false;
try {
String id = ASCII.String(urlHash);
synchronized (this.solrInstances) {
this.getDefaultConnector().deleteById(id);
this.getWebgraphConnector().deleteById(id);
}
this.getDefaultConnector().deleteById(id);
this.getWebgraphConnector().deleteById(id);
} catch (final Throwable e) {
Log.logException(e);
}
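The reworked remove(basepath, freshdate) above now drains its result queue on the calling thread: concurrentDocumentsByQuery fills a BlockingQueue from a producer and terminates the stream with AbstractSolrConnector.POISON_DOCUMENT, and the loop around docs.take() stops at that sentinel, so the method can commit and return a reliable count instead of handing the work to a detached thread. Below is a self-contained sketch of that poison-pill producer/consumer handshake, using simplified, hypothetical stand-in types (Doc, documentsByQuery) rather than YaCy's real classes.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the poison-pill pattern used by
// remove(basepath, freshdate): a producer streams matching documents into a
// blocking queue and closes the stream with a sentinel; the consumer blocks
// on take() until the sentinel arrives, then returns a count.
public class PoisonPillDrain {

    // stand-in for a SolrDocument carrying id and sku (the document's URL)
    record Doc(String id, String sku) {}

    // sentinel marking the end of the stream (the role of POISON_DOCUMENT)
    static final Doc POISON_DOCUMENT = new Doc("", "");

    static BlockingQueue<Doc> documentsByQuery(List<Doc> hits) {
        final BlockingQueue<Doc> queue = new LinkedBlockingQueue<>();
        new Thread(() -> {
            for (Doc d : hits) queue.offer(d);   // producer: stream the results
            queue.offer(POISON_DOCUMENT);        // then close the stream
        }, "query-producer").start();
        return queue;
    }

    // consumer: count every hit whose URL starts with basepath
    static int removeByPrefix(String basepath, List<Doc> hits) throws InterruptedException {
        final AtomicInteger count = new AtomicInteger(0);
        final BlockingQueue<Doc> docs = documentsByQuery(hits);
        Doc doc;
        while ((doc = docs.take()) != POISON_DOCUMENT) {
            if (doc.sku().startsWith(basepath)) {
                // the real code deletes the document by its id here
                count.incrementAndGet();
            }
        }
        return count.get();  // safe: take() blocked until the producer finished
    }

    public static void main(String[] args) throws InterruptedException {
        List<Doc> hits = List.of(
                new Doc("a", "http://example.org/docs/one"),
                new Doc("b", "http://example.org/blog/two"),
                new Doc("c", "http://example.org/docs/three"));
        System.out.println(removeByPrefix("http://example.org/docs/", hits));  // prints 2
    }
}
```

Comparing the consumed document against the single shared sentinel instance by identity (`!=`) is what makes the termination check in the consumer loop safe.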
