
Merge pull request #18 from preinaj/scalers-and-tests
Add StandardizationScaler and MeanNormalizationScaler
pedrolarben authored Jan 18, 2022
2 parents 3671fff + 2eac54c commit 955c243
Showing 9 changed files with 422 additions and 15 deletions.
16 changes: 1 addition & 15 deletions ADLStream/adlstream.py
@@ -8,12 +8,9 @@

class ADLStreamContext:
"""ADLStream context.
This object is shared among training, predicting, stream-generator and validator processes.
It is used to send the data from the stream generator to the predicting process,
then it is used for training, and finally the validator has access to the output predictions.
Arguments:
batch_size (int): Number of instances per batch.
num_batches_fed (int): Maximum number of batches to be used for training.
@@ -116,10 +113,6 @@ def set_weights(self, w):
def get_weights(self):
return self.weights

def get_shape(self):
X = self.x_train
return np.asarray(X).shape

def add(self, x, y=None):
with self.data_lock:
if x is not None:
@@ -185,7 +178,6 @@ def add_predictions(self, o_eval):

class ADLStreamManager(BaseManager):
"""ADLStream Manager
Manager server which holds the ADLStreamContext object.
It allows other processes to manipulate the shared context.
"""
@@ -197,12 +189,10 @@ def __init__(self):

class ADLStream:
"""ADLStream.
This is the main object of the framework.
Based on a stream generator and a given deep learning model, it runs the training and
predicting processes in parallel (ideally on two different GPUs) to obtain accurate
predictions as soon as an instance is received.
Parameters:
stream_generator (ADLStream.data.BaseStreamGenerator):
It is in charge of generating new instances from the stream.
@@ -229,7 +219,6 @@ class ADLStream:
If log_file is given, log level is set to "DEBUG". However, if None,
log level is kept as default.
Defaults to None.
"""

def __init__(
@@ -257,6 +246,7 @@ def __init__(
self.train_gpu_index = train_gpu_index
self.predict_gpu_index = predict_gpu_index
self.log_file = log_file

self.x_shape = None
self.output_size = None
self.weights = None
@@ -265,7 +255,6 @@ def __init__(

def training_process(self, context, gpu_index):
"""Training process.
Args:
context (ADLStreamContext): Shared object among processes.
gpu_index (int): Index of the GPU to use for training
@@ -333,7 +322,6 @@ def training_process(self, context, gpu_index):

def predicting_process(self, context, gpu_index):
"""Predicting process.
Args:
context (ADLStreamContext): Shared object among processes.
gpu_index (int): Index of the GPU to use for predicting
@@ -409,9 +397,7 @@ def predicting_process(self, context, gpu_index):

def run(self):
"""Function that run ADLStream.
It run 4 different processes:
- Training process.
- Predicting process.
- Stream generator process.
4 changes: 4 additions & 0 deletions ADLStream/data/preprocessing/__init__.py
@@ -1,2 +1,6 @@
from ADLStream.data.preprocessing.base_preprocessor import BasePreprocessor
from ADLStream.data.preprocessing.min_max_scaler import MinMaxScaler
from ADLStream.data.preprocessing.mean_normalization_scaler import (
MeanNormalizationScaler,
)
from ADLStream.data.preprocessing.standardization_scaler import StandardizationScaler
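With these exports in place, both new scalers can be imported directly from the preprocessing package. A minimal sketch, assuming the package is installed as `ADLStream`:

from ADLStream.data.preprocessing import (
    MeanNormalizationScaler,
    StandardizationScaler,
)

# Both scalers follow the BasePreprocessor interface: learn_one(x) updates the
# running statistics and transform_one(x) scales a single instance with the
# statistics seen so far.
scaler = MeanNormalizationScaler(share_params=False)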
95 changes: 95 additions & 0 deletions ADLStream/data/preprocessing/mean_normalization_scaler.py
@@ -0,0 +1,95 @@
"""Implements a mean normalization scaler"""
from ADLStream.data.preprocessing import BasePreprocessor


class MeanNormalizationScaler(BasePreprocessor):
"""The transformation is given by
x_scaled = (x - avg_x) / (max_x - min_x)
where min_x is the minimum value seen until now for the feature x,
max_x represents the maximum value seen until now for the feature x
and avg_x is the mean seen until now for the feature x.
Arguments:
share_params (bool): Whether to share scaler parameters among columns.
Defaults to False.
"""

def __init__(self, share_params=False):
self.share_params = share_params
self.data_min = None
self.data_max = None
self.data_sum = None
self.data_count = 1
self.data_avg = None

def _minimum(self, a, b):
assert len(a) == len(b)
min_values = [min(a[i], b[i]) for i in range(len(a))]
if self.share_params:
min_values = [min(min_values) for _ in min_values]
return min_values

def _maximum(self, a, b):
assert len(a) == len(b)
max_values = [max(a[i], b[i]) for i in range(len(a))]
if self.share_params:
max_values = [max(max_values) for _ in max_values]
return max_values

def _mean(self, a):
if self.share_params == False:
assert len(a) == len(self.data_sum)
self.data_sum = [self.data_sum[i] + a[i] for i in range(len(a))]
mean = [(self.data_sum[i]) / self.data_count for i in range(len(a))]

else:
self.data_sum += sum(a)
mean = [self.data_sum / (self.data_count * len(a))] * len(a)

return mean

def learn_one(self, x):
"""Updates `min` `max` `avg` and `count` parameters for each feature
Args:
x (list): input data from stream generator.
Returns:
BasePreprocessor: self updated scaler.
"""
if self.data_min is None:
self.data_min = x
self.data_max = x
self.data_avg = x
self.data_sum = [0.0] * len(x)
if self.share_params == True:
self.data_sum = 0.0
self.data_min = self._minimum(x, self.data_min)
self.data_max = self._maximum(x, self.data_max)
self.data_avg = self._mean(x)
self.data_count += 1
return self

def _mean_normalization(self, val, min_val, max_val, avg_val):
def _safe_div_zero(a, b):
return 0 if b == 0 else a / b

return _safe_div_zero((val - avg_val), (max_val - min_val))

def transform_one(self, x):
"""Scales one instance data
Args:
x (list): input data from stream generator.
Returns:
scaled_x (list): minmax scaled data.
"""
assert self.data_min is not None
scaled_x = [
self._mean_normalization(v, m, M, a)
for v, m, M, a in zip(x, self.data_min, self.data_max, self.data_avg)
]
return scaled_x
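A short usage sketch of the scaler above, feeding it instances one at a time as a stream generator would (the values are illustrative only):

from ADLStream.data.preprocessing import MeanNormalizationScaler

scaler = MeanNormalizationScaler()

stream = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
for x in stream:
    scaler.learn_one(x)             # update running min, max, avg and count
    print(scaler.transform_one(x))  # (x - avg_x) / (max_x - min_x) per feature

# For the first instance max_x == min_x, so the safe division returns 0 for
# every feature; later instances are scaled with the statistics seen so far.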
144 changes: 144 additions & 0 deletions ADLStream/data/preprocessing/standardization_scaler.py
@@ -0,0 +1,144 @@
"""Implements a standardization scaler"""
from ADLStream.data.preprocessing import BasePreprocessor
import math


class StandardizationScaler(BasePreprocessor):
"""The transformation is given by
x_scaled = (x - avg_x) / x_stdev
where avg_x is the mean seen until now for the feature x and x_stdev
is the standard deviation seen until now for the feature x.
Arguments:
share_params (bool): Whether to share scaler parameters among columns.
Defaults to False.
"""

def __init__(self, share_params=False):
self.share_params = share_params
self.data = None
self.data_sum = None
self.data_stdev_sum = None
self.data_avg = None
self.data_stdev = None
self.data_count = 1

def _mean(self, a):

if self.share_params == False:
assert len(a) == len(self.data_sum)
self.data_sum = [self.data_sum[i] + a[i] for i in range(len(a))]
mean = [(self.data_sum[i]) / self.data_count for i in range(len(a))]

else:
self.data_sum += sum(a)
mean = [self.data_sum / (self.data_count * len(a))] * len(a)

return mean

def _standard_deviation(self, a):
"""In order to compute the standard deviation
uses the Welford's online algorithm given by
data_stdev_sum = old_data_stdev_sum + delta * delta2
stdev = sqrt(data_stdev_sum / (n - 1))
Arguments:
a (list): input data from stream generator.
Returns:
stev: standard deviation of the data.
"""

if self.share_params == False:
assert len(a) == len(self.data_sum)

mean = self.data_avg
if self.share_params == False:
delta = [(a[i] - mean[i]) for i in range(len(a))]

data_sum = [self.data_sum[i] + a[i] for i in range(len(a))]
mean = [(data_sum[i]) / self.data_count for i in range(len(a))]

delta2 = [(a[i] - mean[i]) for i in range(len(a))]

self.data_stdev_sum = [
(self.data_stdev_sum[i] + (delta[i] * delta2[i])) for i in range(len(a))
]

if self.data_count == 1:
stdev = [math.sqrt(self.data_stdev_sum[i]) for i in range(len(a))]
else:
stdev = [
math.sqrt(self.data_stdev_sum[i] / (self.data_count - 1))
for i in range(len(a))
]

else:
delta = [(a[i] - mean[i]) for i in range(len(a))]

data_sum = self.data_sum
data_sum += sum(a)
mean = [data_sum / (self.data_count * len(a))] * len(a)

delta2 = [(a[i] - mean[i]) for i in range(len(a))]

for i in range(len(a)):
self.data_stdev_sum += delta[i] * delta2[i]

stdev = [
math.sqrt(self.data_stdev_sum / (((self.data_count) * len(a)) - 1))
] * len(a)

return stdev

def learn_one(self, x):
"""Updates `avg` and `count` parameters for each feature
Args:
x (list): input data from stream generator.
Returns:
BasePreprocessor: self updated scaler.
"""
if self.data_sum is None:
self.data = [x]
self.data_avg = x
self.data_mean = x
self.data_sum = [0.0] * len(x)
self.data_stdev_sum = [0.0] * len(x)
if self.share_params == True:
self.data_sum = 0.0
self.data_stdev_sum = 0.0
self.data_avg = [
self.data_sum + sum(x) / (self.data_count * len(x))
] * len(x)

self.data_stdev = self._standard_deviation(x)
self.data_avg = self._mean(x)

self.data_count += 1
return self

def _standardization(self, val, avg_val, std_val):
def _safe_div_zero(a, b):
return 0 if b == 0 else a / b

return _safe_div_zero((val - avg_val), std_val)

def transform_one(self, x):
"""Scales one instance data
Args:
x (list): input data from stream generator.
Returns:
scaled_x (list): minmax scaled data.
"""
assert self.data_sum is not None
scaled_x = [
self._standardization(v, a, s)
for v, a, s in zip(x, self.data_avg, self.data_stdev)
]
return scaled_x
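A small sanity check of the incremental statistics against Python's statistics module; a sketch, assuming the scaler is driven exactly as in the diff above:

import statistics

from ADLStream.data.preprocessing import StandardizationScaler

scaler = StandardizationScaler()

stream = [[1.0], [2.0], [4.0], [8.0]]
for x in stream:
    scaler.learn_one(x)

values = [x[0] for x in stream]
# Welford's update should reproduce the batch mean and sample standard deviation.
print(scaler.data_avg[0], statistics.mean(values))     # 3.75 in both cases
print(scaler.data_stdev[0], statistics.stdev(values))  # ~3.0957 in both cases

# transform_one standardizes an instance with the statistics seen so far:
# (x - avg_x) / x_stdev per feature.
print(scaler.transform_one([4.0]))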