<a href="https://colab.research.google.com/github/Data-Science-and-Data-Analytics-Courses/UniMelb---Database-Systems-Information-Modelling-INFO90002_2019_SM1/blob/master/Packages/Files.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import os
from contextlib import contextmanager
from functools import partial
import requests
from urllib.parse import urlsplit
from pathlib import Path
import re
from fnmatch import fnmatch, filter as fnfilter
import io
import tarfile, gzip, zipfile
import json
import shutil
import numpy as np
from IPython.display import display

# Reading modes supported by tarfile.open (https://docs.python.org/3/library/tarfile.html#tarfile.open)
TARREADS = {".tar.gz": "r:gz", ".tgz": "r:gz", ".tar.Z": "r:gz",
           ".tar.bz2": "r:bz2", ".tbz2": "r:bz2",
           ".tar.lzma": "r:xz", ".tlz": "r:xz", ".tar.xz": "r:xz", ".txz": "r:xz"}

# General

In [0]:
def download(url, dest=".", name="", force=False, options="-#"):
  """
  Download url into dest
  name: if provided, rename downloaded
  force: if True, overwrite existing resource
  options: supported by curl (https://curl.haxx.se/docs/manpage.html)
  """
  
  rurl = urlsplit(url)
  dest = Path(dest).resolve()
  downloaded = dest / (name or Path(rurl.path).name)
  
  # Download
  if not downloaded.exists() or force:
    !curl $options --create-dirs "{rurl.geturl()}" -o "{downloaded}"

  return downloaded

if __name__ == "__main__":
  url="http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz"
  download(url)

######################################################################## 100.0%


In [0]:
def download2(url, dest=".", name="", force=False, authfile=""):
  """
  Download url into dest
  name: if provided, rename downloaded
  force: if True, overwrite existing resource
  authfile: path to .json authentication file, containing text: {"username": "<username>", "password": "<password>"}
  """
  
  rurl = urlsplit(url)
  dest = Path(dest).resolve()
  downloaded = dest / (name or Path(rurl.path).name)
  
  if not force and downloaded.exists():
    return downloaded
  
  # Authentication information
  auth_info = None
  if authfile:
    with open(authfile) as f:
      auth_info = json.load(f)
      
  # Retrieve url
  auth = None
  if auth_info:
    auth = (auth_info["username"], auth_info["password"])
  r = requests.get(rurl.geturl(), auth=auth, stream=True)

  # Write file
  downloaded.parent.mkdir(parents=True, exist_ok=True)
  with open(downloaded, 'wb') as f:
    r.raw.decode_content = True
    shutil.copyfileobj(r.raw, f)
  
  return downloaded

if __name__ == "__main__":
  url="https://prod-edxapp.edx-cdn.org/assets/courseware/v1/c9921c9218806f05d9480052212b77dc/asset-v1:MITx+18.6501x+3T2018+type@asset+block/lectureslides_chap1_annot.pdf"
  download2(url)

# Tar

In [0]:
@contextmanager
def readtar(fpath, pattern="*.*", peek=False):
  """
  Read pattern in fpath
  peek: if True, show matches before reading
  """

  fpath = Path(fpath)
  tpat = re.compile(r"(?P<stem>.*)(?P<suffix>\.t.*)$") # e.g., .tar.gz, .tgz, .tar.Z, .tar.bz2, .tbz2, .tar.lzma, .tlz, .tar.xz, .txz
  name = tpat.match(fpath.name)

  t = tarfile.open(fpath, mode=TARREADS[name["suffix"]])
  try:
    matches = fnfilter(t.getnames(), pattern)
    if peek:
      display("Read", matches)
      input("Press Enter to continue... ")
    yield map(t.extractfile, matches)
  finally:
    t.close()

In [0]:
def runtar(url, dest=".", patterns=None, top=True, force=False, peek=False):
  """
  Retrieve url and untar patterns into dest
  top: if True, create top directory if not exist
  force: if True, overwrite existing members
  peek: if True, show members before extraction
  """
  
  rurl = urlsplit(url)
  dest = Path(dest).resolve()
  tpat = re.compile(r"(?P<stem>.*?)(?P<suffix>\.t.*?)?$") # e.g., .tar.gz, .tgz, .tar.Z, .tar.bz2, .tbz2, .tar.lzma, .tlz, .tar.xz, .txz
  name = tpat.match(Path(rurl.path).name)

  # Retrieve
  r = requests.get(url)
  t = tarfile.open(fileobj=io.BytesIO(r.content), mode=TARREADS[name["suffix"]])

  # Members
  members = np.array(t.getnames())
  items = np.vectorize(Path)(members) # Path objects
  extract = members

  # Top directory
  tops = map(lambda m: m.parts[0], items) # tops of directory tree
  if top and len(set(tops))>=2:
    dest = dest / name["stem"]

  # Matches
  if patterns:
    matched = list(map(lambda m: any(map(m.match, patterns)), items))
    extract = extract[matched]
  
  # Existing members
  extracted = list(map(dest.joinpath, extract))
  if not force:
    exist = list(map(Path.exists, extracted))
    extract = extract[np.logical_not(exist)]

  # Show members
  if peek:
    display("Extract", extract, f"into {dest}")
    input("Press Enter to continue... ")
  
  # Extract
  list(map(partial(t.extract, path=dest), extract))
  
  return extracted # extracted/existing

if __name__ == "__main__":
  url = "https://cvml.ist.ac.at/AwA/AwA-base.tar.bz2"
  runtar(url, peek=True)

'Extract'

array([], dtype='<U55')

'into /content'

Press Enter to continue... 


# Zip

In [0]:
def runzip(url, dest=".", patterns=None, top=True, force=False, peek=False):
  """
  Retrieve url and unzip patterns into dest
  top: if True, create top directory if not exist
  force: if True, overwrite existing members
  peek: if True, show members before extraction
  """
  
  rurl = urlsplit(url)
  dest = Path(dest).resolve()
  zpat = re.compile(r"(?P<stem>.*?)(?P<suffix>\.zip.*?)?$") # e.g., .zip
  name = zpat.match(Path(rurl.path).name)

  # Retrieve
  r = requests.get(url)
  z = zipfile.ZipFile(io.BytesIO(r.content))

  # Members
  members = np.array(z.namelist())
  items = np.vectorize(Path)(members) # Path objects
  extract = members

  # Top directory
  tops = map(lambda m: m.parts[0], items) # tops of directory tree
  if top and len(set(tops))>=2:
    dest = dest / name["stem"]

  # Matches
  if patterns:
    matched = list(map(lambda m: any(map(m.match, patterns)), items))
    extract = extract[matched]
  
  # Existing members
  extracted = list(map(dest.joinpath, extract))
  if not force:
    exist = list(map(Path.exists, extracted))
    extract = extract[np.logical_not(exist)]

  # Show members
  if peek:
    display("Extract", extract, f"into {dest}")
    input("Press Enter to continue... ")
  
  # Extract
  list(map(partial(z.extract, path=dest), extract))
  
  return extracted # extracted/existing

if __name__ == "__main__":
  url = "https://prod-edxapp.edx-cdn.org/assets/courseware/v1/e7b080a506546a859b50d03f9d0705f5/asset-v1:UCSanDiegoX+DSE220x+1T2019+type@asset+block/DSE220x_PA7.zip"
  runzip(url, peek=True)

'Extract'

array([], dtype='<U84')

'into /content/DSE220x_PA7'

Press Enter to continue... 
