In [1]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib
import json
from pathlib import Path
import tarfile
import time
from urllib.error import URLError
from urllib.request import urlopen, urlretrieve

In [2]:
from tqdm import tqdm

In [3]:
from entrez_tools.db.assembly import seq_url_from_ftppath

## Setup

In [4]:
# Notebook identity: a date stamp (apparently YYMMDD) followed by a task slug.
DATESTR = '211115'
NBNAME = ''.join((DATESTR, '-download-genomes'))

In [5]:
tmpdir = Path('tmp')

# Scratch area for downloaded genome files. parents=True also creates tmp/
# itself, so this cell works from a fresh checkout where tmp/ does not exist.
genomes_dir = tmpdir / 'genomes'
genomes_dir.mkdir(parents=True, exist_ok=True)

In [6]:
# Input files; the path suggests they come from the 211111-find-genomes notebook.
infiles = {
    'esummaries_archive': Path('data-intermediate/211111-find-genomes/assembly-summaries.tar.gz'),
}

In [20]:
# Output location for archived results; created on first run.
archive_dir = Path('archive')
if not archive_dir.is_dir():
    archive_dir.mkdir()

In [22]:
# NOTE(review): this archive is stamped 211109 while DATESTR is 211115;
# confirm the date in the filename is intentional.
outfiles = {
    'genomes': archive_dir / '211109-ncbi-representative-genomes.tar.gz',
}

## Load summary data

In [7]:
# Map each assembly UID to its esummary record. The archive holds one
# "<uid>.json" file per assembly.
summaries = dict()

with tarfile.open(infiles['esummaries_archive']) as archive:
    for member in tqdm(archive.getmembers()):
        # extractfile() returns None for members that are neither regular
        # files nor links (e.g. directory entries); skip those rather than
        # crashing on ``with None``.
        fobj = archive.extractfile(member)
        if fobj is None:
            continue

        with fobj:
            summary = json.load(fobj)

        uid = summary['uid']
        # Each file must be named after the UID it contains, and UIDs must be unique.
        assert member.name == uid + '.json', (member.name, uid)
        assert uid not in summaries, uid

        summaries[uid] = summary

100%|██████████| 14388/14388 [00:00<00:00, 15231.38it/s]


In [8]:
# Number of assembly records loaded from the archive.
n_summaries = len(summaries)
n_summaries

14388

## Find URLs

In [9]:
# Keep assemblies that have a RefSeq FTP directory and record their download
# URL; collect the UIDs of those without one in ``skipped``.
assembly_info = {}
skipped = []

for uid, summary in summaries.items():
    refseq_path = summary['ftppath_refseq']

    if refseq_path:
        assembly_info[uid] = {
            'acc': summary['assemblyaccession'],
            'ftppath': refseq_path,
            'url': seq_url_from_ftppath(refseq_path),
        }
    else:
        skipped.append(uid)

In [10]:
# Skipped assemblies as (uid, accession) pairs.
list(zip(skipped, (summaries[uid]['assemblyaccession'] for uid in skipped)))

[('10898551', 'GCA_003114835.2'),
 ('11011431', 'GCF_017493175.2'),
 ('11411721', 'GCA_003382565.3'),
 ('11411751', 'GCA_016765655.2'),
 ('11411881', 'GCA_016806835.2')]

## Download

In [11]:
# Local destination for each assembly's genome, named by its accession.
files = dict()
for uid, info in assembly_info.items():
    files[uid] = genomes_dir / (info['acc'] + '.fasta.gz')

In [12]:
def download_atomic(url, dest: Path):
    """Download ``url`` to ``dest`` without leaving a partial file at ``dest``.

    Data is written to a sibling ``<name>.part`` file, which is renamed onto
    ``dest`` only after ``urlretrieve`` returns. An interrupted download
    therefore never passes the ``is_file()`` check used to pick what to fetch.
    """
    part = dest.with_name(dest.name + '.part')
    urlretrieve(url, part)
    part.replace(dest)


to_download = [uid for uid, f in files.items() if not f.is_file()]
download_errors = dict()

with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_uid = {
        executor.submit(download_atomic, assembly_info[uid]['url'], files[uid]): uid
        for uid in to_download
    }

    # Inspect every future so that exceptions raised in worker threads are
    # reported instead of being silently dropped.
    for future in tqdm(as_completed(future_to_uid), total=len(future_to_uid)):
        exc = future.exception()
        if exc is not None:
            download_errors[future_to_uid[future]] = exc

if download_errors:
    first_uid, first_exc = next(iter(download_errors.items()))
    raise RuntimeError('{} of {} downloads failed (e.g. uid {}: {!r}); re-run this cell to retry'.format(
        len(download_errors), len(to_download), first_uid, first_exc))

0it [00:00, ?it/s]


## Validate checksums

### Get checksums

In [13]:
def checksum_from_response(data: str, seq_fname: str):
    """Return the checksum listed for ``seq_fname`` in an ``md5checksums.txt`` listing.

    Each entry line of ``data`` is ``<checksum> <path>``, where the path of
    interest is ``./<seq_fname>``. Blank or single-field lines are ignored.

    Raises
    ------
    RuntimeError
        If no line refers to ``./<seq_fname>``.
    """
    target = './' + seq_fname
    for line in data.splitlines():
        fields = line.split(maxsplit=1)
        # Blank/malformed lines would otherwise crash the two-way unpacking.
        if len(fields) != 2:
            continue
        checksum, file = fields
        if file == target:
            return checksum

    raise RuntimeError('not found: {}'.format(seq_fname))

def get_checksum(ftppath: str):
    """Fetch ``md5checksums.txt`` from the directory ``ftppath`` and return the
    checksum recorded for the sequence file that ``seq_url_from_ftppath`` names."""
    listing_url = ftppath + '/md5checksums.txt'
    with urlopen(listing_url) as response:
        listing = response.read().decode()

    # The sequence file name is the last path component of its download URL.
    seq_fname = seq_url_from_ftppath(ftppath).rsplit('/', 1)[1]

    return checksum_from_response(listing, seq_fname)

In [14]:
# Checksums from earlier runs are cached on disk; start empty if no cache exists.
checksums_file = tmpdir / 'checksums.json'
checksums = dict()
if checksums_file.is_file():
    with open(checksums_file) as fh:
        checksums = json.load(fh)

In [15]:
def worker(ftppath, max_tries=5, base_delay=.125):
    """Fetch the checksum for ``ftppath``, retrying transient network errors.

    Makes up to ``max_tries`` attempts, sleeping ``base_delay * 2**attempt``
    seconds between them. ``OSError`` is caught rather than only its subclass
    ``URLError``. Socket errors that escape urlopen's ``URLError`` wrapping
    (e.g. while reading the response body) are therefore retried too.

    Raises
    ------
    ValueError
        If ``max_tries`` is less than 1.
    OSError
        The last error, re-raised once every attempt has failed.
    """
    if max_tries < 1:
        raise ValueError('max_tries must be at least 1')

    errs = []
    for attempt in range(max_tries):
        try:
            return get_checksum(ftppath)
        except OSError as e:
            errs.append(e)
            if attempt == max_tries - 1:
                # Include the path: many workers run concurrently, and the
                # errors alone don't identify which assembly failed.
                print(ftppath, errs)
                raise
            time.sleep(base_delay * 2**attempt)
    

# Fetch checksums for every assembly not already cached. Failures are recorded
# per UID instead of aborting the loop, so every successful result is kept in
# ``checksums``. A summary error is raised at the end.
checksum_errors = dict()

with ThreadPoolExecutor(max_workers=20) as executor:
    future_to_uid = {
        executor.submit(worker, info['ftppath']): uid
        for uid, info in assembly_info.items()
        if uid not in checksums
    }

    for future in tqdm(as_completed(future_to_uid), total=len(future_to_uid)):
        uid = future_to_uid[future]
        try:
            checksums[uid] = future.result()
        except Exception as e:
            checksum_errors[uid] = e

if checksum_errors:
    first_uid, first_exc = next(iter(checksum_errors.items()))
    raise RuntimeError('{} of {} checksum fetches failed (e.g. uid {}: {!r}); re-run this cell to retry only those'.format(
        len(checksum_errors), len(future_to_uid), first_uid, first_exc))

0it [00:00, ?it/s]


In [16]:
# Persist the checksum cache so later runs can reuse it.
serialized = json.dumps(checksums)
with open(checksums_file, 'w') as out:
    out.write(serialized)

### Check

In [17]:
def md5_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the lowercase hex MD5 digest of ``path``, read in ``chunk_size`` blocks."""
    digest = hashlib.md5()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


# Compare each downloaded file with its published checksum. Hashing in Python
# avoids depending on an external `md5sum` binary. A missing file is counted
# as a failure explicitly, instead of parsing a shell error message as a digest.
failed = []

for uid, path in tqdm(files.items()):
    if not path.is_file() or md5_of_file(path) != checksums[uid]:
        failed.append(uid)

100%|██████████| 14383/14383 [02:50<00:00, 84.36it/s]


In [18]:
# Delete files that failed validation so the download cell (which skips
# existing files) fetches them again. A failed entry may have no file on disk
# at all, so its absence is tolerated.
for uid in failed:
    try:
        files[uid].unlink()
    except FileNotFoundError:
        pass

In [19]:
# Stop here if any file failed validation. Failed files are removed by the
# previous cell, so re-running the download section fetches them again.
assert not failed, '{} file(s) failed checksum validation; re-run the download section'.format(len(failed))