Skip to content

Commit

Permalink
Prepopulate log file handles (#274)
Browse files Browse the repository at this point in the history
* Adding option to prepopulate log file handles for derived implementations of LocalStorageDevice

* White space

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
peterfreiling and badrishc committed Jun 5, 2020
1 parent 3fc5429 commit ffc2c30
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 47 deletions.
119 changes: 76 additions & 43 deletions cs/src/core/Device/LocalStorageDevice.cs
Expand Up @@ -36,14 +36,36 @@ public class LocalStorageDevice : StorageDeviceBase
bool disableFileBuffering = true,
long capacity = Devices.CAPACITY_UNSPECIFIED,
bool recoverDevice = false)
: this(filename, preallocateFile, deleteOnClose, disableFileBuffering, capacity, recoverDevice, initialLogFileHandles: null)
{
}

/// <summary>
/// Constructor with more options for derived classes
/// </summary>
/// <param name="filename">File name (or prefix) with path</param>
/// <param name="preallocateFile"></param>
/// <param name="deleteOnClose"></param>
/// <param name="disableFileBuffering"></param>
/// <param name="capacity">The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit </param>
/// <param name="recoverDevice">Whether to recover device metadata from existing files</param>
/// <param name="initialLogFileHandles">Optional set of preloaded safe file handles, which can speed up hydration of preexisting log file handles</param>
protected internal LocalStorageDevice(string filename,
bool preallocateFile = false,
bool deleteOnClose = false,
bool disableFileBuffering = true,
long capacity = Devices.CAPACITY_UNSPECIFIED,
bool recoverDevice = false,
IEnumerable<KeyValuePair<int, SafeFileHandle>> initialLogFileHandles = null)
: base(filename, GetSectorSize(filename), capacity)

{
Native32.EnableProcessPrivileges();
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
this.disableFileBuffering = disableFileBuffering;
logHandles = new SafeConcurrentDictionary<int, SafeFileHandle>();
logHandles = initialLogFileHandles != null
? new SafeConcurrentDictionary<int, SafeFileHandle>(initialLogFileHandles)
: new SafeConcurrentDictionary<int, SafeFileHandle>();
if (recoverDevice)
RecoverFiles();
}
Expand Down Expand Up @@ -217,48 +239,16 @@ public override void Close()
}

/// <summary>
///
/// Creates a SafeFileHandle for the specified segment. This can be used by derived classes to prepopulate logHandles in the constructor.
/// </summary>
/// <param name="segmentId"></param>
/// <returns></returns>
protected string GetSegmentName(int segmentId)
{
return FileName + "." + segmentId;
}

/// <summary>
///
/// </summary>
/// <param name="_segmentId"></param>
/// <returns></returns>
// Can be used to pre-load handles, e.g., after a checkpoint
protected SafeFileHandle GetOrAddHandle(int _segmentId)
{
return logHandles.GetOrAdd(_segmentId, segmentId => CreateHandle(segmentId));
}

private static uint GetSectorSize(string filename)
{
if (!Native32.GetDiskFreeSpace(filename.Substring(0, 3),
out uint lpSectorsPerCluster,
out uint _sectorSize,
out uint lpNumberOfFreeClusters,
out uint lpTotalNumberOfClusters))
{
Debug.WriteLine("Unable to retrieve information for disk " + filename.Substring(0, 3) + " - check if the disk is available and you have specified the full path with drive name. Assuming sector size of 512 bytes.");
_sectorSize = 512;
}
return _sectorSize;
}

private SafeFileHandle CreateHandle(int segmentId)
protected internal static SafeFileHandle CreateHandle(int segmentId, bool disableFileBuffering, bool deleteOnClose, bool preallocateFile, long segmentSize, string fileName)
{
uint fileAccess = Native32.GENERIC_READ | Native32.GENERIC_WRITE;
uint fileShare = unchecked(((uint)FileShare.ReadWrite & ~(uint)FileShare.Inheritable));
uint fileCreation = unchecked((uint)FileMode.OpenOrCreate);
uint fileFlags = Native32.FILE_FLAG_OVERLAPPED;

if (this.disableFileBuffering)
if (disableFileBuffering)
{
fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
}
Expand All @@ -273,36 +263,79 @@ private SafeFileHandle CreateHandle(int segmentId)
}

var logHandle = Native32.CreateFileW(
GetSegmentName(segmentId),
GetSegmentName(fileName, segmentId),
fileAccess, fileShare,
IntPtr.Zero, fileCreation,
fileFlags, IntPtr.Zero);

if (logHandle.IsInvalid)
{
var error = Marshal.GetLastWin32Error();
throw new IOException($"Error creating log file for {GetSegmentName(segmentId)}, error: {error}", Native32.MakeHRFromErrorCode(error));
throw new IOException($"Error creating log file for {GetSegmentName(fileName, segmentId)}, error: {error}", Native32.MakeHRFromErrorCode(error));
}

if (preallocateFile && segmentSize != -1)
SetFileSize(FileName, logHandle, segmentSize);
SetFileSize(fileName, logHandle, segmentSize);

try
{
ThreadPool.BindHandle(logHandle);
}
catch (Exception e)
{
throw new FasterException("Error binding log handle for " + GetSegmentName(segmentId) + ": " + e.ToString());
throw new FasterException("Error binding log handle for " + GetSegmentName(fileName, segmentId) + ": " + e.ToString());
}
return logHandle;
}

/// <summary>
/// Static method to construct segment name
/// </summary>
protected static string GetSegmentName(string fileName, int segmentId)
{
return fileName + "." + segmentId;
}

/// <summary>
///
/// </summary>
/// <param name="segmentId"></param>
/// <returns></returns>
protected string GetSegmentName(int segmentId) => GetSegmentName(FileName, segmentId);

/// <summary>
///
/// </summary>
/// <param name="_segmentId"></param>
/// <returns></returns>
// Can be used to pre-load handles, e.g., after a checkpoint
protected SafeFileHandle GetOrAddHandle(int _segmentId)
{
return logHandles.GetOrAdd(_segmentId, segmentId => CreateHandle(segmentId));
}

private SafeFileHandle CreateHandle(int segmentId)
=> CreateHandle(segmentId, this.disableFileBuffering, this.deleteOnClose, this.preallocateFile, this.segmentSize, this.FileName);

private static uint GetSectorSize(string filename)
{
if (!Native32.GetDiskFreeSpace(filename.Substring(0, 3),
out uint lpSectorsPerCluster,
out uint _sectorSize,
out uint lpNumberOfFreeClusters,
out uint lpTotalNumberOfClusters))
{
Debug.WriteLine("Unable to retrieve information for disk " + filename.Substring(0, 3) + " - check if the disk is available and you have specified the full path with drive name. Assuming sector size of 512 bytes.");
_sectorSize = 512;
}
return _sectorSize;
}

/// Sets file size to the specified value.
/// Does not reset file seek pointer to original location.
private bool SetFileSize(string filename, SafeFileHandle logHandle, long size)
private static bool SetFileSize(string filename, SafeFileHandle logHandle, long size)
{
if (segmentSize <= 0)
if (size <= 0)
return false;

if (Native32.EnableVolumePrivileges(filename, logHandle))
Expand Down
12 changes: 11 additions & 1 deletion cs/src/core/Utilities/SafeConcurrentDictionary.cs
Expand Up @@ -18,10 +18,20 @@ namespace FASTER.core
/// <typeparam name="TValue">Type of values in the dictionary</typeparam>
internal sealed class SafeConcurrentDictionary<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>>
{
private readonly ConcurrentDictionary<TKey, TValue> dictionary = new ConcurrentDictionary<TKey, TValue>();
private readonly ConcurrentDictionary<TKey, TValue> dictionary;

private readonly ConcurrentDictionary<TKey, object> keyLocks = new ConcurrentDictionary<TKey, object>();

public SafeConcurrentDictionary()
{
this.dictionary = new ConcurrentDictionary<TKey, TValue>();
}

public SafeConcurrentDictionary(IEnumerable<KeyValuePair<TKey, TValue>> initialCollection)
{
this.dictionary = new ConcurrentDictionary<TKey, TValue>(initialCollection);
}

/// <summary>
/// Returns the count of the dictionary.
/// </summary>
Expand Down
28 changes: 25 additions & 3 deletions cs/test/SharedDirectoryTests.cs
Expand Up @@ -2,9 +2,11 @@
// Licensed under the MIT license.

using FASTER.core;
using Microsoft.Win32.SafeHandles;
using NUnit.Framework;
using NUnit.Framework.Internal;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

Expand Down Expand Up @@ -68,7 +70,7 @@ public void SharedLogDirectory()
CopyDirectory(new DirectoryInfo(this.original.CheckpointDirectory), new DirectoryInfo(cloneCheckpointDirectory));

// Recover from original checkpoint
this.clone.Initialize(cloneCheckpointDirectory, this.sharedLogDirectory);
this.clone.Initialize(cloneCheckpointDirectory, this.sharedLogDirectory, populateLogHandles: true);
this.clone.Faster.Recover(checkpointGuid);

// Both sessions should work concurrently
Expand All @@ -95,12 +97,32 @@ private struct FasterTestInstance
public FasterKV<AdId, NumClicks, AdInput, Output, Empty, Functions> Faster { get; private set; }
public IDevice LogDevice { get; private set; }

public void Initialize(string checkpointDirectory, string logDirectory)
public void Initialize(string checkpointDirectory, string logDirectory, bool populateLogHandles = false)
{
this.CheckpointDirectory = checkpointDirectory;
this.LogDirectory = logDirectory;

this.LogDevice = Devices.CreateLogDevice($"{this.LogDirectory}\\log", deleteOnClose: true);
string logFileName = "log";
string deviceFileName = $"{this.LogDirectory}\\{logFileName}";
KeyValuePair<int, SafeFileHandle>[] initialHandles = null;
if (populateLogHandles)
{
var segmentIds = new List<int>();
foreach (FileInfo item in new DirectoryInfo(logDirectory).GetFiles(logFileName + "*"))
{
segmentIds.Add(int.Parse(item.Name.Replace(logFileName, "").Replace(".", "")));
}
segmentIds.Sort();
initialHandles = new KeyValuePair<int, SafeFileHandle>[segmentIds.Count];
for (int i = 0; i < segmentIds.Count; i++)
{
var segmentId = segmentIds[i];
var handle = LocalStorageDevice.CreateHandle(segmentId, disableFileBuffering: false, deleteOnClose: true, preallocateFile: false, segmentSize: -1, fileName: deviceFileName);
initialHandles[i] = new KeyValuePair<int, SafeFileHandle>(segmentId, handle);
}
}

this.LogDevice = new LocalStorageDevice(deviceFileName, deleteOnClose: true, disableFileBuffering: false, initialLogFileHandles: initialHandles);
this.Faster = new FasterKV<AdId, NumClicks, AdInput, Output, Empty, Functions>(
keySpace,
new Functions(),
Expand Down

0 comments on commit ffc2c30

Please sign in to comment.