diff --git a/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java b/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java index abb858fb..d059f9e4 100644 --- a/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java +++ b/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java @@ -14,24 +14,27 @@ package io.zonky.test.db.postgres.embedded; +import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.Channel; +import java.nio.channels.CompletionHandler; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -49,6 +52,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -63,6 +67,7 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.time.StopWatch; import org.postgresql.ds.PGSimpleDataSource; @@ -70,6 +75,9 @@ import org.slf4j.LoggerFactory; import org.tukaani.xz.XZInputStream; +import static java.nio.file.StandardOpenOption.CREATE_NEW; +import static java.nio.file.StandardOpenOption.WRITE; + @SuppressWarnings("PMD.AvoidDuplicateLiterals") // "postgres" public class EmbeddedPostgres implements Closeable { @@ -637,16 +645,15 @@ private static String getArchitecture() * Unpack archive compressed by tar with xz compression. By default system tar is used (faster). If not found, then the * java implementation takes place. * - * @param tbzPath The archive path. + * @param stream A stream with the postgres binaries. * @param targetDir The directory to extract the content to. */ - private static void extractTxz(String tbzPath, String targetDir) throws IOException - { + private static void extractTxz(InputStream stream, String targetDir) throws IOException { try ( - InputStream in = Files.newInputStream(Paths.get(tbzPath)); - XZInputStream xzIn = new XZInputStream(in); + XZInputStream xzIn = new XZInputStream(stream); TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn) ) { + final Phaser phaser = new Phaser(1); TarArchiveEntry entry; while ((entry = tarIn.getNextTarEntry()) != null) { @@ -663,9 +670,33 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept throw new IllegalStateException("could not read " + individualFile); } mkdirs(fsObject.getParentFile()); - try (OutputStream outputFile = new FileOutputStream(fsObject)) { - IOUtils.write(content, outputFile); - } + + final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsObject.toPath(), CREATE_NEW, WRITE); + final ByteBuffer buffer = ByteBuffer.wrap(content); + + phaser.register(); + fileChannel.write(buffer, 0, fileChannel, new CompletionHandler() { + @Override + public void completed(Integer written, Channel channel) { + closeChannel(channel); + } + + @Override + public void failed(Throwable error, Channel channel) { + LOG.error("Could not write file {}", fsObject.getAbsolutePath(), error); + closeChannel(channel); + } + + private void closeChannel(Channel channel) { + try { + channel.close(); + } catch (IOException e) { + LOG.error("Unexpected error while closing the channel", e); + } finally { + phaser.arriveAndDeregister(); + } + } + }); } else if (entry.isDirectory()) { mkdirs(fsObject); } else { @@ -678,6 +709,8 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept fsObject.setExecutable(true); } } + + phaser.arriveAndAwaitAdvance(); } } @@ -694,10 +727,8 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver) LOG.info("Detected a {} {} system", system, machineHardware); File pgDir; - File pgTbz; final InputStream pgBinary; try { - pgTbz = File.createTempFile("pgpg", "pgpg"); pgBinary = pgBinaryResolver.getPgBinary(system, machineHardware); } catch (final IOException e) { throw new ExceptionInInitializerError(e); @@ -707,16 +738,12 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver) throw new IllegalStateException("No Postgres binary found for " + system + " / " + machineHardware); } - try (DigestInputStream pgArchiveData = new DigestInputStream( - pgBinary, MessageDigest.getInstance("MD5")); - FileOutputStream os = new FileOutputStream(pgTbz)) - { - IOUtils.copy(pgArchiveData, os); + try (DigestInputStream pgArchiveData = new DigestInputStream(pgBinary, MessageDigest.getInstance("MD5")); + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + IOUtils.copy(pgArchiveData, baos); pgArchiveData.close(); - os.close(); String pgDigest = Hex.encodeHexString(pgArchiveData.getMessageDigest().digest()); - pgDir = new File(getWorkingDirectory(), String.format("PG-%s", pgDigest)); mkdirs(pgDir); @@ -725,14 +752,16 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver) if (!pgDirExists.exists()) { try (FileOutputStream lockStream = new FileOutputStream(unpackLockFile); - FileLock unpackLock = lockStream.getChannel().tryLock()) { + FileLock unpackLock = lockStream.getChannel().tryLock()) { if (unpackLock != null) { try { if (pgDirExists.exists()) { throw new IllegalStateException("unpack lock acquired but .exists file is present " + pgDirExists); } LOG.info("Extracting Postgres..."); - extractTxz(pgTbz.getPath(), pgDir.getPath()); + try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) { + extractTxz(bais, pgDir.getPath()); + } if (!pgDirExists.createNewFile()) { throw new IllegalStateException("couldn't make .exists file " + pgDirExists); } @@ -760,10 +789,6 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver) } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); throw new ExceptionInInitializerError(ie); - } finally { - if (!pgTbz.delete()) { - LOG.warn("could not delete {}", pgTbz); - } } BINARY_DIR.set(pgDir); LOG.info("Postgres binaries at {}", pgDir);