Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Active Record: clear query cache automatically when calling #execute #48061

Merged
merged 3 commits into from Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -124,7 +124,7 @@ def write_query?(sql)
# method may be manually memory managed. Consider using the exec_query
# wrapper instead.
def execute(sql, name = nil, allow_retry: false)
raise NotImplementedError
internal_execute(sql, name, allow_retry: allow_retry)
end

# Executes +sql+ statement in the context of this connection using
Expand Down Expand Up @@ -491,16 +491,25 @@ def high_precision_current_timestamp
end

private
def internal_execute(sql, name = "SCHEMA")
execute(sql, name)
def internal_execute(sql, name = "SCHEMA", allow_retry: false, uses_transaction: true)
sql = transform_query(sql)
check_if_write_query(sql)

mark_transaction_written_if_write(sql)

raw_execute(sql, name, allow_retry: allow_retry, uses_transaction: uses_transaction)
end

def execute_batch(statements, name = nil)
statements.each do |statement|
execute(statement, name)
internal_execute(statement, name)
end
end

def raw_execute(sql, name, async: false, allow_retry: false, uses_transaction: true)
raise NotImplementedError
end

DEFAULT_INSERT_VALUE = Arel.sql("DEFAULT").freeze
private_constant :DEFAULT_INSERT_VALUE

Expand Down
Expand Up @@ -7,7 +7,7 @@ module ConnectionAdapters # :nodoc:
module QueryCache
class << self
def included(base) # :nodoc:
dirties_query_cache base, :create, :insert, :update, :delete, :truncate, :truncate_tables,
dirties_query_cache base, :execute, :create, :insert, :update, :delete, :truncate, :truncate_tables,
:rollback_to_savepoint, :rollback_db_transaction, :restart_db_transaction, :exec_insert_all

base.set_callback :checkout, :after, :configure_query_cache!
Expand All @@ -17,7 +17,7 @@ def included(base) # :nodoc:
def dirties_query_cache(base, *method_names)
method_names.each do |method_name|
base.class_eval <<-end_code, __FILE__, __LINE__ + 1
def #{method_name}(*)
def #{method_name}(...)
ActiveRecord::Base.clear_query_caches_for_current_thread
super
end
Expand Down
Expand Up @@ -220,27 +220,6 @@ def disable_referential_integrity # :nodoc:
# DATABASE STATEMENTS ======================================
#++

# Executes the SQL statement in the context of this connection.
#
# Setting +allow_retry+ to true causes the db to reconnect and retry
# executing the SQL statement in case of a connection-related exception.
# This option should only be enabled for known idempotent queries.
def execute(sql, name = nil, allow_retry: false)
sql = transform_query(sql)
check_if_write_query(sql)

mark_transaction_written_if_write(sql)

log(sql, name) do
with_raw_connection(allow_retry: allow_retry) do |conn|
sync_timezone_changes(conn)
result = conn.query(sql)
handle_warnings(sql)
result
end
end
end

# Mysql2Adapter doesn't have to free a result after using it, but we use this method
# to write stuff in an abstract way without concerning ourselves about whether it
# needs to be explicitly freed or not.
Expand All @@ -253,11 +232,11 @@ def execute_and_free(sql, name = nil, async: false) # :nodoc:
end

def begin_db_transaction # :nodoc:
internal_execute("BEGIN", "TRANSACTION")
internal_execute("BEGIN", "TRANSACTION", allow_retry: true, uses_transaction: false)
end

def begin_isolated_db_transaction(isolation) # :nodoc:
internal_execute "SET TRANSACTION ISOLATION LEVEL #{transaction_isolation_levels.fetch(isolation)}", "TRANSACTION"
internal_execute("SET TRANSACTION ISOLATION LEVEL #{transaction_isolation_levels.fetch(isolation)}", "TRANSACTION", allow_retry: true, uses_transaction: false)
begin_db_transaction
end

Expand Down Expand Up @@ -746,19 +725,6 @@ def extended_type_map_key
end
end

def raw_execute(sql, name, async: false, allow_retry: false, uses_transaction: true)
mark_transaction_written_if_write(sql)

log(sql, name, async: async) do
with_raw_connection(allow_retry: allow_retry, uses_transaction: uses_transaction) do |conn|
sync_timezone_changes(conn)
result = conn.query(sql)
handle_warnings(sql)
result
end
end
end

def handle_warnings(sql)
return if ActiveRecord.db_warnings_action.nil? || @raw_connection.warning_count == 0

Expand All @@ -781,12 +747,6 @@ def warning_ignored?(warning)
def sync_timezone_changes(raw_connection)
end

def internal_execute(sql, name = "SCHEMA", allow_retry: true, uses_transaction: false)
sql = transform_query(sql)
check_if_write_query(sql)
raw_execute(sql, name, allow_retry: allow_retry, uses_transaction: uses_transaction)
end

# See https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html
ER_DB_CREATE_EXISTS = 1007
ER_FILSORT_ABORT = 1028
Expand Down
Expand Up @@ -18,10 +18,6 @@ def select_all(*, **) # :nodoc:
result
end

def query(sql, name = nil) # :nodoc:
execute(sql, name).to_a
end

READ_QUERY = ActiveRecord::ConnectionAdapters::AbstractAdapter.build_read_query_regexp(
:desc, :describe, :set, :show, :use
) # :nodoc:
Expand Down Expand Up @@ -169,6 +165,17 @@ def max_allowed_packet
@max_allowed_packet ||= show_variable("max_allowed_packet")
end

def raw_execute(sql, name, async: false, allow_retry: false, uses_transaction: true)
log(sql, name, async: async) do
with_raw_connection(allow_retry: allow_retry, uses_transaction: uses_transaction) do |conn|
sync_timezone_changes(conn)
result = conn.query(sql)
handle_warnings(sql)
result
end
end
end

def exec_stmt_and_free(sql, name, binds, cache_stmt: false, async: false)
sql = transform_query(sql)
check_if_write_query(sql)
Expand Down
Expand Up @@ -41,30 +41,18 @@ def write_query?(sql) # :nodoc:
#
# Note: the PG::Result object is manually memory managed; if you don't
# need it specifically, you may want consider the <tt>exec_query</tt> wrapper.
def execute(sql, name = nil, allow_retry: false)
sql = transform_query(sql)
check_if_write_query(sql)

mark_transaction_written_if_write(sql)

with_raw_connection(allow_retry: allow_retry) do |conn|
log(sql, name) do
result = conn.async_exec(sql)
handle_warnings(sql)
result
end
end
def execute(...) # :nodoc:
super
ensure
@notice_receiver_sql_warnings = []
end

def internal_execute(sql, name = "SCHEMA", allow_retry: true, uses_transaction: false)
sql = transform_query(sql)
check_if_write_query(sql)

with_raw_connection(allow_retry: allow_retry, uses_transaction: uses_transaction) do |conn|
log(sql, name) do
conn.async_exec(sql)
def raw_execute(sql, name, async: false, allow_retry: false, uses_transaction: true)
log(sql, name, async: async) do
with_raw_connection(allow_retry: allow_retry, uses_transaction: uses_transaction) do |conn|
result = conn.async_exec(sql)
handle_warnings(result)
result
end
end
end
Expand Down Expand Up @@ -122,11 +110,11 @@ def exec_insert(sql, name = nil, binds = [], pk = nil, sequence_name = nil) # :n

# Begins a transaction.
def begin_db_transaction # :nodoc:
internal_execute("BEGIN", "TRANSACTION")
internal_execute("BEGIN", "TRANSACTION", allow_retry: true, uses_transaction: false)
end

def begin_isolated_db_transaction(isolation) # :nodoc:
internal_execute("BEGIN ISOLATION LEVEL #{transaction_isolation_levels.fetch(isolation)}", "TRANSACTION")
internal_execute("BEGIN ISOLATION LEVEL #{transaction_isolation_levels.fetch(isolation)}", "TRANSACTION", allow_retry: true, uses_transaction: false)
end

# Commits a transaction.
Expand Down
Expand Up @@ -864,8 +864,8 @@ def exec_cache(sql, name, binds, async:, allow_retry:, uses_transaction:)
stmt_key = prepare_statement(sql, binds)
type_casted_binds = type_casted_binds(binds)

log(sql, name, binds, type_casted_binds, stmt_key, async: async) do
with_raw_connection do |conn|
with_raw_connection do |conn|
log(sql, name, binds, type_casted_binds, stmt_key, async: async) do
conn.exec_prepared(stmt_key, type_casted_binds)
end
end
Expand Down
Expand Up @@ -21,19 +21,6 @@ def explain(arel, binds = [], _options = [])
SQLite3::ExplainPrettyPrinter.new.pp(result)
end

def execute(sql, name = nil, allow_retry: false) # :nodoc:
sql = transform_query(sql)
check_if_write_query(sql)

mark_transaction_written_if_write(sql)

log(sql, name) do
with_raw_connection(allow_retry: allow_retry) do |conn|
conn.execute(sql)
end
end
end

def exec_query(sql, name = nil, binds = [], prepare: false, async: false) # :nodoc:
sql = transform_query(sql)
check_if_write_query(sql)
Expand Down Expand Up @@ -122,6 +109,14 @@ def high_precision_current_timestamp
end

private
def raw_execute(sql, name, async: false, allow_retry: false, uses_transaction: false)
log(sql, name, async: async) do
with_raw_connection(allow_retry: allow_retry, uses_transaction: uses_transaction) do |conn|
conn.execute(sql)
end
end
end

def reset_read_uncommitted
read_uncommitted = ActiveSupport::IsolatedExecutionState[:active_record_read_uncommitted]
return unless read_uncommitted
Expand Down
Expand Up @@ -691,7 +691,7 @@ def reconnect
def configure_connection
@raw_connection.busy_timeout(self.class.type_cast_config_to_integer(@config[:timeout])) if @config[:timeout]

execute("PRAGMA foreign_keys = ON", "SCHEMA")
raw_execute("PRAGMA foreign_keys = ON", "SCHEMA")
end
end
ActiveSupport.run_load_hooks(:active_record_sqlite3adapter, SQLite3Adapter)
Expand Down
Expand Up @@ -39,6 +39,7 @@ def explain(arel, binds = [], options = [])
def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false) # :nodoc:
sql = transform_query(sql)
check_if_write_query(sql)
mark_transaction_written_if_write(sql)

result = raw_execute(sql, name, async: async)
ActiveRecord::Result.new(result.fields, result.to_a)
Expand All @@ -47,13 +48,15 @@ def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false) # :n
def exec_insert(sql, name, binds, pk = nil, sequence_name = nil) # :nodoc:
sql = transform_query(sql)
check_if_write_query(sql)
mark_transaction_written_if_write(sql)

raw_execute(to_sql(sql, binds), name)
end

def exec_delete(sql, name = nil, binds = []) # :nodoc:
sql = transform_query(sql)
check_if_write_query(sql)
mark_transaction_written_if_write(sql)

result = raw_execute(to_sql(sql, binds), name)
result.affected_rows
Expand All @@ -78,6 +81,17 @@ def build_explain_clause(options = [])
end

private
def raw_execute(sql, name, async: false, allow_retry: false, uses_transaction: true)
log(sql, name, async: async) do
with_raw_connection(allow_retry: allow_retry, uses_transaction: uses_transaction) do |conn|
sync_timezone_changes(conn)
result = conn.query(sql)
handle_warnings(sql)
result
end
end
end

def last_inserted_id(result)
result.last_insert_id
end
Expand Down
Expand Up @@ -142,13 +142,6 @@ def discard!
end
end

def execute(sql, name = nil, allow_retry: false)
sql = transform_query(sql)
check_if_write_query(sql)

raw_execute(sql, name, allow_retry: allow_retry)
end

private
def each_hash(result)
return to_enum(:each_hash, result) unless block_given?
Expand Down