Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial win32 support #69

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ crossbeam = "0.7"
thiserror = "1.0"
hex = "0.4"

[target.'cfg(windows)'.dependencies]
windows = "0.3.1"

[target.'cfg(windows)'.build-dependencies]
windows = "0.3.1"

[dev-dependencies]
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
tempfile = "3.1"
Expand Down
8 changes: 8 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
fn main() {
windows::build!(
windows::win32::system_services::{HANDLE},
windows::win32::shell::{PathIsDirectoryW, PathFileExistsW},
windows::win32::file_system::{CreateFileW, FlushFileBuffers, SetFilePointerEx, SetEndOfFile, ReadFile, WriteFile, GetFileSizeEx},
windows::win32::windows_programming::CloseHandle,
);
}
199 changes: 191 additions & 8 deletions src/pipe_log.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
use std::collections::VecDeque;
use std::fs::{self, File};
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read};
#[cfg(not(target_os = "windows"))]
use std::os::unix::io::RawFd;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{cmp, u64};

use log::{info, warn};
use nix::errno::Errno;
use nix::fcntl::{self, OFlag};
use nix::sys::stat::Mode;
use nix::sys::uio::{pread, pwrite};
use nix::unistd::{close, fsync, ftruncate, lseek, Whence};
use nix::NixPath;

#[cfg(not(target_os = "windows"))]
use {
nix::errno::Errno,
nix::fcntl::{self, OFlag},
nix::sys::stat::Mode,
nix::sys::uio::{pread, pwrite},
nix::unistd::{close, fsync, ftruncate, lseek, Whence},
nix::NixPath
};

#[cfg(target_os = "windows")]
mod bindings {
::windows::include_bindings!();
}
#[cfg(target_os = "windows")]
use bindings::{
windows::HString,
windows::BOOL,
windows::win32::shell::{PathIsDirectoryW, PathFileExistsW},
windows::win32::system_services::{HANDLE},
windows::win32::file_system::{CreateFileW, FlushFileBuffers, SetFilePointerEx, SetEndOfFile, ReadFile, WriteFile, GetFileSizeEx},
windows::win32::file_system::{FILE_ACCESS_FLAGS, FILE_SHARE_FLAGS, FILE_CREATE_FLAGS, FILE_FLAGS_AND_ATTRIBUTES},
windows::win32::windows_programming::CloseHandle,
};

use protobuf::Message;

use crate::cache_evict::CacheSubmitor;
Expand Down Expand Up @@ -120,8 +141,12 @@ pub trait PipeLogHook: Sync + Send {
fn post_purge(&self, queue: LogQueue, file_num: u64);
}

#[cfg(target_os = "windows")]
pub type RawFd = bindings::windows::win32::system_services::HANDLE;

pub struct LogFd(RawFd);

#[cfg(not(target_os = "windows"))]
impl LogFd {
fn close(&self) -> Result<()> {
close(self.0).map_err(|e| parse_nix_error(e, "close"))
Expand All @@ -131,6 +156,7 @@ impl LogFd {
}
}

#[cfg(not(target_os = "windows"))]
impl Drop for LogFd {
fn drop(&mut self) {
if let Err(e) = self.close() {
Expand All @@ -139,6 +165,27 @@ impl Drop for LogFd {
}
}

#[cfg(target_os = "windows")]
impl LogFd {
fn close(&self) -> Result<()> {
unsafe {
match CloseHandle(self.0) {
BOOL(0) => Err(Error::Io(std::io::Error::last_os_error())),
_ => Ok(())
}
}
}

pub fn sync(&self) -> Result<()> {
unsafe {
match FlushFileBuffers(self.0) {
BOOL(0) => Err(Error::Io(std::io::Error::last_os_error())),
_ => Ok(())
}
}
}
}

struct LogManager {
dir: String,
rotate_size: usize,
Expand Down Expand Up @@ -227,7 +274,9 @@ impl LogManager {
self.active_log_capacity = bytes;
self.last_sync_size = 0;
self.all_files.push_back(fd);
self.sync_dir()?;
if cfg!(not(windows)) {
self.sync_dir()?;
}

for hook in &self.hooks {
let queue = queue_from_suffix(&self.name_suffix);
Expand All @@ -250,7 +299,7 @@ impl LogManager {
return Err(Error::Io(io_error));
}
let active_fd = self.get_active_fd().unwrap();
ftruncate(active_fd.0, offset as _).map_err(|e| parse_nix_error(e, "ftruncate"))?;
truncate_file(active_fd.0, offset as u64)?;
if offset < FILE_MAGIC_HEADER.len() + VERSION.len() {
// After truncate to 0, write header is necessary.
offset = write_file_header(active_fd.0)?;
Expand Down Expand Up @@ -672,6 +721,7 @@ fn queue_from_suffix(suffix: &str) -> LogQueue {
}
}

#[cfg(not(target_os = "windows"))]
fn parse_nix_error(e: nix::Error, custom: &'static str) -> Error {
match e {
nix::Error::Sys(no) => {
Expand All @@ -682,24 +732,94 @@ fn parse_nix_error(e: nix::Error, custom: &'static str) -> Error {
}
}

#[cfg(not(target_os = "windows"))]
fn open_active_file<P: ?Sized + NixPath>(path: &P) -> Result<RawFd> {
let flags = OFlag::O_RDWR | OFlag::O_CREAT;
let mode = Mode::S_IRUSR | Mode::S_IWUSR;
fcntl::open(path, flags, mode).map_err(|e| parse_nix_error(e, "open_active_file"))
}

#[cfg(target_os = "windows")]
fn open_active_file(path: &PathBuf) -> Result<RawFd> {
let path_str = path.as_path().as_os_str().to_str().unwrap();
let ptr = HString::from(path_str).as_wide().as_ptr();
unsafe {
let mut create_disposition = FILE_CREATE_FLAGS::CREATE_NEW;
let mut flag = FILE_FLAGS_AND_ATTRIBUTES::FILE_ATTRIBUTE_NORMAL;
if (PathFileExistsW(ptr) == BOOL(1)) {
create_disposition = FILE_CREATE_FLAGS::OPEN_EXISTING;
if (PathIsDirectoryW(ptr) == BOOL(1)) {
flag = FILE_FLAGS_AND_ATTRIBUTES(0x80 | 0x02000000); // FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS
}
}

match CreateFileW(
ptr,
FILE_ACCESS_FLAGS::FILE_ALL_ACCESS,
FILE_SHARE_FLAGS(7), // FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
std::ptr::null_mut(),
create_disposition,
flag,
HANDLE::default()
) {
HANDLE(-1) => Err(Error::Io(std::io::Error::last_os_error())),
handle => Ok(handle),
}
}
}

#[cfg(not(target_os = "windows"))]
fn open_frozen_file<P: ?Sized + NixPath>(path: &P) -> Result<RawFd> {
let flags = OFlag::O_RDONLY;
let mode = Mode::S_IRWXU;
fcntl::open(path, flags, mode).map_err(|e| parse_nix_error(e, "open_frozen_file"))
}

#[cfg(target_os = "windows")]
fn open_frozen_file(path: &PathBuf) -> Result<RawFd> {
let ptr = HString::from(path.as_path().as_os_str().to_str().unwrap()).as_wide().as_ptr();
unsafe {
let mut flag = FILE_FLAGS_AND_ATTRIBUTES::FILE_ATTRIBUTE_READONLY;
if (PathFileExistsW(ptr) == BOOL(1)) {
if (PathIsDirectoryW(ptr) == BOOL(1)) {
flag = FILE_FLAGS_AND_ATTRIBUTES(0x80 | 0x02000000); // FILE_ATTRIBUTE_NORMAL | FILE_FLAG_BACKUP_SEMANTICS
}
}
match CreateFileW(
ptr,
FILE_ACCESS_FLAGS::FILE_GENERIC_READ,
FILE_SHARE_FLAGS(7), // FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
std::ptr::null_mut(),
FILE_CREATE_FLAGS::OPEN_EXISTING,
flag,
HANDLE::default()
) {
HANDLE(-1) => Err(Error::Io(std::io::Error::last_os_error())),
handle => Ok(handle),
}
}
}

#[cfg(not(target_os = "windows"))]
fn file_len(fd: RawFd) -> Result<usize> {
lseek(fd, 0, Whence::SeekEnd)
.map(|n| n as usize)
.map_err(|e| parse_nix_error(e, "lseek"))
}

#[cfg(target_os = "windows")]
fn file_len(fd: RawFd) -> Result<usize> {
let mut result: i64 = 0;
unsafe {
if GetFileSizeEx(fd, &mut result) == BOOL(0) {
return Err(Error::Io(std::io::Error::last_os_error()));
}
}

Ok(result as usize)
}

#[cfg(not(target_os = "windows"))]
fn pread_exact(fd: RawFd, mut offset: u64, len: usize) -> Result<Vec<u8>> {
let mut result = vec![0; len as usize];
let mut readed = 0;
Expand All @@ -715,6 +835,30 @@ fn pread_exact(fd: RawFd, mut offset: u64, len: usize) -> Result<Vec<u8>> {
Ok(result)
}

#[cfg(target_os = "windows")]
fn pread_exact(fd: RawFd, mut offset: u64, len: usize) -> Result<Vec<u8>> {
unsafe {
let mut result = vec![0; len as usize];
if SetFilePointerEx(fd, offset as i64, std::ptr::null_mut(), 0/* FILE_BEGIN */) == BOOL(0) {
return Err(Error::Io(std::io::Error::last_os_error()));
}

let ptr = result.as_mut_ptr() as *mut core::ffi::c_void;
let len = len as u32;
let mut readed: u32 = 0;

if ReadFile(fd, ptr, len as u32, &mut readed, std::ptr::null_mut()) == BOOL(0) {
return Err(Error::Io(std::io::Error::last_os_error()));
}

SetFilePointerEx(fd, 0, std::ptr::null_mut(), 0/* FILE_BEGIN */);

offset += len as u64;
Ok(result)
}
}

#[cfg(not(target_os = "windows"))]
fn pwrite_exact(fd: RawFd, mut offset: u64, content: &[u8]) -> Result<()> {
let mut written = 0;
while written < content.len() {
Expand All @@ -729,6 +873,29 @@ fn pwrite_exact(fd: RawFd, mut offset: u64, content: &[u8]) -> Result<()> {
Ok(())
}

#[cfg(target_os = "windows")]
fn pwrite_exact(fd: RawFd, mut offset: u64, content: &[u8]) -> Result<()> {
unsafe {
let mut copied = content.to_vec();

if SetFilePointerEx(fd, offset as i64, std::ptr::null_mut(), 0/* FILE_BEGIN */) == BOOL(0) {
return Err(Error::Io(std::io::Error::last_os_error()));
}

let ptr = copied.as_mut_ptr() as *mut core::ffi::c_void;
let len = copied.len() as u32;
let mut written: u32 = 0;
if WriteFile(fd, ptr, len, &mut written, std::ptr::null_mut()) == BOOL(0) {
return Err(Error::Io(std::io::Error::last_os_error()));
}

SetFilePointerEx(fd, 0, std::ptr::null_mut(), 0/* FILE_BEGIN */);

offset += content.len() as u64;
Ok(())
}
}

fn write_file_header(fd: RawFd) -> Result<usize> {
let len = FILE_MAGIC_HEADER.len() + VERSION.len();
let mut header = Vec::with_capacity(len);
Expand All @@ -738,6 +905,22 @@ fn write_file_header(fd: RawFd) -> Result<usize> {
Ok(len)
}

#[cfg(not(target_os = "windows"))]
fn truncate_file(fd: RawFd, offset: u64) -> Result<()> {
ftruncate(fd.0, offset as _).map_err(|e| parse_nix_error(e, "ftruncate"))?;
}

#[cfg(target_os = "windows")]
fn truncate_file(fd: RawFd, offset: u64) -> Result<()> {
unsafe {
let ret = SetFilePointerEx(fd, offset as i64, std::ptr::null_mut(), 0/* FILE_BEGIN */);
match SetEndOfFile(fd) {
BOOL(0) => Err(Error::Io(std::io::Error::last_os_error())),
_ => Ok(())
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down