In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
# from dask_ml.feature_extraction import HashingVectorizer
import os
import seaborn as sns
from collections import Counter
import plotly.express as px
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import warnings
from scipy.sparse import csr_matrix
warnings.filterwarnings("ignore")

# import findspark
# findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('SparkByExamples.com').getOrCreate()
sc=spark.sparkContext

---

## Item Profiles based on TF-IDF of tags column

In [2]:
ratings = dd.read_csv('./clean_data/interactions_TRAIN.csv').drop('Unnamed: 0', axis=1)
recipes = dd.read_csv('./clean_data/recipes.csv').drop('Unnamed: 0', axis=1)

# ratings_TRAIN_dask = dd.from_pandas(ratings_TRAIN, 1).reset_index().drop('index', axis=1)
recipes_dask = dd.read_csv('./clean_data/recipes.csv').drop('Unnamed: 0', axis=1)

In [3]:
def parallelize_matrix(scipy_mat, rows_per_chunk=100):
        # Taken from https://medium.com/@rantav/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e
        [rows, cols] = scipy_mat.shape
        i = 0
        submatrices = []
        while i < rows:
            current_chunk_size = min(rows_per_chunk, rows - i)
            submat = scipy_mat[i:i + current_chunk_size]
            submatrices.append((i, (submat.data, submat.indices, 
                                    submat.indptr),
                                (current_chunk_size, cols)))
            i += current_chunk_size
        return sc.parallelize(submatrices)

In [4]:
class TFIDF_Recommendations():
    
    def __init__(self, recipes, ratings):
        self.recipes = recipes
        self.recipes['tags'] = self.recipes['tags'].map(eval, meta=('x', int)).map(lambda x: ' '.join(x), meta=('x', str))
        self.cosine_matrix = None
        self.flag = False
        self.recipes.drop_duplicates(subset='recipe_id')
        # self.indices = dd.Series(self.recipes.index, index = self.recipes['recipe_id']).drop_duplicates())
        
        
        # rated_recipes = ratings['recipe_id'].unique()
        # self.recipes = self.recipes[self.recipes['recipe_id'].isin(rated_recipes)]
        
        self.ratings = ratings.merge(self.recipes, on = 'recipe_id')
        self.ratings.drop(['date', 'minutes', 'contributor_id', 'submitted', 'n_steps', 'description', 'ingredients', 'n_ingredients', 'Calories', 'Total_fat_PDV', 'Sugar_PDV', 'Sodium_PDV', 'Protein_PDV', 'Saturated_fat_PDV', 'Carbohydrates_PDV', 'steps'], axis=1)
        
        mean_user_ratings = self.ratings.groupby('user_id')['rating'].mean().rename('mean_user_rating')
        self.ratings = self.ratings.join(mean_user_ratings, on='user_id')
        self.ratings['weighted_rating'] = self.ratings['rating'] - self.ratings['mean_user_rating']
        self.ratings.sort_values('user_id')
        
        self.tfidf = TfidfVectorizer(stop_words='english', token_pattern=r"(?u)\S\S+")
        
        self.tfidf_ratings_matrix = self.tfidf.fit_transform(self.ratings['tags'])
        self.tfidf_recipes_matrix = self.tfidf.fit_transform(self.recipes['tags'])
    
    def display_tag_distribution(self):
        
        tags_count = self.recipes['tags'].map(Counter).sum().compute().most_common()
        fig = px.bar(tags_count,
                     x = 0,
                     y = 1,
                     log_y = True,
                     title = "Count Distribution of Tags Column")
        fig.update_layout(
            xaxis_title="Tag Names",
            yaxis_title="Counts",
            yaxis = dict(
                tickmode = 'linear',
                tick0 = 0,
                dtick = 1
    ))
    
        return fig
    
    def display_cosine_matrix(self, n=5):
        
        try:
            fig = px.imshow(self.cosine_matrix[:n, :n],
                            labels = dict(x = "Recipe Index", y = "Recipe Index", color = "Cosine Similarity"),
                            title = "Cosine similarity of Recipes",
                            text_auto = True)
        except:
            raise Exception("Generate a cosine matrix first by calling generate_cosine_sim_matrix()")
            
        return fig
    
    def generate_user_profile(self, user_id):
        # print(self.tfidf_ratings_matrix)
        # print("***")
        # print(self.ratings.query(f'user_id == {user_id}'))
        # print("***")
        
        rows = self.ratings.query(f'user_id == {user_id}')
        
        indices = rows.index.compute()
        
        print(indices)
        
        # min_index = indices[0]
        # max_index = indices[-1]
        
        
#         # tfidf_ratings_matrix = self.tfidf.fit_transform(rows['tags'])
        
#         tfidf_ratings_matrix_rdd = sc.parallelize(self.tfidf_ratings_matrix.toarray()).zipWithIndex().toDF('data', 'index')
#         weighted_rating_rdd = sc.parallelize(rows['weighted_rating'])
        
#         print(tfidf_ratings_matrix_rdd.take(2))
#         print(weighted_rating_rdd.take(2))
        
# #         print(tfidf_ratings_matrix_rdd.zip(weighted_rating_rdd).take(5))
        
#         user_profile = tfidf_ratings_matrix_rdd.zip(weighted_rating_rdd).map(lambda x: x[0]*x[1])
        
#         print(user_profile.count())
#         print(user_profile.take(2))
        
#         # print(user_profile.take(5))
        
    
        user_profile = 0
        for index in indices:
            print(index)
            print("---")
            print(self.tfidf_ratings_matrix[index])
            print("---")
            print(self.ratings.loc[index]['weighted_rating'])
            print("===")
            user_profile += self.tfidf_ratings_matrix[index] * self.ratings.loc[index]['weighted_rating']

        return user_profile
        
    
    def generate_recommendations(self, user_id, n):
        
        similar_recipes = []
        toReturn = []
        curr_similarity = 0
        user_profile = self.generate_user_profile(user_id)
        
#         tfidf_recipes_matrix_rdd = sc.parallelize(self.tfidf_recipes_matrix.toarray())
        
#         tfidf_recipes_matrix_rdd.map(lambda x: cosine_similarity(x, user_profile))
        
#         print(tfidf_recipes_matrix_rdd.take(5))

        collected_profile = user_profile.collect()
        
        for i in range(0, self.tfidf_recipes_matrix.shape[0]):
            curr_similarity = cosine_similarity(self.tfidf_recipes_matrix[i], collected_profile)
            similar_recipes.append((curr_similarity[0][0], i))
        
        similar_recipes = sorted(similar_recipes, key = lambda x: x[0])[:n]
        
        for sim, idx in similar_recipes:
            recipe = self.recipes.loc[idx]['name']
            toReturn.append(recipe)
            
        return toReturn

In [5]:
obj = TFIDF_Recommendations(recipes, ratings)

In [6]:
obj.generate_recommendations(104295, 10)

Index([  1437,   2698,   3608,   3628,   4050,   7704,  11010,  11731,  12475,
        16527,
       ...
       203316, 203517, 204086, 204792, 204899, 205249, 205304, 205523, 205642,
       205781],
      dtype='int64', length=1466)
1437
---
  (0, 4)	0.28221392659442257
  (0, 410)	0.4916501502928258
  (0, 9)	0.30965123530347083
  (0, 196)	0.3420379903229226
  (0, 57)	0.39034133555315687
  (0, 332)	0.2668733429282359
  (0, 139)	0.20807578267422935
  (0, 173)	0.20382437967435968
  (0, 152)	0.13707853325321084
  (0, 350)	0.28074762278925786
  (0, 397)	0.11247615712913714
  (0, 295)	0.14187806349956963
  (0, 130)	0.11780320408188978
  (0, 516)	0.11430786855807026
---
Dask Series Structure:
npartitions=5
    float64
        ...
     ...   
        ...
        ...
Name: weighted_rating, dtype: float64
Dask Name: getitem, 34 graph layers
===


TypeError: Trying to convert dd.Scalar<eq-c40a..., dtype=bool> to a boolean value. Because Dask objects are lazily evaluated, they cannot be converted to a boolean value or used in boolean conditions like if statements. Try calling .compute() to force computation prior to converting to a boolean value or using in a conditional statement.