forked from plproxy/plproxy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.c
292 lines (248 loc) · 7.42 KB
/
main.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
/*
* PL/Proxy - easy access to partitioned database.
*
* Copyright (c) 2006-2020 PL/Proxy Authors
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
* External interface for PostgreSQL core.
*
* List of memory contexts that are touched by this code:
*
* - Query context that is active when plproxy_call_handler is called.
* Function results should be allocated from here.
*
* - SPI Proc context that activates in SPI_connect() and is freed
* in SPI_finish(). This is used for compile-time short-term storage.
*
* - HTAB has its own memory context.
*
* - ProxyFunction->ctx for long-term allocations for functions.
*
* - cluster_mem where info about clusters is stored.
*
* - SPI_saveplan() stores plan info in separate context,
* so it must be freed explicitly.
*
* - libpq uses malloc() so it must be freed explicitly
*
* Because SPI functions do not honour CurrentMemoryContext
* and code should not have assumptions whether core
* functions do allocations or not, the per-function and
* cluster MemoryContext is switched on only when doing actual
* allocations. Otherwise the default context is kept.
*/
#include "plproxy.h"
#include <sys/time.h>
/* PostgreSQL loadable-module magic block; emitted once for this library. */
PG_MODULE_MAGIC;
/*
 * Version-1 calling convention info records for the two SQL-callable
 * entry points defined in this file.
 */
PG_FUNCTION_INFO_V1(plproxy_call_handler);
PG_FUNCTION_INFO_V1(plproxy_validator);
/*
 * Single exit point for PL/Proxy errors that carry an explicit SQLSTATE.
 *
 * func     - function being executed; its name and argument count prefix
 *            the message, and its current cluster gets its results cleaned
 * sqlstate - error code handed to errcode()
 * fmt, ... - printf-style message; the formatted text is limited to the
 *            local 1024-byte buffer
 *
 * Pending results of the current cluster are released first, then the
 * report is raised at ERROR level.
 */
void
plproxy_error_with_state(ProxyFunction *func, int sqlstate, const char *fmt, ...)
{
	va_list		args;
	char		text[1024];

	/* render the caller's message into a bounded local buffer */
	va_start(args, fmt);
	vsnprintf(text, sizeof(text), fmt, args);
	va_end(args);

	/* drop pending results before raising the error */
	plproxy_clean_results(func->cur_cluster);

	ereport(ERROR,
			(errcode(sqlstate),
			 errmsg("PL/Proxy function %s(%d): %s",
					func->name, func->arg_count, text)));
}
/*
 * Pass remote error/notice/warning through.
 *
 * Re-raises the diagnostic carried by a libpq result as a local report,
 * prefixed with the function signature and the remote database name.
 *
 * func  - function whose name and argument count prefix the message
 * conn  - connection the result belongs to; PQdb(conn->cur->db) names
 *         the remote database in the message
 * res   - libpq result holding the diagnostic fields
 * iserr - true: raise locally as ERROR; false: raise as NOTICE or
 *         WARNING, chosen from the SQLSTATE class
 */
void
plproxy_remote_error(ProxyFunction *func, ProxyConnection *conn, const PGresult *res, bool iserr)
{
	const char *ss = PQresultErrorField(res, PG_DIAG_SQLSTATE);
	const char *sev = PQresultErrorField(res, PG_DIAG_SEVERITY);
	const char *msg = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
	const char *det = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
	const char *hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
	const char *spos = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
	const char *ipos = PQresultErrorField(res, PG_DIAG_INTERNAL_POSITION);
	const char *iquery = PQresultErrorField(res, PG_DIAG_INTERNAL_QUERY);
	const char *ctx = PQresultErrorField(res, PG_DIAG_CONTEXT);
	int			elevel;

	/*
	 * libpq errors may not have sqlstate.  MAKE_SQLSTATE below reads
	 * exactly five characters, so anything of a different length is
	 * replaced as well.
	 */
	if (!ss || strlen(ss) != 5)
		ss = "XX000";

	/*
	 * PQresultErrorField() returns NULL for a field the result does not
	 * carry.  The severity and primary message are formatted with %s
	 * unconditionally, so give them printable placeholders.
	 */
	if (!sev)
		sev = "UNKNOWN";
	if (!msg)
		msg = "(no message)";

	if (iserr)
		/* must ignore remote level, as it may be FATAL/PANIC */
		elevel = ERROR;
	else
		/* cannot look at sev here, as it may be localized */
		elevel = !strncmp(ss, "00", 2) ? NOTICE : WARNING;

	/* optional fields are attached only when the remote side sent them */
	ereport(elevel, (
		errcode(MAKE_SQLSTATE(ss[0], ss[1], ss[2], ss[3], ss[4])),
		errmsg("%s(%d): [%s] REMOTE %s: %s", func->name, func->arg_count, PQdb(conn->cur->db), sev, msg),
		det ? errdetail("Remote detail: %s", det) : 0,
		hint ? errhint("Remote hint: %s", hint) : 0,
		spos ? errposition(atoi(spos)) : 0,
		ipos ? internalerrposition(atoi(ipos)) : 0,
		iquery ? internalerrquery(iquery) : 0,
		ctx ? errcontext("Remote context: %s", ctx) : 0));
}
/*
 * One-time library initialization.
 *
 * Invoked while SPI is active to simplify the code.  The flag below makes
 * every call after the first successful one a no-op.
 */
static bool initialized = false;

static void
plproxy_startup_init(void)
{
	if (!initialized)
	{
		plproxy_function_cache_init();
		plproxy_cluster_cache_init();
		plproxy_syscache_callback_init();

		/* set only after all three init steps have returned */
		initialized = true;
	}
}
/*
 * Periodic maintenance over all clusters.
 *
 * Does nothing until the library has been initialized.  After that,
 * plproxy_cluster_maint() is invoked when at least two minutes of
 * wall-clock seconds have passed since the previous run.
 */
static void
run_maint(void)
{
	static struct timeval prev_run = {0, 0};
	struct timeval current;

	if (initialized)
	{
		gettimeofday(&current, NULL);

		/* only whole seconds are compared */
		if (current.tv_sec - prev_run.tv_sec >= 2 * 60)
		{
			prev_run = current;
			plproxy_cluster_maint(&current);
		}
	}
}
/*
 * Compile the called function and run its remote query, all inside one
 * SPI session.
 *
 * Returns the compiled function with cur_cluster pointing at the cluster
 * the query was executed on.  Converting the results into a return value
 * is left to the caller and happens outside SPI.
 */
static ProxyFunction *
compile_and_execute(FunctionCallInfo fcinfo)
{
	ProxyFunction *fn;
	ProxyCluster *target;
	int			rc;

	/* open the SPI session */
	rc = SPI_connect();
	if (rc != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect: %s", SPI_result_code_string(rc));

	/* first-call initialization also runs under SPI */
	plproxy_startup_init();

	/* compile (or fetch from cache), then pick the cluster to run on */
	fn = plproxy_compile_and_cache(fcinfo);
	target = plproxy_find_cluster(fn, fcinfo);

	/* a cluster already marked busy means a nested call; refuse it */
	if (target->busy)
		plproxy_error(fn, "Nested PL/Proxy calls to the same cluster are not supported.");

	/* remember the cluster and fetch the PGresults */
	fn->cur_cluster = target;
	plproxy_exec(fn, fcinfo);

	/* close the SPI session */
	rc = SPI_finish();
	if (rc != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish: %s", SPI_result_code_string(rc));

	return fn;
}
/*
 * Set-returning function support.
 *
 * Uses the value-per-call protocol: the first call compiles and executes
 * the function and stores it in the SRF context; every call then hands
 * back one result while the cluster still reports rows, and finally
 * cleans the results and signals end-of-set.
 */
static Datum
handle_ret_set(FunctionCallInfo fcinfo)
{
	FuncCallContext *srf;
	ProxyFunction *fn;

	if (SRF_IS_FIRSTCALL())
	{
		/* execute before creating the SRF context, then stash the function */
		ProxyFunction *executed = compile_and_execute(fcinfo);

		srf = SRF_FIRSTCALL_INIT();
		srf->user_fctx = executed;
	}

	srf = SRF_PERCALL_SETUP();
	fn = srf->user_fctx;

	/* nothing left: release results and finish the set */
	if (fn->cur_cluster->ret_total <= 0)
	{
		plproxy_clean_results(fn->cur_cluster);
		SRF_RETURN_DONE(srf);
	}

	SRF_RETURN_NEXT(srf, plproxy_result(fn, fcinfo));
}
/*
 * Call handler for PL/Proxy procedures, invoked by the PostgreSQL
 * function manager.  Everything else in the module is reached from here.
 *
 * Trigger invocations are rejected.  Set-returning functions are handed
 * to handle_ret_set(); any other function must receive exactly one row
 * from the remote query, otherwise an error is raised.
 */
Datum
plproxy_call_handler(PG_FUNCTION_ARGS)
{
	ProxyFunction *func;
	Datum		value;
	bool		returns_set;

	if (CALLED_AS_TRIGGER(fcinfo))
		elog(ERROR, "PL/Proxy procedures can't be used as triggers");

	returns_set = fcinfo->flinfo->fn_retset;

	/* maintenance runs for plain calls and on the first call of a set */
	if (!returns_set || SRF_IS_FIRSTCALL())
		run_maint();

	if (returns_set)
		return handle_ret_set(fcinfo);

	func = compile_and_execute(fcinfo);

	if (func->cur_cluster->ret_total != 1)
	{
		/* too few rows and too many rows get distinct SQLSTATEs */
		int			state = (func->cur_cluster->ret_total < 1)
			? ERRCODE_NO_DATA_FOUND
			: ERRCODE_TOO_MANY_ROWS;

		plproxy_error_with_state(func, state,
								 "Non-SETOF function requires 1 row from remote query, got %d",
								 func->cur_cluster->ret_total);
	}

	value = plproxy_result(func, fcinfo);
	plproxy_clean_results(func->cur_cluster);

	return value;
}
/*
* This function is called when a PL/Proxy function is created to
* check the syntax.
*/
Datum
plproxy_validator(PG_FUNCTION_ARGS)
{
Oid oid = PG_GETARG_OID(0);
HeapTuple proc_tuple;
if (!CheckFunctionValidatorAccess(fcinfo->flinfo->fn_oid, oid))
PG_RETURN_VOID();
proc_tuple = SearchSysCache(PROCOID, ObjectIdGetDatum(oid), 0, 0, 0);
if (!HeapTupleIsValid(proc_tuple))
elog(ERROR, "cache lookup failed for function %u", oid);
plproxy_compile(NULL, proc_tuple, true);
ReleaseSysCache(proc_tuple);
PG_RETURN_VOID();
}