# About
This notebook is unorganized but included in case it proves useful.  
It was used to manage todo lists of SRA runs to be processed.

# Prereqs

In [202]:
import collections
import configparser
import csv
import gzip
import math
import pathlib
import random
import subprocess
import time
import xml.etree.ElementTree

In [227]:
from html_table import Table, Row, Cell

In [142]:
MAIN_DIR = pathlib.Path('/nfs/brubeck.bx.psu.edu/scratch4/nick/overlaps/auto/ecoli')
CSV_PATH = MAIN_DIR/'sra.runinfo3.csv.gz'
XML_PATH = MAIN_DIR/'sra.docsum2.xml.gz'

## Reading metadata

In [3]:
def read_runinfo(csv_path):
  start = time.perf_counter()
  int_fields = {'spots', 'spots_with_mates', 'avgLength', 'size_MB', 'InsertSize', 'InsertDev', 'ProjectID', 'TaxID'}
  runs = {}
  header = {}
  empty_lines = 0
  header_lines = 0
  try:
    with gzip.open(csv_path, 'rt') as csv_file:
      for row in csv.reader(csv_file):
        if not header:
          header_lines += 1
          for i, value in enumerate(row):
            header[i] = value
          RunInfo = collections.namedtuple('RunInfo', header.values())
          continue
        if len(row) == 0:
          empty_lines += 1
          continue
        assert len(row) == len(header), (len(row), row)
        header_matches = 0
        value_dict = {}
        for i, raw_value in enumerate(row):
          if raw_value == header[i]:
            header_matches += 1
          else:
            if header[i] in int_fields:
              try:
                value = int(raw_value)
              except ValueError:
                if header[i] == 'InsertDev':
                  value = float(raw_value)
                else:
                  print(f'{header[i]}: {raw_value!r}', file=sys.stderr)
            else:
              value = raw_value
          value_dict[header[i]] = value
        if header_matches > 5:
          header_lines += 1
          continue
        runs[value_dict['Run']] = RunInfo(**value_dict)
  except EOFError:
    print('Incomplete gzip.', file=sys.stderr)
  elapsed = time.perf_counter() - start
  print(f'{len(runs)} runs in {elapsed:0.1f} seconds (headers: {header_lines}, empty lines: {empty_lines})')
  return runs

In [4]:
def read_xml(xml_path):
  start = time.perf_counter()
  summaries = {}
  with gzip.open(xml_path) as xml_file:
    # Note: This takes about 1-2GB of memory.
    tree = xml.etree.ElementTree.parse(xml_file)
  docsum = tree.getroot()
  for exp in docsum:
    for run in exp.find('./Runs'):
      acc = run.attrib['acc']
      summaries[acc] = exp
  elapsed = time.perf_counter() - start
  print(f'{len(summaries)} experiments in {round(elapsed)} seconds')
  return summaries

In [5]:
def rm_prefix(string, prefix):
  if string.startswith(prefix):
    return string[len(prefix):]
  else:
    return string

In [93]:
def truncate(string, max_len):
  if len(string) > max_len:
    return string[:max_len-1]+'…'
  else:
    return string

## Read in the metadata

In [6]:
RUNS = read_runinfo(CSV_PATH)

186022 runs in 11.7 seconds (headers: 1, empty lines: 0)


In [7]:
SUMMARIES = read_xml(XML_PATH)

186022 experiments in 43 seconds


## Reading todo/launched/done lists

In [8]:
def read_file_as_list(list_path):
  line_list = []
  with list_path.open() as list_file:
    for line_raw in list_file:
      line_list.append(line_raw.strip())
  return line_list

In [9]:
def subtract_lists(list1: list, *lists) -> list:
  """Return a copy of `list1` but remove any items present in the other lists."""
  new_list = []
  remove_these: Set[list] = set()
  for list2 in lists:
    remove_these |= set(list2)
  for item in list1:
    if item not in remove_these:
      new_list.append(item)
  return new_list

## Overlap calculations

In [10]:
def get_overlap(run, null=None, neg=True):
  readlen = run.avgLength//2
  if run.InsertSize == 0 or readlen == 0:
    return null
  overlap = min(readlen, run.InsertSize, readlen*2 - run.InsertSize)
  if neg:
    return overlap
  else:
    return max(0, overlap)

In [11]:
def sort_key(acc):
  """Get a sort key based on the overlap metadata."""
  run = RUNS[acc]
  if run.avgLength == 0:
    has_len = 0
  else:
    has_len = 1
  if run.InsertSize == 0:
    has_size = 0
  else:
    has_size = 1
  if run.InsertDev == 0:
    has_dev = 0
  else:
    has_dev = 1
  return has_len, has_size, has_dev, get_overlap(run), -run.InsertDev

## Reading group metadata

In [97]:
ABBREVIATIONS = {
  'European':'Euro.', 'Nucleotide':'Nuc.', 'Institute':'Inst.', 'Technology':'Tech.', 'The Pennsylvania':'Penn',
  'University':'Univ.', 'National':'Nat.', 'Department':'Dept.', 'Technological':'Tech.', 'Laboratory':'Lab.',
  'Biotechnology':'Biotech.',
}
class Study(collections.namedtuple('Study', ('title', 'study', 'center', 'lab', 'contact'))):
  __slots__ = ()
  @property
  def submitter(self):
    return (self.center, self.lab, self.contact)
  @classmethod
  def from_accession(cls, accession, summaries=SUMMARIES):
    try:
      experiment = summaries[accession]
    except KeyError:
      sys.stderr.write(f'Warning: Could not find XML summary for {accession}\n')
      raise
    return cls.from_experiment(experiment)
  @classmethod
  def from_experiment(cls, experiment):
    data = {'title':None, 'study':None, 'center':None, 'lab':None, 'contact':None}
    if not experiment:
      return cls(**data)
    data['title'] = experiment.find('./ExpXml/Summary/Title').text
    study_elem = experiment.find('./ExpXml/Study')
    data['study'] = study_elem.attrib.get('name')
    subm_elem = experiment.find('./ExpXml/Submitter')
    for field in 'center', 'lab', 'contact':
      data[field] = subm_elem.attrib.get(field+'_name')
    return cls(**data)
  def format_fields(self, max_len=None, null='?'):
    """Return a copy with the fields formatted for human reading."""
    strs = {}
    for field in self._fields:
      value = raw_value = getattr(self, field)
      if raw_value is None:
        value = null
      strs[field] = value
    # This is a common prefix that's lengthy and not too informative.
    strs['title'] = rm_prefix(strs['title'], 'Illumina MiSeq paired end sequencing; ')
    for field in 'center', 'lab', 'contact':
      value = raw_value = strs[field]
      # If it's all uppercase, make it titlecased to be easier to read.
      if len(raw_value) > 16 and raw_value == raw_value.upper():
        value = raw_value.title()
      for long, short in ABBREVIATIONS.items():
        value = value.replace(long, short)
      strs[field] = value
    if max_len is not None:
      for field, value in strs.items():
        strs[field] = truncate(value, max_len)
    return type(self)(**strs)

## Checking run completion

### Dealing with `progress.ini`

In [149]:
PROGRESS_TYPES = {
  'step':int, 'when':int, 'timestamp':int,
  'start_step':int, 'start_time':int, 'end_step':int, 'end_time':int, 'commit_time':int,
}
def read_progress(progress_path):
  raw_progress = read_config(progress_path, PROGRESS_TYPES)
  return convert_progress(raw_progress)

In [148]:
def read_config(config_path, types):
  data = {}
  config = configparser.ConfigParser(interpolation=None)
  try:
    config.read(config_path)
    for section in config.sections():
      for key, raw_value in config.items(section):
        if types and key in types:
          value = types[key](raw_value)
        else:
          value = raw_value
        try:
          data[section][key] = value
        except KeyError:
          data[section] = {key:value}
  except configparser.Error:
    logging.critical(f'Error: Invalid config file format in {config_path!r}.')
    raise
  return data

In [154]:
def convert_progress(progress):
  """Convert old progress structure to the new one, if necessary."""
  if any([section.startswith('run') for section in progress.keys()]):
    # It's the new format.
    return progress
  mapping = {
    ('start', 'step'): 'start_step',
    ('start', 'when'): 'start_time',
    ('end', 'step'): 'end_step',
    ('end', 'when'): 'end_time',
    ('version', 'timestamp'): 'commit_time',
    ('version', 'commit'): 'commit',
  }
  run0 = {}
  for section_name, section in progress.items():
    for key, value in section.items():
      new_key = mapping.get((section_name, key), f'{section_name}_{key}')
      run0[new_key] = value
  if run0:
    return {'run0':run0}
  else:
    return {}

In [182]:
def get_last_step(progress):
  section = get_last_section(progress)
  if section is not None:
    return section.get('end_step')

In [159]:
def get_last_section(progress):
  last_run = None
  last_section = None
  for name, section in progress.items():
    if name.startswith('run'):
      run = int(name[3:])
      if last_run is None or run > last_run:
        last_run = run
        last_section = section
  return last_section

In [179]:
def get_version(progress):
  section = get_last_section(progress)
  commit_time = section.get('commit_time')
  start_time = section.get('start_time')
  if (commit_time and commit_time < 1577854800) or (start_time and start_time < 1577854800):
    return 'old'
  else:
    return 'new'

### Checking other signals of completion

In [164]:
OUTFILES = (                 # Step
  'reads_1.fastq',           # 1
  'align.auto.bam',          # 2
  'errors.tsv',              # 3
  'seq-context.tsv',         # 4
  'seq-context-summary.tsv', # 5
  'analysis.tsv',            # 6
)

In [172]:
def get_last_step_by_file(run_dir, version='new'):
  """Look at the output files to see how far the pipeline actually got."""
  confirmed_step = None
  for step, filename in enumerate(OUTFILES, 1):
    path = run_dir/filename
    if path.is_file() and os.path.getsize(path) > 0:
      if version == 'old' and step == 6:
        confirmed_step = 5
      else:
        confirmed_step = step
  return confirmed_step

In [203]:
def found_no_errors(run_dir):
  summary_path = run_dir/'errors.summary.tsv'
  if summary_path.is_file():
    with summary_path.open() as summary_file:
      for line_num, line_raw in enumerate(summary_file):
        if line_num == 2:
          fields = line_raw.split('\t')
          if fields[0] == '0':
            return True
  else:
    cmd = ('grep', '-c', '^error', run_dir/'errors.tsv')
    result = subprocess.run(cmd, stdout=subprocess.PIPE, encoding='utf8')
    if result.stdout == '0\n':
      return True
  return False

In [211]:
def is_done(run_dir):
  progress = read_progress(run_dir/'progress.ini')
  version = get_version(progress)
  last_step_recorded = get_last_step(progress)
  last_step_seen = get_last_step_by_file(run_dir, version=version)
  if last_step_recorded is None or last_step_seen is None:
    return False
  last_step = min(last_step_recorded, last_step_seen)
  if last_step == 3:
    if found_no_errors(run_dir):
      return True
  if version == 'old':
    return last_step == 5
  else:
    return last_step == 6
  return False

## Display

In [128]:
def display_runs(accessions, max_len=None, runs=RUNS, summaries=SUMMARIES):
  rows = []
  for acc in accessions[:20]:
    run = runs[acc]
    model = rm_prefix(run.Model, 'Illumina ')
    study = Study.from_accession(acc, summaries=summaries).format_fields(max_len=max_len)
    rlen = run.avgLength//2
    row = (acc, model, run.size_MB, get_overlap(run), rlen, run.InsertSize, study.center, study.lab, study.contact)
    rows.append(row)
  header = (
    'Accession', 'Model', 'Size<br>(MB)', 'Overlap', 'Read<br>Length', 'Insert<br>Size',
    'Center', 'Lab', 'Contact'
  )
  return Table(rows, header=header)

# Generate new todo list

## Read in lists

In [13]:
everything = list(RUNS.keys())

In [14]:
launched = read_file_as_list(MAIN_DIR/'meta/launched.txt')
done     = read_file_as_list(MAIN_DIR/'meta/done.txt')

In [16]:
todo_unsorted = subtract_lists(everything, launched, done)

In [17]:
len(todo_unsorted)

184264

In [121]:
display_runs(todo_unsorted[:12]).render()

Accession,Model,Size (MB),Overlap,Read Length,Insert Size,Center,Lab,Contact
SRR12549494,NovaSeq 6000,123,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549500,NovaSeq 6000,140,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549499,NovaSeq 6000,132,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549498,NovaSeq 6000,137,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549493,NovaSeq 6000,133,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549492,NovaSeq 6000,121,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549497,NovaSeq 6000,139,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549496,NovaSeq 6000,139,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12549495,NovaSeq 6000,120,,149,0,Univ. of Illinois at Urbana-Champaign,Civil and Environmental Engineering,Yue Xing
SRR12541342,HiSeq 2500,278,,148,0,PHE,,


## Sort by overlap length

In [19]:
todo_overlap_sorted = sorted(todo_unsorted, key=sort_key, reverse=True)

In [122]:
display_runs(todo_overlap_sorted[:15]).render()

Accession,Model,Size (MB),Overlap,Read Length,Insert Size,Center,Lab,Contact
SRR1217262,HiSeq 2500,196,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217264,HiSeq 2500,180,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217273,HiSeq 2500,182,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217276,HiSeq 2500,175,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217277,HiSeq 2500,193,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217278,HiSeq 2500,164,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217279,HiSeq 2500,173,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217280,HiSeq 2500,153,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217281,HiSeq 2500,158,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
SRR1217284,HiSeq 2500,155,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay


## Reorder to diversify by lab group

In [125]:
def get_diverse_runs(accessions, runs=RUNS, summaries=SUMMARIES):
  """Reorder the input list to maximize the diversity in submitting group."""
  # Note: this relies on the new property of dicts to be ordered so that the first group used is the first that
  # appears in the input list.
  start = time.perf_counter()
  acc_by_group = divide_accessions_by_group(accessions, runs=runs, summaries=summaries)
  remaining_groups = {}
  for group, group_list in acc_by_group.items():
    remaining_groups[group] = group_list.copy()
#   print('  loop  new   used  total')
  # Keep looping through the list of groups until all the accessions are used.
  loop = 0
  used = used_last = 0
  while used < len(accessions):
    loop += 1
    newly_finished_groups = []
    for group, group_list in remaining_groups.items():
      yield group_list.pop(0)
      used += 1
      if len(group_list) == 0:
        newly_finished_groups.append(group)
    for group in newly_finished_groups:
      del remaining_groups[group]
#     if is_tenth(loop):
#       new = used - used_last
#       print(f'{loop:6d} {new:4d} {used:6d} {len(accessions):6d}')
    used_last = used
  elapsed = time.perf_counter() - start
  print(f'Done in {round(elapsed)} seconds')
def divide_accessions_by_group(accessions, runs=RUNS, summaries=SUMMARIES):
  """Divide the list of accessions into sublists by submitting group."""
  acc_by_group = collections.defaultdict(list)
  for accession in accessions:
    study = Study.from_accession(accession)
    acc_by_group[study.submitter].append(accession)
  return acc_by_group
def is_tenth(value):
  exp = int(math.log10(value))
  tenth = 10**exp
  return value % tenth == 0

In [126]:
todo_diversified = list(get_diverse_runs(todo_overlap_sorted))

Done in 6 seconds


In [131]:
display_runs(todo_diversified[:15], max_len=34).render()

Accession,Model,Size (MB),Overlap,Read Length,Insert Size,Center,Lab,Contact
SRR1217262,HiSeq 2500,196,200,239,200,Penn State Univ.,Mwangi Lab,Juan Antonio Raygoza Garay
DRR205447,MiSeq,2,168,182,196,NAGOYA,Dept. of Pathophysiological Lab. …,Dept. of Pathophysiological Lab. …
SRR3385574,MiSeq,89,150,300,150,Agricultural Biotech. Center,,Tibor Nagy
ERR1594001,HiSeq 2500,1126,120,149,120,Systems Biology - Harvard Medical…,Euro. Nuc. Archive,Euro. Nuc. Archive
ERR3537783,MiSeq,511,104,250,104,Wadsworth Center,Euro. Nuc. Archive,Euro. Nuc. Archive
SRR1344511,HiSeq 2000,671,101,101,101,Sabanci Univ.,Toprak Lab,Yusuf Tamer
SRR2227327,HiSeq 2500,2100,100,100,100,UCLA,Ren Sun's Lab,Tian-Hao Zhang
ERR779279,HiSeq 2500,1788,100,100,100,,Euro. Nuc. Archive,Euro. Nuc. Archive
SRR6674866,MiSeq,1304,100,300,500,NISC,,Morgan Park
SRR1231704,MiSeq,56,98,233,368,Univ. of Tech. Sydney,Darling Lab,Aaron Darling


## Create some model-specific todo lists

In [138]:
def filter_by_model(accessions, model, runs=RUNS):
  for accession in accessions:
    run = runs[accession]
    this_model = rm_prefix(run.Model, 'Illumina ')
    if this_model == model:
      yield accession

### <font style="background-color:pink">Write to the files</font>

In [143]:
with (MAIN_DIR/'meta/novaseq.todo.txt').open('w') as todo_file:
  for i, accession in enumerate(filter_by_model(todo_diversified, 'NovaSeq 6000')):
    print(accession, file=todo_file)
print(i+1)

1268


In [144]:
with (MAIN_DIR/'meta/miniseq.todo.txt').open('w') as todo_file:
  for i, accession in enumerate(filter_by_model(todo_diversified, 'MiniSeq')):
    print(accession, file=todo_file)
print(i+1)

161


# Misc management

## Clear out launched.txt

In [221]:
# Look at each run in launched.txt and check if it finished.
new_done = []
incomplete = []
start = time.perf_counter()
for i, accession in enumerate(launched, 1):
  if is_done(MAIN_DIR/'runs'/accession):
    new_done.append(accession)
  else:
    incomplete.append(accession)
elapsed = time.perf_counter() - start
print(f'{i:4d} in {round(elapsed):3d} sec: {len(new_done):4d} done, {len(incomplete):3d} incomplete.')

1438 in  98 sec: 1357 done,  81 incomplete.


### <font style="background-color:pink">Write to the files</font>

In [219]:
with (MAIN_DIR/'meta/done.txt').open('a') as done_file:
  for accession in new_done:
    print(accession, file=done_file)

In [220]:
with (MAIN_DIR/'meta/launched.txt').open('w') as launched_file:
  for accession in incomplete:
    print(accession, file=launched_file)

## Reset `launched.txt` to restart new run.
- After realizing I hadn't updated the meta reference file.

In [222]:
launched = read_file_as_list(MAIN_DIR/'meta/launched.txt')
novaseq_todo = read_file_as_list(MAIN_DIR/'meta/todo/novaseq.txt')
miniseq_todo = read_file_as_list(MAIN_DIR/'meta/todo/miniseq.txt')

In [226]:
new_launched = subtract_lists(launched, novaseq_todo, miniseq_todo)
len(new_launched)

0