# Imports

In [1]:
import inspect
import pathlib

In [2]:
from reader import HTMLPickledCorpusReader
from transformer import TextNormalizer

# Constants

In [3]:
PROJECT_DIR = pathlib.Path('~/project').expanduser()
DATA_DIR = PROJECT_DIR / 'data'

In [4]:
SAMPLE_ROOT = DATA_DIR / 'sample'
HOBBIES_ROOT = DATA_DIR / 'hobbies'

# Classes

## HTMLPickledCorpusReader

In [5]:
corpus = HTMLPickledCorpusReader(SAMPLE_ROOT.as_posix())
print(corpus.describes())

HTML corpus contains 2,538 files in 12 categories.
Structured as:
    43,922 paragraphs (17.306 mean paragraphs per file)
    74,899 sentences (1.705 mean sentences per paragraph).
Word count of 1,624,862 with a vocabulary of 58,748 (27.658 lexical diversity).
Corpus scan took 1.508 seconds.


## TextNormalizer

In [6]:
corpus = HTMLPickledCorpusReader(SAMPLE_ROOT.as_posix())
normalizer = TextNormalizer()

In [7]:
len(list(normalizer.fit_transform(corpus.docs())))

2538

# Python Multiprocessing

## Running Tasks in Parallel

In [8]:
from mp_train import sequential, parallel

In [9]:
print("beginning sequential tasks")
_, delta = sequential(SAMPLE_ROOT.as_posix())
print("total sequential fit time: {:0.2f} seconds".format(delta))

beginning sequential tasks


MainProcess 2024-03-03 18:54:51 naive bayes training took 31.92 seconds with an average score of 0.459
MainProcess 2024-03-03 18:56:21 logistic regression training took 90.45 seconds with an average score of 0.570
MainProcess 2024-03-03 18:57:24 multilayer perceptron training took 63.13 seconds with an average score of 0.556


total sequential fit time: 185.51 seconds


In [10]:
print("beginning parallel tasks")
_, delta = parallel(SAMPLE_ROOT.as_posix())
print("total parallel fit time: {:0.2f} seconds".format(delta))

beginning parallel tasks


fit_naive_bayes 2024-03-03 18:58:12 naive bayes training took 48.19 seconds with an average score of 0.459
fit_multilayer_perceptron 2024-03-03 18:58:53 multilayer perceptron training took 88.74 seconds with an average score of 0.572
fit_logistic_regression 2024-03-03 18:59:17 logistic regression training took 112.90 seconds with an average score of 0.570


total parallel fit time: 112.93 seconds


## Process Pools and Queues

In [11]:
from mcpi import mcpi_sequential, mcpi_parallel

In [12]:
N = 10000000

In [13]:
pi, delta = mcpi_sequential(N)
print("sequential pi: {} in {:0.2f} seconds".format(pi, delta))

sequential pi: 3.1421316 in 1.56 seconds


In [14]:
pi, delta = mcpi_parallel(N)
print("parallel pi: {} in {:0.2f} seconds".format(pi, delta))

parallel pi: 3.14214 in 0.34 seconds


## Parallel Corpus Preprocessing

In [15]:
from preprocessor import Preprocessor

In [16]:
print(inspect.getsource(Preprocessor))

class Preprocessor(object):
    """
    The preprocessor wraps a corpus object (usually a `HTMLCorpusReader`)
    and manages the stateful tokenization and part of speech tagging into a
    directory that is stored in a format that can be read by the
    `HTMLPickledCorpusReader`. This format is more compact and necessarily
    removes a variety of fields from the document that are stored in the JSON
    representation dumped from the Mongo database. This format however is more
    easily accessed for common parsing activity.
    """

    def __init__(self, corpus, target=None, **kwargs):
        """
        The corpus is the `HTMLCorpusReader` to preprocess and pickle.
        The target is the directory on disk to output the pickled corpus to.
        """
        self.corpus = corpus
        self.target = target

    @property
    def target(self):
        return self._target

    @target.setter
    def target(self, path):
        if path is not None:
            # Normalize the path

# Cluster Computing with Spark

## Anatomy of a Spark Job

In [8]:
from sc_template import confugure_spark

In [9]:
APP_NAME = 'My Spark Application'

In [10]:
sc, spark = confugure_spark(APP_NAME)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/24 21:30:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Distributing the Corpus

In [11]:
corpus = sc.wholeTextFiles((HOBBIES_ROOT / '*' / '*.txt').as_posix())

## RDD Operations

In [12]:
# !/opt/spark/bin/spark-submit sc_bigramcount.py

In [13]:
from sc_bigramcount import count_labels, count_bigrams

In [14]:
count_labels(corpus)

[Stage 0:>                                                          (0 + 2) / 2]

Label      Count
-------  -------
books         72
cinema       100
gaming       128
sports       118
cooking       30


                                                                                

In [15]:
count_bigrams(corpus)

                                                                                

unique bigrams: 138204


                                                                                

Row(_1=Row(_1='From', _2='to'), _2=1)


## NLP with Spark

### From Scikit-Learn to MLLib

### Feature extraction

In [16]:
# !/opt/spark/bin/spark-submit sc_vectorization.py

In [17]:
from sc_vectorization import load_corpus, make_vectorizer

In [18]:
corpus = load_corpus(
    sc, spark,
    path=(HOBBIES_ROOT / '*' / '*.txt').as_posix()
)

In [19]:
vectorizer = make_vectorizer()
vectorizer = vectorizer.fit(corpus)

                                                                                

In [20]:
vectors = vectorizer.transform(corpus)

In [21]:
vectors.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|                text|              tokens|     filtered_tokens|           frequency|               tfidf|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|books|\r\n\r\nFrom \n\n...|[, , , , from, , ...|[, , , , , , , , ...|(4096,[38,71,106,...|(4096,[38,71,106,...|
|books|The Lonely City b...|[the, lonely, cit...|[lonely, city bri...|(4096,[89,132,156...|(4096,[89,132,156...|
|books|\n\n\n\nRelated P...|[, , , , related,...|[, , , , related,...|(4096,[445,2545,3...|(4096,[445,2545,3...|
|books|The first story i...|[the, first, stor...|[first, story, sa...|(4096,[3,27,31,57...|(4096,[3,27,31,57...|
|books|by Sonny Liew\n\n...|[by, sonny, liew,...|[sonny, liew, , h...|(4096,[315,480,53...|(4096,[315,480,53...|
+-----+--------------------+--------------------+--------------------+--------------------+-----

### Text clustering with MLLib

In [22]:
# !/opt/spark/bin/spark-submit sc_clustering.py

In [23]:
from sc_clustering import load_corpus, make_clusterer, evaluate_clusterer

In [24]:
corpus = load_corpus(
    sc, spark,
    path=(HOBBIES_ROOT / '*' / '*.txt').as_posix()
)

In [25]:
clusterer = make_clusterer()
clusterer = clusterer.fit(corpus)

24/03/24 21:31:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [26]:
predictions = clusterer.transform(corpus)

In [27]:
evaluate_clusterer(clusterer, predictions)

  Cluster    Size  Terms
---------  ------  -------------------------------------------------------------
        0       4  piled game," dispassion, jimmy (wal) saoirse pitch,
        1      10  superwoman. relegation" grazer bookend arrest, gaimon,
        2      30  and 360 tendon. bookend --  toeing
        3     110  and toeing in banana-point that 360 for
        4      65  and who that in ex-wife banana-point toeing
        5      92  that who and water—as lionsgate) tow. banana-point
        6      82  who that lionsgate) society’s pseudonym water—as acceptance.
        7      34  society’s usefully, lionsgate) three-year trash. pseudonym he
        8       3  culminating, and 1880 lyndon -- until, pelicans,
        9      18  the and to a of in
Sum of square distance to center: 2.697
Silhouette with squared euclidean distance: 0.270


### Text classification with MLLib

In [28]:
# !/opt/spark/bin/spark-submit sc_classification.py

In [29]:
from sc_classification import load_corpus, make_classifier, evaluate_classifier

In [30]:
corpus = load_corpus(
    sc, spark,
    path=(HOBBIES_ROOT / '*' / '*.txt').as_posix()
)

In [31]:
train, test = corpus.randomSplit([0.8, 0.2])

In [32]:
classifier = make_classifier()
classifier = classifier.fit(train)

In [33]:
predictions = classifier.transform(test)

In [34]:
evaluate_classifier(classifier, predictions)

+----------+------------+--------------------+
|prediction|indexedLabel|               tfidf|
+----------+------------+--------------------+
|       0.0|         3.0|(4096,[54,126,172...|
|       0.0|         3.0|(4096,[445,2545,3...|
|       0.0|         3.0|(4096,[196,726,76...|
|       0.0|         3.0|(4096,[38,71,106,...|
|       0.0|         3.0|(4096,[32,55,64,1...|
+----------+------------+--------------------+
only showing top 5 rows

Test Accuracy = 0.284


### Local fit, global evaluation

In [35]:
# !/opt/spark/bin/spark-submit sc_sklearn_sample_model.py

In [36]:
from sklearn.ensemble import AdaBoostClassifier

In [37]:
from sc_sklearn_sample_model import (
    load_corpus,
    make_vectorizer,
    make_accuracy_closure
)

In [38]:
corpus = load_corpus(
    sc, spark,
    path=(HOBBIES_ROOT / '*' / '*.txt').as_posix()
)

In [39]:
vectorizer = make_vectorizer()
vectorizer = vectorizer.fit(corpus)

In [40]:
vectors = vectorizer.transform(corpus)

In [41]:
sample = (
    vectors
    .sample(withReplacement=False,
            fraction=0.1,
            seed=42)
    .collect()
)
X = [row['tfidf'] for row in sample]
y = [row['label'] for row in sample]

In [42]:
clf = AdaBoostClassifier()
clf.fit(X, y)

In [43]:
clf = sc.broadcast(clf)

In [44]:
correct = sc.accumulator(0)
incorrect = sc.accumulator(1)

In [45]:
accuracy = make_accuracy_closure(clf, incorrect, correct)

In [46]:
vectors.foreachPartition(accuracy)

                                                                                

In [47]:
accuracy = correct.value / (correct.value + incorrect.value)
print(f'Global accuracy of model was {accuracy:.3f}')

Global accuracy of model was 0.695
