In [None]:
# %pip -q install azure-storage-file-datalake

from azure.storage.filedatalake import DataLakeServiceClient
from azure.core.credentials import TokenCredential, AccessToken
from collections import defaultdict
from pathlib import PurePosixPath
import time
import pandas as pd
import requests


# -----------------------------
# Fabric token -> TokenCredential (for OneLake DFS)
# -----------------------------
class FabricStorageTokenCredential(TokenCredential):
    """Azure SDK credential backed by the Fabric notebook's "storage" token.

    ``get_token`` ignores the requested scopes and always asks
    ``notebookutils`` for the ``"storage"`` audience token.
    """

    # Lifetime assumed when the token's own expiry cannot be determined.
    _FALLBACK_LIFETIME_SECONDS = 3600

    @staticmethod
    def _jwt_expiry(token_value):
        """Return the ``exp`` claim (epoch seconds) of a JWT, or None if unreadable."""
        import base64
        import json

        try:
            payload_b64 = token_value.split(".")[1]
            # JWT segments are base64url without padding; restore it before decoding.
            payload_b64 += "=" * (-len(payload_b64) % 4)
            claims = json.loads(base64.urlsafe_b64decode(payload_b64))
            return int(claims["exp"])
        except (AttributeError, IndexError, KeyError, TypeError, ValueError):
            return None

    def get_token(self, *scopes, **kwargs):
        """Return an ``AccessToken`` for OneLake storage.

        The expiry is taken from the token's ``exp`` claim when it can be
        parsed, so the SDK knows when to request a fresh token during long
        listings. If the token is not a parseable JWT, a one-hour lifetime
        is assumed.
        """
        import notebookutils
        tok = notebookutils.credentials.getToken("storage")
        token_value = tok["accessToken"] if isinstance(tok, dict) else tok

        expires_on = self._jwt_expiry(token_value)
        if expires_on is None:
            expires_on = int(time.time()) + self._FALLBACK_LIFETIME_SECONDS
        return AccessToken(token_value, expires_on)


# -----------------------------
# Fabric REST helpers
# -----------------------------
def _get_fabric_rest_token():
    """Return the notebook's bearer token for the ``"pbi"`` audience.

    Accepts either a plain token string or a dict carrying ``accessToken``,
    matching how the storage credential in this file treats ``getToken``.
    """
    import notebookutils
    tok = notebookutils.credentials.getToken("pbi")
    return tok["accessToken"] if isinstance(tok, dict) else tok


def _fabric_get(url: str, token: str, timeout: float = 60.0):
    """GET ``url`` with a bearer token and return the decoded JSON body.

    Args:
        url: Absolute URL to request.
        token: Bearer token placed in the ``Authorization`` header.
        timeout: Seconds passed to ``requests.get`` so a stalled connection
            fails instead of blocking the notebook indefinitely.

    Raises:
        RuntimeError: If the response status code is 400 or higher.
    """
    r = requests.get(
        url,
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )
    if r.status_code >= 400:
        raise RuntimeError(f"Fabric REST call failed {r.status_code}: {r.text}")
    return r.json()



def list_lakehouses_and_warehouses(workspace_id: str):
    """
    Returns list of dicts: {id, displayName, type}

    Only items whose type is "Lakehouse" or "Warehouse" are returned.
    All result pages are read: the items endpoint is followed through the
    ``continuationUri`` field of each response until none is returned.
    """
    token = _get_fabric_rest_token()
    # Items endpoint includes Lakehouse/Warehouse among others
    url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items"

    wanted = []
    visited = set()  # guards against a continuation link that loops back
    while url and url not in visited:
        visited.add(url)
        page = _fabric_get(url, token)
        for it in page.get("value") or []:
            t = it.get("type")
            if t in ("Lakehouse", "Warehouse"):
                wanted.append({
                    "id": it.get("id"),
                    "displayName": it.get("displayName"),
                    "type": t,
                })
        url = page.get("continuationUri")
    return wanted


# -----------------------------
# Aggregation helpers
# -----------------------------
def parent_dirs(path: PurePosixPath, max_depth=None):
    """
    Generate the ancestor directories of a file path, deepest first.

    The final path component is treated as the file name and dropped.
    Each remaining prefix is yielded as a "/"-joined string, and the root
    (represented as "") is always yielded last.

    max_depth: when set, prefixes with more components than this are
    skipped ("a" has depth 1, "a/b" has depth 2, etc.); the root is
    still yielded.
    """
    folders = path.parts[:-1]

    for depth in reversed(range(1, len(folders) + 1)):
        too_deep = max_depth is not None and depth > max_depth
        if not too_deep:
            yield "/".join(folders[:depth])

    # Root bucket: receives every file, including files with no parent folder.
    yield ""


# -----------------------------
# Main: enumerate items, then list each itemId in OneLake
# -----------------------------
def measure_onelake_workspace_by_items(
    workspace_name: str | None = None,
    item_types: tuple[str, ...] = ("Lakehouse", "Warehouse"),
    onelake_dfs_endpoint: str = "https://onelake.dfs.fabric.microsoft.com",
    max_depth: int | None = None,
    progress_every_files: int = 200_000,
    prefix_with_item: bool = True,   # prefix directory keys with "<Type>/<Name>/..."
):
    """
    Enumerate Lakehouses/Warehouses in a workspace and compute:
      - total bytes
      - per-item total bytes
      - per-directory cumulative bytes

    Uses OneLake GUID addressing: list each item by fs.get_paths(path=itemId, recursive=True).

    Args:
        workspace_name: Workspace to scan. None uses the notebook's
            "currentWorkspaceName" runtime context value.
        item_types: Item types to keep from the workspace item listing.
        onelake_dfs_endpoint: Account URL given to DataLakeServiceClient.
        max_depth: Maximum number of path components in a directory key.
            The "<Type>/<Name>" prefix counts toward this depth when
            prefix_with_item is True. None rolls up every level.
        progress_every_files: Print a progress line each time the running
            file count is a multiple of this value; 0/None disables it.
        prefix_with_item: When True, directory keys start with
            "<Type>/<Name>/"; otherwise keys are relative to the item root.

    Returns:
        (summary, df_dirs, df_items): a dict of totals and run settings,
        a per-directory DataFrame and a per-item DataFrame, both sorted by
        bytes descending.

    Raises:
        RuntimeError: If no item of the requested types is found.
    """
    # Fabric-only modules are imported lazily, as the other helpers in this file do.
    import notebookutils
    import sempy.fabric as fabric  # import the submodule explicitly; bare `import sempy` may not load it

    if workspace_name is None:
        workspace_name = notebookutils.runtime.context.get("currentWorkspaceName")

    # str() is a no-op for a string id and normalizes a UUID object, should
    # the resolver return one — NOTE(review): confirm the return type.
    workspace_id = str(fabric.resolve_workspace_id(workspace_name))
    items = list_lakehouses_and_warehouses(workspace_id)
    items = [it for it in items if it["type"] in item_types]

    if not items:
        raise RuntimeError(f"No items of type {item_types} found in workspace ({workspace_id}).")

    cred = FabricStorageTokenCredential()
    service = DataLakeServiceClient(account_url=onelake_dfs_endpoint, credential=cred)
    fs = service.get_file_system_client(workspace_id)

    dir_bytes = defaultdict(int)
    item_totals = []
    grand_total = 0
    grand_files = 0
    started = time.time()

    for idx, it in enumerate(items, start=1):
        item_id = it["id"]
        item_name = it.get("displayName") or item_id
        item_type = it["type"]

        item_bytes = 0
        item_files = 0

        item_prefix = f"{item_type}/{item_name}".replace("\\", "/").strip("/")
        list_path = item_id  # GUID root for this item

        print(f"[{idx}/{len(items)}] Listing {item_type}: {item_name} ({item_id})")

        for p in fs.get_paths(path=list_path, recursive=True):
            # Only files carry size; directories are derived from file paths.
            if getattr(p, "is_directory", False):
                continue

            size = int(getattr(p, "content_length", 0) or 0)
            item_bytes += size
            item_files += 1
            grand_total += size
            grand_files += 1

            # p.name begins with "<itemId>/..."
            name = p.name or ""
            rel = name[len(item_id):].lstrip("/") if name.startswith(item_id) else name
            key_path = f"{item_prefix}/{rel}" if prefix_with_item else rel

            # Credit the file's size to every ancestor directory (and the root).
            pp = PurePosixPath(key_path)
            for d in parent_dirs(pp, max_depth=max_depth):
                dir_bytes[d] += size

            if progress_every_files and grand_files % progress_every_files == 0:
                elapsed = time.time() - started
                rate = grand_files / elapsed if elapsed > 0 else 0
                print(f"  Processed {grand_files:,} files total | {grand_total/1e12:.3f} TB | {rate:,.0f} files/sec")

        item_totals.append({
            "type": item_type,
            "item_name": item_name,
            "item_id": item_id,
            "files": item_files,
            "bytes": item_bytes,
        })

    elapsed = time.time() - started

    # Directory dataframe (the internal "" root key is shown as "/")
    rows = [(d if d != "" else "/", b) for d, b in dir_bytes.items()]
    df_dirs = pd.DataFrame(rows, columns=["directory", "bytes"])
    df_dirs["gb"] = df_dirs["bytes"] / (1024**3)
    df_dirs["tb"] = df_dirs["bytes"] / (1024**4)
    df_dirs = df_dirs.sort_values("bytes", ascending=False).reset_index(drop=True)

    # Item totals dataframe
    df_items = pd.DataFrame(item_totals)
    df_items["gb"] = df_items["bytes"] / (1024**3)
    df_items["tb"] = df_items["bytes"] / (1024**4)
    df_items = df_items.sort_values("bytes", ascending=False).reset_index(drop=True)

    summary = {
        "workspace_id": workspace_id,
        "workspace_name": workspace_name,
        "items_count": len(items),
        "files": grand_files,
        "total_bytes": grand_total,
        "total_gb": grand_total / (1024**3),
        "total_tb": grand_total / (1024**4),
        "seconds": elapsed,
        "files_per_sec": (grand_files / elapsed) if elapsed > 0 else None,
        "dirs_tracked": len(dir_bytes),
        "max_depth": max_depth,
        "prefix_with_item": prefix_with_item,
        "onelake_dfs_endpoint": onelake_dfs_endpoint,
    }

    return summary, df_dirs, df_items


# -----------------------------
# Run (defaults to current workspace)
# -----------------------------
summary, df_dirs, df_items = measure_onelake_workspace_by_items(
    workspace_name=None,           # None -> the notebook's current workspace; or pass "My Workspace Name"
    prefix_with_item=True,
    max_depth=4,                   # None rolls up every directory level (can be huge)
    progress_every_files=100_000,
)

# Last expression of the cell, so the notebook displays the summary dict.
summary


In [None]:
def format_bytes(n: int) -> str:
    """Render a byte count using binary (1024-based) units from B up to EB.

    Values shown in GB or a larger unit get two decimals; B, KB and MB are
    rounded to whole numbers. Thousands separators are always used.
    """
    labels = ("B", "KB", "MB", "GB", "TB", "PB", "EB")
    largest = len(labels) - 1

    value = float(n)
    idx = 0
    for idx in range(len(labels)):
        # Stop once the value fits the current unit or no larger unit exists.
        if value < 1024 or idx == largest:
            break
        value /= 1024

    decimals = 2 if idx >= 3 else 0
    return f"{value:,.{decimals}f} {labels[idx]}"

def print_top_folders(df_dirs, top_n=10):
    """Print the ``top_n`` largest rows of ``df_dirs`` as an aligned table.

    ``df_dirs`` must provide "directory" and "bytes" columns. Rows are
    ranked by "bytes" descending and sizes are rendered with format_bytes.
    """
    ranked = df_dirs.sort_values("bytes", ascending=False).head(top_n)

    print(f"\nTop {len(ranked)} folders by size")
    print("-" * 80)
    for directory, size in zip(ranked["directory"], ranked["bytes"]):
        # Name left-aligned in 60 columns, size right-aligned in 15.
        print(f"{directory:<60} {format_bytes(int(size)):>15}")

# Show the 50 largest directory rollups; df_dirs is produced by the scan cell above.
print_top_folders(df_dirs, top_n=50)


