In [1]:
import pandas as pd
import os
import re

In [2]:
# Root folder of the MovieLens CSV files. Defaults to ./data under the current
# working directory; set MOVIELENS_DATA_DIR to point the notebook at another copy
# without editing code.
DATA_DIR = os.environ.get('MOVIELENS_DATA_DIR', os.path.join(os.getcwd(), 'data'))

ratings_path = os.path.join(DATA_DIR, 'ratings.csv')
genome_path = os.path.join(DATA_DIR, 'genome-scores.csv')
genome_tags_path = os.path.join(DATA_DIR, 'genome-tags.csv')
links_path = os.path.join(DATA_DIR, 'links.csv')
movies_path = os.path.join(DATA_DIR, 'movies.csv')
# Path to tags.csv. Unlike the others it has no `_path` suffix; the name is kept
# in case later cells reference it.
user_tags = os.path.join(DATA_DIR, 'tags.csv')

In [3]:
# A four-digit number in parentheses, e.g. "(1994)"; the digits are captured.
# Shared by extract_year and replace_year so the two always agree on what a
# "year" is.
_YEAR_PATTERN = re.compile(r'\((\d{4})\)')


def extract_year(string):
    """Return the first parenthesised four-digit year found in ``string``.

    Args:
        string: A title such as ``"Pulp Fiction (1994)"``.

    Returns:
        str | None: The year digits without parentheses (e.g. ``"1994"``), or
        ``None`` when the text contains no ``(YYYY)`` group.
    """
    match = _YEAR_PATTERN.search(string)
    return match.group(1) if match else None


def replace_year(string):
    """Remove the first parenthesised four-digit year from ``string``.

    Only the first ``(YYYY)`` occurrence is removed (the same one
    ``extract_year`` reports), then surrounding whitespace is stripped.

    Args:
        string: A title such as ``"Pulp Fiction (1994)"``.

    Returns:
        str: The title without that year, e.g. ``"Pulp Fiction"``.
    """
    # count is passed by keyword: passing it positionally to re.sub is
    # deprecated since Python 3.13.
    return _YEAR_PATTERN.sub('', string, count=1).strip()

In [4]:
# Load only the user/movie/rating columns. The nullable 'Int64' dtype keeps the
# id columns integer-typed even if a value is missing.
ratings = pd.read_csv(
    ratings_path,
    dtype={
        'userId': 'Int64',
        'movieId': 'Int64',
        'rating': 'float64',
    },
    usecols=['userId', 'movieId', 'rating'],
    sep=',',
)

In [5]:
movies = pd.read_csv(movies_path)
# genres is a pipe-delimited string such as "Comedy|Crime|Drama"; store it as a
# list. The vectorised .str accessor treats a one-character pattern literally
# and yields NaN (instead of raising) for a missing value.
movies['genres'] = movies['genres'].str.split('|')
# Pull the "(YYYY)" year out of the title before it is stripped below. Titles
# without one get 0 as a sentinel so the column stays a plain int.
movies['year'] = movies['title'].map(extract_year).fillna(0).astype(int)
movies['title'] = movies['title'].map(replace_year)

In [7]:
# Attach title/genres/year to every rating. validate='many_to_one' makes pandas
# raise a MergeError if `movies` ever contains a duplicated movieId, which would
# otherwise silently multiply rating rows. With how='left', ratings whose
# movieId is absent from `movies` are kept with missing title/genres/year.
ratings_with_names = ratings.merge(movies, how='left', on='movieId', validate='many_to_one')

In [8]:
# Preview the first five merged rows to sanity-check the join.
ratings_with_names.head(n=5)

Unnamed: 0,userId,movieId,rating,title,genres,year
0,1,296,5.0,Pulp Fiction,"[Comedy, Crime, Drama, Thriller]",1994
1,1,306,3.5,Three Colors: Red (Trois couleurs: Rouge),[Drama],1994
2,1,307,5.0,Three Colors: Blue (Trois couleurs: Bleu),[Drama],1993
3,1,665,5.0,Underground,"[Comedy, Drama, War]",1995
4,1,899,3.5,Singin' in the Rain,"[Comedy, Musical, Romance]",1952


In [13]:
from elasticsearch import Elasticsearch

# Connection settings are read from the environment so credentials need not be
# committed with the notebook. The fallbacks reproduce the original local setup.
# NOTE(review): "admin"/"admin" looks like a default local login; confirm it is
# never used against a shared cluster.
ES_HOST = os.environ.get("ES_HOST", "https://localhost:9200")
ES_USER = os.environ.get("ES_USER", "admin")
ES_PASSWORD = os.environ.get("ES_PASSWORD", "admin")

es = Elasticsearch(
    hosts=[ES_HOST],
    http_auth=(ES_USER, ES_PASSWORD),
    # TLS certificate verification is off, presumably because the local cluster
    # uses a self-signed certificate. Do not reuse this against a real cluster.
    verify_certs=False,
)



In [11]:
def movie_iterator(index_string_name, table, start_id=0, chunk_size=10_000):
    """Yield Elasticsearch bulk actions, one per row of ``table``.

    Each action is the row's column -> value mapping plus the bulk metadata keys
    ``_index`` (always ``index_string_name``) and ``_id`` (a running counter).

    Rows are converted to dicts ``chunk_size`` at a time rather than all at
    once, so memory stays bounded for large frames. The yielded actions are the
    same as those from converting the whole table with ``to_dict('records')``.

    Args:
        index_string_name: Name of the target index.
        table: DataFrame whose rows become documents.
        start_id: ``_id`` of the first row; later rows get consecutive ids.
            Pass an offset when indexing a frame in several slices. Otherwise
            every call restarts at 0 and reuses the same ids.
        chunk_size: Number of rows converted to dicts per step; must be >= 1.

    Yields:
        dict: ``{"_index": index_string_name, "_id": <int>, **row}``.

    Raises:
        ValueError: On first iteration, if ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    doc_id = start_id
    for start in range(0, len(table), chunk_size):
        for record in table.iloc[start:start + chunk_size].to_dict('records'):
            yield {"_index": index_string_name, "_id": doc_id, **record}
            doc_id += 1

In [15]:
%%timeit -r 1

from elasticsearch.helpers import parallel_bulk
from elasticsearch.helpers import streaming_bulk
import urllib3
urllib3.disable_warnings()

data = ratings_with_names[:200000]
index_name = 'user_ratings'


for success, info in parallel_bulk(
    es, movie_iterator(index_name, data), thread_count=8, chunk_size=500):
    if not success:
        print('A document failed:', info)

21.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [16]:
# Recorded timing of the bulk-indexing cell (200,000 rows, parallel_bulk with
# thread_count=8, chunk_size=500), as reported by `%%timeit -r 1`:
# 21.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)