### **Batch Processing of Reviews using spaCy and Parallel Processing**

This notebook covers:
- Batch processing of reviews using spaCy
- Batch + parallel processing of reviews using spaCy (using `ThreadPoolExecutor`)

In [6]:
import pandas as pd
import re
import spacy 
import time
import tracemalloc


In [2]:
df_reviews  = pd.read_csv('Projects/df_review_analysis.csv')
df_reviews.head(5)

Unnamed: 0,drugName,condition,review,rating,date,usefulCount
0,Valsartan,Left Ventricular Dysfunction,"""It has no side effect, I take it in combinati...",9,2012-05-20,27
1,Guanfacine,ADHD,"""My son is halfway through his fourth week of ...",8,2010-04-27,192
2,Lybrel,Birth Control,"""I used to take another oral contraceptive, wh...",5,2009-12-14,17
3,Ortho Evra,Birth Control,"""This is my first time using any form of birth...",8,2015-11-03,10
4,Buprenorphine / naloxone,Opiate Dependence,"""Suboxone has completely turned my life around...",9,2016-11-27,37


In [3]:
# Load spaCy model
nlp = spacy.load('en_core_web_sm')

In [4]:
review_list = df_reviews["review"].dropna().tolist()

#### **Batch Processing of Reviews using spaCy**

In [7]:
def chunk_generator(iterable, chunk_size):
    for i in range(0, len(iterable), chunk_size):
        yield iterable[i:i+chunk_size]  # yield one chunk at a time instead of building all chunks in memory up front

def data_stripper(text):
    # Convert the input text to lowercase
    text = text.lower()
    # Remove standalone numbers
    text = re.sub(r'\b\d+\b', '', text)
    # Remove special characters, keeping only alphanumeric characters and spaces
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    # Collapse runs of whitespace (including newlines, \n) into single spaces
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def preprocess_text_spacy_pipe(chunk=None, batch_size=None, stop=False, lemmer=True, return_tokens=False):
    processed_texts = []
    all_tokens = []
    for doc in nlp.pipe(chunk, batch_size=batch_size):
        tokens = []
        for token in doc:
            if stop and token.is_stop:
                continue
            if lemmer:
                tokens.append(token.lemma_)
            else:
                tokens.append(token.text)
        prepro_text = ' '.join(tokens)
        processed_texts.append(prepro_text)
        if return_tokens:
            all_tokens.append(tokens)
    if return_tokens:
        return all_tokens, processed_texts
    else:
        return processed_texts

# Load the spaCy model
nlp = spacy.load("en_core_web_sm")

# Define the chunk size
chunk_size = 100

# Create chunks using the generator
chunks = chunk_generator(review_list, chunk_size)

# List to store tokenized reviews
review_token_list = []
review_text_list = []

# Start measuring time and memory
start_time = time.time()
tracemalloc.start()

# Process each chunk
for chunk in chunks:
    # Clean each review lazily with a generator instead of storing the stripped chunk
    stripped_chunk = (data_stripper(review) for review in chunk)
    # Tokenize and lemmatize the cleaned chunk with spaCy
    processed_tokens, prepro_texts = preprocess_text_spacy_pipe(stripped_chunk, batch_size=25, lemmer=True, return_tokens=True)
    # Collect the per-chunk results (flattened after the loop)
    review_token_list.append(processed_tokens)
    review_text_list.append(prepro_texts)

# Stop measuring time and memory
end_time = time.time()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

review_token_list = [item for sublist in review_token_list for item in sublist]
review_text_list = [item for sublist in review_text_list for item in sublist]


# Print results
print(f"Execution Time: {end_time - start_time} seconds")
print(f"Current Memory Usage: {current / 10**6} MB")
print(f"Peak Memory Usage: {peak / 10**6} MB")

# Output the tokenized reviews list to verify correctness (optional)
# print(review_token_list)


Execution Time: 1731.2893810272217 seconds
Current Memory Usage: 904.953882 MB
Peak Memory Usage: 947.502712 MB
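
The figures above come from wrapping the loop in `time` and `tracemalloc` calls, a pattern this notebook repeats for every run. As a minimal sketch, the same measurement could be packaged in one helper. `measure` is a hypothetical name that does not appear in the notebook, the workload is a stand-in rather than the spaCy pipeline, and `time.perf_counter()` is swapped in for `time.time()` because it is a monotonic clock intended for timing.

```python
import time
import tracemalloc

def measure(func, *args, **kwargs):
    """Run func and return (result, seconds, current_mb, peak_mb).

    Hypothetical helper mirroring the notebook's start/stop pattern.
    """
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Read the counters while the result is still alive, as the notebook does
        current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()  # always stop tracing, even if func raises
    return result, elapsed, current / 10**6, peak / 10**6

# Stand-in workload (an assumption for illustration, not the review pipeline)
result, seconds, current_mb, peak_mb = measure(lambda n: [i * i for i in range(n)], 10_000)
print(f"Execution Time: {seconds} seconds")
print(f"Current Memory Usage: {current_mb} MB")
print(f"Peak Memory Usage: {peak_mb} MB")
```

Note that `tracemalloc` only counts memory allocated through Python's allocator, so memory that native libraries allocate directly may not show up in these numbers, and tracing itself adds some runtime overhead.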


In [10]:
review_token_list

[['it',
  'have',
  'no',
  'side',
  'effect',
  'I',
  'take',
  'it',
  'in',
  'combination',
  'of',
  'bystolic',
  'mg',
  'and',
  'fish',
  'oil'],
 ['my',
  'son',
  'be',
  'halfway',
  'through',
  'his',
  'fourth',
  'week',
  'of',
  'intuniv',
  'we',
  'become',
  'concerned',
  'when',
  'he',
  'begin',
  'this',
  'last',
  'week',
  'when',
  'he',
  'start',
  'take',
  'the',
  'high',
  'dose',
  'he',
  'will',
  'be',
  'on',
  'for',
  'two',
  'day',
  'he',
  'could',
  'hardly',
  'get',
  'out',
  'of',
  'bed',
  'be',
  'very',
  'cranky',
  'and',
  'sleep',
  'for',
  'nearly',
  'hour',
  'on',
  'a',
  'drive',
  'home',
  'from',
  'school',
  'vacation',
  'very',
  'unusual',
  'for',
  'he',
  'I',
  'call',
  'his',
  'doctor',
  'on',
  'monday',
  'morning',
  'and',
  'she',
  'say',
  'to',
  'stick',
  'it',
  'out',
  'a',
  'few',
  'day',
  'see',
  'how',
  'he',
  'do',
  'at',
  'school',
  'and',
  'with',
  'get',
  'up',
  'in',
 

Without parallel processing, the run took about 29 minutes (1,731 s), even though `nlp.pipe()` already batches the texts internally.
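
Before moving on to the parallel version, it helps to see what the cleaning step does to a single review, since it shapes the tokens shown above. The sketch below reuses `data_stripper` as defined in this notebook; the sample string is hypothetical and not taken from the dataset.

```python
import re

def data_stripper(text):
    text = text.lower()
    text = re.sub(r'\b\d+\b', '', text)          # drop standalone numbers
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)   # drop punctuation and symbols
    text = re.sub(r'\s+', ' ', text).strip()     # collapse whitespace
    return text

# Hypothetical sample, not a real review
sample = "Took 2 pills/day since Mid-October...\nFelt GREAT on 50mg!"
cleaned = data_stripper(sample)
print(cleaned)  # took pillsday since midoctober felt great on 50mg
```

Two details are worth noticing. The standalone `2` is removed, while `50mg` survives because the digits are attached to letters. And because punctuation is deleted rather than replaced with a space, `pills/day` and `Mid-October` fuse into single words, which is consistent with the `midoctober` token that appears in the final dataframe.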

---

#### **Batch + Parallel processing of reviews using spaCy (using ThreadPoolExecutor)**

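Using parallel processing. The commented-out cell below is an earlier trial on the first 1,000 reviews only (`review_list[:1000]`), which is why its timings are far smaller than those of the full runs: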

In [None]:
# import re
# import spacy
# import tracemalloc
# import time
# from concurrent.futures import ThreadPoolExecutor, as_completed

# def chunk_generator(iterable, chunk_size):
#     for i in range(0, len(iterable), chunk_size):
#         yield iterable[i:i+chunk_size]  # using yield generator method to generate data chunk on the fly and not load on in memory

# def data_stripper(text):
#     # Convert the input text to lowercase
#     text = text.lower()
#     # Remove standalone numbers
#     text = re.sub(r'\b\d+\b', '', text)
#     # Remove special characters, keeping only alphanumeric characters and spaces
#     text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
#     # Remove any extra whitespace (including newlines /n)
#     text = re.sub(r'\s+', ' ', text).strip()
#     return text  # lets think on it whether to use yield or return benefit

# def preprocess_text_spacy_pipe(chunk=None, batch_size=None, stop=False, lemmer=True, tokens= False):
#     processed_texts = []
#     for doc in nlp.pipe(chunk, batch_size=batch_size):
#         tokens = []
#         for token in doc:
#             if stop and token.is_stop:
#                 continue
#             if lemmer:
#                 tokens.append(token.lemma_)
#             else:
#                 tokens.append(token.text)
#         prepro_text = ' '.join(tokens)
#         processed_texts.append(prepro_text)
#     if tokens:
#         return tokens, processed_texts
#     else:   
#         return processed_texts

# def process_chunk(index, chunk, batch_size):
#     stripped_chunk = (data_stripper(review) for review in chunk)
#     prepro_texts = preprocess_text_spacy_pipe(stripped_chunk, batch_size=batch_size, lemmer=True)
#     return index, prepro_texts

# # Load the spaCy model
# nlp = spacy.load("en_core_web_sm")


# # Define the chunk size
# chunk_size = 100

# # Create chunks using the generator and preserve their indices
# chunks = list(enumerate(chunk_generator(review_list[:1000], chunk_size)))

# # List to store tokenized reviews
# review_token_list_parallel_proc = [None] * len(chunks)  # Preallocate list for ordered results
# review_token_list_parallel_proc_tokens = [None] * len(chunks)

# # Start measuring time and memory
# start_time = time.time()
# tracemalloc.start()

# # Use ThreadPoolExecutor for parallel processing
# with ThreadPoolExecutor() as executor:
#     # Submit tasks to the executor
#     futures = {executor.submit(process_chunk, index, chunk, 25): index for index, chunk in chunks}
    
#     # Process futures as they complete
#     for future in as_completed(futures):
#         index, result = future.result()
#         # print(f"Result for chunk {index}: {result}")
#         review_token_list_parallel_proc_tokens[index], review_token_list_parallel_proc[index] = result
#         # review_token_list_parallel_proc[index] = result
#         # print(f"Result for chunk {index}: {review_token_list_para3[index]}")

# # Stop measuring time and memory
# end_time = time.time()
# current, peak = tracemalloc.get_traced_memory()
# tracemalloc.stop()

# # Flatten the list
# # review_token_list = [item for sublist in review_token_list for item in sublist]

# # Print results
# print(f"Execution Time: {end_time - start_time} seconds")
# print(f"Current Memory Usage: {current / 10**6} MB")
# print(f"Peak Memory Usage: {peak / 10**6} MB")

# # Output the tokenized reviews list to verify correctness (optional)
# # print(review_token_list)


Execution Time: 5.585376024246216 seconds
Current Memory Usage: 4.05627 MB
Peak Memory Usage: 266.302408 MB


### Note: Using `ThreadPoolExecutor` for Parallel Processing in Batch Text Preprocessing

This code uses `ThreadPoolExecutor` from Python's `concurrent.futures` module to enable parallel processing of text data during batch preprocessing. Below is a brief explanation of how and why it is used:

#### Purpose of Parallel Processing
When processing large datasets, text preprocessing tasks like tokenization, lemmatization, and cleaning can be computationally intensive. Using parallel processing helps to speed up this workflow by allowing multiple chunks of data to be processed simultaneously.

#### How It Works
1. **Chunking the Data:**
   - The `chunk_generator` function divides the dataset into smaller chunks of a fixed size (`chunk_size`), making it easier to process the data in batches.
   - Each chunk is processed independently.

2. **ThreadPoolExecutor Workflow:**
   - **Task Submission:** Each chunk is submitted as a task with `executor.submit`; the pool's worker threads pick up queued tasks as they become free.
   - **Asynchronous Processing:** `as_completed` yields each future as soon as it finishes, so results are collected in completion order rather than submission order.
   - **Maintaining Order:** Results from threads are stored in a preallocated list using the chunk's index, preserving the original order of the dataset.

3. **Integration with Preprocessing:**
   - Each thread processes its assigned chunk by first cleaning the data using the `data_stripper` function and then applying spaCy's `nlp.pipe` for tokenization and lemmatization.
   - The results (tokens and processed text) are collected and combined at the end.
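
The steps above can be sketched in isolation as follows. This is a minimal illustration, not the notebook's actual pipeline: `process_chunk` here is a stand-in that upper-cases each text in place of `data_stripper` plus `nlp.pipe`, and the `texts` list and `max_workers=4` are assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def chunk_generator(items, chunk_size):
    # Yield consecutive slices of at most chunk_size items
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

def process_chunk(index, chunk):
    # Stand-in for cleaning + nlp.pipe: upper-case each text
    return index, [text.upper() for text in chunk]

texts = [f"review {n}" for n in range(10)]            # hypothetical data
chunks = list(enumerate(chunk_generator(texts, 3)))   # [(0, [...]), (1, [...]), ...]
ordered = [None] * len(chunks)                        # one slot per chunk

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_chunk, index, chunk) for index, chunk in chunks]
    for future in as_completed(futures):
        index, result = future.result()   # re-raises any exception from the worker
        ordered[index] = result           # slot by chunk index, not by arrival order

# Flatten the per-chunk results back into one list in the original order
flat = [item for chunk_result in ordered for item in chunk_result]
print(flat[:4])  # ['REVIEW 0', 'REVIEW 1', 'REVIEW 2', 'REVIEW 3']
```

Returning the index from the worker is what makes the final order independent of which thread finishes first.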

#### Benefits of Using ThreadPoolExecutor
- **Speedup:** On the full dataset, the threaded run below took about 964 s versus about 1,731 s sequentially, roughly a 1.8x speedup.
- **Efficiency:** Threads in CPython share the Global Interpreter Lock (GIL), so pure-Python CPU-bound code gains little from multithreading. spaCy does much of its work in compiled Cython code, and threads can overlap only to the extent that this compiled code releases the GIL, which would explain why the gain is partial rather than proportional to the number of threads.
- **Scalability:** Each task handles a small, fixed-size chunk, so the same code runs unchanged as the dataset grows.

This method is a simple way to speed up text preprocessing on large datasets, though the gain depends on how much of the work runs outside the GIL.
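
Whether threads help for a given workload is easiest to settle by measurement. The sketch below times the same function sequentially and through a thread pool. `cpu_work` and `compare` are hypothetical names, and `cpu_work` is a pure-Python stand-in, so on standard CPython (with the GIL) it is expected to show little or no speedup, unlike code that releases the GIL.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_work(n):
    # Pure-Python loop: holds the GIL for its whole duration
    total = 0
    for i in range(n):
        total += i * i
    return total

def compare(func, inputs, max_workers=4):
    """Return (sequential_results, threaded_results, seq_seconds, thr_seconds)."""
    start = time.perf_counter()
    sequential = [func(x) for x in inputs]
    seq_seconds = time.perf_counter() - start

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        threaded = list(executor.map(func, inputs))  # map returns results in input order
    thr_seconds = time.perf_counter() - start
    return sequential, threaded, seq_seconds, thr_seconds

inputs = [200_000] * 8   # arbitrary small workload for illustration
sequential, threaded, seq_seconds, thr_seconds = compare(cpu_work, inputs)
print(f"sequential: {seq_seconds:.3f} s, threaded: {thr_seconds:.3f} s")
```

Swapping `cpu_work` for a call into the real preprocessing function on a small sample of reviews gives a quick estimate of the achievable speedup before committing to a full run.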


In [11]:
import re
import spacy
import tracemalloc
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def chunk_generator(iterable, chunk_size):
    for i in range(0, len(iterable), chunk_size):
        yield iterable[i:i+chunk_size]

def data_stripper(text):
    text = text.lower()
    text = re.sub(r'\b\d+\b', '', text)
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def preprocess_text_spacy_pipe(chunk=None, batch_size=None, stop=False, lemmer=True, return_tokens=False):
    processed_texts = []
    all_tokens = []
    for doc in nlp.pipe(chunk, batch_size=batch_size):
        tokens = []
        for token in doc:
            if stop and token.is_stop:
                continue
            if lemmer:
                tokens.append(token.lemma_)
            else:
                tokens.append(token.text)
        prepro_text = ' '.join(tokens)
        processed_texts.append(prepro_text)
        if return_tokens:
            all_tokens.append(tokens)
    if return_tokens:
        return all_tokens, processed_texts
    else:
        return processed_texts

def process_chunk(index, chunk, batch_size, return_tokens=False, stop=False, lemmer=True):
    stripped_chunk = (data_stripper(review) for review in chunk)
    results = preprocess_text_spacy_pipe(stripped_chunk, batch_size=batch_size, lemmer=lemmer, stop=stop, return_tokens=return_tokens)
    return index, results

# Load the spaCy model
nlp = spacy.load("en_core_web_sm")

# Example list of reviews
# review_list = ["This is the first review.", "This is the second review.", "Another review here."] * 50

# Define the chunk size
chunk_size = 100

# Create chunks using the generator and preserve their indices
chunks = list(enumerate(chunk_generator(review_list, chunk_size)))

# List to store tokenized reviews and tokens
review_token_list_parallel_proc = [None] * len(chunks)  # Preallocate list for ordered results
review_token_list_parallel_proc_tokens = [None] * len(chunks)

# Start measuring time and memory
start_time = time.time()
tracemalloc.start()

# Use ThreadPoolExecutor for parallel processing
with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    futures = {executor.submit(process_chunk, index, chunk, 25, return_tokens=True, stop=False, lemmer=True): index for index, chunk in chunks}
    
    # Process futures as they complete
    for future in as_completed(futures):
        index, result = future.result()
        if isinstance(result[0], list) and isinstance(result[1], list):
            review_token_list_parallel_proc_tokens[index], review_token_list_parallel_proc[index] = result
        else:
            review_token_list_parallel_proc[index] = result

# Stop measuring time and memory
end_time = time.time()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Flatten the list 
review_token_list_parallel_proc = [item for sublist in review_token_list_parallel_proc if sublist for item in sublist]
review_token_list_parallel_proc_tokens = [item for sublist in review_token_list_parallel_proc_tokens if sublist for item in sublist]

# Print results
print(f"Execution Time: {end_time - start_time} seconds")
print(f"Current Memory Usage: {current / 10**6} MB")
print(f"Peak Memory Usage: {peak / 10**6} MB")

# Output the tokenized reviews list to verify correctness (optional)
# print(review_token_list_parallel_proc)


Execution Time: 963.8470420837402 seconds
Current Memory Usage: 909.091816 MB
Peak Memory Usage: 1097.042576 MB


The parallel run took about 16 minutes (964 s), roughly half of the ~29 minutes (1,731 s) needed without parallel processing.

Now let's run the function again with stop-word removal enabled (`stop=True`):

In [26]:
review_token_list_parallel_proc_stpwrd = [None] * len(chunks)

# Start measuring time and memory
start_time = time.time()
tracemalloc.start()

# Use ThreadPoolExecutor for parallel processing
with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    futures = {executor.submit(process_chunk, index, chunk, 25, return_tokens=False, stop=True, lemmer=True): index for index, chunk in chunks}
    
    # Process futures as they complete
    for future in as_completed(futures):
        index, result = future.result()
        if isinstance(result[0], list) and isinstance(result[1], list):  # not reached here because return_tokens=False
            review_token_list_parallel_proc_tokens[index], review_token_list_parallel_proc[index] = result
        else:
            review_token_list_parallel_proc_stpwrd[index] = result

# Stop measuring time and memory
end_time = time.time()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

In [27]:
review_token_list_parallel_proc_stpwrd = [item for sublist in review_token_list_parallel_proc_stpwrd if sublist for item in sublist]

In [28]:
review_token_list_parallel_proc_stpwrd

['effect combination bystolic mg fish oil',
 'son halfway fourth week intuniv concerned begin week start take high dose day hardly bed cranky sleep nearly hour drive home school vacation unusual call doctor monday morning say stick day school get morning day problem free agreeable emotional good thing cranky remember thing overall behavior well try different medication far effective',
 'oral contraceptive pill cycle happy light period max day effect contain hormone gestodene available switch lybrel ingredient similar pill end start lybrel immediately day period instruction say period last week take second pack week pack thing get bad period last week end week daily brown discharge positive not effect idea period free tempting alas',
 'time form birth control m glad go patch month decrease libido subside downside period long day exact period day max cramp intense day period cramp birth control happy patch',
 'suboxone completely turn life feel healthy m excel job money pocket saving acc
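
To make the effect of `stop=True` concrete, the sketch below applies a stop-word filter to the lemmas of the first review shown earlier. The `STOP_WORDS` set is a hand-picked assumption, chosen so that the result matches the first row above; it is not spaCy's actual list, which is much larger, and `drop_stop_words` is a hypothetical helper rather than part of the notebook.

```python
# Hand-picked subset for illustration only; NOT spaCy's real stop-word list
STOP_WORDS = {"it", "have", "no", "side", "i", "take", "in", "of", "and"}

def drop_stop_words(tokens, stop_words=STOP_WORDS):
    # Compare in lower case so that "I" matches "i"
    return [tok for tok in tokens if tok.lower() not in stop_words]

# Lemmas of the first review, as printed in the token output earlier
lemmas = ['it', 'have', 'no', 'side', 'effect', 'I', 'take', 'it', 'in',
          'combination', 'of', 'bystolic', 'mg', 'and', 'fish', 'oil']
text = ' '.join(drop_stop_words(lemmas))
print(text)  # effect combination bystolic mg fish oil
```

Comparing the two versions of this row shows that "it have no side effect" shrinks to "effect": the negation is removed along with the other stop words, which may matter for the sentiment analysis planned in the next notebook.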

In [None]:
# We now have three preprocessed versions of the review column: lemmatized text, lemmatized tokens, and lemmatized text with stop words removed.
review_token_list_parallel_proc
review_token_list_parallel_proc_tokens
review_token_list_parallel_proc_stpwrd

Let's add the preprocessed lists to the dataframe:

In [32]:
# df_reviews['prep_review'] = review_token_list_parallel_proc
df_reviews['prep_review_token'] = review_token_list_parallel_proc_tokens
df_reviews['prep_review_stpwrd'] = review_token_list_parallel_proc_stpwrd

In [33]:
df_reviews

Unnamed: 0,drugName,condition,review,rating,date,usefulCount,prep_review,prep_review_token,prep_review_stpwrd
0,Valsartan,Left Ventricular Dysfunction,"""It has no side effect, I take it in combinati...",9,2012-05-20,27,it have no side effect I take it in combinatio...,"[it, have, no, side, effect, I, take, it, in, ...",effect combination bystolic mg fish oil
1,Guanfacine,ADHD,"""My son is halfway through his fourth week of ...",8,2010-04-27,192,my son be halfway through his fourth week of i...,"[my, son, be, halfway, through, his, fourth, w...",son halfway fourth week intuniv concerned begi...
2,Lybrel,Birth Control,"""I used to take another oral contraceptive, wh...",5,2009-12-14,17,I use to take another oral contraceptive which...,"[I, use, to, take, another, oral, contraceptiv...",oral contraceptive pill cycle happy light peri...
3,Ortho Evra,Birth Control,"""This is my first time using any form of birth...",8,2015-11-03,10,this be my first time use any form of birth co...,"[this, be, my, first, time, use, any, form, of...",time form birth control m glad go patch month ...
4,Buprenorphine / naloxone,Opiate Dependence,"""Suboxone has completely turned my life around...",9,2016-11-27,37,suboxone have completely turn my life around I...,"[suboxone, have, completely, turn, my, life, a...",suboxone completely turn life feel healthy m e...
...,...,...,...,...,...,...,...,...,...
161292,Campral,Alcohol Dependence,"""I wrote my first report in Mid-October of 201...",10,2015-05-31,125,I write my first report in midoctober of I hav...,"[I, write, my, first, report, in, midoctober, ...",write report midoctober alcohol post reduce do...
161293,Metoclopramide,Nausea/Vomiting,"""I was given this in IV before surgey. I immed...",1,2011-11-01,34,I be give this in iv before surgey I immediate...,"[I, be, give, this, in, iv, before, surgey, I,...",give iv surgey immediately anxious sit pa say ...
161294,Orencia,Rheumatoid Arthritis,"""Limited improvement after 4 months, developed...",2,2014-03-15,35,limit improvement after month develop bad rash...,"[limit, improvement, after, month, develop, ba...",limit improvement month develop bad rash md re...
161295,Thyroid desiccated,Underactive Thyroid,"""I&#039;ve been on thyroid medication 49 years...",10,2015-09-19,79,I ve be on thyroid medication year I spend my ...,"[I, ve, be, on, thyroid, medication, year, I, ...",ve thyroid medication year spend synthroid t4 ...


In [36]:
df_reviews.to_csv('Projects/preprocessed_reviews.csv', index=False)

In [37]:
df_test  = pd.read_csv('Projects/preprocessed_reviews.csv')
df_test.head(5)

Unnamed: 0,drugName,condition,review,rating,date,usefulCount,prep_review,prep_review_token,prep_review_stpwrd
0,Valsartan,Left Ventricular Dysfunction,"""It has no side effect, I take it in combinati...",9,2012-05-20,27,it have no side effect I take it in combinatio...,"['it', 'have', 'no', 'side', 'effect', 'I', 't...",effect combination bystolic mg fish oil
1,Guanfacine,ADHD,"""My son is halfway through his fourth week of ...",8,2010-04-27,192,my son be halfway through his fourth week of i...,"['my', 'son', 'be', 'halfway', 'through', 'his...",son halfway fourth week intuniv concerned begi...
2,Lybrel,Birth Control,"""I used to take another oral contraceptive, wh...",5,2009-12-14,17,I use to take another oral contraceptive which...,"['I', 'use', 'to', 'take', 'another', 'oral', ...",oral contraceptive pill cycle happy light peri...
3,Ortho Evra,Birth Control,"""This is my first time using any form of birth...",8,2015-11-03,10,this be my first time use any form of birth co...,"['this', 'be', 'my', 'first', 'time', 'use', '...",time form birth control m glad go patch month ...
4,Buprenorphine / naloxone,Opiate Dependence,"""Suboxone has completely turned my life around...",9,2016-11-27,37,suboxone have completely turn my life around I...,"['suboxone', 'have', 'completely', 'turn', 'my...",suboxone completely turn life feel healthy m e...
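
Comparing this output with the earlier dataframe, the `prep_review_token` column now shows quoted items (`['it', 'have', ...]`): after the CSV round trip each cell is a string representation of a list rather than a list. A minimal sketch of how such a cell could be parsed back, using only the standard library (`csv` and `io` stand in for the pandas read and write, and the single row is made up for the example):

```python
import ast
import csv
import io

# Write one row with a list-valued column, stored as its string representation
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["prep_review_token"])
writer.writerow([str(['it', 'have', 'no', 'side', 'effect'])])

# Read it back: the cell is now a plain string
buffer.seek(0)
rows = list(csv.DictReader(buffer))
raw = rows[0]["prep_review_token"]
print(type(raw).__name__, raw)

# ast.literal_eval parses Python literals only, so it does not execute code
tokens = ast.literal_eval(raw)
print(type(tokens).__name__, tokens)
```

With pandas, the same conversion can be applied at load time by passing `converters={'prep_review_token': ast.literal_eval}` to `pd.read_csv`.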


In [1]:
# Next notebook: sentiment analysis.ipynb

___