Note : les lignes de commandes présentées ici sont valides pour des systèmes Linux avec un packet manager basé sur Debian (comme Ubuntu), ce qui corresponds aux distributions principales Linux. Si vous n'avez ce type de système et que vous ne souhaitez pas l'installer en double boot ou en machine virtuelle, vous devrez rechercher des équivalents pour votre système d'exploitation. 

# 2 - Manipuler les textes et les symboles

## 2.1 - Segmentation des symboles (tokenization)

Nous allons ici explorer les différentes manières de segmenter les textes avec les librairies les plus communes de NLP. Dans un premier temps il faut installer Python 3 et la librairie NLTK avec la commande suivante :

sudo apt install python3 python3-dev python3-pip ipython3 python-is-python3 build-essential 

Le paquet python-is-python3 sert à créer des alias pour les commandes python3 et pip3 en enlevant le numéro de version pour plus de simplicité. Ensuite, installez les paquets suivants :

pip install nltk spacy datasets charset-normalizer autocorrect cleantext emot wordfreq textblob torchdata torchtext scikit-plot lime stanza sentence-transformers transformers torch gensim evaluate

Le paquet nltk a des données optionnelles téléchargables, comme des modèles pré-entrainés, des datasets ainsi que des dictionnaires de mots ou de symboles. Par simplicité et pour éviter d'être bloqué sur certaines parties, nous allons tout télécharger. Pour cela, lancez le terminal Python dans un terminal :

ipython # ou ipython3 si ça ne fonctionne pas

Cela va lancer le terminal d'interprétation de code Python dans lequel vous taperez :

import nltk

nltk.download()

Une interface graphique va se lancer dans laquelle il vous suffira de télécharger le méta-paquet "all". Si vous utilisez un Colab (ce que je vous encourage à ne pas faire et à utiliser votre propre système Linux), il vaut mieux télécharger un à un les paquets : 

nltk.download('stopwords')

nltk.download('wordnet')

nltk.download('punkt')

Concernant Spacy, nous aurons besoin de quatre modèles qui sont eux aussi téléchargeable séparément :

python -m spacy download fr_core_news_sm

python -m spacy download fr_core_news_lg

python -m spacy download en_core_web_sm

python -m spacy download en_core_web_lg

Les versions sm sont des petits modèles rapides et les versions lg sont des plus gros modèles plus lents mais plus performants.

Concernant stanza, nous aurons également besoin de télécharger des modèles additionnels (avec ipython) :

import stanza; stanza.download()

Note : toute librairie istallée pendant l'utilisation de ce notebook ne pourraient être disponible tout de suite. Pour que Jupyter les prenne en compte, il convient de relancer le notebook. 

Note 2 : si vous manquez de place sur votre disque principal avec tous les modèles, repérez les dossiers où sont stockés les modèles dans votre répertoire home (gensim-data, nltk_data, .config/hugginface, stanza_resources ...). Déplacez ces dossiers un autre disque interne ou externe et remplacez les dossiers par des liens symboliques (ln -s <chemin_cible> <chemin_lien>) aux anciens chemins vers les nouvelles destinations.

### Test des différentes manières de segmenter un text

Nous allons travailler sur le dataset de review d'Amazon. Comme il nous aurons besoins d'autres librairies pour travailler sur les datasets et les manipuler, vous pouvez les installer maintenant si ce n'est pas déjà fait :

pip install pandas datasets


#### Charger le dataset

In [None]:
from datasets import load_dataset

df = load_dataset('amazon_reviews_multi', split = 'train').to_pandas()

#### Exploration du dataset

In [None]:
df.head()

#### Sélectionner un commentaire français de plus de 1000 caractères

In [None]:
df_fr  = df[df.language == 'fr']
review = None
for _, rev in df_fr['review_body'].items():
    if len(rev) > 1000:
        review = rev
        break
        
print(review)

#### Décomposer le commentaire avec différents tokenizers

In [None]:
from nltk.tokenize import (
    TreebankWordTokenizer, 
    ToktokTokenizer, 
    TweetTokenizer,
    WhitespaceTokenizer
)
from nltk.tokenize import (
    word_tokenize, 
    wordpunct_tokenize, 
    sent_tokenize
)

In [None]:
tokens = word_tokenize(review)
print(tokens)
print(len(tokens))

In [None]:
tokens = wordpunct_tokenize(review)
print(tokens)
print(len(tokens))

In [None]:
tokens = sent_tokenize(review)
print(tokens)
print(len(tokens))

In [None]:
tokenizer = TreebankWordTokenizer()
tokens    = tokenizer.tokenize(review)
print(tokens)
print(len(tokens))

In [None]:
tokenizer = ToktokTokenizer()
tokens    = tokenizer.tokenize(review)
print(tokens)
print(len(tokens))

In [None]:
tokenizer = TweetTokenizer()
tokens    = tokenizer.tokenize(review)
print(tokens)
print(len(tokens))

In [None]:
tokenizer = WhitespaceTokenizer()
tokens    = tokenizer.tokenize(review)
print(tokens)
print(len(tokens))

#### Construire un vocabulaire

In [None]:
vocab     = set()
tokenizer = TweetTokenizer()

for _, rev in df_fr['review_body'].items():
    vocab.update(tokenizer.tokenize(rev))
    
print(len(vocab))

#### Enlever les stopwords

In [None]:
from nltk.corpus import stopwords

vocab = vocab - set(stopwords.words('french'))
vocab = vocab - set(stopwords.words('english'))
print(len(vocab))

In [None]:
print(stopwords.words('english'))

In [None]:
print(stopwords.words('french'))

#### Stemming and Lemmatizations

In [None]:
from nltk.stem import PorterStemmer

stemmer    = PorterStemmer()
stem_vocab = [stemmer.stem(word) for word in vocab]
print(stem_vocab[:50])

stem_vocab = set(stem_vocab)
print(len(stem_vocab))

In [None]:
from nltk.stem import SnowballStemmer

stemmer = SnowballStemmer("french")
stem_vocab = [stemmer.stem(word) for word in vocab]
print(stem_vocab[:50])

stem_vocab = set(stem_vocab)
print(len(stem_vocab))

In [None]:
from nltk.stem import WordNetLemmatizer

lemmatizer       = WordNetLemmatizer()
lemmatizer_vocab = [lemmatizer.lemmatize(word) for word in vocab]
print(lemmatizer_vocab[:50])

lemmatizer_vocab = set(lemmatizer_vocab)
print(len(lemmatizer_vocab))

In [None]:
import spacy

vocab = set()
nlp   = spacy.load('fr_core_news_sm')

for _, rev in df_fr['review_body'].items():
    doc = nlp(rev)
    for token in doc:
        vocab.add(token.lemma_)
        
vocab = vocab - set(stopwords.words('french'))
vocab = vocab - set(stopwords.words('english'))
vocab = list(vocab)
        
print(vocab[:50])
print(len(vocab))

## 2.2 - Détection de récurrences (patterns) et nettoyage de texte

### Encodage

In [None]:
import charset_normalizer

def decode_charset(raw_bytes, suggested_encoding = None):
  if raw_bytes is None:
    return None, None

  decoded       = None
  real_encoding = None
  if suggested_encoding is not None:
    try:
      decoded       = raw_bytes.decode(suggested_encoding, 'strict')
      real_encoding = suggested_encoding
    except (BaseException, Exception, ArithmeticError, BufferError, LookupError):
      matches = charset_normalizer.from_bytes(raw_bytes)
      best    = matches.best()
      if best is None:
        return None, None

      real_encoding = best.encoding
      decoded       = raw_bytes.decode(real_encoding, 'ignore')
  else:
    matches = charset_normalizer.from_bytes(raw_bytes)
    best    = matches.best()
    if best is None:
      return None, None

    real_encoding = best.encoding
    decoded       = raw_bytes.decode(real_encoding, 'ignore')

  return decoded, real_encoding

In [None]:
import urllib.request

url  = 'https://martin.slouf.name/'
page = urllib.request.urlopen(url)

# the website responds with unidentified charset
print('the page advertised encoding: ' + str(page.headers.get_content_charset()))

# so we have bytes with unidentified encoding
byte_content = page.read()

# luckily we use decode_charset to decode the bytes as string
html, encoding = decode_charset(byte_content)
print('the page real encoding: ' + str(encoding))


In [None]:
data = b"""
La longueur de l'hypoth\xe9nuse au carr\xe9 dans un triangle 
rectangle est \xe9gale \xe0 h\xb2 = a\xb2 + b\xb2
"""

print(data.decode("utf-8")) # error

In [None]:
# real encoding
data_str, encoding = decode_charset(data, suggested_encoding = 'latin-1') 
print(encoding)
print(data_str)

# encoding that works
data_str, encoding = decode_charset(data) 
print(encoding)
print(data_str)

### Regex

#### re.search

In [None]:
import re

text = 'Mon adresse email est blabla@gmail.com'
match = re.search(r'[a-z]+@[a-z]+\.com', text)

if match:
  print('found', match.group())
else:
  print('did not find')

In [None]:
text_1 = 'Mon adresse email est blabla@gmail.com'
text_2 = 'blabla@gmail.com est mon adresse mail'

regex_begin = r'\A[a-z]+@[a-z]+\.com'
regex_end   = r'[a-z]+@[a-z]+\.com\Z'

if re.search(regex_begin, text_1):
    print('email found at beginning in text_1')
if re.search(regex_begin, text_2):
    print('email found at beginning in text_2')
if re.search(regex_end, text_1):
    print('email found at the end in text_1')
if re.search(regex_end, text_2):
    print('email found at the end in text_2')

In [5]:
import re
do_match = [
    'romain@hotmail.com',
    'jeanyves@yahoo.com',
    'romain@hotmail.fr',
    'jean.yves@yahoo.com'
]

dont_match = [
    'jean-yves@yahoo.com',
    'romain@hotmail.gmail.fr',
    'roger@yahoo'
]

regex = r'[a-z\.]+@[a-z]+\.+[a-z]{1,3}\Z'

for email in do_match:
    if re.search(regex, email):
        print(f'it\'s a match : {email}')
    else:
        print(f'no match : {email}')


print('STOOOOOP')
for email in dont_match:
    if re.search(regex, email):
        print(f'it\'s a match : {email}')
    else:
        print(f'no match : {email}')

it's a match : romain@hotmail.com
it's a match : jeanyves@yahoo.com
it's a match : romain@hotmail.fr
it's a match : jean.yves@yahoo.com
STOOOOOP
it's a match : jean-yves@yahoo.com
no match : romain@hotmail.gmail.fr
no match : roger@yahoo


#### Les quantifieurs de répétition

In [None]:
do_match = [
    '1 file has been found',
    '34 files has been found',
    '2 directories has been found, 1 file has been found'
]

dont_match = [
    '15 files has been found with similar search parameters, no results with current parameters',
    'No files has been found',
    'The files has not been found',
    '5 files cound have been found, but the disk returned some I/O errors',
    '1 directory has been found, no file has been found'
]

regex = r'<your_solution>'

for res in do_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')
        
for res in dont_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')

#### Classes et intervalles de caractères

In [None]:
text = """
<dl>
  <dt id="introduction_au_html">Introduction au HTML</dt>
  <dd>
    <p>Vous faites vos premiers pas dans le développement web ? <a
    href="/fr/docs/Learn/Getting_started_with_the_web/HTML_basics">Nos articles
    sur les bases de HTML</a> expliquent ce qu'est HTML et comment
    l'utiliser.</p>
  </dd>
  <dt id="tutoriels_html">Tutoriels HTML</dt>
  <dd>
    <p>Pour plus d'informations sur l'utilisation du HTML, des tutoriels et des
    exemples complets, vous pouvez consulter <a href="/fr/docs/Learn/HTML">notre
    section Apprendre HTML</a>.</p>
  </dd>
  <dt id="référence_html">Référence HTML</dt>
  <dd>
    <p>Dans notre <a href="/fr/docs/Web/HTML/Reference">référence
    exhaustive</a>, vous trouverez le détail de chaque élément et attribut
    constituant HTML.</p>
  </dd>
</dl>
"""

In [None]:
# exercice 1

do_match = [
    '<dt id="introduction_au_html">Introduction au HTML</dt>',
    '<dt id="tutoriels_html">Tutoriels HTML</dt>',
    '<dt id="référence_html">Référence HTML</dt>'
]

dont_match = [
    '<a href="/fr/docs/Web/HTML/Reference">référence exhaustive</a>',
    '<dd><p>Vous faites vos premiers pas dans le développement web ?'
]

regex = r'your_solution'

for res in do_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')
        
for res in dont_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')

In [None]:
# exercice 2

do_match = [
    '<a href="/fr/docs/Learn/Getting_started_with_the_web/HTML_basics">Nos articles sur les bases de HTML</a>',
    '<a href="/fr/docs/Learn/HTML">notre section Apprendre HTML</a>',
    '<a href="/fr/docs/Web/HTML/Reference">référence exhaustive</a>'
]

dont_match = [
    '<dt id="référence_html">Référence HTML</dt>',
    '<dd><p>Vous faites vos premiers pas dans le développement web ?'
]

regex = r'your_solution'

for res in do_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')
        
for res in dont_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')

In [None]:
# exercice 3

do_match = [
  """ 
  <dt id="tutoriels_html">Tutoriels HTML</dt>
  <dd>
    <p>Pour plus d'informations sur l'utilisation du HTML, des tutoriels et des
    exemples complets, vous pouvez consulter <a href="/fr/docs/Learn/HTML">notre
    section Apprendre HTML</a>.</p>
  </dd>
  """,
    
  """<dt id="référence_html">Référence HTML</dt>
  <dd>
    <p>Dans notre <a href="/fr/docs/Web/HTML/Reference">référence
    exhaustive</a>, vous trouverez le détail de chaque élément et attribut
    constituant HTML.</p>
  </dd>
  """
]

dont_match = [
  """ 
  <dt id="introduction_au_html">Introduction au HTML</dt>
  <dd>
    <p>Vous faites vos premiers pas dans le développement web ? <a
    href="/fr/docs/Learn/Getting_started_with_the_web/HTML_basics">Nos articles
    sur les bases de HTML</a> expliquent ce qu'est HTML et comment
    l'utiliser.</p>
  </dd>
  """
]

regex = r'your_solution'

for res in do_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')
        
for res in dont_match:
    if re.search(regex, res):
        print(f'it\'s a match : {res}')
    else:
        print(f'no match : {res}')

#### re.findall, groupes capturant et non-capturants

In [None]:
text  = 'Mon adresse email est blabla@gmail.com et pas blublu@gmail.com'

match = re.findall(r'[a-z]+@[a-z]+\.com', text)
print(match)

match = re.findall(r'([a-z]+)@[a-z]+\.com', text)
print(match)

match = re.findall(r'([a-z]+)@([a-z]+\.com)', text)
print(match)

In [None]:
text = """
Les paroles de la chanson sont "bla bla bla bla, bla bla bla",
c'est ma préféré. Par contre je déteste celle qui fait
"bla bla bla, bla bla bla bla"
"""

match = re.findall(r'((bla,? ?)+)', text)
print(match)

match = re.findall(r'((?:bla,? ?)+)', text)
print(match)

In [None]:
text  = "J'aime les pêches, mais aussi les poires"

match = re.findall(r'p(êche|oire)s', text)
print(match)

match = re.findall(r'(p(?:êche|oire)s)', text)
print(match)

In [None]:
text = """
<dl>
  <dt id="introduction_au_html">Introduction au HTML</dt>
  <dd>
    <p>Vous faites vos premiers pas dans le développement web ? <a
    href="/fr/docs/Learn/Getting_started_with_the_web/HTML_basics">Nos articles
    sur les bases de HTML</a> expliquent ce qu'est HTML et comment
    l'utiliser.</p>
  </dd>
  <dt id="tutoriels_html">Tutoriels HTML</dt>
  <dd>
    <p>Pour plus d'informations sur l'utilisation du HTML, des tutoriels et des
    exemples complets, vous pouvez consulter <a href="/fr/docs/Learn/HTML">notre
    section Apprendre HTML</a>.</p>
  </dd>
  <dt id="référence_html">Référence HTML</dt>
  <dd>
    <p>Dans notre <a href="/fr/docs/Web/HTML/Reference">référence
    exhaustive</a>, vous trouverez le détail de chaque élément et attribut
    constituant HTML.</p>
  </dd>
</dl>
"""

In [None]:
# exercice 1

regex = 'your_solution'
match = re.findall(regex, text)
print(match)

In [None]:
# exercice 2

regex = 'your_solution'
match = re.findall(regex, text)
print(match)

In [None]:
# exercice 3

match = re.findall(regex, text)
print(match)

#### re.sub et options

In [None]:
text = 'Mon adresse email est blabla@gmail.com et pas blublu@gmail.com'

new_text = re.sub(r'[a-z]+@[a-z]+\.com', '<EMAIL>', text)
print(new_text)

new_text = re.sub(r'([a-z]+)@([a-z]+\.com)','\\1[at]\\2', text)
print(new_text)

#### Nettoyage et normalisation de texte

In [None]:
text_en = """
The packgae has been delivered but is damaged (photo:
https://i.postimg.cc/TPr4LQwY/MEA2-1.jpg). When i openned iy , 
every flask was broke :( I put 1 star because I cannot put 
-100 starts #wtf >>>>>>>>>>> dont buy this brand $$$$ \U0001F643
"""

text_fr = """
Le colsi a été livré mais cabossé (photo :
https://i.postimg.cc/TPr4LQwY/MEA2-1.jpg). Quan j'ai ouvert , 
tous les flacons été casser :( abusé je met 1 étoiles parce 
que je peux pas mettre -100 étoiles #labus >>>>>>>>>>> 
plus jamais cette marque $$$$ \U0001F643
"""

In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

tokenizer = AutoTokenizer.from_pretrained("Bhuvana/t5-base-spellchecker")
model     = AutoModelForSeq2SeqLM.from_pretrained("Bhuvana/t5-base-spellchecker")

def correct(inputs):
    input_ids = tokenizer.encode(inputs, return_tensors = 'pt')
    sample_output = model.generate(
        input_ids,
        do_sample            = True,
        max_length           = 50,
        top_p                = 0.99,
        num_return_sequences = 1
    )
    res = tokenizer.decode(sample_output[0], skip_special_tokens = True)
    return res

text = "christmas is celbrated on decembr 25 evry ear"

print(correct(text))

In [None]:
print(correct("When i openned iy , every flask was broke"))

In [None]:
from autocorrect import Speller

spell_en = Speller('en')
spell_fr = Speller('fr')

print(spell_en("When i openned iy , every flask was broke"))
print(spell_fr("Quan j'ai ouvert , tous les flacons été casser"))

In [None]:
from cleantext import clean

print(clean(text_en,
    fix_unicode                  = True,
    to_ascii                     = False,
    lower                        = True,
    no_line_breaks               = True,
    no_urls                      = True,
    no_emails                    = True,
    no_phone_numbers             = True,
    no_numbers                   = True,
    no_digits                    = True,
    no_currency_symbols          = True,
    no_punct                     = False,
    replace_with_punct           = "",
    replace_with_url             = "<URL>",
    replace_with_email           = "<EMAIL>",
    replace_with_phone_number    = "<PHONE>",
    replace_with_number          = "<NUMBER>",
    replace_with_digit           = "<DIGIT>",
    replace_with_currency_symbol = "<CUR>",
    lang                         = "en"
))

print(clean(text_fr,
    fix_unicode                  = True,
    to_ascii                     = False,
    lower                        = True,
    no_line_breaks               = True,
    no_urls                      = True,
    no_emails                    = True,
    no_phone_numbers             = True,
    no_numbers                   = True,
    no_digits                    = True,
    no_currency_symbols          = True,
    no_punct                     = False,
    replace_with_punct           = "",
    replace_with_url             = "<URL>",
    replace_with_email           = "<EMAIL>",
    replace_with_phone_number    = "<PHONE>",
    replace_with_number          = "<NUMBER>",
    replace_with_digit           = "<DIGIT>",
    replace_with_currency_symbol = "<CUR>",
    lang                         = "fr"
))

In [None]:
import re

hashtag_regex = r'#[a-zà-ÿ_]+'

print(re.sub(hashtag_regex, '<hashtag>', text_en))
print(re.sub(hashtag_regex, '<hashtag>', text_fr))

In [None]:
from emot.emo_unicode import EMOTICONS_EMO, UNICODE_EMOJI

def convert_emojis(text):
    for emot in UNICODE_EMOJI:
        emot_regex = r'(' + re.escape(emot) + ')'
        text = re.sub(emot_regex, "_".join(
            UNICODE_EMOJI[emot].replace(",","").replace(":","").replace("-","_").lower().split()), text
        )
    return text

def convert_emoticons(text):
    for emot in EMOTICONS_EMO:
        emot_regex = r'(' + re.escape(emot) + ')'
        text = re.sub(emot_regex, "_".join(EMOTICONS_EMO[emot].replace(",","").lower().split()), text)
    return text

print(convert_emojis(convert_emoticons(text_en)))
print(convert_emojis(convert_emoticons(text_fr)))

In [None]:
special_char_regex = r'[^_A-zÀ-ÿ<>0-9\'\s.,;?!:%\(\ç)-+]+'
not_tag_regex      = r'(?:<{2,}|>{2,})'
space_norm_regex   = r'\s+'

text_en_norm = re.sub(special_char_regex, '', text_en)
text_fr_norm = re.sub(special_char_regex, '', text_fr)

text_en_norm = re.sub(not_tag_regex, '', text_en_norm)
text_fr_norm = re.sub(not_tag_regex, '', text_fr_norm)

text_en_norm = re.sub(space_norm_regex, ' ', text_en_norm).strip()
text_fr_norm = re.sub(space_norm_regex, ' ', text_fr_norm).strip()

print(text_en_norm)
print(text_fr_norm)

In [None]:
from wordfreq import word_frequency

def infer_spaces(s, lang):
    """Uses dynamic programming to infer the location of spaces in a string
    without spaces."""

    # Find the best match for the i first characters, assuming cost has
    # been built for the i-1 first characters.
    # Returns a pair (match_cost, match_length).
    def best_match(i):
        candidates = enumerate(reversed(cost[max(0, i-25):i]))
        word_costs = list()
        for k,c in candidates:
            candidate_word = s[i-k-1:i]
            word_freq      = word_frequency(candidate_word, lang)
            if word_freq == 0.0:
                word_costs.append(9e999)
            else:
                word_costs.append(c + (1.0 / (word_freq * len(candidate_word) + 1)))
        
        min_cost = min(word_costs)
        
        return min_cost, word_costs.index(min_cost) + 1

    # Build the cost array.
    cost = [0]
    for i in range(1,len(s)+1):
        c,k = best_match(i)
        cost.append(c)

    # Backtrack to recover the minimal-cost string.
    out = []
    i = len(s)
    while i>0:
        c,k = best_match(i)
        assert c == cost[i]
        out.append(s[i-k:i])
        i -= k

    return " ".join(reversed(out))

str_en = 'howtosplittextwithoutspacesintolistofwords'
str_fr = 'lesapprocheslesplusaupointdanscedomainenesontpaspubliques'

print(infer_spaces(str_en, 'en'))
print(infer_spaces(str_fr, 'fr'))

In [None]:
from textblob import TextBlob, Word

print(TextBlob(text_en).correct())

word = Word('falibility')
print(word.spellcheck())


In [None]:

def clean_en(text):
    cleaned_text = clean(text,
        fix_unicode                  = True,
        to_ascii                     = False,
        lower                        = True,
        no_line_breaks               = True,
        no_urls                      = True,
        no_emails                    = True,
        no_phone_numbers             = True,
        no_numbers                   = True,
        no_digits                    = True,
        no_currency_symbols          = True,
        no_punct                     = False,
        replace_with_punct           = "",
        replace_with_url             = "<URL>",
        replace_with_email           = "<EMAIL>",
        replace_with_phone_number    = "<PHONE>",
        replace_with_number          = "<NUMBER>",
        replace_with_digit           = "<DIGIT>",
        replace_with_currency_symbol = "<CUR>",
        lang                         = "en"
    )
    
    cleaned_text = convert_emojis(cleaned_text)
    cleaned_text = convert_emoticons(cleaned_text)
    
    hashtag_regex      = r'#[a-zà-ÿ_]+'
    special_char_regex = r'[^_A-zÀ-ÿ<>0-9\'\s.,;?!:%\(\ç)-+]+'
    not_tag_regex      = r'(?:<{2,}|>{2,})'
    space_norm_regex   = r'\s+'
    
    cleaned_text = re.sub(hashtag_regex, '<hashtag>', cleaned_text)
    cleaned_text = re.sub(special_char_regex, '', cleaned_text)
    cleaned_text = re.sub(not_tag_regex, '', cleaned_text)
    cleaned_text = re.sub(space_norm_regex, ' ', cleaned_text).strip()
    
    corrected = ''
    for word in cleaned_text.split():
        if not re.search('[<>_:,.!?\(\])]', word): 
            alt_words = Word(word).spellcheck()
            if len(alt_words) > 0:
                alt_word, confidence = alt_words[0]
                if confidence > 0.6:
                    corrected += alt_word + ' '
                    continue
            
        corrected += word + ' '
        
    cleaned_text = corrected
    
    return cleaned_text

print(clean_en(text_en))

### 2.3 - Etiqueter les symboles (token tagging)

In [2]:
text = """ 
NASA is "go" for a post-breakfast launch of the biggest rocket ever—and that
includes the final flight of the Saturn V in 1973. A flight readiness review
this week confirmed that its Artemis-1 mission will launch during a two-hour
window that opens at 8:33 a.m. EDT on Monday, August 29. However, if for any
reason it doesn’t launch on schedule then the next time it can go is at
lunchtime on Friday, September 2, 2022.
"""

#### Part-Of-Speech

In [3]:
import spacy

nlp = spacy.load("en_core_web_lg")
doc = nlp(text)

for token in doc:
    print(f'{token.text}\t{token.pos_}')

 
	SPACE
NASA	PROPN
is	AUX
"	PUNCT
go	VERB
"	PUNCT
for	ADP
a	DET
post	ADJ
-	ADJ
breakfast	ADJ
launch	NOUN
of	ADP
the	DET
biggest	ADJ
rocket	NOUN
ever	ADV
—	PUNCT
and	CCONJ
that	SCONJ

	SPACE
includes	VERB
the	DET
final	ADJ
flight	NOUN
of	ADP
the	DET
Saturn	PROPN
V	PROPN
in	ADP
1973	NUM
.	PUNCT
A	DET
flight	NOUN
readiness	NOUN
review	NOUN

	SPACE
this	DET
week	NOUN
confirmed	VERB
that	SCONJ
its	PRON
Artemis-1	PROPN
mission	NOUN
will	AUX
launch	VERB
during	ADP
a	DET
two	NUM
-	PUNCT
hour	NOUN

	SPACE
window	NOUN
that	PRON
opens	VERB
at	ADP
8:33	NUM
a.m.	NOUN
EDT	PROPN
on	ADP
Monday	PROPN
,	PUNCT
August	PROPN
29	NUM
.	PUNCT
However	ADV
,	PUNCT
if	SCONJ
for	ADP
any	DET

	SPACE
reason	NOUN
it	PRON
does	AUX
n’t	PART
launch	VERB
on	ADP
schedule	NOUN
then	ADV
the	DET
next	ADJ
time	NOUN
it	PRON
can	AUX
go	VERB
is	AUX
at	ADP

	SPACE
lunchtime	NOUN
on	ADP
Friday	PROPN
,	PUNCT
September	PROPN
2	NUM
,	PUNCT
2022	NUM
.	PUNCT

	SPACE


In [None]:
for token in doc:
    print(f'{token.text} : {token.tag_} : {token.morph}')

In [None]:
for token in doc:
    print(f'{token.text} => {token.dep_} => {token.head.text}')

In [None]:
for entity in doc.ents:
    print(f'{entity.text} : {entity.label_}')

In [4]:
nlp.add_pipe("entityLinker", last = True)
doc = nlp(text)

print('wikidata entities summary ordered by categories')
doc._.linkedEntities.print_super_entities()

print()
print('wikidata entities details')
for s_index, sentence in enumerate(doc.sents):
    entities = sentence._.linkedEntities
    print(f'sentence {s_index}')
    for e_index, entity in enumerate(entities):
        print(f'  entity {e_index}')
        print(f'    text: {entity.get_span()}')
        print(f'    id: {entity.get_id()}')
        print(f'    label: {entity.get_label()}')
        print(f'    description: {entity.get_description()}')

wikidata entities summary ordered by categories
calendar month (2) : August,September
month of the Gregorian calendar (2) : August,September
space agency (1) : National Aeronautics and Space Administration
architectural structure (1) : slipway
vehicle (1) : rocket
disposable product (1) : rocket
flying machine (1) : rocket
novel (1) : Final Flight
Saturn (1) : Saturn V
air force unit (1) : flight

wikidata entities details
sentence 0
  entity 0
    text: NASA
    id: 23548
    label: National Aeronautics and Space Administration
    description: space-related agency of the United States government
  entity 1
    text: launch
    id: 361945
    label: slipway
    description: ramp on the shore by which ships or boats can be moved to and from the water
  entity 2
    text: rocket
    id: 41291
    label: rocket
    description: pyrokinetic engine used for propulsion; for the incendiary weapon, see Q2037215
  entity 3
    text: final flight
    id: 48782185
    label: Final Flight
    des

## Partie 3 -Vectorisation de symboles

In [None]:
import numpy as np

text = """
Il jouait on ne sait quel effrayant jeu de cache-cache avec la mort ; 
chaque fois que la face camarde du spectre s'approchait, le gamin 
lui donnait une pichenette. Une balle pourtant, mieux ajustée ou 
plus traître que les autres, finit par atteindre l'enfant feu follet. 
On vit Gavroche chanceler, puis il s'affaissa.
"""

vocab       = dict()
vocab_index = 0
for word in text.split():
    if word not in vocab:
        vocab[word]  = vocab_index
        vocab_index += 1
        
def make_one_hot(word, vocab):
    vect = np.zeros(len(vocab))
    if word in vocab:
        vect[vocab[word]] = 1
    
    return vect
    
print(make_one_hot('Gavroche', vocab))

In [None]:
labels = [
    'chien', 'chat', 'ours', 'loup', 
    'chat', 'chat', 'ours', 'chien', 
    'chien', 'loup', 'ours', 'ours'
]

classes     = dict()
class_index = 0
for label in labels:
    if label not in classes:
        classes[label] = class_index
        class_index  += 1

def make_one_hot(label, classes):
    vect = np.zeros(len(classes))
    if label in classes:
        vect[classes[label]] = 1
    
    return list(vect)

one_hot_labels = list()
for label in labels:
    one_hot_labels.append(make_one_hot(label, classes))
    
print(one_hot_labels)

In [None]:
one_hot_labels = [
    [1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0], 
    [0.0, 0.0, 1.0, 0.0], [0.0, 0.0, 0.0, 1.0], 
    [0.0, 1.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0], 
    [0.0, 0.0, 1.0, 0.0], [1.0, 0.0, 0.0, 0.0], 
    [1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0], 
    [0.0, 0.0, 1.0, 0.0], [0.0, 0.0, 1.0, 0.0]
]

In [None]:
vocab       = dict()
vocab_index = 0
for letter in text.lower():
    if letter not in vocab:
        vocab[letter]  = vocab_index
        vocab_index += 1

def vectorize_with_freq(text, vocab):
    vect = np.zeros(len(vocab))
    
    for letter in text.lower():
        vect[vocab[letter]] += 1
        
    vect /= len(text)
    return vect

print(vocab)
print(vectorize_with_freq(text, vocab))

In [None]:
def n_gram_split(text, n = 2):
    sequence = list()
    for i, c in enumerate(text):
        if i + n - 1 == len(text):
            break
            
        gram = c
        for j in range(1, n):
            gram += text[i + j]
            
        sequence.append(gram)
    
    return sequence

print(n_gram_split(text.lower()))

In [None]:
from wordfreq import word_frequency
                
min_freq = 9e999
for word in text.split():
    if word in vocab:
        word_freq = word_frequency(word, 'fr')
        if word_freq != 0.0:
            if word_freq < min_freq:
                min_freq = word_freq
                
def tf_idf(text, vocab, min_freq):
    vect = np.zeros(len(vocab))
    
    words = text.split()
    for word in words:
        vect[vocab[word]] += 1
        
    vect /= len(words)
    
    for word, index in vocab.items():
        word_freq = word_frequency(
            word    = word, 
            lang    = 'fr', 
            minimum = min_freq
        )
        vect[index] /= word_freq
        
    return vect

print(tf_idf(text, vocab, min_freq))

In [None]:
documents = [
    'un chat a un collier',
    'un chien n\'aiment pas un chat',
    'un chien a une laisse',
    'elle a un chien et un chat'
]

vocab = {
    'chien'   : 0,
    'chat'    : 1,
    'collier' : 2,
    'laisse'  : 3
}
            
word_vector_matrix = np.zeros((len(vocab), len(vocab)))
for document in documents:
    words = document.split()
    for i in range(len(words)):
        w_1 = words[i]
        if w_1 not in vocab:
            continue
            
        w_1_index = vocab[w_1]
        for j in range(len(words)):
            if i == j:
                continue
                
            w_2 = words[j]
            if w_2 not in vocab:
                continue
                
            w_2_index = vocab[w_2]
            
            word_vector_matrix[w_1_index][w_2_index] += 1
    
print(word_vector_matrix)

In [None]:
import math

word_freq          = np.zeros(len(vocab))
word_vector_matrix = np.zeros((len(vocab), len(vocab)))
                              
for document in documents:
    words = document.split()
    for i in range(len(words)):
        w_1 = words[i]
        if w_1 not in vocab:
            continue
            
        w_1_index = vocab[w_1]
        word_freq[w_1_index] += 1
        for j in range(len(words)):
            if i == j:
                continue
                
            w_2 = words[j]
            if w_2 not in vocab:
                continue
                
            w_2_index = vocab[w_2]
            
            word_vector_matrix[w_1_index][w_2_index] += 1

word_freq          /= len(vocab)
word_vector_matrix /= len(vocab)
                              
for i in range(len(word_freq)):
    for j in range(len(word_freq)):
        if i == j:
            continue
        
        if word_vector_matrix[i][j] != 0:
            word_vector_matrix[i][j] /= word_freq[i] * word_freq[j]
            word_vector_matrix[i][j]  = math.log(word_vector_matrix[i][j])
                     
print(word_vector_matrix)
print()
print(vocab)
for i, word in enumerate(vocab):
    word_vector = word_vector_matrix[i]
    for j, v in enumerate(word_vector):
        if v < 0:
            word_vector[j] = 0
            
    print(f'{word} : {word_vector}')
    
        

In [None]:
import gensim.downloader as api

# pre-trained skip-gram
model = api.load('word2vec-google-news-300')

print(f'word #10/{len(model.index_to_key)} is "{model.index_to_key[10]}"')
print(f'word vector for "was" is {model["was"]}')

In [None]:
pairs = [
    ('cherry', 'pear'),
    ('cherry', 'apple'),   
    ('cherry', 'cereal'),  
    ('cherry', 'cake'),    
    ('cherry', 'communism'),
]

for w1, w2 in pairs:
    print(f'sim({w1}, {w2}) = {model.similarity(w1, w2)}')

In [None]:
print(model.most_similar('people', topn = 5))

In [None]:
print(model.doesnt_match(['fire', 'water', 'land', 'sea', 'air', 'car']))

In [None]:
model.most_similar(positive = ['woman', 'king'], negative = ['man'], topn = 20)

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

word_list = [
    'blue', 'red', 'orange', 'green', 
    'eagle', 'bear', 'cat', 'dog', 
    'computer', 'keyboard', 'internet', 'hacking'
]
X     = model[word_list]
#X_pca = PCA().fit_transform(X)[:,:2]

X_tsne = TSNE(
    n_components  = 2, 
    learning_rate = 'auto',
    #init          = 'random', 
    perplexity    = 3
).fit_transform(X)

plt.figure(figsize = (7, 5)) 
for i in range(X_tsne.shape[0]):
    x, y = X_tsne[i]
    plt.scatter(x, y)
    plt.annotate(
        word_list[i],
        xy         = (x, y),
        xytext     = (5, 2),
        textcoords = 'offset points',
        ha         = 'right',
        va         = 'bottom'
    )

## Partie 4 - Deep Learning pour NLP

### Partie 4.2 - Perceptron multicouches (MLP)

#### Neurone formel

In [None]:
import numpy as np

def h(x):
  if x > 0:
    return 1
  else:
    return 0

def neuron_train(X, Y, w, µ):
  it      = 0
  is_over = False
  while is_over is False:
    is_over = True

    for x, y_ex in zip(X, Y):
      a = np.inner(w, x) # vector product
      y = h(a)
      e = y_ex - y

      w = w + µ * e * x

      if e != 0:
        is_over = False

    it += 1

  return w, it

w = np.array([0.3, 0.1, -0.2])
X = np.array([ # we add 1 to represent the bias
  [1, 0, 0],
  [1, 1, 0],
  [1, 0, 1],
  [1, 1, 1]
])
y = np.array([0, 0, 0, 1])
µ = 0.5

w, it = neuron_train(X, y, w, µ)
print(w, it)

In [None]:
def neuron_predict(X, w):
  Y = list()
  for x in X:
    a = np.inner(w, x)
    y = h(a)
    Y.append(y)

  return Y

print(neuron_predict(X, w))

In [None]:
w = np.array([0.3, 0.1, -0.2])
y = np.array([0, 1, 1, 0])
w, it = neuron_train(X, y, w, µ)

print(w, it)
print(neuron_predict(X, w))

#### Implémentation d'un MLP

In [None]:
from torch.utils.data import TensorDataset
import math

def generate_dataset(n_samples):
  # nous générons des vecteurs de taille 2 dont les valeurs sont entre -1 et 1
  x = torch.rand(n_samples, 2) * 2 - 1

  # nous calculons la distance entre le point et l'origine
  x_norms = (x ** 2).sum(axis = 1).sqrt()

  # le rayon du disque
  radius = math.sqrt(2 / math.pi)

  # compare la norme au rayon du disque, créé un masque booléen,
  # puis trasnforme en entier 8 octects.
  y = (x_norms < radius).long()

  return TensorDataset(x, y)
  
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.optim as optim

class MLP(nn.Module):
  def __init__(self):
    super(MLP, self).__init__()
    self.hidden_layer_1    = nn.Linear(2, 64)
    self.hidden_layer_2    = nn.Linear(64, 64)
    self.output_layer      = nn.Linear(64, 2)
    self.hidden_transfer_1 = nn.ReLU()
    self.hidden_transfer_2 = nn.ReLU()
    self.output_transfer   = nn.LogSoftmax(dim = 1)

  def forward(self, x):
    x = self.hidden_layer_1(x)
    x = self.hidden_transfer_1(x)
    x = self.hidden_layer_2(x)
    x = self.hidden_transfer_2(x)
    x = self.output_layer(x)
    x = self.output_transfer(x)

    return x

def evaluate(model, dataset, batch_size):
  dataloader = DataLoader(dataset, batch_size = batch_size, shuffle = False)

  correct_pred = 0
  total_pred   = 0
  with torch.no_grad():
    for X, y in dataloader:
      y_pred        = model(X)
      y_pred_class  = y_pred.argmax(dim = 1)
      correct_pred += (y_pred_class == y).sum().item()
      total_pred   += len(y)

  return correct_pred / total_pred

def train(
  model, loss_func, optimizer, train_dataset,
  test_dataset, epochs, batch_size
):
  dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)

  for epoch in range(epochs):
    for X, y in dataloader:
      y_pred = model(X)
      loss   = loss_func(y_pred, y)
      loss.backward()
      optimizer.step()
      optimizer.zero_grad()

    if epoch % 10 == 0:
      train_accuracy = evaluate(model, train_dataset, batch_size)
      test_accuracy  = evaluate(model, test_dataset, batch_size)
      print(
        f'{epoch:3} -> {100 * train_accuracy:5.3f}% train accuracy',
        f'{epoch:3} -> {100 * test_accuracy:5.3f}% test accuracy'
      )

n_samples     = 3000
epochs        = 300
batch_size    = 32
learning_rate = 1e-3

train_dataset = generate_dataset(n_samples)
test_dataset  = generate_dataset(n_samples)
model         = MLP()
loss_func     = nn.NLLLoss()
optimizer     = optim.SGD(
  params = model.parameters(),
  lr     = learning_rate
)
train(
  model, loss_func, optimizer, train_dataset,
  test_dataset, epochs, batch_size
)

#### MLP Exercice

In [None]:
from datasets import load_dataset
import random

df_train = load_dataset('amazon_reviews_multi', split = 'train').to_pandas()
vocab       = dict() 
langs       = dict()
vocab_index = 0
langs_index = 0

# sélection de 10000 échantillons
samples = random.sample(list(df.iterrows()), 10000)

# construction du vocabulaire
for _, row in samples:
    lang   = row['language']
    review = row['review_body']
    
    if lang not in langs:
        langs[lang]  = langs_index
        langs_index += 1
        
    for c in review:
        if c not in vocab:
            vocab[c]     = vocab_index
            vocab_index += 1
        
# construction du dataset
X           = list()
y           = list()
feature_len = len(vocab)
label_len   = len(langs)

for _, row in samples:
    lang   = row['language']
    review = row['review_body']
    x      = [0.0] * feature_len
    
    for c in review:
        x[vocab[c]] += 1.0
        
    for i, _ in enumerate(x):
        x[i] /= len(review)
    
    X.append(x)
    y.append(langs[lang])

# création des dataloaders pytorch
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
import torch
from sklearn.model_selection import train_test_split

X = np.array([np.array(x) for x in X])
y = np.array([np.array(y_ex) for y_ex in y])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.1)

train_dataset = TensorDataset(
    torch.as_tensor(X_train, dtype = torch.float), 
    torch.as_tensor(y_train, dtype = torch.long)
)
test_dataset = TensorDataset(
    torch.as_tensor(X_test, dtype = torch.float), 
    torch.as_tensor(y_test, dtype = torch.long)
)

In [None]:
from datasets import load_dataset
import random
from bpemb import BPEmb

df_train = load_dataset('amazon_reviews_multi', split = 'train').to_pandas()
df_test  = load_dataset('amazon_reviews_multi', split = 'test').to_pandas()

# embeddings multilangage type GloVe 
# entraîné sur la co-occurrences de sous-mots
multibpemb = BPEmb(lang = 'multi', vs = 1000000, dim = 300)

In [None]:
print(len(df_train))
print(len(df_test))
print(df_train['language'].unique())
df_train.head()

In [None]:
text = df_train[df_train['language'] == 'fr']['review_body'].iloc[0]
text

In [None]:
segmented_text = multibpemb.encode(text)
embeddings     = multibpemb.embed(text)

print(segmented_text)
print(len(segmented_text))
print(embeddings.shape)
print(embeddings.__class__)

In [None]:
import numpy as np

padding_emb = np.zeros((1, 300))

# vu la taille du dataset, nous vectoriserons les données
# à la volée (au dernier moment)
def vectorize(text, length = 100):
    embeddings = multibpemb.embed(text)
    if len(embeddings) > length:
        embeddings = embeddings[:length, :]
    elif len(embeddings) < length:
        padding    = np.repeat(padding_emb, length - len(embeddings), axis = 0)
        embeddings = np.concatenate((embeddings, padding), axis = 0)
        
    return embeddings

vectorize(text).shape

In [None]:
df_train.drop(
    ['review_id', 'product_id', 'review_title', 
     'product_category', 'reviewer_id', 'language'], 
    axis = 1, inplace = True
)
df_test.drop(
    ['review_id', 'product_id', 'review_title', 
     'product_category', 'reviewer_id', 'language'], 
    axis = 1, inplace = True
)

In [None]:
stars, text = df_train.iloc[5]
print(stars)
print(text)

In [None]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

class AmazonDataset(Dataset):
    def __init__(self, df):
        self.df = df
            
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        stars, text = df_train.iloc[idx]
        x = np.float32(vectorize(text))
        y = np.array([stars], dtype = 'float32')
        return x, y
    

train_loader = DataLoader(
    AmazonDataset(df_train), batch_size = 64, 
    num_workers = 4, shuffle = True, drop_last = True
)
test_loader  = DataLoader(
    AmazonDataset(df_test), batch_size = 64, 
    num_workers = 4, shuffle = True, drop_last = True
)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.optim as optim

class MLPRegression(nn.Module):
    def __init__(self, emb_size, hidden_size):
        super(MLPRegression, self).__init__()
        self.hidden_layer_1    = nn.Linear(emb_size, hidden_size)
        self.hidden_layer_2    = nn.Linear(hidden_size, hidden_size)
        self.output_layer      = nn.Linear(hidden_size, 1)
        self.hidden_transfer_1 = nn.ReLU()
        self.hidden_transfer_2 = nn.ReLU()

    def forward(self, x):
        # les embeddings des tokens du texte sont moyennés
        x = x.mean(dim = 1) 
        x = self.hidden_layer_1(x)
        x = self.hidden_transfer_1(x)
        x = self.hidden_layer_2(x)
        x = self.hidden_transfer_2(x)
        x = self.output_layer(x)
        return x

In [None]:
def evaluate(model, loader):
    loss_func = nn.L1Loss()
    score     = 0
    count     = 0
    with torch.no_grad():
        for X, y in loader:
            y_pred  = model(X)
            score  += loss_func(y, y_pred).item()
            count  += 1
        
    return score / count

def train(
  model, loss_func, optimizer, train_loader,
  test_loader, epochs
):
    for epoch in range(epochs):
        score = 0
        count = 0
        for X, y in train_loader:
            y_pred = model(X)
            loss   = loss_func(y_pred, y)
            score += loss.detach().item()
            count += 1
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        test_score = evaluate(model, test_loader)
        print(
            f'epoch {epoch:3} -> {score / count:5.3f} train L1 Loss',
            f'{epoch:3} -> {test_score:5.3f} test L1 Loss'
        )
                

epochs    = 5
model     = MLPRegression(emb_size = 300, hidden_size = 100)
loss_func = nn.L1Loss()
optimizer = optim.Adam(model.parameters())
train(
    model, loss_func, optimizer, 
    train_loader, test_loader, epochs
)

In [None]:
import math
import torch.nn as nn

class CNNRegression(nn.Module):
    def __init__(
        self, seq_len = 100, filters = 300, 
        window_size = 5, stride = 2, emb_len = 300,
        hidden_size = 100
    ):
        super(CNNRegression, self).__init__()
        
        self.seq_len     = seq_len
        self.filters     = filters
        self.kernel_size = window_size
        self.stride      = stride
        self.emb_len     = emb_len
    
        self.conv = nn.Conv1d(
            in_channels  = emb_len, 
            out_channels = filters, 
            kernel_size  = self.kernel_size, 
            stride       = stride
        )
        
        self.pool = nn.MaxPool1d(
            kernel_size = 4, 
            stride      = 4
        )
        
        self.conv_transfer   = nn.ReLU()
        self.hidden_layer    = nn.Linear(self.conv_output_count(), hidden_size)
        self.output_layer    = nn.Linear(hidden_size, 1)
        self.hidden_transfer = nn.ReLU()
    
    def conv_output_count(self):
        out_conv = ((self.seq_len - 1 * (self.kernel_size - 1) - 1) / self.stride) + 1
        out_conv = math.floor(out_conv)
        out_pool = ((out_conv - 4) / 4) + 1
        out_pool = math.floor(out_pool)
        
        return out_pool * self.filters

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.conv(x)
        x = self.pool(x)
        x = self.conv_transfer(x)
        x = x.reshape(len(x), -1)
        x = self.hidden_layer(x)
        x = self.hidden_transfer(x)
        x = self.output_layer(x)

        return x
    
epochs    = 10
model     = CNNRegression()
loss_func = nn.L1Loss()
optimizer = optim.Adam(model.parameters())
train(
    model, loss_func, optimizer, 
    train_loader, test_loader, epochs
)

### CNN - Sentiment detection

In [None]:
some_files = [
    './aclImdb_v1/aclImdb/train/pos/0_9.txt',
    './aclImdb_v1/aclImdb/train/pos/115_10.txt',
    './aclImdb_v1/aclImdb/train/pos/161_8.txt',
    './aclImdb_v1/aclImdb/train/pos/4002_8.txt',
    './aclImdb_v1/aclImdb/train/pos/3940_9.txt',
    './aclImdb_v1/aclImdb/train/pos/4888_8.txt',
    './aclImdb_v1/aclImdb/train/pos/10736_10.txt',
    './aclImdb_v1/aclImdb/train/pos/10696_7.txt'
]

for filepath in some_files:
    with open(filepath, 'r') as fs:
        comment = fs.read()
        print(comment)
        print('----')

In [None]:
from cleantext import clean
import re
from nltk.corpus import stopwords

en_stopwords = set(stopwords.words('english'))

def clean_text(text):
    composed_word_regex = r'-'
    br_regex            = r'<[^<>\/]+\/>'
    anon_regex          = r'\.{4,}'
    normalise_point     = r'\.'
    space_regex         = r'\s+'
    normalise_tags      = r'><'
    unslash             = r'\/'
    
    cleaned_text = re.sub(composed_word_regex, ' ', text)
    cleaned_text = re.sub(anon_regex, '<ANON>', cleaned_text)
    cleaned_text = re.sub(br_regex, '', cleaned_text)
    cleaned_text = re.sub(normalise_point, '. ', cleaned_text)
    cleaned_text = re.sub(unslash, ' ', cleaned_text)
    
    cleaned_text = clean(cleaned_text,
        fix_unicode                  = True,
        to_ascii                     = True,
        lower                        = True,
        no_line_breaks               = True,
        no_urls                      = True,
        no_emails                    = True,
        no_phone_numbers             = True,
        no_numbers                   = True,
        no_digits                    = True,
        no_currency_symbols          = True,
        no_punct                     = True,
        replace_with_punct           = "",
        replace_with_url             = "<URL>",
        replace_with_email           = "<EMAIL>",
        replace_with_phone_number    = "<PHONE>",
        replace_with_number          = "<NUMBER>",
        replace_with_digit           = "<DIGIT>",
        replace_with_currency_symbol = "<CUR>",
        lang                         = "en"
    )
    
    cleaned_text = re.sub(space_regex, ' ', cleaned_text)
    cleaned_text = re.sub(normalise_tags, ' ', cleaned_text)
    
    cleaned = ''
    for word in cleaned_text.split():
        if not word in en_stopwords:
            cleaned += word + ' '
    
    return cleaned_text

for filepath in some_files:
    with open(filepath, 'r') as fs:
        comment = fs.read()
        print(clean_text(comment))
        print('----')

In [None]:
import gensim.downloader as gensim_api
import numpy as np
from os import listdir
from os.path import isfile, join
import pickle

embedding_model = gensim_api.load('word2vec-google-news-300')

In [None]:
from torch.utils.data import TensorDataset
import torch
from nltk.tokenize import TweetTokenizer

tokenizer   = TweetTokenizer()
padding_emb = np.zeros((1, 300))

def get_vector(word):
    try:
        return embedding_model.get_vector(word)
    except KeyError:
        return np.zeros(300)

def process_file(filepath, seq_len):
    with open(filepath, 'r') as fs:
        comment = fs.read()
        
    embeddings = list()
    comment    = clean_text(comment)
    tokens     = tokenizer.tokenize(comment)
    if len(tokens) > seq_len:
        tokens = tokens[:seq_len]
        
    embeddings = np.array([get_vector(token) for token in tokens])
    if len(embeddings) < seq_len:
        padding    = np.repeat(padding_emb, seq_len - len(embeddings), axis = 0)
        embeddings = np.concatenate((embeddings, padding), axis = 0)
    
    return embeddings

def process_dir(dirpath, seq_len):
    files = list()
    for file in listdir(dirpath):
        if isfile(join(dirpath, file)):
            files.append(join(dirpath, file))
        
    samples = [process_file(path, seq_len) for path in files]
    samples = np.array(samples)
    if 'pos' in dirpath:
        labels = np.ones(len(samples))
    else:
        labels = np.zeros(len(samples))
        
    return samples, labels

def make_dataset(part = 'train', seq_len = 100):
    pos_samples, pos_labels = process_dir(
        f'./aclImdb_v1/aclImdb/{part}/pos', 
        seq_len
    )
    neg_samples, neg_labels = process_dir(
        f'./aclImdb_v1/aclImdb/{part}/neg', 
        seq_len
    )

    x = np.concatenate((pos_samples, neg_samples), axis = 0)
    y = np.concatenate((pos_labels, neg_labels))
    return TensorDataset(
        torch.as_tensor(x, dtype = torch.float), 
        torch.as_tensor(y, dtype = torch.long)
    )

if isfile('sentiment_dataset.pkl'):
    with open('sentiment_dataset.pkl', 'rb') as fs:
        train_dataset, test_dataset = pickle.load(fs)
else:
    train_dataset = make_dataset('train')
    test_dataset  = make_dataset('test')
    
    with open('sentiment_dataset.pkl', 'wb') as fs:
        pickle.dump([train_dataset, test_dataset], fs)

In [None]:
from torch.utils.data import DataLoader

train_loader = DataLoader(
    train_dataset, batch_size = 64, 
    num_workers = 4, shuffle = True, drop_last = True
)
test_loader  = DataLoader(
    test_dataset, batch_size = 64, 
    num_workers = 4, shuffle = True, drop_last = True
)

In [None]:
import math
import torch.nn as nn

class ConvLayer(nn.Module):
    def __init__(
        self, in_channels, out_channels, 
        kernel_size, stride, seq_len, pool_k
    ):
        super(ConvLayer, self).__init__()
        
        self.seq_len     = seq_len
        self.kernel_size = kernel_size
        self.stride      = stride
        self.transfer    = nn.ReLU()
        self.pool_k      = pool_k
        
        self.conv = nn.Conv1d(
            in_channels  = in_channels, 
            out_channels = out_channels, 
            kernel_size  = kernel_size, 
            stride       = stride
        )
        self.pool = nn.MaxPool1d(
            kernel_size = pool_k, 
            stride      = pool_k
        )
        
        
    def output_seq_len(self):
        out_conv = ((self.seq_len - self.kernel_size) / self.stride) + 1
        out_conv = math.floor(out_conv)
        out_pool = ((out_conv - self.pool_k) / self.pool_k) + 1
        out_pool = math.floor(out_pool)
        
        return out_pool
    
    def forward(self, x):
        x = self.conv(x)
        x = self.pool(x)
        x = self.transfer(x)
        return x
    
    
class CNN(nn.Module):
    def __init__(
        self, seq_len = 100, filters = 50, emb_len = 300,
        hidden_size = 100, output_size = 2
    ):
        super(CNN, self).__init__()
        
        self.conv_1 = ConvLayer(
            in_channels  = emb_len, 
            out_channels = filters,
            kernel_size  = 5,
            stride       = 1,
            seq_len      = seq_len,
            pool_k       = 4
        )
        self.conv_2 = ConvLayer(
            in_channels  = filters, 
            out_channels = filters,
            kernel_size  = 4,
            stride       = 1,
            seq_len      = self.conv_1.output_seq_len(),
            pool_k       = 2
        )
        self.conv_3 = ConvLayer(
            in_channels  = filters, 
            out_channels = filters,
            kernel_size  = 3,
            stride       = 1,
            seq_len      = self.conv_2.output_seq_len(),
            pool_k       = 2
        )
        
        mlp_feature_count    = self.conv_3.output_seq_len() * filters
        self.hidden_layer    = nn.Linear(mlp_feature_count, hidden_size)
        self.output_layer    = nn.Linear(hidden_size, output_size)
        self.hidden_transfer = nn.ReLU()
        self.output_transfer = nn.LogSoftmax(dim = 1)
        self.dropout_1       = nn.Dropout(0.8)
        self.dropout_2       = nn.Dropout(0.5)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.conv_1(x)
        x = self.conv_2(x)
        x = self.conv_3(x)
        x = x.reshape(len(x), -1)
        x = self.dropout_1(x)
        x = self.hidden_layer(x)
        x = self.hidden_transfer(x)
        x = self.dropout_2(x)
        x = self.output_layer(x)
        x = self.output_transfer(x)

        return x

In [None]:
from torch.utils.data import DataLoader
import torch.optim as optim

def evaluate(model, test_loader):
    correct_pred = 0
    total_pred   = 0
    with torch.no_grad():
        for X, y in test_loader:
            y_pred        = model(X)
            y_pred_class  = y_pred.argmax(dim = 1)
            correct_pred += (y_pred_class == y).sum().item()
            total_pred   += len(y)

    return correct_pred / total_pred

def train(
  model, loss_func, optimizer, train_loader,
  test_loader, epochs
):
    correct_pred = 0
    total_pred   = 0
    for epoch in range(epochs):
        for X, y in train_loader:
            y_pred = model(X)
            loss   = loss_func(y_pred, y)
            
            y_pred_class  = y_pred.detach().argmax(dim = 1)
            correct_pred += (y_pred_class == y).sum().item()
            total_pred   += len(y)
            
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        train_accuracy = correct_pred / total_pred
        test_accuracy  = evaluate(model, test_loader)
        print(
            f'{epoch:3} -> {100 * train_accuracy:5.3f}% train accuracy',
            f'{epoch:3} -> {100 * test_accuracy:5.3f}% test accuracy'
        )

#epochs    = 10
#model     = CNN(filters = 300)
#loss_func = nn.NLLLoss()
#optimizer = optim.Adam(model.parameters())

#train(
#  model, loss_func, optimizer, 
#  train_loader, test_loader, 
#  epochs
#)

In [10]:
from torch import nn
from torch.nn import functional as F

class RNN(nn.Module):
    def __init__(self, emb_len = 300, hidden_size = 100, output_size = 2):
        super(RNN, self).__init__()
        self.rnn = nn.RNN(
            input_size   = emb_len, 
            hidden_size  = hidden_size, 
            num_layers   = 2,
            nonlinearity = 'relu',
            batch_first  = True
        )
        self.hidden_layer    = nn.Linear(hidden_size, hidden_size)
        self.output_layer    = nn.Linear(hidden_size, output_size)
        self.hidden_transfer = nn.ReLU()
        self.output_transfer = nn.LogSoftmax(dim = 1)

    def forward(self, x):
        output, _ = self.rnn(x)
        x = output[:,-1]
        x = self.hidden_layer(x)
        x = self.hidden_transfer(x)
        x = self.output_layer(x)
        x = self.output_transfer(x)
        return x
    
epochs    = 50
model     = RNN()
loss_func = nn.NLLLoss()
optimizer = optim.Adam(model.parameters())

train(
  model, loss_func, optimizer, 
  train_loader, test_loader, 
  epochs
)

  0 -> 58.141% train accuracy   0 -> 59.046% test accuracy
  1 -> 60.196% train accuracy   1 -> 72.700% test accuracy
  2 -> 57.569% train accuracy   2 -> 49.984% test accuracy
  3 -> 56.333% train accuracy   3 -> 56.138% test accuracy
  4 -> 56.581% train accuracy   4 -> 49.271% test accuracy
  5 -> 56.023% train accuracy   5 -> 56.546% test accuracy
  6 -> 55.734% train accuracy   6 -> 54.948% test accuracy
  7 -> 55.774% train accuracy   7 -> 51.542% test accuracy
  8 -> 55.462% train accuracy   8 -> 51.931% test accuracy
  9 -> 55.183% train accuracy   9 -> 56.370% test accuracy
 10 -> 55.187% train accuracy  10 -> 54.884% test accuracy
 11 -> 55.471% train accuracy  11 -> 60.581% test accuracy
 12 -> 55.892% train accuracy  12 -> 55.990% test accuracy
 13 -> 56.442% train accuracy  13 -> 72.083% test accuracy
 14 -> 57.357% train accuracy  14 -> 51.254% test accuracy
 15 -> 57.543% train accuracy  15 -> 60.942% test accuracy
 16 -> 58.484% train accuracy  16 -> 72.873% test accura

In [11]:
from torch.autograd import Variable

class LSTM(nn.Module):
    def __init__(self, emb_size = 300, hidden_size = 100, output_size = 2):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(
            input_size  = emb_size, 
            num_layers  = 1,
            hidden_size = hidden_size, 
            batch_first = True
        )
        
        self.hidden_layer    = nn.Linear(hidden_size, hidden_size)
        self.output_layer    = nn.Linear(hidden_size, output_size)
        self.hidden_transfer = nn.ReLU()
        self.output_transfer = nn.LogSoftmax(dim = 1)
    
    def forward(self, x):
        h_0 = Variable(torch.zeros(1, len(x), self.hidden_size))
        c_0 = Variable(torch.zeros(1, len(x), self.hidden_size))
        
        out, (h_n, c_n) = self.lstm(x, (h_0, c_0))
        x = out[:,-1]
        
        x = self.hidden_layer(x)
        x = self.hidden_transfer(x)
        x = self.output_layer(x)
        x = self.output_transfer(x)
        return x
    
    
epochs    = 25
model     = LSTM()
loss_func = nn.NLLLoss()
optimizer = optim.Adam(model.parameters())

train(
  model, loss_func, optimizer, 
  train_loader, test_loader, 
  epochs
)

  0 -> 53.870% train accuracy   0 -> 54.772% test accuracy
  1 -> 54.419% train accuracy   1 -> 52.901% test accuracy
  2 -> 57.282% train accuracy   2 -> 77.680% test accuracy
  3 -> 62.706% train accuracy   3 -> 80.629% test accuracy
  4 -> 66.462% train accuracy   4 -> 78.946% test accuracy
  5 -> 69.145% train accuracy   5 -> 81.791% test accuracy
  6 -> 71.124% train accuracy   6 -> 82.740% test accuracy
  7 -> 72.699% train accuracy   7 -> 81.222% test accuracy
  8 -> 73.972% train accuracy   8 -> 81.931% test accuracy
  9 -> 75.058% train accuracy   9 -> 82.628% test accuracy
 10 -> 76.045% train accuracy  10 -> 83.241% test accuracy
 11 -> 76.912% train accuracy  11 -> 83.397% test accuracy
 12 -> 77.670% train accuracy  12 -> 82.620% test accuracy
 13 -> 78.408% train accuracy  13 -> 82.933% test accuracy
 14 -> 79.102% train accuracy  14 -> 82.059% test accuracy


KeyboardInterrupt: 

In [None]:
class LSTM(nn.Module):
    def __init__(self, emb_size = 300, hidden_size = 100, output_size = 2):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(
            input_size    = emb_size, 
            num_layers    = 1,
            hidden_size   = hidden_size, 
            batch_first   = True,
            bidirectional = True
        )
        
        self.hidden_layer    = nn.Linear(hidden_size * 2, hidden_size * 2)
        self.output_layer    = nn.Linear(hidden_size * 2, output_size)
        self.hidden_transfer = nn.ReLU()
        self.output_transfer = nn.LogSoftmax(dim = 1)
    
    def forward(self, x):
        batch_size = len(x)
        h_0 = Variable(torch.zeros(2, batch_size, self.hidden_size))
        c_0 = Variable(torch.zeros(2, batch_size, self.hidden_size))
        
        out, (h_n, c_n) = self.lstm(x, (h_0, c_0))
        x = out[:,-1]
        x = self.hidden_layer(x)
        x = self.hidden_transfer(x)
        x = self.output_layer(x)
        x = self.output_transfer(x)
        return x
    
    
epochs    = 10
model     = LSTM()
loss_func = nn.NLLLoss()
optimizer = optim.Adam(model.parameters())

train(
  model, loss_func, optimizer, 
  train_loader, test_loader, 
  epochs
)

#### Récurrent Neural Network (RNN)

In [None]:
import torchtext
from torch.utils.data import DataLoader
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
            
train_dataset, test_dataset = torchtext.datasets.AG_NEWS()
tokenizer = get_tokenizer('basic_english')

def build_vocabulary(dataset):
    for _, text in dataset:
        yield tokenizer(text)

vocab = build_vocab_from_iterator(
    build_vocabulary(train_dataset), 
    min_freq = 1, 
    specials = ['<unk>']
)
vocab.set_default_index(vocab['<unk>'])

In [None]:
print(len(vocab))
tokens  = tokenizer('RNN is a neural network that is able to understand sequences')
indexes = vocab(tokens)
print(tokens)
print(indexes)

In [None]:
from torchtext.data.functional import to_map_style_dataset
import torch

train_dataset  = to_map_style_dataset(train_dataset)
test_dataset   = to_map_style_dataset(test_dataset)
target_classes = ["World", "Sports", "Business", "Sci/Tech"]
max_words      = 100

def vectorize_batch(batch):
    y, X = list(zip(*batch))
    X = [vocab(tokenizer(text)) for text in X]
    
    for index, tokens in enumerate(X):
        if len(tokens) < max_words:
            X[index] = tokens + ([0] * (max_words-len(tokens)))
        else:
            X[index] = tokens[:max_words]
        
    return torch.tensor(X, dtype = torch.int32), torch.tensor(y) - 1

train_loader = DataLoader(
    train_dataset, 
    batch_size = 1024, 
    collate_fn = vectorize_batch, 
    shuffle    = True
)
test_loader = DataLoader(
    test_dataset, 
    batch_size = 1024, 
    collate_fn = vectorize_batch
)

In [None]:
from torch import nn
from torch.nn import functional as F

embed_len  = 50
hidden_dim = 50

class RNN(nn.Module):
    def __init__(self):
        super(RNN, self).__init__()
        self.embedding_layer = nn.Embedding(
            num_embeddings = len(vocab), 
            embedding_dim  = embed_len
        )
        self.rnn = nn.RNN(
            input_size   = embed_len, 
            hidden_size  = hidden_dim, 
            num_layers   = 1,
            nonlinearity = 'relu',
            batch_first  = True
        )
        self.linear   = nn.Linear(
            hidden_dim, 
            len(target_classes)
        )
        self.output_transfer = nn.LogSoftmax(dim = 1)

    def forward(self, x):
        x = self.embedding_layer(x)
        output, hidden = self.rnn(x)
        x = output[:,-1]
        x = self.linear(x)
        return self.output_transfer(x)

In [None]:
import torch.optim as optim

def evaluate(model, test_loader):
  correct_pred = 0
  total_pred   = 0
  with torch.no_grad():
    for X, y in test_loader:
      y_pred        = model(X)
      y_pred_class  = y_pred.argmax(dim = 1)
      correct_pred += (y_pred_class == y).sum().item()
      total_pred   += len(y)

  return correct_pred / total_pred

def train(
  model, loss_func, optimizer, train_loader,
  test_loader, epochs
):
  for epoch in range(epochs):
    for X, y in train_loader:
      y_pred = model(X)
      loss   = loss_func(y_pred, y)
      loss.backward()
      optimizer.step()
      optimizer.zero_grad()

    train_accuracy = evaluate(model, train_loader)
    test_accuracy  = evaluate(model, test_loader)
    print(
      f'{epoch:3} -> {100 * train_accuracy:5.3f}% train accuracy',
      f'{epoch:3} -> {100 * test_accuracy:5.3f}% test accuracy'
    )

epochs        = 20
learning_rate = 1e-3
model         = RNN()
loss_func     = nn.NLLLoss()
optimizer     = optim.Adam(
  params = model.parameters(),
  lr     = learning_rate
)
train(
  model, loss_func, optimizer, 
  train_loader, test_loader, epochs
)

In [None]:
import torch.nn.functional as F

def make_predictions(model, test_loader):
    y_true, y_pred = [], []
    with torch.no_grad():
        for X, y in test_loader:
            preds = model(X)
            y_pred.append(preds)
            y_true.append(y)

    y_true, y_pred = torch.cat(y_true), torch.cat(y_pred)
    y_pred         = y_pred.argmax(dim = -1)
    y_true, y_pred = y_true.numpy(), y_pred.numpy()
    
    return y_true, y_pred

y_true, y_pred = make_predictions(model, test_loader)

In [None]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

print(f"accuracy on testset : {accuracy_score(y_true, y_pred)}")
print("\nclassification report : ")
print(classification_report(y_true, y_pred, target_names = target_classes))
print("\nconfusion matrix : ")
print(confusion_matrix(y_true, y_pred))

In [None]:
import scikitplot as skplt
import matplotlib.pyplot as plt
import numpy as np

skplt.metrics.plot_confusion_matrix(
    [target_classes[i] for i in y_true], 
    [target_classes[i] for i in y_pred],
    normalize  = True,
    title      = "Confusion Matrix",
    cmap       = "Purples",
    hide_zeros = True,
    figsize    = (5,5)
);
plt.xticks(rotation = 90);

In [None]:
from lime import lime_text

def vectorize_text(text):
    tokens_idx = vocab(tokenizer(text))

    if len(tokens_idx) < max_words:
        x = tokens_idx + ([0] * (max_words - len(tokens_idx)))
    else:
        x = tokens_idx[:max_words]

    return x

def get_targets_proba(texts):
    X    = [vectorize_text(text) for text in texts]
    pred = model(torch.tensor(X, dtype = torch.int32))
    pred = torch.exp(pred)
    return pred.detach().numpy()

label, text = test_dataset[0]
label      -= 1

explainer = lime_text.LimeTextExplainer(
    class_names = target_classes, 
    verbose     = True
)

x = vectorize_text(text)
x = torch.tensor([x], dtype = torch.int32)

with torch.no_grad():
    pred = model(x).argmax(dim = -1)[0]

print("prediction : ", target_classes[pred])
print("truth :      ", target_classes[label])

explanation = explainer.explain_instance(
    text, 
    classifier_fn = get_targets_proba,
    labels        = [label]
)

explanation.show_in_notebook()

In [None]:
import torch.nn as nn
from torch.autograd import Variable
import torch

class LSTM(nn.Module):
    def __init__(self, emb_size = 300, hidden_size = 64):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(
            input_size  = emb_size, 
            hidden_size = hidden_size, 
            batch_first = True
        )
        
        self.lstm_transfer   = nn.ReLU()
        self.hidden_layer    = nn.Linear(hidden_size, 64)
        self.output_layer    = nn.Linear(64, 2)
        self.hidden_transfer = nn.ReLU()
        self.output_transfer = nn.LogSoftmax(dim = 1)
    
    def forward(self, x):
        h_0 = Variable(torch.zeros(1, len(x), self.hidden_size))
        c_0 = Variable(torch.zeros(1, len(x), self.hidden_size))
        
        out, (h_n, c_n) = self.lstm(x, (h_0, c_0))
        x = self.lstm_transfer(h_n)
        x = x.reshape(-1, self.hidden_size)
        x = self.hidden_layer(x)
        x = self.hidden_transfer(x)
        x = self.output_layer(x)
        x = self.output_transfer(x)

        return x

In [None]:
import pickle
with open('sentiment_dataset.pkl', 'rb') as fs:
    train_dataset, test_dataset = pickle.load(fs)

In [None]:
from torch.utils.data import DataLoader
import torch.optim as optim

def evaluate(model, dataset, batch_size):
    dataloader = DataLoader(dataset, batch_size = batch_size, shuffle = False)

    correct_pred = 0
    total_pred   = 0
    with torch.no_grad():
        for X, y in dataloader:
            y_pred        = model(X)
            y_pred_class  = y_pred.argmax(dim = 1)
            correct_pred += (y_pred_class == y).sum().item()
            total_pred   += len(y)

    return correct_pred / total_pred

def train(
  model, loss_func, optimizer, train_dataset,
  test_dataset, epochs, batch_size
):
    dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)

    for epoch in range(epochs):
        for X, y in dataloader:
            y_pred = model(X)
            loss   = loss_func(y_pred, y)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        train_accuracy = evaluate(model, train_dataset, batch_size)
        test_accuracy  = evaluate(model, test_dataset, batch_size)
        print(
            f'{epoch:3} -> {100 * train_accuracy:5.3f}% train accuracy',
            f'{epoch:3} -> {100 * test_accuracy:5.3f}% test accuracy'
        )

epochs        = 50
learning_rate = 1e-3
batch_size    = 64
model         = LSTM()
loss_func     = nn.NLLLoss()
optimizer     = optim.Adam(
  params = model.parameters(),
  lr     = learning_rate
)

train(
  model, loss_func, optimizer, 
  train_dataset, test_dataset, 
  epochs, batch_size
)

In [None]:
!pip install stanza sentence-transformers transformers

In [None]:
from transformers          import pipeline
from sentence_transformers import SentenceTransformer

summarizer = pipeline('summarization', model = 'philschmid/distilbart-cnn-12-6-samsum')

In [None]:
text = """
Hurricane Ian continues to rain destruction onto the Florida. 
Millions have been left dark in the Sunshine State and residents 
along the low-lying peninsula’s many barrier islands have been 
cut off from the mainland as vehicles and whole roads were 
swallowed by the massive Category 4 storm.
"""

print(summarizer(
    text, 
    min_length = 10, 
    max_length = 25
)[0]['summary_text'])

In [None]:
from operator import itemgetter

zero_shot_text_classificiation = pipeline(
    'zero-shot-classification', 
    model = 'valhalla/distilbart-mnli-12-6'
)

def argmax(alist):
    return max(enumerate(alist), key = itemgetter(1))

def text_classification(text):
    res      = zero_shot_text_classificiation(text, classes)
    index, _ = argmax(res['scores'])
    label    = res['labels'][index]
    return label

classes = [
      'This product description refers to a book or an ebook',
      'This product description does not refer to a book and does not refer to an ebook'
]

text_1 = "This is the best seller book of the year, buy it now!"
text_2 = "This T-shirt will make you good looking and smarter everyday, buy it now!"

print(f'text_1: {text_classification(text_1)}')
print(f'text_2: {text_classification(text_2)}')

In [None]:
from datasets import load_dataset

dataset = load_dataset("yelp_review_full")
dataset["train"][0]

In [None]:
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
from transformers import TrainingArguments, Trainer
import numpy as np
import evaluate

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
model     = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased", 
    num_labels = 5
)

def tokenize_function(examples):
    return tokenizer(
        examples["text"], 
        padding    = "max_length", 
        truncation = True
    )

tokenized_dataset   = dataset.map(tokenize_function, batched = True)
small_train_dataset = tokenized_dataset["train"].shuffle().select(range(1000))
small_eval_dataset  = tokenized_dataset["test"].shuffle().select(range(1000))

training_args     = TrainingArguments(
    output_dir          = "test_trainer",
    evaluation_strategy = "epoch"
)
metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions    = np.argmax(logits, axis = -1)
    return metric.compute(predictions = predictions, references = labels)

trainer = Trainer(
    model           = model,
    args            = training_args,
    train_dataset   = small_train_dataset,
    eval_dataset    = small_eval_dataset,
    compute_metrics = compute_metrics
)

trainer.train()

In [None]:
dataset['train'][0]

In [None]:
from cleantext import clean
import re
from datasets import load_dataset

dataset = load_dataset('SetFit/enron_spam')

def clean_text(texts):
    normalise_point  = r'\.'
    normalise_tags   = r'><'
    unslash          = r'\/'
    space_norm_regex = r'\s+'
    
    cleaned_texts = []
    for text in texts:
        cleaned_text = re.sub(normalise_point, '. ', text)
        cleaned_text = re.sub(unslash, ' ', cleaned_text)

        cleaned_text = clean(cleaned_text,
            fix_unicode                  = True,
            to_ascii                     = True,
            lower                        = True,
            no_line_breaks               = True,
            no_urls                      = True,
            no_emails                    = True,
            no_phone_numbers             = True,
            no_numbers                   = True,
            no_digits                    = True,
            no_currency_symbols          = True,
            no_punct                     = False,
            replace_with_punct           = "",
            replace_with_url             = "<url>",
            replace_with_email           = "<email>",
            replace_with_phone_number    = "<phone>",
            replace_with_number          = "<number>",
            replace_with_digit           = "<digit>",
            replace_with_currency_symbol = "<cur>",
            lang                         = "en"
        )

        cleaned_text = re.sub(normalise_tags, ' ', cleaned_text)
        cleaned_text = re.sub(space_norm_regex, ' ', cleaned_text)
        cleaned_texts.append(cleaned_text)

    return cleaned_texts

def tokenize_function(examples):
    return tokenizer(
        clean_text(examples["text"]), 
        padding    = "max_length", 
        truncation = True
    )

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
model     = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased", 
    num_labels = 2
)

tokenizer.add_tokens(['<url>', '<email>', '<phone>', '<number>', '<digit>', '<cur>'])
model.resize_token_embeddings(len(tokenizer))

tokenized_dataset   = dataset.map(tokenize_function, batched = True)
small_train_dataset = tokenized_dataset["train"].shuffle().select(range(1000))
small_eval_dataset  = tokenized_dataset["test"].shuffle().select(range(1000))

training_args     = TrainingArguments(
    output_dir          = "test_trainer",
    evaluation_strategy = "epoch"
)
metric = evaluate.load("accuracy")

trainer = Trainer(
    model           = model,
    args            = training_args,
    train_dataset   = small_train_dataset,
    eval_dataset    = small_eval_dataset,
    compute_metrics = compute_metrics
)

trainer.train()

In [None]:
print(len(dataset['train']))
print(len(dataset['test']))
print(dataset['train'][0])

In [None]:
from datasets import load_dataset

def clean_text(texts):
    normalise_point    = r'\.'
    normalise_tags     = r'><'
    space_norm_regex   = r'\s+'
    user_regex         = r'@\S+'
    
    cleaned_texts = []
    for text in texts:
        cleaned_text = re.sub(normalise_point, '. ', text)

        cleaned_text = clean(cleaned_text,
            fix_unicode                  = True,
            to_ascii                     = True,
            lower                        = True,
            no_line_breaks               = True,
            no_urls                      = True,
            no_emails                    = True,
            no_phone_numbers             = True,
            no_numbers                   = True,
            no_digits                    = True,
            no_currency_symbols          = True,
            no_punct                     = False,
            replace_with_punct           = "",
            replace_with_url             = "<url>",
            replace_with_email           = "<email>",
            replace_with_phone_number    = "<phone>",
            replace_with_number          = "<number>",
            replace_with_digit           = "<digit>",
            replace_with_currency_symbol = "<cur>",
            lang                         = "en"
        )
        
        cleaned_text = re.sub(user_regex, 'user', cleaned_text)
        cleaned_text = re.sub(normalise_tags, ' ', cleaned_text)
        cleaned_text = re.sub(space_norm_regex, ' ', cleaned_text)
        cleaned_texts.append(cleaned_text)

    return cleaned_texts

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions    = np.argmax(logits, axis = -1)
    return metric.compute(predictions = predictions, references = labels, average = 'macro')

def tokenize_function(examples):
    return tokenizer(
        clean_text(examples["text"]), 
        padding        = "max_length", 
        truncation     = True
    )

def remap_labels(sample):
    if sample['feeling'] == 0:
        return { 'label': 0, 'label_text': 'negative' }
    else:
        return { 'label': 1, 'label_text': 'positive' }

dataset   = load_dataset('carblacac/twitter-sentiment-analysis')
tokenizer = AutoTokenizer.from_pretrained('bhadresh-savani/distilbert-base-uncased-sentiment-sst2')
model     = AutoModelForSequenceClassification.from_pretrained(
    'bhadresh-savani/distilbert-base-uncased-sentiment-sst2', 
    num_labels = 2
)

dataset = dataset.map(remap_labels)
tokenizer.add_tokens(['<url>', '<email>', '<phone>', '<number>', '<digit>', '<cur>', 'user'])
model.resize_token_embeddings(len(tokenizer))

tokenized_dataset   = dataset.map(tokenize_function, batched = True)
small_train_dataset = tokenized_dataset["train"].shuffle().select(range(1000))
small_eval_dataset  = tokenized_dataset["test"].shuffle().select(range(1000))

training_args     = TrainingArguments(
    output_dir          = "test_trainer",
    evaluation_strategy = "epoch"
)
metric = evaluate.load('recall')

trainer = Trainer(
    model           = model,
    args            = training_args,
    train_dataset   = small_train_dataset,
    eval_dataset    = small_eval_dataset,
    compute_metrics = compute_metrics
)

trainer.train()

In [None]:
dataset['train'][0]

In [None]:
dataset = load_dataset('SetFit/enron_spam')
dataset['train'][0]

In [None]:
facebook/blenderbot-400M-distill

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline

model_name = "mrm8488/longformer-base-4096-finetuned-squadv2"
tokenizer  = AutoTokenizer.from_pretrained(model_name)
model      = AutoModelForQuestionAnswering.from_pretrained(model_name)
qa         = pipeline("question-answering", model = model, tokenizer = tokenizer)

In [None]:
textbook = "To create a new account you need to : click on the sign up on the top right of the website; then fill the form with your full name, email address, shipping address, credit card and credit card information; then verify your email address by following the instructions in the email you've just received"
question = "I've just filled the form but my account is not created. What should I do?"

qa({"question": question, "context": textbook})

In [None]:
from datasets import load_dataset

dataset = load_dataset('bitext/customer-support-intent-dataset')
dataset['train'][0]

In [None]:
max_len       = 256
batch_size    = 8
epochs        = 3
learning_rate = 1e-05

alllabels = set()
def map_labels(sample):
    alllabels.add(sample['intent'])
    alllabels.add(sample['category'])
    
    tags = sample['tags']
    for c in tags:
        alllabels.add(c)
        
dataset.map(map_labels)

index     = 0
label_map = dict()
for label in alllabels:
    label_map[label] = index
    index           += 1

def remap_labels(sample):
    labels = [0] * len(label_map)
    labels[label_map[sample['intent']]]   = 1
    labels[label_map[sample['category']]] = 1
    
    tags = sample['tags']
    for c in tags:
        labels[label_map[c]] = 1

    return { 'label': labels }

dataset = dataset.map(remap_labels)

In [None]:
import transformers
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from transformers import DistilBertModel, DistilBertTokenizer
from tqdm import tqdm

tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

class MultiLabelDataset(Dataset):
    def __init__(self, dataset, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.texts     = dataset.data['utterance']
        self.targets   = dataset.data['label']
        self.max_len   = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, index):
        text   = self.texts[index].as_py()
        target = self.targets[index].as_py()

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens = True,
            max_length         = self.max_len,
            padding            = 'max_length'
        )
        
        ids  = inputs['input_ids']
        mask = inputs['attention_mask']
        
        X = [
            torch.tensor(ids, dtype = torch.long),
            torch.tensor(mask, dtype = torch.long)
        ]
        y = torch.tensor(target, dtype = torch.float)

        return X, y
 
train_dataset = MultiLabelDataset(
    dataset['train'].shuffle().select(range(1000)), 
    tokenizer, max_len
)
test_dataset  = MultiLabelDataset(
    dataset['test'], 
    tokenizer, max_len
)

In [None]:
from torch import nn
from torch import optim

class MultiLabelDistilBERTClass(nn.Module):
    def __init__(self, num_labels):
        super(MultiLabelDistilBERTClass, self).__init__()
        self.distilbert      = DistilBertModel.from_pretrained('distilbert-base-uncased')
        self.hidden          = nn.Linear(768, 768)
        self.hidden_transfer = nn.ReLU()
        self.dropout         = nn.Dropout(0.3)
        self.output          = nn.Linear(768, num_labels)

    def forward(self, x):
        input_ids      = x[0]
        attention_mask = x[1]
        x = self.distilbert(
            input_ids      = input_ids, 
            attention_mask = attention_mask
        )
        
        x = x[0][:, 0]
        x = self.hidden(x)
        x = self.hidden_transfer(x)
        x = self.dropout(x)
        x = self.output(x)
        return x

model     = MultiLabelDistilBERTClass(len(alllabels))
optimizer = optim.Adam(
    params = model.parameters(), 
    lr     = learning_rate
)
loss_func = nn.BCEWithLogitsLoss()

In [None]:
def evaluate(model, dataset, batch_size):
    dataloader = DataLoader(dataset, batch_size = batch_size, shuffle = False)

    correct_pred = 0
    total_pred   = 0
    with torch.no_grad():
        for X, y in dataloader:
            y_pred        = model(X)
            y_pred_class  = y_pred.argmax(dim = 1)
            y_class       = y.argmax(dim = 1)
            correct_pred += (y_pred_class == y_class).sum().item()
            total_pred   += len(y)

    return correct_pred / total_pred

def train(
  model, loss_func, optimizer, train_dataset,
  test_dataset, epochs, batch_size
):
    dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)

    for epoch in range(epochs):
        for X, y in dataloader:
            y_pred = model(X)
            loss   = loss_func(y_pred, y)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        train_accuracy = evaluate(model, train_dataset, batch_size)
        test_accuracy  = evaluate(model, test_dataset, batch_size)
        print(
            f'{epoch:3} -> {100 * train_accuracy:5.3f}% train accuracy',
            f'{epoch:3} -> {100 * test_accuracy:5.3f}% test accuracy'
        )
        
train(
  model, loss_func, optimizer, train_dataset,
  test_dataset, epochs, batch_size
)