Skip to content

Commit

Permalink
add ContentPlugin.contents() to iterate over chunks of data from a fi…
Browse files Browse the repository at this point in the history
…le (as ByteBuffers)
  • Loading branch information
meisl committed Jan 14, 2014
1 parent 4deb77a commit bc1e246
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 29 deletions.
28 changes: 13 additions & 15 deletions src/NtfsStreamsJ.java
Expand Up @@ -271,29 +271,27 @@ public String getMD5(String fileName) throws IOException {
// TODO: lock file while calculating MD5
long lastModified = file.lastModified();
AlternateDataStream md5ADS = new AlternateDataStream(file, "MD5");
//FileInputStream fis = new FileInputStream(file);
/*
int nread = 0;
while ((nread = fis.read(buffer)) != -1) {
md.update(buffer, 0, nread);
};
fis.close();
*/

FileChannel in = FileChannel.open(Paths.get(fileName));
ByteBuffer buffer = ByteBuffer.allocate(1024 * 512);
int nread = 0;
long ttlRead = 0;
long t = -System.currentTimeMillis();
while ( (nread = in.read(buffer)) >= 0) {
/*
FileChannel in = FileChannel.open(Paths.get(fileName));
ByteBuffer buffer = ByteBuffer.allocate(1024 * 512);
while (in.read(buffer) >= 0) {
buffer.flip();
ttlRead += buffer.remaining();
md.update(buffer);
buffer.clear();
ttlRead += nread;
};
t += System.currentTimeMillis();
log.info("time=" + (t / 1000.0) + " sec, ttlRead=" + ttlRead + ", size=" + in.size() + ", " + ((double)ttlRead * 0.00095367431640625 / t ) + " MB/sec");
in.close();
*/
for (ByteBuffer buffer: contents(fileName)) {
ttlRead += buffer.remaining();
md.update(buffer);
}

t += System.currentTimeMillis();
log.info("time=" + (t / 1000.0) + " sec, ttlRead=" + ttlRead + ", " + ((double)ttlRead * 0.00095367431640625 / t ) + " MB/sec");


byte[] mdbytes = md.digest();
Expand Down
131 changes: 117 additions & 14 deletions src/plugins/wdx/ContentPlugin.java
Expand Up @@ -6,6 +6,9 @@
import java.io.*;

import java.util.*;

import java.nio.*;
import java.nio.file.*;
import java.nio.channels.*;

import plugins.wdx.WDXPluginAdapter;
Expand Down Expand Up @@ -185,6 +188,102 @@ public void listFields(PrintStream out) {
out.println(n++ + "\t" + f);
}
}

public static class UncheckedIOException extends RuntimeException {
final IOException inner;
public UncheckedIOException(IOException inner) {
super(inner);
this.inner = inner;
}
}


public Iterable<ByteBuffer> contents(final String fileName) throws IOException {
final FileChannel channel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);
return new Iterable<ByteBuffer>() {
boolean iteratorCalled = false;
public Iterator<ByteBuffer> iterator() {
if (iteratorCalled) {
throw new IllegalStateException("iterator() may be called only once!");
}
iteratorCalled = true;
return new Iterator<ByteBuffer>() {

ByteBuffer bufA = ByteBuffer.allocate(1024 * 1024);
ByteBuffer bufB = ByteBuffer.allocate(1024 * 1024);
ByteBuffer nextOut = bufA;
boolean isBufAReady = false;
boolean isBufBReady = false;
boolean isFinished = false;

private boolean fillNextOut() throws IOException {
nextOut.clear();
if (channel.read(nextOut) < 0) {
channel.close();
isFinished = true;
return false;
}
nextOut.flip();
nextOut.mark();
return true;
}

@Override
public boolean hasNext() throws UncheckedIOException {
try {
if (isFinished) {
return false;
}
if (nextOut == bufA) {
if (isBufAReady) {
return true;
}
return isBufAReady = fillNextOut();
} else { // nextOut == bufB
if (isBufBReady) {
return true;
}
return isBufBReady = fillNextOut();
}
} catch (IOException e) {
myLog.error("contents:" + e);
throw new UncheckedIOException(e);
}
}

@Override
public ByteBuffer next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
if (nextOut == bufA) {
nextOut = bufB;
isBufAReady = false;
return bufA;
}
nextOut = bufA;
isBufBReady = false;
return bufB;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public void finalize() throws Throwable {
channel.close(); // has no effect if already closed
}
};
}

@Override
public void finalize() throws Throwable {
channel.close(); // has no effect if already closed
}
};
}

@Override
public final int contentGetSupportedField(int fieldIndex,
Expand Down Expand Up @@ -253,22 +352,26 @@ public final int contentGetValue(String fileName,
workingThread = Thread.currentThread();
}
}
Object value = field._getValue(fileName);
synchronized(this) {
if (workingThread == Thread.currentThread()) {
Thread.interrupted(); // clear interrupted status
workingThread = null;
try {
Object value = field._getValue(fileName);
synchronized(this) {
if (workingThread == Thread.currentThread()) {
Thread.interrupted(); // clear interrupted status
workingThread = null;
}
}

if (t != 0) {
t += System.currentTimeMillis();
myLog.warn("end slow " + field.name + " after " + t + "ms: " + value + " for \"" + fileName + "\"");
} else {
myLog.info(field.name + "=" + value);
}
fieldValue.setValue(field.type, value);
return field.type;
} catch (UncheckedIOException e) {
throw e.inner;
}

if (t != 0) {
t += System.currentTimeMillis();
myLog.warn("end slow " + field.name + " after " + t + "ms: \"" + fileName + "\"");
} else {
myLog.info(field.name + "=" + value);
}
fieldValue.setValue(field.type, value);
return field.type;
} catch (AsynchronousCloseException e) { // also catches ClosedByInterruptException
Thread.interrupted(); // clear interrupted status
workingThread = null;
Expand Down

0 comments on commit bc1e246

Please sign in to comment.