Skip to content

Commit

Permalink
refactor(server/mysql): trigger saves in separate threads
Browse files Browse the repository at this point in the history
Though these queries shouldn't error - and nobody has shared
any if they have - the promises will never resolve on an error.
There's no way to really handle callback errors, so we'll use the
await wrapper with a protected call.
  • Loading branch information
thelindat committed Mar 7, 2024
1 parent 9b76804 commit 3bd6fa8
Showing 1 changed file with 76 additions and 48 deletions.
124 changes: 76 additions & 48 deletions modules/mysql/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ if not lib then return end
local Query = {
SELECT_STASH = 'SELECT data FROM ox_inventory WHERE owner = ? AND name = ?',
UPDATE_STASH = 'UPDATE ox_inventory SET data = ? WHERE owner = ? AND name = ?',
UPSERT_STASH = 'INSERT INTO ox_inventory (data, owner, name) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE data = VALUES(data)',
UPSERT_STASH =
'INSERT INTO ox_inventory (data, owner, name) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE data = VALUES(data)',
INSERT_STASH = 'INSERT INTO ox_inventory (owner, name) VALUES (?, ?)',
SELECT_GLOVEBOX = 'SELECT plate, glovebox FROM `{vehicle_table}` WHERE `{vehicle_column}` = ?',
SELECT_TRUNK = 'SELECT plate, trunk FROM `{vehicle_table}` WHERE `{vehicle_column}` = ?',
Expand Down Expand Up @@ -32,10 +33,10 @@ Citizen.CreateThreadNow(function()
vehicleTable = 'player_vehicles'
vehicleColumn = 'plate'
elseif shared.framework == 'nd' then
playerTable = 'nd_characters'
playerColumn = 'charid'
vehicleTable = 'nd_vehicles'
vehicleColumn = 'id'
playerTable = 'nd_characters'
playerColumn = 'charid'
vehicleTable = 'nd_vehicles'
vehicleColumn = 'id'
end

for k, v in pairs(Query) do
Expand Down Expand Up @@ -112,9 +113,9 @@ Citizen.CreateThreadNow(function()

local clearStashes = GetConvar('inventory:clearstashes', '6 MONTH')

if clearStashes ~= '' then
pcall(MySQL.query.await, ('DELETE FROM ox_inventory WHERE lastupdated < (NOW() - INTERVAL %s)'):format(clearStashes))
end
if clearStashes ~= '' then
pcall(MySQL.query.await, ('DELETE FROM ox_inventory WHERE lastupdated < (NOW() - INTERVAL %s)'):format(clearStashes))
end
end)

db = {}
Expand Down Expand Up @@ -165,86 +166,113 @@ local function countRows(rows)
return n
end

local function safeQuery(...)
local ok, resp = pcall(...)

if not ok then
return warn(resp)
end

return resp
end

---@param players InventorySaveData[]
---@param trunks InventorySaveData[]
---@param gloveboxes InventorySaveData[]
---@param stashes (InventorySaveData | string | number)[]
---@param total number[]
function db.saveInventories(players, trunks, gloveboxes, stashes, total)
local promises = {}
local start = os.nanotime()
local saveStr = 'Saved %d/%d %s (%.4f ms)'
local pending = 0

shared.info(('Saving %s inventories to the database'):format(total[5]))

if total[1] > 0 then
local p = promise.new()
promises[#promises + 1] = p
pending += 1

MySQL.prepare(Query.UPDATE_PLAYER, players, function(resp)
shared.info(('Saved %d/%d players (%.4f ms)'):format(countRows(resp), total[1], (os.nanotime() - start) / 1e6))
p:resolve()
Citizen.CreateThreadNow(function()
local resp = safeQuery(MySQL.prepare.await, Query.UPDATE_PLAYER, players)
pending -= 1

if resp then
shared.info(saveStr:format('players', countRows(resp), total[1], (os.nanotime() - start) / 1e6))
end
end)
end

if total[2] > 0 then
local p = promise.new()
promises[#promises + 1] = p
pending += 1

MySQL.prepare(Query.UPDATE_TRUNK, trunks, function(resp)
shared.info(('Saved %d/%d trunks (%.4f ms)'):format(countRows(resp), total[2], (os.nanotime() - start) / 1e6))
p:resolve()
Citizen.CreateThreadNow(function()
local resp = safeQuery(MySQL.prepare.await, Query.UPDATE_TRUNK, trunks)
pending -= 1

if resp then
shared.info(saveStr:format('trunks', countRows(resp), total[2], (os.nanotime() - start) / 1e6))
end
end)
end

if total[3] > 0 then
local p = promise.new()
promises[#promises + 1] = p
pending += 1

Citizen.CreateThreadNow(function()
local resp = safeQuery(MySQL.prepare.await, Query.UPDATE_GLOVEBOX, gloveboxes)
pending -= 1

MySQL.prepare(Query.UPDATE_GLOVEBOX, gloveboxes, function(resp)
shared.info(('Saved %d/%d gloveboxes (%.4f ms)'):format(countRows(resp), total[3], (os.nanotime() - start) / 1e6))
p:resolve()
if resp then
shared.info(saveStr:format('gloveboxes', countRows(resp), total[3], (os.nanotime() - start) / 1e6))
end
end)
end

if total[4] > 0 then
local p = promise.new()
promises[#promises + 1] = p
pending += 1

if server.bulkstashsave then
total[4] /= 3

MySQL.query(Query.UPSERT_STASH:gsub('%(%?, %?, %?%)', string.rep('(?, ?, ?)', total[4], ', ')), stashes, function(resp)
local affectedRows = resp.affectedRows
Citizen.CreateThreadNow(function()
local query = Query.UPSERT_STASH:gsub('%(%?, %?, %?%)', string.rep('(?, ?, ?)', total[4], ', '))
local resp = safeQuery(MySQL.query.await, query, stashes)
pending -= 1

if total[4] == 1 then
if affectedRows == 2 then affectedRows = 1 end
else
affectedRows -= tonumber(resp.info:match('Duplicates: (%d+)'), 10) or 0
end
if resp then
local affectedRows = resp.affectedRows

shared.info(('Saved %d/%d stashes (%.4f ms)'):format(affectedRows, total[4], (os.nanotime() - start) / 1e6))
p:resolve()
if total[4] == 1 then
if affectedRows == 2 then affectedRows = 1 end
else
affectedRows -= tonumber(resp.info:match('Duplicates: (%d+)'), 10) or 0
end

shared.info(saveStr:format('stashes', affectedRows, total[4], (os.nanotime() - start) / 1e6))
end
end)
else
MySQL.rawExecute(Query.UPSERT_STASH, stashes, function(resp)
local affectedRows = 0

if table.type(resp) == 'hash' then
if resp.affectedRows > 0 then affectedRows = 1 end
else
for i = 1, #resp do
if resp[i].affectedRows > 0 then affectedRows += 1 end
Citizen.CreateThreadNow(function()
local resp = safeQuery(MySQL.rawExecute.await, Query.UPSERT_STASH, stashes)
pending -= 1

if resp then
local affectedRows = 0

if table.type(resp) == 'hash' then
if resp.affectedRows > 0 then affectedRows = 1 end
else
for i = 1, #resp do
if resp[i].affectedRows > 0 then affectedRows += 1 end
end
end
end

shared.info(('Saved %s/%s stashes (%.4f ms)'):format(affectedRows, total[4], (os.nanotime() - start) / 1e6))
p:resolve()
shared.info(saveStr:format('stashes', affectedRows, total[4], (os.nanotime() - start) / 1e6))
end
end)
end
end

-- All queries must run asynchronously on resource stop, so we'll await multiple promises instead.
Citizen.Await(promise.all(promises))
repeat Wait(0) until pending == 0
end

return db

0 comments on commit 3bd6fa8

Please sign in to comment.