In [1]:
from joblib import Parallel, delayed
import multiprocessing 

In [2]:
# Values to be squared in parallel: the integers 0 through 9.
inputs = range(0, 10)


def processInput(i):
    """Return ``i`` multiplied by itself."""
    squared = i * i
    return squared
 
# One worker per CPU reported by multiprocessing.cpu_count().
num_cores = multiprocessing.cpu_count()

# delayed(processInput) is created once; mapping it over the inputs produces
# one (function, args, kwargs) task per element, just like calling it inside
# a generator expression would.
results = Parallel(n_jobs=num_cores)(map(delayed(processInput), inputs))

print(results)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


In [3]:
# Display the CPU count currently stored in num_cores.
num_cores

8

In [4]:
import numpy as np
d = {"a": 1, "b": 2, "c": 3, "d": 4}

def test_function(val):
    return [np.ones((2,2))*val, val]

# One worker per CPU reported by multiprocessing.cpu_count().
num_cores = multiprocessing.cpu_count()

# One task per dictionary value; delayed(test_function) is built once and
# mapped over the values.
results = Parallel(n_jobs=num_cores)(map(delayed(test_function), d.values()))

# A single print with a newline separator emits the same four lines as four
# consecutive print() calls: the full result, its type, its length, and the
# first entry.
print(results, type(results), len(results), results[0], sep="\n")

[[array([[1., 1.],
       [1., 1.]]), 1], [array([[2., 2.],
       [2., 2.]]), 2], [array([[3., 3.],
       [3., 3.]]), 3], [array([[4., 4.],
       [4., 4.]]), 4]]
<class 'list'>
4
[array([[1., 1.],
       [1., 1.]]), 1]


In [5]:
import numpy as np
# Eight 10000 x 10000 matrices of random integers drawn from [0, 100), keyed
# "a" through "h". Keys are generated in that order, so NumPy's global random
# generator is called in the same sequence as a hand-written dict literal.
d = {
    key: np.random.randint(0, 100, size=(10000, 10000))
    for key in "abcdefgh"
}

def test_function(val):
    return val**2

num_cores = multiprocessing.cpu_count()

In [6]:
import time

In [7]:
# Sequential baseline: square every matrix in d one after another.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() follows the wall clock and can jump when the
# system time is adjusted.
start_time = time.perf_counter()
# Iterate over the values directly. Building list(d.values()) and indexing
# into it on every pass added avoidable work inside the timed region.
for val in d.values():
    a = val**2  # each result overwrites the previous one
end_time = time.perf_counter()
print("Time with for loop: {} seconds".format(end_time-start_time))

Time with for loop: 3.647850751876831 seconds


In [8]:
# Author's observation: CPU utilisation rises sharply while this cell runs.
# perf_counter() is a monotonic clock intended for measuring elapsed time;
# time.time() can jump when the system clock is adjusted.
start_time = time.perf_counter()
# d.values() is consumed directly; copying it into a list first is unnecessary.
results = Parallel(n_jobs=num_cores, backend="threading")(
    delayed(test_function)(val) for val in d.values()
)
end_time = time.perf_counter()
print("Time with parallelization: {} seconds".format(end_time-start_time))

Time with parallelization: 10.8079092502594 seconds


Je nach sonstiger Auslastung des Systems kann die for-Schleife schneller sein als die Parallelisierung – so auch in den obigen Messungen (ca. 3,6 s gegenüber ca. 10,8 s).

In [9]:
# Same computation, but without backend="threading" (joblib's default backend).
# Author's observation: CPU utilisation drops back to its normal level.
# perf_counter() is a monotonic clock intended for measuring elapsed time;
# time.time() can jump when the system clock is adjusted.
start_time = time.perf_counter()
# d.values() is consumed directly; copying it into a list first is unnecessary.
results = Parallel(n_jobs=num_cores)(
    delayed(test_function)(val) for val in d.values()
)
end_time = time.perf_counter()
print("Time with parallelization: {} seconds".format(end_time-start_time))

Time with parallelization: 33.828524589538574 seconds


# -----------------------------------------------------------------------------------------------------------------

In [10]:
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
import pandas as pd

(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

# Flatten every sample to 784 float32 features and divide by 255
# (presumably mapping 8-bit pixel intensities into [0, 1]).
X_train = X_train.reshape(-1, 784).astype("float32") / 255.0
X_test = X_test.reshape(-1, 784).astype("float32") / 255.0

# Fit the scaler on the training split only, then apply the same transform
# to both splits.
scaler = StandardScaler().fit(X_train)
X_train, X_test = scaler.transform(X_train), scaler.transform(X_test)

# Turn the label vectors into indicator (dummy) columns.
y_train, y_test = pd.get_dummies(y_train), pd.get_dummies(y_test)

In [11]:
from tensorflow.python.keras.layers import Dense
from tensorflow.python.keras.models import Sequential

# Build 100 identically structured, compiled (but never fitted) dense
# classifiers and store them under the keys "model_1" ... "model_100".
#
# Only the public tf.keras API is used here. `tensorflow.python` is a private
# namespace: it is not guaranteed to stay stable across releases, and the
# `tf.python` attribute is not reliably exposed on the `tf` package. Taking the
# backend call and the layers from the same public package also ensures that
# clear_session() acts on the Keras implementation the models are built with.
model_dict = {}
for i in range(100):
    # Reset Keras' global state before each build, as the original loop did.
    tf.keras.backend.clear_session()
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, "relu", input_shape=(X_train.shape[1],)),
        tf.keras.layers.Dense(128, "relu"),
        tf.keras.layers.Dense(64, "relu"),
        tf.keras.layers.Dense(32, "relu"),
        tf.keras.layers.Dense(10, "softmax"),
    ])

    model.compile("adam", "categorical_crossentropy", metrics=["accuracy"])

    # format() already converts the integer; no explicit str() is needed.
    model_dict["model_{}".format(i + 1)] = model

In [12]:
def eval_fct(model, data=None):
    """Run ``model.predict`` and return its output.

    Parameters
    ----------
    model
        Any object exposing a ``predict(data)`` method.
    data
        Inputs forwarded to ``predict``. When omitted (``None``), the
        notebook-level ``X_test`` is used, which is exactly what the
        original one-argument version did.

    Returns
    -------
    Whatever ``model.predict`` returns for the given inputs.
    """
    if data is None:
        # Fall back to the global test set so existing one-argument calls
        # keep working unchanged.
        data = X_test
    return model.predict(data)

In [13]:
# This is the cell the notebook is really about.
# Author's observation: CPU utilisation peaks at only 30-40 % here.
# All 100 models are independent of each other and could therefore be
# evaluated in parallel.
# Without backend="threading" the call fails with a pickle error.
#
# perf_counter() is a monotonic clock intended for measuring elapsed time;
# time.time() can jump when the system clock is adjusted.
start_time = time.perf_counter()
# model_dict.values() is consumed directly; copying it into a list first is
# unnecessary.
results = Parallel(n_jobs=num_cores, backend="threading")(
    delayed(eval_fct)(model) for model in model_dict.values()
)
end_time = time.perf_counter()
print("Calculation took {} seconds.".format(str(end_time-start_time)))
print(results[0].shape)

Calculation took 44.139549255371094 seconds.
(10000, 10)
