Skip to content

Commit

Permalink
tried to fix more deadlocks:
Browse files Browse the repository at this point in the history
- changed connection modes in ftpc
- replaced sort tread pool in row collections by new one using util.concurrent. the old pool had caused blockings

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4582 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Mar 19, 2008
1 parent 26155cb commit f3996e6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 191 deletions.
2 changes: 1 addition & 1 deletion build.properties
Expand Up @@ -3,7 +3,7 @@ javacSource=1.5
javacTarget=1.5

# Release Configuration
releaseVersion=0.574
releaseVersion=0.575
stdReleaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
embReleaseFile=yacy_emb_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
proReleaseFile=yacy_pro_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
Expand Down
73 changes: 21 additions & 52 deletions source/de/anomic/kelondro/kelondroRowCollection.java
Expand Up @@ -31,7 +31,8 @@
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import de.anomic.server.serverFileUtils;
import de.anomic.server.serverMemory;
Expand All @@ -45,13 +46,13 @@ public class kelondroRowCollection {
private static final int isortlimit = 20;
static final Integer dummy = new Integer(0);

public static final qsortthread sortingthread;
public static ExecutorService sortingthreadexecutor = null;

static {
if (serverProcessor.useCPU > 1) {
sortingthread = new qsortthread();
sortingthread.start();
sortingthreadexecutor = Executors.newCachedThreadPool();
} else {
sortingthread = null;
sortingthreadexecutor = null;
}
}

Expand Down Expand Up @@ -475,11 +476,12 @@ public synchronized final void sort() {
}
byte[] swapspace = new byte[this.rowdef.objectsize];
int p = partition(0, this.chunkcount, this.sortBound, swapspace);
if ((sortingthread != null) && (p > 50) && (sortingthread.isAlive())) {
if ((sortingthreadexecutor != null) && (!sortingthreadexecutor.isShutdown()) && (p > 50)) {
// sort this using multi-threading
sortingthread.process(this, 0, p, 0);
Thread qsortthread = new qsortthread(this, 0, p, 0);
sortingthreadexecutor.execute(qsortthread);
qsort(p, this.chunkcount, 0, swapspace);
sortingthread.waitFinish();
if (qsortthread.isAlive()) try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
qsort(0, p, 0, swapspace);
qsort(p, this.chunkcount, 0, swapspace);
Expand All @@ -489,55 +491,22 @@ public synchronized final void sort() {
}

public static class qsortthread extends Thread {
private boolean terminate;
private SynchronousQueue<qsortobject> startObject;
private SynchronousQueue<Integer> finishObject;
public qsortthread() {
this.terminate = false;
this.startObject = new SynchronousQueue<qsortobject>();
this.finishObject = new SynchronousQueue<Integer>();
this.setName("kelondroRowCollection SORT THREAD");
}
public void process(kelondroRowCollection rc, int L, int R, int S) {
assert rc != null;
synchronized (startObject) {
try {this.startObject.put(new qsortobject(rc, L, R, S));} catch (InterruptedException e) {}
}
}
public void waitFinish() {
try {this.finishObject.take();} catch (InterruptedException e) {}
kelondroRowCollection rc;
int L, R, S;

public qsortthread(kelondroRowCollection rc, int L, int R, int S) {
this.rc = rc;
this.L = L;
this.R = R;
this.S = S;
}
public void terminate() {
this.terminate = true;
this.interrupt();
}

public void run() {
qsortobject so = null;
while (!terminate) {
try {so = this.startObject.take();} catch (InterruptedException e) {
break;
}
assert so != null;
so.rc.qsort(so.sl, so.sr, so.sb, new byte[so.rc.rowdef.objectsize]);
try {this.finishObject.put(dummy);} catch (InterruptedException e1) {
break;
}
so = null;
}
rc.qsort(L, R, S, new byte[rc.rowdef.objectsize]);
synchronized (rc) {rc.notify();}
}
}

private static class qsortobject {
protected kelondroRowCollection rc;
protected int sl, sr, sb;
public qsortobject(kelondroRowCollection rc, int L, int R, int S) {
this.rc = rc;
this.sl = L;
this.sr = R;
this.sb = S;
}
}

final void qsort(int L, int R, int S, byte[] swapspace) {
if (R - L < isortlimit) {
isort(L, R, swapspace);
Expand Down
154 changes: 17 additions & 137 deletions source/de/anomic/net/ftpc.java
@@ -1,12 +1,19 @@
// ftpc.java
// -------------------------------------
// (C) by Michael Peter Christen; mc@anomic.de
// ftpc.java
// (C) by Michael Peter Christen; mc@yacy.net
// first published on http://www.anomic.de
// Frankfurt, Germany, 2002, 2004, 2006
// main implementation finished: 28.05.2002
// last major change: 06.05.2004
// added html generation for directories: 5.9.2006
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
Expand All @@ -20,25 +27,6 @@
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.

package de.anomic.net;

Expand All @@ -48,7 +36,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -1603,6 +1590,11 @@ private void open(final String host, final int port) throws IOException {
try {
ControlSocket = new Socket(host, port);
ControlSocket.setSoTimeout(getTimeout());
ControlSocket.setKeepAlive(true);
ControlSocket.setTcpNoDelay(true); // no accumulation until buffer is full
ControlSocket.setSoLinger(false, getTimeout()); // !wait for all data being written on close()
ControlSocket.setSendBufferSize(1440); // read http://www.cisco.com/warp/public/105/38.shtml
ControlSocket.setReceiveBufferSize(1440); // read http://www.cisco.com/warp/public/105/38.shtml
clientInput = new BufferedReader(new InputStreamReader(ControlSocket.getInputStream()));
clientOutput = new DataOutputStream(new BufferedOutputStream(ControlSocket.getOutputStream()));

Expand Down Expand Up @@ -2116,6 +2108,8 @@ private boolean isPassive() {
private void createActiveDataPort() throws IOException {
// create data socket and bind it to free port available
DataSocketActive = new ServerSocket(0);
DataSocketActive.setSoTimeout(getTimeout());
DataSocketActive.setReceiveBufferSize(1440); // read http://www.cisco.com/warp/public/105/38.shtml
applyDataSocketTimeout();

// get port socket has been bound to
Expand Down Expand Up @@ -2502,120 +2496,6 @@ public void setDataSocketTimeout(final int timeout) {
}
}

private class ee extends SecurityException {

private static final long serialVersionUID = 1L;
private int value = 0;

public ee() {
}

public ee(final int value) {
super();
this.value = value;
}

public int value() {
return value;
}
}

// TODO is this necessary??? (not used, no function)
private class sm extends SecurityManager {
public void checkCreateClassLoader() {
}

public void checkAccess(final Thread g) {
}

public void checkAccess(final ThreadGroup g) {
}

public void checkExit(final int status) {
// System.outPrintln("ShellSecurityManager: object
// called System.exit(" + status + ")");
// signal that someone is trying to terminate the JVM.
throw new ee(status);
}

public void checkExec(final String cmd) {
}

public void checkLink(final String lib) {
}

public void checkRead(final FileDescriptor fd) {
}

public void checkRead(final String file) {
}

public void checkRead(final String file, final Object context) {
}

public void checkWrite(final FileDescriptor fd) {
}

public void checkWrite(final String file) {
}

public void checkDelete(final String file) {
}

public void checkConnect(final String host, final int port) {
}

public void checkConnect(final String host, final int port, final Object context) {
}

public void checkListen(final int port) {
}

public void checkAccept(final String host, final int port) {
}

public void checkMulticast(final InetAddress maddr) {
}

// public void checkMulticast(InetAddress maddr, byte ttl) { }
public void checkPropertiesAccess() {
}

public void checkPropertyAccess(final String key) {
}

public void checkPropertyAccess(final String key, final String def) {
}

public boolean checkTopLevelWindow(final Object window) {
return true;
}

public void checkPrintJobAccess() {
}

public void checkSystemClipboardAccess() {
}

public void checkAwtEventQueueAccess() {
}

public void checkPackageAccess(final String pkg) {
}

public void checkPackageDefinition(final String pkg) {
}

public void checkSetFactory() {
}

public void checkMemberAccess(final Class<?> clazz, final int which) {
}

public void checkSecurityAccess(final String provider) {
}
}

public static List<String> dir(final String host, final String remotePath, final String account,
final String password, final boolean extended) {
try {
Expand Down
2 changes: 1 addition & 1 deletion source/yacy.java
Expand Up @@ -416,7 +416,7 @@ private static void startup(File homePath, long startupMemFree, long startupMemT
serverLog.logSevere("MAIN CONTROL LOOP", "PANIC: " + e.getMessage(),e);
}
// shut down
if (kelondroRowCollection.sortingthread != null) kelondroRowCollection.sortingthread.terminate();
if (kelondroRowCollection.sortingthreadexecutor != null) kelondroRowCollection.sortingthreadexecutor.shutdown();
serverLog.logConfig("SHUTDOWN", "caught termination signal");
server.terminate(false);
server.interrupt();
Expand Down

0 comments on commit f3996e6

Please sign in to comment.