forked from traviscrawford/scribe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HdfsFile.h
109 lines (96 loc) · 3.21 KB
/
HdfsFile.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright (c) 2009- Facebook
// Distributed under the Scribe Software License
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
#ifndef HDFS_FILE_H
#define HDFS_FILE_H
#ifdef USE_SCRIBE_HDFS
#include "hdfs.h"
#ifdef HAVE_LZO
#define LZO_STREAMING 1
#endif
#ifdef LZO_STREAMING
#include "lzo/lzoconf.h"
#include "lzo/lzo1x.h"
#endif
/**
 * FileInterface implementation declared when scribe is compiled with
 * USE_SCRIBE_HDFS. This header only declares the members; none of them
 * has a body here, so the definitions must live in a separate source file.
 */
class HdfsFile : public FileInterface {
public:
// Construct a file object for the given name.
HdfsFile(const std::string& name);
virtual ~HdfsFile();
static void init(); // initialize hdfs subsystem
bool openRead(); // open for reading file
bool openWrite(); // open for appending to file
bool openTruncate(); // truncate and open for write
bool isOpen(); // is file open?
void close();
// Write `data` to the file; the bool result presumably reports success
// (definition not visible here -- confirm).
bool write(const std::string& data);
void flush();
unsigned long fileSize();
// Read the next record into `_return`.
// NOTE(review): the meaning of the long result (byte count vs. status)
// is not visible in this header -- confirm against the definition.
long readNext(std::string& _return);
void deleteFile();
// Fill `_return` with the listing for `path`.
void listImpl(const std::string& path, std::vector<std::string>& _return);
// NOTE(review): presumably returns the framing prefix for a record of
// `data_size` bytes -- confirm against FileInterface.
std::string getFrame(unsigned data_size);
bool createDirectory(std::string path);
bool createSymlink(std::string newpath, std::string oldpath);
private:
// NOTE(review): presumably the read buffer used by readNext() and its
// capacity; allocation and ownership are not visible here -- confirm.
char* inputBuffer_;
unsigned bufferSize_;
// libhdfs handles (types come from "hdfs.h"): filesystem and file.
hdfsFS fileSys;
hdfsFile hfile;
// Returns an hdfsFS handle for the given URI.
hdfsFS connectToPath(const char* uri);
#ifdef LZO_STREAMING
/* Streaming LZO write support (only declared when LZO_STREAMING is defined) */
void setShouldLZOCompress(int compressionLevel);
// Helpers that append a value to `str`; the bool result presumably
// signals success -- confirm against the definitions.
bool LZOStringAppendChar(std::string&str, int x);
bool LZOStringAppendInt16(std::string&str, unsigned x);
bool LZOStringAppendInt32(std::string&str, lzo_uint x);
// NOTE(review): `force` and `*success` semantics are not visible here;
// presumably `force` flushes buffered data and `*success` reports the
// outcome -- confirm.
const std::string LZOCompress(const std::string& data, bool force, bool *success);
lzo_uint32 lzo_checksum;
std::string LZObacklogBuffer;
#endif
// disallow copy, assignment, and empty construction
// (declared private; no bodies are provided in this header)
HdfsFile();
HdfsFile(HdfsFile& rhs);
HdfsFile& operator=(HdfsFile& rhs);
};
/**
 * A static lock.
 *
 * Holds a single pthread mutex as a static member. Both static data
 * members are only declared here; their definitions must be supplied by
 * exactly one translation unit.
 */
class HdfsLock {
 private:
  // NOTE(review): presumably defined elsewhere from the result of
  // initLock() so the mutex is set up during static initialization --
  // confirm against the defining source file.
  static bool lockInitialized;

 public:
  static pthread_mutex_t lock;

  /**
   * Initialize `lock` with default mutex attributes.
   *
   * @return true when pthread_mutex_init() reports success (returns 0),
   *         false otherwise. The result used to be discarded and true
   *         was returned unconditionally, hiding initialization failures.
   */
  static bool initLock() {
    return pthread_mutex_init(&lock, NULL) == 0;
  }
};
#else
/**
 * Placeholder HdfsFile used when scribe is compiled without
 * USE_SCRIBE_HDFS.
 *
 * Constructing one logs that HDFS support is missing. Every operation is
 * a no-op that reports failure (false), zero, or an empty value, so code
 * that references HdfsFile still compiles and links.
 */
class HdfsFile : public FileInterface {
 public:
  // Logs two messages explaining that HDFS support was not compiled in.
  HdfsFile(const std::string& name) : FileInterface(name, false) {
    LOG_OPER("[hdfs] ERROR: HDFS is not supported. file: %s", name.c_str());
    LOG_OPER("[hdfs] If you want HDFS Support, please recompile scribe with HDFS support");
  }

  static void init() {}

  bool openRead() { return false; }      // open for reading file
  bool openWrite() { return false; }     // open for appending to file
  bool openTruncate() { return false; }  // open for write and truncate
  bool isOpen() { return false; }        // is file open?
  void close() {}

  bool write(const std::string& data) { return false; }
  void flush() {}
  void sync() {}

  unsigned long fileSize() { return 0; }

  // Nothing can be read; returns 0 and leaves _return untouched.
  long readNext(std::string& _return) { return 0; }

  void deleteFile() {}
  void listImpl(const std::string& path, std::vector<std::string>& _return) {}

  // Returns an empty frame. This used to be `return 0;`, which constructs
  // a std::string from a null char pointer and is undefined behaviour.
  std::string getFrame(unsigned data_size) { return std::string(); }

  bool createDirectory(std::string path) { return false; }
  bool createSymlink(std::string newpath, std::string oldpath) { return false; }
};
#endif // USE_SCRIBE_HDFS
#endif // HDFS_FILE_H