Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Linux open file flag with kuzu open file flag #2931

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions extension/httpfs/src/cached_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ std::unique_ptr<FileInfo> CachedFileManager::getCachedFileInfo(HTTPFileInfo* htt
auto fileName = FileSystem::getFileName(httpFileInfo->path);
auto cacheFilePath = getCachedFilePath(fileName, transactionID);
if (!vfs->fileOrPathExists(cacheFilePath)) {
auto cacheFileInfo = vfs->openFile(cacheFilePath, O_CREAT | O_RDWR);
auto cacheFileInfo = vfs->openFile(cacheFilePath,
FileFlags::CREATE_IF_NOT_EXISTS | FileFlags::READ_ONLY | FileFlags::WRITE);
downloadFile(httpFileInfo, cacheFileInfo.get());
}
return vfs->openFile(cacheFilePath, O_RDONLY);
return vfs->openFile(cacheFilePath, FileFlags::READ_ONLY);
}

void CachedFileManager::cleanUP(main::ClientContext* context) {
Expand Down
2 changes: 1 addition & 1 deletion extension/httpfs/src/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ bool HTTPFileSystem::canHandleFile(const std::string& path) const {

bool HTTPFileSystem::fileOrPathExists(const std::string& path, main::ClientContext* context) {
try {
auto fileInfo = openFile(path, O_RDONLY, context, FileLockType::READ_LOCK);
auto fileInfo = openFile(path, FileFlags::READ_ONLY, context, FileLockType::READ_LOCK);
auto httpFileInfo = fileInfo->constPtrCast<HTTPFileInfo>();
if (httpFileInfo->length == 0) {
return false;
Expand Down
Empty file removed mm-256KB
Empty file.
5 changes: 2 additions & 3 deletions src/binder/bind/bind_import_database.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <fcntl.h>

#include "binder/binder.h"
#include "binder/bound_import_database.h"
#include "common/cast.h"
Expand All @@ -23,7 +21,8 @@ static std::string getQueryFromFile(common::VirtualFileSystem* vfs, const std::s
if (!vfs->fileOrPathExists(filePath, context)) {
throw BinderException(stringFormat("File {} does not exist.", filePath));
}
auto fileInfo = vfs->openFile(filePath, O_RDONLY
auto fileInfo = vfs->openFile(filePath, FileFlags::READ_ONLY
// TODO(Ziyi): We need to handle O_BINARY here.
#ifdef _WIN32
| _O_BINARY
#endif
Expand Down
10 changes: 4 additions & 6 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "catalog/catalog.h"

#include <fcntl.h>

#include "binder/ddl/bound_alter_info.h"
#include "binder/ddl/bound_create_sequence_info.h"
#include "binder/ddl/bound_create_table_info.h"
Expand Down Expand Up @@ -413,8 +411,8 @@ static void writeMagicBytes(Serializer& serializer) {
void Catalog::saveToFile(const std::string& directory, common::VirtualFileSystem* fs,
common::FileVersionType versionType) {
auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, versionType);
Serializer serializer(
std::make_unique<BufferedFileWriter>(fs->openFile(catalogPath, O_WRONLY | O_CREAT)));
Serializer serializer(std::make_unique<BufferedFileWriter>(fs->openFile(catalogPath,
FileFlags::CREATE_IF_NOT_EXISTS | FileFlags::READ_ONLY | FileFlags::WRITE)));
writeMagicBytes(serializer);
serializer.serializeValue(StorageVersionInfo::getStorageVersion());
tables->serialize(serializer);
Expand All @@ -426,8 +424,8 @@ void Catalog::saveToFile(const std::string& directory, common::VirtualFileSystem
void Catalog::readFromFile(const std::string& directory, common::VirtualFileSystem* fs,
common::FileVersionType versionType, main::ClientContext* context) {
auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, versionType);
Deserializer deserializer(
std::make_unique<BufferedFileReader>(fs->openFile(catalogPath, O_RDONLY, context)));
Deserializer deserializer(std::make_unique<BufferedFileReader>(
fs->openFile(catalogPath, FileFlags::READ_ONLY, context)));
validateMagicBytes(deserializer);
storage_version_t savedStorageVersion;
deserializer.deserializeValue(savedStorageVersion);
Expand Down
49 changes: 44 additions & 5 deletions src/common/file_system/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,57 @@ LocalFileInfo::~LocalFileInfo() {
#endif
}

static void validateFileFlags(uint8_t flags) {
bool isRead = flags & FileFlags::READ_ONLY;
bool isWrite = flags & FileFlags::WRITE;
// Require either READ or WRITE (or both).
KU_ASSERT(isRead || isWrite);
// CREATE flags require writing.
KU_ASSERT(isWrite || !(flags & FileFlags::CREATE_IF_NOT_EXISTS));
KU_ASSERT(isWrite || !(flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS));
// CREATE_IF_NOT_EXISTS and CREATE_AND_TRUNCATE_IF_EXISTS flags cannot be combined.
KU_ASSERT(!(flags & FileFlags::CREATE_IF_NOT_EXISTS &&
flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS));
}

std::unique_ptr<FileInfo> LocalFileSystem::openFile(const std::string& path, int flags,
main::ClientContext* context, FileLockType lock_type) {
auto fullPath = expandPath(context, path);
validateFileFlags(flags);

int openFlags = 0;
bool readMode = flags & FileFlags::READ_ONLY;
bool writeMode = flags & FileFlags::WRITE;
if (readMode && writeMode) {
openFlags = O_RDWR;
} else if (readMode) {
openFlags = O_RDONLY;
} else if (writeMode) {
openFlags = O_WRONLY;
} else {
// LCOV_EXCL_START
throw InternalException("READ, WRITE or both should be specified when opening a file.");
// LCOV_EXCL_STOP
}
if (writeMode) {
KU_ASSERT(flags & FileFlags::WRITE);
openFlags |= O_CLOEXEC;
if (flags & FileFlags::CREATE_IF_NOT_EXISTS) {
openFlags |= O_CREAT;
} else if (flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS) {
openFlags |= O_CREAT | O_TRUNC;
}
}

#if defined(_WIN32)
auto dwDesiredAccess = 0ul;
auto dwCreationDisposition = (flags & O_CREAT) ? OPEN_ALWAYS : OPEN_EXISTING;
auto dwCreationDisposition = (openFlags & O_CREAT) ? OPEN_ALWAYS : OPEN_EXISTING;
auto dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
if (flags & (O_CREAT | O_WRONLY | O_RDWR)) {
if (openFlags & (O_CREAT | O_WRONLY | O_RDWR)) {
dwDesiredAccess |= GENERIC_WRITE;
}
// O_RDONLY is 0 in practice, so flags & (O_RDONLY | O_RDWR) doesn't work.
if (!(flags & O_WRONLY)) {
// O_RDONLY is 0 in practice, so openFlags & (O_RDONLY | O_RDWR) doesn't work.
if (!(openFlags & O_WRONLY)) {
dwDesiredAccess |= GENERIC_READ;
}

Expand All @@ -71,7 +110,7 @@ std::unique_ptr<FileInfo> LocalFileSystem::openFile(const std::string& path, int
}
return std::make_unique<LocalFileInfo>(fullPath, handle, this);
#else
int fd = open(fullPath.c_str(), flags, 0644);
int fd = open(fullPath.c_str(), openFlags, 0644);
if (fd == -1) {
throw IOException(stringFormat("Cannot open file {}: {}", fullPath, posixErrMessage()));
}
Expand Down
6 changes: 2 additions & 4 deletions src/function/export/export_csv_function.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <fcntl.h>

#include "common/file_system/virtual_file_system.h"
#include "common/serializer/buffered_serializer.h"
#include "function/cast/vector_cast_functions.h"
Expand Down Expand Up @@ -107,8 +105,8 @@ struct ExportCSVSharedState : public ExportFuncSharedState {
offset_t offset = 0;

ExportCSVSharedState(main::ClientContext& context, const ExportFuncBindData& bindData) {
fileInfo = context.getVFSUnsafe()->openFile(bindData.fileName, O_WRONLY | O_CREAT | O_TRUNC,
&context);
fileInfo = context.getVFSUnsafe()->openFile(bindData.fileName,
FileFlags::WRITE | FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS, &context);
writeHeader(bindData);
}

Expand Down
12 changes: 12 additions & 0 deletions src/include/common/file_system/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ namespace common {

enum class FileLockType : uint8_t { NO_LOCK = 0, READ_LOCK = 1, WRITE_LOCK = 2 };

struct FileFlags {
static constexpr uint8_t READ_ONLY = 1 << 0;
static constexpr uint8_t WRITE = 1 << 1;
// Create file if not exists, can only be used together with WRITE
static constexpr uint8_t CREATE_IF_NOT_EXISTS = 1 << 3;
// Always create a new file. If a file exists, the file is truncated. Cannot be used together
// with CREATE_IF_NOT_EXISTS.
static constexpr uint8_t CREATE_AND_TRUNCATE_IF_EXISTS = 1 << 4;
// Temporary file that is not persisted to disk.
static constexpr uint8_t TEMPORARY = 1 << 5;
};

class KUZU_API FileSystem {
friend struct FileInfo;

Expand Down
5 changes: 2 additions & 3 deletions src/include/storage/file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class FileHandle {
fileInfo->writeFile(buffer, getPageSize(), pageIdx * getPageSize());
}

// TODO: These should be refactored to use functions of `FileFlags`.
inline bool isLargePaged() const { return flags & isLargePagedMask; }
inline bool isNewTmpFile() const { return flags & isNewInMemoryTmpFileMask; }
inline bool isReadOnlyFile() const { return flags & isReadOnlyMask; }
Expand All @@ -65,9 +66,7 @@ class FileHandle {

protected:
virtual common::page_idx_t addNewPageWithoutLock();
void constructExistingFileHandle(const std::string& path, common::VirtualFileSystem* vfs,
main::ClientContext* context);
void constructNewFileHandle(const std::string& path, common::VirtualFileSystem* vfs,
void constructFileHandle(const std::string& path, common::VirtualFileSystem* vfs,
main::ClientContext* context);

protected:
Expand Down
3 changes: 0 additions & 3 deletions src/include/storage/storage_structure/overflow_file.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include <fcntl.h>

#include <cstdint>
#include <memory>
#include <string_view>
Expand All @@ -11,7 +9,6 @@
#include "common/types/types.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/file_handle.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/in_mem_page.h"
#include "storage/storage_utils.h"
Expand Down
5 changes: 3 additions & 2 deletions src/main/attached_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ void AttachedKuzuDatabase::initCatalog(const std::string& path, ClientContext* c
}

static void validateEmptyWAL(const std::string& path, ClientContext* context) {
auto walFile = context->getVFSUnsafe()->openFile(
path + "/" + common::StorageConstants::WAL_FILE_SUFFIX, O_RDONLY, context);
auto walFile =
context->getVFSUnsafe()->openFile(path + "/" + common::StorageConstants::WAL_FILE_SUFFIX,
common::FileFlags::READ_ONLY, context);
if (walFile->getFileSize() > 0) {
throw common::RuntimeException(
common::stringFormat("Cannot attach a remote kuzu database with non-empty wal file."));
Expand Down
4 changes: 2 additions & 2 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ SystemConfig::SystemConfig(uint64_t bufferPoolSize_, uint64_t maxNumThreads, boo
}

static void getLockFileFlagsAndType(bool readOnly, bool createNew, int& flags, FileLockType& lock) {
flags = readOnly ? O_RDONLY : O_RDWR;
flags = readOnly ? FileFlags::READ_ONLY : FileFlags::WRITE;
if (createNew && !readOnly) {
flags |= O_CREAT;
flags |= FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS;
}
lock = readOnly ? FileLockType::READ_LOCK : FileLockType::WRITE_LOCK;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "processor/operator/persistent/reader/csv/base_csv_reader.h"

#include <fcntl.h>

#include <vector>

#include "common/exception/copy.h"
Expand All @@ -20,7 +18,7 @@ BaseCSVReader::BaseCSVReader(const std::string& filePath, common::CSVOption opti
: option{std::move(option)}, numColumns{numColumns}, buffer{nullptr}, bufferSize{0},
position{0}, osFileOffset{0}, rowEmpty{false}, context{context} {
fileInfo = context->getVFSUnsafe()->openFile(filePath,
O_RDONLY
FileFlags::READ_ONLY
#ifdef _WIN32
| _O_BINARY
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void ParquetReader::initializeScan(ParquetReaderScanState& state,
state.groupIdxList = std::move(groups_to_read);
if (!state.fileInfo || state.fileInfo->path != filePath) {
state.prefetchMode = false;
state.fileInfo = vfs->openFile(filePath, O_RDONLY, context);
state.fileInfo = vfs->openFile(filePath, FileFlags::READ_ONLY, context);
}

state.thriftFileProto = createThriftProtocol(state.fileInfo.get(), state.prefetchMode);
Expand Down Expand Up @@ -170,7 +170,7 @@ void ParquetReader::scan(processor::ParquetReaderScanState& state, DataChunk& re
}

void ParquetReader::initMetadata() {
auto fileInfo = context->getVFSUnsafe()->openFile(filePath, O_RDONLY, context);
auto fileInfo = context->getVFSUnsafe()->openFile(filePath, FileFlags::READ_ONLY, context);
auto proto = createThriftProtocol(fileInfo.get(), false);
auto& transport =
ku_dynamic_cast<kuzu_apache::thrift::transport::TTransport&, ThriftFileTransport&>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ ParquetWriter::ParquetWriter(std::string fileName, std::vector<common::LogicalTy
main::ClientContext* context)
: fileName{std::move(fileName)}, types{std::move(types)}, columnNames{std::move(columnNames)},
codec{codec}, fileOffset{0}, mm{context->getMemoryManager()} {
fileInfo =
context->getVFSUnsafe()->openFile(this->fileName, O_WRONLY | O_CREAT | O_TRUNC, context);
fileInfo = context->getVFSUnsafe()->openFile(this->fileName,
FileFlags::WRITE | FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS, context);
// Parquet files start with the string "PAR1".
fileInfo->writeFile(reinterpret_cast<const uint8_t*>(ParquetConstants::PARQUET_MAGIC_WORDS),
strlen(ParquetConstants::PARQUET_MAGIC_WORDS), fileOffset);
Expand Down
4 changes: 1 addition & 3 deletions src/processor/operator/simple/export_db.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "processor/operator/simple/export_db.h"

#include <fcntl.h>

#include <sstream>

#include "catalog/catalog.h"
Expand All @@ -26,7 +24,7 @@ using std::stringstream;

static void writeStringStreamToFile(VirtualFileSystem* vfs, std::string ssString,
const std::string& path) {
auto fileInfo = vfs->openFile(path, O_WRONLY | O_CREAT);
auto fileInfo = vfs->openFile(path, FileFlags::WRITE | FileFlags::CREATE_IF_NOT_EXISTS);
fileInfo->writeFile(reinterpret_cast<const uint8_t*>(ssString.c_str()), ssString.size(),
0 /* offset */);
}
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/simple/install_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ void InstallExtension::saveExtensionToLocalFile(const std::string& extensionData
if (!vfs->fileOrPathExists(extensionDir, context)) {
vfs->createDir(extensionDir);
}
auto fileInfo = vfs->openFile(extensionPath, O_WRONLY | O_CREAT);
auto fileInfo =
vfs->openFile(extensionPath, FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS | FileFlags::WRITE);
fileInfo->writeFile(reinterpret_cast<const uint8_t*>(extensionData.c_str()),
extensionData.size(), 0 /* offset */);
fileInfo->syncFile();
Expand Down
32 changes: 14 additions & 18 deletions src/storage/file_handle.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "storage/file_handle.h"

#include <fcntl.h>

#include <cmath>
#include <mutex>

Expand All @@ -15,37 +13,35 @@ namespace storage {
FileHandle::FileHandle(const std::string& path, uint8_t flags, VirtualFileSystem* vfs,
main::ClientContext* context)
: flags{flags} {
if (!isNewTmpFile()) {
constructExistingFileHandle(path, vfs, context);
} else {
constructNewFileHandle(path, vfs, context);
}
constructFileHandle(path, vfs, context);
}

void FileHandle::constructExistingFileHandle(const std::string& path, VirtualFileSystem* vfs,
void FileHandle::constructFileHandle(const std::string& path, VirtualFileSystem* vfs,
main::ClientContext* context) {
if (isNewTmpFile()) {
numPages = 0;
pageCapacity = 0;
fileInfo = std::make_unique<FileInfo>(path, nullptr /* fileSystem */);
return;
}
int openFlags;
if (isReadOnlyFile()) {
openFlags = O_RDONLY;
openFlags = FileFlags::READ_ONLY;
} else {
openFlags = O_RDWR | ((createFileIfNotExists()) ? O_CREAT : 0x00000000);
openFlags = FileFlags::WRITE | FileFlags::READ_ONLY;
if (createFileIfNotExists()) {
openFlags |= FileFlags::CREATE_IF_NOT_EXISTS;
}
}
fileInfo = vfs->openFile(path, openFlags, context);
auto fileLength = fileInfo->getFileSize();
numPages = ceil((double)fileLength / (double)getPageSize());
numPages = ceil(static_cast<double>(fileLength) / static_cast<double>(getPageSize()));
pageCapacity = 0;
while (pageCapacity < numPages) {
pageCapacity += StorageConstants::PAGE_GROUP_SIZE;
}
}

void FileHandle::constructNewFileHandle(const std::string& path, VirtualFileSystem* vfs,
main::ClientContext* context) {
fileInfo = vfs->openFile(path, O_CREAT | O_RDWR, context);
numPages = 0;
pageCapacity = 0;
}

page_idx_t FileHandle::addNewPage() {
return addNewPages(1 /* numNewPages */);
}
Expand Down
2 changes: 0 additions & 2 deletions src/storage/index/in_mem_hash_index.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "storage/index/in_mem_hash_index.h"

#include <fcntl.h>

#include <cstring>

#include "common/constants.h"
Expand Down
Loading
Loading