# Generators and Lazy Pipelines

- You can chain generator functions to form **multi-stage data pipelines** that process items one at a time.  
- **No intermediate lists** are built, so memory stays low even for very large streams.  
- Each generator only holds its own minimal state and passes items downstream on demand.  

## Memory Efficiency

- Lazy iterables maintain only minimal state (like start, stop, step) regardless of total length.  
- Eager collections (lists, tuples) grow in memory usage as you add items.  
- Use `sys.getsizeof()` to inspect the in-memory size of objects themselves (not their contents).  

In [54]:
# 1. Ingest the log lines
# 2. Filter log lines based on either level or message substring
# 3. Extract and return only the message attribute of the logs

# The pipeline avoids loading the entire log file into memory at once.
# It processes the file line by line keeping memory usage at minimal 
# and constant regardless of the file size.

import sys
import json


def read_logs(filepath):
    """Reads the contents of a file line by line.

    Args:
        filepath (str): The path where the file is located.

    Returns:
        generator (dict(str)): The json dictionary for the log line.
    """
    with open(filepath, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            yield json.loads(line) # yield dict object


def filter_logs(logs, level=None, message_substring=None):
    """Filters any iterable containing dictionaries by either level or message_substring (or both)

    Args:
        logs (iterable(dict)): Iterable containing the logs to be filtered.
        level (str): The log level to filter. Defaults to None.
        message_substring (str): The pattern to look for in messages to filter. Defaults to None.

    Returns:
        generator (dict(str)): The json dictionary for the filtered log.
    """
    for log in logs:
        if (
            level is not None
            and log.get("level", "").lower() != level.lower()
        ):
            continue
        
        if (
            message_substring is not None
            and message_substring.lower() not in log.get("message", "").lower()
        ):
            continue

        yield log


def extract_field(logs, field="message"):
    """Extracts a specific field from any iterable containing dictionaries.

    Args:
        logs (iterable(dict)): Iterable containing the logs to be evaluated.
        field (str): The field to return. Defaults to 'message'.

    Returns:
        generator (str): The value of the extracted field.
    """
    for log in logs:
        yield log.get(field, "").strip()


def get_first_n(logs, n=10):
    """Extracts the first n items from the provided iterable.

    Args:
        logs (iterable(T)): Iterable from which items will be extracted.
        n (int): The number of items to extract.

    Returns:
        generator (T): The item from the iterable.
    """
    count = 0
    for log in logs:
        if count >= n:
            break

        yield log
        count += 1


logs_gen = read_logs("large_logs.txt") 
filter_gen = filter_logs(logs_gen, level="INFO",  message_substring="user") 
extract_gen = extract_field(filter_gen, "message")


for log in get_first_n(extract_gen, 4):
    print(log)


print("Generator object sizes (in bytes):",
      sys.getsizeof(logs_gen),
      sys.getsizeof(filter_gen),
      sys.getsizeof(extract_gen)
     )

User user77 logged out
User user75 logged out
User user1 logged in
User user85 logged in
Generator object sizes (in bytes): 232 240 224


## Hands-on Exercise

In [62]:
def parse_transactions(data):
    print("Parsing...")
    for item in data:
        parts = item.split(':')
        yield (parts[0], int(parts[1]))
 
def filter_high_value(transactions, min_value=100):
    print("Filtering...")
    for _, amount in transactions:
        if amount >= min_value:
            yield amount
 
def apply_fees(amounts, fee_percent=10):
    print("Applying fees...")
    for amount in amounts:
        yield amount - (amount * fee_percent / 100)
 
raw_data = ["TX01:50", "TX02:200", "TX03:150", "TX04:90"]
 
pipeline = apply_fees(filter_high_value(parse_transactions(raw_data)))
 
result = list(pipeline)
print(result)


Applying fees...
Filtering...
Parsing...
[180.0, 135.0]


In [85]:

def get_login_events(all_events: list[dict]):
    for event in all_events:
        if event.get("type") == "login":
            print(f"Found login for user: {event['user_id']}")
            yield event
 
events = [
    {"user_id": 101, "type": "login"},
    {"user_id": 102, "type": "page_view"},
    {"user_id": 103, "type": "login"},
    {"user_id": 104, "type": "logout"},
    {"user_id": 105, "type": "login"},
]
 
login_gen = get_login_events(events)

# Perform an action on each login event
count = 0
for _ in login_gen:
    count += 1
    pass

print(f"Total number of login events: {count}")

# Alternatively, convert generator to list that can be reused
# login_list = list(get_login_events(events))
# print(f"Count: {len(login_list)}")



Found login for user: 101
Found login for user: 103
Found login for user: 105
Total number of login events: 3


In [90]:
def number_source(n):
    print("SOURCE: Starting")
    for i in range(n):
        print(f"SOURCE: Yielding {i}")
        yield i
    print("SOURCE: Finished")
 
def doubler(items):
    print("DOUBLER: Starting")
    for item in items:
        print(f"DOUBLER: Processing {item}")
        yield item * 2
    print("DOUBLER: Finished")
 
pipeline = doubler(number_source(2))
print("--- Getting first item ---")
print(f"Result: {next(pipeline)}")
print("--- Getting second item ---")
print(f"Result: {next(pipeline)}")
    

--- Getting first item ---
DOUBLER: Starting
SOURCE: Starting
SOURCE: Yielding 0
DOUBLER: Processing 0
Result: 0
--- Getting second item ---
SOURCE: Yielding 1
DOUBLER: Processing 1
Result: 2


In [103]:
def parse_logs(log_lines):
    for line in log_lines:
        parts = line.split()
        if len(parts) == 2:
            yield (parts[0], int(parts[1]))
 
def filter_heavy_hitters(records):
    # This function is not lazy
    results = []
    for ip, byte_count in records:
        if byte_count > 1000:
            yield ip
 
def format_for_report(ip_addresses):
    for ip in ip_addresses:
        yield f"ALERT: High traffic from {ip}"
 
# How the pipeline is used:
logs = ["1.1.1.1 500", "2.2.2.2 2500", "3.3.3.3 4000"]
records_gen = parse_logs(logs)
heavy_ips = filter_heavy_hitters(records_gen)
report_lines = format_for_report(heavy_ips)

for ips in report_lines:
    print(ips)

ALERT: High traffic from 2.2.2.2
ALERT: High traffic from 3.3.3.3
