# Intoduccion

El siguiente proyecto tiene por objetivo el uso de ***redes neuronales*** con el fin de realizar clasificacion de texto y reconocer entidades cubanas. Con entidades cubanas nos referimos a cualquier ente que pertenezca a Cuba, pueden ser desde animales autoctonos, instituciones, personas, marcas de empresas, productos, platos gastronomicos, etc. cualquier cosa que lleve el sello de cubania, la idea es decir si es una entidad cubana y de que tipo. Las entidades cubanas se encuentran en el texto en forma de frases, por ejemplo: Jose Marti, Republica de Cuba, etc. Entre los tipos de entidades cubanas se encuentran *ley*, *organizacion*, *persona*, etc., el tipo *otro* es utilizado para clasificar cualquier palabra no relacionada con una entidad cubana. Como se puede apreciar tal tarea es casi imposible de cometer, principalmente por el tamano del espacio de entidades cubanas y los distintos tipos de las mismas, clasificar todas las entidades cubanas en texto es una tarea titanica, no digamos ya crear un buen clasificador. Sin embargo, en este proyecto intentamos, con un dataset bastante modesto, realizar ese objetivo. Se probaron varios modelos, los cuales veremos desde el mas simple hasta el mas complejo que es el estado del arte.

# Baseline

El primer modelo es un *baseline*, la idea es crear un clasificador que dado el trainset, le asignara a cada palabra la categoria en la que esta aparezca mas, posteriormente para clasificar el testset, si la palabra se encuentra almacenada y clasificada con la categoria en la que mas se repite en el trainset, se devuelve esta categoria, si no se devuelve la categoria de palabra que no pertenece a entidades cubanas, es decir *otro*.

In [3]:
import sklearn.base
from sklearn.metrics import f1_score, make_scorer, precision_score
from keras.models import Sequential
from keras.layers import Embedding, Input
from keras.utils import to_categorical
from keras_contrib.layers import CRF
from keras_contrib.losses import crf_loss
from keras_contrib.metrics import crf_viterbi_accuracy
from keras_preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split, RandomizedSearchCV, learning_curve
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.wrappers.scikit_learn import KerasClassifier
import sklearn_crfsuite
import scipy.stats
from typing import List, Tuple, Dict, Any

Simple metodo que lee el dataset y lo devuelve.

In [4]:
def read_tsv(name: str):
	data = None
	with open(name, 'r', encoding='utf-8') as file:
		data = file.read()
	return data

Metodo que transforma de la representacion del dataset a una representacion **BIO**, la representacion **BIO** debe su nombre a *begin*, *in*, *other*, la idea es poder conocer que frases de un texto son consideradas entidades cubanas, en este caso la primera palabra de la frase sera *taggeada* (del spanglish *taggear* que se deriba del ingles *tag* y tiene por significado categorizar, clasificar, marcar, etc.) de *begin* y las restantes palabras que se encuentren en la frase seran *taggueadas* de *in*. De esta forma, por ejemplo, las palabras de una organizacion como pudiera ser Union de Jovenes Comunistas, terminaria con las palabras categorizadas del siguiente modo: `Union:begin-ORG`, `de:in-ORG`, `Jovenes:in-ORG`, `Comunistas:in-ORG`. En donde *ORG* se refiere a organizacion, de esta forma podemos conocer que esta entidad es cubana y es una organizacion. Esta forma de clasificar tiene la ventaja de permitirnos conocer donde inicia, termina y las palabras internas de una entidad.

In [5]:
def transform_tsv2BIO(name: str) -> Tuple[List[str], List[str]]:
	words = []
	tags = []
	actual: str = ''
	with open(name, 'r', encoding='utf-8') as file:
		for item in file.readlines():
			word: str
			tag: str
			word, tag = item.replace('\n', '').split('\t')
			if tag != 'O':
				words_in = word.split()
				words.append(words_in.pop(0))
				tags.append(f'B-{tag}')
				for word_in in words_in:
					words.append(word_in)
					tags.append(f'I-{tag}')
			else:
				words.append(word)
				tags.append(tag)

	return words, tags

Elimina las categorias que no pertenezcan a organizacion.

In [6]:
def del_not_org_tags(tags: List[str]) -> List[str]:
	n_tags = []
	tag: str
	for tag in tags:
		if tag != 'O' and tag.split('-')[1] != 'ORG':
			n_tags.append('O')
		else: n_tags.append(tag)
	return n_tags

Transforma las categorias en numeros, categorias numericas, de esta forma se podra entrenar las *redes neuronales* mas adelante.

In [7]:
def to_categories(a_list: List[str]) -> List[int]:
	cats = {}
	index = 0
	other_list = []
	for tag in a_list:
		try:
			other_list.append(cats[tag])
		except:
			cats[tag] = index
			index += 1
			other_list.append(cats[tag])

	return other_list

In [8]:
def to_feature_array(feature_names: List[str], feature_vectors: List[Dict[str, Any]]):

	new_features = []
	for vector in feature_vectors:
		new_features.append([vector.get(name, 0) for name in feature_names])

	return new_features

Dado un texto como una sola cadena lo divide en oraciones por los puntos. Tambien utilizado para dividir el texto por cantidad de oraciones, util para transformar la cadena del texto en trainset y testset.

In [9]:
class SentenceGetter:

	def __init__(self, **kwargs):
		if kwargs.get('words', False):
			self.text = self.to_sentences(kwargs['words'])
		else: raise Exception('incorrect arguments')

	@staticmethod
	def to_sentences(sents: List[str]) -> List[List[str]]:
		resp = []
		sents_cop = sents
		for times in range(sents.count('.')):
			part = sents_cop[: sents_cop.index('.') + 1]
			resp.append(part)
			sents_cop = sents_cop[sents_cop.index('.') + 1:]
		return resp

	def split(self, number: int) -> Tuple[List[str], List[str]]:
		part1, part2 = self.text[: number], self.text[number:]
		t1 = []
		ret1 = [t1.extend(ls) for ls in part1][0]
		t2 = []
		ret2 = [t2.extend(ls) for ls in part2][0]

		return t1, t2

In [10]:
def to_tags(vectors, tags_index: dict):
	vektor = list(vectors)
	tags = []
	index2tags = {value: key for key, value in tags_index.items()}
	for sentence in vektor:
		sent = []
		for vec in sentence:
			sent.append(index2tags[list(vec).index(1.0)])
		tags.append(sent)

	new_tags = []

	for el in tags:
		new_tags.extend(el)

	return new_tags

In [11]:
def cross_val(model_maker, train: np.ndarray, target: list, cv=3, epochs=30,
			  batch_size=32, validation_split=0.1):
	model: keras.Model
	num_validation_samples = len(train) // cv
	validation_scores = []

	for fold in range(cv):
		validation_data = train[num_validation_samples * fold:
							   num_validation_samples * (fold + 1)]
		target_validation_data = target[num_validation_samples * fold:
							   num_validation_samples * (fold + 1)]

		training_data = train[:num_validation_samples * cv].tolist() + \
			train[num_validation_samples * (cv + 1):].tolist()
		training_data = np.array(training_data)

		target_training_data = target[:num_validation_samples * cv] + \
			target[num_validation_samples * (cv + 1):]

		model = model_maker()
		model.fit(training_data, np.array(target_training_data), epochs=epochs,
				  batch_size=batch_size, validation_split=validation_split)

		validation_score = model.evaluate(validation_data,
										  np.array(target_validation_data))
		validation_scores.append(validation_score)

	return sum([el[0] for el in validation_scores]) / len(validation_scores), \
		sum([el[1] for el in validation_scores]) / len(validation_scores)

Clase encargada de como se hablo al inicio de la seccion de clasificar las palabras. El *baseline* en si mismo.

In [12]:
class MemoryTagger(sklearn.base.BaseEstimator):

	def fit(self, X, y):
		'''
		Expects a list of words as X and a list of tags as y.
		'''
		voc = {}
		self.tags = []
		for x, t in zip(X, y):
			if t not in self.tags:
				self.tags.append(t)
			if x in voc:
				if t in voc[x]:
					voc[x][t] += 1
				else:
					voc[x][t] = 1
			else:
				voc[x] = {t: 1}
		self.memory = {}
		for k, d in voc.items():
			self.memory[k] = max(d, key=d.get)

	def predict(self, X, y=None):
		'''
		Predict the the tag from memory. If word is unknown, predict 'O'.
		'''
		return [self.memory.get(x, 'O') for x in X]

Se carga el *corpus* (texto clasificado) y lo dividimos en trainset y testset, para el testset utilizaremos las palabras de las ultimas 20 oraciones.

In [13]:
words, tags = transform_tsv2BIO('code/corpus.tsv')

all_words = list(set(words))
all_tags = list(set(tags))

n_words = len(all_words)
n_tags = len(all_tags)

sent_get = SentenceGetter(words=words)
w_train, w_test = sent_get.split(-20)
t_train, t_test = tags[:len(w_train)], tags[len(w_train):]

In [14]:
model = MemoryTagger()
model.fit(w_train, t_train)

pred = model.predict(w_test)

print(precision_score(t_test, pred, average='weighted'))

0.9105711385651969
  precision = _prf_divide(tp_sum, pred_sum,


# Conditional Random Fields

[Conditional Random Fields][1] fue una tecnica muy utilizada en procesamiento de lenguaje natural y clasificaciond de textos antes de que se extendiera el uso de las redes neuronales, aun puede verse esta tecnica comparandose con el estado del arte. Para aplicarla al problema que tenemos creamos una representacion vectorial de las palabras utilizando caracteristicas de la palabra en si misma.

[1]: https://en.wikipedia.org/wiki/Conditional_random_field

In [15]:
sentence = List[str]

In [16]:
def word2features(sent: sentence, i: int) -> Dict:
	# todo modificar los features para obtener mayor presicion con un contexto
	#  mas grande
	word = sent[i]

	features = {
        'bias': 1.0,
        'word.lower()': word.lower(),
        'word[-3:]': word[-3:],
        'word[-2:]': word[-2:],
        'word.isupper()': word.isupper(),
        'word.istitle()': word.istitle(),
        'word.isdigit()': word.isdigit(),
    }
	if i > 0:
		word1 = sent[i - 1]
		features.update({
            '-1:word.lower()': word1.lower(),
            '-1:word.istitle()': word1.istitle(),
            '-1:word.isupper()': word1.isupper(),
		})
	else:
		features['BOS'] = True

	if i < len(sent) - 1:
		word1 = sent[i + 1]
		features.update({
            '+1:word.lower()': word1.lower(),
            '+1:word.istitle()': word1.istitle(),
            '+1:word.isupper()': word1.isupper(),
        })
	else:
		features['EOS'] = True

	return features

Transformamos en dataset en estos *feature vectors* o vectores de caracteristicas.

In [17]:
def sent2features(sent: sentence):
	return [word2features(sent, i) for i in range(len(sent))]

In [26]:
w_train_f = sent2features(w_train)
w_test_f = sent2features(w_test)

crf = sklearn_crfsuite.CRF(algorithm='lbfgs', c1=0.1, c2=0.1,
						   max_iterations=100, all_possible_transitions=True
)

crf.fit([].append(w_train_f), [].append(t_train))

# params_space = {'c1': scipy.stats.expon(scale=0.5),
# 				'c2': scipy.stats.expon(scale=0.05),
# }
# print(len(w_train_f), len(w_train), len(t_train))
# f1_scorer = make_scorer(f1_score, average='weighted')

# rs = RandomizedSearchCV(crf, params_space, verbose=1, cv=3,
# 						n_jobs=4, n_iter=200, scoring=f1_scorer)
# rs.fit(w_train_f, t_train)

# print('best params:', rs.best_params_)
# print('best CV score:', rs.best_score_)
# print('model size: {:0.2f}M'.format(rs.best_estimator_.size_ / 1000000))

# crf = rs.best_estimator_

# t_pred2 = crf.predict(sent2features(['el', 'Ministerio', 'del', 'Interior',
# 									  'y', 'la', 'Union', 'de', 'Jovenes',
# 									  'Capitalistas', 'de', 'Chile',
# 									  'participan', 'en', 'la', 'recogida',
# 									  'de', 'materias', 'primas', '.']))
# print(t_pred2)
# new = to_feature_array(features, w_train_f)

# learning_curve(crf, new, t_train, cv=3)

# pred = crf.predict(w_test_f)

# print(f1_score(t_test, pred, average='weighted'))

TypeError: object of type 'NoneType' has no len()