Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,27 @@
package io.zonky.test.db.postgres.embedded;


import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand All @@ -49,6 +52,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -63,13 +67,17 @@
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.postgresql.ds.PGSimpleDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.XZInputStream;

import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;

@SuppressWarnings("PMD.AvoidDuplicateLiterals") // "postgres"
public class EmbeddedPostgres implements Closeable
{
Expand Down Expand Up @@ -637,16 +645,15 @@ private static String getArchitecture()
* Unpack archive compressed by tar with xz compression. By default system tar is used (faster). If not found, then the
* java implementation takes place.
*
* @param tbzPath The archive path.
* @param stream A stream with the postgres binaries.
* @param targetDir The directory to extract the content to.
*/
private static void extractTxz(String tbzPath, String targetDir) throws IOException
{
private static void extractTxz(InputStream stream, String targetDir) throws IOException {
try (
InputStream in = Files.newInputStream(Paths.get(tbzPath));
XZInputStream xzIn = new XZInputStream(in);
XZInputStream xzIn = new XZInputStream(stream);
TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)
) {
final Phaser phaser = new Phaser(1);
TarArchiveEntry entry;

while ((entry = tarIn.getNextTarEntry()) != null) {
Expand All @@ -663,9 +670,33 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept
throw new IllegalStateException("could not read " + individualFile);
}
mkdirs(fsObject.getParentFile());
try (OutputStream outputFile = new FileOutputStream(fsObject)) {
IOUtils.write(content, outputFile);
}

final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsObject.toPath(), CREATE_NEW, WRITE);
final ByteBuffer buffer = ByteBuffer.wrap(content);

phaser.register();
fileChannel.write(buffer, 0, fileChannel, new CompletionHandler<Integer, Channel>() {
@Override
public void completed(Integer written, Channel channel) {
closeChannel(channel);
}

@Override
public void failed(Throwable error, Channel channel) {
LOG.error("Could not write file {}", fsObject.getAbsolutePath(), error);
closeChannel(channel);
}

private void closeChannel(Channel channel) {
try {
channel.close();
} catch (IOException e) {
LOG.error("Unexpected error while closing the channel", e);
} finally {
phaser.arriveAndDeregister();
}
}
});
} else if (entry.isDirectory()) {
mkdirs(fsObject);
} else {
Expand All @@ -678,6 +709,8 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept
fsObject.setExecutable(true);
}
}

phaser.arriveAndAwaitAdvance();
}
}

Expand All @@ -694,10 +727,8 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)

LOG.info("Detected a {} {} system", system, machineHardware);
File pgDir;
File pgTbz;
final InputStream pgBinary;
try {
pgTbz = File.createTempFile("pgpg", "pgpg");
pgBinary = pgBinaryResolver.getPgBinary(system, machineHardware);
} catch (final IOException e) {
throw new ExceptionInInitializerError(e);
Expand All @@ -707,16 +738,12 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
throw new IllegalStateException("No Postgres binary found for " + system + " / " + machineHardware);
}

try (DigestInputStream pgArchiveData = new DigestInputStream(
pgBinary, MessageDigest.getInstance("MD5"));
FileOutputStream os = new FileOutputStream(pgTbz))
{
IOUtils.copy(pgArchiveData, os);
try (DigestInputStream pgArchiveData = new DigestInputStream(pgBinary, MessageDigest.getInstance("MD5"));
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
IOUtils.copy(pgArchiveData, baos);
pgArchiveData.close();
os.close();

String pgDigest = Hex.encodeHexString(pgArchiveData.getMessageDigest().digest());

pgDir = new File(getWorkingDirectory(), String.format("PG-%s", pgDigest));

mkdirs(pgDir);
Expand All @@ -725,14 +752,16 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)

if (!pgDirExists.exists()) {
try (FileOutputStream lockStream = new FileOutputStream(unpackLockFile);
FileLock unpackLock = lockStream.getChannel().tryLock()) {
FileLock unpackLock = lockStream.getChannel().tryLock()) {
if (unpackLock != null) {
try {
if (pgDirExists.exists()) {
throw new IllegalStateException("unpack lock acquired but .exists file is present " + pgDirExists);
}
LOG.info("Extracting Postgres...");
extractTxz(pgTbz.getPath(), pgDir.getPath());
try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) {
extractTxz(bais, pgDir.getPath());
}
if (!pgDirExists.createNewFile()) {
throw new IllegalStateException("couldn't make .exists file " + pgDirExists);
}
Expand Down Expand Up @@ -760,10 +789,6 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ExceptionInInitializerError(ie);
} finally {
if (!pgTbz.delete()) {
LOG.warn("could not delete {}", pgTbz);
}
}
BINARY_DIR.set(pgDir);
LOG.info("Postgres binaries at {}", pgDir);
Expand Down