**텐서플로에서 데이터 적재와 전처리하기**

# 설정

먼저 몇 개의 모듈을 임포트한다. 맷플롯립 그림을 저장하는 함수를 준비한다.

In [1]:
# 공통 모듈 임포트
import matplotlib as mpl
import os
import matplotlib.pyplot as plt

# 깔끔한 그래프 출력을 위해
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
# 그림을 저장할 위치
PROJECT_ROOT_DIR = '.'
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, 'images')
os.makedirs(IMAGES_PATH, exist_ok=True)


def save_fig(fig_id, tight_layout=True, fig_extension='png', resolution=300):
    path = os.path.join(IMAGES_PATH, f'{fig_id}.{fig_extension}')
    print(f'그림 저장: {fig_id}')
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

## 데이터셋

In [2]:
import tensorflow as tf

X = tf.range(10)
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

In [3]:
dataset = tf.data.Dataset.range(10)

In [4]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


In [5]:
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int64)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int64)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int64)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int64)
tf.Tensor([8 9], shape=(2,), dtype=int64)


In [6]:
dataset = dataset.map(lambda x: x * 2)

In [7]:
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int64)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int64)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int64)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int64)
tf.Tensor([16 18], shape=(2,), dtype=int64)


In [8]:
dataset = dataset.unbatch()

In [9]:
dataset = dataset.filter(lambda x: x < 10)  # < 10 항목만 유지

In [10]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)


In [11]:
dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size=3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 3 4 5 1 2 8], shape=(7,), dtype=int64)
tf.Tensor([9 6 0 7 1 4 2], shape=(7,), dtype=int64)
tf.Tensor([6 7 5 9 3 0 1], shape=(7,), dtype=int64)
tf.Tensor([3 8 2 5 7 4 8], shape=(7,), dtype=int64)
tf.Tensor([9 6], shape=(2,), dtype=int64)


## 캘리포니아 주택 데이터셋을 여러 개의 CSV로 나누기

캘리포니아 주택 데이터셋을 로드하고 준비한다. 먼저 로드한 다음 훈련 세트, 검증 세트, 테스트 세트로 나눈다. 마지막으로 스케일을 조정한다:

In [12]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target.reshape(-1, 1))
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full)
scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

메모리에 맞지 않는 매우 큰 데이터셋인 경우 일반적으로 먼저 여러 개의 파일로 나누고 텐서플로에서 이 파일들을 병렬로 읽게한다. 데모를 위해 주택 데이터셋을 20개의 CSV 파일로 나누어 본다:

In [13]:
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join('datasets', 'housing')
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, 'my_{}_{:02d}.csv')
    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, 'wt', encoding='utf-8') as f:
            if header is not None:
                f.write(header)
                f.write('\n')
            for row_idx in row_indices:
                f.write(','.join([repr(col) for col in data[row_idx]]))
                f.write('\n')
    return filepaths

In [14]:
import numpy as np

train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ['MedianHouseValue']
header = ','.join(header_cols)
train_filepaths = save_to_multiple_csv_files(train_data, 'train', header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, 'valid', header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, 'test', header, n_parts=10)

좋다. 이 CSV 파일 중 하나에서 몇 줄을 출력해 본다:

In [15]:
import pandas as pd

pd.read_csv(train_filepaths[0]).head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
0,3.5781,33.0,6.641509,1.287736,520.0,2.45283,38.34,-122.4,2.425
1,5.8309,52.0,6.477876,0.933628,603.0,2.668142,34.18,-118.13,3.091
2,2.3768,18.0,4.906,1.074,1503.0,3.006,34.07,-117.43,0.953
3,6.2488,16.0,5.461538,0.903134,1324.0,3.77208,33.71,-117.9,2.52
4,3.6397,33.0,4.464286,1.05042,1672.0,3.512605,33.97,-118.11,1.666


텍스트 파일로 읽으면 다음과 같다:

In [16]:
with open(train_filepaths[0]) as f:
    for _ in range(5):
        print(f.readline(), end='')

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5781,33.0,6.6415094339622645,1.2877358490566038,520.0,2.452830188679245,38.34,-122.4,2.425
5.8309,52.0,6.477876106194691,0.9336283185840708,603.0,2.668141592920354,34.18,-118.13,3.091
2.3768,18.0,4.906,1.074,1503.0,3.006,34.07,-117.43,0.953
6.2488,16.0,5.461538461538462,0.9031339031339032,1324.0,3.7720797720797723,33.71,-117.9,2.52


In [17]:
train_filepaths

['datasets\\housing\\my_train_00.csv',
 'datasets\\housing\\my_train_01.csv',
 'datasets\\housing\\my_train_02.csv',
 'datasets\\housing\\my_train_03.csv',
 'datasets\\housing\\my_train_04.csv',
 'datasets\\housing\\my_train_05.csv',
 'datasets\\housing\\my_train_06.csv',
 'datasets\\housing\\my_train_07.csv',
 'datasets\\housing\\my_train_08.csv',
 'datasets\\housing\\my_train_09.csv',
 'datasets\\housing\\my_train_10.csv',
 'datasets\\housing\\my_train_11.csv',
 'datasets\\housing\\my_train_12.csv',
 'datasets\\housing\\my_train_13.csv',
 'datasets\\housing\\my_train_14.csv',
 'datasets\\housing\\my_train_15.csv',
 'datasets\\housing\\my_train_16.csv',
 'datasets\\housing\\my_train_17.csv',
 'datasets\\housing\\my_train_18.csv',
 'datasets\\housing\\my_train_19.csv']

## 입력 파이프라인 만들기

In [18]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths)

In [19]:
for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'datasets\\housing\\my_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_15.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_11.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_03.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_18.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_13.csv', sh

In [20]:
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1), cycle_length=n_readers
)

In [21]:
for line in dataset.take(5):
    print(line.numpy())

b'4.1375,38.0,5.095070422535211,1.0316901408450705,1042.0,3.6690140845070425,34.03,-118.14,2.115'
b'3.6389,42.0,3.994296577946768,0.9961977186311787,1212.0,2.3041825095057034,37.34,-121.92,2.392'
b'2.8061,14.0,4.8798076923076925,1.0721153846153846,1974.0,1.5817307692307692,33.66,-117.97,1.803'
b'7.8627,24.0,7.1451149425287355,1.0186781609195403,1692.0,2.4310344827586206,37.26,-122.04,5.00001'
b'7.9213,44.0,6.44811320754717,0.9787735849056604,1057.0,2.4929245283018866,34.13,-118.18,4.778'


네 번째 필드의 4는 문자열로 해석된다:

In [22]:
record_defaults = [0, np.nan, tf.constant(np.nan, dtype=tf.float64), 'Hello', tf.constant([])]
parsed_fields = tf.io.decode_csv('1,2,3,4,5', record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: shape=(), dtype=float64, numpy=3.0>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'4'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

누락된 값은 제공된 기본값으로 대체된다:

In [23]:
parsed_fields = tf.io.decode_csv(',,,,5', record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'Hello'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

다섯 번째 필드는 필수이다(기본값을 `tf.constant([])`로 지정했기 때문에). 따라서 값을 전달하지 않으면 예외가 발생한다:

In [24]:
try:
    parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

{{function_node __wrapped__DecodeCSV_OUT_TYPE_5_device_/job:localhost/replica:0/task:0/device:CPU:0}} Field 4 is required but missing in record 0! [Op:DecodeCSV]


필드 개수는 `record_defaults`에 있는 필드 개수와 정확히 맞아야 한다:

In [25]:
try:
    parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

{{function_node __wrapped__DecodeCSV_OUT_TYPE_5_device_/job:localhost/replica:0/task:0/device:CPU:0}} Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


In [26]:
n_inputs = 8  # X_train.shape[-1]


@tf.function
def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y

In [27]:
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.1713626 ,  1.2126731 , -0.04916688, -0.4360729 , -0.49995568,
        -0.06143444,  0.85889053, -1.3097554 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

In [28]:
def csv_reader_dataset(
        filepaths,
        repeat=1,
        n_readers=5,
        n_read_threads=None,
        shuffle_buffer_size=10000,
        n_parse_threads=5,
        batch_size=32
):
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers,
        num_parallel_calls=n_read_threads
    )
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(1)

In [29]:
train_set = csv_reader_dataset(train_filepaths, batch_size=3)
for X_batch, y_batch in train_set.take(2):
    print('X =', X_batch)
    print('y =', y_batch)
    print()

X = tf.Tensor(
[[-0.92626244 -1.4098945   8.284036    9.780367   -0.92926496 -0.08102798
   0.22794043  0.51490444]
 [-0.03680772 -1.7277814   0.18190964 -0.20809142  0.7485458  -0.01352079
   1.6487447  -1.0206043 ]
 [ 0.44563833  0.6563708  -0.13266188 -0.33095726 -0.13110015  0.01380935
   0.9009539  -1.3596127 ]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[0.991]
 [1.107]
 [2.718]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[-0.31191775 -0.93306404 -0.2022773  -0.25429323 -0.45702475 -0.07035509
   1.307565   -1.5440706 ]
 [ 3.358875    0.2590121   1.0735925  -0.19028421  0.5978495  -0.0206938
   1.0598596  -1.2798442 ]
 [ 0.39944795 -0.37676182  0.36133352 -0.09664767  0.13261841 -0.05216635
  -0.8470095   0.75420606]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[2.027  ]
 [5.00001]
 [2.134  ]], shape=(3, 1), dtype=float32)



In [30]:
train_set = csv_reader_dataset(train_filepaths, repeat=None)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [31]:
from tensorflow import keras

keras.backend.clear_session()
model = keras.models.Sequential(
    [keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]), keras.layers.Dense(1)]
)

In [32]:
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=1e-3))

In [33]:
batch_size = 32
model.fit(train_set, steps_per_epoch=len(X_train) // batch_size, epochs=10, validation_data=valid_set)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x26e9781e0b0>

In [34]:
model.evaluate(test_set, steps=len(X_test) // batch_size)



0.5311558842658997

In [35]:
new_set = test_set.map(lambda X, y: X)  # 대신 test_set을 전달할 수 있다. Keras는 레이블을 무시한다.
X_new = X_test
model.predict(new_set, steps=len(X_new) // batch_size)



array([[2.032022 ],
       [1.4179717],
       [2.871379 ],
       ...,
       [2.464898 ],
       [2.1004734],
       [2.102226 ]], dtype=float32)

In [36]:
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
n_epochs = 5
batch_size = 32
n_steps_per_epoch = len(X_train) // batch_size
total_steps = n_epochs * n_steps_per_epoch
global_step = 0
for X_batch, y_batch in train_set.take(total_steps):
    global_step += 1
    print(f'\rGlobal step {global_step}/{total_steps}', end='')
    with tf.GradientTape() as tape:
        y_pred = model(X_batch)
        main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
        loss = tf.add_n([main_loss] + model.losses)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

Global step 1810/1810

In [37]:
keras.backend.clear_session()

In [38]:
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error


@tf.function
def train(model, n_epochs, batch_size=32, n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
    train_set = csv_reader_dataset(
        train_filepaths,
        repeat=n_epochs,
        n_readers=n_readers,
        n_read_threads=n_read_threads,
        shuffle_buffer_size=shuffle_buffer_size,
        n_parse_threads=n_parse_threads,
        batch_size=batch_size
    )
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))


train(model, 5)

In [46]:
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error


@tf.function
def train(model, n_epochs, batch_size=32, n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
    train_set = csv_reader_dataset(
        train_filepaths,
        repeat=n_epochs,
        n_readers=n_readers,
        n_read_threads=n_read_threads,
        shuffle_buffer_size=shuffle_buffer_size,
        n_parse_threads=n_parse_threads,
        batch_size=batch_size
    )
    n_steps_per_epoch = len(X_train) // batch_size
    total_steps = n_epochs * n_steps_per_epoch
    global_step = 0
    for X_batch, y_batch in train_set.take(total_steps):
        global_step += 1
        if tf.equal(global_step % 100, 0):
            tf.print("\rGlobal step", global_step, "/", total_steps)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))


train(model, 5)

Global step 100 / 1810
Global step 200 / 1810
Global step 300 / 1810
Global step 400 / 1810
Global step 500 / 1810
Global step 600 / 1810
Global step 700 / 1810
Global step 800 / 1810
Global step 900 / 1810
Global step 1000 / 1810
Global step 1100 / 1810
Global step 1200 / 1810
Global step 1300 / 1810
Global step 1400 / 1810
Global step 1500 / 1810
Global step 1600 / 1810
Global step 1700 / 1810
Global step 1800 / 1810


`Dataset` 클래스에 있는 메서드의 간략한 설명이다:

In [73]:
for m in dir(tf.data.Dataset):
    if not (m.startswith('_') or m.endswith('_')):
        func = getattr(tf.data.Dataset, m)
        if hasattr(func, '__doc__'):
            print('● {:21s}{}'.format(m + '()', func.__doc__.split('\n')[0]))

● apply()              Applies a transformation function to this dataset.
● as_numpy_iterator()  Returns an iterator which converts all elements of the dataset to numpy.
● batch()              Combines consecutive elements of this dataset into batches.
● bucket_by_sequence_length()A transformation that buckets elements in a `Dataset` by length.
● cache()              Caches the elements in this dataset.
● cardinality()        Returns the cardinality of the dataset, if known.
● choose_from_datasets()Creates a dataset that deterministically chooses elements from `datasets`.
● concatenate()        Creates a `Dataset` by concatenating the given dataset with this dataset.
● element_spec()       The type specification of an element of this dataset.
● enumerate()          Enumerates the elements of this dataset.
● filter()             Filters this dataset according to `predicate`.
● flat_map()           Maps `map_func` across this dataset and flattens the result.
● from_generator()     Create