Permalink
Browse files

Unsure closing ChunkIterator stream in every possible use case.

Also trace in logs the eventual close failures instead of failing
silently.
This should help prevent holding too many unreleased system file
handlers, as in the case reported by eros on YaCy forum
(http://forum.yacy-websuche.de/viewtopic.php?f=23&t=5988&sid=b00e7486c1bf7e48a0d63eb328ccca02
)
  • Loading branch information...
luccioman committed Jun 2, 2017
1 parent 29e52bd commit c53c58fa859842c895468a30fea547e6f8981d5d
Showing with 93 additions and 43 deletions.
  1. +27 −12 source/net/yacy/kelondro/table/ChunkIterator.java
  2. +66 −31 source/net/yacy/kelondro/table/Table.java
@@ -48,7 +48,7 @@
* ATTENTION: if the iterator is not read to the end or interrupted, close() must be called to release the InputStream
* @param file: the file
* @param recordsize: the size of the elements in the file
* @param chunksize: the size of the chunks that are returned by next(). remaining bytes until the lenght of recordsize are skipped
* @param chunksize: the size of the chunks that are returned by next(). remaining bytes until the length of recordsize are skipped
* @throws FileNotFoundException
*/
@@ -66,9 +66,10 @@ public ChunkIterator(final File file, final int recordsize, final int chunksize)
}
/**
* Special close methode to release the used InputStream
* Special close method to release the used InputStream
* stream is automatically closed on last next(),
* close() needs only be called if iterator not read to the end ( hasNext() or next() has not returned null)
* close() needs only to be called if iterator did not read to the end ( hasNext() or next() has not returned null)
* @throws IOException when the stream could not be closed
*/
public void close() throws IOException {
this.stream.close();
@@ -78,29 +79,43 @@ public void close() throws IOException {
public byte[] next0() {
final byte[] chunk = new byte[chunksize];
int r, s;
byte[] result = null;
try {
// read the chunk
this.stream.readFully(chunk);
// skip remaining bytes
r = chunksize;
boolean skipError = false;
while (r < recordsize) {
s = (int) this.stream.skip(recordsize - r);
assert s > 0;
if (s <= 0) return null;
if (s <= 0) {
skipError = true;
break;
}
r += s;
}
return chunk;
if(!skipError) {
result = chunk;
}
} catch (final EOFException e) {
// no real exception, this is the normal termination
try {
this.stream.close(); // close the underlaying inputstream
} catch (IOException ex) {
ConcurrentLog.logException(ex);
}
return null;
result = null;
} catch (final IOException e) {
/* Return normally but trace the exception in log */
ConcurrentLog.logException(e);
return null;
result = null;
} finally {
if(result == null) {
/* when result is null (normal termination or not),
* we must ensure the underlying input stream is closed as we must not keep system file handlers open */
try {
this.stream.close();
} catch (IOException ex) {
ConcurrentLog.logException(ex);
}
}
}
return result;
}
}
@@ -158,42 +158,77 @@ public Table(
int i = 0;
byte[] key;
if (this.table == null) {
final Iterator<byte[]> ki = new ChunkIterator(tablefile, rowdef.objectsize, rowdef.primaryKeyLength);
while (ki.hasNext()) {
key = ki.next();
// write the key into the index table
assert key != null;
if (key == null) {i++; continue;}
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
} else {
errors.putUnique(key, i++);
}
final ChunkIterator ki = new ChunkIterator(tablefile, rowdef.objectsize, rowdef.primaryKeyLength);
try {
while (ki.hasNext()) {
key = ki.next();
// write the key into the index table
assert key != null;
if (key == null) {i++; continue;}
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
} else {
errors.putUnique(key, i++);
}
}
} finally {
/* If any error occurred while looping over the iterator, we
* must ensure the underlying stream is closed before
* transmitting the exception to the upper layer
*/
if(ki.hasNext()) {
try {
ki.close();
} catch(IOException ioe) {
/* Do not block if closing is not possible but anyway keep a trace in log */
log.warn("Could not close input stream on the file " + tablefile);
}
}
}
} else {
byte[] record;
key = new byte[rowdef.primaryKeyLength];
final ChunkIterator ri = new ChunkIterator(tablefile, rowdef.objectsize, rowdef.objectsize);
while (ri.hasNext()) {
record = ri.next();
assert record != null;
if (record == null) {i++; continue;}
System.arraycopy(record, 0, key, 0, rowdef.primaryKeyLength);
// write the key into the index table
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
// write the tail into the table
try {
this.table.addUnique(this.taildef.newEntry(record, rowdef.primaryKeyLength, true));
} catch (final SpaceExceededException e) {
this.table = null;
ri.close(); // close inputstream of chunkiterator
break;
}
} else {
errors.putUnique(key, i++);
}
try {
while (ri.hasNext()) {
record = ri.next();
assert record != null;
if (record == null) {i++; continue;}
System.arraycopy(record, 0, key, 0, rowdef.primaryKeyLength);
// write the key into the index table
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
// write the tail into the table
try {
this.table.addUnique(this.taildef.newEntry(record, rowdef.primaryKeyLength, true));
} catch (final SpaceExceededException e) {
this.table = null;
try {
ri.close(); // close inputstream of chunkiterator
} finally {
/* Do not block if closing is not possible but anyway keep a trace in log */
log.warn("Could not close input stream on the file " + tablefile);
}
break;
}
} else {
errors.putUnique(key, i++);
}
}
} finally {
/* If any error occurred while looping over the iterator, we
* must ensure the underlying stream is closed before
* transmitting the exception to the upper layer
*/
if(ri.hasNext()) {
try {
ri.close();
} catch(IOException ioe) {
/* Do not block if closing is not possible but anyway keep a trace in log */
log.warn("Could not close input stream on the file " + tablefile);
}
}
}
Runtime.getRuntime().gc();
if (abandonTable()) {

0 comments on commit c53c58f

Please sign in to comment.