Skip to content

Commit

Permalink
Use nzo.deleted instead of nzo.is_gone to prevent assembly during pp
Browse files Browse the repository at this point in the history
Relates to #2059, #2054, and #1509.
  • Loading branch information
Safihre committed Feb 7, 2022
1 parent 43e47e6 commit 171f049
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 46 deletions.
6 changes: 2 additions & 4 deletions sabnzbd/articlecache.py
Expand Up @@ -85,9 +85,7 @@ def space_left(self) -> bool:
def save_article(self, article: Article, data: bytes):
"""Save article in cache, either memory or disk"""
nzo = article.nzf.nzo
if nzo.is_gone():
# Do not discard this article because the
# file might still be processed at this moment!!
if nzo.deleted:
return

# Register article for bookkeeping in case the job is deleted
Expand Down Expand Up @@ -163,7 +161,7 @@ def purge_articles(self, articles: List[Article]):
@staticmethod
def __flush_article_to_disk(article: Article, data):
nzo = article.nzf.nzo
if nzo.is_gone():
if nzo.deleted:
# Don't store deleted jobs
return

Expand Down
2 changes: 1 addition & 1 deletion sabnzbd/assembler.py
Expand Up @@ -110,7 +110,7 @@ def run(self):
self.assemble(nzf, file_done)
except IOError as err:
# If job was deleted or in active post-processing, ignore error
if not nzo.deleted and not nzo.is_gone() and not nzo.pp_active:
if not nzo.deleted and not nzo.pp_active:
# 28 == disk full => pause downloader
if err.errno == 28:
logging.error(T("Disk full! Forcing Pause"))
Expand Down
2 changes: 1 addition & 1 deletion sabnzbd/downloader.py
Expand Up @@ -952,7 +952,7 @@ def __reset_nw(
# Make sure it is not in the readable sockets
self.remove_socket(nw)

if nw.article:
if nw.article and not nw.article.nzf.nzo.deleted:
# Only some errors should count towards the total tries for each server
if count_article_try:
nw.article.tries += 1
Expand Down
62 changes: 32 additions & 30 deletions sabnzbd/nzbqueue.py
Expand Up @@ -239,7 +239,7 @@ def save(self, save_nzo: Union[NzbObject, None, bool] = None):
nzo_ids = []
# Aggregate nzo_ids and save each nzo
for nzo in self.__nzo_list[:]:
if not nzo.is_gone():
if not nzo.deleted:
nzo_ids.append(os.path.join(nzo.work_name, nzo.nzo_id))
if save_nzo is None or nzo is save_nzo:
if not nzo.futuretype:
Expand Down Expand Up @@ -396,7 +396,7 @@ def remove(self, nzo_id: str, cleanup: bool = True, delete_all_data: bool = True

# Set statuses
nzo.deleted = True
if cleanup and not nzo.is_gone():
if cleanup:
nzo.status = Status.DELETED
self.__nzo_list.remove(nzo)

Expand Down Expand Up @@ -737,39 +737,41 @@ def register_article(self, article: Article, success: bool = True):
nzf = article.nzf
nzo = nzf.nzo

if nzf.deleted:
logging.debug("Discarding article %s, no longer in queue", article.article)
if nzo.deleted or nzf.deleted:
logging.debug("No further processing of article for file %s, no longer in queue", nzf.filename)
# If this file is needed later (par2 file added back to queue), it would be damaged because
# we discard this article. So we reset it to be picked up again if needed.
# But not reset all articles, as it could cause problems for articles still attached to a server.
article.reset_try_list()
nzf.reset_try_list()
return

articles_left, file_done, post_done = nzo.remove_article(article, success)

if nzo.is_gone():
logging.debug("Discarding article for file %s, no longer in queue", nzf.filename)
else:
# Write data if file is done or at trigger time
if file_done or (articles_left and (articles_left % DIRECT_WRITE_TRIGGER) == 0):
if not nzo.precheck:
# Only start decoding if we have a filename and type
# The type is only set if sabyenc could decode the article
if nzf.filename and nzf.type:
sabnzbd.Assembler.process(nzo, nzf, file_done)
elif nzf.filename.lower().endswith(".par2"):
# Broken par2 file, try to get another one
nzo.promote_par2(nzf)

# Save bookkeeping in case of crash
if file_done and (nzo.next_save is None or time.time() > nzo.next_save):
nzo.save_to_disk()
sabnzbd.BPSMeter.save()
if nzo.save_timeout is None:
nzo.next_save = None
else:
nzo.next_save = time.time() + nzo.save_timeout
# Write data if file is done or at trigger time
if file_done or (articles_left and (articles_left % DIRECT_WRITE_TRIGGER) == 0):
if not nzo.precheck:
# Only start decoding if we have a filename and type
# The type is only set if sabyenc could decode the article
if nzf.filename and nzf.type:
sabnzbd.Assembler.process(nzo, nzf, file_done)
elif nzf.filename.lower().endswith(".par2"):
# Broken par2 file, try to get another one
nzo.promote_par2(nzf)

# Save bookkeeping in case of crash
if file_done and (nzo.next_save is None or time.time() > nzo.next_save):
nzo.save_to_disk()
sabnzbd.BPSMeter.save()
if nzo.save_timeout is None:
nzo.next_save = None
else:
nzo.next_save = time.time() + nzo.save_timeout

# Remove post from Queue
if post_done:
nzo.set_download_report()
self.end_job(nzo)
# Remove post from Queue
if post_done:
nzo.set_download_report()
self.end_job(nzo)

@NzbQueueLocker
def end_job(self, nzo: NzbObject):
Expand Down
17 changes: 7 additions & 10 deletions sabnzbd/nzbstuff.py
Expand Up @@ -1411,7 +1411,7 @@ def set_final_name_and_scan_password(self, name, password=None):
def pause(self):
self.status = Status.PAUSED
# Prevent loss of paused state when terminated
if self.nzo_id and not self.is_gone():
if self.nzo_id and not self.deleted:
self.save_to_disk()

def resume(self):
Expand Down Expand Up @@ -1459,6 +1459,7 @@ def remove_extrapar(self, parfile: NzbFile):
def prospective_add(self, nzf: NzbFile):
"""Add par2 files to compensate for missing articles"""
# Get some blocks!
return
if not nzf.is_par2:
for parset in self.extrapars:
# Due to strong obfuscation on article-level the parset could have a different name
Expand Down Expand Up @@ -1612,7 +1613,7 @@ def get_articles(self, server: Server, servers: List[Server], fetch_limit: int)
if sabnzbd.Downloader.highest_server(server):
nzf.finish_import()
# Still not finished? Something went wrong...
if not nzf.import_finished and not self.is_gone():
if not nzf.import_finished and not self.deleted:
logging.error(T("Error importing %s"), nzf)
nzf_remove_list.append(nzf)
nzf.nzo.status = Status.PAUSED
Expand Down Expand Up @@ -1910,7 +1911,7 @@ def set_action_line(self, action: Optional[str] = None, msg: Optional[str] = Non
def save_to_disk(self):
"""Save job's admin to disk"""
self.save_attribs()
if self.nzo_id and not self.is_gone():
if self.nzo_id and not self.deleted:
sabnzbd.filesystem.save_data(self, self.nzo_id, self.admin_path)

def save_attribs(self):
Expand All @@ -1926,7 +1927,7 @@ def load_attribs(self) -> Tuple[Optional[str], Optional[int], Optional[str]]:
attribs = sabnzbd.filesystem.load_data(ATTRIB_FILE, self.admin_path, remove=False)
logging.debug("Loaded attributes %s for %s", attribs, self.final_name)

# If attributes file somehow does not exists
# If attributes file somehow does not exist
if not attribs:
return None, None, None

Expand All @@ -1940,7 +1941,7 @@ def load_attribs(self) -> Tuple[Optional[str], Optional[int], Optional[str]]:
return attribs["cat"], attribs["pp"], attribs["script"]

@synchronized(NZO_LOCK)
def build_pos_nzf_table(self, nzf_ids):
def build_pos_nzf_table(self, nzf_ids: List[str]) -> Dict[int, NzbFile]:
pos_nzf_table = {}
for nzf_id in nzf_ids:
if nzf_id in self.files_table:
Expand All @@ -1951,7 +1952,7 @@ def build_pos_nzf_table(self, nzf_ids):
return pos_nzf_table

@synchronized(NZO_LOCK)
def cleanup_nzf_ids(self, nzf_ids):
def cleanup_nzf_ids(self, nzf_ids: List[str]):
for nzf_id in nzf_ids[:]:
if nzf_id in self.files_table:
if self.files_table[nzf_id] not in self.files:
Expand Down Expand Up @@ -2007,10 +2008,6 @@ def has_duplicates(self) -> Tuple[bool, bool]:

return res, series

def is_gone(self):
"""Is this job still going somehow?"""
return self.status in (Status.COMPLETED, Status.DELETED, Status.FAILED)

def __getstate__(self):
"""Save to pickle file, selecting attributes"""
dict_ = {}
Expand Down

0 comments on commit 171f049

Please sign in to comment.