-
Notifications
You must be signed in to change notification settings - Fork 0
/
master.h~
159 lines (141 loc) · 3.19 KB
/
master.h~
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include <stdlib.h>
#include "conf.h"
#include "user.h"
#include "message.h"
#include "tcptransport.h"
#include <vector>
#include <string>
#include <pthread.h>
#include <queue>
#include "simplesocket/ServerSocket.h"
#include "simplesocket/SocketException.h"
#include <dirent.h>
#include <sys/stat.h>
#include "MTFS.h"
using namespace std;
class Master;
// Record describing one slave; Master stores these in its `activeSlaves` vector.
// None of the scalar members is given an initial value by this type, so a
// default-constructed SlaveNode must be filled in before those fields are read.
// NOTE(review): field meanings are inferred from their names only -- confirm
// against the implementation file.
struct SlaveNode
{
    std::string ip;              // presumably the slave's address
    int         num;
    long long   llTotalMemUser;
    long long   llCurrMemUsed;
    long long   llCurrCPUUsed;

    // system status
    enum Status { INIT, RUNNING, STOPPED };
    Status m_Status;
};
// Groups a server-socket pointer, an address string, two port numbers and a
// back-pointer to the Master instance.
// NOTE(review): ownership of `server` and `m` is not visible in this header;
// both are raw pointers and nothing here deletes them.
struct TCPInfo0
{
    ServerSocket* server;
    std::string   ip;
    int           inport;
    int           outport;
    Master*       m;
};
typedef TCPInfo0 TCPInfo;
// Pair of strings: an address and a base.
// NOTE(review): presumably these mirror the `addr` / `base` arguments of
// Master::startSlave -- confirm against the callers.
struct addrStruct
{
    std::string addr;
    std::string base;
};
// A single job entry: a type code plus a status code.
// Neither field is initialized by this type.
struct jobO
{
    int jobtype;
    int j_Status;   // 0 = waiting, 1 = sending, 2 = running, 3 = finish
};
typedef jobO job;
// Associates a slave identifier string with a FIFO queue of `job` entries.
// Master keeps a vector of these in `m_jobList`.
struct slavejob0
{
    std::string     slave;
    std::queue<job> slaveJobList;
};
typedef slavejob0 slaveJob;
// Holds one text line together with a list of integers, a string `dnum` and
// an integer `x`.
// NOTE(review): the type name suggests it is used while splitting data files;
// the exact meaning of `xi`, `dnum` and `x` is not visible in this header.
struct SPLITFILE
{
    std::string      line;
    std::vector<int> xi;
    std::string      dnum;
    int              x;     // not initialized by this type
};
typedef SPLITFILE splitfile;
// Structure used when sending the already-partitioned files to a slave.
// Carries a back-pointer to Master, several identifying strings and the list
// of file names.
struct SENDFILEINFO
{
    Master*                  m;
    std::string              ip;
    std::string              num;
    std::string              username;
    std::string              tablename;
    std::string              base;
    std::vector<std::string> filename;
};
typedef SENDFILEINFO sendfileinfo;
// Structure for the "start a thread first, then start new tasks inside that
// thread" pattern: a list of TCPInfo entries plus a back-pointer to Master.
struct LATERTCPTYPE
{
    std::vector<TCPInfo> vTCPInfo;
    Master*              master;
};
typedef LATERTCPTYPE latertcptype;
// An attribute name with two associated string values, `p1` and `p2`.
// NOTE(review): presumably `p1`/`p2` are the two bounds of a range on that
// attribute -- confirm against Master::decode.
struct CORRDINATE
{
    std::string attrname;
    std::string p1;
    std::string p2;
};
typedef CORRDINATE corrdinate;
// A table name together with the set of `corrdinate` entries attached to it.
// Master::decode fills a vector of these through its output parameter.
struct TABLECORRDINATE
{
    std::string             tablename;
    std::vector<corrdinate> vcorrdset;
};
typedef TABLECORRDINATE tablecorrdinate;
// Parameters needed by the thread that sends messages while data is being read:
// one TCPInfo, a back-pointer to Master and two string lists.
struct GETRECORDTCP
{
    TCPInfo                  tcpinfo;
    Master*                  m;
    std::vector<std::string> vscmd;
    std::vector<std::string> vscorrd;
};
typedef GETRECORDTCP getRecordTcp;
class Master
{
public:
Master();
~Master();
public:
int init();
int run();
int stop();
int processSysCmd(const string& ip, const int port, const User* user, const int32_t key, int id, MantouMsg* msg);
void startSlave(const std::string& addr, const std::string& base, const std::string& option, const std::string& log = "");
static void* getActiveSlaves(void *s);
static void* System(void *s);
static void* Shell(void *s);
static void* SlaveCom(void* s);
void getSlaveStat();
static void* getSysinfo(void* s);
static void* jobDealer(void* s);
static void* start(void *s);
static void* jobChecker(void *s);
static void* getPartition(void *s);
static void* test(void *s);
static void* dataSplit(void* s);
static void* senddata(void* s);
static void* getData(void *s);
void decode(vector<string> c, vector<tablecorrdinate> &vtablecorrdinate);
static void* slaveGetRecord(void *s);
int trave_dir(char* path, vector<string> &filename);
// void dataInit();
public:
enum Status {INIT, RUNNING, STOPPED} m_Status; // system status
ConfReader cr;
MTFS mt;
vector<slavelist> slave_list;
vector<struct SlaveNode> activeSlaves;
bool listening;
vector<slaveJob> m_jobList;
queue<job> m_qmasterjobList;
vector<ServerSocket> m_vsock;
vector<TCPInfo> m_vTCPInfo;
private:
static void* get(void* s);
// void timefunc(int sig);
private:
int inCmdPort;
int outCmdPort;
string tmp1;
string tmp2;
int m_itasknum;
int m_isockcount;
};
// Free functions declared for use outside the Master class; their definitions
// are not visible in this header.
void timefunc(int sig); /* timer event code */
// NOTE(review): this shares its name with the C library's clock() from
// <time.h>; here it is a separate function taking an int. Presumably `second`
// is a duration in seconds -- confirm against the definition.
void clock(int second);