# Sift Client Ingestion

This notebook demonstrates features of SiftClient ingestion, from basic usage to advanced patterns.

## Prerequisites

**Important**: The ingestion streaming client requires the `sift-stream` optional dependency. Install it with:

```bash
pip install sift-stack-py[sift-stream]
```

## Topics Covered

1. **Basic Example**: Simple single flow sending using FlowConfig
2. **Advanced FlowBuilderPy**: High-performance flow building with direct run ID management
3. **High-Performance Batch Sending**: Efficiently sending multiple flows using FlowBuilderPy
4. **Queue-Based Lazy Flow Creation**: Dynamic flow registration with multi-task architecture


## 1. Basic Example: Sending Individual Flows with FlowConfig

This example shows the simplest way to send telemetry data to Sift:
- Create an ingestion config with flow definitions
- Keep a reference to the flow config so flows can be built from it later (no API call needed)
- Create a run to associate data with
- Build each flow with `as_flow()` and send it individually with `send()`

This is the simplest approach and is recommended for basic use cases where performance is not critical.

In [None]:
import asyncio
import random
import time
from datetime import datetime, timezone

from sift_client import SiftClient, SiftConnectionConfig
from sift_client.sift_types import (
    ChannelConfig,
    ChannelDataType,
    FlowConfig,
    IngestionConfigCreate,
    RunCreate,
)


async def basic_example():
    """Stream ten samples of the ``onboard_sensors`` flow using ``FlowConfig.as_flow()``.

    Builds an ingestion config containing a single flow, opens a streaming
    client bound to a new run, and sends one flow per iteration, pausing
    0.1 s between sends.
    """
    # Configure connection to Sift
    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )

    client = SiftClient(connection_config=connection_config)

    # Define your telemetry schema using a flow config and an ingestion config.
    # Keep a reference to flow_config: it is used below to build each flow.
    flow_config = FlowConfig(
        name="onboard_sensors",
        channels=[
            ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
            ChannelConfig(name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE),
        ],
    )

    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[flow_config],
    )

    # Create a run to associate this data collection session
    run = RunCreate(name="sift_rover-" + str(int(time.time())))

    # Create the streaming client
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # Send data in a loop (the loop index itself is not needed)
        for _ in range(10):
            # Create a flow with timestamp and values using the saved flow_config.
            # The timestamp can also be left out to default to datetime.now(timezone.utc)
            flow = flow_config.as_flow(
                timestamp=datetime.now(timezone.utc),
                values={
                    "motor_temp": 50.0 + random.random() * 5.0,
                    "tank_pressure": 2000.0 + random.random() * 100.0,
                },
            )

            # Send the flow to Sift
            await ingest_client.send(flow=flow)

            await asyncio.sleep(0.1)

# Uncomment to run:
# asyncio.run(basic_example())

## 2. Advanced FlowBuilderPy Usage

This example demonstrates the advanced `FlowBuilderPy` paradigm, which provides better performance and more control:
- Get `FlowDescriptorPy` using `get_flow_descriptor()`
- Retrieve the run ID from SiftStream using `get_run_id()`
- Create `FlowBuilderPy` from the descriptor
- Set the run ID directly on the flow builder using `attach_run_id()`
- Use channel indices from the descriptor mapping to avoid hash operations
- Use `set()` with channel indices instead of `set_with_key()` for maximum performance
- Build the request and send using `send_requests()` or `send_requests_nonblocking()`

**Note**: This approach requires managing the run ID directly, making it more advanced but also more performant. Using channel indices instead of channel names avoids hash lookups, providing the best performance for high-frequency data sending. It's useful when you need fine-grained control or are sending data for multiple runs/assets with a single SiftStream instance.


In [None]:
async def advanced_flowbuilder_example():
    """Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.

    Resolves the flow descriptor and run ID from the streaming client, looks
    up each channel's index once via ``descriptor.mapping()``, then for every
    sample builds a request with the index-based ``FlowBuilderPy.set()`` and
    hands it to ``send_requests_nonblocking()``.

    Raises:
        ValueError: if the streaming client has no run ID to attach.
    """
    from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy

    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )

    client = SiftClient(connection_config=connection_config)

    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[
            FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            )
        ],
    )

    run = RunCreate(name="sift_rover-" + str(int(time.time())))

    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # Get the flow descriptor and run ID from SiftStream
        descriptor = ingest_client.get_flow_descriptor(flow_name="onboard_sensors")
        run_id = ingest_client.get_run_id()

        if run_id is None:
            raise ValueError("Run ID is required for FlowBuilderPy usage")

        # Get the mapping from channel names to channel indices.
        # This allows us to avoid hash lookups by using indices directly.
        channel_index_map = descriptor.mapping()

        # Pre-compute channel indices and value conversion methods.
        # This creates a list of (channel index, conversion_method) tuples
        # that can be reused for each flow, avoiding hash operations.
        #
        # If this technique is used, caching the indices and conversion method
        # is strongly recommended.
        channel_indices_and_methods = [
            (channel_index_map["motor_temp"], ValuePy.Double),
            (channel_index_map["tank_pressure"], ValuePy.Double),
        ]

        # Send data in a loop using FlowBuilderPy with channel indices
        for _ in range(10):
            # Create a FlowBuilderPy from the descriptor
            flow_builder = FlowBuilderPy(descriptor)

            # Attach the run ID directly to the flow builder
            flow_builder.attach_run_id(run_id)

            # Set channel values using set() with pre-computed indices.
            # This avoids hash lookups and provides better performance.
            motor_temp_value = 50.0 + random.random() * 5.0
            tank_pressure_value = 2000.0 + random.random() * 100.0

            # If the raw data class used provides in-order iteration over the raw data, you can also iterate
            # over the values and encoding information directly. Since the value indices are used, the
            # additional per-channel hash lookup is not needed, further improving performance.
            #
            # Though for convenience, the values can also be set using set_with_key() which takes a channel name
            # and value.
            #
            # Example:
            #
            # flow_builder.set_with_key("motor_temp", motor_temp_value)
            # flow_builder.set_with_key("tank_pressure", tank_pressure_value)
            values = [motor_temp_value, tank_pressure_value]
            for (channel_index, conversion_method), value in zip(channel_indices_and_methods, values):
                flow_builder.set(channel_index, conversion_method(value))

            # Build the request with current timestamp
            request = flow_builder.request(TimeValuePy.now())

            # Send the request (non-blocking version)
            ingest_client.send_requests_nonblocking([request])

            await asyncio.sleep(0.1)


# Uncomment to run:
# asyncio.run(advanced_flowbuilder_example())

## 3. High-Performance Batch Sending

This example demonstrates high-performance batch sending using `FlowBuilderPy` with channel indices and `send_requests_nonblocking()`:
- Pre-compute channel indices from the descriptor mapping to avoid hash operations
- Use `FlowBuilderPy` with `set()` and channel indices for maximum performance
- Use `send_requests_nonblocking()` for non-blocking batch sending
- This approach provides the best performance for high-throughput scenarios

The combination of channel indices (avoiding hash lookups) and non-blocking batch sending allows the underlying Rust client to handle batching and sending asynchronously, minimizing Python overhead and maximizing throughput.


In [None]:
async def high_performance_batch_example():
    """Example showing high-performance batch sending with FlowBuilderPy using channel indices.

    Generates ``duration_seconds`` worth of samples at ``sample_rate_hz``,
    builds one request per sample up front (each with its own sub-second
    timestamp), and hands the whole batch to ``send_requests_nonblocking()``
    in a single call.

    Raises:
        ValueError: if the streaming client has no run ID to attach.
    """
    from datetime import timedelta

    from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy

    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )

    client = SiftClient(connection_config=connection_config)

    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[
            FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            )
        ],
    )

    run = RunCreate(name="sift_rover-" + str(int(time.time())))

    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # Get the flow descriptor and run ID
        descriptor = ingest_client.get_flow_descriptor(flow_name="onboard_sensors")
        run_id = ingest_client.get_run_id()

        if run_id is None:
            raise ValueError("Run ID is required for FlowBuilderPy usage")

        # Pre-compute channel indices and conversion methods for maximum performance.
        # This avoids hash lookups when setting values in the loop below.
        channel_index_map = descriptor.mapping()
        channel_indices_and_methods = [
            (channel_index_map["motor_temp"], ValuePy.Double),
            (channel_index_map["tank_pressure"], ValuePy.Double),
        ]

        # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)
        sample_rate_hz = 10
        duration_seconds = 5
        num_flows = sample_rate_hz * duration_seconds  # 50 flows

        # Loop invariants: spacing between samples (100 ms at 10 Hz) and the epoch
        # used to turn datetimes into (seconds, nanoseconds) pairs exactly.
        sample_period = timedelta(seconds=1) / sample_rate_hz
        unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)

        start_time = datetime.now(timezone.utc)
        requests = []

        for i in range(num_flows):
            # Timestamp of sample i, as an exact offset from the Unix epoch.
            # The sub-second remainder must be kept: truncating to whole seconds
            # would give every sample within the same second an identical timestamp.
            offset = (start_time + i * sample_period) - unix_epoch
            timestamp = TimeValuePy.from_timestamp(
                offset.days * 86_400 + offset.seconds,  # whole seconds since the epoch
                offset.microseconds * 1_000,  # sub-second remainder in nanoseconds
            )

            # Create FlowBuilderPy and build request using pre-computed indices
            flow_builder = FlowBuilderPy(descriptor)
            flow_builder.attach_run_id(run_id)

            # Generate values
            motor_temp_value = 50.0 + random.random() * 5.0
            tank_pressure_value = 2000.0 + random.random() * 100.0

            # Use indices directly - no hash operations!
            values = [motor_temp_value, tank_pressure_value]
            for (channel_index, conversion_method), value in zip(channel_indices_and_methods, values):
                flow_builder.set(channel_index, conversion_method(value))

            requests.append(flow_builder.request(timestamp))

        # Send all requests in a single non-blocking batch operation.
        # The combination of channel indices + non-blocking batch sending provides
        # the best performance for high-throughput scenarios.
        ingest_client.send_requests_nonblocking(requests)


# Uncomment to run:
# asyncio.run(high_performance_batch_example())

## 4. Queue-Based Lazy Flow Creation

This example demonstrates a multi-task architecture for handling dynamic flow schemas using `add_new_flows()`:
- **Task 1**: Ingest raw data from a source and push to Queue 1
- **Task 2**: Read from Queue 1, check if flow descriptor is cached
  - If not cached, call `add_new_flows()` to register the new flow
  - After registration, retrieve the descriptor and cache it
  - Push the message with descriptor to Queue 2
- **Task 3**: Drain Queue 2, decode the raw data, and send to Sift using `FlowBuilderPy`

This pattern enables lazy flow registration, allowing you to handle unknown schemas at runtime without pre-registering all possible flows.


In [None]:
import asyncio
import random
import time
from dataclasses import dataclass
from datetime import datetime, timezone

from sift_stream_bindings import FlowBuilderPy, FlowDescriptorPy, TimeValuePy


@dataclass()
class RawDataMessage:
    """One undecoded sample waiting to be turned into a Sift flow.

    Instances are produced by the ingest stage of the queue-based pipeline and
    carried through the registration and send stages.
    """

    # Name of the flow this sample belongs to; used to look up its descriptor.
    flow_name: str
    # When the sample was taken.
    timestamp: datetime
    # Raw channel name -> value mapping.
    channel_values: dict[str, float]


async def queue_based_lazy_flow_example():
    """Example demonstrating queue-based lazy flow creation with add_new_flows.

    Three concurrent tasks form a pipeline:

    1. ``ingest_task`` simulates a raw data source and pushes
       ``RawDataMessage`` objects onto ``queue1``.
    2. ``registration_task`` registers each flow the first time it is seen
       (via ``add_new_flows``), caches its descriptor and forwards
       ``(message, descriptor)`` pairs onto ``queue2``.
    3. ``send_task`` builds a request for each pair with ``FlowBuilderPy``
       and sends it to Sift.

    End of stream is signalled by a ``None`` sentinel that each stage forwards
    downstream (in a ``finally``), so the pipeline shuts down when the source
    is exhausted rather than after an arbitrary idle timeout.

    Raises:
        ValueError: if the streaming client has no run ID, or a message
            references a flow with no known config.
        NotImplementedError: if a flow config contains a non-DOUBLE channel,
            which this example does not convert.
    """
    from sift_client import SiftClient, SiftConnectionConfig
    from sift_client.sift_types import (
        ChannelConfig,
        ChannelDataType,
        FlowConfig,
        IngestionConfigCreate,
        RunCreate,
    )
    from sift_stream_bindings import ChannelConfigPy, ChannelDataTypePy, FlowConfigPy

    unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)

    def to_flow_config_py(flow_config: FlowConfig) -> FlowConfigPy:
        """Convert a sift_client FlowConfig into the Rust-side FlowConfigPy."""
        channel_configs_py = []
        for ch in flow_config.channels:
            # Only DOUBLE channels are converted here; refuse anything else
            # rather than silently registering it with the wrong data type.
            if ch.data_type != ChannelDataType.DOUBLE:
                raise NotImplementedError(
                    f"Channel {ch.name!r} has data type {ch.data_type}, "
                    "which this example does not convert"
                )
            channel_configs_py.append(
                ChannelConfigPy(
                    name=ch.name,
                    data_type=ChannelDataTypePy.Double,
                    unit=ch.unit,
                    description=ch.description or "",
                    enum_types=[],
                    bit_field_elements=[],
                )
            )
        return FlowConfigPy(name=flow_config.name, channels=channel_configs_py)

    def to_time_value(timestamp: datetime) -> TimeValuePy:
        """Convert a timezone-aware datetime to TimeValuePy, keeping sub-second precision."""
        offset = timestamp - unix_epoch
        return TimeValuePy.from_timestamp(
            offset.days * 86_400 + offset.seconds,  # whole seconds since the epoch
            offset.microseconds * 1_000,  # sub-second remainder in nanoseconds
        )

    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )

    client = SiftClient(connection_config=connection_config)

    # Start with an empty ingestion config - flows will be added dynamically
    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[],  # Empty initially
    )

    run = RunCreate(name="sift_rover-" + str(int(time.time())))

    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        run_id = ingest_client.get_run_id()
        if run_id is None:
            raise ValueError("Run ID is required for FlowBuilderPy usage")

        # Queues for the pipeline; a ``None`` item marks the end of the stream.
        queue1: asyncio.Queue[RawDataMessage | None] = asyncio.Queue()
        queue2: asyncio.Queue[tuple[RawDataMessage, FlowDescriptorPy] | None] = asyncio.Queue()

        # Cache for flow descriptors (flow_name -> FlowDescriptorPy)
        descriptor_cache: dict[str, FlowDescriptorPy] = {}

        # Cache for flow configs (flow_name -> FlowConfig).
        # In a real scenario, you'd derive this from your raw data schema.
        flow_config_cache: dict[str, FlowConfig] = {
            "onboard_sensors": FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            ),
            "navigation": FlowConfig(
                name="navigation",
                channels=[
                    ChannelConfig(name="gps_lat", unit="deg", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(name="gps_lon", unit="deg", data_type=ChannelDataType.DOUBLE),
                ],
            ),
        }

        # Task 1: Ingest raw data and push to Queue 1
        async def ingest_task():
            """Simulate ingesting raw data from a source, then signal end of stream."""
            try:
                for i in range(20):
                    # Simulate different flows arriving
                    flow_name = "onboard_sensors" if i % 2 == 0 else "navigation"

                    if flow_name == "onboard_sensors":
                        channel_values = {
                            "motor_temp": 50.0 + random.random() * 5.0,
                            "tank_pressure": 2000.0 + random.random() * 100.0,
                        }
                    else:
                        channel_values = {
                            "gps_lat": 37.7749 + random.random() * 0.01,
                            "gps_lon": -122.4194 + random.random() * 0.01,
                        }

                    queue1.put_nowait(
                        RawDataMessage(
                            flow_name=flow_name,
                            timestamp=datetime.now(timezone.utc),
                            channel_values=channel_values,
                        )
                    )
                    await asyncio.sleep(0.1)
            finally:
                # Always release the downstream stage, even if ingestion fails.
                queue1.put_nowait(None)

        # Task 2: Register flows lazily
        async def registration_task():
            """Register unseen flows, then forward (message, descriptor) pairs to Queue 2."""
            try:
                while True:
                    raw_data = await queue1.get()
                    if raw_data is None:
                        break

                    flow_name = raw_data.flow_name
                    descriptor = descriptor_cache.get(flow_name)

                    if descriptor is None:
                        # For this example, the flow configs are pre-defined above.
                        # In practice, these would often be generated dynamically
                        # from the raw data schema.
                        flow_config = flow_config_cache.get(flow_name)
                        if flow_config is None:
                            raise ValueError(f"Flow config not found for {flow_name}")

                        # Register the new flow, then fetch and cache its descriptor
                        await ingest_client.add_new_flows([to_flow_config_py(flow_config)])
                        descriptor = ingest_client.get_flow_descriptor(flow_name=flow_name)
                        descriptor_cache[flow_name] = descriptor
                        print(f"Registered new flow: {flow_name}")

                    await queue2.put((raw_data, descriptor))
            finally:
                # Always release the send stage, even if registration fails.
                queue2.put_nowait(None)

        # Task 3: Decode and send
        async def send_task():
            """Decode raw data and send to Sift using FlowBuilderPy until end of stream."""
            while True:
                item = await queue2.get()
                if item is None:
                    break
                raw_data, descriptor = item

                # Create FlowBuilderPy and set values
                flow_builder = FlowBuilderPy(descriptor)
                flow_builder.attach_run_id(run_id)

                # Set all channel values from raw data.
                for channel_name, value in raw_data.channel_values.items():
                    flow_builder.set_with_key(channel_name, value)

                # Build request (keeping sub-second timestamp precision) and send
                request = flow_builder.request(to_time_value(raw_data.timestamp))
                await ingest_client.send_requests([request])

        # Run all tasks concurrently
        await asyncio.gather(
            ingest_task(),
            registration_task(),
            send_task(),
        )


# Uncomment to run:
# asyncio.run(queue_based_lazy_flow_example())
