In [19]:
pip install requests

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [20]:

import requests
import sys
import time
import os
from datetime import datetime


In [21]:
Primary_HOST = "http://" + os.getenv('RPI4_IP','192.168.0.2') + ":82"
Secondary_HOST = "http://" + os.getenv('RPI0_IP','192.168.0.3') + ":82"
Primary_PASSWORD = os.getenv('APP_PASSWORD') or os.getenv('PIHOLE_WEBPASSWORD', 'AlohaAdmin!1')
Secondary_PASSWORD = os.getenv('APP_PASSWORD') or os.getenv('PIHOLE_WEBPASSWORD', 'AlohaAdmin!1')
CHECK_INTERVAL = os.getenv('CHECK_INTERVAL', '300')

In [22]:
def log(message):
    """Print timestamped log message"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}")

In [23]:
def auth_pihole(base_url, password):
    if not password:
        log(f"ERROR: No password provided for {base_url}")
        return None
    
    try:
        log(f"Logging into {base_url}.")
        response = requests.post(
            f"{base_url}/api/auth",
            json={"password": password},
            timeout=10
        )
        response.raise_for_status()
        data = response.json()
        
        if "session" in data:
            log(f"Logged into {base_url}.")
            return {
                "base_url":base_url,
                "sid": data["session"]["sid"],
                "csrf": data["session"]["csrf"],
            }
        else:
            log(f"ERROR: Authentication failed for {base_url}")
            return None
    except requests.exceptions.RequestException as e:
        log(f"ERROR: Could not connect to {base_url}: {e}")
        return False
    

In [24]:
def deauth_pihole(session):
    if not session:
        log("DEBUG: No session to logout from")
        return
    
    try:
        log(f"Logging out from {session['base_url']}...")
        response = requests.delete(
            f"{session['base_url']}/api/auth",
            headers={
                "X-FTL-SID": session["sid"],
                "X-FTL-CSRF": session["csrf"]
            },
            timeout=10
        )
        response.raise_for_status()
        log(f"Logged out from {session['base_url']}")
        return True
    except requests.exceptions.RequestException as e:
        log(f"WARNING: Could not logout from {session['base_url']}: {e}")
        return False

In [25]:
def get_dhcp_status(session):
    try:
        response = requests.get(
            f"{session['base_url']}/api/config",
            headers={
                "X-FTL-SID": session["sid"],
                "X-FTL-CSRF": session["csrf"]
            },
            timeout=10
        )
        response.raise_for_status()
        data = response.json()
        
        if "config" in data and "dhcp" in data["config"]:
            return data["config"]["dhcp"].get("active", False)
        return False
    except requests.exceptions.RequestException as e:
        log(f"ERROR: Could not get DHCP status from {session['base_url']}: {e}")
        return None

In [26]:
def set_dhcp_status(session, active):
    ## Active is boolean true or false
    try:
        response = requests.patch(
            f"{session['base_url']}/api/config",
            headers={
                "X-FTL-SID": session["sid"],
                "X-FTL-CSRF": session["csrf"],
                "Content-Type": "application/json"
            },
            json={
                "config": {
                    "dhcp": {
                        "active": active
                    }
                }
            },
            timeout=10
        )
        response.raise_for_status()
        data = response.json()
        
        if "error" in data:
            log(f"ERROR: Failed to set DHCP on {session['base_url']}: {data['error']}")
            return False
        return True
    except requests.exceptions.RequestException as e:
        log(f"ERROR: Could not set DHCP status on {session['base_url']}: {e}")
        return False

In [27]:
def Check_and_Set_DHCP(host, password, active):
    ## active is a boolean passed into set_dhcp_Status(), determining failover or failback
    session = auth_pihole(host,password)
    if session == False:
        log(f"Unable to reach {session['base_url']}.")
    else:
        if get_dhcp_status(session) == None:
            log(f"Unable to see if DHCP running on {session['base_url']}")
            if set_dhcp_status(session, active) == False:
                log(f"Tried making DHCP active:{active} on {session['base_url']} unsuccessfully.")
                return False
            else:
                log(f"Made DHCP active:{active} on {session['base_url']} successfully.")
                return True
        if get_dhcp_status(session) == active:
            log(f"DHCP already active:{active} on {session['base_url']}.")
            return True
        if get_dhcp_status(session) != active:
            if set_dhcp_status(session, active) == False:
                log(f"Tried making DHCP active:{active} on {session['base_url']} unsuccessfully.")
                return False
            else:
                log(f"Made DHCP active:{active} on {session['base_url']} successfully.")
                return True
    deauth_pihole(session)
 

In [35]:
def updater():
    if Check_and_Set_DHCP(Primary_HOST,Primary_PASSWORD, True) == False: #Primary
        if Check_and_Set_DHCP(Secondary_HOST,Secondary_PASSWORD, True) == False: #Secondary
            log(f"Unable to Enable Primary DHCP or Secondary DHCP")
            return 0
        if Check_and_Set_DHCP(Secondary_HOST,Secondary_PASSWORD, True) == True:
            log(f"Unable to Enable Primary DHCP, Secondary DHCP enabled.") 
            return 1
    if Check_and_Set_DHCP(Primary_HOST,Primary_PASSWORD, True) == True:
        if Check_and_Set_DHCP(Secondary_HOST,Secondary_PASSWORD, False) == True:
            log("Primary DHCP Enabled, Secondary DHCP Disabled.")
            return 1
        if Check_and_Set_DHCP(Secondary_HOST,Secondary_PASSWORD, False) == False:
            log("Primary DHCP Enabled, Unable to confirm status of secondary.")                          
            return 1




In [36]:
updater()

[2025-11-01 15:00:27] Logging into http://192.168.0.2:82.
[2025-11-01 15:00:27] Logged into http://192.168.0.2:82.
[2025-11-01 15:00:27] DHCP already active:True on http://192.168.0.2:82.
[2025-11-01 15:00:27] Logging into http://192.168.0.2:82.
[2025-11-01 15:00:28] Logged into http://192.168.0.2:82.
[2025-11-01 15:00:28] DHCP already active:True on http://192.168.0.2:82.
[2025-11-01 15:00:28] Logging into http://192.168.0.3:82.
[2025-11-01 15:00:30] ERROR: Could not connect to http://192.168.0.3:82: HTTPConnectionPool(host='192.168.0.3', port=82): Max retries exceeded with url: /api/auth (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001302265EAD0>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))


TypeError: 'bool' object is not subscriptable

In [31]:
def main():
    """Initialize and start monitoring loop"""
    log("Starting Pi-hole DHCP failover monitor...")
    
    # Validate configuration
    if not Primary_HOST or not Secondary_HOST:
        log("CRITICAL: Pi-hole IP addresses must be set!")
        log("Set either Primary_IP or Secondary_IP environment variables")
        log(f"Current values: Primary_HOST={Primary_HOST}, Secondary_HOST={Secondary_HOST}")
        sys.exit(1)
    
    if not Primary_HOST or not Secondary_PASSWORD:
        log("WARNING: Passwords not set. Using empty passwords.")
    
    log(f"Configuration:")
    log(f"  Primary: {Primary_HOST}")
    log(f"  Secondary: {Secondary_HOST}")
    log(f"  Check interval: {CHECK_INTERVAL} seconds")
    
    # Main monitoring loop
    while True:
        try:
            exit_code = updater()
            if exit_code != 0:
                log(f"Check completed with exit code {exit_code}")
        except Exception as e:
            log(f"ERROR during check: {e}")
        
        log(f"Waiting {CHECK_INTERVAL} seconds until next check...")
        time.sleep(int(CHECK_INTERVAL))

    

In [30]:
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        log("Interrupted by user - shutting down")
        sys.exit(0)

[2025-11-01 14:50:33] Starting Pi-hole DHCP failover monitor...
[2025-11-01 14:50:33] Configuration:
[2025-11-01 14:50:33]   Primary: http://192.168.0.2:82
[2025-11-01 14:50:33]   Secondary: http://192.168.0.3:82
[2025-11-01 14:50:33]   Check interval: 300 seconds
[2025-11-01 14:50:33] Logging into http://192.168.0.2:82.
[2025-11-01 14:50:33] Logged into http://192.168.0.2:82.
[2025-11-01 14:50:33] DHCP already active:True on http://192.168.0.2:82.
[2025-11-01 14:50:33] Logging into http://192.168.0.2:82.
[2025-11-01 14:50:34] Logged into http://192.168.0.2:82.
[2025-11-01 14:50:34] DHCP already active:True on http://192.168.0.2:82.
[2025-11-01 14:50:34] Logging into http://192.168.0.3:82.
[2025-11-01 14:50:36] Logged into http://192.168.0.3:82.
[2025-11-01 14:50:36] DHCP already active:False on http://192.168.0.3:82.
[2025-11-01 14:50:36] Primary DHCP Enabled, Secondary DHCP Disabled.
[2025-11-01 14:50:36] Logging into http://192.168.0.3:82.
[2025-11-01 14:50:38] Logged into http://19

TypeError: 'str' object cannot be interpreted as an integer