# NLP Assignment


In [1]:
from collections import defaultdict
import numpy as np
import math
import pandas as pd
import re
from nltk.corpus import stopwords
import re
import string
import nltk
from nltk import ngrams
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, PorterStemmer
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
# Ensure you have the necessary NLTK data
import nltk

In [2]:
#split data into test, train and development
import os
import random

def read_reviews(folder_path):
    """Load labelled review texts from the ``neg`` and ``pos`` sub-folders.

    Args:
        folder_path: Directory that contains a ``neg`` and a ``pos`` folder,
            each holding one UTF-8 text file per review.

    Returns:
        A ``(reviews, labels)`` tuple of two equally long lists: the file
        contents and the matching ``"neg"`` / ``"pos"`` label. All ``neg``
        reviews come first, and within a label the files are read in sorted
        filename order.

    Raises:
        OSError: If a label folder is missing or a file cannot be read.
    """
    reviews = []
    labels = []
    for label in ("neg", "pos"):
        label_folder = os.path.join(folder_path, label)
        # os.listdir returns entries in arbitrary order; sorting makes the
        # returned sequence identical from run to run.
        for filename in sorted(os.listdir(label_folder)):
            file_path = os.path.join(label_folder, filename)
            if not os.path.isfile(file_path):
                # Sub-directories cannot be opened as reviews; skip them.
                continue
            with open(file_path, 'r', encoding='utf-8') as file:
                reviews.append(file.read())
            labels.append(label)
    return reviews, labels

# Read and combine data from both train and test folders.
# The dataset root defaults to the original local path but can be overridden
# with the NLP_PROJECT_DIR environment variable so the notebook runs elsewhere.
DATA_DIR = os.environ.get("NLP_PROJECT_DIR", "C:/Users/rohit/OneDrive/Documents/NLP_Project")
# Fixed seed so the shuffle (and therefore the split) is repeatable for a given input order.
SPLIT_SEED = 42

train_reviews, train_labels = read_reviews(f"{DATA_DIR}/train")
test_reviews, test_labels = read_reviews(f"{DATA_DIR}/test")

all_reviews = train_reviews + test_reviews
all_labels = train_labels + test_labels

# Shuffle reviews and labels together so each review keeps its label.
# A dedicated Random instance is used so the global `random` state is untouched.
combined = list(zip(all_reviews, all_labels))
random.Random(SPLIT_SEED).shuffle(combined)
all_reviews, all_labels = zip(*combined)

# Split the data: 60% train, 20% development, remainder test.
total_reviews = len(all_reviews)
train_size = int(0.6 * total_reviews)
dev_size = int(0.2 * total_reviews)

# Each split is a (reviews, labels) pair of equally long tuples.
train_data = (all_reviews[:train_size], all_labels[:train_size])
dev_data = (all_reviews[train_size:train_size + dev_size], all_labels[train_size:train_size + dev_size])
test_data = (all_reviews[train_size + dev_size:], all_labels[train_size + dev_size:])


In [3]:
#removes capitalisation and punctuation
def preprocess_text(text):
    """Return ``text`` lowercased with all punctuation stripped.

    Every character that is neither a word character (``\\w``) nor whitespace
    is deleted; whitespace itself is left exactly as it was.
    """
    lowered = text.lower()
    return re.sub(r'[^\w\s]', '', lowered)

In [4]:
def ngrams(text, n):
    """Return every run of ``n`` consecutive tokens in ``text``.

    The text is first normalised with ``preprocess_text`` and split on
    whitespace. Each n-gram is a list of tokens, and the n-grams are returned
    in the order they occur. A text with fewer than ``n`` tokens yields an
    empty list.
    """
    words = preprocess_text(text).split()
    window_count = len(words) - n + 1
    return [words[start:start + n] for start in range(window_count)]

In [5]:
# English stop words, loaded once on first use and kept as a frozenset.
_STOP_WORDS_CACHE = {}


def _english_stop_words():
    """Return the NLTK English stop-word list as a cached frozenset."""
    if 'english' not in _STOP_WORDS_CACHE:
        _STOP_WORDS_CACHE['english'] = frozenset(stopwords.words('english'))
    return _STOP_WORDS_CACHE['english']


def remove_stop_words(text):
    """Drop English stop words from whitespace-separated ``text``.

    Args:
        text: Input string; tokens are obtained with ``str.split()``.

    Returns:
        The remaining tokens joined by single spaces, in their original order.
        Matching is exact and case-sensitive against the NLTK list.
    """
    # Set membership avoids scanning the whole stop-word list for every token,
    # and the cache avoids re-reading the corpus on every call.
    stop_words = _english_stop_words()
    return " ".join(word for word in text.split() if word not in stop_words)

In [6]:
def stem_text(text):
    """Tokenise ``text`` with ``word_tokenize`` and Porter-stem each token.

    Returns the stemmed tokens joined by single spaces.
    """
    porter = PorterStemmer()
    return ' '.join(map(porter.stem, word_tokenize(text)))

In [7]:
def lemmatize_text(text):
    """Tokenise ``text`` with ``word_tokenize`` and lemmatise each token.

    Each token is passed to ``WordNetLemmatizer.lemmatize`` with its default
    arguments; the results are joined by single spaces.
    """
    wordnet = WordNetLemmatizer()
    lemmas = []
    for token in word_tokenize(text):
        lemmas.append(wordnet.lemmatize(token))
    return ' '.join(lemmas)

In [8]:
def full_preprocessing(text, use_stemming=True):
    """Run the complete text-cleaning pipeline on one document.

    Steps, in order:
      1. ``preprocess_text`` - lowercase and strip punctuation.
      2. ``remove_stop_words`` - drop English stop words.
      3. ``stem_text`` when ``use_stemming`` is true, otherwise
         ``lemmatize_text``.

    Args:
        text: Raw document string.
        use_stemming: Choose stemming (True, the default) or lemmatisation.

    Returns:
        The processed document as a single space-joined string.
    """
    # remove_stop_words already returns a string, so no re-joining is needed
    # before the final normalisation step.
    cleaned = remove_stop_words(preprocess_text(text))
    normalise = stem_text if use_stemming else lemmatize_text
    return normalise(cleaned)

In [9]:
class TFIDFVectorizer:
    """Minimal TF-IDF vectorizer for whitespace-tokenised documents.

    ``fit`` learns the vocabulary and one IDF weight per word;
    ``transform`` turns documents into a dense ``(n_docs, n_vocab)`` array
    using that fitted vocabulary. The matrix is dense, so its memory use grows
    with ``n_docs * n_vocab``.

    Weights:
        tf(word, doc) = count(word in doc) / number of tokens in doc
        idf(word)     = log(N / (1 + df(word)))
    where ``N`` is the number of fitted documents and ``df`` the number of
    fitted documents containing the word. A word present in every document
    therefore gets a slightly negative IDF.
    """

    def __init__(self):
        self.idf_values = {}   # word -> IDF weight learned by fit()
        self.vocab = []        # column order of the TF-IDF matrix
        self.documents = []    # documents passed to the last fit()

    def fit(self, documents):
        """Learn the vocabulary and IDF weights from ``documents``.

        Args:
            documents: Sized sequence of strings; tokens are ``str.split()``.
        """
        self.documents = documents
        total_docs = len(documents)

        doc_freq = defaultdict(int)
        for doc in documents:
            # dict.fromkeys de-duplicates while keeping first-seen order, so the
            # vocabulary (column) order is the same on every run, unlike
            # iterating a set of strings.
            for word in dict.fromkeys(doc.split()):
                doc_freq[word] += 1

        self.vocab = list(doc_freq)
        # Rebuilt from scratch so a re-fit leaves no entries from an earlier fit.
        self.idf_values = {
            word: math.log(total_docs / (1 + count))
            for word, count in doc_freq.items()
        }

    def transform(self, documents=None):
        """Vectorise documents with the fitted vocabulary.

        Args:
            documents: Sequence of strings to vectorise. When omitted, the
                documents given to the last ``fit`` call are used.

        Returns:
            ``numpy.ndarray`` of shape ``(len(documents), len(self.vocab))``.
            Words that are not in the fitted vocabulary are ignored as
            features but still count towards the document length used for the
            term frequency. An empty document produces an all-zero row.
        """
        if documents is None:
            documents = self.documents

        column_of = {word: idx for idx, word in enumerate(self.vocab)}
        tfidf_matrix = np.zeros((len(documents), len(self.vocab)))

        for row, doc in enumerate(documents):
            words = doc.split()
            if not words:
                # No tokens: leave the row at zero instead of dividing by zero.
                continue

            word_count = defaultdict(int)
            for word in words:
                if word in column_of:
                    word_count[word] += 1

            # Only the words that occur in the document are visited; every
            # other column keeps its initial zero.
            for word, count in word_count.items():
                tf = count / len(words)
                tfidf_matrix[row, column_of[word]] = tf * self.idf_values[word]

        return tfidf_matrix

    def fit_transform(self, documents):
        """Fit on ``documents`` and return their TF-IDF matrix."""
        self.fit(documents)
        return self.transform(documents)


In [10]:
import scipy.sparse as sp

class NaiveBayesClassifierWithTFIDF:
    """Naive-Bayes-style classifier over TF-IDF feature vectors.

    ``fit`` stores a log prior per class and a smoothed log weight per
    (class, word); ``predict`` scores each document as
    ``log prior + sum(tfidf_value * log weight)`` and returns the best class.
    """

    def __init__(self):
        self.class_probs = {}    # class label -> log prior
        self.feature_probs = {}  # class label -> {word: log weight}, default 0
        self.vocab = []          # word for each feature column

    def fit(self, X, y, vocab):
        """Estimate class priors and per-word weights.

        Args:
            X: 2-D feature matrix (NumPy array or SciPy sparse matrix), one
                row per document and one column per vocabulary word.
            y: Class label for each row of ``X``; any sequence (list, tuple,
                NumPy array, pandas Series).
            vocab: Words in the column order of ``X``.
        """
        self.vocab = list(vocab)
        # Work on a positional NumPy array so lists, tuples and Series with an
        # arbitrary index are all handled the same way.
        labels = np.asarray(y)
        class_counts = pd.Series(labels).value_counts()
        total_docs = len(labels)
        vocab_size = len(self.vocab)

        self.class_probs = {c: np.log(class_counts[c] / total_docs) for c in class_counts.index}
        self.feature_probs = {c: defaultdict(int) for c in class_counts.index}

        for c in class_counts.index:
            class_docs = X[np.flatnonzero(labels == c)]
            total_class_docs = class_docs.shape[0]
            # A sparse matrix sums to a 1 x n_features matrix; flatten so there
            # is exactly one total per feature column for dense and sparse X.
            sum_tfidf = np.asarray(class_docs.sum(axis=0)).ravel()
            log_weights = np.log((sum_tfidf + 1) / (total_class_docs + vocab_size))

            # zip stops at the shorter input, so vocabulary words without a
            # matching column keep the default weight of 0.
            for word, log_weight in zip(self.vocab, log_weights):
                self.feature_probs[c][word] = log_weight

    def predict(self, X):
        """Return the highest-scoring class label for every row of ``X``.

        Args:
            X: 2-D feature matrix (NumPy array or SciPy sparse matrix) whose
                columns follow the vocabulary order given to ``fit``.

        Returns:
            List with one class label per row of ``X``.

        Raises:
            ValueError: If the classifier has not been fitted, or ``X`` has
                more columns than there are vocabulary words.
        """
        classes = list(self.class_probs)
        if not classes:
            raise ValueError("predict() called before fit()")

        n_features = X.shape[1]
        if n_features > len(self.vocab):
            raise ValueError(
                f"X has {n_features} columns but the vocabulary has only {len(self.vocab)} words"
            )

        # One row of log weights per class, limited to the columns X provides.
        words = self.vocab[:n_features]
        weights = np.array(
            [[self.feature_probs[c][word] for word in words] for c in classes],
            dtype=float,
        ).reshape(len(classes), n_features)
        priors = np.array([self.class_probs[c] for c in classes], dtype=float)

        # (n_docs, n_classes) score matrix computed in one matrix product;
        # this works for dense and sparse X without densifying row by row.
        scores = np.asarray(X @ weights.T) + priors
        return [classes[best] for best in scores.argmax(axis=1)]

In [11]:
"""train_data = [
    "love this phone", 
    "hate this phone", 
    "best phone ever", 
    "bad phone",
    "really happy with this purchase",
    "totally disappointed by the product",
    "would recommend it to everyone",
    "worst experience ever had",
    "excellent quality and service",
    "poor battery life and performance"
]
train_labels = pd.Series([
    "positive", "negative", "positive", "negative",
    "positive", "negative", "positive", "negative",
    "positive", "negative"
])


test_data = [
    "good phone", 
    "worst phone",
    "very satisfied with the quality",
    "not happy with the purchase",
    "amazing features and quality",
    "terrible customer service"
]
test_labels = pd.Series([
    "positive", "negative",
    "positive", "negative",
    "positive", "negative"
])


train_processed=[]
test_processed=[]
for review in train_data:
    processed_review = full_preprocessing(review, use_stemming=False)
    train_processed.append(processed_review)

for review in test_data:
    processed_review = full_preprocessing(review, use_stemming=False)
    test_processed.append(processed_review)

print(train_processed)
print(test_processed)

# Vectorize the training data using TF-IDF
tfidf_vectorizer = TFIDFVectorizer()
tfidf_vectorizer.fit(train_processed)
X_train = tfidf_vectorizer.transform()

# Vectorize the test data using the same TF-IDF vectorizer
tfidf_vectorizer.fit(test_processed)
X_test = tfidf_vectorizer.transform()

# Train the Naive Bayes Classifier
nb_classifier = NaiveBayesClassifierWithTFIDF()
nb_classifier.fit(X_train, train_labels, tfidf_vectorizer.vocab)

# Predict using the classifier
predictions = nb_classifier.predict(X_test)

print(predictions)"""

'train_data = [\n    "love this phone", \n    "hate this phone", \n    "best phone ever", \n    "bad phone",\n    "really happy with this purchase",\n    "totally disappointed by the product",\n    "would recommend it to everyone",\n    "worst experience ever had",\n    "excellent quality and service",\n    "poor battery life and performance"\n]\ntrain_labels = pd.Series([\n    "positive", "negative", "positive", "negative",\n    "positive", "negative", "positive", "negative",\n    "positive", "negative"\n])\n\n\ntest_data = [\n    "good phone", \n    "worst phone",\n    "very satisfied with the quality",\n    "not happy with the purchase",\n    "amazing features and quality",\n    "terrible customer service"\n]\ntest_labels = pd.Series([\n    "positive", "negative",\n    "positive", "negative",\n    "positive", "negative"\n])\n\n\ntrain_processed=[]\ntest_processed=[]\nfor review in train_data:\n    processed_review = full_preprocessing(review, use_stemming=False)\n    train_processed

In [14]:
from sklearn.feature_extraction.text import TfidfVectorizer
# Instantiate the notebook's own TFIDFVectorizer class (upper-case "TFIDF"),
# not sklearn's TfidfVectorizer imported on the line above.
vectorizer = TFIDFVectorizer()
#tfidf_vectorizer = TFIDFVectorizer()

def preprocess_and_vectorize_data(reviews, vectorizer, use_stemming=False, progress_every=500):
    """Clean every review and turn the cleaned texts into a TF-IDF matrix.

    Args:
        reviews: Sized sequence of raw review strings.
        vectorizer: Object exposing ``fit(documents)`` and a no-argument
            ``transform()``. It is fitted on ``reviews`` by every call.
        use_stemming: Forwarded to ``full_preprocessing``.
        progress_every: Print a progress line after this many reviews.
            ``0`` or ``None`` prints only the final line.

    Returns:
        Whatever ``vectorizer.transform()`` returns after fitting on the
        processed reviews.
    """
    total_reviews = len(reviews)
    print(f"Total reviews to process: {total_reviews}")

    processed_reviews = []
    for i, review in enumerate(reviews):
        processed_reviews.append(full_preprocessing(review, use_stemming=use_stemming))

        # Report every `progress_every` reviews and once more on the last one.
        at_interval = bool(progress_every) and (i + 1) % progress_every == 0
        if at_interval or i == total_reviews - 1:
            print(f"Processed {i + 1} / {total_reviews} reviews")

    # Vectorize the processed reviews
    vectorizer.fit(processed_reviews)

    return vectorizer.transform()

# Usage
# NOTE(review): preprocess_and_vectorize_data calls vectorizer.fit() on
# whatever reviews it receives, so the second call below re-fits the shared
# vectorizer on the test reviews. X_train and X_test are then built from
# separately fitted vocabularies - confirm this is intended.
print("Processing training data...")
X_train = preprocess_and_vectorize_data(train_reviews, vectorizer, use_stemming=False)

print("Processing test data...")
X_test = preprocess_and_vectorize_data(test_reviews, vectorizer, use_stemming=False)


Processing training data...
Total reviews to process: 25000
Processed 100 / 25000 reviews
Processed 200 / 25000 reviews
Processed 300 / 25000 reviews
Processed 400 / 25000 reviews
Processed 500 / 25000 reviews
Processed 600 / 25000 reviews
Processed 700 / 25000 reviews
Processed 800 / 25000 reviews
Processed 900 / 25000 reviews
Processed 1000 / 25000 reviews
Processed 1100 / 25000 reviews
Processed 1200 / 25000 reviews
Processed 1300 / 25000 reviews
Processed 1400 / 25000 reviews
Processed 1500 / 25000 reviews
Processed 1600 / 25000 reviews
Processed 1700 / 25000 reviews
Processed 1800 / 25000 reviews
Processed 1900 / 25000 reviews
Processed 2000 / 25000 reviews
Processed 2100 / 25000 reviews
Processed 2200 / 25000 reviews
Processed 2300 / 25000 reviews
Processed 2400 / 25000 reviews
Processed 2500 / 25000 reviews
Processed 2600 / 25000 reviews
Processed 2700 / 25000 reviews
Processed 2800 / 25000 reviews
Processed 2900 / 25000 reviews
Processed 3000 / 25000 reviews
Processed 3100 / 25

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

def preprocess_and_vectorize_data(reviews, vectorizer, use_stemming=False, fit=True):
    """Clean every review and turn the cleaned texts into a feature matrix.

    Args:
        reviews: Sized sequence of raw review strings.
        vectorizer: Vectorizer object. With ``fit=True`` it may expose either
            ``fit_transform(documents)`` or ``fit(documents)`` plus a
            no-argument ``transform()``. With ``fit=False`` its
            ``transform`` must accept the documents as an argument.
        use_stemming: Forwarded to ``full_preprocessing``.
        fit: When true (default) the vectorizer is fitted on ``reviews``
            before transforming - use this for training data. When false the
            already-fitted vectorizer is only applied - use this for test
            data.

    Returns:
        The matrix produced by the vectorizer for the processed reviews.
    """
    total_reviews = len(reviews)
    print(f"Total reviews to process: {total_reviews}")

    processed_reviews = []
    for i, review in enumerate(reviews):
        processed_reviews.append(full_preprocessing(review, use_stemming=use_stemming))

        # Print progress every 100 reviews and once more on the last one.
        if (i + 1) % 100 == 0 or i == total_reviews - 1:
            print(f"Processed {i + 1} / {total_reviews} reviews")

    if not fit:
        # Test data: reuse the vocabulary learned from the training data.
        return vectorizer.transform(processed_reviews)

    if hasattr(vectorizer, "fit_transform"):
        return vectorizer.fit_transform(processed_reviews)

    # Vectorizers that only offer fit() and a no-argument transform().
    vectorizer.fit(processed_reviews)
    return vectorizer.transform()

# Example usage
#vectorizer = TfidfVectorizer()
# NOTE(review): both calls below go through the same fitting path of
# preprocess_and_vectorize_data with the same vectorizer object, so the test
# reviews are vectorized after fitting again instead of reusing the training
# fit - confirm this is intended.
vectorizer = TFIDFVectorizer()
X_train = preprocess_and_vectorize_data(train_reviews, vectorizer, use_stemming=False)
X_test = preprocess_and_vectorize_data(test_reviews, vectorizer, use_stemming=False)


Total reviews to process: 25000
Processed 100 / 25000 reviews
Processed 200 / 25000 reviews
Processed 300 / 25000 reviews
Processed 400 / 25000 reviews
Processed 500 / 25000 reviews
Processed 600 / 25000 reviews
Processed 700 / 25000 reviews
Processed 800 / 25000 reviews
Processed 900 / 25000 reviews
Processed 1000 / 25000 reviews
Processed 1100 / 25000 reviews
Processed 1200 / 25000 reviews
Processed 1300 / 25000 reviews
Processed 1400 / 25000 reviews
Processed 1500 / 25000 reviews
Processed 1600 / 25000 reviews
Processed 1700 / 25000 reviews
Processed 1800 / 25000 reviews
Processed 1900 / 25000 reviews
Processed 2000 / 25000 reviews
Processed 2100 / 25000 reviews
Processed 2200 / 25000 reviews
Processed 2300 / 25000 reviews
Processed 2400 / 25000 reviews
Processed 2500 / 25000 reviews
Processed 2600 / 25000 reviews
Processed 2700 / 25000 reviews
Processed 2800 / 25000 reviews
Processed 2900 / 25000 reviews
Processed 3000 / 25000 reviews
Processed 3100 / 25000 reviews
Processed 3200 /

In [None]:
# Train the Naive Bayes Classifier
nb_classifier = NaiveBayesClassifierWithTFIDF()
# The live vectorizer object is named `vectorizer`; `tfidf_vectorizer` is not
# defined by any executed cell. The labels are passed as a pandas Series so
# that Series operations such as value_counts() are available on them.
nb_classifier.fit(X_train, pd.Series(train_labels), vectorizer.vocab)



KeyboardInterrupt: 

In [None]:

# Predict a class label for every row of X_test using the fitted classifier
predictions = nb_classifier.predict(X_test)

In [None]:

# Evaluate the predictions (You can add more evaluation metrics as needed)
# Compare element-wise as arrays: `list == list` yields a single bool for the
# whole comparison, which would make the mean either 0.0 or 1.0.
accuracy = np.mean(np.asarray(predictions) == np.asarray(test_labels))
print(f"Accuracy: {accuracy}")