diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index 5418a03ba..7dec23bf3 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -182,14 +182,26 @@ private SafeFileHandle CreateHandle(int segmentId) fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING; if (deleteOnClose) + { fileFlags = fileFlags | Native32.FILE_FLAG_DELETE_ON_CLOSE; + // FILE_SHARE_DELETE allows multiple FASTER instances to share a single log directory and each can specify deleteOnClose. + // This will allow the files to persist until all handles across all instances have been closed. + fileShare = fileShare | Native32.FILE_SHARE_DELETE; + } + var logHandle = Native32.CreateFileW( GetSegmentName(segmentId), fileAccess, fileShare, IntPtr.Zero, fileCreation, fileFlags, IntPtr.Zero); + if (logHandle.IsInvalid) + { + var error = Marshal.GetLastWin32Error(); + throw new IOException($"Error creating log file for {GetSegmentName(segmentId)}, error: {error}", Native32.MakeHRFromErrorCode(error)); + } + if (preallocateFile) SetFileSize(FileName, logHandle, segmentSize); diff --git a/cs/src/core/Utilities/Native32.cs b/cs/src/core/Utilities/Native32.cs index 5c02f3603..45972b631 100644 --- a/cs/src/core/Utilities/Native32.cs +++ b/cs/src/core/Utilities/Native32.cs @@ -53,6 +53,8 @@ private struct MARK_HANDLE_INFO internal const uint FILE_FLAG_DELETE_ON_CLOSE = 0x04000000; internal const uint FILE_FLAG_NO_BUFFERING = 0x20000000; internal const uint FILE_FLAG_OVERLAPPED = 0x40000000; + + internal const uint FILE_SHARE_DELETE = 0x00000004; #endregion #region io functions @@ -320,6 +322,11 @@ public static bool SetFileSize(SafeFileHandle file_handle, long file_size) return true; } + + internal static int MakeHRFromErrorCode(int errorCode) + { + return unchecked(((int)0x80070000) | errorCode); + } #endregion } } diff --git a/cs/test/SharedDirectoryTests.cs b/cs/test/SharedDirectoryTests.cs new file mode 100644 index 000000000..849956bce --- /dev/null +++ b/cs/test/SharedDirectoryTests.cs @@ -0,0 +1,232 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using NUnit.Framework; +using NUnit.Framework.Internal; +using System; +using System.IO; +using System.Linq; + +namespace FASTER.test.recovery.sumstore +{ + [TestFixture] + internal class SharedDirectoryTests + { + const long numUniqueKeys = (1 << 14); + const long keySpace = (1L << 14); + const long numOps = (1L << 19); + const long refreshInterval = (1L << 8); + const long completePendingInterval = (1L << 10); + private string rootPath; + private string sharedLogDirectory; + FasterTestInstance original; + FasterTestInstance clone; + + [SetUp] + public void Setup() + { + this.rootPath = $"{TestContext.CurrentContext.TestDirectory}\\{Path.GetRandomFileName()}"; + Directory.CreateDirectory(this.rootPath); + this.sharedLogDirectory = $"{this.rootPath}\\SharedLogs"; + Directory.CreateDirectory(this.sharedLogDirectory); + + this.original = new FasterTestInstance(); + this.clone = new FasterTestInstance(); + } + + [TearDown] + public void TearDown() + { + this.original.TearDown(); + this.clone.TearDown(); + try + { + Directory.Delete(this.rootPath, recursive: true); + } + catch + { + } + } + + [Test] + public void SharedLogDirectory() + { + this.original.Initialize($"{this.rootPath}\\OriginalCheckpoint", this.sharedLogDirectory); + this.original.Faster.StartSession(); + Assert.IsTrue(IsDirectoryEmpty(this.sharedLogDirectory)); // sanity check + Populate(this.original.Faster); + + // Take checkpoint from original to start the clone from + Assert.IsTrue(this.original.Faster.TakeFullCheckpoint(out var checkpointGuid)); + Assert.IsTrue(this.original.Faster.CompleteCheckpoint(wait: true)); + + // Sanity check against original + Assert.IsFalse(IsDirectoryEmpty(this.sharedLogDirectory)); + Test(this.original, checkpointGuid); + + // Copy checkpoint directory + var cloneCheckpointDirectory = $"{this.rootPath}\\CloneCheckpoint"; + CopyDirectory(new DirectoryInfo(this.original.CheckpointDirectory), new DirectoryInfo(cloneCheckpointDirectory)); + + // Recover from original checkpoint + this.clone.Initialize(cloneCheckpointDirectory, this.sharedLogDirectory); + this.clone.Faster.Recover(checkpointGuid); + this.clone.Faster.StartSession(); + + // Both sessions should work concurrently + Test(this.original, checkpointGuid); + Test(this.clone, checkpointGuid); + + // Dispose original, files should not be deleted + this.original.TearDown(); + + // Clone should still work + Assert.IsFalse(IsDirectoryEmpty(this.sharedLogDirectory)); + Test(this.clone, checkpointGuid); + + this.clone.TearDown(); + + // Files should be deleted after both instances are closed + Assert.IsTrue(IsDirectoryEmpty(this.sharedLogDirectory)); + } + + private struct FasterTestInstance + { + public string CheckpointDirectory { get; private set; } + public string LogDirectory { get; private set; } + public FasterKV Faster { get; private set; } + public IDevice LogDevice { get; private set; } + + public void Initialize(string checkpointDirectory, string logDirectory) + { + this.CheckpointDirectory = checkpointDirectory; + this.LogDirectory = logDirectory; + + this.LogDevice = Devices.CreateLogDevice($"{this.LogDirectory}\\log", deleteOnClose: true); + this.Faster = new FasterKV( + keySpace, + new Functions(), + new LogSettings { LogDevice = this.LogDevice }, + new CheckpointSettings { CheckpointDir = this.CheckpointDirectory, CheckPointType = CheckpointType.FoldOver }); + } + + public void TearDown() + { + this.Faster?.StopSession(); + this.Faster?.Dispose(); + this.Faster = null; + this.LogDevice?.Close(); + this.LogDevice = null; + } + } + + private void Populate(FasterKV fasterInstance) + { + // Prepare the dataset + var inputArray = new Input[numOps]; + for (int i = 0; i < numOps; i++) + { + inputArray[i].adId.adId = i % numUniqueKeys; + inputArray[i].numClicks.numClicks = 1; + } + + // Process the batch of input data + for (int i = 0; i < numOps; i++) + { + fasterInstance.RMW(ref inputArray[i].adId, ref inputArray[i], Empty.Default, i); + + if (i % completePendingInterval == 0) + { + fasterInstance.CompletePending(false); + } + else if (i % refreshInterval == 0) + { + fasterInstance.Refresh(); + } + } + + // Make sure operations are completed + fasterInstance.CompletePending(true); + } + + private void Test(FasterTestInstance fasterInstance, Guid checkpointToken) + { + var checkpointInfo = default(HybridLogRecoveryInfo); + Assert.IsTrue(checkpointInfo.Recover(checkpointToken, new DirectoryConfiguration(fasterInstance.CheckpointDirectory))); + + // Create array for reading + var inputArray = new Input[numUniqueKeys]; + for (int i = 0; i < numUniqueKeys; i++) + { + inputArray[i].adId.adId = i; + inputArray[i].numClicks.numClicks = 0; + } + + var input = default(Input); + var output = default(Output); + + // Issue read requests + for (var i = 0; i < numUniqueKeys; i++) + { + var status = fasterInstance.Faster.Read(ref inputArray[i].adId, ref input, ref output, Empty.Default, i); + Assert.IsTrue(status == Status.OK); + inputArray[i].numClicks = output.value; + } + + // Complete all pending requests + fasterInstance.Faster.CompletePending(true); + + + // Compute expected array + long[] expected = new long[numUniqueKeys]; + foreach (var guid in checkpointInfo.continueTokens.Keys) + { + var sno = checkpointInfo.continueTokens[guid]; + for (long i = 0; i <= sno; i++) + { + var id = i % numUniqueKeys; + expected[id]++; + } + } + + int threadCount = 1; // single threaded test + int numCompleted = threadCount - checkpointInfo.continueTokens.Count; + for (int t = 0; t < numCompleted; t++) + { + var sno = numOps; + for (long i = 0; i < sno; i++) + { + var id = i % numUniqueKeys; + expected[id]++; + } + } + + // Assert that expected is same as found + for (long i = 0; i < numUniqueKeys; i++) + { + Assert.IsTrue( + expected[i] == inputArray[i].numClicks.numClicks, + "Debug error for AdId {0}: Expected ({1}), Found({2})", inputArray[i].adId.adId, expected[i], inputArray[i].numClicks.numClicks); + } + } + + private bool IsDirectoryEmpty(string path) => !Directory.Exists(path) || !Directory.EnumerateFileSystemEntries(path).Any(); + + private static void CopyDirectory(DirectoryInfo source, DirectoryInfo target) + { + // Copy each file + foreach (var file in source.GetFiles()) + { + file.CopyTo(Path.Combine(target.FullName, file.Name), true); + } + + // Copy each subdirectory + foreach (var sourceSubDirectory in source.GetDirectories()) + { + var targetSubDirectory = target.CreateSubdirectory(sourceSubDirectory.Name); + CopyDirectory(sourceSubDirectory, targetSubDirectory); + } + } + } +}