# Python Functions

## What Are Python Functions?

In [None]:
class IceCream:
    """An ice cream identified by its flavor."""

    def __init__(self, flavor: str):
        """Remember the flavor on the instance."""
        self.flavor = flavor

    def eat(self):
        """Print a message saying that this ice cream is being eaten."""
        message = f"Eating the {self.flavor} ice cream"
        print(message)


# Two independent instances, each storing its own flavor
chocolate = IceCream("chocolate")
vanilla = IceCream("vanilla")

# eat() prints a message built from the instance's flavor
chocolate.eat()
vanilla.eat()

## Why Are Python Functions Essential?

In [None]:
import numpy as np

X_train = np.array([5, 10, 15, 20, 25])
X_test = np.array([8, 12, 18, 22, 28])

# Both sets are centered and scaled with the *training* mean and std;
# the same expression is written out once per set.
X_train_standardized = (X_train - X_train.mean()) / X_train.std()
X_test_standardized = (X_test - X_train.mean()) / X_train.std()

In [None]:
def standardize_features(X, mean=None, std=None):
    """
    Center and scale ``X``.

    Parameters
    ----------
    X : array-like exposing ``.mean()`` and ``.std()``
        Values to standardize.
    mean : float, optional
        Value subtracted from ``X``. Defaults to ``X.mean()``.
    std : float, optional
        Value ``X`` is divided by after centering. Defaults to ``X.std()``.

    Returns
    -------
    Same type as ``X``
        ``(X - mean) / std``.
    """
    mean = X.mean() if mean is None else mean
    std = X.std() if std is None else std
    return (X - mean) / std


X_train = np.array([5, 10, 15, 20, 25])
X_test = np.array([8, 12, 18, 22, 28])

X_train_standardized = standardize_features(X_train)
# The test set is scaled with the training statistics so both sets share
# one scale, matching the hand-written version of this computation.
X_test_standardized = standardize_features(
    X_test, mean=X_train.mean(), std=X_train.std()
)

### Improve Code Readability

In [None]:
import pandas as pd
import numpy as np

from pathlib import Path

data = pd.DataFrame(
    {
        "category": ["A", "B", "C"],
        "feature1": [10.5, 15.2, 7.8],
        "feature2": [100, 150, 80],
        "target": [5.5, 8.2, 6.7],
    }
)

# to_csv does not create missing parent directories, so make sure the
# output folder exists before writing.
Path("data").mkdir(parents=True, exist_ok=True)
data.to_csv("data/dataset.csv", index=False)

In [None]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor

# Read the CSV and replace every missing value with 0
data = pd.read_csv("data/dataset.csv").fillna(0)

# Swap each 'category' label for its integer code
data["category"] = data["category"].map({"A": 0, "B": 1, "C": 2})

# Standardize 'feature1'; the column is reshaped into a single-column
# 2-D array before it is handed to the scaler
scaler = StandardScaler()
data["feature1"] = scaler.fit_transform(
    data["feature1"].values.reshape(-1, 1)
)

### Hide Implementation Details

In [None]:
import sqlite3

from contextlib import closing
from pathlib import Path

# sqlite3.connect cannot create missing parent directories
Path("data").mkdir(parents=True, exist_ok=True)

# closing() releases the connection even if one of the statements fails
with closing(sqlite3.connect("data/users.db")) as conn:
    cursor = conn.cursor()

    # Drop any previous table so this cell can be re-run
    cursor.execute("DROP TABLE IF EXISTS users")

    # Create the users table
    cursor.execute(
        """
        CREATE TABLE users (
            username TEXT NOT NULL,
            email TEXT NOT NULL
        )
        """
    )

    conn.commit()

In [None]:
import sqlite3


def save_user(username, email):
    """
    Insert one row into the ``users`` table of ``data/users.db``.

    Prints "User saved successfully" when the insert is committed and
    "Failed to save user" when sqlite3 reports an error during the insert
    or commit. An error raised while opening the database is not caught.

    Parameters
    ----------
    username
        Value stored in the ``username`` column.
    email
        Value stored in the ``email`` column.
    """
    from contextlib import closing

    # ``with connection`` only commits or rolls back; it never closes the
    # connection, so closing() is what actually releases it.
    with closing(sqlite3.connect("data/users.db")) as conn:
        try:
            # Inner block: commit on success, roll back on error
            with conn:
                conn.execute(
                    "INSERT INTO users (username, email) VALUES (?, ?)",
                    (username, email),
                )
        except sqlite3.Error:
            print("Failed to save user")
        else:
            print("User saved successfully")

# Users can call this function without understanding database details:
# the caller supplies only the username and the email
save_user("john_doe", "john@example.com")

## Best Practices for Python Functions

### Use Descriptive Verb-Based Names

In [None]:
def data_clean(df):
    """Return ``df`` without the rows that contain missing values."""
    rows_without_gaps = df.dropna()
    return rows_without_gaps


def transform(s):
    """Return the element-wise natural logarithm of ``s``."""
    logged = np.log(s)
    return logged


def above_mean(df, column):
    """Return the rows of ``df`` whose ``column`` value exceeds that column's mean."""
    values = df[column]
    is_above = values > values.mean()
    return df[is_above]

In [None]:
def remove_missing_values(df):
    """Drop every row of ``df`` that has at least one missing value."""
    complete_rows = df.dropna()
    return complete_rows


def apply_log_transformation(s):
    """Apply the natural logarithm to ``s`` element-wise and return the result."""
    log_values = np.log(s)
    return log_values


def filter_values_above_mean(df, column):
    """Keep only the rows whose ``column`` value is greater than the column mean."""
    column_values = df[column]
    threshold = column_values.mean()
    return df[column_values > threshold]

### Keep Functions Focused

In [None]:
import pandas as pd
import numpy as np

from pathlib import Path

# Create a sample DataFrame
data = pd.DataFrame({
    'Sales': [1000, 1500, 800],
    'Quantity': [10, 15, 8],
    'Category': ['Electronics', 'Clothing', 'Books'],
    'Region': ['North', 'South', 'East']
})

# Save the DataFrame to a CSV file; to_csv does not create missing parent
# directories, so the output folder is created first
Path('data').mkdir(parents=True, exist_ok=True)
data.to_csv('data/sales_data.csv', index=False)

In [None]:
from sklearn.preprocessing import StandardScaler


def process_sales_data(df):
    """
    Clean, transform, encode and scale the sales data in one function.

    Steps, in order: drop rows with missing values, add ``log_sales`` as
    ``log1p(Sales)``, expand ``Category`` and ``Region`` with
    ``pd.get_dummies``, then rescale ``Sales`` and ``Quantity`` with
    ``StandardScaler``.
    """
    # Drop incomplete rows
    cleaned = df.dropna()

    # log1p of the raw sales figures
    cleaned["log_sales"] = np.log1p(cleaned["Sales"])

    # Dummy-encode the categorical columns
    encoded = pd.get_dummies(cleaned, columns=["Category", "Region"])

    # Rescale the numeric columns
    numeric_columns = ["Sales", "Quantity"]
    encoded[numeric_columns] = StandardScaler().fit_transform(
        encoded[numeric_columns]
    )

    return encoded

In [None]:
def remove_missing_values(df):
    """Return ``df`` with every row containing a missing value removed."""
    without_missing = df.dropna()
    return without_missing


def log_transform_sales(df):
    """Return a copy of ``df`` with a ``log_sales`` column holding ``log1p(Sales)``."""
    with_log = df.copy()
    with_log["log_sales"] = np.log1p(with_log["Sales"])
    return with_log


def encode_categorical_variables(df, cat_columns):
    """Return ``df`` with ``cat_columns`` expanded by ``pd.get_dummies``."""
    encoded = pd.get_dummies(df, columns=cat_columns)
    return encoded


def normalize_numeric_features(df, num_columns):
    """Return a copy of ``df`` whose ``num_columns`` were rescaled by ``StandardScaler``."""
    scaler = StandardScaler()
    # Work on a copy so the caller's DataFrame is left untouched
    scaled = df.copy()
    scaled_values = scaler.fit_transform(scaled[num_columns])
    scaled[num_columns] = scaled_values
    return scaled

In [None]:
def process_sales_data(df):
    """
    Run the sales preprocessing steps in order.

    Missing rows are removed, ``log_sales`` is added, ``Category`` and
    ``Region`` are encoded, and ``Sales`` and ``Quantity`` are normalized,
    each step handled by its own single-purpose function.
    """
    cleaned = remove_missing_values(df)
    with_log = log_transform_sales(cleaned)
    encoded = encode_categorical_variables(
        with_log, cat_columns=["Category", "Region"]
    )
    return normalize_numeric_features(
        encoded, num_columns=["Sales", "Quantity"]
    )

### Use Type Hints

In [None]:
def calculate_average_rating(ratings, product_id):
    """Return the mean ``score`` of the ratings for ``product_id``, or None if there are none."""
    scores = [
        rating["score"]
        for rating in ratings
        if rating["product_id"] == product_id
    ]
    if len(scores) == 0:
        return None
    return sum(scores) / len(scores)

In [None]:
def calculate_average_rating(
    ratings: list[dict[str, int]], product_id: int
) -> float | None:

   ...

### Write Clear and Helpful Docstrings

In [None]:
def parse_pipe_delimited_text(text: str) -> dict:
    """Turn ``key|value|key|value...`` text into a dict; raise ValueError on an odd part count."""
    tokens = text.split("|")
    if len(tokens) % 2:
        raise ValueError("Input string must have an even number of parts")
    keys = tokens[0::2]
    values = tokens[1::2]
    return dict(zip(keys, values))

In [None]:
def parse_pipe_delimited_text(text: str) -> dict:
    """
    Parse a pipe-delimited string into a dictionary.

    The string is split on ``|``; parts at even positions become keys and
    the part that follows each key becomes its value.

    Parameters
    ----------
    text : str
        A pipe-delimited string to parse.

    Returns
    -------
    dict
        Dictionary with even-indexed parts as keys and odd-indexed parts
        as values.

    Raises
    ------
    ValueError
        If the input string has an odd number of parts.

    Examples
    --------
    >>> parse_pipe_delimited_text("name|John|age|30")
    {'name': 'John', 'age': '30'}
    """
    pieces = text.split("|")
    if len(pieces) % 2 == 1:
        raise ValueError("Input string must have an even number of parts")
    # Drawing from the same iterator twice pairs each key with the part after it
    stream = iter(pieces)
    return dict(zip(stream, stream))

### Use Function Parameters Instead of Global Variables

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold, cross_val_score
import numpy as np

# Five samples reshaped into a single-column 2-D array
X = np.array([1, 2, 3, 4, 5]).reshape(-1, 1)
# Module-level labels: cv_rmse below reads this name directly
train_labels = np.array([2, 4, 5, 4, 5])

# Module-level splitter: cv_rmse below reads this name directly as well
kf = KFold(n_splits=5, random_state=42, shuffle=True)

model = LinearRegression()

def cv_rmse(model, X):
    """
    Return the square root of the negated cross-validation scores of ``model`` on ``X``.

    The labels (``train_labels``) and the splitter (``kf``) are not
    parameters; they are read from module-level variables at call time.
    """
    neg_mse_scores = cross_val_score(
        model,
        X,
        train_labels,
        scoring="neg_mean_squared_error",
        cv=kf,
    )
    return np.sqrt(-neg_mse_scores)

# Calculate RMSE scores; only the model and X are passed in, the labels and
# the splitter are picked up from the module-level variables
scores = cv_rmse(model=model, X=X)
print(f"Mean RMSE: {scores.mean():.3f}")

In [None]:
# Change the global variable: rebind kf to a splitter with 2 splits
kf = KFold(n_splits=2, random_state=42, shuffle=True)

# The function's output will change even though the call and its arguments
# are the same, because cv_rmse reads kf from the module scope
scores = cv_rmse(model=model, X=X)
print(f"Mean RMSE: {scores.mean():.3f}")

In [None]:
def cv_rmse(model, X, train_labels, kf):
    """
    Return the square root of the negated cross-validation scores of ``model``.

    Everything the computation depends on (model, features, labels and the
    splitter) is passed in as an argument.
    """
    neg_mse_scores = cross_val_score(
        model,
        X,
        train_labels,
        scoring="neg_mean_squared_error",
        cv=kf,
    )
    return np.sqrt(-neg_mse_scores)

### Avoid Modifying Input Parameters

In [None]:
def normalize_data(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """
    Center and scale ``columns`` of ``df`` by their mean and standard deviation.

    The result is assigned back into ``df`` itself, so the DataFrame that
    was passed in is modified and then returned.
    """
    selected = df[columns]
    centered = selected - selected.mean()
    df[columns] = centered / selected.std()
    return df

In [None]:
data = pd.DataFrame(
    {
        "temperature": [25.5, 27.8, 23.2],
        "humidity": [60.0, 55.5, 62.3],
        "pressure": [1013.2, 1015.7, 1012.8],
    }
)

# normalize_data assigns into the DataFrame it receives, so printing `data`
# afterwards shows what the call did to the original object
normalized_data = normalize_data(data, columns=["humidity"])
print(f"Original data:\n{data.head()}")

In [None]:
def normalize_data(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """
    Return a copy of ``df`` with ``columns`` centered and scaled.

    Each listed column has its mean subtracted and is divided by its
    standard deviation. The DataFrame passed in is left unchanged.
    """
    normalized = df.copy()
    selected = normalized[columns]
    normalized[columns] = (selected - selected.mean()) / selected.std()
    return normalized

In [None]:
data = pd.DataFrame(
    {
        "temperature": [25.5, 27.8, 23.2],
        "humidity": [60.0, 55.5, 62.3],
        "pressure": [1013.2, 1015.7, 1012.8],
    }
)
# Same call as before; the result goes to a separate name and `data` is
# printed afterwards to inspect the original object
normalized_data = normalize_data(data, columns=["humidity"])
print(f"Original data:\n{data}")

### Avoid Using Flags As Parameters

In [None]:
import pandas as pd
import numpy as np

from pathlib import Path

np.random.seed(42)
df = pd.DataFrame(
    {
        "A": [1, 2, np.nan],
        "B": [10, 20, 30],
        "C": [100, 200, 300],
    }
)

# to_csv does not create missing parent directories, so make sure the
# output folder exists before writing.
Path("data").mkdir(parents=True, exist_ok=True)
df.to_csv("data/sample.csv", index=False)

In [None]:
def preprocess_data(
    df: pd.DataFrame,
    fill_missing: bool = False,
    normalize: bool = False,
) -> pd.DataFrame:
    """
    Optionally fill missing values and/or normalize ``df``.

    When ``fill_missing`` is true, missing values are replaced with the
    column means. When ``normalize`` is true, each column is centered on its
    mean and divided by its standard deviation. Filling runs before
    normalizing. With both flags false, ``df`` is returned as is.
    """
    result = df

    if fill_missing:
        column_means = result.mean()
        result = result.fillna(column_means)

    if normalize:
        centered = result - result.mean()
        result = centered / result.std()

    return result

In [None]:
df = pd.read_csv("data/sample.csv")
# Behavior is chosen through boolean flags: here only the missing-value
# fill runs and normalization is skipped
cleaned_df = preprocess_data(df, fill_missing=True, normalize=False)

In [None]:
import pandas as pd
import numpy as np

def fill_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df`` with missing values replaced by the mean of their column."""
    column_means = df.mean()
    return df.fillna(column_means)

def normalize_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df`` with every column centered on its mean and divided by its standard deviation."""
    centered = df - df.mean()
    return centered / df.std()

def preprocess_data(df: pd.DataFrame, steps: list) -> pd.DataFrame:
    """
    Apply each callable in ``steps`` to the data, in list order.

    The output of one step is the input of the next; the final output is
    returned. With no steps, ``df`` is returned as is.
    """
    result = df
    for apply_step in steps:
        result = apply_step(result)
    return result

In [None]:
df = pd.read_csv("data/sample.csv")
# Steps run in list order, so normalization happens before missing values
# are filled.
# NOTE(review): the flag-based preprocess_data fills first and normalizes
# second, and its example call only filled — confirm this order is intended.
cleaning_steps = [normalize_data, fill_missing_values]
cleaned_df = preprocess_data(df, cleaning_steps)

### Extract Common Logic Into Utilities

In [None]:
import pandas as pd

from pathlib import Path

# Create a sample DataFrame
df = pd.DataFrame(
    {
        "text": [
            "Hello, World! 123",
            "This is a TEST comment.",
            "Special @#$% characters here!",
        ],
        "user": ["user1", "user2", "user3"],
        "date": ["2023-05-01", "2023-05-02", "2023-05-03"],
    }
)

# Save the DataFrame to a CSV file; to_csv does not create missing parent
# directories, so the output folder is created first
Path("data").mkdir(parents=True, exist_ok=True)
df.to_csv("data/comments.csv", index=False)

In [None]:
def clean_text_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with a cleaned ``text`` column.

    The text is lower-cased, every character that is not an ASCII letter or
    whitespace is removed, and leading/trailing whitespace is stripped. The
    DataFrame passed in is left unchanged.

    Parameters
    ----------
    df : pd.DataFrame
        Data with a string column named ``text``.

    Returns
    -------
    pd.DataFrame
        New DataFrame with the same columns and the cleaned ``text``.
    """
    cleaned_text = (
        df["text"]
        .str.lower()
        # Raw string: "\s" in a normal literal is an invalid escape sequence
        .str.replace(r"[^a-zA-Z\s]", "", regex=True)
        .str.strip()
    )
    return df.assign(text=cleaned_text)

def preprocess_user_input(text: str) -> str:
    """Lower-case ``text``, keep only alphanumeric and whitespace characters, and strip the ends."""
    kept_chars = [
        char for char in text.lower() if char.isalnum() or char.isspace()
    ]
    return "".join(kept_chars).strip()

In [None]:
# Clean the 'text' column of the saved comments
df = pd.read_csv("data/comments.csv")
cleaned_df = clean_text_data(df)
# Clean a single raw string
user_input = "Hello, World! 123"
cleaned_input = preprocess_user_input(user_input)

In [None]:
def clean_text(text: str) -> str:
    """Return ``text`` lower-cased, without non-alphanumeric, non-whitespace characters, stripped."""
    lowered = text.lower()
    kept = [ch for ch in lowered if ch.isspace() or ch.isalnum()]
    cleaned = "".join(kept)
    return cleaned.strip()


def clean_text_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` whose ``text`` column was passed through ``clean_text``.

    The DataFrame passed in is left unchanged; the cleaned column lives only
    in the returned DataFrame.
    """
    return df.assign(text=df["text"].apply(clean_text))


def preprocess_user_input(text: str) -> str:
    """Clean a single user-supplied string by delegating to ``clean_text``."""
    cleaned = clean_text(text)
    return cleaned

## Advanced Function Toolkit

### Lambda Functions

In [None]:
numbers = [1, 2, 3, 4, 5]
# map applies the lambda to every element; list() collects the results
squared = list(map(lambda x: x**2, numbers))
print(squared)

In [None]:
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# filter keeps the elements for which the lambda returns a true value
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
print(even_numbers)

In [None]:
data = [('Alice', 25), ('Bob', 30), ('Charlie', 22)]
# The key lambda picks the second tuple item, so the pairs are ordered by it
sorted_data = sorted(data, key=lambda x: x[1])
print(sorted_data)

### Partial Functions

In [None]:
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.model_selection import KFold, cross_val_score
import numpy as np


def cv_rmse(model, X, train_labels, kf):
    """Return the square root of the negated cross-validation scores of ``model``."""
    neg_mse_scores = cross_val_score(
        model,
        X,
        train_labels,
        scoring="neg_mean_squared_error",
        cv=kf,
    )
    rmse_scores = np.sqrt(-neg_mse_scores)
    return rmse_scores

In [None]:
from functools import partial

# Create sample data: five samples reshaped into a single-column 2-D array,
# their labels, and the splitter
X = np.array([1, 2, 3, 4, 5]).reshape(-1, 1)
train_labels = np.array([2, 4, 5, 4, 5])
kf = KFold(n_splits=5, random_state=42, shuffle=True)

# Create a partial function with fixed X, train_labels, and kf; they are
# bound as keyword arguments, leaving only `model` for the caller to supply
cv_rmse_with_data = partial(
    cv_rmse, X=X, train_labels=train_labels, kf=kf
)

In [None]:
# Call the partial function with Linear Regression; only the model is
# passed, the data and splitter were bound when the partial was created
linear_scores = cv_rmse_with_data(model=LinearRegression())

# Call the partial function with Ridge Regression on the same bound data
ridge_scores = cv_rmse_with_data(model=Ridge(alpha=0.5))

### `*args` and `**kwargs`

In [None]:
import numpy as np
from typing import Callable


def transform_pipeline(
    data: np.ndarray, *transformers: Callable
) -> np.ndarray:
    """
    Pass ``data`` through each transformer, left to right.

    Every positional argument after ``data`` is called with the output of
    the previous one; the last output is returned. With no transformers,
    ``data`` is returned as is.
    """
    current = data
    for apply_transform in transformers:
        current = apply_transform(current)
    return current

In [None]:
def log_transform(data: np.ndarray) -> np.ndarray:
    """Return ``log1p`` of ``data``, element-wise."""
    logged = np.log1p(data)
    return logged


def standardize(data: np.ndarray) -> np.ndarray:
    """Subtract the overall mean of ``data`` and divide by its overall standard deviation."""
    centered = data - data.mean()
    return centered / data.std()


# Random 100 x 5 array scaled by 100
raw_data = 100 * np.random.rand(100, 5)

# The steps are passed positionally and applied left to right
transformed_data = transform_pipeline(raw_data, log_transform, standardize)

In [None]:
import numpy as np
from typing import Callable


def transform_pipeline(
    data: np.ndarray, **transformers: Callable
) -> np.ndarray:
    """
    Pass ``data`` through named transformers, in keyword order.

    Parameters
    ----------
    data : np.ndarray
        Input handed to the first transformer.
    **transformers : Callable
        Steps given as ``name=callable``. Each is called with the output of
        the previous one.

    Returns
    -------
    np.ndarray
        Output of the last transformer, or ``data`` itself when no
        transformer is given.

    Raises
    ------
    ValueError
        If any transformer is not callable. The check covers all
        transformers before the first one runs.
    """
    # Validate every step up front so a bad entry fails before any work is done
    for transformer_name, transformer_func in transformers.items():
        if not callable(transformer_func):
            raise ValueError(
                f"{transformer_name} is not callable"
            )

    for transformer_func in transformers.values():
        data = transformer_func(data)
    return data


def log_transform(data: np.ndarray) -> np.ndarray:
    """Apply ``np.log1p`` to ``data`` and return the result."""
    result = np.log1p(data)
    return result


def standardize(data: np.ndarray) -> np.ndarray:
    """Return ``data`` shifted by its overall mean and divided by its overall standard deviation."""
    overall_mean = data.mean()
    overall_std = data.std()
    return (data - overall_mean) / overall_std

In [None]:
raw_data = np.random.rand(100, 5) * 100

# Each step is passed as name=callable; the name appears in the error
# message when its value is not callable, and the steps run in the order
# the keyword arguments are written
transformed_data = transform_pipeline(
    raw_data,
    log_transform=log_transform,
    standardize=standardize,
)

### Python Decorators in Data Science

In [None]:
import time
from typing import Callable, List, Union
import numpy as np


def timer_decorator(func: Callable) -> Callable:
    """
    Wrap ``func`` so that every call prints how long it took.

    The returned function forwards all positional and keyword arguments to
    ``func`` and returns its result unchanged. It does not copy ``func``'s
    metadata, so the decorated function reports the name ``wrapper``.

    Parameters
    ----------
    func : Callable
        Function to time.

    Returns
    -------
    Callable
        The timing wrapper around ``func``.
    """

    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock meant for
        # measuring durations; time.time() can jump when the system clock
        # is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(
            f"Function {func.__name__} took "
            f"{elapsed:.2f} seconds to execute."
        )
        return result

    return wrapper

In [None]:
# Decorated so that every call prints how long it took
@timer_decorator
def train_model(X: np.ndarray, y: np.ndarray | list[float]) -> None:
    """Simulating a time-consuming model training process"""
    # Stand-in for real work: X and y are accepted but not used
    time.sleep(2)


if __name__ == "__main__":
    # Random inputs; train_model only sleeps, so their values do not matter
    X = np.random.rand(1000, 10)
    y = np.random.rand(1000)
    train_model(X, y)

In [None]:
# Inspect the metadata that the decorated train_model exposes
print(f"name: {train_model.__name__}")
print(f"doc: {train_model.__doc__}")
print(f"annotations: {train_model.__annotations__}")

In [None]:
from functools import wraps


def timer_decorator(func: Callable) -> Callable:
    """
    Wrap ``func`` so that every call prints how long it took.

    The returned function forwards all positional and keyword arguments to
    ``func`` and returns its result unchanged. ``functools.wraps`` copies
    ``func``'s name, docstring and annotations onto the wrapper.

    Parameters
    ----------
    func : Callable
        Function to time.

    Returns
    -------
    Callable
        The timing wrapper around ``func``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock meant for
        # measuring durations; time.time() can jump when the system clock
        # is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(
            f"Function {func.__name__} took "
            f"{elapsed:.2f} seconds to execute."
        )
        return result

    return wrapper