/
database_statements.rb
209 lines (177 loc) · 7.88 KB
/
database_statements.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# frozen_string_literal: true
module ActiveRecord
module ConnectionAdapters
module PostgreSQL
module DatabaseStatements
def explain(arel, binds = [], options = [])
sql = build_explain_clause(options) + " " + to_sql(arel, binds)
result = internal_exec_query(sql, "EXPLAIN", binds)
PostgreSQL::ExplainPrettyPrinter.new.pp(result)
end
# Queries the database and returns the results in an Array-like object
def query(sql, name = nil) # :nodoc:
mark_transaction_written_if_write(sql)
log(sql, name) do |notification_payload|
with_raw_connection do |conn|
result = conn.async_exec(sql).map_types!(@type_map_for_results).values
verified!
notification_payload[:row_count] = result.count
result
end
end
end
READ_QUERY = ActiveRecord::ConnectionAdapters::AbstractAdapter.build_read_query_regexp(
:close, :declare, :fetch, :move, :set, :show
) # :nodoc:
private_constant :READ_QUERY
def write_query?(sql) # :nodoc:
!READ_QUERY.match?(sql)
rescue ArgumentError # Invalid encoding
!READ_QUERY.match?(sql.b)
end
# Executes an SQL statement, returning a PG::Result object on success
# or raising a PG::Error exception otherwise.
#
# Setting +allow_retry+ to true causes the db to reconnect and retry
# executing the SQL statement in case of a connection-related exception.
# This option should only be enabled for known idempotent queries.
#
# Note: the PG::Result object is manually memory managed; if you don't
# need it specifically, you may want consider the <tt>exec_query</tt> wrapper.
def execute(...) # :nodoc:
super
ensure
@notice_receiver_sql_warnings = []
end
def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true)
log(sql, name, async: async) do |notification_payload|
with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn|
result = conn.async_exec(sql)
verified!
handle_warnings(result)
notification_payload[:row_count] = result.count
result
end
end
end
def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) # :nodoc:
execute_and_clear(sql, name, binds, prepare: prepare, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |result|
types = {}
fields = result.fields
fields.each_with_index do |fname, i|
ftype = result.ftype i
fmod = result.fmod i
types[fname] = types[i] = get_oid_type(ftype, fmod, fname)
end
build_result(columns: fields, rows: result.values, column_types: types)
end
end
def exec_delete(sql, name = nil, binds = []) # :nodoc:
execute_and_clear(sql, name, binds) { |result| result.cmd_tuples }
end
alias :exec_update :exec_delete
def exec_insert(sql, name = nil, binds = [], pk = nil, sequence_name = nil, returning: nil) # :nodoc:
if use_insert_returning? || pk == false
super
else
result = internal_exec_query(sql, name, binds)
unless sequence_name
table_ref = extract_table_ref_from_insert_sql(sql)
if table_ref
pk = primary_key(table_ref) if pk.nil?
pk = suppress_composite_primary_key(pk)
sequence_name = default_sequence_name(table_ref, pk)
end
return result unless sequence_name
end
last_insert_id_result(sequence_name)
end
end
# Begins a transaction.
def begin_db_transaction # :nodoc:
internal_execute("BEGIN", "TRANSACTION", allow_retry: true, materialize_transactions: false)
end
def begin_isolated_db_transaction(isolation) # :nodoc:
internal_execute("BEGIN ISOLATION LEVEL #{transaction_isolation_levels.fetch(isolation)}", "TRANSACTION", allow_retry: true, materialize_transactions: false)
end
# Commits a transaction.
def commit_db_transaction # :nodoc:
internal_execute("COMMIT", "TRANSACTION", allow_retry: false, materialize_transactions: true)
end
# Aborts a transaction.
def exec_rollback_db_transaction # :nodoc:
cancel_any_running_query
internal_execute("ROLLBACK", "TRANSACTION", allow_retry: false, materialize_transactions: true)
end
def exec_restart_db_transaction # :nodoc:
cancel_any_running_query
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION", allow_retry: false, materialize_transactions: true)
end
# From https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
HIGH_PRECISION_CURRENT_TIMESTAMP = Arel.sql("CURRENT_TIMESTAMP", retryable: true).freeze # :nodoc:
private_constant :HIGH_PRECISION_CURRENT_TIMESTAMP
def high_precision_current_timestamp
HIGH_PRECISION_CURRENT_TIMESTAMP
end
def build_explain_clause(options = [])
return "EXPLAIN" if options.empty?
"EXPLAIN (#{options.join(", ").upcase})"
end
# Set when constraints will be checked for the current transaction.
#
# Not passing any specific constraint names will set the value for all deferrable constraints.
#
# [<tt>deferred</tt>]
# Valid values are +:deferred+ or +:immediate+.
#
# See https://www.postgresql.org/docs/current/sql-set-constraints.html
def set_constraints(deferred, *constraints)
unless %i[deferred immediate].include?(deferred)
raise ArgumentError, "deferred must be :deferred or :immediate"
end
constraints = if constraints.empty?
"ALL"
else
constraints.map { |c| quote_table_name(c) }.join(", ")
end
execute("SET CONSTRAINTS #{constraints} #{deferred.to_s.upcase}")
end
private
IDLE_TRANSACTION_STATUSES = [PG::PQTRANS_IDLE, PG::PQTRANS_INTRANS, PG::PQTRANS_INERROR]
private_constant :IDLE_TRANSACTION_STATUSES
def cancel_any_running_query
return if @raw_connection.nil? || IDLE_TRANSACTION_STATUSES.include?(@raw_connection.transaction_status)
@raw_connection.cancel
@raw_connection.block
rescue PG::Error
end
def execute_batch(statements, name = nil)
execute(combine_multi_statements(statements))
end
def build_truncate_statements(table_names)
["TRUNCATE TABLE #{table_names.map(&method(:quote_table_name)).join(", ")}"]
end
# Returns the current ID of a table's sequence.
def last_insert_id_result(sequence_name)
internal_exec_query("SELECT currval(#{quote(sequence_name)})", "SQL")
end
def returning_column_values(result)
result.rows.first
end
def suppress_composite_primary_key(pk)
pk unless pk.is_a?(Array)
end
def handle_warnings(sql)
@notice_receiver_sql_warnings.each do |warning|
next if warning_ignored?(warning)
warning.sql = sql
ActiveRecord.db_warnings_action.call(warning)
end
end
def warning_ignored?(warning)
["WARNING", "ERROR", "FATAL", "PANIC"].exclude?(warning.level) || super
end
end
end
end
end