In [2]:
# THE BASICS
import numpy as np
import pandas as pd

# USING PYSPARK FOR EASY INTEGRATION
from pyspark.sql import SparkSession
import pyspark.sql.types as typ
import pyspark.ml.feature as ft
# CLASSIFICATION
import pyspark.ml.classification as cl
import pyspark.sql.functions as func
# FOR BUILDING THE FUNCTION ORDER OF FEATURE EXTRACTION TASKS
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
# FOR CLUSTERING
import pyspark.ml.clustering as clus
# NOTE: the two pyspark.mllib imports below are the legacy RDD-based API; the
# pipeline in this notebook uses the DataFrame-based pyspark.ml LDA (clus.LDA).
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
import pyspark.ml.evaluation as ev
import pyspark.ml.tuning as tune



In [13]:
# OBTAIN - a handful of short review snippets entered inline as a one-column
# DataFrame ('documents'); an import-from-CSV method follows further below.

# Reuse the active SparkSession (e.g. the `spark` a pyspark shell provides) or
# start a local one, so this cell also works after Restart & Run All on a plain kernel.
spark = SparkSession.builder.getOrCreate()

text_data = spark.createDataFrame([
    ['''It's more of a therapeutic release than anything else, and if you're in the mood to just release it all with a grin, then Tamborine is crucial'''],
    ['''[Ryan] Coogler's overall ambition in crafting a sociologically complex story and presenting it as standard comic book movie fare succeeds: There is nothing common about Black Panther'''],
    ['''While he may have been brought low as a husband and father, Rock's powers as a comedian have not been diminished'''],
    ['''The third full-length solo release for the Long Island singer-songwriter features contributions from Chris Farren, Dan Potthast, PUP, Laura Stevenson, and Antarctigo Vespucci'''],
    ['''While that sounds incredibly daunting--and like a really tiring listen--the album’s most impressive trait is that it makes all that vital work feel joyous and communal'''],
    ['''Gorgeous and energetic, Jeff’s manifesto in POST- (in parts), especially when we talk about politics, remind me those previous songs that some singers tried to bring along 2017 but could not pass a trusty message. He slayed them. His new record clarify minds and shake the soul with a contagious rhythm all whole the 10 tracks''']
], ['documents'])

In [14]:
# Tokenize each document. RegexTokenizer treats `pattern` as the separator
# (its default gaps=True), so text is split on runs of whitespace and on
# commas, periods and double quotes.
# Raw string: `\s` must reach the Java regex engine verbatim, and a plain
# string literal would trigger Python's invalid-escape-sequence warning.
tokenizer = ft.RegexTokenizer(
    inputCol='documents',
    outputCol='input_arr',
    pattern=r'\s+|[,."]')

# Remove stopwords from the token arrays (StopWordsRemover's default list).
stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='input_stop')

In [15]:


# Despite its name, `stringIndexer` is a CountVectorizer: it learns a vocabulary
# from the stopword-free token arrays and encodes each document as a sparse
# vector of term counts. That vector column is what the LDA stage consumes.
stringIndexer = ft.CountVectorizer(
    inputCol=stopwords.getOutputCol(),
    outputCol="input_indexed",
)

# Apply the two text transformers by hand (tokenize, then drop stopwords) so
# the vectorizer can be previewed on its own, outside the pipeline.
raw_tokens = tokenizer.transform(text_data)
tokenized = stopwords.transform(raw_tokens)

# Preview the term-count vectors of the first two documents.
count_model = stringIndexer.fit(tokenized)
count_model.transform(tokenized).select('input_indexed').take(2)



[Row(input_indexed=SparseVector(105, {0: 2.0, 12: 1.0, 15: 1.0, 34: 1.0, 44: 1.0, 71: 1.0, 77: 1.0, 87: 1.0, 88: 1.0, 94: 1.0})),
 Row(input_indexed=SparseVector(105, {8: 1.0, 10: 1.0, 23: 1.0, 27: 1.0, 29: 1.0, 33: 1.0, 41: 1.0, 43: 1.0, 47: 1.0, 56: 1.0, 58: 1.0, 59: 1.0, 72: 1.0, 83: 1.0, 89: 1.0, 96: 1.0, 97: 1.0, 103: 1.0, 104: 1.0}))]

In [16]:


# MODEL - USING LATENT DIRICHLET ALLOCATION
# The 'online' optimizer fits on randomly sampled mini-batches, so pin the seed
# to get the same topics on every Restart & Run All.
N_TOPICS = 2
LDA_SEED = 42
clustering = clus.LDA(
    k=N_TOPICS,
    optimizer='online',
    seed=LDA_SEED,
    featuresCol=stringIndexer.getOutputCol())



In [17]:


# MODEL - BUILDING THE PIPELINE
# Each stage reads the column written by the stage before it, so order matters.
feature_stages = [
    tokenizer,      # 1 - documents -> word tokens
    stopwords,      # 2 - tokens -> tokens with stopwords removed
    stringIndexer,  # 3 - tokens -> term-count vectors (CountVectorizer)
]
# 4 - cluster the term-count vectors into topics with LDA
pipeline = Pipeline(stages=feature_stages + [clustering])



In [18]:


# INTERPRET - LOOK AT THE RESULTS OF THE CLUSTERING
# Keep the fitted PipelineModel. Its stages hold the trained CountVectorizer
# vocabulary and the trained LDA model, which you need to inspect what each topic
# contains (e.g. describeTopics()). Refitting instead could yield different topics.
lda_pipeline_model: PipelineModel = pipeline.fit(text_data)
topics = lda_pipeline_model.transform(text_data)

# Per-document topic mixture: one vector of N_TOPICS weights per input document.
topics.select('topicDistribution').collect()



[Row(topicDistribution=DenseVector([0.946, 0.054])),
 Row(topicDistribution=DenseVector([0.1879, 0.8121])),
 Row(topicDistribution=DenseVector([0.9462, 0.0538])),
 Row(topicDistribution=DenseVector([0.9652, 0.0348])),
 Row(topicDistribution=DenseVector([0.0385, 0.9615])),
 Row(topicDistribution=DenseVector([0.0209, 0.9791]))]