Skip to content

Commit

Permalink
sound/discrete.cpp: Use appropriate memory barriers for task synchron…
Browse files Browse the repository at this point in the history
…isation. (mamedev#12034)
  • Loading branch information
cuavas authored and stonedDiscord committed Apr 8, 2024
1 parent 5e5a905 commit 511f4bb
Showing 1 changed file with 69 additions and 65 deletions.
134 changes: 69 additions & 65 deletions src/devices/sound/discrete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
***********************************************************************
*
* Each sound primative DSS_xxxx or DST_xxxx has its own implementation
* Each sound primitive DSS_xxxx or DST_xxxx has its own implementation
* file. All discrete sound primitives MUST implement the following
* API:
*
Expand Down Expand Up @@ -65,7 +65,7 @@ DEFINE_DEVICE_TYPE(DISCRETE, discrete_sound_device, "discrete", "Discrete Sound"
* Values > 500 have slightly worse performance (too many cache misses?).
*/

#define MAX_SAMPLES_PER_TASK_SLICE (960/4)
static constexpr int MAX_SAMPLES_PER_TASK_SLICE = 960 / 4;

/*************************************
*
Expand All @@ -91,61 +91,70 @@ DEFINE_DEVICE_TYPE(DISCRETE, discrete_sound_device, "discrete", "Discrete Sound"

// Buffer that captures one node output so another task can read it later.
//   node_buf - owning storage for the buffered samples
//   source   - location of the value that gets copied into the buffer
//   ptr      - write cursor into node_buf
//   node_num - node identifier this buffer belongs to
//
// NOTE(review): this span is a rendered diff without +/- markers.  It holds
// two declarations each of `source` and `ptr` (a `volatile double *` one and a
// `std::atomic<double *>` one), which looks like the before/after form of the
// same members rather than a single compilable struct.
struct output_buffer
{
// Default constructor: null source and write cursor.  node_num is not
// initialised here.
output_buffer() : source(nullptr), ptr(nullptr)
{
}

// required for use in vector (std::atomic<T> has deleted copy constructor)
// The cursor value is carried over with a relaxed load; node_buf is moved
// out of `that`.
output_buffer(output_buffer &&that) : node_buf(std::move(that.node_buf)), source(that.source), ptr(that.ptr.load(std::memory_order_relaxed)), node_num(that.node_num)
{
}

std::unique_ptr<double []> node_buf;
const double *source;
volatile double *ptr;
const double * source;
std::atomic<double *> ptr;
int node_num;
};

// Consumer-side view of an output_buffer.
//   ptr           - read cursor into the linked output buffer's storage
//   linked_outbuf - the output_buffer this input reads from
//   buffer        - holds the most recently read sample; node inputs are
//                   pointed at this member
//
// NOTE(review): this span is a rendered diff without +/- markers.  Each member
// appears twice (once with /* */ comments and a volatile ptr, once with //
// comments and a plain ptr), which looks like the before/after form of the
// same three members.
struct input_buffer
{
volatile const double *ptr; /* pointer into linked_outbuf.nodebuf */
output_buffer * linked_outbuf; /* what output are we connected to ? */
double buffer; /* input[] will point here */
const double * ptr; // pointer into linked_outbuf.nodebuf
output_buffer * linked_outbuf; // what output are we connected to?
double buffer; // input[] will point here
};

// A unit of discrete-sound work: a list of nodes to step, the inputs it reads
// from other tasks (source_list) and the outputs it buffers for them
// (m_buffers).  m_threadid holds -1 while the task is unclaimed and a thread
// id while it is claimed.
//
// NOTE(review): this span is a rendered diff without +/- markers.  The
// constructor, lock_threadid(), unlock() and most members appear twice with
// different access levels and memory orders, and the two revisions are
// interleaved line by line, so the class is not compilable as written.
class discrete_task
{
friend class discrete_device;
public:
virtual ~discrete_task() { }

inline void step_nodes();
// NOTE(review): the next two lines are two different function headers
// (lock_threadid and the constructor) sharing one body; the body that
// follows mixes statements from both.
inline bool lock_threadid(int32_t threadid)
discrete_task(discrete_device &pdev) : m_device(pdev), m_threadid(-1), m_samples(0)
{
int expected = -1;
// Weak compare-exchange variant: release ordering on success, relaxed on
// failure.
return m_threadid.compare_exchange_weak(expected, threadid, std::memory_order_release,std::memory_order_relaxed);
// FIXME: the code expects to be able to take pointers to members of elements of this vector before it's filled
source_list.reserve(16);
}
// Plain assignment variant of unlock: marks the task as unclaimed.
inline void unlock() { m_threadid = -1; }

void check(discrete_task &dest_task);
void prepare_for_queue(int samples);

static void *task_callback(void *param, int threadid);

//const linked_list_entry *list;
discrete_device::node_step_list_t step_list;

/* list of source nodes */
std::vector<input_buffer> source_list; /* discrete_source_node */

int task_group = 0;

private:
void step_nodes();
bool process();

// NOTE(review): as above, a constructor header and a lock_threadid header
// share the following body.
discrete_task(discrete_device &pdev) : m_device(pdev), m_threadid(-1)
bool lock_threadid(int32_t threadid)
{
// FIXME: the code expects to be able to take pointers to members of elements of this vector before it's filled
source_list.reserve(16);
int expected = -1;
// Strong compare-exchange variant: claims the task only if m_threadid is
// still -1; acquire ordering on success, relaxed on failure.  Returns true
// when the claim succeeded.
return m_threadid.compare_exchange_strong(expected, threadid, std::memory_order_acquire, std::memory_order_relaxed);
}

protected:
static void *task_callback(void *param, int threadid);
inline bool process();
// Explicit-store variant of unlock: marks the task as unclaimed with a
// release store.
void unlock()
{
m_threadid.store(-1, std::memory_order_release);
}

void check(discrete_task &dest_task);
void prepare_for_queue(int samples);
/* list of source nodes */
std::vector<input_buffer> source_list; /* discrete_source_node */

std::vector<output_buffer> m_buffers;
discrete_device & m_device;
std::vector<output_buffer> m_buffers;
discrete_device & m_device;

private:
// Claiming thread id, or -1 when unclaimed.
std::atomic<int32_t> m_threadid;
// Samples still to be processed for the current update.
volatile int m_samples = 0;
std::atomic<int32_t> m_threadid;
int m_samples;
};


Expand Down Expand Up @@ -179,17 +188,13 @@ class discrete_task
inline void discrete_task::step_nodes()
{
for (input_buffer &sn : source_list)
{
sn.buffer = *sn.ptr++;
}

if (EXPECTED(!m_device.profiling()))
{
// Now step the nodes
for (discrete_step_interface *entry : step_list)
{
/* Now step the node */
entry->step();
}
}
else
{
Expand All @@ -204,74 +209,78 @@ inline void discrete_task::step_nodes()
}
}

/* buffer the outputs */
// buffer the outputs
for (output_buffer &outbuf : m_buffers)
*outbuf.ptr.load(std::memory_order_relaxed) = *outbuf.source;
std::atomic_thread_fence(std::memory_order_release);
for (output_buffer &outbuf : m_buffers)
*outbuf.ptr++ = *outbuf.source;
outbuf.ptr.store(outbuf.ptr.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
}

// Work-queue entry point.  `param` is reinterpreted as a pointer to the task
// list.  The loop repeatedly walks that list, tries to claim each task for
// `threadid`, and runs process() on every task it claims.  It returns nullptr
// as soon as a process() call returns false; in that case unlock() is skipped,
// so that task stays claimed.
//
// NOTE(review): this span is a rendered diff without +/- markers.  It holds
// both `do` ... `} while (1);` and `while (true)` ... `}` loop lines, which
// looks like the before/after form of the same loop.
void *discrete_task::task_callback(void *param, int threadid)
{
const auto &list = *reinterpret_cast<const discrete_sound_device::task_list_t *>(param);
do
while (true)
{
for (const auto &task : list)
{
/* try to lock */
// try to lock
if (task->lock_threadid(threadid))
{
if (!task->process())
return nullptr;
task->unlock();
}
}
} while (1);
}

return nullptr;
}

// Processes one slice of the task's pending samples.
//
// The slice size starts at min(m_samples, MAX_SAMPLES_PER_TASK_SLICE) and is
// reduced to the smallest number of samples available from any linked source
// buffer.  That many steps are run and m_samples is decreased accordingly.
//
// Returns false when m_samples has reached zero, true otherwise.
// Throws emu_fatalerror if a source reports a negative number of available
// samples or if m_samples becomes negative.
//
// NOTE(review): this span is a rendered diff without +/- markers.  The
// signature and several statements appear twice, which looks like the
// before/after form of the same lines.
bool discrete_task::process()
inline bool discrete_task::process()
{
int samples = std::min(int(m_samples), MAX_SAMPLES_PER_TASK_SLICE);
int samples = std::min(m_samples, MAX_SAMPLES_PER_TASK_SLICE);

/* check dependencies */
// check dependencies
for (input_buffer &sn : source_list)
{
// Samples available = producer's write cursor minus this input's read cursor.
int avail = sn.linked_outbuf->ptr - sn.ptr;
const int avail = sn.linked_outbuf->ptr.load(std::memory_order_relaxed) - sn.ptr;
if (avail < 0)
throw emu_fatalerror("discrete_task::process: available samples are negative");
if (avail < samples)
samples = avail;
}
// Acquire fence placed after the relaxed cursor loads above and before the
// buffered samples are read in step_nodes().
// NOTE(review): presumably pairs with the release fence in step_nodes() - confirm.
std::atomic_thread_fence(std::memory_order_acquire);

m_samples -= samples;
if (m_samples < 0)
throw emu_fatalerror("discrete_task::process: m_samples got negative");
while (samples > 0)
{
/* step */
// step
step_nodes();
samples--;
}
if (m_samples == 0)
{
/* return and keep the task locked so it is not picked up by other worker threads */
// return and keep the task locked so it is not picked up by other worker threads
return false;
}
return true;
}

// Resets the task before it is queued for an update of `samples` samples:
// marks it unclaimed, records the sample count, rewinds every output buffer's
// write cursor to the start of its storage, and rewinds every input's read
// cursor to the start of the storage of the output buffer it is linked to.
//
// NOTE(review): this span is a rendered diff without +/- markers.  The cursor
// reset and the comments appear twice (plain assignment and relaxed store),
// which looks like the before/after form of the same lines.
void discrete_task::prepare_for_queue(int samples)
{
m_threadid.store(-1, std::memory_order_relaxed); // unlock the thread
m_samples = samples;
/* set up task buffers */

// set up task buffers
for (output_buffer &ob : m_buffers)
ob.ptr = ob.node_buf.get();
ob.ptr.store(ob.node_buf.get(), std::memory_order_relaxed);

/* initialize sources */
// initialize sources
for (input_buffer &sn : source_list)
{
sn.ptr = sn.linked_outbuf->node_buf.get();
}
}

void discrete_task::check(discrete_task &dest_task)
Expand All @@ -297,7 +306,7 @@ void discrete_task::check(discrete_task &dest_task)
int inputnode_num = dest_node->input_node(inputnum);
if IS_VALUE_A_NODE(inputnode_num)
{
/* Fixme: sub nodes ! */
/* FIXME: sub nodes ! */
if (NODE_DEFAULT_NODE(task_node->block_node()) == NODE_DEFAULT_NODE(inputnode_num))
{
int found = -1;
Expand All @@ -312,16 +321,16 @@ void discrete_task::check(discrete_task &dest_task)
break;
}

if (found<0)
if (found < 0)
{
output_buffer buf;

buf.node_buf = std::make_unique<double []>((task_node->sample_rate() + sound_manager::STREAMS_UPDATE_FREQUENCY) / sound_manager::STREAMS_UPDATE_FREQUENCY);
buf.ptr = buf.node_buf.get();
buf.ptr.store(buf.node_buf.get(), std::memory_order_relaxed);
buf.source = dest_node->m_input[inputnum];
buf.node_num = inputnode_num;
//buf.node = device->discrete_find_node(inputnode);
m_buffers.push_back(std::move(buf));
m_buffers.emplace_back(std::move(buf));
pbuf = &m_buffers.back();
}
m_device.discrete_log("dso_task_start - buffering %d(%d) in task %p group %d referenced by %d group %d", NODE_INDEX(inputnode_num), NODE_CHILD_NODE_NUM(inputnode_num), this, task_group, dest_node->index(), dest_task.task_group);
Expand All @@ -330,7 +339,6 @@ void discrete_task::check(discrete_task &dest_task)
dest_task.source_list.push_back(input_buffer{ nullptr, pbuf, 0.0 });
// FIXME: taking address of element of vector before it's filled
dest_node->m_input[inputnum] = &dest_task.source_list.back().buffer;

}
}
}
Expand Down Expand Up @@ -1017,22 +1025,18 @@ void discrete_device::process(int samples)
if (samples == 0)
return;

/* Setup tasks */
// Set up tasks
for (const auto &task : task_list)
{
/* unlock the thread */
task->unlock();

task->prepare_for_queue(samples);
}
std::atomic_thread_fence(std::memory_order_release);

for (const auto &task : task_list)
{
/* Fire a work item for each task */
// Fire a work item for each task
(void)task;
osd_work_item_queue(m_queue, discrete_task::task_callback, (void *)&task_list, WORK_ITEM_FLAG_AUTO_RELEASE);
}
osd_work_queue_wait(m_queue, osd_ticks_per_second()*10);
osd_work_queue_wait(m_queue, osd_ticks_per_second() * 10);

if (m_profiling)
{
Expand Down

0 comments on commit 511f4bb

Please sign in to comment.