# Vectorization Example

In [2]:
import random

import numpy as np

def sqrt_of_n_random_numbers(n=1_000_000):
    """Pure-Python baseline: accumulate ``n`` products of two uniform draws.

    Despite the name, no square root is taken. Each iteration draws two
    numbers with ``random.uniform(0, 1)`` and adds their product to a running
    total. This is a dot product of two random length-``n`` vectors written
    as an explicit Python loop.

    Args:
        n: number of random pairs to draw (default one million).

    Returns:
        The accumulated sum (``0`` when ``n`` <= 0).
    """
    total = 0
    for _ in range(n):
        # Tuple assignment evaluates left to right, so ``first`` is drawn
        # before ``second`` and seeded runs consume the stream in order.
        first, second = random.uniform(0, 1), random.uniform(0, 1)
        total = total + first * second
    return total

%timeit sqrt_of_n_random_numbers()

294 ms ± 1.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [3]:
def sqrt_of_n_random_numbers_np(n=1_000_000, rng=None):
    """Vectorized counterpart of ``sqrt_of_n_random_numbers``.

    Draws two length-``n`` vectors of uniform samples in [0, 1) and returns
    their dot product with a single NumPy call instead of a Python loop.
    Despite the name, no square root is taken.

    Args:
        n: vector length (default one million).
        rng: optional ``numpy.random.Generator``. When given, samples come
            from ``rng.random`` so runs are reproducible without touching the
            global NumPy random state. When ``None`` (default), the legacy
            global ``np.random.rand`` is used.

    Returns:
        ``np.dot(a, b)`` as a NumPy float (``0.0`` when ``n == 0``).
    """
    if rng is None:
        a = np.random.rand(n)
        b = np.random.rand(n)
    else:
        a = rng.random(n)
        b = rng.random(n)
    return np.dot(a, b)

%timeit sqrt_of_n_random_numbers_np()

7.9 ms ± 119 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


# Python can be faster

In [4]:
# Baseline: split 4,000 short "a.b" strings held in a plain Python list
# with a list comprehension calling str.split.
X = ["foo.bar", "bar.foo", "x.y", "z.k"] * 1000
%timeit [x.split(".") for x in X]

299 µs ± 5.98 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [5]:
Y = ["foo.bar", "bar.foo", "x.y", "z.k"] * 1000
Y = np.array(Y)
%timeit np.core.defchararray.split(Y, ".")

1.18 ms ± 4.37 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


# Memory Allocation of Vectorized Operations

In [10]:
# Requires memory_profiler (pinned to memory_profiler==0.61.0 for this
# example); install it with `%pip install memory_profiler==0.61.0` if missing.
# Loading the extension provides the %mprun / %memit magics used below.
%load_ext memory_profiler

In [1]:
%%file memory_allocation_example.py

import numpy as np

def sum_of_square_vectorized(X):
    """Return the sum of squares of ``X`` using NumPy array operations.

    The work is kept as separate statements so that ``%mprun`` can attribute
    memory to each step: building an ndarray from ``X``, allocating the
    squared array, and reducing it to a scalar.
    """
    arr = np.asarray(X)       # a list becomes a new ndarray; an ndarray is reused
    squared = np.square(arr)  # second full-size temporary (same as arr ** 2)
    total = squared.sum()
    return total


def sum_of_square_for_loop(X):
    """Return the sum of squares of ``X`` with a plain Python loop.

    Only a running total is kept and no intermediate container is built.
    This is the contrast with the vectorized version when profiled with
    ``%mprun``.
    """
    total = 0
    for value in X:
        total = total + value ** 2
    return total


Overwriting memory_allocation_example.py


In [12]:
from memory_allocation_example import sum_of_square_vectorized
X = np.linspace(-50, 50, 1_000_000).tolist()
%mprun -f sum_of_square_vectorized sum_of_square_vectorized(X)




In [14]:
%timeit sum_of_square_vectorized(X)

3.91 ms ± 64.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [12]:
from memory_allocation_example import sum_of_square_for_loop
# 100,000 evenly spaced floats in [-50, 50], converted to a plain Python
# list (.tolist()) before being passed in.
X = np.linspace(-50, 50, 100_000).tolist()
%mprun -f sum_of_square_for_loop sum_of_square_for_loop(X)




In [8]:
%timeit sum_of_square_for_loop(X)

5.72 ms ± 39 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [13]:
# Relative runtime change (%) of the vectorized version vs. the for-loop,
# using the mean times (ms) reported by the two %timeit cells above.
# A negative value means the vectorized version is faster.
# Update these constants if the timings are re-run.
vectorized_ms = 3.91
for_loop_ms = 5.72
(vectorized_ms - for_loop_ms) / for_loop_ms * 100

-31.64335664335664

# Neural Network

In [7]:
import numpy as np

def sigmoid(x):
    """Logistic sigmoid activation, applied element-wise.

    sigmoid(x) = 1 / (1 + exp(-x)), which maps every real input into (0, 1).

    Args:
        x: scalar or array of real values.

    Returns:
        Value(s) of the same shape as ``x``. For very negative inputs,
        ``np.exp(-x)`` overflows to ``inf`` (NumPy emits a RuntimeWarning)
        and the result saturates to 0, which is the correct limit.
    """
    # The "1. +" term in the denominator is what bounds the output.
    # Without it the expression reduces to exp(x), which is unbounded.
    return 1. / (1. + np.exp(-x))


def dense_layer(x, w, b):
    """Fully connected layer followed by a sigmoid activation.

    Computes ``sigmoid(x @ w + b)``.

    Args:
        x: input batch; presumably shaped (batch_size, in_features). TODO confirm.
        w: weight matrix; presumably shaped (in_features, hidden_size).
        b: bias, broadcast-added to ``x @ w``; presumably shaped (hidden_size,).

    Returns:
        The activations, (batch_size, hidden_size) under the shape
        assumptions above.
    """
    pre_activation = x @ w + b  # affine (linear) part of the layer
    return sigmoid(pre_activation)

# Multiprocessing

In [9]:
import pandas as pd

# Toy scores table: 4 rows for id "A", 5 for "B", 9 for "C".
df = pd.DataFrame(
    {
        "id": ["A"] * 4 + ["B"] * 5 + ["C"] * 9,
        "scores": [
            98, 97, 100, 80,                      # A
            78, 80, 82, 81, 61,                   # B
            51, 72, 70, 82, 80, 75, 62, 67, 53,   # C
        ],
    }
)

def apply_func(pdf):
    """Compute the min-max normalized score of one group.

    Args:
        pdf: DataFrame (e.g. one ``groupby`` group) with a numeric
            ``scores`` column.

    Returns:
        A new DataFrame equal to ``pdf`` except that ``scores`` is replaced by
        ``(scores - min) / (max + 1e-6 - min)``, which lies in [0, 1). The
        1e-6 keeps the denominator non-zero when every score in the group is
        identical; such groups map to 0.
    """
    scores = pdf["scores"]
    min_val = scores.min()
    max_val = scores.max() + 1e-6
    # Return a new frame instead of assigning into ``pdf``. pandas does not
    # support functions that mutate the object passed by groupby(...).apply,
    # and calling this directly would otherwise overwrite the caller's data.
    return pdf.assign(scores=(scores - min_val) / (max_val - min_val))


# Sequential version: run the per-group normalization on each "id" group.
id_groups = df.groupby(by="id", as_index=False)
df_out = id_groups.apply(apply_func)
df_out

Unnamed: 0,Unnamed: 1,id,scores
0,0,A,0.9
0,1,A,0.85
0,2,A,1.0
0,3,A,0.0
1,4,B,0.809524
1,5,B,0.904762
1,6,B,1.0
1,7,B,0.952381
1,8,B,0.0
2,9,C,0.0


## Joblib multiprocessing

In [11]:
from tqdm import tqdm
import multiprocessing
from joblib import Parallel, delayed
import pandas as pd

def apply_parallel(df_grouped, func, n_jobs=None):
    """
    Apply a function to grouped pandas dataframe using multiprocessing.

    Each group is handed to ``func`` as a separate joblib task. The results
    are concatenated in the order the groups are iterated.

    Usage:
    def apply_func(pandas_df):
        ...

    df = apply_parallel(
        df.groupby(
            by=grouped_by_columns,
            as_index=False,
        ), apply_func)

    Args:
        df_grouped: a pandas GroupBy object; iterating it yields
            ``(group_key, group_frame)`` pairs.
        func: callable that takes one group DataFrame and returns a DataFrame.
        n_jobs: number of joblib workers. Defaults to
            ``multiprocessing.cpu_count()``, i.e. one worker per reported CPU.

    Returns:
        ``pd.concat`` of the per-group results.
    """
    if n_jobs is None:
        n_jobs = multiprocessing.cpu_count()
    # The group key is not needed; only the group frame is sent to ``func``.
    results = Parallel(n_jobs=n_jobs)(
        delayed(func)(group) for _, group in tqdm(df_grouped)
    )
    return pd.concat(results)

# Parallel version of the same normalization: each "id" group is processed
# by a separate joblib task, and the pieces are concatenated afterwards.
groups_by_id = df.groupby(by="id", as_index=False)
df_out_parallel = apply_parallel(groups_by_id, apply_func)
df_out_parallel

100%|████████████████████████████████████████████| 3/3 [00:00<00:00, 490.70it/s]


Unnamed: 0,id,scores
0,A,0.9
1,A,0.85
2,A,1.0
3,A,0.0
4,B,0.809524
5,B,0.904762
6,B,1.0
7,B,0.952381
8,B,0.0
9,C,0.0


## Random data transformation

In [12]:
import string
import random

# A dedicated, seeded generator makes the synthetic frame identical on every
# Restart & Run All and leaves the global `random` state untouched.
N_ROWS = 100_000
data_rng = random.Random(42)

df = pd.DataFrame({
    # random 3-letter ids drawn from string.ascii_letters (a-z, A-Z)
    "id": [
        "".join(data_rng.choices(string.ascii_letters, k=3))
        for _ in range(N_ROWS)
    ],
    # random integer scores in [1, 100] (randint includes both ends)
    "scores": [data_rng.randint(1, 100) for _ in range(N_ROWS)],
})
df

Unnamed: 0,id,scores
0,wqP,50
1,CmQ,95
2,jJG,24
3,tvL,14
4,kBY,18
...,...,...
99995,Dmz,48
99996,TSR,87
99997,DFs,52
99998,PWb,75


# Multi-worker distributed computing using Apache Beam pipeline

**Caution:** Apache Beam 2.49.0 does not yet support `pandas` 2.x, so this section installs `pandas==1.5.3` alongside it. All other examples in this book use `pandas==2.1.1`.

In [2]:
!pip install apache-beam==2.49.0 pandas==1.5.3

Collecting pandas==1.5.3
  Using cached pandas-1.5.3-cp310-cp310-macosx_11_0_arm64.whl (10.9 MB)
Installing collected packages: pandas
  Attempting uninstall: pandas
    Found existing installation: pandas 2.1.1
    Uninstalling pandas-2.1.1:
      Successfully uninstalled pandas-2.1.1
Successfully installed pandas-1.5.3


In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Setting up running environment
# Example documents; in a real job these could be read from files instead.
documents = [
    "lorem ipsum dolor sit amet",
    "sit amet dolor lorem",
    "ut enim ad minim veniam",
    "ut enim ad dolor lorem",
]

# Execution environment: run the pipeline locally, in-process.
pipeline_args = [
    "--job_name=word_count",
    "--runner=DirectRunner",  # local runner
]
pipeline_options = PipelineOptions(pipeline_args)

# Word count, one named PCollection per stage. The pipeline runs when the
# `with` block exits. NOTE: PCollections are unordered, so the printed order
# may differ between runs.
with beam.Pipeline(options=pipeline_options) as p:
    texts = p | "Create texts" >> beam.Create(documents)
    # one element per whitespace-separated word
    words = texts | "Split" >> beam.FlatMap(lambda line: line.split())
    # (key, value) = (word, 1)
    pairs = words | "Pair with 1" >> beam.Map(lambda word: (word, 1))
    # sum the 1s for each word
    counts = pairs | "Count" >> beam.CombinePerKey(sum)
    _ = counts | "Print" >> beam.Map(print)



('lorem', 3)
('ipsum', 1)
('dolor', 3)
('sit', 2)
('amet', 2)
('ut', 2)
('enim', 2)
('ad', 2)
('minim', 1)
('veniam', 1)
