In [None]:
import pandas as pd
import cv2
import glob
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "6"
%pylab inline

In [None]:
import os, sys, codecs, glob
from PIL import Image, ImageDraw

import numpy as np
import pandas as pd
import cv2

import torch
torch.backends.cudnn.benchmark = False
# torch.backends.cudnn.enabled = False

import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data.dataset import Dataset
import timm

In [None]:
class XunFeiDataset(Dataset):
    def __init__(self, img_path, img_group, transform):
        self.img_path = img_path
        self.transform = transform
        self.group = img_group

    def __getitem__(self, index):
        img = Image.open(self.img_path[index]).convert('RGB')
        
        if self.transform is not None:
            img = self.transform(img)
        
        return img, self.group[index]

    def __len__(self):
        return len(self.img_path)

In [None]:
class ArcModule(nn.Module):
    def __init__(self, in_features, out_features, s = 10, m = 0.2):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
        nn.init.xavier_normal_(self.weight)

        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = torch.tensor(math.cos(math.pi - m))
        self.mm = torch.tensor(math.sin(math.pi - m) * m)

    def forward(self, inputs, labels):
        cos_th = F.linear(inputs, F.normalize(self.weight))
        cos_th = cos_th.clamp(-1, 1)
        sin_th = torch.sqrt(1.0 - torch.pow(cos_th, 2))
        cos_th_m = cos_th * self.cos_m - sin_th * self.sin_m
        # print(type(cos_th), type(self.th), type(cos_th_m), type(self.mm))
        cos_th_m = torch.where(cos_th > self.th, cos_th_m, cos_th - self.mm)
        
        cond_v = cos_th - self.th
        cond = cond_v <= 0
        cos_th_m[cond] = (cos_th - self.mm)[cond]

        if labels.dim() == 1:
            labels = labels.unsqueeze(-1)
        onehot = torch.zeros(cos_th.size()).cuda()
        labels = labels.type(torch.LongTensor).cuda()
        onehot.scatter_(1, labels, 1.0)
        outputs = onehot * cos_th_m + (1.0 - onehot) * cos_th
        outputs = outputs * self.s
        return outputs
    
    
class XunFeiNet_a(nn.Module):
    def __init__(self):
        super(XunFeiNet_a, self).__init__()
                
        model = timm.create_model('efficientnet_b3', pretrained=True)
        self.model = model
        model.classifier = torch.nn.Identity()
        self.margin = ArcModule(in_features=1536, out_features = 1806)
        
    def forward(self, img, labels=None):        
        feat = self.model(img)
        
        feat = F.normalize(feat)
        if labels is not None:
            return self.margin(feat, labels)
        return feat



class XunFeiNet_b(nn.Module):
    def __init__(self):
        super(XunFeiNet_b, self).__init__()
                
        model = timm.create_model('efficientnet_b3', pretrained=True)
        self.model = model
        model.classifier = torch.nn.Identity()
        self.margin = ArcModule(in_features=1536, out_features = 1807)
        
    def forward(self, img, labels=None):        
        feat = self.model(img)
        
        feat = F.normalize(feat)
        if labels is not None:
            return self.margin(feat, labels)
        return feat
    
model_a = XunFeiNet_a().cuda()
model_b = XunFeiNet_b().cuda()

In [None]:
test_path = glob.glob('data/test_no_face/*')
test_path.sort()
test_path = np.array(test_path)

test_loader = torch.utils.data.DataLoader(
    XunFeiDataset(test_path, [0]*len(test_path),
                        transforms.Compose([
                        transforms.Resize((320, 320)),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    ),
    batch_size=50, shuffle=False, num_workers=5,
)

In [None]:
# 加载模型
from sklearn.preprocessing import normalize
model_a.load_state_dict(torch.load('one_effcient.pth'))
model_a.eval()

test_feats = []
with torch.no_grad():
    for data in test_loader:
        data = data[0].cuda()
        feat = model_a(data)
        test_feats.append(feat.data.cpu().numpy())
        
test_feats = np.vstack(test_feats)
test_feats = normalize(test_feats)

# 每折保存距离
distance_1 = []
for feat in test_feats[:]:
    dis = np.dot(feat, test_feats.T)
    feat_qe = np.multiply(dis[np.argsort(dis)[::-1][:2]].reshape(2, -1),
            test_feats[np.argsort(dis)[::-1][:2]]).mean(0)
    dis = np.dot(feat_qe, test_feats.T)
    distance_1.append(dis)
distance_1

In [None]:
model_a.load_state_dict(torch.load('two_final.pth'))
model_a.eval()

test_feats = []
with torch.no_grad():
    for data in test_loader:
        data = data[0].cuda()
        feat = model_a(data)
        test_feats.append(feat.data.cpu().numpy())
        
test_feats = np.vstack(test_feats)
test_feats = normalize(test_feats)

# 每折保存距离
distance_2 = []
for feat in test_feats[:]:
    dis = np.dot(feat, test_feats.T)
    feat_qe = np.multiply(dis[np.argsort(dis)[::-1][:2]].reshape(2, -1),
        test_feats[np.argsort(dis)[::-1][:2]]).mean(0)
    dis = np.dot(feat_qe, test_feats.T)
    distance_2.append(dis)
    
distance_2

In [None]:
model_a.load_state_dict(torch.load('three_effcient.pth'))
model_a.eval()

test_feats = []
with torch.no_grad():
    for data in test_loader:
        data = data[0].cuda()
        feat = model_a(data)
        test_feats.append(feat.data.cpu().numpy())
        
test_feats = np.vstack(test_feats)
test_feats = normalize(test_feats)

# 每折保存距离
distance_3 = []
for feat in test_feats[:]:
    dis = np.dot(feat, test_feats.T)
    feat_qe = np.multiply(dis[np.argsort(dis)[::-1][:2]].reshape(2, -1),test_feats[np.argsort(dis)[::-1][:2]]).mean(0)
    dis = np.dot(feat_qe, test_feats.T)
    distance_3.append(dis)
    
distance_3

In [None]:
model_b.load_state_dict(torch.load('four_final.pth'))
model_b.eval()

test_feats = []
with torch.no_grad():
    for data in test_loader:
        data = data[0].cuda()
        feat = model_b(data)
        test_feats.append(feat.data.cpu().numpy())
        
test_feats = np.vstack(test_feats)
test_feats = normalize(test_feats)

# 每折保存距离
distance_4 = []
for feat in test_feats[:]:
    dis = np.dot(feat, test_feats.T)
    feat_qe = np.multiply(dis[np.argsort(dis)[::-1][:2]].reshape(2, -1),
            test_feats[np.argsort(dis)[::-1][:2]]).mean(0)
    dis = np.dot(feat_qe, test_feats.T)
    distance_4.append(dis)
    
distance_4

In [None]:
model_b.load_state_dict(torch.load('five_final.pth'))
model_b.eval()

test_feats = []
with torch.no_grad():
    for data in test_loader:
        data = data[0].cuda()
        feat = model_b(data)
        test_feats.append(feat.data.cpu().numpy())
        
test_feats = np.vstack(test_feats)
test_feats = normalize(test_feats)

# 每折保存距离
distance_5 = []
for feat in test_feats[:]:
    dis = np.dot(feat, test_feats.T)
    feat_qe = np.multiply(dis[np.argsort(dis)[::-1][:2]].reshape(2, -1),test_feats[np.argsort(dis)[::-1][:2]]).mean(0)
    dis = np.dot(feat_qe, test_feats.T)
    distance_5.append(dis)
    
distance_5

In [None]:
test_distance = []
for i in range(len(distance_5)):
    test_distance.append((0.1 * distance_1[i] + 0.25 * distance_2[i] + 0.15 * distance_3[i] + 0.25 * distance_4[i] + 0.25 * distance_5[i]))
    
test_distance

In [None]:
# 做了扩展查询
test_submit = []
for i in range(len(test_path)):
    dis = test_distance[i]
    path = test_path[i]
    pred = [x.split('/')[-1] for x in test_path[np.where(dis > 0.59)[0]]]
    
    if len(pred) <= 1:
        ids = dis.argsort()[::-1]
        pred = [x.split('/')[-1] for x in test_path[ids[:2]]]
    print(pred)
    test_submit.append([
        path.split('/')[-1],
        pred
    ])

In [None]:
test_submit = pd.DataFrame(test_submit, columns=['name', 'label'])
test_submit['label'] = test_submit['label'].apply(lambda x: ' '.join(x))

test_submit

In [None]:
test_submit.to_csv('submit.csv',index=None)