In [8]:
from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import ContainerClient, BlobPrefix
import asyncio
from azure.storage.blob import BlobProperties
import os

# Base URL of the storage account; every helper in this notebook passes it to ContainerClient.
account_url = "https://wboylesbackups.blob.core.windows.net"
# Shared async credential handed to each ContainerClient the helpers create.
credential = DefaultAzureCredential()

In [9]:
# If out_path is None, file is downloaded to local with same name as in Azure
async def download_blob(container_name: str, blob_name: str, out_path: str | None = None):
    if out_path is None:
        out_path = container_name + blob_name

    container_client = ContainerClient(account_url, container_name, credential=credential)  # type: ignore

    async with container_client:
        with open(out_path, "wb") as f:
            blob_stream = await container_client.download_blob(blob_name)
            await blob_stream.readinto(f)


In [10]:
from typing import AsyncGenerator

async def list_blobs_async(
    container_name: str, prefix: str = ""
) -> AsyncGenerator[BlobProperties, None]:
    """Recursively yield the entries of a container whose names start with `prefix`.

    Entries reported by `walk_blobs` as a `BlobPrefix` (folders) are descended
    into, and the prefix entry itself is yielded *after* everything beneath it.
    Callers therefore also receive folder entries, not only blobs, and always
    see children before their parent folder.

    Args:
        container_name: Name of the container to list.
        prefix: Only entries whose name starts with this string are listed.

    Yields:
        Each blob entry, plus each folder prefix entry after its descendants.
    """
    # A single client is shared by every level of the recursion instead of
    # opening a new one for each folder that is visited.
    container_client = ContainerClient(account_url, container_name, credential)  # type: ignore

    async def _walk(current_prefix: str):
        # Lists one level of the hierarchy and recurses into each folder prefix.
        async for entry in container_client.walk_blobs(name_starts_with=current_prefix):
            if isinstance(entry, BlobPrefix):
                folder_name: str = entry.name  # type: ignore

                async for descendant in _walk(folder_name):
                    yield descendant

            yield entry

    async with container_client:
        async for entry in _walk(prefix):
            yield entry


In [11]:
async def create_folder_async(container_name: str, folder_name: str):
    """Create a folder by uploading a placeholder blob inside it and deleting it again.

    Args:
        container_name: Name of the container in which to create the folder.
        folder_name: Name of the folder to create.
    """
    # This is a hack, but it works better than using the data lake file system:
    # an empty blob named "_" is uploaded under the folder and removed right away.
    # NOTE(review): presumably the folder itself survives the delete — confirm.
    placeholder_name = f"{folder_name}/_"

    client = ContainerClient(account_url, container_name, credential)  # type: ignore

    async with client:
        placeholder_client = await client.upload_blob(placeholder_name, "")
        async with placeholder_client:
            await placeholder_client.delete_blob()


In [12]:
# If azure_path is None, then file is uploaded to container base with same name as local
async def upload_file_async(
    container_name: str, local_path: str, azure_path: str | None = None
):
    if azure_path is None:
        azure_path = os.path.basename(local_path)

    container_client = ContainerClient(account_url, container_name, credential)  # type: ignore

    async with container_client:
        with open(local_path, "rb") as f:
            await container_client.upload_blob(azure_path, f)


In [13]:
async def delete_blob(container_name: str, blob_name: str):
    """Delete a blob, or a folder together with everything beneath it.

    If deleting a directory, do not include a trailing "/" in `blob_name`,
    otherwise the directory itself will not be deleted (only its contents).

    Only `blob_name` itself and entries under `blob_name + "/"` are removed.
    Siblings that merely share the textual prefix (e.g. "upload2/" or
    "upload.txt" when deleting "upload") are left untouched. An empty
    `blob_name` selects the whole container.

    Args:
        container_name: Name of the container to delete from.
        blob_name: Name of the blob or folder to delete.
    """
    # A single batch delete request accepts at most 256 blobs.
    max_batch_size = 256
    target = blob_name.removesuffix("/")

    def _in_scope(name: str) -> bool:
        # True for the target itself and for anything nested below it.
        bare_name = name.removesuffix("/")
        return not target or bare_name == target or bare_name.startswith(target + "/")

    container_client = ContainerClient(account_url, container_name, credential)  # type: ignore

    async with container_client:
        files: list[BlobProperties] = []
        folders: list[str] = []

        async for blob_props in list_blobs_async(container_name, blob_name):
            name = str(blob_props.name)
            if not _in_scope(name):
                continue

            if name.endswith("/"):
                # Trying to delete a folder with the / in the name causes a resource not found error
                folders.append(name.removesuffix("/"))
            else:
                files.append(blob_props)

        # Delete files in chunks so large folders stay within the batch limit.
        for start in range(0, len(files), max_batch_size):
            await container_client.delete_blobs(*files[start : start + max_batch_size])

        # We can't use delete_blobs here because there's no ordering guarantee.
        # So, we'll get an error if we try to delete a parent folder before all descendants.
        # Same problem with asyncio.gather. The listing yields folders after their
        # contents, so deleting them one by one in listing order is safe.
        for folder in folders:
            await container_client.delete_blob(folder)


In [14]:
async def upload_folder_async(
    container_name: str,
    local_folder_path: str,
    azure_folder_name: str,
    max_concurrency: int = 32,
):
    """Upload every file found under a local folder (recursively) to one Azure folder.

    The folder structure is flattened: every file is uploaded as
    `{azure_folder_name}/{filename}`, regardless of how deeply it was nested
    locally. Files that share a name in different local subfolders therefore
    map to the same blob name.

    Args:
        container_name: Name of the destination container.
        local_folder_path: Local folder whose files are uploaded.
        azure_folder_name: Folder name in the container that receives the files.
        max_concurrency: Maximum number of uploads in flight at once. Bounding
            this keeps large folders from opening every file simultaneously.

    Raises:
        FileNotFoundError: If `local_folder_path` does not exist.
        NotADirectoryError: If `local_folder_path` is not a directory.
        ValueError: If `max_concurrency` is less than 1.
    """
    if not os.path.exists(local_folder_path):
        raise FileNotFoundError(f"{local_folder_path} not found")
    if not os.path.isdir(local_folder_path):
        raise NotADirectoryError(f"{local_folder_path} is not a directory")
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    limiter = asyncio.Semaphore(max_concurrency)

    async def _upload_one(local_file: str, blob_name: str):
        # Holds one concurrency slot for the duration of a single upload.
        async with limiter:
            await upload_file_async(container_name, local_file, blob_name)

    root = os.path.abspath(local_folder_path)
    uploads = [
        _upload_one(os.path.join(dirpath, filename), f"{azure_folder_name}/{filename}")
        for dirpath, _, filenames in os.walk(root)
        for filename in filenames
    ]

    await asyncio.gather(*uploads)


# TODO: We can do this to create nested directories. Could we use this to improve upload_folder_async?
# await upload_folder_async("videos", "C:\\Users\\willi\\git\\lightsout_c", "upload")
# await upload_folder_async("videos", "C:\\Users\\willi\\git\\lightsout_c", "upload/upload2")


In [29]:
# Move, copy, or rename blobs
async def move_rename_blob_async(
    old_container_name: str,
    old_name: str,
    new_container_name: str | None = None,
    new_name: str | None = None,
    delete_source: bool = True
):
    # Check for no-ops
    new_names_none = new_container_name is None and new_name is None
    dest_is_source = new_container_name == old_container_name and new_name == old_name
    if new_names_none or dest_is_source:
        return

    if new_container_name is None:
        new_container_name = old_container_name
    if new_name is None:
        new_name = old_name

    old_container_client = ContainerClient(account_url, old_container_name, credential)  # type: ignore
    new_container_client = ContainerClient(account_url, new_container_name, credential)  # type: ignore

    async with old_container_client, new_container_client:
        old_blob_client = old_container_client.get_blob_client(old_name)
        new_blob_client = new_container_client.get_blob_client(new_name)

        if not await old_blob_client.exists():
            raise FileNotFoundError(f"No blob named {old_name}")
        if await new_blob_client.exists():
            raise FileExistsError(f"There already exists a blob named {new_name}")

        await new_blob_client.start_copy_from_url(old_blob_client.url, requires_sync=True)

        if delete_source:
            await old_blob_client.delete_blob()
