Фильтры для нахождения фильмов

In [1]:
%pyspark

# Genre text that must occur inside a movie's genre string (substring test).
search_genres = 'Comedy'
# Inclusive bounds for the release year of the movies to keep.
year_from = 2000
year_to = 2000
# Regular expression searched for in the title; '' matches every title.
regexp = ''
# Number of top movies to keep; 0 is replaced by 100000 before the ranking step.
top_n = 0


In [2]:
%sh

# Install unzip non-interactively: -y answers the confirmation prompt so the
# cell cannot block, and apt-get (unlike apt) is meant for scripted use.
apt-get install -y unzip
# Download the MovieLens "latest-small" archive into a clean target directory.
rm -rf /zeppelin/seed/ml-latest-small
mkdir -p /zeppelin/seed
# NOTE(review): --no-check-certificate disables TLS certificate verification;
# kept to preserve behavior, but consider installing CA certificates instead.
wget -O dataset-small.zip "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip" --no-check-certificate
# -j discards the directory structure stored in the archive, so every file
# lands directly in the target directory.
unzip -j dataset-small.zip -d /zeppelin/seed/ml-latest-small


# Remove the extracted files that are not needed, plus the downloaded archive.
rm -f /zeppelin/seed/ml-latest-small/links.csv
rm -f /zeppelin/seed/ml-latest-small/tags.csv
rm -f /zeppelin/seed/ml-latest-small/README.txt
rm -f dataset-small.zip


In [3]:
%pyspark
import re

# Load the raw CSV files from the local filesystem as RDDs of text lines.
# NOTE(review): the download cell unpacks the dataset into
# /zeppelin/seed/ml-latest-small/, while these paths point at /zeppelin/seed/ —
# confirm that the files really exist at this location.
movies = sc.textFile("file:///zeppelin/seed/movies.csv")
ratings = sc.textFile("file:///zeppelin/seed/ratings.csv")

In [4]:
%pyspark 

def parse_year(name):
    """Extract the release year that follows the last '(' in a movie title.

    Args:
        name: Movie title, e.g. ``"Toy Story (1995)"``.

    Returns:
        A ``(year, year_index)`` tuple. ``year`` is the integer parsed from
        the four characters after the last ``'('`` and ``year_index`` is the
        position of that parenthesis. When the title has no ``'('`` or the
        characters after it are not a number, ``(0, len(name))`` is returned.
    """
    paren_index = name.rfind('(')
    # Without this guard rfind's -1 would make the slice below read the first
    # four characters of the title, so "2001 Maniacs" parsed as year 2001
    # with a negative index.
    if paren_index == -1:
        return 0, len(name)
    try:
        return int(name[paren_index + 1:paren_index + 5]), paren_index
    except ValueError:
        return 0, len(name)
    
def split_line(line):
    """Cut a raw CSV line into three string fields.

    The line is split at its first and at its last comma, so commas inside
    the middle field are preserved.

    Args:
        line: One raw text line.

    Returns:
        A 3-tuple: the text before the first comma, the text between the
        first and last comma with every double quote removed, and the text
        after the last comma with carriage returns removed.
    """
    first_comma = line.find(',')
    last_comma = line.rfind(',')
    leading = line[:first_comma]
    middle = line[first_comma + 1:last_comma].replace('"', '')
    trailing = line[last_comma + 1:].replace('\r', '')
    return leading, middle, trailing
    
def normalize_movies(line):
    """Parse one raw movies line into ``(movie_id, (title, year, genres))``.

    Args:
        line: One raw text line of the movies file.

    Returns:
        ``(movie_id, (title, year, genres))`` with an integer ``movie_id``,
        the title cut before the year parenthesis, the year reported by
        ``parse_year`` and the genres field. Lines that cannot be parsed
        (for example a header whose id is not a number) yield
        ``(None, (None, None, None))``.
    """
    try:
        movie_id, title, genres = split_line(line)
        year, year_index = parse_year(title)
        movie_id = int(movie_id)
    except (ValueError, TypeError, AttributeError):
        # Malformed input is marked with None fields instead of failing the job.
        return (None, (None, None, None))
    # Cut at the year position and strip the separating space. The previous
    # `year_index - 1` slice dropped the last character of titles that have
    # no year, because year_index is then the title length.
    return (movie_id, (title[:year_index].rstrip(), year, genres))

Нормализуем фильмы в формат (movie_id, (title, year, genres))

In [6]:
%pyspark

# Apply normalize_movies to every raw line of the movies RDD.
rdd_normalized_movies = movies.map(normalize_movies)

In [7]:
%pyspark
# Fetch the first five normalized movie records for a quick look.
rdd_normalized_movies.take(5)

Фильтруем файл фильмов на корректность и по введённым фильтрам

In [9]:
%pyspark

def _matches_filters(record):
    """Tell whether a normalized movie record passes all user filters.

    Args:
        record: A ``(movie_id, (title, year, genres))`` tuple.

    Returns:
        True when the record was parsed successfully, ``search_genres``
        occurs in its genre string, its year lies within
        ``[year_from, year_to]`` and ``regexp`` is found in its title.
    """
    title, year, genres = record[1]
    # Records that failed normalization carry None fields; reject them first
    # so the comparisons below never operate on None.
    if title is None or year is None or genres is None:
        return False
    return bool(
        search_genres in genres
        and year_from <= year <= year_to
        and re.search(regexp, title)
    )


rdd_filtered_movies = rdd_normalized_movies.filter(_matches_filters)

In [10]:
%pyspark

# Fetch the first five movies that passed the filters.
rdd_filtered_movies.take(5)

In [11]:
%pyspark

def normalize_ratings(line):
    """Parse one raw ratings line into ``(movie_id, (rating, 1))``.

    The line must consist of exactly four comma-separated fields; the second
    is read as the integer movie id and the third as the float rating. The
    constant ``1`` acts as a counter for later aggregation.

    Args:
        line: One raw text line of the ratings file.

    Returns:
        ``(movie_id, (rating, 1))``, or ``(None, (None, None))`` when the
        line cannot be parsed (wrong field count or non-numeric values,
        e.g. a header line).
    """
    try:
        _, movie_id, rating, _ = line.split(',')
        return (int(movie_id), (float(rating), 1))
    except (ValueError, TypeError, AttributeError):
        # Only parsing failures are mapped to the None marker; unrelated
        # errors are no longer swallowed by a bare except.
        return (None, (None, None))

Нормализуем файл рейтингов в формат (movie_id, (rating, 1))

In [13]:
%pyspark

# Apply normalize_ratings to every raw line of the ratings RDD.
rdd_normalized_ratings = ratings.map(normalize_ratings)

In [14]:
%pyspark

# Fetch the first five normalized rating records for a quick look.
rdd_normalized_ratings.take(5)

Считаем рейтинги для каждого movie_id и возвращаем в виде tuple(movie_id, (sum_ratings, count_ratings))

In [16]:
%pyspark

# Sum the ratings and the counters per movie: (movie_id, (sum, count)).
rdd_counted_ratings = (
    rdd_normalized_ratings
    # Drop unparsable rows BEFORE reducing: their values are (None, None), so
    # two or more of them under the None key would make the reducer evaluate
    # None + None and raise TypeError.
    .filter(lambda x: x[0] is not None)
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
)

In [17]:
%pyspark

# Fetch the first five (movie_id, (sum_ratings, count_ratings)) records.
rdd_counted_ratings.take(5)

In [18]:
%pyspark
# Fetch the first five filtered movies again, right before the join.
rdd_filtered_movies.take(5)

Объединяем таблицу фильмов и рейтингов по полю movie_id и считаем средний рейтинг

In [20]:
%pyspark


# Join the filtered movies with their average rating and flatten the result
# into (movie_id, (title, year, genres, average_rating)).
rdd_movies_rating = (
    rdd_filtered_movies
    .join(
        # Average = sum / count, rounded to two decimals and wrapped in a
        # 1-tuple so it can be appended to the movie tuple below.
        rdd_counted_ratings.map(
            lambda counted: (
                counted[0],
                (round(counted[1][0] / counted[1][1], 2),),
            )
        )
    )
    # The join yields (movie_id, (movie_tuple, rating_tuple)); concatenate
    # the two tuples into a single flat one.
    .map(lambda joined: (joined[0], joined[1][0] + joined[1][1]))
)



In [21]:
%pyspark
# Fetch the first five joined (movie_id, (title, year, genres, rating)) records.
rdd_movies_rating.take(5)

In [22]:
%pyspark
Сортируем фильмы по рейтингу по убыванию и году по возрастанию

In [23]:
%pyspark
# Sort by the fourth value field (the average rating) in descending order.
# NOTE(review): the heading above also mentions ordering by year, but only the
# rating is used as the sort key here — confirm whether that is intended.
rdd_sorted1 = rdd_movies_rating.sortBy(lambda x: -x[1][3])
# Fetch the first twenty records of the sorted RDD.
rdd_sorted1.take(20)

Сортируем фильмы по рейтингу и году

In [25]:
%pyspark
import heapq
# rdd_sorted = rdd_movies_rating.sortBy(lambda x: (-x[1][3], x[1][1]))
def key(kv):
    """Build the ranking key of a ``(movie_id, fields)`` record.

    Returns ``(fields[3], -fields[1])`` — the rating and the negated year —
    so that taking the largest keys prefers higher ratings and, among equal
    ratings, smaller years.
    """
    fields = kv[1]
    rating = fields[3]
    year = fields[1]
    return rating, -year
    
# def merge(a, b):
#     return heapq.nlargest(10, a + b, key)
    
# rdd_movies_rating.mapPartitions(lambda iter: heapq.nlargest(10, iter, key)).reduce((lambda a, b: heapq.nlargest(10, a + b, key)))
# A top_n of 0 is replaced with 100000, the limit used by the ranking helpers below.
if top_n == 0:
        top_n = 100000
        
def topIterator(iterator):
    """Yield one list with the ``top_n`` largest records of a partition.

    Records are ranked with ``key``. A single list is yielded rather than the
    individual records, so each partition contributes exactly one element.
    """
    partition_best = heapq.nlargest(top_n, iterator, key=key)
    yield partition_best

def merge(a, b):
    """Combine two record lists and keep the ``top_n`` largest by ``key``."""
    combined = a + b
    return heapq.nlargest(top_n, combined, key=key)

# Take the top_n records of every partition, then merge the per-partition
# lists pairwise. reduce() is an action: it returns a plain Python list to the
# driver, not an RDD.
top_movies = rdd_movies_rating.mapPartitions(topIterator).reduce(merge)
# The following cell calls rdd_sorted.flatMap(...), which a list does not
# provide, so the ranked list is turned back into an RDD. One slice keeps the
# ranked records together, in order, inside a single partition.
rdd_sorted = sc.parallelize(top_movies, 1)


Приводим данные к JSON-подобному формату

In [27]:
%pyspark
def _pairs_by_genre(record):
    """Expand one movie record into a ``(genre, [movie_dict])`` pair per genre.

    ``record`` is ``(movie_id, fields)``; ``fields[2]`` is split on ``'|'`` to
    obtain the genres, while the title, year and rating are read from
    ``fields[0]``, ``fields[1]`` and ``fields[3]``.
    """
    fields = record[1]
    pairs = []
    for genre in fields[2].split('|'):
        # A fresh dict per genre, wrapped in a list so lists can be concatenated later.
        movie = {'title': fields[0], 'year': fields[1], 'rating': fields[3]}
        pairs.append((genre, [movie]))
    return pairs


rdd_result = rdd_sorted.flatMap(_pairs_by_genre)


In [28]:
%pyspark
# Fetch the first five (genre, [movie_dict]) pairs.
rdd_result.take(5)

Объединяем фильмы по жанрам и возвращаем JSON

In [30]:
%pyspark

rdd_result_json = (
    rdd_result
    # Concatenate the single-movie lists that share the same genre.
    .reduceByKey(lambda left, right: left + right)
    # Turn each (genre, movies) pair into a dict with 'genre' and 'movies' keys.
    .map(lambda pair: {'genre': pair[0], 'movies': pair[1]})
)

Сохраняем результат

In [32]:
%pyspark

import json

# saveAsTextFile writes the string form of each element; for a dict that is
# the Python repr (single quotes), which is not valid JSON. Serialize every
# record explicitly so each output line is a JSON object.
# ensure_ascii=False keeps non-ASCII titles readable in the output.
# NOTE(review): saveAsTextFile fails when the target directory already
# exists — remove it before re-running this cell.
(
    rdd_result_json
    .map(lambda genre_movies: json.dumps(genre_movies, ensure_ascii=False))
    .saveAsTextFile('/zeppelin/seed/result')
)