
Use a semaphore to avoid fetching complete library to memory #410

Merged: 5 commits merged into jellyfin:master on Nov 2, 2020

Conversation

@mammo0 (Contributor) commented Oct 14, 2020

Hello,

After my comment in #350 I had some time to investigate this bug. I tracked it down to these lines:

    p = concurrent.futures.ThreadPoolExecutor(DTHREADS)
    results = p.map(lambda params: _get(url, params, server_id=server_id), query_params)
    for params, result in zip(query_params, results):

The first line opens a thread pool. For each batch of items that should be fetched from the Jellyfin server, a thread job is created (second line). When iteration over the results starts (the for loop), all threads are executed.

This leads to the following problem:
If the processing of single items (e.g. adding them to the Kodi library) takes longer than the threads need to fetch them, the fetched items pile up in memory until they are processed.
For a big library, that means a lot of memory is needed.

My approach to solving this problem is a simple semaphore that acts as a buffer. There is now a fixed number of items that may be pre-fetched from the Jellyfin server; further items can only be fetched after previously fetched ones have been processed by the Kodi plugin.
The current fixed number is calculated as 2 * [max. item fetch limit] * [number of download threads].
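For illustration, here is a minimal, self-contained sketch of that pattern (simplified from the actual PR; _get, process, and query_params are stand-ins for the add-on's real fetching and processing code):

    import concurrent.futures
    import threading

    DTHREADS = 3  # number of download threads (add-on setting)
    LIMIT = 15    # max. item fetch limit per request (add-on setting)

    # Stand-ins for the add-on's real fetching/processing code:
    query_params = [{'StartIndex': i * LIMIT, 'Limit': LIMIT} for i in range(20)]

    def _get(params):
        return list(range(params['Limit']))  # pretend to fetch LIMIT items

    def process(item):
        pass  # e.g. add the item to the Kodi library

    # Each semaphore slot corresponds to one fetch job of up to LIMIT items.
    thread_buffer = threading.Semaphore(2 * LIMIT * DTHREADS)

    def get_wrapper(params):
        thread_buffer.acquire()  # blocks while the buffer is full
        return _get(params)

    with concurrent.futures.ThreadPoolExecutor(DTHREADS) as p:
        jobs = [p.submit(get_wrapper, params) for params in query_params]
        for job in jobs:
            for item in job.result():
                process(item)
            thread_buffer.release()  # free one slot, allowing the next fetch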

Please let me know your thoughts on this.

-> this happens if the processing of items is slower than the fetching of new ones
-> if a big library is synced, the old behavior could lead to extensive use of memory
-> the semaphore acts like a buffer that only allows fetching new items from the library once old ones are processed
-> the current size of the 'buffer' is hard-coded to 2 * [max. item fetch limit] * [number of download threads]
-> previously, all threads that fetched items from the server, and their results, stayed in memory until the sync was finished
@oddstr13 (Member)

I can't say I'm familiar with semaphores (heard of them, never had to use them).

Is there any particular reason not to use a fixed size Queue for this task?

@mammo0 (Contributor, author) commented Oct 15, 2020

Basically, the semaphore does exactly that:

As before, all thread jobs are created and started by the pool. But during execution, each thread checks whether enough previously fetched items have been processed (the fixed number). If so, execution continues; if not, the thread waits until the semaphore is released, which happens after an item is processed.

A semaphore was simply the first implementation that came to mind; there are certainly other ways to achieve this.

Just had a quick look at the source of the Emby Kodi plugin:
https://github.com/MediaBrowser/plugin.video.emby/blob/3dae2464f01678c391c00c8b13d6b45c7ba8f901/resources/lib/downloader.py#L256-L276

It seems that they don't use threading at this point anymore, so items are fetched only when they are needed. Is a thread pool really needed here?

@oddstr13 (Member)

> Just had a quick look at the source of the Emby Kodi plugin:
> https://github.com/MediaBrowser/plugin.video.emby/blob/3dae2464f01678c391c00c8b13d6b45c7ba8f901/resources/lib/downloader.py#L256-L276
>
> It seems that they don't use threading at this point anymore, so items are fetched only when they are needed. Is a thread pool really needed here?

This was implemented in #202 in order to speed up syncs, I believe.
I have no idea how much this actually improves sync speed, as other work on sync speed has landed since (#349 and #387 are a couple I found from a quick look at the PR list).

If I were to implement this kind of parallel processing myself, I'd most likely use threading.Thread with queue.Queue, as this is what I've used for similar things in the past.
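(Purely for illustration, not this add-on's code: a minimal sketch of that Thread + Queue pattern, where a bounded queue blocks the fetching thread much like the semaphore does. _get, process, and query_params are hypothetical stand-ins.)

    import queue
    import threading

    # Stand-ins for the real fetch/process functions:
    def _get(params):
        return ['item'] * 15  # pretend to fetch one batch of items

    def process(batch):
        pass  # e.g. write the batch to the Kodi library

    query_params = [{'StartIndex': i} for i in range(10)]

    q = queue.Queue(maxsize=10)  # bounded: put() blocks once 10 batches wait
    _DONE = object()             # sentinel marking the end of the stream

    def fetcher():
        for params in query_params:   # producer: fetch batch by batch
            q.put(_get(params))       # blocks while the queue is full
        q.put(_DONE)

    threading.Thread(target=fetcher, daemon=True).start()

    while True:                       # consumer: process batches as they arrive
        batch = q.get()
        if batch is _DONE:
            break
        process(batch)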

I am not opposed to removing threading here if the performance impact of that is minimal (trading some performance for actually working on more systems is generally fine, 10x slower maybe not).

@mammo0 (Contributor, author) commented Oct 15, 2020

> This was implemented in #202 in order to speed up syncs, I believe.
> I have no idea how much this actually improves sync speed, as other work on sync speed has landed since (#349 and #387 are a couple I found from a quick look at the PR list).

Ok, but then the processing of the items should also be parallelized, because after fetching it is the bottleneck.

> If I were to implement this kind of parallel processing myself, I'd most likely use threading.Thread with queue.Queue, as this is what I've used for similar things in the past.

That would be no problem for me.

> I am not opposed to removing threading here if the performance impact of that is minimal (trading some performance for actually working on more systems is generally fine, 10x slower maybe not).

On lower-powered devices the multi-threading could be disabled. Assuming your suggested implementation, the simplest method is to set the queue size to 1. This setting could be linked to the 'download threads' setting. The default value (currently 3) could also be set programmatically according to the available CPU cores, as sketched below.
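(Something along these lines, as a sketch; os.cpu_count() is the standard-library call for this.)

    import os

    # Default the number of download threads to the available CPU cores,
    # falling back to 1 when the core count cannot be determined.
    DTHREADS = os.cpu_count() or 1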


Anyway, #350 (comment) shows that my "fix" does not work for everyone.

@macearl (Contributor) commented Oct 15, 2020

After some additional testing I have some observations to share and a few questions that I hope are not too noobish ;)

> 2 * [max. item fetch limit] * [number of download threads]

Where does the 2 come from in the maximum number of items to prefetch?
The way I understand it right now (please correct me if I'm wrong):
Now two related questions:

  • Should the line

        thread_buffer = threading.Semaphore(LIMIT * DTHREADS)

    just contain DTHREADS, so that there is more transparency for the end user? This would mean only LIMIT * DTHREADS entries are kept in memory at any given time.

  • Can anyone see a reason why a library scan always crashes for me if the semaphore is limited to LIMIT * DTHREADS, even when I set the add-on settings controlling LIMIT and DTHREADS to 1, but does not crash any more if the semaphore is limited to just DTHREADS, in which case I can set LIMIT and DTHREADS to numerous reasonable combinations?

I did a local database reset followed by a full scan after removing LIMIT from the Semaphore line with the following combinations (I also wrote down the time each scan took to complete, which may be a starting point for deciding whether parallel _get requests are even necessary):

DTHREADS  LIMIT  TIME
1         1      12m13s
1         100    12m28s
3         20     11m56s
3         50     12m25s

@oddstr13 (Member)

> After some additional testing I have some observations to share and a few questions that I hope are not too noobish ;)

There is no such thing as a too-noobish question when debugging issues like these; rubber duck debugging is a thing, after all.

>   • Can anyone see a reason why a library scan always crashes for me if the semaphore is limited to LIMIT * DTHREADS, even when I set the add-on settings controlling LIMIT and DTHREADS to 1, but does not crash any more if the semaphore is limited to just DTHREADS, in which case I can set LIMIT and DTHREADS to numerous reasonable combinations?

That sounds really strange to me; I don't see any reason for that happening, except possibly the settings in the UI not matching what's read in this file.

> I did a local database reset followed by a full scan after removing LIMIT from the Semaphore line with the following combinations (I also wrote down the time each scan took to complete, which may be a starting point for deciding whether parallel _get requests are even necessary):

I'd consider this just random variation (I'm assuming you tested once, and didn't average 100 runs of each 😉), so it looks like these parameters don't do anything for sync speed in your particular case.
Depending on the library size, I don't think that 13 minutes for a full sync is absolutely terrible either.

Could you share some details on your library composition?
Number of movies, shows, episodes, albums & tracks synced?


Just a thought that occurred to me earlier today: could this be an issue related to rapid database writes on slow (SD-card) storage?

@mammo0 (Contributor, author) commented Oct 15, 2020

@macearl let me try to answer your questions:

> 2 * [max. item fetch limit] * [number of download threads]
> Where does the 2 come from in the maximum number of items to prefetch?

The "2" came from the first commit of my PR:

    thread_buffer = threading.Semaphore(2 * LIMIT * DTHREADS)

My problem was that I didn't know how big the semaphore should be. At first it was 2 * LIMIT * DTHREADS. But as you correctly noted, this would mean 2 * LIMIT * DTHREADS * LIMIT items stored in memory.

The intention of this PR was to save memory, so I removed the 2 in my second commit. Sorry for the confusion.

Basically, the semaphore should not be too big, because then more memory is needed. But it also should not be too small, because on a slow network/internet connection some items should be buffered in memory so the processing of items isn't starved.

First point

> threads are started/semaphores acquired in batches of DTHREADS due to line
>
>     with concurrent.futures.ThreadPoolExecutor(DTHREADS) as p:
>
> and
>
>     job = p.submit(get_wrapper, param)

The line

    with concurrent.futures.ThreadPoolExecutor(DTHREADS) as p:

creates a thread pool of size DTHREADS. That means there are DTHREADS threads available to execute jobs; if all of them are busy, the next job will not start until another job finishes.
For further information, have a look at the documentation: https://docs.python.org/3/library/concurrent.futures.html
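A minimal standalone example of that queueing behaviour (not the add-on's code):

    import concurrent.futures
    import time

    def job(i):
        time.sleep(1)  # simulate one slow fetch
        return i

    # A pool of 2 workers runs the 6 submitted jobs at most 2 at a time;
    # the remaining jobs wait in the executor's internal queue.
    with concurrent.futures.ThreadPoolExecutor(2) as pool:
        futures = [pool.submit(job, i) for i in range(6)]
        print([f.result() for f in futures])  # takes about 3 seconds, not 1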

The second point was answered above.

Third point

> once one batch of LIMIT * DTHREADS * LIMIT entries is processed and therefore a semaphore being released another batch gets downloaded

That's not right: the semaphore is released after the results of one job are processed, i.e. after LIMIT items. See the line

    thread_buffer.release()

This release call is within the for-loop that processes the results of a single job.

Answers to your questions

> Should the line
>
>     thread_buffer = threading.Semaphore(LIMIT * DTHREADS)
>
> just contain DTHREADS, so that there is more transparency for the end user? This would mean only LIMIT * DTHREADS entries are kept in memory at any given time.

I answered this above. Do you have an idea of how to size the semaphore correctly?

> Can anyone see a reason why a library scan always crashes for me if the semaphore is limited to LIMIT * DTHREADS, even when I set the add-on settings controlling LIMIT and DTHREADS to 1, but does not crash any more if the semaphore is limited to just DTHREADS, in which case I can set LIMIT and DTHREADS to numerous reasonable combinations?

Sorry, I've got no idea why this fails... The behavior you describe is very strange.

Comment on your measurements

  • I assume your Jellyfin server and your RPi are on the same network? If so, I think that on a local network there's not that much difference in performance when threading is used only for fetching.
    Better performance might be achieved if the item processing were also parallelized, as I said before:

    > Ok, but then the processing of the items should also be parallelized, because after fetching it is the bottleneck.

  • If the devices are not on the same network, there might be a bigger performance gain from threaded fetching, especially on slow network connections.

@mammo0 (Contributor, author) commented Oct 15, 2020

@oddstr13 How should we proceed with this PR?

@mcarlton00 (Member)

I wonder if we're overthinking this, and it's more an order-of-operations issue than a data query/threading issue. For example, this function:

    def movies(self, library, dialog):
        ''' Process movies from a single library.
        '''
        processed_ids = []

        for items in server.get_items(library['Id'], "Movie", False, self.sync['RestorePoint'].get('params')):
            with self.video_database_locks() as (videodb, jellyfindb):
                obj = Movies(self.server, jellyfindb, videodb, self.direct_path, library)
                self.sync['RestorePoint'] = items['RestorePoint']
                start_index = items['RestorePoint']['params']['StartIndex']

                for index, movie in enumerate(items['Items']):
                    dialog.update(int((float(start_index + index) / float(items['TotalRecordCount'])) * 100),
                                  heading="%s: %s" % (translate('addon_name'), library['Name']),
                                  message=movie['Name'])
                    obj.movie(movie)
                    processed_ids.append(movie['Id'])

        with self.video_database_locks() as (videodb, jellyfindb):
            obj = Movies(self.server, jellyfindb, videodb, self.direct_path, library)
            obj.item_ids = processed_ids

            if self.update_library:
                self.movies_compare(library, obj, jellyfindb)

It's processing each item, but then it's not clearing its memory each iteration, so the memory usage would constantly be climbing (if I'm reading it right). Also, it's opening the database lock multiple times, which I'm not entirely sure is the most efficient approach.

It does highlight why parallelizing the database writes is an issue, though. SQLite is only meant to be accessed from one place, so I think if we try to multithread that part we'll run into conflicts sooner or later. I'd love to be wrong on that, though.

@oddstr13 (Member)

I was thinking we could try a single fetching thread with a queue size somewhere in the range of 10 to LIMIT * 2, as that would probably eliminate the HTTP request latency (keeping data ready for processing).

I don't think there's a need for multiple threads to process the data, and I don't think there is much of a benefit to having multiple HTTP requests at the same time either: if the server is slow, it is slow. We should be using HTTP keep-alive anyway (are we?), so TCP handshakes shouldn't add latency.

What would be interesting to compare is the full sync time when single-threaded (fetch, then process, repeat) vs. one, two, and three HTTP fetching threads.


My current thinking is to either remove threading here or use just a single thread, depending on how the performance is on something low-end (I think a Pi 3 is a reasonable benchmark device here?).
How fast it syncs on a higher-end system such as my desktop is less of a concern, as that will be faster in any case.

@mammo0 (Contributor, author) commented Oct 16, 2020

@mcarlton00

> It's processing each item, but then it's not clearing its memory each iteration, so the memory usage would constantly be climbing (if I'm reading it right). Also, it's opening the database lock multiple times, which I'm not entirely sure is the most efficient approach.

I can't confirm that. While searching for the memory leak I used the PyDev debugger. While inspecting the function you mentioned, I noticed that memory filled up in the background, so this function can't be the culprit: its execution was paused by the debugger.
As I wrote above, memory starts to climb once the thread pool that fetches items in parallel starts.

> It does highlight why parallelizing the database writes is an issue, though. SQLite is only meant to be accessed from one place, so I think if we try to multithread that part we'll run into conflicts sooner or later. I'd love to be wrong on that, though.

What about https://sqlite.org/threadsafe.html? Could that be used to add or update items in the database in parallel?
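(For what it's worth, Python's built-in sqlite3 module refuses cross-thread use of a connection by default; sharing one requires check_same_thread=False plus your own locking, and SQLite serializes the actual writes anyway. A sketch under those assumptions, with a hypothetical items table:)

    import sqlite3
    import threading

    db_lock = threading.Lock()
    # check_same_thread=False allows the connection to be used from any thread,
    # but access must then be serialized by the caller.
    conn = sqlite3.connect("library.db", check_same_thread=False)
    conn.execute("CREATE TABLE IF NOT EXISTS items (id TEXT, name TEXT)")

    def insert_item(item_id, name):
        # SQLite serializes writes internally even in "serialized" mode,
        # so guarding writes with a single lock costs little parallelism.
        with db_lock:
            conn.execute("INSERT INTO items VALUES (?, ?)", (item_id, name))
            conn.commit()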

@oddstr13

> I don't think there's a need for multiple threads to process the data, ...

I think on multi-core systems it would make sense; it should normally result in better performance there.

> ... and I don't think there is much of a benefit to having multiple HTTP requests at the same time either: if the server is slow, it is slow.

That's quite a good point... except that the Jellyfin server also uses multi-threading to process GET requests. Does it? If so, multiple parallel GETs could be faster than sequential ones. But this is just a guess...

@macearl (Contributor) commented Oct 19, 2020

>   • Can anyone see a reason why a library scan always crashes for me if the semaphore is limited to LIMIT * DTHREADS, even when I set the add-on settings controlling LIMIT and DTHREADS to 1, but does not crash any more if the semaphore is limited to just DTHREADS, in which case I can set LIMIT and DTHREADS to numerous reasonable combinations?

> That sounds really strange to me; I don't see any reason for that happening, except possibly the settings in the UI not matching what's read in this file.

> Sorry, I've got no idea why this fails... The behavior you describe is very strange.

Had a poke around over the weekend and I think I found the problem: because the variables DTHREADS and LIMIT are assigned only once at startup, changes in the settings dialog don't take effect until a restart/reboot. A possible fix is to just move them into the function itself: #413

> I did a local database reset followed by a full scan after removing LIMIT from the Semaphore line with the following combinations (I also wrote down the time each scan took to complete, which may be a starting point for deciding whether parallel _get requests are even necessary):

> I'd consider this just random variation (I'm assuming you tested once, and didn't average 100 runs of each 😉), so it looks like these parameters don't do anything for sync speed in your particular case.
> Depending on the library size, I don't think that 13 minutes for a full sync is absolutely terrible either.
>
> Could you share some details on your library composition?
> Number of movies, shows, episodes, albums & tracks synced?

Yeah, I would also put it down to normal variation. If the connection speed to the server is fast enough, it probably does not make a difference.

The library is about 1500 movies and 300 episodes, no music. The scan was done on an RPi 4 1GB; on my desktop machine the scan finishes in about 2 minutes.

> The intention of this PR was to save memory, so I removed the 2 in my second commit. Sorry for the confusion.

Ah, I didn't check the previous commits; makes sense.

> That's not right: the semaphore is released after the results of one job are processed, i.e. after LIMIT items. See the line
>
>     thread_buffer.release()
>
> This release call is within the for-loop that processes the results of a single job.

Yeah, I probably got confused at this point or missed the indentation.

> I answered this above. Do you have an idea of how to size the semaphore correctly?

Not really. I would probably go the "just set it to DTHREADS" route, as that is what I personally would expect the option to mean, but then again it might just be me, and someone else might expect something completely different.

Whether a fixed-size queue would be better than a semaphore I have no idea, but I also don't really see anything wrong with the current implementation.

I was able to fully sync on both the RPi 4 and an RPi 3+ with the semaphore count set to just DTHREADS, and was able to go reasonably high with DTHREADS and LIMIT.

With the original limit of DTHREADS * LIMIT it was probably more a case of the settings not taking effect yet, as I was effectively always running with 45 semaphore slots, each pulling 15 items, so theoretically it was still pulling over a third of my library into memory.

@mammo0 (Contributor, author) commented Oct 20, 2020

> I was able to fully sync on both the RPi 4 and an RPi 3+ with the semaphore count set to just DTHREADS, and was able to go reasonably high with DTHREADS and LIMIT.

I'm glad you found a working combination.
So maybe the best option is to set the semaphore size to LIMIT. Edit: Sorry, of course I meant DTHREADS.

@oddstr13 @mcarlton00:
Maybe I'll have some time this coming Sunday; then I can try to implement parallel item processing and fetching with threading.Thread and queue.Queue. The number of threads could then be set individually.
But I will only do this if the plugin maintainers support it.

@oddstr13 (Member)

Whatever solution fixes the problem while keeping or improving maintainability is fine with me. 😃

@mammo0 (Contributor, author) commented Oct 25, 2020

After spending my afternoon reading SQLite and Python documentation on multi-threading, I must concede that implementing my approach is not as easy as I thought. So @mcarlton00 is right, and I was overthinking this.

So the easiest will be as @oddstr13 said:

> My current thinking is to either remove threading here or use just a single thread, depending on how the performance is on something low-end (I think a Pi 3 is a reasonable benchmark device here?).
> How fast it syncs on a higher-end system such as my desktop is less of a concern, as that will be faster in any case.

Just remove the multi-threading for fetching items from the server, then. Or do you have another idea for better sync performance?

If not, I think we can close this PR. Thanks for your assistance :)

@sonarcloud (bot) commented Nov 2, 2020

Kudos, SonarCloud Quality Gate passed!

Bugs: A (0 bugs)
Vulnerabilities: A (0 vulnerabilities, 0 security hotspots to review)
Code smells: A (1 code smell)

No coverage information
Duplication: 0.0%

@mammo0 (Contributor, author) commented Nov 2, 2020

I think this change does fix the problem. I finally managed to reproduce the memory exhaustion on one of my Pis; after implementing this fix and changing it to thread_buffer = threading.Semaphore(DTHREADS), the scan finished and memory usage only hovered around 25%.

@mcarlton00 I've added this change and rebased everything onto the current master. Feel free to merge if it's OK for you.

@phyzical commented Nov 2, 2020

Hey @mammo0, is there a build of all this I can try, or should this build contain the relevant changes?

Edit: never mind, found my way to the Azure pipeline. Currently running a full sync on a 1 GB Pi 2B, and so far so good: it has made it past the previous milestones without any swap.

@mcarlton00 (Member)

1.5 confirmations that it works; good enough for me. If there are other issues, I'm sure we'll find them over time.

@mcarlton00 merged commit 7d792ab into jellyfin:master on Nov 2, 2020.
@mammo0 deleted the fix_for_#350 branch on Nov 2, 2020 at 15:56.