# LoadPipe, pandas, and Matplotlib

This notebook streams files from Google Drive through LoadPipe's `DriveFileSystem` and explores them with pandas and Matplotlib. Replace the placeholder file IDs with IDs from your own Drive before running the data-loading cells.

## Prerequisites
- Activate your virtual environment and install the extras: ``pip install -e "loadpipe[extras,gdrive]" pandas matplotlib pyarrow``. The quotes keep shells such as zsh from treating the square brackets as a glob pattern.
- Run ``lp auth login`` once so the OAuth token under ``.secrets/`` is ready for programmatic access.
- Update ``loadpipe/configs/config.yaml`` (or point ``CONFIG_PATH`` below to your custom file) with cache paths, Drive folder IDs, and chunk sizes.
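
Before running the cells below, it can help to confirm that the packages from the list above are importable. The following is a minimal sketch using only the standard library; it assumes the editable install exposes the top-level module name `loadpipe`, as the imports in the first cell suggest.

```python
from importlib.util import find_spec

# Top-level import names for the packages listed above; "loadpipe" is assumed
# to be importable under that name after the editable install.
REQUIRED_MODULES = ["pandas", "matplotlib", "pyarrow", "loadpipe"]


def missing_modules(names):
    """Return the subset of `names` that cannot be found on the import path."""
    return [name for name in names if find_spec(name) is None]


missing = missing_modules(REQUIRED_MODULES)
if missing:
    print(f"Missing packages: {', '.join(missing)}")
else:
    print("All prerequisite packages are importable.")
```

The check only looks up module specs, so it does not import heavy packages or touch your Drive credentials.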

In [1]:
from pathlib import Path
import io

from IPython.display import display
import matplotlib.pyplot as plt
import pandas as pd
from loadpipe.config import Config
from loadpipe.filesystem import DriveFileSystem, filesystem_from_config

%matplotlib inline
plt.style.use("seaborn-v0_8")

# Build a DriveFileSystem wired to your config for reuse across the notebook.
# The path is relative to the repository root; adjust it if you launch Jupyter elsewhere.
CONFIG_PATH = Path("loadpipe/configs/config.yaml")
cfg = Config.from_file(CONFIG_PATH)
drive_fs: DriveFileSystem = filesystem_from_config(cfg)

cfg


Config(runtime=RuntimeConfig(cache_dir='.cache/loadpipe', state_db='.state/manifest.sqlite', cache_limit_gb=30, retries=5, log_dir='.logs'), auth=AuthConfig(client_secrets_path='.secrets/client_secrets.json', token_path='.secrets/token.json', scopes=['https://www.googleapis.com/auth/drive']), source=SourceConfig(folder_id='CHANGE_ME_SOURCE_FOLDER_ID', pattern='*.zst'), download=DownloadConfig(chunk_mb=64), process=ProcessConfig(kind='identity'), upload=UploadConfig(folder_id='CHANGE_ME_DEST_FOLDER_ID', name_suffix=''))

## Helper utilities
Each helper takes a Drive file ID, opens the file through the `DriveFileSystem`, and returns a DataFrame.

In [2]:
from typing import Any, Optional

def read_csv_from_drive(
    file_id: str,
    *,
    fs: DriveFileSystem,
    encoding: str = "utf-8",
    **read_csv_kwargs: Any,
) -> pd.DataFrame:
    """Download a CSV from Drive and materialize it as a DataFrame."""
    if not file_id:
        raise ValueError("file_id must be provided.")
    drive_url = f"gdrive://{file_id}"
    with fs.open(drive_url) as binary_stream:
        with io.TextIOWrapper(binary_stream, encoding=encoding) as text_stream:
            return pd.read_csv(text_stream, **read_csv_kwargs)

def read_parquet_from_drive(
    file_id: str,
    *,
    fs: DriveFileSystem,
    columns: Optional[list[str]] = None,
    **read_parquet_kwargs: Any,
) -> pd.DataFrame:
    """Read a Parquet file from Drive through a seekable stream into a DataFrame."""
    if not file_id:
        raise ValueError("file_id must be provided.")
    drive_url = f"gdrive://{file_id}"
    with fs.open(drive_url, random_access=True) as ra_stream:
        return pd.read_parquet(ra_stream, columns=columns, **read_parquet_kwargs)
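
The CSV helper wraps a binary stream in `io.TextIOWrapper` so that decoding happens before pandas parses the text. To try that pattern without touching Drive, the sketch below substitutes an in-memory `io.BytesIO` for the stream that `fs.open()` would return. That substitution is an assumption: it only holds if the LoadPipe stream behaves like an ordinary readable binary file object.

```python
import io

import pandas as pd

# Stand-in for the binary stream that fs.open() would return (assumption:
# it behaves like a readable binary file object).
raw_bytes = "city,visits\nKraków,3\nLviv,5\n".encode("utf-8")

with io.BytesIO(raw_bytes) as binary_stream:
    # Decode bytes to text with an explicit encoding, then let pandas parse it.
    with io.TextIOWrapper(binary_stream, encoding="utf-8") as text_stream:
        demo_df = pd.read_csv(text_stream)

print(demo_df)
```

Passing the wrong `encoding` here is the usual cause of garbled non-ASCII values, which is why the helper exposes it as a keyword argument.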


## Load a CSV into pandas
Provide a Drive file ID that points to a CSV export (for example, a daily metrics dump or a partner feed). Uncomment the `parse_dates` or `dtype` hints and adjust the column names to match your schema.

In [3]:
CSV_FILE_ID = ""  # e.g. "1AbCdEfGhIj..."

if CSV_FILE_ID:
    orders_df = read_csv_from_drive(
        CSV_FILE_ID,
        fs=drive_fs,
        # parse_dates=["event_ts"],  # uncomment if you have timestamps
        # dtype={"partner_id": "string"},
    )
    display(orders_df.head())
    print(f"Loaded {len(orders_df):,} rows and {len(orders_df.columns)} columns.")
else:
    print("Set CSV_FILE_ID to a Drive file id before running this cell.")


Set CSV_FILE_ID to a Drive file id before running this cell.
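
To see what the two commented-out hints change, here is a small sketch on an in-memory CSV. The rows are hypothetical sample data; only the column names `event_ts` and `partner_id` come from the cell above.

```python
import io

import pandas as pd

# Hypothetical sample rows; the column names mirror the commented-out hints above.
sample_csv = io.StringIO(
    "event_ts,partner_id,bytes_transferred\n"
    "2024-01-01 00:00:00,007,1024\n"
    "2024-01-02 12:30:00,042,2048\n"
)

typed_df = pd.read_csv(
    sample_csv,
    parse_dates=["event_ts"],        # parse into datetime64 instead of plain strings
    dtype={"partner_id": "string"},  # keep identifiers as text so leading zeros survive
)

print(typed_df.dtypes)
```

Without the `dtype` hint, pandas would infer `partner_id` as an integer column and `007` would become `7`.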


### Quick profiling
Once the DataFrame is in memory you can lean on pandas to inspect schema, basic statistics, and null coverage.

In [10]:
if "orders_df" in globals():
    orders_df.info()
    summary = orders_df.describe(include="all").transpose()
    display(summary)
else:
    print("Load a CSV DataFrame first by setting CSV_FILE_ID.")


Load a CSV DataFrame first by setting CSV_FILE_ID.
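
`info()` reports non-null counts, but a per-column null share is often easier to scan. The sketch below computes one on a hypothetical frame; swap `demo_df` for `orders_df` once your data is loaded.

```python
import pandas as pd

# Hypothetical frame with a few gaps to illustrate the calculation.
demo_df = pd.DataFrame(
    {
        "region": ["eu", "us", None, "eu"],
        "bytes_transferred": [10.0, None, None, 40.0],
    }
)

# isna() gives a boolean frame; sum() counts the gaps and mean() gives their share.
null_report = pd.DataFrame(
    {
        "null_count": demo_df.isna().sum(),
        "null_share": demo_df.isna().mean(),
    }
).sort_values("null_share", ascending=False)

print(null_report)
```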


### Plotting with Matplotlib
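Pick a numeric column (and optional grouping key) to visualize. If you set `GROUP_BY_COLUMN`, the plot shows the mean of the numeric column for the 15 groups with the highest means. If you leave it blank, the plot shows the raw series across the row index.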

In [None]:
COLUMN_TO_PLOT = ""  # e.g. "bytes_transferred"
GROUP_BY_COLUMN = ""  # e.g. "region"

if "orders_df" not in globals():
    print("Load orders_df first.")
elif not COLUMN_TO_PLOT:
    print("Set COLUMN_TO_PLOT to a numeric column before plotting.")
elif COLUMN_TO_PLOT not in orders_df.columns:
    print(f"Column {COLUMN_TO_PLOT!r} not found. Available columns: {orders_df.columns.tolist()}")
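elif GROUP_BY_COLUMN and GROUP_BY_COLUMN not in orders_df.columns:
    print(f"Grouping column {GROUP_BY_COLUMN!r} not found. Available columns: {orders_df.columns.tolist()}")
elif not pd.api.types.is_numeric_dtype(orders_df[COLUMN_TO_PLOT]):
    # Both .mean() and the line plot need numeric data, so stop early with a hint.
    print(f"Column {COLUMN_TO_PLOT!r} has dtype {orders_df[COLUMN_TO_PLOT].dtype}; pick a numeric column.")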
else:
    fig, ax = plt.subplots(figsize=(9, 4))
    if GROUP_BY_COLUMN:
        series = (
            orders_df.groupby(GROUP_BY_COLUMN)[COLUMN_TO_PLOT]
            .mean()
            .sort_values(ascending=False)
            .head(15)
        )
        series.plot(kind="barh", ax=ax, color="#4c72b0")
        ax.set_xlabel(f"Mean {COLUMN_TO_PLOT}")
        ax.set_ylabel(GROUP_BY_COLUMN)
        ax.set_title(f"Top {len(series)} {GROUP_BY_COLUMN} groups by {COLUMN_TO_PLOT}")
    else:
        orders_df[COLUMN_TO_PLOT].plot(ax=ax, color="#4c72b0")
        ax.set_ylabel(COLUMN_TO_PLOT)
        ax.set_title(f"{COLUMN_TO_PLOT} across the loaded rows")
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
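
To check what the grouped branch computes before it reaches Matplotlib, the same aggregation can be run on a tiny hypothetical frame. This sketch keeps only the pandas part and uses `head(2)` instead of `head(15)` so the result is easy to verify by hand.

```python
import pandas as pd

# Hypothetical data; "region" and "bytes_transferred" echo the examples above.
demo_df = pd.DataFrame(
    {
        "region": ["eu", "eu", "us", "us", "apac"],
        "bytes_transferred": [100, 300, 50, 150, 400],
    }
)

# Mean per group, largest first, truncated to the top groups.
top_groups = (
    demo_df.groupby("region")["bytes_transferred"]
    .mean()
    .sort_values(ascending=False)
    .head(2)
)

print(top_groups)
```

Here the group means are 400 for `apac`, 200 for `eu`, and 100 for `us`, so the first two survive the cut.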


## Reading Parquet/Arrow files
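Parquet readers need random access because the file metadata lives in a footer at the end of the file, so the reader seeks there first and then jumps to the column chunks it needs. LoadPipe exposes that via `fs.open(..., random_access=True)`, which the helper above already handles. Make sure ``pyarrow`` is installed.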

In [None]:
PARQUET_FILE_ID = ""

if PARQUET_FILE_ID:
    events_df = read_parquet_from_drive(
        PARQUET_FILE_ID,
        fs=drive_fs,
        # columns=["event_ts", "bytes_transferred"],
    )
    display(events_df.head())
    print(f"Loaded {len(events_df):,} parquet rows.")
else:
    print("Set PARQUET_FILE_ID to a Drive file id before running this cell.")
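
The need for seeking comes from the Parquet layout: a file ends with a 4-byte little-endian metadata length followed by the magic bytes `PAR1`. The sketch below reads that trailer with the standard library only. The payload is a hand-built imitation of the layout, not a real Parquet file, and `read_footer_length` is a hypothetical helper that is not part of LoadPipe or pyarrow.

```python
import io
import os
import struct

PARQUET_MAGIC = b"PAR1"


def read_footer_length(stream):
    """Return the metadata length stored in the last 8 bytes of a Parquet file."""
    if not stream.seekable():
        raise ValueError("Parquet footers can only be located on a seekable stream.")
    stream.seek(-8, os.SEEK_END)  # jump straight to the trailer
    tail = stream.read(8)
    if tail[4:] != PARQUET_MAGIC:
        raise ValueError("Stream does not end with the Parquet magic bytes.")
    (footer_length,) = struct.unpack("<I", tail[:4])
    return footer_length


# Fake payload that only imitates the layout: magic, data, metadata, length, magic.
fake_metadata = b"m" * 12
fake_file = (
    PARQUET_MAGIC
    + b"d" * 32
    + fake_metadata
    + struct.pack("<I", len(fake_metadata))
    + PARQUET_MAGIC
)

print(read_footer_length(io.BytesIO(fake_file)))
```

A forward-only stream cannot serve the `seek` call, which is why the Parquet helper opens the file with `random_access=True`.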


## Tips
- `cfg.runtime.cache_dir` stores downloaded bytes so rerunning cells only hits Drive when the cache is cold.
- Use `drive_fs.open(..., random_access=True)` whenever a consumer seeks (Parquet/Arrow, DuckDB, Polars, etc.).
- Keep secrets in `.secrets/` and rely on `Config.from_file` or environment overrides instead of embedding credentials in notebooks.
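
One way to apply the last tip is to resolve the config path from the environment instead of hardcoding it in a cell. This is a minimal sketch: the variable name `LOADPIPE_CONFIG` and the helper `resolve_config_path` are hypothetical, and LoadPipe may already offer its own override mechanism.

```python
import os
from pathlib import Path

# Hypothetical variable name and default; adjust both to your project layout.
CONFIG_ENV_VAR = "LOADPIPE_CONFIG"
DEFAULT_CONFIG_PATH = Path("loadpipe/configs/config.yaml")


def resolve_config_path(env=None):
    """Prefer the path from the environment, else fall back to the repo default."""
    env = os.environ if env is None else env
    override = env.get(CONFIG_ENV_VAR)
    return Path(override).expanduser() if override else DEFAULT_CONFIG_PATH


print(resolve_config_path())
```

The result can be passed to `Config.from_file`, so the notebook itself carries no machine-specific paths.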