In [None]:
pip install transformers nltk



# Slang-based sentiment scoring of product reviews

In [None]:
import gzip
import pandas as pd
import re
from nltk.corpus import wordnet, sentiwordnet as swn
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import nltk
from nltk.corpus import wordnet
import json
import os

In [None]:
# Fetch the NLTK data packages requested below: the VADER lexicon, WordNet and
# SentiWordNet. The value of the last call is shown as the cell output.
from nltk.corpus import wordnet
nltk.download('vader_lexicon')
nltk.download('wordnet')
nltk.download('sentiwordnet')

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package sentiwordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/sentiwordnet.zip.


True

In [None]:
def load_slang_dict(file_path=None):
    """Load the slang-to-meaning lookup table from an Excel workbook.

    Args:
        file_path: Optional path to the workbook. When omitted, the file
            "slang words.xlsx" in the current working directory is used.

    Returns:
        A dict mapping each value of the 'Slang Word' column to the value of
        the 'Meaning' column in the same row. An empty dict is returned (and a
        message printed) when the file does not exist or cannot be loaded.
    """
    if file_path is None:
        slang_file_path = os.path.join(os.getcwd(), "slang words.xlsx")
    else:
        slang_file_path = file_path

    if not os.path.exists(slang_file_path):
        print(f"Error: Slang dictionary file not found at {slang_file_path}")
        return {}  # Return an empty dictionary in case of error

    try:
        slang_df = pd.read_excel(slang_file_path)
        return dict(zip(slang_df['Slang Word'], slang_df['Meaning']))
    except Exception as e:
        # Deliberately best-effort: an unreadable workbook or a missing column
        # is reported and the caller receives an empty lookup table.
        print(f"Error loading slang dictionary: {e}")
        return {}  # Return an empty dictionary in case of error
# Slang lookup table. load_slang_dict reports a missing or unreadable workbook
# and yields {} instead of raising, so the result is used as-is rather than
# being overwritten by a second, unguarded read from a hard-coded path.
slang_dict = load_slang_dict()

# Vocabulary used to decide which tokens count as English words (built once).
english_words = set(wordnet.words())

# VADER analyzer shared by the scoring functions below.
sentiment_analyzer = SentimentIntensityAnalyzer()


def process_reviews_from_json(file_path):
    """Score every review stored in a gzipped JSON-lines file.

    Each line is parsed as a JSON object; its 'title' and 'text' fields are
    joined with a single space and handed to process_review. Lines that are
    not valid JSON are reported and skipped.

    Args:
        file_path: Path of the gzip-compressed JSON-lines file to read.

    Returns:
        A list with one dict per decoded line, holding the keys
        'reviewText', 'slangScores' and 'finalSlangScore'.
    """
    results = []
    # Open the file the caller asked for; a hard-coded path used to be opened
    # here, which silently ignored the file_path argument.
    with gzip.open(file_path, 'rt', encoding='utf-8') as f:
        for line in f:
            try:
                review_data = json.loads(line)  # Load each review as a JSON object
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                continue

            # A missing or null field contributes an empty string so the
            # concatenation below cannot fail on None.
            title = review_data.get('title') or ''
            text = review_data.get('text') or ''
            review_text = title + " " + text

            # Process the review and get the sentiment score
            slang_scores, final_slang_score = process_review(review_text)

            results.append({
                'reviewText': review_text,
                'slangScores': slang_scores,
                'finalSlangScore': final_slang_score
            })

    return results

# Path of the gzipped JSON-lines review dump, resolved against the working directory.
current_directory = os.getcwd()
json_file_path = os.path.join(current_directory, "All_Beauty.jsonl (1).gz")

# The reviews are processed at the end of the notebook, after process_review and
# its helpers have been defined; invoking process_reviews_from_json at this point
# raised "NameError: name 'process_review' is not defined".

def preprocess_text(text):
    """Reduce a review to the tokens that are known words.

    Every character that is neither an ASCII letter nor whitespace is deleted,
    the remainder is split on whitespace, and a token is kept only if it
    appears in english_words or is a key of slang_dict. Matching is
    case-sensitive and the original token order is preserved.
    """
    letters_only = re.sub(r'[^a-zA-Z\s]', '', text)
    kept = []
    for token in letters_only.split():
        if token in english_words or token in slang_dict:
            kept.append(token)
    return kept

def identify_slang(words):
    """Return, in order, the entries of words that are keys of slang_dict."""
    found = []
    for candidate in words:
        if candidate in slang_dict:
            found.append(candidate)
    return found


# Sentiment scoring function using SentiWordNet
def sentiment_score_sentiwordnet(word):
    """Classify a word from its SentiWordNet scores.

    The positive and negative scores are each averaged over every synset that
    swn.senti_synsets returns for the word.

    Returns:
        2 when the mean positive score is larger, 0 when the mean negative
        score is larger, and 1 (neutral) when they tie or the word has no
        synsets at all.
    """
    senses = list(swn.senti_synsets(word))
    if not senses:
        # Nothing known about this word: treat it as neutral.
        return 1

    sense_count = len(senses)
    mean_positive = sum(sense.pos_score() for sense in senses) / sense_count
    mean_negative = sum(sense.neg_score() for sense in senses) / sense_count

    if mean_positive > mean_negative:
        return 2  # Positive
    if mean_negative > mean_positive:
        return 0  # Negative
    return 1  # Neutral

# Get sentiment score for slang words using SentiWordNet or VADER
def sentiment_score(slang_words, slang_dictionary):
    """Assign a sentiment category to each slang term.

    A term that is in the english_words vocabulary is classified by
    sentiment_score_sentiwordnet. Any other term is replaced by its meaning
    from slang_dictionary and classified from VADER's compound score:
    >= 0.05 is positive, <= -0.05 is negative, anything in between neutral.

    Args:
        slang_words: Iterable of slang terms found in a review.
        slang_dictionary: Mapping from slang term to its meaning text.

    Returns:
        A dict mapping each distinct term to 0 (negative), 1 (neutral) or
        2 (positive).
    """
    scores = {}
    for slang in slang_words:
        if slang in english_words:
            scores[slang] = sentiment_score_sentiwordnet(slang)
            continue

        # Not in the WordNet vocabulary: score the dictionary meaning with VADER.
        expanded_text = slang_dictionary.get(slang, slang)
        if not isinstance(expanded_text, str):
            # An empty spreadsheet cell is read as NaN (a float), which VADER
            # cannot process; score the term itself instead.
            expanded_text = slang

        compound = sentiment_analyzer.polarity_scores(expanded_text)["compound"]
        if compound >= 0.05:
            scores[slang] = 2  # Positive
        elif compound <= -0.05:
            scores[slang] = 0  # Negative
        else:
            scores[slang] = 1  # Neutral
    return scores


def enhance_words(slang_scores):
    """Return a copy of slang_scores with every score multiplied by 1.1.

    Placeholder for a future enhancement step; the 1.1 factor is hypothetical.
    """
    return {slang: score * 1.1 for slang, score in slang_scores.items()}



def get_final_slang_score(slang_scores):
    """Collapse per-term scores into a single rounded score.

    Returns the mean of the mapping's values passed through round(), or 1
    when the mapping is empty.
    """
    if not slang_scores:
        return 1
    total = sum(slang_scores.values())
    return round(total / len(slang_scores))

# Main function to process reviews
def process_review(review):
    """Run the slang-sentiment pipeline on a single review string.

    The review is tokenised with preprocess_text and narrowed to slang terms
    with identify_slang.

    Returns:
        A 2-tuple. If no slang term is found it is ("No slang detected", 1).
        Otherwise it is (scores, final), where scores is the per-term mapping
        produced by sentiment_score (before enhancement) and final is
        get_final_slang_score applied to the enhance_words output.
    """
    detected = identify_slang(preprocess_text(review))
    if detected:
        per_term = sentiment_score(detected, slang_dict)
        boosted = enhance_words(per_term)
        return per_term, get_final_slang_score(boosted)
    return "No slang detected", 1  # No slang found

def process_reviews_from_json(file_path):
    """Score every review stored in a gzipped JSON-lines file.

    Each line is decoded as UTF-8 and parsed as a JSON object; its 'title' and
    'text' fields are joined with a single space and handed to process_review.
    Lines that are not valid JSON are reported and skipped. Any other error
    (including a truncated archive) is reported and ends the read; the reviews
    collected up to that point are still returned.

    Args:
        file_path: Path of the gzip-compressed JSON-lines file to read.

    Returns:
        A list with one dict per processed line, holding the keys
        'reviewText', 'slangScores' and 'finalSlangScore'.
    """
    results = []
    try:
        # Open the GZIP file in read binary mode ('rb') and decode per line.
        with gzip.open(file_path, 'rb') as f:
            for line in f:
                try:
                    review_data = json.loads(line.decode('utf-8'))
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON: {e}")
                    continue

                # A missing or null field contributes an empty string so the
                # concatenation below cannot fail on None.
                title = review_data.get('title') or ''
                text = review_data.get('text') or ''
                review_text = title + " " + text

                slang_scores, final_slang_score = process_review(review_text)

                # Previously the parsed line was discarded, so the function
                # always returned an empty list.
                results.append({
                    'reviewText': review_text,
                    'slangScores': slang_scores,
                    'finalSlangScore': final_slang_score
                })
    except EOFError as e:
        print(f"EOFError encountered: {e}. The file may be corrupted or incomplete.")
    except Exception as e:
        print(f"An error occurred: {e}")
    return results

# Locate the gzipped review dump in the working directory. The file name must be
# relative: os.path.join discards every earlier component when a later one is
# absolute, which made current_directory a no-op before.
current_directory = os.getcwd()
json_file_path = os.path.join(current_directory, "All_Beauty.jsonl (1).gz")

# Score the reviews using the path built above instead of a repeated literal.
review_results = process_reviews_from_json(json_file_path)

# Example usage
for result in review_results:
    print("Review Text:", result['reviewText'])
    print("Slang Scores:", result['slangScores'])
    print("Final Slang Score:", result['finalSlangScore'])
    print("---")


NameError: name 'process_review' is not defined