# Analyze TV reviews in rozetka.ua

## Scrap all reviews and ratings

In [10]:
import requests
from bs4 import BeautifulSoup
import re
import time
from pprint import pprint

def scrap_rozetka(theme, page_num):
    def scrap_items_urls(theme, page_num):
        urls = []
        #for page_num in range(1, pages_count + 1):
        link = f'https://rozetka.com.ua/{theme}/page={page_num}'
        f = requests.get(link)
        for item_xml in BeautifulSoup(f.text).findAll('div', {'class': 'g-i-tile g-i-tile-catalog'}):
            item_text_xml = item_xml.find('div', {'class': 'g-i-tile-i-title clearfix'})
            urls += [item_text_xml.find('a')['href'] + '/comments']
        return urls

    def scrap_comments(comments_page_url):
        f = requests.get(comments_page_url)
        #print(len(BeautifulSoup(f.text).findAll('article', {'class': 'pp-review-i'})))
        comment_texts_and_stars = []
        for comment_xml in BeautifulSoup(f.text).findAll('article', {'class': 'pp-review-i'}):
            comment_text = comment_xml.find('div', {'class': 'pp-review-text-i'})
            stars_xml = comment_xml.find('span', {'class': 'sprite g-rating-stars-i'})
            stars = int(stars_xml['content']) if stars_xml else -1
            comment_texts_and_stars.append((comment_text.text, stars))
        return comment_texts_and_stars

    def scrap_pages_count(item_comments_link):
        f = requests.get(item_comments_link)
        all_buttons_xml = BeautifulSoup(f.text).find('ul', {'class': 'clearfix inline'})
        if all_buttons_xml:
            buttons_xmls = all_buttons_xml.findAll('li', {'class': 'paginator-catalog-l-i'})
            if buttons_xmls:
                last_button_xml = buttons_xmls[-1]
                if last_button_xml['id']:
                    return int(last_button_xml['id'].split()[-1])

        return 1

    all_comment_texts_and_stars = []
    item_comments_urls = scrap_items_urls(theme, page_num)
    for item_comments_link in item_comments_urls:
        pages_count = scrap_pages_count(item_comments_link)
        for page_num in range(pages_count):
            comments_page_url = item_comments_link + f'/page={page_num}'
            all_comment_texts_and_stars += scrap_comments(comments_page_url)

    return all_comment_texts_and_stars

<b>works 3-15 min/page

In [11]:
import json

#for page_num in range(11, 30):
for page_num in [1]:
    try:
        page_messanges_texts_and_ratings = scrap_rozetka('/all-tv/c80037/filter/', page_num)
        with open(f'tv_comments_texts_and_ratings_page={page_num}.json', 'w') as f:
            json.dump(page_messanges_texts_and_ratings, f)
    except:
        pass

In [18]:
all_comments_json = []
for page_num in range(1, 30):
    with open(f'tv_comments_texts_and_ratings_page={page_num}.json') as f:
        all_comments_json += json.load(f)

with open(f'all_tv_comments_texts_and_ratings.json', 'w') as f:
    json.dump(all_comments_json, f)

## Select relevant reviews
* has rating
* UK language

In [24]:
#!pip install langid

In [10]:
import json
import langid

reviews_and_ratings = []
with open(f'all_tv_comments_texts_and_ratings.json') as f:
    reviews_and_ratings = json.load(f)

reviews_and_ratings = [rr for rr in reviews_and_ratings if rr[1] >= 0]
reviews_and_ratings = [rr for rr in reviews_and_ratings if langid.classify(rr[0])[0] == 'uk']
with open(f'reviews_and_ratings.json', 'w') as f:
    json.dump(reviews_and_ratings, f)

In [11]:
rewiews_hist = {1:0, 2:0, 3:0, 4:0, 5:0}
for rr in reviews_and_ratings:
    rewiews_hist[rr[1]] += 1
print(rewiews_hist)

{1: 348, 2: 311, 3: 508, 4: 1742, 5: 6282}


## Preprocess rewiews

Build preprocess function

In [12]:
#!pip install tokenize_uk
#!pip install -U https://github.com/kmike/pymorphy2/archive/master.zip#egg=pymorphy2
#!pip install -U pymorphy2-dicts-uk

In [137]:
import tokenize_uk
import pymorphy2
import re

morph = pymorphy2.MorphAnalyzer(lang='uk')

assert morph.parse('встала')[0].normal_form == 'встати'
assert tokenize_uk.tokenize_uk.tokenize_words('Це речення!') == ['Це', 'речення', '!']


def text2norm_words(text):
    words = tokenize_uk.tokenize_uk.tokenize_words(text)
    words = [morph.parse(word)[0].normal_form for word in words]
    words = [word for word in words if re.match(r'[\w]+', word ,re.U)]
    return words

Build train and test sets

In [138]:
from pprint import pprint
import json
import random

with open(f'reviews_and_ratings.json') as f:
    reviews_and_ratings = json.load(f)

random.shuffle(reviews_and_ratings)

reviews_count = len(reviews_and_ratings)
train_reviews_count = int(reviews_count * 0.7)
train_reviews_and_ratings = reviews_and_ratings[:train_reviews_count]
test_reviews_and_ratings = reviews_and_ratings[train_reviews_count:]

## Baseline BOW classifier

In [139]:
def get_bow(text):
    norm_words = text2norm_words(text)
    bow = {}
    for word in norm_words:
        if word not in bow:
            bow[word] = 0
        bow[word] += 1
    return bow

def stars_rating_to_sentiment_score(rating):
    return {1:-1, 2:-1, 3:0, 4:1, 5:1}[rating]

def get_hist(texts):
    hist = {}
    for text in texts:
        for word in text2norm_words(text):
            if not word in hist:
                hist[word] = 1
            hist[word] += 1
    return hist

def get_probs(texts):
    hist = get_hist(texts)
    total = len(texts)
    return {item : (float(hist[item]) / total) for item in hist}

In [140]:
import math
import operator

# Naive Bayess
class BaselineBOWClassifier:
    def __init__(self, train_reviews_and_ratings):
        sent_classes = [-1, 0, 1]

        classes_train_texts = {sent_class:[] for sent_class in sent_classes}
        for rr in train_reviews_and_ratings:
            sent_class = stars_rating_to_sentiment_score(rr[1])
            classes_train_texts[sent_class].append(rr[0])

        self.sent_classes = sent_classes
        self.classes_probs = {sent_class : len(classes_train_texts[sent_class])/len(train_reviews_and_ratings) for sent_class in sent_classes}
        self.words_probs = {sent_class : get_probs(classes_train_texts[sent_class]) for sent_class in sent_classes}

    def get_class_probs(self, text):
        def get_probs_sum(text, words_probs):
            probs_sum = 0
            for norm_word in get_bow(text):
                if norm_word in words_probs:
                    probs_sum += math.log(words_probs[norm_word])
                # critical: beacuse log(0) - 
                else:
                    probs_sum += math.log(1e-10)
            return probs_sum

        class_probs = {}
        for sent_class in self.sent_classes:
            class_probs[sent_class] = get_probs_sum(text, self.words_probs[sent_class]) + math.log(self.classes_probs[sent_class])

        return class_probs

    # return from -1, 0, 1
    def classify(self, text):
        return max(self.get_class_probs(text).items(), key=operator.itemgetter(1))[0]

## Tone-based BOW classifier

### Get correct tone words

In [141]:
def parse_line(line_str):
    word = line_str.split('\t')[0]
    tone = int(line_str.split('\t')[1].rstrip())
    return (word, tone)

with open('tone-dict-uk.tsv') as f:
    words_and_tones = f.readlines()
words_and_tones = list(map(parse_line, words_and_tones))
#pprint(words_and_tones)

In [142]:
#pprint(train_reviews_and_ratings)

Some potential issues after review:
* <b>Not presented in tone dict:</b> 
'задоволены', 'Небольшой, как раз для кухни. Звук очень хороший', 'дуже ростроїв звук(', 'Получив,що хотів'
* <b>Not easy to judge by tone dict:</b>
'проблем небуде?', 'жодних проблем', 'НЕ КУПУЙТЕ. Самсунг набагато краще.', 'Телевізор бомба!', ['Який рік випуску даного ТВ?', 5]
* <b>Typical for positive:</b> 
'!'

In [143]:
classifier = BaselineBOWClassifier(train_reviews_and_ratings)

In [144]:
def is_relevant(tone_tuple):
    word, tone, neg_prob, neutral_prob, pos_prob = tone_tuple
    return neg_prob + neutral_prob + pos_prob

# P(neg comment|tone == -2) > P(neutral comment|tone == -2) and P(neg comment| tone == -2) > P(pos comment|tone == -2)
# ...
def is_correct(tone_tuple):
    word, tone, neg_prob, neutral_prob, pos_prob = tone_tuple
    if tone == -2:
        return neg_prob > max(neutral_prob, pos_prob)
    if tone == -1:
        return max(neg_prob, neutral_prob) > pos_prob
    if tone == 0:
        return neutral_prob > max(neg_prob, pos_prob)
    if tone == 1:
        return max(pos_prob, neutral_prob) > neg_prob
    if tone == 2:
        return pos_prob > max(neutral_prob, neg_prob)

def get_impossible_states(tone_tuple):
    word, tone, neg_prob, neutral_prob, pos_prob = tone_tuple

    res = []
    if neg_prob == 0:
        res.append(-1)
    if neutral_prob == 0:
        res.append(0)
    if pos_prob == 0:
        res.append(1)

    return res

def separates_correct_state(tone_tuple):
    word, tone, neg_prob, neutral_prob, pos_prob = tone_tuple
    if tone == -2:
        return neutral_prob == 0 and pos_prob == 0
    if tone == -1:
        return neutral_prob == 0 and pos_prob == 0
    if tone == 0:
        return neg_prob == 0 and pos_prob == 0
    if tone == 1:
        return neutral_prob == 0 and neg_prob == 0
    if tone == 2:
        return neutral_prob == 0 and neg_prob == 0

def print_tuples(tone_tuples):
    for tone_tuple in tone_tuples:
        word, tone, neg_prob, neutral_prob, pos_prob = tone_tuple
        print(f'{word} tone:{tone} -1/0/1: ({round(neg_prob, 4)}, {round(neutral_prob, 4)}, {round(pos_prob, 4)})')

words_tones_probs = []
for wt in words_and_tones:
    word = wt[0]
    tone = wt[1]
    neg_prob = classifier.words_probs[-1][word] if word in classifier.words_probs[-1] else 0
    neutral_prob = classifier.words_probs[0][word] if word in classifier.words_probs[0] else 0
    pos_prob = classifier.words_probs[1][word] if word in classifier.words_probs[1] else 0
    words_tones_probs.append((word, tone, neg_prob, neutral_prob, pos_prob))

relevant_words_tones_probs = [tone_tuple for tone_tuple in words_tones_probs if is_relevant(tone_tuple)]
correct_words_tones_probs = [tone_tuple for tone_tuple in relevant_words_tones_probs if is_correct(tone_tuple)]
impossible_states = {tone_tuple[0]:get_impossible_states(tone_tuple) for tone_tuple in correct_words_tones_probs}
feature_words_tones_probs = [tone_tuple for tone_tuple in correct_words_tones_probs if separates_correct_state(tone_tuple)]

print('All -> Relevant -> Correct -> Separates impossible states -> Separates correct state')
print(f'{len(words_tones_probs)} -> {len(relevant_words_tones_probs)} -> {len(correct_words_tones_probs)} -> {len(impossible_states)} -> {len(feature_words_tones_probs)}')

incorrect_words = [w for w in relevant_words_tones_probs if w not in correct_words_tones_probs]
#pprint(impossible_states)

All -> Relevant -> Correct -> Separates impossible states -> Separates correct state
3442 -> 483 -> 292 -> 292 -> 168


### Create tone-based classifier on top of baseline classifier

In [145]:
from sklearn.linear_model import Perceptron
from sklearn.linear_model import LogisticRegression

class ToneBasedBOWClassifier:
    def get_avg_tone(self, text):
        tone_words = 0
        tone_sum = 0
        for norm_word in get_bow(text):
            if norm_word in self.correct_words_tones_probs:
                tone_words += 1
                tone_sum += self.correct_words_tones_probs[norm_word]

        return tone_sum / tone_words if tone_words else None

    def get_x(self, text, tone):
        probs = self.baseline_classifier.get_class_probs(text)
        return [probs[-1], probs[0], probs[1], tone]

    def __init__(self, train_reviews_and_ratings, correct_words_tones_probs):
        self.baseline_classifier = BaselineBOWClassifier(train_reviews_and_ratings)

        self.correct_words_tones_probs = {wp[0]:wp[1] for wp in correct_words_tones_probs}

        X, Y = [], []
        for rr in train_reviews_and_ratings:
            text = rr[0]
            tone = self.get_avg_tone(text)
            if tone != None:
                X.append(self.get_x(text, tone))
                Y.append(stars_rating_to_sentiment_score(rr[1]))

        self.clf = LogisticRegression().fit(X, Y)

    def classify(self, text):
        tone = self.get_avg_tone(text)
        if tone != None:
            return self.clf.predict([self.get_x(text, tone)])
        else:
            return self.baseline_classifier.classify(text)

## Evaluate approaches

In [146]:
def evaluate_rating_predictor(predictor, test_set):
    def rpf1(test_predictions, gt, class_name):
        test_predictions_add_gt = list(zip(test_predictions, gt))
        tp = test_predictions_add_gt.count((class_name, class_name))
        if gt.count(class_name) == 0 or test_predictions.count(class_name) == 0:
            return 0, 0, 0
        rec = tp / gt.count(class_name)
        prec = tp / test_predictions.count(class_name)
        f1 = 2 * rec * prec / (rec + prec)
        return round(rec, 2), round(prec, 2), round(f1, 2)

    test_predictions = [predictor.classify(test_sample[0]) for test_sample in test_set]
    gt = [stars_rating_to_sentiment_score(test_sample[1]) for test_sample in test_set]

    print(' - Neg_score: ', rpf1(test_predictions, gt, -1))
    print(' - Neutral_score: ', rpf1(test_predictions, gt, 0))
    print(' - Positive_score: ', rpf1(test_predictions, gt, 1))

In [147]:
print('Baseline BOW calssifier: ')
evaluate_rating_predictor(BaselineBOWClassifier(train_reviews_and_ratings), test_reviews_and_ratings)

Baseline BOW calssifier: 
 - Neg_score:  (0.96, 0.84, 0.9)
 - Neutral_score:  (0.88, 0.73, 0.8)
 - Positive_score:  (0.97, 0.99, 0.98)


In [148]:
print('Tone-based BOW calssifier: ')
evaluate_rating_predictor(ToneBasedBOWClassifier(train_reviews_and_ratings, correct_words_tones_probs), test_reviews_and_ratings)

Tone-based BOW calssifier: 




 - Neg_score:  (0.96, 0.85, 0.9)
 - Neutral_score:  (0.84, 0.82, 0.83)
 - Positive_score:  (0.98, 0.99, 0.98)
