In [160]:
# default_exp core

# blobster

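> A small wrapper around Azure Blob Storage for listing, creating and deleting containers and blobs, and for moving pandas DataFrames and local files in and out of blob storage.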

In [161]:
#hide
from nbdev.showdoc import *

In [162]:
#export
import io
import zipfile
from os import listdir
from os.path import isfile, join
from pathlib import Path

import pandas as pd
import fastcore
from fastcore.foundation import *
import azure
from azure.storage.blob import BlockBlobService

class AzureBlobStorage:
    """Holds the credentials and client handle for one Azure storage account.

    Further methods (``load_credentials``, ``connect``, ...) are attached to
    this class in later cells via ``@patch``.

    Attributes are set in ``__init__``:
    ``account`` and ``key`` come from the credential file, ``is_connected``
    starts as ``False`` and ``blob_service`` starts as ``None``.
    """

    def __init__(self, credential_file):
        """Read account name and key from ``credential_file``.

        Parameters
        ----------
        credential_file: str
            Path of the JSON file handed to ``load_credentials``
        """
        # The values are stored exactly as returned, including falsy ones;
        # the former if/else pairs assigned the same value in both branches.
        self.account, self.key = self.load_credentials(credential_file)
        self.is_connected = False
        self.blob_service = None

In [163]:
#export
@patch
def load_credentials(self:AzureBlobStorage, credential_file):
    """Read the storage account name and key from a JSON file

    Parameters
    ----------
    credential_file: str
        Path of a JSON file that pandas can parse into a frame with the
        columns ``account`` and ``key``

    Returns
    -------
    (account, key): tuple
        The first value of the ``account`` column and of the ``key`` column
    """
    frame = pd.read_json(credential_file)
    account_name = frame['account'].values[0]
    account_key = frame['key'].values[0]
    return account_name, account_key

### Loading credentials
Edit `blob_storage_credentials.json` and enter your storage account name and account key. The file is read when `AzureBlobStorage` is instantiated.

In [164]:
# Build the wrapper; the constructor reads account name and key from the JSON file.
azure_blob_storage = AzureBlobStorage(credential_file='blob_storage_credentials.json')

In [165]:
#export
@patch
def connect(self:AzureBlobStorage):
    """Open a client for the storage account held by this instance

    Builds a ``BlockBlobService`` from ``self.account`` and ``self.key``,
    stores it in ``self.blob_service`` and sets ``self.is_connected`` to True.
    """
    service = BlockBlobService(account_name=self.account, account_key=self.key)
    self.blob_service = service
    self.is_connected = True

### Connect to Azure Blob Storage
Once the credentials have been loaded (the constructor calls `load_credentials`),
a connection can be established by calling the `connect` method.

In [166]:
# Create the BlockBlobService client and mark the wrapper as connected.
azure_blob_storage.connect()

In [167]:
#export
@patch
def list_all_containers(self:AzureBlobStorage):
    """List the name of every container in the storage account

    Returns
    -------
    container_names: list
        the ``name`` of each container reported by the blob service
    """
    return [entry.name for entry in self.blob_service.list_containers()]


In [168]:
# Display the names of all containers in the connected storage account.
azure_blob_storage.list_all_containers()

['vfu-analytics-myhr-employees',
 'vfu-analytics-opco',
 'vfu-analytics-raw-data',
 'vfu-analytics-raw-data-courses',
 'vfu-analytics-raw-data-ga-course-launch',
 'vfu-analytics-raw-data-ga-course-rating',
 'vfu-analytics-raw-data-ga-dashboard-launch',
 'vfu-analytics-raw-data-ga-footerlinks',
 'vfu-analytics-raw-data-ga-global-navigation',
 'vfu-analytics-raw-data-ga-home-academy-sections',
 'vfu-analytics-raw-data-ga-notification',
 'vfu-analytics-raw-data-ga-ofcourse-launch',
 'vfu-analytics-raw-data-ga-ofcourse-pagehits',
 'vfu-analytics-raw-data-ga-pagehits',
 'vfu-analytics-raw-data-ga-preference-skills-added',
 'vfu-analytics-raw-data-ga-preference-skills-removed',
 'vfu-analytics-raw-data-ga-saveforlater',
 'vfu-analytics-raw-data-ga-searchbox',
 'vfu-analytics-raw-data-ga-searchpage',
 'vfu-analytics-raw-data-ga-traffic-sources',
 'vfu-analytics-raw-data-ga-typeahead',
 'vfu-analytics-raw-data-ga-users',
 'vfu-analytics-raw-data-nps',
 'vfu-analytics-raw-data-ratings',
 'vfu-a

In [169]:
#export
@patch
def delete_container(self:AzureBlobStorage, container_name):
    """Remove one container from the storage account.

    The request is sent with ``fail_not_exist=False``.

    Parameters
    ----------
    container_name: str
        Name of the container to remove

    Returns
    -------
    None: NoneType
    """
    service = self.blob_service
    service.delete_container(container_name=container_name, fail_not_exist=False)

In [170]:
#export
@patch
def make_container(self:AzureBlobStorage, container_name):
    """Make specific container. First check if container already exists

    Existence is probed by listing the container's blobs. Only the
    "resource missing" error triggers creation; any other failure
    (authentication, network, ...) now propagates to the caller instead of
    being mistaken for a missing container.

    Parameters
    ----------
    container_name: str
        The name of the container that shall be created

    Returns
    -------
    None: NoneType
    """
    try:
        self.blob_service.list_blobs(container_name)
    except azure.common.AzureMissingResourceHttpError:
        # The container does not exist yet and must be created.
        self.blob_service.create_container(container_name)

In [171]:
#export
@patch
def list_all_blobs(self:AzureBlobStorage, container_name):
    """List the name of every blob in a container

    Parameters
    ----------
    container_name: str
        The name of the container to inspect

    Returns
    -------
    blob_names: list
        the ``name`` of each blob reported for the container
    """
    return [entry.name for entry in self.blob_service.list_blobs(container_name)]

In [172]:
#export
@patch
def delete_blobs(self:AzureBlobStorage, container_name):
    """Remove every blob stored in the given container.

    If listing the container raises ``AzureMissingResourceHttpError`` the
    call returns without deleting anything.

    Parameters
    ----------
    container_name: str
        Name of the container to empty

    Returns
    -------
    None: NoneType
    """
    service = self.blob_service
    try:
        listing = service.list_blobs(container_name)
    except azure.common.AzureMissingResourceHttpError:
        # Container not found: there is nothing to delete.
        return
    for entry in listing:
        service.delete_blob(container_name, entry.name)

In [173]:
#export
@patch
def delete_blob(self:AzureBlobStorage, container_name, blob_name):
    """Delete a single blob from a container.

    Parameters
    ----------
    container_name: str
        Name of the container that holds the blob
    blob_name: str
        Name of the blob to delete

    Returns
    -------
    None: NoneType
    """
    service = self.blob_service
    service.delete_blob(container_name, blob_name)

In [174]:
#export
@patch
def file_to_blob(self:AzureBlobStorage, container_name, blob_name, file_name):
    """Upload a local file as a blob

    Parameters
    ----------
    container_name: str
        The name of the target container
    blob_name: str
        The name the blob shall get
    file_name: str
        Path of the local file to upload

    Returns
    -------
    None: NoneType
    """
    service = self.blob_service
    service.create_blob_from_path(container_name, blob_name, file_name)

In [175]:
#export
@patch
def folder_to_container(self:AzureBlobStorage, folder_path, container_name=None):
    """Upload every regular file directly inside a folder to a container

    Sub-directories are skipped (no recursion). Each blob is named after its
    file name without the folder part.

    Parameters
    ----------
    folder_path: str
        Path of the local folder whose files shall be uploaded
    container_name: str
        The name of the target container; passed to ``file_to_blob`` unchanged

    Returns
    -------
    None: NoneType
    """
    # A plain loop instead of a list comprehension: the uploads are side
    # effects, so there is no result list worth building.
    for entry in listdir(folder_path):
        local_path = join(folder_path, entry)
        if isfile(local_path):
            self.file_to_blob(container_name=container_name, blob_name=entry, file_name=local_path)


In [176]:
#export
@patch
def df_to_blob(self:AzureBlobStorage, container_name, blob_name, df):
    """Write dataframe to blob

    The serialisation format is chosen from the extension of ``blob_name``
    (case-insensitive): ``json``, ``csv`` (without the index) or ``parquet``.

    Parameters
    ----------
    container_name: str
        The name of the container

    blob_name: str
        The name of the blob that shall be created

    df: pandas.core.frame.DataFrame
        The dataframe that shall be saved as blob

    Raises
    ------
    ValueError
        If the extension of ``blob_name`` is not one of the supported formats

    """
    extension = blob_name.split(".")[-1].lower()

    if extension == "json":
        self.blob_service.create_blob_from_text(container_name, blob_name, df.to_json())
    elif extension == "csv":
        payload = df.to_csv(index=False, index_label=False)
        self.blob_service.create_blob_from_text(container_name, blob_name, payload)
    elif extension == "parquet":
        # to_parquet() without a path returns bytes, so upload it as bytes.
        self.blob_service.create_blob_from_bytes(container_name, blob_name, df.to_parquet())
    else:
        # Previously an empty StringIO object was handed to the upload call,
        # which failed with an unrelated error.
        raise ValueError(
            "Unsupported blob extension '{}'; expected json, csv or parquet".format(extension)
        )

In [177]:
#export
@patch
def blob_to_df(self:AzureBlobStorage, container_name, blob_name):
    """Load blob and return dataframe

    The parser is chosen from the extension of ``blob_name``
    (case-insensitive): ``csv``, ``json``, ``parquet`` or ``xlsx``.

    Parameters
    ----------
    container_name: str
        The name of the container

    blob_name: str
        The name of the blob

    Returns
    -------
    df: pandas.core.frame.DataFrame
        The Dataframe containing data of the blob

    Raises
    ------
    ValueError
        If the extension of ``blob_name`` is not one of the supported formats
    """
    readers = {
        "csv": lambda stream: pd.read_csv(stream, lineterminator="\n"),
        "json": pd.read_json,
        "parquet": pd.read_parquet,
        "xlsx": pd.read_excel,
    }

    extension = blob_name.split(".")[-1].lower()
    if extension not in readers:
        # Checked before downloading; formerly this case ended in an
        # UnboundLocalError on the return statement.
        raise ValueError(
            "Unsupported blob extension '{}'; expected one of {}".format(
                extension, ", ".join(sorted(readers))
            )
        )

    with io.BytesIO() as input_stream:
        self.blob_service.get_blob_to_stream(
            container_name=container_name, blob_name=blob_name, stream=input_stream
        )
        # Rewind so the parser reads from the start of the downloaded data.
        input_stream.seek(0)
        return readers[extension](input_stream)


In [178]:
#export
@patch
def blobs_to_df(self:AzureBlobStorage, container_name):
    """Load blobs and write to dataframe

    Every blob of the container is read with ``blob_to_df`` and the results
    are concatenated with a fresh index.

    Parameters
    ----------
    container_name: str
        The name of the container

    Returns
    -------
    df: pandas.core.frame.DataFrame
        The Dataframe containing data of the blobs; an empty DataFrame when
        the container holds no blobs
    """
    frames = [
        self.blob_to_df(container_name, blob.name)
        for blob in self.blob_service.list_blobs(container_name)
    ]
    if not frames:
        # pd.concat raises on an empty list, so return an empty frame instead.
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

In [179]:
#export
@patch
def copy_blobs_to_other_container(self:AzureBlobStorage, source_container_name, destination_container_name, delete_after_copy=False):
    """Copy all blobs in one container to another container

    Parameters
    ----------
    source_container_name: str
        The name of the source container

    destination_container_name: str
        The name of the target container

    delete_after_copy: bool
        If True, delete all blobs in source container after copy

    """
    # Collect the names once. The listing object was formerly iterated a
    # second time for the delete pass, which is not guaranteed to yield the
    # same (or any) items again.
    blob_names = [blob.name for blob in self.blob_service.list_blobs(source_container_name)]

    for blob_name in blob_names:
        blob_url = self.blob_service.make_blob_url(source_container_name, blob_name)
        self.blob_service.copy_blob(destination_container_name, blob_name, blob_url)

    if delete_after_copy:
        # NOTE(review): copy_blob may only *start* the copy on the service
        # side; confirm the copy has finished before relying on this delete.
        for blob_name in blob_names:
            self.blob_service.delete_blob(source_container_name, blob_name)


In [180]:
#export
@patch
def download_blobs_from_container(self:AzureBlobStorage, container_name, destination_path):
    """Download all blobs from container

    The blobs are written into a single deflate-compressed archive named
    ``<container_name>.zip`` inside ``destination_path``; each archive member
    is named after its blob.

    Parameters
    ----------
    container_name: str
        The name of the container
    destination_path: str
        The destination path (directory in which the zip file is created)

    """
    blobs = self.blob_service.list_blobs(container_name)

    # Fixed: the original referenced the undefined name `download_path`.
    archive_path = Path(destination_path) / f"{container_name}.zip"

    # The context manager closes the archive even if a download fails.
    with zipfile.ZipFile(archive_path, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        for blob in blobs:
            downloaded = self.blob_service.get_blob_to_bytes(container_name, blob.name)
            archive.writestr(blob.name, downloaded.content)

In [181]:
# Run nbdev's notebook2script to convert the project notebooks
# (see the "Converted ..." lines in the output below).
from nbdev.export import *
notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
