reorganize sql files and move mutex under scheduler
oldmoe committed Jun 8, 2024
1 parent 9cdf123 commit 71a7f73
Showing 14 changed files with 225 additions and 229 deletions.
lib/litestack/litecable.rb (2 changes: 1 addition & 1 deletion)
@@ -125,7 +125,7 @@ def create_listener
end

def create_connection
super("#{__dir__}/litecable.sql.yml") do |conn|
super("#{__dir__}/sql/litecable.sql.yml") do |conn|
conn.wal_autocheckpoint = 10000
end
end
lib/litestack/litecache.rb (2 changes: 1 addition & 1 deletion)
@@ -260,7 +260,7 @@ def spawn_worker
end

def create_connection
super("#{__dir__}/litecache.sql.yml") do |conn|
super("#{__dir__}/sql/litecache.sql.yml") do |conn|
conn.cache_size = 2000
conn.journal_size_limit = [(@options[:size] / 2).to_i, @options[:min_size]].min
conn.max_page_count = (@options[:size] / conn.page_size).to_i
lib/litestack/liteconnection.rb (176 changes: 176 additions & 0 deletions)
@@ -0,0 +1,176 @@
module Litesupport
module Liteconnection
include Litescheduler::Forkable

# close, setup, run_stmt and run_sql assume a single connection was created

def options
@options
end

def close
@running = false
@conn.acquire do |q|
q.stmts.each_pair { |k, v| q.stmts[k].close }
q.close
end
end

def size
run_sql("SELECT size.page_size * count.page_count FROM pragma_page_size() AS size, pragma_page_count() AS count")[0][0].to_f / (1024 * 1024)
end

def journal_mode
run_method(:journal_mode)
end

def synchronous
run_method(:synchronous)
end

def path
run_method(:filename)
end

private # all methods are private

def init(options = {})
# configure the object, loading options from the appropriate location
configure(options)
# setup connections and background threads
setup
# handle process exiting
at_exit do
exit_callback
end
# handle forking (restart connections and background threads)
Litescheduler::ForkListener.listen do
setup
end
end

def configure(options = {})
# detect environment (production, development, etc.)
defaults = begin
self.class::DEFAULT_OPTIONS
rescue
{}
end
@options = defaults.merge(options)
config = begin
YAML.load(ERB.new(File.read(@options[:config_path])).result)
rescue
{}
end # an empty hash won't hurt
config = config[Litesupport.environment] if config[Litesupport.environment] # if there is a config for the current environment defined then use it, otherwise use the top level declaration
config.keys.each do |k| # symbolize keys
config[k.to_sym] = config[k]
config.delete k
end
@options.merge!(config)
@options.merge!(options) # make sure options passed to initialize trump everything else
end

def setup
@conn = create_pooled_connection(@options[:connection_count])
@logger = create_logger
@running = true
end

def create_logger
@options[:logger] = nil unless @options[:logger]
return @options[:logger] if @options[:logger].respond_to? :info
return Logger.new($stdout) if @options[:logger] == "STDOUT"
return Logger.new($stderr) if @options[:logger] == "STDERR"
return Logger.new(@options[:logger]) if @options[:logger].is_a? String
Logger.new(IO::NULL)
end

def exit_callback
close
end

def run_stmt(stmt, *args)
acquire_connection { |conn| conn.stmts[stmt].execute!(*args) }
end

def run_sql(sql, *args)
acquire_connection { |conn| conn.execute(sql, args) }
end

def run_method(method, *args)
acquire_connection { |conn| conn.send(method, *args) }
end

def run_stmt_method(stmt, method, *args)
acquire_connection { |conn| conn.stmts[stmt].send(method, *args) }
end

def acquire_connection
if @checked_out_conn
yield @checked_out_conn
else
@conn.acquire{ |conn| yield conn }
end
end

# this will force the other run_* methods to use the
# checked out connection if one exists
def with_connection
@conn.acquire do |conn|
begin
@checked_out_conn = conn
yield self
ensure
@checked_out_conn = nil
end
end
end

def create_pooled_connection(count = 1)
count = 1 unless count&.is_a?(Integer)
Litesupport::Pool.new(count) { create_connection }
end

# common db object options
def create_connection(path_to_sql_file = nil)
conn = SQLite3::Database.new(@options[:path])
conn.busy_handler { Litescheduler.switch || sleep(rand * 0.002) }
conn.journal_mode = "WAL"
conn.synchronous = @options[:sync] || 1
conn.mmap_size = @options[:mmap_size] || 0
conn.instance_variable_set(:@stmts, {})
class << conn
attr_reader :stmts
end
yield conn if block_given?
# use the <client>.sql.yml file to define the schema and compile prepared statements
unless path_to_sql_file.nil?
sql = YAML.load_file(path_to_sql_file)
version = conn.get_first_value("PRAGMA user_version")
sql["schema"].each_pair do |v, obj|
if v > version
conn.transaction do
obj.each do |k, s|
conn.execute(s)
rescue Exception => e # standard:disable Lint/RescueException
warn "Error parsing #{k}"
warn s
raise e
end
conn.user_version = v
end
end
end
sql["stmts"].each do |k, v|
conn.stmts[k.to_sym] = conn.prepare(v)
rescue Exception => e # standard:disable Lint/RescueException
warn "Error parsing #{k}"
warn v
raise e
end
end
conn
end
end
end
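
For context, here is a hypothetical component sketched against the new Litesupport::Liteconnection API above, in the style of litecable and litecache; the class name, option values, sql file path, and the :put statement are invented for illustration and are not part of this commit.

require "litestack"

class MyLiteStore
  include Litesupport::Liteconnection

  # read by configure via self.class::DEFAULT_OPTIONS; the values here are arbitrary
  DEFAULT_OPTIONS = {path: "my_lite_store.sqlite3", connection_count: 1, sync: 1, mmap_size: 0}

  def initialize(options = {})
    init(options) # configure, build the connection pool, install the exit and fork hooks
  end

  def put(key, value)
    run_stmt(:put, key, value) # :put has to be defined under "stmts" in the sql.yml file
  end

  private

  def create_connection
    # the yml file is expected to carry a version-keyed "schema" section plus a "stmts"
    # section, like the sql/*.sql.yml files referenced by the other components above
    super("#{__dir__}/sql/my_lite_store.sql.yml") do |conn|
      conn.wal_autocheckpoint = 10_000
    end
  end
end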
lib/litestack/litejobqueue.rb (9 changes: 5 additions & 4 deletions)
@@ -45,6 +45,7 @@ class Litejobqueue < Litequeue
}

@@queue = nil
+ @@mutex = Litescheduler::Mutex.new

attr_reader :running

@@ -53,7 +54,7 @@ class Litejobqueue < Litequeue
# a method that returns a single instance of the job queue
# for use by Litejob
def self.jobqueue(options = {})
- @@queue ||= Litescheduler.synchronize { new(options) }
+ @@queue ||= @@mutex.synchronize { new(options) }
end

def self.new(options = {})
@@ -165,15 +166,15 @@ def setup
@jobs_in_flight = 0
@workers = @options[:workers].times.collect { create_worker }
@gc = create_garbage_collector
- @mutex = Litesupport::Mutex.new
+ @mutex = Litescheduler::Mutex.new # reinitialize a mutex in setup as the environment could change after forking
end

def job_started
- Litescheduler.synchronize(@mutex) { @jobs_in_flight += 1 }
+ @mutex.synchronize { @jobs_in_flight += 1 }
end

def job_finished
- Litescheduler.synchronize(@mutex) { @jobs_in_flight -= 1 }
+ @mutex.synchronize { @jobs_in_flight -= 1 }
end

# optionally run a job in its own context
lib/litestack/litemetric.rb (4 changes: 2 additions & 2 deletions)
@@ -166,7 +166,7 @@ def setup
end

def create_connection
super("#{__dir__}/litemetric.sql.yml") do |conn|
super("#{__dir__}/sql/litemetric.sql.yml") do |conn|
conn.wal_autocheckpoint = 10000 # checkpoint after 10000 pages are written
end
end
@@ -297,7 +297,7 @@ def flush
end

def create_connection
super("#{__dir__}/litemetric_collector.sql.yml") do |conn|
super("#{__dir__}/sql/litemetric_collector.sql.yml") do |conn|
conn.execute("ATTACH ? as m", @options[:dbpath].to_s)
conn.wal_autocheckpoint = 10000
end
lib/litestack/litequeue.rb (2 changes: 1 addition & 1 deletion)
@@ -130,7 +130,7 @@ def prepare_search_options(opts)
end

def create_connection
super("#{__dir__}/litequeue.sql.yml") do |conn|
super("#{__dir__}/sql/litequeue.sql.yml") do |conn|
conn.wal_autocheckpoint = 10000
# check if there is an old database and convert entries to the new format
if conn.get_first_value("select count(*) from sqlite_master where name = '_ul_queue_'") == 1
lib/litestack/litescheduler.rb (56 changes: 36 additions & 20 deletions)
@@ -59,31 +59,47 @@ def self.switch
    end
  end

-  # bold assumption, we will only synchronize threaded code!
-  # If some code explicitly wants to synchronize a fiber
-  # they must send (true) as a parameter to this method
-  # else it is a no-op for fibers
-  def self.synchronize(fiber_sync = false, &block)
-    if fiber_backed?
-      yield # do nothing, just run the block as is
-    else
-      mutex.synchronize(&block)
-    end
-  end
-
-  def self.max_contexts
-    return 50 if fiber_backed?
-    5
-  end
-
-  # mutex initialization
-  def self.mutex
-    # a single mutex per process (is that ok?)
-    @@mutex ||= Mutex.new
-  end
-
-  def self.fiber_backed?
-    backend == :fiber || backend == :polyphony
-  end
-  private_class_method :fiber_backed?
+  def self.fiber_backed?
+    backend == :fiber || backend == :polyphony
+  end
+
+  private_class_method :fiber_backed?
+
+  class Mutex
+    def initialize
+      @mutex = Thread::Mutex.new
+    end
+
+    def synchronize(&block)
+      if Litescheduler.backend == :threaded || Litescheduler.backend == :iodine
+        @mutex.synchronize { block.call }
+      else
+        block.call
+      end
+    end
+  end
+
+  module ForkListener
+    def self.listeners
+      @listeners ||= []
+    end
+
+    def self.listen(&block)
+      listeners << block
+    end
+  end
+
+  module Forkable
+    def _fork(*args)
+      ppid = Process.pid
+      result = super
+      if Process.pid != ppid && [:threaded, :iodine].include?(Litescheduler.backend)
+        ForkListener.listeners.each { |l| l.call }
+      end
+      result
+    end
+  end
end
+
+Process.singleton_class.prepend(Litescheduler::Forkable)
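
For orientation, a small usage sketch (not part of this commit) of the relocated primitives, mirroring how Litejobqueue uses Litescheduler::Mutex and how Liteconnection registers its fork hook through Litescheduler::ForkListener.listen; the counter, thread count, and loop sizes are made up, and the comments assume the threaded backend.

counter = 0
mutex = Litescheduler::Mutex.new # a real Thread::Mutex under :threaded/:iodine, a pass-through otherwise

threads = 4.times.map do
  Thread.new { 1_000.times { mutex.synchronize { counter += 1 } } }
end
threads.each(&:join)
puts counter # => 4000 on the threaded backend

# register work to redo in a forked child (runs on :threaded/:iodine backends only),
# e.g. reopening connections and restarting workers, as Liteconnection#init does above
Litescheduler::ForkListener.listen do
  # re-run setup here
end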