## Top 10 Movies

#### Import packages

In [56]:
import os
import sys
import json

#### Connecting to Spark

In [57]:
# Bootstrap PySpark inside the notebook kernel: set the submit arguments,
# put Spark's Python sources on sys.path and run the interactive shell script.
os.environ["PYSPARK_SUBMIT_ARGS"]='--packages com.databricks:spark-csv_2.10:1.2.0 pyspark-shell'
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    # Fail early with a clear message instead of a TypeError on `None + "/python"`.
    raise EnvironmentError("SPARK_HOME is not set; cannot locate the PySpark installation")
sys.path.insert(0, spark_home + "/python")
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.6 (default, Oct 26 2016 20:30:19)
SparkSession available as 'spark'.


#### Converting file to Resilient Distributed Dataset (RDD)

In [58]:
# Read the ratings file as an RDD of raw text lines (one tab-separated record per line).
rdd = sc.textFile('ml-100k/u.data') # http://files.grouplens.org/datasets/movielens/ml-100k.zip

# Peek at the first record to check the raw format.
rdd.take(1)

[u'196\t242\t3\t881250949']

#### Strings to int

In [59]:
# Turn every tab-separated line into a list of ints.
# NOTE: this rebinds `rdd`, so the cell has to run exactly once after the
# load cell; running it again would call .split() on lists.
rdd = rdd.map(lambda line: [int(field) for field in line.split("\t")])

rdd.take(1)

[[196, 242, 3, 881250949]]

### 1. Number of ratings for each movie

In [60]:
# Count the ratings of every movie (field 1 is the movie id) and show the
# ten most-rated movies as (movie_id, count) pairs.
(
    rdd
    .map(lambda row: (row[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: -pair[1])
    .take(10)
)

[(50, 583),
 (258, 509),
 (100, 508),
 (181, 507),
 (294, 485),
 (286, 481),
 (288, 478),
 (1, 452),
 (300, 431),
 (121, 429)]

In [61]:
# (movie_id, number_of_ratings) for every movie, ordered by movie id.
rates = (
    rdd
    .map(lambda row: (row[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[0])
)

rates.take(10)

[(1, 452),
 (2, 131),
 (3, 90),
 (4, 209),
 (5, 86),
 (6, 26),
 (7, 392),
 (8, 219),
 (9, 299),
 (10, 89)]

In [62]:
# Ids of the ten movies with the largest number of ratings.
top10_rates = (
    rates
    .sortBy(lambda pair: -pair[1])
    .keys()  # keep only the movie id of each (movie_id, count) pair
    .take(10)
)

top10_rates

[50, 258, 100, 181, 294, 286, 288, 1, 300, 121]

### 2. Average rating of the movie
#### Sum of ratings for each movie

In [63]:
# (movie_id, sum_of_ratings) for every movie, ordered by movie id.
rtng_sum = (
    rdd
    .map(lambda row: (row[1], row[2]))
    .reduceByKey(lambda total, rating: total + rating)
    .sortBy(lambda pair: pair[0])
)

rtng_sum.take(3)

[(1, 1753), (2, 420), (3, 273)]

#### Number of people who rated

In [64]:
# The number of people who rated a movie is exactly the per-movie rating count
# already built as `rates` ((movie_id, count), ordered by movie id), so reuse it
# instead of repeating the same map / reduceByKey / sortBy pipeline.
num_of_people = rates

num_of_people.take(3)

[(1, 452), (2, 131), (3, 90)]

#### Average rating

In [65]:
# Average rating per movie = sum of ratings / number of ratings.
# The two RDDs are combined with a key-based join instead of a positional zip:
# zip only pairs the right records when both RDDs have identical partition
# layouts, whereas join matches records by movie id regardless of layout.
top10_avg_unsort = (
    rtng_sum
    .join(num_of_people)  # (movie_id, (rating_sum, rating_count))
    .map(lambda kv: [kv[0], float(kv[1][0]) / kv[1][1]])  # float() avoids integer division
    .sortBy(lambda x: (-x[1], x[0]))  # best average first, ties broken by movie id
    .take(10)
)

top10_avg_unsort

[[814, 5.0],
 [1122, 5.0],
 [1189, 5.0],
 [1201, 5.0],
 [1293, 5.0],
 [1467, 5.0],
 [1500, 5.0],
 [1536, 5.0],
 [1599, 5.0],
 [1653, 5.0]]

In [66]:
# Keep only the movie ids, i.e. the first element of every [movie_id, average] pair.
# A generator is used instead of zip(*pairs)[0] so the cell does not depend on
# zip() returning a list and yields an empty tuple when there are no pairs.
top10_avg_unsort_ids = tuple(pair[0] for pair in top10_avg_unsort)

top10_avg_unsort_ids

(814, 1122, 1189, 1201, 1293, 1467, 1500, 1536, 1599, 1653)

#### Average rating sorted by name of movie

In [67]:
u_item = sc.textFile("ml-100k/u.item")  # movie metadata, fields separated by '|'

# Look up the names of the best-average movies and return their ids
# ordered alphabetically by movie name.
top10_average = (
    u_item
    .map(lambda line: line.split('|'))
    .map(lambda fields: (int(fields[0]), fields[1]))  # (movie_id, movie_name)
    .filter(lambda movie: movie[0] in top10_avg_unsort_ids)
    .sortBy(lambda movie: movie[1])
    .keys()
    .take(10)
)

top10_average

[1536, 1653, 814, 1201, 1189, 1467, 1500, 1599, 1293, 1122]

### 3. Number of people who rated the movie 4 or higher

In [68]:
# (movie_id, number of ratings strictly greater than 3), ordered by movie id.
# Ratings of 3 or lower contribute 0 so that every movie keeps an entry.
rtng_more_4 = (
    rdd
    .map(lambda row: (row[1], 1 if row[2] > 3 else 0))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[0])
)

rtng_more_4.take(10)

[(1, 321),
 (2, 51),
 (3, 34),
 (4, 122),
 (5, 39),
 (6, 15),
 (7, 263),
 (8, 155),
 (9, 211),
 (10, 59)]

### 4. Proportion of people who rated the movie positively

In [69]:
# Share of positive ratings per movie: (ratings > 3) / (all ratings).
# NOTE(review): zip pairs records by position, so this relies on both RDDs
# being ordered by movie id with matching partition layouts - confirm if the
# upstream pipelines ever change.
prop_of_pozitive = (
    rtng_more_4
    .zip(rates)  # ((movie_id, positive_count), (movie_id, total_count))
    .map(lambda pair: (pair[0][0], float(pair[0][1]) / pair[1][1]))
)

prop_of_pozitive.take(10)

[(1, 0.7101769911504425),
 (2, 0.3893129770992366),
 (3, 0.37777777777777777),
 (4, 0.583732057416268),
 (5, 0.45348837209302323),
 (6, 0.5769230769230769),
 (7, 0.6709183673469388),
 (8, 0.7077625570776256),
 (9, 0.705685618729097),
 (10, 0.6629213483146067)]

### 5. Total average rating

#### Sum of all ratings

In [70]:
# Add up the rating value (field 2) of every record in the dataset.
sum_all_rts = (
    rdd
    .map(lambda row: row[2])
    .reduce(lambda total, rating: total + rating)
)

sum_all_rts

352986

#### Number of ratings of all movies

In [71]:
# Total number of rating records in the dataset.
num_all_rts = rdd.count()

num_all_rts

100000

#### Total average rating

In [72]:
# Global mean rating over all records; float() forces true division under Python 2.
# The value is bound to two names: `mu` (short form) and `ttl_avg_rtng`.
mu = float(sum_all_rts) / num_all_rts
ttl_avg_rtng = mu

ttl_avg_rtng

3.52986

### 6. Corrected Estimation

In [73]:
# Weight given to the global mean `mu` in the corrected estimate
# (rating_sum + k * mu) / (rating_count + k).
k = 15

In [74]:
# Corrected (smoothed) average: every movie's mean is pulled towards the global
# mean `mu` with weight `k`, so movies with very few ratings no longer dominate.
# The inputs are matched with a key-based join instead of a positional zip, so
# the result does not depend on both RDDs sharing the same partition layout.
top10_rating = (
    rtng_sum
    .join(num_of_people)  # (movie_id, (rating_sum, rating_count))
    .map(lambda kv: [kv[0], (float(kv[1][0]) + k * mu) / (kv[1][1] + k)])
    .sortBy(lambda x: (-x[1], x[0]))  # best score first, ties broken by movie id
    .map(lambda x: x[0])
    .take(10)
)

top10_rating

[318, 483, 64, 408, 169, 12, 50, 603, 114, 98]

### 7. Wilson score interval

In [75]:
from math import sqrt

In [76]:
# z-score used by wilson() and the confidence level (in percent) it is paired with;
# `prcnt` is informational only and is not read by wilson().
z, prcnt = 3.291, 99.90

def wilson(n, p, z_score=None):
    """Return the lower bound of the Wilson score interval for a proportion.

    n       -- number of observations (here: number of ratings of a movie)
    p       -- observed proportion of positive outcomes, expected in [0, 1]
    z_score -- optional z-score; when omitted, the module-level `z` is read
               at call time, exactly as before

    Returns 0.0 when `n` is not positive, because no lower bound can be
    estimated without observations (the plain formula would divide by zero).
    """
    if n <= 0:
        return 0.0
    if z_score is None:
        z_score = z
    n = float(n)  # guard against integer division under Python 2
    z_sq = z_score * z_score
    centre = p + z_sq / (2 * n)
    spread = z_score * sqrt((p * (1 - p) + z_sq / (4 * n)) / n)
    return (centre - spread) / (1 + z_sq / n)

In [77]:
# Sanity check of wilson() with n = 452 observations and a positive share of 0.7.
wilson(452, 0.7) # testing

0.625062387412852

In [78]:
# Lower Wilson bound of the positive-rating share for every movie, best first.
# The rating counts and the positive shares are matched by movie id with a
# join rather than a positional zip, so the pairing stays correct even if the
# two RDDs do not share the same partition layout.
wilson_lower = (
    rates
    .join(prop_of_pozitive)  # (movie_id, (rating_count, positive_share))
    .map(lambda kv: (kv[0], wilson(kv[1][0], kv[1][1])))
    .sortBy(lambda x: (-x[1], x[0]))  # highest bound first, ties broken by movie id
)

wilson_lower.take(3)

[(64, 0.8270766587313791), (98, 0.8177122414044271), (318, 0.8152788845229866)]

In [79]:
# Ids of the ten movies with the highest Wilson lower bound
# (wilson_lower is already ordered best-first).
top10_lower = (
    wilson_lower
    .keys()
    .take(10)
)

top10_lower

[64, 98, 318, 479, 50, 483, 603, 427, 357, 12]

#### Creating a JSON file with the results

In [80]:
# Write the four top-10 id lists to a JSON file. The handle is called
# `out_file` so that the Python 2 builtin `file` is not shadowed in the
# notebook namespace.
with open('lab06s.json', 'w') as out_file:
    json.dump({'top10_rates': top10_rates,
               'top10_average': top10_average,
               'top10_rating': top10_rating,
               'top10_lower': top10_lower}, out_file)