## What is NeoFS?

NeoFS is a distributed object storage service within the Neo blockchain ecosystem. It's a decentralized storage network that allows users to store and share data without relying on centralized service providers. NeoFS has the following features:

1. **Decentralized Storage**: Data is distributed across network nodes with no single point of failure.
2. **Data Ownership**: Users maintain complete control over their data.
3. **Blockchain Integration**: Tightly integrated with the Neo blockchain, providing security and auditability.
4. **Flexible Access Control**: Offers fine-grained access control policies.
5. **Data Integrity**: Uses cryptographic techniques to ensure data integrity and authenticity.

In this tutorial, we'll learn how to interact with NeoFS using Python, including creating containers, retrieving container lists, and performing signature operations.

## Helper Functions

First, let's define some helper functions for encryption, encoding, and utility operations.

In [None]:
# Import necessary libraries
import os
import json
import binascii
import httpx
from base64 import b64decode, b64encode
from hashlib import sha256

# These libraries need to be installed: pip install pycryptodome ecdsa base58
import base58
from Crypto.PublicKey import ECC
from Crypto.Hash import RIPEMD160
import ecdsa

In [None]:
# Functions about cryptographic operations
def wif_to_private_key(wif: str) -> bytes:
    """Convert from WIF format to raw private key"""
    # Step 1: Decode WIF key using Base58
    decoded = base58.b58decode(wif)
    
    # Step 2: Validate the length of decoded data
    if len(decoded) != 38:
        raise ValueError("Invalid WIF length")
    
    # First byte is version, last 4 bytes are checksum
    # version_byte = decoded[0]  # Typically 0x80 for Bitcoin mainnet
    private_key_bytes = decoded[1:-5]  # Extract private key portion
    checksum = decoded[-4:]  # Extract checksum portion
    
    # Step 3: Calculate expected checksum
    hash1 = sha256(decoded[:-4]).digest()
    hash2 = sha256(hash1).digest()
    expected_checksum = hash2[:4]
    
    # Check if provided checksum matches calculated checksum
    if checksum != expected_checksum:
        raise ValueError("Invalid WIF checksum")
    
    return private_key_bytes

def hex_to_private_key(hex_str: str) -> bytes:
    """Convert from hex string to private key bytes"""
    if hex_str.startswith('0x'):
        hex_str = hex_str[2:]
    return binascii.unhexlify(hex_str)

def private_key_to_neo3_public_key_and_address(private_key: bytes) -> (str, str):
    """Generate NEO3 public key and address from private key"""
    # Construct public key from private key using secp256r1 curve
    public_key = ECC.construct(curve='secp256r1', d=int.from_bytes(private_key, 'big')).pointQ
    x = public_key.x.to_bytes(32, 'big')
    # Choose prefix based on y-coordinate parity (compressed format)
    prefix = b'\x02' if public_key.y % 2 == 0 else b'\x03'
    compressed_public_key = prefix + x
    
    # Generate verification script
    verification_script = b'\x0c\x21' + compressed_public_key + b'\x41\x56\xe7\xb3\x27'
    
    # Calculate script hash
    ripemd160 = RIPEMD160.new()
    ripemd160.update(sha256(verification_script).digest())
    script_hash = ripemd160.digest()
    
    # Generate address (Neo address prefix is 0x35)
    address = base58.b58encode_check(b'\x35' + script_hash).decode('utf-8')
    
    return compressed_public_key.hex(), address

def sign_message(private_key: bytes, message: str | bytes) -> bytes:
    """Sign a message using private key"""
    assert len(private_key) == 32
    if type(message) is str:
        message: bytes = bytes.fromhex(message)
    pk = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.NIST256p, hashfunc=sha256)
    signature: bytes = pk.sign_deterministic(message)
    return signature

def verify_message_signature(public_key: str | bytes, message: str | bytes, signature: bytes) -> bool:
    """Verify message signature"""
    if type(public_key) is str:
        public_key: bytes = bytes.fromhex(public_key)
    assert len(public_key) == 33
    assert public_key.startswith(b'\x02') or public_key.startswith(b'\x03')
    if type(message) is str:
        message: bytes = bytes.fromhex(message)
    public_key: ecdsa.VerifyingKey = ecdsa.VerifyingKey.from_string(public_key, curve=ecdsa.NIST256p, hashfunc=sha256)
    result = public_key.verify(signature, message)
    return result

In [None]:
# Functions about data format conversions
def num2VarInt(num: int) -> str:
    """Convert a number to variable-length integer hex string representation"""
    if num < 0xfd:
        return num2hexstring(num)
    elif num <= 0xffff:
        # uint16
        return "fd" + num2hexstring(num, 2, True)
    elif num <= 0xffffffff:
        # uint32
        return "fe" + num2hexstring(num, 4, True)
    else:
        # uint64
        return "ff" + num2hexstring(num, 8, True)

def num2hexstring(num: int, size: int = 1, little_endian=False) -> str:
    """Convert a number to a hex string"""
    if not isinstance(num, int):
        raise TypeError(f"num2hexstring expected a number but got {type(num)}.")
    if num < 0:
        raise ValueError(f"num2hexstring expected a positive integer but got {num}.")
    if size % 1 != 0:
        raise ValueError(f"num2hexstring expected a positive integer but got {num}.")
    if num > 2 ** 53 - 1:
        raise ValueError(f"num2hexstring expected a safe integer but got {num}.")
    
    size *= 2
    hexstring = hex(num)[2:]
    hexstring = hexstring.zfill(size)
    
    if little_endian:
        hexstring = reverse_hex(hexstring)
    return hexstring

def reverse_hex(hexstring: str) -> str:
    """Reverse a hex string (reverse by byte pairs)"""
    return ''.join(reversed([hexstring[i:i + 2] for i in range(0, len(hexstring), 2)]))

## Key Configuration

In NeoFS, all operations require signing with a Neo wallet's private key to verify identity. Here, we'll start with a WIF (Wallet Import Format) private key and obtain the corresponding public key and address.

**Note**: Always protect your private key and never expose it in public or insecure environments.

In [None]:
# Set up private key - Replace with your own private key in actual use
PRIVATE_KEY_WIF = ""  # Enter your WIF format private key here

# Convert from WIF format to raw private key
private_key = wif_to_private_key(PRIVATE_KEY_WIF)

# Get corresponding public key and NEO3 address
public_key, address = private_key_to_neo3_public_key_and_address(private_key)

# Set up the base URL for the NeoFS gateway
BASE_URL = 'https://rest.t5.fs.neo.org/v1/'  # This is a testnet URL, use the appropriate URL for production

# Create an HTTP client
httpx_client = httpx.Client(base_url=BASE_URL, headers={"Content-Type": "application/json"})

## Example 1: Creating and Retrieving Containers

In NeoFS, all data is stored in containers. Containers are similar to "buckets" or "folders" in traditional cloud storage, defining storage policies and access control rules for data.

Below we'll demonstrate how to create new containers and then query containers owned by the current user.

### 1.1 Creating an Authentication Token

Before creating a container, we need to obtain an authentication token. This is how permission control is implemented in NeoFS.

In [None]:
# Set up authentication request headers
auth_header = {
    'X-Bearer-Owner-Id': address,       # Token owner's address
    'X-Bearer-Lifetime': "10000",       # Token validity period (seconds)
    'X-Bearer-For-All-Users': "false",  # Whether valid for all users
}

# Set up authentication content - Define token permissions
auth_content = [
    {"name":"my-bearer-token","object":[{"action":"ALLOW","filters":[],"operation":"GET","targets":[{"keys":[],"role":"OTHERS"}]}]},
    {"container":{"verb":"PUT"},"name":"my token"}
]

# Send authentication request
try:
    auth_resp = httpx_client.post('auth', 
                                  headers=auth_header, 
                                  content=json.dumps(auth_content)).json()
    
    # Decode and sign tokens
    for t in auth_resp:
        t["decoded_token"] = b64decode(t["token"])
    
    for t in auth_resp:
        t["signed_token"] = sign_message(private_key, t["decoded_token"]).hex()
    
    # Verify signatures
    for t in auth_resp:
        assert verify_message_signature(public_key, t["decoded_token"], bytes.fromhex(t["signed_token"]))
    
    print("Authentication token created successfully:")
    print(auth_resp)
except Exception as e:
    print(f"Failed to create authentication token: {e}")

### 1.2 Creating a New Container

Now that we have an authentication token, we can use it to create a new container. Container creation requires a signature, so we need to prepare the signature data first.

In [None]:
# Use the second token (for container creation)
msg = auth_resp[1]['token']

# Generate random salt (for increased security)
random_salt = os.urandom(16).hex()  # Generate random salt

# Prepare parameters and serialize
parameter_hex_string = (random_salt + msg).encode().hex()
assert len(parameter_hex_string) % 2 == 0  # Ensure even length
length_hex = num2VarInt(len(parameter_hex_string) // 2)
concatenated_string = length_hex + parameter_hex_string
serialized_transaction = '010001f0' + concatenated_string + '0000'  # Specific format for transaction serialization

# Sign transaction data
signature = sign_message(private_key, serialized_transaction)
assert verify_message_signature(public_key, serialized_transaction, signature)
signature_hex_str = signature.hex()

# Set request headers
httpx_client.headers.update({"Authorization": f"Bearer {auth_resp[1]['token']}"})
bearer_header = {
    'X-Bearer-Owner-Id': address,                     # Owner address
    'X-Bearer-Signature': signature_hex_str + random_salt,  # Signature + random salt
    'X-Bearer-Signature-Key': public_key,              # Public key
}

# Create container
try:
    create_container_resp = httpx_client.put(
        'containers?walletConnect=true&name-scope-global=true',  # API endpoint and parameters
        headers=bearer_header,
        content=json.dumps(
            {
                "basicAcl": "public-read-write",     # Access control level
                "containerName": "MyFirstContainer",  # Container name
                "placementPolicy": "REP 3"           # Replication policy: maintain 3 copies
            }
        )
    )
    print(f"Container creation response: {create_container_resp.json()}")
    
    # Retrieve container list again to see newly created container
    list_container_resp = httpx_client.get(f'containers?ownerId={address}')
    print(f"\nUpdated container list: {list_container_resp.json()}")
    
except Exception as e:
    print(f"Failed to create container: {e}")

### 1.3 Retrieving the Current User's Container List

Now that we've created a container, let's see how to retrieve a list of all containers owned by the current user:

In [None]:
# Get all containers owned by current user
try:
    my_containers = httpx_client.get(f'containers?ownerId={address}').json()
    print(f'My container list: {my_containers}')
except Exception as e:
    print(f"Failed to retrieve container list: {e}")

## Example 2: Signature Operations

In NeoFS, many operations require signatures to verify identity and authorization. The example below demonstrates how to sign a simple message ("hello"). This can be used as a test to verify that your key setup is correct, and to understand the signature mechanism in NeoFS.

In [None]:
# Display public key and address
print(f"Public key: {public_key}")
print(f"Address: {address}")
print()

# Message to sign
msg = 'hello'

# Use a fixed random salt for verification and testing
# In production, use os.urandom(16).hex() to generate a random salt
random_salt = '2b62f24c77ec30bac716b116651b9d23'  

# Prepare parameters and serialize - similar to container creation process
parameter_hex_string = (random_salt + msg).encode().hex()
assert len(parameter_hex_string) % 2 == 0
length_hex = num2VarInt(len(parameter_hex_string) // 2)
concatenated_string = length_hex + parameter_hex_string
serialized_transaction = '010001f0' + concatenated_string + '0000'
print(f"Serialized transaction: {serialized_transaction}")
print()

# Sign transaction data
signature = sign_message(private_key, serialized_transaction)

# Output results
print(f"Random salt: {random_salt}")
print(f"Signature: {signature.hex()}")

## Example 3: File Operations in NeoFS

After setting up a container in NeoFS, we can now perform key file operations such as uploading, downloading, and searching for files. These operations are essential for utilizing NeoFS as a decentralized storage system.

### 3.1 Uploading Files to NeoFS

To upload files to NeoFS, we first need to create an authentication token specifically for upload operations, then sign the upload request and send the file data.

In [None]:
def create_upload_token(client, private_key, public_key, address, container_id):
    """
    Create a token for uploading files
    
    Args:
        client: Configured HTTPX client
        private_key: Private key bytes
        public_key: Public key hex string
        address: Neo address
        container_id: Container ID to upload to
        
    Returns:
        dict: Auth token information
    """
    auth_header = {
        'X-Bearer-Owner-Id': address,
        'X-Bearer-Lifetime': "10000",
        'X-Bearer-For-All-Users': "false",
    }
    
    auth_content = [
        {
            "name": "upload-token",
            "object": [
                {
                    "action": "ALLOW",
                    "filters": [],
                    "operation": "PUT",
                    "targets": [{"keys": [], "role": "OTHERS"}]
                }
            ]
        }
    ]
    
    auth_resp = client.post('auth', headers=auth_header, content=json.dumps(auth_content), timeout=180).json()
    
    # Process and sign token
    for t in auth_resp:
        t["decoded_token"] = b64decode(t["token"])
    for t in auth_resp:
        t["signed_token"] = sign_message(private_key, t["decoded_token"]).hex()
    for t in auth_resp:
        assert verify_message_signature(public_key, t["decoded_token"], bytes.fromhex(t["signed_token"]))
    
    return auth_resp[0]

In [None]:
def upload_file(client, private_key, public_key, address, container_id, file_path, attributes=None, expiration=None):
    """
    Upload a file to NeoFS
    
    Args:
        client: Configured HTTPX client
        private_key: Private key bytes
        public_key: Public key hex string
        address: Neo address
        container_id: Container ID to upload to
        file_path: Path to file to upload
        attributes: Optional attributes for the file
        expiration: Optional expiration time (RFC3339 format, timestamp, or duration)
        
    Returns:
        dict: Upload response containing object ID
    """
    # Create token for upload operation
    upload_token = create_upload_token(client, private_key, public_key, address, container_id)
    
    # Prepare for signature
    msg = upload_token['token']
    random_salt = os.urandom(16).hex()
    parameter_hex_string = (random_salt + msg).encode().hex()
    assert len(parameter_hex_string) % 2 == 0
    length_hex = num2VarInt(len(parameter_hex_string) // 2)
    concatenated_string = length_hex + parameter_hex_string
    serialized_transaction = '010001f0' + concatenated_string + '0000'
    signature = sign_message(private_key, serialized_transaction)
    assert verify_message_signature(public_key, serialized_transaction, signature)
    
    # The API requires hex-encoded signature
    signature_hex_str = signature.hex()
    
    # Update client headers with auth token
    client.headers.update({"Authorization": f"Bearer {upload_token['token']}"})
    
    # Set up headers for the upload request
    upload_headers = {
        'X-Bearer-Signature': signature_hex_str + random_salt, # Include random salt with signature
        'X-Bearer-Signature-Key': public_key,
        'Content-Type': 'application/octet-stream',
    }
    
    # Handle attributes
    if attributes is None:
        attributes = {}
    
    # Add filename as attribute if not provided
    if 'FileName' not in attributes:
        attributes['FileName'] = os.path.basename(file_path)
    print(attributes['FileName'])
    # Convert attributes to JSON and add to headers
    upload_headers['X-Attributes'] = json.dumps(attributes)
    
    # Handle expiration if provided
    if expiration:
        if expiration.endswith('Z') or '+' in expiration or '-' in expiration:
            # Assume RFC3339 format
            upload_headers['X-Neofs-Expiration-RFC3339'] = expiration
        elif expiration.endswith('s') or 'h' in expiration or 'm' in expiration:
            # Assume duration format
            upload_headers['X-Neofs-Expiration-Duration'] = expiration
        else:
            # Assume timestamp
            upload_headers['X-Neofs-Expiration-Timestamp'] = expiration
    
    # Read file content
    with open(file_path, 'rb') as f:
        file_content = f.read()
    
    # Upload file
    upload_resp = client.post(
        f'objects/{container_id}?walletConnect=true',
        headers=upload_headers,
        content=file_content,
        timeout=180
    )
    
    return upload_resp.json()

Let's use these functions to upload a file to our container:

In [None]:
# Example: Upload a file to NeoFS
# First, let's create a simple text file
with open("example.txt", "w") as f:
    f.write("Hello NeoFS! This is a test file for our tutorial.")

# Now upload the file to our container
try:
    # Get the first container ID from our list
    container_id = my_containers['containers'][0]['containerId']
    
    # Upload with custom attributes and 24-hour expiration
    upload_response = upload_file(
        httpx_client, 
        private_key, 
        public_key, 
        address, 
        container_id, 
        "example.txt", 
        attributes={"Description": "Test file for NeoFS tutorial"},
        expiration="24h"
    )
    
    print(f"File uploaded successfully: {upload_response}")
    
    # Store the object ID for later use
    object_id = upload_response["object_id"]
except Exception as e:
    print(f"Upload failed: {e}")

### 3.2 Downloading Files from NeoFS

Once files are uploaded to NeoFS, you can download them using their container ID and object ID. Similar to uploads, we need to create an authentication token for download operations.

In [None]:
def create_download_token(client, private_key, public_key, address, container_id, object_id=None):
    """
    Create a token for downloading files
    
    Args:
        client: Configured HTTPX client
        private_key: Private key bytes
        public_key: Public key hex string
        address: Neo address
        container_id: Container ID to download from
        object_id: Optional specific object ID to download
        
    Returns:
        dict: Auth token information
    """
    auth_header = {
        'X-Bearer-Owner-Id': address,
        'X-Bearer-Lifetime': "10000",
        'X-Bearer-For-All-Users': "false",
    }
    
    # Create a more generic token similar to upload token
    auth_content = [
        {
            "name": "download-token",
            "object": [
                {
                    "action": "ALLOW",
                    "filters": [],
                    "operation": "GET",
                    "targets": [{"keys": [], "role": "OTHERS"}]
                }
            ]
        }
    ]
    
    auth_resp = client.post('auth', headers=auth_header, content=json.dumps(auth_content), timeout=180).json()
    
    # Process and sign token
    for t in auth_resp:
        t["decoded_token"] = b64decode(t["token"])
    for t in auth_resp:
        t["signed_token"] = sign_message(private_key, t["decoded_token"]).hex()
    for t in auth_resp:
        assert verify_message_signature(public_key, t["decoded_token"], bytes.fromhex(t["signed_token"]))
    
    return auth_resp[0]

In [None]:
def download_file(client, private_key, public_key, address, container_id, object_id, output_path=None, range_header=None, force_download=False):
    """
    Download a file from NeoFS
    
    Args:
        client: Configured HTTPX client
        private_key: Private key bytes
        public_key: Public key hex string
        address: Neo address
        container_id: Container ID to download from
        object_id: Object ID to download
        output_path: Path to save downloaded file
        range_header: Optional Range header value to request specific bytes
        force_download: Whether to use the download query param to force browser download
        
    Returns:
        bytes: File content if output_path is None, otherwise saves to file
    """
    # Create token for download operation
    download_token = create_download_token(client, private_key, public_key, address, container_id)
    
    # Prepare for signature
    msg = download_token['token']
    random_salt = os.urandom(16).hex()
    parameter_hex_string = (random_salt + msg).encode().hex()
    assert len(parameter_hex_string) % 2 == 0
    length_hex = num2VarInt(len(parameter_hex_string) // 2)
    concatenated_string = length_hex + parameter_hex_string
    serialized_transaction = '010001f0' + concatenated_string + '0000'
    signature = sign_message(private_key, serialized_transaction)
    assert verify_message_signature(public_key, serialized_transaction, signature)
    
    # The API requires hex-encoded signature
    signature_hex_str = signature.hex()
    
    # Update client headers with auth token
    client.headers.update({"Authorization": f"Bearer {download_token['token']}"})
    
    download_headers = {
        'X-Bearer-Signature': signature_hex_str + random_salt, # Include random salt with signature
        'X-Bearer-Signature-Key': public_key,
    }
    
    # Add Range header if specified
    if range_header:
        download_headers['Range'] = range_header
    
    # Construct URL with query parameters if needed
    download_url = f'objects/{container_id}/by_id/{object_id}'
    query_params = []
    
    query_params.append('walletConnect=true')
    
    if force_download:
        query_params.append('download=true')
    
    if query_params:
        download_url += '?' + '&'.join(query_params)
    
    # Download file
    download_resp = client.get(
        download_url,
        headers=download_headers
    )
    
    # Handle response
    if download_resp.status_code != 200:
        raise Exception(f"Download failed with status code {download_resp.status_code}: {download_resp.text}")
    
    content = download_resp.content
    
    # Extract file attributes if present
    attributes = {}
    if 'X-Attributes' in download_resp.headers:
        try:
            attributes = json.loads(download_resp.headers['X-Attributes'])
        except:
            pass
    
    if output_path:
        with open(output_path, 'wb') as f:
            f.write(content)
        return {
            "message": f"File saved to {output_path}",
            "attributes": attributes
        }
    else:
        return {
            "content": content,
            "attributes": attributes
        }

Let's download the file we just uploaded:

In [None]:
try:
    # Download the file
    download_response = download_file(
        httpx_client, 
        private_key, 
        public_key, 
        address, 
        container_id, 
        object_id, 
        output_path="downloaded_document.txt",  # Assuming it's a text file
        force_download=True
    )
    
    print(f"File downloaded successfully: {download_response}")
    
    # Alternative implementation using scikit-learn
    def process_and_vectorize_document_sklearn(file_path, chunk_size=1000, chunk_overlap=200):
        """
        Process a downloaded document using scikit-learn for vectorization
        
        Args:
            file_path: Path to the downloaded file
            chunk_size: Size of text chunks for splitting
            chunk_overlap: Overlap between chunks
        """
        import os
        import numpy as np
        from sklearn.feature_extraction.text import TfidfVectorizer
        import pickle
        
        # Simple text chunking function
        def chunk_text(text, chunk_size, overlap):
            chunks = []
            start = 0
            while start < len(text):
                end = min(start + chunk_size, len(text))
                chunks.append(text[start:end])
                start += chunk_size - overlap
            return chunks
        
        # Read the content of the file
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        
        # Split text into chunks
        chunks = chunk_text(text, chunk_size, chunk_overlap)
        print(f"Split document into {len(chunks)} chunks")
        
        # Use TF-IDF to vectorize the text
        vectorizer = TfidfVectorizer()
        vectors = vectorizer.fit_transform(chunks)
        
        # Save the chunks and vectors for later use
        # Create directory if it doesn't exist
        os.makedirs("./vector_db", exist_ok=True)
        
        # Save chunks to a file
        with open("./vector_db/chunks.txt", "w", encoding="utf-8") as f:
            for chunk in chunks:
                f.write(chunk + "\n===CHUNK_SEPARATOR===\n")
        
        # Save vectors using numpy
        np.save("./vector_db/vectors.npy", vectors.toarray())
        
        # Save vocabulary
        with open("./vector_db/vectorizer.pkl", "wb") as f:
            pickle.dump(vectorizer, f)
        
        print("Document vectorized and stored in ./vector_db")
        
        # Create a search function
        def similarity_search(query, k=3):
            # Vectorize the query
            query_vector = vectorizer.transform([query])
            
            # Calculate similarity scores (cosine similarity)
            from sklearn.metrics.pairwise import cosine_similarity
            similarities = cosine_similarity(query_vector, vectors).flatten()
            
            # Get indices of top k results
            top_indices = similarities.argsort()[-k:][::-1]
            
            # Create document-like objects with a page_content attribute
            class Document:
                def __init__(self, content):
                    self.page_content = content
            
            # Return results
            results = []
            for idx in top_indices:
                results.append(Document(chunks[idx]))
            
            return results
        
        # Return a database-like object with a similarity_search method
        class VectorDB:
            def __init__(self, search_func):
                self.similarity_search = search_func
        
        return VectorDB(similarity_search)
    
    # Now process and vectorize the document
    vectordb = process_and_vectorize_document_sklearn("downloaded_document.txt")
    
    # Example of how to search the vector database
    query = "Your search query here"
    results = vectordb.similarity_search(query, k=3)
    
    print(f"Search results for '{query}':")
    for i, doc in enumerate(results):
        print(f"Result {i+1}:\n{doc.page_content}\n")
        
except Exception as e:
    import traceback
    print(f"Process failed: {e}")
    print(traceback.format_exc())

### 3.3 Searching for Files in NeoFS

NeoFS provides search functionality to find files based on their attributes. This is particularly useful when you have many files and need to find specific ones.

In [None]:
def create_search_token(client, private_key, public_key, address, container_id):
    """
    Create a token for searching files
    
    Args:
        client: Configured HTTPX client
        private_key: Private key bytes
        public_key: Public key hex string
        address: Neo address
        container_id: Container ID to search in
        
    Returns:
        dict: Auth token information
    """
    auth_header = {
        'X-Bearer-Owner-Id': address,
        'X-Bearer-Lifetime': "10000",
        'X-Bearer-For-All-Users': "false",
    }
    
    auth_content = [
        {
            "name": "search-token",
            "object": [
                {
                    "action": "ALLOW",
                    "filters": [],
                    "operation": "SEARCH",
                    "targets": [{"keys": [], "role": "OTHERS"}]
                }
            ]
        }
    ]
    
    auth_resp = client.post('auth', headers=auth_header, content=json.dumps(auth_content), timeout=180).json()
    
    # Process and sign token
    for t in auth_resp:
        t["decoded_token"] = b64decode(t["token"])
    for t in auth_resp:
        t["signed_token"] = sign_message(private_key, t["decoded_token"]).hex()
    for t in auth_resp:
        assert verify_message_signature(public_key, t["decoded_token"], bytes.fromhex(t["signed_token"]))
    
    return auth_resp[0]

In [None]:
def search_files(client, private_key, public_key, address, container_id, filters, offset=0, limit=100):
    """
    Search files in NeoFS
    
    Args:
        client: Configured HTTPX client
        private_key: Private key bytes
        public_key: Public key hex string
        address: Neo address
        container_id: Container ID to search in
        filters: Dictionary of attribute key-value pairs to search for
        offset: Number of objects to skip
        limit: Maximum number of objects to return
        
    Returns:
        list: List of matching object IDs
    """
    # Create token for search operation
    auth_header = {
        'X-Bearer-Owner-Id': address,
        'X-Bearer-Lifetime': "10000",
        'X-Bearer-For-All-Users': "false",
    }
    
    auth_content = [
        {
            "name": "search-token",
            "object": [
                {
                    "action": "ALLOW",
                    "filters": [],
                    "operation": "SEARCH",
                    "targets": [{"keys": [], "role": "OTHERS"}]
                }
            ]
        }
    ]
    
    auth_resp = client.post('auth', headers=auth_header, content=json.dumps(auth_content), timeout=180).json()
    
    # Process and sign token
    for t in auth_resp:
        t["decoded_token"] = b64decode(t["token"])
    for t in auth_resp:
        t["signed_token"] = sign_message(private_key, t["decoded_token"]).hex()
    for t in auth_resp:
        assert verify_message_signature(public_key, t["decoded_token"], bytes.fromhex(t["signed_token"]))
    
    # Get the search token
    search_token = auth_resp[0]
    
    # Prepare for signature
    msg = search_token['token']
    random_salt = os.urandom(16).hex()
    parameter_hex_string = (random_salt + msg).encode().hex()
    assert len(parameter_hex_string) % 2 == 0
    length_hex = num2VarInt(len(parameter_hex_string) // 2)
    concatenated_string = length_hex + parameter_hex_string
    serialized_transaction = '010001f0' + concatenated_string + '0000'
    signature = sign_message(private_key, serialized_transaction)
    assert verify_message_signature(public_key, serialized_transaction, signature)
    
    # The API requires hex-encoded signature
    signature_hex_str = signature.hex()
    
    # Update client headers with auth token
    client.headers.update({"Authorization": f"Bearer {search_token['token']}"})
    
    # Prepare search headers
    search_headers = {
        'X-Bearer-Signature': signature_hex_str + random_salt, # Include random salt with signature
        'X-Bearer-Signature-Key': public_key,
        'Content-Type': 'application/json'
    }
    
    # Prepare search filters using the correct format
    search_filters = []
    for key, value in filters.items():
        search_filters.append({
            "key": key,
            "value": value,
            "match": "MatchStringEqual"  # Using the correct match name from API docs
        })
    
    # Construct URL with query parameters
    search_url = f'objects/{container_id}/search?walletConnect=true&offset={offset}&limit={limit}'
    
    # Perform search
    search_resp = client.post(
        search_url,
        headers=search_headers,
        content=json.dumps({
            "filters": search_filters
        })
    )
    
    return search_resp.json()

Let's search for files by their attributes:

In [None]:
# Search for files with specific attributes
try:
    # Search for our example file by filename
    search_response = search_files(
        httpx_client, 
        private_key, 
        public_key, 
        address, 
        container_id, 
        {"FileName": "example.txt"},
        offset=0,
        limit=50
    )
    
    print(f"Search results: {search_response}")
    
    # You can also search by any custom attribute
    search_response_by_desc = search_files(
        httpx_client, 
        private_key, 
        public_key, 
        address, 
        container_id, 
        {"Description": "Test file for NeoFS tutorial"},
        offset=0,
        limit=50
    )
    
    print(f"Search results by description: {search_response_by_desc}")
except Exception as e:
    print(f"Search failed: {e}")

## Summary

NeoFS provides many other functionalities, including but not limited to:

1. **Set ACL**: Configure access control lists
2. **Create Extended ACL**: More complex access control policies

These operations form the foundation of using NeoFS as a decentralized storage solution. By combining container management with these file operations, you can build robust applications that leverage the security, reliability, and decentralization benefits of NeoFS.
Remember that all operations in NeoFS require proper authentication and signature generation, which follows the pattern we've established throughout this tutorial: create a token, sign it with your private key, and include the signature in your request.