Install necessary libraries not on AWS-sagemaker instance.

In [None]:
!pip install spacy
!pip install gensim
!python -m spacy download en_core_web_lg

In [1]:
import boto3
from sagemaker import get_execution_role
import sagemaker.amazon.common as smac
import pandas as pd
import spacy
from gensim.models.phrases import Phrases, Phraser
from spacy.lang.en.stop_words import STOP_WORDS
import en_core_web_lg
import re
import string
from tqdm import tqdm
import time
import multiprocessing

Utlizing 72 core CPU computational optimized instance for data preprocessing job
Normally python program only use one core to process it's kernel

In [2]:
multiprocessing.cpu_count()

72

In [3]:
role = get_execution_role()
bucket = 'medium-text'
folder = 'data'
filename = 'Medium_aggregatedData.csv'
data_location = 's3://{}/{}/{}'.format(bucket,folder,filename)

In [4]:
df = pd.read_csv(data_location,low_memory=False)

In [5]:
df = df[df['language']=='en']
df = df.drop_duplicates(['postId'])

In [6]:
nlp=spacy.load('en_core_web_lg')

In [7]:
table = str.maketrans(dict.fromkeys(string.punctuation))

In [8]:
def preprocess_text(s):
    s = s.translate(table)
    s=s.replace('\n',' ')
    text = nlp(s)
    text = [i.text.lower() for i in text]
    return text

In [10]:
pool = multiprocessing.Pool(68)

In [11]:
if __name__ == '__main__':
    starttime = time.time()
    doc = pool.map(preprocess_text, [i for i in df['text']])
    pool.close()
    print('Processing time {} seconds'.format(time.time() - starttime))

Processing time 213.28333353996277 seconds


In [12]:
#Tokenizing and clean 66380 articles in 213s by processing data using 68 cpu core at the same time
len(df)

66380

Extract meaningful phrases from a given corpus using NPMI

In [13]:
phrases = Phrases(doc, min_count=5, threshold=5,progress_per=1000)

In [14]:
corpus = []
for i in tqdm(doc):
    text = phrases(i)
    text = ' '.join(text)
    corpus.append(text)

100%|██████████| 66380/66380 [00:02<00:00, 26079.96it/s]


In [15]:
import io
pkl_buffer = io.BytesIO()
txt_buffer = io.StringIO()

In [16]:
for item in corpus:
    txt_buffer.write("{}\n".format(item))

In [17]:
phrases.save(pkl_buffer)

In [18]:
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'utility/phraser.pkl').put(Body=pkl_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'A867E2BD8FCA0BF3',
  'HostId': '+qZs8YvBFB2vir6POB9TSZST3GFCa4NbUu4TIGT3XdhFCP/GTeXqZQFiX2O/aFeyvh18Y7H77P0=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '+qZs8YvBFB2vir6POB9TSZST3GFCa4NbUu4TIGT3XdhFCP/GTeXqZQFiX2O/aFeyvh18Y7H77P0=',
   'x-amz-request-id': 'A867E2BD8FCA0BF3',
   'date': 'Wed, 20 Nov 2019 21:46:16 GMT',
   'etag': '"7a0150c1df8fce305866485a1b413005"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 1},
 'ETag': '"7a0150c1df8fce305866485a1b413005"'}

In [19]:
s3_resource.Object(bucket,'data/medium_corpus.txt').put(Body=txt_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '8C912408D254D657',
  'HostId': 'FH+C/yuJNDHs4yD+ydKi+E3CNePfYMDNoyBhrzM+IC8kLvmqCUSjPuxPG7kacOdbTcLpclAhSOs=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'FH+C/yuJNDHs4yD+ydKi+E3CNePfYMDNoyBhrzM+IC8kLvmqCUSjPuxPG7kacOdbTcLpclAhSOs=',
   'x-amz-request-id': '8C912408D254D657',
   'date': 'Wed, 20 Nov 2019 21:46:25 GMT',
   'etag': '"9b113e082d9786392a058b4e74a96e3d"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"9b113e082d9786392a058b4e74a96e3d"'}