Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
158 lines (132 sloc) 4.68 KB
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <curl/curl.h>
#include <string>
#include <sys/time.h>
#include "update_listener.h"
#include "yadfs_exception.h"
#include "std_headers.h"
#include "utils.h"
static std::string transaction_id_file(const std::string& server) {
std::string state_dir(check_create_state_directory(server));
return state_dir + "/last_transaction";
}
static int64_t read_latest_known_transaction_id(const std::string& server) {
std::string transaction_file(transaction_id_file(server));
int fd = open(transaction_file.c_str(), O_RDONLY|O_EXLOCK);
int64_t value(-1LL);
if (fd>=0) {
int64_t tmp;
if (sizeof(int64_t) == read(fd, &tmp, sizeof(int64_t)))
value=tmp;
close(fd);
}
return value;
}
static void write_latest_known_transaction_id(const std::string& server, int64_t transaction_id) {
std::string transaction_file(transaction_id_file(server));
int fd = open(transaction_file.c_str(), O_WRONLY|O_TRUNC|O_CREAT|O_EXLOCK);
if (fd>=0) {
write(fd, &transaction_id, sizeof(int64_t));
close(fd);
}
}
UpdateListener::~UpdateListener() {
}
void UpdateListener::onDataReceived(const uint8_t* data, size_t length) {
mCurrentData.write((const char*) data, length);
}
static size_t data_received_callback(void* ptr, size_t size, size_t nmemb, void *stream) {
((UpdateListener*)stream)->onDataReceived((const uint8_t*)ptr, size*nmemb);
return size*nmemb;
}
void UpdateListener::onNewHeaderReceived(const std::string& line) {
mCurrentHeaders.push_back(line);
}
static size_t header_line_received_callback(void *ptr, size_t size, size_t nmemb, void *stream) {
std::string line;
line.append((const char*) ptr, size*nmemb);
((UpdateListener*)stream)->onNewHeaderReceived(line);
return size*nmemb;
}
void UpdateListener::operator()() {
// Open transaction state file and read last
mLatestKnownTransaction=read_latest_known_transaction_id(mServer);
std::string updateUrl(mServer+"update");
// Create CURL handle
CURL* curl(curl_easy_init());
while(true) {
// Setup curl request
timeval tv;
gettimeofday(&tv, 0);
std::stringstream volatile_url;
volatile_url << updateUrl << "?not_in_cache=" << tv.tv_sec << "_" << tv.tv_usec;
curl_easy_setopt(curl, CURLOPT_URL, volatile_url.str().c_str());
curl_slist* headers(0);
headers = curl_slist_append(headers, "Accept: */*");
headers = curl_slist_append(headers, "Accept-Charset: UTF-8");
headers = curl_slist_append(headers, "Keep-Alive: 36000");
headers = curl_slist_append(headers, "Connection: keep-alive");
if (mLatestKnownTransaction!=-1LL) {
std::string transaction_header(YADFS_HEADER_LAST_SERIAL_ID);
transaction_header+=": ";
transaction_header += mLatestKnownTransaction;
headers = curl_slist_append(headers, transaction_header.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, data_received_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_line_received_callback);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, this);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
// Clear current content
mCurrentHeaders.clear();
mCurrentData.clear();
// Issue request and scrap result
if (0==curl_easy_perform(curl)) {
std::string line;
while (std::getline(mCurrentData, line))
parseUpdateLine(line);
write_latest_known_transaction_id(mServer, mLatestKnownTransaction);
}
curl_slist_free_all(headers);
if (mDelegate.quitRequested()) break;
// sleep for 1s
usleep(1000000);
}
curl_easy_cleanup(curl);
}
void UpdateListener::parseUpdateLine(const std::string& line) {
// <transaction id><TAB>MOVED<TAB>/old/path<TAB>/new/path
// <transaction id><TAB>STORED<TAB>/path/of/modified/object
std::string separators("\t\n");
size_t a(0), b(line.find_first_of(separators));
if (b==std::string::npos) return;
std::stringstream transaction_id_ss(line.substr(a, b-a));
int64_t transaction_id;
transaction_id_ss >> transaction_id;
a=b+1;
b=line.find_first_of(separators, a);
if (b==std::string::npos) return;
std::string verb(line.substr(a, b-a));
std::list<std::string> args;
do {
a=b+1;
b=line.find_first_of(separators, a);
if (b==std::string::npos) b=line.size();;
args.push_back(line.substr(a, b-a));
} while(b!=line.size());
if (verb=="MOVED" && args.size()==2) {
std::list<std::string>::iterator it(args.begin());
const std::string& first(*it);
const std::string& second(*(++it));
mDelegate.onMoved(transaction_id, first, second);
}
else if (verb=="STORED")
mDelegate.onStored(transaction_id, args.front());
else return;
mLatestKnownTransaction = transaction_id;
}