In [None]:
# Concurrency with Joblib
# When dealing with large datasets or extensive computations, parallelizing tasks can significantly speed up 
# the process. Joblib is a library in Python that provides easy-to-use parallelism, allowing you to leverage all CPU cores.

from joblib import Parallel, delayed

# Define a function you want to run in parallel
def compute(x):
    return x * x

# Create a list of inputs
inputs = range(100000000)
# Use Joblib's Parallel and delayed to run the compute function in parallel over all inputs
results = Parallel(n_jobs=-1)(delayed(compute)(i) for i in inputs)
# 'n_jobs=-1' will use all available CPU cores
# 'results' will now contain the square of each number in 'inputs'

In [None]:
# Scheduling jobs
# The schedule library in Python is used for running jobs at regular intervals. It's a lighter alternative to more complex task scheduling
# libraries and tools, and it's often used for small-to-medium-sized projects where tasks need to run periodically but don't require the 
# full power of something like Cron jobs in Unix or Task Scheduler in Windows.
# First, you'll need to install the schedule package if you haven't already. You can install it using pip:

import schedule
import time

def my_job():
    print("Doing the task...")

# Schedule the job to run every day at 8 am
schedule.every().day.at("08:00").do(my_job)

while True:
    schedule.run_pending()
    time.sleep(1)

In [1]:
# Memory Profiling
# Managing memory usage is critical, especially when dealing with large datasets or complex models. Python's tracemalloc module allows 
# users to trace memory blocks allocated by Python. It can be a lifesaver when debugging memory leaks or just for understanding where most 
# of the memory is being consumed.
# Here's how you can utilize it:

import tracemalloc

# Start tracing memory allocations
tracemalloc.start()

# Your code that might be memory-intensive
x = [i for i in range(1000000)]

# Capture the current snapshot and display the top 5 memory-consuming lines
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')  # or 'filename' or 'traceback'

# Display the top 5 lines consuming memory
for stat in top_stats[:5]:
    print(stat)

/tmp/ipykernel_38428/2864254135.py:13: size=35.0 MiB, count=999744, average=37 B
/home/cosmos/mambaforge/envs/th/lib/python3.8/site-packages/traitlets/traitlets.py:676: size=1112 B, count=1, average=1112 B
/home/cosmos/mambaforge/envs/th/lib/python3.8/site-packages/IPython/core/history.py:851: size=480 B, count=1, average=480 B
/tmp/ipykernel_38428/2864254135.py:16: size=424 B, count=1, average=424 B
/home/cosmos/mambaforge/envs/th/lib/python3.8/site-packages/IPython/core/interactiveshell.py:3505: size=400 B, count=1, average=400 B


In [6]:
# Parallelism
# Python's “dataclasses” module offers a decorator and functions for auto-generating special methods in classes. With it, you can swiftly create
# classes that primarily exist to hold values, eliminating the need to manually write boilerplate code like “__init__”.
# In the code below, the “@dataclass” decorator automatically adds special methods to the class, including a well-defined “__init__” method. 
# This approach provides a concise and readable way to define classes that serve as data containers.

from dataclasses import dataclass

@dataclass
class Point:
    x: int
    y: int

p = Point(1, 2)
print(p)

Point(x=1, y=2)


In [2]:
# Parallelism
# Running tasks in parallel can greatly enhance the efficiency of your Python programs, particularly when dealing with CPU-bound operations.
# Python's built-in `concurrent.futures` module provides a high-level interface for asynchronously executing callables, with the ProcessPoolExecutor
# class facilitating parallelism.

from concurrent.futures import ProcessPoolExecutor
import os

def task(n):
    return (n, os.getpid())  

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(task, range(10)))

    for n, pid in results:
        print(f"Task {n} ran in process {pid}")

Task 0 ran in process 38530
Task 1 ran in process 38531
Task 2 ran in process 38532
Task 3 ran in process 38533
Task 4 ran in process 38530
Task 5 ran in process 38531
Task 6 ran in process 38530
Task 7 ran in process 38531
Task 8 ran in process 38532
Task 9 ran in process 38530


In [7]:
# Empty List
# A straightforward way to check if a list is empty in Python is to take advantage of Python's ability to interpret empty collections (like lists)
# as “False” in a boolean context, as shown in the code snippet below.
# In provided example, an empty list will be evaluated as “False", so “not x” will be “True” if the list “x” is empty. This is a clean and efficient 
# way to check for an empty list and is considered a good practice in Python.

# Note that this will also work with other collections (like tuples, sets, and dictionaries) and strings, because Python treats empty collections and 
# strings as “False” when converting them to a boolean context.

x = []
if not x:
    print("List is empty")

List is empty


In [8]:
# Heap Queue
# The heap structure is used to represent priority queues and comes with the heapq module. Why is this useful? If you need to find the 
# n-largest or n-smallest elements from a list, this could be done by employing its two methods: nsmallest() and nlargest().

import heapq

simple_list = [5, 12, 93, 1, 53, 17]

print('3 smallest values:', heapq.nsmallest(3, simple_list))
print('2 smallest values:', heapq.nsmallest(2, simple_list))

print('5 largest values:', heapq.nlargest(5, simple_list))
print('3 largest values:', heapq.nlargest(2, simple_list))

3 smallest values: [1, 5, 12]
2 smallest values: [1, 5]
5 largest values: [93, 53, 17, 12, 5]
3 largest values: [93, 53]


In [1]:
# Generator Pipelines
# In Python, generators are a powerful way to handle large streams of data efficiently.
# An advanced technique is to pipeline multiple generators together to create a data processing workflow.
# Each generator function processes data and yields an output, which becomes the input for the next generator in the pipeline.
# Generator pipelines come in handy when you require handling large data streams with minimal memory overhead 
# you need to apply a sequence of transformations lazily creating modular and composable data processing workflows.

def read_data():
    for i in range(5):
        yield i

def process_data(data):
    for item in data:
        yield item * 2

def save_data(data):
    for item in data:
        print(f"Saved: {item}")

pipeline = save_data(process_data(read_data()))
for _ in pipeline:
    pass

Saved: 0
Saved: 2
Saved: 4
Saved: 6
Saved: 8


TypeError: 'NoneType' object is not iterable

In [1]:
# Caching with LRU Cache
# Caching can boost Python code's performance, especially for functions called multiple times with the same arguments.
# One way to implement this is with the lru_cache decorator from the functools module, where the "LRU" stands for Least Recently Used.
# It caches the results of recent function calls, so re-calling with the same arguments returns the cached result.
# When to Use LRU Cache:
# For expensive or I/O-bound functions.
# To improve the performance of recursive functions.
# Here's a concise example using `lru_cache` to optimize a recursive Fibonacci function. 
# You can run this code snippet using OpenAI’s Code Interpreter, where it takes 0.00015 seconds to execute this script, 
# compared to 4.33 seconds for the non-cached Fibonacci function!

from functools import lru_cache
import time

def fib(n):
    return n if n <= 1 else fib(n-1) + fib(n-2)

@lru_cache(maxsize=128)
def fib_lru(n):
    return n if n <= 1 else fib_lru(n-1) + fib_lru(n-2)

start = time.time()
print(fib(35))
print(f"Took {time.time() - start:.5f} seconds for fib without LRU.")

start = time.time()
print(fib_lru(35))
print(f"Took {time.time() - start:.5f} seconds for fib with LRU.")

9227465
Took 2.53030 seconds for fib without LRU.
9227465
Took 0.00008 seconds for fib with LRU.


Resources:

- news@alphasignal.ai