<a href="https://colab.research.google.com/github/jon-chun/sentiment_cruxes/blob/main/hpotter_gobletoffire_20211122.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Jon Chun
24 Oct 2021

# **Setup and Configuration**

In [None]:
!pip install transformers[sentencepiece]

In [None]:
import transformers

In [None]:
!pip install texthero

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

In [None]:
import string
import re

In [None]:
from google.colab import files

In [None]:
%matplotlib inline

In [None]:
plt.rcParams["figure.figsize"] = (20,10)

# **Utility Functions**

In [None]:
def twoway_probability2sentiment(text_str, sentiment_2polarity_fn, pol_labels=('negative','positive')):
  '''
  Convert the result of a 2-class (negative/positive) classifier into a signed score.

  Args:
    text_str: text passed unchanged to sentiment_2polarity_fn.
    sentiment_2polarity_fn: callable whose result is indexed as
      result[0]['label'] (str) and result[0]['score'] (probability 0.0 to 1.0).
    pol_labels: pair of label strings (negative_label, positive_label).
      Matching is case-insensitive and succeeds when the model's label is
      contained in the configured label, so 'Neg' matches 'negative'.
        e.g. CamemBERT returns (LABEL_0/LABEL_1) for (Negative/Positive)
             other models return (LABEL_1/LABEL_2), (NEGATIVE/POSITIVE) or (Neg/Pos)

  Returns:
    -score for the negative class or +score for the positive class
    (range -1.0 to 1.0). If the label matches neither entry of pol_labels an
    error is printed and the sentinel -99 is returned.
  '''

  model_score = sentiment_2polarity_fn(text_str)
  pol_str = model_score[0]['label']
  score_fl = float(model_score[0]['score'])

  pol_lower = pol_str.lower()

  # An empty string is a substring of every string, so a blank label must be
  # rejected explicitly instead of being scored as negative.
  if pol_lower and (pol_lower in pol_labels[0].lower()):
    sign_fl = -1.0
  elif pol_lower and (pol_lower in pol_labels[1].lower()):
    sign_fl = 1.0
  else:
    print(f'ERROR polarity string: {pol_str} must be one of two values (e.g. [Nn]egative|[Pp]ositive)')
    return -99

  return sign_fl * score_fl

# Test
# test_fl = wrapper_polprob2sentiment('I hate your guts you bastard!') # sentiment_analysis('section')[0]['label'],sentiment_analysis('section')[0]['score']))
# print(f'test_fl: {test_fl}')

In [None]:
def threeway_probability2sentiment(text_str, sentiment_2polarity_fn):
  '''
  Convert the result of a 3-class (negative/neutral/positive) classifier into a signed score.

  The top label selects a band and the label's probability places the score
  inside that band:
    negative : -1.0 - score                    (-2.0 to -1.0)
    neutral  : -score if score < 0.5 else +score
    positive : +1.0 + score                    ( 1.0 to  2.0)

  Args:
    text_str: text passed unchanged to sentiment_2polarity_fn.
    sentiment_2polarity_fn: callable whose result is indexed as
      result[0]['label'] (str) and result[0]['score'] (probability 0.0 to 1.0).

  Returns:
    The band-adjusted score as a float. If the label is not recognised an
    error is printed and the sentinel -99.0 is returned (the same sentinel
    value twoway_probability2sentiment uses).

  Labels are recognised case-insensitively by their first three letters
  ('neu', 'neg', 'pos') or as LABEL_0 (neutral), LABEL_1 (negative),
  LABEL_2 (positive).
  NOTE(review): the LABEL_n order differs between checkpoints - confirm it
  against the model card of the model being wrapped.

  A Pysentimiento-specific path used to live here behind `if False:`; it could
  never run, so only the general path remains.
  '''

  model_score = sentiment_2polarity_fn(text_str)
  pol_str = model_score[0]['label']
  score_fl = float(model_score[0]['score'])
  pol_lower = pol_str.lower()

  if pol_lower.startswith('neu') or pol_str == 'LABEL_0':
    # Neutral has no band offset; a low-confidence neutral leans negative
    sign_fl = -1.0 if score_fl < 0.5 else 1.0
    adj_base = 0.0
  elif pol_lower.startswith('neg') or pol_str == 'LABEL_1':
    sign_fl = -1.0
    adj_base = -1.0
  elif pol_lower.startswith('pos') or pol_str == 'LABEL_2':
    sign_fl = 1.0
    adj_base = 1.0
  else:
    # Return the sentinel instead of falling through to an unassigned adj_base
    print(f'ERROR polarity string: {pol_str} must be one of three values (e.g. [Nn]egative|[Nn]eutral|[Pp]ositive)')
    return -99.0

  return (sign_fl * score_fl) + adj_base

# Test
# test_fl = wrapper_polprob2sentiment('I hate your guts you bastard!') # sentiment_analysis('section')[0]['label'],sentiment_analysis('section')[0]['score']))
# print(f'test_fl: {test_fl}')

In [None]:
def fiveway_probability2sentiment(text_str, sentiment_5star_fn):
  '''
  Convert a 5-class star-rating prediction into a single score from 0.0 to 5.0.

  sentiment_5star_fn(text_str) is indexed as result[0]['label'] and
  result[0]['score']. The label ('1 star' .. '5 stars' or LABEL_0 .. LABEL_4)
  picks a base of 0.0 .. 4.0 and the score is added on top of that base.
  An unrecognised label prints an error and is scored from the middle base 2.0.
  '''

  # Base value for every accepted spelling of the five classes
  star_base_dt = {
    '1 star': 0.0,  'LABEL_0': 0.0,
    '2 stars': 1.0, 'LABEL_1': 1.0,
    '3 stars': 2.0, 'LABEL_2': 2.0,
    '4 stars': 3.0, 'LABEL_3': 3.0,
    '5 stars': 4.0, 'LABEL_4': 4.0,
  }

  top_result = sentiment_5star_fn(text_str)[0]
  star_label = top_result['label']

  if star_label not in star_base_dt:
    print(f"ERROR: polarity string = {star_label} must be in [1-5] 'stars'")

  return star_base_dt.get(star_label, 2.0) + top_result['score']

# Test
# test_fl = wrapper_polprob2sentiment('I hate your guts you bastard!') # sentiment_analysis('section')[0]['label'],sentiment_analysis('section')[0]['score']))
# print(f'test_fl: {test_fl}')

# **Get the Novel Text**

## **OPTION (a): Connect and Read from gDrive**

(Upload novel textfile to Google Drive under root folder **./MyDrive** first)

In [None]:
from google.colab import drive

drive.mount('/gdrive')
%cd /gdrive

In [None]:
# drive.mount("/gdrive", force_remount=True)

In [None]:
!ls

In [None]:
%cd ./MyDrive/

In [None]:
# %cd ./research/2021/sa_book_code/books_sa/bsmith_atreegrowsinbrooklyn/

%cd ./research/2021/sa_book_code/books_sa/jkrowling_4gobletoffire/

In [None]:
!ls *.txt

In [None]:
novel_ls = []
novel_parags_ls = []

novel_filename = 'screenplay_potter_goblet_of_fire.txt'

In [None]:
#@title Enter the Novel_Title in the form [Title] by [Author]


Novel_Title = "Harry Potter and the Goblet of Fire by J.K. Rowling" #@param {type:"string"}

In [None]:
# Read novel into list of Sentences/lines

with open(novel_filename, 'r') as fp:
  novel_ls = fp.readlines()

print(f'Line Count: {len(novel_ls)}')

In [None]:
print(novel_ls[:5])

In [None]:
def strip_ascii(text):
  '''
  Return text with every character outside printable ASCII removed.

  Only code points 32 (space) through 126 (~) are kept, so newlines, tabs,
  other control characters and all non-ASCII characters are dropped.
  '''
  kept_chars_ls = []
  for achar in text:
    code_point = ord(achar)
    if code_point > 31 and code_point < 127:
      kept_chars_ls.append(achar)
  return "".join(kept_chars_ls)

In [None]:
# Strip out non-printable ASCII

# novel_ls = [x.encode('ascii',errors='ignore').decode() for x in novel_ls]

novel_ls = [strip_ascii(x) for x in novel_ls]
novel_ls = [x for x in novel_ls if len(x) > 0]


In [None]:
print(novel_ls[:5])

In [None]:
# Read novel into list of Paragraphs

delimiter = "\n\n"

with open(novel_filename, "r") as fp:
  all_content = fp.read() #reading all the content in one step
  #using the string methods we split it
  novel_parags_ls = all_content.split(delimiter)
  novel_parags_ls = [x.strip() for x in novel_parags_ls if len(x.strip()) > 2]

print(f'Paragraph Count: {len(novel_parags_ls)}')

In [None]:
novel_parags_ls[:10]

In [None]:
novel_ls[4]

In [None]:
novel_clean_str = '\n'.join(novel_ls)

print(novel_clean_str[:5000])

## OPTION (b): **Scrape Project Gutenberg**

**Go to Project Gutenberg (*https://gutenberg.org* or *https://gutenberg.net.au*) and find the HTML version (not the plain-text version) of your novel**

In [None]:
from bs4 import BeautifulSoup

import requests

In [None]:
#@title Enter the URL of your novel at ***gutenberg.net.au***
#@markdown Paste the URL to the ***HTML version*** (not plain text).

Novel_Title = "The Adventures of Huckleberry Finn by Mark Twain"  #@param {type: "string"}

Gutenberg_URL = 'https://gutenberg.org/cache/epub/76/pg76-images.html'  #@param {type: "string"}


In [None]:
# Get raw HTML of novel from Gutenberg.net.au

response=requests.get(Gutenberg_URL)  # TODO: Pass the URL to the .get() method of the requests object
html = response.text

In [None]:
# View raw HTML that we need to clean up

# TODO: What is the difference between these two outputs?

# Option A: 
html

# Option B:
# print(html)

## **Using Beautiful Soup**

In [None]:
# Create a BeautifulSoup object from the HTML

soup = BeautifulSoup(html, "html.parser")

# Get all the <P>Paragraphs</P>
#   see bs4 API ref: https://beautiful-soup-4.readthedocs.io/en/latest/#kinds-of-objects
paragraph = soup.find_all("p")

# Keep only the text of each paragraph tag
parag_ls = [para.text for para in paragraph]

print(f'There were {len(parag_ls)} Paragraphs:\n')

# print() returns None, so it must not be called inside an f-string:
# doing so prints the paragraphs and then a stray "[None, None, None]".
print(f"First 3 Paragraphs: ==============================    \n")
for aparag in parag_ls[:3]:
  print(f"    {aparag}\n")

print(f"Last 3 Paragraphs: ============================== \n")
for aparag in parag_ls[-3:]:
  print(f"    {aparag}\n")


## **Using Python [string].partition() or RegEx**

In [None]:
# Concatenate all paragraphs into a single novel string

# For every paragraph, replace all hardcoded \r\n with a single space
parag_flat_ls = [re.sub(r'\r\n', ' ', aparag) for aparag in parag_ls]

# Concatenate all paragraphs into a single string, separated by \n
novel_str = '\n'.join(parag_flat_ls)

print('\nSTART OF NOVEL: -----')
print(novel_str[:1000] + '\n')

print('\nEND OF NOVEL: -----\n')
print(novel_str[-1000:])

**Enter the First and Last several words to use as RegEx for trimming header/footers**

In [None]:
#@title Enter the first sentence in the body of your novel
sentence_first_str = 'You don\u2019t know about me without'  #@param {type: "string"}

#@title Enter the last sentence in the body of your novel
sentence_last_str = 'stand it. I been there before.'  #@param {type: "string"}


In [None]:
# Strip off the header
novel_clean = ' '.join(novel_str.partition(sentence_first_str)[1:])

# Strip off the footer
' '.join(novel_clean.partition(sentence_last_str)[:2])[-500:]

In [None]:
# Strip off the header: keep everything from the first body sentence onward.
# The partition pieces are re-joined with '' (not ' ') so no extra space is
# inserted into the novel text.
_, first_match_str, body_rest_str = novel_str.partition(sentence_first_str)
if first_match_str:
  novel_clean_str = first_match_str + body_rest_str
else:
  # Without this check a typo in sentence_first_str would silently discard the whole novel
  print(f'WARNING: first sentence not found, header NOT stripped: [{sentence_first_str}]')
  novel_clean_str = novel_str

# Strip off the footer: keep everything up to and including the last body sentence.
# rpartition cuts at the LAST occurrence, so an earlier repeat of the same
# words inside the body cannot truncate the novel.
body_str, last_match_str, _ = novel_clean_str.rpartition(sentence_last_str)
if last_match_str:
  novel_clean_str = body_str + last_match_str
else:
  print(f'WARNING: last sentence not found, footer NOT stripped: [{sentence_last_str}]')

# Verify

print('\nSTART OF CLEAN NOVEL: -----')
print(novel_clean_str[:1000] + '\n')

print('\nEND OF CLEAN NOVEL: -----\n')
print(novel_clean_str[-1000:])

# **Split Novel into Sentences**

* https://github.com/zaemyung/sentsplit (CRF: mincut)

* https://github.com/adobe/NLP-Cube and Rank ~15 https://aclanthology.org/K18-2017.pdf

In [None]:
# Read novel into list of Paragraphs

delimiter = "\n"

novel_parags_ls = novel_clean_str.split(delimiter)
novel_parags_ls = [x.strip() for x in novel_parags_ls if len(x.strip()) > 2]
novel_parags_ls = [' '.join(x.split()) for x in novel_parags_ls]

print(f'Paragraph Count: {len(novel_parags_ls)}')

In [None]:
novel_parags_ls[:15]

In [None]:
for i,aline in enumerate(novel_parags_ls):
  if (len(aline.strip()) < 5):
    print(f'Line #{i}: {aline}')

In [None]:
# Prior several code blocks for future functionality, can start execution in this section with cell below

In [None]:
novel_clean_str = '\n'.join(novel_parags_ls)
novel_clean_str[:2000]

In [None]:
import nltk

nltk.download('punkt')

from nltk.tokenize import sent_tokenize

In [None]:
novel_sents_ls = sent_tokenize(novel_clean_str)

sent_ct = len(novel_sents_ls)
sent_show = 10

print('\nFirst Sentences: -----\n')
# for i, asent in enumerate(novel_sents_ls[:sent_show]):
for i, asent in enumerate(novel_sents_ls[:sent_show]):
  print(f'Sentences #{i}: {asent}')


print('\nLast Sentences: -----\n')
for i, asent in enumerate(novel_sents_ls[-sent_show:]):
  print(f'Sentences #{sent_ct - (sent_show - i)}: {asent}')


print(f'\n\nThere are {sent_ct} Sentences in the novel')

In [None]:
# View the Sentences that have no letters in them

[x.strip() for x in novel_sents_ls if not re.search('[a-zA-Z]', x)]

In [None]:
# Delete the short Sentences and those without any alphabetic characters

novel_sents_ls = [x.strip() for x in novel_sents_ls if len(x.strip()) > 2]
novel_sents_ls = [x.strip() for x in novel_sents_ls if re.search('[a-zA-Z]', x)]
len(novel_sents_ls)

In [None]:
# View the shortest Setences

sorted(novel_sents_ls, key=len)[:100]
# type(min(novel_sents_ls, key=len))
# novel_sents_ls[:1000]

In [None]:
len(novel_sents_ls)

# **Expand Contractions**

In [None]:
!pip install contractions

In [None]:
import contractions
contractions.fix("you're happy now")

In [None]:
# novel_clean_ls = [re.sub(r'[\n]+', ' ', x).strip() for x in novel_ls]
novel_clean_ls = [contractions.fix(x) for x in novel_ls]
novel_clean_ls = [re.sub(r'[\n]+', ' ', x).strip() for x in novel_clean_ls]
novel_clean_ls = [x.strip() for x in novel_clean_ls if len(x.strip()) > 1]
# novel_clean_ls = [re.sub(r"^[\"\']", "", x) for x in novel_clean_ls]  # re.sub("[\"\']", "", s)
# novel_clean_ls = [re.sub(r"[\"\']$", "", x) for x in novel_clean_ls]
novel_clean_ls = [x.encode('ascii',errors='ignore').decode() for x in novel_clean_ls]
# novel_bin = novel_clean_str.encode('ascii',errors='ignore')
# novel_clean_str = novel_bin.decode()

[f'[{x}]' for x in novel_clean_ls]

In [None]:
novel_clean_str = '\n'.join(novel_clean_ls)
print(novel_clean_str[:5000])

In [None]:
# novel_clean_ls[1] = "THE SILVER SPOON I was born twice: first, as a baby girl, on a remarkably smogless Detroit day in January of 1960; and then again, as a teenage boy, in an emergency room near Petoskey, Michigan, in August of 1974. Specialized readers may have come across me in Dr. Peter Luce’s study, “Gender Identity in 5-Alpha-Reductase Pseudohermaphrodites,” published in theJournal of Pediatric Endocrinology in 1975. Or maybe you’ve seen my photograph in chapter sixteen of the now sadly outdatedGenetics and Heredity."

In [None]:
# novel_clean_ls.pop(0)

In [None]:
novel_clean_ls[:10]

In [None]:
novel_clean_ls[-10:]

In [None]:
# novel_clean_str = '\n'.join(novel_clean_ls)

# novel_bin = novel_clean_str.encode('ascii',errors='ignore')
# novel_clean_str = novel_bin.decode()

# print(novel_clean_str[:5000])

# **Clean and Slice Strings**

In [None]:
import texthero as hero

In [None]:
hero.get_default_pipeline()

In [None]:
# Texthero works on Pandas Series

# novel_df = pd.DataFrame({'text_raw': novel_sents_ls})
novel_df = pd.DataFrame({'text_raw': novel_clean_ls})
novel_df.head()

In [None]:
# Convert string column/Series from 'object' to 'string'

novel_df['text_raw'] = novel_df['text_raw'].astype('string')
novel_df['text_raw'] = novel_df['text_raw'].str.strip()

novel_df.info()

In [None]:
# Use texthero.clean() to clean the 'text_raw' column and create the 'text_clean' column

novel_df['text_clean'] = hero.clean(novel_df['text_raw'])
novel_df.head()

In [None]:
novel_df.shape

In [None]:
novel_df.head()

In [None]:
# Delete the (near)null Sentences

novel_df['text_raw_len'] = novel_df['text_raw'].apply(lambda x : len(x.strip()))
novel_df.head()
novel_df.shape

In [None]:
# View the shortests Sentences before and after cleaning

novel_df.sort_values(by=['text_raw_len']).head(400)

In [None]:
novel_df['text_raw_len'].value_counts().sort_values(na_position='first')[:50]

In [None]:
# Drop Sentences whose raw length is 2 characters or fewer

# reset_index gives the filtered frame a fresh 0..n-1 index (and a new object):
# the peak detection cells get positional indices back from find_peaks and use
# them to look rows up, which only lines up when the index has no gaps.
novel_df = novel_df[novel_df['text_raw_len'] > 2].reset_index(drop=True)
novel_df.shape

In [None]:
novel_df.text_clean = novel_df.text_clean.astype('string')
novel_df.info()

In [None]:
novel_df.sort_values(by=['text_raw_len']).head(20)

# **Sentiment Analysis**

## **VADER**

In [None]:
!pip install vaderSentiment

In [None]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

vader_sa = SentimentIntensityAnalyzer()

In [None]:
vader_sa.polarity_scores('I love lint')['compound']

In [None]:
novel_df['vader'] = novel_df['text_clean'].apply(lambda x : vader_sa.polarity_scores(x)['compound'])
novel_df.head(100)

## **TextBlob**

In [None]:
from textblob import TextBlob

In [None]:
testimonial = TextBlob("Textblob is amazingly simple to use. What great fun!")
print(testimonial.sentiment.polarity)

In [None]:
novel_df['textblob'] = novel_df['text_clean'].apply(lambda x : TextBlob(x).sentiment.polarity)
novel_df.head()

## **Sentiment Analysis: (2-way) RoBERTa Large 15 Datasets**

* https://huggingface.co/siebert/sentiment-roberta-large-english
* https://huggingface.co/roberta-base 

In [None]:
from transformers import pipeline

sentiment_analysis = pipeline("sentiment-analysis",model="siebert/sentiment-roberta-large-english")
print(sentiment_analysis("I love this!"))

In [None]:
# Direct Test

# Only the last bare expression of a cell is displayed, so print every result explicitly
for test_str in ['I love wonderful good things', 'I hate your guts you filthy bastard', 'It is']:
  print(sentiment_analysis(test_str))
  print('\n')

In [None]:
# Test

model_adj_score = twoway_probability2sentiment('I love wonderful good things', sentiment_analysis, pol_labels=['NEGATIVE','POSITIVE'])
# A bare expression in the middle of a cell is not displayed, so print the first result
print(model_adj_score)
print('\n')
model_adj_score = threeway_probability2sentiment('It is not good', sentiment_analysis)
model_adj_score

In [None]:
# Test

model_adj_score = twoway_probability2sentiment('I hate your guts you bastard', sentiment_analysis, pol_labels=['NEGATIVE','POSITIVE'])
model_adj_score


In [None]:
novel_df['roberta15lg'] = novel_df['text_raw'].apply(lambda x : twoway_probability2sentiment(x, sentiment_analysis, pol_labels=['NEGATIVE','POSITIVE']))
novel_df.head()

In [None]:
%%time

# NOTE:

win_per = 5
win_width = int(win_per/100*novel_df.shape[0])

novel_df['roberta15lg_gauss05'] = novel_df['roberta15lg'].rolling(win_width, center=True, min_periods=0).mean()
novel_df.head()

In [None]:
# Smooth the RoBERTa scores with centered rolling means over windows of
# 5% to 30% of the text, one column per window size.
# NOTE: the 'gauss' suffix is kept because later cells reference these column
# names; the smoothing itself is a plain (unweighted) rolling mean.
for win_per in (5, 10, 15, 20, 25, 30):
  # At least 1 row wide, so a very short text cannot produce a zero-width window
  win_width = max(1, int(win_per/100*novel_df.shape[0]))
  novel_df[f'roberta15lg_gauss{win_per:02d}'] = novel_df['roberta15lg'].rolling(win_width, center=True, min_periods=0).mean()

In [None]:
%whos str

In [None]:
%%time

# NOTE:

sns.histplot(data=novel_df, x='roberta15lg_gauss05', bins=100, color='purple', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='5%')
sns.histplot(data=novel_df, x='roberta15lg_gauss10', bins=100, color='blue', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='10%')
sns.histplot(data=novel_df, x='roberta15lg_gauss15', bins=100, color='violet', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='15%')
sns.histplot(data=novel_df, x='roberta15lg_gauss20', bins=100, color='yellow', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='20%')
sns.histplot(data=novel_df, x='roberta15lg_gauss25', bins=100, color='orange', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='25%')
sns.histplot(data=novel_df, x='roberta15lg_gauss30', bins=100, color='red', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='30%')

plt.title(f'{Novel_Title} Sentiment Analysis \n RoBERTa SMA 5-30% Window')
plt.xlabel('Sentiment Polarity')
plt.legend(title='SMA Window', loc='best')
# sns.histplot(data=novel_df, x='textblob', bins=100, color='blue', alpha=0.3, kde=True)
# sns.histplot(data=novel_df, x='vader', bins=100, color='green', alpha=0.3, kde=True);

In [None]:
%%time

# NOTE:

sns.histplot(data=novel_df, x='roberta15lg_gauss25', bins=100, color='red', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='RoBERTa 25%')
sns.histplot(data=novel_df, x='roberta15lg_gauss30', bins=100, color='orange', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='RoBERTa 30%')
sns.histplot(data=novel_df, x='textblob', bins=100, color='blue', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='TextBlob')
sns.histplot(data=novel_df, x='vader', bins=100, color='green', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='VADER')

plt.title(f'{Novel_Title} Sentiment Analysis \n RoBERTa SMA 5-30% Window, TextBlob and VADER')
plt.xlabel('Sentiment Polarity')
plt.legend(title='SMA Window', loc='best')

In [None]:
novel_df.info()

In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler

In [None]:
# Scale every non-text column. Columns that already end in '_z' are skipped so
# that re-running this cell does not rescale its own output into '_z_z' columns.
col_std_ls = [x for x in novel_df.columns if 'text_' not in x and not x.endswith('_z')]

# NOTE: despite the '_z' suffix the values are min-max scaled, not z-scores.
# Swap in StandardScaler() here to get z-scores instead.
scaler = MinMaxScaler()

for acol in col_std_ls:
  acol_z = f'{acol}_z'
  # The scaler is re-fit for each column, so every column is scaled independently
  novel_df[acol_z] = scaler.fit_transform(novel_df[acol].values.reshape(-1,1))

novel_df.head()

In [None]:
novel_df.info()

In [None]:
cols_z_ls = [x for x in novel_df if '_z' in x]
cols_z_ls

In [None]:
%%time

# NOTE:

sns.histplot(data=novel_df, x='roberta15lg_gauss25_z', bins=100, color='red', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='RoBERTa 25%')
sns.histplot(data=novel_df, x='roberta15lg_gauss30_z', bins=100, color='orange', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='RoBERTa 30%')
sns.histplot(data=novel_df, x='textblob_z', bins=100, color='blue', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='TextBlob')
sns.histplot(data=novel_df, x='vader_z', bins=100, color='green', alpha=0.3, kde=True, line_kws={'linewidth':5}, label='VADER')

plt.title(f'{Novel_Title} Sentiment Analysis \n RoBERTa SMA 5-30% Window, TextBlob and VADER')
plt.xlabel('Sentiment Polarity')
plt.legend(title='SMA Window', loc='best')

In [None]:
win_per = 30
win_size = int(win_per/100*novel_df.shape[0])
mag_factor = 10

x_mean = novel_df['textblob_z'].mean()
novel_df['textblob_z'].apply(lambda x: mag_factor*(x-x_mean)+x_mean).rolling(win_size, center=True, min_periods=0).mean().plot(label='TextBlob_z', linewidth=5)

x_mean = novel_df['vader_z'].mean()
novel_df['vader_z'].apply(lambda x: mag_factor*(x-x_mean)+x_mean).rolling(win_size, center=True, min_periods=0).mean().plot(label='VADER_z', linewidth=5)

novel_df['roberta15lg_gauss10_z'].plot(label='RoBERTa_10_z', linewidth=5)
novel_df['roberta15lg_gauss25_z'].plot(label='RoBERTa_25_z', linewidth=5)
novel_df['roberta15lg_gauss30_z'].plot(label='RoBERTa_30_z', linewidth=5)

plt.legend(loc='best')
plt.show();

In [None]:
%%time

# NOTE:

win_per = 20
win_width = int(win_per/100*novel_df.shape[0])

novel_df['roberta15lg_gauss'] = novel_df['roberta15lg'].rolling(win_width, center=True, min_periods=0).mean()
novel_df.head()

In [None]:
%%time

# NOTE:

sns.histplot(data=novel_df, x='roberta15lg', bins=100, color='red', alpha=0.3, kde=True)
sns.histplot(data=novel_df, x='roberta15lg_gauss', bins=100, color='orange', alpha=0.3, kde=True)
sns.histplot(data=novel_df, x='textblob', bins=100, color='blue', alpha=0.3, kde=True)
sns.histplot(data=novel_df, x='vader', bins=100, color='green', alpha=0.3, kde=True);

In [None]:
%%time

# NOTE:

win_per = 5
win_width = int(win_per/100*novel_df.shape[0])

novel_df['roberta15lg_gauss'] = novel_df['roberta15lg'].rolling(win_width, center=True, min_periods=0).mean()
novel_df.head()

In [None]:
%%time

# NOTE:

sns.histplot(data=novel_df, x='roberta15lg', bins=100, color='red', alpha=0.3, kde=True)
sns.histplot(data=novel_df, x='roberta15lg_gauss', bins=100, color='orange', alpha=0.3, kde=True)
sns.histplot(data=novel_df, x='textblob', bins=100, color='blue', alpha=0.3, kde=True)
sns.histplot(data=novel_df, x='vader', bins=100, color='green', alpha=0.3, kde=True);

In [None]:
%%time 

# NOTE:

# Read the *.txt file, calculate line sentiment and write out results as *.csv

for i, filename_text in enumerate(filenames_clean_ls):

  print(f'\n\nOPENING FILE #{i}: {filename_text} ==========\n')
  with open(filename_text, 'r+') as fp:
    file_lines_ls = fp.readlines()

  sentiment_all_ls = []
  for j,aline in enumerate(file_lines_ls):
    
    sentiment_aline = twoway_probability2sentiment(aline, sentiment_analysis, pol_labels=['NEGATIVE','POSITIVE'])
    sentiment_all_ls.append(sentiment_aline)
    print(f'Line #{j}: Sentiment={sentiment_aline}\n{aline}\n')

  filename_sentiment = filename_text.replace(".txt", ".csv")
  sentiment_dt = {'text': file_lines_ls, 'roberta15lg': sentiment_all_ls} 
  sentiment_df = pd.DataFrame(sentiment_dt) 
  sentiment_df.to_csv(filename_sentiment)

  print(f'Wrote Sentiment File: {filename_sentiment}')
  # with open(filename_clean, 'w+') as fp:
  #   fp.write(file_clean_str)
  
  # print(f'File #{i}: {filename_text}\n  Cleaned: {filename_clean}\n\n')

# **Plot Sentiment**

In [None]:
novel_df.head()

In [None]:
#@title Enter the Sliding Window width as Percent of Novel length (default 10%, larger=smoother)

window_percent = 10 #@param {type:"slider", min:1, max:20, step:1}

win_xper = int(window_percent/100 * novel_df.shape[0])

vader_col = f'vader_sma{window_percent}'
novel_df[vader_col] = novel_df['vader'].rolling(win_xper, center=True, min_periods=1).mean()

textblob_col = f'textblob_sma{window_percent}'
novel_df[textblob_col] = novel_df['textblob'].rolling(win_xper, center=True, min_periods=1).mean()
novel_df.plot(y=[vader_col, textblob_col])

plt.title(f'{Novel_Title}\n Sentiment Analysis (SMA {window_percent}%)')
plt.ylabel('Sentiment')
plt.xlabel('Sentence No.')
plt.grid(True, alpha=0.3)
plt.legend(loc='best')
plt.show();

In [None]:
novel_df.head()

# **Crux Detection**

## **Scipy Signal Find_Peaks**

* https://stackoverflow.com/questions/1713335/peak-finding-algorithm-for-python-scipy

In [None]:
from scipy.signal import find_peaks

In [None]:
#@title Which Lexicon?

Sentiment_Model = "VADER" #@param ["VADER", "TextBlob"]

In [None]:
#@title Tune the main Hyperparameter for each of the 4 Peak Detection Algorithms:

Distance_Min = 380 #@param {type:"slider", min:50, max:1000, step:10}
Prominence_Min = 0.013 #@param {type:"slider", min:0.001, max:0.05, step:0.001}
Width_Min = 265 #@param {type:"slider", min:10, max:500, step:5}
Threshold_Min = 0.0005 #@param {type:"slider", min:0.0001, max:0.002, step:0.0001}

plt.rcParams['figure.figsize'] = [30, 20]

model_name = f'{Sentiment_Model.lower()}_sma{window_percent}'

# find_peaks returns positional indices. Give the series a gap-free 0..n-1
# index so those positions line up with the plotted x axis even when rows
# were dropped from novel_df earlier.
x = novel_df[model_name].reset_index(drop=True)

# Valleys are found as the peaks of the negated series
x_inv = -x

# One hyperparameter per algorithm (lower-case names are reused by the next cell)
distance_min = Distance_Min       # Peak Algo #1 (by Distance)
prominence_min = Prominence_Min   # Peak Algo #2 (by Prominence)
width_min = Width_Min             # Peak Algo #3 (by Width)
threshold_min = Threshold_Min     # Peak Algo #4 (by Threshold): required vertical distance to direct neighbouring samples

peaks, _ = find_peaks(x, distance=distance_min)
peaks2, _ = find_peaks(x, prominence=prominence_min)
peaks3, _ = find_peaks(x, width=width_min)
peaks4, _ = find_peaks(x, threshold=threshold_min)

valleys, _ = find_peaks(x_inv, distance=distance_min)
valleys2, _ = find_peaks(x_inv, prominence=prominence_min)
valleys3, _ = find_peaks(x_inv, width=width_min)
valleys4, _ = find_peaks(x_inv, threshold=threshold_min)


def _plot_crux_panel(panel_no, algo_name, hyperparam, panel_peaks, panel_valleys):
  '''Draw one panel of the 2x2 grid: the series plus labelled peak (green ^) and valley (red v) markers.'''
  plt.subplot(2, 2, panel_no)
  plt.grid(True, alpha=0.3)
  plt.plot(x)
  plt.title(f'{algo_name} Peak Detection\n {algo_name} Minimum={hyperparam}')
  plt.plot(panel_peaks, x.iloc[panel_peaks], "^g", markersize=7)
  plt.plot(panel_valleys, x.iloc[panel_valleys], "vr", markersize=7)
  for x_val in panel_peaks:
    plt.text(x_val, x.iloc[x_val], f'-----{x_val}', ha='center', va='bottom', rotation=90, size='large', color='black', weight='semibold')
  for x_val in panel_valleys:
    plt.text(x_val, x.iloc[x_val], f'-----{x_val}', ha='center', va='top', rotation=270, size='large', color='black', weight='semibold')


_plot_crux_panel(1, 'Distance', distance_min, peaks, valleys)
_plot_crux_panel(2, 'Prominence', prominence_min, peaks2, valleys2)
_plot_crux_panel(3, 'Width', width_min, peaks3, valleys3)
# The Threshold panel now draws peaks4 as the green markers (valleys4 used to be drawn twice)
_plot_crux_panel(4, 'Threshold', threshold_min, peaks4, valleys4)

plt.suptitle(f'{Novel_Title}\n Peak Detection of Sentiment Analysis (SMA {window_percent}%)', fontsize=20)
plt.grid(True, alpha=0.3)

plt.show()

In [None]:
#@title Select a Peak Detection Algorithms to View in Detail (usually Distance or Width is best):

plt.rcParams['figure.figsize'] = [20, 10]

Peak_Algorithm = "Distance" #@param ["Distance", "Prominence", "Width", "Threshold"]

# Map the chosen algorithm to its find_peaks keyword and tuned hyperparameter
if Peak_Algorithm == 'Distance':
  peak_kwarg, hyperparam = 'distance', distance_min
elif Peak_Algorithm == 'Prominence':
  peak_kwarg, hyperparam = 'prominence', prominence_min
elif Peak_Algorithm == 'Width':
  peak_kwarg, hyperparam = 'width', width_min
else:
  # Assume Peak_Algorithm == 'Threshold'
  peak_kwarg, hyperparam = 'threshold', threshold_min

# Recompute the selection instead of aliasing peaks2/peaks3/peaks4 onto `peaks`:
# once `peaks` has been overwritten by another algorithm, a `peaks = peaks`
# branch for 'Distance' would keep returning that other algorithm's result.
# Working on the plain array keeps positions, markers and curve aligned.
crux_signal = x.to_numpy()
peaks, _ = find_peaks(crux_signal, **{peak_kwarg: hyperparam})
valleys, _ = find_peaks(-crux_signal, **{peak_kwarg: hyperparam})

plt.plot(crux_signal)
plt.plot(peaks, crux_signal[peaks], "^g", markersize=15, label='peak sentence#')
plt.plot(valleys, crux_signal[valleys], "vr", markersize=15, label='valley sentence#')
for x_val in peaks:
  plt.text(x_val, crux_signal[x_val], f'    {x_val}', horizontalalignment='left', size='medium', color='black', weight='semibold')
for x_val in valleys:
  plt.text(x_val, crux_signal[x_val], f'    {x_val}', horizontalalignment='left', size='medium', color='black', weight='semibold')
plt.title(f'{Novel_Title}\n Sentiment Analysis (SMA {window_percent}%) \n Peak Detection using {Peak_Algorithm} Algorithm with Hyperparameter={hyperparam}', fontsize=16)
plt.ylabel('Sentiment')
plt.xlabel('Sentence No.')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)

# Save before plt.show(), which clears the current figure
filename_plot = f"cruxes_plot_{Novel_Title.replace(' ', '_')}.png"
plt.savefig(filename_plot, dpi=300)
plt.show();

print(f'\n\n     >>>>> SAVED PLOT TO FILE: [{filename_plot}] <<<<<')

In [None]:
# Download Crux Point Plot file 'crux_plot.png' to your laptop

files.download(filename_plot)

# **Get Context around Crux Points**

In [None]:
#@title How many Sentences around Crux Point do you want to view for context?

Crux_Sentence_Context_Count = 10 #@param {type:"slider", min:1, max:20, step:1}


**[NOTE] May have to run 2-3x times to save file**

In [None]:
peaks

In [None]:
# Build the crux-context report as a string, then display it and save it in a single run.
# (A %%capture variable is only filled in once its cell has finished, so the
# captured text cannot be written to disk from inside the capturing cell.)

novel_sent_len = novel_df.shape[0]
halfwin = int(Crux_Sentence_Context_Count/2)
nl = '\n'


def _crux_context_ls(crux_label, crux_idx_ls):
  '''
  Return one report entry per crux point: the sentences within halfwin rows on
  either side of the crux (clipped to the bounds of novel_df), with the crux
  sentence itself upper-cased.
  '''
  entry_ls = []
  for i, acrux in enumerate(crux_idx_ls):
    win_start = max(0, acrux-halfwin)
    win_end = min(acrux+halfwin+1, novel_sent_len)
    crux_sents_ls = []
    for sent_idx in range(win_start, win_end):
      sent_cur = novel_df.iloc[sent_idx].text_raw
      crux_sents_ls.append(sent_cur.upper() if sent_idx == acrux else sent_cur)
    entry_ls.append(f"{crux_label} #{i} at Sentence #{acrux}:\n\n{nl.join(crux_sents_ls)}\n\n\n")
  return entry_ls


report_ls = [
  '==================================================',
  '============     Peak Crux Points   ==============',
  '==================================================\n\n',
]
report_ls.extend(_crux_context_ls('Peak', peaks))

report_ls.extend([
  '==================================================',
  '===========     Crux Valley Points    ============',
  '==================================================\n\n',
])
report_ls.extend(_crux_context_ls('Valley', valleys))

# One trailing newline per entry, matching what print() would have emitted
report_str = nl.join(report_ls) + nl

filename_cruxes = f"cruxes_context_{Novel_Title.replace(' ', '_')}.txt"

with open(filename_cruxes, 'w', encoding='utf-8') as f:
    f.write(report_str)

print(report_str, end='')

In [None]:
# Download Crux Point Report file 'cruxes.txt' to your laptop

files.download(filename_cruxes)

# **END OF NOTEBOOK**