diff --git a/python/docs/examples/ingestion.ipynb b/python/docs/examples/ingestion.ipynb index 6339da2dc..ade951971 100644 --- a/python/docs/examples/ingestion.ipynb +++ b/python/docs/examples/ingestion.ipynb @@ -19,9 +19,10 @@ "\n", "## Topics Covered\n", "\n", - "1. **Basic Example**: Simple single flow sending\n", - "2. **Batch Sending**: Efficiently sending multiple flows at once\n", - "3. **Advanced Concepts**: Recovery strategies, checkpoints, and more\n" + "1. **Basic Example**: Simple single flow sending using FlowConfig\n", + "2. **Advanced FlowBuilderPy**: High-performance flow building with direct run ID management\n", + "3. **High-Performance Batch Sending**: Efficiently sending multiple flows using FlowBuilderPy\n", + "4. **Queue-Based Lazy Flow Creation**: Dynamic flow registration with multi-task architecture\n" ] }, { @@ -29,12 +30,15 @@ "id": "b324ed85", "metadata": {}, "source": [ - "## 1. Basic Example: Sending Individual Flows\n", + "## 1. Basic Example: Sending Individual Flows with FlowConfig\n", "\n", "This example shows the simplest way to send telemetry data to Sift:\n", "- Create an ingestion config with flow definitions\n", + "- Save the flow config from the ingestion config (no API call needed)\n", "- Create a run to associate data with\n", - "- Send individual flows one at a time" + "- Send individual flows one at a time using `as_flow()`\n", + "\n", + "This is the simplest approach and is recommended for basic use cases where performance is not critical." ] }, { @@ -47,7 +51,7 @@ "import asyncio\n", "import random\n", "import time\n", - "from datetime import datetime, timedelta, timezone\n", + "from datetime import datetime, timezone\n", "\n", "from sift_client import SiftClient, SiftConnectionConfig\n", "from sift_client.sift_types import (\n", @@ -69,20 +73,18 @@ "\n", " client = SiftClient(connection_config=connection_config)\n", "\n", - " # Define your telemetry schema using an ingestion config\n", + " # Define your telemetry schema using an flow config and ingestion config\n", + " flow_config = FlowConfig(\n", + " name=\"onboard_sensors\",\n", + " channels=[\n", + " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", + " ChannelConfig(name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE),\n", + " ],\n", + " )\n", + "\n", " ingestion_config = IngestionConfigCreate(\n", " asset_name=\"sift_rover_1\",\n", - " flows=[\n", - " FlowConfig(\n", - " name=\"onboard_sensors\",\n", - " channels=[\n", - " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", - " ChannelConfig(\n", - " name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n", - " ),\n", - " ],\n", - " )\n", - " ],\n", + " flows=[flow_config],\n", " )\n", "\n", " # Create a run to associate this data collection session\n", @@ -95,10 +97,7 @@ " ) as ingest_client:\n", " # Send data in a loop\n", " for i in range(10):\n", - " # Get the flow config to create flows\n", - " flow_config = ingest_client.get_flow_config(flow_name=\"onboard_sensors\")\n", - "\n", - " # Create a flow with timestamp and values\n", + " # Create a flow with timestamp and values using the saved flow_config\n", " # The timestamp can also be left out to default to datetime.now(timezone.utc)\n", " flow = flow_config.as_flow(\n", " timestamp=datetime.now(timezone.utc),\n", @@ -123,9 +122,18 @@ "id": "c5bf10f3", "metadata": {}, "source": [ - "## 2. Batch Sending: Efficiently Sending Multiple Flows\n", - "\n", - "For potentially better performance when sending many flows, use `batch_send()` to send multiple flows in a single operation. This may reduce the overhead of calling the underlying Rust SiftStream ingestion client.\n" + "## 2. Advanced FlowBuilderPy Usage\n", + "\n", + "This example demonstrates the advanced `FlowBuilderPy` paradigm, which provides better performance and more control:\n", + "- Get `FlowDescriptorPy` using `get_flow_descriptor()`\n", + "- Retrieve the run ID from SiftStream using `get_run_id()`\n", + "- Create `FlowBuilderPy` from the descriptor\n", + "- Set the run ID directly on the flow builder using `attach_run_id()`\n", + "- Use channel indices from the descriptor mapping to avoid hash operations\n", + "- Use `set()` with channel indices instead of `set_with_key()` for maximum performance\n", + "- Build the request and send using `send_requests()` or `send_requests_nonblocking()`\n", + "\n", + "**Note**: This approach requires managing the run ID directly, making it more advanced but also more performant. Using channel indices instead of channel names avoids hash lookups, providing the best performance for high-frequency data sending. It's useful when you need fine-grained control or are sending data for multiple runs/assets with a single SiftStream instance.\n" ] }, { @@ -135,8 +143,10 @@ "metadata": {}, "outputs": [], "source": [ - "async def batch_send_example():\n", - " \"\"\"Example showing how to efficiently send multiple flows at once.\"\"\"\n", + "async def advanced_flowbuilder_example():\n", + " \"\"\"Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.\"\"\"\n", + " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", + "\n", " connection_config = SiftConnectionConfig(\n", " api_key=\"my_api_key\",\n", " grpc_url=\"sift_grpc_url\",\n", @@ -166,35 +176,69 @@ " ingestion_config=ingestion_config,\n", " run=run,\n", " ) as ingest_client:\n", - " flow_config = ingest_client.get_flow_config(flow_name=\"onboard_sensors\")\n", - "\n", - " # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)\n", - " sample_rate_hz = 10\n", - " duration_seconds = 5\n", - " num_flows = sample_rate_hz * duration_seconds # 50 flows\n", - "\n", - " start_time = datetime.now(timezone.utc)\n", - " flows = []\n", - " for i in range(num_flows):\n", - " # Calculate timestamp for each sample (spaced 0.1 seconds apart)\n", - " timestamp = start_time + timedelta(seconds=i / sample_rate_hz)\n", - " flows.append(\n", - " flow_config.as_flow(\n", - " timestamp=timestamp,\n", - " values={\n", - " \"motor_temp\": 50.0 + random.random() * 5.0,\n", - " \"tank_pressure\": 2000.0 + random.random() * 100.0,\n", - " },\n", - " )\n", - " )\n", + " # Get the flow descriptor and run ID from SiftStream\n", + " descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n", + " run_id = ingest_client.get_run_id()\n", + "\n", + " if run_id is None:\n", + " raise ValueError(\"Run ID is required for FlowBuilderPy usage\")\n", + "\n", + " # Get the mapping from channel names to ChannelIndexPy\n", + " # This allows us to avoid hash lookups by using indices directly\n", + " channel_index_map = descriptor.mapping()\n", + "\n", + " # Pre-compute channel indices and value conversion methods\n", + " # This creates a list of (ChannelIndexPy, conversion_method) tuples\n", + " # that can be reused for each flow, avoiding hash operations\n", + " #\n", + " # If this technique is used, caching the indices and conversion method\n", + " # is strongly recommended.\n", + " channel_indices_and_methods = [\n", + " (channel_index_map[\"motor_temp\"], ValuePy.Double),\n", + " (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n", + " ]\n", + "\n", + " # Send data in a loop using FlowBuilderPy with channel indices\n", + " for i in range(10):\n", + " # Create a FlowBuilderPy from the descriptor\n", + " flow_builder = FlowBuilderPy(descriptor)\n", + "\n", + " # Attach the run ID directly to the flow builder\n", + " flow_builder.attach_run_id(run_id)\n", + "\n", + " # Set channel values using set() with pre-computed indices\n", + " # This avoids hash lookups and provides better performance\n", + " motor_temp_value = 50.0 + random.random() * 5.0\n", + " tank_pressure_value = 2000.0 + random.random() * 100.0\n", + "\n", + " # If the raw data class used provides in-order iteration over the raw data, you can also iterate\n", + " # over the values and encoding information directly. Since the value indices are used, the\n", + " # additional per-channel hash lookup is not needed, further improving performance.\n", + " #\n", + " # Though for convenience, the values can also be set using set_with_key() which takes a channel name\n", + " # and value.\n", + " #\n", + " # Example:\n", + " #\n", + " # flow_builder.set_with_key(\"motor_temp\", motor_temp_value)\n", + " # flow_builder.set_with_key(\"tank_pressure\", tank_pressure_value)\n", + " values = [motor_temp_value, tank_pressure_value]\n", + " for (channel_index, conversion_method), value in zip(\n", + " channel_indices_and_methods, values\n", + " ):\n", + " flow_builder.set(channel_index, conversion_method(value))\n", + "\n", + " # Build the request with current timestamp\n", + " request = flow_builder.request(TimeValuePy.now())\n", + "\n", + " # Send the request (non-blocking version)\n", + " ingest_client.send_requests_nonblocking([request])\n", "\n", - " # Send all flows in a single batch operation\n", - " # batch_send supports sending any iterables of Flow or FlowPy objects\n", - " await ingest_client.batch_send(flows)\n", + " await asyncio.sleep(0.1)\n", "\n", "\n", "# Uncomment to run:\n", - "# asyncio.run(batch_send_example())" + "# asyncio.run(advanced_flowbuilder_example())" ] }, { @@ -202,36 +246,15 @@ "id": "ab425ee7", "metadata": {}, "source": [ - "## 3. Advanced Concepts\n", + "## 3. High-Performance Batch Sending\n", "\n", - "### Recovery Strategies\n", + "This example demonstrates high-performance batch sending using `FlowBuilderPy` with channel indices and `send_requests_nonblocking()`:\n", + "- Pre-compute channel indices from the descriptor mapping to avoid hash operations\n", + "- Use `FlowBuilderPy` with `set()` and channel indices for maximum performance\n", + "- Use `send_requests_nonblocking()` for non-blocking batch sending\n", + "- This approach provides the best performance for high-throughput scenarios\n", "\n", - "Recovery strategies can be used to allow fine-tuned control of SiftStream ingestion:\n", - "- **Retry with Backups [DEFAULT]**: Retry failed connections + temporarily keep backups of ingested data for automatic re-ingestion if a streaming checkpoint (defaults to 60s) fails to send all data.\n", - "- **Retry Only**: Retry failed connections only. More performant, but with no guarantee of data ingestion in the event of a connection issue.\n", - "\n", - "### Tracing\n", - "\n", - "Tracing allows you to monitor and debug SiftStream ingestion through logs. You can configure tracing in several ways:\n", - "\n", - "- **Console Only**: Output logs to stdout/stderr only\n", - "- **File Logging**: Output logs to both console and rolling log files\n", - "- **Disabled**: Turn off tracing entirely\n", - "\n", - "Tracing is initialized once per process, and cannot be modified afterward.\n", - "\n", - "### Metrics\n", - "\n", - "SiftStream provides detailed metrics about ingestion performance. Use `get_metrics_snapshot()` to access:\n", - "\n", - "- **Bytes sent**: Total bytes successfully sent to Sift\n", - "- **Byte rate**: Current throughput in bytes per second\n", - "- **Messages sent**: Total number of flows/messages sent\n", - "- **Message rate**: Current message throughput\n", - "- **Checkpoint metrics**: Timing and counts for checkpoints\n", - "- **Backup metrics**: Statistics about disk backups (if enabled)\n", - "\n", - "Metrics are updated in real-time and can help you monitor ingestion health and performance.\n" + "The combination of channel indices (avoiding hash lookups) and non-blocking batch sending allows the underlying Rust client to handle batching and sending asynchronously, minimizing Python overhead and maximizing throughput.\n" ] }, { @@ -241,11 +264,12 @@ "metadata": {}, "outputs": [], "source": [ - "from sift_client.resources.ingestion import RecoveryStrategyConfig, TracingConfig\n", + "async def high_performance_batch_example():\n", + " \"\"\"Example showing high-performance batch sending with FlowBuilderPy using channel indices.\"\"\"\n", + " from datetime import timedelta\n", "\n", + " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", "\n", - "async def advanced_example():\n", - " \"\"\"Example with recovery strategies, tracing, and metrics.\"\"\"\n", " connection_config = SiftConnectionConfig(\n", " api_key=\"my_api_key\",\n", " grpc_url=\"sift_grpc_url\",\n", @@ -271,46 +295,295 @@ "\n", " run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n", "\n", - " # Use retry only for better performance (no backups)\n", - " recovery_strategy = RecoveryStrategyConfig.retry_only()\n", + " async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n", + " ingestion_config=ingestion_config,\n", + " run=run,\n", + " ) as ingest_client:\n", + " # Get the flow descriptor and run ID\n", + " descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n", + " run_id = ingest_client.get_run_id()\n", + "\n", + " if run_id is None:\n", + " raise ValueError(\"Run ID is required for FlowBuilderPy usage\")\n", + "\n", + " # Pre-compute channel indices and conversion methods for maximum performance\n", + " # This avoids hash lookups when setting values in the loop below\n", + " channel_index_map = descriptor.mapping()\n", + " channel_indices_and_methods = [\n", + " (channel_index_map[\"motor_temp\"], ValuePy.Double),\n", + " (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n", + " ]\n", + "\n", + " # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)\n", + " sample_rate_hz = 10\n", + " duration_seconds = 5\n", + " num_flows = sample_rate_hz * duration_seconds # 50 flows\n", + "\n", + " start_time = datetime.now(timezone.utc)\n", + " requests = []\n", + "\n", + " for i in range(num_flows):\n", + " # Calculate timestamp for each sample (spaced 0.1 seconds apart)\n", + " timestamp_secs = int((start_time + timedelta(seconds=i / sample_rate_hz)).timestamp())\n", + " timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n", + "\n", + " # Create FlowBuilderPy and build request using pre-computed indices\n", + " flow_builder = FlowBuilderPy(descriptor)\n", + " flow_builder.attach_run_id(run_id)\n", + "\n", + " # Generate values\n", + " motor_temp_value = 50.0 + random.random() * 5.0\n", + " tank_pressure_value = 2000.0 + random.random() * 100.0\n", + "\n", + " # Use indices directly - no hash operations!\n", + " values = [motor_temp_value, tank_pressure_value]\n", + " for (channel_index, conversion_method), value in zip(\n", + " channel_indices_and_methods, values\n", + " ):\n", + " flow_builder.set(channel_index, conversion_method(value))\n", + "\n", + " request = flow_builder.request(timestamp)\n", + " requests.append(request)\n", + "\n", + " # Send all requests in a single non-blocking batch operation\n", + " # The combination of channel indices + non-blocking batch sending provides\n", + " # the best performance for high-throughput scenarios\n", + " ingest_client.send_requests_nonblocking(requests)\n", "\n", - " # Use console-only tracing (stdout/stderr) instead of file logging\n", - " tracing_config = TracingConfig.console_only(level=\"info\")\n", + "\n", + "# Uncomment to run:\n", + "# asyncio.run(high_performance_batch_example())" + ] + }, + { + "cell_type": "markdown", + "id": "f8efce39", + "metadata": {}, + "source": [ + "## 4. Queue-Based Lazy Flow Creation\n", + "\n", + "This example demonstrates a multi-task architecture for handling dynamic flow schemas using `add_new_flows()`:\n", + "- **Task 1**: Ingest raw data from a source and push to Queue 1\n", + "- **Task 2**: Read from Queue 1, check if flow descriptor is cached\n", + " - If not cached, call `add_new_flows()` to register the new flow\n", + " - After registration, retrieve the descriptor and cache it\n", + " - Push the message with descriptor to Queue 2\n", + "- **Task 3**: Drain Queue 2, decode the raw data, and send to Sift using `FlowBuilderPy`\n", + "\n", + "This pattern enables lazy flow registration, allowing you to handle unknown schemas at runtime without pre-registering all possible flows.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a8829136", + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "\n", + "from sift_stream_bindings import FlowBuilderPy, FlowDescriptorPy, TimeValuePy\n", + "\n", + "\n", + "@dataclass\n", + "class RawDataMessage:\n", + " \"\"\"Represents raw data that needs to be decoded and sent.\"\"\"\n", + "\n", + " flow_name: str\n", + " timestamp: datetime\n", + " channel_values: dict[str, float] # Raw channel name -> value mapping\n", + "\n", + "\n", + "async def queue_based_lazy_flow_example():\n", + " \"\"\"Example demonstrating queue-based lazy flow creation with add_new_flows.\"\"\"\n", + " from sift_client import SiftClient, SiftConnectionConfig\n", + " from sift_client.sift_types import (\n", + " ChannelConfig,\n", + " ChannelDataType,\n", + " FlowConfig,\n", + " IngestionConfigCreate,\n", + " RunCreate,\n", + " )\n", + "\n", + " connection_config = SiftConnectionConfig(\n", + " api_key=\"my_api_key\",\n", + " grpc_url=\"sift_grpc_url\",\n", + " rest_url=\"sift_rest_url\",\n", + " )\n", + "\n", + " client = SiftClient(connection_config=connection_config)\n", + "\n", + " # Start with an empty ingestion config - flows will be added dynamically\n", + " ingestion_config = IngestionConfigCreate(\n", + " asset_name=\"sift_rover_1\",\n", + " flows=[], # Empty initially\n", + " )\n", + "\n", + " run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n", "\n", " async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n", " ingestion_config=ingestion_config,\n", " run=run,\n", - " recovery_strategy=recovery_strategy,\n", - " tracing_config=tracing_config,\n", " ) as ingest_client:\n", - " flow_config = ingest_client.get_flow_config(flow_name=\"onboard_sensors\")\n", + " # Queues for the pipeline\n", + " queue1: asyncio.Queue[RawDataMessage] = asyncio.Queue()\n", + " queue2: asyncio.Queue[tuple[RawDataMessage, FlowDescriptorPy]] = asyncio.Queue()\n", "\n", - " # Send some flows\n", - " for i in range(10):\n", - " flow = flow_config.as_flow(\n", - " timestamp=datetime.now(timezone.utc),\n", - " values={\n", - " \"motor_temp\": 50.0 + random.random() * 5.0,\n", - " \"tank_pressure\": 2000.0 + random.random() * 100.0,\n", - " },\n", - " )\n", - " await ingest_client.send(flow=flow)\n", + " # Cache for flow descriptors (flow_name -> FlowDescriptorPy)\n", + " descriptor_cache: dict[str, FlowDescriptorPy] = {}\n", "\n", - " # Get metrics snapshot to see ingestion statistics\n", - " metrics = ingest_client.get_metrics_snapshot()\n", - " print(\"\\n=== Ingestion Metrics ===\")\n", - " print(f\"Bytes sent: {metrics.bytes_sent:,}\")\n", - " print(f\"Byte rate: {metrics.byte_rate:,} bytes/s\")\n", - " print(f\"Messages sent: {metrics.messages_sent:,}\")\n", - " print(f\"Message rate: {metrics.message_rate:.2f} messages/s\")\n", - "\n", - " # Additional metrics available:\n", - " # - metrics.checkpoint_metrics: Checkpoint timing and counts\n", - " # - metrics.backup_metrics: Backup statistics (if backups enabled)\n", + " # Cache for flow configs (flow_name -> FlowConfig)\n", + " # In a real scenario, you'd derive this from your raw data schema\n", + " flow_config_cache: dict[str, FlowConfig] = {\n", + " \"onboard_sensors\": FlowConfig(\n", + " name=\"onboard_sensors\",\n", + " channels=[\n", + " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", + " ChannelConfig(\n", + " name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n", + " ),\n", + " ],\n", + " ),\n", + " \"navigation\": FlowConfig(\n", + " name=\"navigation\",\n", + " channels=[\n", + " ChannelConfig(name=\"gps_lat\", unit=\"deg\", data_type=ChannelDataType.DOUBLE),\n", + " ChannelConfig(name=\"gps_lon\", unit=\"deg\", data_type=ChannelDataType.DOUBLE),\n", + " ],\n", + " ),\n", + " }\n", + "\n", + " run_id = ingest_client.get_run_id()\n", + "\n", + " # Task 1: Ingest raw data and push to Queue 1\n", + " async def ingest_task():\n", + " \"\"\"Simulate ingesting raw data from a source.\"\"\"\n", + " for i in range(20):\n", + " # Simulate different flows arriving\n", + " flow_name = \"onboard_sensors\" if i % 2 == 0 else \"navigation\"\n", + "\n", + " if flow_name == \"onboard_sensors\":\n", + " raw_data = RawDataMessage(\n", + " flow_name=flow_name,\n", + " timestamp=datetime.now(timezone.utc),\n", + " channel_values={\n", + " \"motor_temp\": 50.0 + random.random() * 5.0,\n", + " \"tank_pressure\": 2000.0 + random.random() * 100.0,\n", + " },\n", + " )\n", + " else:\n", + " raw_data = RawDataMessage(\n", + " flow_name=flow_name,\n", + " timestamp=datetime.now(timezone.utc),\n", + " channel_values={\n", + " \"gps_lat\": 37.7749 + random.random() * 0.01,\n", + " \"gps_lon\": -122.4194 + random.random() * 0.01,\n", + " },\n", + " )\n", + "\n", + " queue1.put_nowait(raw_data)\n", + " await asyncio.sleep(0.1)\n", + "\n", + " # Task 2: Register flows lazily\n", + " async def registration_task():\n", + " \"\"\"Check if flow is registered, register if needed, then push to Queue 2.\"\"\"\n", + " while True:\n", + " try:\n", + " raw_data = await asyncio.wait_for(queue1.get(), timeout=1.0)\n", + " except asyncio.TimeoutError:\n", + " # Check if ingest_task is done by checking queue size\n", + " if queue1.empty():\n", + " break\n", + " continue\n", + "\n", + " flow_name = raw_data.flow_name\n", + "\n", + " # Check if descriptor is cached\n", + " if flow_name not in descriptor_cache:\n", + " # For this example, the flow configs are pre-defined above.\n", + " #\n", + " # Though in practice, these would often be dynamically generated based on\n", + " # the raw data schema.\n", + " if flow_name not in flow_config_cache:\n", + " raise ValueError(f\"Flow config not found for {flow_name}\")\n", + "\n", + " flow_config = flow_config_cache[flow_name]\n", + "\n", + " # Convert to Rust FlowConfigPy format\n", + " from sift_stream_bindings import (\n", + " ChannelConfigPy,\n", + " ChannelDataTypePy,\n", + " FlowConfigPy,\n", + " )\n", + "\n", + " channel_configs_py = [\n", + " ChannelConfigPy(\n", + " name=ch.name,\n", + " data_type=ChannelDataTypePy.Double\n", + " if ch.data_type == ChannelDataType.DOUBLE\n", + " else ChannelDataTypePy.Double,\n", + " unit=ch.unit,\n", + " description=ch.description or \"\",\n", + " enum_types=[],\n", + " bit_field_elements=[],\n", + " )\n", + " for ch in flow_config.channels\n", + " ]\n", + "\n", + " flow_config_py = FlowConfigPy(\n", + " name=flow_config.name,\n", + " channels=channel_configs_py,\n", + " )\n", + "\n", + " # Register the new flow\n", + " await ingest_client.add_new_flows([flow_config_py])\n", + "\n", + " # Get the descriptor and cache it\n", + " descriptor = ingest_client.get_flow_descriptor(flow_name)\n", + " descriptor_cache[flow_name] = descriptor\n", + " print(f\"Registered new flow: {flow_name}\")\n", + "\n", + " # Push to Queue 2 with the descriptor\n", + " await queue2.put((raw_data, descriptor_cache[flow_name]))\n", + "\n", + " # Task 3: Decode and send\n", + " async def send_task():\n", + " \"\"\"Decode raw data and send to Sift using FlowBuilderPy.\"\"\"\n", + " while True:\n", + " try:\n", + " raw_data, descriptor = await asyncio.wait_for(queue2.get(), timeout=1.0)\n", + " except asyncio.TimeoutError:\n", + " # Check if registration_task is done\n", + " if queue2.empty() and queue1.empty():\n", + " break\n", + " continue\n", + "\n", + " # Create FlowBuilderPy and set values\n", + " flow_builder = FlowBuilderPy(descriptor)\n", + " flow_builder.attach_run_id(run_id)\n", + "\n", + " # Set all channel values from raw data.\n", + " for channel_name, value in raw_data.channel_values.items():\n", + " flow_builder.set_with_key(channel_name, value)\n", + "\n", + " # Convert timestamp to TimeValuePy\n", + " timestamp_secs = int(raw_data.timestamp.timestamp())\n", + " timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n", + "\n", + " # Build request and send\n", + " request = flow_builder.request(timestamp)\n", + " await ingest_client.send_requests([request])\n", + "\n", + " # Run all tasks concurrently\n", + " await asyncio.gather(\n", + " ingest_task(),\n", + " registration_task(),\n", + " send_task(),\n", + " )\n", "\n", "\n", "# Uncomment to run:\n", - "# asyncio.run(advanced_example())" + "# asyncio.run(queue_based_lazy_flow_example())" ] } ], @@ -322,4 +595,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/rust/crates/sift_connect/src/grpc/config.rs b/rust/crates/sift_connect/src/grpc/config.rs index 2148bc342..a488d23e3 100644 --- a/rust/crates/sift_connect/src/grpc/config.rs +++ b/rust/crates/sift_connect/src/grpc/config.rs @@ -4,13 +4,62 @@ use toml::{Table, Value}; /// The expected name of the config file. pub const SIFT_CONFIG_NAME: &str = "sift.toml"; -/// Specifies source of credentials. If `Profile` is used, then the provided string will be used to -/// query the corresponding table from [`SIFT_CONFIG_NAME`] located at -/// [these locations](https://docs.rs/dirs/6.0.0/dirs/fn.config_local_dir.html) -/// depending on your operating system. If `None` is provided, then the top-level table is used. +/// Specifies the source of credentials for connecting to Sift. +/// +/// Credentials can be provided either directly via `Config` or loaded from a +/// configuration file using `Profile`. +/// +/// # Profile-based Credentials +/// +/// If `Profile` is used, the provided string will be used to query the corresponding +/// table from [`SIFT_CONFIG_NAME`] located at [these locations](https://docs.rs/dirs/6.0.0/dirs/fn.config_local_dir.html) +/// depending on your operating system. If `None` is provided, then the top-level +/// table is used. +/// +/// Example `sift.toml` file: +/// +/// ```toml +/// uri = "https://api.siftstack.com" +/// apikey = "default-api-key" +/// +/// [production] +/// uri = "https://api.siftstack.com" +/// apikey = "production-api-key" +/// ``` +/// +/// # Direct Credentials +/// +/// The `Config` variant allows you to provide credentials directly without +/// requiring a configuration file. +/// +/// # Example +/// +/// ```no_run +/// use sift_connect::Credentials; +/// +/// // Direct credentials +/// let creds = Credentials::Config { +/// uri: "https://api.siftstack.com".to_string(), +/// apikey: "your-api-key".to_string(), +/// }; +/// +/// // Profile-based credentials (default profile) +/// let default_profile = Credentials::Profile(None); +/// +/// // Profile-based credentials (named profile) +/// let prod_profile = Credentials::Profile(Some("production".to_string())); +/// ``` #[derive(Debug, Clone)] pub enum Credentials { + /// Load credentials from a named profile in the configuration file. + /// + /// If `None`, uses the default (top-level) profile. Profile(Option), + /// Provide credentials directly. + /// + /// Fields: + /// - `uri`: The Sift API endpoint URI + /// - `apikey`: The API key for authentication Config { uri: String, apikey: String }, } diff --git a/rust/crates/sift_connect/src/grpc/interceptor.rs b/rust/crates/sift_connect/src/grpc/interceptor.rs index e94b8ff40..41cecdfb4 100644 --- a/rust/crates/sift_connect/src/grpc/interceptor.rs +++ b/rust/crates/sift_connect/src/grpc/interceptor.rs @@ -1,8 +1,23 @@ use std::str::FromStr; use tonic::{Request, Status, metadata::MetadataValue, service::Interceptor}; +/// Interceptor that adds authentication headers to gRPC requests. +/// +/// This interceptor automatically adds a `Bearer` token authorization header +/// to all outgoing gRPC requests using the provided API key. +/// +/// # Example +/// +/// ``` +/// use sift_connect::grpc::AuthInterceptor; +/// +/// let interceptor = AuthInterceptor { +/// apikey: "your-api-key".to_string(), +/// }; +/// ``` #[derive(Clone)] pub struct AuthInterceptor { + /// The API key to use for authentication. pub apikey: String, } diff --git a/rust/crates/sift_connect/src/grpc/mod.rs b/rust/crates/sift_connect/src/grpc/mod.rs index 579ade6c8..3fee0c03a 100644 --- a/rust/crates/sift_connect/src/grpc/mod.rs +++ b/rust/crates/sift_connect/src/grpc/mod.rs @@ -18,9 +18,52 @@ pub mod interceptor; pub use interceptor::AuthInterceptor; /// A pre-configured gRPC channel to conveniently establish a connection to Sift's gRPC API. +/// +/// This is a type alias for a gRPC channel that has been configured with authentication +/// via [`AuthInterceptor`]. The channel is lazy and won't actually connect until the +/// first RPC call is made. +/// +/// # Example +/// +/// ```no_run +/// use sift_connect::{Credentials, SiftChannelBuilder}; +/// use std::env; +/// +/// let credentials = Credentials::Config { +/// uri: env::var("SIFT_URI").unwrap(), +/// apikey: env::var("SIFT_API_KEY").unwrap(), +/// }; +/// +/// let channel: sift_connect::SiftChannel = SiftChannelBuilder::new(credentials) +/// .build() +/// .unwrap(); +/// ``` pub type SiftChannel = InterceptedService; /// Used to build an instance of [SiftChannel]. +/// +/// This builder provides a fluent API for configuring a gRPC channel connection +/// to Sift's API. It supports custom credentials, TLS configuration, and HTTP/2 +/// keep-alive settings. +/// +/// # Example +/// +/// ```no_run +/// use sift_connect::{Credentials, SiftChannelBuilder}; +/// use std::env; +/// use std::time::Duration; +/// +/// let credentials = Credentials::Config { +/// uri: env::var("SIFT_URI").unwrap(), +/// apikey: env::var("SIFT_API_KEY").unwrap(), +/// }; +/// +/// let channel = SiftChannelBuilder::new(credentials) +/// .use_tls(true) +/// .keep_alive_timeout(Duration::from_secs(30)) +/// .build() +/// .unwrap(); +/// ``` pub struct SiftChannelBuilder { credentials: Credentials, use_tls: bool, @@ -32,6 +75,30 @@ pub struct SiftChannelBuilder { impl SiftChannelBuilder { /// Initializes a new [SiftChannelBuilder] with sane defaults. + /// + /// Default settings: + /// - TLS enabled + /// - Keep-alive while idle enabled + /// - Keep-alive timeout: 20 seconds + /// - Keep-alive interval: 20 seconds + /// - User agent: crate name and version + /// + /// # Arguments + /// + /// * `credentials` - The credentials to use for authentication + /// + /// # Example + /// + /// ```no_run + /// use sift_connect::{Credentials, SiftChannelBuilder}; + /// + /// let credentials = Credentials::Config { + /// uri: "https://api.siftstack.com".to_string(), + /// apikey: "your-api-key".to_string(), + /// }; + /// + /// let builder = SiftChannelBuilder::new(credentials); + /// ``` pub fn new(credentials: Credentials) -> Self { let crate_name = env!("CARGO_PKG_NAME"); let crate_version = env!("CARGO_PKG_VERSION"); @@ -47,8 +114,37 @@ impl SiftChannelBuilder { } } - /// Consume [SiftChannelBuilder] and return a [SiftChannel]. The [SiftChannel] is lazy and - /// won't actually connect to Sift until the first RPC is made. + /// Consume [SiftChannelBuilder] and return a [SiftChannel]. + /// + /// The [SiftChannel] is lazy and won't actually connect to Sift until the first + /// RPC is made. This allows you to create the channel early without incurring + /// connection overhead until it's needed. + /// + /// # Returns + /// + /// A configured [SiftChannel] ready for use with gRPC clients. + /// + /// # Errors + /// + /// Returns an error if: + /// - The URI is invalid + /// - Credentials cannot be loaded (for profile-based credentials) + /// - TLS configuration fails + /// + /// # Example + /// + /// ```no_run + /// use sift_connect::{Credentials, SiftChannelBuilder}; + /// + /// let credentials = Credentials::Config { + /// uri: "https://api.siftstack.com".to_string(), + /// apikey: "your-api-key".to_string(), + /// }; + /// + /// let channel = SiftChannelBuilder::new(credentials) + /// .build() + /// .expect("failed to create channel"); + /// ``` pub fn build(self) -> Result { let config::SiftChannelConfig { uri, apikey } = config::SiftChannelConfig::try_from(self.credentials)?; @@ -81,41 +177,151 @@ impl SiftChannelBuilder { Ok(intercepted_channel) } - /// Override the default user-agent which is the name of the crate. Do note that the - /// application firewall is sensitive to certain user-agents so if you experience any issues - /// connecting to Sift, please notify the team to ascertain if it's related to a bad - /// user-agent. + /// Override the default user-agent which is the name of the crate. + /// + /// The default user-agent is set to `{crate_name}/{crate_version}`. This method + /// allows you to customize it. + /// + /// # Note + /// + /// The application firewall is sensitive to certain user-agents. If you experience + /// any issues connecting to Sift, please notify the team to ascertain if it's related + /// to a bad user-agent. + /// + /// # Arguments + /// + /// * `user_agent` - The custom user-agent string to use + /// + /// # Example + /// + /// ```no_run + /// use sift_connect::{Credentials, SiftChannelBuilder}; + /// + /// let credentials = Credentials::Config { + /// uri: "https://api.siftstack.com".to_string(), + /// apikey: "your-api-key".to_string(), + /// }; + /// + /// let builder = SiftChannelBuilder::new(credentials) + /// .user_agent("MyApp/1.0"); + /// ``` pub fn user_agent>(mut self, user_agent: S) -> Self { self.user_agent = user_agent.as_ref().to_string(); self } - /// Enables/disables TLS. In production, TLS should only ever be enabled. For mocking/testing + /// Enables or disables TLS. + /// + /// # Warning + /// + /// In production, TLS should only ever be enabled. For mocking/testing /// purposes, TLS may be disabled. + /// + /// # Arguments + /// + /// * `use_tls` - Whether to enable TLS encryption + /// + /// # Example + /// + /// ```no_run + /// use sift_connect::{Credentials, SiftChannelBuilder}; + /// + /// let credentials = Credentials::Config { + /// uri: "https://api.siftstack.com".to_string(), + /// apikey: "your-api-key".to_string(), + /// }; + /// + /// // Production use - TLS enabled (default) + /// let builder = SiftChannelBuilder::new(credentials.clone()) + /// .use_tls(true); + /// + /// // Testing use - TLS disabled + /// let test_builder = SiftChannelBuilder::new(credentials) + /// .use_tls(false); + /// ``` pub fn use_tls(mut self, use_tls: bool) -> Self { self.use_tls = use_tls; self } - /// See [`hyper documentation`]. + /// Configures whether to send keep-alive pings while the connection is idle. + /// + /// See [`hyper documentation`] for detailed information. /// /// [`hyper documentation`]: https://docs.rs/hyper/latest/hyper/client/conn/http2/struct.Builder.html#method.keep_alive_while_idle + /// + /// # Arguments + /// + /// * `keep_alive_while_idle` - Whether to send keep-alive pings while idle + /// + /// # Example + /// + /// ```no_run + /// use sift_connect::{Credentials, SiftChannelBuilder}; + /// + /// # let credentials = Credentials::Config { + /// # uri: "https://api.siftstack.com".to_string(), + /// # apikey: "your-api-key".to_string(), + /// # }; + /// let builder = SiftChannelBuilder::new(credentials) + /// .keep_alive_while_idle(true); + /// ``` pub fn keep_alive_while_idle(mut self, keep_alive_while_idle: bool) -> Self { self.keep_alive_while_idle = keep_alive_while_idle; self } - /// See [`hyper documentation`]. + /// Configures the timeout for keep-alive pings. + /// + /// See [`hyper documentation`] for detailed information. /// /// [`hyper documentation`]: https://docs.rs/hyper/latest/hyper/client/conn/http2/struct.Builder.html#method.keep_alive_timeout + /// + /// # Arguments + /// + /// * `keep_alive_timeout` - The timeout duration for keep-alive pings + /// + /// # Example + /// + /// ```no_run + /// use sift_connect::{Credentials, SiftChannelBuilder}; + /// use std::time::Duration; + /// + /// # let credentials = Credentials::Config { + /// # uri: "https://api.siftstack.com".to_string(), + /// # apikey: "your-api-key".to_string(), + /// # }; + /// let builder = SiftChannelBuilder::new(credentials) + /// .keep_alive_timeout(Duration::from_secs(30)); + /// ``` pub fn keep_alive_timeout(mut self, keep_alive_timeout: Duration) -> Self { self.keep_alive_timeout = keep_alive_timeout; self } - /// See [`hyper documentation`]. + /// Configures the interval between keep-alive pings. + /// + /// See [`hyper documentation`] for detailed information. /// /// [`hyper documentation`]: https://docs.rs/hyper/latest/hyper/client/conn/http2/struct.Builder.html#method.keep_alive_interval + /// + /// # Arguments + /// + /// * `keep_alive_interval` - The interval duration between keep-alive pings + /// + /// # Example + /// + /// ```no_run + /// use sift_connect::{Credentials, SiftChannelBuilder}; + /// use std::time::Duration; + /// + /// # let credentials = Credentials::Config { + /// # uri: "https://api.siftstack.com".to_string(), + /// # apikey: "your-api-key".to_string(), + /// # }; + /// let builder = SiftChannelBuilder::new(credentials) + /// .keep_alive_interval(Duration::from_secs(30)); + /// ``` pub fn keep_alive_interval(mut self, keep_alive_interval: Duration) -> Self { self.keep_alive_interval = keep_alive_interval; self diff --git a/rust/crates/sift_error/src/lib.rs b/rust/crates/sift_error/src/lib.rs index b790910f9..0cae103bc 100644 --- a/rust/crates/sift_error/src/lib.rs +++ b/rust/crates/sift_error/src/lib.rs @@ -10,23 +10,100 @@ pub mod prelude { } /// A `Result` that returns [Error] as the error-type. +/// +/// This is a convenience type alias for `std::result::Result`. +/// It's used throughout Sift crates as the standard error handling type. +/// +/// # Example +/// +/// ```rust +/// use sift_error::{Error, ErrorKind, Result}; +/// +/// fn might_fail() -> Result { +/// Ok("success".to_string()) +/// } +/// +/// fn handle_error() -> Result<()> { +/// might_fail()?; +/// Ok(()) +/// } +/// ``` pub type Result = StdResult; pub type BoxedError = Box; /// Trait that defines the behavior of errors that Sift manages. +/// +/// This trait provides methods for adding context and help text to errors, +/// allowing for rich error messages that guide users toward resolution. +/// +/// # Example +/// +/// ```rust +/// use sift_error::prelude::*; +/// use std::io; +/// +/// fn read_config() -> Result { +/// std::fs::read_to_string("config.toml") +/// .map_err(|e| Error::new(ErrorKind::IoError, e)) +/// .context("failed to read configuration file") +/// .help("ensure the config.toml file exists and is readable") +/// } +/// ``` pub trait SiftError where C: fmt::Display + Send + Sync + 'static, { /// Adds context that is printed with the error. + /// + /// Context is displayed as the most recent error message, with previous + /// context forming a chain of causes. + /// + /// # Example + /// + /// ```rust + /// use sift_error::prelude::*; + /// + /// fn example() -> Result<()> { + /// let err = Error::new_msg(ErrorKind::IoError, "file not found"); + /// Err(err).context("failed to load user data") + /// } + /// ``` fn context(self, ctx: C) -> Result; /// Like `context` but takes in a closure. + /// + /// This is useful when constructing the context string is expensive, + /// as the closure is only called if there's an error. + /// + /// # Example + /// + /// ```rust + /// use sift_error::prelude::*; + /// + /// fn example(user_id: &str) -> Result<()> { + /// let err = Error::new_msg(ErrorKind::NotFoundError, "resource missing"); + /// Err(err).with_context(|| format!("user {} not found", user_id)) + /// } + /// ``` fn with_context(self, op: F) -> Result where F: Fn() -> C; /// User-help text. + /// + /// Help text provides actionable guidance to users on how to resolve + /// the error. It's displayed separately from the error context. + /// + /// # Example + /// + /// ```rust + /// use sift_error::prelude::*; + /// + /// fn example() -> Result<()> { + /// let err = Error::new_msg(ErrorKind::ConfigError, "invalid config"); + /// Err(err).help("check your sift.toml file for syntax errors") + /// } + /// ``` fn help(self, txt: C) -> Result; } @@ -42,7 +119,25 @@ pub struct Error { impl StdError for Error {} impl Error { - /// Initializes an [Error]. + /// Initializes an [Error] from a standard error type. + /// + /// This constructor wraps a standard library error (or any type implementing + /// `std::error::Error`) into a Sift error with the specified [ErrorKind]. + /// + /// # Arguments + /// + /// * `kind` - The category of error that occurred + /// * `err` - The underlying error to wrap + /// + /// # Example + /// + /// ```rust + /// use sift_error::{Error, ErrorKind}; + /// use std::io; + /// + /// let io_error = io::Error::new(io::ErrorKind::NotFound, "file not found"); + /// let sift_error = Error::new(ErrorKind::IoError, io_error); + /// ``` pub fn new(kind: ErrorKind, err: E) -> Self where E: StdError + Send + Sync + 'static, @@ -56,7 +151,23 @@ impl Error { } } - /// Initializes an [Error] with a generic message. + /// Initializes an [Error] with a generic message string. + /// + /// This constructor creates an error from a string message without + /// wrapping an underlying error type. + /// + /// # Arguments + /// + /// * `kind` - The category of error that occurred + /// * `msg` - A string message describing the error + /// + /// # Example + /// + /// ```rust + /// use sift_error::{Error, ErrorKind}; + /// + /// let error = Error::new_msg(ErrorKind::NotFoundError, "resource not found"); + /// ``` pub fn new_msg>(kind: ErrorKind, msg: S) -> Self { Self { inner: None, @@ -66,20 +177,68 @@ impl Error { } } - /// Initializes a general catch-all type of [Error]. Contributors should be careful not to use - /// this unless strictly necessary. + /// Initializes a general catch-all type of [Error]. + /// + /// Contributors should be careful not to use this unless strictly necessary. + /// Prefer more specific [ErrorKind] variants when possible. + /// + /// # Arguments + /// + /// * `msg` - A string message describing the error + /// + /// # Example + /// + /// ```rust + /// use sift_error::Error; + /// + /// let error = Error::new_general("unexpected condition occurred"); + /// ``` pub fn new_general>(msg: S) -> Self { Self::new_msg(ErrorKind::GeneralError, msg) } /// Used for user-errors that have to do with bad arguments. + /// + /// This is a convenience constructor for argument validation errors. + /// + /// # Arguments + /// + /// * `msg` - A string message describing the argument validation failure + /// + /// # Example + /// + /// ```rust + /// use sift_error::Error; + /// + /// fn validate_age(age: i32) -> Result<(), Error> { + /// if age < 0 { + /// return Err(Error::new_arg_error("age must be non-negative")); + /// } + /// Ok(()) + /// } + /// ``` pub fn new_arg_error>(msg: S) -> Self { Self::new_msg(ErrorKind::ArgumentValidationError, msg) } - /// Tonic response types usually return optional types that we need to handle; if responses are - /// empty then this is the appropriate way to initialize an [Error] for that situation, though - /// this has never been observed. + /// Creates an error for empty gRPC responses. + /// + /// Tonic response types usually return optional types that we need to handle; + /// if responses are empty then this is the appropriate way to initialize an + /// [Error] for that situation, though this has never been observed in practice. + /// + /// # Arguments + /// + /// * `msg` - A string message describing the empty response situation + /// + /// # Example + /// + /// ```rust + /// use sift_error::Error; + /// + /// // This would typically be used when a gRPC response is unexpectedly empty + /// let error = Error::new_empty_response("asset response was empty"); + /// ``` pub fn new_empty_response>(msg: S) -> Self { Self { inner: None, @@ -90,12 +249,42 @@ impl Error { } /// Get the underlying error kind. + /// + /// # Returns + /// + /// The [ErrorKind] that categorizes this error. + /// + /// # Example + /// + /// ```rust + /// use sift_error::{Error, ErrorKind}; + /// + /// let error = Error::new_msg(ErrorKind::NotFoundError, "resource missing"); + /// assert_eq!(error.kind(), ErrorKind::NotFoundError); + /// ``` pub fn kind(&self) -> ErrorKind { self.kind } } /// Various categories of errors that can occur throughout Sift crates. +/// +/// Each variant represents a different category of error that can occur when +/// interacting with Sift services or processing data. Error kinds help categorize +/// errors for better error handling and user feedback. +/// +/// # Example +/// +/// ```rust +/// use sift_error::{Error, ErrorKind}; +/// +/// let error = Error::new_msg(ErrorKind::NotFoundError, "asset not found"); +/// match error.kind() { +/// ErrorKind::NotFoundError => println!("Resource was not found"), +/// ErrorKind::IoError => println!("I/O error occurred"), +/// _ => println!("Other error"), +/// } +/// ``` #[derive(Debug, PartialEq, Copy, Clone)] pub enum ErrorKind { /// Indicates that the error is due to a resource already existing. @@ -104,7 +293,10 @@ pub enum ErrorKind { ArgumentValidationError, /// Indicates that the program is unable to grab credentials from a user's `sift.toml` file. ConfigError, - /// Inidicates that the program was unable to connect to Sift. + /// Indicates that the program was unable to connect to Sift. + /// + /// This occurs when there are network issues, invalid URIs, TLS problems, + /// or other connection-related failures when attempting to reach Sift services. GrpcConnectError, /// Indicates that the program was unable to retrieve the run being requested. RetrieveRunError, @@ -130,11 +322,15 @@ pub enum ErrorKind { IoError, /// Indicates that there was a conversion between numeric times. NumberConversionError, - /// Indicates a failure to generated a particular time-type from arguments. + /// Indicates a failure to generate a particular time-type from arguments. TimeConversionError, /// General errors that can occur while streaming telemetry i.e. data ingestion. + /// + /// This is a catch-all for streaming-related errors that don't fit into more + /// specific categories, such as stream initialization failures or unexpected + /// stream state errors. StreamError, - /// Indicates that all retries were exhausted in the configure retry policy. + /// Indicates that all retries were exhausted in the configured retry policy. RetriesExhausted, /// General errors that can occur while processing backups during streaming. BackupsError, @@ -144,7 +340,10 @@ pub enum ErrorKind { /// Indicates that a user provided a flow-name that doesn't match any configured flow in the /// parent ingestion config. UnknownFlow, - /// This really shouldn't happen. + /// Indicates an empty response from a gRPC service. + /// + /// This really shouldn't happen in normal operation. It occurs when a gRPC + /// response is unexpectedly empty. EmptyResponseError, /// When failing to decode protobuf from its wire format. ProtobufDecodeError, @@ -152,9 +351,12 @@ pub enum ErrorKind { BackupIntegrityError, /// When backup file/buffer limit has been reached. BackupLimitReached, - /// Errors with the SiftStream Metrics Server + /// Errors with the SiftStream Metrics Server. SiftStreamMetricsServerError, /// General errors that are rarely returned. + /// + /// This is a catch-all error kind for unexpected or unclassified errors. + /// Contributors should prefer more specific error kinds when possible. GeneralError, } diff --git a/rust/crates/sift_pbfs/src/chunk.rs b/rust/crates/sift_pbfs/src/chunk.rs index 63b4db339..5f683a68a 100644 --- a/rust/crates/sift_pbfs/src/chunk.rs +++ b/rust/crates/sift_pbfs/src/chunk.rs @@ -2,13 +2,22 @@ use prost::Message; use sift_error::prelude::*; use std::{marker::PhantomData, ops::Deref}; -/// Length of the checksum byte-header length. +/// Length of the checksum byte-header in bytes. +/// +/// This is the size of a `u32` (4 bytes), which stores the CRC32 checksum +/// of the chunk data. pub const CHECKSUM_HEADER_LEN: usize = std::mem::size_of::(); -/// Length of the header that indicates the total byte-length of all protobuf messages. +/// Length of the batch size header in bytes. +/// +/// This is the size of a `u64` (8 bytes), which stores the total byte-length +/// of all protobuf messages in the chunk. pub const BATCH_SIZE_LEN: usize = std::mem::size_of::(); -/// Length of the length prefix of the individual protobuf message. +/// Length of the length prefix for individual protobuf messages in bytes. +/// +/// This is the size of a `u32` (4 bytes), which precedes each protobuf message +/// to indicate its length. pub const MESSAGE_LENGTH_PREFIX_LEN: usize = std::mem::size_of::(); /// Represents a chunk of protobuf messages that is written to and read from disk. @@ -36,6 +45,36 @@ where message_type: PhantomData, } +/// Iterator over protobuf messages within a [`PbfsChunk`]. +/// +/// This iterator decodes and yields individual protobuf messages from a chunk. +/// If an error is encountered during decoding, it will return `Some(Err(...))` +/// and subsequent calls to `next()` will return `None`. +/// +/// # Type Parameters +/// +/// * `M` - The protobuf message type to decode (must implement `Message + Default`) +/// +/// # Example +/// +/// ``` +/// use sift_pbfs::PbfsChunk; +/// use prost::Message; +/// # use prost::Message as _; +/// +/// # #[derive(Clone, PartialEq, Message)] +/// # struct MyMessage { } +/// +/// # let messages = vec![MyMessage {}, MyMessage {}]; +/// # let chunk = PbfsChunk::new(&messages).unwrap(); +/// // Iterate over messages in a chunk +/// for result in chunk { +/// match result { +/// Ok(message) => println!("Decoded: {:?}", message), +/// Err(e) => eprintln!("Error: {}", e), +/// } +/// } +/// ``` pub struct PbfsMessageIter where M: Message + Default + 'static, @@ -51,11 +90,40 @@ where M: Message + Default + 'static, { /// Encodes `messages` into the provided `buffer`, reusing its capacity. - /// The buffer is cleared before encoding, and the encoded data is written to it. - /// Returns a slice of the encoded data. + /// + /// The buffer is assumed to be cleared before encoding, and the encoded + /// data is written to it. /// /// This method is more efficient than `new()` when encoding many small messages, /// as it avoids allocating a new vector for each chunk. + /// + /// # Arguments + /// + /// * `messages` - Slice of protobuf messages to encode + /// * `buffer` - Buffer to encode into (will be cleared and reused) + /// + /// # Returns + /// + /// A slice of the encoded chunk data, including checksum and headers. + /// + /// # Errors + /// + /// Returns an error if encoding fails or if the total message size exceeds `u64::MAX`. + /// + /// # Example + /// + /// ``` + /// use sift_pbfs::PbfsChunk; + /// use prost::Message; + /// # use prost::Message as _; + /// + /// # #[derive(Clone, PartialEq, Message)] + /// # struct MyMessage { } + /// + /// let messages = vec![MyMessage {}, MyMessage {}]; + /// let mut buffer = Vec::new(); + /// let encoded = PbfsChunk::encode_into(&messages, &mut buffer).unwrap(); + /// ``` pub fn encode_into<'a>(messages: &[M], buffer: &'a mut Vec) -> Result<&'a [u8]> { // Calculate total encoded message length let mut encoded_message_len = 0; @@ -97,6 +165,32 @@ where } /// Encodes `messages` and returns a [PbfsChunk] which wraps around the encoded messages. + /// + /// # Arguments + /// + /// * `messages` - Slice of protobuf messages to encode + /// + /// # Returns + /// + /// A `PbfsChunk` containing the encoded messages with checksum validation. + /// + /// # Errors + /// + /// Returns an error if encoding fails or if the total message size exceeds `u64::MAX`. + /// + /// # Example + /// + /// ``` + /// use sift_pbfs::PbfsChunk; + /// use prost::Message; + /// # use prost::Message as _; + /// + /// # #[derive(Clone, PartialEq, Message)] + /// # struct MyMessage { } + /// + /// let messages = vec![MyMessage {}, MyMessage {}]; + /// let chunk = PbfsChunk::new(&messages).unwrap(); + /// ``` pub fn new(messages: &[M]) -> Result { let mut data = Vec::new(); Self::encode_into(messages, &mut data)?; @@ -119,8 +213,33 @@ where u32::from_le_bytes(checksum_le) } - /// Returns the byte length of all length-prefixed protobuf messages from the byte headers of - /// the chunk. + /// Returns the byte length of all length-prefixed protobuf messages from the byte headers. + /// + /// This reads the batch size header from the chunk without fully decoding it. + /// + /// # Arguments + /// + /// * `bytes` - The chunk bytes to read the header from + /// + /// # Returns + /// + /// The total byte length of all protobuf messages in the chunk. + /// + /// # Example + /// + /// ``` + /// use sift_pbfs::{PbfsChunk, BATCH_SIZE_LEN, CHECKSUM_HEADER_LEN}; + /// use prost::Message; + /// # use prost::Message as _; + /// + /// # #[derive(Clone, PartialEq, Message)] + /// # struct MyMessage { } + /// + /// # let messages = vec![MyMessage {}]; + /// # let chunk = PbfsChunk::new(&messages).unwrap(); + /// let bytes: &[u8] = &chunk; + /// let messages_len = PbfsChunk::::messages_len_from_header(bytes); + /// ``` #[allow(dead_code)] pub fn messages_len_from_header(bytes: &[u8]) -> u64 { let mut messages_len_le = [0_u8; BATCH_SIZE_LEN]; diff --git a/rust/crates/sift_pbfs/src/lib.rs b/rust/crates/sift_pbfs/src/lib.rs index a6c872c77..45dbe7b24 100644 --- a/rust/crates/sift_pbfs/src/lib.rs +++ b/rust/crates/sift_pbfs/src/lib.rs @@ -13,10 +13,38 @@ pub use chunk::{ #[cfg(test)] mod test; +/// Decoder for backup files containing protobuf messages. +/// /// Takes a `reader` to the backup file containing the backed up protobuf messages and offers /// functionality to iterate over all protobuf messages in the file. Each chunk of protobuf /// messages will be validated by having its checksum computed and compared against the checksum /// that stored in its byte-header. +/// +/// # Type Parameters +/// +/// * `M` - The protobuf message type to decode (must implement `Message + Default`) +/// * `R` - The reader type (must implement `Read`) +/// +/// # Example +/// +/// ```no_run +/// use sift_pbfs::BackupsDecoder; +/// use prost::Message; +/// # use prost::Message as _; +/// +/// # #[derive(Clone, PartialEq, Message)] +/// # struct MyMessage { } +/// +/// let file = std::fs::File::open("backup.pbfs").unwrap(); +/// let decoder = BackupsDecoder::::new(file); +/// +/// for result in decoder { +/// match result { +/// Ok(message) => println!("Decoded message: {:?}", message), +/// Err(e) => eprintln!("Error: {}", e), +/// } +/// } +/// ``` pub struct BackupsDecoder where M: Message + Default + 'static, @@ -33,6 +61,25 @@ where M: Message + Default + 'static, R: Read, { + /// Creates a new `BackupsDecoder` from a reader. + /// + /// # Arguments + /// + /// * `reader` - The reader to read backup data from + /// + /// # Example + /// + /// ```no_run + /// use sift_pbfs::BackupsDecoder; + /// use prost::Message; + /// # use prost::Message as _; + /// + /// # #[derive(Clone, PartialEq, Message)] + /// # struct MyMessage { } + /// + /// let file = std::fs::File::open("backup.pbfs").unwrap(); + /// let decoder = BackupsDecoder::::new(file); + /// ``` pub fn new(reader: R) -> Self { Self { reader, diff --git a/rust/crates/sift_rs/src/wrappers/assets.rs b/rust/crates/sift_rs/src/wrappers/assets.rs index 4f78b590a..25754ee97 100644 --- a/rust/crates/sift_rs/src/wrappers/assets.rs +++ b/rust/crates/sift_rs/src/wrappers/assets.rs @@ -9,21 +9,100 @@ use crate::assets::v1::{ Asset, GetAssetRequest, UpdateAssetRequest, asset_service_client::AssetServiceClient, }; -/// Return an implementation of [AssetServiceWrapper] which also exposes methods from the -/// raw [AssetServiceClient]. +/// Creates a new asset service wrapper. +/// +/// Returns an implementation of [`AssetServiceWrapper`] which also exposes methods +/// from the raw [`AssetServiceClient`] via `Deref` and `DerefMut`. +/// +/// # Arguments +/// +/// * `grpc_channel` - The gRPC channel to use for communication +/// +/// # Example +/// +/// ```no_run +/// use sift_rs::wrappers::assets::{new_asset_service, AssetServiceWrapper}; +/// use sift_connect::{Credentials, SiftChannelBuilder}; +/// +/// # async fn example() -> Result<(), Box> { +/// let credentials = Credentials::Config { +/// uri: "https://api.siftstack.com".to_string(), +/// apikey: "your-api-key".to_string(), +/// }; +/// let channel = SiftChannelBuilder::new(credentials).build()?; +/// let mut asset_service = new_asset_service(channel); +/// +/// let asset = asset_service.try_get_asset_by_id("asset-123").await?; +/// # Ok(()) +/// # } +/// ``` pub fn new_asset_service(grpc_channel: SiftChannel) -> impl AssetServiceWrapper { AssetServiceImpl(AssetServiceClient::new(grpc_channel)) } -/// Convenience methods +/// Convenience methods for working with Sift's Asset service. +/// +/// This trait provides simplified methods that return [`sift_error::Result`] instead +/// of raw gRPC responses. The underlying [`AssetServiceClient`] is accessible via +/// `Deref` and `DerefMut` for advanced use cases. #[async_trait] pub trait AssetServiceWrapper: Clone + Deref> + DerefMut { - /// Retrieves an asset by ID + /// Retrieves an asset by ID. + /// + /// # Arguments + /// + /// * `asset_id` - The ID of the asset to retrieve + /// + /// # Returns + /// + /// The requested asset, or an error if the asset doesn't exist or the request fails. + /// + /// # Errors + /// + /// Returns [`ErrorKind::RetrieveAssetError`] if the request fails or the asset + /// doesn't exist. Returns [`ErrorKind::EmptyResponseError`] if the response is empty. + /// + /// # Example + /// + /// ```no_run + /// use sift_rs::wrappers::assets::AssetServiceWrapper; + /// + /// # async fn example(mut service: impl AssetServiceWrapper) -> Result<(), Box> { + /// let asset = service.try_get_asset_by_id("asset-123").await?; + /// # Ok(()) + /// # } + /// ``` async fn try_get_asset_by_id(&mut self, asset_id: &str) -> Result; - /// Update an asset + /// Updates an asset. + /// + /// # Arguments + /// + /// * `asset` - The asset to update (must include the asset ID) + /// * `update_mask` - List of field paths to update (in snake_case) + /// + /// # Returns + /// + /// The updated asset, or an error if the update fails. + /// + /// # Errors + /// + /// Returns [`ErrorKind::UpdateAssetError`] if the update fails. Returns + /// [`ErrorKind::EmptyResponseError`] if the response is empty. + /// + /// # Example + /// + /// ```no_run + /// use sift_rs::wrappers::assets::AssetServiceWrapper; + /// + /// # async fn example(mut service: impl AssetServiceWrapper, mut asset: sift_rs::assets::v1::Asset) -> Result<(), Box> { + /// asset.name = "Updated Name".to_string(); + /// let updated = service.try_update_asset(asset, vec!["name".to_string()]).await?; + /// # Ok(()) + /// # } + /// ``` async fn try_update_asset(&mut self, asset: Asset, update_mask: Vec) -> Result; } diff --git a/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs b/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs index 06152c16d..4afced35b 100644 --- a/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs +++ b/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs @@ -10,20 +10,77 @@ use sift_connect::SiftChannel; use sift_error::prelude::*; use std::ops::{Deref, DerefMut}; -/// Return an implementation of [IngestionConfigServiceWrapper] which also exposes methods from the -/// raw [IngestionConfigServiceClient]. +/// Creates a new ingestion config service wrapper. +/// +/// Returns an implementation of [`IngestionConfigServiceWrapper`] which also exposes +/// methods from the raw [`IngestionConfigServiceClient`] via `Deref` and `DerefMut`. +/// +/// # Arguments +/// +/// * `grpc_channel` - The gRPC channel to use for communication +/// +/// # Example +/// +/// ```no_run +/// use sift_rs::wrappers::ingestion_configs::{new_ingestion_config_service, IngestionConfigServiceWrapper}; +/// use sift_connect::{Credentials, SiftChannelBuilder}; +/// +/// # async fn example() -> Result<(), Box> { +/// let credentials = Credentials::Config { +/// uri: "https://api.siftstack.com".to_string(), +/// apikey: "your-api-key".to_string(), +/// }; +/// let channel = SiftChannelBuilder::new(credentials).build()?; +/// let mut service = new_ingestion_config_service(channel); +/// +/// let config = service.try_get_ingestion_config_by_client_key("my-config").await?; +/// # Ok(()) +/// # } +/// ``` pub fn new_ingestion_config_service( grpc_channel: SiftChannel, ) -> impl IngestionConfigServiceWrapper { IngestionConfigServiceImpl(IngestionConfigServiceClient::new(grpc_channel)) } -/// Convenience methods on top of [IngestionConfigServiceClient]. +/// Convenience methods for working with Sift's IngestionConfig service. +/// +/// This trait provides simplified methods that return [`sift_error::Result`] instead +/// of raw gRPC responses. The underlying [`IngestionConfigServiceClient`] is accessible +/// via `Deref` and `DerefMut` for advanced use cases. #[async_trait] pub trait IngestionConfigServiceWrapper: Clone + Deref> + DerefMut { - /// Create an ingestion config. + /// Creates an ingestion config. + /// + /// # Arguments + /// + /// * `asset_name` - The name of the asset this config is for + /// * `client_key` - A unique identifier for this ingestion config + /// * `flows` - The flow configurations to include + /// + /// # Returns + /// + /// The created ingestion config, or an error if creation fails. + /// + /// # Errors + /// + /// Returns [`ErrorKind::ArgumentValidationError`] if `asset_name` or `client_key` + /// is empty. Returns [`ErrorKind::CreateIngestionConfigError`] if creation fails. + /// + /// # Example + /// + /// ```no_run + /// use sift_rs::wrappers::ingestion_configs::IngestionConfigServiceWrapper; + /// use sift_rs::ingestion_configs::v2::FlowConfig; + /// + /// # async fn example(mut service: impl IngestionConfigServiceWrapper) -> Result<(), Box> { + /// let flows = vec![/* FlowConfig instances */]; + /// let config = service.try_create_ingestion_config("MyAsset", "config-v1", &flows).await?; + /// # Ok(()) + /// # } + /// ``` async fn try_create_ingestion_config( &mut self, asset_name: &str, @@ -31,22 +88,80 @@ pub trait IngestionConfigServiceWrapper: flows: &[FlowConfig], ) -> Result; - /// Retrieve ingestion config by ID. + /// Retrieves an ingestion config by ID. + /// + /// # Arguments + /// + /// * `id` - The ID of the ingestion config to retrieve + /// + /// # Returns + /// + /// The requested ingestion config, or an error if it doesn't exist. + /// + /// # Errors + /// + /// Returns [`ErrorKind::ArgumentValidationError`] if `id` is empty. + /// Returns [`ErrorKind::RetrieveIngestionConfigError`] if retrieval fails. async fn try_get_ingestion_config_by_id(&mut self, id: &str) -> Result; - /// Retrieve ingestion config by client key. + /// Retrieves an ingestion config by client key. + /// + /// # Arguments + /// + /// * `client_key` - The client key of the ingestion config to retrieve + /// + /// # Returns + /// + /// The requested ingestion config, or an error if it doesn't exist. + /// + /// # Errors + /// + /// Returns [`ErrorKind::ArgumentValidationError`] if `client_key` is empty. + /// Returns [`ErrorKind::RetrieveIngestionConfigError`] if retrieval fails. + /// Returns [`ErrorKind::NotFoundError`] if no config with the given client key exists. async fn try_get_ingestion_config_by_client_key( &mut self, client_key: &str, ) -> Result; - /// Create [FlowConfig]s for a given ingestion config. If this function does not return an - /// error, then it is safe to assume that all [FlowConfig]s in `configs` was created. + /// Creates flow configs for a given ingestion config. + /// + /// If this function does not return an error, then it is safe to assume that + /// all [`FlowConfig`]s in `configs` were created. + /// + /// # Arguments + /// + /// * `ingestion_config_id` - The ID of the ingestion config to add flows to + /// * `configs` - The flow configs to create (can be any type that converts to `Vec`) + /// + /// # Returns + /// + /// `Ok(())` if all flows were created successfully. + /// + /// # Errors + /// + /// Returns [`ErrorKind::AlreadyExistsError`] if a flow with the same name already exists. + /// Returns [`ErrorKind::CreateFlowError`] if creation fails for other reasons. async fn try_create_flows(&mut self, ingestion_config_id: &str, configs: I) -> Result<()> where I: Into> + Send; - /// Retrieve all flows that satisfy the provided filter. + /// Retrieves all flows that satisfy the provided filter. + /// + /// This method handles pagination automatically and returns all matching flows. + /// + /// # Arguments + /// + /// * `ingestion_config_id` - The ID of the ingestion config to filter flows from + /// * `filter` - A filter expression (e.g., `"name == 'my-flow'"`) + /// + /// # Returns + /// + /// A vector of all flows matching the filter, or an error if retrieval fails. + /// + /// # Errors + /// + /// Returns [`ErrorKind::RetrieveIngestionConfigError`] if retrieval fails. async fn try_filter_flows( &mut self, ingestion_config_id: &str, diff --git a/rust/crates/sift_rs/src/wrappers/metadata.rs b/rust/crates/sift_rs/src/wrappers/metadata.rs index 8850f7830..13b5a85a3 100644 --- a/rust/crates/sift_rs/src/wrappers/metadata.rs +++ b/rust/crates/sift_rs/src/wrappers/metadata.rs @@ -59,17 +59,29 @@ impl> From<(&str, T)> for MetadataValue { } /// A macro for easily creating an array of metadata to be provided to Sift. -/// Returns a Vec<[MetadataValue]> /// -/// # Example +/// Returns a `Vec<[MetadataValue]>` from a list of key-value pairs. +/// +/// # Arguments +/// +/// The macro takes a comma-separated list of `(key, value)` tuples where: +/// - `key` is a string (or string literal) +/// - `value` can be a number (`f64`), boolean, or string +/// +/// # Returns +/// +/// A `Vec` containing the metadata entries. +/// +/// # Example +/// /// ``` /// # #[macro_use] extern crate sift_rs; /// # use sift_rs::metadata::v1::MetadataValue; /// # fn main() { /// let metadata: Vec = metadata![ -/// ("test_number", 5.0), -/// ("is_simulation", true), -/// ("location", "SiftHQ"), +/// ("test_number", 5.0), +/// ("is_simulation", true), +/// ("location", "SiftHQ"), /// ]; /// # } /// ``` diff --git a/rust/crates/sift_rs/src/wrappers/mod.rs b/rust/crates/sift_rs/src/wrappers/mod.rs index 05949fa64..912e00d14 100644 --- a/rust/crates/sift_rs/src/wrappers/mod.rs +++ b/rust/crates/sift_rs/src/wrappers/mod.rs @@ -1,3 +1,12 @@ +//! Wrapper modules for Sift's gRPC services. +//! +//! These modules provide convenient wrapper traits and implementations around the +//! raw gRPC service clients generated from protobuf definitions. The wrappers +//! provide: +//! - Simplified error handling with [`sift_error::Error`] +//! - Convenient methods for common operations +//! - Access to underlying gRPC clients via `Deref` and `DerefMut` + /// Offers a wrapper over Sift's assets API. pub mod assets; diff --git a/rust/crates/sift_rs/src/wrappers/runs.rs b/rust/crates/sift_rs/src/wrappers/runs.rs index 0e9239571..11fa7490f 100644 --- a/rust/crates/sift_rs/src/wrappers/runs.rs +++ b/rust/crates/sift_rs/src/wrappers/runs.rs @@ -12,18 +12,81 @@ use sift_connect::SiftChannel; use sift_error::prelude::*; use std::ops::{Deref, DerefMut}; -/// Return an implementation of [RunServiceWrapper] which also exposes methods from the -/// raw [RunServiceClient]. +/// Creates a new run service wrapper. +/// +/// Returns an implementation of [`RunServiceWrapper`] which also exposes methods +/// from the raw [`RunServiceClient`] via `Deref` and `DerefMut`. +/// +/// # Arguments +/// +/// * `grpc_channel` - The gRPC channel to use for communication +/// +/// # Example +/// +/// ```no_run +/// use sift_rs::wrappers::runs::{new_run_service, RunServiceWrapper}; +/// use sift_connect::{Credentials, SiftChannelBuilder}; +/// +/// # async fn example() -> Result<(), Box> { +/// let credentials = Credentials::Config { +/// uri: "https://api.siftstack.com".to_string(), +/// apikey: "your-api-key".to_string(), +/// }; +/// let channel = SiftChannelBuilder::new(credentials).build()?; +/// let mut service = new_run_service(channel); +/// +/// let run = service.try_get_run_by_id("run-123").await?; +/// # Ok(()) +/// # } +/// ``` pub fn new_run_service(grpc_channel: SiftChannel) -> impl RunServiceWrapper { RunServiceWrapperImpl(RunServiceClient::new(grpc_channel)) } -/// Convenience methods over [RunServiceClient]. +/// Convenience methods for working with Sift's Run service. +/// +/// This trait provides simplified methods that return [`sift_error::Result`] instead +/// of raw gRPC responses. The underlying [`RunServiceClient`] is accessible via +/// `Deref` and `DerefMut` for advanced use cases. #[async_trait] pub trait RunServiceWrapper: Clone + Deref> + DerefMut { /// Creates a run. + /// + /// # Arguments + /// + /// * `name` - The name of the run + /// * `client_key` - A unique identifier for this run + /// * `description` - A description of the run + /// * `tags` - Tags to associate with the run + /// * `metadata` - Metadata key-value pairs to associate with the run + /// + /// # Returns + /// + /// The created run, or an error if creation fails. + /// + /// # Errors + /// + /// Returns [`ErrorKind::ArgumentValidationError`] if `name` or `client_key` + /// is empty. Returns [`ErrorKind::CreateRunError`] if creation fails. + /// + /// # Example + /// + /// ```no_run + /// use sift_rs::wrappers::runs::RunServiceWrapper; + /// + /// # async fn example(mut service: impl RunServiceWrapper) -> Result<(), Box> { + /// let run = service.try_create_run( + /// "My Run", + /// "run-v1", + /// "Test run", + /// &["test".to_string()], + /// &[], + /// ).await?; + /// # Ok(()) + /// # } + /// ``` async fn try_create_run( &mut self, name: &str, @@ -33,19 +96,71 @@ pub trait RunServiceWrapper: metadata: &[MetadataValue], ) -> Result; - /// Update a run. The `updated_run` is expected to contain the `run_id` or `client_key` used to - /// identify the run to update. The `update_mask` is a list of snake_cased field names used to - /// indicate which fields should actually be updated. A list of valid field names can be found - /// at [`this link`]. The [Run] returned is the updated run. If `update_mask` is empty, then no - /// update is required and the `updated_run` is simply returned. + /// Updates a run. /// - /// [`this link`]: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/runs#updaterunrequest + /// The `updated_run` is expected to contain the `run_id` or `client_key` used to + /// identify the run to update. The `update_mask` is a list of snake_cased field names + /// used to indicate which fields should actually be updated. A list of valid field names + /// can be found at [this link](https://docs.siftstack.com/docs/api/grpc/protocol-buffers/runs#updaterunrequest). + /// + /// If `update_mask` is empty, then no update is required and the `updated_run` is + /// simply returned. + /// + /// # Arguments + /// + /// * `updated_run` - The run with updated fields (must include `run_id` or `client_key`) + /// * `update_mask` - List of snake_cased field names to update + /// + /// # Returns + /// + /// The updated run, or an error if the update fails. + /// + /// # Errors + /// + /// Returns [`ErrorKind::UpdateRunError`] if the update fails. + /// + /// # Example + /// + /// ```no_run + /// use sift_rs::wrappers::runs::RunServiceWrapper; + /// + /// # async fn example(mut service: impl RunServiceWrapper, mut run: sift_rs::runs::v2::Run) -> Result<(), Box> { + /// run.name = "Updated Name".to_string(); + /// let updated = service.try_update_run(run, &["name".to_string()]).await?; + /// # Ok(()) + /// # } + /// ``` async fn try_update_run(&mut self, updated_run: Run, update_mask: &[String]) -> Result; - /// Retrieve a run by ID. + /// Retrieves a run by ID. + /// + /// # Arguments + /// + /// * `run_id` - The ID of the run to retrieve + /// + /// # Returns + /// + /// The requested run, or an error if it doesn't exist. + /// + /// # Errors + /// + /// Returns [`ErrorKind::RetrieveRunError`] if retrieval fails. async fn try_get_run_by_id(&mut self, run_id: &str) -> Result; - /// Retrieve a run by client key. + /// Retrieves a run by client key. + /// + /// # Arguments + /// + /// * `client_key` - The client key of the run to retrieve + /// + /// # Returns + /// + /// The requested run, or an error if it doesn't exist. + /// + /// # Errors + /// + /// Returns [`ErrorKind::RetrieveRunError`] if retrieval fails. + /// Returns [`ErrorKind::NotFoundError`] if no run with the given client key exists. async fn try_get_run_by_client_key(&mut self, client_key: &str) -> Result; } diff --git a/rust/crates/sift_stream/Cargo.toml b/rust/crates/sift_stream/Cargo.toml index 74b4446f6..02b4f69b9 100644 --- a/rust/crates/sift_stream/Cargo.toml +++ b/rust/crates/sift_stream/Cargo.toml @@ -52,7 +52,7 @@ tokio-stream = "0.1.17" tonic = { workspace = true } tower = { version = "0.5.2", features = ["util"] } tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"] } -tokio = { version = "1.43.0", features = ["rt", "rt-multi-thread", "sync", "time"] } +tokio = { version = "1.43.0", features = ["rt", "rt-multi-thread", "sync", "time", "signal"] } uuid = { version = "1.16.0", features = ["v4"] } tracing-test = { version = "0.2.5", features = ["no-env-filter"] } criterion = { version = "0.5", features = ["html_reports"] } @@ -61,4 +61,22 @@ rand = "0.8" [[bench]] name = "message_to_ingest_req" harness = false -required-features = ["unstable"] \ No newline at end of file +required-features = ["unstable"] + +[[example]] +name = "backups-only" +path = "examples/backups-only/main.rs" + +[[example]] +name = "quick-start" +path = "examples/quick-start/main.rs" + +[[example]] +name = "direct-metrics" +path = "examples/metrics/direct_metrics.rs" +required-features = ["metrics-unstable"] + +[[example]] +name = "http-server-metrics" +path = "examples/metrics/http_server.rs" +required-features = ["metrics-unstable"] \ No newline at end of file diff --git a/rust/crates/sift_stream/examples/metrics/direct_metrics.rs b/rust/crates/sift_stream/examples/metrics/direct_metrics.rs new file mode 100644 index 000000000..9bcc668f7 --- /dev/null +++ b/rust/crates/sift_stream/examples/metrics/direct_metrics.rs @@ -0,0 +1,163 @@ +//! Example demonstrating direct metrics access from SiftStream. +//! +//! This example shows how to: +//! - Access metrics using `get_metrics_snapshot()` on a SiftStream instance +//! - Display all available metric fields +//! - Perform periodic metrics polling +//! - Access checkpoint and backup metrics +//! +//! Run with: `cargo run --example direct-metrics --features metrics-unstable` +#[cfg(not(feature = "metrics-unstable"))] +compile_error!( + "This example requires the 'metrics-unstable' feature to be enabled. Run with: cargo run --example direct-metrics --features metrics-unstable" +); + +use sift_stream::{ + ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig, + IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder, TimeValue, +}; +use std::{env, error::Error, process::ExitCode, time::Duration}; +use tracing_subscriber::filter::EnvFilter; + +#[tokio::main] +async fn main() -> ExitCode { + tracing_subscriber::fmt() + .with_target(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + match run().await { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE + } + } +} + +async fn run() -> Result<(), Box> { + let credentials = Credentials::Config { + apikey: env::var("SIFT_API_KEY").expect("SIFT_API_KEY environment variable must be set"), + uri: env::var("SIFT_URI").expect("SIFT_URI environment variable must be set"), + }; + + // Define the schema of your telemetry + let ingestion_config = IngestionConfigForm { + asset_name: "MetricsExample".into(), + client_key: "metrics-example-v1".into(), + flows: vec![FlowConfig { + name: "sensor-data".into(), + channels: vec![ChannelConfig { + name: "temperature".into(), + description: "Temperature sensor reading".into(), + data_type: ChannelDataType::Double.into(), + unit: "Celsius".into(), + ..Default::default() + }], + }], + }; + + // Define a run to group together data + let run = RunForm { + name: "Metrics Example Run".into(), + client_key: "metrics-example-run".into(), + description: Some("Example demonstrating metrics access".into()), + tags: Some(vec!["metrics".into(), "example".into()]), + metadata: None, + }; + + // Initialize your Sift Stream + let mut sift_stream = SiftStreamBuilder::new(credentials) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::default()) + .attach_run(run) + .build() + .await?; + + println!("Starting to send data and collect metrics...\n"); + + // Send some data to generate metrics + for i in 0..100 { + let flow = Flow::new( + "sensor-data", + TimeValue::now(), + &[ChannelValue::new("temperature", 20.0 + (i as f64) * 0.1)], + ); + + sift_stream.send(flow).await?; + + // Print metrics every 10 messages + if i % 10 == 0 && i > 0 { + let metrics = sift_stream.get_metrics_snapshot(); + print_metrics_snapshot(&metrics, i); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // Print final metrics + println!("\n=== Final Metrics ==="); + let final_metrics = sift_stream.get_metrics_snapshot(); + print_metrics_snapshot(&final_metrics, 100); + + // Demonstrate accessing specific metric categories + let metrics = sift_stream.get_metrics_snapshot(); + println!("\n=== Checkpoint Metrics ==="); + println!("Total checkpoints: {}", metrics.checkpoint.checkpoint_count); + println!( + "Failed checkpoints: {}", + metrics.checkpoint.failed_checkpoint_count + ); + println!( + "Current checkpoint elapsed: {:.2}s", + metrics.checkpoint.cur_elapsed_secs + ); + println!( + "Current checkpoint message rate: {:.2} msg/s", + metrics.checkpoint.cur_message_rate + ); + println!( + "Current checkpoint byte rate: {:.2} bytes/s", + metrics.checkpoint.cur_byte_rate + ); + + println!("\n=== Backup Metrics ==="); + println!("Total backup files: {}", metrics.backups.total_file_count); + println!("Total backup bytes: {}", metrics.backups.total_bytes); + println!("Total backup messages: {}", metrics.backups.total_messages); + println!( + "Files pending ingestion: {}", + metrics.backups.files_pending_ingestion + ); + println!("Files ingested: {}", metrics.backups.files_ingested); + + // Gracefully terminate your stream + sift_stream + .finish() + .await + .expect("failed to gracefully terminate Sift stream"); + + Ok(()) +} + +fn print_metrics_snapshot(metrics: &sift_stream::SiftStreamMetricsSnapshot, message_count: u64) { + println!( + "=== Metrics Snapshot (after {} messages) ===", + message_count + ); + println!("Elapsed time: {:.2}s", metrics.elapsed_secs); + println!("Loaded flows: {}", metrics.loaded_flows); + println!("Unique flows received: {}", metrics.unique_flows_received); + println!("Messages received: {}", metrics.messages_received); + println!("Messages sent: {}", metrics.messages_sent); + println!("Message rate: {:.2} messages/s", metrics.message_rate); + println!("Bytes sent: {}", metrics.bytes_sent); + println!("Byte rate: {:.2} bytes/s", metrics.byte_rate); + println!("Current retry count: {}", metrics.cur_retry_count); + println!( + "Ingestion channel depth: {}", + metrics.ingestion_channel_depth + ); + println!("Backup channel depth: {}", metrics.backup_channel_depth); + println!(); +} diff --git a/rust/crates/sift_stream/examples/metrics/http_server.rs b/rust/crates/sift_stream/examples/metrics/http_server.rs new file mode 100644 index 000000000..9e4cdeb2d --- /dev/null +++ b/rust/crates/sift_stream/examples/metrics/http_server.rs @@ -0,0 +1,173 @@ +//! Example demonstrating the HTTP metrics server for SiftStream. +//! +//! This example shows how to: +//! - Start the metrics HTTP server using `MetricsServerBuilder` +//! - Query the `/` and `/metrics` endpoints +//! - Parse and display the JSON metrics response +//! - Query the server from external tools (demonstrated with curl command) +//! +//! Run with: `cargo run --example http_server --features metrics-unstable` +//! +//! Once running, you can query metrics using: +//! ```bash +//! curl http://127.0.0.1:8080/metrics +//! ``` +//! +//! To pretty-print the metrics, you can also use the `jq` command: +//! ```bash +//! curl http://127.0.0.1:8080/metrics | jq . +//! ``` +#[cfg(not(feature = "metrics-unstable"))] +compile_error!( + "This example requires the 'metrics-unstable' feature to be enabled. Run with: cargo run --example http-server-metrics --features metrics-unstable" +); + +use sift_stream::{ + ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig, + IngestionConfigForm, MetricsServerBuilder, RecoveryStrategy, RunForm, SiftStreamBuilder, + TimeValue, +}; +use std::{env, error::Error, net::SocketAddr, process::ExitCode, time::Duration}; +use tokio::signal; +use tracing_subscriber::filter::EnvFilter; + +#[tokio::main] +async fn main() -> ExitCode { + tracing_subscriber::fmt() + .with_target(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + match run().await { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("{err}"); + ExitCode::FAILURE + } + } +} + +async fn run() -> Result<(), Box> { + let credentials = Credentials::Config { + apikey: env::var("SIFT_API_KEY").expect("SIFT_API_KEY environment variable must be set"), + uri: env::var("SIFT_URI").expect("SIFT_URI environment variable must be set"), + }; + + // Define the schema of your telemetry + let ingestion_config = IngestionConfigForm { + asset_name: "MetricsServerExample".into(), + client_key: "metrics-server-example-v1".into(), + flows: vec![FlowConfig { + name: "sensor-data".into(), + channels: vec![ChannelConfig { + name: "temperature".into(), + description: "Temperature sensor reading".into(), + data_type: ChannelDataType::Double.into(), + unit: "Celsius".into(), + ..Default::default() + }], + }], + }; + + // Define a run to group together data + let run = RunForm { + name: "Metrics Server Example Run".into(), + client_key: "metrics-server-example-run".into(), + description: Some("Example demonstrating HTTP metrics server".into()), + tags: Some(vec!["metrics".into(), "http-server".into()]), + metadata: None, + }; + + // Start the metrics HTTP server + // Defaults to 127.0.0.1:8080, but can be customized + let socket_addr: SocketAddr = "127.0.0.1:8080".parse().expect("Invalid socket address"); + + println!("Starting metrics HTTP server on {}...", socket_addr); + MetricsServerBuilder::new() + .socket(socket_addr) + .start_metrics_server() + .await?; + + println!("Metrics server started! Metrics are available at:"); + println!(" - http://{}/", socket_addr); + println!(" - http://{}/metrics", socket_addr); + println!("\nYou can query metrics using:"); + println!(" curl http://{}/metrics", socket_addr); + println!("\nStarting to send data continuously..."); + println!("Press Ctrl+C to stop.\n"); + + // Initialize your Sift Stream + // The metrics from this stream will be automatically registered with the HTTP server + let mut sift_stream = SiftStreamBuilder::new(credentials) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::default()) + .attach_run(run) + .build() + .await?; + + println!("\n=== Example: Querying Metrics Server ==="); + println!("You can query the metrics server from your application or external tools."); + println!("The server returns JSON with metrics organized by sift_stream_id."); + println!("\nExample curl command:"); + println!(" curl http://{}/metrics", socket_addr); + println!("\nExample Python code to query metrics:"); + println!( + r#" import requests + import json + + response = requests.get("http://127.0.0.1:8080/metrics") + metrics = json.loads(response.text) + print(json.dumps(metrics, indent=2))"# + ); + println!("\nSending data continuously. Press Ctrl+C to stop...\n"); + + // Continuously send data until Ctrl-C is pressed + // We use tokio::select! to race between sending data and waiting for Ctrl-C + let mut counter = 0u64; + + let mut signal = Box::pin(signal::ctrl_c()); + + loop { + tokio::select! { + _ = &mut signal => { + println!("\nCtrl+C received. Shutting down gracefully..."); + break; + } + result = async { + let flow = Flow::new( + "sensor-data", + TimeValue::now(), + &[ChannelValue::new("temperature", 20.0 + (counter as f64) * 0.1)], + ); + sift_stream.send(flow).await + } => { + match result { + Ok(_) => { + counter += 1; + + // Print status every 10 messages + if counter % 10 == 0 { + println!("Sent {} messages.", counter); + } + } + Err(e) => { + eprintln!("Error sending flow: {}", e); + break; + } + } + + tokio::time::sleep(Duration::from_millis(200)).await; + } + } + } + + // Gracefully terminate your stream + sift_stream + .finish() + .await + .expect("failed to gracefully terminate Sift stream"); + + println!("Stream finished gracefully. Exiting..."); + + Ok(()) +} diff --git a/rust/crates/sift_stream/src/lib.rs b/rust/crates/sift_stream/src/lib.rs index 1677c151d..b8e5dce18 100644 --- a/rust/crates/sift_stream/src/lib.rs +++ b/rust/crates/sift_stream/src/lib.rs @@ -417,11 +417,11 @@ //! Metrics are currently considered an unstable feature, and future updates may break the existing metrics API. //! //! When the `metrics-unstable` feature flag is enabled, users may currently access metrics through one of two methods: -//! - [SiftStream::get_metrics_snapshot] returns a [SiftStreamMetricsSnapshot] +//! - [`SiftStream::get_metrics_snapshot`](stream::SiftStream::get_metrics_snapshot) returns a [SiftStreamMetricsSnapshot] //! - Enable the light weight HTTP metrics server using [metrics::start_metrics_server], which exposes the `/` and `/metrics` //! endpoints, providing a JSON formatted struct of each sift-stream-id and its [SiftStreamMetricsSnapshot] //! -//! Snapshots of the metrics are taken at any time the user calls [SiftStream::get_metrics_snapshot] or sends a GET request to the metrics +//! Snapshots of the metrics are taken at any time the user calls [`SiftStream::get_metrics_snapshot`](stream::SiftStream::get_metrics_snapshot) or sends a GET request to the metrics //! server endpoints. Metrics are internally updated atomically, and calls to get metric snapshots are non-blocking to SiftStream //! operaration. //! @@ -450,7 +450,7 @@ pub use sift_rs::{ pub use stream::{ RetryPolicy, SiftStream, builder::{IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder}, - channel::{ChannelValue, Value}, + channel::{ChannelEnum, ChannelValue, Value}, flow::{ChannelIndex, FlowBuilder, FlowDescriptor, FlowDescriptorBuilder}, mode::{ file_backup::FileBackup, @@ -472,7 +472,7 @@ pub use sift_connect::grpc::{Credentials, SiftChannel}; pub mod metrics; #[cfg(feature = "metrics-unstable")] -pub use metrics::SiftStreamMetricsSnapshot; +pub use metrics::{MetricsServerBuilder, SiftStreamMetricsSnapshot}; #[cfg(test)] mod test; diff --git a/rust/crates/sift_stream/src/stream/channel.rs b/rust/crates/sift_stream/src/stream/channel.rs index 735b9eadd..1d15c8d1a 100644 --- a/rust/crates/sift_stream/src/stream/channel.rs +++ b/rust/crates/sift_stream/src/stream/channel.rs @@ -3,17 +3,55 @@ use sift_rs::{ }; /// Represents the value emitted by a named channel. +/// +/// A `ChannelValue` pairs a channel name with its typed value. This is used +/// when constructing [`Flow`](crate::Flow) instances to send telemetry data. +/// +/// # Example +/// +/// ``` +/// use sift_stream::ChannelValue; +/// +/// let value = ChannelValue::new("temperature", 72.5_f64); +/// let string_value = ChannelValue::new("status", "operational"); +/// ``` #[derive(Debug, PartialEq, Clone)] pub struct ChannelValue { + /// The name of the channel. pub name: String, + /// The value emitted by the channel. pub value: Value, } -/// Represents a specific enumeration of an enum channel. +/// Represents a specific enumeration value for an enum channel. +/// +/// Enum channels use numeric values to represent discrete states. This wrapper +/// type makes it clear when a value represents an enum variant. +/// +/// # Example +/// +/// ``` +/// use sift_stream::{ChannelValue, ChannelEnum}; +/// +/// let enum_value = ChannelValue::new("state", ChannelEnum(0)); +/// ``` #[derive(Debug, PartialEq)] pub struct ChannelEnum(pub u32); -/// Represents a typed-value emitted by a channel. +/// Represents a typed value emitted by a channel. +/// +/// This enum covers all supported data types for telemetry channels. Values can +/// be created from various Rust types using the `From` trait implementations. +/// +/// # Example +/// +/// ``` +/// use sift_stream::Value; +/// +/// let bool_val: Value = true.into(); +/// let float_val: Value = 3.14_f32.into(); +/// let string_val: Value = "text".into(); +/// ``` #[derive(Debug, PartialEq, Clone)] pub enum Value { Bool(bool), @@ -61,12 +99,29 @@ impl Value { } impl ChannelValue { - /// Creates a [ChannelValue] for a channel of name `name`. + /// Creates a [`ChannelValue`] for a channel with the given name. /// - /// Example: - /// ```ignore - /// ChannelValue::new("arm-joint", 3_i32); - /// ChannelValue::new("navigation", 3.14_f32); + /// The value type is inferred from the provided value, which can be any type + /// that implements `Into` (including `bool`, numeric types, strings, etc.). + /// + /// # Arguments + /// + /// * `name` - The name of the channel + /// * `val` - The value to associate with the channel (any type that converts to `Value`) + /// + /// # Returns + /// + /// A new `ChannelValue` instance. + /// + /// # Example + /// + /// ``` + /// use sift_stream::ChannelValue; + /// + /// let int_value = ChannelValue::new("arm-joint", 3_i32); + /// let float_value = ChannelValue::new("navigation", 3.14_f32); + /// let bool_value = ChannelValue::new("enabled", true); + /// let string_value = ChannelValue::new("status", "operational"); /// ``` pub fn new>(name: &str, val: T) -> Self { Self { diff --git a/rust/crates/sift_stream/src/stream/mod.rs b/rust/crates/sift_stream/src/stream/mod.rs index 208c6c55f..83fdb2a20 100644 --- a/rust/crates/sift_stream/src/stream/mod.rs +++ b/rust/crates/sift_stream/src/stream/mod.rs @@ -149,7 +149,7 @@ where self.run.as_ref() } - /// The entry-point to send actual telemetry to Sift in the form of [Flow]s. + /// The entry-point to send actual telemetry to Sift in the form of [`Flow`](mode::ingestion_config::Flow)s. pub async fn send(&mut self, message: M) -> Result<()> where M: Encodeable::Message> + Send + Sync, diff --git a/rust/crates/sift_stream/src/stream/retry.rs b/rust/crates/sift_stream/src/stream/retry.rs index 213d42476..c4334d805 100644 --- a/rust/crates/sift_stream/src/stream/retry.rs +++ b/rust/crates/sift_stream/src/stream/retry.rs @@ -1,18 +1,50 @@ use std::time::Duration; -/// A retry policy that is used to configure the retry behavior of a Sift stream. Most users should -/// opt to use the default retry policy provided by [RetryPolicy::default], however, they are able -/// to completely configure their own. +/// A retry policy that configures the stream retry behavior of a Sift stream +/// instance. +/// +/// Most users should opt to use the default retry policy provided by [`RetryPolicy::default`]. +/// +/// The retry policy uses exponential backoff with configurable parameters. When a retryable +/// error occurs, the stream will wait for the calculated backoff duration before retrying. +/// +/// # Example +/// +/// ``` +/// use sift_stream::RetryPolicy; +/// use std::time::Duration; +/// +/// // Use default policy +/// let default_policy = RetryPolicy::default(); +/// +/// // Create custom policy +/// let custom_policy = RetryPolicy { +/// max_attempts: 10, +/// initial_backoff: Duration::from_millis(100), +/// max_backoff: Duration::from_secs(10), +/// backoff_multiplier: 2, +/// }; +/// ``` #[derive(Debug, Clone)] pub struct RetryPolicy { + /// Maximum number of retry attempts (including the initial attempt). pub max_attempts: u8, + /// Initial backoff duration for the first retry. pub initial_backoff: Duration, + /// Maximum backoff duration cap. pub max_backoff: Duration, + /// Multiplier for exponential backoff (applied to current wait time). pub backoff_multiplier: u8, } impl Default for RetryPolicy { - /// The default [RetryPolicy] that is configured to retry 5 times with exponential backoff. + /// The default [`RetryPolicy`] configured to retry 5 times with exponential backoff. + /// + /// Default settings: + /// - `max_attempts`: 5 + /// - `initial_backoff`: 50ms + /// - `max_backoff`: 5s + /// - `backoff_multiplier`: 5 fn default() -> Self { Self { max_attempts: 5, @@ -24,6 +56,30 @@ impl Default for RetryPolicy { } impl RetryPolicy { + /// Calculates the next backoff duration based on the current wait time. + /// + /// The backoff calculation: + /// - If `current_wait` is zero, returns `initial_backoff` + /// - Otherwise, multiplies `current_wait` by `backoff_multiplier` and caps at `max_backoff` + /// + /// # Arguments + /// + /// * `current_wait` - The current wait duration (use `Duration::ZERO` for the first retry) + /// + /// # Returns + /// + /// The calculated backoff duration. + /// + /// # Example + /// + /// ``` + /// use sift_stream::RetryPolicy; + /// use std::time::Duration; + /// + /// let policy = RetryPolicy::default(); + /// let first_backoff = policy.backoff(Duration::ZERO); + /// let second_backoff = policy.backoff(first_backoff); + /// ``` pub fn backoff(&self, current_wait: Duration) -> Duration { if current_wait == Duration::ZERO { return self.initial_backoff; diff --git a/rust/crates/sift_stream/src/stream/run.rs b/rust/crates/sift_stream/src/stream/run.rs index 53eb028e3..b6f2fd815 100644 --- a/rust/crates/sift_stream/src/stream/run.rs +++ b/rust/crates/sift_stream/src/stream/run.rs @@ -9,8 +9,36 @@ use sift_rs::{ }; use std::collections::{HashMap, HashSet}; +/// Selector for identifying a run when attaching to a stream. +/// +/// This enum allows you to specify a run either by its ID (if it already exists) +/// or by providing a [`RunForm`] to create or retrieve a run by client key. +/// +/// # Example +/// +/// ```no_run +/// use sift_stream::stream::run::RunSelector; +/// use sift_stream::RunForm; +/// +/// // Select by ID +/// let selector = RunSelector::ById("run-123".to_string()); +/// +/// // Select by form (creates if doesn't exist) +/// let selector = RunSelector::ByForm(RunForm { +/// name: "My Run".to_string(), +/// client_key: "run-v1".to_string(), +/// ..Default::default() +/// }); +/// ``` pub enum RunSelector { + /// Select a run by its ID. + /// + /// The run must already exist. If it doesn't exist, an error will be returned. ById(String), + /// Select a run by providing a [`RunForm`]. + /// + /// If a run with the given `client_key` exists, it will be retrieved and + /// updated if any fields have changed. If no run exists, a new one will be created. ByForm(RunForm), } diff --git a/rust/crates/sift_stream/src/stream/time.rs b/rust/crates/sift_stream/src/stream/time.rs index a1cb67dba..5473dea9f 100644 --- a/rust/crates/sift_stream/src/stream/time.rs +++ b/rust/crates/sift_stream/src/stream/time.rs @@ -3,27 +3,78 @@ use pbjson_types::Timestamp; use sift_error::prelude::*; use std::ops::{Deref, DerefMut}; -/// The primary time-type of the `sift_stream` crate. This is a flexible wrapper over the -/// underlying protobuf time-type that can be constructed from a variety of different time -/// representations. +/// The primary time-type of the `sift_stream` crate. +/// +/// This is a flexible wrapper over the underlying protobuf time-type that can be +/// constructed from a variety of different time representations. All times are +/// stored and transmitted as UTC. +/// +/// # Example +/// +/// ``` +/// use sift_stream::TimeValue; +/// +/// // Current time +/// let now = TimeValue::now(); +/// +/// // From timestamp +/// let time = TimeValue::try_from_timestamp_millis(1609459200000).unwrap(); +/// +/// // From RFC3339 string +/// let time = TimeValue::try_from_rfc3339("2021-01-01T00:00:00Z").unwrap(); +/// ``` #[derive(Debug, Clone)] pub struct TimeValue(pub(crate) Timestamp); /// Initializes with the current time. impl Default for TimeValue { - /// Creates a time-value that represents the current absolute time. + /// Creates a time-value that represents the current absolute time in UTC. fn default() -> Self { Self::from(Local::now().to_utc()) } } impl TimeValue { - /// Creates a time-value that represents the current absolute time. + /// Creates a time-value that represents the current absolute time in UTC. + /// + /// # Returns + /// + /// A `TimeValue` representing the current time. + /// + /// # Example + /// + /// ``` + /// use sift_stream::TimeValue; + /// + /// let now = TimeValue::now(); + /// ``` pub fn now() -> Self { Self::default() } - /// Creates a [TimeValue] from a second and nanosecond timestamp. + /// Creates a [`TimeValue`] from a second and nanosecond timestamp. + /// + /// # Arguments + /// + /// * `secs` - Seconds since Unix epoch (can be negative) + /// * `nsecs` - Nanoseconds component (0-999,999,999) + /// + /// # Returns + /// + /// A `TimeValue` if the timestamp is valid, or an error if the timestamp + /// is out of range. + /// + /// # Errors + /// + /// Returns [`ErrorKind::TimeConversionError`] if the timestamp is invalid. + /// + /// # Example + /// + /// ``` + /// use sift_stream::TimeValue; + /// + /// let time = TimeValue::try_from_timestamp(1609459200, 0).unwrap(); + /// ``` pub fn try_from_timestamp(secs: i64, nsecs: u32) -> Result { DateTime::::from_timestamp(secs, nsecs) .map(|t| TimeValue(Timestamp::from(t))) @@ -35,7 +86,28 @@ impl TimeValue { }) } - /// Creates a [TimeValue] from a millisecond timestamp. + /// Creates a [`TimeValue`] from a millisecond timestamp. + /// + /// # Arguments + /// + /// * `millis` - Milliseconds since Unix epoch (can be negative) + /// + /// # Returns + /// + /// A `TimeValue` if the timestamp is valid, or an error if the timestamp + /// is out of range. + /// + /// # Errors + /// + /// Returns [`ErrorKind::TimeConversionError`] if the timestamp is invalid. + /// + /// # Example + /// + /// ``` + /// use sift_stream::TimeValue; + /// + /// let time = TimeValue::try_from_timestamp_millis(1609459200000).unwrap(); + /// ``` pub fn try_from_timestamp_millis(millis: i64) -> Result { DateTime::::from_timestamp_millis(millis) .map(|t| TimeValue(Timestamp::from(t))) @@ -47,7 +119,28 @@ impl TimeValue { }) } - /// Creates a [TimeValue] from a microsecond timestamp. + /// Creates a [`TimeValue`] from a microsecond timestamp. + /// + /// # Arguments + /// + /// * `micros` - Microseconds since Unix epoch (can be negative) + /// + /// # Returns + /// + /// A `TimeValue` if the timestamp is valid, or an error if the timestamp + /// is out of range. + /// + /// # Errors + /// + /// Returns [`ErrorKind::TimeConversionError`] if the timestamp is invalid. + /// + /// # Example + /// + /// ``` + /// use sift_stream::TimeValue; + /// + /// let time = TimeValue::try_from_timestamp_micros(1609459200000000).unwrap(); + /// ``` pub fn try_from_timestamp_micros(micros: i64) -> Result { DateTime::::from_timestamp_micros(micros) .map(|t| TimeValue(Timestamp::from(t))) @@ -59,14 +152,54 @@ impl TimeValue { }) } - /// Creates a [TimeValue] from a nanosecond timestamp. + /// Creates a [`TimeValue`] from a nanosecond timestamp. + /// + /// Unlike the other timestamp constructors, this method does not return a `Result` + /// because nanosecond timestamps are always valid (they cover the full range of + /// representable times). + /// + /// # Arguments + /// + /// * `nanos` - Nanoseconds since Unix epoch (can be negative) + /// + /// # Returns + /// + /// A `TimeValue` representing the given timestamp. + /// + /// # Example + /// + /// ``` + /// use sift_stream::TimeValue; + /// + /// let time = TimeValue::from_timestamp_nanos(1609459200000000000); + /// ``` pub fn from_timestamp_nanos(nanos: i64) -> Self { TimeValue(Timestamp::from(DateTime::::from_timestamp_nanos( nanos, ))) } - /// Creates a [TimeValue] from a RFC3339 datetime string. + /// Creates a [`TimeValue`] from an RFC3339 datetime string. + /// + /// # Arguments + /// + /// * `val` - An RFC3339 formatted datetime string (e.g., "2021-01-01T00:00:00Z") + /// + /// # Returns + /// + /// A `TimeValue` if the string is valid, or an error if parsing fails. + /// + /// # Errors + /// + /// Returns [`ErrorKind::TimeConversionError`] if the string is not valid RFC3339. + /// + /// # Example + /// + /// ``` + /// use sift_stream::TimeValue; + /// + /// let time = TimeValue::try_from_rfc3339("2021-01-01T00:00:00Z").unwrap(); + /// ``` pub fn try_from_rfc3339>(val: S) -> Result { DateTime::parse_from_rfc3339(val.as_ref()) .map(|d| TimeValue(Timestamp::from(d.to_utc()))) diff --git a/rust/crates/sift_stream_bindings/src/stream/builder.rs b/rust/crates/sift_stream_bindings/src/stream/builder.rs index 14c9fa9b1..947013864 100644 --- a/rust/crates/sift_stream_bindings/src/stream/builder.rs +++ b/rust/crates/sift_stream_bindings/src/stream/builder.rs @@ -9,6 +9,13 @@ use std::sync::Arc; use tokio::sync::Mutex; // Type Definitions +/// Python binding for [`SiftStreamBuilder`](sift_stream::stream::builder::SiftStreamBuilder). +/// +/// This is a thin wrapper around the Rust `SiftStreamBuilder` type. For detailed documentation, +/// see [`SiftStreamBuilder`](sift_stream::stream::builder::SiftStreamBuilder). +/// +/// The builder provides a fluent API for configuring and creating a [`SiftStreamPy`] instance +/// with various options including ingestion configs, retry policies, checkpoint intervals, and more. #[gen_stub_pyclass] #[pyclass] pub struct SiftStreamBuilderPy { diff --git a/rust/crates/sift_stream_bindings/src/stream/channel.rs b/rust/crates/sift_stream_bindings/src/stream/channel.rs index 613ba5f9f..dc1d9241c 100644 --- a/rust/crates/sift_stream_bindings/src/stream/channel.rs +++ b/rust/crates/sift_stream_bindings/src/stream/channel.rs @@ -5,6 +5,13 @@ use sift_rs::common::r#type::v1::{ChannelBitFieldElement, ChannelDataType, Chann use sift_rs::ingest::v1::ingest_with_config_data_channel_value::Type as ChannelValueType; use sift_stream::stream::channel::{ChannelValue, Value}; +/// Python binding for [`ChannelValue`](sift_stream::ChannelValue). +/// +/// This is a thin wrapper around the Rust `ChannelValue` type. For detailed documentation, +/// see [`ChannelValue`](sift_stream::ChannelValue). +/// +/// A `ChannelValue` pairs a channel name with its typed value, used when constructing +/// [`Flow`](sift_stream::Flow) instances. #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] @@ -15,11 +22,22 @@ pub struct ChannelValuePy { value: ValuePy, } +/// Python binding for channel enum values. +/// +/// Represents a specific enumeration value for an enum channel. Enum channels use +/// numeric values to represent discrete states. #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] pub struct ChannelEnumPy(pub u32); +/// Python binding for [`Value`](sift_stream::Value). +/// +/// This is a thin wrapper around the Rust `Value` enum. For detailed documentation, +/// see [`Value`](sift_stream::Value). +/// +/// `Value` represents a typed value emitted by a channel, supporting all standard +/// telemetry data types (bool, numbers, strings, enums, bitfields). #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] diff --git a/rust/crates/sift_stream_bindings/src/stream/config.rs b/rust/crates/sift_stream_bindings/src/stream/config.rs index 6de261857..240ce2442 100644 --- a/rust/crates/sift_stream_bindings/src/stream/config.rs +++ b/rust/crates/sift_stream_bindings/src/stream/config.rs @@ -18,6 +18,13 @@ use std::collections::HashMap; use std::sync::Arc; // Type Definitions +/// Python binding for [`ChannelConfig`](sift_rs::ingestion_configs::v2::ChannelConfig). +/// +/// This is a thin wrapper around the Rust `ChannelConfig` type. For detailed documentation, +/// see [`ChannelConfig`](sift_rs::ingestion_configs::v2::ChannelConfig). +/// +/// A `ChannelConfig` defines the schema for a single telemetry channel, including its +/// name, data type, unit, and description. #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] @@ -37,6 +44,13 @@ pub struct ChannelConfigPy { bit_field_elements: Vec, } +/// Python binding for [`FlowConfig`](sift_rs::ingestion_configs::v2::FlowConfig). +/// +/// This is a thin wrapper around the Rust `FlowConfig` type. For detailed documentation, +/// see [`FlowConfig`](sift_rs::ingestion_configs::v2::FlowConfig). +/// +/// A `FlowConfig` defines the schema for a flow, which is a named group of channels +/// that are often telemetered together. #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] @@ -79,6 +93,13 @@ pub struct FlowBuilderPy { builder: Option>, } +/// Python binding for [`IngestionConfigForm`](sift_stream::stream::builder::IngestionConfigForm). +/// +/// This is a thin wrapper around the Rust `IngestionConfigForm` type. For detailed documentation, +/// see [`IngestionConfigForm`](sift_stream::stream::builder::IngestionConfigForm). +/// +/// An `IngestionConfigForm` is used to create a new ingestion config or retrieve an existing +/// one based on the `client_key`. It defines the schema of an asset's telemetry. #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] @@ -91,6 +112,14 @@ pub struct IngestionConfigFormPy { client_key: String, } +/// Python binding for [`RunForm`](sift_stream::stream::builder::RunForm). +/// +/// This is a thin wrapper around the Rust `RunForm` type. For detailed documentation, +/// see [`RunForm`](sift_stream::stream::builder::RunForm). +/// +/// A `RunForm` is used to create a new run or retrieve an existing run based on the +/// `client_key`. If a run with the given `client_key` exists, it will be updated +/// with any changed fields. #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] diff --git a/rust/crates/sift_stream_bindings/src/stream/mod.rs b/rust/crates/sift_stream_bindings/src/stream/mod.rs index 1370e2d53..ea836b055 100644 --- a/rust/crates/sift_stream_bindings/src/stream/mod.rs +++ b/rust/crates/sift_stream_bindings/src/stream/mod.rs @@ -21,12 +21,26 @@ use std::sync::Arc; use tokio::sync::Mutex; // Type Definitions +/// Python binding for [`SiftStream`](sift_stream::SiftStream). +/// +/// This is a thin wrapper around the Rust `SiftStream` type. For detailed documentation, +/// see [`SiftStream`](sift_stream::SiftStream). +/// +/// The Python binding provides the same functionality as the Rust type, with methods +/// adapted for Python's async/await syntax. #[gen_stub_pyclass] #[pyclass] pub struct SiftStreamPy { inner: Arc>>>, } +/// Python binding for [`Flow`](sift_stream::Flow). +/// +/// This is a thin wrapper around the Rust `Flow` type. For detailed documentation, +/// see [`Flow`](sift_stream::Flow). +/// +/// A `Flow` represents a single telemetry message containing channel values that share +/// a common timestamp. #[gen_stub_pyclass] #[pyclass] #[derive(Clone, Debug)] diff --git a/rust/crates/sift_stream_bindings/src/stream/retry.rs b/rust/crates/sift_stream_bindings/src/stream/retry.rs index 3a151b297..093fed5d3 100644 --- a/rust/crates/sift_stream_bindings/src/stream/retry.rs +++ b/rust/crates/sift_stream_bindings/src/stream/retry.rs @@ -15,7 +15,15 @@ pub struct DurationPy { nanos: u32, } -// Pyo3 doesn't support nested enums, so we need to build RecoveryStrategy differently +/// Python binding for [`RecoveryStrategy`](sift_stream::stream::builder::RecoveryStrategy). +/// +/// This is a thin wrapper around the Rust `RecoveryStrategy` enum. For detailed documentation, +/// see [`RecoveryStrategy`](sift_stream::stream::builder::RecoveryStrategy). +/// +/// A recovery strategy defines how the stream handles errors and failures, including +/// retry policies and optional disk backups. +/// +/// Note: PyO3 doesn't support nested enums, so this is implemented as a struct wrapper. #[gen_stub_pyclass] #[pyclass] #[derive(Clone, Debug)] @@ -23,6 +31,13 @@ pub struct RecoveryStrategyPy { inner: RecoveryStrategy, } +/// Python binding for [`RetryPolicy`](sift_stream::RetryPolicy). +/// +/// This is a thin wrapper around the Rust `RetryPolicy` type. For detailed documentation, +/// see [`RetryPolicy`](sift_stream::RetryPolicy). +/// +/// A retry policy configures the retry behavior of a Sift stream, including the number +/// of attempts and exponential backoff parameters. #[gen_stub_pyclass] #[pyclass] #[derive(Clone, Debug)] @@ -37,6 +52,13 @@ pub struct RetryPolicyPy { backoff_multiplier: u8, } +/// Python binding for [`DiskBackupPolicy`](sift_stream::backup::DiskBackupPolicy). +/// +/// This is a thin wrapper around the Rust `DiskBackupPolicy` type. For detailed documentation, +/// see [`DiskBackupPolicy`](sift_stream::backup::DiskBackupPolicy). +/// +/// A disk backup policy configures how telemetry data is backed up to disk, including +/// backup directory, file size limits, and retention policies. #[gen_stub_pyclass] #[pyclass] #[derive(Clone, Debug)] diff --git a/rust/crates/sift_stream_bindings/src/stream/time.rs b/rust/crates/sift_stream_bindings/src/stream/time.rs index 21ee85f72..1391daa0a 100644 --- a/rust/crates/sift_stream_bindings/src/stream/time.rs +++ b/rust/crates/sift_stream_bindings/src/stream/time.rs @@ -3,6 +3,14 @@ use pyo3_stub_gen::derive::*; use sift_stream::stream::time::TimeValue; // Type Definitions +/// Python binding for [`TimeValue`](sift_stream::stream::time::TimeValue). +/// +/// This is a thin wrapper around the Rust `TimeValue` type. For detailed documentation, +/// see [`TimeValue`](sift_stream::stream::time::TimeValue). +/// +/// `TimeValue` represents a timestamp that can be constructed from various time +/// representations (Unix timestamps, RFC3339 strings, etc.). All times are stored +/// and transmitted as UTC. #[gen_stub_pyclass] #[pyclass] #[derive(Clone)]