In [1]:
import tensorflow as tf
import numpy as np

In [2]:
core = [1, 128, 1]
learning_rate = 0.01
train_step = 5000
batch_size = 1024
weight = 1
data_path = "../../data/index/nyct_1_10w_sorted.npy"
model_path = "model"
model_hdf_file = model_path + "/hdf/best.hdf5"

## 数据准备

In [3]:
data_list = np.load(data_path, allow_pickle=True)
data_x = [data[2] for data in data_list]
data_y = list(range(0, len(data_list)))

In [4]:
# 数据标准化
def normalize_input(na):
    min_v = na.min(axis=0)
    max_v = na.max(axis=0)
    if max_v == min_v:
        return na, min_v, max_v
    else:
        return (na - min_v) / (max_v - min_v) - 0.5, min_v, max_v
def normalize_output(na):
    min_v = na.min(axis=0)
    max_v = na.max(axis=0)
    if max_v == min_v:
        return na, min_v, max_v
    else:
        return (na - min_v) / (max_v - min_v), min_v, max_v
    
train_x, train_x_min, train_x_max = normalize_input(np.array(data_x).astype("float"))
train_y, train_y_min, train_y_max = normalize_output(np.array(data_y).astype("float"))

## 模型构建

In [5]:
def score(y_true, y_pred):
    y_pred_clip = tf.keras.backend.clip(y_pred, 0, 1)
    diff_clip = y_true - y_pred_clip
    range_loss = tf.keras.backend.max(diff_clip) - tf.keras.backend.min(diff_clip)
    diff = y_true - y_pred
    mse_loss = tf.keras.backend.mean(tf.keras.backend.square(diff), axis=-1)
    return weight * range_loss + mse_loss

model = tf.keras.Sequential()
for i in range(len(core) - 1):
    model.add(tf.keras.layers.Dense(units=core[i + 1],
                                     input_dim=core[i],
                                     activation='sigmoid'))
model.add(tf.keras.layers.Dense(units=1))
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=optimizer, loss=score)

## 模型训练

In [6]:
checkpoint = tf.keras.callbacks.ModelCheckpoint(model_hdf_file,
                                                monitor='loss',
                                                verbose=1,
                                                save_best_only=True,
                                                mode='min',
                                                save_freq='epoch')
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='loss',
                                                  patience=train_step // 100,
                                                  mode='min',
                                                  verbose=1,
                                                  restore_best_weights=True)
model.fit(train_x, train_y,
           epochs=train_step,
           initial_epoch=0,
           batch_size=batch_size,
           verbose=1,
           callbacks=[checkpoint, early_stopping])

Epoch 1/5000
Epoch 00001: loss improved from inf to 0.92812, saving model to model/hdf\best.hdf5
Epoch 2/5000
Epoch 00002: loss improved from 0.92812 to 0.68316, saving model to model/hdf\best.hdf5
Epoch 3/5000
Epoch 00003: loss improved from 0.68316 to 0.58410, saving model to model/hdf\best.hdf5
Epoch 4/5000
Epoch 00004: loss improved from 0.58410 to 0.54662, saving model to model/hdf\best.hdf5
Epoch 5/5000
Epoch 00005: loss improved from 0.54662 to 0.54102, saving model to model/hdf\best.hdf5
Epoch 6/5000
Epoch 00006: loss improved from 0.54102 to 0.52600, saving model to model/hdf\best.hdf5
Epoch 7/5000
Epoch 00007: loss improved from 0.52600 to 0.52394, saving model to model/hdf\best.hdf5
Epoch 8/5000
Epoch 00008: loss improved from 0.52394 to 0.52261, saving model to model/hdf\best.hdf5
Epoch 9/5000
Epoch 00009: loss improved from 0.52261 to 0.52119, saving model to model/hdf\best.hdf5
Epoch 10/5000
Epoch 00010: loss improved from 0.52119 to 0.51967, saving model to model/hdf\bes

<keras.callbacks.History at 0x235856f43c8>

In [7]:
model = tf.keras.models.load_model(model_hdf_file, custom_objects={'score': score})
model

<keras.engine.sequential.Sequential at 0x235e878c2b0>

In [8]:
data = np.array([[1,2,3]],dtype='float32')

In [9]:
np.save("data/test.npy", data)
import os
os.stat("data/test.npy").st_size # 6个数字，128+6*4=152

140

In [10]:
weight = np.load("data/test.npy")
t = tf.convert_to_tensor(weight)

In [11]:
from keras import backend as K

def my_init(shape, dtype=None):
    return t

model.add(tf.keras.layers.Dense(units=3,
                                input_dim=1, kernel_initializer=my_init))

In [12]:
def model_to_np(model):
    return model_path

def np_to_model(model_path):
    return model