Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 358 lines (301 sloc) 8.688 kB
a12e719 Initial commit
postgres authored
1 /*
707c501 Add repmgr client command.
postgres authored
2 * repmgrd.c
a12e719 Initial commit
postgres authored
3 * Copyright (c) 2ndQuadrant, 2010
4 *
707c501 Add repmgr client command.
postgres authored
5 * Replication manager daemon
a12e719 Initial commit
postgres authored
 * This module connects to the nodes of a replication cluster and monitors
 * how far they are behind the master.
8 */
9
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "repmgr.h"
15
16 char myClusterName[MAXLEN];
17
18 /* Local info */
19 int myLocalMode = STANDBY_MODE;
20 int myLocalId = -1;
21 PGconn *myLocalConn;
22
23 /* Primary info */
24 int primaryId;
25 char primaryConninfo[MAXLEN];
26 PGconn *primaryConn;
27
28
29 void setMyLocalMode(void);
30 void checkClusterConfiguration(void);
31 void checkNodeConfiguration(char *conninfo);
32 void getPrimaryConnection(void);
33
34 void MonitorCheck(void);
35 void MonitorExecute(void);
36
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
37 unsigned long long int walLocationToBytes(char *wal_location);
38
a12e719 Initial commit
postgres authored
39
40 int
41 main(int argc, char **argv)
42 {
43 char conninfo[MAXLEN];
44
45 /*
46 * Read the configuration file: repmgr.conf
47 */
48 parse_config(myClusterName, &myLocalId, conninfo);
49 if (myLocalId == -1)
50 {
51 fprintf(stderr, "Node information is missing. "
52 "Check the configuration file.");
53 exit(1);
54 }
55
56 myLocalConn = establishDBConnection(conninfo, true);
57
58 /*
59 * Set my server mode, establish a connection to primary
60 * and start monitor
61 */
62 setMyLocalMode();
63 checkClusterConfiguration();
64 checkNodeConfiguration(conninfo);
65 if (myLocalMode == STANDBY_MODE)
66 {
67 /* I need the id of the primary as well as a connection to it */
68 getPrimaryConnection();
69 MonitorCheck();
70 PQfinish(primaryConn);
71 }
72
73 /* close the connection to the database and cleanup */
74 PQfinish(myLocalConn);
75
76 return 0;
77 }
78
79
80 /*
81 * This function ask if we are in recovery, if false we are the primary else
82 * we are a standby
83 */
84 void
85 setMyLocalMode(void)
86 {
87 PGresult *res;
88
89 res = PQexec(myLocalConn, "SELECT pg_is_in_recovery()");
90 if (PQresultStatus(res) != PGRES_TUPLES_OK)
91 {
92 fprintf(stderr, "Can't query server mode: %s", PQerrorMessage(myLocalConn));
93 PQclear(res);
94 PQfinish(myLocalConn);
95 exit(1);
96 }
97
98 if (strcmp(PQgetvalue(res, 0, 0), "f") == 0)
99 myLocalMode = PRIMARY_MODE;
100 else
101 myLocalMode = STANDBY_MODE;
102
103 PQclear(res);
104 }
105
106
107 void
108 getPrimaryConnection(void)
109 {
110 PGresult *res1;
111 PGresult *res2;
112 int i;
113
114 res1 = PQexec(myLocalConn, "SELECT * FROM repl_nodes");
115 if (PQresultStatus(res1) != PGRES_TUPLES_OK)
116 {
117 fprintf(stderr, "Can't get nodes info: %s", PQerrorMessage(myLocalConn));
118 PQclear(res1);
119 PQfinish(myLocalConn);
120 exit(1);
121 }
122
123 for (i = 0; i < PQntuples(res1); i++)
124 {
125 primaryId = atoi(PQgetvalue(res1, i, 0));
d728ef0 Fix some typos and replace the variable Inrecovery with
postgres authored
126 strcpy(primaryConninfo, PQgetvalue(res1, i, 2));
a12e719 Initial commit
postgres authored
127 primaryConn = establishDBConnection(primaryConninfo, false);
128
129 res2 = PQexec(primaryConn, "SELECT pg_is_in_recovery()");
130 if (PQresultStatus(res2) != PGRES_TUPLES_OK)
131 {
132 fprintf(stderr, "Can't get nodes info: %s", PQerrorMessage(primaryConn));
133 PQclear(res1);
134 PQclear(res2);
135 PQfinish(primaryConn);
136 PQfinish(myLocalConn);
137 exit(1);
138 }
139
140 if (strcmp(PQgetvalue(res2, 0, 0), "f") == 0)
141 {
142 PQclear(res2);
143 PQclear(res1);
144 /* On the primary the monitor check is asynchronous */
145 res1 = PQexec(primaryConn, "SET synchronous_commit TO off");
146 PQclear(res1);
147 return;
148 }
149 else
150 {
151 PQclear(res2);
152 PQfinish(primaryConn);
153 primaryId = -1;
154 }
155 }
156
157 /* If we finish this loop without finding a primary then
158 * we doesn't have the info or the primary has failed (or we
159 * reached max_connections or superuser_reserved_connections,
160 * anything else i'm missing?),
161 * Probably we will need to check the error to know if we need
162 * to start failover procedure o just fix some situation on the
163 * standby.
164 */
165 fprintf(stderr, "There isn't a primary node");
166 PQclear(res1);
167 PQfinish(myLocalConn);
168 exit(1);
169 }
170
171
/*
 * Monitoring loop: take one sample with MonitorExecute(), then sleep for
 * three seconds, forever. This function never returns.
 */
void
MonitorCheck(void)
{
	while (1)
	{
		MonitorExecute();
		sleep(3);
	}
}
183
184
185 /*
186 * Check if its time for next monitor call and if so, do it.
187 */
188
189 void
190 MonitorExecute(void)
191 {
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
192 PGresult *res;
a12e719 Initial commit
postgres authored
193 char sqlquery[8192];
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
194 char monitor_standby_timestamp[MAXLEN];
195 char last_wal_primary_location[MAXLEN];
196 char last_wal_standby_received[MAXLEN];
197 char last_wal_standby_applied[MAXLEN];
198
199 unsigned long long int lsn_primary;
200 unsigned long long int lsn_standby_received;
201 unsigned long long int lsn_standby_applied;
202
203 /* Get local xlog info */
204 sprintf(sqlquery,
205 "SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
362f931 Apply log should be pg_xlog_receive_location() - pg_xlog_replay_locat…
postgres authored
206 "pg_last_xlog_replay_location()");
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
207
208 res = PQexec(myLocalConn, sqlquery);
209 if (PQresultStatus(res) != PGRES_TUPLES_OK)
210 {
211 fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn));
212 PQclear(res);
213 return;
214 }
a12e719 Initial commit
postgres authored
215
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
216 strcpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0));
217 strcpy(last_wal_standby_received , PQgetvalue(res, 0, 1));
218 strcpy(last_wal_standby_applied , PQgetvalue(res, 0, 2));
219 PQclear(res);
220
221 /* Get primary xlog info */
222 sprintf(sqlquery, "SELECT pg_current_xlog_location() ");
223
224 res = PQexec(primaryConn, sqlquery);
225 if (PQresultStatus(res) != PGRES_TUPLES_OK)
226 {
227 fprintf(stderr, "PQexec failed: %s", PQerrorMessage(primaryConn));
228 PQclear(res);
229 return;
230 }
231
232 strcpy(last_wal_primary_location, PQgetvalue(res, 0, 0));
233 PQclear(res);
234
235 /* Calculate the lag */
236 lsn_primary = walLocationToBytes(last_wal_primary_location);
237 lsn_standby_received = walLocationToBytes(last_wal_standby_received);
238 lsn_standby_applied = walLocationToBytes(last_wal_standby_applied);
a12e719 Initial commit
postgres authored
239
240 /*
241 * Build the SQL to execute on primary
242 */
243 sprintf(sqlquery,
244 "INSERT INTO repl_status "
245 "VALUES(%d, %d, '%s'::timestamp with time zone, "
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
246 " '%s', '%s', "
247 " %lld, %lld)",
248 primaryId, myLocalId, monitor_standby_timestamp,
249 last_wal_primary_location,
362f931 Apply log should be pg_xlog_receive_location() - pg_xlog_replay_locat…
postgres authored
250 last_wal_standby_received,
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
251 (lsn_primary - lsn_standby_received),
362f931 Apply log should be pg_xlog_receive_location() - pg_xlog_replay_locat…
postgres authored
252 (lsn_standby_received - lsn_standby_applied));
a12e719 Initial commit
postgres authored
253
254 /*
255 * Execute the query asynchronously, but don't check for a result. We
256 * will check the result next time we pause for a monitor step.
257 */
258 if (!PQexec(primaryConn, sqlquery))
259 fprintf(stderr, "replication monitor insert failed: %s",
260 PQerrorMessage(primaryConn));
261 }
262
263
264 void
265 checkClusterConfiguration(void)
266 {
267 PGresult *res;
268
269 res = PQexec(myLocalConn, "SELECT oid FROM pg_class "
270 " WHERE relname = 'repl_nodes'");
271 if (PQresultStatus(res) != PGRES_TUPLES_OK)
272 {
273 fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn));
274 PQclear(res);
275 PQfinish(myLocalConn);
276 PQfinish(primaryConn);
277 exit(1);
278 }
279
280 /*
281 * If there isn't any results then we have not configured a primary node yet
282 * in repmgr or the connection string is pointing to the wrong database.
283 * XXX if we are the primary, should we try to create the tables needed?
284 */
285 if (PQntuples(res) == 0)
286 {
287 fprintf(stderr, "The replication cluster is not configured");
288 PQclear(res);
289 PQfinish(myLocalConn);
290 PQfinish(primaryConn);
291 exit(1);
292 }
293 PQclear(res);
294 }
295
296
297 void
298 checkNodeConfiguration(char *conninfo)
299 {
300 PGresult *res;
301 char sqlquery[8192];
302
303 /*
304 * Check if we have my node information in repl_nodes
305 */
306 sprintf(sqlquery, "SELECT * FROM repl_nodes "
307 " WHERE id = %d AND cluster = '%s' ",
308 myLocalId, myClusterName);
309
310 res = PQexec(myLocalConn, sqlquery);
311 if (PQresultStatus(res) != PGRES_TUPLES_OK)
312 {
313 fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn));
314 PQclear(res);
315 PQfinish(myLocalConn);
316 PQfinish(primaryConn);
317 exit(1);
318 }
319
320 /*
321 * If there isn't any results then we have not configured this node yet
322 * in repmgr, if that is the case we will insert the node to the cluster
323 */
324 if (PQntuples(res) == 0)
325 {
326 PQclear(res);
327 /* Adding the node */
328 sprintf(sqlquery, "INSERT INTO repl_nodes "
329 "VALUES (%d, '%s', '%s')",
330 myLocalId, myClusterName, conninfo);
331
332 if (!PQexec(primaryConn, sqlquery))
333 {
334 fprintf(stderr, "Cannot insert node details, %s",
335 PQerrorMessage(primaryConn));
336 PQfinish(myLocalConn);
337 PQfinish(primaryConn);
338 exit(1);
339 }
340 }
341 PQclear(res);
342 }
7ec3485 Lag is now monitored in bytes, we show replication lag
postgres authored
343
344
/*
 * Convert a textual WAL location "XLOGID/XRECOFF" (both hexadecimal) into an
 * absolute byte position, so that two locations can be subtracted to give a
 * lag in bytes.
 *
 * Each xlogid is counted as 255 segments of 16MB (0xFF000000 bytes). This
 * matches the pre-9.3 PostgreSQL WAL layout; confirm it for the server
 * versions in use.
 *
 * Returns 0, and reports the problem on stderr, if the string is not in
 * "%X/%X" form.
 */
unsigned long long int
walLocationToBytes(char *wal_location)
{
	unsigned int xlogid;
	unsigned int xrecoff;

	if (sscanf(wal_location, "%X/%X", &xlogid, &xrecoff) != 2)
	{
		fprintf(stderr, "wrong log location format: %s\n", wal_location);
		return 0;
	}

	/*
	 * Widen before multiplying: in unsigned int arithmetic,
	 * xlogid * 255 * 16MB wraps around once xlogid >= 2.
	 */
	return (unsigned long long) xlogid * 255ULL * 16ULL * 1024ULL * 1024ULL
		+ xrecoff;
}
Something went wrong with that request. Please try again.