Permalink
Browse files

Enable optional https support for /yacy/transferURL API calls.

Also updated some Javadoc and consistently use Switchboard instance as a
constructor parameter where relevant.
  • Loading branch information...
luccioman committed Dec 19, 2017
1 parent 79a2ba3 commit 1c4803e40ac883150c6bbb40885f86e4b89c96fc
@@ -334,7 +334,7 @@ public static serverObjects respond(final RequestHeader header, final serverObje
// transport to other peer
final boolean gzipBody = sb.getConfigBool("indexControl.gzipBody", false);
final int timeout = (int) sb.getConfigLong("indexControl.timeout", 60000);
final String error = Protocol.transferIndex(sb.peers, seed, icc, knownURLs, segment, gzipBody, timeout);
final String error = Protocol.transferIndex(sb, seed, icc, knownURLs, segment, gzipBody, timeout);
prop.put("result", (error == null) ? ("Successfully transferred "
+ knownURLs.size()
+ " words in "
@@ -88,37 +88,35 @@
*/
private Map<String, Transmission.Chunk> transmissionBuffer;
// the segment backend is used to store the remaining indexContainers in case that the object is closed
/** the segment backend is used to store the remaining indexContainers in case that the object is closed */
private final Segment segment;
// the seed database
/** the seed database */
private final SeedDB seeds;
// the log
/** the log */
private final ConcurrentLog log;
// transmission process
/** transmission process */
private WorkflowProcessor<Transmission.Chunk> indexingTransmissionProcessor;
// transmission object
/** transmission object */
private final Transmission transmission;
/** The Switchboard instance holding the server environment */
private final Switchboard env;
public Dispatcher(
final Segment segment,
final SeedDB seeds,
final Switchboard env,
final boolean gzipBody,
final int timeout
) {
this.env = env;
this.transmissionBuffer = new ConcurrentHashMap<String, Transmission.Chunk>();
this.segment = segment;
this.seeds = seeds;
this.segment = env.index;
this.seeds = env.peers;
this.log = new ConcurrentLog("INDEX-TRANSFER-DISPATCHER");
this.transmission = new Transmission(
this.log,
segment,
seeds,
gzipBody,
timeout);
this.transmission = new Transmission(env, this.log, gzipBody, timeout);
final int concurrentSender = Math.min(8, WorkflowProcessor.availableCPU);
this.indexingTransmissionProcessor = new WorkflowProcessor<Transmission.Chunk>(
@@ -366,7 +364,7 @@ public boolean dequeueContainer() {
while (Protocol.metadataRetrievalRunning.get() > 0) try {Thread.sleep(1000);} catch (InterruptedException e) {break;}
// we must test this here again
while (Memory.load() > Switchboard.getSwitchboard().getConfigFloat(SwitchboardConstants.INDEX_DIST_LOADPREREQ, 2.0f)) try {Thread.sleep(10000);} catch (InterruptedException e) {break;}
while (Memory.load() > this.env.getConfigFloat(SwitchboardConstants.INDEX_DIST_LOADPREREQ, 2.0f)) try {Thread.sleep(10000);} catch (InterruptedException e) {break;}
// do the transmission
final boolean success = chunk.transmit();
@@ -1596,18 +1596,25 @@ protected static boolean checkDocumentSize(SolrDocument doc, long maxSize) {
public static AtomicInteger metadataRetrievalRunning = new AtomicInteger(0);
/**
* transfer the index. If the transmission fails, return a string describing the cause. If everything is
* ok, return null.
*
* @param targetSeed
* @param indexes
* @param urlCache
* @param gzipBody
* @param timeout
* @return
*/
* transfer the index. If the transmission fails, return a string describing the
* cause. If everything is ok, return null.
*
* @param sb
* the Switchboard instance holding server environment
* @param targetSeed
* the target peer
* @param indexes
* the index entries to transfer
* @param urlCache
* @param gzipBody
* when true, the transferred data are compressed using gzip
* @param timeout
* the maximum time in milliseconds to wait for a success of the
* http(s) request to the remote peer
* @return
*/
public static String transferIndex(
final SeedDB seeds,
final Switchboard sb,
final Seed targetSeed,
final ReferenceContainerCache<WordReference> indexes,
final HandleSet urlRefs,
@@ -1632,8 +1639,11 @@ public static String transferIndex(
}
}
final boolean preferHttps = sb.getConfigBool(SwitchboardConstants.NETWORK_PROTOCOL_HTTPS_PREFERRED,
SwitchboardConstants.NETWORK_PROTOCOL_HTTPS_PREFERRED_DEFAULT);
// transfer the RWI without the URLs
Map<String, String> in = transferRWI(targetSeed, indexes, gzipBody, timeout);
Map<String, String> in = transferRWI(targetSeed, indexes, gzipBody, timeout, preferHttps);
if ( in == null ) {
// targetSeed interface departure is already handled within transferRWI() for no response situation
@@ -1645,13 +1655,13 @@ public static String transferIndex(
if ( result == null ) {
String errorCause = "no result from transferRWI";
String usedIP = in.get(Seed.IP);
seeds.peerActions.interfaceDeparture(targetSeed, usedIP); // disconnect unavailable peer
sb.peers.peerActions.interfaceDeparture(targetSeed, usedIP); // disconnect unavailable peer
return errorCause;
}
if ( !(result.equals("ok")) ) {
targetSeed.setFlagAcceptRemoteIndex(false); // the peer does not want our index
seeds.addConnected(targetSeed); // update the peer
sb.peers.addConnected(targetSeed); // update the peer
return result;
}
@@ -1672,7 +1682,7 @@ public static String transferIndex(
EventChannel.channels(EventChannel.DHTSEND).addMessage(new RSSMessage("Sent " + indexes.size() + " RWIs " + indexes.toString() + " to " + targetSeed.getName() + "/[" + targetSeed.hash + "], " + uhs.length + " URLs there unknown", "", targetSeed.hash));
in = transferURL(targetSeed, uhs, urlRefs, segment, gzipBody, timeout);
in = transferURL(targetSeed, uhs, urlRefs, segment, gzipBody, timeout, preferHttps);
if ( in == null ) {
return "no connection from transferURL";
@@ -1682,13 +1692,13 @@ public static String transferIndex(
if ( result == null ) {
String errorCause = "no result from transferURL";
String usedIP = in.get(Seed.IP);
seeds.peerActions.interfaceDeparture(targetSeed, usedIP); // disconnect unavailable peer ip
sb.peers.peerActions.interfaceDeparture(targetSeed, usedIP); // disconnect unavailable peer ip
return errorCause;
}
if ( !result.equals("ok") ) {
targetSeed.setFlagAcceptRemoteIndex(false); // the peer does not want our index
seeds.addConnected(targetSeed); // update the peer
sb.peers.addConnected(targetSeed); // update the peer
return result;
}
EventChannel.channels(EventChannel.DHTSEND).addMessage(
@@ -1701,23 +1711,29 @@ public static String transferIndex(
}
/**
* Transfer Reverse Word Index entries to remote peer.
* If the used IP is not responding, this IP (interface) is removed from
* targtSeed IP list.
* Remote peer responds with list of unknown url hashes
*
* @param targetSeed
* @param indexes
* @param gzipBody
* @param timeout
* @return peer response or null if transfer failed
*/
* Transfer Reverse Word Index entries to remote peer. If the used IP is not
* responding, this IP (interface) is removed from targtSeed IP list. Remote
* peer responds with list of unknown url hashes
*
* @param targetSeed
* the target peer
* @param indexes
* the index entries to transfer
* @param gzipBody
* when true, the transferred data are compressed using gzip
* @param timeout
* the maximum time in milliseconds to wait for a success of the
* http(s) request(s) to the remote peer
* @param preferHttps
* when true, use https when available on the target peer
* @return peer response or null if transfer failed
*/
private static Map<String, String> transferRWI(
final Seed targetSeed,
final ReferenceContainerCache<WordReference> indexes,
boolean gzipBody,
final int timeout) {
final boolean preferHttps = Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.NETWORK_PROTOCOL_HTTPS_PREFERRED, SwitchboardConstants.NETWORK_PROTOCOL_HTTPS_PREFERRED_DEFAULT);
final int timeout,
final boolean preferHttps) {
for (final String ip : targetSeed.getIPs()) {
if (ip == null) {
Network.log.warn("no address for transferRWI");
@@ -1783,6 +1799,7 @@ public static String transferIndex(
if(content != null) {
/* Success with http : mark SSL as unavailable on the target peer */
Network.log.info("yacyClient.transferRWI SSL unavailable on address " + ip);
targetSeed.setFlagSSLAvailable(false);
Switchboard.getSwitchboard().peers.updateConnected(targetSeed);
}
} else {
@@ -1809,12 +1826,18 @@ public static String transferIndex(
/**
* Transfer URL entries to remote peer
*
* @param targetSeed
* @param uhs
* @param urlRefs
* @param targetSeed
* the target peer
* @param uhs hashes of URLs to transfer (unknown by the target peer)
* @param urlRefs list of locally known URLs entries
* @param segment
* @param gzipBody
* @param timeout
* @param gzipBody
* when true, the transferred data are compressed using gzip
* @param timeout
* the maximum time in milliseconds to wait for a success of the
* http(s) request(s) to the remote peer
* @param preferHttps when true, use https when available on the target peer
* @return remote peer response
*/
private static Map<String, String> transferURL(
@@ -1823,14 +1846,10 @@ public static String transferIndex(
final HandleSet urlRefs,
final Segment segment,
boolean gzipBody,
final int timeout) {
final int timeout,
final boolean preferHttps) {
// this post a message to the remote message board
for (String ip : targetSeed.getIPs()) {
final String address = targetSeed.getPublicAddress(ip);
if ( address == null ) {
return null;
}
for (final String ip : targetSeed.getIPs()) {
// prepare post values
final String salt = crypt.randomSalt();
final Map<String, ContentBody> parts =
@@ -1851,7 +1870,7 @@ public static String transferIndex(
metadataRetrievalRunning.incrementAndGet();
for (int i = 0; i < uhs.length; i++) {
key = ASCII.getBytes(uhs[i]);
if (urlRefs.has(key)) {
if (urlRefs.has(key)) {
url = segment.fulltext().getMetadata(key);
if (url == null) {
if (Network.log.isFine()) Network.log.fine("DEBUG transferIndex: requested url hash '" + uhs[i] + "'");
@@ -1864,19 +1883,29 @@ public static String transferIndex(
urlPayloadSize += resource.length();
urlc++;
}
}
}
}
metadataRetrievalRunning.decrementAndGet();
try {
MultiProtocolURL targetBaseURL = targetSeed.getPublicMultiprotocolURL(ip, preferHttps);
parts.put("urlc", UTF8.StringBody(Integer.toString(urlc)));
final HTTPClient httpClient = new HTTPClient(ClientIdentification.yacyInternetCrawlerAgent, timeout);
final byte[] content =
httpClient.POSTbytes(
new MultiProtocolURL("http://" + address + "/yacy/transferURL.html"),
targetSeed.getHexHash() + ".yacyh",
parts,
gzipBody, true);
byte[] content = null;
try {
content = httpClient.POSTbytes(new MultiProtocolURL(targetBaseURL, "/yacy/transferURL.html"),
targetSeed.getHexHash() + ".yacyh", parts, gzipBody, true);
} catch(final IOException e) {
if(targetBaseURL.isHTTPS()) {
targetBaseURL = targetSeed.getPublicMultiprotocolURL(ip, false);
/* Failed with https : retry with http on the same address */
content = httpClient.POSTbytes(new MultiProtocolURL(targetBaseURL, "/yacy/transferURL.html"),
targetSeed.getHexHash() + ".yacyh", parts, gzipBody, true);
} else {
throw e;
}
}
final Iterator<String> v = FileUtils.strings(content);
final Map<String, String> result = FileUtils.table(v);
@@ -1885,7 +1914,7 @@ public static String transferIndex(
result.put(Seed.IP, ip); // add used ip to result for error handling (in case no "result" key was received)
return result;
} catch (final Exception e ) {
Network.log.warn("yacyClient.transferURL to " + address + " error: " + e.getMessage());
Network.log.warn("yacyClient.transferURL to " + ip + " error: " + e.getMessage());
}
}
return null;
@@ -43,6 +43,7 @@
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.rwi.ReferenceContainerCache;
import net.yacy.kelondro.workflow.WorkflowJob;
import net.yacy.search.Switchboard;
import net.yacy.search.index.Segment;
public class Transmission {
@@ -51,21 +52,24 @@
// anything beyond that might get discarded without notice
public static final int maxRWIsCount = 1000; // since SVN 7993 hardcoded in htroot/yacy/transferRWI.java:161
/** The Switchboard instance holding the server environment */
private final Switchboard env;
protected ConcurrentLog log;
protected Segment segment;
protected SeedDB seeds;
protected boolean gzipBody4Transfer;
protected int timeout4Transfer;
public Transmission(
final Switchboard env,
final ConcurrentLog log,
final Segment segment,
final SeedDB seeds,
final boolean gzipBody4Transfer,
final int timeout4Transfer) {
this.env = env;
this.log = log;
this.segment = segment;
this.seeds = seeds;
this.segment = env.index;
this.seeds = env.peers;
this.gzipBody4Transfer = gzipBody4Transfer;
this.timeout4Transfer = timeout4Transfer;
}
@@ -217,7 +221,9 @@ public boolean transmit() {
}
Transmission.this.log.info("starting new index transmission request to " + this.dhtTarget.getName());
final long start = System.currentTimeMillis();
final String error = Protocol.transferIndex(Transmission.this.seeds, this.dhtTarget, this.containers, this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer);
final String error = Protocol.transferIndex(Transmission.this.env, this.dhtTarget, this.containers,
this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer,
Transmission.this.timeout4Transfer);
if (error == null) {
// words successfully transfered
final long transferTime = System.currentTimeMillis() - start;
@@ -629,12 +629,7 @@ public boolean jobImpl() throws Exception {
//final long startedSeedListAquisition = System.currentTimeMillis();
// init a DHT transmission dispatcher
this.dhtDispatcher =
(this.peers.sizeConnected() == 0) ? null : new Dispatcher(
this.index,
this.peers,
true,
10000);
this.dhtDispatcher = (this.peers.sizeConnected() == 0) ? null : new Dispatcher(this, true, 10000);
// set up local robots.txt
this.robotstxtConfig = RobotsTxtConfig.init(this);
@@ -1477,12 +1472,7 @@ public void switchNetwork(final String networkDefinition) throws FileNotFoundExc
this.crawler = new CrawlSwitchboard(this);
// init a DHT transmission dispatcher
this.dhtDispatcher =
(this.peers.sizeConnected() == 0) ? null : new Dispatcher(
this.index,
this.peers,
true,
10000);
this.dhtDispatcher = (this.peers.sizeConnected() == 0) ? null : new Dispatcher(this, true, 10000);
// create new web structure
this.webStructure = new WebStructureGraph(new File(this.queuesRoot, "webStructure.map"));

0 comments on commit 1c4803e

Please sign in to comment.