Adding FILE_SHARE_DELETE when deleteOnClose is used (#134)
peterfreiling authored and badrishc committed Jun 13, 2019
1 parent 2bf4d78 commit 7f4eeed
Showing 3 changed files with 251 additions and 0 deletions.
12 changes: 12 additions & 0 deletions cs/src/core/Device/LocalStorageDevice.cs
@@ -182,14 +182,26 @@ private SafeFileHandle CreateHandle(int segmentId)

            fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
            if (deleteOnClose)
            {
                fileFlags = fileFlags | Native32.FILE_FLAG_DELETE_ON_CLOSE;

                // FILE_SHARE_DELETE allows multiple FASTER instances to share a single log directory and each can specify deleteOnClose.
                // This will allow the files to persist until all handles across all instances have been closed.
                fileShare = fileShare | Native32.FILE_SHARE_DELETE;
            }

            var logHandle = Native32.CreateFileW(
                GetSegmentName(segmentId),
                fileAccess, fileShare,
                IntPtr.Zero, fileCreation,
                fileFlags, IntPtr.Zero);

            if (logHandle.IsInvalid)
            {
                var error = Marshal.GetLastWin32Error();
                throw new IOException($"Error creating log file for {GetSegmentName(segmentId)}, error: {error}", Native32.MakeHRFromErrorCode(error));
            }

            if (preallocateFile)
                SetFileSize(FileName, logHandle, segmentSize);
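
The comment in the hunk above explains why FILE_SHARE_DELETE is combined with FILE_FLAG_DELETE_ON_CLOSE. The standalone sketch below (not part of this commit) illustrates the same Win32 sharing semantics through the managed FileStream API instead of the CreateFileW P/Invoke, with a hypothetical temp-file path: as long as every holder opens the file with delete-on-close and allows delete sharing, the file survives until the last handle is closed.

// Illustrative sketch only; uses FileStream rather than the CreateFileW call above.
using System;
using System.IO;

class DeleteOnCloseSharingSketch
{
    static void Main()
    {
        var path = Path.Combine(Path.GetTempPath(), "shared-delete-on-close.tmp");

        // Each "instance" opens the same file with delete-on-close and allows
        // the others to do the same via FileShare.Delete.
        var first = new FileStream(path, FileMode.OpenOrCreate, FileAccess.ReadWrite,
            FileShare.ReadWrite | FileShare.Delete, 4096, FileOptions.DeleteOnClose);
        var second = new FileStream(path, FileMode.OpenOrCreate, FileAccess.ReadWrite,
            FileShare.ReadWrite | FileShare.Delete, 4096, FileOptions.DeleteOnClose);

        first.Dispose();                        // one instance shuts down...
        second.WriteByte(42);                   // ...the surviving handle still works
        second.Dispose();                       // last handle closed; file is removed

        Console.WriteLine(File.Exists(path));   // False
    }
}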
7 changes: 7 additions & 0 deletions cs/src/core/Utilities/Native32.cs
@@ -53,6 +53,8 @@ private struct MARK_HANDLE_INFO
        internal const uint FILE_FLAG_DELETE_ON_CLOSE = 0x04000000;
        internal const uint FILE_FLAG_NO_BUFFERING = 0x20000000;
        internal const uint FILE_FLAG_OVERLAPPED = 0x40000000;

        internal const uint FILE_SHARE_DELETE = 0x00000004;
        #endregion

        #region io functions
@@ -320,6 +322,11 @@ public static bool SetFileSize(SafeFileHandle file_handle, long file_size)

            return true;
        }

        internal static int MakeHRFromErrorCode(int errorCode)
        {
            return unchecked(((int)0x80070000) | errorCode);
        }
        #endregion
    }
}
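
MakeHRFromErrorCode mirrors the Win32 HRESULT_FROM_WIN32 mapping (severity bit set, facility FACILITY_WIN32 = 7), so the IOException thrown from LocalStorageDevice.cs carries an HRESULT rather than a bare error code. A small worked example, assuming a hypothetical sharing-violation failure (not taken from this commit):

using System;

class HResultExample
{
    static void Main()
    {
        // Hypothetical input: ERROR_SHARING_VIOLATION = 32 (0x20).
        int win32Error = 32;

        // Same expression as MakeHRFromErrorCode: severity bit plus FACILITY_WIN32 (7) << 16.
        int hr = unchecked((int)0x80070000 | win32Error);

        Console.WriteLine($"0x{hr:X8}");   // prints 0x80070020
    }
}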
232 changes: 232 additions & 0 deletions cs/test/SharedDirectoryTests.cs
@@ -0,0 +1,232 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using NUnit.Framework;
using NUnit.Framework.Internal;
using System;
using System.IO;
using System.Linq;

namespace FASTER.test.recovery.sumstore
{
    [TestFixture]
    internal class SharedDirectoryTests
    {
        const long numUniqueKeys = (1 << 14);
        const long keySpace = (1L << 14);
        const long numOps = (1L << 19);
        const long refreshInterval = (1L << 8);
        const long completePendingInterval = (1L << 10);
        private string rootPath;
        private string sharedLogDirectory;
        FasterTestInstance original;
        FasterTestInstance clone;

        [SetUp]
        public void Setup()
        {
            this.rootPath = $"{TestContext.CurrentContext.TestDirectory}\\{Path.GetRandomFileName()}";
            Directory.CreateDirectory(this.rootPath);
            this.sharedLogDirectory = $"{this.rootPath}\\SharedLogs";
            Directory.CreateDirectory(this.sharedLogDirectory);

            this.original = new FasterTestInstance();
            this.clone = new FasterTestInstance();
        }

        [TearDown]
        public void TearDown()
        {
            this.original.TearDown();
            this.clone.TearDown();
            try
            {
                Directory.Delete(this.rootPath, recursive: true);
            }
            catch
            {
            }
        }

        [Test]
        public void SharedLogDirectory()
        {
            this.original.Initialize($"{this.rootPath}\\OriginalCheckpoint", this.sharedLogDirectory);
            this.original.Faster.StartSession();
            Assert.IsTrue(IsDirectoryEmpty(this.sharedLogDirectory)); // sanity check
            Populate(this.original.Faster);

            // Take checkpoint from original to start the clone from
            Assert.IsTrue(this.original.Faster.TakeFullCheckpoint(out var checkpointGuid));
            Assert.IsTrue(this.original.Faster.CompleteCheckpoint(wait: true));

            // Sanity check against original
            Assert.IsFalse(IsDirectoryEmpty(this.sharedLogDirectory));
            Test(this.original, checkpointGuid);

            // Copy checkpoint directory
            var cloneCheckpointDirectory = $"{this.rootPath}\\CloneCheckpoint";
            CopyDirectory(new DirectoryInfo(this.original.CheckpointDirectory), new DirectoryInfo(cloneCheckpointDirectory));

            // Recover from original checkpoint
            this.clone.Initialize(cloneCheckpointDirectory, this.sharedLogDirectory);
            this.clone.Faster.Recover(checkpointGuid);
            this.clone.Faster.StartSession();

            // Both sessions should work concurrently
            Test(this.original, checkpointGuid);
            Test(this.clone, checkpointGuid);

            // Dispose original, files should not be deleted
            this.original.TearDown();

            // Clone should still work
            Assert.IsFalse(IsDirectoryEmpty(this.sharedLogDirectory));
            Test(this.clone, checkpointGuid);

            this.clone.TearDown();

            // Files should be deleted after both instances are closed
            Assert.IsTrue(IsDirectoryEmpty(this.sharedLogDirectory));
        }

        private struct FasterTestInstance
        {
            public string CheckpointDirectory { get; private set; }
            public string LogDirectory { get; private set; }
            public FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> Faster { get; private set; }
            public IDevice LogDevice { get; private set; }

            public void Initialize(string checkpointDirectory, string logDirectory)
            {
                this.CheckpointDirectory = checkpointDirectory;
                this.LogDirectory = logDirectory;

                this.LogDevice = Devices.CreateLogDevice($"{this.LogDirectory}\\log", deleteOnClose: true);
                this.Faster = new FasterKV<AdId, NumClicks, Input, Output, Empty, Functions>(
                    keySpace,
                    new Functions(),
                    new LogSettings { LogDevice = this.LogDevice },
                    new CheckpointSettings { CheckpointDir = this.CheckpointDirectory, CheckPointType = CheckpointType.FoldOver });
            }

            public void TearDown()
            {
                this.Faster?.StopSession();
                this.Faster?.Dispose();
                this.Faster = null;
                this.LogDevice?.Close();
                this.LogDevice = null;
            }
        }

        private void Populate(FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fasterInstance)
        {
            // Prepare the dataset
            var inputArray = new Input[numOps];
            for (int i = 0; i < numOps; i++)
            {
                inputArray[i].adId.adId = i % numUniqueKeys;
                inputArray[i].numClicks.numClicks = 1;
            }

            // Process the batch of input data
            for (int i = 0; i < numOps; i++)
            {
                fasterInstance.RMW(ref inputArray[i].adId, ref inputArray[i], Empty.Default, i);

                if (i % completePendingInterval == 0)
                {
                    fasterInstance.CompletePending(false);
                }
                else if (i % refreshInterval == 0)
                {
                    fasterInstance.Refresh();
                }
            }

            // Make sure operations are completed
            fasterInstance.CompletePending(true);
        }

        private void Test(FasterTestInstance fasterInstance, Guid checkpointToken)
        {
            var checkpointInfo = default(HybridLogRecoveryInfo);
            Assert.IsTrue(checkpointInfo.Recover(checkpointToken, new DirectoryConfiguration(fasterInstance.CheckpointDirectory)));

            // Create array for reading
            var inputArray = new Input[numUniqueKeys];
            for (int i = 0; i < numUniqueKeys; i++)
            {
                inputArray[i].adId.adId = i;
                inputArray[i].numClicks.numClicks = 0;
            }

            var input = default(Input);
            var output = default(Output);

            // Issue read requests
            for (var i = 0; i < numUniqueKeys; i++)
            {
                var status = fasterInstance.Faster.Read(ref inputArray[i].adId, ref input, ref output, Empty.Default, i);
                Assert.IsTrue(status == Status.OK);
                inputArray[i].numClicks = output.value;
            }

            // Complete all pending requests
            fasterInstance.Faster.CompletePending(true);

            // Compute expected array
            long[] expected = new long[numUniqueKeys];
            foreach (var guid in checkpointInfo.continueTokens.Keys)
            {
                var sno = checkpointInfo.continueTokens[guid];
                for (long i = 0; i <= sno; i++)
                {
                    var id = i % numUniqueKeys;
                    expected[id]++;
                }
            }

            int threadCount = 1; // single threaded test
            int numCompleted = threadCount - checkpointInfo.continueTokens.Count;
            for (int t = 0; t < numCompleted; t++)
            {
                var sno = numOps;
                for (long i = 0; i < sno; i++)
                {
                    var id = i % numUniqueKeys;
                    expected[id]++;
                }
            }

            // Assert that expected is same as found
            for (long i = 0; i < numUniqueKeys; i++)
            {
                Assert.IsTrue(
                    expected[i] == inputArray[i].numClicks.numClicks,
                    "Debug error for AdId {0}: Expected ({1}), Found({2})", inputArray[i].adId.adId, expected[i], inputArray[i].numClicks.numClicks);
            }
        }

        private bool IsDirectoryEmpty(string path) => !Directory.Exists(path) || !Directory.EnumerateFileSystemEntries(path).Any();

        private static void CopyDirectory(DirectoryInfo source, DirectoryInfo target)
        {
            // Copy each file
            foreach (var file in source.GetFiles())
            {
                file.CopyTo(Path.Combine(target.FullName, file.Name), true);
            }

            // Copy each subdirectory
            foreach (var sourceSubDirectory in source.GetDirectories())
            {
                var targetSubDirectory = target.CreateSubdirectory(sourceSubDirectory.Name);
                CopyDirectory(sourceSubDirectory, targetSubDirectory);
            }
        }
    }
}
