# Import Libraries

In [1]:
import json
import math
import numpy as np
import os
import pandas as pd
import re
import spacy
import string

from datetime import datetime
from tqdm import tqdm

import nltk
from nltk.corpus import stopwords
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.tokenize import word_tokenize

import multiprocessing
from multiprocessing import Manager

In [2]:
nltk.download("stopwords")
nltk.download('punkt')
# Recent NLTK releases (3.8.2+) make word_tokenize load the "punkt_tab" resource
# instead of "punkt"; fetch both so tokenisation works whichever version is installed.
nltk.download('punkt_tab')
nltk.download('vader_lexicon')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


True

# Colab Configuration

In [3]:
# Set True to execute code by using Google Colab
# Set False to execute code by using local computer
USE_COLAB=True

In [4]:
# If Google Colab is used, add google.colab library
if USE_COLAB:
  from google.colab import drive

In [5]:
# If Google Colab is used, mount Google Drive to Colab System
if USE_COLAB:
  drive.mount("/content/gdrive/")

Mounted at /content/gdrive/


In [6]:
# Define the root path for working directory
root_path = "/content/gdrive/MyDrive/Master-Thesis/master-thesis-sentiment-analysis" if USE_COLAB else '.'

In [7]:
data_path = f"{root_path}/datasets/temp/Amazon Reviews 2023"

# Data Reading

In [None]:
dataset_path = f"{data_path}/Sports_and_Outdoors.jsonl"

In [None]:
# Peek at the first raw JSON record to see which fields the dataset provides.
# The reviews contain non-ASCII characters (e.g. curly apostrophes), so read as
# UTF-8 explicitly instead of relying on the platform's default encoding.
with open(dataset_path, 'r', encoding='utf-8') as fs:
  first_review = [json.loads(next(fs).strip()) for _ in range(1)]

first_review

[{'rating': 5.0,
  'title': 'Crazy comfy!',
  'text': 'Not gonna lie- they are not much to look at. Lol. Luckily I’m one of those ppl that values things for function over looks & these function well so far. They are seriously one of the most comfortable pairs of socks I’ve owned in 5 decades.  I have not tried to wash them yet, so fingers crossed on that rn.  They feel very cushiony.  I wear them in my winter boots & just on my feet shoeless around my home.  I wish they came in more colors.  I’m one of those ppl that absolutely cannot stand toe seams on socks, but these have not bothered me at all.  I have super high arches so the only change I would make to the socks would be some compression there.  However, the socks fit perfectly as-is which really surprised me given my arches.  I just like having compression at my arches bc it feels good on them.  I wear a ladies 10-1/2 shoe- mens 8-1/2 and I bought the medium socks. They fit perfectly.  That’s never happened.  I had honestly expe

In [None]:
def read_reviews(path=None):
  """Load a JSON-Lines review file into a DataFrame.

  Every non-blank line must be a JSON object with "title", "text" and "rating"
  keys; all other keys are ignored.

  Args:
    path: File to read. Defaults to the notebook-level ``dataset_path``.

  Returns:
    pandas.DataFrame with columns ``review_title``, ``review_text`` and
    ``rating``, one row per non-blank line, in file order.

  Raises:
    KeyError: If a record lacks one of the three required keys.
    json.JSONDecodeError: If a non-blank line is not valid JSON.
  """
  if path is None:
    path = dataset_path

  records = []
  # Review texts contain non-ASCII characters, so do not depend on the locale encoding.
  with open(path, 'r', encoding='utf-8') as fs:
    for line in fs:
      line = line.strip()
      if not line:
        continue  # tolerate blank lines, e.g. a trailing newline at end of file
      review = json.loads(line)
      records.append((review["title"], review["text"], review["rating"]))

  return pd.DataFrame(records, columns=["review_title", "review_text", "rating"])

In [None]:
%%time
df = read_reviews()

CPU times: user 1min 35s, sys: 11.5 s, total: 1min 47s
Wall time: 1min 47s


In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19595170 entries, 0 to 19595169
Data columns (total 3 columns):
 #   Column        Dtype  
---  ------        -----  
 0   review_title  object 
 1   review_text   object 
 2   rating        float64
dtypes: float64(1), object(2)
memory usage: 448.5+ MB


In [None]:
df.head()

Unnamed: 0,review_title,review_text,rating
0,Crazy comfy!,Not gonna lie- they are not much to look at. L...,5.0
1,Excellent!,I love it. Pretty!,5.0
2,Best saddle pads,Huge fan of B Vertigo and this dressage pad do...,5.0
3,Perfect repair kit,"I have a great Weaver halter. Recently, the Ch...",5.0
4,Works great,This was great for a slightly too-short girth!...,5.0


In [None]:
df["rating"].value_counts()

rating
5.0    12981998
4.0     2518170
1.0     1836990
3.0     1324911
2.0      933101
Name: count, dtype: int64

# Data Cleansing

#### Fix missing values

In [None]:
df.dropna(how="all", inplace=True)

In [None]:
df.dropna(how="all",axis=1, inplace=True)

In [None]:
%%time
df.fillna({"review_title": "", "review_text": ""}, inplace=True)

CPU times: user 7 s, sys: 546 ms, total: 7.54 s
Wall time: 7.45 s


#### Combine "review_title" and "review_text" into a single "review" column

In [None]:
%%time
df["review"] = (df["review_title"].str.rstrip('.!? \n\t') +  ". " +  df["review_text"]).str.lstrip('.!? \n\t')

CPU times: user 20 s, sys: 3.78 s, total: 23.7 s
Wall time: 23.5 s


In [None]:
# Remove 'review_title' and 'review_text' columns
df.drop(columns=["review_title", "review_text"], inplace=True)

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19595170 entries, 0 to 19595169
Data columns (total 2 columns):
 #   Column  Dtype  
---  ------  -----  
 0   rating  float64
 1   review  object 
dtypes: float64(1), object(1)
memory usage: 299.0+ MB


In [None]:
df.head()

Unnamed: 0,rating,review
0,5.0,Crazy comfy. Not gonna lie- they are not much ...
1,5.0,Excellent. I love it. Pretty!
2,5.0,Best saddle pads. Huge fan of B Vertigo and th...
3,5.0,Perfect repair kit. I have a great Weaver halt...
4,5.0,Works great. This was great for a slightly too...


# Text Processing

In [8]:
from abc import ABC, abstractmethod

In [9]:
class SentimentTaskHandler(ABC):
  """Interface for a unit of work that SentimentTaskManager runs over a DataFrame.

  SentimentTaskManager.execute calls ``before_handle`` once on the whole frame,
  ``on_handle`` in a worker process for every page of rows, and ``after_handle``
  in the parent process after each batch of pages has finished.
  """

  @abstractmethod
  def before_handle(self, data_frame, shared_dict=None):
    """Prepare ``data_frame`` (e.g. create the output column) before any page runs."""

  @abstractmethod
  def on_handle(self, data_frame, shared_dict=None, task_number=1):
    """Process one page of rows; ``task_number`` is the 1-based page number."""

  @abstractmethod
  def after_handle(self, data_frame, shared_dict=None):
    """Merge results published in ``shared_dict`` back into ``data_frame``."""

In [10]:
class SentimentTaskManager:
  """Runs a SentimentTaskHandler over a DataFrame, one page of rows per worker process.

  Rows are split into pages of ``page_size`` rows. Pages are started in batches of
  at most ``cpu_count`` ``multiprocessing.Process`` workers. Each worker runs
  ``command_handler.on_handle`` on its page with a ``Manager().dict()``. After
  every batch, the parent calls ``command_handler.after_handle`` and then clears
  that dict.
  """

  @staticmethod
  def execute(data_frame, page_size=100, cpu_count = 1, command_handler: SentimentTaskHandler = None):
    """Process ``data_frame`` page by page with ``command_handler``.

    Args:
      data_frame: Frame to process. It is sliced positionally with ``iloc`` for the
        workers and passed whole to ``before_handle`` / ``after_handle``.
      page_size: Number of rows handed to each worker process.
      cpu_count: Maximum number of worker processes running at the same time.
      command_handler: The SentimentTaskHandler implementing the work.

    Raises:
      Exception: If ``command_handler`` is None.
      ValueError: If ``page_size`` or ``cpu_count`` is smaller than 1.
      RuntimeError: If a worker process exits with a non-zero exit code. The
        batch's available results are merged first; later pages are not processed.
    """
    if command_handler is None:
      raise Exception("'command_handler' object not constructed. Cannot access a 'None' object.")
    if page_size < 1:
      raise ValueError(f"'page_size' must be at least 1, got {page_size}")
    if cpu_count < 1:
      raise ValueError(f"'cpu_count' must be at least 1, got {cpu_count}")

    total_records = len(data_frame.index)
    total_pages = math.ceil(total_records / page_size)

    print(f"Page Size    : {page_size}")
    print(f"Total records: {total_records}")
    print(f"Total pages  : {total_pages}")
    with Manager() as manager:
      shared_dict = manager.dict()

      command_handler.before_handle(data_frame, shared_dict)
      # Each iteration runs one batch of at most `cpu_count` pages concurrently.
      for batch_start in range(0, total_pages, cpu_count):
        workers = []
        for page_index in range(batch_start, min(batch_start + cpu_count, total_pages)):
          page_number = page_index + 1  # pages (and task numbers) are 1-based in the logs
          start = page_index * page_size
          end = min(start + page_size, total_records)

          print("-" * 16)
          print(f"Page: {page_number} - From {start} to {end}")

          process = multiprocessing.Process(
              target=command_handler.on_handle,
              args=(data_frame.iloc[start:end], shared_dict, page_number))
          workers.append((page_number, process))

        print("-" * 16)

        for _, process in workers:
          process.start()
        for _, process in workers:
          process.join()

        command_handler.after_handle(data_frame, shared_dict)
        shared_dict.clear()

        # A crashed worker leaves its page without results. Fail loudly instead of
        # silently producing a partially filled column.
        failed_pages = [page for page, process in workers if process.exitcode != 0]
        if failed_pages:
          raise RuntimeError(f"Worker process(es) failed for page(s): {failed_pages}")

      print("All tasks are completed")

#### Clean text: lowercase, strip punctuation, drop words containing digits, remove stop words and tokens shorter than 3 characters

In [None]:
class CleanReviewCommandHandler(SentimentTaskHandler):
  """Fills ``review_cleaned`` with a normalised version of ``review``.

  Each review is processed in these steps:
    * lower-cased;
    * stripped of ASCII punctuation (``string.punctuation``);
    * stripped of every word that contains a digit;
    * tokenised with NLTK ``word_tokenize``.

  Only tokens that are at least 3 characters long and are not NLTK English
  stop words are kept. They are re-joined with single spaces.
  """

  def __init__(self):
    # A set gives constant-time membership tests. The list returned by NLTK
    # would be scanned linearly for every token.
    self.__stop_words = set(stopwords.words('english'))
    # Built once here instead of once per review.
    self.__punctuation_table = str.maketrans('', '', string.punctuation)
    self.__digit_word_pattern = re.compile(r'\w*\d\w*')


  def __clean_review_core(self, document):
    """Return the cleaned, space-joined token string for one review."""
    document = document.lower().translate(self.__punctuation_table)
    document = self.__digit_word_pattern.sub('', document)
    return " ".join(token for token in word_tokenize(document)
                    if len(token) >= 3 and token not in self.__stop_words)


  def __merge_into(self, data_frame, results):
    """Write ``{row_label: cleaned_text}`` into ``review_cleaned`` in one assignment."""
    if results:
      data_frame.loc[list(results.keys()), "review_cleaned"] = list(results.values())


  def before_handle(self, data_frame, shared_dict=None):
    print("Set empty value for 'review_cleaned' column")
    data_frame.loc[:,"review_cleaned"] = ""


  def on_handle(self, data_frame, shared_dict=None, task_number=1):
    """Clean every review of this page.

    With ``shared_dict``, results are published there (keyed by row label) for
    ``after_handle`` to merge. Without it, they are written directly into ``data_frame``.
    """
    print(f"Task {task_number} is started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")

    cleaned = {label: self.__clean_review_core(review)
               for label, review in data_frame["review"].items()}
    if shared_dict is None:
      self.__merge_into(data_frame, cleaned)
    else:
      # A single bulk update is one round trip to the Manager process instead of one per row.
      shared_dict.update(cleaned)

    print(f"Task {task_number} is completed at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")


  def after_handle(self, data_frame, shared_dict=None):
    """Copy the results published by ``on_handle`` into ``review_cleaned``."""
    print("Merge results from all tasks")
    if shared_dict is None:
      return  # nothing was published; on_handle wrote into the frame directly
    # items() fetches a local snapshot of the proxied dict in a single call.
    self.__merge_into(data_frame, dict(shared_dict.items()))

In [None]:
%%time
cleaned_tasks = [
    { "id": 0, "count": -1, "page_size": 1_000_000, "cpu_count": 10 },
    { "id": 1, "count": 100, "page_size": 10, "cpu_count": 10 }
]

selected_id = 0
selected_cleaned_task = cleaned_tasks[selected_id] if selected_id < len(cleaned_tasks) and cleaned_tasks[selected_id]["id"] == selected_id else cleaned_tasks[1]

df["review_cleaned"] = ""
SentimentTaskManager().execute(
    data_frame= df.iloc[: selected_cleaned_task["count"]] if selected_cleaned_task["count"] > 0 else df,
    page_size=selected_cleaned_task["page_size"],
    cpu_count=selected_cleaned_task["cpu_count"],
    command_handler=CleanReviewCommandHandler()
)

Page Size    : 1000000
Total records: 19595170
Total pages  : 20
Set empty value for 'review_cleaned' column
----------------
Page: 1 - From 0 to 1000000
----------------
Page: 2 - From 1000000 to 2000000
----------------
Page: 3 - From 2000000 to 3000000
----------------
Page: 4 - From 3000000 to 4000000
----------------
Page: 5 - From 4000000 to 5000000
----------------
Page: 6 - From 5000000 to 6000000
----------------
Page: 7 - From 6000000 to 7000000
----------------
Page: 8 - From 7000000 to 8000000
----------------
Page: 9 - From 8000000 to 9000000
----------------
Page: 10 - From 9000000 to 10000000
----------------
Task 1 is started at 26/07/2024 17:28:38
Task 2 is started at 26/07/2024 17:28:38
Task 3 is started at 26/07/2024 17:28:39
Task 4 is started at 26/07/2024 17:28:39
Task 5 is started at 26/07/2024 17:28:39
Task 6 is started at 26/07/2024 17:28:39
Task 7 is started at 26/07/2024 17:28:40
Task 8 is started at 26/07/2024 17:28:40
Task 9 is started at 26/07/2024 17:28:40

In [None]:
(df["review_cleaned"].values == '').sum()

23153

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19595170 entries, 0 to 19595169
Data columns (total 3 columns):
 #   Column          Dtype  
---  ------          -----  
 0   rating          float64
 1   review          object 
 2   review_cleaned  object 
dtypes: float64(1), object(2)
memory usage: 448.5+ MB


In [None]:
df.head()

Unnamed: 0,rating,review,review_cleaned
0,5.0,Crazy comfy. Not gonna lie- they are not much ...,crazy comfy gon lie much look lol luckily one ...
1,5.0,Excellent. I love it. Pretty!,excellent love pretty
2,5.0,Best saddle pads. Huge fan of B Vertigo and th...,best saddle pads huge fan vertigo dressage pad...
3,5.0,Perfect repair kit. I have a great Weaver halt...,perfect repair kit great weaver halter recentl...
4,5.0,Works great. This was great for a slightly too...,works great great slightly tooshort girth stur...


#### Lemmatize text

In [None]:
class LemmatizeReviewCommandHandler(SentimentTaskHandler):
  """Fills ``review_lemmatized`` with spaCy lemmas of ``review_cleaned``.

  A lemma is kept only if it is at least 3 characters long and spaCy does not
  flag its token as a stop word (``token.is_stop``). Kept lemmas are joined with
  single spaces.
  """

  def __init__(self):
    # Only lemmas and stop-word flags are used below, so the parser and NER are disabled.
    self.__nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])


  def __lemmatize_doc(self, doc):
    """Return the filtered, space-joined lemmas of an already processed spaCy Doc."""
    lemmas = (token.lemma_ for token in doc if not token.is_stop)
    return " ".join(lemma for lemma in lemmas if len(lemma) >= 3)


  def __merge_into(self, data_frame, results):
    """Write ``{row_label: lemmatized_text}`` into ``review_lemmatized`` in one assignment."""
    if results:
      data_frame.loc[list(results.keys()), "review_lemmatized"] = list(results.values())


  def before_handle(self, data_frame, shared_dict=None):
    print("Set empty value for 'review_lemmatized' column")
    data_frame.loc[:,"review_lemmatized"] = ""


  def on_handle(self, data_frame, shared_dict=None, task_number=1):
    """Lemmatize every ``review_cleaned`` value of this page.

    With ``shared_dict``, results are published there (keyed by row label) for
    ``after_handle`` to merge. Without it, they are written directly into ``data_frame``.
    """
    print(f"Task {task_number} is started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")

    reviews = data_frame["review_cleaned"]
    # nlp.pipe streams the texts through the pipeline in batches instead of
    # calling nlp() once per row; it yields Docs in input order.
    lemmatized = {label: self.__lemmatize_doc(doc)
                  for label, doc in zip(reviews.index, self.__nlp.pipe(reviews.tolist()))}
    if shared_dict is None:
      self.__merge_into(data_frame, lemmatized)
    else:
      # A single bulk update is one round trip to the Manager process instead of one per row.
      shared_dict.update(lemmatized)

    print(f"Task {task_number} is completed at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")


  def after_handle(self, data_frame, shared_dict=None):
    """Copy the results published by ``on_handle`` into ``review_lemmatized``."""
    print("Merge results from all tasks")
    if shared_dict is None:
      return  # nothing was published; on_handle wrote into the frame directly
    # items() fetches a local snapshot of the proxied dict in a single call.
    self.__merge_into(data_frame, dict(shared_dict.items()))

In [None]:
%%time
lemmatized_tasks = [
    { "id": 0, "count": -1, "page_size": 200_000, "cpu_count": 50 },
    { "id": 1, "count": 200, "page_size": 10, "cpu_count": 10 },
    { "id": 2, "count": 4_000_000, "page_size": 200_000, "cpu_count": 20 },  # 16mins
    { "id": 3, "count": 5_000_000, "page_size": 200_000, "cpu_count": 25 },  # 17mins 7s
    { "id": 4, "count": 10_000_000, "page_size": 200_000, "cpu_count": 50 }  # 31min 32s
]

selected_id = 0
selected_lemmatized_task = lemmatized_tasks[selected_id] if selected_id < len(lemmatized_tasks) and lemmatized_tasks[selected_id]["id"] == selected_id else lemmatized_tasks[1]

df["review_lemmatized"] = ""
SentimentTaskManager().execute(
    data_frame=df.iloc[: selected_lemmatized_task["count"]] if selected_lemmatized_task["count"] > 0 else df,
    page_size=selected_lemmatized_task["page_size"],
    cpu_count=selected_lemmatized_task["cpu_count"],
    command_handler=LemmatizeReviewCommandHandler()
)

Page Size    : 200000
Total records: 19595170
Total pages  : 98
Set empty value for 'review_lemmatized' column
----------------
Page: 1 - From 0 to 200000
----------------
Page: 2 - From 200000 to 400000
----------------
Page: 3 - From 400000 to 600000
----------------
Page: 4 - From 600000 to 800000
----------------
Page: 5 - From 800000 to 1000000
----------------
Page: 6 - From 1000000 to 1200000
----------------
Page: 7 - From 1200000 to 1400000
----------------
Page: 8 - From 1400000 to 1600000
----------------
Page: 9 - From 1600000 to 1800000
----------------
Page: 10 - From 1800000 to 2000000
----------------
Page: 11 - From 2000000 to 2200000
----------------
Page: 12 - From 2200000 to 2400000
----------------
Page: 13 - From 2400000 to 2600000
----------------
Page: 14 - From 2600000 to 2800000
----------------
Page: 15 - From 2800000 to 3000000
----------------
Page: 16 - From 3000000 to 3200000
----------------
Page: 17 - From 3200000 to 3400000
----------------
Page: 18 - 

In [None]:
(df["review_lemmatized"].values == '').sum()

27863

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19595170 entries, 0 to 19595169
Data columns (total 4 columns):
 #   Column             Dtype  
---  ------             -----  
 0   rating             float64
 1   review             object 
 2   review_cleaned     object 
 3   review_lemmatized  object 
dtypes: float64(1), object(3)
memory usage: 598.0+ MB


In [None]:
df.head()

Unnamed: 0,rating,review,review_cleaned,review_lemmatized
0,5.0,Crazy comfy. Not gonna lie- they are not much ...,crazy comfy gon lie much look lol luckily one ...,crazy comfy gon lie look lol luckily ppl value...
1,5.0,Excellent. I love it. Pretty!,excellent love pretty,excellent love pretty
2,5.0,Best saddle pads. Huge fan of B Vertigo and th...,best saddle pads huge fan vertigo dressage pad...,good saddle pad huge fan vertigo dressage pad ...
3,5.0,Perfect repair kit. I have a great Weaver halt...,perfect repair kit great weaver halter recentl...,perfect repair kit great weaver halter recentl...
4,5.0,Works great. This was great for a slightly too...,works great great slightly tooshort girth stur...,work great great slightly tooshort girth sturd...


# Backup Lemmatized Step

In [None]:
df.drop(columns=["review"], inplace=True)

In [None]:
# df[["rating", "review_cleaned", "review_lemmatized"]].to_json(f"{data_path}/Reviews_Lemmatized_Min.zip",  orient="records", lines=True, compression="zip")

# Restore Lemmatized Step

In [None]:
# %%time
# restore_path_1 = f"{data_path}/Reviews_Lemmatized_Min.zip"

# if 'df' not in globals():
#   if os.path.exists(restore_path_1):
#     print(f"Read data from Reviews_Lemmatized_Min.zip")
#     df = pd.read_json(restore_path_1, orient="records", lines=True, compression="zip")
#   else:
#     print(f"{restore_path_1} not found")

In [None]:
# df.head()

# Sentiment Score

In [None]:
class DetectSentimentBasedOnReviewCommandHandler(SentimentTaskHandler):
  """Scores a text column with NLTK VADER and labels it positive/negative/neutral.

  The VADER ``compound`` score is written to ``sentiment_score_column``. The label
  written to ``sentiment_column`` is:
    * "positive" when score >= positive_threshold;
    * "negative" when score <= negative_threshold;
    * "neutral" otherwise.
  """

  def __init__(self, reivew_text_column, sentiment_score_column, sentiment_column,
               positive_threshold=0.5, negative_threshold=-0.5):
    """
    Args:
      reivew_text_column: Column holding the text to score. The misspelled keyword
        is kept because existing callers pass it by name.
      sentiment_score_column: Column that receives the compound score.
      sentiment_column: Column that receives the label.
      positive_threshold: Lowest compound score labelled "positive".
      negative_threshold: Highest compound score labelled "negative".
        NOTE(review): the VADER README suggests +/-0.05 as cut-offs. The stricter
        +/-0.5 defaults are kept as-is; confirm this is intended.

    Raises:
      ValueError: If negative_threshold is greater than positive_threshold.
    """
    if negative_threshold > positive_threshold:
      raise ValueError(
          f"negative_threshold ({negative_threshold}) must not exceed positive_threshold ({positive_threshold})")
    self.__sentiment_analyzer = SentimentIntensityAnalyzer()
    self.__review_text_column = reivew_text_column
    self.__sentiment_score_column = sentiment_score_column
    self.__sentiment_column = sentiment_column
    self.__positive_threshold = positive_threshold
    self.__negative_threshold = negative_threshold


  def __calculate_sentiment_score(self, document):
    """Return VADER's ``compound`` polarity score for ``document``."""
    return self.__sentiment_analyzer.polarity_scores(document)["compound"]


  def __detect_sentiment(self, score):
    """Map a compound score to "positive", "negative" or "neutral" (NaN -> "neutral")."""
    if score >= self.__positive_threshold:
      return "positive"
    if score <= self.__negative_threshold:
      return "negative"
    return "neutral"


  def __merge_into(self, data_frame, scores):
    """Write ``{row_label: score}`` and the derived labels into the output columns."""
    if scores:
      labels = list(scores.keys())
      values = list(scores.values())
      data_frame.loc[labels, self.__sentiment_score_column] = values
      data_frame.loc[labels, self.__sentiment_column] = [self.__detect_sentiment(v) for v in values]


  def before_handle(self, data_frame, shared_dict=None):
    print(f"Set empty value for '{self.__sentiment_score_column}' column")
    data_frame.loc[:, self.__sentiment_score_column] = np.nan

    print(f"Set empty value for '{self.__sentiment_column}' column")
    data_frame.loc[:, self.__sentiment_column] = ""


  def on_handle(self, data_frame, shared_dict=None, task_number=1):
    """Score every row of this page.

    With ``shared_dict``, only the scores are published there (keyed by row label);
    ``after_handle`` derives the labels. Without it, scores and labels are written
    directly into ``data_frame``.
    """
    print(f"Task {task_number} is started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")

    scores = {label: self.__calculate_sentiment_score(text)
              for label, text in data_frame[self.__review_text_column].items()}
    if shared_dict is None:
      self.__merge_into(data_frame, scores)
    else:
      # A single bulk update is one round trip to the Manager process instead of one per row.
      shared_dict.update(scores)

    print(f"Task {task_number} is completed at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")


  def after_handle(self, data_frame, shared_dict=None):
    """Copy the published scores into the frame and derive their labels."""
    print("Merge results from all tasks")
    if shared_dict is None:
      return  # nothing was published; on_handle wrote into the frame directly
    # items() fetches a local snapshot of the proxied dict in a single call.
    self.__merge_into(data_frame, dict(shared_dict.items()))


In [None]:
%%time
sentiment_tasks = [
    { "id": 0, "count": -1, "page_size": 100_000, "cpu_count": 10 },            # 36min 19s
    { "id": 1, "count": 100, "page_size": 10, "cpu_count": 10 },                # 15.8 s
    { "id": 2, "count": 100_000, "page_size": 5_000, "cpu_count": 10 },         # 37.1 s
    { "id": 3, "count": 100_000, "page_size": 5_000, "cpu_count": 20 },         # 34.3 s
    { "id": 4, "count": 100_000, "page_size": 10_000, "cpu_count": 10 },        # 25.2 s
    { "id": 5, "count": 100_000, "page_size": 20_000, "cpu_count": 5 },         # 24.6 s
    { "id": 6, "count": 1_000_000, "page_size": 10_000, "cpu_count": 25 },      # 2min 58s
    { "id": 7, "count": 1_000_000, "page_size": 10_000, "cpu_count": 50 },      # 2min 37s
    { "id": 8, "count": 1_000_000, "page_size": 20_000, "cpu_count": 50 },      # 2min 47s
    { "id": 9, "count": 1_000_000, "page_size": 100_000, "cpu_count": 10 },     # 2min 3s
    { "id": 10, "count": 2_000_000, "page_size": 100_000, "cpu_count": 20 },    # 5min 9s
    { "id": 11, "count": 2_000_000, "page_size": 200_000, "cpu_count": 10 },    # 3min 55s
    { "id": 12, "count": 3_000_000, "page_size": 300_000, "cpu_count": 10 },    # 5min 47s
    { "id": 13, "count": 10_000_000, "page_size": 1_000_000, "cpu_count": 10 }  # 19min
]

selected_id = 0
selected_sentiment_task = sentiment_tasks[selected_id] if selected_id < len(sentiment_tasks) and sentiment_tasks[selected_id]["id"] == selected_id else sentiment_tasks[1]

df["sentiment_score_cleaned"] = np.nan
df["sentiment_cleaned"] = ""
SentimentTaskManager().execute(
    data_frame=df.iloc[: selected_sentiment_task["count"]] if selected_sentiment_task["count"] > 0 else df,
    page_size=selected_sentiment_task["page_size"],
    cpu_count=selected_sentiment_task["cpu_count"],
    command_handler=DetectSentimentBasedOnReviewCommandHandler(
        reivew_text_column="review_cleaned",
        sentiment_score_column="sentiment_score_cleaned",
        sentiment_column="sentiment_cleaned")
)

Page Size    : 100000
Total records: 19595170
Total pages  : 196
Set empty value for 'sentiment_score' column
Set empty value for 'sentiment' column
----------------
Page: 1 - From 0 to 100000
----------------
Page: 2 - From 100000 to 200000
----------------
Page: 3 - From 200000 to 300000
----------------
Page: 4 - From 300000 to 400000
----------------
Page: 5 - From 400000 to 500000
----------------
Page: 6 - From 500000 to 600000
----------------
Page: 7 - From 600000 to 700000
----------------
Page: 8 - From 700000 to 800000
----------------
Page: 9 - From 800000 to 900000
----------------
Page: 10 - From 900000 to 1000000
----------------
Task 1 is started at 26/07/2024 19:12:49
Task 2 is started at 26/07/2024 19:12:49
Task 3 is started at 26/07/2024 19:12:50
Task 4 is started at 26/07/2024 19:12:50
Task 5 is started at 26/07/2024 19:12:50
Task 6 is started at 26/07/2024 19:12:51
Task 7 is started at 26/07/2024 19:12:51
Task 8 is started at 26/07/2024 19:12:52
Task 9 is started a

In [None]:
(df["sentiment_cleaned"].values == '').sum()

0

In [None]:
%%time
sentiment_tasks = [
    { "id": 0, "count": -1, "page_size": 100_000, "cpu_count": 10 },
    { "id": 1, "count": 100, "page_size": 10, "cpu_count": 10 },                # 13.6s
    { "id": 2, "count": 100_000, "page_size": 5_000, "cpu_count": 10 },         # 32.7s
    { "id": 3, "count": 100_000, "page_size": 5_000, "cpu_count": 20 },         # 27s
    { "id": 4, "count": 100_000, "page_size": 10_000, "cpu_count": 10 },        # 20.5s
    { "id": 5, "count": 100_000, "page_size": 20_000, "cpu_count": 5 },         # 23.1s
    { "id": 6, "count": 1_000_000, "page_size": 10_000, "cpu_count": 25 },      # 2min 34s
    { "id": 7, "count": 1_000_000, "page_size": 10_000, "cpu_count": 50 },      # 2min 25s
    { "id": 8, "count": 1_000_000, "page_size": 20_000, "cpu_count": 50 },      # 2min 45s
    { "id": 9, "count": 1_000_000, "page_size": 100_000, "cpu_count": 10 },     # 2min 2s
    { "id": 10, "count": 2_000_000, "page_size": 100_000, "cpu_count": 20 },    # 5min 11s
    { "id": 11, "count": 2_000_000, "page_size": 200_000, "cpu_count": 10 },    # 3min 51s
    { "id": 12, "count": 3_000_000, "page_size": 300_000, "cpu_count": 10 },    # 5min 44s
    { "id": 13, "count": 10_000_000, "page_size": 1_000_000, "cpu_count": 10 }, # 18min 52s
]

selected_id = 0
selected_sentiment_task = sentiment_tasks[selected_id] if selected_id < len(sentiment_tasks) and sentiment_tasks[selected_id]["id"] == selected_id else sentiment_tasks[1]

df["sentiment_score_lemmatized"] = np.nan
df["sentiment_lemmatized"] = ""
SentimentTaskManager().execute(
    data_frame=df.iloc[: selected_sentiment_task["count"]] if selected_sentiment_task["count"] > 0 else df,
    page_size=selected_sentiment_task["page_size"],
    cpu_count=selected_sentiment_task["cpu_count"],
    command_handler=DetectSentimentBasedOnReviewCommandHandler(
        reivew_text_column="review_lemmatized",
        sentiment_score_column="sentiment_score_lemmatized",
        sentiment_column="sentiment_lemmatized")
)

Page Size    : 100000
Total records: 19595170
Total pages  : 196
Set empty value for 'sentiment_score' column
Set empty value for 'sentiment' column
----------------
Page: 1 - From 0 to 100000
----------------
Page: 2 - From 100000 to 200000
----------------
Page: 3 - From 200000 to 300000
----------------
Page: 4 - From 300000 to 400000
----------------
Page: 5 - From 400000 to 500000
----------------
Page: 6 - From 500000 to 600000
----------------
Page: 7 - From 600000 to 700000
----------------
Page: 8 - From 700000 to 800000
----------------
Page: 9 - From 800000 to 900000
----------------
Page: 10 - From 900000 to 1000000
----------------
Task 1 is started at 26/07/2024 19:50:42
Task 2 is started at 26/07/2024 19:50:43
Task 3 is started at 26/07/2024 19:50:43
Task 4 is started at 26/07/2024 19:50:44
Task 5 is started at 26/07/2024 19:50:44
Task 6 is started at 26/07/2024 19:50:44
Task 7 is started at 26/07/2024 19:50:45
Task 8 is started at 26/07/2024 19:50:45
Task 9 is started a

In [None]:
(df["sentiment_lemmatized"].values == '').sum()

0

In [11]:
class DetectSentimentBasedOnRatingCommandHandler(SentimentTaskHandler):
  """Labels each row from its star rating.

  The label is "positive" when rating > 3 and "negative" when rating < 3.
  Otherwise it is "neutral": a rating of exactly 3, or a missing (NaN) rating,
  for which both comparisons are False.
  """

  def __init__(self, rating_column, sentiment_column):
    self.__rating_column = rating_column
    self.__sentiment_column = sentiment_column


  def __detect_sentiments(self, ratings):
    """Label a rating Series in one vectorised pass; returns a list in row order."""
    return np.select([ratings > 3, ratings < 3], ["positive", "negative"], default="neutral").tolist()


  def __merge_into(self, data_frame, results):
    """Write ``{row_label: label}`` into the sentiment column in one assignment."""
    if results:
      data_frame.loc[list(results.keys()), self.__sentiment_column] = list(results.values())


  def before_handle(self, data_frame, shared_dict=None):
    print(f"Set empty value for '{self.__sentiment_column}' column")
    data_frame.loc[:, self.__sentiment_column] = ""


  def on_handle(self, data_frame, shared_dict=None, task_number=1):
    """Label every row of this page.

    With ``shared_dict``, labels are published there (keyed by row label) for
    ``after_handle`` to merge. Without it, they are written directly into ``data_frame``.
    """
    print(f"Task {task_number} is started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")

    ratings = data_frame[self.__rating_column]
    sentiments = dict(zip(ratings.index, self.__detect_sentiments(ratings)))
    if shared_dict is None:
      self.__merge_into(data_frame, sentiments)
    else:
      # A single bulk update is one round trip to the Manager process instead of one per row.
      shared_dict.update(sentiments)

    print(f"Task {task_number} is completed at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")


  def after_handle(self, data_frame, shared_dict=None):
    """Copy the labels published by ``on_handle`` into the sentiment column."""
    print("Merge results from all tasks")
    if shared_dict is None:
      return  # nothing was published; on_handle wrote into the frame directly
    # items() fetches a local snapshot of the proxied dict in a single call.
    self.__merge_into(data_frame, dict(shared_dict.items()))


In [38]:
%%time
sentiment_rating_tasks = [
    { "id": 0, "count": -1, "page_size": 500_000, "cpu_count": 10 },            # 24min
    { "id": 1, "count": 10_000, "page_size": 10_000, "cpu_count": 1 },          # 3.39s
    { "id": 2, "count": 100_000, "page_size": 100_000, "cpu_count": 1 },        # 14.8s
    { "id": 3, "count": 100_000, "page_size": 10_000, "cpu_count": 10 },        # 9.66s
    { "id": 4, "count": 200_000, "page_size": 10_000, "cpu_count": 20 },        # 19.1
    { "id": 5, "count": 1_000_000, "page_size": 1_000_000, "cpu_count": 1 },    # 2min 7s
    { "id": 6, "count": 1_000_000, "page_size": 100_000, "cpu_count": 10 },     # 1min 17s
    { "id": 7, "count": 2_000_000, "page_size": 100_000, "cpu_count": 20 },     # 3min 53s
    { "id": 8, "count": 2_000_000, "page_size": 200_000, "cpu_count": 10 },     # 2min 38s
    { "id": 9, "count": 5_000_000, "page_size": 500_000, "cpu_count": 10 },     # 6min 33s
    { "id": 10, "count": 10_000_000, "page_size": 500_000, "cpu_count": 20 },   # 20min 2s
    { "id": 11, "count": 10_000_000, "page_size": 1_000_000, "cpu_count": 10 }  # 12min 39s o
]

selected_id = 0
selected_sentiment_rating_task = sentiment_rating_tasks[selected_id] if selected_id < len(sentiment_rating_tasks) and sentiment_rating_tasks[selected_id]["id"] == selected_id else sentiment_rating_tasks[1]

df["sentiment_rating"] = ""
SentimentTaskManager().execute(
    data_frame=df.iloc[: selected_sentiment_rating_task["count"]] if selected_sentiment_rating_task["count"] > 0 else df,
    page_size=selected_sentiment_rating_task["page_size"],
    cpu_count=selected_sentiment_rating_task["cpu_count"],
    command_handler=DetectSentimentBasedOnRatingCommandHandler(
        rating_column="rating",
        sentiment_column="sentiment_rating")
)

Page Size    : 500000
Total records: 19595170
Total pages  : 40
Set empty value for 'sentiment_rating' column
----------------
Page: 1 - From 0 to 500000
----------------
Page: 2 - From 500000 to 1000000
----------------
Page: 3 - From 1000000 to 1500000
----------------
Page: 4 - From 1500000 to 2000000
----------------
Page: 5 - From 2000000 to 2500000
----------------
Page: 6 - From 2500000 to 3000000
----------------
Page: 7 - From 3000000 to 3500000
----------------
Page: 8 - From 3500000 to 4000000
----------------
Page: 9 - From 4000000 to 4500000
----------------
Page: 10 - From 4500000 to 5000000
----------------
Task 1 is started at 27/07/2024 07:31:34
Task 2 is started at 27/07/2024 07:31:35
Task 3 is started at 27/07/2024 07:31:35
Task 4 is started at 27/07/2024 07:31:35
Task 5 is started at 27/07/2024 07:31:36
Task 6 is started at 27/07/2024 07:31:36
Task 7 is started at 27/07/2024 07:31:36
Task 8 is started at 27/07/2024 07:31:37
Task 9 is started at 27/07/2024 07:31:37
T

In [40]:
(df["sentiment_rating"].values == '').sum()

0

In [41]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19595170 entries, 0 to 19595169
Data columns (total 8 columns):
 #   Column                      Dtype  
---  ------                      -----  
 0   rating                      int64  
 1   review_cleaned              object 
 2   review_lemmatized           object 
 3   sentiment_score_cleaned     float64
 4   sentiment_cleaned           object 
 5   sentiment_score_lemmatized  float64
 6   sentiment_lemmatized        object 
 7   sentiment_rating            object 
dtypes: float64(2), int64(1), object(5)
memory usage: 1.2+ GB


In [44]:
df.head()

Unnamed: 0,rating,review_cleaned,review_lemmatized,sentiment_score_cleaned,sentiment_cleaned,sentiment_score_lemmatized,sentiment_lemmatized,sentiment_rating
0,5,crazy comfy gon lie much look lol luckily one ...,crazy comfy gon lie look lol luckily ppl value...,0.9951,positive,0.9959,positive,positive
1,5,excellent love pretty,excellent love pretty,0.9022,positive,0.9022,positive,positive
2,5,best saddle pads huge fan vertigo dressage pad...,good saddle pad huge fan vertigo dressage pad ...,0.9091,positive,0.8298,positive,positive
3,5,perfect repair kit great weaver halter recentl...,perfect repair kit great weaver halter recentl...,0.9496,positive,0.9485,positive,positive
4,5,works great great slightly tooshort girth stur...,work great great slightly tooshort girth sturd...,0.9246,positive,0.9246,positive,positive


# Backup Sentiment Score Step

In [45]:
df.to_json(f"{data_path}/Reviews_Sentiment_Min.zip",  orient="records", lines=True, compression="zip")

# Restore Sentiment Score Step

In [None]:
# %%time
# restore_path_2 = f"{data_path}/Reviews_Sentiment_Min.zip"

# if 'df' not in globals():
#   if os.path.exists(restore_path_2):
#     print(f"Read data from Reviews_Sentiment_Min.zip")
#     df = pd.read_json(restore_path_2, orient="records", lines=True, compression="zip")
#   else:
#     print(f"{restore_path_2} not found")

In [14]:
# df.head()

Unnamed: 0,rating,review_cleaned,review_lemmatized,sentiment_score_cleaned,sentiment_cleaned,sentiment_score_lemmatized,sentiment_lemmatized
0,5,crazy comfy gon lie much look lol luckily one ...,crazy comfy gon lie look lol luckily ppl value...,0.9951,positive,0.9959,positive
1,5,excellent love pretty,excellent love pretty,0.9022,positive,0.9022,positive
2,5,best saddle pads huge fan vertigo dressage pad...,good saddle pad huge fan vertigo dressage pad ...,0.9091,positive,0.8298,positive
3,5,perfect repair kit great weaver halter recentl...,perfect repair kit great weaver halter recentl...,0.9496,positive,0.9485,positive
4,5,works great great slightly tooshort girth stur...,work great great slightly tooshort girth sturd...,0.9246,positive,0.9246,positive
