Skip to content

Commit

Permalink
Fix some issues with the download refactoring in Spices.py (#7549)
Browse files Browse the repository at this point in the history
* Spices.py: use `with lock` instead of `lock.acquire` to ensure a lock always gets removed

* Spices.py: updaate progress: only use the total downloads to calculate the fraction when there is more than one
This prevents a divide-by-zero under certain circumstances, and gives more meaningfull progress information.

* Spices.py: add a timeout to urlopen to prevent a hang when the connection is interrupted mid-download

* Spices.py: don't open a file stream until we're ready, and use the with operator to ensure that we don't leak file descriptors
  • Loading branch information
collinss authored and clefebvre committed May 6, 2018
1 parent 23fa9ed commit 2e34f5b
Showing 1 changed file with 31 additions and 40 deletions.
71 changes: 31 additions & 40 deletions files/usr/share/cinnamon/cinnamon-settings/bin/Spices.py
Expand Up @@ -80,9 +80,8 @@ def busy(self):
return len(self.jobs) > 0 or len(self.threads) > 0

def push(self, func, callback, data):
self.lock.acquire()
self.jobs.insert(0, (func, callback, data))
self.lock.release()
with self.lock:
self.jobs.insert(0, (func, callback, data))

if self.start_id == 0:
self.start_id = GLib.idle_add(self.check_start_job)
Expand All @@ -93,11 +92,10 @@ def check_start_job(self):
if len(self.threads) == self.max_threads:
return

self.lock.acquire()
job = self.jobs.pop()
newthread = threading.Thread(target=self.thread_function_wrapper, args=job)
self.threads.append(newthread)
self.lock.release()
with self.lock:
job = self.jobs.pop()
newthread = threading.Thread(target=self.thread_function_wrapper, args=job)
self.threads.append(newthread)

newthread.start()

Expand All @@ -106,16 +104,14 @@ def check_start_job(self):
def thread_function_wrapper(self, func, callback, data):
result = func(*data)

self.lock.acquire()
try:
self.threads.remove(threading.current_thread())
except:
pass

if self.abort_status and not self.busy():
self.abort_status = False
with self.lock:
try:
self.threads.remove(threading.current_thread())
except:
pass

self.lock.release()
if self.abort_status and not self.busy():
self.abort_status = False

self.check_start_job()

Expand All @@ -124,10 +120,9 @@ def thread_function_wrapper(self, func, callback, data):

def abort(self):
if self.busy():
self.lock.acquire()
self.abort_status = True
del self.jobs[:]
self.lock.release()
with self.lock:
self.abort_status = True
del self.jobs[:]

class Spice_Harvester(GObject.Object):
__gsignals__ = {
Expand Down Expand Up @@ -300,7 +295,7 @@ def _set_progressbar_visible(self, visible):

# updates any progress bars with the download progress
def _update_progress(self, count, blockSize, totalSize):
if self.download_manager.busy():
if self.download_manager.busy() and self.download_total_files > 1:
total = self.download_total_files
current = total - self.download_manager.get_n_jobs()
fraction = float(current) / float(total)
Expand Down Expand Up @@ -363,28 +358,31 @@ def _advance_queue(self):
print(e)
self._directory_changed()

def _download(self, outfd, outfile, url, binary=True):
def _download(self, out_file, url, binary=True):
try:
self._url_retrieve(url, outfd, self._update_progress, binary)
open_args = 'wb' if binary else 'w'
with open(out_file, open_args) as outfd:
self._url_retrieve(url, outfd, self._update_progress, binary)
except Exception as e:
try:
os.remove(outfile)
os.remove(out_file)
except OSError:
pass
if not isinstance(e, KeyboardInterrupt):
if not isinstance(e, KeyboardInterrupt) and not self.download_manager.abort_status:
self.errorMessage(_("An error occurred while trying to access the server. Please try again in a little while."), e)
self.abort()
return None

return outfile
return out_file

def _url_retrieve(self, url, f, reporthook, binary):
def _url_retrieve(self, url, outfd, reporthook, binary):
#Like the one in urllib. Unlike urllib.retrieve url_retrieve
#can be interrupted. KeyboardInterrupt exception is raised when
#interrupted.
count = 0
blockSize = 1024 * 8
try:
with urlopen(url) as urlobj:
with urlopen(url, timeout=15) as urlobj:
assert urlobj.getcode() == 200

totalSize = int(urlobj.info()['content-length'])
Expand All @@ -396,15 +394,11 @@ def _url_retrieve(self, url, f, reporthook, binary):
break
if not binary:
data = data.decode("utf-8")
f.write(data)
outfd.write(data)
ui_thread_do(reporthook, count, blockSize, totalSize)
except Exception as e:
self.abort()
f.close()
raise e

f.close()

def _load_metadata(self):
self.meta_map = {}

Expand Down Expand Up @@ -521,8 +515,7 @@ def _download_cache(self, load_assets=True):
download_url = URL_MAP[self.collection_type]

filename = os.path.join(self.cache_folder, "index.json")
f = open(filename, 'w')
if self._download(f, filename, download_url, binary=False) is None:
if self._download(filename, download_url, binary=False) is None:
return

self._load_cache()
Expand All @@ -549,8 +542,7 @@ def _download_image_cache(self):

# if the image doesn't exist, is corrupt, or may have changed we want to download it
if not os.path.isfile(icon_path) or self._is_bad_image(icon_path) or self.old_cache[uuid]["last_edited"] != self.index_cache[uuid]["last_edited"]:
fstream = open(icon_path, 'w+b')
self.download_manager.push(self._download, self._check_download_image_cache_complete, (fstream, icon_path, download_url))
self.download_manager.push(self._download, self._check_download_image_cache_complete, (icon_path, download_url))
self.download_total_files += 1

ui_thread_do(self._check_download_image_cache_complete)
Expand Down Expand Up @@ -607,9 +599,8 @@ def _install(self, job):
self.current_uuid = uuid

fd, ziptempfile = tempfile.mkstemp()
f = os.fdopen(fd, 'wb')

if self._download(f, ziptempfile, download_url) is None:
if self._download(ziptempfile, download_url) is None:
return

try:
Expand Down

0 comments on commit 2e34f5b

Please sign in to comment.