In [1]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import utils

## Split the California housing data into multiple CSV files

In [2]:
# The CSV shards written below live under this directory; create it up front.
DATA_DIR = './data'
os.makedirs(name=DATA_DIR, exist_ok=True)

In [3]:
housing = fetch_california_housing()

# First hold out the test set, then split what remains into train / validation.
x_train, x_test, y_train, y_test = train_test_split(
    housing.data, housing.target, random_state=42
)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, random_state=42)

for split_name, (split_x, split_y) in (
    ("train", (x_train, y_train)),
    ("val", (x_val, y_val)),
    ("test", (x_test, y_test)),
):
    print(f"x_{split_name}.shape = {split_x.shape}, y_{split_name}.shape = {split_y.shape}")

x_train.shape = (11610, 8), y_train.shape = (11610,)
x_val.shape = (3870, 8), y_val.shape = (3870,)
x_test.shape = (5160, 8), y_test.shape = (5160,)


In [4]:
def save_x_y_to_multiple_csv_files(x, y, name_prefix, num_parts, columns=None):
    """Write features ``x`` and target ``y`` side by side into ``num_parts`` CSV files.

    Args:
        x: feature matrix, one row per sample.
        y: target vector with the same number of rows as ``x``.
        name_prefix: path prefix for the output files; a ``_<index>.csv`` suffix is appended.
        num_parts: number of CSV files to split the rows across.
        columns: optional column names (feature names followed by the target name).
            When omitted, the California housing feature names plus
            ``'MedianHouseValue'`` are used, exactly as before.

    Returns:
        List of the written file paths, in part order.
    """
    if columns is None:
        # list() so this also works if feature_names is a NumPy array rather than a list.
        columns = list(housing.feature_names) + ['MedianHouseValue']
    df_data = pd.DataFrame(np.c_[x, y], columns=columns)
    return save_to_multiple_csv_files(df_data, name_prefix, num_parts=num_parts)
    
def save_to_multiple_csv_files(df_data, name_prefix, num_parts):
    """Split ``df_data`` row-wise into ``num_parts`` roughly equal CSV files.

    Files are named ``{name_prefix}_{idx}.csv`` with ``idx`` zero-padded to at
    least two digits (more when ``num_parts`` > 100), so the names sort in part
    order. The parent directory of ``name_prefix`` is created if it is missing.

    Args:
        df_data: DataFrame to write; its index is not saved.
        name_prefix: path prefix for the output files.
        num_parts: number of files to write (must be >= 1). If it exceeds the
            number of rows, the trailing files contain only the header.

    Returns:
        List of the written file paths, in part order.

    Raises:
        ValueError: if ``num_parts`` is less than 1.
    """
    if num_parts < 1:
        raise ValueError(f"num_parts must be >= 1, got {num_parts}")

    parent_dir = os.path.dirname(name_prefix)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Pad wide enough that lexicographic file order matches numeric part order.
    width = max(2, len(str(num_parts - 1)))

    file_paths = []
    # Split positional row indices instead of the DataFrame itself:
    # np.array_split on a DataFrame is deprecated in recent pandas.
    row_chunks = np.array_split(np.arange(len(df_data)), num_parts)
    for idx, row_positions in enumerate(row_chunks):
        file_path = f'{name_prefix}_{idx:0{width}d}.csv'
        df_data.iloc[row_positions].to_csv(file_path, index=False)
        file_paths.append(file_path)
    return file_paths

In [5]:
# Shard each split into CSV files under DATA_DIR (instead of repeating the literal path);
# the training split gets more shards than validation / test.
train_files = save_x_y_to_multiple_csv_files(x_train, y_train, f'{DATA_DIR}/train', num_parts=20)
val_files = save_x_y_to_multiple_csv_files(x_val, y_val, f'{DATA_DIR}/val', num_parts=10)
test_files = save_x_y_to_multiple_csv_files(x_test, y_test, f'{DATA_DIR}/test', num_parts=10)

In [6]:
# Display the training shard paths returned by save_x_y_to_multiple_csv_files.
train_files

['./data/train_00.csv',
 './data/train_01.csv',
 './data/train_02.csv',
 './data/train_03.csv',
 './data/train_04.csv',
 './data/train_05.csv',
 './data/train_06.csv',
 './data/train_07.csv',
 './data/train_08.csv',
 './data/train_09.csv',
 './data/train_10.csv',
 './data/train_11.csv',
 './data/train_12.csv',
 './data/train_13.csv',
 './data/train_14.csv',
 './data/train_15.csv',
 './data/train_16.csv',
 './data/train_17.csv',
 './data/train_18.csv',
 './data/train_19.csv']

## File path dataset

In [7]:
# Project helper from the local `utils` module (not shown here); presumably clears
# Keras/TF state and re-seeds RNGs — confirm in utils.py.
utils.reset_session()

In [8]:
# list_files shuffles the shard order; the fixed seed makes that order reproducible.
filepath_dataset = tf.data.Dataset.list_files(train_files, seed=42)
for path_tensor in iter(filepath_dataset):
    print(path_tensor)

tf.Tensor(b'.\\data\\train_15.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_08.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_03.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_01.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_10.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_05.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_19.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_16.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_02.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_09.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_00.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_07.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_12.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_04.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_17.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_11.csv', shape=(), dtype=string)
tf.Tensor(b'.\\data\\train_14.csv', shap

In [9]:
num_readers = 5


def read_csv_body(file_path):
    """Stream one CSV file line by line, dropping its header row."""
    return tf.data.TextLineDataset(file_path).skip(1)


# Read from `num_readers` files at a time, cycling between them line by line.
dataset = filepath_dataset.interleave(read_csv_body, cycle_length=num_readers)

for line in dataset.take(5):
    print(line.numpy())

b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504'
b'8.72,44.0,6.163179916317992,1.0460251046025104,668.0,2.794979079497908,34.2,-118.18,4.159'
b'3.8456,35.0,5.461346633416459,0.9576059850374065,1154.0,2.8778054862842892,37.96,-122.05,1.598'
b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625'


## `tf.io.decode_csv`

In [10]:
# Each default fixes the column's dtype and the value used when the field is empty.
record_defaults = [
    0,                                      # int32, defaults to 0
    np.nan,                                 # float32, defaults to NaN
    tf.constant(np.nan, dtype=tf.float64),  # float64, defaults to NaN
    "Hello",                                # string, defaults to "Hello"
    tf.constant([]),                        # float32 with no default: the field is required
]
parsed_fields = tf.io.decode_csv('1,2,3,4,5', record_defaults=record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: shape=(), dtype=float64, numpy=3.0>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'4'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

In [11]:
# Empty fields fall back to their record_defaults; the last (required) field is supplied.
parsed_fields = tf.io.decode_csv(',,,,5', record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'Hello'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

In [12]:
# The last column's default is an empty tensor, which makes it required:
# leaving it empty raises InvalidArgumentError, shown here instead of crashing the cell.
try:
    parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as e:
    print(e)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


## Read the California housing CSV files

In [13]:
# Per-column statistics of the raw training features, used by preprocess() to standardize.
# Only fit (do not overwrite x_train with the scaled output): otherwise re-running this
# cell would re-fit on already-standardized data and yield mean ~0 / std ~1.
scaler = StandardScaler().fit(x_train)
x_mean = scaler.mean_
x_std = scaler.scale_

num_columns = x_train.shape[-1]

In [14]:
@tf.function
def preprocess(line):
    """Parse one CSV line into standardized features and a 1-element target.

    The first ``num_columns`` fields default to 0.0 when empty; the last field
    (the target) has no default, so it must be present. Features are
    standardized with the training-set statistics ``x_mean`` / ``x_std``.

    Returns:
        ``(features, target)``; for a single scalar line these have shapes
        ``(num_columns,)`` and ``(1,)``.
    """
    record_defaults = [0.] * num_columns + [tf.constant([], dtype=tf.float32)]
    *feature_fields, target_field = tf.io.decode_csv(line, record_defaults=record_defaults)
    features = tf.stack(feature_fields)
    target = tf.stack([target_field])
    return (features - x_mean) / x_std, target

In [15]:
# Sanity check: run one raw CSV row (8 feature fields + target) through preprocess.
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.16579157,  1.216324  , -0.05204565, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

In [16]:
def csv_reader_dataset(file_paths, repeat=1, batch_size=32, shuffle_buffer_size=10000,
                       num_readers=5, num_read_threads=None, num_parse_threads=5,
                       shuffle=True):
    """Build a batched tf.data pipeline over CSV files that have a header row.

    Each file's header line is skipped, lines from ``num_readers`` files are
    interleaved, each line is parsed with ``preprocess``, and the results are
    batched and prefetched.

    Args:
        file_paths: file paths (or glob patterns) accepted by ``tf.data.Dataset.list_files``.
        repeat: number of passes over the files; ``None`` repeats indefinitely.
        batch_size: number of examples per batch.
        shuffle_buffer_size: size of the line-level shuffle buffer (used only when ``shuffle``).
        num_readers: number of files ``interleave`` reads from concurrently.
        num_read_threads: ``num_parallel_calls`` for ``interleave``.
        num_parse_threads: ``num_parallel_calls`` for the ``preprocess`` map.
        shuffle: when True (the default, matching previous behavior) both the file
            order and the lines are shuffled. Pass False for evaluation/prediction to
            get a deterministic order. Lines are still interleaved across
            ``num_readers`` files, so also use ``num_readers=1`` to keep each file's
            rows contiguous.

    Returns:
        A ``tf.data.Dataset`` of ``(features, target)`` batches.
    """
    dataset = tf.data.Dataset.list_files(file_paths, shuffle=shuffle).repeat(repeat)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=num_readers,
        num_parallel_calls=num_read_threads,
    )
    if shuffle:
        dataset = dataset.shuffle(shuffle_buffer_size)
    return (
        dataset
        .map(preprocess, num_parallel_calls=num_parse_threads)
        .batch(batch_size)
        .prefetch(1)
    )

In [17]:
utils.reset_session()

# Peek at two tiny batches to check parsing and scaling end to end.
train_set = csv_reader_dataset(train_files, batch_size=3)

for batch_x, batch_y in train_set.take(2):
    print(f"x_batch = \n{batch_x}\ny_batch = \n{batch_y}\n")

x_batch = 
[[ 0.5804519  -0.20762321  0.05616303 -0.15191229  0.01343246  0.00604472
   1.2525111  -1.3671792 ]
 [ 5.818099    1.8491895   1.1784915   0.28173092 -1.2496178  -0.3571987
   0.7231292  -1.0023477 ]
 [-0.9253566   0.5834586  -0.7807257  -0.28213993 -0.36530012  0.27389365
  -0.76194876  0.72684526]]
y_batch = 
[[1.752]
 [1.313]
 [1.535]]

x_batch = 
[[-0.8324941   0.6625668  -0.20741376 -0.18699841 -0.14536144  0.09635526
   0.9807942  -0.67250353]
 [-0.62183803  0.5834586  -0.19862501 -0.3500319  -1.1437552  -0.3363751
   1.107282   -0.8674123 ]
 [ 0.8683102   0.02970133  0.3427381  -0.29872298  0.7124906   0.28026953
  -0.72915536  0.86178064]]
y_batch = 
[[0.919]
 [1.028]
 [2.182]]



In [18]:
batch_size = 32
# The training set repeats indefinitely (repeat=None); fit() bounds each epoch with steps_per_epoch.
train_set = csv_reader_dataset(train_files, batch_size=batch_size, repeat=None)
valid_set, test_set = map(csv_reader_dataset, (val_files, test_files))

In [19]:
utils.reset_session()

model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu', input_shape=[num_columns]),
    keras.layers.Dense(1),
])

# `lr` is a deprecated alias that newer Keras versions reject; use `learning_rate`.
model.compile(loss='mse', optimizer=keras.optimizers.SGD(learning_rate=1e-3))

# train_set repeats indefinitely, so steps_per_epoch defines the length of one epoch.
model.fit(
    train_set,
    steps_per_epoch=len(x_train) // batch_size,
    epochs=10,
    validation_data=valid_set,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x1c447836888>

In [20]:
# Test-set loss; the model was compiled with loss='mse' and no extra metrics, so this is a single scalar.
model.evaluate(test_set)



0.47854188084602356

In [21]:
# Keep only the features of each (features, target) batch for prediction.
# NOTE: test_set shuffles its files and lines, so the rows of y_hat are NOT in the same
# order as y_test and cannot be compared with it row by row.
x_new = test_set.map(lambda features, _target: features)
y_hat = model.predict(x_new)
print(f"y_hat.shape = {y_hat.shape}")

y_hat.shape = (5160, 1)
