In [None]:
from fsspec import filesystem as fsspec_filesystem

from fsspec_utils import filesystem

from fsspec_utils.storage_options import StorageOptions, AwsStorageOptions

# from fsspec.implementations.dirfs import DirFileSystem

# --- Local Filesystem Examples ---

# Plain local filesystem, selected through the "file" protocol name.
fs_local = filesystem("file")

# Directory filesystem for /tmp (dirfs=True requests a DirFileSystem).
# It gets its own name because `fs_dir_local` is assigned again further down in
# this cell, which would otherwise silently discard this instance.
fs_dir_tmp = filesystem("/tmp", dirfs=True)

# --- S3 Filesystem Examples ---

# Variant 1: plain dict of storage options.
# "your_key" / "your_secret" are placeholders for real AWS credentials.
fs_s3_dict = filesystem(
    "s3",
    storage_options={"key": "your_key", "secret": "your_secret"},
)

# Variant 2: build the options with the generic StorageOptions.create() factory
# and convert them with to_filesystem(). This is the recommended, type-safe route.
so_s3_create = StorageOptions.create(
    protocol="s3",
    access_key_id="your_key",
    secret_access_key="your_secret",
)
fs_s3_storage_options = so_s3_create.to_filesystem()

# Variant 3: instantiate the AWS-specific options class directly, which exposes
# AWS-specific parameters.
so_s3_aws = AwsStorageOptions(
    access_key_id="your_key",
    secret_access_key="your_secret",
)
fs_s3_aws_direct = so_s3_aws.to_filesystem()

# Variant 4: load the options from a named AWS credentials profile.
# Requires a configured profile (e.g. in ~/.aws/credentials); swap "lodl" for
# your own profile name.
so_s3_profile = AwsStorageOptions.from_aws_credentials(profile="lodl")
fs_s3_aws_profile = so_s3_profile.to_filesystem()

# Variant 5: hand the profile to filesystem() as keyword arguments instead of
# building a StorageOptions object first.
# NOTE(review): allow_invalid_certificates=True presumably relaxes TLS certificate
# verification; confirm that this is intended outside of test setups.
fs_s3_kwargs_profile = filesystem(
    "s3",
    profile="lodl",
    allow_invalid_certificates=True,
)

# --- DirFileSystem for S3 ---

# Directory filesystem rooted at the bucket; replace "my-bucket" with a real bucket name.
# dirfs=True asks for a DirFileSystem explicitly. The second call omits the flag;
# presumably a path-like first argument yields a DirFileSystem anyway
# (TODO confirm against the filesystem() documentation).
fs_dir_s3 = filesystem(
    "s3://my-bucket",
    dirfs=True,
)
# Same bucket, this time with credentials supplied as a storage_options dict.
fs_dir_s3_so = filesystem(
    "s3://my-bucket",
    storage_options={"key": "your_key", "secret": "your_secret"},
)
print(f"S3 DirFileSystem (default): {fs_dir_s3}")
print(f"S3 DirFileSystem (with storage_options): {fs_dir_s3_so}")

# --- DirFileSystem for local path ---

# Directory filesystem rooted at a relative local directory.
fs_dir_local = filesystem(
    "./my_local_dir/",
    dirfs=True,
)

print(f"Local DirFileSystem: {fs_dir_local}")



In [None]:
# --- Data I/O Examples (requires actual S3 resources to run) ---

# Load every Parquet file matching the glob into a single pyarrow table.
df_parquet = fs_s3_storage_options.read_parquet(
    "s3://test/ewn/mms2/raw/*.parquet",
)

# Same glob, but with batch_size set the call produces an iterator that reads
# the Parquet files in batches instead of returning one table.
df_parquet_batch = fs_s3_storage_options.read_parquet(
    "s3://test/ewn/mms2/raw/*.parquet",
    batch_size=1,
)

# The other file formats (csv, json) work the same way, and there are matching
# methods for writing data back to S3 in these formats.


In [None]:

# Lazy pyarrow dataset over the Parquet files below the prefix; suited to
# efficient batch processing.
ds_pyarrow = fs_s3_storage_options.pyarrow_dataset(
    "s3://test/ewn/mms2/raw/",
    format="parquet",
)

# Lazy pyarrow Parquet dataset backed by a metadata file that stores the schema
# and the per-file metadata (column stats, row group size, offsets, ...) for all
# Parquet files in the path. It can be consumed with a pyarrow scanner, polars or
# duckdb; having the column stats for all files makes querying and processing
# large datasets very efficient.
ds_pyarrow_parquet = fs_s3_storage_options.pyarrow_parquet_dataset(
    "s3://test/ewn/mms2/raw/",
)

# Pydala dataset built from all files in the path: a specialized dataset for
# efficient data processing and querying. It creates a metadata file holding the
# schema and metadata (column stats, row group size, offsets, ...) for all files,
# which is then used for efficient querying and processing. It also ships helper
# functions, e.g. converting the dataset to a polars or pandas DataFrame or
# registering it with a catalog.
ds_pydala = fs_s3_storage_options.pydala_dataset(
    "s3://test/ewn/mms2/raw/",
)

# Methods for writing data back to S3 in these formats exist as well.


In [None]:
import pyarrow.parquet as pq

# Read Parquet files from S3 into a pyarrow table using the filesystem directly.
# pyarrow does not expand glob patterns (it expects a file, a directory or a list
# of files), so the wildcard is resolved through the fsspec filesystem first and
# pyarrow receives the explicit list of matching files.
parquet_paths = fs_s3_storage_options.glob("s3://test/ewn/mms2/raw/*.parquet")
if not parquet_paths:
    # Fail with a clear message instead of an opaque pyarrow error on an empty list.
    raise FileNotFoundError("No Parquet files match s3://test/ewn/mms2/raw/*.parquet")
table_pyarrow = pq.ParquetDataset(parquet_paths, filesystem=fs_s3_storage_options).read()

# Write a pyarrow table to S3 as a Parquet file using the filesystem directly.
# NOTE: the target lives under the same prefix the glob above reads from, so
# re-running this cell will also pick up output.parquet.
pq.write_table(table_pyarrow, "s3://test/ewn/mms2/raw/output.parquet", filesystem=fs_s3_storage_options)

# Same is possible with other file formats (csv, json). There are also methods for writing data back to S3 in these formats.


In [None]:
import polars as pl

# AWS storage options built from the "lodl" profile with allow_invalid_certificates enabled.
# After conversion with to_object_store_kwargs() they can be handed to libraries
# that take a storage_options mapping, as done for Polars below.
so_s3_polars = AwsStorageOptions.create(
    profile="lodl",
    allow_invalid_certificates=True,
)

# Scan the Parquet files on S3 with Polars, forwarding the converted options.
df_polars = pl.scan_parquet(
    "s3://test/ewn/mms2/raw/*.parquet",
    storage_options=so_s3_polars.to_object_store_kwargs(),
)
# df_polars.head(10)


In [None]:
# --- Automatic DirFileSystem Creation ---

# Passing a path (rather than a protocol name) as the protocol_or_path argument
# makes filesystem() create a DirFileSystem instance automatically, which is a
# convenient way to work with directory-based filesystems.

fs_auto_dir = filesystem("/tmp/test")
print(f"Filesystem type: {fs_auto_dir.__class__.__name__}")
print(f"Base path: {fs_auto_dir.path}")

# Equivalent explicit construction of the DirFileSystem:
# from fsspec.implementations.dirfs import DirFileSystem
# fs_explicit_dir = DirFileSystem(path="/tmp/test", fs=filesystem("file"))


In [None]:

from deltalake import DeltaTable

# Open the Delta table stored on S3, passing the storage options obtained from
# so_s3_polars.to_object_store_kwargs().
dt_delta = DeltaTable(
    "s3://pu1/aumann/process_monitoring/results_delta",
    storage_options=so_s3_polars.to_object_store_kwargs(),
)
# Last expression of the cell, so the notebook displays the result of file_uris().
dt_delta.file_uris()