This capstone project, "SSRAI with HPC CI in real-time", is built in Python from three parts: real-time data acquisition, high-performance computing (HPC) for the computational work, and continuous integration (CI) practices.

The project has three components:

Real-Time Data Acquisition: This component involves collecting real-time data, which could be from sensors, APIs, or other streaming sources.
High-Performance Computing (HPC): Utilize HPC to perform complex computations or data processing tasks.
Continuous Integration (CI): Implement CI practices so that every code change is tested automatically.

Step 1: Real-Time Data Acquisition

We'll use a mock example where we simulate real-time data acquisition from a sensor.

In [None]:
import time
import random

def acquire_data():
    """Simulate real-time data acquisition from a sensor."""
    while True:
        data = random.random()  # Simulate a sensor reading
        yield data
        time.sleep(1)  # Simulate a 1-second interval between readings

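# Example usage: the generator never ends on its own, so take five readings and stop
from itertools import islice

for data in islice(acquire_data(), 5):
    print(f"Acquired data: {data}")
    # In a real scenario, you would send this data to a processing unit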

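The generator above hard-codes both the data source and the interval. As a minimal sketch of how a different source could be plugged in, the variant below takes both as parameters. The name `acquire_data_from` and its `read_sensor` and `interval` parameters are illustrative assumptions, not part of the project code.

```python
import random
import time
from itertools import islice


def acquire_data_from(read_sensor=random.random, interval=1.0):
    """Yield readings from `read_sensor` forever, pausing `interval` seconds between them.

    Hypothetical variant of acquire_data(): `read_sensor` is any zero-argument
    callable, so a real sensor driver or API client could be passed in instead.
    """
    while True:
        yield read_sensor()
        time.sleep(interval)


if __name__ == "__main__":
    # Take three readings from a fixed stand-in source with a short pause between them.
    for reading in islice(acquire_data_from(read_sensor=lambda: 0.5, interval=0.1), 3):
        print(f"Acquired data: {reading}")
```

Passing the source as a callable also makes the generator easy to exercise with known values, which helps once automated tests are in place.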

Step 2: High-Performance Computing (HPC)

We'll use a mock computation task that leverages multiprocessing to simulate an HPC environment.

In [None]:
import multiprocessing as mp

def complex_computation(data):
    """Simulate a complex computation task."""
    return data ** 2

def process_data(data_queue, result_queue):
    while True:
        data = data_queue.get()
        if data is None:
            break
        result = complex_computation(data)
        result_queue.put(result)

if __name__ == "__main__":  # Guard needed on platforms that spawn workers (Windows, macOS)
    # Setting up the multiprocessing environment
    data_queue = mp.Queue()
    result_queue = mp.Queue()
    num_processes = 4

    processes = [mp.Process(target=process_data, args=(data_queue, result_queue)) for _ in range(num_processes)]
    for p in processes:
        p.start()

    # Example usage: each reading is sent and its result awaited before the next
    # one arrives, so only one worker is busy at a time in this demo
    for count, data in enumerate(acquire_data(), start=1):
        data_queue.put(data)
        result = result_queue.get()
        print(f"Processed result: {result}")
        # Stop the loop after a fixed number of iterations for demonstration purposes
        if count >= 5:
            break

    # Terminate processes: one None sentinel per worker, then wait for each to exit
    for _ in range(num_processes):
        data_queue.put(None)
    for p in processes:
        p.join()

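When readings can be gathered into a batch, a worker pool can keep all processes busy at once instead of handling one reading at a time. The following is a minimal sketch using `multiprocessing.Pool`; the helper name `process_batch` and its single-process fallback are assumptions made for illustration.

```python
import multiprocessing as mp


def complex_computation(data):
    """Simulate a complex computation task."""
    return data ** 2


def process_batch(readings, num_processes=4):
    """Apply complex_computation to every reading and return results in input order."""
    if num_processes <= 1:
        # Skip process start-up cost when only one worker is requested.
        return [complex_computation(r) for r in readings]
    with mp.Pool(processes=num_processes) as pool:
        # Pool.map splits the batch across the workers and preserves input order.
        return pool.map(complex_computation, readings)


if __name__ == "__main__":
    print(process_batch([0.5, 1.0, 1.5, 2.0]))
```

The trade-off is latency: `Pool.map` returns only once the whole batch is done, whereas the queue-based workers hand back each result as soon as it is ready.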

Step 3: Continuous Integration (CI)

We'll use pytest for testing and GitHub Actions for CI. Here's a simplified example.

Testing with pytest
First, install pytest:

In [None]:
pip install pytest


Create a test file test_computation.py:

In [None]:
# Replace your_module with the name of the module that defines complex_computation
from your_module import complex_computation

def test_complex_computation():
    assert complex_computation(2) == 4
    assert complex_computation(3) == 9


Run the tests:

In [None]:
pytest test_computation.py

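The worker loop can be tested without starting any processes, because `process_data` only needs queue objects that provide `get` and `put`. A sketch, assuming the standard library's `queue.Queue` as a stand-in for `mp.Queue`, with the two project functions copied in to keep the example self-contained (the test name is illustrative):

```python
import queue


def complex_computation(data):
    """Simulate a complex computation task."""
    return data ** 2


def process_data(data_queue, result_queue):
    """Process items from data_queue until a None sentinel arrives."""
    while True:
        data = data_queue.get()
        if data is None:
            break
        result_queue.put(complex_computation(data))


def test_process_data_stops_on_sentinel():
    data_queue = queue.Queue()
    result_queue = queue.Queue()
    # Two readings followed by the sentinel that tells the worker to exit.
    for value in (2, 3, None):
        data_queue.put(value)

    process_data(data_queue, result_queue)  # Runs in this thread and returns at the sentinel

    results = []
    while not result_queue.empty():
        results.append(result_queue.get_nowait())
    assert results == [4, 9]
```

Because the sentinel is queued before the call, the loop is guaranteed to return, so the test cannot hang.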

GitHub Actions Workflow
Create a .github/workflows/ci.yml file in your repository:

In [None]:
name: CI

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.x'

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install pytest

    - name: Run tests
      run: |
        pytest test_computation.py

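Tests that run in CI should not wait on real one-second sleeps. One way to cover `acquire_data` quickly is sketched here with `unittest.mock.patch` from the standard library. The test name is illustrative, and the generator is copied in so the example is self-contained.

```python
import random
import time
from itertools import islice
from unittest import mock


def acquire_data():
    """Simulate real-time data acquisition from a sensor."""
    while True:
        yield random.random()
        time.sleep(1)


def test_acquire_data_yields_readings_without_waiting():
    # Replace time.sleep with a mock so the generator does not pause during the test.
    with mock.patch("time.sleep") as fake_sleep:
        readings = list(islice(acquire_data(), 3))

    assert len(readings) == 3
    assert all(0.0 <= r < 1.0 for r in readings)
    # The generator sleeps after each reading, but only when asked for the next one,
    # so three readings trigger two sleeps.
    assert fake_sleep.call_count == 2
```

The workflow above runs only `test_computation.py`, so a test like this would need to live in that file, or the `pytest` command would need to be widened to pick up additional test files.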

Bringing It All Together
This capstone project will involve the following:

Real-Time Data Acquisition: Acquiring data from a simulated sensor.
High-Performance Computing: Using multiprocessing to process data.
Continuous Integration: Setting up automated testing with GitHub Actions.

Here is a single Python script that ties the acquisition and processing components together:

In [None]:
import time
import random
import multiprocessing as mp

def acquire_data():
    """Simulate real-time data acquisition from a sensor."""
    while True:
        data = random.random()  # Simulate a sensor reading
        yield data
        time.sleep(1)  # Simulate a 1-second interval between readings

def complex_computation(data):
    """Simulate a complex computation task."""
    return data ** 2

def process_data(data_queue, result_queue):
    while True:
        data = data_queue.get()
        if data is None:
            break
        result = complex_computation(data)
        result_queue.put(result)

if __name__ == "__main__":
    data_queue = mp.Queue()
    result_queue = mp.Queue()
    num_processes = 4

    processes = [mp.Process(target=process_data, args=(data_queue, result_queue)) for _ in range(num_processes)]
    for p in processes:
        p.start()

    # Process a fixed number of readings so the demonstration always ends predictably
    for count, data in enumerate(acquire_data(), start=1):
        data_queue.put(data)
        result = result_queue.get()
        print(f"Processed result: {result}")
        if count >= 5:
            break

    for _ in range(num_processes):
        data_queue.put(None)
    for p in processes:
        p.join()
