Skip to content

Mark LockError-s as either recoverable or fatal, tweak API, add README #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# libdaemon-jvm

*libdaemon-jvm* is a [libdaemon](http://0pointer.de/lennart/projects/libdaemon)-inspired
library for the JVM written in Scala.

It aims at making it easier for JVM-based daemon processes to
- ensure that a single instance of it is running at a time
- rely on Unix domain sockets (or Windows named pipes) to listen to incoming connections

## Single process

*libdaemon-jvm* relies on Java file lock mechanism to ensure only a single instance
of a process is running at a time.

More concretely, it is passed a directory, where it writes or creates:
- a lock file
- a PID file
- a domain socket (except when named pipes are used on Windows)

It ensures that no-two processes relying on the same directory can run at a time, relying
on both the PID file and the domain socket to check for another running process.

## Domain sockets

*libdaemon-jvm* creates Unix domain sockets or Windows named pipes using either
- the JNI Unix domain socket and Windows named pipe support in the [ipcsocket](https://github.com/sbt/ipcsocket) library
- Unix domain socket support in Java >= 16

The ipcsocket library JNI support is only available on Linux / macOS / Windows for the
x86_64 architecture, and macOS for the ARM64 architecture (untested). For other OSes and
architectures, Java >= 16 is required.

On Windows on x86_64, *libdaemon-jvm* defaults to using ipcsocket JNI-based Windows named pipes.
On Windows but on a different architecture, it defaults to the Unix domain socket support of
Java >= 16, that happens to also work on Windows (requires a not-too-dated Windows 10 version),
but is incompatible with Windows named pipes.

On other OSes, when using Java >= 16, *libdaemon-jvm* defaults to Java's own Unix domain socket
support. On Java < 16, it only supports Linux on x86_64, or macOS on x86_64 or ARM64. Java >= 16
and ipcsocket JNI-based sockets can talk to each other on the same machine (no hard requirement
to use Java >= 16 for both clients and servers).

In all cases, when Java < 16 is supported, both Java >= 16 and Java < 16 clients and servers
can talk to each other.

## Usage

Add the following dependency to your build
```text
io.github.alexarchambault.libdaemon::libdaemon:0.0.3
```
The latest version is [![Maven Central](https://img.shields.io/maven-central/v/io.github.alexarchambault.libdaemon/libdaemon.svg)](https://maven-badges.herokuapp.com/maven-central/io.github.alexarchambault.libdaemon/libdaemon).

From the server, call `Lock.tryAcquire`, and start accepting connections on the server socket in the thunk passed to it:
```scala
import libdaemonjvm.server._
import java.nio.file._

val daemonDirectory: Path = ??? // pass a directory under the user home dir, computed with directories-jvm for example
val lockFiles = LockFiles.under(daemonDirectory, "my-app-name\\daemon") // second argument is the Windows named pipe path (that doesn't live in the file system)
Lock.tryAcquire(lockFiles) { serverSocket: Either[ServerSocket, ServerSocketChannel] =>
// serverSocket is a Right(…) when Java >= 16 Unix domain socket support is used,
// it's Left(…) when ipcsocket JNI support is used

// you should start listening on serverSocket here, and as much as possible,
// only exit this block when you are actually accepting incoming connections
}
```
23 changes: 17 additions & 6 deletions library/src/libdaemonjvm/LockFiles.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,38 @@ import java.nio.file.attribute.PosixFilePermission
import java.nio.file.StandardOpenOption
import scala.collection.JavaConverters._
import scala.util.Properties
import libdaemonjvm.server.LockError
import java.nio.channels.OverlappingFileLockException

final case class LockFiles(
lockFile: Path,
pidFile: Path,
socketPaths: SocketPaths
) {
def withLock[T](t: => T): T = {
def withLock[T](t: => Either[LockError, T]): Either[LockError, T] = {
if (!Files.exists(lockFile)) {
Files.createDirectories(lockFile.normalize.getParent)
Files.write(lockFile, Array.emptyByteArray)
}
var c: FileChannel = null
var l: FileLock = null
var c: FileChannel = null
var l: Either[OverlappingFileLockException, FileLock] = null
try {
c = FileChannel.open(lockFile, StandardOpenOption.WRITE)
l = c.lock()
t
l =
try Right(c.tryLock())
catch {
case ex: OverlappingFileLockException =>
Left(ex)
}
l match {
case Left(ex) => Left(new LockError.Locked(lockFile, ex))
case Right(null) => Left(new LockError.Locked(lockFile))
case Right(_) => t
}
}
finally {
if (l != null)
try l.release()
try l.foreach(_.release())
catch {
case _: ClosedChannelException =>
case _: IOException =>
Expand Down
18 changes: 10 additions & 8 deletions library/src/libdaemonjvm/server/Lock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import java.net.ServerSocket

object Lock {

def tryAcquire[T](files: LockFiles)
: Either[LockError, Either[ServerSocket, ServerSocketChannel]] =
tryAcquire(files, LockProcess.default, SocketHandler.server(files.socketPaths))
def tryAcquire[T](
files: LockFiles,
proc: LockProcess
): Either[LockError, Either[ServerSocket, ServerSocketChannel]] =
tryAcquire(files, proc, SocketHandler.server(files.socketPaths))
files: LockFiles
)(
startListening: Either[ServerSocket, ServerSocketChannel] => T
): Either[LockError, T] =
tryAcquire(files, LockProcess.default) {
val socket = SocketHandler.server(files.socketPaths)
startListening(socket)
}

def tryAcquire[T](
files: LockFiles,
proc: LockProcess,
proc: LockProcess
)(
setup: => T
): Either[LockError, T] = {

Expand Down
19 changes: 16 additions & 3 deletions library/src/libdaemonjvm/server/LockError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,23 @@ sealed abstract class LockError(
) extends Exception(message, cause)

object LockError {

sealed abstract class RecoverableError(
message: String,
cause: Throwable = null
) extends LockError(message, cause)

sealed abstract class FatalError(
message: String,
cause: Throwable = null
) extends LockError(message, cause)

final class AlreadyRunning(val pid: Int)
extends LockError(s"Daemon already running (PID: $pid)")
extends FatalError(s"Daemon already running (PID: $pid)")
final class CannotDeleteFile(val file: Path, cause: Throwable)
extends LockError(s"Cannot delete $file", cause)
extends FatalError(s"Cannot delete $file", cause)
final class ZombieFound(val pid: Int, val connectionError: Throwable)
extends LockError(s"Cannot connect to process $pid", connectionError)
extends RecoverableError(s"Cannot connect to process $pid", connectionError)
final class Locked(val file: Path, cause: Throwable = null)
extends RecoverableError(s"$file already locked", cause)
}
46 changes: 24 additions & 22 deletions manual/server/src/libdaemonjvm/TestServer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package libdaemonjvm

import java.io.IOException
import java.io.{Closeable, IOException}
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermission
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -15,6 +15,26 @@ import libdaemonjvm.server.Lock
object TestServer {
val delay = 2.seconds
def runTestClients = false

def runServer(incomingConn: () => Closeable): Unit = {
val count = new AtomicInteger
while (true) {
println("Waiting for clients")
val c = incomingConn()
val idx = count.incrementAndGet()
val runnable: Runnable = { () =>
println(s"New incoming connection $idx, closing it in $delay")
Thread.sleep(delay.toMillis)
println(s"Closing incoming connection $idx")
c.close()
println(s"Closed incoming connection $idx")
}
val t = new Thread(runnable)
t.start()
Thread.sleep(1000L) // meh, wait for server to be actually listening
}
}

def main(args: Array[String]): Unit = {
val path = Paths.get("data-dir")
if (!Properties.isWin) {
Expand All @@ -29,10 +49,9 @@ object TestServer {
)
}
val files = LockFiles.under(path, "libdaemonjvm\\test-server-client\\pipe")
val incomingConn = Lock.tryAcquire(files) match {
case Left(e) => throw e
case Right(Left(s)) => () => s.accept()
case Right(Right(s)) => () => s.accept()
Lock.tryAcquire(files)(s => runServer(() => s.fold(_.accept(), _.accept()))) match {
case Left(e) => throw e
case Right(()) =>
}

def clientRunnable(idx: Int): Runnable = { () =>
Expand All @@ -59,22 +78,5 @@ object TestServer {
runClient(3)
runClient(4)
}

val count = new AtomicInteger
while (true) {
println("Waiting for clients")
val c = incomingConn()
val idx = count.incrementAndGet()
val runnable: Runnable = { () =>
println(s"New incoming connection $idx, closing it in $delay")
Thread.sleep(delay.toMillis)
println(s"Closing incoming connection $idx")
c.close()
println(s"Closed incoming connection $idx")
}
val t = new Thread(runnable)
t.setDaemon(true)
t.start()
}
}
}
19 changes: 19 additions & 0 deletions tests/test/src/libdaemonjvm/tests/LockTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,23 @@ class LockTests extends munit.FunSuite {
}
}

test("locked") {
TestUtil.withTestDir { dir =>
val files = TestUtil.lockFiles(dir)
val e = files.withLock {
TestUtil.tryAcquire(files) { maybeChannel =>
maybeChannel match {
case Left(e: LockError.Locked) =>
case Left(otherError) =>
throw new Exception("Unexpected error type (expected Locked)", otherError)
case Right(channel) =>
sys.error("Opening new server channel should have failed")
}
}
Right(())
}
expect(e.isRight)
}
}

}
49 changes: 30 additions & 19 deletions tests/test/src/libdaemonjvm/tests/TestUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.CountDownLatch
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean
import libdaemonjvm.internal.SocketFile
import scala.util.control.NonFatal

object TestUtil {
private lazy val testDirBase = {
Expand Down Expand Up @@ -67,31 +68,41 @@ object TestUtil {
files: LockFiles,
proc: LockProcess
)(f: Either[LockError, Either[ServerSocket, ServerSocketChannel]] => T): T = {
var maybeServerChannel: Either[LockError, Either[ServerSocket, ServerSocketChannel]] = null
var acceptThreadOpt = Option.empty[Thread]
val accepting = new CountDownLatch(1)
val shouldStop = new AtomicBoolean(false)
var serverChannel: Either[ServerSocket, ServerSocketChannel] = null
var acceptThreadOpt = Option.empty[Thread]
val accepting = new CountDownLatch(1)
val shouldStop = new AtomicBoolean(false)
try {
maybeServerChannel = Lock.tryAcquire(files, proc)
if (Properties.isWin)
// Windows named pipes seem no to accept clients unless accept is being called on the server socket
acceptThreadOpt =
maybeServerChannel.toOption.flatMap(_.left.toOption.map(acceptAndDiscard(
_,
accepting,
() => shouldStop.get()
)))
for (t <- acceptThreadOpt) {
t.start()
accepting.await()
Thread.sleep(1000L) // waiting so that the accept call below effectively awaits client... :|
val maybeServerChannel = Lock.tryAcquire(files, proc) {
serverChannel = SocketHandler.server(files.socketPaths)
if (Properties.isWin)
// Windows named pipes seem no to accept clients unless accept is being called on the server socket
acceptThreadOpt =
serverChannel.left.toOption.map(acceptAndDiscard(
_,
accepting,
() => shouldStop.get()
))
for (t <- acceptThreadOpt) {
t.start()
accepting.await()
// waiting so that the accept call below effectively awaits client... :|
Thread.sleep(
1000L
)
}
serverChannel
}
f(maybeServerChannel)
}
finally {
shouldStop.set(true)
SocketFile.canConnect(files.socketPaths) // unblock the server thread last accept
for (e <- Option(maybeServerChannel); channel <- e)
try SocketFile.canConnect(files.socketPaths) // unblock the server thread last accept
catch {
case NonFatal(e) =>
System.err.println(s"Ignoring $e while trying to unblock last accept")
}
for (channel <- Option(serverChannel))
channel.merge.close()
}
}
Expand Down