Skip to content

Commit

Permalink
Merge pull request #1925 from nosqlbench/mwolters/nbiocache_enhancements
Browse files Browse the repository at this point in the history
progress monitor
  • Loading branch information
MarkWolters committed Apr 12, 2024
2 parents 0046230 + 72e7a1e commit efc82f0
Showing 1 changed file with 52 additions and 2 deletions.
Expand Up @@ -24,14 +24,18 @@
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class ResolverForNBIOCache implements ContentResolver {
public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache();
Expand Down Expand Up @@ -76,6 +80,26 @@ private Path resolvePath(URI uri) {
return null;
}

/**
 * Periodic progress reporter for a long-running byte transfer.
 *
 * <p>An instance is scheduled on a {@link java.util.Timer}; the thread doing the
 * transfer calls {@link #update(long)} as bytes are processed, and the timer
 * thread logs the current completion percentage each time {@link #run()} fires.
 *
 * <p>Because the counter is written by one thread and read by another, it is
 * declared {@code volatile} so the timer thread always observes a recent,
 * non-torn value.
 */
private static class ProgressPrinter extends TimerTask {
    /** Expected total size in bytes; negative when the size is not known. */
    private final long fileSize;
    /** Bytes processed so far; written by the worker thread, read by the timer thread. */
    private volatile long totalBytesRead;

    /**
     * @param fileSize       expected total number of bytes, or a negative value if unknown
     * @param totalBytesRead number of bytes already processed when reporting starts
     */
    public ProgressPrinter(long fileSize, long totalBytesRead) {
        this.fileSize = fileSize;
        this.totalBytesRead = totalBytesRead;
    }

    /**
     * Records the latest running total of processed bytes.
     *
     * @param totalBytesRead cumulative number of bytes processed so far
     */
    public void update(long totalBytesRead) {
        this.totalBytesRead = totalBytesRead;
    }

    /**
     * Logs the current progress. When the total size is unknown only the byte
     * count is reported, since a percentage cannot be computed. Once the byte
     * count reaches a known total the task cancels itself so it stops logging.
     */
    @Override
    public void run() {
        // Take one snapshot so the logged value and the completion check agree.
        final long bytesSoFar = totalBytesRead;

        if (fileSize < 0) {
            // Size unknown: dividing would yield a meaningless negative percentage.
            logger.info(() -> "Progress: " + bytesSoFar + " bytes transferred (total size unknown)");
            return;
        }

        // A zero-length file is trivially complete; avoid 0/0 = NaN.
        final double progress = (fileSize == 0) ? 100.0 : (double) bytesSoFar / fileSize * 100;
        logger.info(() -> "Progress: " + String.format("%.2f", progress) + "% completed");

        if (bytesSoFar >= fileSize) {
            // Nothing more to report; stop this repeating task from firing again.
            cancel();
        }
    }
}

private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum) {
int retries = 0;
boolean success = false;
Expand All @@ -85,7 +109,20 @@ private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum)
logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachedFilePath);
ReadableByteChannel channel = Channels.newChannel(uri.toURL().openStream());
FileOutputStream outputStream = new FileOutputStream(cachedFilePath.toFile());
outputStream.getChannel().transferFrom(channel, 0, Long.MAX_VALUE);
long fileSize = uri.toURL().openConnection().getContentLengthLong();
long totalBytesRead = 0;
FileChannel fileChannel = outputStream.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(32768);

Timer timer = new Timer();
ProgressPrinter printer = new ProgressPrinter(fileSize, 0);
timer.scheduleAtFixedRate(printer, 2000, 2000);
while (channel.read(buffer) != -1) {
buffer.flip();
totalBytesRead += fileChannel.write(buffer);
printer.update(totalBytesRead);
buffer.clear();
}
outputStream.close();
channel.close();
logger.info(() -> "Downloaded remote file to cache at " + cachedFilePath);
Expand All @@ -110,9 +147,10 @@ private boolean verifyChecksum(Path cachedFilePath, URLContent checksum) {
String localChecksumStr = generateSHA256Checksum(cachedFilePath.toString());
Path checksumPath = checksumPath(cachedFilePath);
Files.writeString(checksumPath, localChecksumStr);
logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath);
logger.info(() -> "Generated local checksum and saved to cache at " + checksumPath);
String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes()));
if (localChecksumStr.equals(remoteChecksum)) {
logger.info(() -> "Checksums match for " + checksumPath + " and " + checksum);
return true;
} else {
logger.warn(() -> "checksums do not match for " + checksumPath + " and " + checksum);
Expand Down Expand Up @@ -162,6 +200,7 @@ private void createCacheDir(Path cachedFilePath) {
}

private void cleanupCache(Path cachedFilePath) {
logger.info(() -> "Cleaning up cache for " + cachedFilePath);
if (!cachedFilePath.toFile().delete())
logger.warn(() -> "Could not delete cached file " + cachedFilePath);
Path checksumPath = checksumPath(cachedFilePath);
Expand All @@ -180,13 +219,15 @@ private Path execute(NBIOResolverConditions condition, Path cachedFilePath, URI
if (downloadFile(uri, cachedFilePath, checksum)) {
return cachedFilePath;
} else {
cleanupCache(cachedFilePath);
throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath);
}
case UPDATE_NO_VERIFY:
logger.warn(() -> "Checksum verification is disabled, downloading remote file to cache at " + cachedFilePath);
if (downloadFile(uri, cachedFilePath, null)) {
return cachedFilePath;
} else {
cleanupCache(cachedFilePath);
throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath);
}
case LOCAL_VERIFY:
Expand All @@ -198,6 +239,7 @@ private Path execute(NBIOResolverConditions condition, Path cachedFilePath, URI
String localChecksum = Files.readString(getOrCreateChecksum(cachedFilePath));
String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes()));
if (localChecksum.equals(remoteChecksum)) {
logger.info(() -> "Checksums match, returning cached file " + cachedFilePath);
return cachedFilePath;
}
else {
Expand Down Expand Up @@ -260,12 +302,20 @@ private Path checksumPath(Path cachedFilePath) {
}

private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException {
logger.info(() -> "Generating sha256 checksum for " + filePath);
long fileSize = Files.size(Path.of(filePath));
long totalBytesRead = 0;
Timer timer = new Timer();
ProgressPrinter printer = new ProgressPrinter(fileSize, 0);
timer.scheduleAtFixedRate(printer, 2000, 2000);
MessageDigest md = MessageDigest.getInstance("SHA-256");
try (InputStream is = new FileInputStream(filePath)) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
md.update(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
printer.update(totalBytesRead);
}
}
byte[] digest = md.digest();
Expand Down

0 comments on commit efc82f0

Please sign in to comment.