-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
postgresql.rb
242 lines (216 loc) · 8.11 KB
/
postgresql.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# frozen-string-literal: true
Sequel::JDBC.load_driver('org.postgresql.Driver', :Postgres)
require_relative '../shared/postgres'
module Sequel
module JDBC
# Register the PostgreSQL setup proc under the :postgresql key of
# DATABASE_SETUP. The assignment happens inside Sequel.synchronize.
# NOTE(review): presumably the JDBC adapter calls this proc when it sets up
# a jdbc:postgresql database -- confirm against the caller.
Sequel.synchronize do
DATABASE_SETUP[:postgresql] = proc do |db|
# Give the database the PostgreSQL-specific dataset class and methods.
db.dataset_class = Sequel::JDBC::Postgres::Dataset
db.extend(Sequel::JDBC::Postgres::DatabaseMethods)
# The proc's return value is the JDBC driver class.
org.postgresql.Driver
end
end
module Postgres
# Convert the PostgreSQL array found in column +i+ of result set +r+ into
# a plain ruby Array, rather than handing back the JDBC PostgreSQL
# driver-specific array object.  Returns nil when the column holds no
# array.  Only used if the database does not have a conversion proc for
# the type.
def self.RubyPGArray(r, i)
  jdbc_array = r.getArray(i)
  return unless jdbc_array

  jdbc_array.array.to_ary
end
# Convert the PostgreSQL hstore found in column +i+ of result set +r+ into
# a ruby Hash instead of a Java HashMap.  Returns nil when the column
# holds no value.  Only used if the database does not have a conversion
# proc for the type.
def self.RubyPGHstore(r, i)
  java_map = r.getObject(i)
  return unless java_map

  java_map.to_hash
end
module DatabaseMethods
include Sequel::Postgres::DatabaseMethods
# Hook run when this module is extended onto a database object +db+.
# After the inherited hook, it calls the database's
# initialize_postgres_adapter through send.  Per the original note, that
# call adds the primary_keys and primary_key_sequences instance variables,
# so we can get the correct return values for inserted rows (the method
# itself is defined outside this file).
def self.extended(db)
super
# NOTE(review): send is presumably used because the method is private -- confirm.
db.send(:initialize_postgres_adapter)
end
# Register a conversion proc for +oid+ through the inherited
# implementation (all arguments are forwarded by the bare super), then
# remove any current entry for the oid in the oid_convertor_map so that
# oid_convertor_proc rebuilds it from the new proc on next use.
# Returns whatever the map's delete call returns for +oid+.
def add_conversion_proc(oid, *)
super
Sequel.synchronize{@oid_convertor_map.delete(oid)}
end
# See Sequel::Postgres::Adapter#copy_into
#
# Streams data to the server through the driver's CopyManager, using the
# SQL returned by copy_into_sql(table, opts).
#
# Options read here:
# :data :: The data to send.  A String is wrapped via Array(); any other
#          value is iterated with each, and every element is written.
# :server :: Passed to synchronize to pick the connection.
#
# When a block is given instead of :data, the block is called repeatedly
# and each truthy return value is written; a nil/false return ends the
# stream.
#
# Raises Error if both a :data option and a block are given, or if
# neither is given.
def copy_into(table, opts=OPTS)
data = opts[:data]
data = Array(data) if data.is_a?(String)
if block_given? && data
raise Error, "Cannot provide both a :data option and a block to copy_into"
elsif !block_given? && !data
raise Error, "Must provide either a :data option or a block to copy_into"
end
synchronize(opts[:server]) do |conn|
begin
copy_manager = org.postgresql.copy.CopyManager.new(conn)
copier = copy_manager.copy_in(copy_into_sql(table, opts))
if block_given?
# Pull chunks from the caller's block until it returns nil/false.
while buf = yield
java_bytes = buf.to_java_bytes
copier.writeToCopy(java_bytes, 0, java_bytes.length)
end
else
data.each do |d|
java_bytes = d.to_java_bytes
copier.writeToCopy(java_bytes, 0, java_bytes.length)
end
end
# Rescues every exception class, not just StandardError, so the COPY is
# cancelled before the exception is re-raised unchanged.
rescue Exception => e
# copier is still nil if creating the CopyManager or copy_in itself failed.
copier.cancelCopy if copier
raise
ensure
# e is only set when the rescue clause above ran, so the COPY is
# finished here on the success path only.
unless e
begin
copier.endCopy
rescue NativeException => e2
# Errors from endCopy are handed to raise_error (defined elsewhere).
raise_error(e2)
end
end
end
end
end
# See Sequel::Postgres::Adapter#copy_table
#
# Reads COPY output through the driver's CopyManager, using the SQL
# returned by copy_table_sql(table, opts).  The connection is selected by
# passing opts[:server] to synchronize.
#
# With a block, each chunk read is yielded as a ruby String and nil is
# returned.  Without a block, all chunks are appended to a single String,
# which is returned.
#
# A StandardError raised while reading is passed to raise_error with
# :disconnect=>true.
def copy_table(table, opts=OPTS)
synchronize(opts[:server]) do |conn|
copy_manager = org.postgresql.copy.CopyManager.new(conn)
copier = copy_manager.copy_out(copy_table_sql(table, opts))
begin
if block_given?
# readFromCopy returning nil/false ends the loop.
while buf = copier.readFromCopy
yield(String.from_java_bytes(buf))
end
nil
else
b = String.new
while buf = copier.readFromCopy
b << String.from_java_bytes(buf)
end
b
end
rescue => e
raise_error(e, :disconnect=>true)
ensure
# buf is still truthy here only if reading stopped before readFromCopy
# returned a falsy value (for example, the caller's block exited early).
# If no error was rescued above, force a disconnect instead of leaving a
# partially consumed COPY on the connection.
if buf && !e
raise DatabaseDisconnectError, "disconnecting as a partial COPY may leave the connection in an unusable state"
end
end
end
end
# Return the result set convertor cached for +oid+, building and caching
# it on first use.
#
# If conversion_procs has an entry for the oid, the convertor is a lambda
# taking a result set and a column index: it reads the column with
# getString and passes a non-nil/false string to the conversion proc,
# returning nil otherwise.  If there is no entry, false is cached and
# returned, so the lookup is not repeated.
def oid_convertor_proc(oid)
  cached = Sequel.synchronize { @oid_convertor_map[oid] }
  # A cached false is a valid answer, so only nil counts as a miss.
  return cached unless cached.nil?

  parser = conversion_procs[oid]
  convertor =
    if parser
      lambda do |r, i|
        text = r.getString(i)
        parser.call(text) if text
      end
    else
      false
    end

  Sequel.synchronize { @oid_convertor_map[oid] = convertor }
  convertor
end
private
# Treat the exception as a disconnect if the inherited check says so, or
# if its message is exactly one of the connection-loss messages matched
# below.  Returns the inherited result when truthy, otherwise the result
# of the regexp match (an index on match, nil otherwise).
def disconnect_error?(exception, opts)
  inherited = super
  return inherited if inherited

  exception.message =~ /\A(This connection has been closed\.|FATAL: terminating connection due to administrator command|An I\/O error occurred while sending to the backend\.)\z/
end
# For PostgreSQL-specific types, return the string that should be used
# as the PGObject value. Returns nil by default, loading pg_* extensions
# will override this to add support for specific types.
#
# arg :: The bound variable value being considered.
# conn :: The connection; unused by this default implementation.
def bound_variable_arg(arg, conn)
nil
end
# Work around an issue with Sequel's bound variable support, where the
# same SQL is used in different bound variable calls but the schema has
# changed between the calls.  This is necessary as jdbc-postgres versions
# after 9.4.1200 violate the JDBC API.  These versions cache separate
# PreparedStatement instances, which are eventually prepared server side
# after the prepareThreshold is met.  The JDBC API violation is that
# PreparedStatement#close does not release the server side prepared
# statement.
#
# For statements created without a :name option, the prepare threshold is
# therefore set to 0.  Named statements are returned untouched.
def prepare_jdbc_statement(conn, sql, opts)
  prepared = super
  prepared.prepare_threshold = 0 unless opts[:name]
  prepared
end
# If the given argument is a recognized PostgreSQL-specific type (that is,
# bound_variable_arg returns a string for it), create a PGobject instance
# with unknown type and the bound argument string value, and set that as
# the prepared statement argument at index +i+.  All other arguments are
# handled by the inherited implementation.
def set_ps_arg(cps, arg, i)
  pg_value = bound_variable_arg(arg, nil)
  return super unless pg_value

  pg_object = org.postgresql.util.PGobject.new
  pg_object.setType("unknown")
  pg_object.setValue(pg_value)
  cps.setObject(i, pg_object)
end
# Use setNull for nil arguments as the default behavior of setString
# with nil doesn't appear to work correctly on PostgreSQL.
#
# cps :: The prepared statement being populated.
# i :: The parameter index to set to SQL NULL.
def set_ps_arg_nil(cps, i)
cps.setNull(i, JavaSQL::Types::NULL)
end
# Run the connection configuration SQL queries for +opts+ on the
# connection produced by the inherited setup.  Each query is executed
# inside log_connection_yield.  Returns that connection.
def setup_connection_with_opts(conn, opts)
  configured = super
  statement(configured) do |stmt|
    connection_configuration_sqls(opts).each do |sql|
      log_connection_yield(sql, configured) { stmt.execute(sql) }
    end
  end
  configured
end
# Run the inherited type convertor map setup, then reset
# @oid_convertor_map to an empty Hash.  That hash is the per-oid cache
# read and filled by oid_convertor_proc, and cleared per oid by
# add_conversion_proc.
def setup_type_convertor_map
super
@oid_convertor_map = {}
end
end
class Dataset < JDBC::Dataset
include Sequel::Postgres::DatasetMethods
private
# Literalize strings similar to the native postgres adapter: append +v+ to
# +sql+ wrapped in single quotes, escaping it with the connection's
# escape_string.  The connection comes from db.synchronize for the
# dataset's :server option.  Returns +sql+.
def literal_string_append(sql, v)
  # The opening quote is appended before the connection is checked out,
  # matching the left-to-right evaluation of a single << chain.
  sql << "'"
  sql << db.synchronize(@opts[:server]) { |conn| conn.escape_string(v) }
  sql << "'"
end
# SQL fragment for Sequel::SQLTime, containing just the time part, with
# the fractional seconds rounded to milliseconds.
#
# Microsecond values of 999500 and above round to 1000 milliseconds.
# Formatting that with "%03d" would produce a four digit fraction such as
# '12:34:56.1000', which reads as .100 seconds.  The millisecond field is
# therefore clamped to 999, so the literal always stays within the same
# second and has exactly three fractional digits.
def literal_sqltime(v)
  millis = (v.usec / 1000.0).round
  millis = 999 if millis > 999
  v.strftime("'%H:%M:%S#{sprintf(".%03d", millis)}'")
end
# java.sql.Types code for VARCHAR; type_convertor uses it as the key into
# the convertor map for UUID columns.
STRING_TYPE = Java::JavaSQL::Types::VARCHAR
# java.sql.Types code for ARRAY.
ARRAY_TYPE = Java::JavaSQL::Types::ARRAY
# Method object for Postgres.RubyPGArray, used by type_convertor for array
# columns that have no registered oid convertor.
ARRAY_METHOD = Postgres.method(:RubyPGArray)
# JDBC type codes for which type_convertor looks at the PostgreSQL oid
# instead of going straight to the inherited implementation.
PG_SPECIFIC_TYPES = [ARRAY_TYPE, Java::JavaSQL::Types::OTHER, Java::JavaSQL::Types::STRUCT].freeze
# Method object for Postgres.RubyPGHstore, used by type_convertor for
# hstore columns that have no registered oid convertor.
HSTORE_METHOD = Postgres.method(:RubyPGHstore)
# Pick the convertor for column +i+ of the result set metadata +meta+.
#
# JDBC types outside PG_SPECIFIC_TYPES go straight to the inherited
# implementation.  For the PostgreSQL-specific ones, the first match wins:
#
# 1. the convertor the database has for the column's oid
# 2. ARRAY_METHOD for array columns
# 3. the string convertor from +map+ for UUID columns (oid 2950)
# 4. HSTORE_METHOD for columns whose PostgreSQL type is hstore
# 5. the inherited implementation
def type_convertor(map, meta, type, i)
  return super unless PG_SPECIFIC_TYPES.include?(type)

  oid = meta.getField(i).getOID
  registered = db.oid_convertor_proc(oid)
  return registered if registered
  return ARRAY_METHOD if type == ARRAY_TYPE
  return map[STRING_TYPE] if oid == 2950 # UUID
  return HSTORE_METHOD if meta.getPGType(i) == 'hstore'

  super
end
end
end
end
end