In [None]:
# default_exp core

In [None]:
#hide
from nbdev.showdoc import show_doc

# Core

> Core tools for working with storage.

In [None]:
#export
from abc import ABC,abstractmethod
from configparser import ConfigParser
from pathlib import Path
import azure.storage.blob,azure.core.exceptions
import boto3
import shutil,re
from typing import List,Tuple,Optional,Union

In [None]:
#hide
from fastcore.test import *
from configparser import SectionProxy

In [None]:
#export
def read_config(section_name:str=None,config_name:str='secrets/settings.ini'):
    """Read `config_name` and return the whole parser or a single section

    - `section_name=None`: the populated `ConfigParser` is returned
    - otherwise: the named section is returned as a plain `dict` (built from
      `ConfigParser.items`, so values from `[DEFAULT]` are included)

    Raises `Exception` if `section_name` is not a section of the parsed config.
    """
    ini_file=Path(config_name)
    parser=ConfigParser()
    parser.read(ini_file)
    if section_name is not None:
        if section_name in parser:
            return dict(parser.items(section_name))
        raise Exception(f'Error: [{section_name}] section not found in {ini_file}')
    return parser

In [None]:
# Without a section name the parser itself is returned (its sections are proxies);
# with a section name a plain dict is returned
for config,expected_type in [(read_config(),ConfigParser),
                             (read_config()['DEFAULT'],SectionProxy),
                             (read_config('DEFAULT'),dict)]:
    assert isinstance(config,expected_type)
# Sections can be read from any settings file
local_cwd_config=read_config('local_cwd',config_name='test/settings.ini')
assert local_cwd_config['storage_type']=='local'

In [None]:
#export
def parse_dataset_archive_name(name:str) -> Optional[Tuple[str,...]]:
    """Returns (name,version) if `name` is a dataset archive name, `None` otherwise

    A dataset archive name has the form `[name].[major].[minor].[patch].zip`, where
    `[name]` may contain word characters, whitespace, `.`, `/` and `-`.
    """
    # Raw string: `\.`, `\s`, `\w` and `\d` are regex escapes, not Python string escapes.
    # `fullmatch` (instead of `match` with `^...$`) also rejects a trailing newline,
    # which `$` would otherwise let through.
    match = re.fullmatch(r'([\./\s\w-]+)\.(\d+\.\d+\.\d+)\.zip',name)
    return None if match is None else match.group(1,2)

In [None]:
# Names of the form `[name].[major].[minor].[patch].zip` are split into (name,version)
for archive_name,expected in [
        ('dsetname.0.0.1.zip', ('dsetname', '0.0.1')),
        ('dsetname.txt.0.2.1.zip', ('dsetname.txt', '0.2.1')),
        ('path/to/dsetname.0.0.1.zip', ('path/to/dsetname', '0.0.1')),
        ('//path/to/dsetname.0.0.1.zip', ('//path/to/dsetname', '0.0.1'))]:
    test_eq(expected, parse_dataset_archive_name(archive_name))
# Anything else is not a dataset archive name
for other_name in ['dsetname.0.0.1.csv', 'dsetname.0.1.zip', 'dsetname.0.a.1.csv',
                   '.0.0.1.zip', '0.0.1.csv', 'dsetname.0.0.1']:
    test_eq(None, parse_dataset_archive_name(other_name))

In [None]:
#export 
def parse_dataset_archive_version(version:str) -> List[int]:
    """Returns [major,minor,patch] if `version` is a valid dataset archive version

    Raises `ValueError` if `version` is not three dot separated groups of digits.
    """
    # Raw string so that `\d` and `\.` reach the regex engine untouched; `fullmatch`
    # (instead of `match` with `^...$`) also rejects a trailing newline.
    match = re.fullmatch(r'(\d+)\.(\d+)\.(\d+)',version)
    if match is None: raise ValueError(f'Invalid version: {version}')
    return [int(s) for s in match.group(1,2,3)]

In [None]:
# Valid versions are split into their integer parts
for version,expected in [('0.1.2',[0,1,2]),('5.4.3',[5,4,3])]:
    test_eq(expected,parse_dataset_archive_version(version))
# Malformed versions are rejected
for bad_version in ['0.1.2.','0.1']:
    test_fail(lambda: parse_dataset_archive_version(bad_version))

In [None]:
#export
def next_version(versions:List[str]=None,increment:str='patch'):
    """Return the version that should follow the last version in `versions`

    - `versions`: existing versions; only the last entry is used. `None` or an empty
      list means that there are no versions yet, so counting starts from "0.0.0"
    - `increment`: "major", "minor" or "patch"; incrementing a part resets the parts after it to 0

    Raises `ValueError` if `increment` is unknown or the last version is invalid.
    """
    # `not versions` covers both `None` and `[]` (indexing `[][-1]` would raise IndexError)
    v=[0,0,0] if not versions else parse_dataset_archive_version(versions[-1])
    if increment=='patch':
        v[2]+=1
    elif increment=='minor':
        v[1]+=1
        v[2]=0
    elif increment=='major':
        v[0]+=1
        v[1]=0
        v[2]=0
    else:
        raise ValueError(f'Unknown increment: {increment}')
    return f'{v[0]}.{v[1]}.{v[2]}'

In [None]:
# No existing versions: counting starts from 0.0.0 and "patch" is the default increment
test_eq('0.0.1',next_version(None))
for increment,expected in [('minor','0.1.0'),('major','1.0.0')]:
    test_eq(expected,next_version(None,increment))
# Existing versions: only the last one is incremented
test_eq('33.55.67',next_version(['2.4.60','33.55.66']))
test_eq('3.0.0',next_version(['2.4.60'],'major'))
# Unknown increments and invalid versions are rejected
for bad_args in [(None,'beta'),(['2.4.60','33.55.66a'],)]:
    test_fail(lambda: next_version(*bad_args))

In [None]:
#export
def append_or_replace_verion(name,version):
    "Return `[name].[version]`; raises `ValueError` if `version` is not a valid dataset archive version"
    # called for validation only - the parsed parts are not needed here
    parse_dataset_archive_version(version)
    return name+'.'+version

In [None]:
#export
def make_dataset_archive_folder(
        path:str, name:str, versions:List[str]=None, version:str='patch') -> str:
    """Copy `path`/`name` to a new, versioned dataset archive folder in `path`

    - `name`: file or folder, relative to `path`, to archive
    - `versions`: existing versions, used when `version` is an increment
    - `version`: "major", "minor" or "patch" to derive the next version from `versions`,
      or a version literal (e.g. "1.0.45")

    Returns the archive folder as the string `[path]/[name].[version]`.
    Raises `FileNotFoundError` if `path`/`name` does not exist and `FileExistsError`
    if the archive folder already exists.
    """
    root=Path(path)
    source=root/name
    if not source.exists():
        raise FileNotFoundError(f'{source} not found')
    # an increment is resolved against `versions`; a literal is only validated
    is_increment=version in ('major','minor','patch')
    if is_increment: version=next_version(versions,version)
    else: parse_dataset_archive_version(version)

    target=root/'.'.join((name,version))
    if target.exists():
        raise FileExistsError(f'Archive folder {target} exists')
    if not source.is_file():
        shutil.copytree(source,target)
    else:
        # a single file is copied into its own, newly created, archive folder
        target.mkdir(parents=True)
        shutil.copy(source,target)
    # TODO: create/update manifest
    # mf describes archive contents, data owner etc (TODO: manifest details TBC)
    return f'{path}/{name}.{version}'

In [None]:
#hide
def _rmtree(p):
    "Recursively delete the folder `p`; a folder that does not exist is silently ignored"
    try:
        shutil.rmtree(p)
    except FileNotFoundError:
        return None

In [None]:
def _make_local_test_data():
    test_files=['a/b/test_data.2.0.0.txt','test_data.txt']
    for i in reversed(range(3)): test_files.insert(1,f'sub/test_data.0.0.{i}.txt')
    for i,f in enumerate(test_files):
        f='test/local_path/'+f
        Path(f).parent.mkdir(parents=True,exist_ok=True)
        with open(f, 'w') as _file: _file.write(f'a little bit of data {i}')
    return test_files

In [None]:
# start from a clean slate, then create the local test files
for p in ['test/local_path','test/storage_area']: _rmtree(p)
_make_local_test_data()

# no existing versions: the default "patch" increment gives version 0.0.1
test_eq('test/local_path/test_data.txt.0.0.1',
        make_dataset_archive_folder('test/local_path','test_data.txt'))
# a "minor" increment on top of 2.4.6 gives 2.5.0
test_eq('test/local_path/test_data.txt.2.5.0',
        make_dataset_archive_folder('test/local_path','test_data.txt',['2.4.6'],'minor'))
# folders can be archived as well as files
test_eq('test/local_path/sub.0.0.1',
        make_dataset_archive_folder('test/local_path','sub'))
# TODO: check archive folder contents

In [None]:
#export
class StorageClientABC(ABC):
    """Defines functionality common to all storage clients

    Concrete clients implement `download` and `upload`; the listing and dataset archive
    helpers defined here build on those two methods and on the `storage_area` and
    `local_path` entries of `self.config`.
    """

    def __init__(self, storage_name:str, config_name:str='secrets/settings.ini'):
        "Create a new storage client using the `storage_name` section of `config_name`"
        self.config=read_config(storage_name,config_name=config_name)

    def _ls(self, p:Path, result:List[str], len_path_prefix:int=None):
        """Append the name of every file below `p` (recursively) to `result`

        Names use `/` as separator. By default they are relative to `p`; if
        `len_path_prefix` is given, that many leading characters (plus one separator)
        are stripped from each file's full path instead.
        """
        def _files(folder:Path):
            for child in folder.iterdir():
                if child.is_dir(): yield from _files(child)
                else: yield child
        for f in _files(p):
            if len_path_prefix is None:
                # `relative_to` (rather than slicing off `len(str(p))+1` characters) also works
                # when `p` is "." or a filesystem root, where children do not start with `str(p)+"/"`
                result.append(f.relative_to(p).as_posix())
            else:
                result.append(str(f).replace('\\','/')[len_path_prefix+1:])

    def ls(self, what:str='storage_area',name_starts_with:str=None) -> List[str]:
        """Return a sorted list containing the names of files in either `storage_area` or `local_path`

        - `what`: the config key of the folder to list
        - `name_starts_with`: if given, only names starting with this prefix are returned

        The folder is created if it does not exist, so listing a missing folder returns `[]`.
        """
        result: List[str]=[]
        p=Path(self.config[what])
        p.mkdir(parents=True,exist_ok=True)
        self._ls(p,result)
        if name_starts_with is not None:
            result=[r for r in result if r.startswith(name_starts_with)]
        return sorted(result)

    @abstractmethod
    def download(self, filename:str) -> Path:
        "Copy `filename` from `storage_area` to `local_path`"

    @abstractmethod
    def upload(self, filename:str, overwrite=False) -> Union[Path,str]:
        "Copy `filename` from `local_path` to `storage_area`"

    def _sort_by_dataset_archive_version(self,version):
        "Sort key for versions: (major,minor,patch) as ints, or (-1,-1,-1) if `version` can't be parsed"
        try: return tuple(parse_dataset_archive_version(version))
        # only swallow "not a version" errors - a bare `except` would also hide
        # unrelated problems such as KeyboardInterrupt
        except (ValueError,TypeError): return (-1,-1,-1)

    def ls_versions(self, name:str, what:str='storage_area') -> Union[List[str],None]:
        """Return a list containing all versions of the specified archive `name`

        Versions are sorted from lowest to highest; `None` is returned if there are none.
        """
        files=[parse_dataset_archive_name(f) for f in self.ls(what)]
        result=[f[1] for f in files if f is not None and f[0]==name]
        if not result: return None
        return sorted(result, key=self._sort_by_dataset_archive_version)

    def upload_dataset(self, name:str, version:str='patch') -> Union[Path,str]:
        """Create a new dataset archive and upload it to `storage_area`

        - `name`: file or folder in `local_path` (without the `local_path` prefix)
        - `version`: "major", "minor" or "patch" to create the next version, or a version literal

        Returns whatever `upload` returns for the new `[name].[version].zip` archive.
        """
        archive_folder=make_dataset_archive_folder(
                self.config['local_path'],name,self.ls_versions(name),version)
        # creates `[archive_folder].zip` next to the archive folder
        shutil.make_archive(archive_folder,'zip',archive_folder)
        # `upload` expects a name relative to `local_path`, so strip that prefix (and the `/`)
        return self.upload(f"{archive_folder[len(self.config['local_path'])+1:]}.zip")

    def download_dataset(self, name:str, version:str='latest', overwrite:bool=False) -> Path:
        """Download a dataset archive from `storage_area` and extract it to `local_path`

        - `version`: a version literal, or "latest" for the highest version in `storage_area`
        - `overwrite`: if `False`, an existing `[local_path]/[name].[version]` folder is returned as is

        Returns the folder that the archive was extracted to.
        Raises `ValueError` if "latest" is requested but no versions exist.
        """
        if version=='latest':
            versions=self.ls_versions(name)
            if not versions:
                raise ValueError('latest version requested but no versions exist in storage area')
            version=versions[-1]
        dst=Path(self.config['local_path'])/f'{name}.{version}'
        if dst.exists() and not overwrite: return dst
        # NOTE(review): `overwrite` is not passed on to `download`, so a zip that is already
        # in `local_path` may be re-used rather than fetched again - confirm this is intended
        archive=self.download(f'{name}.{version}.zip')
        shutil.unpack_archive(str(archive),dst)
        return dst

In [None]:
# render the documentation header for `upload_dataset`
show_doc(StorageClientABC.upload_dataset)

<h4 id="StorageClientABC.upload_dataset" class="doc_header"><code>StorageClientABC.upload_dataset</code><a href="__main__.py#L44" class="source_link" style="float:right">[source]</a></h4>

> <code>StorageClientABC.upload_dataset</code>(**`name`**:`str`, **`version`**:`str`=*`'patch'`*)

Create a new dataset archive and upload it to `storage_area`

- `name`
    - file or folder name
    - which must exist in "local_path"
    - without "local_path" prefix
    - e.g. if
        - "local_path" is "~/storage_tools/test/local_path"
        - and you want to upload "~/storage_tools/test/local_path/test_data.txt" as a dataset
        - you would pass the name "test_data.txt"
- `version`
    - "major", "minor" or "patch" to automatically create a new version or
    - version literal that matches `\d+\.\d+\.\d+` (e.g. "1.0.45")

`upload_dataset` will:
- create a folder `[local_path]/[name].[version]`
    - if this folder already exists, an error will be raised
- copy the file or folder contents (and all sub-folders) to this folder
- create a manifest in this folder
- create a zip archive, called `[name].[version].zip`, of this folder
- upload the zip archive to remote storage

Why no `overwrite` option?
- It is not expected that archives will need to be overwritten
    - as we want to be able to re-run old experiments using the data as it was
- bad archives could be deleted via storage API (e.g. `storage_client.client.delete_blob('test.0.0.1.zip')`) or via storage browsers
    - we might want to add a soft delete, archive status etc to handle this kind of thing?

In [None]:
#export
class LocalStorageClient(StorageClientABC):
    """Storage client that uses the local filesystem for both `storage_area` and `local_path`"""

    def _cp(self,from_key,to_key,filename,overwrite=False):
        """Copy `filename` from the `from_key` folder to the `to_key` folder and return the new path

        Raises `FileExistsError` if the destination exists and `overwrite` is `False`.
        """
        source=Path(self.config[from_key])/filename
        target=Path(self.config[to_key])/filename
        if target.exists():
            if not overwrite:
                raise FileExistsError(f'{target} exists and overwrite=False')
        target.parent.mkdir(parents=True,exist_ok=True)
        shutil.copy(source,target)
        return target

    def download(self,filename,overwrite=False):
        "Copy `filename` from `storage_area` to `local_path`; an existing local file is kept unless `overwrite`"
        try:
            self._cp('storage_area','local_path',filename,overwrite)
        except FileExistsError:
            # already downloaded - fall through and return the existing file
            pass
        local_folder=Path(self.config['local_path'])
        return local_folder/filename

    def upload(self,filename,overwrite=False):
        "Copy `filename` from `local_path` to `storage_area`; raises `FileExistsError` unless `overwrite`"
        uploaded=self._cp('local_path','storage_area',filename,overwrite)
        return uploaded

`LocalStorageClient` will most often be used for local testing.

In [None]:
# a client can be created directly from a section of a settings file
storage_client=LocalStorageClient('local_test','test/settings.ini')
# the section's settings are available via `config`
assert storage_client.config['storage_type']=='local'

In [None]:
#export
class AzureStorageClient(StorageClientABC):
    """Storage client that uses Azure for `storage_area` and the local filesystem for `local_path`"""

    @property
    def client(self):
        "Container client for the configured `container`; created on first use, then cached"
        if not hasattr(self,'_client'):
            service_client=azure.storage.blob.BlobServiceClient.from_connection_string(
                self.config['conn_str'],self.config['credential'])
            self._client=service_client.get_container_client(self.config['container'])
        return self._client

    def ls(self,what='storage_area',name_starts_with=None):
        "Return a sorted list of blob names in the container, or of file names in `local_path`"
        if what=='local_path': return super().ls(what,name_starts_with)
        return sorted(b.name for b in self.client.list_blobs(name_starts_with))

    def download(self,filename,overwrite=False):
        """Copy the blob `filename` from the container to `local_path` and return the local path

        An existing local file is returned as is unless `overwrite` is `True`.
        """
        p=Path(self.config['local_path'])/filename
        if p.exists() and not overwrite: return p
        # Fetch the blob before opening the local file: if the download fails, no empty
        # file is left behind to be mistaken for a completed download on the next call
        data=self.client.download_blob(filename).readall()
        p.parent.mkdir(parents=True,exist_ok=True)
        with open(p, 'wb') as f:
            f.write(data)
        return p

    def upload(self,filename,overwrite=False):
        """Copy `filename` from `local_path` to the container

        Returns `[storage_type]:[container]:[filename]`.
        Raises `FileExistsError` if Azure reports that the blob already exists.
        """
        p=Path(self.config['local_path'])/filename
        try:
            with open(p, 'rb') as f:
                self.client.upload_blob(filename,f,overwrite=overwrite)
            return f"{self.config['storage_type']}:{self.config['container']}:{filename}"
        except azure.core.exceptions.ResourceExistsError as e:
            # chain the original error so that the Azure details are not lost
            raise FileExistsError(f'{e}\noverwrite=False') from e

TODO: think about using `~/.aws/credentials` rather than storing AWS keys in settings.ini.

In [None]:
#export
class AwsStorageClient(StorageClientABC):
    """Storage client that uses AWS for `storage_area` and the local filesystem for `local_path`"""

    @property
    def client(self):
        "boto3 client for the configured `service_name`; created on first use, then cached"
        if not hasattr(self,'_client'):
            self._client=boto3.client(service_name=self.config['service_name'],
                                      aws_access_key_id=self.config['aws_access_key_id'],
                                      aws_secret_access_key=self.config['aws_secret_access_key'])
        return self._client

    def ls(self,what='storage_area',name_starts_with=None):
        """Return a sorted list of object keys in the bucket, or of file names in `local_path`

        Objects with a size of 0 are left out of the bucket listing.
        """
        if what=='local_path': return super().ls(what,name_starts_with)
        args=dict(Bucket=self.config['bucket'])
        # AWS raises an error for `Prefix=None`, so only pass a prefix when we have one
        if name_starts_with is not None: args['Prefix']=name_starts_with
        result=[]
        # a single `list_objects_v2` response is limited in size, so page through all of them
        for page in self.client.get_paginator('list_objects_v2').paginate(**args):
            # pages without any keys have no 'Contents' entry
            result.extend(o['Key'] for o in page.get('Contents',[]) if o['Size']>0)
        return sorted(result)

    def download(self,filename,overwrite=False):
        """Copy the object `filename` from the bucket to `local_path` and return the local path

        An existing local file is returned as is unless `overwrite` is `True`.
        """
        p=Path(self.config['local_path'])/filename
        if p.exists() and not overwrite: return p
        p.parent.mkdir(parents=True,exist_ok=True)
        self.client.download_file(
                Filename='/'.join([self.config['local_path'],filename]),
                Bucket=self.config['bucket'],
                Key=filename)
        return p

    def upload(self,filename,overwrite=False):
        """Copy `filename` from `local_path` to the bucket

        Returns `[storage_type]:[bucket]:[filename]`.
        Raises `FileExistsError` if the object exists and `overwrite` is `False`.
        """
        result=f"{self.config['storage_type']}:{self.config['bucket']}:{filename}"
        # `ls` returns every key that starts with `filename`; we need an exact match
        if not overwrite and filename in self.ls(name_starts_with=filename):
            raise FileExistsError(f'{result} exists and overwrite=False')
        self.client.upload_file(
                Filename='/'.join([self.config['local_path'],filename]),
                Bucket=self.config['bucket'],
                Key=filename)
        return result

Ideally, we would use the same property keys in settings.ini as the boto3 API but `ConfigParser` converts keys to lower case by default.

So if boto3 has a parameter called `Bucket`, we need `bucket=a-bucket-name` in settings.ini.

#hide
If we pass `None` to `Prefix` when listing objects (e.g. `list_objects_v2(Prefix=None)`) AWS raises an error.
This is why we have to create and unpack the `args` dictionary when we call `list_objects_v2`.

In [None]:
#export
def new_storage_client(storage_name,config_name='secrets/settings.ini'):
    """Returns a storage client based on the configured `storage_type`

    `storage_type` is read from the `storage_name` section of `config_name` and must be
    "local", "azure" or "aws"; any other value raises `ValueError`.
    """
    storage_type=read_config(storage_name,config_name=config_name)['storage_type']
    client_types={'local':LocalStorageClient,
                  'azure':AzureStorageClient,
                  'aws':AwsStorageClient}
    if storage_type not in client_types:
        raise ValueError(f'Unknown storage_type: {storage_type}')
    return client_types[storage_type](storage_name, config_name)

In [None]:
# creating a client for the `gcp_dummy` section is expected to fail
test_fail(lambda: new_storage_client('gcp_dummy','test/settings.ini'))

In [None]:
# start with no storage area and no local folder
for p in ['test/local_path','test/storage_area']: _rmtree(p)

storage_client=new_storage_client('local_test','test/settings.ini')
assert isinstance(storage_client,LocalStorageClient)
assert storage_client.config['storage_type']=='local'
test_eq([],storage_client.ls())
test_eq([],storage_client.ls('local_path'))

# files created locally show up in `local_path` only
test_files=_make_local_test_data()
test_eq([],storage_client.ls())
test_eq(test_files,storage_client.ls('local_path'))
test_eq(['a/b/test_data.2.0.0.txt'],storage_client.ls('local_path','a/b'))
test_eq([],storage_client.ls('local_path','does/not/exist'))

# after uploading, the same files are listed in the storage area
for f in test_files: storage_client.upload(f)
test_eq(test_files,storage_client.ls())
test_eq(['a/b/test_data.2.0.0.txt'],storage_client.ls(name_starts_with='a/b'))
test_eq([],storage_client.ls(name_starts_with='does_no_exist'))
test_eq(test_files,storage_client.ls('local_path'))
_rmtree('test/local_path')
test_eq([],storage_client.ls('local_path'))

# downloading restores the local files
for f in test_files: storage_client.download(f)
test_eq(test_files,storage_client.ls('local_path'))
# `Path.read_text` closes the file again, unlike a bare `open(...).read()`
test_eq('a little bit of data 4',Path('test/local_path/test_data.txt').read_text())

# a local file is only replaced by a download when `overwrite` is requested
with open('test/local_path/test_data.txt', 'w') as _file: _file.write('upd')
test_eq('upd',Path('test/local_path/test_data.txt').read_text())
storage_client.download('test_data.txt')
test_eq('upd',Path('test/local_path/test_data.txt').read_text())
storage_client.download('test_data.txt',True)
test_eq('a little bit of data 4',Path('test/local_path/test_data.txt').read_text())

# uploading over an existing file fails unless `overwrite` is requested
test_fail(lambda: storage_client.upload('test_data.txt'))
storage_client.upload('test_data.txt',True)

test_eq(None,storage_client.ls_versions('this/does/not/exitst'))

In [None]:
def _t(expected,upload_name,version='patch'):
    "Upload `upload_name` as a dataset and check that the result is `test/storage_area/[expected]`"
    test_eq(Path(f'test/storage_area/{expected}'),storage_client.upload_dataset(upload_name,version))
# first versions of a file, a folder and a file in a sub-folder
_t('test_data.txt.0.0.1.zip','test_data.txt')
_t('sub.0.0.1.zip','sub')
_t('sub/test_data.0.0.2.txt.0.0.1.zip','sub/test_data.0.0.2.txt')
# an explicit version literal, followed by the default "patch" increment on top of it
_t('a.3.0.0.zip','a','3.0.0')
_t('a.3.0.1.zip','a')
# TODO: check zip contents
# remove the local archive folders so that `download_dataset` does not return early
_rmtree('test/local_path/a.3.0.0')
_rmtree('test/local_path/a.3.0.1')
# "latest" is the default version; a specific version can be requested too
test_eq(Path('test/local_path/a.3.0.1'),storage_client.download_dataset('a'))
test_eq(Path('test/local_path/a.3.0.0'),storage_client.download_dataset('a','3.0.0'))
# TODO: check folder contents

In [None]:
# the configured `storage_type` of a section decides which client class is created
for section,client_type in [('azure_dummy',AzureStorageClient),('aws_dummy',AwsStorageClient)]:
    storage_client=new_storage_client(section,'test/settings.ini')
    assert isinstance(storage_client,client_type)

In [None]:
# remove the folders that the tests above created
for test_folder in ('test/local_path','test/storage_area'):
    _rmtree(test_folder)