<a href="https://colab.research.google.com/github/villantoine/categorisation_textes/blob/main/categorisation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#####################################################################
### ------ IMPORTS ET INSTALLATION DE LIBRAIRIES EXTERNES ------- ###
#####################################################################

In [None]:
# LIBRAIRIES EXTERNES 

!pip install mwparserfromhell #parser wikipedia
!pip install pdfplumber #outil extraction pdf
!python -m spacy download fr_core_news_sm #pipelines spacy, fr et en
!python -m spacy download en_core_web_sm
!pip install --user -U nltk #nltk

# IL FAUT EXECUTER CETTE CELLULE ET RESTART LE RUNTIME POUR POUVOIR EXECUTER LE RESTE !!!! (Runtime > Restart runtime)

In [None]:
# Création du dataset : exports et prétraitement

import os
import pdfplumber
from textblob import TextBlob
import nltk
nltk.download('stopwords')
import spacy
from spacy.lang.fr.stop_words import STOP_WORDS
import xml.sax
import subprocess
import re
import mwparserfromhell

# Entrainements, tests, affichage

%matplotlib inline

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.metrics import confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
from sklearn.model_selection import train_test_split

La cellule suivante est la fonction de nettoyage appliquée à tous les textes

In [None]:
# Nettoyage (fonction commune)

# spaCy pipelines already loaded, keyed by language code, so that each model
# is read from disk once instead of once per document.
_SPACY_MODELS = {}
_SPACY_MODEL_NAMES = {'fr': 'fr_core_news_sm', 'en': 'en_core_web_sm'}


def cleaning(text_input):
  """Normalize a French or English text into a space-separated string of lemmas.

  Steps: detect the language, lemmatize with the matching spaCy pipeline,
  tokenize, lowercase, then drop stop words and tokens of two characters or
  fewer.

  Args:
    text_input: raw text of one document.

  Returns:
    The cleaned tokens joined by single spaces.

  Raises:
    ValueError: if the detected language is neither French nor English.

  NOTE(review): language detection relies on TextBlob.detect_language(), which
  calls an online service and is deprecated in recent TextBlob releases --
  confirm the installed version still provides it.
  """
  # Detect the language once (the original queried the detector twice).
  language = TextBlob(text_input).detect_language()
  if language not in _SPACY_MODEL_NAMES:
    raise ValueError("ni francais ni anglais / nor english nor french")
  if language not in _SPACY_MODELS:
    _SPACY_MODELS[language] = spacy.load(_SPACY_MODEL_NAMES[language])
  nlp = _SPACY_MODELS[language]

  # Stop words: spaCy's French list plus NLTK's French and English lists.
  # The NLTK lists used to be built but never applied, so English stop words
  # were kept in English documents.
  stopwords = set(STOP_WORDS)
  stopwords.update(nltk.corpus.stopwords.words('french'))
  stopwords.update(nltk.corpus.stopwords.words('english'))

  # Elided forms (l', d'), runs of letters in any alphabet, or one punctuation
  # mark. The previous pattern carried a stray r''' prefix inside the string
  # and used [A-zÀ-ú], which lets in [ \ ] ^ _ ` and cuts words at û, ü, œ.
  tokenizer = nltk.tokenize.RegexpTokenizer(r"\w'|[^\W\d_]+|[^\w\s]")

  lemmatized = " ".join(token.lemma_ for token in nlp(text_input))
  kept = [word.lower()
          for word in tokenizer.tokenize(lemmatized)
          if word.lower() not in stopwords and len(word) > 2]

  return " ".join(kept)

Extraction des articles Wikipédia depuis un fichier XML (l.45)

In [None]:
# EXTRACTION DATA WIKI

class WikiXmlHandler(xml.sax.handler.ContentHandler):
    """SAX content handler collecting (title, text) pairs from a wiki XML export.

    Each completed ``<page>`` element that contained both a ``<title>`` and a
    ``<text>`` element is appended to ``self._pages`` as a ``(title, text)``
    tuple. Pages missing either element are skipped instead of silently
    reusing the value left over from the previous page.
    """
    def __init__(self):
        xml.sax.handler.ContentHandler.__init__(self)
        self._buffer = None        # character chunks of the element being captured
        self._values = {}          # 'title' / 'text' captured for the current page
        self._current_tag = None   # name of the element being captured, if any
        self._pages = []           # (title, text) tuples of the completed pages

    def characters(self, content):
        """Buffer character data while inside a captured element."""
        if self._current_tag:
            self._buffer.append(content)

    def startElement(self, name, attrs):
        """Start capturing when a ``title`` or ``text`` element opens."""
        if name in ('title', 'text'):
            self._current_tag = name
            self._buffer = []

    def endElement(self, name):
        """Store the captured element, or emit the page when ``page`` closes."""
        if name == self._current_tag:
            # Chunks are joined with a space, so a space appears wherever the
            # parser split the text. NOTE(review): the patterns in clean_wiki
            # ('< ref >', '< gallery') appear to rely on this spacing.
            self._values[name] = ' '.join(self._buffer)
            # Stop capturing: otherwise character data of every following
            # element would keep piling up in the buffer.
            self._current_tag = None
            self._buffer = None

        if name == 'page':
            if 'title' in self._values and 'text' in self._values:
                self._pages.append((self._values['title'], self._values['text']))
            # Start the next page from a clean state.
            self._values = {}


def clean_wiki(dirty_entry):
  """Remove leftover wiki markup from an article text.

  Strips section headings (``== ... ==``), ``< ref >`` blocks, ``< gallery``
  blocks and ``[[Image...]]`` links, in that order.

  Args:
    dirty_entry: article text, possibly still containing wiki markup.

  Returns:
    The text with the matched spans removed.
  """
  tmp = re.sub(r'==.*?==', '', dirty_entry)
  tmp = re.sub(r'< ref >.*?< /ref > ', '', tmp)
  # Gallery blocks list one file per line, so '.' must be allowed to cross newlines.
  tmp = re.sub(r'< gallery.*?< /gallery > ', '', tmp, flags=re.DOTALL)
  # The brackets must be escaped: unescaped, '[[Image.*?]]' is a character
  # class followed by ']' and deletes fragments such as 'e]' instead of links.
  result = re.sub(r'\[\[Image.*?\]\]', '', tmp)

  return result

  

handler = WikiXmlHandler()

parser = xml.sax.make_parser()
parser.setContentHandler(handler)

# Path of an XML file containing the export of one category of articles.
wiki_export_path = "/content/drive/MyDrive/Stage - Caractérisation automatique de texte (D2IT)/Wikipédia-20210507095826.xml"

# Feed the export to the incremental parser one line at a time. The context
# manager closes the file even if parsing fails, and the encoding is explicit
# rather than platform-dependent.
with open(wiki_export_path, "r", encoding="utf-8") as wiki_file:
  for line in wiki_file:
    parser.feed(line)
# Signal the end of the document so that a truncated export is reported.
parser.close()

## Cleaning

texts = []

for _title, raw_text in handler._pages:
  # Strip the wiki markup, remove what is left of it, then apply the common cleaning.
  stripped = mwparserfromhell.parse(raw_text).strip_code(normalize=True)
  texts.append(cleaning(clean_wiki(stripped)))

Extraction des documents Stelia depuis des fichiers pdf (lignes 3 & 4) grâce à pdfplumber. Ils sont divisés en deux catégories selon leur contenu, très textuel ou non.

In [None]:
#EXTRACTION DOCS STELIA
    
chemin_docs_stelia = "/content/drive/MyDrive/Stage - Caractérisation automatique de texte (D2IT)/docs_stelia_ok/"
chemin_docs_stelia_parfaits = "/content/drive/MyDrive/Stage - Caractérisation automatique de texte (D2IT)/docs_stelia_parfaits/"

file_list = os.listdir(chemin_docs_stelia)
file_list_parfait = os.listdir(chemin_docs_stelia_parfaits)


def extract_pdf_text(pdf_path):
  """Return the text of all pages of the PDF at pdf_path, each page preceded by a space."""
  with pdfplumber.open(pdf_path) as pdf:
    # NOTE(review): extract_text() can apparently return None for a page
    # without text (the second loop of the original wrapped it in str()).
    # Treat that as empty text: concatenating None raises TypeError, and
    # str(None) would add the literal word "None" to the document.
    return "".join(" " + (page.extract_text() or "") for page in pdf.pages)


pdf_data = []

# Same processing for both folders; documents of the first folder come first.
# NOTE(review): os.listdir() order is arbitrary -- any code pairing pdf_data
# with externally ordered labels should confirm the two orders agree.
for folder, file_names in ((chemin_docs_stelia, file_list),
                           (chemin_docs_stelia_parfaits, file_list_parfait)):
  for file_name in file_names:
    pdf_data.append(cleaning(extract_pdf_text(os.path.join(folder, file_name))))

Extraction des documents HAL/Scholar depuis des fichiers PDF (ligne 3) grâce à pdfplumber 

In [None]:
# EXTRACTION HAL/SCHOLAR

chemin_docs_HAL = "/content/drive/MyDrive/Stage - Caractérisation automatique de texte (D2IT)/docs_HAL_aero_materiaux/"

file_list_HAL = os.listdir(chemin_docs_HAL)

HAL_data = []

for file_name in file_list_HAL:
  with pdfplumber.open(os.path.join(chemin_docs_HAL, file_name)) as pdf:
    # A page without extractable text contributes an empty string; the
    # previous str(...) wrapper turned a None result into the word "None",
    # which then ended up in the training corpus.
    content = "".join(" " + (page.extract_text() or "") for page in pdf.pages)

  HAL_data.append(cleaning(content))

Fonction d'affichage du résultat des entraînements sous forme de matrice de confusion, avec en abscisse le label réel et en ordonnée le label prédit par le modèle.

In [None]:
# affichage (fonction commune)

def display(y_true, y_model, labels=("HAL", "stelia", "wiki")):
  """Draw the confusion matrix of a prediction as an annotated heatmap.

  True labels are on the x axis and predicted labels on the y axis (the matrix
  is transposed before plotting).

  Args:
    y_true: expected labels.
    y_model: labels predicted by the model.
    labels: class names, in the order used for both axes. They are also passed
      to confusion_matrix so that rows and columns always match the tick
      labels, even when a class is absent from the evaluated sample.

  NOTE(review): this name shadows IPython's built-in display() in the notebook.
  """
  labels = list(labels)
  mat = confusion_matrix(y_true, y_model, labels=labels)
  sns.heatmap(mat.T, square=True, annot=True, fmt='d', cbar=False,
              xticklabels=labels, yticklabels=labels)
  plt.xlabel('true label')
  plt.ylabel('predicted label')

Les deux cellules suivantes finalisent la constitution du dataset. La première est spécifique à ce problème, la deuxième est plus globale.

In [None]:
# DATASET CREATION

# Corpus: Wikipedia articles first, then Stelia documents, then HAL/Scholar documents.
X = [*texts, *pdf_data, *HAL_data]
# One label per document, in the same order as X.
y = [label
     for label, documents in (('wiki', texts), ('stelia', pdf_data), ('HAL', HAL_data))
     for _ in documents]

# Hold out a quarter of the documents for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)

In [None]:
# DATA SPLITTING

def data_splitting(X, y, test_size=0.25, random_state=None):
  """Split a dataset into train and test parts.

  Args:
    X: documents.
    y: labels, one per document.
    test_size: share of the data kept for testing (default 0.25, as before).
    random_state: optional seed for a reproducible split; None (default) draws
      a new random split on every call, as before.

  Returns:
    The list returned by train_test_split: X_train, X_test, y_train, y_test.
  """
  return train_test_split(X, y, test_size=test_size, random_state=random_state)

Les cinq cellules suivantes sont l'implémentation des différents modèles avec à chaque fois : 

* Leur import  
* La création d'un pipeline avec Tfidf
* Leur entraînement
* Le test et l'affichage des résultats



In [None]:
# NAIVE BAYES

from sklearn.naive_bayes import MultinomialNB

# alpha and fit_prior are passed by keyword: recent scikit-learn releases
# declare them keyword-only, so MultinomialNB(0.1, False) raises a TypeError.
model = make_pipeline(TfidfVectorizer(), MultinomialNB(alpha=0.1, fit_prior=False))
model.fit(X_train, y_train)

y_multinb = model.predict(X_test)

display(y_test,y_multinb)

In [None]:
# LOGISTIC REGRESSION (one-vs-rest)

from sklearn.linear_model import LogisticRegression

# TF-IDF features followed by a one-vs-rest logistic regression.
# NOTE(review): the multi_class argument appears to be deprecated in recent
# scikit-learn releases -- confirm against the installed version.
Multiclass_model = make_pipeline(
    TfidfVectorizer(),
    LogisticRegression(multi_class='ovr'),
)

# fit() returns the fitted pipeline, so training and prediction can be chained.
y_logreg = Multiclass_model.fit(X_train, y_train).predict(X_test)

display(y_test, y_logreg)

In [None]:
# RANDOM FOREST (ensemble of decision trees)

from sklearn.ensemble import RandomForestClassifier

# TF-IDF features followed by a forest of 1000 trees split on Gini impurity.
model = make_pipeline(
    TfidfVectorizer(),
    RandomForestClassifier(n_estimators=1000, criterion="gini"),
)

# fit() returns the fitted pipeline, so training and prediction can be chained.
y_tree = model.fit(X_train, y_train).predict(X_test)

display(y_test, y_tree)

In [None]:
# K-NEAREST NEIGHBOURS

from sklearn.neighbors import KNeighborsClassifier

# TF-IDF features followed by a vote among the 18 nearest training documents.
model = make_pipeline(
    TfidfVectorizer(),
    KNeighborsClassifier(n_neighbors=18),
)

# fit() returns the fitted pipeline, so training and prediction can be chained.
y_knn = model.fit(X_train, y_train).predict(X_test)

display(y_test, y_knn)

In [None]:
# SUPPORT VECTOR CLASSIFIER

from sklearn.svm import SVC

# TF-IDF features followed by a support vector classifier.
# NOTE(review): no kernel is given, so the library default applies; degree is
# presumably only used by the polynomial kernel -- confirm it has any effect here.
model = make_pipeline(
    TfidfVectorizer(),
    SVC(C=10, degree=2, probability=True, tol=1),
)

# fit() returns the fitted pipeline, so training and prediction can be chained.
y_svc = model.fit(X_train, y_train).predict(X_test)

display(y_test, y_svc)

Cellule dédiée au test des modèles. Les cinq modèles sont chacun entraînés 100 fois, à chaque fois sur un découpage aléatoire différent des données, et leur exactitude moyenne (accuracy) est affichée à la fin.

In [None]:
# TESTING ALL THE MODELS (execution time, accuracy)

from time import time

def train_model(input,X_train,y_train):
  """Fit a TF-IDF + classifier pipeline on the given training data.

  Args:
    input: classifier placed after the TF-IDF vectorizer (the parameter name
      shadows the built-in input(); kept for compatibility with callers).
    X_train: training documents.
    y_train: training labels.

  Returns:
    The fitted pipeline.
  """
  model = make_pipeline(TfidfVectorizer(), input)
  model.fit(X_train, y_train)
  return model

model_labels = ["multinb","logreg","tree","knn","svc"]
models = [
    # Keyword arguments: recent scikit-learn releases declare alpha and
    # fit_prior keyword-only, so MultinomialNB(0.1, False) raises a TypeError.
    MultinomialNB(alpha=0.1, fit_prior=False),
    LogisticRegression(multi_class='ovr'),
    RandomForestClassifier(n_estimators=1000,criterion="gini"),
    KNeighborsClassifier(n_neighbors=18),
    SVC(C = 10, degree = 2, probability = True, tol = 1),
]
# Mean accuracy of each model, in the same order as model_labels.
acc_models = [0.0] * len(models)
n = 100  # number of random train/test splits per model

for i, (label, estimator) in enumerate(zip(model_labels, models)):

  print("Evaluating " + label + "...")
  t1 = time()

  total_accuracy = 0.0
  for k in range(n):
    # NOTE: this rebinds the notebook-level X_train/X_test/y_train/y_test, so
    # later cells see the last split drawn here.
    X_train, X_test, y_train, y_test = data_splitting(X,y)
    total_accuracy += train_model(estimator, X_train, y_train).score(X_test, y_test)
  acc_models[i] = total_accuracy / n

  t2 = time()
  print("Temps écoulé : " + str(t2-t1))

print(acc_models)

In [None]:
######################################
### ------ PARAMETRISATION ------- ###
######################################

Les cinq cellules suivantes sont la paramétrisation des différents modèles avec à chaque fois :

* La définition d'une grille de paramètres propre au modèle
* La création du pipeline habituel en remplaçant le modèle par un GridSearchCV
* L'entraînement et l'affichage des résultats (la meilleure combinaison de paramètres)

In [None]:
# NAIVE BAYES - HYPERPARAMETER SEARCH

from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

# Candidate values explored by the grid search.
param_grid = dict(
    alpha=[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1, 2],
    fit_prior=[True, False],
)

# TF-IDF features feed the grid search, which refits the best candidate at the end.
grid = make_pipeline(
    TfidfVectorizer(),
    GridSearchCV(MultinomialNB(), param_grid, refit=True, verbose=3, n_jobs=-1),
)

# Run the search on the training data.
grid.fit(X_train, y_train)

# Best parameter combination; the grid search is the last pipeline step.
print(grid[-1].best_params_)

# Evaluate the refitted best model on the held-out data.
grid_predictions = grid.predict(X_test)
print(classification_report(y_test, grid_predictions))


In [None]:
# LOGISTIC REGRESSION - HYPERPARAMETER SEARCH

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

# Candidate values explored by the grid search.
# NOTE(review): not every penalty/solver pair is presumably supported, and the
# "none" penalty string may be rejected by recent scikit-learn -- confirm.
param_grid = dict(
    penalty=["l1", "l2", "elasticnet", "none"],
    tol=[1, 1e-1, 1e-2, 1e-3, 1e-4],
    C=[0.01, 0.1, 1, 10, 100, 1000],
    solver=["newton-cg", "lbfgs", "liblinear", "sag", "saga"],
)

# TF-IDF features feed the grid search, which refits the best candidate at the end.
grid = make_pipeline(
    TfidfVectorizer(),
    GridSearchCV(LogisticRegression(), param_grid, refit=True, verbose=3, n_jobs=-1),
)

# Run the search on the training data.
grid.fit(X_train, y_train)

# Best parameter combination; the grid search is the last pipeline step.
print(grid[-1].best_params_)

# Evaluate the refitted best model on the held-out data.
grid_predictions = grid.predict(X_test)
print(classification_report(y_test, grid_predictions))


In [None]:
# RANDOM FOREST - HYPERPARAMETER SEARCH

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

# Candidate values explored by the grid search.
param_grid = dict(
    n_estimators=[10, 100, 1000, 10000],
    criterion=["gini", "entropy"],
)

# TF-IDF features feed the grid search, which refits the best candidate at the end.
grid = make_pipeline(
    TfidfVectorizer(),
    GridSearchCV(RandomForestClassifier(), param_grid, refit=True, verbose=3, n_jobs=-1),
)

# Run the search on the training data.
grid.fit(X_train, y_train)

# Best parameter combination; the grid search is the last pipeline step.
print(grid[-1].best_params_)

# Evaluate the refitted best model on the held-out data.
grid_predictions = grid.predict(X_test)
print(classification_report(y_test, grid_predictions))


In [None]:
# KNN - HYPERPARAMETER SEARCH

from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

# Candidate neighbourhood sizes explored by the grid search.
param_grid = dict(n_neighbors=[15, 16, 17, 18, 19, 20])

# TF-IDF features feed the grid search, which refits the best candidate at the end.
grid = make_pipeline(
    TfidfVectorizer(),
    GridSearchCV(KNeighborsClassifier(), param_grid, refit=True, verbose=3, n_jobs=-1),
)

# Run the search on the training data.
grid.fit(X_train, y_train)

# Best parameter combination; the grid search is the last pipeline step.
print(grid[-1].best_params_)

# Evaluate the refitted best model on the held-out data.
grid_predictions = grid.predict(X_test)
print(classification_report(y_test, grid_predictions))


In [None]:
# SVC - HYPERPARAMETER SEARCH

from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

# Candidate values explored by the grid search.
# NOTE(review): degree presumably only matters for the "poly" kernel, and
# probability presumably does not change the score -- confirm whether these
# two axes can be dropped to shrink the grid.
param_grid = dict(
    C=[0.1, 1, 10, 100],
    kernel=["linear", "poly", "rbf", "sigmoid"],
    degree=[2, 3, 4],
    shrinking=[True, False],
    probability=[True, False],
    tol=[1, 1e-1, 1e-2, 1e-3, 1e-4],
)

# TF-IDF features feed the grid search, which refits the best candidate at the end.
grid = make_pipeline(
    TfidfVectorizer(),
    GridSearchCV(SVC(), param_grid, refit=True, verbose=3, n_jobs=-1),
)

# Run the search on the training data.
grid.fit(X_train, y_train)

# Best parameter combination; the grid search is the last pipeline step.
print(grid[-1].best_params_)

# Evaluate the refitted best model on the held-out data.
grid_predictions = grid.predict(X_test)
print(classification_report(y_test, grid_predictions))


Dernière partie du projet : début d'entraînement sur le contenu des documents Stelia, grâce à un fichier texte précisant le thème de chacun (ligne 11). Le reste est un entraînement classique, suivi d'un test sur les thèses de HAL.

In [None]:
######################################################################
###  ENTRAINEMENT SUR LE THEME DES DOCS STELIA AVEC DECISION TREE  ###
######################################################################

from sklearn.ensemble import RandomForestClassifier

X = pdf_data # Stelia documents, built in the Stelia extraction cell

# The label file is read as consecutive pairs of lines: an entry line followed
# by the theme line of that entry. The context manager closes the file even if
# reading fails.
with open("/content/drive/MyDrive/Stage - Caractérisation automatique de texte (D2IT)/labels_stelia.txt", "r", encoding="utf-8") as labels_doc:
  # Strip surrounding whitespace: with the trailing newline kept, the same
  # theme with and without a final newline counted as two distinct classes.
  label_lines = [raw_line.strip() for raw_line in labels_doc]

# [entry, theme] pairs; a trailing line without a theme is ignored.
labels_table = [[entry, theme] for entry, theme in zip(label_lines[0::2], label_lines[1::2])]

# NOTE(review): labels are matched to documents by position only -- confirm
# that the label file lists the documents in the same order as pdf_data.
y = [theme for _entry, theme in labels_table]

X_train,X_test,y_train,y_test = train_test_split(X, y, test_size=0.25)

model = make_pipeline(TfidfVectorizer(), RandomForestClassifier(n_estimators=1000,criterion="gini"))
model.fit(X_train, y_train)

y_tree = model.predict(X_test)

# display() labels its axes HAL / stelia / wiki, which do not apply to themes.
#display(y_test,y_tree)

In [None]:
chemin_docs_HAL = "/content/drive/MyDrive/Stage - Caractérisation automatique de texte (D2IT)/docs_HAL_aero_materiaux/"

file_list_HAL = os.listdir(chemin_docs_HAL)

for i in range(len(HAL_data)):
  print(file_list_HAL[i])
  pred = model.predict([HAL_data[i]])