Permalink
Browse files

Replace PQsetRowProcessor streaming with PQsetSingleRowMode streaming…

… introduced in PostgreSQL 9.2beta3

PostgreSQL decided that PQsetRowProcessor was a bad API, and replaced
it with PQsetSingleRowMode in 9.2beta3.  I agree with the choice, as
the new API is simpler and fairly easy to use.

Currently, this requires a patch to ruby-pg to recognize
PGRES_SINGLE_TUPLE as an OK result status.  That should hopefully
be committed soon.  This also requires the master branch of Sequel,
as it depends on some recent refactoring in the Sequel postgres
adapter.
  • Loading branch information...
1 parent 15edb13 commit b3feea10592df9d7ba48bae0c853406872e59f9b @jeremyevans committed Aug 9, 2012
Showing with 230 additions and 246 deletions.
  1. +4 −0 CHANGELOG
  2. +1 −1 ext/sequel_pg/extconf.rb
  3. +94 −162 ext/sequel_pg/sequel_pg.c
  4. +130 −0 lib/sequel/extensions/pg_streaming.rb
  5. +0 −82 lib/sequel_pg/streaming.rb
  6. +1 −1 sequel_pg.gemspec
View
@@ -1,3 +1,7 @@
+=== HEAD
+
+* Replace PQsetRowProcessor streaming with PQsetSingleRowMode streaming introduced in PostgreSQL 9.2beta3 (jeremyevans)
+
=== 1.5.1 (2012-08-02)
* Sprinkle some RB_GC_GUARD to work around segfaults in the PostgreSQL array parser (jeremyevans)
View
@@ -14,7 +14,7 @@
end
if (have_library('pq') || have_library('libpq') || have_library('ms/libpq')) && have_header('libpq-fe.h')
- have_func 'PQsetRowProcessor'
+ have_func 'PQsetSingleRowMode'
create_makefile("sequel_pg")
else
puts 'Could not find PostgreSQL build environment (libraries & headers): Makefile not created'
View
@@ -103,6 +103,12 @@ static ID spg_id_columns;
static ID spg_id_encoding;
static ID spg_id_values;
+#if HAVE_PQSETSINGLEROWMODE
+static ID spg_id_get_result;
+static ID spg_id_clear;
+static ID spg_id_check;
+#endif
+
#if SPG_ENCODING
static int enc_get_index(VALUE val)
{
@@ -403,7 +409,7 @@ static VALUE spg_timestamp(const char *s, VALUE self) {
}
static VALUE spg_fetch_rows_set_cols(VALUE self, VALUE ignore) {
- return self;
+ return Qnil;
}
static VALUE spg__col_value(VALUE self, PGresult *res, long i, long j, VALUE* colconvert
@@ -813,194 +819,115 @@ static VALUE spg_yield_hash_rows(VALUE self, VALUE rres, VALUE ignore) {
static VALUE spg_supports_streaming_p(VALUE self) {
return
-#if HAVE_PQSETROWPROCESSOR
+#if HAVE_PQSETSINGLEROWMODE
Qtrue;
#else
Qfalse;
#endif
}
-#if HAVE_PQSETROWPROCESSOR
-static VALUE spg__rp_value(VALUE self, PGresult* res, const PGdataValue* dvs, int j, VALUE* colconvert
-#ifdef SPG_ENCODING
-, int enc_index
-#endif
-) {
- const char *v;
- PGdataValue dv = dvs[j];
- VALUE rv;
- size_t l;
- int len = dv.len;
-
- if(len < 0) {
- rv = Qnil;
- } else {
- v = dv.value;
-
- switch(PQftype(res, j)) {
- case 16: /* boolean */
- rv = *v == 't' ? Qtrue : Qfalse;
- break;
- case 17: /* bytea */
- v = PQunescapeBytea((unsigned char*)v, &l);
- rv = rb_funcall(spg_Blob, spg_id_new, 1, rb_str_new(v, l));
- PQfreemem((char *)v);
- break;
- case 20: /* integer */
- case 21:
- case 22:
- case 23:
- case 26:
- rv = rb_str2inum(rb_str_new(v, len), 10);
- break;
- case 700: /* float */
- case 701:
- if (strncmp("NaN", v, 3) == 0) {
- rv = spg_nan;
- } else if (strncmp("Infinity", v, 8) == 0) {
- rv = spg_pos_inf;
- } else if (strncmp("-Infinity", v, 9) == 0) {
- rv = spg_neg_inf;
- } else {
- rv = rb_float_new(rb_str_to_dbl(rb_str_new(v, len), Qfalse));
- }
- break;
- case 790: /* numeric */
- case 1700:
- rv = rb_funcall(spg_BigDecimal, spg_id_new, 1, rb_str_new(v, len));
- break;
- case 1082: /* date */
- rv = rb_str_new(v, len);
- rv = spg_date(StringValuePtr(rv));
- break;
- case 1083: /* time */
- case 1266:
- rv = rb_str_new(v, len);
- rv = spg_time(StringValuePtr(rv));
- break;
- case 1114: /* timestamp */
- case 1184:
- rv = rb_str_new(v, len);
- rv = spg_timestamp(StringValuePtr(rv), self);
- break;
- case 18: /* char */
- case 25: /* text */
- case 1043: /* varchar*/
- rv = rb_tainted_str_new(v, len);
-#ifdef SPG_ENCODING
- rb_enc_associate_index(rv, enc_index);
-#endif
- break;
- default:
- rv = rb_tainted_str_new(v, len);
-#ifdef SPG_ENCODING
- rb_enc_associate_index(rv, enc_index);
-#endif
- if (colconvert[j] != Qnil) {
- rv = rb_funcall(colconvert[j], spg_id_call, 1, rv);
- }
- }
+#if HAVE_PQSETSINGLEROWMODE
+static VALUE spg_set_single_row_mode(VALUE self) {
+ PGconn *conn;
+ Data_Get_Struct(self, PGconn, conn);
+ if (PQsetSingleRowMode(conn) != 1) {
+ rb_raise(spg_PGError, "cannot set single row mode");
}
- return rv;
+ return Qnil;
}
-static int spg_row_processor(PGresult *res, const PGdataValue *columns, const char **errmsgp, void *param) {
+static VALUE spg__yield_each_row(VALUE self) {
+ PGconn *conn;
+ PGresult *res;
+ VALUE rres;
+ VALUE rconn;
+ VALUE colsyms[SPG_MAX_FIELDS];
+ VALUE colconvert[SPG_MAX_FIELDS];
long nfields;
- struct spg_row_proc_info *info;
- info = (struct spg_row_proc_info *)param;
- VALUE *colsyms = info->colsyms;
- VALUE *colconvert = info->colconvert;
- VALUE self = info->dataset;
+ long j;
+ VALUE h;
+ VALUE opts;
+ VALUE pg_type;
+ VALUE pg_value = Qnil;
+ char type = SPG_YIELD_NORMAL;
- switch (PQresultStatus(res))
- {
- case PGRES_TUPLES_OK:
- case PGRES_COPY_OUT:
- case PGRES_COPY_IN:
-#ifdef HAVE_CONST_PGRES_COPY_BOTH
- case PGRES_COPY_BOTH:
+ rconn = rb_ary_entry(self, 1);
+ self = rb_ary_entry(self, 0);
+ Data_Get_Struct(rconn, PGconn, conn);
+
+ rres = rb_funcall(rconn, spg_id_get_result, 0);
+ rb_funcall(rres, spg_id_check, 0);
+ Data_Get_Struct(rres, PGresult, res);
+
+#ifdef SPG_ENCODING
+ int enc_index;
+ enc_index = enc_get_index(rres);
#endif
- case PGRES_EMPTY_QUERY:
- case PGRES_COMMAND_OK:
- break;
- case PGRES_BAD_RESPONSE:
- case PGRES_FATAL_ERROR:
- case PGRES_NONFATAL_ERROR:
- rb_raise(spg_PGError, "error while streaming results");
- default:
- rb_raise(spg_PGError, "unexpected result status while streaming results");
+
+ /* Only handle regular and model types. All other types require compiling all
+ * of the results at once, which is not a use case for streaming. The streaming
+ * code does not call this function for the other types. */
+ opts = rb_funcall(self, spg_id_opts, 0);
+ if (rb_type(opts) == T_HASH) {
+ pg_type = rb_hash_aref(opts, spg_sym__sequel_pg_type);
+ pg_value = rb_hash_aref(opts, spg_sym__sequel_pg_value);
+ if (SYMBOL_P(pg_type) && pg_type == spg_sym_model && rb_type(pg_value) == T_CLASS) {
+ type = SPG_YIELD_MODEL;
+ }
}
nfields = PQnfields(res);
- if(columns == NULL) {
- spg_set_column_info(self, res, colsyms, colconvert);
- rb_ivar_set(self, spg_id_columns, rb_ary_new4(nfields, colsyms));
- } else {
- long j;
- VALUE h, m;
- h = rb_hash_new();
+ if (nfields > SPG_MAX_FIELDS) {
+ rb_funcall(rres, spg_id_clear, 0);
+ rb_raise(rb_eRangeError, "more than %d columns in query", SPG_MAX_FIELDS);
+ }
+
+ spg_set_column_info(self, res, colsyms, colconvert);
+
+ rb_ivar_set(self, spg_id_columns, rb_ary_new4(nfields, colsyms));
+ while (PQntuples(res) != 0) {
+ h = rb_hash_new();
for(j=0; j<nfields; j++) {
- rb_hash_aset(h, colsyms[j], spg__rp_value(self, res, columns, j, colconvert
-#ifdef SPG_ENCODING
- , info->enc_index
-#endif
- ));
+ rb_hash_aset(h, colsyms[j], spg__col_value(self, res, 0, j, colconvert ENC_INDEX));
}
- /* optimize_model_load used, return model instance */
- if ((m = info->model)) {
- m = rb_obj_alloc(m);
- rb_ivar_set(m, spg_id_values, h);
- h = m;
+ rb_funcall(rres, spg_id_clear, 0);
+
+ if(type == SPG_YIELD_MODEL) {
+ /* Abuse local variable */
+ pg_type = rb_obj_alloc(pg_value);
+ rb_ivar_set(pg_type, spg_id_values, h);
+ rb_yield(pg_type);
+ } else {
+ rb_yield(h);
}
- rb_funcall(info->block, spg_id_call, 1, h);
+ rres = rb_funcall(rconn, spg_id_get_result, 0);
+ rb_funcall(rres, spg_id_check, 0);
+ Data_Get_Struct(rres, PGresult, res);
}
- return 1;
-}
+ rb_funcall(rres, spg_id_clear, 0);
-static VALUE spg_unset_row_processor(VALUE rconn) {
- PGconn *conn;
- Data_Get_Struct(rconn, PGconn, conn);
- if ((PQskipResult(conn)) != NULL) {
- /* Results remaining when row processor finished,
- * either because an exception was raised or the iterator
- * exited early, so skip all remaining rows. */
- while(PQgetResult(conn) != NULL) {
- /* Use a separate while loop as PQgetResult is faster than
- * PQskipResult. */
- }
- }
- PQsetRowProcessor(conn, NULL, NULL);
- return Qnil;
+ return self;
}
-static VALUE spg_with_row_processor(VALUE self, VALUE rconn, VALUE dataset, VALUE block) {
- struct spg_row_proc_info info;
+static VALUE spg__flush_results(VALUE rconn) {
PGconn *conn;
+ PGresult *res;
Data_Get_Struct(rconn, PGconn, conn);
- bzero(&info, sizeof(info));
-
- info.dataset = dataset;
- info.block = block;
- info.model = 0;
-#if SPG_ENCODING
- info.enc_index = enc_get_index(rconn);
-#endif
- /* Abuse local variable, detect if optimize_model_load used */
- block = rb_funcall(dataset, spg_id_opts, 0);
- if (rb_type(block) == T_HASH && rb_hash_aref(block, spg_sym__sequel_pg_type) == spg_sym_model) {
- block = rb_hash_aref(block, spg_sym__sequel_pg_value);
- if (rb_type(block) == T_CLASS) {
- info.model = block;
- }
+ while ((res = PQgetResult(conn)) != NULL) {
+ PQclear(res);
}
- PQsetRowProcessor(conn, spg_row_processor, (void*)&info);
- rb_ensure(rb_yield, Qnil, spg_unset_row_processor, rconn);
- return Qnil;
+ return rconn;
+}
+
+static VALUE spg_yield_each_row(VALUE self, VALUE rconn) {
+ VALUE v;
+ v = rb_ary_new3(2, self, rconn);
+ return rb_ensure(spg__yield_each_row, v, spg__flush_results, rconn);
}
#endif
@@ -1081,9 +1008,14 @@ void Init_sequel_pg(void) {
rb_define_singleton_method(spg_Postgres, "supports_streaming?", spg_supports_streaming_p, 0);
-#if HAVE_PQSETROWPROCESSOR
- c = rb_funcall(spg_Postgres, cg, 1, rb_str_new2("Database"));
- rb_define_private_method(c, "with_row_processor", spg_with_row_processor, 3);
+#if HAVE_PQSETSINGLEROWMODE
+ spg_id_get_result = rb_intern("get_result");
+ spg_id_clear = rb_intern("clear");
+ spg_id_check = rb_intern("check");
+
+ rb_define_private_method(c, "yield_each_row", spg_yield_each_row, 1);
+ c = rb_funcall(spg_Postgres, cg, 1, rb_str_new2("Adapter"));
+ rb_define_private_method(c, "set_single_row_mode", spg_set_single_row_mode, 0);
#endif
rb_define_singleton_method(spg_Postgres, "parse_pg_array", parse_pg_array, 2);
Oops, something went wrong.

0 comments on commit b3feea1

Please sign in to comment.