This notebook contains the code we use to extract various features from the introduction, body, and conclusion of each text.

The code is provided as a Jupyter Notebook for easy interpretation and future modification.

Start from one of the data files (reuter/enron/persuade) in which the introduction, body, and conclusion text are already separated. Each section below then adds its features to the dataframe as new columns, one feature group at a time.

# Initialization

Install the following libraries for the feature extraction:

In [None]:
# spaCy (tokenization, POS tags, named entities, stop words) and NLTK (sentence/word tokenization)
!pip install spacy
!pip install nltk
# textstat provides the Flesch Reading Ease score
!pip install textstat
# English spaCy model that is loaded later with spacy.load("en_core_web_sm")
!python -m spacy download en_core_web_sm
# spacytextblob exposes TextBlob sentiment on spaCy docs (doc._.blob); the second line fetches the TextBlob corpora
!pip install spacytextblob
!python -m textblob.download_corpora
# torch + transformers back the formality classifier and the GPT-2 perplexity model
!pip install torch
!pip install transformers
!pip install sentence_transformers
# LIWC dictionary parser and the Writeprints-Static stylometric vectorizer
!pip install liwc
!pip install writeprints-static
# Tokenizer data used by nltk.sent_tokenize / nltk.word_tokenize
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')
# NOTE(review): the notebook also imports openai, networkx, bs4 and scikit-learn,
# none of which are installed in this cell -- confirm they are already available
# in the target environment.

In [None]:
from glob import glob
import shutil
import glob
import json
import math
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, Counter
from statistics import stdev
import networkx as nx
import pandas as pd
import codecs
import requests
from bs4 import BeautifulSoup
import re
import pickle
from tqdm import tqdm
tqdm.pandas()
import random
from sklearn.feature_extraction.text import TfidfVectorizer
import csv
from sklearn.metrics import classification_report, precision_score,recall_score
from sklearn.model_selection import train_test_split
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
import warnings
import email # for handling email data format

# Ignore all warnings
warnings.filterwarnings('ignore')

# Feature extractions

In [None]:
# Load the dataset to annotate. The feature cells below read the columns
# intro_text, body_text and conclusion_text from this frame, and the GPTZero
# cell later writes the frame back to this same path (csv_file).
csv_file = 'datasets//reuter.csv'  # you need to replace it with the corresponding dataset csv file
df = pd.read_csv(csv_file)

## Vocabulary richness

We use Brunet's index as the vocabulary richness metric.

In [None]:
import spacy

# This is the first cell that needs the spaCy pipeline. Load it here so that a
# fresh "Restart & Run All" does not hit an undefined `nlp` (which the old
# catch-all handler silently turned into a score of 0 for every row).
nlp = spacy.load("en_core_web_sm")

def calculate_lexical_div(text):
  """Return Brunet's index W for ``text`` as a vocabulary-richness score.

  W = N ** (V ** -0.165), where N is the number of word tokens (whitespace and
  punctuation tokens are excluded) and V is the number of distinct lower-cased
  tokens. Lower values indicate a richer vocabulary.

  Args:
    text: the text to score.

  Returns:
    Brunet's W as a float, or 0 when the text cannot be processed (for example
    a NaN cell) or contains no word tokens.
  """
  try:
    doc = nlp(text)
  except Exception:
    # Non-string input (e.g. NaN from an empty CSV cell) cannot be parsed.
    return 0

  tokens = [token.text.lower() for token in doc if not token.is_space and not token.is_punct]
  total_tokens = len(tokens)
  vocabulary_size = len(set(tokens))

  # With no tokens the index is undefined (and V ** -0.165 would divide by zero).
  if total_tokens == 0:
    return 0

  return total_tokens ** (vocabulary_size ** -0.165)


In [None]:
# Add one Brunet-index column per text part: <part>_text -> <part>_brunet_index.
# The extractor returns a scalar, so it is applied directly; wrapping every row
# in a pd.Series only built a throwaway one-column frame.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_brunet_index'
    df[feature_names] = df[source_col].progress_apply(calculate_lexical_div)

## Readability score

In [None]:
import textstat

def calculate_readability_scores(text):
    """Return the Flesch Reading Ease score of ``text``.

    Args:
        text: the text to score.

    Returns:
        The value of ``textstat.flesch_reading_ease(text)``, or 0 if textstat
        raises (for example on a non-string cell).
    """
    try:
        return textstat.flesch_reading_ease(text)
    except Exception:
        # Only ordinary errors are mapped to 0; KeyboardInterrupt still stops
        # a long-running progress_apply.
        return 0

In [None]:
# Add one readability column per text part: <part>_text -> <part>_readability_score.
# The extractor returns a scalar, so no per-row pd.Series wrapper is needed.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_readability_score'
    df[feature_names] = df[source_col].progress_apply(calculate_readability_scores)

## POS-tag distributions

In [None]:
import spacy
nlp = spacy.load("en_core_web_sm")
def get_pos_tag_counts(text):
    """Count the coarse-grained (Universal) POS tags of the tokens in ``text``.

    Args:
        text: the text to analyse.

    Returns:
        A dict mapping each tag to its token count. Every tag listed below is
        always present (0 when unseen) so that all rows share the same keys.
        If spaCy cannot process the input, the counts gathered so far
        (normally all zeros) are returned.
    """
    # 'CONJ' is kept so existing consumers still find the key, but current
    # spaCy models emit 'CCONJ' for coordinating conjunctions and 'SPACE' for
    # whitespace tokens; without those two the keys differed from row to row.
    pos_tags = ['ADJ', 'ADP', 'ADV', 'AUX', 'CONJ', 'CCONJ', 'DET', 'INTJ', 'NOUN', 'NUM', 'PART',
                'PRON', 'PROPN', 'PUNCT', 'SCONJ', 'SYM', 'VERB', 'X', 'SPACE']
    pos_tag_counts = Counter({tag: 0 for tag in pos_tags})
    try:
        # Tally token.pos_ for every token in the parsed document.
        for token in nlp(text):
            pos_tag_counts[token.pos_] += 1
    except Exception:
        # Best effort: unparsable input (e.g. NaN) falls through to the
        # zero-initialised counts instead of aborting the whole apply.
        pass
    return dict(pos_tag_counts)

In [None]:
# Store the POS-tag count dict of each text part in a single column:
# <part>_text -> <part>_pos_tag_counts.
# The extractor's dict is kept as one cell value. Wrapping it in pd.Series made
# progress_apply return a frame with one column per tag, which cannot be
# assigned to a single column and raised ValueError.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_pos_tag_counts'
    df[feature_names] = df[source_col].progress_apply(get_pos_tag_counts)

## named-entity distributions

In [None]:
def get_named_entity_counts(text):
    """Count the named entities in ``text`` per entity label.

    Args:
        text: the text to analyse with the module-level spaCy pipeline ``nlp``.

    Returns:
        A dict mapping each label listed below to the number of entities with
        that label (0 when unseen). Labels outside the list are added if the
        model emits them. If spaCy cannot process the input, the counts
        gathered so far (normally all zeros) are returned.
    """
    ner_tags = ['PERSON', 'NORP', 'FAC', 'ORG', 'GPE', 'LOC', 'PRODUCT', 'EVENT', 'WORK_OF_ART', 'LAW',
                'LANGUAGE', 'DATE', 'TIME', 'PERCENT', 'MONEY', 'QUANTITY', 'ORDINAL', 'CARDINAL']
    # Start every known label at 0 so all rows share the same keys.
    ner_counts = dict.fromkeys(ner_tags, 0)

    try:
        for ent in nlp(text).ents:
            ner_counts[ent.label_] = ner_counts.get(ent.label_, 0) + 1
    except Exception:
        # Best effort for unparsable input (e.g. NaN). Unlike a bare except,
        # this lets KeyboardInterrupt stop a long-running apply.
        pass

    return ner_counts

In [None]:
# Store the named-entity count dict of each text part in a single column:
# <part>_text -> <part>_ner_counts.
# The dict is kept as one cell value. Wrapping it in pd.Series expanded it into
# one column per label, which cannot be assigned to a single column.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_ner_counts'
    df[feature_names] = df[source_col].progress_apply(get_named_entity_counts)

## Stopwords distributions

In [None]:
def get_stopwords_count(text):
  """Count how often each stop word occurs in ``text``.

  The text is lower-cased before it is parsed with the module-level spaCy
  pipeline ``nlp``; a token counts as a stop word when ``token.is_stop`` is
  true.

  Args:
    text: the text to analyse.

  Returns:
    A dict mapping stop word -> count, ordered by descending count (ties keep
    first-seen order). An empty dict is returned when the input cannot be
    processed (e.g. a NaN cell).
  """
  try:
    doc = nlp(text.lower())
    stopwords_count = Counter(token.text for token in doc if token.is_stop)
    # most_common() sorts by count, descending, with a stable sort.
    return dict(stopwords_count.most_common())
  except Exception:
    # Ordinary failures yield an empty result; KeyboardInterrupt still
    # propagates so a long apply can be stopped.
    return {}

In [None]:
# Store the stop-word count dict of each text part in a single column:
# <part>_text -> <part>_stopwords_count.
# The dict is kept as one cell value. Wrapping it in pd.Series expanded it into
# one column per distinct stop word, which cannot be assigned to a single column.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_stopwords_count'
    df[feature_names] = df[source_col].progress_apply(get_stopwords_count)

## sentiment analysis

For sentiment analysis, we divide the text part into sentences. For each sentence, we calculate the polarity score, subjectivity score, and count of positive and negative words in that sentence. While evaluating the whole text part, we averaged over the sentences.

In [None]:
from spacytextblob.spacytextblob import SpacyTextBlob  # importing registers the "spacytextblob" pipeline factory

# doc._.blob only exists once the spacytextblob component is part of the
# pipeline. Without it every call below failed and the old catch-all handler
# silently returned empty results. The guard keeps this cell re-runnable.
if not nlp.has_pipe("spacytextblob"):
    nlp.add_pipe("spacytextblob")

def get_sentiment_scores(text):
  """Compute sentence-level sentiment features for ``text``.

  The text is split into sentences with ``nltk.sent_tokenize`` and each
  sentence is parsed with the module-level spaCy pipeline ``nlp``.

  Args:
    text: the text to analyse.

  Returns:
    A 4-tuple ``(polarity_scores, subjectivity_scores, pos_wc, neg_wc)``:
    one polarity and one subjectivity value per sentence, followed by the
    number of words in positively and negatively scored assessments summed
    over all sentences. ``([], [], 0, 0)`` is returned if processing fails.
  """
  try:
      sentences = nltk.sent_tokenize(text)

      pos_wc = 0
      neg_wc = 0
      polarity_scores = []
      subjectivity_scores = []
      for sentence in sentences:
          blob = nlp(sentence)._.blob
          polarity_scores.append(blob.polarity)
          subjectivity_scores.append(blob.subjectivity)
          # Each assessment starts with (words, polarity, ...); count its
          # words towards the matching side and ignore neutral ones.
          for assessment in blob.sentiment_assessments.assessments:
              words, polarity = assessment[0], assessment[1]
              if polarity > 0:
                  pos_wc += len(words)
              elif polarity < 0:
                  neg_wc += len(words)
      return polarity_scores, subjectivity_scores, pos_wc, neg_wc
  except Exception:
      # Ordinary failures (e.g. NaN input) yield the empty result;
      # KeyboardInterrupt still propagates.
      return [], [], 0, 0

In [None]:
# get_sentiment_scores returns a 4-tuple. pd.Series spreads it over four
# columns per text part: <part>_polarity_scores, <part>_subjectivity_scores,
# <part>_pos_wc and <part>_neg_wc.
# (The original 'conclusion' assignment contained "= =", a syntax error.)
F = ['polarity_scores','subjectivity_scores','pos_wc','neg_wc',]
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = [part+'_'+f for f in F]
    df[feature_names] = df[source_col].progress_apply(lambda s: pd.Series(get_sentiment_scores(s)))

## Formality score

Similar to sentiment analysis, we calculate the formality score for each sentence and then average over the sentences to get the formality score for the whole text part. We utilize https://huggingface.co/s-nlp/roberta-base-formality-ranker from HuggingFace for formality calculation.

In [None]:
# Use a pipeline as a high-level helper
from transformers import pipeline
pipe = pipeline("text-classification", model="s-nlp/roberta-base-formality-ranker")

def get_formality_scores(text):
  """Score the formality of every sentence in ``text``.

  The text is split with ``nltk.sent_tokenize`` and each sentence is
  classified by the module-level ``pipe``.

  Args:
    text: the text to analyse.

  Returns:
    A list with one score per sentence, rounded to two decimals. When the
    predicted label is 'informal' the score is ``1 - confidence``; for any
    other label it is the confidence itself, so higher means more formal.
    An empty list is returned if processing fails.
  """
  try:
      scores = []
      for sentence in nltk.sent_tokenize(text):
          # truncation=True keeps one over-long sentence from raising inside
          # the model and discarding the scores of the whole text.
          top = pipe(sentence, truncation=True)[0]
          confidence = top['score']
          if top['label'] == 'informal':
            confidence = 1 - confidence
          scores.append(round(confidence, 2))
      return scores
  except Exception:
      # Ordinary failures (e.g. NaN input) yield an empty list;
      # KeyboardInterrupt still propagates.
      return []

In [None]:
# One column of per-sentence formality scores for each text part:
# <part>_text -> <part>_formality_score.
for part in ('intro', 'body', 'conclusion'):
    df[part + '_formality_score'] = df[part + '_text'].progress_apply(get_formality_scores)

## Text embedding (for content similarity and change)

We compute an embedding of the full text of each part using the OpenAI embedding model, which supports a longer context length.

In [None]:
import os

import openai

# Prefer the OPENAI_API_KEY environment variable so the key is not stored in
# the notebook. The placeholder remains as the fallback.
openai.api_key = os.environ.get("OPENAI_API_KEY", "YOUR_API_KEY")

def get_embedding(text, model="text-embedding-ada-002"):
    """Return the OpenAI embedding vector for ``text``.

    Works with both generations of the ``openai`` package: releases from 1.0
    on removed ``openai.Embedding`` in favour of ``openai.embeddings``.

    Args:
        text: the text to embed.
        model: name of the embedding model to query.

    Returns:
        The embedding of ``text`` as returned by the API (first item of the
        response data).

    Raises:
        Whatever the OpenAI client raises on request or authentication errors;
        nothing is swallowed here.
    """
    if hasattr(openai, "OpenAI"):
        # openai >= 1.0: module-level client, object-style response.
        response = openai.embeddings.create(input=text, model=model)
        return response.data[0].embedding

    # openai < 1.0: legacy resource class, dict-style response.
    response = openai.Embedding.create(
        input=text,
        model=model
    )
    return response['data'][0]['embedding']

In [None]:
# Store the embedding vector of each text part in a single column:
# <part>_text -> <part>_embedding.
# The vector is kept as one list per cell. Wrapping it in pd.Series expanded it
# into one column per dimension, which cannot be assigned to a single column.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_embedding'
    df[feature_names] = df[source_col].progress_apply(get_embedding)

## Perplexity scores calculations

We calculate per-token perplexity scores for the text using a small language model (e.g., GPT-2 XL). The output is a list with one value per token of the text part under consideration.

In [None]:
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
# Load pre-trained GPT2 model and tokenizer
model_name = "gpt2-xl"
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
model = GPT2LMHeadModel.from_pretrained(model_name)
def get_perplexity_score(text):
  """Return the per-token surprisal (negative log-probability) of ``text``.

  A causal language model's logits at position i are the prediction for the
  token at position i + 1. Each token is therefore scored with the logits of
  the position before it. A BOS token is prepended so that the first text
  token also has a preceding position to be predicted from.

  Args:
    text: the text to score.

  Returns:
    A list of floats, one per scored token. At most the first 1023 tokens are
    scored so that, together with the BOS token, the input stays within the
    1024-token limit used by this notebook. An empty list is returned for
    empty text or if processing fails.
  """
  try:
    token_ids = tokenizer.encode(text, return_tensors="pt")
    # Reserve one of the 1024 positions for the BOS token.
    token_ids = token_ids[:, :1023]
    if token_ids.size(1) == 0:
        return []

    bos = torch.full((1, 1), tokenizer.bos_token_id, dtype=torch.long)
    input_ids = torch.cat([bos, token_ids.long()], dim=1)

    with torch.no_grad():
        logits = model(input_ids).logits[0]
        # Drop the last position: there is no following token for it to predict.
        log_probs = torch.log_softmax(logits[:-1], dim=-1)

    # Pick, for each position, the log-probability assigned to the token that
    # actually follows; this avoids materialising the full vocabulary table.
    targets = input_ids[0, 1:].unsqueeze(1)
    surprisals = -log_probs.gather(1, targets).squeeze(1)
    return surprisals.tolist()
  except Exception:
    # Ordinary failures (e.g. NaN input) yield an empty list;
    # KeyboardInterrupt still propagates.
    return []

In [None]:
# Store the per-token score list of each text part in a single column:
# <part>_text -> <part>_perplexity_score.
# The list is kept as one cell value. Wrapping it in pd.Series expanded it into
# one column per token, which cannot be assigned to a single column.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_perplexity_score'
    df[feature_names] = df[source_col].progress_apply(get_perplexity_score)

## avg perplexity and burstiness

We calculate the average perplexity and burstiness of the text part using the GPTZERO API directly.

In [None]:
import os
import time
import pprint
import requests
url = "https://api.gptzero.me/v2/predict/text"
# Prefer the GPTZERO_API_KEY environment variable so the key is not stored in
# the notebook. The placeholder remains as the fallback.
gptzero_api_key = os.environ.get("GPTZERO_API_KEY", "YOUR_API_KEY")
headers = {
    "Accept": "application/json",
    "Content-Type": "application/json",
    "x-api-key": gptzero_api_key}
def get_preplexity_burstiness(text):
  """Query GPTZero for the perplexity and burstiness statistics of ``text``.

  NOTE: a later cell of this notebook defines another function under this
  same name (taking a text-part name); once that cell has run, this
  definition is no longer reachable by name.

  Args:
    text: the document text sent to the API.

  Returns:
    A 4-tuple ``(perplexities, avg_perplexity, std_perplexity,
    avg_burstiness)``: the per-sentence perplexities rounded to two decimals,
    their mean and standard deviation, and the document's overall
    burstiness. ``([], 'NA', 'NA', 'NA')`` is returned when the request fails
    twice or the response lacks the expected fields.
  """
  failure = ([], 'NA', 'NA', 'NA')
  data = {
    "document": text,
    "version": "2024-01-09",
    "multilingual": False,
    "writing_stats": True
  }

  # Two attempts in total. The previous loop returned right after the first
  # failure, so the retry never ran.
  response = None
  for _attempt in range(2):
    try:
      response = requests.post(url, headers=headers, json=data,).json()
      time.sleep(0.5)  # brief pause between consecutive API calls
      break
    except Exception:
      continue
  if response is None:
    return failure

  try:
    document = response['documents'][0]
    perplexities = [round(sentence['perplexity'], 2) for sentence in document['sentences']]
    avg_perplexity = round(np.average(perplexities), 2)
    std_perplexity = round(np.std(perplexities), 2)
    avg_burstiness = round(document['overall_burstiness'], 2)
    return perplexities, avg_perplexity, std_perplexity, avg_burstiness
  except Exception:
    # e.g. an error payload without 'documents'
    return failure


In our case we iterate over the rows explicitly and save the dataframe at regular intervals, since each API call is costly.

In [None]:
def _gptzero_scores_for_text(text):
  """Request GPTZero perplexity/burstiness statistics for one text.

  This function has its own private name because the driver below reuses the
  name of the per-text helper from the previous cell. Calling that name from
  inside the driver would recurse into the driver instead of querying the
  API. Uses the module-level ``url`` and ``headers`` set up in the previous
  cell.

  Returns:
    ``(perplexities, avg_perplexity, std_perplexity, avg_burstiness)``, or
    ``([], 'NA', 'NA', 'NA')`` when the request fails twice or the response
    lacks the expected fields.
  """
  failure = ([], 'NA', 'NA', 'NA')
  data = {
    "document": text,
    "version": "2024-01-09",
    "multilingual": False,
    "writing_stats": True
  }

  response = None
  for _attempt in range(2):  # one try plus one retry
    try:
      response = requests.post(url, headers=headers, json=data,).json()
      time.sleep(0.5)  # brief pause between consecutive API calls
      break
    except Exception:
      continue
  if response is None:
    return failure

  try:
    document = response['documents'][0]
    perplexities = [round(sentence['perplexity'], 2) for sentence in document['sentences']]
    avg_perplexity = round(np.average(perplexities), 2)
    std_perplexity = round(np.std(perplexities), 2)
    avg_burstiness = round(document['overall_burstiness'], 2)
    return perplexities, avg_perplexity, std_perplexity, avg_burstiness
  except Exception:
    return failure

def get_preplexity_burstiness(part):
  """Fill the GPTZero feature columns of ``df`` for one text part.

  Reads ``<part>_text`` row by row and writes ``<part>_perplexity_scores``,
  ``<part>_avg_perplexity_score``, ``<part>_std_perplexity_score`` and
  ``<part>_avg_burstiness_score``. The frame is written to ``csv_file``
  after every 100 processed rows and once more at the end, because each row
  costs an API call.

  Args:
    part: text-part prefix, e.g. 'intro', 'body' or 'conclusion'.
  """
  # Define the iteration interval for saving the DataFrame
  save_interval = 100  # Save after every 100 iterations
  col = part + '_text'
  perplexity_col =part +  '_perplexity_scores'
  avg_perplexity_col =part +  '_avg_perplexity_score'
  std_perplexity_col =part +  '_std_perplexity_score'
  avg_burstiness_col =part +  '_avg_burstiness_score'
  target_cols = (perplexity_col, avg_perplexity_col, std_perplexity_col, avg_burstiness_col)

  # Start from empty placeholder values.
  for target in target_cols:
      df[target] = ''

  # Count processed rows explicitly so the save cadence does not depend on
  # the index labels being 0..n-1.
  for n_done, (i, row) in enumerate(tqdm(df.iterrows(), total=df.shape[0]), start=1):
      scores = _gptzero_scores_for_text(row[col])

      for target, value in zip(target_cols, scores):
          df.at[i, target] = value

      # Check if it's time to save the DataFrame
      if n_done % save_interval == 0:
          print(scores)
          df.to_csv(csv_file, index=False)  # Save DataFrame to CSV file
          print(f"Saved DataFrame after {n_done} iterations.")

  # Save the final DataFrame
  df.to_csv(csv_file, index=False)

In [None]:
# Run the GPTZero feature pass once per text part, in document order.
for part in ('intro', 'body', 'conclusion'):
    get_preplexity_burstiness(part)

## LIWC features

In [None]:
import liwc  # installed in the first cell but never imported before this point

parse, category_names = liwc.load_token_parser('REPLACE_WITH_CORRESPONDING_LIWC_DICTIONARY')
def tokenize(text):
    """Split ``text`` into word tokens with ``nltk.word_tokenize``."""
    return nltk.word_tokenize(text)
def get_liwc_features(text):
  """Count how many tokens of ``text`` fall into each LIWC category.

  Args:
    text: the text to analyse. Non-string input (e.g. a NaN cell) is treated
      as empty text.

  Returns:
    A dict with one ``<category>_frac`` key per entry of ``category_names``.
    Each value is the number of (lower-cased) tokens matching that category,
    0 when there are none.
  """
  if not isinstance(text, str):
    text = ''
  tokenlist = tokenize(text.lower())
  token_counts = Counter(category for token in tokenlist for category in parse(token))
  # NOTE(review): the keys end in '_frac' but the values are raw counts. The
  # original computed len(tokenlist) without using it, so a division by the
  # token total may have been intended -- confirm before relying on either.
  return {category + '_frac': token_counts[category] for category in category_names}

In [None]:
# One column holding the LIWC feature dict for each text part:
# <part>_text -> <part>_liwc_features.
for part in ('intro', 'body', 'conclusion'):
    source_col = part + '_text'
    feature_names = part + '_liwc_features'
    df[feature_names] = df[source_col].progress_apply(get_liwc_features)

## Writeprint features

In [None]:
from writeprints_static import WriteprintsStatic
# Shared vectorizer instance; extract_from_writeprint_features reuses it for every text.
vec = WriteprintsStatic()
# Throwaway sample, only used to obtain the feature names below.
texts = ["a sample text",]
# NOTE(review): presumably transform() has to run before get_feature_names() is usable -- not verified here.
X = vec.transform(texts)
features = list(vec.get_feature_names()) # extract the feature names on a sample text so that we can use them to populate the dictionary with % value

def count_char_bigrams(text):
    """Return the number of adjacent character pairs in ``text`` that contain no space."""
    return sum(
        1
        for first, second in zip(text, text[1:])
        if first != ' ' and second != ' '
    )

def count_char_trigrams(text):
    """Return the number of adjacent character triples in ``text`` that contain no space."""
    return sum(
        1
        for first, second, third in zip(text, text[1:], text[2:])
        if ' ' not in (first, second, third)
    )
def extract_from_writeprint_features(text):
  """Turn the Writeprints-Static counts of ``text`` into normalised features.

  Uses the module-level vectorizer ``vec`` and its feature-name list
  ``features``. Count features are divided by a matching total:

  * ``letter*``, ``digit_*``, ``special_char_*`` -> total characters
  * ``bigram_*`` -> number of space-free character bigrams in ``text``
  * ``trigram_*`` -> number of space-free character trigrams in ``text``
  * ``function_*``, ``pos_*``, ``short_words`` -> total words

  ``hapax_legomena_ratio``, ``dis_legomena_ratio``, ``avg_word_length`` and
  ``digits_ratio`` are copied unchanged.

  Args:
    text: the text to analyse.

  Returns:
    A dict mapping feature name -> value. A ratio whose total is zero (for
    example for an empty text) is reported as 0.0 instead of nan/inf.
  """
  X = vec.transform([text]).toarray()[0]

  # Name -> column position, built once per call instead of repeated
  # list.index() scans. setdefault keeps the first position, like index().
  position_of = {}
  for position, name in enumerate(features):
    position_of.setdefault(name, position)

  def value(name):
    return X[position_of[name]]

  def ratio(name, total):
    return value(name) / total if total else 0.0

  W = value('total_words')
  C = value('total_chars')
  B = count_char_bigrams(text)
  T = count_char_trigrams(text)

  # (feature-name prefix, normalising total), in the order the groups are emitted
  prefix_totals = (
    ('letter', C),
    ('digit_', C),
    ('special_char_', C),
    ('bigram_', B),
    ('trigram_', T),
    ('function_', W),
    ('pos_', W),
  )

  STYLE_FEATURES = {}
  for prefix, total in prefix_totals:
    for name in features:
      if name.startswith(prefix):
        STYLE_FEATURES[name] = ratio(name, total)

  # Features that are already ratios/averages are taken as they are.
  STYLE_FEATURES['hapax_legomena_ratio'] = value('hapax_legomena_ratio')
  STYLE_FEATURES['dis_legomena_ratio'] = value('dis_legomena_ratio')
  STYLE_FEATURES['avg_word_length'] = value('avg_word_length')
  STYLE_FEATURES['short_words'] = ratio('short_words', W)
  STYLE_FEATURES['digits_ratio'] = value('digits_ratio')

  return STYLE_FEATURES

In [None]:
# One column holding the Writeprints feature dict for each text part:
# <part>_text -> <part>_writeprint_features.
for part in ('intro', 'body', 'conclusion'):
    df[part + '_writeprint_features'] = df[part + '_text'].progress_apply(extract_from_writeprint_features)