Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a semaphore to avoid fetching complete library to memory #410

Merged
merged 5 commits into from Nov 2, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 46 additions & 24 deletions jellyfin_kodi/downloader.py
Expand Up @@ -7,7 +7,7 @@
import concurrent.futures
from datetime import date

from six.moves import range, queue as Queue, zip
from six.moves import range, queue as Queue

from kodi_six import xbmc
import requests
Expand Down Expand Up @@ -280,29 +280,51 @@ def get_query_params(params, start, count):
# complete all tasks before allowing any results to be processed. ThreadPoolExecutor
# allows for completed tasks to be processed while other tasks are completed on other
# threads. Dont be a dummy.Pool, be a ThreadPoolExecutor
p = concurrent.futures.ThreadPoolExecutor(dthreads)

results = p.map(lambda params: _get(url, params, server_id=server_id), query_params)

for params, result in zip(query_params, results):
query['params'] = params

result = result or {'Items': []}

# Mitigates #216 till the server validates the date provided is valid
if result['Items'][0].get('ProductionYear'):
try:
date(result['Items'][0]['ProductionYear'], 1, 1)
except ValueError:
LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
result['Items'][0]['ProductionYear'] = None

items['Items'].extend(result['Items'])
# Using items to return data and communicate a restore point back to the callee is
# a violation of the SRP. TODO: Seperate responsibilities.
items['RestorePoint'] = query
yield items
del items['Items'][:]
with concurrent.futures.ThreadPoolExecutor(dthreads) as p:
# dictionary for storing the jobs and their results
jobs = {}

# semaphore to avoid fetching complete library to memory
thread_buffer = threading.Semaphore(dthreads)

# wrapper function for _get that uses a semaphore
def get_wrapper(params):
thread_buffer.acquire()
return _get(url, params, server_id=server_id)

# create jobs
for param in query_params:
job = p.submit(get_wrapper, param)
# the query params are later needed again
jobs[job] = param

# process complete jobs
for job in concurrent.futures.as_completed(jobs):
# get the result
result = job.result() or {'Items': []}
query['params'] = jobs[job]

# free job memory
del jobs[job]
del job

# Mitigates #216 till the server validates the date provided is valid
if result['Items'][0].get('ProductionYear'):
try:
date(result['Items'][0]['ProductionYear'], 1, 1)
except ValueError:
LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
result['Items'][0]['ProductionYear'] = None

items['Items'].extend(result['Items'])
# Using items to return data and communicate a restore point back to the callee is
# a violation of the SRP. TODO: Seperate responsibilities.
items['RestorePoint'] = query
yield items
del items['Items'][:]

# release the semaphore again
thread_buffer.release()


class GetItemWorker(threading.Thread):
Expand Down