In [22]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

from PIL import Image # Модули работы с изображениями
from tensorflow.keras.preprocessing import image
import os
import matplotlib.pyplot as plt
import copy
from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, concatenate, Dense, Dropout, BatchNormalization, Conv2D, Conv2DTranspose, MaxPooling2D, Flatten,Reshape, GlobalAveragePooling1D
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import utils
from tensorflow.keras.callbacks import LambdaCallback
import tensorflow as tf

import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras import layers

from tensorflow.keras.applications.resnet50 import ResNet50
from learntools.deep_learning.decode_predictions import decode_predictions
from tensorflow.keras.applications.resnet50 import preprocess_input
from nltk.corpus import wordnet as wn
import nltk
#nltk.download("wordnet")
import tqdm

In [23]:
processed_path = '../input/petfinder-pawpularity-score/train/'
df = pd.read_csv('../input/petfinder-pawpularity-score/train.csv')
model_cod = keras.models.load_model('../input/cat-or-dog-model/cat_or_dog_model.h5')
def get_all_hyponyms(label):
  syn = wn.synset(label)
  return set([w.lower() for s in syn.closure(lambda s:s.hyponyms()) for w in s.lemma_names()])

def cat_or_dog(predictions):
  probs = np.array([e[2] for e in predictions])
  
  dog_arr = np.array([e[1].lower() in dogs for e in predictions])
  dog = np.sum(dog_arr * probs)

  cat_arr = np.array([e[1].lower() in cats for e in predictions])
  cat = np.sum(cat_arr * probs)

  neither_arr = np.logical_and(np.logical_not(dog_arr), np.logical_not(cat_arr))
  neither = np.sum(neither_arr * probs)

  res = "neither"
  if dog > cat:
    res = "dog"
  elif dog < cat:
    res = "cat"

  return {'result':res, 'dog':dog, 'cat':cat, 'neither':neither}
dogs = get_all_hyponyms("dog.n.01")
cats = get_all_hyponyms("cat.n.01")
import math

labels = []
batch_size = 500
ids = list(df.Id)
num_batches = math.ceil(len(ids) / batch_size)

for batch in tqdm.tqdm(range(num_batches)):
  images_batch = []
  
  for filename in ids[(batch_size*batch):(batch_size*batch+batch_size+1)]:
    img_path = f'{processed_path}/{filename}.jpg'
    img = image.load_img(img_path, target_size=(224, 224))
    img = image.img_to_array(img)
    img = image.smart_resize(img, (224, 224))
    img = np.expand_dims(img, axis=0)
    img = preprocess_input(img)
    images_batch.append(img)
#######################################
  preds = model_cod.predict_on_batch(np.array(images_batch).squeeze())
  decoded = decode_predictions(preds, top=5, class_list_path='../input/keras-pretrained-models/imagenet_class_index.json')
  batch_labels = [cat_or_dog(dec)['result'] for dec in decoded]
  labels += batch_labels
#######################################
with open(f"labels.csv", "w") as fo:
  for img_id, label in zip(ids, labels):
    if label == 'dog':
      label_num = 0
    elif label == 'cat':
      label_num = 1
    else:
      label_num = 2
    fo.write(f"{img_id},{label_num}\n")


In [36]:
processed_path = '../input/petfinder-pawpularity-score/train/'
x_train_label = pd.read_csv('../input/petfinder-pawpularity-score/train.csv')
x_train_cod = pd.read_csv('labels.csv',header=None)

In [25]:
class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, batch_size=32, dim_1=(128,128),dim_2=(15),n_channels=3, shuffle=True):
        'Initialization'
        self.dim_1 = dim_1
        self.dim_2 = dim_2
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        [pic,lables], paw = self.__data_generation(list_IDs_temp)

        return [pic,lables], paw 

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
        # Initialization
        pic = []
        lables = []
        paw = []

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Store sample
            pic.append(np.array(image.img_to_array(image.load_img(os.path.join(processed_path, ID + '.jpg'), target_size=(self.dim_1[0], self.dim_1[1]))))/255) 
            label = x_train_label.loc[x_train_label['Id'] == ID].drop(['Id','Pawpularity'], axis=1).to_numpy()
            cod = utils.to_categorical(x_train_cod.loc[x_train_cod[0] == ID].drop(0, axis=1),3)
            lables.append(np.concatenate((label[0],cod[0]))) 
            # Store class
            paw.append(np.array(x_train_label['Pawpularity'].loc[x_train_label['Id'] == ID])[0]/100)

        return [np.array(pic),np.array(lables)], np.array(paw)

In [37]:
input_shape = (256, 256, 3)
patch_size = (2, 2)  # 2-by-2 sized patches
dropout_rate = 0.2  # Dropout rate
num_heads = 8  # Attention heads
embed_dim = 64  # Embedding dimension
num_mlp = 256  # MLP layer size
qkv_bias = True  # Convert embedded patches to query, key, and values with a learnable additive value
window_size = 2  # Size of attention window
shift_size = 1  # Size of shifting window
image_dimension = 256  # Initial image size

num_patch_x = input_shape[0] // patch_size[0]
num_patch_y = input_shape[1] // patch_size[1]

learning_rate = 1e-3
batch_size = 64
num_epochs = 20
validation_split = 0.1
weight_decay = 0.0001
label_smoothing = 0.1
def window_partition(x, window_size):
    _, height, width, channels = x.shape
    patch_num_y = height // window_size
    patch_num_x = width // window_size
    x = tf.reshape(
        x, shape=(-1, patch_num_y, window_size, patch_num_x, window_size, channels)
    )
    x = tf.transpose(x, (0, 1, 3, 2, 4, 5))
    windows = tf.reshape(x, shape=(-1, window_size, window_size, channels))
    return windows


def window_reverse(windows, window_size, height, width, channels):
    patch_num_y = height // window_size
    patch_num_x = width // window_size
    x = tf.reshape(
        windows,
        shape=(-1, patch_num_y, patch_num_x, window_size, window_size, channels),
    )
    x = tf.transpose(x, perm=(0, 1, 3, 2, 4, 5))
    x = tf.reshape(x, shape=(-1, height, width, channels))
    return x


class DropPath(layers.Layer):
    def __init__(self, drop_prob=None, **kwargs):
        super(DropPath, self).__init__(**kwargs)
        self.drop_prob = drop_prob

    def call(self, x):
        input_shape = tf.shape(x)
        batch_size = input_shape[0]
        rank = x.shape.rank
        shape = (batch_size,) + (1,) * (rank - 1)
        random_tensor = (1 - self.drop_prob) + tf.random.uniform(shape, dtype=x.dtype)
        path_mask = tf.floor(random_tensor)
        output = tf.math.divide(x, 1 - self.drop_prob) * path_mask
        return output
class WindowAttention(layers.Layer):
    def __init__(
        self, dim, window_size, num_heads, qkv_bias=True, dropout_rate=0.0, **kwargs
    ):
        super(WindowAttention, self).__init__(**kwargs)
        self.dim = dim
        self.window_size = window_size
        self.num_heads = num_heads
        self.scale = (dim // num_heads) ** -0.5
        self.qkv = layers.Dense(dim * 3, use_bias=qkv_bias)
        self.dropout = layers.Dropout(dropout_rate)
        self.proj = layers.Dense(dim)

    def build(self, input_shape):
        num_window_elements = (2 * self.window_size[0] - 1) * (
            2 * self.window_size[1] - 1
        )
        self.relative_position_bias_table = self.add_weight(
            shape=(num_window_elements, self.num_heads),
            initializer=tf.initializers.Zeros(),
            trainable=True,
        )
        coords_h = np.arange(self.window_size[0])
        coords_w = np.arange(self.window_size[1])
        coords_matrix = np.meshgrid(coords_h, coords_w, indexing="ij")
        coords = np.stack(coords_matrix)
        coords_flatten = coords.reshape(2, -1)
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]
        relative_coords = relative_coords.transpose([1, 2, 0])
        relative_coords[:, :, 0] += self.window_size[0] - 1
        relative_coords[:, :, 1] += self.window_size[1] - 1
        relative_coords[:, :, 0] *= 2 * self.window_size[1] - 1
        relative_position_index = relative_coords.sum(-1)

        self.relative_position_index = tf.Variable(
            initial_value=tf.convert_to_tensor(relative_position_index), trainable=False
        )

    def call(self, x, mask=None):
        _, size, channels = x.shape
        head_dim = channels // self.num_heads
        x_qkv = self.qkv(x)
        x_qkv = tf.reshape(x_qkv, shape=(-1, size, 3, self.num_heads, head_dim))
        x_qkv = tf.transpose(x_qkv, perm=(2, 0, 3, 1, 4))
        q, k, v = x_qkv[0], x_qkv[1], x_qkv[2]
        q = q * self.scale
        k = tf.transpose(k, perm=(0, 1, 3, 2))
        attn = q @ k

        num_window_elements = self.window_size[0] * self.window_size[1]
        relative_position_index_flat = tf.reshape(
            self.relative_position_index, shape=(-1,)
        )
        relative_position_bias = tf.gather(
            self.relative_position_bias_table, relative_position_index_flat
        )
        relative_position_bias = tf.reshape(
            relative_position_bias, shape=(num_window_elements, num_window_elements, -1)
        )
        relative_position_bias = tf.transpose(relative_position_bias, perm=(2, 0, 1))
        attn = attn + tf.expand_dims(relative_position_bias, axis=0)

        if mask is not None:
            nW = mask.get_shape()[0]
            mask_float = tf.cast(
                tf.expand_dims(tf.expand_dims(mask, axis=1), axis=0), tf.float32
            )
            attn = (
                tf.reshape(attn, shape=(-1, nW, self.num_heads, size, size))
                + mask_float
            )
            attn = tf.reshape(attn, shape=(-1, self.num_heads, size, size))
            attn = keras.activations.softmax(attn, axis=-1)
        else:
            attn = keras.activations.softmax(attn, axis=-1)
        attn = self.dropout(attn)

        x_qkv = attn @ v
        x_qkv = tf.transpose(x_qkv, perm=(0, 2, 1, 3))
        x_qkv = tf.reshape(x_qkv, shape=(-1, size, channels))
        x_qkv = self.proj(x_qkv)
        x_qkv = self.dropout(x_qkv)
        return x_qkv
class SwinTransformer(layers.Layer):
    def __init__(
        self,
        dim,
        num_patch,
        num_heads,
        window_size=7,
        shift_size=0,
        num_mlp=1024,
        qkv_bias=True,
        dropout_rate=0.0,
        **kwargs,
    ):
        super(SwinTransformer, self).__init__(**kwargs)

        self.dim = dim  # number of input dimensions
        self.num_patch = num_patch  # number of embedded patches
        self.num_heads = num_heads  # number of attention heads
        self.window_size = window_size  # size of window
        self.shift_size = shift_size  # size of window shift
        self.num_mlp = num_mlp  # number of MLP nodes

        self.norm1 = layers.LayerNormalization(epsilon=1e-5)
        self.attn = WindowAttention(
            dim,
            window_size=(self.window_size, self.window_size),
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            dropout_rate=dropout_rate,
        )
        self.drop_path = DropPath(dropout_rate)
        self.norm2 = layers.LayerNormalization(epsilon=1e-5)

        self.mlp = keras.Sequential(
            [
                layers.Dense(num_mlp),
                layers.Activation(keras.activations.gelu),
                layers.Dropout(dropout_rate),
                layers.Dense(dim),
                layers.Dropout(dropout_rate),
            ]
        )

        if min(self.num_patch) < self.window_size:
            self.shift_size = 0
            self.window_size = min(self.num_patch)

    def build(self, input_shape):
        if self.shift_size == 0:
            self.attn_mask = None
        else:
            height, width = self.num_patch
            h_slices = (
                slice(0, -self.window_size),
                slice(-self.window_size, -self.shift_size),
                slice(-self.shift_size, None),
            )
            w_slices = (
                slice(0, -self.window_size),
                slice(-self.window_size, -self.shift_size),
                slice(-self.shift_size, None),
            )
            mask_array = np.zeros((1, height, width, 1))
            count = 0
            for h in h_slices:
                for w in w_slices:
                    mask_array[:, h, w, :] = count
                    count += 1
            mask_array = tf.convert_to_tensor(mask_array)

            # mask array to windows
            mask_windows = window_partition(mask_array, self.window_size)
            mask_windows = tf.reshape(
                mask_windows, shape=[-1, self.window_size * self.window_size]
            )
            attn_mask = tf.expand_dims(mask_windows, axis=1) - tf.expand_dims(
                mask_windows, axis=2
            )
            attn_mask = tf.where(attn_mask != 0, -100.0, attn_mask)
            attn_mask = tf.where(attn_mask == 0, 0.0, attn_mask)
            self.attn_mask = tf.Variable(initial_value=attn_mask, trainable=False)

    def call(self, x):
        height, width = self.num_patch
        _, num_patches_before, channels = x.shape
        x_skip = x
        x = self.norm1(x)
        x = tf.reshape(x, shape=(-1, height, width, channels))
        if self.shift_size > 0:
            shifted_x = tf.roll(
                x, shift=[-self.shift_size, -self.shift_size], axis=[1, 2]
            )
        else:
            shifted_x = x

        x_windows = window_partition(shifted_x, self.window_size)
        x_windows = tf.reshape(
            x_windows, shape=(-1, self.window_size * self.window_size, channels)
        )
        attn_windows = self.attn(x_windows, mask=self.attn_mask)

        attn_windows = tf.reshape(
            attn_windows, shape=(-1, self.window_size, self.window_size, channels)
        )
        shifted_x = window_reverse(
            attn_windows, self.window_size, height, width, channels
        )
        if self.shift_size > 0:
            x = tf.roll(
                shifted_x, shift=[self.shift_size, self.shift_size], axis=[1, 2]
            )
        else:
            x = shifted_x

        x = tf.reshape(x, shape=(-1, height * width, channels))
        x = self.drop_path(x)
        x = x_skip + x
        x_skip = x
        x = self.norm2(x)
        x = self.mlp(x)
        x = self.drop_path(x)
        x = x_skip + x
        return x
class PatchExtract(layers.Layer):
    def __init__(self, patch_size, **kwargs):
        super(PatchExtract, self).__init__(**kwargs)
        self.patch_size_x = patch_size[0]
        self.patch_size_y = patch_size[0]

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=(1, self.patch_size_x, self.patch_size_y, 1),
            strides=(1, self.patch_size_x, self.patch_size_y, 1),
            rates=(1, 1, 1, 1),
            padding="VALID",
        )
        patch_dim = patches.shape[-1]
        patch_num = patches.shape[1]
        return tf.reshape(patches, (batch_size, patch_num * patch_num, patch_dim))


class PatchEmbedding(layers.Layer):
    def __init__(self, num_patch, embed_dim, **kwargs):
        super(PatchEmbedding, self).__init__(**kwargs)
        self.num_patch = num_patch
        self.proj = layers.Dense(embed_dim)
        self.pos_embed = layers.Embedding(input_dim=num_patch, output_dim=embed_dim)

    def call(self, patch):
        pos = tf.range(start=0, limit=self.num_patch, delta=1)
        return self.proj(patch) + self.pos_embed(pos)


class PatchMerging(tf.keras.layers.Layer):
    def __init__(self, num_patch, embed_dim):
        super(PatchMerging, self).__init__()
        self.num_patch = num_patch
        self.embed_dim = embed_dim
        self.linear_trans = layers.Dense(2 * embed_dim, use_bias=False)

    def call(self, x):
        height, width = self.num_patch
        _, _, C = x.get_shape().as_list()
        x = tf.reshape(x, shape=(-1, height, width, C))
        x0 = x[:, 0::2, 0::2, :]
        x1 = x[:, 1::2, 0::2, :]
        x2 = x[:, 0::2, 1::2, :]
        x3 = x[:, 1::2, 1::2, :]
        x = tf.concat((x0, x1, x2, x3), axis=-1)
        x = tf.reshape(x, shape=(-1, (height // 2) * (width // 2), 4 * C))
        return self.linear_trans(x)

In [38]:
pic_input = layers.Input(input_shape)
x = layers.experimental.preprocessing.RandomCrop(image_dimension, image_dimension)(pic_input)
x = layers.experimental.preprocessing.RandomFlip("horizontal")(x)
x = PatchExtract(patch_size)(x)
model_val = keras.Model(pic_input,x)
x = PatchEmbedding(num_patch_x * num_patch_y, embed_dim)(x)
x = SwinTransformer(
    dim=embed_dim,
    num_patch=(num_patch_x, num_patch_y),
    num_heads=num_heads,
    window_size=window_size,
    shift_size=0,
    num_mlp=num_mlp,
    qkv_bias=qkv_bias,
    dropout_rate=dropout_rate,
)(x)
x = SwinTransformer(
    dim=embed_dim,
    num_patch=(num_patch_x, num_patch_y),
    num_heads=num_heads,
    window_size=window_size,
    shift_size=shift_size,
    num_mlp=num_mlp,
    qkv_bias=qkv_bias,
    dropout_rate=dropout_rate,
)(x)
x = PatchMerging((num_patch_x, num_patch_y), embed_dim=embed_dim)(x)
x = layers.GlobalAveragePooling1D()(x)

label_input = Input(15)
label = Dense(64, activation='relu')(label_input)

dense_conc = concatenate([x,label])
output = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model([pic_input,label_input], output)

In [None]:
params = {'dim_1': (256,256),
          'dim_2': (15),
          'batch_size': 32,
          'n_channels': 3,
          'shuffle': True}
partition = x_train_label['Id'].to_numpy()
partition, partition_val = train_test_split(partition, test_size=0.1)
training_generator = DataGenerator(partition, **params)
validation_generator = DataGenerator(partition_val, **params)
model.compile(
    loss='mse',
    optimizer=Adam(learning_rate=1e-4, clipvalue=0.5),
)
model.fit(training_generator,validation_data=validation_generator, epochs=num_epochs)

In [29]:
processed_path = '../input/petfinder-pawpularity-score/test/'
df = pd.read_csv('../input/petfinder-pawpularity-score/test.csv')
model_cod = keras.models.load_model('../input/cat-or-dog-model/cat_or_dog_model.h5')
def get_all_hyponyms(label):
  syn = wn.synset(label)
  return set([w.lower() for s in syn.closure(lambda s:s.hyponyms()) for w in s.lemma_names()])

def cat_or_dog(predictions):
  probs = np.array([e[2] for e in predictions])
  
  dog_arr = np.array([e[1].lower() in dogs for e in predictions])
  dog = np.sum(dog_arr * probs)

  cat_arr = np.array([e[1].lower() in cats for e in predictions])
  cat = np.sum(cat_arr * probs)

  neither_arr = np.logical_and(np.logical_not(dog_arr), np.logical_not(cat_arr))
  neither = np.sum(neither_arr * probs)

  res = "neither"
  if dog > cat:
    res = "dog"
  elif dog < cat:
    res = "cat"

  return {'result':res, 'dog':dog, 'cat':cat, 'neither':neither}
dogs = get_all_hyponyms("dog.n.01")
cats = get_all_hyponyms("cat.n.01")
import math

labels = []
batch_size = 500
ids = list(df.Id)
num_batches = math.ceil(len(ids) / batch_size)
for batch in tqdm.tqdm(range(num_batches)):
  images_batch = []
  
  for filename in ids[(batch_size*batch):(batch_size*batch+batch_size+1)]:
    img_path = f'{processed_path}/{filename}.jpg'
    img = image.load_img(img_path, target_size=(224, 224))
    img = image.img_to_array(img)
    img = image.smart_resize(img, (224, 224))
    img = np.expand_dims(img, axis=0)
    img = preprocess_input(img)
    images_batch.append(img)

  preds = model_cod.predict_on_batch(np.array(images_batch).squeeze())
  decoded = decode_predictions(preds, top=5, class_list_path='../input/keras-pretrained-models/imagenet_class_index.json')
  batch_labels = [cat_or_dog(dec)['result'] for dec in decoded]
  labels += batch_labels

with open(f"labels_test.csv", "w") as fo:
  for img_id, label in zip(ids, labels):
    if label == 'dog':
      label_num = 0
    elif label == 'cat':
      label_num = 1
    else:
      label_num = 2
    fo.write(f"{img_id},{label_num}\n")

In [30]:
processed_path = '../input/petfinder-pawpularity-score/test/'
x_test_label = pd.read_csv('../input/petfinder-pawpularity-score/test.csv')
x_test_cod = pd.read_csv('labels_test.csv', header=None)

In [31]:
PawPularity = []
IDs = []
for ID in x_test_label['Id'].to_numpy():
    pic_test = np.array(image.img_to_array(image.load_img(os.path.join(processed_path, ID + '.jpg'), target_size=(256, 256))))/255
    label_test = x_test_label.loc[x_test_label['Id'] == ID].drop(['Id'], axis=1).to_numpy()
    cod_test = utils.to_categorical(x_test_cod.loc[x_test_cod[0] == ID].drop(0, axis=1),3)
    lables_test = np.concatenate((label_test[0],cod_test[0]))
    pic_test = np.expand_dims(pic_test, axis=0)
    lables_test = np.expand_dims(lables_test, axis=0)
    PawPularity.append(model.predict([pic_test,lables_test])[0][0])
    IDs.append(ID)
submission = pd.DataFrame(columns=['Id','Pawpularity'])
submission['Pawpularity'] = np.array(PawPularity)*100
submission['Id'] = np.array(IDs)
submission.to_csv('submission.csv', index=False)

In [32]:
submission