<a href="https://colab.research.google.com/github/PascalBreuer/inl-meet-ir-v2/blob/Pascal/Bert_and_sentiment_dict.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pip install transformers

In [None]:
# Mount Google Drive inside the Colab runtime at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
import re
import sklearn
import string
import nltk
import transformers as ppb
import pandas as pd
import numpy as np
import torch
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from sklearn.pipeline import Pipeline
from transformers import pipeline
from datetime import datetime

# Download the NLTK 'stopwords' resource; PreprocessorTransformer reads it
# through nltk.corpus.stopwords.
nltk.download('stopwords')


# Quick check of the ready-made transformers 'sentiment-analysis' pipeline
# on a sample sentence.
print(pipeline('sentiment-analysis')('we love you'))


# It is also possible to run several of them one after another

class BertTransformer(BaseEstimator, TransformerMixin):
    """Pipeline stage that turns every sentence into a DistilBERT feature vector.

    The pretrained 'distilbert-base-uncased' tokenizer and model are loaded
    once in the constructor; ``fit`` learns nothing.
    """

    def __init__(self):
        model_class, tokenizer_class, pretrained_weights = (
            ppb.DistilBertModel, ppb.DistilBertTokenizer, 'distilbert-base-uncased')

        # Load pretrained model/tokenizer
        self.tokenizer = tokenizer_class.from_pretrained(pretrained_weights)
        self.model = model_class.from_pretrained(pretrained_weights)

    def fit(self, X, y=None):
        """No-op; present only to satisfy the transformer interface."""
        return self

    def transform(self, data):
        """Run all sentences through the model in a single batch.

        Args:
            data: object supporting ``data['Sentence'].tolist()`` (in this file
                a pandas DataFrame with a 'Sentence' column).

        Returns:
            Tuple ``(data, features)``: the unchanged input plus a NumPy array
            holding, per sentence, the model output at sequence position 0.
        """
        # Have the tokenizer split every sentence into token ids.
        sentences = [str(s) for s in data['Sentence'].tolist()]
        # NOTE(review): no truncation is requested here - confirm that no
        # sentence exceeds the model's maximum sequence length.
        tokenized = [self.tokenizer.encode(s, add_special_tokens=True)
                     for s in sentences]

        # Pad every id list with 0 up to the length of the longest one.
        max_len = max((len(ids) for ids in tokenized), default=0)
        padded = np.array([ids + [0] * (max_len - len(ids)) for ids in tokenized])

        # Mask that lets the model ignore the padding: 1 = real token, 0 = pad.
        # NOTE(review): assumes id 0 is used only for padding - confirm against
        # the tokenizer's pad token id.
        mask = np.where(padded != 0, 1, 0)

        # Turn the padded array and the mask into tensors
        # (tensor = multi-dimensional matrix with one uniform dtype).
        input_ids = torch.tensor(padded).to(torch.long)
        attention_mask = torch.tensor(mask).to(torch.long)

        # Hand everything to the model; no_grad skips gradient bookkeeping,
        # which is not needed for pure inference.
        with torch.no_grad():
            output = self.model(input_ids, attention_mask=attention_mask)

        # Keep only sequence position 0 of the first model output, i.e. the
        # slot of the leading special token added by add_special_tokens=True.
        features = output[0][:, 0, :].numpy()

        return (data, features)


class PreprocessorTransformer(BaseEstimator, TransformerMixin):
    """Stateless stage that normalises the raw sentences into word lists.

    Each sentence is lower-cased, stripped of ``string.punctuation``, has
    every run of digits replaced by the literal 'num', is split on whitespace
    and finally loses all English NLTK stopwords.
    """

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing to learn; returns the instance unchanged."""
        return self

    def transform(self, data_and_features, y=None):
        """Tokenise the sentences and pass the features through untouched.

        Args:
            data_and_features: tuple ``(data, features)`` where
                ``data['Sentence']`` holds the raw sentences.
            y: ignored.

        Returns:
            Tuple ``(token_lists, features)`` with one list of words per
            sentence.
        """
        frame, features = data_and_features
        raw_sentences = frame['Sentence'].tolist()

        punctuation_remover = str.maketrans('', '', string.punctuation)
        english_stopwords = set(nltk.corpus.stopwords.words('english'))

        token_lists = []
        for raw in raw_sentences:
            text = str(raw).lower().translate(punctuation_remover)
            text = re.sub(r'\d+', 'num', text)
            kept = [token for token in text.split() if token not in english_stopwords]
            token_lists.append(kept)

        return (token_lists, features)


class SentimentOpinionValueCalculatorSingleValueTransformer(BaseEstimator, TransformerMixin):
    """Shift each feature row by the sentence's average dictionary value.

    The dictionary is read from a ';'-separated CSV file with the columns
    'word' and 'value'.
    """

    def __init__(self, dict_name):
        """Load the word -> value dictionary from ``dict_name``.

        Args:
            dict_name: path of the ';'-separated CSV file to read.
        """
        self.dict_name = dict_name
        df = pd.read_csv(dict_name, sep=';')
        self.value_dict = pd.Series(df.value.values, index=df.word).to_dict()

    def fit(self, X, y=None):
        """Nothing to learn; returns the instance unchanged."""
        return self

    def transform(self, sentences_and_features):
        """Add every sentence's score to its feature row.

        Args:
            sentences_and_features: tuple ``(sentences, features)``; each
                sentence is a sequence of words and ``features`` has one row
                per sentence.

        Returns:
            A new array shaped like ``features``; the input is not modified.
        """
        sentences, features = sentences_and_features
        scores = [self._average_value(words) for words in sentences]

        # Work on a copy so that calling transform() twice on the same input
        # does not add the score twice to the caller's array.
        shifted = np.array(features)
        # NOTE(review): the scalar is added to EVERY column of the row rather
        # than appended as an extra column. Kept as-is because changing it
        # would alter the output shape - confirm this is intended.
        for row, score in enumerate(scores):
            shifted[row] = shifted[row] + score
        return shifted

    def _average_value(self, words):
        """Return the summed dictionary values of ``words`` divided by their count.

        Words missing from the dictionary add nothing to the sum but still
        count towards the divisor; an empty sentence scores 0.
        """
        word_count = len(words)
        if word_count == 0:
            return 0
        total = sum(self.value_dict[word] for word in words if word in self.value_dict)
        return total / word_count


class SentimentOpinionValueCounterTransformer(BaseEstimator, TransformerMixin):
    """Shift each feature row by the number of dictionary words in the sentence.

    The dictionary is read from a ';'-separated CSV file with the columns
    'word' and 'value'; only the words are used for counting.
    """

    def __init__(self, value_file_name):
        """Load the dictionary from ``value_file_name``.

        Args:
            value_file_name: path of the ';'-separated CSV file to read.
        """
        # BaseEstimator.get_params() looks up an attribute for every
        # constructor parameter, so the argument must be stored under its
        # own name or repr()/clone() of this estimator raise AttributeError.
        self.value_file_name = value_file_name
        df = pd.read_csv(value_file_name, sep=';')
        self.value_dict = pd.Series(df.value.values, index=df.word).to_dict()

    def fit(self, X, y=None):
        """Nothing to learn; returns the instance unchanged."""
        return self

    def transform(self, sentences_and_features):
        """Add every sentence's dictionary-hit count to its feature row.

        Args:
            sentences_and_features: tuple ``(sentences, features)``; each
                sentence is a sequence of words and ``features`` has one row
                per sentence.

        Returns:
            A new array shaped like ``features``; the input is not modified.
        """
        sentences, features = sentences_and_features
        # Raw count of known words; deliberately not normalised by length.
        hit_counts = [
            sum(1 for word in words if word in self.value_dict)
            for words in sentences
        ]

        # Work on a copy so that calling transform() twice on the same input
        # does not add the count twice to the caller's array.
        shifted = np.array(features)
        # NOTE(review): the count is added to EVERY column of the row rather
        # than appended as an extra column. Kept as-is because changing it
        # would alter the output shape - confirm this is intended.
        for row, count in enumerate(hit_counts):
            shifted[row] = shifted[row] + count
        return shifted


class PipelineRunner:
    """Load the train/test workbooks, run the pipeline and log the accuracy.

    Attributes are set in ``__init__``: the dictionary path, the log file
    path, the two loaded data frames and the grid of ``C`` values searched
    by ``GridSearchCV``.
    """

    # Columns removed from both workbooks right after loading.
    _DROPPED_COLUMNS = ['SUBJindl', 'SUBJsrce', 'SUBJrhet', 'SUBJster',
                        'SUBJspee', 'SUBJinspe', 'SUBJprop', 'SUBJpolit']

    def __init__(self, dict_file, training_file, test_file, log_file='results_with_correct_input.log'):
        """Read both Excel files and prepare the hyper-parameter grid.

        Args:
            dict_file: path of the sentiment dictionary CSV.
            training_file: Excel workbook with a 'sentences' sheet (training).
            test_file: Excel workbook with a 'sentences' sheet (test).
            log_file: file that results are appended to.
        """
        self.dict_file = dict_file
        self.log_file = log_file
        self.data_training = self._load_sentences(training_file)
        self.data_test = self._load_sentences(test_file)
        self.Cs = np.logspace(-6, 6, 200)

    @classmethod
    def _load_sentences(cls, excel_file):
        """Read the 'sentences' sheet and drop the columns in _DROPPED_COLUMNS."""
        frame = pd.read_excel(excel_file, sheet_name='sentences')
        return frame.drop(columns=cls._DROPPED_COLUMNS)

    def start_all_pipelines(self, data_column):
        """Train and evaluate the pipeline for one label column and log the result.

        Args:
            data_column: name of the column holding the target labels.
        """
        # Use the dictionary handed to this runner, not a module-level global
        # that merely happens to carry the same name.
        transformer_list = [BertTransformer(),
                            PreprocessorTransformer(),
                            SentimentOpinionValueCalculatorSingleValueTransformer(self.dict_file)]

        description = f'Bert und Sentiment Durchschnittswert (mit langem Dictonary). Spalte {data_column}'

        print('Starting with Logistic Regression')

        pipeline_to_use = self.make_pipeline(transformer_list, LogisticRegression(max_iter=500), dict(C=self.Cs))

        log_reg_typ = "Logistic Regression"
        accuracy = self.fit_and_predict_and_calculate_accuracy_pipe(pipeline_to_use, data_column)
        self.write_result_to_file(accuracy, log_reg_typ, description)

        # Other classifiers can be compared by repeating the three calls above
        # with a different estimator and grid - useful after changing the
        # pipeline or when more test data is available, e.g.:
        #   GaussianNB(),  dict(var_smoothing=self.Cs)  -> "Gaussian Naive Bayes"
        #   BernoulliNB(), dict(alpha=self.Cs, binarize=self.Cs)
        #                                               -> "Bernoulli Naive Bayes"

    def make_pipeline(self, transformer_list, estimator, param_gird):
        """Chain the transformers and finish with a grid-searched estimator.

        Args:
            transformer_list: transformers, in execution order.
            estimator: final classifier wrapped in ``GridSearchCV``.
            param_gird: parameter grid passed to ``GridSearchCV``.

        Returns:
            A scikit-learn ``Pipeline`` whose stages are named 'stage: <i>'
            followed by the final 'clf' step.
        """
        clf = GridSearchCV(estimator=estimator, param_grid=param_gird, n_jobs=-1, scoring='accuracy')

        stages = [(f'stage: {index}', transformer)
                  for index, transformer in enumerate(transformer_list)]
        stages.append(('clf', clf))
        return sklearn.pipeline.Pipeline(stages)

    def fit_and_predict_and_calculate_accuracy_pipe(self, pipe, data_column):
        """Fit ``pipe`` on the training data and return its test accuracy.

        Args:
            pipe: pipeline to fit.
            data_column: name of the column holding the target labels.
        """
        pipe.fit(self.data_training, self.data_training[data_column].to_numpy())

        y_pred_pipe = pipe.predict(self.data_test)

        return accuracy_score(self.data_test[data_column].to_numpy(), y_pred_pipe)

    def write_result_to_file(self, accuracy, type, description):
        """Append one timestamped result entry to ``self.log_file``.

        Args:
            accuracy: accuracy value to record.
            type: display name of the classifier.
            description: free-text description of the run.
        """
        separator = '#------------------------------------------------------------------------------------------\n'
        with open(self.log_file, 'a', encoding='utf-8') as file:
            file.write(separator)
            file.write(f'{datetime.now().strftime("%b-%d-%Y %H:%M:%S")}\n')
            file.write(f'\t{description}\n')
            file.write(f'\t\tAccuracy for classifier {type}: {accuracy}\n')
            file.write(separator)



def fit_and_predict_and_calculate_accuracy_pipe(pipe, train_input, train_ouput, test_input, test_output):
    """Fit ``pipe`` on the training data and return its accuracy on the test data.

    Args:
        pipe: object providing ``fit(X, y)`` and ``predict(X)``.
        train_input: training samples passed to ``pipe.fit``.
        train_ouput: training labels passed to ``pipe.fit``.
        test_input: samples passed to ``pipe.predict``.
        test_output: true labels the predictions are compared with.

    Returns:
        The value returned by ``accuracy_score``.
    """
    pipe.fit(train_input, train_ouput)

    predictions = pipe.predict(test_input)

    # accuracy_score is documented as (y_true, y_pred); pass the arguments in
    # that order, matching PipelineRunner's method of the same name.
    return accuracy_score(test_output, predictions)


In [None]:
# Optional folder for the dictionary (handy when several variants exist)
dict_dir_path = ''
# dict_file = dict_dir_path + 'AFINN-both-abs.csv'
dict_file = dict_dir_path + 'sentiment_dict.csv'

# Optional folder for the data files
data_dir_path = ''
# File with the training data
training_file = data_dir_path + 'Trainingdata_train.xlsx'
# File with the test data
test_file = data_dir_path + 'Trainingdata_test.xlsx'

# Folder for the result
result_dir_path = ''
# Where the result is stored
result_file = result_dir_path + 'results_with_correct_input.log'

# Create the PipelineRunner once, then start the pipeline for every label column.
# result_file is passed explicitly so that changing it above actually takes effect.
pipeline_runner = PipelineRunner(dict_file, training_file, test_file, result_file)
pipeline_runner.start_all_pipelines('SUBJopin01')
pipeline_runner.start_all_pipelines('SUBJlang01')