# In [None]:  (Jupyter notebook cell marker left over from export)
"""
BACnet Device Discovery and Control System
==========================================

This script demonstrates how to:
1. Connect to a BACnet network
2. Discover BACnet devices on the network
3. Read object properties from discovered devices
4. Write values to writable objects

Requirements:
- pip install BAC0
- pip install netifaces
"""

import asyncio
import inspect

import BAC0

# ============================================================================
# BACnet Network Connection and Device Discovery
# ============================================================================

def initialize_bacnet_connection(ip_address="192.168.1.9/24"):
    """
    Announce and open a BAC0 "lite" connection for the given address.

    Args:
        ip_address (str): IP address with subnet mask, passed to BAC0
            unchanged (e.g., "192.168.1.9/24")

    Returns:
        The object returned by ``BAC0.lite(ip=ip_address)``
    """
    print(f"Initializing BACnet connection on {ip_address}")
    return BAC0.lite(ip=ip_address)

async def discover_bacnet_devices(bacnet):
    """
    Issue a Who-Is request and print a one-line summary per responder.

    Args:
        bacnet: BAC0 connection object; only its awaitable ``who_is()``
            method is used here

    Returns:
        The value returned by ``who_is()`` when it is non-empty; otherwise
        an empty list. An empty list is also returned when anything inside
        the discovery/reporting step raises.
    """
    print("Discovering BACnet devices on the network...")

    try:
        responders = await bacnet.who_is()

        # Guard clause: nothing answered (or who_is() returned a falsy value).
        if not responders:
            print("No devices found on the network.")
            return []

        print(f"Found {len(responders)} device(s):")
        for position, i_am in enumerate(responders, start=1):
            # Index 1 of the identifier is used as the device ID;
            # pduSource is reported as the source address.
            identifier = i_am.iAmDeviceIdentifier[1]
            origin = i_am.pduSource
            print(f"  Device {position}: ID={identifier}, Source={origin}")
        return responders

    except Exception as exc:
        # Best-effort discovery: report the problem and behave as "no devices".
        print(f"Error during device discovery: {exc}")
        return []

# ============================================================================
# BACnet Object Reading Functions
# ============================================================================

async def read_object_property(bacnet, device_address, object_type, object_instance, property_name):
    """
    Read one property of one BACnet object.

    The four identifying arguments are joined with single spaces into the
    request string handed to ``bacnet.read()``.

    Args:
        bacnet: BAC0 connection object; only its awaitable ``read()`` is used
        device_address (str): Device IP address and port (e.g., "192.168.1.9:62515")
        object_type (str): BACnet object type (e.g., "analogInput", "analogValue")
        object_instance (int): Object instance number
        property_name (str): Property to read (e.g., "presentValue", "objectName")

    Returns:
        Whatever ``bacnet.read()`` returns, or None if building or sending
        the request raised
    """
    try:
        return await bacnet.read(
            f"{device_address} {object_type} {object_instance} {property_name}"
        )
    except Exception as exc:
        # Failures are reported on stdout and mapped to None for the caller.
        print(f"Error reading {property_name} from {object_type} {object_instance}: {exc}")
    return None

async def read_object_details(bacnet, device_address, object_type, object_instance):
    """
    Read and print the objectName and presentValue of a BACnet object.

    Args:
        bacnet: BAC0 connection object
        device_address (str): Device IP address and port
        object_type (str): BACnet object type
        object_instance (int): Object instance number

    Returns:
        Tuple ``(object_name, present_value)``; each element is whatever
        ``read_object_property`` returned for that property (None on error)
    """
    print(f"\nReading {object_type} {object_instance}:")

    # Both reads address the same object; only the property name differs.
    target = (bacnet, device_address, object_type, object_instance)
    name_reading = await read_object_property(*target, "objectName")
    value_reading = await read_object_property(*target, "presentValue")

    print(f"  Object Name: {name_reading}")
    print(f"  Present Value: {value_reading}")

    return name_reading, value_reading

# ============================================================================
# BACnet Object Writing Functions
# ============================================================================

async def write_object_value(bacnet, device_address, object_type, object_instance, value):
    """
    Write a value to a BACnet object's presentValue property.

    The request string has the form
    ``"<device_address> <object_type> <object_instance> presentValue <value>"``
    and is passed to ``bacnet.write()``.

    Args:
        bacnet: BAC0 connection object; only its ``write()`` method is used
        device_address (str): Device IP address and port
        object_type (str): BACnet object type
        object_instance (int): Object instance number
        value: Value to write

    Returns:
        True if the write call (and its awaitable, when there is one)
        completed without raising, False otherwise
    """
    try:
        request = f"{device_address} {object_type} {object_instance} presentValue {value}"
        pending = bacnet.write(request)
        # NOTE(review): depending on the BAC0 release, write() may be a
        # coroutine or a plain method that schedules the request itself and
        # returns nothing - confirm against the installed version. Awaiting a
        # non-awaitable would raise TypeError and misreport the write as
        # failed, so only await when there is something to await.
        if inspect.isawaitable(pending):
            await pending
    except Exception as e:
        print(f"Error writing value {value} to {object_type} {object_instance}: {e}")
        return False
    else:
        # When write() was not awaitable, this only means the request was
        # handed over without raising; device acknowledgement is not checked.
        print(f"Successfully wrote value {value} to {object_type} {object_instance}")
        return True

# ============================================================================
# Main Application Functions
# ============================================================================

async def main_discovery_and_read():
    """
    Demonstrate device discovery followed by a series of object reads.

    Opens a connection on "192.168.1.9/24", runs discovery, and, if any
    device answered, reads instances 0 and 1 of analogInput, analogValue
    and binaryValue from the fixed example device address.
    """
    bacnet = initialize_bacnet_connection("192.168.1.9/24")

    # Nothing to read from if discovery came back empty.
    if not await discover_bacnet_devices(bacnet):
        print("No devices found. Exiting.")
        return

    # Example device address (adjust based on your network)
    device_address = "192.168.1.9:62515"

    print(f"\nReading objects from device: {device_address}")
    print("=" * 50)

    # Same order as a hand-written list: each type, instance 0 then 1.
    for object_type in ("analogInput", "analogValue", "binaryValue"):
        for instance in (0, 1):
            await read_object_details(bacnet, device_address, object_type, instance)

async def main_write_operations():
    """
    Demonstrate writing to BACnet objects and reading the values back.

    Opens a connection on "192.168.1.9/24", writes 25 to analogValue 0 and
    analogValue 1 on the fixed example device address, then reads both
    objects again.
    """
    bacnet = initialize_bacnet_connection("192.168.1.9/24")

    device_address = "192.168.1.9:62515"
    instances = (0, 1)

    print(f"Writing values to device: {device_address}")
    print("=" * 50)

    # Write the same value to each analog value object.
    for instance in instances:
        await write_object_value(bacnet, device_address, "analogValue", instance, 25)

    # Read the objects back so the effect of the writes is visible.
    print("\nVerifying written values:")
    for instance in instances:
        await read_object_details(bacnet, device_address, "analogValue", instance)

# ============================================================================
# Example Usage
# ============================================================================

async def complete_example():
    """
    Run the full demo: discover, read, write, then read again.

    Returns early (after discovery has printed its own message) when no
    device answered. Otherwise works on analogValue 0 and 1 of the fixed
    example device address, writing 30 and 35 respectively.
    """
    print("BACnet Device Discovery and Control System")
    print("=" * 50)

    bacnet = initialize_bacnet_connection("192.168.1.9/24")

    # Step 1: Discover devices
    if not await discover_bacnet_devices(bacnet):
        return

    device_address = "192.168.1.9:62515"
    divider = "-" * 50
    # (instance, value to write) pairs, processed in this order.
    planned_writes = ((0, 30), (1, 35))

    # Step 2: Read current values
    print(f"\nStep 2: Reading current values from {device_address}")
    print(divider)
    for instance, _ in planned_writes:
        await read_object_details(bacnet, device_address, "analogValue", instance)

    # Step 3: Write new values
    print(f"\nStep 3: Writing new values to {device_address}")
    print(divider)
    for instance, new_value in planned_writes:
        await write_object_value(bacnet, device_address, "analogValue", instance, new_value)

    # Step 4: Verify written values
    print("\nStep 4: Verifying written values")
    print(divider)
    for instance, _ in planned_writes:
        await read_object_details(bacnet, device_address, "analogValue", instance)

# ============================================================================
# Script Execution
# ============================================================================

if __name__ == "__main__":
    # Executed only when the file is run as a script: drive the complete
    # example to completion with asyncio.run().
    # Note: In Jupyter Notebook, use: await complete_example()
    demo = complete_example()
    asyncio.run(demo)