In [1]:
!pip install -U scikit-learn
!pip install torch
!pip install pytorch-transformers
!pip install accelerate -U
!pip install transformers

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip instal

In [2]:
try:
    import sagemaker_datawrangler
except ImportError:
    !pip install --upgrade sagemaker-datawrangler
    import sagemaker_datawrangler

In [3]:
from sagemaker import get_execution_role #this contains the permissions to access the bucket
import boto3 #aws python client
import pandas as pd
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
s3 = boto3.client('s3') #connect to S3
bucketName = "cs539-e2023-149b5d57-81a4-4e31-a476-c81e7e07dc33"
live_split_prefix = 'split/tsv/reviews/trimmed/'
train_split_prefix = 'split/tsv/training_reviews/trimmed/'

In [4]:
from transformers import Trainer, TrainingArguments, AutoTokenizer, AutoModelForSequenceClassification
tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')
model = AutoModelForSequenceClassification.from_pretrained('./Model')

In [16]:
def getNumChunks(prefix):
    delim = 'chunk_*.tsv'
    response = response = s3.list_objects_v2(Bucket=bucketName, Prefix=prefix, Delimiter=delim)['Contents']
    return len(response)

def readChunk(chunk_id, prefix):
    chunk_key = prefix + 'chunk_' + '{:03d}'.format(chunk_id) + '.tsv'
    response = s3.get_object(Bucket=bucketName, Key=chunk_key)
    df = pd.read_csv(response.get('Body'), sep='\t')
    return df
    
def readChunkRange(start, end, prefix):
    df = None
    delim = 'chunk_*.tsv'
    #get all keys so we can skip ones that don't exist
    response = s3.list_objects_v2(Bucket=bucketName, Prefix=prefix, Delimiter=delim)['Contents']
    keys = []
    for obj in response:
        keys.append(obj['Key'])
    for chunk_id in range(start, end):
        chunk_key = prefix + 'chunk_' + '{:03d}'.format(chunk_id) + '.tsv'
        if chunk_key in keys:
            chunk_response = s3.get_object(Bucket=bucketName, Key=chunk_key)
            chunk_df = pd.read_csv(chunk_response.get('Body'), sep='\t')
            if df is None:
                df = chunk_df
            else:
                df = pd.concat([df,chunk_df], axis=0)
        else:
            print('WARNING: Chunk Key:', chunk_key, ' not found in bucket')
    return df

def readTsvDataset(filename):
    key = 'datasets/tsv/' + filename;
    response = s3.get_object(Bucket=bucketName, Key=key)
    df = pd.read_csv(response.get('Body'), sep='\t')
    return df

def writeChunk(chunk_df, prefix, chunk_index):
    chunk_id = "{:03d}".format(chunk_index)
    writeKey = prefix + str(chunk_id) + '.tsv'
    with io.StringIO() as tsv_buffer: #buffer to write the data
        chunk_df.to_csv(tsv_buffer, index=False, sep='\t')
        s3.put_object(Bucket=bucketName, Key=writeKey, Body=tsv_buffer.getvalue())

In [17]:
import io
from torch.utils.data import DataLoader, TensorDataset
import multiprocessing
import copy

def tokenizeFunc(reviewText):
    return tokenizer(reviewText, max_length=256, truncation=True, padding='max_length', add_special_tokens=True, return_token_type_ids=False, return_tensors='pt')

def predictFunc(encoding):
    output = model(**encoding)
    logits = output.logits
    pred = logits.argmax(dim=1)
    return pred

def threadRunner(df_chunk, chunk_index, write_to_disk=True):
    
    #create output structure and copy values
    output_chunk = df_chunk.copy()
    #drop text from output early
    output_chunk = output_chunk.drop(columns='text')
    
    #tokenize the text column and store the token_ids and attention mask in the output
    encodings = df_chunk['text'].map(tokenizeFunc)
    #encodings = tokenizer(df_chunk['text'].tolist(), max_length=256, truncation=True, padding='max_length', add_special_tokens=True, return_token_type_ids=False, return_tensors='pt')
    
    # Perform prediction
    outputs = encodings.map(predictFunc)
    logits = outputs.logits
    predicted_labels = logits.argmax(dim=1).tolist()
    output_chunk['text_pred'] = predicted_labels
    
    
    if write_to_disk:
        writeChunk(output_chunk, 'predictions/stage1/training_reviews/', chunk_index)
    return output_chunk

def predict_stage_1(df, chunk_size = 10000, pool_size=100):
    num_chunks = int(df.shape[0]/chunk_size) + 1
    num_batches = int(num_chunks/pool_size) + 1
    print('DF Size:',str(df.shape[0]),' | Chunk Size:', str(chunk_size), ' | Num Chunks:', str(num_chunks), ' | Pool Size:', str(pool_size), ' | Num Batches:', str(num_batches))
    print('Creating Chunks')
    chunks = []
    for i in range(0, len(df), chunk_size):
        chunk = df[i:i+chunk_size]
        #reset row index
        chunk = chunk.reset_index(drop=True)
        chunks.append(chunk)
        
    print('Chunks created:', str(len(chunks)))
    if(len(chunks) != num_chunks):
        print('ERROR: incorrect number of chunks')
        return;
    
    print('Running Pools')
    for pool_itr in range(num_batches):
        pool_start = pool_size * pool_itr
        pool_end = min(pool_start + pool_size, num_chunks)
        processes = []
        print('Pool:', str(pool_itr), ' | Start:', str(pool_start), ' | End:', str(pool_end))
        for chunk_id in range(pool_start, pool_end):
            print(str(chunk_id), ',', end='')
            p = multiprocessing.Process(target=threadRunner, args=(chunks[chunk_id], chunk_id))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        print()
    print('Prediction complete')

In [18]:
run_prefix = train_split_prefix
batch_size = 500
num_chunks = getNumChunks(run_prefix)
for chunk_id in range(num_chunks):
    print("Chunk:", str(chunk_id), ' | Loading:', end='')
    df = readChunk(chunk_id, run_prefix)
    df['text_pred'] = 2
    num_batches = int(len(df)/batch_size)
    if(len(df) % batch_size != 0):
        num_batches += 1
    print("DONE | Analyzing:", end='')
    pred = []
    for batch in range(num_batches):
        print('B', str(batch), '(', end='')
        start = batch * batch_size
        end = start + batch_size
        batch_encodings = tokenizeFunc(df['text'][start:end].tolist())
        print('T', end='')
        batch_pred = predictFunc(batch_encodings)
        print('P', end='')
        if pred == []:
            pred = batch_pred.tolist()
        else:
            pred.extend(batch_pred.tolist())
        print('A), ', end='')
    df['text_pred'] = pred
    print("DONE | Dropping Text:", end='')
    df = df.drop(columns=['text'])
    print("DONE | Writing:", end='')
    writeChunk(df, 'predictions/stage1/reviews/', chunk_id)
    print("DONE")

Chunk: 0  | Loading:DONE | Analyzing:B 0 (TPA), B 1 (TPA), B 2 (TPA), B 3 (TPA), B 4 (TPA), B 5 (TPA), B 6 (TPA), B 7 (TPA), B 8 (TPA), B 9 (TPA), B 10 (TPA), B 11 (TPA), B 12 (TPA), B 13 (TPA), B 14 (TPA), B 15 (TPA), B 16 (TPA), B 17 (TPA), B 18 (TPA), B 19 (TPA), DONE | Dropping Text:DONE | Writing:DONE
Chunk: 1  | Loading:DONE | Analyzing:B 0 (TPA), B 1 (TPA), B 2 (TPA), B 3 (TPA), B 4 (TPA), B 5 (TPA), B 6 (TPA), B 7 (TPA), B 8 (TPA), B 9 (TPA), B 10 (TPA), B 11 (TPA), B 12 (TPA), B 13 (TPA), B 14 (TPA), B 15 (TPA), B 16 (TPA), B 17 (TPA), B 18 (TPA), B 19 (TPA), DONE | Dropping Text:DONE | Writing:DONE
Chunk: 2  | Loading:DONE | Analyzing:B 0 (TPA), B 1 (TPA), B 2 (TPA), B 3 (TPA), B 4 (TPA), B 5 (TPA), B 6 (TPA), B 7 (TPA), B 8 (TPA), B 9 (TPA), B 10 (TPA), B 11 (TPA), B 12 (TPA), B 13 (TPA), B 14 (TPA), B 15 (TPA), B 16 (TPA), B 17 (TPA), B 18 (TPA), B 19 (TPA), DONE | Dropping Text:DONE | Writing:DONE
Chunk: 3  | Loading:DONE | Analyzing:B 0 (TPA), B 1 (TPA), B 2 (TPA), B 3 (T

TypeError: TextEncodeInput must be Union[TextInputSequence, Tuple[InputSequence, InputSequence]]

In [21]:
run_prefix = train_split_prefix
batch_size = 500
chunk_id = 6

df = readChunk(chunk_id, run_prefix)
df['text_pred'] = 2
pred = []
batch = 4
start = batch * batch_size
end = start + batch_size
part = df['text'][start:end]
print(part)
batch_encodings = tokenizeFunc(part.tolist())
batch_pred = predictFunc(batch_encodings)
pred = batch_pred.tolist()

2000    Walking in to this restaurant you know you're ...
2001    Came here for drinks and appetizers last night...
2002    Went last week to see what all the fuss was ab...
2003    Mastro's definitely delivered on ambience and ...
2004    The food is as good as I was expecting. I wish...
                              ...                        
2495    We loved loved loved dining here and especiall...
2496    My first visit was on a Tuesday night last wee...
2497    Went on a Monday night for a dinner with a gro...
2498    Absolutely disgusted on our wedding night.Grea...
2499    the places it's great atmosfere decoration emp...
Name: text, Length: 500, dtype: object


TypeError: TextEncodeInput must be Union[TextInputSequence, Tuple[InputSequence, InputSequence]]