Skip to content

Commit

Permalink
fight against problems with remove-methods and synchronization
Browse files Browse the repository at this point in the history
- some bugs may have been fixed with wrong removal operations
- removed temporary storage of remove-positions and replaced by direct deletions
- changed synchronization
- added many assets
- modified dbtest to also test remove during threaded stresstest

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3576 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Apr 17, 2007
1 parent b6a5f53 commit 7a7a1c7
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 288 deletions.
36 changes: 33 additions & 3 deletions source/dbtest.java
Expand Up @@ -9,6 +9,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -133,6 +134,23 @@ public void run() {
}
}

public static final class RemoveJob extends STJob {
public RemoveJob(final kelondroIndex aTable, final long aSource) {
super(aTable, aSource);
}

public void run() {
final STEntry entry = new STEntry(this.getSource());
try {
getTable().remove(entry.getKey());
} catch (IOException e) {
System.err.println(e);
e.printStackTrace();
System.exit(0);
}
}
}

public static final class ReadJob extends STJob {
public ReadJob(final kelondroIndex aTable, final long aSource) {
super(aTable, aSource);
Expand Down Expand Up @@ -357,14 +375,26 @@ public static void main(String[] args) {
long readCount = Long.parseLong(args[4]);
long randomstart = Long.parseLong(args[5]);
final Random random = new Random(randomstart);
long r;
int p;
ArrayList ra = new ArrayList();
for (int i = 0; i < writeCount; i++) {
serverInstantThread.oneTimeJob(new WriteJob(table, i), random.nextLong() % 1000, 50);
r = random.nextLong() % 1000;
serverInstantThread.oneTimeJob(new WriteJob(table, r), 0, 50);
if (random.nextLong() % 5 == 0) ra.add(new Long(r));
for (int j = 0; j < readCount; j++) {
serverInstantThread.oneTimeJob(new ReadJob(table, random.nextLong() % writeCount), random.nextLong() % 1000, 20);
}
if ((ra.size() > 0) && (random.nextLong() % 7 == 0)) {
p = Math.abs(random.nextInt()) % ra.size();
System.out.println("remove: " + ((Long) ra.get(p)).longValue());
serverInstantThread.oneTimeJob(new RemoveJob(table, ((Long) ra.remove(p)).longValue()), 0, 50);
}
}
while (serverInstantThread.instantThreadCounter > 0) {
try {Thread.sleep(1000);} catch (InterruptedException e) {} // wait for all tasks to finish
System.out.println("count: " + serverInstantThread.instantThreadCounter + ", jobs: " + serverInstantThread.jobs.toString());
}
while (serverInstantThread.instantThreadCounter > 0)
try {Thread.sleep(100);} catch (InterruptedException e) {} // wait for all tasks to finish
try {Thread.sleep(6000);} catch (InterruptedException e) {}
}

Expand Down
4 changes: 1 addition & 3 deletions source/de/anomic/kelondro/kelondroArray.java
Expand Up @@ -38,9 +38,7 @@ public interface kelondroArray {

public int add(kelondroRow.Entry rowinstance) throws IOException;

public void remove(int index, boolean marked) throws IOException;

public void resolveMarkedRemoved() throws IOException;
public void remove(int index) throws IOException;

public void print() throws IOException;

Expand Down
43 changes: 14 additions & 29 deletions source/de/anomic/kelondro/kelondroCollectionIndex.java
Expand Up @@ -247,13 +247,6 @@ private kelondroFixedWidthArray getArray(int partitionNumber, int serialNumber,
return array;
}

private void arrayResolveRemoved() throws IOException {
Iterator i = arrays.values().iterator();
while (i.hasNext()) {
((kelondroFixedWidthArray) i.next()).resolveMarkedRemoved();
}
}

private int arrayCapacity(int arrayCounter) {
if (arrayCounter < 0) return 0;
int load = this.loadfactor;
Expand Down Expand Up @@ -294,7 +287,7 @@ private void array_remove(
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize);

// delete old entry
array.remove(oldRownumber, true);
array.remove(oldRownumber);
}

private kelondroRow.Entry array_new(
Expand Down Expand Up @@ -470,7 +463,10 @@ private ArrayList array_replace_multiple(TreeMap array_replace_map, int serialNu
}

public synchronized void put(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException {

assert (key != null);
assert (collection != null);
assert (collection.size() != 0);

// first find an old entry, if one exists
kelondroRow.Entry indexrow = index.get(key);

Expand All @@ -485,20 +481,12 @@ public synchronized void put(byte[] key, kelondroRowCollection collection) throw

// overwrite the old collection
// read old information
int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
//int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));

if ((collection == null) || (collection.size() == 0)) {
// delete the index entry and the array
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
array.remove(oldrownumber ,false);
index.remove(key);
return;
}

int newPartitionNumber = arrayIndex(collection.size());

// see if we need new space or if we can overwrite the old space
Expand All @@ -515,7 +503,6 @@ public synchronized void put(byte[] key, kelondroRowCollection collection) throw
key, collection, indexrow,
newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
arrayResolveRemoved(); // remove all to-be-removed marked entries

if ((int) indexrow.getColLong(idx_col_chunkcount) != collection.size())
serverLog.logSevere("kelondroCollectionIndex", "UPDATE (put) ERROR: array has different chunkcount than index after merge: index = " + (int) indexrow.getColLong(idx_col_chunkcount) + ", collection.size() = " + collection.size());
Expand Down Expand Up @@ -684,9 +671,6 @@ record = (Object[]) i.next(); // {byte[], indexContainer}
indexrows_new.add(indexrow); // collect new index rows
}

// remove all to-be-removed marked entries
arrayResolveRemoved();

// write index entries
index.putMultiple(indexrows_existing, new Date()); // write modified indexrows in optimized manner
index.addUniqueMultiple(indexrows_new, new Date()); // write new indexrows in optimized manner
Expand Down Expand Up @@ -752,8 +736,7 @@ public synchronized void merge(indexContainer container) throws IOException, kel
key, collection, indexrow,
newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
arrayResolveRemoved(); // remove all to-be-removed marked entries


final int collectionsize = collection.size(); // extra variable for easier debugging
final int indexrowcount = (int) indexrow.getColLong(idx_col_chunkcount);
if (indexrowcount != collectionsize)
Expand Down Expand Up @@ -858,14 +841,17 @@ public synchronized int remove(byte[] key, Set removekeys) throws IOException, k
oldcollection.sort();
oldcollection.trim(false);

/* in case that the new array size is zero we dont delete the array, just allocate a minimal chunk
*
if (oldcollection.size() == 0) {
// delete the index entry and the array
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
array.remove(oldrownumber, false);
index.remove(key);
return removed;
}

*/
int newPartitionNumber = arrayIndex(oldcollection.size());

// see if we need new space or if we can overwrite the old space
Expand All @@ -882,7 +868,6 @@ public synchronized int remove(byte[] key, Set removekeys) throws IOException, k
key, oldcollection, indexrow,
newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow
return removed;
}
Expand Down Expand Up @@ -941,7 +926,7 @@ private synchronized kelondroRowSet getwithparams(kelondroRow.Entry indexrow, in
if (!(index.row().objectOrder.wellformed(arraykey))) {
// cleanup for a bad bug that corrupted the database
index.remove(indexkey); // the RowCollection must be considered lost
array.remove(rownumber, false); // loose the RowCollection (we don't know how much is lost)
array.remove(rownumber); // loose the RowCollection (we don't know how much is lost)
serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey");
return new kelondroRowSet(this.payloadrow, 0);
}
Expand Down Expand Up @@ -969,7 +954,7 @@ private synchronized kelondroRowSet getwithparams(kelondroRow.Entry indexrow, in
index.put(indexrow);
array.logFailure("INCONSISTENCY (get) in " + arrayFile(this.path, this.filenameStub, this.loadfactor, chunksize, clusteridx, serialnumber).toString() + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed");
}
if (remove) array.remove(rownumber, false); // index is removed in calling method
if (remove) array.remove(rownumber); // index is removed in calling method
return collection;
}

Expand Down Expand Up @@ -1043,7 +1028,7 @@ public static void main(String[] args) {
collection.addUnique(rowdef.newEntry(new byte[][]{"abc".getBytes(), "efg".getBytes()}));
collectionIndex.put("erstes".getBytes(), collection);

for (int i = 0; i <= 170; i++) {
for (int i = 1; i <= 170; i++) {
collection = new kelondroRowSet(rowdef, 0);
for (int j = 0; j < i; j++) {
collection.addUnique(rowdef.newEntry(new byte[][]{("abc" + j).getBytes(), "xxx".getBytes()}));
Expand Down
77 changes: 24 additions & 53 deletions source/de/anomic/kelondro/kelondroFixedWidthArray.java
Expand Up @@ -51,16 +51,13 @@
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class kelondroFixedWidthArray extends kelondroRecords implements kelondroArray {

// define the Over-Head-Array
private static short thisOHBytes = 0; // our record definition does not need extra bytes
private static short thisOHHandles = 0; // and no handles

private TreeSet markedRemoved; // a set of Integer indexes of removed records (only temporary)

public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) throws IOException {
// this creates a new array
super(file, false, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */);
Expand All @@ -73,7 +70,6 @@ public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) thro
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
}
}
markedRemoved = new TreeSet();
}

public kelondroFixedWidthArray(kelondroRA ra, String filename, kelondroRow rowdef, int intprops) throws IOException {
Expand All @@ -86,7 +82,6 @@ public kelondroFixedWidthArray(kelondroRA ra, String filename, kelondroRow rowde
for (int i = 0; i < rowdef.columns(); i++) {
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
}
markedRemoved = new TreeSet();
}

public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) {
Expand All @@ -109,8 +104,8 @@ public synchronized void set(int index, kelondroRow.Entry rowentry) throws IOExc
// this writes a row without reading the row from the file system first

// create a node at position index with rowentry
Handle h = new Handle(index);
newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0).commit(CP_NONE);
Handle h = new Handle(index);
commit(newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0), CP_NONE);
// attention! this newNode call wants that the OH bytes are passed within the bulkchunk
// field. Here, only the rowentry.bytes() raw payload is passed. This is valid, because
// the OHbytes and OHhandles are zero.
Expand All @@ -119,15 +114,18 @@ public synchronized void set(int index, kelondroRow.Entry rowentry) throws IOExc
public synchronized void setMultiple(TreeMap /* of Integer/kelondroRow.Entry */ rows) throws IOException {
Iterator i = rows.entrySet().iterator();
Map.Entry entry;
Integer k;
while (i.hasNext()) {
entry = (Map.Entry) i.next();
set(((Integer) entry.getKey()).intValue(), (kelondroRow.Entry) entry.getValue());
k = (Integer) entry.getKey();
set(k.intValue(), (kelondroRow.Entry) entry.getValue());
}
}

public synchronized kelondroRow.Entry getIfValid(int index) throws IOException {
byte[] b = getNode(new Handle(index), true).getValueRow();
if (b[0] == 0) return null;
if ((b[0] == -128) && (b[1] == 0)) return null;
return row().newEntry(b);
}

Expand All @@ -147,51 +145,25 @@ protected synchronized int geti(int index) {

public synchronized int add(kelondroRow.Entry rowentry) throws IOException {
// adds a new rowentry, but re-uses a previously as-deleted marked entry
if (markedRemoved.size() == 0) {
// no records there to be re-used
Node n = newNode(rowentry.bytes());
n.commit(CP_NONE);
return n.handle().hashCode();
} else {
// re-use a removed record
Integer index = (Integer) markedRemoved.first();
markedRemoved.remove(index);
set(index.intValue(), rowentry);
return index.intValue();
}
Node n = newNode(rowentry.bytes());
commit(n, CP_NONE);
return n.handle().hashCode();
}

public synchronized void remove(int index, boolean marked) throws IOException {
public synchronized void remove(int index) throws IOException {
assert (index < (super.free() + super.size())) : "remove: index " + index + " out of bounds " + (super.free() + super.size());

if (marked) {
// does not remove directly, but sets only a mark that a record is to be deleted
// this record can be re-used with add
markedRemoved.add(new Integer(index));
} else {

// get the node at position index
Handle h = new Handle(index);
Node n = getNode(h, false);
// get the node at position index
Handle h = new Handle(index);
Node n = getNode(h, false);

// erase the row
n.setValueRow(null);
n.commit(CP_NONE);

// mark row as deleted so it can be re-used
deleteNode(h);
}
}

public synchronized void resolveMarkedRemoved() throws IOException {
Iterator i = markedRemoved.iterator();
Integer index;
while (i.hasNext()) {
index = (Integer) i.next();
remove(index.intValue(), false);
}
markedRemoved.clear();
}
// erase the row
n.setValueRow(null);
commit(n, CP_NONE);

// mark row as deleted so it can be re-used
deleteNode(h);
}

public void print() throws IOException {
System.out.println("PRINTOUT of table, length=" + size());
Expand Down Expand Up @@ -229,14 +201,14 @@ public static void main(String[] args) {
k = new kelondroFixedWidthArray(f, rowdef, 6);
k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()}));
k.remove(0, false);
k.remove(0);

k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()}));
k.remove(0, false);
k.remove(1, false);
k.remove(0);
k.remove(1);

k.print();
k.print(true);
Expand All @@ -251,10 +223,9 @@ public static void main(String[] args) {
k.add(k.row().newEntry(new byte[][]{(Integer.toString(i) + "-" + Integer.toString(j)).getBytes(), "xxxx".getBytes()}));
}
for (int j = 0; j < i; j++) {
k.remove(j, true);
k.remove(j);
}
}
k.resolveMarkedRemoved();
k.print();
k.print(true);
k.close();
Expand Down

0 comments on commit 7a7a1c7

Please sign in to comment.