In [1]:
from datetime import datetime
# Modules used for PySpark solution
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover, StringIndexer
from pyspark.ml import Pipeline as PySparkPipeline
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import KMeans as PySparkKMeans
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Modules used for non distributed solution
import collections

import spacy
from spacy.lang.pt.stop_words import STOP_WORDS
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.cluster import KMeans
from fastparquet import write 
import pandas as pd

def create_session(app_name='SparkPreProcessing', master='local',
                   executor_memory='6g', executor_cores='8'):
    '''Create (or reuse) a SparkSession configured for this notebook.

    Parameters
    ----------
    app_name : str
        Value stored as ``spark.app.name``.
    master : str
        Value stored as ``spark.master``. ``'local'`` runs Spark with a single
        worker thread; pass e.g. ``'local[*]'`` to use every available core.
    executor_memory, executor_cores : str
        Values stored as ``spark.executor.memory`` / ``spark.executor.cores``.
        NOTE(review): in local mode there are no separate executor processes,
        so these are recorded in the config but likely do not size the driver.

    Returns
    -------
    pyspark.sql.SparkSession
        The active session built from the configuration above.
    '''
    sc_conf = SparkConf()
    sc_conf.setAppName(app_name)
    sc_conf.setMaster(master)
    sc_conf.set('spark.executor.memory', executor_memory)
    sc_conf.set('spark.executor.cores', executor_cores)
    sc_conf.set('spark.logConf', True)
    # Echo the effective settings so the notebook output records them.
    print(sc_conf.getAll())
    # Build through SparkSession.builder so no separate SparkContext import is
    # needed; getOrCreate reuses an already-active session if there is one.
    return SparkSession.builder.config(conf=sc_conf).getOrCreate()


# Stopwatch for the PySpark section; read back by the timing cell after 1.6.
start_time = datetime.now()

# 1. PySpark

## 1.1 Loading Files and Creating Session

In [2]:
%%time
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
RELATIVE_FOLDER_PATH = "../assets/data/"
filename = "data"
pyspark_session = create_session()

ailab_df = pyspark_session.read.parquet(RELATIVE_FOLDER_PATH +"/data.parquet.gzip")

[('spark.executor.memory', '6g'), ('spark.master', 'local'), ('spark.logConf', 'True'), ('spark.submit.deployMode', 'client'), ('spark.executor.cores', '8'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'SparkPreProcessing')]
CPU times: user 20.3 ms, sys: 7.23 ms, total: 27.5 ms
Wall time: 5.18 s


## 1.2 Preprocessing and Vectorizing

In [3]:
%%time
ailab_df.cache().count()

tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="stopWordsRemovedTokens")
hashingTF = HashingTF(inputCol="stopWordsRemovedTokens", outputCol="rawFeatures", numFeatures=2000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol = "process_class", outputCol = "label")

pre_processing_pipeline = PySparkPipeline(stages=[tokenizer, remover, hashingTF, idf, label_stringIdx])

pre_processing_pipeline_model = pre_processing_pipeline.fit(ailab_df)

treated_df = pre_processing_pipeline_model.transform(ailab_df)

CPU times: user 75.9 ms, sys: 32 ms, total: 108 ms
Wall time: 12.6 s


## 1.3 Clustering

In [4]:
%%time
kmeans = PySparkKMeans(k=20)
kmeans_trained_model = kmeans.fit(treated_df)
kmeans_result_df = kmeans_trained_model.transform(treated_df)

CPU times: user 16.5 ms, sys: 1.83 ms, total: 18.4 ms
Wall time: 11.6 s


## 1.4 Classifying

In [5]:
%%time
(trainingData, testData) = treated_df.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions_df = lrModel.transform(testData)

CPU times: user 26 ms, sys: 0 ns, total: 26 ms
Wall time: 14 s


## 1.5 Model Evaluation

In [6]:
%%time
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions_df)

CPU times: user 10.4 ms, sys: 0 ns, total: 10.4 ms
Wall time: 36.4 s


0.7459806286135295

## 1.6 Storing Results

In [7]:
%%time
predictions_df.write.mode("overwrite").format("parquet").option("compression", "gzip").mode("overwrite").save(RELATIVE_FOLDER_PATH +"pyspark_result.parquet")
kmeans_result_df.write.mode("overwrite").format("parquet").option("compression", "gzip").mode("overwrite").save(RELATIVE_FOLDER_PATH +"pyspark_cluster_result.parquet")
pyspark_session.stop()

CPU times: user 4.66 ms, sys: 4.14 ms, total: 8.8 ms
Wall time: 36.4 s


In [8]:
# Wall-clock time elapsed since start_time was set in the first cell.
pyspark_end_time = datetime.now()
pyspark_time = pyspark_end_time - start_time
print("PySpark solution took:", pyspark_time)

PySpark solution took: 0:01:50.810402


In [9]:
# The PySpark wall-clock time was already measured in the previous cell;
# recomputing it here would also count the idle time between the two cells.
print("PySpark solution took:", pyspark_time)
# Restart the stopwatch for the undistributed (spaCy + scikit-learn) solution.
start_time = datetime.now()

PySpark solution took: 0:01:50.975982


# 2. spaCy and scikit-learn

## 2.1 Loading files and Models

In [10]:
%%time 

VECTOR_MODEL_NAME = "pt_core_news_sm"
NLP_SPACY = spacy.load(VECTOR_MODEL_NAME)
filename = "data"
stopwords_set = set(STOP_WORDS)

parquet_filename = RELATIVE_FOLDER_PATH + filename + ".parquet.gzip"
ailab_df = pd.read_parquet(parquet_filename)
print(ailab_df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2036 entries, 0 to 2035
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   process_class  2036 non-null   object
 1   process_id     2036 non-null   object
 2   doc_id         2036 non-null   object
 3   path_img       2036 non-null   object
 4   text           2036 non-null   object
 5   doc_type       2036 non-null   object
 6   num_pag        2036 non-null   int64 
dtypes: int64(1), object(6)
memory usage: 111.5+ KB
None
CPU times: user 4.46 s, sys: 334 ms, total: 4.79 s
Wall time: 4.79 s


## 2.2 Preprocessing and Vectorizing

In [11]:
%%time
tokenizer = NLP_SPACY.Defaults.create_tokenizer(NLP_SPACY)
raw_text = ailab_df['text'].to_list()

tokenized_text = []
for row in raw_text:
    doc = tokenizer(row)
    preprocessed_doc = [token for token in doc if not token.norm_ in stopwords_set]
    tokenized_text.append(" ".join([word.text for word in preprocessed_doc]))

count_vectorizer = CountVectorizer()
tfidf_transformer = TfidfTransformer()

''' Encapsuling components in pipeline '''
pipeline = Pipeline([
    ('count_vectorizer', count_vectorizer),
    ('tfidf_transformer', tfidf_transformer)
])

vectorized_docs = pipeline.fit_transform(tokenized_text)

CPU times: user 49.6 s, sys: 106 ms, total: 49.7 s
Wall time: 49.7 s


## 2.3 Clustering


In [12]:
%%time
kmeans = KMeans(20)
kmeans.fit(vectorized_docs)
clustering = collections.defaultdict(list)
kmeans_df = ailab_df.copy()
kmeans_df['cluster_label'] = [label for label in kmeans.labels_]

CPU times: user 4min 25s, sys: 43.2 s, total: 5min 8s
Wall time: 3min 54s


## 2.4 Classifying

In [13]:
%%time
targets_labels = ailab_df['process_class'].to_list()
''' Let's evaluate more deeply the best model '''
X_train, X_test, y_train, y_test = train_test_split(
     vectorized_docs,
    targets_labels,
    test_size=0.25, random_state=42)

clf = SGDClassifier()

train1 = X_train
labelsTrain1 = y_train
test1 = X_test
labelsTest1 = y_test
"""  train """
clf.fit(train1, labelsTrain1)
"""  test """
preds = clf.predict(test1)

CPU times: user 675 ms, sys: 464 ms, total: 1.14 s
Wall time: 317 ms


## 2.5 Model Evaluation

In [14]:
%%time
print("accuracy:", accuracy_score(labelsTest1, preds))
print(
    classification_report(
        labelsTest1,
        preds,
        target_names=ailab_df['process_class'].unique()))

accuracy: 0.8408644400785854
              precision    recall  f1-score   support

          RE       0.00      0.00      0.00        10
         ARE       0.84      0.87      0.86       274
          AI       0.84      0.84      0.84       225

    accuracy                           0.84       509
   macro avg       0.56      0.57      0.57       509
weighted avg       0.82      0.84      0.83       509

CPU times: user 9.83 ms, sys: 0 ns, total: 9.83 ms
Wall time: 8.94 ms


  _warn_prf(average, modifier, msg_start, len(result))


## 2.6 Storing Results

In [15]:
%%time
ailab_df['path_img'] = [ str(doc) for doc in ailab_df['path_img']]
kmeans_df['path_img'] = [ str(doc) for doc in kmeans_df['path_img']]

write(RELATIVE_FOLDER_PATH +"result.parquet", ailab_df, compression='gzip')
write(RELATIVE_FOLDER_PATH +"cluster_result.parquet", kmeans_df, compression='gzip')

CPU times: user 3.71 s, sys: 60.4 ms, total: 3.77 s
Wall time: 3.8 s


In [16]:
# Wall-clock time elapsed since the stopwatch was reset after the PySpark section.
undistributed_end_time = datetime.now()
undistributed_time = undistributed_end_time - start_time
print("Undistributed solution took:", undistributed_time)

Undistributed solution took: 0:04:53.636143
