In [5]:
import os
os.environ["TRANSFORMERS_NO_TF"] = "1"
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
import transformers
import torch
import pandas as pd
import json
import random
from tqdm import tqdm
import time
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW
import nltk
from nltk.tokenize import sent_tokenize
import pathlib
import multiprocessing
import gc

In [6]:
# Report the PyTorch / CUDA build, then describe every visible GPU.
print(f"PyTorch Version：{torch.__version__}")
print(f"CUDA Available：{torch.cuda.is_available()}")
print(f"CUDA ver：{torch.version.cuda}")

gpu_count = torch.cuda.device_count()
print(f"GPU Num：{gpu_count}" if torch.cuda.is_available() else "No GPU ava.")

for device_index in range(gpu_count):
    print(f"\nGPU {device_index} info：")
    print(f"Dev Name：{torch.cuda.get_device_name(device_index)}")
    print(f"Dev Idx：{device_index}")
    is_current = "Yes" if torch.cuda.current_device() == device_index else "No"
    print(f"Is Cur Dev：{is_current}")
    # Query the device properties once and reuse them for memory and compute capability.
    props = torch.cuda.get_device_properties(device_index)
    print(f"GPU Mem：{props.total_memory / 1024**3:.2f} GB")
    print(f"CUDA abi：{props.major}.{props.minor}")
    print()

PyTorch Version：2.6.0+cu126
CUDA Available：True
CUDA ver：12.6
GPU Num：1

GPU 0 info：
Dev Name：NVIDIA GeForce RTX 4060 Laptop GPU
Dev Idx：0
Is Cur Dev：Yes
GPU Mem：8.00 GB
CUDA abi：8.9



In [8]:
#!!! This block determines which files are preprocessed next
# Keep only *.jsonl entries: anything else in ./Data/ (for example a
# .ipynb_checkpoints folder) would be fed to the JSON parser by the cells below.
# Sort them because os.listdir() returns entries in arbitrary order.
data_file_names = sorted(
    name for name in os.listdir("./Data/") if name.endswith(".jsonl")
)
print(data_file_names)

# Later cells derive output names from these "./Data/<name>" paths.
files_dir = [os.path.join("./Data/", name) for name in data_file_names]
print(files_dir)

['Clothing_Shoes_and_Jewelry.jsonl', 'Electronics.jsonl', 'Health_and_Personal_Care.jsonl']
['./Data/Clothing_Shoes_and_Jewelry.jsonl', './Data/Electronics.jsonl', './Data/Health_and_Personal_Care.jsonl']


In [None]:
# Stream-read the first JSONL file and pretty-print its first five records
# to inspect the schema. The encoding is given explicitly because open()
# otherwise uses the locale default, which is not UTF-8 on Windows.
with open(files_dir[0], "r", encoding="utf-8") as f:
    for i, line in enumerate(f):
        if i >= 5:
            break
        # Print some fields to see how they read
        print(json.dumps(json.loads(line.strip()), indent=4))
        print("-" * 50)

In [None]:
# Preprocessing: stream read, distinguish by purchase, training set test set 8:2
batch_size = 10000  # reviews buffered in memory before each flush to disk

# Ensure the output directory exists before anything is written to it.
path = pathlib.Path("./Processed Data/")
if not path.exists():
    path.mkdir(parents=True)

def prepro(file_path, true_reviews, false_reviews):
    """Shuffle one batch of reviews and append an 80/20 train/test split to disk.

    Args:
        file_path: Path of the source JSONL file. Only its base name (without
            the ".jsonl" suffix) is used, as the stem of the output file names.
        true_reviews: List of review dicts with a truthy "verified_purchase".
            Shuffled in place.
        false_reviews: List of the remaining review dicts. Shuffled in place.

    Side effects:
        Appends one JSON object per line to
        "./Processed Data/<stem>_{train,test}_{true,false}.jsonl". The files
        are opened in append mode, so calling this again for the same source
        file adds to what is already there. The directory must already exist.
    """
    # Shuffle
    random.shuffle(true_reviews)
    random.shuffle(false_reviews)

    # Derive the stem from the base name so the result does not depend on how
    # many directory levels (or which separator) the path happens to contain.
    file_name = os.path.basename(file_path).split(".jsonl")[0]

    for label, reviews in (("true", true_reviews), ("false", false_reviews)):
        # Divide training set and test set (80% training, 20% testing)
        cut = int(0.8 * len(reviews))
        for split_name, subset in (("train", reviews[:cut]), ("test", reviews[cut:])):
            out_path = "./Processed Data/" + file_name + "_" + split_name + "_" + label + ".jsonl"
            # Files are opened even for an empty subset so all four always exist.
            with open(out_path, "a", encoding="utf-8") as out_f:
                out_f.writelines(json.dumps(review) + "\n" for review in subset)


    
for file_path in files_dir:
    # NOTE: prepro() appends to its output files, so running this loop twice
    # for the same input duplicates every record in "./Processed Data/".

    # Calculate the total number of lines in the file.
    # Raw JSONL is read as UTF-8 explicitly; the locale default is not UTF-8 on Windows.
    start1 = time.time()
    with open(file_path, "r", encoding="utf-8") as f:
        total_lines = sum(1 for _ in f)
    end1 = time.time()

    # Calculate the total number of batches (ceiling division)
    total_batches = (total_lines + batch_size - 1) // batch_size

    start2 = time.time()
    with open(file_path, "r", encoding="utf-8") as f:
        true_reviews = []
        false_reviews = []
        with tqdm(total=total_batches, desc=f"Processing {file_path}") as pbar:
            for line in f:
                line = line.strip()
                if not line:
                    # A blank line (e.g. a trailing newline) is not a record.
                    continue
                review = json.loads(line)

                # Bucket by the "verified_purchase" flag; a missing flag counts as False.
                if review.get("verified_purchase", False):
                    true_reviews.append(review)
                else:
                    false_reviews.append(review)

                # Flush once batch_size reviews are buffered to bound memory use.
                if len(true_reviews) + len(false_reviews) >= batch_size:
                    prepro(file_path, true_reviews, false_reviews)
                    true_reviews = []
                    false_reviews = []
                    pbar.update(1)

            # Flush the final, partially filled batch.
            if true_reviews or false_reviews:
                prepro(file_path, true_reviews, false_reviews)
                pbar.update(1)
    end2 = time.time()
    print("{0}".format(os.path.basename(file_path)))
    print("Cal num rows time：{0}ms".format(int((end1-start1)*1000)))
    if total_lines:
        # seconds per line * 1e6 lines * 1000 ms/s
        print("Execute 1M lines time：{0}ms".format(int((end2-start2)/total_lines*1000000*1000)))
    else:
        # An empty file has no per-line rate; avoid dividing by zero.
        print("Execute 1M lines time：n/a (empty file)")
    print()

print("Data processing and saving complete!")

In [None]:
# Preprocessing: Stream read, differentiated by purchase, for testing only
batch_size = 10000  # reviews buffered in memory before each flush to disk
upper_limit = 500000 # if 0, no limit

# Ensure the output directory exists before anything is written to it.
path = pathlib.Path("./Processed Data/")
if not path.exists():
    path.mkdir(parents=True)

def prepro(file_path, true_reviews, false_reviews):
    """Shuffle one batch of reviews and append it to the per-label output files.

    Unlike a train/test split, this variant writes each label to a single file.

    Args:
        file_path: Path of the source JSONL file. Only its base name (without
            the ".jsonl" suffix) is used, as the stem of the output file names.
        true_reviews: List of review dicts with a truthy "verified_purchase".
            Shuffled in place.
        false_reviews: List of the remaining review dicts. Shuffled in place.

    Side effects:
        Appends one JSON object per line to "./Processed Data/<stem>_true.jsonl"
        and "./Processed Data/<stem>_false.jsonl". The files are opened in
        append mode, so calling this again for the same source file adds to
        what is already there. The directory must already exist.
    """
    # shuffle
    random.shuffle(true_reviews)
    random.shuffle(false_reviews)

    # Derive the stem from the base name so the result does not depend on how
    # many directory levels (or which separator) the path happens to contain.
    file_name = os.path.basename(file_path).split(".jsonl")[0]

    # Save the verified and unverified reviews to their own files
    for label, reviews in (("true", true_reviews), ("false", false_reviews)):
        out_path = "./Processed Data/" + file_name + "_" + label + ".jsonl"
        with open(out_path, "a", encoding="utf-8") as out_f:
            out_f.writelines(json.dumps(review) + "\n" for review in reviews)


    
for file_path in files_dir:
    # NOTE: prepro() appends to its output files, so running this loop twice
    # for the same input duplicates every record in "./Processed Data/".

    # Calculate the total number of lines in the file.
    # Raw JSONL is read as UTF-8 explicitly; the locale default is not UTF-8 on Windows.
    start1 = time.time()
    with open(file_path, "r", encoding="utf-8") as f:
        total_lines = sum(1 for _ in f)
    end1 = time.time()

    # upper_limit == 0 means "no limit"
    effective_lines = total_lines if upper_limit == 0 else min(total_lines, upper_limit)

    # Calculate the total number of batches (ceiling division)
    total_batches = (effective_lines + batch_size - 1) // batch_size

    start2 = time.time()
    with open(file_path, "r", encoding="utf-8") as f:
        true_reviews = []
        false_reviews = []
        processed_count = 0

        with tqdm(total=total_batches, desc=f"Processing {file_path}") as pbar:
            for line in f:
                if upper_limit > 0 and processed_count >= upper_limit:
                    break

                line = line.strip()
                if not line:
                    # A blank line (e.g. a trailing newline) is not a record.
                    continue
                review = json.loads(line)
                processed_count += 1

                # Bucket by the "verified_purchase" flag; a missing flag counts as False.
                if review.get("verified_purchase", False):
                    true_reviews.append(review)
                else:
                    false_reviews.append(review)

                # Flush once batch_size reviews are buffered to bound memory use.
                if len(true_reviews) + len(false_reviews) >= batch_size:
                    prepro(file_path, true_reviews, false_reviews)
                    true_reviews = []
                    false_reviews = []
                    pbar.update(1)

            # Flush the final, partially filled batch.
            if true_reviews or false_reviews:
                prepro(file_path, true_reviews, false_reviews)
                pbar.update(1)
    end2 = time.time()
    print("{0}".format(os.path.basename(file_path)))
    print("Num lines：{0}".format(total_lines))
    # Report real milliseconds so the value matches its "ms" label.
    print("Cal num rows time：{0}ms".format(int((end1-start1)*1000)))
    print("Batch num：{}".format(total_batches))
    if processed_count:
        # Extrapolate from the lines actually processed:
        # seconds per line * 1e6 lines * 1000 ms/s.
        print("Execute 1M lines time：{0}ms".format(int((end2-start2)/processed_count*1000000*1000)))
    else:
        # Nothing was processed, so there is no per-line rate to extrapolate.
        print("Execute 1M lines time：n/a (no lines processed)")
    print("Actual execute {0} lines".format(processed_count))
    print()

print("Data processing and saving complete!")

In [None]:
# Location of the JSON file that caches line counts per processed file
output_file = os.path.join("./", "line_counts.json")

# Start from the previously saved counts when the cache file exists
line_counts = {}
if os.path.exists(output_file):
    with open(output_file, "r") as cache_in:
        line_counts = json.load(cache_in)

# Get all JSONL files
jsonl_files = [name for name in os.listdir("./Processed Data") if name.endswith(".jsonl")]

# Only unrecorded files are processed.
# NOTE: entries are keyed by file name only, so a file that changes after it
# has been recorded keeps its old count.
new_files = [name for name in jsonl_files if name not in line_counts]

# Walk through and count the number of rows
for file_name in tqdm(new_files, desc="Calculating new file line counts"):
    file_path = os.path.join("./Processed Data", file_name)

    # Count the number of lines in the current file
    total_lines = 0
    with open(file_path, "r") as handle:
        for _ in handle:
            total_lines += 1

    # Save to dictionary
    line_counts[file_name] = total_lines

# Save the line count results to the JSON file
with open(output_file, "w") as cache_out:
    json.dump(line_counts, cache_out, indent=4)

print(f"Complete. The row number information is saved to {output_file}. Number of statistics files added: {len(new_files)}")

In [10]:
# List the preprocessed files, then build the matching input and output paths.
processed_names = os.listdir("./Processed Data/")
print(processed_names)

# Each output file keeps the name of its input file, under "./Test Result/".
output_files_dir = [os.path.join("./Test Result/", name) for name in processed_names]
test_files_dir = [os.path.join("./Processed Data/", name) for name in processed_names]

print(test_files_dir)
print()
print(output_files_dir)

['Clothing_Shoes_and_Jewelry_false.jsonl', 'Clothing_Shoes_and_Jewelry_true.jsonl', 'Electronics_false.jsonl', 'Electronics_true.jsonl', 'Health_and_Personal_Care_false.jsonl', 'Health_and_Personal_Care_true.jsonl']
['./Processed Data/Clothing_Shoes_and_Jewelry_false.jsonl', './Processed Data/Clothing_Shoes_and_Jewelry_true.jsonl', './Processed Data/Electronics_false.jsonl', './Processed Data/Electronics_true.jsonl', './Processed Data/Health_and_Personal_Care_false.jsonl', './Processed Data/Health_and_Personal_Care_true.jsonl']

['./Test Result/Clothing_Shoes_and_Jewelry_false.jsonl', './Test Result/Clothing_Shoes_and_Jewelry_true.jsonl', './Test Result/Electronics_false.jsonl', './Test Result/Electronics_true.jsonl', './Test Result/Health_and_Personal_Care_false.jsonl', './Test Result/Health_and_Personal_Care_true.jsonl']


In [11]:
# Load tokenizer and model from the same pretrained checkpoint
SENTIMENT_CHECKPOINT = "nlptown/bert-base-multilingual-uncased-sentiment"
tokenizer = AutoTokenizer.from_pretrained(SENTIMENT_CHECKPOINT)
model = AutoModelForSequenceClassification.from_pretrained(SENTIMENT_CHECKPOINT)

# Prefer the GPU when one is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Computing hardware in use now:", device)
model.to(device)
# Put the model into evaluation mode
model.eval()

# NLTK Punkt data used by sent_tokenize
for nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(nltk_resource)

# Define a function that analyzes emotion
def predict_sentiment(texts, json_lines=None, batch_size=32):
    """Predict a 1-5 star rating for each text by averaging sentence-level predictions.

    Each text is split into sentences, every sentence is classified by the
    module-level `model`, and the text's rating is the rounded mean of its
    sentence ratings.

    Args:
        texts: List of strings to score.
        json_lines: Optional list of the raw JSONL lines behind `texts`; the
            first one is printed for diagnosis when the batch fails.
        batch_size: Maximum number of sentences sent through the model in one
            forward pass. One review can contain many sentences, so chunking
            keeps peak GPU memory bounded.

    Returns:
        A list of ints, one per input text. If anything fails, every text in
        the batch gets the neutral score 3 and the error is printed, so a long
        run is not aborted by one bad batch.
    """
    try:
        # Split text into sentences; whitespace-only text becomes one empty sentence
        sentences_list = [sent_tokenize(text) if text.strip() else [""] for text in texts]
        flat_sentences = [s for sent_list in sentences_list for s in sent_list]  # flatten

        if not flat_sentences:
            raise ValueError("The text has no valid sentences!")

        # Guard against a zero or negative batch_size.
        chunk_size = max(1, int(batch_size))
        chunk_predictions = []

        with torch.no_grad():
            for start in range(0, len(flat_sentences), chunk_size):
                chunk = flat_sentences[start:start + chunk_size]
                inputs = tokenizer(chunk, return_tensors="pt", max_length=512, padding=True, truncation=True)
                inputs = {k: v.to(device) for k, v in inputs.items()}  # Push to the model's device
                logits = model(**inputs).logits
                # Class index 0-4 -> 1-5 stars; move to CPU so only the current
                # chunk's tensors stay on the GPU.
                chunk_predictions.append((torch.argmax(logits, dim=-1) + 1).cpu())

        predictions = torch.cat(chunk_predictions)

        # Calculates the final rating for each text
        index = 0
        results = []
        for sentences in sentences_list:
            num_sentences = len(sentences)
            if num_sentences == 0:
                final_prediction = 3  # Default neutral score
            else:
                # NOTE: round() rounds halves to the nearest even integer (2.5 -> 2, 3.5 -> 4).
                final_prediction = round(predictions[index:index + num_sentences].float().mean().item())
            results.append(final_prediction)
            index += num_sentences

        return results

    except Exception as e:
        print("\nBatch error:", str(e))
        if json_lines:
            print("JSON line:", json.dumps(json.loads(json_lines[0].strip()), indent=4))
        return [3] * len(texts)  # avoid crash

    finally:
        # Local tensors are dropped when the function returns; deleting entries
        # from locals() has no effect on them, so that step is not performed.
        gc.collect()
        torch.cuda.empty_cache()

Computing hardware in use now: cuda


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\31445\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\31445\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [12]:
# Single process compute inference function
def _predict_and_write(batch_texts, batch_lines, batch_reviews, outfile, batch_size):
    """Score one batch, write each review with "predict_stars" added, and flush."""
    predictions = predict_sentiment(batch_texts, batch_lines, batch_size=batch_size)
    for review, stars in zip(batch_reviews, predictions):
        review["predict_stars"] = stars
        outfile.write(json.dumps(review) + "\n")
    # Resume counts the lines already in the output file, so push each finished
    # batch to disk instead of leaving it in the write buffer.
    outfile.flush()


def process_jsonl(input_file, output_file, batch_size=32):
    """Run sentiment inference over a JSONL file, resuming from earlier output.

    Args:
        input_file: Path of the JSONL file to read, one review per line.
        output_file: Path of the JSONL file to append results to. Each output
            line is the input review plus a "predict_stars" field. If the file
            already has N lines, the first N input lines are skipped.
        batch_size: Number of reviews collected before each prediction call;
            also forwarded to predict_sentiment.

    Side effects:
        Creates the output directory if needed, appends to `output_file`, and
        reads "./line_counts.json" (when present) for the progress-bar total.
    """
    print(f"[process_jsonl] process now: {input_file}")
    file_name = os.path.basename(input_file)
    # Computed outside the f-string below: nested same-type quotes inside an
    # f-string are a SyntaxError before Python 3.12.
    display_name = file_name.split(".jsonl")[0]

    # Read total line num; a missing cache file or entry means the total is unknown
    total_lines = 0
    if os.path.exists("./line_counts.json"):
        with open("./line_counts.json", "r") as f:
            total_lines = json.load(f).get(file_name, 0)

    # Opening the output file in append mode fails if its directory is missing
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Check whether the output file has content, and perform breakpoint continuation processing
    processed_lines = 0
    if os.path.exists(output_file):
        with open(output_file, "r") as f:
            processed_lines = sum(1 for _ in f)
        print(f"A {processed_lines} record already exists and will be skipped to continue processing.")

    # Process JSONL files
    with open(input_file, "r") as infile, open(output_file, "a") as outfile:
        # total=None makes tqdm show a plain counter when the total is unknown
        with tqdm(total=total_lines or None, desc=f"Processing {display_name}", unit=" reviews", initial=processed_lines) as pbar:
            batch_texts = []
            batch_lines = []
            batch_reviews = []

            for line_index, line in enumerate(infile):
                if line_index < processed_lines:
                    continue  # skip processed line

                review = json.loads(line.strip())

                # Get the text and title fields; `or ""` also covers an explicit JSON null.
                # Whitespace-only values become empty strings.
                title = (review.get("title") or "").strip()
                text = (review.get("text") or "").strip()

                # Merge title and text
                full_text = (title + ". " + text) if title and text else (title or text or "")

                batch_texts.append(full_text)
                batch_lines.append(line)
                batch_reviews.append(review)

                # When accumulated to batch_size, batch prediction is performed
                if len(batch_texts) >= batch_size:
                    _predict_and_write(batch_texts, batch_lines, batch_reviews, outfile, batch_size)
                    pbar.update(len(batch_texts))
                    batch_texts.clear()
                    batch_lines.clear()
                    batch_reviews.clear()

            # Process the remaining data
            if batch_texts:
                _predict_and_write(batch_texts, batch_lines, batch_reviews, outfile, batch_size)
                pbar.update(len(batch_texts))

    print("Processing complete! A new file has been generated: ", output_file)

In [None]:
status_file = "./process_status.json"

# Load processed state (empty when no status file has been written yet)
process_status = {}
if os.path.exists(status_file):
    with open(status_file, "r") as status_in:
        process_status = json.load(status_in)

# Single threaded inference
for file_index, input_file in enumerate(test_files_dir):
    output_file = output_files_dir[file_index]
    file_name = os.path.basename(input_file)

    already_done = process_status.get(file_name) == "done"
    if already_done:
        print(f"File {file_name} has been processed, skipped.")
        continue

    start = time.time()
    print(f"\n Start processing file: {file_name}")

    process_jsonl(input_file, output_file, batch_size=40) # batch_size change here!

    print(f"Processing completed: {file_name}")
    end = time.time()
    elapsed_text = time.strftime("%H:%M:%S", time.gmtime(end-start))
    print("Time use：{0}".format(elapsed_text))

    # Mark as processed and save right away, so finished files are remembered
    # even if a later file fails
    process_status[file_name] = "done"
    with open(status_file, "w") as status_out:
        json.dump(process_status, status_out, indent=4)