### Template strings

- Python 3.14 introduces template strings (t-strings), which use the same syntax as f-strings but with a `t` prefix

- A t-string evaluates to a `Template` object, while an f-string evaluates directly to a `str`

In [4]:
name = 'yj'
tstring = t"hello {name}"
fstring = f"hello {name}"

type(tstring), type(fstring)

(string.templatelib.Template, str)

- Because a `Template` keeps the literal text and the interpolated values separate, you can sanitize the interpolated values before rendering, which helps avoid issues like SQL injection

- Since the return value is a `Template` rather than a `str`, you can convert it into whatever you need. For example, you could build a `Prompt` in LangGraph from it

In [9]:
from string.templatelib import Interpolation

amended_string=[]
for element in tstring:
    if isinstance(element, Interpolation):
        amended_string.append(str(element.value).upper())
    else:
        amended_string.append(element)

''.join(amended_string)


'hello YJ'

### Deferred Evaluation of Annotations, and `annotationlib`

- In previous versions of Python, type hints were evaluated eagerly, in order, when the function or class was defined. If a function signature referenced a class defined later in the module, you had to write the annotation as a string (or use `from __future__ import annotations`), otherwise you got a `NameError`

- Now, evaluation of annotations is deferred until they are actually accessed, so the forward reference no longer raises an error

In [None]:
from dataclasses import dataclass

# Input1 and Output1 are defined further down; on 3.14 this no longer raises NameError
def somefunc(input1: Input1) -> Output1:
    pass

@dataclass
class Input1:
    attr1: str
    attr2: float

@dataclass
class Output1:
    output1: float
    output2: list[float]


### Multiple Interpreters + InterpreterPoolExecutor

- Previously, CPU-bound parallelism in Python required `multiprocessing`, i.e. spawning several separate processes

- Each process has its own GIL, which makes parallel processing feasible, but processes do not share memory. So if you need to share data, it has to be serialized and deserialized (SERDE)

- In 3.14, the same kind of parallelism is available within a single process via subinterpreters, each with its own GIL. Data can be passed between them without leaving the process, but because they all live in one process, a crash in that process takes down every subinterpreter

In [1]:
from concurrent import interpreters

interpreter = interpreters.create()
interpreter.exec('''print("hello from interpreter")''')

def square(n):
    return n*n

print(interpreter.call(square, 5))

hello from interpreter
25


In [2]:
from concurrent.futures import InterpreterPoolExecutor, ThreadPoolExecutor, as_completed
import time
import math

def compute_factorial_concurrent():
    start=time.perf_counter()
    res = []

    with InterpreterPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(math.factorial, i) for i in range(10000,15000)]

        for future in as_completed(futures):
            res.append(future.result())

    end=time.perf_counter()
    return end-start, res

def compute_factorial_threaded():
    start=time.perf_counter()
    res = []

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(math.factorial, i) for i in range(10000,15000)]

        for future in as_completed(futures):
            res.append(future.result())

    end=time.perf_counter()
    return end-start, res

def compute_factorial_sequential():
    start=time.perf_counter()
    res = [math.factorial(i) for i in range(10000, 15000)]
    end=time.perf_counter()
    return end-start, res


ctime, cres = compute_factorial_concurrent()
ttime, tres = compute_factorial_threaded()
stime, sres = compute_factorial_sequential()

print(ctime, ttime, stime)

5.928771208011312 24.111683625000296 20.142291500000283


In [None]:
### Hybrid Approach: Subinterpreters + Multithreading

# Combining subinterpreters for CPU-bound tasks with threading for I/O-bound tasks
from concurrent.futures import InterpreterPoolExecutor, ThreadPoolExecutor, as_completed
import time
import math
import requests
import threading
from concurrent import interpreters

def cpu_intensive_task(n):
    """CPU-bound task that benefits from subinterpreters"""
    return sum(math.factorial(i) for i in range(n, n+10))

def io_intensive_task(url):
    """I/O-bound task that benefits from threading"""
    try:
        response = requests.get(url, timeout=5)
        return f"Status: {response.status_code}, Length: {len(response.content)}"
    except requests.RequestException:
        return "Request failed"

def hybrid_worker(cpu_tasks, io_tasks):
    """Worker function that runs in a subinterpreter and uses threading for I/O"""
    results = {}
    
    # Use threading within the subinterpreter for I/O tasks
    with ThreadPoolExecutor(max_workers=2) as thread_executor:
        # Submit I/O tasks to threads
        io_futures = {thread_executor.submit(io_intensive_task, url): url for url in io_tasks}
        
        # Process CPU tasks in the main thread of this subinterpreter
        for task_id, n in cpu_tasks:
            results[f"cpu_{task_id}"] = cpu_intensive_task(n)
        
        # Collect I/O results
        for future in as_completed(io_futures):
            url = io_futures[future]
            results[f"io_{url}"] = future.result()
    
    return results

# Example usage
cpu_workload = [(i, 100 + i*5) for i in range(10)]  # 10 CPU tasks
io_workload = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2", 
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2"
]  # 4 I/O tasks

print("Running hybrid approach...")
start = time.perf_counter()

# Split work across subinterpreters
subinterpreter_count = 3
cpu_chunks = [cpu_workload[i::subinterpreter_count] for i in range(subinterpreter_count)]
io_chunks = [io_workload[i::subinterpreter_count] for i in range(subinterpreter_count)]

with InterpreterPoolExecutor(max_workers=subinterpreter_count) as executor:
    futures = [
        executor.submit(hybrid_worker, cpu_chunks[i], io_chunks[i]) 
        for i in range(subinterpreter_count)
    ]
    
    all_results = {}
    for future in as_completed(futures):
        all_results.update(future.result())

end = time.perf_counter()
print(f"Hybrid execution time: {end - start:.4f} seconds")
print(f"Total results collected: {len(all_results)}")


In [None]:
### Advanced Hybrid Patterns

# Pattern 1: Pipeline approach - subinterpreters for processing, threads for I/O
def pipeline_worker(data_batch):
    """Process data in subinterpreter, use threads for I/O operations"""
    processed_data = []
    
    # CPU-intensive processing in main thread of subinterpreter
    for item in data_batch:
        processed = math.factorial(item) + sum(range(item))
        processed_data.append(processed)
    
    # I/O operations using threads within subinterpreter
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Simulate saving processed data (I/O bound)
        save_futures = [
            executor.submit(simulate_save, f"result_{i}.txt", data) 
            for i, data in enumerate(processed_data)
        ]
        
        # Wait for all saves to complete
        save_results = [future.result() for future in as_completed(save_futures)]
    
    return len(processed_data), save_results

def simulate_save(filename, data):
    """Simulate file I/O operation"""
    time.sleep(0.1)  # Simulate I/O delay
    return f"Saved {filename} with data length {len(str(data))}"

# Pattern 2: Producer-consumer with subinterpreters and threads
def producer_consumer_worker(queue_size=5):
    """Producer-consumer pattern using threads within subinterpreter"""
    import queue
    import threading
    
    result_queue = queue.Queue(maxsize=queue_size)
    results = []
    
    def producer():
        """Produce CPU-intensive results"""
        for i in range(10):
            result = math.factorial(50 + i)
            result_queue.put(f"factorial_{i}: {result}")
    
    def consumer():
        """Consume results and perform I/O operations"""
        while True:
            try:
                item = result_queue.get(timeout=1)
                # Simulate I/O operation on consumed item
                time.sleep(0.05)
                results.append(f"processed_{item}")
                result_queue.task_done()
            except queue.Empty:
                break
    
    # Use threads within subinterpreter
    with ThreadPoolExecutor(max_workers=2) as executor:
        producer_future = executor.submit(producer)
        consumer_future = executor.submit(consumer)
        
        producer_future.result()
        consumer_future.result()
    
    return results

# Pattern 3: Nested parallelism - subinterpreters containing thread pools
def nested_parallel_worker(work_items):
    """Each subinterpreter manages its own thread pool for different task types"""
    cpu_results = []
    io_results = []
    
    # Separate thread pools for different task types within subinterpreter
    with ThreadPoolExecutor(max_workers=2) as cpu_executor, \
         ThreadPoolExecutor(max_workers=2) as io_executor:
        
        # Submit CPU tasks
        cpu_futures = [
            cpu_executor.submit(math.factorial, item) 
            for item in work_items if item % 2 == 0
        ]
        
        # Submit I/O tasks
        io_futures = [
            io_executor.submit(simulate_network_call, item) 
            for item in work_items if item % 2 == 1
        ]
        
        # Collect results
        cpu_results = [future.result() for future in as_completed(cpu_futures)]
        io_results = [future.result() for future in as_completed(io_futures)]
    
    return {"cpu": cpu_results, "io": io_results}

def simulate_network_call(item):
    """Simulate network I/O"""
    time.sleep(0.1)
    return f"network_result_{item}"

# Run different patterns
print("\n=== Pattern 1: Pipeline Approach ===")
start = time.perf_counter()
with InterpreterPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(pipeline_worker, list(range(100 + i*10, 110 + i*10)))
        for i in range(3)
    ]
    pipeline_results = [future.result() for future in as_completed(futures)]
end = time.perf_counter()
print(f"Pipeline execution time: {end - start:.4f} seconds")

print("\n=== Pattern 2: Producer-Consumer ===")
start = time.perf_counter()
with InterpreterPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(producer_consumer_worker) for _ in range(2)]
    pc_results = [future.result() for future in as_completed(futures)]
end = time.perf_counter()
print(f"Producer-consumer execution time: {end - start:.4f} seconds")

print("\n=== Pattern 3: Nested Parallelism ===")
start = time.perf_counter()
with InterpreterPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(nested_parallel_worker, list(range(10 + i*5, 20 + i*5)))
        for i in range(3)
    ]
    nested_results = [future.result() for future in as_completed(futures)]
end = time.perf_counter()
print(f"Nested parallelism execution time: {end - start:.4f} seconds")


In [None]:
### When to Use Hybrid Approach

"""
Key scenarios where combining subinterpreters + threading is beneficial:

1. **Mixed Workloads**: When you have both CPU-intensive and I/O-intensive tasks
2. **Resource Optimization**: Maximize CPU cores for computation while handling I/O efficiently
3. **Pipeline Processing**: Process data in subinterpreters, save results using threads
4. **Real-time Systems**: CPU processing in subinterpreters, async I/O in threads
5. **Data Processing**: Transform data in subinterpreters, stream to databases via threads
"""

# Practical example: Data processing pipeline
def data_processing_pipeline(data_chunks):
    """Real-world example: Process data chunks with mixed CPU/I/O operations"""
    processed_chunks = []
    
    for chunk in data_chunks:
        # CPU-intensive: Data transformation and analysis
        transformed_data = []
        for item in chunk:
            # Complex mathematical operations
            result = {
                'original': item,
                'factorial': math.factorial(min(item, 20)),  # Cap the input to keep the result small (Python ints do not overflow)
                'sum_squares': sum(i*i for i in range(item)),
                'processed_at': time.time()
            }
            transformed_data.append(result)
        
        # I/O-intensive: Save processed data using threads
        with ThreadPoolExecutor(max_workers=3) as io_executor:
            save_futures = [
                io_executor.submit(save_to_database, f"chunk_{i}", data)
                for i, data in enumerate(transformed_data)
            ]
            
            # Wait for all saves to complete
            save_results = [future.result() for future in as_completed(save_futures)]
        
        processed_chunks.append({
            'chunk_size': len(transformed_data),
            'saves_completed': len(save_results)
        })
    
    return processed_chunks

def save_to_database(key, data):
    """Simulate database save operation"""
    time.sleep(0.05)  # Simulate database I/O
    return f"Saved {key} with {len(str(data))} characters"

# Named module-level helper: InterpreterPoolExecutor pickles the callable it
# sends to a worker interpreter, and a lambda cannot be pickled
def capped_factorials(chunk):
    """Compute factorial(min(x, 15)) for every item in a chunk"""
    return [math.factorial(min(x, 15)) for x in chunk]

# Performance comparison: Hybrid vs Pure approaches
def benchmark_approaches():
    """Compare different approaches for mixed workloads"""
    data = [list(range(100 + i*20, 120 + i*20)) for i in range(5)]
    
    print("\n=== Performance Comparison ===")
    
    # Approach 1: Pure subinterpreters (CPU tasks only)
    start = time.perf_counter()
    with InterpreterPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(capped_factorials, chunk) for chunk in data]
        pure_subinterpreter_results = [future.result() for future in as_completed(futures)]
    pure_subinterpreter_time = time.perf_counter() - start
    
    # Approach 2: Pure threading (limited by GIL for CPU tasks)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(capped_factorials, chunk) for chunk in data]
        pure_threading_results = [future.result() for future in as_completed(futures)]
    pure_threading_time = time.perf_counter() - start
    
    # Approach 3: Hybrid (subinterpreters + threading)
    start = time.perf_counter()
    with InterpreterPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(data_processing_pipeline, [chunk]) for chunk in data]
        hybrid_results = [future.result() for future in as_completed(futures)]
    hybrid_time = time.perf_counter() - start
    
    print(f"Pure Subinterpreters: {pure_subinterpreter_time:.4f}s")
    print(f"Pure Threading: {pure_threading_time:.4f}s") 
    print(f"Hybrid Approach: {hybrid_time:.4f}s")
    print(f"\nHybrid vs Pure Subinterpreters: {pure_subinterpreter_time/hybrid_time:.2f}x")
    print(f"Hybrid vs Pure Threading: {pure_threading_time/hybrid_time:.2f}x")

# Run benchmark
benchmark_approaches()


### Free-threaded Python

- Besides subinterpreters, 3.14 officially supports a build of Python without the GIL (first shipped as experimental in 3.13). This is known as free-threaded Python

- Free-threaded multithreading and parallelism via subinterpreters differ:
    - Free-threaded multithreading:
        - All threads can run Python bytecode at the same time
        - All threads share memory
        - BUT you are responsible for preventing race conditions, e.g. with locks and mutexes
    - Parallelism with subinterpreters:
        - Every subinterpreter runs in the same process, and they can execute code in parallel
        - BUT every subinterpreter is more or less isolated, so sharing data requires specific mechanisms (queues, shared memory, etc.)

- If you rerun the benchmark from the previous section on free-threaded Python, the time taken by subinterpreters and by multithreading is roughly the same
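
The "you are responsible for race conditions" point can be sketched with a lock-guarded counter. `SafeCounter` is a hypothetical class for illustration; the `getattr` fallback is there because `sys._is_gil_enabled()` only exists on 3.13+, so the snippet also runs on older builds.

```python
import sys
import threading

# sys._is_gil_enabled() was added in 3.13; on older versions assume the GIL is on
gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)()


class SafeCounter:
    """A counter whose increments are guarded by a lock."""

    def __init__(self) -> None:
        self.value = 0
        self._lock = threading.Lock()

    def increment(self, times: int) -> None:
        for _ in range(times):
            # Without the lock, the read-modify-write below could interleave
            with self._lock:
                self.value += 1


counter = SafeCounter()
threads = [threading.Thread(target=counter.increment, args=(10_000,)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"GIL enabled: {gil_enabled}, final count: {counter.value}")
```

With the lock in place the final count is 40000 on both regular and free-threaded builds; the lock costs some throughput, which is the usual trade-off for shared mutable state.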

### asyncio CLI

- Python 3.14 adds a CLI for inspecting the asyncio tasks of a running process

- Assume you have a Python process that is running and awaiting; the commands below show the status of its asyncio tasks
    - `sudo .venv/bin/python -m asyncio ps 12345`
    - `sudo .venv/bin/python -m asyncio pstree 12345`

- Here `12345` is the process ID, which the target process can obtain by calling `os.getpid()`

### Compression modules grouped under one `compression` package

In [19]:
from compression import zlib, bz2, lzma, gzip
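
A quick round-trip through each module shows they expose the same basic `compress` / `decompress` functions. This is a sketch; the `ImportError` fallback to the older top-level modules is an addition here so the snippet also runs before 3.14. The package also ships a new `compression.zstd` module, which is left out because it depends on how Python was built.

```python
try:
    # Python 3.14+: the modules are re-exported under one package
    from compression import bz2, gzip, lzma, zlib
except ImportError:
    # Older versions: fall back to the top-level modules
    import bz2
    import gzip
    import lzma
    import zlib

data = b"hello from python 3.14 " * 1000

sizes = {}
for module in (zlib, gzip, bz2, lzma):
    packed = module.compress(data)
    # Each module must give back exactly the bytes that went in
    assert module.decompress(packed) == data
    sizes[module.__name__] = len(packed)

print(len(data), sizes)
```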
