Skip to content

Commit

Permalink
Merge pull request #13 from computerquip-streamlabs/threaded_memcpy
Browse files Browse the repository at this point in the history
Add threaded memcpy implementation
  • Loading branch information
Xaymar-Streamlabs committed Mar 22, 2017
2 parents 20644f7 + 9269341 commit 1c6baab
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 11 deletions.
6 changes: 4 additions & 2 deletions libobs/CMakeLists.txt
Expand Up @@ -281,7 +281,8 @@ set(libobs_util_SOURCES
util/crc32.c
util/text-lookup.c
util/cf-parser.c
util/profiler.c)
util/profiler.c
util/threaded-memcpy.c)
set(libobs_util_HEADERS
util/array-serializer.h
util/file-serializer.h
Expand All @@ -307,7 +308,8 @@ set(libobs_util_HEADERS
util/lexer.h
util/platform.h
util/profiler.h
util/profiler.hpp)
util/profiler.hpp
util/threaded-memcpy.h)

set(libobs_libobs_SOURCES
${libobs_PLATFORM_SOURCES}
Expand Down
21 changes: 12 additions & 9 deletions libobs/media-io/video-frame.c
Expand Up @@ -16,6 +16,9 @@
******************************************************************************/

#include "video-frame.h"
#include "util/threaded-memcpy.h"

struct memcpy_environment *memcpy_env; /* Forward declaration from obs.c */

#define ALIGN_SIZE(size, align) \
size = (((size)+(align-1)) & (~(align-1)))
Expand Down Expand Up @@ -112,14 +115,14 @@ void video_frame_copy(struct video_frame *dst, const struct video_frame *src,
return;

case VIDEO_FORMAT_I420:
memcpy(dst->data[0], src->data[0], src->linesize[0] * cy);
memcpy(dst->data[1], src->data[1], src->linesize[1] * cy / 2);
memcpy(dst->data[2], src->data[2], src->linesize[2] * cy / 2);
threaded_memcpy(dst->data[0], src->data[0], src->linesize[0] * cy, memcpy_env);
threaded_memcpy(dst->data[1], src->data[1], src->linesize[1] * cy / 2, memcpy_env);
threaded_memcpy(dst->data[2], src->data[2], src->linesize[2] * cy / 2, memcpy_env);
break;

case VIDEO_FORMAT_NV12:
memcpy(dst->data[0], src->data[0], src->linesize[0] * cy);
memcpy(dst->data[1], src->data[1], src->linesize[1] * cy / 2);
threaded_memcpy(dst->data[0], src->data[0], src->linesize[0] * cy, memcpy_env);
threaded_memcpy(dst->data[1], src->data[1], src->linesize[1] * cy / 2, memcpy_env);
break;

case VIDEO_FORMAT_Y800:
Expand All @@ -129,13 +132,13 @@ void video_frame_copy(struct video_frame *dst, const struct video_frame *src,
case VIDEO_FORMAT_RGBA:
case VIDEO_FORMAT_BGRA:
case VIDEO_FORMAT_BGRX:
memcpy(dst->data[0], src->data[0], src->linesize[0] * cy);
threaded_memcpy(dst->data[0], src->data[0], src->linesize[0] * cy, memcpy_env);
break;

case VIDEO_FORMAT_I444:
memcpy(dst->data[0], src->data[0], src->linesize[0] * cy);
memcpy(dst->data[1], src->data[1], src->linesize[1] * cy);
memcpy(dst->data[2], src->data[2], src->linesize[2] * cy);
threaded_memcpy(dst->data[0], src->data[0], src->linesize[0] * cy, memcpy_env);
threaded_memcpy(dst->data[1], src->data[1], src->linesize[1] * cy, memcpy_env);
threaded_memcpy(dst->data[2], src->data[2], src->linesize[2] * cy, memcpy_env);
break;
}
}
1 change: 1 addition & 0 deletions libobs/obs-internal.h
Expand Up @@ -24,6 +24,7 @@
#include "util/threading.h"
#include "util/platform.h"
#include "util/profiler.h"
#include "util/threaded-memcpy.h"
#include "callback/signal.h"
#include "callback/proc.h"

Expand Down
6 changes: 6 additions & 0 deletions libobs/obs.c
Expand Up @@ -25,6 +25,8 @@

struct obs_core *obs = NULL;

struct memcpy_environment *memcpy_env = NULL;

extern void add_default_module_paths(void);
extern char *find_libobs_data_file(const char *file);

Expand Down Expand Up @@ -742,6 +744,8 @@ static bool obs_init(const char *locale, const char *module_config_path,
{
obs = bzalloc(sizeof(struct obs_core));

memcpy_env = init_threaded_memcpy_pool(0);

pthread_mutex_init_value(&obs->audio.monitoring_mutex);

obs->name_store_owned = !store;
Expand Down Expand Up @@ -862,6 +866,8 @@ void obs_shutdown(void)
bfree(obs);
obs = NULL;

destroy_threaded_memcpy_pool(memcpy_env);

#ifdef _WIN32
uninitialize_com();
#endif
Expand Down
264 changes: 264 additions & 0 deletions libobs/util/threaded-memcpy.c
@@ -0,0 +1,264 @@
/*
* Copyright (c) 2017 Zachary Lund <zachary.lund@streamlabs.com>
*
* Based on Michael Dirks (aka Xaymar) threaded memcpy tests
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/

#include "threading.h"
#include "bmem.h"

#if defined(_WIN32)
#include <Windows.h>
#elif defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
#include <unistd.h>
#endif

/* Prevents WinAPI redefinition down the road. */
#define NOMINMAX

#define max(x, y) (((x) > (y)) ? (x) : (y))
#define min(x, y) (((x) < (y)) ? (x) : (y))

/* TODO: Will need adjustment */
#define MAX_THREAD_COUNT 32

struct memcpy_thread_work {
void* from;
void* to;
size_t size;
os_sem_t *semaphore;
size_t block_size;
struct memcpy_thread_work *next;
struct memcpy_thread_work *prev;
};

/* Static global data */
const size_t block_size = 64 * 1024;

struct memcpy_environment {
/* We don't know the size of pthread_t
* but it's usually represented by a
* pointer or an integer.
* MAX_THREAD_COUNT should be a multiple of 8
* so threads don't thrash the cache alignment
* of the rest of the structure. */
pthread_t threads[MAX_THREAD_COUNT];
int thread_count;
struct memcpy_thread_work *work_queue;
struct memcpy_thread_work *work_queue_last;
pthread_mutex_t work_queue_mutex;
pthread_cond_t work_queue_cond;
int running;
};

static unsigned optimal_thread_count()
{
unsigned long result;
#if defined(_SC_NPROCESSORS_ONLN)
result = sysconf(_SC_NPROCESSORS_ONLN);

if (result < 0)
return 1;
#elif defined(_WIN32)
SYSTEM_INFO info;
GetSystemInfo(&info);
result = info.dwNumberOfProcessors;
#else
result = MAX_THREAD_COUNT / 8;
#endif
result /= 2;

if (result == 0) return 1;

if (result > MAX_THREAD_COUNT)
result = MAX_THREAD_COUNT;

return result;
}

static void *start_memcpy_thread(void* context)
{
struct memcpy_environment *env = context;
struct memcpy_thread_work *work;

os_sem_t* semaphore;
void *from, *to;
size_t size;

for (;;) {
pthread_mutex_lock(&env->work_queue_mutex);

while (!env->work_queue && env->running) {
pthread_cond_wait(&env->work_queue_cond, &env->work_queue_mutex);
}

if (!env->running) {
pthread_mutex_unlock(&env->work_queue_mutex);
break;
}

work = env->work_queue;
from = work->from;
to = work->to;
size = work->block_size;
semaphore = work->semaphore;

if (work->size > size) {
work->from = ((uint8_t*)work->from) + size;
work->to = ((uint8_t*)work->to) + size;
work->size -= size;
}
else {
if (env->work_queue->next != NULL) {
env->work_queue = env->work_queue->next;
env->work_queue->prev = NULL;
} else {
env->work_queue = NULL;
env->work_queue_last = NULL;
}
}

pthread_mutex_unlock(&env->work_queue_mutex);

memcpy((uint8_t*)to, (uint8_t*)from, size);

/* Notify the calling thread that this thread is done working */
os_sem_post(work->semaphore);
}

return 0;
}

/* Not thread safe but only needs to be called once for all threads */
struct memcpy_environment *init_threaded_memcpy_pool(int threads)
{
struct memcpy_environment *env =
bmalloc(sizeof(struct memcpy_environment));

if (!threads)
env->thread_count = optimal_thread_count();
else
env->thread_count = threads;

env->work_queue = NULL;
env->running = true;
pthread_cond_init(&env->work_queue_cond, NULL);
pthread_mutex_init(&env->work_queue_mutex, NULL);

if (env->thread_count == 1)
return env;

for (int i = 0; i < env->thread_count; ++i) {
int error = pthread_create(
&env->threads[i],
NULL,
start_memcpy_thread,
(void*)env);

if (error) {
/* This can only really happen in two cases:
* The first one being permissions for setting policy.
* The second being lack of resources.
* We can't really help either case so just fallback to
* a single thread */
for (int k = 0; k < i; ++k) {
pthread_cancel(env->threads[k]);
}

env->thread_count = 1;
return env;
}
}

return env;
}

void destroy_threaded_memcpy_pool(struct memcpy_environment *env)
{
pthread_mutex_lock(&env->work_queue_mutex);
env->running = false;
pthread_mutex_unlock(&env->work_queue_mutex);

for (int i = 0; i < env->thread_count; ++i) {
/* Since we don't know which thread wakes up, wake them all up. */
pthread_cond_broadcast(&env->work_queue_cond);
pthread_join(env->threads[i], NULL);
}

pthread_cond_destroy(&env->work_queue_cond);
pthread_mutex_destroy(&env->work_queue_mutex);
bfree(env);
}

void threaded_memcpy(void *destination, void *source, size_t size, struct memcpy_environment *env)
{
if (env->thread_count == 1 ||
size <= block_size * 2)
{
memcpy(destination, source, size);
return;
}

struct memcpy_thread_work work;
os_sem_t *finish_signal;

size_t blocks =
min(max(size, block_size) / block_size, env->thread_count);

os_sem_init(&finish_signal, 0);

work.block_size = size / blocks;
size_t block_size_rem = size - (work.block_size * blocks);
work.size = size - block_size_rem;
work.to = (uint8_t*)destination + block_size_rem;
work.from = (uint8_t*)source + block_size_rem;
work.semaphore = finish_signal;
work.next = NULL;
work.prev = NULL;

pthread_mutex_lock(&env->work_queue_mutex);

if (env->work_queue == NULL) {
env->work_queue = &work;
env->work_queue_last = &work;
} else {
work.prev = env->work_queue_last;
work.prev->next = &work;
env->work_queue_last = &work;
}

for (int i = 0; i < blocks; ++i)
pthread_cond_signal(&env->work_queue_cond);

pthread_mutex_unlock(&env->work_queue_mutex);

/* Copy the remainder while we wait */
memcpy(destination, source, block_size_rem);

/* Wait for a signal from each job */
for (int i = 0; i < blocks; ++i) {
os_sem_wait(finish_signal);
}

pthread_mutex_lock(&env->work_queue_mutex);

if (work.prev)
work.prev->next = work.next;

pthread_mutex_unlock(&env->work_queue_mutex);

os_sem_destroy(finish_signal);
}
14 changes: 14 additions & 0 deletions libobs/util/threaded-memcpy.h
@@ -0,0 +1,14 @@
#pragma once

#include <stddef.h>

struct memcpy_environment;

struct memcpy_environment *init_threaded_memcpy_pool(unsigned int threads);
void destroy_threaded_memcpy_pool(struct memcpy_environment *env);

void threaded_memcpy(
void *destination,
void *source,
size_t size,
struct memcpy_environment *env);

0 comments on commit 1c6baab

Please sign in to comment.