### Settaggio Pyspark

Web app per monitorare i jobs in esecuzione su Spark: **http://localhost:4040/jobs/**

In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
conf = SparkConf().setAppName('AmazonReviews').setMaster('local')
sc = SparkContext(conf = conf)

### Contesto

In questo notebook utilizzeremo gli RDD di Spark per analizzare circa 22.5 milioni di valutazioni di libri prese da Amazon. Le domande alle quali risponderemo sono le seguenti:

* Quante valutazioni ci sono nel dataset ?
* Quanti libri ci sono nel dataset ?
* Quante valutazioni ha ricevuto ogni libro ?
* Quali sono i 10 libri più valutati ?
* Qual è la valutazione media per ogni libro ?
* Quali sono i 10 libri con la valutazione più alta ?
* Chi sono i 10 recensori più critici ?

### Importo il dataset

In [3]:
rdd = sc.textFile('ratings_Books.csv')

In [4]:
rdd.take(5)

['AH2L9G3DQHHAJ,0000000116,4.0,1019865600',
 'A2IIIDRK3PRRZY,0000000116,1.0,1395619200',
 'A1TADCM7YWPQ8M,0000000868,4.0,1031702400',
 'AWGH7V0BDOJKB,0000013714,4.0,1383177600',
 'A3UTQPQPM4TQO0,0000013714,5.0,1374883200']

Vediamo cosa rappresentano le 4 colonne:

* **ID utente**: codice identificativo dell'utente che ha scritto la recensione;
* **ID libro**: codice identificativo del libro oggetto della recensione;
* **Voto**: valutazione eseguita dall'utente sul libro;
* **Timestamp**: quando è stata scritta la recensione

Utilizzando il seguente url seguito dal codice del libro ci si ricollega alla pagina Amazon relativa a quel libro:
**https://www.amazon.com/dp/codice_libro**

In [5]:
rdd = rdd.map(lambda x:x.split(','))

In [6]:
rdd.take(5)

[['AH2L9G3DQHHAJ', '0000000116', '4.0', '1019865600'],
 ['A2IIIDRK3PRRZY', '0000000116', '1.0', '1395619200'],
 ['A1TADCM7YWPQ8M', '0000000868', '4.0', '1031702400'],
 ['AWGH7V0BDOJKB', '0000013714', '4.0', '1383177600'],
 ['A3UTQPQPM4TQO0', '0000013714', '5.0', '1374883200']]

#### Quante valutazioni ci sono nel dataset ?

In [7]:
numValutazioni = rdd.count()

In [8]:
print('Il numero di valutazioni presenti nel dataset è',numValutazioni)

Il numero di valutazioni presenti nel dataset è 22507155


#### Quanti libri ci sono nel dataset ?

Considera che ogni libro è associato a un codice della seconda colonna.

In [9]:
numLibri = rdd.map(lambda x:x[1]).distinct().count()

In [10]:
print('Il numero di libri presenti nel dataset è',numLibri)

Il numero di libri presenti nel dataset è 2330066


#### Quante valutazioni ha ricevuto ogni libro ?

In [11]:
books_and_num_eval = rdd.map(lambda x:x[1]).countByValue()

Visualizziamo solo i primi 10.

In [15]:
contatore = 0

for i,j in books_and_num_eval.items():
    if contatore <=10:
        print((i,j))
        contatore = contatore + 1
    else:
        break

('0000000116', 2)
('0000000868', 1)
('0000013714', 14)
('0000015393', 1)
('0000029831', 5)
('0000038504', 2)
('0000041696', 4)
('0000095699', 1)
('0000174076', 1)
('0000202010', 1)
('0000230022', 10)


#### Quali sono i 10 libri più valutati ?

In [16]:
ten_most_evaluated = sorted(books_and_num_eval.items(), key = lambda kv: kv[1], reverse=True)[:10]

In [17]:
ten_most_evaluated

[('0439023483', 21398),
 ('030758836X', 19867),
 ('0439023513', 14114),
 ('0385537859', 12973),
 ('0007444117', 12629),
 ('0375831002', 12571),
 ('038536315X', 12564),
 ('0345803485', 12290),
 ('0316055433', 11746),
 ('0849922070', 10424)]

Si può anche fare in un altro  modo (meno veloce)

In [18]:
ten_most_evaluated = rdd.map(lambda x:(x[1],1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],ascending = False).take(10)

In [19]:
ten_most_evaluated

[('0439023483', 21398),
 ('030758836X', 19867),
 ('0439023513', 14114),
 ('0385537859', 12973),
 ('0007444117', 12629),
 ('0375831002', 12571),
 ('038536315X', 12564),
 ('0345803485', 12290),
 ('0316055433', 11746),
 ('0849922070', 10424)]

#### Qual è la valutazione media per ogni libro ?

Per praticità elenco solo i primi 10.

In [20]:
books_mean_eval = rdd.map(lambda x:[x[1],(float(x[2]),1)]).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:x[0]/x[1])

In [21]:
books_mean_eval.take(10)

[('0001006657', 5.0),
 ('0001922408', 5.0),
 ('0002000601', 3.8333333333333335),
 ('0002006650', 4.0),
 ('0002007770', 4.398933511081486),
 ('0002153327', 5.0),
 ('0002153904', 5.0),
 ('0002159260', 5.0),
 ('0002159627', 5.0),
 ('0002161621', 3.75)]

#### Quali sono i 10 libri con la valutazione più alta ?

Innanzitutto stampo a video anche il numero di recensioni.

In [22]:
books_mean_eval_with_num_reviews = rdd.map(lambda x:[x[1],(float(x[2]),1)]).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:(x[0]/x[1],x[1]))

In [23]:
books_mean_eval_with_num_reviews.take(10)

[('0001006657', (5.0, 2)),
 ('0001922408', (5.0, 2)),
 ('0002000601', (3.8333333333333335, 6)),
 ('0002006650', (4.0, 2)),
 ('0002007770', (4.398933511081486, 6001)),
 ('0002153327', (5.0, 1)),
 ('0002153904', (5.0, 3)),
 ('0002159260', (5.0, 1)),
 ('0002159627', (5.0, 5)),
 ('0002161621', (3.75, 4))]

Filtro tutti i libri che hanno avuto almeno 100 recensioni.

In [24]:
books_mean_eval_with_num_reviews_filtered = books_mean_eval_with_num_reviews.filter(lambda x: x[1][1]>=100)

In [25]:
books_mean_eval_with_num_reviews_filtered.take(10)

[('0002007770', (4.398933511081486, 6001)),
 ('000222383X', (4.518518518518518, 108)),
 ('0007102658', (4.560344827586207, 116)),
 ('0007423632', (4.0449050086355784, 579)),
 ('0020519109', (4.168454935622318, 932)),
 ('003061368X', (4.787644787644788, 259)),
 ('006009334X', (3.718562874251497, 167)),
 ('0060098902', (4.215700660308144, 4089)),
 ('0060195339', (4.3618421052631575, 152)),
 ('0060245603', (4.542857142857143, 105))]

Ora li ordino in maniera decrescente di valutazione e stampo i primi 10.

In [26]:
books_mean_eval_with_num_reviews_filtered.sortBy(lambda x:x[1][0],ascending = False).take(10)

[('0983408904', (5.0, 128)),
 ('0830766316', (5.0, 103)),
 ('0972394648', (4.992647058823529, 136)),
 ('1499390165', (4.991803278688525, 122)),
 ('0849381185', (4.990566037735849, 106)),
 ('0757317723', (4.9862068965517246, 145)),
 ('1939629071', (4.983193277310924, 119)),
 ('1499381921', (4.982857142857143, 350)),
 ('1616387165', (4.981308411214953, 107)),
 ('0814416993', (4.980769230769231, 104))]

#### Chi sono i 10 recensori più critici ?

Con "recensori più critici" si intendono coloro che hanno rilasciato le recensioni più basse.

Dunque andremo a calcolare la valutazione media lasciata da ogni recensore e andremo a ordinare in maniera crescente (considerando solo i rcensori che hanno scritto più di 100 recensioni).

In [27]:
mean_reviewer = rdd.map(lambda x:[x[0],(float(x[2]),1)]) \
                   .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) \
                   .filter(lambda x:x[1][1]>=100) \
                   .mapValues(lambda x:x[0]/x[1])

In [28]:
mean_reviewer.sortBy(lambda x:x[1]).take(10)

[('AH62BQTCMR3BR', 1.0534188034188035),
 ('A186OSXC7LHJDB', 1.2014925373134329),
 ('A2HESNQJZ9OB7H', 1.2543859649122806),
 ('A36IQRD3B5MK8G', 1.505050505050505),
 ('A3JF63XRSLLH0P', 1.5648148148148149),
 ('A344N0X5LIV43M', 1.646551724137931),
 ('A1SS16UHYW77D4', 1.855421686746988),
 ('A19UFCMSFGOZ2K', 2.076923076923077),
 ('A1NJHOGKZZRAX8', 2.1588785046728973),
 ('A1QMY19SW6RHJP', 2.2)]