<a href="https://colab.research.google.com/github/smandhai/Astro-Data-Handling-Tutorials/blob/main/Primer%20Workbook%20Examples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



# Debugging

Worked examples showing different methods that can be employed to debug your code.


## Traceback

In [None]:
# Rudimentary debugging

def divide(a, b):
    return a / b

# Intentional error
result = divide(10, 0)

#What is the issue here?

ZeroDivisionError: division by zero
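
The last line of a traceback names the exception type and its message; the lines above it list the chain of calls that led there. As a minimal sketch, the standard-library `traceback` module can capture that same text as a string so it can be inspected or written to a log. The helper name `run_and_capture` is hypothetical and not part of the original workbook.

```python
import traceback

def divide(a, b):
    return a / b

def run_and_capture(func, *args):
    """Hypothetical helper: call func and return (result, traceback text)."""
    try:
        return func(*args), None
    except Exception:
        # format_exc() returns the text Python would normally print on error
        return None, traceback.format_exc()

result, tb_text = run_and_capture(divide, 10, 0)
print(tb_text)  # read from the bottom up: error type first, then the call chain
```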

## Printing Variables


In [None]:
# Printing variable states

def divide(a, b):
    print(f"a = {a}, b = {b}")  # Simple inline debug
    return a / b

result = divide(10, 0)


a = 10, b = 0


ZeroDivisionError: division by zero
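
A possible refinement of the same idea, assuming Python 3.8 or newer: the `=` specifier inside an f-string prints both the expression and its value, and printing the type as well can reveal cases where a variable is not what you expected (for example a string instead of a number).

```python
def divide(a, b):
    # "{a=}" expands to the text "a=" followed by repr(a)
    print(f"{a=}, {b=}, {type(b)=}")
    return a / b

result = divide(10, 2.0)
```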

## Using variable explorers in IDEs

In [None]:
def divide(a, b):
    result = a / b  # ← set breakpoint here
    return result

a = 10
b = 0
result = divide(a, b)  # run under the IDE debugger so the breakpoint is reached


## Local Module Testing (Unit Tests)

With this approach, you create localised tests using standalone scripts with basic, simulated input. The aim of this approach is to ensure the data input is being processed the way you think it is.


In [None]:
# In utils.py <- Utility module containing the function under test

def divide(a, b):
    return a / b

In [None]:
#In unit.py <- Unit testing script

from utils import divide

def test_divide():
    for a, b in [(10, 2), (5, 0)]:
        try:
            print(divide(a, b))
        except Exception as e:
            print(f"Error dividing {a}/{b}: {e}")

test_divide()
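
The loop above prints results for a person to read. A minimal sketch of the same two cases written as assertions, using the standard-library `unittest` module, lets the test report a failure on its own. `divide` is redefined here so the block is self-contained, and the class name `TestDivide` is illustrative.

```python
import unittest

def divide(a, b):
    return a / b

class TestDivide(unittest.TestCase):
    def test_simple_division(self):
        self.assertEqual(divide(10, 2), 5)

    def test_division_by_zero_raises(self):
        # The test passes only if the expected exception is raised
        with self.assertRaises(ZeroDivisionError):
            divide(5, 0)

# Build and run the suite explicitly so this also works inside a notebook
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDivide)
outcome = unittest.TextTestRunner(verbosity=2).run(suite)
```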

## Using pdb
The Python debugger (pdb) is a module in the Python standard library that lets you start an interactive debugging session in your terminal (from either python or ipython). While execution is paused, you can inspect and modify variables directly from the prompt.

In [None]:
import pdb

def divide(a, b):
    pdb.set_trace()  # Pauses execution here
    return a / b

result = divide(10, 0)
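
A hard-coded `pdb.set_trace()` pauses on every call, which is easy to forget about. One possible sketch is to guard it with a switch; the `DEBUG` flag here is a hypothetical name introduced for illustration. The comments list a few standard pdb prompt commands.

```python
import pdb

DEBUG = False  # hypothetical switch: set to True to pause inside divide()

def divide(a, b):
    if DEBUG:
        # Useful commands at the (Pdb) prompt:
        #   p a   -> print the value of a
        #   n     -> run the next line
        #   s     -> step into a function call
        #   c     -> continue running
        #   q     -> quit the debugger
        pdb.set_trace()
    return a / b

result = divide(10, 2)
print(result)
```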


## Using decorators and wrappers

Decorators let you wrap an existing function to change or extend its behaviour without editing the function itself. This is useful for handling different types of errors in one place and for permitting interactive debugging when something fails.

In [None]:
import functools
import traceback
import pdb

def debug_on_error(func):
    @functools.wraps(func)  # Preserves the wrapped function's name and docstring on the wrapper
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"\n[DEBUG] Error in {func.__name__} with args={args}, kwargs={kwargs}")
            traceback.print_exc()   # print full traceback
            pdb.set_trace()          # drop into debugger if needed
    return wrapper

@debug_on_error
def divide(a, b):
    return a / b

divide(10, 0)
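
Where dropping into a debugger is not appropriate (for example in an unattended batch job), a variant of the same pattern can report the failure and then re-raise it. This is a sketch only; the decorator name `log_errors` is hypothetical.

```python
import functools
import traceback

def log_errors(func):
    @functools.wraps(func)  # keeps func.__name__ and the docstring on the wrapper
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"[DEBUG] Error in {func.__name__} with args={args}, kwargs={kwargs}")
            traceback.print_exc()
            raise  # re-raise so the caller still sees the failure
    return wrapper

@log_errors
def divide(a, b):
    """Return a divided by b."""
    return a / b

print(divide.__name__)  # the decorated function keeps its original name

try:
    divide(10, 0)
except ZeroDivisionError:
    print("Caller received the original exception.")
```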


# Error Handling

Error handling in Python lets you catch exceptions, report what went wrong, and decide how the program should continue instead of crashing. Handling errors deliberately is crucial for writing robust code.

In [None]:
def safe_divide_file(filename, a, b):
    f = None  # lets the finally block tell whether the file was opened
    try:
        print("Opening file...")
        f = open(filename, "r")  # May raise FileNotFoundError

        print("Performing division...")
        result = a / b           # May raise ZeroDivisionError

        data = f.read()
        print(f"File contents: {data[:20]}...")  # Show the first 20 characters
        return result

    except FileNotFoundError as e:
        print(f"[ERROR] File not found: {e.filename}")

    except ZeroDivisionError as e:
        print(f"[ERROR] Division by zero is not allowed: {e}")

    except Exception as e:
        print(f"[ERROR] Unexpected error occurred: {type(e).__name__}: {e}")

    finally:
        # Runs whether or not an exception was raised
        print("Cleaning up...")
        if f is not None:
            f.close()
            print("File closed.")
        else:
            print("No file was opened; nothing to close.")

# ---- Test cases ----
safe_divide_file("nonexistent.txt", 10, 2)
safe_divide_file("example.txt", 10, 0)  # assumes example.txt exists; otherwise FileNotFoundError is reported instead
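
As an alternative sketch, a `with` statement closes the file automatically, and an `else` clause holds the code that should run only when nothing went wrong. The function name `read_and_divide` is hypothetical, and a temporary file is created so the example does not depend on any existing file.

```python
import os
import tempfile

def read_and_divide(filename, a, b):
    """Hypothetical variant: return (result, first 20 characters), or None on error."""
    try:
        # The with-statement closes the file even if an error occurs inside it
        with open(filename, "r") as f:
            data = f.read()
        result = a / b
    except FileNotFoundError as e:
        print(f"[ERROR] File not found: {e.filename}")
    except ZeroDivisionError as e:
        print(f"[ERROR] Division by zero is not allowed: {e}")
    else:
        # Runs only when the try block raised no exception
        return result, data[:20]
    return None

# Create a temporary input file for the demonstration
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("hello world")
    path = tmp.name

ok = read_and_divide(path, 10, 2)
missing = read_and_divide(path + ".missing", 10, 2)
zero = read_and_divide(path, 10, 0)
os.remove(path)  # tidy up the temporary file
print(ok, missing, zero)
```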


# Parallelisation
Tired of running long for loops for repeated tasks? Parallelisation lets you spread independent tasks over multiple threads or processes so that several run at the same time. This avoids the bottleneck of a single process and can scale to clusters and high-performance computing (HPC) architectures.

In [None]:
# Function

import time
import math

def simulate(x):
    # Simulate heavy computation
    total = 0
    for i in range(1_000_000):  # adjust workload
        total += math.sin(i + x) ** 2
    return total

# Serial test - running tasks with a for loop

start = time.perf_counter()

results = []
for x in range(8):
    results.append(simulate(x))

end = time.perf_counter()
print(f"Serial: {end - start:.2f}s")

# Multithreading
# Note: simulate() is CPU-bound pure Python, so with the GIL expect little or no speed-up here

from concurrent.futures import ThreadPoolExecutor

start = time.perf_counter()

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(simulate, range(8)))

end = time.perf_counter()
print(f"ThreadPool: {end - start:.2f}s")

# Multiprocessing
# Note: in a standalone script on Windows/macOS, put this under `if __name__ == "__main__":`

from concurrent.futures import ProcessPoolExecutor

start = time.perf_counter()

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(simulate, range(8)))

end = time.perf_counter()
print(f"ProcessPool: {end - start:.2f}s")

# Using Python's built-in map() function

# Serial
start = time.perf_counter()
list(map(simulate, range(8)))
print(f"map(): {time.perf_counter() - start:.2f}s")

# Parallel (multiprocessing)
from multiprocessing import Pool

start = time.perf_counter()
with Pool(4) as p:
    p.map(simulate, range(8))
print(f"Pool.map(): {time.perf_counter() - start:.2f}s")
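
Threads tend to pay off when tasks spend most of their time waiting (network requests, disk reads) rather than computing. A minimal sketch of that case follows; `fake_download` is a hypothetical stand-in that just sleeps, so the timings illustrate the pattern rather than any real workload.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(x):
    # Hypothetical stand-in for I/O such as a network request or a disk read
    time.sleep(0.2)
    return x * 2

# Serial: each wait happens one after another
start = time.perf_counter()
serial = [fake_download(x) for x in range(8)]
serial_time = time.perf_counter() - start

# Threaded: up to four waits overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(fake_download, range(8)))
threaded_time = time.perf_counter() - start

print(f"Serial: {serial_time:.2f}s, ThreadPool: {threaded_time:.2f}s")
```

`executor.map` returns results in the same order as the inputs, so `threaded` matches the serial list.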




## Queuing

Queuing is a robust way to hand tasks to worker threads. `queue.Queue` is thread-safe, so several threads can add and remove items without corrupting shared state, and items are retrieved in the order they were added (first in, first out). Without this kind of coordination, threads that read and write the same data at the same time, for example during I/O, can produce race conditions; the GIL (global interpreter lock) does not prevent these.

Queue = Thread-safe container that holds the tasks awaiting execution

get  = Removes and returns the next task from the queue

task_done = Signals that a retrieved task is complete

join = Blocks until every task placed on the queue has been marked as done

Upon completion, the output can be collated into a list or array to give the same result as you would normally get with regular looping.


In [None]:
import threading
import queue
import time

# Create a shared queue
task_queue = queue.Queue()

# Worker function
def worker():
    while True:
        try:
            item = task_queue.get(timeout=1)   # Get a task
        except queue.Empty:
            break
        print(f"{threading.current_thread().name} processing item {item}")
        time.sleep(1)  # Simulate I/O work
        task_queue.task_done()

# Populate queue with tasks
for i in range(5):
    task_queue.put(i)

# Start worker threads
threads = []
for _ in range(3):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# Wait for all tasks to finish
task_queue.join()

print("All tasks completed.")
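
The worker above only prints. One way to collate the output, sketched here under the assumption that each task's result is simply its square, is to have workers put `(input, result)` pairs on a second queue and sort them afterwards. The names `result_queue` and the `None` sentinel are illustrative choices.

```python
import queue
import threading

task_queue = queue.Queue()
result_queue = queue.Queue()

def worker():
    while True:
        item = task_queue.get()
        if item is None:          # sentinel: no more work for this thread
            task_queue.task_done()
            break
        result_queue.put((item, item ** 2))  # keep the input so results can be ordered
        task_queue.task_done()

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for i in range(5):
    task_queue.put(i)
for _ in threads:
    task_queue.put(None)          # one sentinel per worker

task_queue.join()                 # wait until every item is marked done
for t in threads:
    t.join()                      # wait for the worker threads to exit

# Drain the result queue and restore input order
pairs = []
while not result_queue.empty():
    pairs.append(result_queue.get())
results = [value for _, value in sorted(pairs)]
print(results)
```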


## Message Passing Interface (MPI)

MPI launches multiple processes (called ranks) that communicate by passing messages. This allows large data analyses or simulations to be spread over many cores and over multiple nodes/computers.

In [None]:
# Deploying across clusters using MPI (Message Passing Interface)
# Run with, for example: mpiexec -n 4 python script.py  (script name is illustrative)

from mpi4py import MPI
import time, math

def simulate(x):
    total = 0
    for i in range(1_000_000):
        total += math.sin(i + x) ** 2
    return total

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

start = time.perf_counter()
local_result = simulate(rank)
results = comm.gather(local_result, root=0)
end = time.perf_counter()

if rank == 0:
    print(f"MPI with {size} ranks: {end - start:.2f}s, results={results}")
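
The example above runs exactly one task per rank. When there are more tasks than ranks, one common approach is for each rank to take every `size`-th task. The sketch below shows only the splitting logic in plain Python, so it runs without MPI; `tasks_for_rank` is a hypothetical helper, and in an MPI script `rank` and `size` would come from `comm.Get_rank()` and `comm.Get_size()`.

```python
def tasks_for_rank(tasks, rank, size):
    """Hypothetical helper: round-robin share of the task list for one rank."""
    return tasks[rank::size]

tasks = list(range(10))
size = 4  # pretend there are 4 ranks

# Compute the share each rank would work on
shares = [tasks_for_rank(tasks, rank, size) for rank in range(size)]
for rank, share in enumerate(shares):
    print(f"rank {rank}: {share}")
```

Every task lands on exactly one rank, and the shares differ in length by at most one.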