In [48]:
import datasource
import torch
import sklearn.preprocessing
import numpy as np
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.train.torch import TorchCheckpoint


def train_loop_per_worker(config):
    def _import_class(name:str):
        import importlib
        index = name.rfind('.')
        module_name = name[:index] if index != -1 else '__main__'
        class_name = name[index + 1:]
        return getattr(importlib.import_module(module_name), class_name)
    
    model = config['model']
    criterion = config['criterion']
    optimizer = config['optimizer']
    imputer = config['imputer']
    scaler = config['scaler']

    data = datasource.get_data()
    train_indexes, test_indexes = datasource.get_index_splited_by_time(data)

    index_X = 'FSR_for_force'
    index_y = 'force'
    concated_train_indexes = np.concatenate(train_indexes)
    if imputer:
        imputer = _import_class(imputer)(**config['imputer_args'])
        imputer.fit(data.loc[concated_train_indexes, [index_X, index_y]])
    if scaler:
        scaler_X = _import_class(scaler)()
        scaler_y = _import_class(scaler)()
        scaler_X.fit(data.loc[concated_train_indexes, index_X])
        scaler_y.fit(data.loc[concated_train_indexes, index_y])
    data.loc[:, index_X] = scaler_X.transform(data.loc[:, index_X])
    data.loc[:, index_y] = scaler_y.transform(data.loc[:, index_y])
    train_dataset = datasource.FSRDataset(data.loc[:, index_X], data.loc[:, index_y], train_indexes)
    test_dataset = datasource.FSRDataset(data.loc[:, index_X], data.loc[:, index_y], test_indexes)

    model = _import_class(model)(input_size=len(data.loc[:, index_X].columns), output_size=len(data.loc[:, index_y].columns), **config['model_args'])
    criterion = _import_class(criterion)()
    optimizer = _import_class(optimizer)(model.parameters(), **config['optimizer_args'])

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=None)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=None)

    while True:
        model.train()
        for X, y in train_loader:
            pred = model(X)
            loss = criterion(pred, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        model.eval()
        with torch.inference_mode():
            mae, mse, mape, num = [], [], [], []
            for X, y in test_loader:
                pred = model(X)
                if scaler:
                    pred = scaler_y.inverse_transform(pred)
                    y = scaler_y.inverse_transform(y)
                mae.append(sklearn.metrics.mean_absolute_error(y, pred))
                mse.append(sklearn.metrics.mean_squared_error(y, pred))
                mape.append(sklearn.metrics.mean_absolute_percentage_error(y, pred))
                num.append(len(y))
            mae = np.average(mae, weights=num)
            mse = np.average(mse, weights=num)
            mape = np.average(mape, weights=num)
            rmse = mse ** 0.5
        session.report(
            dict(rmse=rmse, mae=mae, mape=mape),
            checkpoint=Checkpoint.from_dict(
                dict(model=model.state_dict(), optimizer=optimizer.state_dict()),
            ),
        ),


In [25]:
import ray
from ray.data.preprocessors import Chain, MinMaxScaler, SimpleImputer

# Generate one simple dataset.
dataset = ray.data.from_items(
    [{"id": 0, 'value': 3}, {"id": 1, 'value': 3}, {"id": 2, 'value': 2}, {"id": 3, 'value': 2}, {"id": None, 'value': 1}]
)
scaler = MinMaxScaler(columns=['id'])
scaler.fit_transform(dataset)
dataset.groupby('value').

2023-07-04 03:26:44,692	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
2023-07-04 03:26:44,694	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 03:26:44,697	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/5 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/5 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/5 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/5 [00:00<?, ?it/s]

Running 0:   0%|          | 0/5 [00:00<?, ?it/s]

2023-07-04 03:26:44,815	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-1020, stopped daemon 140279520679488)>.


AttributeError: 'GroupedData' object has no attribute 'take'

In [6]:
import ray.data
import datasource
import numpy as np
data = datasource.get_data()
train_indexes, test_indexes = datasource.get_index_splited_by_time(data)
train_ds = ray.data.from_items(np.concatenate([data.loc[train_index, ['FSR_for_force', 'force']].to_numpy() for train_index in train_indexes]))
def rename_column(batch):
    batch['X'] = batch['item'][..., :6].tolist()
    batch['y'] = batch['item'][..., 6:].tolist()
    batch.pop('item')
    return batch
train_ds = train_ds.map_batches(rename_column)
from ray.data.preprocessors import SimpleImputer, StandardScaler
scaler = StandardScaler(train_ds.columns())
scaler.fit_transform(train_ds)
train_ds

2023-07-04 01:53:42,537	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(rename_column)]
2023-07-04 01:53:42,539	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 01:53:42,541	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 01:53:46,047	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-485, stopped daemon 140418050160192)>.
2023-07-04 01:53:46,063	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
2023-07-04 01:53:46,066	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 01:53:46,069	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 01:53:49,104	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-487, stopped daemon 140418050160192)>.


RayTaskError(ValueError): [36mray::map()[39m (pid=976556, ip=172.26.215.93)
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/grouped_data.py", line 59, in map
    parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/grouped_data.py", line 59, in <listcomp>
    parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/_internal/pandas_block.py", line 442, in combine
    accumulators[i] = aggs[i].accumulate_block(accumulators[i], group_view)
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/_internal/null_aggregate.py", line 216, in _accum_block_null
    ret = accum_block(block)
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/aggregate.py", line 230, in vectorized_mean
    sum_ = block_acc.sum(on, ignore_nulls)
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/_internal/pandas_block.py", line 324, in sum
    if pd.isnull(val):
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

2023-07-04 01:53:55,096	ERROR worker.py:408 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::map()[39m (pid=976556, ip=172.26.215.93)
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/grouped_data.py", line 59, in map
    parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/grouped_data.py", line 59, in <listcomp>
    parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/_internal/pandas_block.py", line 442, in combine
    accumulators[i] = aggs[i].accumulate_block(accumulators[i], group_view)
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/_internal/null_aggregate.py", line 216, in _accum_block_null
    ret = accum_block(block)
  File "/home/seokj/workspace/.venv/lib/python3.10/site-packages/ray/data/aggregate.py", l

In [19]:
a = np.array([3, 2, 1])
np.expand_dims(a, 1)

array([[3],
       [2],
       [1]])

In [11]:
import numpy as np

a = np.array([3, 2, 1])
b = np.array([2, 2, 1]).transpose()
np.hstack([a, b])

array([3, 2, 1, 2, 2, 1])

In [16]:
train_indexes

[Index([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9,
        ...
        413, 414, 415, 416, 417, 418, 419, 420, 421, 422],
       dtype='int64', length=423),
 Index([ 564,  565,  566,  567,  568,  569,  570,  571,  572,  573,
        ...
        1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020, 1021, 1022],
       dtype='int64', length=459),
 Index([1177, 1178, 1179, 1180, 1181, 1182, 1183, 1184, 1185, 1186,
        ...
        1575, 1576, 1577, 1578, 1579, 1580, 1581, 1582, 1583, 1584],
       dtype='int64', length=408),
 Index([1721, 1722, 1723, 1724, 1725, 1726, 1727, 1728, 1729, 1730,
        ...
        2148, 2149, 2150, 2151, 2152, 2153, 2154, 2155, 2156, 2157],
       dtype='int64', length=437),
 Index([2304, 2305, 2306, 2307, 2308, 2309, 2310, 2311, 2312, 2313,
        ...
        2709, 2710, 2711, 2712, 2713, 2714, 2715, 2716, 2717, 2718],
       dtype='int64', length=415),
 Index([2858, 2859, 2860, 2861, 2862, 2863, 2864, 2865, 2866, 2867,
        ...
        3470, 3471, 

In [18]:
np.vstack([np.array([1, 1, 1, 1]),
np.array([2, 2, 2, 2])])

array([[1, 1, 1, 1],
       [2, 2, 2, 2]])

In [42]:
import ray
import pandas as pd
import numpy as np

def fn(group):
    return {"result": group['value'][0]}
ds = ray.data.from_items([ 
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
    # {"group": 1, "value": [1, 1, 1, 1]},
    # {"group": 1, "value": [2, 2, 2, 2]},
    # {"group": 2, "value": [3, 3, 3, 3]},
    # {"group": 2, "value": [4, 4, 4, 4]},
])
ds.groupby('group').aggregate(fn).take()

2023-07-04 15:00:31,110	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
2023-07-04 15:00:31,112	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:00:31,114	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/4 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/4 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/4 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/4 [00:00<?, ?it/s]

Running 0:   0%|          | 0/4 [00:00<?, ?it/s]

2023-07-04 15:00:31,202	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-550, stopped daemon 140598390998592)>.


AttributeError: 'function' object has no attribute '_validate'

In [49]:
import ray
import pandas as pd
import numpy as np

def fn(group):
    return {"result": [np.vstack(group['value'])]}
ds = ray.data.from_items([ 
    # {"group": 1, "value": [[1, 1, 1, 1],[2, 2, 2, 2]]},
    # {"group": 2, "value": [[3, 3, 3, 3],[4 ,4, 4, 4]]},
    {"group": 1, "value": [1, 1, 1, 1]},
    {"group": 1, "value": [2, 2, 2, 2]},
    {"group": 2, "value": [3, 3, 3, 3]},
    {"group": 2, "value": [4, 4, 4, 4]},
])
ds.groupby('group').map_groups(fn).take()

2023-07-04 15:05:19,304	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(group_fn)]
2023-07-04 15:05:19,305	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:05:19,306	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Sort 1:   0%|          | 0/4 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/4 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/4 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/4 [00:00<?, ?it/s]

Running 0:   0%|          | 0/4 [00:00<?, ?it/s]

Shuffle Map 0:   0%|          | 0/4 [00:00<?, ?it/s]

Shuffle Reduce 0:   0%|          | 0/4 [00:00<?, ?it/s]

2023-07-04 15:05:19,524	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-620, stopped daemon 140598390998592)>.


[{'result': array([[1, 1, 1, 1],
         [2, 2, 2, 2]])},
 {'result': array([[3, 3, 3, 3],
         [4, 4, 4, 4]])}]

In [6]:
ray.data.range(100).groupby('id').sum()

Aggregate
+- Dataset(num_blocks=17, num_rows=100, schema={id: int64})

0         0
1         0
2         0
3         0
4         0
         ..
52144    72
52145    72
52146    72
52147    72
52148    72
Name: id, Length: 52149, dtype: int64

In [48]:
[data.loc[train_index, ['id', 'FSR_for_force', 'force']].to_numpy() for train_index in train_indexes]

[array([[  0.,   0.,   0., ...,  22.,   0.,   0.],
        [  0.,   0.,   0., ..., 277.,   0.,   0.],
        [  0.,   0.,   0., ..., 488.,   0.,   0.],
        ...,
        [  0.,  55.,  68., ..., 632.,   3.,  70.],
        [  0.,  57.,  70., ..., 566.,   3.,  97.],
        [  0.,  60.,  71., ..., 497.,   4., 100.]]),
 array([[1.000e+00, 2.230e+02, 2.700e+02, ..., 0.000e+00, 3.500e+01,
         0.000e+00],
        [1.000e+00, 2.270e+02, 2.890e+02, ..., 0.000e+00, 3.700e+01,
         3.000e+00],
        [1.000e+00, 2.300e+02, 3.090e+02, ..., 0.000e+00, 3.400e+01,
         0.000e+00],
        ...,
        [1.000e+00, 0.000e+00, 0.000e+00, ..., 6.250e+02, 0.000e+00,
         0.000e+00],
        [1.000e+00, 0.000e+00, 0.000e+00, ..., 8.120e+02, 0.000e+00,
         0.000e+00],
        [1.000e+00, 0.000e+00, 0.000e+00, ..., 1.024e+03, 0.000e+00,
         0.000e+00]]),
 array([[ 2.,  0.,  0., ..., 71.,  0.,  0.],
        [ 2.,  0.,  0., ..., 83.,  0.,  0.],
        [ 2.,  0.,  0., ..., 96., 

In [45]:
import ray.data
import datasource
import numpy as np
data = datasource.get_data()
train_indexes, test_indexes = datasource.get_index_splited_by_time(data)
train_ds = ray.data.from_items(np.concatenate([data.loc[train_index, ['id', 'FSR_for_force', 'force']].to_numpy() for train_index in train_indexes]))
train_ds.take()

2023-07-04 12:50:46,917	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-597, stopped daemon 139909946816064)>.


[{'item': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., 22.,  0.,  0.])},
 {'item': array([  0.,   0.,   0.,   0.,  60.,   0.,   0.,   0.,   0.,   0., 277.,
           0.,   0.])},
 {'item': array([  0.,   0.,   0.,   0.,  73.,   0.,   0.,   0.,   0.,   0., 488.,
           0.,   0.])},
 {'item': array([  0.,   0.,   0.,   0.,  84.,   0.,   0.,   0.,   0.,   0., 501.,
           0.,   0.])},
 {'item': array([  0.,   0.,   0.,   0., 100.,   0.,   0.,   0.,   0.,   0., 540.,
           0.,   0.])},
 {'item': array([  0.,   0.,   0.,   0., 121.,   0.,   0.,   0.,   0.,   0., 660.,
           0.,   0.])},
 {'item': array([  0.,   0.,   0.,   0., 146.,   0.,   0.,   0.,   0.,   0., 837.,
           0.,   0.])},
 {'item': array([  0.,   0.,   0.,   0., 172.,   0.,   0.,   0.,   0.,   0., 980.,
           0.,   0.])},
 {'item': array([   0.,    0.,    0.,    0.,  195.,    0.,    0.,    0.,    0.,
            0., 1064.,    0.,    0.])},
 {'item': array([   0.,    0.,    0.,    0.,  

In [59]:
import datasource
data = datasource.get_data()
train_indexes, test_indexes = datasource.get_index_splited_by_time(data)
for i, train_index in enumerate(train_indexes):
    data.loc[train_index, 'group'] = i
data

Category,id,time,force,force,force,force,force,force,x_coord,x_coord,...,FSR_for_force,FSR_for_force,FSR_for_force,FSR_for_coord,FSR_for_coord,FSR_for_coord,FSR_for_coord,FSR_for_coord,FSR_for_coord,group
Position,Unnamed: 1_level_1,Unnamed: 2_level_1,A,B,C,D,E,F,A,B,...,D,E,F,A,B,C,D,E,F,Unnamed: 21_level_1
0,0,0,0.0,0.0,0.0,22.0,0.0,0.0,,,...,0.0,0.0,0.0,0.0,0.0,0.000000,0.000000,0.0,0.0,0.0
1,0,1,0.0,0.0,0.0,277.0,0.0,0.0,,,...,60.0,0.0,0.0,0.0,0.0,0.000000,0.952381,0.0,0.0,0.0
2,0,2,0.0,0.0,0.0,488.0,0.0,0.0,,,...,73.0,0.0,0.0,0.0,0.0,0.000000,1.158730,0.0,0.0,0.0
3,0,3,0.0,0.0,0.0,501.0,0.0,0.0,,,...,84.0,0.0,0.0,0.0,0.0,0.000000,1.333333,0.0,0.0,0.0
4,0,4,0.0,0.0,0.0,540.0,0.0,0.0,,,...,100.0,0.0,0.0,0.0,0.0,0.000000,1.587302,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
52144,72,635,0.0,0.0,0.0,983.0,0.0,0.0,,,...,180.0,0.0,0.0,0.0,0.0,0.587302,2.857143,0.0,0.0,
52145,72,636,0.0,0.0,0.0,962.0,0.0,0.0,,,...,172.0,0.0,0.0,0.0,0.0,0.603175,2.730159,0.0,0.0,
52146,72,637,0.0,0.0,0.0,910.0,0.0,0.0,,,...,165.0,0.0,0.0,0.0,0.0,0.603175,2.619048,0.0,0.0,
52147,72,638,0.0,0.0,0.0,851.0,0.0,0.0,,,...,160.0,0.0,0.0,0.0,0.0,0.619048,2.539683,0.0,0.0,


In [1]:
import ray.data
import datasource
import numpy as np
data = datasource.get_data()
train_indexes, test_indexes = datasource.get_index_splited_by_time(data)
for i, train_index in enumerate(train_indexes):
    data.loc[train_index, 'group'] = i
train_ds = ray.data.from_items(np.concatenate([data.loc[train_index, ['FSR_for_force', 'force', 'group']].to_numpy() for train_index in train_indexes]))
def split_column(batch):
    for i in range(13):
        batch[str(i)] = batch['item'][..., i]
        batch[str(i)] = batch['item'][..., i]
    batch.pop('item')
    return batch
def combine_column(batch):
    batch['X'] = np.hstack([np.expand_dims(batch[str(i)], 1) for i in range(6)])
    batch['y'] = np.hstack([np.expand_dims(batch[str(i)], 1) for i in range(6, 12)])
    batch['group'] = batch[str(12)]
    for i in range(13):
        batch.pop(str(i))
    return batch
def group_row(group):
    return {'X': [np.vstack(group['X'])], 'y': [np.vstack(group['y'])]}
from ray.data.preprocessors import SimpleImputer, StandardScaler, BatchMapper, Chain
split_mapper = BatchMapper(split_column, batch_format='numpy')
imputer = SimpleImputer(map(str, range(12)))
scaler = StandardScaler(map(str, range(12)))
combine_mapper = BatchMapper(combine_column, batch_format='numpy')
preprocessor = Chain(split_mapper, imputer, scaler, combine_mapper)
train_ds = preprocessor.fit_transform(train_ds)

2023-07-04 15:42:54,223	INFO worker.py:1627 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m

Learn more here: https://docs.ray.io/en/master/data/faq.html#migrating-to-strict-mode[0m
2023-07-04 15:42:57,602	INFO dataset.py:2087 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-07-04 15:42:57,609	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper] -> AllToAllOperator[Aggregate]
2023-07-04 15:42:57,611	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:42:57,613	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:43:02,074	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
2023-07-04 15:43:02,077	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:43:02,082	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:43:03,230	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-10, stopped daemon 139772966188608)>.
2023-07-04 15:43:04,770	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-7, stopped daemon 139772974581312)>.
2023-07-04 15:43:04,786	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[SimpleImputer] -> AllToAllOperator[Aggregate]
2023-07-04 15:43:04,789	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:43:04,791	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:43:10,322	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[SimpleImputer]
2023-07-04 15:43:10,324	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:43:10,327	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:43:11,643	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-15, stopped daemon 139772966188608)>.
2023-07-04 15:43:13,345	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-13, stopped daemon 139772974581312)>.


In [9]:
def fn(batch):
    return batch.groupby('group').agg(list)
batch = BatchMapper(fn, batch_format='pandas', batch_size=60000)
for i in batch.fit_transform(train_ds).iter_rows():
    print(i)

2023-07-04 15:53:53,300	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[StandardScaler->BatchMapper->BatchMapper]
2023-07-04 15:53:53,302	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:53:53,304	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:53:55,218	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-313, stopped daemon 139772320278080)>.


{'X': [array([0., 0., 0., 0., 0., 0.]), array([ 0.,  0.,  0., 60.,  0.,  0.]), array([ 0.,  0.,  0., 73.,  0.,  0.]), array([ 0.,  0.,  0., 84.,  0.,  0.]), array([  0.,   0.,   0., 100.,   0.,   0.]), array([  0.,   0.,   0., 121.,   0.,   0.]), array([  0.,   0.,   0., 146.,   0.,   0.]), array([  0.,   0.,   0., 172.,   0.,   0.]), array([  0.,   0.,   0., 195.,   0.,   0.]), array([  0.,   0.,   0., 216.,   0.,   0.]), array([  0.,   0.,  36., 234.,   0.,   0.]), array([  0.,   0.,  37., 250.,   0.,   0.]), array([  0.,   0.,   0., 261.,   0.,   0.]), array([  0.,   0.,   0., 264.,   0.,   0.]), array([  0.,   0.,   0., 263.,   0.,   0.]), array([ 26.,   0.,  37., 260.,   0.,  36.]), array([ 27.,   0.,  40., 259.,   0.,  37.]), array([ 30.,   0.,  42., 253.,   0.,  39.]), array([ 34.,  42.,  44., 240.,   0.,  41.]), array([ 38.,  48.,  47., 221.,   0.,  43.]), array([ 40.,  51.,  51., 199.,   0.,  46.]), array([ 41.,  53.,  57., 175.,   0.,  50.]), array([ 42.,  55.,  62., 149.,   

In [5]:
a = pd.DataFrame(
    [[0, 1],
     [0, 2],
     [1, 3],
     [1, 4]], columns=['A', 'B']
).groupby('A').apply(lambda e: {'B':e['B']})
a

NameError: name 'pd' is not defined

TypeError: sum() takes at least 1 positional argument (0 given)

In [91]:
a = pd.DataFrame(
    [[0, 1],
     [0, 2],
     [1, 3],
     [1, 4]], columns=['A', 'B']
).groupby('A').agg(list)
a

Unnamed: 0_level_0,B
A,Unnamed: 1_level_1
0,"[1, 2]"
1,"[3, 4]"


In [None]:
def grouping(batch: pd.DataFrame):
    batch.groupby('group').

In [51]:
grouped_data = train_ds.groupby('group').map_groups(group_row)
grouped_data.take_all()

2023-07-04 15:07:39,930	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[StandardScaler->BatchMapper]
2023-07-04 15:07:39,932	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:07:39,935	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:07:47,241	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-658, stopped daemon 140598390998592)>.
2023-07-04 15:07:47,442	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(group_fn)]
2023-07-04 15:07:47,444	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:07:47,448	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Sort 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

Shuffle Map 0:   0%|          | 0/200 [00:00<?, ?it/s]

Shuffle Reduce 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:08:32,190	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-660, stopped daemon 140597683803712)>.


[{'X': array([[  0.,   0.,   0.,   0.,   0.,   0.],
         [  0.,   0.,   0.,  60.,   0.,   0.],
         [  0.,   0.,   0.,  73.,   0.,   0.],
         ...,
         [ 55.,  68.,  65., 125.,  38.,  41.],
         [ 57.,  70.,  67., 117.,  38.,  42.],
         [ 60.,  71.,  69., 108.,  38.,  43.]]),
  'y': array([[  0.,   0.,   0.,  22.,   0.,   0.],
         [  0.,   0.,   0., 277.,   0.,   0.],
         [  0.,   0.,   0., 488.,   0.,   0.],
         ...,
         [ 53., 242., 390., 632.,   3.,  70.],
         [ 54., 278., 438., 566.,   3.,  97.],
         [ 57., 270., 455., 497.,   4., 100.]])},
 {'X': array([[223., 270., 179.,   0.,  44.,   0.],
         [227., 289., 187.,   0.,  46.,   0.],
         [230., 309., 191.,   0.,  49.,   0.],
         ...,
         [  0.,   0.,   0., 180.,   0.,   0.],
         [  0.,   0.,   0., 220.,   0.,   0.],
         [  0.,   0.,   0., 258.,   0.,   0.]]),
  'y': array([[ 232.,  941.,  758.,    0.,   35.,    0.],
         [ 251.,  974.,  792.,  

In [54]:
grouped_data.count()

2023-07-04 15:12:39,556	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper->SimpleImputer->StandardScaler->BatchMapper] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(group_fn)]
2023-07-04 15:12:39,559	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 15:12:39,561	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Sort 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

Shuffle Map 0:   0%|          | 0/200 [00:00<?, ?it/s]

Shuffle Reduce 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 15:13:30,070	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-721, stopped daemon 140597675411008)>.


73

In [3]:
for e in grouped_data.iter_rows():
    print(f'{e["X"] = }')
    # print(f'{e["y"] = }')
    # print(f'{e["group"] = }')
    # print(f'{y = }')
    break

2023-07-04 14:27:52,096	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper->SimpleImputer->StandardScaler->BatchMapper] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(group_fn)]
2023-07-04 14:27:52,099	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 14:27:52,102	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Sort 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

Shuffle Map 0:   0%|          | 0/200 [00:00<?, ?it/s]

Shuffle Reduce 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 14:28:41,494	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-33, stopped daemon 140598382605888)>.


e["X"] = array([0., 0., 0., 0., 0., 0.])


In [4]:
grouped_data

MapBatches(group_fn)
+- Sort
   +- BatchMapper
      +- StandardScaler
         +- SimpleImputer
            +- BatchMapper
               +- Dataset(
                     num_blocks=200,
                     num_rows=39079,
                     schema={item: numpy.ndarray(shape=(13,), dtype=double)}
                  )

In [11]:
train_ds.take(0)[0]['0'].shape

(423,)

In [None]:
import ray.data
import datasource
import numpy as np
data = datasource.get_data()
train_indexes, test_indexes = datasource.get_index_splited_by_time(data)
train_ds = ray.data.from_items(np.concatenate([data.loc[train_index, ['FSR_for_force', 'force']].to_numpy() for train_index in train_indexes]))
def split_column(batch):
    for i in range(12):
        batch[str(i)] = batch['item'][..., i]
        batch[str(i)] = batch['item'][..., i]
    batch.pop('item')
    return batch
def combine_column(batch):
    batch['X'] = np.hstack([np.expand_dims(batch[str(i)], 1) for i in range(6)])
    batch['y'] = np.hstack([np.expand_dims(batch[str(i)], 1) for i in range(6, 12)])
    for i in range(12):
        batch.pop(str(i))
    return batch
from ray.data.preprocessors import SimpleImputer, StandardScaler, BatchMapper, Chain
split_mapper = BatchMapper(split_column, batch_format='numpy')
imputer = SimpleImputer(map(str, range(12)))
scaler = StandardScaler(map(str, range(12)))
combine_mapper = BatchMapper(combine_column, batch_format='numpy')
preprocessor = Chain(split_mapper)
# preprocessor = Chain(split_mapper, imputer, scaler, combine_mapper)
train_ds = preprocessor.fit_transform(train_ds)
train_ds.schema()

In [30]:
import ray.data
import datasource
import numpy as np
data = datasource.get_data()
train_indexes, test_indexes = datasource.get_index_splited_by_time(data)
train_ds = ray.data.from_items(np.concatenate([data.loc[train_index, ['FSR_for_force', 'force']].to_numpy() for train_index in train_indexes]))
def split_column(batch):
    for i in range(12):
        batch[str(i)] = batch['item'][..., i]
        batch[str(i)] = batch['item'][..., i]
    batch.pop('item')
    return batch
def combine_column(batch):
    batch['X'] = np.hstack([np.expand_dims(batch[str(i)], 1) for i in range(6)])
    batch['y'] = np.hstack([np.expand_dims(batch[str(i)], 1) for i in range(6, 12)])
    for i in range(12):
        batch.pop(str(i))
    return batch
from ray.data.preprocessors import SimpleImputer, StandardScaler, BatchMapper, Chain
split_mapper = BatchMapper(split_column, batch_format='numpy')
imputer = SimpleImputer(map(str, range(12)))
scaler = StandardScaler(map(str, range(12)))
combine_mapper = BatchMapper(combine_column, batch_format='numpy')
preprocessor = Chain(split_mapper, imputer, scaler, combine_mapper)
train_ds = preprocessor.fit_transform(train_ds)
train_ds.schema()

2023-07-04 14:46:25,190	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper] -> AllToAllOperator[Aggregate]
2023-07-04 14:46:25,192	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 14:46:25,192	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 14:46:27,752	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
2023-07-04 14:46:27,753	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 14:46:27,755	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 14:46:28,757	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-360, stopped daemon 140598399391296)>.
2023-07-04 14:46:31,117	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-356, stopped daemon 140598390998592)>.
2023-07-04 14:46:31,144	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[SimpleImputer] -> AllToAllOperator[Aggregate]
2023-07-04 14:46:31,147	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 14:46:31,150	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 14:46:38,036	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[SimpleImputer]
2023-07-04 14:46:38,038	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 14:46:38,040	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 14:46:39,753	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-365, stopped daemon 140598399391296)>.
2023-07-04 14:46:41,918	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-363, stopped daemon 140598390998592)>.
2023-07-04 14:46:41,955	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[StandardScaler->BatchMapper]
2023-07-04 14:46:41,957	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 14:46:41,960	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 14:46:48,533	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-368, stopped daemon 140598390998592)>.


Column  Type
------  ----
X       numpy.ndarray(shape=(6,), dtype=double)
y       numpy.ndarray(shape=(6,), dtype=double)

In [15]:
train_ds.take()[0]

{'0': 0.0,
 '1': 0.0,
 '2': 0.0,
 '3': 0.0,
 '4': 0.0,
 '5': 0.0,
 '6': 0.0,
 '7': 0.0,
 '8': 0.0,
 '9': 22.0,
 '10': 0.0,
 '11': 0.0}

In [81]:
import ray.data
import datasource
data = datasource.get_data()
train_indexes, test_indexes = datasource.get_index_splited_by_time(data)
train_ds = ray.data.from_items(np.concatenate([data.loc[train_index, ['FSR_for_force', 'force']].to_numpy() for train_index in train_indexes]))
# test_ds = ray.data.from_items([data.loc[test_index, ['FSR_for_force', 'force']] for test_index in test_indexes])
def rename_column(batch):
    batch['X'] = batch['item'][..., :6]
    batch['y'] = batch['item'][..., 6:]
    batch.pop('item')
    return batch
train_ds = train_ds.map_batches(rename_column)
test_ds = test_ds.map_batches(rename_column)
from ray.data.preprocessors import SimpleImputer, StandardScaler
scaler = StandardScaler(['item'])
scaler.fit_transform(train_ds)
train_ds


2023-07-04 01:29:24,451	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(rename_column)] -> AllToAllOperator[Aggregate]
2023-07-04 01:29:24,454	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 01:29:24,455	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Aggregate 1:   0%|          | 0/200 [00:00<?, ?it/s]

SortSample 2:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleMap 3:   0%|          | 0/200 [00:00<?, ?it/s]

ShuffleReduce 4:   0%|          | 0/200 [00:00<?, ?it/s]

Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 01:29:27,535	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(rename_column)]
2023-07-04 01:29:27,537	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-04 01:29:27,539	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2023-07-04 01:29:28,779	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-3472, stopped daemon 140084528522816)>.
2023-07-04 01:29:28,937	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-3470, stopped daemon 140083982235200)>.


ValueError: The column 'item' does not exist in the schema 'Column  Type
------  ----
X       numpy.ndarray(shape=(6,), dtype=double)
y       numpy.ndarray(shape=(6,), dtype=double)'.

In [49]:
from ray.air.config import ScalingConfig, RunConfig, CheckpointConfig
from ray.air.integrations.wandb import WandbLoggerCallback
from ray.train.torch import TorchTrainer
from ray.tune.stopper import TrialPlateauStopper, ExperimentPlateauStopper, CombinedStopper
from ray.data.preprocessors import Chain, SimpleImputer, MaxAbsScaler, MinMaxScaler, PowerTransformer, RobustScaler, StandardScaler

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker, 
    train_loop_config={
        'model':'fsr_model.LSTM',
        'model_args':{
            'hidden_size':8,
            'num_layer':1,
        },
        'criterion':'torch.nn.MSELoss',
        'optimizer':'torch.optim.Adam',
        'optimizer_args':{
            'lr': 1e-3,
        },
        'scaler':'sklearn.preprocessing.StandardScaler',
    },
    scaling_config=ScalingConfig(
        num_workers=2,
        use_gpu=False,
    ),
    run_config=RunConfig(
        # callbacks=[
        #     WandbLoggerCallback(project='FSR-prediction'),
        # ],
        stop=CombinedStopper(
            TrialPlateauStopper(metric='rmse'),
            ExperimentPlateauStopper(metric='rmse'),
        ),
        checkpoint_config=CheckpointConfig(
            num_to_keep=3,
            checkpoint_score_attribute='rmse',
            checkpoint_score_order='min',
        ),
    ),
    datasets={
        'train':train_ds,
        'test':test_ds,
    },
    preprocessor=Chain(
        SimpleImputer(strategy='mean', ),
        StandardScaler(),
    ),
)
result = trainer.fit()
print(f"Last result: {result.metrics}")

TypeError: SimpleImputer.__init__() missing 1 required positional argument: 'columns'

In [2]:
from glob import glob
import pandas as pd
paths = glob('./data/*/*/*')
filedata = pd.DataFrame([path.split('/')[2:] for path in paths], columns=['subject', 'pose', 'filename'])
filedata['path'] = paths
filedata = filedata.sort_values(['subject', 'pose']).reset_index(drop=True)
filedata
data = []
for index, value in filedata.iterrows():
    df = pd.read_pickle(value['path'])
    df = df.rename_axis('time').reset_index()
    df['id'] = index
    cols = df.columns.to_list()
    df = df[cols[-1:] + cols[:-1]]
    data.append(df)
data = pd.concat(data)
data = data.reset_index(drop=True)
data

Category,id,time,force,force,force,force,force,force,x_coord,x_coord,...,FSR_for_force,FSR_for_force,FSR_for_force,FSR_for_force,FSR_for_coord,FSR_for_coord,FSR_for_coord,FSR_for_coord,FSR_for_coord,FSR_for_coord
Position,Unnamed: 1_level_1,Unnamed: 2_level_1,A,B,C,D,E,F,A,B,...,C,D,E,F,A,B,C,D,E,F
0,0,0,0.0,0.0,0.0,22.0,0.0,0.0,,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,0.000000,0.0,0.0
1,0,1,0.0,0.0,0.0,277.0,0.0,0.0,,,...,0.0,60.0,0.0,0.0,0.0,0.0,0.000000,0.952381,0.0,0.0
2,0,2,0.0,0.0,0.0,488.0,0.0,0.0,,,...,0.0,73.0,0.0,0.0,0.0,0.0,0.000000,1.158730,0.0,0.0
3,0,3,0.0,0.0,0.0,501.0,0.0,0.0,,,...,0.0,84.0,0.0,0.0,0.0,0.0,0.000000,1.333333,0.0,0.0
4,0,4,0.0,0.0,0.0,540.0,0.0,0.0,,,...,0.0,100.0,0.0,0.0,0.0,0.0,0.000000,1.587302,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
52144,72,635,0.0,0.0,0.0,983.0,0.0,0.0,,,...,37.0,180.0,0.0,0.0,0.0,0.0,0.587302,2.857143,0.0,0.0
52145,72,636,0.0,0.0,0.0,962.0,0.0,0.0,,,...,38.0,172.0,0.0,0.0,0.0,0.0,0.603175,2.730159,0.0,0.0
52146,72,637,0.0,0.0,0.0,910.0,0.0,0.0,,,...,38.0,165.0,0.0,0.0,0.0,0.0,0.603175,2.619048,0.0,0.0
52147,72,638,0.0,0.0,0.0,851.0,0.0,0.0,,,...,39.0,160.0,0.0,0.0,0.0,0.0,0.619048,2.539683,0.0,0.0


In [3]:
import torch

class FSRDataset(torch.utils.data.Dataset):
    def __init__(self, X_df, y_df, index):
        assert(len(X_df) == len(y_df))
        self.X_df = X_df
        self.y_df = y_df
        self.index = index

    def __len__(self):
        return len(self.index)
    
    def __getitem__(self, idx):
        import numpy as np
        X = self.X_df.loc[self.index[idx]].to_numpy().astype(np.float32)
        y = self.y_df.loc[self.index[idx]].to_numpy().astype(np.float32)
        return X, y

In [4]:
def get_index_splited_by_time(test_size=None):
    from sklearn.model_selection import train_test_split
    train_indexes = []
    test_indexes = []
    for _, group in data.groupby('id'):
        train_index, test_index = train_test_split(group.index, test_size=0.2, shuffle=False)
        train_indexes.append(train_index)
        test_indexes.append(test_index)
    return train_indexes, test_indexes

In [5]:
class LSTM(torch.nn.Module):
    def __init__(self, input_size, hidden_size, num_layer, output_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layer = num_layer
        self.output_size = output_size
        self.encoder = torch.nn.LSTM(input_size, hidden_size, num_layer)
        self.decoder = torch.nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        x, _ = self.encoder(x)
        x = self.decoder(x)
        return x

In [6]:
import torch
from ray.air import session
from ray.air.config import ScalingConfig
from sklearn.metrics import mean_absolute_error, mean_squared_error
import ray.train.torch
import numpy as np
import random
import model as net
import torch.utils.data

def train_loop_per_worker(config):
    ray.train.torch.enable_reproducibility()
    model_name = config['model_name']
    model_args = config['model_args']
    num_epochs = config['num_epochs']
    criterion_name = config['criterion_name']
    optimizer_name = config['optimizer_name']
    lr = config['lr']

    model = getattr(net, model_name)(**model_args)
    model = ray.train.torch.prepare_model(model)
    criterion = getattr(torch.nn, criterion_name)()
    optimizer = getattr(torch.optim, optimizer_name)(model.parameters(), lr=lr)

    train_index, test_index = get_index_splited_by_time()
    train_dataset = FSRDataset(data['FSR_for_force'], data['force'], train_index)
    test_dataset = FSRDataset(data['FSR_for_force'], data['force'], test_index)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=None)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=None)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)
    test_loader = ray.train.torch.prepare_data_loader(test_loader)

    best_rmse = float('inf')
    
    for epoch in range(num_epochs):
        model.train()
        criterion.train()
        for X, y in train_loader:
            pred = model(X)
            loss = criterion(pred, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        criterion.eval()
        with torch.no_grad():
            mae = []
            mse = []
            num = []
            for X, y in test_loader:
                pred = model(X)
                mae.append(mean_absolute_error(y, pred.cpu().detach()))
                mse.append(mean_squared_error(y, pred.cpu().detach()))
                num.append(len(y))
            mae = np.array(mae)
            mse = np.array(mse)
            num = np.array(num)
            mae = (mae * num).sum() / sum(num)
            mse = (mse * num).sum() / sum(num)
            
            rmse = mse ** 0.5
            if rmse < best_rmse:
                best_rmse = rmse

            session.report({'MAE': mae, 'RMSE': rmse})
                
    return best_rmse

trainer = ray.train.torch.TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={
        'batch_size':128,
        'lr':0.0001,
        'model_name':'LSTM',
        'model_args':{
            'input_size':6, 
            'hidden_size':128, 
            'num_layer':4, 
            'output_size':6,
        },
        'num_epochs': 32,
        'criterion_name': 'MSELoss',
        'optimizer_name': 'Adam',
    },
    scaling_config=ScalingConfig(
        num_workers=3,
        use_gpu=False,
        trainer_resources={'CPU':0}
    ),
)

In [7]:
trainer.fit()

2023-06-28 10:26:22,595	INFO worker.py:1627 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
2023-06-28 10:26:23,797	INFO tune.py:226 -- Initializing Ray automatically. For cluster usage or custom Ray initialization, call `ray.init(...)` before `Trainer(...)`.


0,1
Current time:,2023-06-28 10:28:10
Running for:,00:01:46.36
Memory:,4.4/7.7 GiB

Trial name,status,loc,iter,total time (s),MAE,RMSE
TorchTrainer_cb57c_00000,TERMINATED,172.26.215.93:84502,32,100.355,227.397,525.853


[2m[36m(TorchTrainer pid=84502)[0m 2023-06-28 10:26:29,797	INFO backend_executor.py:137 -- Starting distributed worker processes: ['84566 (172.26.215.93)', '84567 (172.26.215.93)', '84568 (172.26.215.93)']
[2m[36m(RayTrainWorker pid=84566)[0m 2023-06-28 10:26:31,419	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=3]
[2m[36m(RayTrainWorker pid=84566)[0m 2023-06-28 10:26:33,295	INFO train_loop_utils.py:286 -- Moving model to device: cpu
[2m[36m(RayTrainWorker pid=84566)[0m 2023-06-28 10:26:33,296	INFO train_loop_utils.py:346 -- Wrapping provided model in DistributedDataParallel.


Trial name,MAE,RMSE,date,done,experiment_tag,hostname,iterations_since_restore,node_ip,pid,time_since_restore,time_this_iter_s,time_total_s,timestamp,training_iteration,trial_id
TorchTrainer_cb57c_00000,227.397,525.853,2023-06-28_10-28-08,True,0,DESKTOP-0P789CI,32,172.26.215.93,84502,100.355,2.81067,100.355,1687915688,32,cb57c_00000


2023-06-28 10:28:10,209	INFO tune.py:1111 -- Total run time: 106.41 seconds (106.36 seconds for the tuning loop).


Result(
  metrics={'MAE': 227.39678746011765, 'RMSE': 525.8533704863545, 'done': True, 'trial_id': 'cb57c_00000', 'experiment_tag': '0'},
  path='/home/seokj/ray_results/TorchTrainer_2023-06-28_10-26-20/TorchTrainer_cb57c_00000_0_2023-06-28_10-26-23',
  checkpoint=None
)