# RAPIDS + Dask NLP Walkthrough

In this notebook, we'll start by introducing a small subset of the Natural Language Processing capabilities that RAPIDS provides. We'll then demonstrate how you can combine RAPIDS with Dask to scale these capabilities out across many GPUs, process large datasets, and tackle more complex tasks such as TF-IDF-based similarity search.

In [None]:
import cudf

## Single GPU NLP Capabilities

Let's analyze some coronavirus related tweets from April 1st, 2020.

In [None]:
path = "/raid/vjawa/string_exp/tweets/2020-04-01 Coronavirus Tweets.CSV"
df = cudf.read_csv(path)
df = df.loc[df.lang == 'en']
df.shape

In [None]:
df.head(2)

In [None]:
df.text.head(3)

Let's tokenize the data.

In [None]:
df.text.str.tokenize()

What are the most common tokens?

In [None]:
df.text.str.tokenize().value_counts()
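
To make explicit what the two cells above compute, here is a minimal CPU-only sketch in plain Python, assuming whitespace-delimited tokens. The sample strings and the `count_tokens` helper are hypothetical and only illustrate the idea; this is not how cuDF implements `tokenize` on the GPU.

```python
from collections import Counter


def count_tokens(texts):
    """Split each string on whitespace and count every token across all strings."""
    counts = Counter()
    for text in texts:
        counts.update(text.split())
    return counts


# Hypothetical sample data, not taken from the tweet dataset.
sample = ["stay home stay safe", "stay healthy"]
token_counts = count_tokens(sample)
print(token_counts.most_common(1))
```

The result is one flat frequency table over every token in every string, which is the same shape of answer as tokenizing a column and calling `value_counts()`.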

Next, we need to handle stopwords.

In [None]:
import nltk
nltk.download('stopwords')

In [None]:
STOPWORDS = nltk.corpus.stopwords.words('english')

(df
 .text
 .str.replace_tokens(STOPWORDS, "")
 .str.tokenize()
).value_counts()

Case sensitivity needs handling too, so we lowercase the text first.

In [None]:
(df
 .text
 .str.lower()
 .str.replace_tokens(STOPWORDS, "")
 .str.tokenize()
).value_counts()

Punctuation may be affecting the results.

In [None]:
# '\~' was an invalid escape sequence and is already covered by '\\' and '~'; '\n' is a real newline.
PUNCTUATION = ['!', '"', '#', '$', '%', '&', '(', ')', '*', '+', '-', '.', '/', '\\', ':', ';', '<', '=', '>',
               '?', '@', '[', ']', '^', '_', '`', '{', '|', '}', '\t', '\n', "'", ",", '~', '—']

In [None]:
(df
 .text
 .str.lower()
 .str.replace(PUNCTUATION, [" "]*len(PUNCTUATION), regex=False)
 .str.replace_tokens(STOPWORDS, "")
 .str.tokenize()
).value_counts()

Web address terms now appear to be the most common, which is unsurprising for tweets. We should explicitly add them to our `STOPWORDS`.

In [None]:
STOPWORDS += ["co", "https", "com"]

In [None]:
(df
 .text
 .str.lower()
 .str.replace(PUNCTUATION, [" "]*len(PUNCTUATION), regex=False)
 .str.replace_tokens(STOPWORDS, "")
 .str.tokenize()
).value_counts()

Handling newlines and doing whitespace normalization is generally a good idea.

In [None]:
results = (df
 .text
 .str.lower()
 .str.replace(PUNCTUATION, [" "]*len(PUNCTUATION), regex=False)
 .str.replace_tokens(STOPWORDS, "")
 .str.normalize_spaces()
 .str.tokenize()
).value_counts()

results.head(10)
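
The cleaning steps chained above can be sketched on the CPU in plain Python. This is a simplified illustration under stated assumptions: `SAMPLE_STOPWORDS`, `SAMPLE_PUNCTUATION`, and `clean` are hypothetical, abbreviated stand-ins for the notebook's lists and pipeline. The sketch also drops stopwords after splitting, whereas the cuDF pipeline replaces them with an empty string and then collapses the leftover whitespace with `normalize_spaces`.

```python
from collections import Counter

# Hypothetical, abbreviated stand-ins for the notebook's STOPWORDS and PUNCTUATION.
SAMPLE_STOPWORDS = {"the", "is", "at", "https", "co"}
SAMPLE_PUNCTUATION = [".", ",", "!", "#", ":", "/"]


def clean(text):
    """Lowercase, blank out punctuation, drop stopwords, and collapse whitespace."""
    text = text.lower()
    for mark in SAMPLE_PUNCTUATION:
        text = text.replace(mark, " ")
    kept = [tok for tok in text.split() if tok not in SAMPLE_STOPWORDS]
    return " ".join(kept)


# Hypothetical sample data, not taken from the tweet dataset.
sample = ["The virus is HERE!", "Stay home: https://t.co/abc"]
cleaned = [clean(t) for t in sample]
counts = Counter(tok for t in cleaned for tok in t.split())
print(cleaned)
```

The order of the steps matters: lowercasing first lets a lowercase stopword list match, and replacing punctuation with spaces before stopword removal lets `https` and `co` surface as separate tokens.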

We've got the most common tokens. What about bigrams or trigrams?

In [None]:
(df
 .text
 .str.lower()
 .str.replace(PUNCTUATION, [" "]*len(PUNCTUATION), regex=False)
 .str.replace_tokens(STOPWORDS, "")
 .str.normalize_spaces()
 .str.ngrams_tokenize(n=2, separator=" ")
 .value_counts()
).head(10)

This makes sense: these sound like terms commonly used in hashtags. What about trigrams?

In [None]:
(df
 .text
 .str.lower()
 .str.replace(PUNCTUATION, [" "]*len(PUNCTUATION), regex=False)
 .str.replace_tokens(STOPWORDS, "")
 .str.normalize_spaces()
 .str.ngrams_tokenize(n=3, separator=" ")
 .value_counts()
).head(10)
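
For readers unfamiliar with n-grams, here is a minimal pure-Python sketch of the idea, assuming whitespace-delimited tokens. The `ngrams` helper and the sample sentence are hypothetical; the sketch shows what an n-gram is, not how `ngrams_tokenize` is implemented.

```python
def ngrams(text, n, separator=" "):
    """Return every run of n consecutive whitespace-delimited tokens in text."""
    tokens = text.split()
    return [separator.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


# Hypothetical sample sentence.
bigrams = ngrams("stay home save lives", n=2)
trigrams = ngrams("stay home save lives", n=3)
print(bigrams)
print(trigrams)
```

A string with fewer than `n` tokens yields no n-grams, which is why very short tweets contribute nothing to the trigram counts.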

RAPIDS provides a broad set of NLP functionality, and what's particularly powerful is that we can carry it over to Dask.

# Expanding to Larger, More Complex Tasks using Dask

Let's revisit the previous example, and then move to something more complex like document search.

In [None]:
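import IPython

# Restart the kernel so the GPU memory used in the single-GPU section is released
# before the Dask cluster starts.
IPython.Application.instance().kernel.do_shutdown(True)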

In [None]:
import nltk

from dask.distributed import Client
import dask.array as da

from dask_cuda import LocalCUDACluster
import cudf
import dask_cudf
import cupy as cp

from cuml.dask.feature_extraction.text import TfidfTransformer
from cuml.feature_extraction.text import HashingVectorizer as CumlHashVect

In [None]:
# Start one Dask worker per listed GPU.
cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0,1,2,3",
)
client = Client(cluster)
client

In [None]:
path = "/raid/vjawa/string_exp/tweets/*.CSV"
df = dask_cudf.read_csv(path)

df = df.loc[df.lang == 'en'].persist()
print(len(df))

In [None]:
df['text'].head(5)

## Tokenization (Again)

We can do all the same processing we did before, this time using all of our GPUs.

In [None]:
STOPWORDS = nltk.corpus.stopwords.words('english')
STOPWORDS += ["co", "https", "com"]

# '\~' was an invalid escape sequence and is already covered by '\\' and '~'; '\n' is a real newline.
PUNCTUATION = ['!', '"', '#', '$', '%', '&', '(', ')', '*', '+', '-', '.', '/', '\\', ':', ';', '<', '=', '>',
               '?', '@', '[', ']', '^', '_', '`', '{', '|', '}', '\t', '\n', "'", ",", '~', '—']

In [None]:
# Same code, using Dask this time to scale out to data partitioned across multiple GPUs

results = (df
 .text
 .str.lower()
 .str.replace(PUNCTUATION, [" "]*len(PUNCTUATION), regex=False)
 .str.replace_tokens(STOPWORDS, "")
 .str.normalize_spaces()
 .str.tokenize()
).value_counts()

results.head(10)
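
Conceptually, a distributed count like this can be computed by counting within each partition and then merging the partial results. The sketch below shows that pattern in plain Python, assuming two hypothetical partitions of already-cleaned text. The `count_partition` helper is illustrative only, and Dask's actual task graph for `value_counts` is more involved.

```python
from collections import Counter
from functools import reduce

# Hypothetical partitions, standing in for the chunks of data held by each worker.
partitions = [
    ["stay home", "stay safe"],
    ["stay healthy", "home office"],
]


def count_partition(texts):
    """Count whitespace-delimited tokens within a single partition."""
    return Counter(tok for text in texts for tok in text.split())


# Map: count each partition independently. Reduce: add the partial counts together.
partial_counts = [count_partition(p) for p in partitions]
total = reduce(lambda a, b: a + b, partial_counts, Counter())
print(total.most_common(2))
```

Because addition of counts is associative, the partial results can be merged in any order, which is what makes this kind of aggregation easy to spread across workers.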

## Distributed TF-IDF Based Document Search

Now that we know we can do these kinds of NLP operations with Dask, let's build a search tool using TF-IDF that lets us find tweets corresponding to our search query.

In [None]:
vectorizer = CumlHashVect(stop_words='english')
multi_gpu_transformer = TfidfTransformer()

Note that there is a `preprocessor` argument for the HashingVectorizer and it takes a callable. Let's actually redefine this with our own function, using the core logic from above.

In [None]:
def our_preprocessor(s):
    processed = (s
                .str.lower()
                .str.replace(PUNCTUATION, [" "]*len(PUNCTUATION), regex=False)
                .str.replace_tokens(STOPWORDS, "")
                .str.normalize_spaces()
                )
    return processed

vectorizer = CumlHashVect(stop_words='english', preprocessor=our_preprocessor)
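
As background on why a hashing vectorizer suits partitioned data, here is a minimal sketch of the hashing trick in plain Python. It assumes `zlib.crc32` as a stand-in hash and a tiny `n_features`; the `hash_vectorize` helper is hypothetical, and cuML's `HashingVectorizer` uses its own hash function and defaults, so the column assignments will not match.

```python
import zlib


def hash_vectorize(text, n_features=16):
    """Map a string to a fixed-length count vector by hashing each token to a column."""
    vector = [0] * n_features
    for token in text.lower().split():
        # crc32 is deterministic across runs, unlike Python's built-in hash() for str.
        column = zlib.crc32(token.encode("utf-8")) % n_features
        vector[column] += 1
    return vector


# Hypothetical sample strings.
v1 = hash_vectorize("stay home stay safe")
v2 = hash_vectorize("Stay Home Stay Safe")
print(v1)
```

No vocabulary is learned, so the same token always lands in the same column regardless of which partition it appears in. The trade-off is that distinct tokens can collide in one column, which a larger `n_features` makes less likely.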

In [None]:
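import cupyx.scipy.sparse  # sparse matrices live here; cp.sparse is not available in recent CuPy

# Small sparse float32 matrix used as `meta` to describe each partition's output type.
meta = da.from_array(cupyx.scipy.sparse.csr_matrix(cp.zeros(1, dtype=cp.float32)))
# HashingVectorizer learns no vocabulary, so each partition can be vectorized independently.
X = df["text"].map_partitions(vectorizer.fit_transform, meta=meta).astype(cp.float32)
X = X.persist()
X.compute_chunk_sizes()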

In [None]:
X_transformed = multi_gpu_transformer.fit_transform(X).persist()
X_transformed.compute_chunk_sizes()
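
To illustrate what a TF-IDF transform does to the hashed counts, here is a small dense sketch in plain Python. It assumes the smoothed IDF, `ln((1 + n) / (1 + df)) + 1`, followed by L2 row normalization, which are scikit-learn's `TfidfTransformer` defaults. Whether cuML's distributed transformer uses exactly the same defaults is an assumption here, so check the cuML documentation. The `tfidf` helper and the count matrix are hypothetical.

```python
import math


def tfidf(counts):
    """Weight a dense document-term count matrix by smoothed IDF, then L2-normalize rows."""
    n_docs = len(counts)
    n_terms = len(counts[0])
    # Document frequency: the number of documents in which each term appears.
    df = [sum(1 for row in counts if row[j] > 0) for j in range(n_terms)]
    idf = [math.log((1 + n_docs) / (1 + d)) + 1 for d in df]
    weighted = [[row[j] * idf[j] for j in range(n_terms)] for row in counts]
    result = []
    for row in weighted:
        norm = math.sqrt(sum(x * x for x in row))
        result.append([x / norm if norm else 0.0 for x in row])
    return result


# Hypothetical counts: 2 documents, 3 terms. Term 1 appears in both documents.
counts = [[2, 1, 0], [0, 1, 1]]
weights = tfidf(counts)
print(weights)
```

In the second document, terms 1 and 2 each occur once, but term 2 receives the larger weight because it appears in fewer documents. That down-weighting of common terms is what makes TF-IDF useful for search.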

For simplicity, we'll collect our corpus and the sparse TF-IDF matrix computed by the Dask multi-GPU transformer onto a single GPU and run the search there. This is not the most optimized approach, but it's simple and easy to walk through.

In [None]:
corpus = df[["text", "status_id"]].compute()
X_transformed_singlegpu = X_transformed.compute()

Using cuML's NearestNeighbors, we can find the most similar records with the cosine metric on the sparse TF-IDF matrix.

In [None]:
from cuml.neighbors import NearestNeighbors

nn = NearestNeighbors(n_neighbors=5, metric="cosine")
nn.fit(X_transformed_singlegpu)

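def search(haystack, needle):
    # Wrap the query in a list so it becomes a one-row Series, then hash it into
    # the same feature space as the corpus. Note: the query is not IDF-weighted.
    query_vector = vectorizer.transform(cudf.Series([needle]))
    distances, indices = nn.kneighbors(query_vector)
    # Look up the matching rows of the corpus by position.
    return haystack.iloc[indices.ravel()]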

In [None]:
search(corpus, "NVIDIA AI")

In [None]:
search(corpus, "distributed computing")

In [None]:
search(corpus, "python programming gpu")
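
The ranking step behind these searches can be sketched on the CPU as a brute-force cosine nearest-neighbor lookup. The example below is a minimal illustration with small dense vectors. The `cosine_distance` and `nearest` helpers and the sample vectors are hypothetical, and cuML's `NearestNeighbors` works on sparse GPU matrices with its own implementation.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity; vectors with zero norm are treated as maximally distant."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 1.0
    return 1.0 - dot / (norm_a * norm_b)


def nearest(query, rows, k=2):
    """Return the indices of the k rows closest to query by cosine distance."""
    ranked = sorted(range(len(rows)), key=lambda i: cosine_distance(query, rows[i]))
    return ranked[:k]


# Hypothetical document vectors and query.
rows = [[1.0, 0.0, 0.0], [0.7, 0.7, 0.0], [0.0, 0.0, 1.0]]
top = nearest([1.0, 0.1, 0.0], rows, k=2)
print(top)
```

Cosine distance depends only on the angle between vectors, not their length, so a short query can still match a much longer document that uses the same terms.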

We've only scratched the surface of the NLP capabilities that Dask and RAPIDS make possible. We encourage you to look at the [RAPIDS](https://docs.rapids.ai/) and [Dask](https://docs.dask.org/en/latest/) documentation to learn more!