In [None]:
#default_exp download

In [None]:
#export
from kesscore.base import *
from functools import wraps
import torchvision.datasets.utils as tvdu

import tarfile,zipfile,warnings,subprocess

In [None]:
#export
def try_extract(fname, dest):
    "Extract `fname` to folder `dest` using `tarfile` or `zipfile`."
    try:    tarfile.open(fname, 'r:gz').extractall(dest); return
    except: pass
    try:    zipfile.ZipFile(fname     ).extractall(dest); return
    except: pass
    raise Exception(f'Unrecognized archive: {fname}')

In [None]:
# export
def _untar_data(url, name, dest, download_func, extract_func=try_extract):
    'Download to `dest/name.tar.gz` and extract to folder `dest/name` and deletes it'
    fname   = Path(dest)/f'{name}.tar.gz'
    destext = Path(dest)/name
    if destext.exists(): warnings.warn(f'Folder already exist, so will not download again: {destext}')
    else:
        if fname.exists(): warnings.warn(f'File already exist, will use it as is. ({fname})\n' +
                                         'If you wish, you might delete the file and run the program again instead.')
        else: download_func(url, fname)
        extract_func(fname, dest=destext)
        fname.unlink()
    return destext

In [None]:
#export
def _split2dirfile(path):
    spath,ppath = str(path),Path(path)
    if spath[-1:] in '\\/' or ppath.is_dir(): return ppath,None
    return ppath.parent,ppath.name

In [None]:
#export
def download_drive(fid, fname):
    root,filename = _split2dirfile(fname)
    return tvdu.download_file_from_google_drive(file_id=fid, root=root, filename=filename)

def download_general(url, fname):
    'download using torchvision.datasets.utils.download_url.'
    root,filename = _split2dirfile(fname)
    return tvdu.download_url(url=url, root=root, filename=filename)

def download_wget(url, fname):
    'download file using wget as a subprocess'
    command = ['wget',str(url),'-O',str(fname.absolute())]
    print(
f'''Running wget, this will not show any output and might take some time. 
If you wish, you can run it youself instead using the following command:
>>> {" ".join(command)}''')
    return subprocess.run(command, check=True)

In [None]:
#export
@delegates(_untar_data)
def untar_drive(fid, name, dest, **kwargs):
    '''download https://drive.google.com/uc?id={fid} to `dest/name.tar.gz` and 
    extract it to `dest/name`. Will not download if folder exist.'''
    return _untar_data(fid, name, dest, download_drive, **kwargs)

@delegates(_untar_data)
def untar_general(url, name, dest, **kwargs):
    '''download `url` to `dest/name.tar.gz` using torchvision.datasets.utils.download_url
    and extract it to `dest/name`. Will not download if folder exist.'''
    return _untar_data(url, name, dest, download_general, **kwargs)

@delegates(_untar_data)
def untar_wget(url, name, dest, **kwargs):
    '''download `url` to `dest/name.tar.gz` using wget and extract it to `dest/name`. Will not download if folder exist.'''
    return _untar_data(url, name, dest, download_wget, **kwargs)

@wraps(untar_wget)
def untar_dropbox(url, name, dest, **kwargs):
    assert url.endswith('dl=0') or url.endswith('dl=1'), 'dropbox links need to end with dl=0 (single file) or dl=1(folder)'
    return untar_wget(url, name, dest, **kwargs)

In [None]:
from nbdev.sync import notebook2script; notebook2script()

Converted 00_functional.ipynb.
Converted 01_tests.ipynb.
Converted 02_tensor.ipynb.
Converted 03_images.ipynb.
Converted 04_random.ipynb.
Converted 05_domainadaptation.ipynb.
Converted 06_mlp.ipynb.
Converted 07_download.ipynb.
Converted index.ipynb.
