In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

from dataset import (build_pipeline_from_path, 
                        tranform, 
                        CHARS
                    )
from predict import predict
from metric import metric
from model import LPRNet
import tensorflow as tf
import numpy as np
import time

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

## 一、超参数设置

In [2]:
epochs = 3  # 训练次数
img_size=(24, 94)  # 输入图片尺寸
train_img_dir = './ccpd_util/ccpd_after_dealing'  # 训练数据集路径
test_img_dir = ''  # 测试训练集路径，该参数与split参数2选1即可
split = 0.03  # 从训练集中拿出占比split的数据集作为测试集，与上面参数2选1即可
initial_lr = 0.001  # 初始学习率
dropout_rate = 0.5  # 丢弃率，防止过拟合
lpr_len = 7    # 车牌字符长度
train_batch_size = 128  # 训练batch大小
test_batch_size = 128   # 测试batch大小  
saved_model_folder = './saved_model' # .pb模型文件保存路径
pretrained_model = ''  # 预训练模型路径

## 二、加载训练、测试数据集

In [3]:
# 加载数据集
train_dataset = build_pipeline_from_path(train_img_dir)
num_train_imgs = len(list(train_dataset.as_numpy_iterator()))
# 若test_img_dir为空，从训练数据集中拿出split的数据作为测试集
if not test_img_dir:
    num_test_imgs = int(split * num_train_imgs)
    num_train_imgs -= num_test_imgs
    test_dataset = train_dataset.take(num_test_imgs)
    train_dataset = train_dataset.skip(num_test_imgs)
else:
    test_dataset = build_pipeline_from_path(test_img_dir)
test_dataset = test_dataset.map(lambda x, y: \
                                (tranform(x, img_size=img_size), tf.cast(y, tf.int32)), 
                                num_parallel_calls=4)
test_dataset = test_dataset.batch(test_batch_size)

# 对数据集进行归一化等操作
train_dataset = train_dataset.map(lambda x, y: \
                                (tranform(x, img_size=img_size), tf.cast(y, tf.int32)), 
                                num_parallel_calls=4)
train_dataset = train_dataset.shuffle(buffer_size=256)
train_dataset = train_dataset.batch(train_batch_size)
train_dataset.prefetch(tf.data.experimental.AUTOTUNE).cache()

<CacheDataset shapes: ((None, 24, 94, 3), (None, 7)), types: (tf.float32, tf.int32)>

## 三、模型训练

In [4]:
if not os.path.exists(saved_model_folder):
    os.mkdir(saved_model_folder)

# 实例化模型
model = LPRNet(num_chars=len(CHARS), dropout_rate=dropout_rate)
print("********** Successful to build network! **********\n")

# 加载预训练模型
if pretrained_model:
    model.load_weights(pretrained_model)
    print("********** Successful to load pretrained model! **********")

# 优化器使用 RMSprop or Adam, 优先使用 Adam
# optimizer = tf.keras.optimizers.RMSprop(learning_rate=initial_lr, momentum=momentum)
optimizer = tf.keras.optimizers.Adam(learning_rate=initial_lr)

# 模型训练
top_acc = 0.
for cur_epoch in range(1, epochs + 1):
    batch = 0
    for batch_index, (train_imgs, train_labels) in enumerate(train_dataset): 
        start_time = time.time()
        with tf.GradientTape() as tape:
            train_logits = model(train_imgs) #[N,66,21]
            train_labels = tf.cast(train_labels, tf.int32) #[N,7]
            train_logits = tf.transpose(train_logits, [2, 0, 1]) #[21,N,66]
            logits_shape = train_logits.shape # 获取模型输出的尺寸
            logit_length = tf.fill([logits_shape[1]], logits_shape[0]) #shape=(N,),用21填充
            label_length = tf.fill([logits_shape[1]], lpr_len) #shape=(N,), 用7填充
            loss = tf.nn.ctc_loss(labels=train_labels,
                                  logits=train_logits,
                                  label_length=label_length,
                                  logit_length=logit_length,
                                  logits_time_major=True,
                                  blank_index=len(CHARS) - 1)
            loss = tf.reduce_mean(loss)
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
        end_time = time.time()
        batch = batch + int(np.shape(train_imgs)[0])
        print('\r' + "Epoch {0}/{1} || ".format(cur_epoch, epochs) 
                  + "Batch {0}/{1} || ".format(batch, num_train_imgs)
                  + "Loss:{} || ".format(loss) 
                  + "A Batch time:{0:.4f}s || ".format(end_time - start_time)
                  + "Learning rate:{0:.8f} || ".format(optimizer.lr.numpy().item()), end=''*20)
    acc, tp, tp_error, t = metric(test_dataset, model)
    print("\n******* Prediction {0}/{1} || Acc:{2:.2f}% *******".format(tp, tp + tp_error, acc*100))
    print("******* Test speed: {}s 1/{} *******".format(t / (tp + tp_error), tp + tp_error))
        
    # 保存模型
    if acc >= top_acc:
        top_acc = acc
        model.save(saved_model_folder, save_format='tf')


********** Successful to build network! **********

Epoch 1/3 || Batch 3968/139871 || Loss:26.30145263671875 || A Batch time:0.1387s || Learning rate:0.00100000 ||  

KeyboardInterrupt: 

### 四、模型预测

In [6]:
model = tf.keras.models.load_model('./pretrained_model')
pre_lp = predict('./ccpd_util/ccpd_after_dealing/豫NQH178.jpg', model)
print("Prediction:", pre_lp)

Prediction: 豫NQH178


## 五、将模型保存为 .tflite 模型文件

In [7]:
# 将.pb模型转为.tflite
cvtmodel = tf.keras.models.load_model(saved_model_folder)
converter = tf.lite.TFLiteConverter.from_keras_model(cvtmodel)
tflite_model = converter.convert()
with open('model' + '{}'.format(np.around(top_acc * 100)) + '.tflite', "wb") as f:
    f.write(tflite_model)
print("\n ********** Successful to convert tflite model! ********** \n")


 ********** Successful to convert tflite model! ********** 

