# Machine Learning Induction

The objective of this notebook is to induce Machine Learning models. To do so, tabular datasets are generated (using lazy vectorization and embedding techniques) and Machine Learning techniques (both classical and state-of-the-art) are applied to them. The evaluation is performed using hierarchical classification performance metrics.<br>  
**Source file:** select_202425091103-translated.csv<br>
**Destination file:** select_202425091103-[GPT, Llama, etc].pickle<br>

In [None]:
import logging
logging.basicConfig(level=logging.WARNING)

In [None]:
# OpenAI API key handed to GPTClassifier when the language-model classifiers
# are built further down; an empty placeholder here.
# NOTE(review): consider reading the key from the environment instead of
# pasting a secret into the notebook.
openai_key = ''

In [None]:
# Names of the four label columns, ordered from level 1 to level 4.
# They are selected from `df` to build the label matrix for the train/test split.
cnpq = ['cnpq_area_level_1',
        'cnpq_area_level_2',
        'cnpq_area_level_3',
        'cnpq_area_level_4']

## Reading the dataset

In [None]:
import pandas as pd

In [None]:
# Every column is read as a string, and na_filter=False keeps empty cells as ''
# instead of NaN (accuracy_level below compares labels against '').
df = pd.read_csv('../input/select_202425091103-translated.csv', dtype=str, na_filter=False)

In [None]:
df.head()

In [None]:
df.shape

## Loading the Transformation Techniques

In [None]:
# %load ../src/embedding.py
import re
import numpy as np
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.base import BaseEstimator, TransformerMixin
from sentence_transformers import SentenceTransformer

class Normalizer(BaseEstimator, TransformerMixin):
    """Stateless text cleaner used in front of the count-based vectorizers.

    Each document is stripped of digit runs, ``http...`` URLs and ``<...>``
    tags, lower-cased, tokenized with NLTK and Porter-stemmed. The result is
    a list of space-joined stem strings, one per input document.
    """

    def __init__(self):
        self.stemmer = PorterStemmer()

    def pre_processing(self, doc):
        """Return ``doc`` lower-cased with digits, URLs and tags removed."""
        stripped = re.sub(r'\d+|http\S+|<.*?>', '', doc, flags=re.IGNORECASE)
        return stripped.lower()

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return one normalized, stemmed string for every document in ``X``."""
        normalized = []
        for doc in X:
            tokens = word_tokenize(self.pre_processing(doc))
            stems = [self.stemmer.stem(token) for token in tokens]
            normalized.append(' '.join(stems))
        return normalized

class Lazy(BaseEstimator, TransformerMixin):
    """Wrap a text vectorizer class and always return dense arrays.

    ``vectorizer`` is a vectorizer *class*; it is instantiated immediately
    with the remaining arguments, and the instance is kept in
    ``self.vectorizer``. The hyper-parameters are also stored as attributes.
    """

    def __init__(self, vectorizer, ngram_range, max_df, min_df, max_features):
        self.ngram_range = ngram_range
        self.max_df = max_df
        self.min_df = min_df
        self.max_features = max_features
        # NOTE: the wrapped instance is built once here, so changing the
        # attributes above afterwards does not reconfigure it.
        self.vectorizer = vectorizer(
            ngram_range=ngram_range,
            max_df=max_df,
            min_df=min_df,
            max_features=max_features,
        )

    def fit(self, raw_documents, y=None):
        """Fit the wrapped vectorizer on ``raw_documents`` and return ``self``."""
        self.vectorizer.fit(raw_documents, y)
        return self

    def transform(self, raw_documents):
        """Vectorize ``raw_documents`` and convert the result with ``toarray()``."""
        matrix = self.vectorizer.transform(raw_documents)
        return matrix.toarray()

    def fit_transform(self, raw_documents, y=None):
        """Fit and vectorize in one pass, returning the ``toarray()`` result."""
        matrix = self.vectorizer.fit_transform(raw_documents, y)
        return matrix.toarray()

class BoW(Lazy):
    """Bag-of-words features: ``Lazy`` configured with ``CountVectorizer``."""

    def __init__(self, ngram_range=(1, 1), max_df=1.0, min_df=1, max_features=9000):
        super().__init__(
            CountVectorizer,
            ngram_range=ngram_range,
            max_df=max_df,
            min_df=min_df,
            max_features=max_features,
        )

class TFIDF(Lazy):
    """TF-IDF features: ``Lazy`` configured with ``TfidfVectorizer``."""

    def __init__(self, ngram_range=(1, 1), max_df=1.0, min_df=1, max_features=9000):
        super().__init__(
            TfidfVectorizer,
            ngram_range=ngram_range,
            max_df=max_df,
            min_df=min_df,
            max_features=max_features,
        )

class Embedding(BaseEstimator, TransformerMixin):
    """Sentence-embedding transformer backed by a local SentenceTransformer.

    Parameters
    ----------
    model_name : str
        Directory name of the model, resolved relative to ``../models/``.
        The model is loaded eagerly in the constructor.
    """

    def __init__(self, model_name):
        self.model_name = model_name
        self.model = SentenceTransformer('../models/' + model_name)

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return one embedding row per document in ``X``.

        All documents go to ``encode`` in a single call so the library can
        batch them, instead of running one forward pass per document.
        """
        # list(X) materializes any iterable (e.g. a NumPy object array) into
        # the list-of-sentences form that encode() batches over.
        return np.asarray(self.model.encode(list(X)))

class RoBERTa(Embedding):
    """``Embedding`` whose default model is ``all-distilroberta-v1``."""

    def __init__(self, model_name='all-distilroberta-v1'):
        # The keyword name matters: it must match Embedding's parameter.
        Embedding.__init__(self, model_name=model_name)

class USE(Embedding):
    """``Embedding`` whose default model is ``distiluse-base-multilingual-cased-v1``."""

    def __init__(self, model_name='distiluse-base-multilingual-cased-v1'):
        # The keyword name matters: it must match Embedding's parameter.
        Embedding.__init__(self, model_name=model_name)

## Loading the SVM wrapper

In [None]:
# %load ../src/svm.py
from sklearn.svm import SVC

class SVM(SVC):
    """``SVC`` exposing only ``C``, ``kernel`` and ``probability``.

    ``probability`` defaults to ``True`` here; every other ``SVC`` option
    keeps the library default.
    """

    def __init__(self, C=1.0, kernel='rbf', probability=True):
        super().__init__(
            C=C,
            kernel=kernel,
            probability=probability,
        )

## Loading the GPTClassifier

In [None]:
# %load ../src/gpt_classifier.py
import random, openai, json
from sklearn.base import BaseEstimator
from sklearn.utils.multiclass import unique_labels

class GPTClassifier(BaseEstimator):
    """Zero-shot text classifier that asks an OpenAI chat model for a label.

    Parameters
    ----------
    model : str
        Name of the OpenAI chat model to query.
    key : str
        OpenAI API key.
    """

    def __init__(self, model, key):
        self.model = model
        self.key = key
        # NOTE: this sets the module-level credential of the `openai` package,
        # so it affects every other user of `openai` in the process.
        openai.api_key = self.key

    def fit(self, X, y):
        """Record the candidate labels; ``X`` is not used.

        ``classes_`` keeps the labels exactly as given, while ``labels_``
        keeps only the part after the last ``::HiClass::Separator::`` so the
        model is shown short names.
        """
        self.classes_ = unique_labels(y)
        self.labels_ = [item.split('::HiClass::Separator::')[-1] for item in self.classes_.tolist()]
        return self

    def predict(self, X):
        """Return an array with one entry of ``classes_`` per text in ``X``.

        One chat-completion request is sent per text. If the response cannot
        be mapped to a known label, a label is drawn at random and a warning
        is logged. Errors raised by the API call itself are not caught.
        """
        import logging

        logger = logging.getLogger(__name__)

        # The tool schema depends only on the fitted labels: build it once.
        tools = self.classify_content(self.labels_)
        tool_choice = {'type': 'function', 'function': {'name': 'classify_content'}}

        predictions = []
        for text in X:
            prompt = f'Classify the article content into one correct research area:\n {text}'
            completion = openai.chat.completions.create(
                model=self.model,
                messages=[{'role': 'user', 'content': prompt}],
                tools=tools,
                tool_choice=tool_choice,
            )

            try:
                arguments = completion.choices[0].message.tool_calls[0].function.arguments
                pred = json.loads(arguments)['prediction'][0]
                idx = self.labels_.index(pred)
            except (AttributeError, IndexError, KeyError, TypeError, ValueError) as error:
                # Missing tool call, malformed JSON, or a label outside the
                # fitted set: keep the best-effort random fallback, but make
                # it visible instead of silent.
                pred = random.choice(self.labels_)
                idx = self.labels_.index(pred)
                logger.warning('Unusable model response (%r); using random label %r', error, pred)

            predictions.append(self.classes_[idx])

        return np.array(predictions)

    def classify_content(self, labels):
        """Return the ``tools`` payload that restricts answers to ``labels``."""
        return [{
                'type': 'function',
                'function': {
                    'name': 'classify_content',
                    'description': 'Predict the research area for a given article content',
                    'parameters': {
                        'type': 'object',
                        'properties': {
                            'prediction': {
                                'type': 'array',
                                'items': {
                                    'type': 'string',
                                    'enum': labels
                                },
                                'description': 'The predicted reserach areas.'
                            }
                        },
                        'required': [
                            'prediction'
                        ]
                    }
                }
        }]

## Loading the OllamaClassifier

In [None]:
# %load ../src/ollama_classifier.py
import json, random
from sklearn.base import BaseEstimator
from sklearn.utils.multiclass import unique_labels
from langchain_community.chat_models import ChatOllama
from langchain_core.prompts import ChatPromptTemplate

class OllamaClassifier(BaseEstimator):
    """Zero-shot text classifier that asks a local Ollama chat model for a label.

    Parameters
    ----------
    model : str
        Ollama model name. It is wrapped in a ``ChatOllama`` client with
        ``format='json'`` and stored in ``self.model``.
    """

    def __init__(self, model):
        # NOTE(review): `self.model` holds the client, not the `model` string
        # that was passed in; confirm nothing relies on sklearn-style cloning.
        self.model = ChatOllama(model=model, format='json')

    def fit(self, X, y):
        """Record the candidate labels and build the prompt chain; ``X`` is not used.

        ``classes_`` keeps the labels exactly as given, while ``labels_``
        keeps only the part after the last ``::HiClass::Separator::``.
        """
        self.classes_ = unique_labels(y)
        self.labels_ = [item.split('::HiClass::Separator::')[-1] for item in self.classes_.tolist()]
        self.template = ChatPromptTemplate.from_template("""
                Based on the article content:\n\n
                {text}\n\n
                Classify the content into one correct research area:
                {labels}
                Return a JSON object like ['Research Area': '']."""
            ) | self.model
        return self

    def predict(self, X):
        """Return an array with one entry of ``classes_`` per text in ``X``.

        The chain built in ``fit`` is invoked once per text. If the reply
        cannot be mapped to a known label, a label is drawn at random and a
        warning is logged. Errors raised by the model call are not caught.
        """
        import logging

        logger = logging.getLogger(__name__)

        # `classify_content` is deliberately not called here. It only rebinds
        # `self.model`, which the chain built in `fit` never sees, and calling
        # it per document stacked one more wrapper on the client each time.
        label_text = '; '.join(self.labels_)

        predictions = []
        for text in X:
            response = self.template.invoke({'text': text, 'labels': label_text})

            try:
                pred = json.loads(response.content)['Research Area']
                idx = self.labels_.index(pred)
            except (AttributeError, KeyError, TypeError, ValueError) as error:
                # Malformed JSON, a missing key, or a label outside the fitted
                # set: keep the best-effort random fallback, but make it
                # visible instead of silent.
                pred = random.choice(self.labels_)
                idx = self.labels_.index(pred)
                logger.warning('Unusable model response (%r); using random label %r', error, pred)

            predictions.append(self.classes_[idx])

        return np.array(predictions)

    def classify_content(self, labels):
        """Rebind ``self.model`` with a ``classify_content`` tool limited to ``labels``.

        Kept for callers that use it directly; ``predict`` does not call it.
        Every call wraps the current ``self.model`` in a new binding.
        """
        self.model = self.model.bind(
            tools = [{
                'name': 'classify_content',
                'description': 'Predict the research area for a given article content',
                'parameters': {
                    'type': 'object',
                    'properties': {
                        'prediction': {
                            'type': 'array',
                            'description': 'The predicted reserach areas.',
                            'items': {
                                'type': 'string',
                                'enum': labels
                            },
                        }
                    },
                    'required': ['prediction']
                }
            }], 
            function_call={'name': 'classify_content'}
        )

## Loading the FlatClassifier

In [None]:
# %load ../src/flat_classifier.py
from sklearn.base import BaseEstimator

class FlatClassifier(BaseEstimator):
    """Flat baseline: treat each full label path as one single class.

    Label paths are joined with ``::HiClass::Separator::`` before fitting the
    wrapped classifier, and its predictions are split back into paths.
    """

    def __init__(self, local_classifier):
        self.local_classifier = local_classifier

    def fit(self, X, y):
        """Fit the wrapped classifier on the joined label paths; return ``self``."""
        separator = "::HiClass::Separator::"
        flat_labels = [separator.join(path) for path in y]
        self.local_classifier.fit(X, flat_labels)
        return self

    def predict(self, X):
        """Return one list of level labels per sample in ``X``."""
        flat_predictions = self.local_classifier.predict(X)
        return [label.split('::HiClass::Separator::') for label in flat_predictions]

## Loading the evaluation metrics

In [None]:
# %load ../src/evaluate.py
from hiclass.metrics import precision, recall, f1

def accuracy_class(y_true, y_pred, level):
    """Return the per-class accuracy at one hierarchy level.

    Parameters
    ----------
    y_true, y_pred : sequences of label paths
        Each element is indexable by ``level``.
    level : int
        Position inside each label path to evaluate.

    Returns
    -------
    dict
        Maps every true label seen at ``level`` to the fraction of its
        samples whose prediction at ``level`` matches. Empty input gives
        an empty dict.
    """
    # Imported locally: this file never imports `defaultdict`, so the
    # function used to raise NameError when called.
    from collections import defaultdict

    total, hits = defaultdict(int), defaultdict(int)

    for t, p in zip(y_true, y_pred):
        label = t[level]
        total[label] += 1
        if label == p[level]:
            hits[label] += 1

    return {label: hits[label] / count for label, count in total.items()}

def accuracy_unit(units, true, pred, level):
    """Group the true and predicted labels at ``level`` by unit.

    Parameters
    ----------
    units : sequence of hashable
        Unit identifier of each sample.
    true, pred : sequences of label paths
        Aligned with ``units``; each element is indexable by ``level``.
    level : int
        Position inside each label path to collect.

    Returns
    -------
    list of tuple
        One ``(unit, true_labels, pred_labels)`` triple per distinct unit,
        in order of first appearance. No accuracy is computed here; callers
        receive the raw label lists.
    """
    # Group in a single pass per sequence instead of rescanning the whole
    # input for every unit. Plain dicts preserve insertion order, which makes
    # the result deterministic.
    true_by_unit = {unit: [] for unit in units}
    pred_by_unit = {unit: [] for unit in true_by_unit}

    for unit, labels in zip(units, true):
        true_by_unit[unit].append(labels[level])
    for unit, labels in zip(units, pred):
        pred_by_unit[unit].append(labels[level])

    return [(unit, true_by_unit[unit], pred_by_unit[unit]) for unit in true_by_unit]

def accuracy_level(y_true, y_pred, level):
    """Return the fraction of samples counted as correct at ``level``.

    A sample counts as correct when its true label at ``level`` is the empty
    string, or when it equals the predicted label at ``level``.
    """
    hits = 0
    pairs = 0
    for expected, predicted in zip(y_true, y_pred):
        pairs += 1
        if expected[level] == '' or expected[level] == predicted[level]:
            hits += 1
    return hits / pairs

def flatly(y_true, y_pred, n_levels=4):
    """Return the per-level accuracy for the first ``n_levels`` levels.

    Parameters
    ----------
    y_true, y_pred : sequences of label paths
        Forwarded unchanged to ``accuracy_level``.
    n_levels : int, default 4
        Number of hierarchy levels to score. The default reproduces the
        previously hard-coded depth of four.

    Returns
    -------
    dict
        ``{'Level 0': acc, ..., 'Level <n_levels - 1>': acc}``.
    """
    return {'Level ' + str(level): accuracy_level(y_true, y_pred, level) for level in range(n_levels)}

def hierarchy(y_true, y_pred, type='micro'):
    """Return hiclass F1, precision and recall as a dict.

    ``type`` is forwarded as the third positional argument of each hiclass
    metric. The name shadows the builtin, but it is part of the public
    signature and therefore kept.
    """
    scores = {}
    scores['F1-score'] = f1(y_true, y_pred, type)
    scores['Precision'] = precision(y_true, y_pred, type)
    scores['Recall'] = recall(y_true, y_pred, type)
    return scores

def performance(y_true, y_pred):
    """Return the hierarchical metrics followed by the per-level accuracies."""
    hierarchical_scores = hierarchy(y_true, y_pred)
    flat_scores = flatly(y_true, y_pred)
    # On a key clash the per-level entry wins, as with the `|` dict merge.
    return {**hierarchical_scores, **flat_scores}

## Splitting training and test sets

In [None]:
import pickle, gzip, tqdm
from sklearn.model_selection import train_test_split

In [None]:
def save(config, result):
    """Pickle each value of ``result`` into a gzip file named after ``config``.

    Every key of ``result`` is used as a directory name directly under
    ``../``; the file written there is ``select_202425091103-<config>.pickle``.
    """
    for folder, payload in result.items():
        target = ''.join(('../', folder, '/select_202425091103-', config, '.pickle'))
        with gzip.open(target, 'wb') as handle:
            pickle.dump(payload, handle)

In [None]:
# Features are the 'all' column and labels are the four `cnpq` level columns.
# 30% of the rows are held out for testing; random_state fixes the shuffle.
X_train, X_test, y_train, y_test = train_test_split(df['all'].to_numpy(), df[cnpq].to_numpy(), test_size=0.30, random_state=42)

In [None]:
len(X_train)

In [None]:
len(X_test)

## Executing the classical pipeline

In [None]:
import tqdm
from itertools import product
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from hiclass import LocalClassifierPerNode, LocalClassifierPerParentNode, LocalClassifierPerLevel
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB

In [None]:
# Short name -> transformer class defined earlier in this notebook.
# The experiment loop instantiates each class with its default arguments.
transformers = {'BoW' : BoW,
                'TFIDF' : TFIDF,
                'RoBERTa' : RoBERTa,
                'USE' : USE}

In [None]:
# Short name -> classification strategy class. The first three come from
# hiclass; 'FLAT' is the FlatClassifier defined in this notebook.
strategies = {'LCPN' : LocalClassifierPerNode, 
              'LCPPN' : LocalClassifierPerParentNode, 
              'LCPL' : LocalClassifierPerLevel,
              'FLAT' : FlatClassifier}

In [None]:
# Short name -> base classifier class (not instances). The experiment loop
# calls each class with no arguments. 'SVM' is the SVC wrapper defined above.
classifiers = {'DT' : DecisionTreeClassifier, 
               'RF' : RandomForestClassifier,
               'NB' : GaussianNB,
               'SVM' : SVM}

In [None]:
# Run one experiment per (transformer, strategy, classifier) name combination.
# NOTE: product() returns an iterator without a length, so tqdm is not given a total here.
for tfm, stg, cls in tqdm.tqdm(product(transformers, strategies, classifiers)):

    steps = []
    # The count-based vectorizers get text normalization first. The step is
    # registered under the name '', so `config` (and therefore the output
    # file name) starts with a space for these runs.
    if tfm in ['BoW', 'TFIDF']:
        steps.append(('', Normalizer()))

    # Fresh transformer, strategy and classifier instances for every combination.
    steps.append((tfm, transformers[tfm]()))
    steps.append((stg + ' ' + cls, strategies[stg](classifiers[cls]())))
    
    pipeline = Pipeline(steps) 

    pipeline.fit(X_train, y_train)
    pred = pipeline.predict(X_test)
    # The keys of `result` are the directory names that save() writes into.
    result = {'results' : performance(y_test, pred), 'models' : pipeline}
    # The step names joined by spaces identify this run in the output file name.
    config = ' '.join(list(pipeline.named_steps.keys()))
    save(config, result)

## Executing the language model pipeline

In [None]:
from bert_sklearn import BertClassifier

In [None]:
# Rebinds `strategies`: the language-model run uses only the per-parent-node strategy.
strategies = {'LCPPN' : LocalClassifierPerParentNode}

In [None]:
# Rebinds `classifiers`. Unlike the classical dict, the values here are
# already-built instances, and the loop below passes them on without calling them.
# Constructing GPTClassifier also sets `openai.api_key` as a side effect.
classifiers = {'GPT' : GPTClassifier('gpt-4o', openai_key), 
               'Llama' : OllamaClassifier('llama3.1:70b'),
               'BERT' : BertClassifier('bert-base-uncased', epochs=4)}

In [None]:
# Run one experiment per (strategy, language-model classifier) name combination.
# There is no transformer step: the raw text in X_train / X_test goes straight to the strategy.
for stg, cls in tqdm.tqdm(product(strategies, classifiers)):

    steps = []
    # `classifiers[cls]` is already an instance here, so it is not called.
    # NOTE(review): `bert=True` presumably makes hiclass skip its numeric input
    # validation so raw text can be passed through; confirm against the hiclass docs.
    steps.append((stg + ' ' + cls, strategies[stg](classifiers[cls], bert=True)))
    
    pipeline = Pipeline(steps) 

    pipeline.fit(X_train, y_train)
    pred = pipeline.predict(X_test)
    # The keys of `result` are the directory names that save() writes into.
    result = {'results' : performance(y_test, pred), 'models' : pipeline}
    # The single step name identifies this run in the output file name.
    config = ' '.join(list(pipeline.named_steps.keys()))
    save(config, result)