Permalink
Browse files

Removed unncessary reflection usage for workflow tasks.

This improves code readability and maintainability (calls hierarchy are
easier to read) and eventually performance.
  • Loading branch information...
luccioman committed Jan 15, 2018
1 parent 897d3d3 commit 9ddf92d143012572c5ec1b3562cd9fbb3d12ca7a
@@ -55,13 +55,14 @@
import net.yacy.crawler.robots.RobotsTxt;
import net.yacy.document.TextParser;
import net.yacy.kelondro.workflow.WorkflowProcessor;
import net.yacy.kelondro.workflow.WorkflowTask;
import net.yacy.peers.SeedDB;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.repository.FilterEngine;
import net.yacy.search.Switchboard;
import net.yacy.search.index.Segment;
public final class CrawlStacker {
public final class CrawlStacker implements WorkflowTask<Request>{
public static String ERROR_NO_MATCH_MUST_MATCH_FILTER = "url does not match must-match filter ";
public static String ERROR_MATCH_WITH_MUST_NOT_MATCH_FILTER = "url matches must-not-match filter ";
@@ -99,7 +100,7 @@ public CrawlStacker(
this.acceptLocalURLs = acceptLocalURLs;
this.acceptGlobalURLs = acceptGlobalURLs;
this.domainList = domainList;
this.requestQueue = new WorkflowProcessor<Request>("CrawlStacker", "This process checks new urls before they are enqueued into the balancer (proper, double-check, correct domain, filter)", new String[]{"Balancer"}, this, "job", 10000, null, WorkflowProcessor.availableCPU);
this.requestQueue = new WorkflowProcessor<Request>("CrawlStacker", "This process checks new urls before they are enqueued into the balancer (proper, double-check, correct domain, filter)", new String[]{"Balancer"}, this, 10000, null, WorkflowProcessor.availableCPU);
CrawlStacker.log.info("STACKCRAWL thread initialized.");
}
@@ -130,7 +131,8 @@ public synchronized void close() {
clear();
}
public Request job(final Request entry) {
@Override
public Request process(final Request entry) {
// this is the method that is called by the busy thread from outside
if (entry == null) return null;
@@ -24,10 +24,6 @@
package net.yacy.kelondro.workflow;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import net.yacy.cora.util.ConcurrentLog;
@@ -36,43 +32,19 @@
public class InstantBlockingThread<J extends WorkflowJob> extends AbstractBlockingThread<J> implements BlockingThread<J> {
private static final String BLOCKINGTHREAD = "BLOCKINGTHREAD";
private final Method jobExecMethod;
private final Object environment;
private final Long handle;
private final WorkflowTask<J> task;
private static AtomicInteger handleCounter = new AtomicInteger(0);
private static AtomicInteger instantThreadCounter = new AtomicInteger(0);
private static final ConcurrentMap<Long, String> jobs = new ConcurrentHashMap<Long, String>();
public InstantBlockingThread(final WorkflowProcessor<J> manager) {
super();
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
// set the manager of blocking queues for input and output
setManager(manager);
// define execution class
final Object env = manager.getEnvironment();
final String jobExec = manager.getMethodName();
this.jobExecMethod = execMethod(env, jobExec);
this.environment = (env instanceof Class<?>) ? null : env;
setName(this.jobExecMethod.getClass().getName() + "." + this.jobExecMethod.getName() + "." + handleCounter.getAndIncrement());
this.handle = Long.valueOf(System.currentTimeMillis() + getName().hashCode());
}
protected static Method execMethod(final Object env, final String jobExec) {
final Class<?> theClass = (env instanceof Class<?>) ? (Class<?>) env : env.getClass();
try {
for (final Method method: theClass.getMethods()) {
if ((method.getParameterTypes().length == 1) && (method.getName().equals(jobExec))) {
return method;
}
}
throw new NoSuchMethodException(jobExec + " does not exist in " + env.getClass().getName());
} catch (final NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage());
}
// define task to be executed
this.task = manager.getTask();
setName(manager.getName() + "." + handleCounter.getAndIncrement());
}
@Override
@@ -95,21 +67,17 @@ public J job(final J next) throws Exception {
instantThreadCounter.incrementAndGet();
//System.out.println("started job " + this.handle + ": " + this.getName());
jobs.put(this.handle, getName());
try {
out = (J) this.jobExecMethod.invoke(this.environment, new Object[]{next});
out = this.task.process(next);
} catch (final Throwable e) {
ConcurrentLog.severe(BLOCKINGTHREAD, "Internal Error in serverInstantThread.job: " + e.getMessage());
ConcurrentLog.severe(BLOCKINGTHREAD, "shutting down thread '" + getName() + "'");
final Throwable targetException = (e instanceof InvocationTargetException) ? ((InvocationTargetException) e).getTargetException() : null;
ConcurrentLog.logException(e);
ConcurrentLog.logException(e.getCause());
if (targetException != null) ConcurrentLog.logException(targetException);
ConcurrentLog.severe(BLOCKINGTHREAD, "Runtime Error in serverInstantThread.job, thread '" + getName() + "': " + e.getMessage());
}
instantThreadCounter.decrementAndGet();
jobs.remove(this.handle);
getManager().increaseJobTime(System.currentTimeMillis() - t);
}
return out;
@@ -47,27 +47,26 @@
private BlockingQueue<J> input;
private final WorkflowProcessor<J> output;
private final int maxpoolsize;
private final Object environment;
private final String processName, methodName, description;
private final WorkflowTask<J> task;
private final String processName, description;
private final String[] childs;
private long blockTime, execTime, passOnTime;
private long execCount;
public WorkflowProcessor(
final String name, final String description, final String[] childnames,
final Object env, final String jobExecMethod,
final WorkflowTask<J> task,
final int inputQueueSize, final WorkflowProcessor<J> output,
final int maxpoolsize) {
// start a fixed number of executors that handle entries in the process queue
this.environment = env;
this.processName = name;
this.description = description;
this.methodName = jobExecMethod;
this.task = task;
this.childs = childnames;
this.maxpoolsize = maxpoolsize;
this.input = new LinkedBlockingQueue<J>(Math.max(maxpoolsize + 1, inputQueueSize));
this.output = output;
this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(this.methodName));
this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(name));
this.executorRunning = new AtomicInteger(0);
/*
for (int i = 0; i < this.maxpoolsize; i++) {
@@ -85,13 +84,9 @@ public WorkflowProcessor(
processMonitor.add(this);
}
public Object getEnvironment() {
return this.environment;
}
public String getMethodName() {
return this.methodName;
}
public WorkflowTask<J> getTask() {
return this.task;
}
public int getQueueSize() {
if (this.input == null) return 0;
@@ -169,14 +164,13 @@ private synchronized void relaxCapacity() {
this.input = i;
}
@SuppressWarnings("unchecked")
public void enQueue(final J in) {
// ensure that enough job executors are running
if (this.input == null || this.executor == null || this.executor.isShutdown() || this.executor.isTerminated()) {
// execute serialized without extra thread
//Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized");
try {
final J out = (J) InstantBlockingThread.execMethod(this.environment, this.methodName).invoke(this.environment, new Object[]{in});
final J out = this.task.process(in);
if (out != null && this.output != null) {
this.output.enQueue(out);
}
@@ -0,0 +1,47 @@
// WorkflowTask.java
// ---------------------------
// Copyright 2018 by luccioman; https://github.com/luccioman
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.kelondro.workflow;
/**
* A workflow task to be processed by a {@link WorkflowProcessor}.
*
* @author luccioman
* @param <ENTRY>
* the workflow entry type to be processed
*
*/
public interface WorkflowTask<ENTRY> {
/**
* Process a single workflow entry and eventually return the entry to be
* processed by the next processor in the workflow
*
* @param in
* the workflow entry
* @return an entry for the next processor or null
* @throws Exception
* when an error occurred
*/
ENTRY process(final ENTRY in) throws Exception;
}
@@ -44,11 +44,13 @@
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.workflow.WorkflowProcessor;
import net.yacy.kelondro.workflow.WorkflowTask;
import net.yacy.peers.Transmission.Chunk;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import net.yacy.search.index.Segment;
public class Dispatcher {
public class Dispatcher implements WorkflowTask<Transmission.Chunk> {
/**
* the dispatcher class accumulates indexContainerCache objects before they are transfered
@@ -123,7 +125,7 @@ public Dispatcher(
"transferDocumentIndex",
"This is the RWI transmission process",
new String[]{"RWI/Cache/Collections"},
this, "transferDocumentIndex", concurrentSender * 3, null, concurrentSender);
this, concurrentSender * 3, null, concurrentSender);
}
public int bufferSize() {
@@ -350,15 +352,16 @@ public boolean dequeueContainer() {
this.indexingTransmissionProcessor.enQueue(chunk);
return true;
}
@Override
public Chunk process(final Transmission.Chunk chunk) throws Exception {
return transferDocumentIndex(chunk);
}
/**
* transfer job: this method is called using reflection from the switchboard
* the method is called as a Workflow process. That means it is always called whenever
* a job is placed in the workflow queue. This happens in dequeueContainer()
* @param chunk
* @return
* Transfer job implementation
*/
public Transmission.Chunk transferDocumentIndex(final Transmission.Chunk chunk) {
private Transmission.Chunk transferDocumentIndex(final Transmission.Chunk chunk) {
// try to keep the system healthy; sleep as long as System load is too high
while (Protocol.metadataRetrievalRunning.get() > 0) try {Thread.sleep(1000);} catch (InterruptedException e) {break;}
@@ -195,6 +195,7 @@
import net.yacy.kelondro.workflow.InstantBusyThread;
import net.yacy.kelondro.workflow.OneTimeBusyThread;
import net.yacy.kelondro.workflow.WorkflowProcessor;
import net.yacy.kelondro.workflow.WorkflowTask;
import net.yacy.kelondro.workflow.WorkflowThread;
import net.yacy.peers.DHTSelection;
import net.yacy.peers.Dispatcher;
@@ -1006,8 +1007,14 @@ public boolean jobImpl() throws Exception {
new String[] {
"RWI/Cache/Collections"
},
this,
"storeDocumentIndex",
new WorkflowTask<IndexingQueueEntry>() {
@Override
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
storeDocumentIndex(in);
return null;
}
},
2,
null,
1);
@@ -1018,8 +1025,13 @@ public boolean jobImpl() throws Exception {
new String[] {
"storeDocumentIndex"
},
this,
"webStructureAnalysis",
new WorkflowTask<IndexingQueueEntry>() {
@Override
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
return webStructureAnalysis(in);
}
},
WorkflowProcessor.availableCPU + 1,
this.indexingStorageProcessor,
WorkflowProcessor.availableCPU);
@@ -1030,8 +1042,13 @@ public boolean jobImpl() throws Exception {
new String[] {
"webStructureAnalysis"
},
this,
"condenseDocument",
new WorkflowTask<IndexingQueueEntry>() {
@Override
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
return condenseDocument(in);
}
},
WorkflowProcessor.availableCPU + 1,
this.indexingAnalysisProcessor,
WorkflowProcessor.availableCPU);
@@ -1042,8 +1059,13 @@ public boolean jobImpl() throws Exception {
new String[] {
"condenseDocument", "CrawlStacker"
},
this,
"parseDocument",
new WorkflowTask<IndexingQueueEntry>() {
@Override
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
return parseDocument(in);
}
},
Math.max(20, WorkflowProcessor.availableCPU * 2), // it may happen that this is filled with new files from the search process. That means there should be enough place for two result pages
this.indexingCondensementProcessor,
WorkflowProcessor.availableCPU);
@@ -2890,8 +2912,6 @@ public boolean crawlJobIsPaused(final String jobType) {
/**
* Parse a response to produce a new document to add to the index.
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
* @param in an indexing workflow entry containing a response to parse
*/
public IndexingQueueEntry parseDocument(final IndexingQueueEntry in) {
in.queueEntry.updateStatus(Response.QUEUE_STATE_PARSING);
@@ -3071,10 +3091,11 @@ public IndexingQueueEntry parseDocument(final IndexingQueueEntry in) {
return documents;
}
/**
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
* @param in an indexing workflow entry containing a response and the related parsed document(s)
*/
/**
* This does a structural analysis of plain texts: markup of headlines, slicing
* into phrases (i.e. sentences), markup with position, counting of words,
* calculation of term frequency.
*/
public IndexingQueueEntry condenseDocument(final IndexingQueueEntry in) {
in.queueEntry.updateStatus(Response.QUEUE_STATE_CONDENSING);
CrawlProfile profile = in.queueEntry.profile();
@@ -3150,9 +3171,7 @@ public IndexingQueueEntry condenseDocument(final IndexingQueueEntry in) {
}
/**
* Perform web structure analysis on parsed documents and update the web structure graph.
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
* @param in an indexing workflow entry containing parsed document(s)
* Perform web structure analysis on parsed documents and update the web structure graph.
*/
public IndexingQueueEntry webStructureAnalysis(final IndexingQueueEntry in) {
in.queueEntry.updateStatus(Response.QUEUE_STATE_STRUCTUREANALYSIS);
@@ -3166,11 +3185,9 @@ public IndexingQueueEntry webStructureAnalysis(final IndexingQueueEntry in) {
}
return in;
}
/**
* Store a new entry to the local index.
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
* @param in an indexing workflow entry containing parsed document(s) and a condenser instance
*/
public void storeDocumentIndex(final IndexingQueueEntry in) {
in.queueEntry.updateStatus(Response.QUEUE_STATE_INDEXSTORAGE);

0 comments on commit 9ddf92d

Please sign in to comment.