In [None]:
# Reload edited local modules (e.g. the `functions` module imported below)
# automatically before each cell runs.
%load_ext autoreload
%autoreload 2

# One-time environment setup; uncomment on a fresh machine.
# !pip install pyspark
# !pip install selenium
# !pip install boto3

In [None]:
import ast
import collections
import io
import itertools
import os
from functools import reduce

import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import SparkSession
from scipy.spatial.distance import cosine
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import MobileNetV2
from tqdm import tqdm
from tqdm.contrib import tzip

from functions import prepare_image, extract_features

%matplotlib inline

# Seed NumPy's global random generator so NumPy-based randomness is repeatable.
np.random.seed(0)

# Data Loader

In [None]:
# Load the snap metadata and show which columns the CSV provides.
df_snaps_women = pd.read_csv('datasets/snaps1k_women.csv')
print(df_snaps_women.columns)

# Keep only the columns used downstream. The explicit .copy() makes this an
# independent frame, so adding a column to it in a later cell does not raise
# SettingWithCopyWarning.
df_snaps_women = df_snaps_women[['snap_id', 'image_url', 'tags_selected']].copy()
df_snaps_women.head(1)

In [None]:
# Snap ids in DataFrame row order. This must stay a row-aligned list: it is
# zipped with list_tags below and in later cells, and a set would iterate in
# arbitrary order (pairing snaps with another row's tags) and drop duplicates.
list_snaps = list(df_snaps_women['snap_id'])
# Dense integer id per distinct snap, numbered in order of first appearance.
dict_snap2id = {snap: idx for idx, snap in enumerate(dict.fromkeys(list_snaps))}

# tags_selected is parsed from its string form into a Python list per row.
# ast.literal_eval replaces the previous eval(): it only accepts literals, so
# content coming from the CSV can never execute code.
list_tags = [ast.literal_eval(t) for t in df_snaps_women['tags_selected']]
tags_flatten = list(itertools.chain.from_iterable(list_tags))
dict_tag2count = collections.Counter(tags_flatten)
df_tag2count = pd.DataFrame({'tag': list(dict_tag2count.keys()), 'count': list(dict_tag2count.values())})
df_tag2count = df_tag2count.sort_values('count')

print("count uniq tags : %d" % len(df_tag2count))
# The pre-sort row label of each tag is used as its integer id, so ids are
# unique but do NOT match the tag's position in the sorted frame.
idxs = list(df_tag2count.index)
tags = list(df_tag2count['tag'])
dict_tag2id = {tag: idx for idx, tag in zip(idxs, tags)}

In [None]:
# One (hashtag id, image id) pair for every tag attached to every snap.
tag_snap_pairs = [
    (dict_tag2id[tag], dict_snap2id[snap])
    for snap, snap_tags in zip(list_snaps, list_tags)
    for tag in snap_tags
]
list_tagids = [tag_id for tag_id, _ in tag_snap_pairs]
list_snapids = [snap_id for _, snap_id in tag_snap_pairs]

# Every observed pair gets the same rating of 1.
hashtag_rec_data = pd.DataFrame({
    'hashtag_id': list_tagids,
    'image_id': list_snapids,
    'rating': 1})
hashtag_rec_data.tail()

# Load Pre-trained Neural Network

In [None]:
# Input shape handed to the network: height, width, channels.
img_shape = (160, 160, 3)

# MobileNetV2 with ImageNet weights and without its classification head.
base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=img_shape)
global_average_layer = tf.keras.layers.GlobalAveragePooling2D()

# Feature extractor: backbone followed by global average pooling.
neural_network = tf.keras.Sequential()
neural_network.add(base_model)
neural_network.add(global_average_layer)

In [None]:
# Run every snap image through the CNN and collect one record per image that
# loads successfully.
pics = []
# Iterate the DataFrame's own snap_id column so each snap is paired with the
# tags parsed from the same row (list_tags is built in row order).
for snap, tags in tzip(df_snaps_women['snap_id'], list_tags):
    img_path = f'data/women_images/{snap}.png'
    try:
        img = prepare_image(img_path, where='local')
        deep_features = extract_features(img, neural_network)
    except Exception as e:
        # A file that is listed but missing from storage is skipped silently.
        # It is matched by class name, as before, because the exception class
        # raised by the loader is not imported in this notebook. Anything else
        # is reported together with the snap id so the image can be found.
        if type(e).__name__ != "NotFoundError":
            print(f'{snap}: {type(e).__name__}: {e}')
        continue
    # Store the image id inside the record so that skipped images cannot
    # shift the alignment between features and ids.
    pics.append({'pic': img,
                 'hashtags': tags,
                 'deep_features': deep_features,
                 'image_id': dict_snap2id[snap]})

In [None]:
pics = pd.DataFrame(pics)

if 'image_id' not in pics.columns:
    # The records carry no id of their own, so ids can only be attached by
    # position. That is valid only when every snap produced a record; fail
    # with a clear message instead of a length mismatch deep inside pandas.
    snap_ids = list(df_snaps_women['snap_id'])
    if len(pics) != len(snap_ids):
        raise ValueError(
            f"{len(snap_ids) - len(pics)} snap(s) have no extracted features; "
            "cannot attach image ids by position"
        )
    pics['image_id'] = [dict_snap2id[snap_id] for snap_id in snap_ids]
pics.head()

# Checking sample picture

In [None]:
# Inspect one arbitrary record (row 20): types, tags, shapes, and the image.
pic = pics.iloc[20]
sample_image = pic['pic']
sample_features = pic['deep_features']
print(type(sample_image))
print(pic['hashtags'], sample_features.shape, sample_image.shape)
plt.imshow(sample_image)

# Load ALS Collaborative filtering model

In [None]:
# One-time JDK installation (presumably required by PySpark below);
# uncomment on a fresh machine.
# !sudo apt-get update
# !sudo apt-get install openjdk-11-jdk
# !java --version
# NOTE(review): `!export` runs in a throwaway subshell and would not set
# JAVA_HOME for this kernel; `%env JAVA_HOME=...` is the magic that does.
# !export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64

In [None]:
# Spark session for the collaborative-filtering step, with Arrow enabled.
spark = SparkSession.builder.appName('local').getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Implicit-feedback ALS: images act as "users", hashtags as "items".
als = ALS(
    userCol='image_id',
    itemCol='hashtag_id',
    ratingCol="rating",
    implicitPrefs=True,
    alpha=40,
    seed=0,
)

hashtag_spark_df = spark.createDataFrame(hashtag_rec_data)
als_model = als.fit(hashtag_spark_df)
# als_model.write().overwrite().save('als')

# Top-10 hashtag recommendations for every image, as a pandas frame.
recs = als_model.recommendForAllUsers(10).toPandas()
recs.tail()

In [None]:
# Reverse lookup from hashtag id to hashtag text. A dict keyed by id is
# required: tag ids are not the tags' positions in dict_tag2id, so a
# positional list of its keys would return the wrong tag.
hashtag_index = {tag_id: tag for tag, tag_id in dict_tag2id.items()}


def lookup_hashtag_recs(rec_scores):
    """Translate one row of ALS recommendations into hashtag strings.

    Args:
        rec_scores: iterable of (hashtag_id, score) pairs.

    Returns:
        List of hashtag strings, in the same order as ``rec_scores``.

    Raises:
        KeyError: if a hashtag id is not present in ``hashtag_index``.
    """
    return [hashtag_index[rec_tagid] for (rec_tagid, score) in rec_scores]


recs['recommended_hashtags'] = recs['recommendations'].apply(lookup_hashtag_recs)
# Give the metadata frame the same integer image id, then join it on.
df_snaps_women['image_id'] = [dict_snap2id[idx] for idx in list(df_snaps_women['snap_id'])]
recs = pd.merge(recs, df_snaps_women, on='image_id')
recs.head(3)

# Merge image features (CF)

In [None]:
# ALS factor vector of every image, with `id` duplicated as `image_id` so it
# can be joined onto the recommendation table.
image_factors = als_model.userFactors.toPandas().assign(
    image_id=lambda factors: factors['id']
)
recs = recs.merge(image_factors, on='image_id')
recs.head(3)

# Merge image features (CNN)

In [None]:
# Join the CNN features onto the recommendation table. `pics` already has an
# image_id column at this point, so it is used as-is: re-assigning ids by
# position here would fail (or misalign rows) whenever `pics` has fewer rows
# than the snap list.
recs_deep = pd.merge(recs, pics, on='image_id', how='inner')
recs_deep.info()

# Unique hashtag list

In [2]:
print(recs.loc[0, 'image_id'])
print(len(dict_tag2id), type(dict_tag2id))

# Lookup table with a single `hashtag` column, indexed by hashtag id.
hashtags_df = pd.DataFrame(
    {'hashtag': list(dict_tag2id.keys())},
    index=pd.Index(list(dict_tag2id.values()), name='id'),
)
hashtags_df.head()

In [None]:
hashtag_features = als_model.itemFactors.toPandas()  # CF (ALS) features of the tags
image_features = als_model.userFactors.toPandas()  # CF (ALS) features of the images

# Columns the recommender needs: image id, original hashtags, CNN features
# (`deep_features`) and ALS image features (`features`).
recommender_columns = ['image_id', 'hashtags', 'deep_features', 'features']
recommender_df = recs_deep[recommender_columns]
recommender_df.head()

# Searching hashtags for test image

1. deep_features で cosine similarity が近い５件の画像を取ってくる
2. その５件の画像に関する als_features (tag x image の collaborative filtering の結果の image_features)を取ってきて，平均をとる(avg_features)
3. 全タグのCF特徴量について， 2 の avg_features (画像特徴量上位5件の，CF特徴量に関する平均)との内積を計算
4. 上位１０件のタグを取得してくる

**要するに，ターゲット画像と画像特徴量が似ている５件の画像に関するCF特徴量の平均と，CF特徴量が類似しているハッシュタグを１０件取得してくる（ハッシュタグと画像は同一空間に写像しているものとしている）**

- Alternating Least Squares (ALS): http://mogile.web.fc2.com/spark/ml-collaborative-filtering.html

In [None]:
def find_neighbor_vectors(image_path, k=5, recommender_df=recommender_df):
    """Return the k rows of ``recommender_df`` closest to the given image.

    The image at ``image_path`` is loaded and passed through the CNN; each
    row's ``deep_features`` is then compared to it by cosine distance.

    Args:
        image_path: local path of the query image.
        k: number of nearest rows to return.
        recommender_df: frame with a ``deep_features`` column to search.

    Returns:
        A copy of the k nearest rows, sorted by ascending distance, with the
        distance in an extra ``dist`` column.
    """
    query_features = extract_features(
        prepare_image(image_path, where='local'), neural_network
    )
    distances = recommender_df['deep_features'].apply(
        lambda stored: cosine(stored, query_features)
    )
    return recommender_df.assign(dist=distances).sort_values(by='dist').head(k)

def generate_hashtags(image_path):
    """Recommend ten hashtags for the image at ``image_path``.

    Steps:
      1. Find the 5 images whose CNN features are closest to the query image.
      2. Average the ALS (collaborative filtering) vectors of those images.
      3. Score every hashtag by the dot product of its ALS vector with that
         average.
      4. Return the 10 highest-scoring hashtags.

    Args:
        image_path: local path of the query image.

    Returns:
        List of hashtag strings, best match first.
    """
    # 1. Five nearest images by cosine distance on the CNN features.
    neighbors = find_neighbor_vectors(image_path, k=5, recommender_df=recommender_df)

    # 2. Mean ALS image vector of those neighbours.
    avg_features = np.mean(np.asarray(list(neighbors['features'])), axis=0)

    # 3. Score every hashtag. The scores live in a local Series so the shared
    #    hashtag_features frame is not modified by each call.
    scores = hashtag_features['features'].apply(
        lambda vector: np.asarray(vector).dot(avg_features)
    )

    # 4. Ids of the ten best-scoring hashtags, best first.
    top_rows = scores.sort_values(ascending=False).head(10).index
    top_ids = hashtag_features.loc[top_rows, 'id']

    # hashtags_df is indexed by hashtag id, so the lookup must be label-based
    # (.loc); positional .iloc would return an unrelated tag.
    return [hashtags_df.loc[hashtag_id, 'hashtag'] for hashtag_id in top_ids]

def show_results(test_image):
    """Display a stored image and print the hashtags recommended for it.

    Args:
        test_image: file stem of a PNG under ``data/women_images``.
    """
    image_path = f'data/women_images/{test_image}.png'

    pixels = mpimg.imread(image_path)
    plt.figure(figsize=(9, 9))
    plt.title(f'Original Hashtag: {test_image.upper()}', fontsize=32)
    plt.imshow(pixels)

    print(', '.join(generate_hashtags(image_path)))

In [None]:
# Demo: show the image stored as data/women_images/17658876.png and print its recommended hashtags.
show_results('17658876')