In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import numpy as np
from time import sleep
import os
import pandas as pd
from tqdm import tqdm
import json
from typing import Union, List, Dict

In [3]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/mittal.nit/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
from nltk.corpus import stopwords 

In [5]:
stop_words = set(stopwords.words('english')) 

In [20]:
PROJECT_PATH = os.getcwd().replace("notebooks", "")
DATA_PATH = os.path.join(PROJECT_PATH, "data")
PROCESSED_REVIEWS_PATH = os.path.join(DATA_PATH, "processed_reviews")

try:
    os.mkdir(PROCESSED_REVIEWS_PATH)
except OSError as error:
    print(error) 

In [7]:
os.listdir(DATA_PATH)

['yelp_academic_dataset_review.json',
 'glove.6B.zip',
 'glove.6B.200d.txt',
 'glove.6B.50d.txt',
 'word_expansion.json',
 'glove.6B.300d.txt',
 'test.out',
 '1.out',
 '2.out',
 'glove.6B.100d.txt']

In [8]:
# loading word expansions
word_expansions = ""
with open(os.path.join(DATA_PATH, "word_expansion.json")) as f:
    for i, line in tqdm(enumerate(f)):
        word_expansions += line
        
word_expansions = word_expansions.replace("\n","")
word_expansions = word_expansions.replace("\t","")
word_expansions = word_expansions.replace(",}","}")
word_expansions = word_expansions.replace(",]","]")
word_expansions = json.loads(word_expansions)

76it [00:00, 303877.12it/s]


In [9]:
data = []
max_reviews = 100
with open(os.path.join(DATA_PATH, "yelp_academic_dataset_review.json")) as f:
    for i, line in tqdm(enumerate(f)):
        data.append(line)
        if i > max_reviews:
            break
temp = [json.loads(data[i].replace("\n",""))["text"] for i in range(len(data))]

101it [00:00, 185474.91it/s]


In [10]:
glove_100d_path = os.path.join(DATA_PATH, "glove.6B.100d.txt")

In [11]:
cluster = SLURMCluster(
    n_workers=45,
    cores=45, 
    memory="100GB")
cluster

In [12]:
client = Client(cluster)

In [13]:
def process_review(
    reviews: str, 
    save_path: str,
    embedding_matrix: Dict[str, np.ndarray]=None,
    embedding_matrix_path: str=None) -> bool:
    
    """
    Parameters
    ----------
    reviews: str
    
    embedding_matrix: Dictionary of embeddings
    
    embedding_matrix_path: str
    
    Returns
    -------
    embeddings
    """
    reviews = [json.loads(reviews[i].replace("\n","")) for i in range(len(reviews))]
    
    if embedding_matrix is None:
        embedding_matrix = {}
        with open(embedding_matrix_path) as f:
            for line in f:
                word, embedding = line.split(maxsplit=1)
                embedding = np.fromstring(embedding, dtype=float, sep=" ")
                embedding_matrix[word] = embedding 

    unknown_word_embedding = np.zeros(len(next(iter(embedding_matrix.values()))))
    
    reviews_embedding = []
        
    for review in reviews:
        review_embedding = []
        
        for word in review["text"].split():
            review_embedding.append(
                embedding_matrix[word] if word in embedding_matrix else unknown_word_embedding)
        
        # mean embedding for a review
        review_embedding = np.mean(np.array(review_embedding), axis=0)        
        reviews_embedding.append(
            dict(review_id= review["review_id"], stars=review["stars"], embedding=review_embedding.tolist()))
    
    try:
        with open(save_path, "w") as f:
            json.dump(reviews_embedding , f)
        return True
    except:
        return False

# Sequential

In [14]:
%%time
current_batch_no = 1
batch_size = 10
total_size = 20

glove_100d_path = os.path.join(DATA_PATH, "glove.6B.100d.txt")
embedding_matrix = {}
with open(glove_100d_path) as f:
    for line in f:
        word, embedding = line.split(maxsplit=1)
        embedding = np.fromstring(embedding, dtype=float, sep=" ")
        embedding_matrix[word] = embedding 

result_futures = {}
with open(os.path.join(DATA_PATH, "yelp_academic_dataset_review.json")) as f:
    reviews = []
    for i, line in tqdm(enumerate(f), leave=False):
        
        if i >= total_size:
            break
        
        if len(reviews) == batch_size:
            result_futures[current_batch_no] = process_review(
                reviews=reviews, 
                save_path=os.path.join(PROCESSED_REVIEWS_PATH, f"{current_batch_no}.out"),
                embedding_matrix=embedding_matrix)
            current_batch_no += 1
            reviews = []
            reviews.append(line)
        else:
            reviews.append(line)
            
if len(reviews) > 0:  
    result_futures[current_batch_no] = process_review(
        reviews=reviews, 
        save_path=os.path.join(PROCESSED_REVIEWS_PATH, f"{current_batch_no}.out"),
        embedding_matrix=embedding_matrix)

                  

CPU times: user 5.76 s, sys: 437 ms, total: 6.19 s
Wall time: 6.06 s




# On HPC

In [None]:
%%time
[broadcasted_embedding_matrix] = client.scatter([embedding_matrix], broadcast=True)

In [19]:
%%time
current_batch_no = 1
result_futures = {}
with open(os.path.join(DATA_PATH, "yelp_academic_dataset_review.json")) as f:
    
    reviews = []
    for i, line in tqdm(enumerate(f), leave=False):
        
        if i >= total_size:
            break
            
        if len(reviews) == batch_size:
            
            result_futures[current_batch_no] = client.submit(
                process_review, 
                reviews=reviews,
                save_path=os.path.join(PROCESSED_REVIEWS_PATH, f"{current_batch_no}.out"),
                embedding_matrix=broadcasted_embedding_matrix)
            reviews = []
            current_batch_no += 1
            reviews.append(line)
        else:
            reviews.append(line)
            
if len(reviews) > 0:  
     result_futures[current_batch_no] = client.submit(
                process_review, 
                reviews=reviews,
                save_path=os.path.join(PROCESSED_REVIEWS_PATH, f"{current_batch_no}.out"),
                embedding_matrix=broadcasted_embedding_matrix)
    
results = client.gather(result_futures)

                            

CPU times: user 413 ms, sys: 42.6 ms, total: 455 ms
Wall time: 700 ms


In [None]:
%%time
current_batch_no = 1
batch_size = 1000
total_size = 8635402
result_futures = {}
with open(os.path.join(DATA_PATH, "yelp_academic_dataset_review.json")) as f:
    
    reviews = []
    for i, line in tqdm(enumerate(f), leave=False):
        
        if i >= total_size:
            break
            
        if len(reviews) == batch_size:
            
            result_futures[current_batch_no] = client.submit(
                process_review, 
                reviews=reviews,
                save_path=os.path.join(PROCESSED_REVIEWS_PATH, f"{current_batch_no}.out"),
                embedding_matrix=broadcasted_embedding_matrix)
            reviews = []
            current_batch_no += 1
            reviews.append(line)
        else:
            reviews.append(line)
            
if len(reviews) > 0:  
     result_futures[current_batch_no] = client.submit(
                process_review, 
                reviews=reviews,
                save_path=os.path.join(PROCESSED_REVIEWS_PATH, f"{current_batch_no}.out"),
                embedding_matrix=broadcasted_embedding_matrix)
    
results = client.gather(result_futures)

7495001it [02:57, 44664.48it/s]

In [26]:
with open(os.path.join(DATA_PATH, "yelp_academic_dataset_review.json")) as f:
    for i, line in tqdm(enumerate(f), leave=False):
        i
print(i)

                                 

8635402




In [None]:
# results_futures.keys()

In [None]:
# [len(result) for result in results_futures.values()]

In [None]:
# list(results_futures.values())[0].status