Skip to content

Commit

Permalink
Add new data structure file
Browse files Browse the repository at this point in the history
  • Loading branch information
Yupeng Tang (FA Talent) committed Jun 9, 2019
1 parent cd69fae commit bb06bd8
Show file tree
Hide file tree
Showing 9 changed files with 993 additions and 0 deletions.
229 changes: 229 additions & 0 deletions libjiffy/src/jiffy/storage/client/file_client.cpp
@@ -0,0 +1,229 @@
#include "file_client.h"
#include "jiffy/utils/logger.h"
#include "jiffy/utils/string_utils.h"
#include <algorithm>
#include <thread>

namespace jiffy {
namespace storage {

using namespace jiffy::utils;

file_client::file_client(std::shared_ptr<directory::directory_interface> fs,
const std::string &path,
const directory::data_status &status,
int timeout_ms)
: data_structure_client(fs, path, status, FILE_OPS, timeout_ms) {
read_offset_ = 0;
read_partition_ = 0;
write_partition_ = 0;
for (const auto &block: status.data_blocks()) {
blocks_.push_back(std::make_shared<replica_chain_client>(fs_, path_, block, FILE_OPS, timeout_ms_));
}
}

void file_client::refresh() {
status_ = fs_->dstatus(path_);
blocks_.clear();
for (const auto &block: status_.data_blocks()) {
blocks_.push_back(std::make_shared<replica_chain_client>(fs_, path_, block, FILE_OPS, timeout_ms_));
}
}

std::string file_client::write(const std::string &msg) {
std::string _return;
std::vector<std::string> args{msg};
bool redo;
do {
try {
_return = blocks_[block_id(file_cmd_id::file_write)]->run_command(file_cmd_id::file_write, args).front();
handle_redirect(file_cmd_id::file_write, args, _return);
redo = false;
} catch (redo_error &e) {
redo = true;
}
} while (redo);
return _return;

}

std::string file_client::read(const std::size_t size) {
std::string _return;
std::vector<std::string> args;
args.push_back(std::to_string(read_offset_));
args.push_back(std::to_string(size));
bool redo;
do {
try {
_return = blocks_[block_id(file_cmd_id::file_read)]->run_command(file_cmd_id::file_read, args).front();
handle_redirect(file_cmd_id::file_read, args, _return);
redo = false;
} catch (redo_error &e) {
redo = true;
}
} while (redo);
return _return;
}

bool file_client::seek(const std::size_t offset) {
std::vector<std::string> ret;
auto seek_partition = block_id(file_cmd_id::file_seek);
ret = blocks_[seek_partition]->run_command(file_cmd_id::file_seek, {});
auto size = static_cast<std::size_t>(std::stoi(ret[0]));
auto cap = static_cast<std::size_t>(std::stoi(ret[1]));
if (offset >= seek_partition * cap + size) {
return false;
} else {
read_partition_ = offset / cap;
read_offset_ = offset % cap;
return true;
}
}

bool file_client::add_chain(const file_cmd_id &op) {
if(op == file_cmd_id::file_write) {
return write_partition_ >= blocks_.size() - 1;
} else if (op == file_cmd_id::file_read){
return read_partition_ >= blocks_.size() - 1;
} else {
throw std::logic_error("Adding chain should only happen in write and read");
}
}

std::size_t file_client::block_id(const file_cmd_id &op) {
switch (op) {
case file_cmd_id::file_write:
if (!check_valid_id(write_partition_)) {
throw std::logic_error("Blocks are insufficient, need to add more");
}
return write_partition_;
case file_cmd_id::file_read:
if (!check_valid_id(read_partition_)) {
throw std::logic_error("Blocks are insufficient, need to add more");
}
return read_partition_;
case file_cmd_id::file_seek:
if (read_partition_ > write_partition_) {
return read_partition_;
} else {
return write_partition_;
}
default:throw std::invalid_argument("Incorrect operation of message queue");
}
}

void file_client::handle_redirect(int32_t cmd_id, const std::vector<std::string> &args, std::string &response) {
bool read_flag = true;
typedef std::vector<std::string> list_t;
if (response == "!redo") {
if(cmd_id == file_cmd_id::file_write && write_partition_ < blocks_.size() - 1) {
write_partition_++;
} else if(cmd_id == file_cmd_id::file_read && read_partition_ < blocks_.size() - 1) {
read_partition_++;
read_offset_ = 0;
}
throw redo_error();
}
if (response.substr(0, 15) == "!next_partition") {
do {
if(cmd_id == file_cmd_id::file_read) read_partition_++;
else write_partition_++;
response = blocks_[block_id(static_cast<file_cmd_id >(cmd_id))]->run_command(cmd_id, args).front();
} while (response.substr(0, 15) == "!next_partition");
}
if (response.substr(0, 5) == "!full") {
do {
auto parts = string_utils::split(response, '!');
auto chain = list_t(parts.begin() + 2, parts.end());
if(add_chain(static_cast<file_cmd_id>(cmd_id))) {
blocks_.push_back(std::make_shared<replica_chain_client>(fs_,
path_,
directory::replica_chain(chain),
FILE_OPS));
}
write_partition_++;
response = blocks_[block_id(static_cast<file_cmd_id >(cmd_id))]->run_command(cmd_id, args).front();
} while (response.substr(0, 5) == "!full");
}
if (response.substr(0, 21) == "!msg_not_in_partition") {
do {
auto parts = string_utils::split(response, '!');
auto chain = list_t(parts.begin() + 2, parts.end());
if(add_chain(static_cast<file_cmd_id>(cmd_id))) {
blocks_.push_back(std::make_shared<replica_chain_client>(fs_,
path_,
directory::replica_chain(chain),
FILE_OPS));
}
read_partition_++;
read_offset_ = 0;
std::vector<std::string> modified_args;
modified_args.push_back(std::to_string(read_offset_));
modified_args.push_back(args[1]);
response = blocks_[block_id(static_cast<file_cmd_id >(cmd_id))]->run_command(cmd_id, modified_args).front();
if (response != "!msg_not_found") {
read_offset_ += response.size();
read_flag = false;
}
} while (response.substr(0, 21) == "!msg_not_in_partition");
}
if (response.substr(0, 12) == "!split_write") {
do {
auto parts = string_utils::split(response, '!');
auto remain_string_length = std::stoi(*(parts.end() - 1));
auto msg = args.front();
auto
remain_string = std::vector<std::string>{msg.substr(msg.size() - remain_string_length, remain_string_length)};
if(add_chain(static_cast<file_cmd_id>(cmd_id))) {
auto chain = list_t(parts.begin() + 2, parts.end() - 1);
blocks_.push_back(std::make_shared<replica_chain_client>(fs_,
path_,
directory::replica_chain(chain),
FILE_OPS));
}
write_partition_++;
response = blocks_[block_id(static_cast<file_cmd_id >(cmd_id))]->run_command(cmd_id, remain_string).front();
} while (response.substr(0, 12) == "!split_write");
}
if (response.substr(0, 11) == "!split_read") {
do {
auto parts = string_utils::split(response, '!');
auto first_part_string = *(parts.end() - 1);
if(add_chain(static_cast<file_cmd_id>(cmd_id))) {
auto chain = list_t(parts.begin() + 2, parts.end() - 1);
blocks_.push_back(std::make_shared<replica_chain_client>(fs_,
path_,
directory::replica_chain(chain),
FILE_OPS));
}
read_partition_++;
read_offset_ = 0;
std::vector<std::string> modified_args;
modified_args.push_back(std::to_string(read_offset_));
modified_args.push_back(std::to_string(std::stoi(args[1]) - first_part_string.size()));
auto second_part_string =
blocks_[block_id(static_cast<file_cmd_id >(cmd_id))]->run_command(cmd_id, modified_args).front();
if (second_part_string != "!msg_not_found") {
read_offset_ += second_part_string.size();
read_flag = false;
response = first_part_string + second_part_string;
} else {
response = second_part_string;
}
} while (response.substr(0, 11) == "!split_write");
}
if (response != "!msg_not_found" && cmd_id == static_cast<int32_t>(file_cmd_id::file_read) && read_flag) {
read_offset_ += response.size();
}
}

void file_client::handle_redirects(int32_t cmd_id,
std::vector<std::string> &args,
std::vector<std::string> &responses) {
(void) cmd_id;
(void) args;
(void) responses;
}

}
}
118 changes: 118 additions & 0 deletions libjiffy/src/jiffy/storage/client/file_client.h
@@ -0,0 +1,118 @@
#ifndef JIFFY_FILE_CLIENT_H
#define JIFFY_FILE_CLIENT_H

#include "jiffy/directory/client/directory_client.h"
#include "jiffy/storage/client/replica_chain_client.h"
#include "jiffy/utils/client_cache.h"
#include "jiffy/storage/file/file_ops.h"
#include "jiffy/storage/client/data_structure_client.h"

namespace jiffy {
namespace storage {

class file_client : data_structure_client {
public:
/**
* @brief Constructor
* Store all replica chain and their begin slot
* @param fs Directory service
* @param path Key value block path
* @param status Data status
* @param timeout_ms Timeout
*/

file_client(std::shared_ptr<directory::directory_interface> fs,
const std::string &path,
const directory::data_status &status,
int timeout_ms = 1000);

virtual ~file_client() = default;
/**
* @brief Refresh the slot and blocks from directory service
*/

void refresh() override;

/**
* @brief Write message to file
* @param msg New message
* @return Response of the command
*/

std::string write(const std::string &msg);

/**
* @brief Read message from file
* @param size Size to be read
* @return Response of the command
*/

std::string read(const std::size_t size);

/**
* @brief Seek to a location of the file
* @param offset File offset to seek
* @return Boolean, true if offset is within file range
*/
bool seek(const std::size_t offset);

private:

/**
* @brief Check if new chain needs to be added
* @param op Operation
* @return Boolean, true if new chain needs to be added
*/
bool add_chain(const file_cmd_id &op);
/**
* @brief Fetch block identifier for specified operation
* @param op Operation
* @return Block identifier
*/

std::size_t block_id(const file_cmd_id &op);

/**
* @brief Check if partition number is valid
* @param partition_num Partition number
* @return Boolean, true if valid
*/
bool check_valid_id(std::size_t partition_num) {
if(partition_num < blocks_.size())
return true;
else return false;
}

/**
* @brief Handle command in redirect case
* @param cmd_id Command identifier
* @param response Response to be collected
*/

void handle_redirect(int32_t cmd_id, const std::vector<std::string> &args, std::string &response) override;

/**
* @brief Handle multiple commands in redirect case
* @param cmd_id Command identifier
* @param args Command arguments
* @param responses Responses to be collected
*/

void handle_redirects(int32_t cmd_id,
std::vector<std::string> &args,
std::vector<std::string> &responses) override;

/* Read partition number */
std::size_t read_partition_;
/* Read offset number in a partition */
std::size_t read_offset_;
/* Write partition number */
std::size_t write_partition_;
/* Replica chain clients, each partition only save a replica chain client */
std::vector<std::shared_ptr<replica_chain_client>> blocks_;
};

}
}

#endif //JIFFY_FILE_CLIENT_H

0 comments on commit bb06bd8

Please sign in to comment.