#  Brazil
## Goiás State Court of Accounts
### Department of Strategic Information

This document describes the data mining process for predicting whether a particular public spend is for advertising based on text mining.

Author: Mauricio Barros de Jesus on 04/19/2019
Paper: Using text mining to categorize the purpose of public spending for the benefit of transparency and accountability - ICMLA 2019.



# Load Libraries

In [2]:
from pyspark.ml.classification import  NaiveBayes, NaiveBayesModel,MultilayerPerceptronClassifier,MultilayerPerceptronClassifier,LogisticRegression,LinearSVC,RandomForestClassifier
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover, CountVectorizer,StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.sql import Row
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import BinaryClassificationMetrics,MulticlassMetrics


import numpy as np
import nltk
from nltk.stem import SnowballStemmer
pt_stemmer = SnowballStemmer('portuguese')

#Show graphics on notebook
%matplotlib inline


## Start Spark Session

In [3]:
# Spark Session
spSession = SparkSession.builder.master("local").appName("TCEGO").config("spark.some.config.option", "session").getOrCreate()


## Load Data set from Hadoop HDFS 

In [4]:
#Load Dataset 
dfEmpenhos = spSession.read.csv('hdfs://your_server:your_port/your_path/ds_icmla_spendingcategory.csv',header=True, sep=";")


In [4]:
dfEmpenhos.rdd.getNumPartitions()

20

In [5]:
dfEmpenhos = dfEmpenhos.repartition(32)

In [6]:
dfEmpenhos.rdd.getNumPartitions()

32

In [7]:
dfEmpenhos.columns

['key_nr_empenho', 'desc_empenho', 'e_natureza_publicidade', 'key_nat_desp']

In [8]:
#Filter Columns
dfEmpenhos = dfEmpenhos[["key_nr_empenho","desc_empenho","e_natureza_publicidade","key_nat_desp"]]


In [9]:
#Show instaces number
dfEmpenhos.count()


143487

In [10]:
# Show class
dfEmpenhos.select("e_natureza_publicidade").distinct().show()


+----------------------+
|e_natureza_publicidade|
+----------------------+
|                     3|
|                     0|
|                     1|
|                     2|
+----------------------+



In [11]:
# Count instances by classes
dfEmpenhos.groupBy("e_natureza_publicidade").count().show()


+----------------------+------+
|e_natureza_publicidade| count|
+----------------------+------+
|                     3|  2454|
|                     0|122858|
|                     1|  1168|
|                     2| 17007|
+----------------------+------+



## Pre-processing


In [12]:
def mudarAtributoClasse(lrow):
    
    e_natureza_publicidade = float(lrow["e_natureza_publicidade"])
    
    #change class number 3 to zero and current zero to 2 (unknown class)
    if e_natureza_publicidade == 0.0:
        natureza = 2.0
    elif (e_natureza_publicidade == 3.0):
        natureza = 0.0
    elif (e_natureza_publicidade == 1.0):
        natureza = 1.0
    else:
        natureza = 2.0
        
    linhas = Row(key_nr_empenho = lrow["key_nr_empenho"], desc_empenho = lrow["desc_empenho"], e_natureza_publicidade = float(natureza))
    return linhas


In [13]:
##Filtering Known Classes
empenhosDFDML_TMP = dfEmpenhos.rdd.map(mudarAtributoClasse).toDF()




In [14]:
# Count instances by classes
empenhosDFDML_TMP.groupBy("e_natureza_publicidade").count().show()


+----------------------+------+
|e_natureza_publicidade| count|
+----------------------+------+
|                   0.0|  2454|
|                   1.0|  1168|
|                   2.0|139865|
+----------------------+------+



In [15]:
# Select only labeled instances (0: No advertising; 1: Advertising)
empenhosDFML = empenhosDFDML_TMP.rdd.filter(lambda x: x["e_natureza_publicidade"] == 0 or x["e_natureza_publicidade"] == 1).toDF()


In [16]:
empenhosDFML.groupBy("e_natureza_publicidade").count().show()


+----------------------+-----+
|e_natureza_publicidade|count|
+----------------------+-----+
|                   0.0| 2454|
|                   1.0| 1168|
+----------------------+-----+



In [17]:
empenhosDFML.show(2)

+--------------------+----------------------+----------------+
|        desc_empenho|e_natureza_publicidade|  key_nr_empenho|
+--------------------+----------------------+----------------+
|empenho tratase d...|                   1.0|2017660501000076|
|item qtd un      ...|                   0.0|2017070100600821|
+--------------------+----------------------+----------------+
only showing top 2 rows



In [18]:
#Remove punctuation
empenhosFinal = empenhosDFML.select('key_nr_empenho','e_natureza_publicidade', (lower(regexp_replace('desc_empenho', "[?|º|°|`|¿|$|&|*|%|@|(|)|~|,|\.|:|\-|\'|/]", " ")).alias('desc_empenho')))
empenhosFinal.show(10)


+----------------+----------------------+--------------------+
|  key_nr_empenho|e_natureza_publicidade|        desc_empenho|
+----------------+----------------------+--------------------+
|2018045201300163|                   1.0|empenho  referent...|
|2017670101600122|                   1.0|valor destinado a...|
|2018010102800439|                   0.0|referente a auxil...|
|2018020100500184|                   0.0|quantia que se em...|
|2018045201300101|                   1.0|modalidade de lic...|
|2017670101600114|                   1.0|valor destinado a...|
|2017385400600064|                   0.0|empenhase para co...|
|2017290201100070|                   0.0|empenho referente...|
|2015290200500191|                   0.0|destinase ao paga...|
|2015290200500111|                   0.0|destinase ao paga...|
+----------------+----------------------+--------------------+
only showing top 10 rows



In [19]:
# Create Stop Words List
nltk.download('stopwords')
nltk.download('punkt')
stopword_list_nltk = nltk.corpus.stopwords.words('portuguese')

## Adding stopwords informed by the experts.
stopword_list_nltk.append("VLR")
stopword_list_nltk.append("UN")
stopword_list_nltk.append("TOT")
stopword_list_nltk.append("QTD")
stopword_list_nltk.append("QT")
stopword_list_nltk.append("VALOR")
stopword_list_nltk.append("NO")
stopword_list_nltk.append("PDF")
stopword_list_nltk.append("2010")
stopword_list_nltk.append("2011")
stopword_list_nltk.append("2012")
stopword_list_nltk.append("2013")
stopword_list_nltk.append("2015")
stopword_list_nltk.append("2016")
stopword_list_nltk.append("2017")
stopword_list_nltk.append("MES")
stopword_list_nltk.append("MÊS")
stopword_list_nltk.append("ATA")
stopword_list_nltk.append("0012013")
stopword_list_nltk.append("PPT")
stopword_list_nltk.append("TOTAL")
stopword_list_nltk.append("GOIÁS")
stopword_list_nltk.append("02")
stopword_list_nltk.append("FLS")
stopword_list_nltk.append("FL")
stopword_list_nltk.append("03")
stopword_list_nltk.append("01")
stopword_list_nltk.append("Nº")
stopword_list_nltk.append("DPS")
stopword_list_nltk.append("DDO")
stopword_list_nltk.append("RDS")
stopword_list_nltk.append("jjbn")
stopword_list_nltk.append("rdf")
stopword_list_nltk.append("ser")
stopword_list_nltk.append("meses")
stopword_list_nltk.append("tipo")
stopword_list_nltk.append("obs")
stopword_list_nltk.append("doze")
stopword_list_nltk.append("12")
stopword_list_nltk.append("além")
stopword_list_nltk.append("bem")
stopword_list_nltk.append("rjus")
stopword_list_nltk.append("25042017")
stopword_list_nltk.append("fazer")
stopword_list_nltk.append("face")

stopword_list = []
[stopword_list.append(i.lower())  for i in stopword_list_nltk]
stopword_list

[nltk_data] Downloading package stopwords to /home/pi/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /home/pi/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


['de',
 'a',
 'o',
 'que',
 'e',
 'é',
 'do',
 'da',
 'em',
 'um',
 'para',
 'com',
 'não',
 'uma',
 'os',
 'no',
 'se',
 'na',
 'por',
 'mais',
 'as',
 'dos',
 'como',
 'mas',
 'ao',
 'ele',
 'das',
 'à',
 'seu',
 'sua',
 'ou',
 'quando',
 'muito',
 'nos',
 'já',
 'eu',
 'também',
 'só',
 'pelo',
 'pela',
 'até',
 'isso',
 'ela',
 'entre',
 'depois',
 'sem',
 'mesmo',
 'aos',
 'seus',
 'quem',
 'nas',
 'me',
 'esse',
 'eles',
 'você',
 'essa',
 'num',
 'nem',
 'suas',
 'meu',
 'às',
 'minha',
 'numa',
 'pelos',
 'elas',
 'qual',
 'nós',
 'lhe',
 'deles',
 'essas',
 'esses',
 'pelas',
 'este',
 'dele',
 'tu',
 'te',
 'vocês',
 'vos',
 'lhes',
 'meus',
 'minhas',
 'teu',
 'tua',
 'teus',
 'tuas',
 'nosso',
 'nossa',
 'nossos',
 'nossas',
 'dela',
 'delas',
 'esta',
 'estes',
 'estas',
 'aquele',
 'aquela',
 'aqueles',
 'aquelas',
 'isto',
 'aquilo',
 'estou',
 'está',
 'estamos',
 'estão',
 'estive',
 'esteve',
 'estivemos',
 'estiveram',
 'estava',
 'estávamos',
 'estavam',
 'estivera'

In [20]:
# Pipeline Steps: Tokenization, Stopwords Removal, and TF-IDF
tokenizer = Tokenizer(inputCol = "desc_empenho", outputCol = "words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered",stopWords=stopword_list)
hashingTF = HashingTF(inputCol = remover.getOutputCol(), outputCol = "tempfeatures")
idf = IDF(inputCol = hashingTF.getOutputCol(), outputCol = "features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol = "e_natureza_publicidade", outputCol = "label")


In [21]:
# Pipeline Creation
pipeline = Pipeline(stages = [tokenizer,remover,hashingTF,idf,label_stringIdx])

pipelineFit = pipeline.fit(empenhosFinal)
dataset = pipelineFit.transform(empenhosFinal)

In [22]:
dataset.show(10)

+----------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|  key_nr_empenho|e_natureza_publicidade|        desc_empenho|               words|            filtered|        tempfeatures|            features|label|
+----------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|2017385400600064|                   0.0|empenhase para co...|[empenhase, para,...|[empenhase, cobri...|(262144,[6419,555...|(262144,[6419,555...|  0.0|
|2017290201100070|                   0.0|empenho referente...|[empenho, referen...|[empenho, referen...|(262144,[1251,528...|(262144,[1251,528...|  0.0|
|2016220100700522|                   0.0|valor que se empe...|[valor, que, se, ...|[empenha, empresa...|(262144,[5590,222...|(262144,[5590,222...|  0.0|
|2016220217800009|                   0.0|valor que se empe...|[valor, que, se, ...

In [23]:
# Holdout - Training and test data (70% training, 30% test)
(dados_treino, dados_teste) = dataset.randomSplit([0.7, 0.3])


### Imbalance classes

In [24]:
dados_teste.count()

1035

In [25]:
dataset.select("filtered").take(2)

[Row(filtered=['empenho', '', 'referente', '', '', 'contratação', '', 'empresa', 'especializada', 'prestação', 'deserviços', '', '', 'gerenciamento', 'abastecimento', 'veículos', '', 'mediante', 'emissãode', 'cartões', 'magnéticos', 'controle', 'consumo', '', 'frota', 'veículos', 'levese', '', 'pesados', '', '', 'tribunal', '', '', 'justiça', '', '', 'estado', '', '', 'período', 'de12', '', 'oriunda', 'edital', 'licitação', 'n', '', '0802015', '', '', '', 'modalidadepregão', '', 'eletrônico', '', '', '', 'menor', '', 'preço', '', 'conforme', 'despacho', 'n', '', '0029342015da', 'diretoria', 'geral', 'doc', '', '100', 'especificação', 'abaixo', '', '', 'item', '', '', '', '', '', 'und', '', '', 'especificação', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 'vl', '', 'unit', '', '', '', '', '', '', 'vl', '', 'mensal03', '', '', '30', '000', '', 'litro', '', 'etanol', 'comum', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '2', '32', 

## Modeling

### Cross-Validator  (CV)

In [26]:
#Generic Function for CrossValidator
def executarCV(p_df,p_estimator,p_evaluator,p_grid,nr_folds):
    #Define CV
    crossval = CrossValidator(estimator=p_estimator,
                              evaluator=p_evaluator,
                              estimatorParamMaps=p_grid,
                              numFolds=nr_folds)
    # Executing CV
    cvModel = crossval.fit(p_df)
    
    return cvModel


In [27]:
##Show data from CV
def mostrarCV(cvModel):
    # Apresentar resultado de forma amigavel
    params = [{p.name: v for p, v in m.items()} for m in cvModel.getEstimatorParamMaps()]
    
    for ps, metric in zip(params, cvModel.avgMetrics):
        print(cvModel.getEvaluator().getMetricName(),ps, metric)


In [28]:

#Creating Naive Bayes (NB) model
nbClassifier = NaiveBayes()
nbClassifier.setFeaturesCol("features")
nbClassifier.setLabelCol("label")

pipelineNB = Pipeline(stages = [nbClassifier])
paramGridNB = ParamGridBuilder().addGrid(nbClassifier.smoothing, [0,0.5,1]).build()
evaluatorNB = BinaryClassificationEvaluator(rawPredictionCol = "prediction", labelCol = "label", metricName = "areaUnderROC")

#Executing NB
cvModelNB = executarCV(dados_treino,pipelineNB,evaluatorNB,paramGridNB,5)


In [29]:
# Show Results
mostrarCV(cvModelNB)


areaUnderROC {'smoothing': 0.0} 0.5771869167718134
areaUnderROC {'smoothing': 0.5} 0.959479710309446
areaUnderROC {'smoothing': 1.0} 0.9555941248084148


In [30]:
#Creating LogisticRegression
classificadorRL = LogisticRegression(maxIter=10)
classificadorRL.setFeaturesCol("features")
classificadorRL.setLabelCol("label")

pipelineRL = Pipeline(stages = [classificadorRL])
paramGridRL = ParamGridBuilder().addGrid(classificadorRL.regParam, [0.1,0.01]).addGrid(classificadorRL.elasticNetParam, [0,0.5]).build()
evaluatorRL = BinaryClassificationEvaluator(rawPredictionCol = "prediction", labelCol = "label", metricName = "areaUnderROC")

#Executing LogisticRegression in CV 
cvModelRL = executarCV(dados_treino,pipelineRL,evaluatorRL,paramGridRL,5)


In [31]:
# Show Results
mostrarCV(cvModelRL)


areaUnderROC {'regParam': 0.1, 'elasticNetParam': 0.0} 0.9775708696573728
areaUnderROC {'regParam': 0.1, 'elasticNetParam': 0.5} 0.899364150848875
areaUnderROC {'regParam': 0.01, 'elasticNetParam': 0.0} 0.9819990047230942
areaUnderROC {'regParam': 0.01, 'elasticNetParam': 0.5} 0.974763682418981


In [32]:
#Criando modelo com LinearSVC
svcClassifier = LinearSVC()
svcClassifier.setFeaturesCol("features")
svcClassifier.setLabelCol("label")

pipelineSVC = Pipeline(stages = [svcClassifier])
paramGridSVC = ParamGridBuilder().addGrid(svcClassifier.regParam, [1.0, 0.1]).addGrid(svcClassifier.maxIter, [5,10,15]).build()
evaluatorSVC = BinaryClassificationEvaluator(rawPredictionCol = "prediction", labelCol = "label", metricName = "areaUnderROC")

#Executing LinearSVC in CV 
cvModelSVC = executarCV(dados_treino,pipelineSVC,evaluatorSVC,paramGridSVC,5)
mostrarCV(cvModelSVC)


areaUnderROC {'regParam': 1.0, 'maxIter': 5} 0.9709553124293372
areaUnderROC {'regParam': 1.0, 'maxIter': 10} 0.9775371920651394
areaUnderROC {'regParam': 1.0, 'maxIter': 15} 0.9776166170284054
areaUnderROC {'regParam': 0.1, 'maxIter': 5} 0.9731876139291639
areaUnderROC {'regParam': 0.1, 'maxIter': 10} 0.9810587933953888
areaUnderROC {'regParam': 0.1, 'maxIter': 15} 0.9817870889300647


### Test step

In [33]:
def mostarResultadoTeste(previsoes):
    resultados = previsoes.select(['prediction', 'label'])
    
    ## prepare score-label set
    resultados_list = [list(i) for i in resultados.collect()]
    predictionAndLabels = sc.parallelize(resultados_list)
    
    metricsML = MulticlassMetrics(predictionAndLabels)
    metricsBI = BinaryClassificationMetrics(predictionAndLabels)
    
    print("Precision: ", metricsML.precision(1.0) , metricsML.precision(0.0))
    print("Recall: ", metricsML.recall(1.0) , metricsML.recall(0.0))
    print("fMeasure: ", metricsML.fMeasure(1.0) , metricsML.fMeasure(0.0))
    print("areaUnderROC: ", metricsBI.areaUnderROC )

In [34]:
# Teste NB
previsoesNB = cvModelNB.bestModel.transform(dados_teste)
mostarResultadoTeste(previsoesNB)

Precision:  0.9935275080906149 0.9669421487603306
Recall:  0.9274924471299094 0.9971590909090909
fMeasure:  0.959375 0.9818181818181819
areaUnderROC:  0.9623257690195003


In [35]:
# Teste RL
previsoesRL = cvModelRL.bestModel.transform(dados_teste)
mostarResultadoTeste(previsoesRL)

Precision:  0.996875 0.9832167832167832
Recall:  0.9637462235649547 0.9985795454545454
fMeasure:  0.9800307219662058 0.9908386187455955
areaUnderROC:  0.98116288450975


In [36]:
# Tests whit SVC
previsoesSVC = cvModelSVC.bestModel.transform(dados_teste)
mostarResultadoTeste(previsoesSVC)



Precision:  0.9968847352024922 0.9845938375350141
Recall:  0.9667673716012085 0.9985795454545454
fMeasure:  0.9815950920245399 0.9915373765867419
areaUnderROC:  0.9826734585278769


In [37]:
cvModelSVC.params

[Param(parent='CrossValidatorModel_d01b3939eab6', name='estimator', doc='estimator to be cross-validated'),
 Param(parent='CrossValidatorModel_d01b3939eab6', name='estimatorParamMaps', doc='estimator param maps'),
 Param(parent='CrossValidatorModel_d01b3939eab6', name='evaluator', doc='evaluator used to select hyper-parameters that maximize the validator metric'),
 Param(parent='CrossValidatorModel_d01b3939eab6', name='seed', doc='random seed.')]

In [38]:
teste = cvModelSVC.bestModel.stages[-1]

In [39]:
teste.extractParamMap()


{Param(parent='LinearSVC_ed658f57cfca', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LinearSVC_ed658f57cfca', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LinearSVC_ed658f57cfca', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LinearSVC_ed658f57cfca', name='labelCol', doc='label column name'): 'label',
 Param(parent='LinearSVC_ed658f57cfca', name='maxIter', doc='maximum number of iterations (>= 0)'): 15,
 Param(parent='LinearSVC_ed658f57cfca', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LinearSVC_ed658f57cfca', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LinearSVC_ed658f57cfca', name='regParam', doc='regularization parameter (>= 0)'): 0.1,
 Param(parent='LinearSVC_ed658f57cfca', name='standardization', doc='whether to standardize the training features before fitting the

# Conclusion

## The best model is LinearSVC 