<a href="https://colab.research.google.com/github/thinkaboutcode/cheatsheets/blob/main/python_playground.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Observer Pattern

In [33]:
# The Subject class
class Subject:
    def __init__(self):
        self._observers = []  # List of observers

    def attach(self, observer):
        """Attach an observer to the subject."""
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer):
        """Detach an observer from the subject."""
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self):
        """Notify all observers about a state change."""
        for observer in self._observers:
            observer.update(self)


# The Observer interface
class Observer:
    def update(self, subject):
        """Receive update from the subject."""
        raise NotImplementedError("Subclasses must implement 'update' method.")


# Concrete Subject
class MessageBroker(Subject):
    def __init__(self):
        super().__init__()
        self._message = None

    @property
    def message(self):
        return self._message

    @message.setter
    def message(self, value):
        self._message = value
        self.notify()  # Notify observers when temperature changes


class SubscriberA(Observer):
    def update(self, subject):
        if isinstance(subject, MessageBroker):
            print(f"SubscriberA received message: {subject.message}")


class SubscriberB(Observer):
    def update(self, subject):
        if isinstance(subject, MessageBroker):
            print(f"SubscriberB received message: {subject.message}")


broker = MessageBroker()
subscriber_a = SubscriberA()
subscriber_b = SubscriberB()

broker.attach(subscriber_a)
broker.attach(subscriber_b)

broker.message = "Hello, subscribers!"

SubscriberA received message: Hello, subscribers!
SubscriberB received message: Hello, subscribers!


# Decorator

In [46]:
def logging_decorator(orig_func):

  def wrapper(*args):
    print("before")
    for arg in args:
      print(arg)

    orig_func()

    print("after")

  return wrapper

@logging_decorator
def display(*args):
  print("display function ran")

display('a', 'b')

before
a
b
display function ran
after


# Generator

In [50]:
def infinite_sequence():
    n = 1
    while True:
        yield n  # Generate the next value
        n += 1


gen = infinite_sequence()
print(next(gen))
print(next(gen))
print('doing other stuff')
print(next(gen))
print(next(gen))


1
2
doing other stuff
3
4


In [51]:
def read_data():
    """Simulate reading raw data from a file or source."""
    data = [
        "Alice, alice@example.com, 30",
        "Bob, bob@example, -5",          # Invalid email and age
        "Charlie, , 25",                # Missing email
        "Diana, diana@example.com, 40",
        "Edward, edward@example.com, invalid",  # Invalid age
    ]
    for row in data:
        yield row


def filter_invalid_data(data_gen):
    """Filter out rows with missing or invalid fields."""
    for row in data_gen:
        try:
            name, email, age = row.split(",")
            name, email, age = name.strip(), email.strip(), age.strip()

            # Check for missing or invalid fields
            if not name or not email or not age.isdigit() or int(age) < 0:
                continue  # Skip invalid rows

            yield name, email, int(age)

        except ValueError:
            continue  # Skip rows with parsing errors


def normalize_data(data_gen):
    """Normalize and clean the data."""
    for name, email, age in data_gen:
        # Capitalize names
        name = name.title()

        # Lowercase emails
        email = email.lower()

        yield {"name": name, "email": email, "age": age}


# Assemble the pipeline
raw_data = read_data()
filtered_data = filter_invalid_data(raw_data)
cleaned_data = normalize_data(filtered_data)

# Process the cleaned data
for record in cleaned_data:
    print(record)


{'name': 'Alice', 'email': 'alice@example.com', 'age': 30}
{'name': 'Diana', 'email': 'diana@example.com', 'age': 40}


# Context Manager

In [69]:
class MyContextManager:
    def __enter__(self):
        print("Entering the context")
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Exiting the context")

with MyContextManager():
    print("Inside the context")



Entering the context
Inside the context
Exiting the context


In [None]:
import sqlite3

class DatabaseConnection:
    def __init__(self, db_name):
        self.db_name = db_name
        self.connection = None
        self.cursor = None

    def __enter__(self):
        # Setup: Open the database connection
        self.connection = sqlite3.connect(self.db_name)
        self.cursor = self.connection.cursor()
        print("Database connection established.")
        return self.cursor  # Return the cursor object to the with block

    def __exit__(self, exc_type, exc_value, traceback):
        # Teardown: Close the database connection
        if self.connection:
            self.connection.commit()  # Commit any changes if necessary
            self.connection.close()  # Close the connection
            print("Database connection closed.")
        if exc_type:
            print(f"An error occurred: {exc_value}")
        return False  # Propagate exceptions if any

# Using the context manager for database operations
with DatabaseConnection('example.db') as cursor:
    cursor.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)')
    cursor.execute("INSERT INTO users (name) VALUES ('Alice')")
    cursor.execute("INSERT INTO users (name) VALUES ('Bob')")
    cursor.execute("SELECT * FROM users")

    # Fetch and display data from the database
    rows = cursor.fetchall()
    for row in rows:
        print(row)


# Async

In [None]:
import asyncio

async def my_coroutine(name):
    print(f"Start {name}")
    await asyncio.sleep(1)
    print(f"End {name}")
    return f"Result from {name}"

# Running multiple coroutines concurrently and processing their results
async def main():
    results = await asyncio.gather(
        my_coroutine("Task 1"),
        my_coroutine("Task 2"),
        my_coroutine("Task 3")
    )
    print("All tasks completed")
    print("Processed results:", results)

# Run the main function that handles multiple coroutines
asyncio.run(main())


Promise like approach

In [None]:
import asyncio

def on_complete(result):
    print(f"Task completed with result: {result}")

async def my_coroutine(callback):
    print("Start")
    await asyncio.sleep(1)
    print("End")
    callback("Some result")

# Running the coroutine with a callback
asyncio.run(my_coroutine(on_complete))


# Procedural vs Functional Style

In [52]:
# Procedural Style
numbers = [1, 2, 3, 4, 5, 6]
result = []

# Use loops and conditions to process data
for num in numbers:
    if num % 2 == 0:
        result.append(num ** 2)

print(result)  # Output: [4, 16, 36]


[4, 16, 36]


In [53]:
# Functional Style
numbers = [1, 2, 3, 4, 5, 6]

# Use map, filter, and list comprehensions for transformations
result = list(map(lambda x: x ** 2, filter(lambda x: x % 2 == 0, numbers)))

print(result)  # Output: [4, 16, 36]


[4, 16, 36]
