Skip to content

Commit

Permalink
Merge pull request #410 from mammo0/fix_for_#350
Browse files Browse the repository at this point in the history
Use a semaphore to avoid fetching complete library to memory
  • Loading branch information
mcarlton00 committed Nov 2, 2020
2 parents 75469b1 + dd7bca7 commit 7d792ab
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions jellyfin_kodi/downloader.py
Expand Up @@ -7,7 +7,7 @@
import concurrent.futures
from datetime import date

from six.moves import range, queue as Queue, zip
from six.moves import range, queue as Queue

from kodi_six import xbmc
import requests
Expand Down Expand Up @@ -280,29 +280,51 @@ def get_query_params(params, start, count):
# complete all tasks before allowing any results to be processed. ThreadPoolExecutor
# allows for completed tasks to be processed while other tasks are completed on other
# threads. Dont be a dummy.Pool, be a ThreadPoolExecutor
p = concurrent.futures.ThreadPoolExecutor(dthreads)

results = p.map(lambda params: _get(url, params, server_id=server_id), query_params)

for params, result in zip(query_params, results):
query['params'] = params

result = result or {'Items': []}

# Mitigates #216 till the server validates the date provided is valid
if result['Items'][0].get('ProductionYear'):
try:
date(result['Items'][0]['ProductionYear'], 1, 1)
except ValueError:
LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
result['Items'][0]['ProductionYear'] = None

items['Items'].extend(result['Items'])
# Using items to return data and communicate a restore point back to the callee is
# a violation of the SRP. TODO: Seperate responsibilities.
items['RestorePoint'] = query
yield items
del items['Items'][:]
with concurrent.futures.ThreadPoolExecutor(dthreads) as p:
# dictionary for storing the jobs and their results
jobs = {}

# semaphore to avoid fetching complete library to memory
thread_buffer = threading.Semaphore(dthreads)

# wrapper function for _get that uses a semaphore
def get_wrapper(params):
thread_buffer.acquire()
return _get(url, params, server_id=server_id)

# create jobs
for param in query_params:
job = p.submit(get_wrapper, param)
# the query params are later needed again
jobs[job] = param

# process complete jobs
for job in concurrent.futures.as_completed(jobs):
# get the result
result = job.result() or {'Items': []}
query['params'] = jobs[job]

# free job memory
del jobs[job]
del job

# Mitigates #216 till the server validates the date provided is valid
if result['Items'][0].get('ProductionYear'):
try:
date(result['Items'][0]['ProductionYear'], 1, 1)
except ValueError:
LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
result['Items'][0]['ProductionYear'] = None

items['Items'].extend(result['Items'])
# Using items to return data and communicate a restore point back to the callee is
# a violation of the SRP. TODO: Seperate responsibilities.
items['RestorePoint'] = query
yield items
del items['Items'][:]

# release the semaphore again
thread_buffer.release()


class GetItemWorker(threading.Thread):
Expand Down

0 comments on commit 7d792ab

Please sign in to comment.