# Latent Semantic Analysis of a corpus

# Simple Example

In [1]:
noise_list = ["is", "a", "this", "..."] 
def _remove_noise(input_text):
    words = input_text.split() 
    noise_free_words = [word for word in words if word not in noise_list] 
    noise_free_text = " ".join(noise_free_words) 
    return noise_free_text

_remove_noise("this is a sample text")

'sample text'

# Base path for the ACL IMDB data set

In [2]:
# Load all the reviews into data frames.
base_path = "/Users/hujol/Projects/advanced_analytics_spark/data/aclImdb/"

# Loading the files into a data frame

In [8]:
# Set up Python system path to find our modules.
import os
import sys
module_path = os.path.abspath(os.path.join('../src'))
if module_path not in sys.path:
    sys.path.append(module_path)

# Import our modules.
import file_loader as fl

In [11]:
# Load the data in a parquet file.
file_pqt = fl.load_data(base_path, 252, spark)

In [12]:
!ls {base_path}

README                     [34maclImdb_211_raw.parquet[m[m
aclImdb_100000.csv         [34maclImdb_250_raw.parquet[m[m
[34maclImdb_100000_raw.parquet[m[m [34maclImdb_251_raw.parquet[m[m
[34maclImdb_10000_raw.parquet[m[m  [34maclImdb_252_raw.parquet[m[m
[34maclImdb_1000_raw.parquet[m[m   [34maclImdb_300_raw.parquet[m[m
[34maclImdb_100_raw.parquet[m[m    [34maclImdb_50000_raw.parquet[m[m
[34maclImdb_20000_raw.parquet[m[m  imdb.vocab
[34maclImdb_2000_raw.parquet[m[m   imdbEr.txt
[34maclImdb_200_raw.parquet[m[m    [34mtest[m[m
[34maclImdb_210_raw.parquet[m[m    [34mtrain[m[m


# Store as CVS

In [None]:
import numpy as np

# Define the CSV file.
file_csv = os.path.join(base_path, ("aclImdb_%s.csv" % ttl))

# Get the data as Pandas data frame.
pdf = df.toPandas()

# Re index to shuffle the data before saving it.
pdf = pdf.reindex(np.random.permutation(pdf.index))
pdf.to_csv(file_csv, index=False, encoding='utf-8')

# Load the CSV file for checking data is stored

In [None]:
import pandas as pd

ttl = 100000

# Define the CSV file.
file_csv = os.path.join(base_path, ("aclImdb_%s.csv" % ttl))

pdf_read = pd.read_csv(file_csv, encoding='utf-8')
pdf_read[1:3]

In [None]:
df_csv = spark.createDataFrame(pdf_read)
df_csv.show()

In [None]:
df_csv.printSchema()

# Load the data from the parquet file

In [8]:
# Check the parquet file is good.
df_pqt = spark.read.parquet(file_pqt)

# As needed.
# df_pqt = df_pqt.drop('words')

# Showing some observations (entries).
df_pqt.persist()
df_pqt.show()

+-----------+------------+----------------+--------+--------------+------------+--------------------+
|datasettype|    filename| datetimecreated|reviewid|reviewpolarity|reviewrating|                text|
+-----------+------------+----------------+--------+--------------+------------+--------------------+
|       test|11813_10.txt|20181024T150644Z|   11813|             1|          10|The Cure is a fan...|
|       test|   835_8.txt|20181024T150644Z|     835|             1|           8|The original Fema...|
|       test|  4245_8.txt|20181024T150644Z|    4245|             1|           8|remember back whe...|
|       test| 11856_7.txt|20181024T150644Z|   11856|             1|           7|Sophisticated sex...|
|       test|  6133_8.txt|20181024T150644Z|    6133|             1|           8|I stumbled upon t...|
|       test| 10167_9.txt|20181024T150644Z|   10167|             1|           9|A film that tends...|
|       test|  903_10.txt|20181024T150644Z|     903|             1|          10|Th

In [9]:
df_pqt.count()

251

# Preprocessing the text to clean HTML tags

In [18]:
import re

def clean_html(text):
    res = re.sub('<.*/>', '', text)
    return  re.sub('[\W]+', ' ', res)

a_text = 'the secrets] of the universe. <br /><br />Unfortunately, '
print(a_text)
print(clean_html(a_text))

from pyspark.sql import Row 

from functools import partial

def preprocess(new_column_name, row):
    data = row.asDict()
    text = data['text']
    
    # Use a regex to clean HTML tags
    text = clean_html(text)
    data[new_column_name] = text
    
    return Row(**data)

def transform_html_clean(df, new_column_name):
    f = partial(preprocess, new_column_name)
    return df.rdd.map(f).toDF()

the secrets] of the universe. <br /><br />Unfortunately, 
the secrets of the universe Unfortunately 


In [19]:
# Clean the parquet data frame.
df_pqt = transform_html_clean(df_pqt, 'textclean')
df_pqt.select('text', 'textclean').take(1)

[Row(text="Latcho Drom, or Safe Journey, is the second film in Tony Gatlif's trilogy of the Romany people. The film is a visual depiction and historical record of Romany life in European and Middle Eastern countries. Even though the scenes are mostly planned, rehearsed, and staged there is not a conventional story line and the dialog does not explain activities from scene to scene. Instead, the film allows the viewer to have sometimes a glimpse, sometimes a more in-depth view of these people during different eras and in different countries, ranging from India, Egypt, Romania, Hungary, Slovakia, France, and Spain.<br /><br />The importance of music in Romany culture is clearly expressed throughout the film. It is a vital part of every event and an important means of communication. Everything they do is expressed with music. Dance is another important activity. Like Romany music, it is specialized and deeply personal, something they alone know how to do correctly. We are provided glimpse

# Stemming and Lemmatization of text

## Example using NLTK

In [12]:
import nltk
from nltk.stem import PorterStemmer

words = ['write','writer','writing','writers']
ps = PorterStemmer()

for word in words:
    print(f"{word}: {ps.stem(word)}")

write: write
writer: writer
writing: write
writers: writer


In [13]:
from nltk.stem import WordNetLemmatizer

lemmatizer = WordNetLemmatizer()
nltk.download('wordnet')

lemmatizer.lemmatize('dogs')

[nltk_data] Downloading package wordnet to /Users/hujol/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


'dog'

## Spark Features Extractor Bag of Words

In [20]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

df_pqt = df_pqt.drop('words')
            
tokenizer = Tokenizer(inputCol="textclean", outputCol="words")
df_pqt = tokenizer.transform(df_pqt)

df_pqt.select('words').take(1)

[Row(words=['latcho', 'drom', 'or', 'safe', 'journey', 'is', 'the', 'second', 'film', 'in', 'tony', 'gatlif', 's', 'trilogy', 'of', 'the', 'romany', 'people', 'the', 'film', 'is', 'a', 'visual', 'depiction', 'and', 'historical', 'record', 'of', 'romany', 'life', 'in', 'european', 'and', 'middle', 'eastern', 'countries', 'even', 'though', 'the', 'scenes', 'are', 'mostly', 'planned', 'rehearsed', 'and', 'staged', 'there', 'is', 'not', 'a', 'conventional', 'story', 'line', 'and', 'the', 'dialog', 'does', 'not', 'explain', 'activities', 'from', 'scene', 'to', 'scene', 'instead', 'the', 'film', 'allows', 'the', 'viewer', 'to', 'have', 'sometimes', 'a', 'glimpse', 'sometimes', 'a', 'more', 'in', 'depth', 'view', 'of', 'these', 'people', 'during', 'different', 'eras', 'and', 'in', 'different', 'countries', 'ranging', 'from', 'india', 'egypt', 'romania', 'hungary', 'slovakia', 'france', 'and', 'spain', 'the', 'other', 'films', 'in', 'the', 'trilogy', 'are', 'les', 'princes', '1983', 'and', 'ga

## Text cleansing

In [21]:
import nltk
from nltk.corpus import stopwords

# Remove the stop words
nltk.download('stopwords')
stopwords_bc = spark.sparkContext.broadcast(set(stopwords.words('english')))

[nltk_data] Downloading package stopwords to /Users/hujol/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [22]:
def row_text_cleaner(words):
    words_clean = []
    for a_w in words:
        if not a_w in stopwords_bc.value:
            words_clean.append(a_w)
    
    # Return the cleaned words.
    return words_clean

test_words = ['it', 'is', 'great', 'you', 'have', 'been', 'kitesurfing', 'that', 'long']
row_text_cleaner(test_words)

['great', 'kitesurfing', 'long']

In [23]:
from pyspark.sql import Row 

def f(x):
    data = x.asDict()
    data['wordsclean'] = row_text_cleaner(x.words)
    
    # The purpose of ** is to give the ability to feed a function's arguments 
    # by providing a dictionary (e.g. f(**{'x' : 1, 'y' : 2}) ).
    return Row(**data)

# NOTE:
# There is a need to store the result into df_pqt2 otherwise the
# added words_clean added column does not show well if we store it in the same df_pqt when running:
# df_pqt.select('words_clean').show()
rdd_tmp = df_pqt.rdd.map(f)
df_pqt = rdd_tmp.toDF()

df_pqt.select('wordsclean').show()

+--------------------+
|          wordsclean|
+--------------------+
|[latcho, drom, sa...|
|[another, pleasan...|
|[admit, awe, sea,...|
|[film, shows, ser...|
|[co, scripted, wi...|
|[okay, let, start...|
|[serious, film, p...|
|[film, chock, ful...|
|[ok, following, r...|
|[rajkumar, santos...|
|[think, pauly, sh...|
|[, north, south, ...|
|[, whoops, looks,...|
|[went, film, thin...|
|[anyone, spent, t...|
|[, last, time, la...|
|[british, documen...|
|[upon, time, trom...|
|[really, liked, m...|
|[beguiled, one, e...|
+--------------------+
only showing top 20 rows



In [24]:
# Remove intermediairy data not needed anymore.
df_pqt.printSchema()
# df_pqt = df_pqt.withColumnRenamed('words_clean', 'words')
df_pqt = df_pqt.drop('textclean')
df_pqt.printSchema()

root
 |-- datasettype: string (nullable = true)
 |-- datetimecreated: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- reviewid: long (nullable = true)
 |-- reviewpolarity: long (nullable = true)
 |-- reviewrating: long (nullable = true)
 |-- text: string (nullable = true)
 |-- textclean: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- wordsclean: array (nullable = true)
 |    |-- element: string (containsNull = true)

root
 |-- datasettype: string (nullable = true)
 |-- datetimecreated: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- reviewid: long (nullable = true)
 |-- reviewpolarity: long (nullable = true)
 |-- reviewrating: long (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- wordsclean: array (nullable = true)
 |    |-- element: string (containsNull = true)



# Computation of the TF-IDF

## HashingTF testing on small vocabulary

In [25]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark and I love SPark with Java, I read a lot about java and spark, sparky!"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

# This uses the hash and the modulo numFeatures to define a bucket where to put a word.
# It is efficient as it does not store the vocabulary.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=5096)
featurizedData = hashingTF.transform(wordsData)

featurizedData.select('words', 'rawFeatures').show(truncate=False)

+------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                             |rawFeatures                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|[hi, i, heard, about, spark, and, i, love, spark, with, java,, i, read, a, lot, about, java, and, spark,, sparky!]|(5096,[56,338,565,677,1568,1722,2321,2455,2799,2905,3001,3202,4672,4673,4959],[1.0,1.0,1.0,2.0,1.0,1.0,3.0,1.0

## CountVectorizer test on small data frame

In [26]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="words", outputCol="rawfeatures", vocabSize=70, minDF=1.0)
model = cv.fit(featurizedData)
result = model.transform(featurizedData)

In [27]:
result.select('rawfeatures', 'words').head()

Row(rawfeatures=SparseVector(25, {0: 3.0, 1: 1.0, 2: 2.0, 3: 2.0, 4: 2.0, 5: 1.0, 7: 1.0, 10: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 17: 1.0, 19: 1.0, 21: 1.0, 24: 1.0}), words=['hi', 'i', 'heard', 'about', 'spark', 'and', 'i', 'love', 'spark', 'with', 'java,', 'i', 'read', 'a', 'lot', 'about', 'java', 'and', 'spark,', 'sparky!'])

# TF-IDF on reviews

In [28]:
from pyspark.ml.feature import CountVectorizer

df_pqt = df_pqt.drop('featurestf')
cv = CountVectorizer(inputCol="wordsclean", outputCol="featurestf", vocabSize=30000, minDF=1.0)
model_cv = cv.fit(df_pqt)
df_pqt = model_cv.transform(df_pqt)

In [29]:
print("Number of unique words in the corpus: %s" % len(model_cv.vocabulary))
print("Excerpt of the vocabulary\n" + str(model_cv.vocabulary[1:100]))

# result.select('features').rdd.map(lambda x: print(x)).take(1)

Number of unique words in the corpus: 4680
Excerpt of the vocabulary
['film', 'one', 'like', 'see', 'time', 'really', 'good', 'would', 'people', 'even', 'great', 'watch', 'bad', 'movies', 'get', 'well', 'think', 'seen', 'films', 'first', 'story', 'could', 'much', 'also', 'made', 'ever', 'plot', 'two', 'acting', 'show', 'many', 'make', 'never', 'know', 'way', 'man', 'love', 'characters', 'say', 'back', 'original', 'something', 'want', 'still', 'things', 'going', 'little', 'watching', 'best', 'dvd', 'better', 'take', 'right', 'character', 'life', 'real', 'comedy', 'funny', 'horror', 'director', 'actors', '10', 'thought', 'cast', 'work', 'nothing', 'look', 'another', 'long', 'enough', 'every', '2', 'anyone', 'effects', 'end', 'go', 'watched', 'music', 'though', 'sound', 'book', 'far', 'lot', 'thing', 'short', 'day', 'may', 'role', 'job', 'minutes', 'different', 'classic', 'world', 'point', 'fan', 'read', 'worth', 'saw', 'put']


In [30]:
df_pqt.take(1)[0].featurestf

SparseVector(4680, {1: 3.0, 9: 2.0, 10: 1.0, 19: 1.0, 21: 1.0, 55: 1.0, 79: 1.0, 91: 2.0, 100: 1.0, 130: 1.0, 141: 2.0, 149: 1.0, 160: 1.0, 192: 1.0, 360: 1.0, 386: 1.0, 407: 1.0, 785: 1.0, 787: 1.0, 810: 1.0, 837: 2.0, 842: 1.0, 945: 1.0, 948: 1.0, 968: 1.0, 998: 1.0, 1068: 1.0, 1162: 1.0, 1495: 1.0, 1499: 2.0, 1540: 2.0, 1593: 2.0, 1705: 1.0, 1760: 1.0, 1870: 1.0, 1911: 1.0, 1917: 1.0, 2239: 1.0, 2290: 1.0, 2429: 1.0, 2759: 1.0, 2904: 1.0, 2984: 1.0, 2998: 1.0, 3190: 1.0, 3222: 1.0, 3537: 1.0, 3714: 1.0, 3868: 1.0, 4105: 1.0, 4156: 1.0, 4172: 1.0, 4193: 1.0, 4206: 1.0, 4271: 1.0, 4290: 1.0, 4656: 1.0, 4671: 1.0})

## IDF

In [31]:
from pyspark.ml.feature import IDF

# Drop the column first.
df_pqt = df_pqt.drop('featuresidf')

# IDF uses a term frequency vector:
# http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=tfidf#pyspark.mllib.feature.IDF
idf = IDF(inputCol="featurestf", outputCol="featuresidf")
idfModel = idf.fit(df_pqt)
df_pqt = idfModel.transform(df_pqt)

df_pqt.persist()

DataFrame[datasettype: string, datetimecreated: string, filename: string, reviewid: bigint, reviewpolarity: bigint, reviewrating: bigint, text: string, words: array<string>, wordsclean: array<string>, featurestf: vector, featuresidf: vector]

In [32]:
df_pqt.select('reviewpolarity', "reviewrating", "featuresidf", 'text').take(1)

[Row(reviewpolarity=1, reviewrating=7, featuresidf=SparseVector(4680, {1: 2.3767, 9: 3.286, 10: 1.4446, 19: 1.8601, 21: 1.719, 55: 2.2655, 79: 2.3609, 91: 5.168, 100: 2.584, 130: 2.7175, 141: 5.7434, 149: 2.7175, 160: 2.8717, 192: 2.8717, 360: 3.4107, 386: 3.4107, 407: 3.4107, 785: 3.7471, 787: 3.9703, 810: 4.258, 837: 8.5159, 842: 3.9703, 945: 3.9703, 948: 3.9703, 968: 4.258, 998: 3.9703, 1068: 4.258, 1162: 4.258, 1495: 4.258, 1499: 9.3269, 1540: 9.3269, 1593: 9.3269, 1705: 4.258, 1760: 4.258, 1870: 4.6634, 1911: 4.6634, 1917: 4.6634, 2239: 4.6634, 2290: 4.6634, 2429: 4.6634, 2759: 4.6634, 2904: 4.6634, 2984: 4.6634, 2998: 4.6634, 3190: 4.6634, 3222: 4.6634, 3537: 4.6634, 3714: 4.6634, 3868: 4.6634, 4105: 4.6634, 4156: 4.6634, 4172: 4.6634, 4193: 4.6634, 4206: 4.6634, 4271: 4.6634, 4290: 4.6634, 4656: 4.6634, 4671: 4.6634}), text="Latcho Drom, or Safe Journey, is the second film in Tony Gatlif's trilogy of the Romany people. The film is a visual depiction and historical record of Roma

# Training a logistic regression for Sentiment Analysis

### Logistic regression model using N-fold stratified cross-validation

In [None]:
import nltk
from nltk.corpus import stopwords

# Remove the stop words
nltk.download('stopwords')
stopwords_set = list(set(stopwords.words('english')))

stopwords_set[1:20]
# stopwords_bc = spark.sparkContext.broadcast(set(stopwords.words('english')))

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Create a test df.
df0 = transform_html_clean(df_pqt, 'textclean')

tokenizer = Tokenizer(inputCol="textclean", outputCol="words_tknz")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="words_test", stopWords=stopwords_set)
pipeline = Pipeline(stages=[tokenizer, remover])

len(pipeline.fit(df0).transform(df0).head().words_test)

In [None]:
# Split the df into train and test
df_training, df_test = df_pqt.randomSplit([0.9, 0.1], seed=12345)

df_training.count(), df_test.count()

In [None]:
df_training.groupBy('reviewpolarity').count().show()
df_test.groupBy('reviewpolarity').count().show()

In [None]:
df_training = df_training.drop('words')
df_training = df_training.drop('featurestf')

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator , RegressionEvaluator
from pyspark.ml.feature import StopWordsRemover, HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Based on Spark doc
# https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation

# Define the stages.
tokenizer = Tokenizer(inputCol="textclean", outputCol="words_tknz")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="words", stopWords=stopwords_set)

# The idea is to create a features vector from a list of words.

# 1) Use this hashing Term Frequency.
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")

# Or 2) use the Term Frequency - Inverse Document Frequency.
cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="featurestf", vocabSize=30000, minDF=1.0)
idf = IDF(inputCol=cv.getOutputCol(), outputCol="features")

lr = LogisticRegression(maxIter=10)

# Create the pipeline.
pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, lr])

# Define the criteria ranges.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [100, 50000, 200000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# The evaluator of each models.
# evaluator = RegressionEvaluator(metricName="r2")
evaluator = BinaryClassificationEvaluator()

# Define the cross validation runner.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
df_training_tmp = df_training.withColumnRenamed('reviewpolarity', 'label')
df_training_ppl = transform_html_clean(df_training_tmp, 'textclean')

# Train the model.
cvModel = crossval.fit(df_training_ppl)

In [None]:
cvModel.avgMetrics

In [None]:
model_best = cvModel.bestModel

In [None]:
df_training_pip = model_best.transform(df_training_ppl)
eval_val = evaluator.evaluate(df_training_pip)
print(evaluator.isLargerBetter())
print(eval_val)

In [None]:
df_training_pip.filter(df_training_pip.label == df_training_pip.prediction) \
    .select('label', 'probability', 'prediction', 'words').take(2)

## Evaluation of the cross validation model

In [None]:
df_test = df_test.drop('featurestf')
df_test = df_test.drop('words')

In [None]:
# Prepare data.
df_test_tmp = df_test.withColumnRenamed('reviewpolarity','label')
df_test_ppl = transform_html_clean(df_test_tmp, 'textclean')

# Make prediction.
df_test_res = model_best.transform(df_test_ppl)
df_test_res.select('probability', 'label','prediction', 'features', 'words').take(2)

In [None]:
print(evaluator.evaluate(df_test_res))
# df_test_res.filter(df_test_res.label == df_test_res.prediction) \
#     .select('label', 'probability', 'prediction', 'features', 'words').show()

# Confusion Matrix

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

rdd_training_pip = df_training_pip.select('prediction', 'label').rdd.map(lambda row: (row[0], float(row[1])))
rdd_training_pip.take(2)

# print(rdd_training_pip.toDF().toPandas().shape)

metrics = MulticlassMetrics(rdd_training_pip)
print(metrics.confusionMatrix().toArray())
print()
print(metrics.truePositiveRate(1.0))
print(metrics.falsePositiveRate(1.0))

# Receiver Operating Characteristics (ROC)

In [None]:
cvModel.bestModel.stages[-1]

In [None]:
from pyspark.ml.classification import LogisticRegressionSummary

# Get the Logistic regression model to get the summary.
summary = cvModel.bestModel.stages[-1].summary
summary.roc.show()

# Plot the ROC

In [None]:
import matplotlib.pyplot as plt

# As defined by IPython matplotlib kernel
# https://ipython.readthedocs.io/en/stable/interactive/plotting.html#id1
%matplotlib inline

aPlt = summary.roc.toPandas().plot(x='FPR', y='TPR', colormap='winter_r')
plt.plot([0.0, 1.0], [0.0, 1.0], linestyle='--', color='black')
plt.show()

# Stochastic Gradient Descent for online and out-of-core learning Using scikit-learn

In [None]:
# Use the df_csv loaded earlier.
print("%s entries from the CSV file" % df_csv.count())

In [None]:
# Define a generator to load the data from the file simulating a streaming.
ttl = 100000
file_csv = os.path.join(base_path, ("aclImdb_%s.csv" % ttl))

def stream_doc():
    with open(file_csv, 'r', encoding='utf-8') as csv:
        # skip header.
        next(csv)
        
        for line in csv:
            cells = line.split(',')
#             datasettype,filename,datetimecreated,reviewid,reviewpolarity,reviewrating,text = cells[0], \
#             cells[1], cells[2], cells[3], cells[4], cells[5], ",".join(cells[6:]).strip()

            filename,reviewpolarity,text = cells[1], cells[4], ",".join(cells[6:]).strip()

            yield filename,reviewpolarity,text

In [None]:
generator = stream_doc()
print(next(generator))
print(next(generator))

In [None]:
# This function returns a number of documents (id, text) and their label from the doc stream.
def get_mini_batch(doc_stream, size):
    docs, y = [], []
    try:
        for _ in range(size):
            filename,reviewpolarity,text = next(doc_stream)
            docs.append([filename, text])
            y.append(int(reviewpolarity))
    except StopIteration:
        return docs, y
    
    return docs, y

In [None]:
# Check the function we just wrote.
get_mini_batch(stream_doc(), 2)

# Example of SciKit Learn data set

In [None]:
from sklearn.datasets import load_boston
# from scipy.sparse.csr import csr_matrix

boston = load_boston()
print(type(boston.data[:]))

In [None]:
# Create a pipeline.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.evaluation import BinaryClassificationEvaluator , RegressionEvaluator
from pyspark.ml.feature import StopWordsRemover, HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Define the stages.
tokenizer = Tokenizer(inputCol="textclean", outputCol="words_tknz")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="words", stopWords=stopwords_set)

# The idea is to create a features vector from a list of words.

# 1) Use this hashing Term Frequency.
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")

# Create the pipeline.
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF])

# The evaluator of each models.
# evaluator = RegressionEvaluator(metricName="r2")
evaluator = BinaryClassificationEvaluator()

# Stochastic Gradient Descent Using scikit-learn

In [None]:
from sklearn.linear_model import SGDClassifier
import numpy as np
from scipy.sparse.csr import csr_matrix

clf = SGDClassifier(loss="log", penalty="l2", max_iter=5, shuffle=True)

# Get the X and y labels.
def generate_X_y_labels(size):
    data_batch, labels_batch = get_mini_batch(data_stream, size)
    
    if not data_batch: return np.empty(), np.empty()
    
    df_batch = spark.createDataFrame(data_batch, ('id', 'text'))

    # Data cleansing.
    df_batch_clean = transform_html_clean(df_batch, 'textclean')
    df_training_tmp = df_batch_clean.withColumnRenamed('reviewpolarity', 'label')

    # Run the tokenizer and remover pipeline.
    m_pip = pipeline.fit(df_training_tmp)
    df_pip_batch = m_pip.transform(df_training_tmp)
    # Update the SGD regression weights.

    # Let's get the right shape for the SparseVector data into numpy arrays.
    series = df_pip_batch.toPandas()['features'].apply(lambda x : np.array(x.toArray())).as_matrix().reshape(-1,1)
    X = np.apply_along_axis(lambda x : x[0], 1, series)
    y_labels =  np.array(labels_batch)

    return X, y_labels

classes = np.array([0, 1])

# print(X[:])
# print(y_labels[1:10])

# Simulating a streaming
data_stream = stream_doc()

# Train the 45000 data from the entire data set.
for i in range(4):
    print("range %i" % i)
    X_train, y_labels_train = generate_X_y_labels(1000)
    if not len(X_train): break
        
    model_sgd = clf.partial_fit(X_train, y_labels_train, classes=classes)

# Test on the last 5000 entries.
X_test, y_labels_test = generate_X_y_labels(5000)

print(X_test)
if len(X_test):
    print("\nscore: %.3f" % model_sgd.score(X_test, y_labels_test))
else:
    print('No data')

# Train the model.