In [1]:
# https://docs.dask.org/en/stable/custom-graphs.html

In [2]:
# Reload imported .py modules before executing each cell, so edits to them
# take effect in this kernel without a restart.
%load_ext autoreload
%autoreload 2

In [3]:
import hashlib
import importlib.util
import os
import shutil
import sys
import time
from datetime import datetime
from ftplib import FTP
from urllib.parse import urlparse

import pandas as pd
import requests
#import extract_uniprot_data as ex

In [4]:
# Root directory for downloads and processing logs; override via KGBIO_CACHE.
CACHE = os.environ.get('KGBIO_CACHE', '../cache')
os.makedirs(CACHE, exist_ok=True)

In [5]:
# Table of resources to fetch, one row per resource with the columns
# resource, url, filename, processor. Defaults to the dev table; point
# KGBIO_RESOURCES at another file (e.g. resources.csv) to switch sets.
RESOURCES_CSV = os.getenv('KGBIO_RESOURCES', default='resources_dev1.csv')
# keep_default_na=False keeps empty cells as '' (downstream code tests
# `== ''`) instead of converting them to NaN.
resources = pd.read_csv(RESOURCES_CSV, keep_default_na=False)

In [6]:
# Preview the first rows of the resource table.
resources.head()

Unnamed: 0,resource,url,filename,processor
0,NcbiTaxonomy,https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/new_...,,ProcessNcbiTaxonomy
1,NcbiGenes,https://ftp.ncbi.nlm.nih.gov/gene/DATA/gene_in...,,ProcessNcbiGenes
2,TremblViruses,https://ftp.uniprot.org/pub/databases/uniprot/...,,ProcessUniProt
3,UniprotViruses,https://ftp.uniprot.org/pub/databases/uniprot/...,,ProcessUniProt
4,UniprotHuman,https://ftp.uniprot.org/pub/databases/uniprot/...,,ProcessUniProt


In [7]:
# class Download():
#     def __init__(self,
#                  url,
#                  file_names,
#                  resource,
#                  cache_path):
#         self.url = url
#         self.file_names = file_names
#         self.resource = resource
#         self.cache_path = cache_path
#         self.timestamp = get_timestamp()
        
#     def get_timestamp():
#         now = datetime.now()
#         return now.strftime("%Y-%m-%d-%H%M%S")

In [8]:
def get_timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD-HHMMSS``.

    The fields are fixed-width and ordered from most to least significant,
    so comparing two of these strings lexicographically orders them
    chronologically.
    """
    return datetime.now().strftime("%Y-%m-%d-%H%M%S")

In [9]:
# filename = f'{datetime.utcnow().timestamp()}.parquet'
# print(filename)

In [10]:
def get_sha256_hash(path):
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 64 KiB chunks, so large downloads are hashed without
    loading them fully into memory.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        while True:
            chunk = handle.read(65536)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()

In [11]:
def download(resource, url, filename, cache_dir):
    """Fetch ``url`` into a new timestamped directory under ``cache_dir/resource``.

    Parameters
    ----------
    resource : str
        Resource name; used as the sub-directory of ``cache_dir``.
    url : str
        Source location. Supported prefixes: ``http://``, ``https://``,
        ``ftp://`` and ``file:///``.
    filename : str
        Local file name. If empty, the last segment of the URL path is used
        (any query string or fragment is ignored).
    cache_dir : str
        Root cache directory.

    Returns
    -------
    dict
        ``filepath`` (the downloaded file), ``download_time`` (seconds until
        the transfer finished; hashing is not included), ``sha256_hash``
        (hex digest of the file) and ``resource_path`` (the timestamped
        directory that holds it).

    Raises
    ------
    ValueError
        If the URL does not start with a supported prefix.
    Exceptions raised by the transfer helpers are re-raised after the
    timestamped directory created for this attempt has been removed.
    """
    start = time.time()
    path = os.path.join(cache_dir, resource, get_timestamp())
    os.makedirs(path, exist_ok=True)

    # If no file name is specified, derive it from the URL path so a query
    # string or fragment does not end up in the local file name.
    if filename == '':
        filename = os.path.basename(urlparse(url).path)

    filepath = os.path.join(path, filename)
    print('Downloading:', url)
    try:
        if url.startswith(('https://', 'http://')):
            download_http(url, filepath)
        elif url.startswith('ftp://'):
            download_ftp(url, filepath, filename)
        elif url.startswith('file:///'):
            download_file(url, filepath)
        else:
            raise ValueError(f'unsupported URL scheme: {url!r}')
    except BaseException:
        # Do not leave an empty or partial timestamped directory behind:
        # get_checksum treats the lexicographically newest directory as the
        # previous run.
        shutil.rmtree(path, ignore_errors=True)
        raise

    duration = time.time() - start
    sha256_hash = get_sha256_hash(filepath)

    return {'filepath': filepath, 'download_time': duration,
            'sha256_hash': sha256_hash, 'resource_path': path}

In [12]:
def download_file(url, dst):
    """Copy the local file named by a ``file:///`` URL to ``dst``.

    ``shutil.copy2`` is used, so file metadata is copied along with the
    content.

    NOTE(review): the source path is the text after the ``file:///`` prefix,
    so the leading ``/`` of an RFC 8089 absolute path is dropped. For
    example, ``file:///data/x.csv`` is read as ``data/x.csv`` relative to the
    current working directory. Confirm this is how resource URLs are written.
    """
    pieces = url.split('file:///')
    source = pieces[1]
    shutil.copy2(source, dst)

In [13]:
def download_http(url, path, timeout=None):
    """Stream an HTTP(S) resource to the file ``path``.

    Parameters
    ----------
    url : str
        ``http://`` or ``https://`` URL to fetch.
    path : str
        Destination file; overwritten if it exists.
    timeout : float or tuple, optional
        Passed straight to ``requests.get``. The default ``None`` waits
        indefinitely (the previous behaviour); set a value to avoid hanging
        on a stalled server.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status. Without this check, the
        error page would be saved (and hashed) as if it were the resource.
    """
    print("downloading http:", url, " to", path)
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(path, 'wb') as f:
            # Copy the raw response stream to disk in chunks.
            shutil.copyfileobj(r.raw, f)

In [14]:
def download_ftp(url, path, filename):
    """Download one file over anonymous FTP.

    The directory to change into comes from the URL path. The file that is
    retrieved is ``filename`` inside that directory; ``download`` passes the
    URL's last path segment unless the resource table names a file
    explicitly.

    Parameters
    ----------
    url : str
        ``ftp://host[:port]/dir/.../name`` URL.
    path : str
        Local destination file.
    filename : str
        Name of the remote file to RETR inside the URL's directory.
    """
    parsed = urlparse(url)
    ftp_server = parsed.hostname
    # The directory part of the URL path, without the leading '/', so it is
    # resolved relative to the login directory. It is empty for files at the
    # server root.
    ftp_path = parsed.path.rsplit('/', 1)[0].lstrip('/')
    print(ftp_server, ftp_path, path, filename, flush=True)

    # The context manager closes the control connection even if cwd or RETR
    # raises; on exit it sends QUIT and falls back to close().
    with FTP() as ftp:
        ftp.connect(ftp_server, parsed.port or 21)
        ftp.login()
        if ftp_path:
            ftp.cwd(ftp_path)
        with open(path, 'wb') as f:
            ftp.retrbinary('RETR %s' % filename, f.write)

In [15]:
def get_checksum(path):
    """Return the SHA-256 recorded by the most recent run for a resource.

    ``path`` is the resource's cache directory and is created if missing.
    Its sub-directories are compared as strings, and the greatest one is
    taken as the latest run. The first ``sha256_hash`` value in that
    directory's ``log.csv`` is returned.

    Returns ``''`` when there are no sub-directories, or when the latest
    one's log cannot be read (missing file or column, empty log, ...).
    """
    os.makedirs(path, exist_ok=True)
    with os.scandir(path) as entries:
        newest = max((entry.path for entry in entries if entry.is_dir()), default='')

    if not newest:
        return ''

    try:
        log = pd.read_csv(os.path.join(newest, 'log.csv'))
        return log['sha256_hash'].iloc[0]
    except Exception:
        # Best effort: an unreadable log simply means "no known checksum".
        return ''

In [16]:
def process(record, cache_dir):
    """Download one resource and run its processor if the data changed.

    The checksum of the fresh download is compared with the one logged by
    the most recent previous run (via ``get_checksum``).
    - If they differ, the processor runs and a ``log.csv`` with the result
      is written next to the download.
    - If they match, the fresh copy is deleted.

    Parameters
    ----------
    record : dict
        One row of the resource table with keys ``resource``, ``url``,
        ``filename`` and ``processor``.
    cache_dir : str
        Root cache directory.

    Returns
    -------
    dict
        On success, the dict returned by ``download`` plus
        ``processing_time``. This is 0 when the checksum was unchanged; note
        that in that case ``filepath`` points at the deleted copy.
        If the download fails: ``{'resource': ..., 'status':
        'download failed', 'error': repr(exception)}``.

    Raises
    ------
    ModuleNotFoundError
        If ``processor`` names a module that cannot be imported.
    AttributeError
        If that module has no attribute named ``processor``.
    """
    resource = record['resource']
    url = record['url']
    filename = record['filename']
    processor = record['processor']
    path = os.path.join(cache_dir, resource)
    print("resource path:", path)
    chsum = get_checksum(path)

    try:
        result = download(resource, url, filename, cache_dir)
    except Exception as exc:
        # Report the failure in the result instead of aborting the batch.
        return {'resource': resource, 'status': 'download failed', 'error': repr(exc)}

    print("old checksum:", chsum)
    print("new checksum:", result['sha256_hash'])

    if chsum == result['sha256_hash']:
        # Upstream data unchanged: drop the redundant copy just downloaded.
        shutil.rmtree(result['resource_path'])
        result['processing_time'] = 0
        return result

    print("Processing:", resource, processor)
    start = time.time()
    if processor != '':
        extractor_cls = _load_processor(processor)
        extractor = extractor_cls(result['filepath'], fileformat='parquet', compression='snappy')
        extractor.extract_data()
    end = time.time()
    result['processing_time'] = end - start
    print("Processing time:", end - start)

    # Persist this run's metadata next to the download; get_checksum reads
    # the sha256_hash column of this file on the next run.
    df = pd.DataFrame([result])
    df.to_csv(os.path.join(result['resource_path'], 'log.csv'))
    return result


def _load_processor(name):
    """Import module ``name`` and return its attribute of the same name.

    The processor class is expected to live in a module named after it.
    ``importlib.import_module`` reuses an already-imported module from
    ``sys.modules`` and raises ``ModuleNotFoundError`` if none is found.
    """
    module = importlib.import_module(name)
    return getattr(module, name)

https://man7.org/linux/man-pages/man5/crontab.5.html

Crontab time fields and examples (from crontab(5)):

    field          allowed values
    -----          --------------
    minute         0-59
    hour           0-23
    day of month   1-31
    month          1-12 (or names, see below)
    day of week    0-7 (0 or 7 is Sunday, or use names)

    CRON_TZ=Japan
    # run five minutes after midnight, every day
    5 0 * * *       $HOME/bin/daily.job >> $HOME/tmp/out 2>&1
    # run at 2:15pm on the first of every month -- output mailed to paul
    15 14 1 * *     $HOME/bin/monthly
    # run at 10 pm on weekdays, annoy Joe
    0 22 * * 1-5    mail -s "It's 10pm" joe%Joe,%%Where are your kids?%
    23 0-23/2 * * * echo "run 23 minutes after midn, 2am, 4am ..., everyday"
    5 4 * * sun     echo "run at 5 after 4 every sunday"

https://stackoverflow.com/questions/3061/calling-a-function-of-a-module-by-using-its-name-a-string

```python
import os

os.system('python3 home/pathtoscript/script.py Hello')
```

https://blog.finxter.com/python-exec/

https://stackoverflow.com/questions/5362771/how-to-load-a-module-from-code-in-a-string

https://docs.python.org/3/library/importlib.html#importing-programmatically

https://stackoverflow.com/questions/13562037/python-tar-file-how-to-extract-file-into-stream
```python
import tarfile

# `path` is the .tar/.tar.gz archive to read
with tarfile.open(path) as tf:
    for entry in tf:  # list each entry one by one
        fileobj = tf.extractfile(entry)
        # fileobj is now an open file object. Use `.read()` to get the data.
        # alternatively, loop over `fileobj` to read it line by line.
```

In [17]:
from dask.distributed import Client, progress

# Local Dask cluster: 4 workers with 1 thread and a 4 GiB memory limit each,
# so at most four tasks run concurrently.
client = Client(
    n_workers=4,
    threads_per_worker=1,
    memory_limit="4 GiB",
)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:55586,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:55611,Total threads: 1
Dashboard: http://127.0.0.1:55612/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:55591,
Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-du6sjqmx,Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-du6sjqmx

0,1
Comm: tcp://127.0.0.1:55603,Total threads: 1
Dashboard: http://127.0.0.1:55606/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:55589,
Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-6d8pq154,Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-6d8pq154

0,1
Comm: tcp://127.0.0.1:55602,Total threads: 1
Dashboard: http://127.0.0.1:55604/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:55590,
Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-uvv1kd8b,Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-uvv1kd8b

0,1
Comm: tcp://127.0.0.1:55608,Total threads: 1
Dashboard: http://127.0.0.1:55609/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:55592,
Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-yfocjfv7,Local directory: /Users/Peter/GitRepositories/kg-bio/notebooks/dask-worker-space/worker-yfocjfv7


In [18]:
import dask.bag as db

# Turn the resource table into a bag of plain dicts (one per row, keyed by
# column name) so the rows can be mapped over in parallel.
b = db.from_sequence(resources.to_dict('records'))
# Peek at the first element to check the record layout.
b.take(1)

({'resource': 'NcbiTaxonomy',
  'url': 'https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/new_taxdump/new_taxdump.tar.gz',
  'filename': '',
  'processor': 'ProcessNcbiTaxonomy'},)

In [19]:
# Run every resource through `process` on the cluster. The computed
# per-resource results (paths, timings, checksums, or a failure status) are
# kept in `results` instead of being discarded. Bag.map forwards the
# cache_dir keyword to every call.
start = time.time()
result = b.map(process, cache_dir=CACHE)
results = result.compute()
end = time.time()
print('Total time:', end - start)

Total time: 770.4393548965454


In [20]:
# Close the Dask client; since it started its own local cluster, this also
# shuts that cluster down.
client.close()

resource path: ../cache/NcbiGenes
Downloading: https://ftp.ncbi.nlm.nih.gov/gene/DATA/gene_info.gz
downloading http: https://ftp.ncbi.nlm.nih.gov/gene/DATA/gene_info.gz  to ../cache/NcbiGenes/2022-05-15-171900/gene_info.gz
old checksum: 4e03d27ada24334299d17f6c49c745c63ffb4f95e113ec18a7257cba6e6600be
new checksum: 4e03d27ada24334299d17f6c49c745c63ffb4f95e113ec18a7257cba6e6600be
resource path: ../cache/TremblViruses
Downloading: https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/taxonomic_divisions/uniprot_trembl_viruses.xml.gz
downloading http: https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/taxonomic_divisions/uniprot_trembl_viruses.xml.gz  to ../cache/TremblViruses/2022-05-15-171900/uniprot_trembl_viruses.xml.gz
old checksum: 857f0d4af66b7e4b1a9b52e37d0fa4cd7d53e4380a03c014dd369978b74a268c
new checksum: 857f0d4af66b7e4b1a9b52e37d0fa4cd7d53e4380a03c014dd369978b74a268c
resource path: ../cache/UniprotViruses
Downloading: https://ftp.u