In [None]:
import datetime
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
import matplotlib.image as mpimg
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, Dense, Flatten, MaxPooling2D, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
import seaborn as sns
import heapq
import csv
from tqdm.notebook import tqdm
import random
from sklearn.model_selection import train_test_split

# 讀入圖片路徑&處理label

In [None]:

# Root folder of the training set.

train_path = r"E:\hd_data_512_no_pad"

# Number of classes = number of sub-folders of train_path.
# Only directories are counted, so stray files lying in the root folder
# do not inflate the class count.

img_classes = sum(
    1
    for entry in os.listdir(train_path)
    if os.path.isdir(os.path.join(train_path, entry))
)
print(img_classes)

In [None]:

# img_path: path of every image file; y: class ID of each image (same order).

img_path = []
y = []

# os.walk yields, for every folder below train_path:
#   root  - path of the folder currently being visited
#   dirs  - names of its sub-folders
#   files - names of the files it contains

for root, dirs, files in os.walk(train_path):

    # The class ID is the name of the folder that directly contains the file.
    # os.path.basename handles whichever separator the platform uses, unlike
    # splitting on a hard-coded backslash.

    class_id = os.path.basename(root)

    for f in files:
        img_path.append(os.path.join(root, f))
        y.append(class_id)

In [None]:

# Shuffle before building the dataset. Paths and labels are shuffled as
# pairs so that every path keeps its own label.

pairs = list(zip(img_path, y))
random.shuffle(pairs)
img_path, y = zip(*pairs)

In [None]:
# Inspect the container types of the shuffled paths and labels.
type(img_path), type(y)

In [None]:

# Convert every class ID into an integer code.

labelencoder = LabelEncoder()
labelencoder.fit(y)
labelencode_y = labelencoder.transform(y)

labelencode_y

In [None]:

# Lookup table: class ID -> integer code.

id_to_num = {class_id: code for class_id, code in zip(y, labelencode_y)}

# Reverse lookup table: integer code -> class ID.

num_to_id = {code: class_id for class_id, code in zip(y, labelencode_y)}

In [None]:

# One-hot encode the integer codes. The encoder expects a 2-D column,
# hence the reshape to (-1, 1).

onehotencoder = OneHotEncoder()
label_column = np.asarray(labelencode_y).reshape(-1, 1)
onehotencode_y = onehotencoder.fit_transform(label_column).toarray()

onehotencode_y.shape

# 批次處理

In [None]:
# Build a dataset whose elements are (image path, one-hot label) pairs.

path_tensor = tf.cast(img_path, dtype=tf.string)
label_tensor = tf.cast(onehotencode_y, dtype=tf.uint8)

dataset = tf.data.Dataset.from_tensor_slices((path_tensor, label_tensor))
print(dataset)

In [None]:
# Let tf.data choose the level of parallelism at run time.

AUTOTUNE = tf.data.AUTOTUNE


def decodeImg_label(img_path, label):
    """Load one training image from disk and apply random augmentation.

    Args:
        img_path: path of the image file, handed to tf.io.read_file.
        label: label belonging to the image; returned unchanged.

    Returns:
        (image, label), where image is the 3-channel picture resized to
        456x456 and then randomly flipped and colour-jittered.
    """
    # Read and decode the file, then bring it to the network input size.
    img = tf.image.decode_jpeg(tf.io.read_file(img_path), channels=3)
    img = tf.image.resize(img, (456, 456))

    # Random augmentation: horizontal flip, hue, saturation, contrast, brightness.
    # NOTE(review): after the resize the pixels are presumably floats on the
    # 0-255 scale; a brightness delta of 0.10 would then be almost invisible
    # and nothing clips the result - confirm the intended value range.
    img = tf.image.random_flip_left_right(img)
    img = tf.image.random_hue(img, 0.01)
    img = tf.image.random_saturation(img, 0.70, 1.30)
    img = tf.image.random_contrast(img, 0.80, 1.20)
    img = tf.image.random_brightness(img, 0.10)

    return img, label


# NOTE(review): `dataset` is rebound to its own mapped version, so running this
# cell twice maps the decoder over already-decoded images.
dataset = dataset.map(decodeImg_label, num_parallel_calls=AUTOTUNE)

In [None]:

# Prefetch decoded samples so that input preparation overlaps with training.
# Dataset transformations return a new dataset instead of modifying the one
# they are called on, so the result must be assigned back to take effect.

dataset = dataset.prefetch(AUTOTUNE)

In [None]:

# Load the previously trained model.
# The following cells all use `model`, and no other cell shown in this
# notebook creates it, so this statement must run on a fresh kernel.

model = tf.keras.models.load_model(r"D:\DATA\dolphin\model\20220501-1445(0.17557).h5")

In [None]:
# Print the layer-by-layer summary of `model`.
model.summary()

In [None]:

# Remove the preprocessing layers: rebuild the model so that it starts at the
# input of the layer at index 2 and keeps the original output.
# NOTE(review): `model` is rebuilt from itself, so this cell is not idempotent -
# running it again trims the already-trimmed model.

model = Model(inputs=model.layers[2].input, outputs=model.output)

In [None]:
# Print the summary again to check the rebuilt model.
model.summary()

In [None]:
# TensorBoard logs go to a fresh, timestamped sub-folder for every run.
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = os.path.join(r"D:\DATA\dolphin\test\logdir", run_stamp)
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

# Halve the learning rate when the training loss plateaus (patience of 2).
lr_callback = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="loss",
    factor=0.5,
    patience=2,
    min_lr=0,
)
print(logdir)

In [None]:
# Compile for the first training stage: Adam with learning rate 0.001,
# categorical cross-entropy, top-k accuracy (library default k) and accuracy.
stage1_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
stage1_metrics = [tf.keras.metrics.TopKCategoricalAccuracy(), "accuracy"]

model.compile(
    optimizer=stage1_optimizer,
    loss=tf.keras.losses.categorical_crossentropy,
    metrics=stage1_metrics,
)

In [None]:

# First training stage: shuffle with a 4096-element buffer, batch size 1.
# Each callback is registered exactly once; passing the TensorBoard callback
# twice would make it run (and log) twice per step/epoch.

history = model.fit(
    dataset.shuffle(4096).batch(1),
    epochs=20,
    callbacks=[tensorboard_callback, lr_callback],
)

In [None]:
# List the last 47 layers, the same slice that unfreeze_model iterates over.
print(model.layers[-47:])

In [None]:

# Unfreeze the model for fine-tuning.

def unfreeze_model(model, num_layers=47, learning_rate=1e-4):
    """Make the top layers of `model` trainable and recompile it.

    Args:
        model: Keras model to fine-tune; modified in place.
        num_layers: how many layers, counted from the end of model.layers,
            are considered. Defaults to 47.
        learning_rate: learning rate of the Adam optimizer used for the
            recompilation. Defaults to 1e-4.

    Raises:
        ValueError: if num_layers is negative.

    BatchNormalization layers inside the selected range are left untouched;
    every other selected layer gets trainable = True.
    """
    if num_layers < 0:
        raise ValueError("num_layers must be non-negative")

    # model.layers[-0:] would select every layer, so zero is handled explicitly.
    top_layers = model.layers[-num_layers:] if num_layers else []

    for layer in top_layers:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True

    # Recompile so that the changed trainable flags take effect.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.categorical_crossentropy,
        metrics=[tf.keras.metrics.TopKCategoricalAccuracy(), "accuracy"],
    )


unfreeze_model(model)

In [None]:
# Second training stage (after unfreezing): shuffle with a 4096-element
# buffer, batch size 8.
# Each callback is registered exactly once; passing the TensorBoard callback
# twice would make it run (and log) twice per step/epoch.

history = model.fit(
    dataset.shuffle(4096).batch(8),
    epochs=10,
    callbacks=[tensorboard_callback, lr_callback],
)

In [None]:
 
# Save the model under a timestamped file name. `now_time` is reused later
# for the name of the prediction CSV.

now_time = datetime.datetime.now().strftime("%Y%m%d-%H%M")
model_file = f'D:\\DATA\\dolphin\\model\\{now_time}.h5'
model.save(model_file)
print(now_time)

In [None]:

# Folder holding the images to predict.

test_path = r"E:\test_data_512"

# File names and full paths of all test images, in the same order.

test_img_name = os.listdir(test_path)
test_img_path = [test_path + "\\" + file_name for file_name in test_img_name]

In [None]:
# Show the first five test file names and their full paths.
print(test_img_name[:5],"\n",test_img_path[:5])

In [None]:
def decodeImg(img_path):
    """Load one test image from disk for inference (no augmentation).

    Args:
        img_path: path of the image file, handed to tf.io.read_file.

    Returns:
        The 3-channel picture resized to 456x456 with an extra leading axis
        of size 1, so each dataset element is already a batch of one image.
    """
    raw = tf.io.read_file(img_path)
    image = tf.image.decode_jpeg(raw, channels=3)
    image = tf.image.resize(image, (456, 456))
    image = tf.expand_dims(image, 0)  # add the leading batch axis

    return image

In [None]:
# Build the test dataset; every element is a 1-tuple holding one image path.

test_dataset = tf.data.Dataset.from_tensor_slices((test_img_path,))
print(test_dataset)

In [None]:

# Read and preprocess every test image with decodeImg.
# NOTE(review): `test_dataset` is rebound to its own mapped version, so running
# this cell twice maps decodeImg over already-decoded images.

test_dataset = test_dataset.map(decodeImg)

In [None]:

# Run the model on the test dataset; verbose = 1 shows a progress bar.

predict_test_dataset = model.predict(test_dataset, verbose = 1)

In [None]:
# Shape of the prediction array.
predict_test_dataset.shape

In [None]:
# Largest single score over all predictions.
predict_test_dataset.max()

In [None]:
# Highest predicted score of every test image.
prob_list = [row.max() for row in predict_test_dataset]

In [None]:
# Convert the list of per-image best scores to a NumPy array.
prob_list = np.array(prob_list)

In [None]:
# Wrap the best scores in a Series; its quantiles are queried below.
p_prob = pd.Series(prob_list)

In [None]:
# First quartile of the best scores.
p_prob.quantile(0.25)

In [None]:
# Median of the best scores.
p_prob.quantile(0.5)

In [None]:
# Third quartile of the best scores (the same quantile later used as `gate`).
p_prob.quantile(0.75)

In [None]:
# Score vector predicted for the first test image.
predict_test_dataset[0]

In [None]:
# Number of predictions.
len(predict_test_dataset)

In [None]:

# Write the submission file.

# Images whose best score is below this gate get "new_individual" as their
# fifth prediction. The gate is the 75th percentile of all best scores.
gate = p_prob.quantile(0.75)

submission_path = f'D:\\DATA\\dolphin\\predict\\{now_time}.csv'

with open(submission_path, 'w', newline='') as csvfile:

    # CSV writer plus header row.
    writer = csv.writer(csvfile)
    writer.writerow(['image', 'predictions'])

    # One row per test image, in the same order as test_img_name.
    for i in range(len(predict_test_dataset)):

        scores = predict_test_dataset[i]

        # Indices of the five highest scores, best first. A stable sort on the
        # negated scores keeps the lower index first among equal scores and
        # can never return the same index twice, which the previous
        # "find value, then overwrite with 0" search could do when a top
        # score was itself 0 or negative.
        top5 = np.argsort(-scores, kind="stable")[:5]
        predicted_ids = [num_to_id[idx] for idx in top5]

        # Low-confidence image: keep the best four IDs, then new_individual.
        if scores.max() < gate:
            predicted_ids = predicted_ids[:4] + ["new_individual"]

        writer.writerow([test_img_name[i], " ".join(predicted_ids)])
print(submission_path)

In [None]:
# Spot-check the prediction of one randomly chosen test image.
# The index is drawn from the actual number of predictions, so it can neither
# run past the end of the array nor be limited to the first 101 images.
i = random.randrange(len(predict_test_dataset))

scores = predict_test_dataset[i]

# Indices and values of the five highest scores, best first (stable on ties).
max_index = np.argsort(-scores, kind="stable")[:5].tolist()
max_number = scores[max_index].tolist()

# Same rule as the submission file: below `gate` the fifth slot becomes
# new_individual.
if scores.max() < gate:
    predictions_str = " ".join([num_to_id[x] for x in max_index[:4]] + ["new_individual"])
else:
    predictions_str = " ".join(num_to_id[x] for x in max_index)

# test_img_name holds the file names in prediction order.
print(test_img_name[i])
print(max_number)
print(max_index)
print(predictions_str)