In [1]:
import unittest
from pathlib import Path
import pandas as pd
from collections import Counter
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
from nltk.sentiment.vader import SentimentIntensityAnalyzer

In [2]:


def load_csv(file_path):
    try:
        data = pd.read_csv(file_path)
        if data.empty:
            raise ValueError("The CSV file is empty")
        return data
    except FileNotFoundError:
        raise FileNotFoundError("The specified file does not exist")
    except pd.errors.EmptyDataError:
        raise ValueError("No data in the CSV file")
    except Exception as e:
        raise Exception(f"An unknown error occurred: {e}")



def tokenise_text(data):
    """
    Tokenise the text in the clean_text column 
    """
    try:
        # import changed oh_label into a float so changing it back
        data['oh_label'] = data['oh_label'].astype(int)
        data['clean_text'] = data['clean_text'].astype(str)
    
        # Begin by tokenizing the words
        # also lowercase all words for consistency
        data['tokens'] = data['clean_text'].apply(lambda x: [word.lower() for word in x.split()])
        print("Tokenisation successful.")
        return data
    except Exception as e:
        print(f"Tokenisation error: {e}")
        return None



def lemmatize_text(data):
    try:
        lemm = WordNetLemmatizer()

    # Lemmatize all words
        data['lemmatized'] = data['tokens'].apply(lambda x: [lemm.lemmatize(word) for word in x])
        print("Lemmatisation successful")
        return data
    except Exception as e:
        print(f"An error occurred during lemmatisation: {e}")
        return None



def word_frequency_analysis(data):
    words_1 = data[data.oh_label == 1]['lemmatized']
    words_0 = data[data.oh_label == 0]['lemmatized']

    _1_words = Counter(word for words in words_1 for word in str(words).split())
    _0_words = Counter(word for words in words_0 for word in str(words).split())

    print("Most common words for oh_label = 1:")
    print(_1_words.most_common(50))

    print("Most common words for oh_label = 0:")
    print(_0_words.most_common(50))


def bag_of_words(data, column_name, max_features=5000):
    """
    Function to create a Bag of Words representation of the text data
    """
    # Joining the lemmatized words to form a string since CountVectorizer requires string input
    data['string_lemmatized'] = data[column_name].apply(' '.join)

    # Create the CountVectorizer and fit it to the data
    vectorizer = CountVectorizer(max_features=max_features)  
    X_bag_words = vectorizer.fit_transform(data['string_lemmatized'])

    # Convert the result to a DataFrame
    df_bag_words = pd.DataFrame(X_bag_words.toarray(), columns=vectorizer.get_feature_names_out())
    
    return df_bag_words , vectorizer


def tfidf(data, column_name, max_features=5000):
    """
    Function to create a TF-IDF representation of the text data
    """
    try:
        # Joining the lemmatized words to form a string since TfidfVectorizer requires string input
        data['string_lemmatized'] = data[column_name].apply(' '.join)
        
        # Create the TfidfVectorizer and fit it to the data
        tfidf_vectorizer = TfidfVectorizer(max_features=max_features)  
        X_TFIDF = tfidf_vectorizer.fit_transform(data['string_lemmatized'])

        # Convert the result to a DataFrame
        df_TFIDF = pd.DataFrame(X_TFIDF.toarray(), columns=tfidf_vectorizer.get_feature_names_out())
        
        return df_TFIDF, tfidf_vectorizer
    except Exception as e:
        print(f"An error occurred during TF-IDF vectorization: {e}")
        return None



from sklearn.decomposition import LatentDirichletAllocation

def apply_lda(data, n_components=10, random_state=42):
    """
    Function to apply Latent Dirichlet Allocation (LDA) on a Bag of Words representation of the text data.

    """
    # Create the LDA model and fit it to the data
    lda_model = LatentDirichletAllocation(n_components=n_components, random_state=random_state) 
    lda_model.fit(data)
    
    return lda_model



def get_topics(lda_model, vectorizer, top_n=10):
    """top_n words for each topic in the lda"""
    topics = []
    feature_names = vectorizer.get_feature_names_out()
    for topic_idx, topic in enumerate(lda_model.components_):
        top_features_idx = topic.argsort()[-top_n:][::-1]
        topics.append([feature_names[i] for i in top_features_idx])
    return topics


def sentiment_analysis(data):
    # Instantiate the Sentiment Intensity Analyzer
    intensity_analyser = SentimentIntensityAnalyzer()
    
    # Convert the lemmatized words back to strings for VADER to analyze
    data['lemmatized_string'] = data['lemmatized_clean'].apply(' '.join)

    # Apply VADER analysis to compute sentiment scores
    data['sentiment_scores'] = data['lemmatized_string'].apply(lambda x: intensity_analyser.polarity_scores(x)['compound'])

    # Classify the sentiment based on the computed scores
    data['sentiment'] = data['sentiment_scores'].apply(lambda x: 'Positive' if x >= 0.05 else ('Neutral' if x > -0.05 else 'Negative'))

    # Create a pivot table to analyze the sentiment scores with respect to the oh_label
    pivot_table = pd.pivot_table(data, values='sentiment_scores', index=['sentiment'], columns=['oh_label'], aggfunc='count', fill_value=0)

    # Calculate the percentage of each sentiment category for each oh_label
    total_counts_per_label = pivot_table.sum(axis=0)
    pivot_table_percentage = (pivot_table / total_counts_per_label) * 100
    pivot_table_percentage = pivot_table_percentage.round(2)
    
    return data, pivot_table_percentage


In [5]:
class TestFunctions(unittest.TestCase):
    
    def test_load_csv(self):
        # Test loading a valid CSV file
        data = load_csv('./output_all_data.csv')
        self.assertIsInstance(data, pd.DataFrame)
        
        # Test loading a non-existing file (should raise FileNotFoundError)
        with self.assertRaises(FileNotFoundError):
            load_csv('fake_file.csv')
        

    
    def test_tokenize_text(self):

        data = pd.DataFrame({'oh_label': [1.0, 0.0], 'clean_text': ['This is a', 'Unit Test Prog']})
        result = tokenise_text(data)
        self.assertIn('tokens', result.columns)
        
    def test_lemmatize_text(self):
        # Assuming data is a DataFrame with a 'tokens' column
        data = pd.DataFrame({'tokens': [['this', 'is', 'a'], ['unit', 'test', 'prog']]})
        result = lemmatize_text(data)
        self.assertIn('lemmatized', result.columns)
        
    def test_word_frequency_analysis(self):
        data = pd.DataFrame({
            'oh_label': [1, 0],
            'lemmatized': [['this', 'is','a'], ['unit', 'test','prog']]
        })
        word_frequency_analysis(data)
        
    def test_bag_of_words(self):
        data = pd.DataFrame({'lemmatized': [['this', 'is','a'], ['unit', 'test','prog']]})
        result, vectorizer = bag_of_words(data, 'lemmatized')
        self.assertIsInstance(result, pd.DataFrame)
        self.assertIsInstance(vectorizer, CountVectorizer)
        
    def test_tfidf(self):
        data = pd.DataFrame({'lemmatized': [['this', 'is','a'], ['unit', 'test','prog']]})
        result, tfidf_vectorizer = tfidf(data, 'lemmatized')
        self.assertIsInstance(result, pd.DataFrame)
        self.assertIsInstance(tfidf_vectorizer, TfidfVectorizer)
        
    def test_apply_lda(self):
        data = pd.DataFrame({'text1': [1, 0.5, 0], 'text2': [0.5, 0.2, 1]})
        lda_model = apply_lda(data)
        self.assertIsInstance(lda_model, LatentDirichletAllocation)
        
    def test_get_topics(self):
        data = pd.DataFrame({'text1': [1, 0.5, 0], 'text2': [0.5, 0.2, 1]})
        vectorizer = CountVectorizer()
        vectorizer.fit_transform(['hello world', 'python unit test'])
        lda_model = LatentDirichletAllocation(n_components=2, random_state=123)
        lda_model.fit(data)
        topics = get_topics(lda_model, vectorizer)
        self.assertIsInstance(topics, list)
        
    def test_sentiment_analysis(self):
        data = pd.DataFrame({'lemmatized_clean': [['testing'], ['test']], 'oh_label': [1, 0]})
        result, pivot_table_percentage = sentiment_analysis(data)
        self.assertIn('sentiment_scores', result.columns)
        self.assertIn('sentiment', result.columns)
        self.assertIsInstance(pivot_table_percentage, pd.DataFrame)


In [6]:
suite = unittest.TestLoader().loadTestsFromTestCase(TestFunctions)
runner = unittest.TextTestRunner()
runner.run(suite)


.........

Lemmatisation successful
Tokenisation successful.
Most common words for oh_label = 1:
[("['this',", 1), ("'is',", 1), ("'a']", 1)]
Most common words for oh_label = 0:
[("['unit',", 1), ("'test',", 1), ("'prog']", 1)]



----------------------------------------------------------------------
Ran 9 tests in 0.256s

OK


<unittest.runner.TextTestResult run=9 errors=0 failures=0>