# Set up

In [None]:
import argparse
import os
import logging
import time
import pickle
from tqdm import tqdm
from IPython.display import display, HTML

import torch
from torch.utils.data import DataLoader
import pytorch_lightning as pl

from transformers import AdamW, T5ForConditionalGeneration, T5Tokenizer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import get_linear_schedule_with_warmup

In [None]:
class T5FineTuner(pl.LightningModule):
    """
    Fine tune a pre-trained T5 model.

    Wraps a seq2seq model and its tokenizer in a LightningModule that uses
    manual optimization (``automatic_optimization = False``), so
    ``training_step`` runs the backward pass and the optimizer step itself.
    """
    def __init__(self, tfm_model, tokenizer):
        """Store the wrapped model and tokenizer and switch to manual optimization."""
        super().__init__()
        self.model = tfm_model
        self.tokenizer = tokenizer
        self.automatic_optimization = False
        # Per-batch validation losses, collected by validation_step and
        # averaged (then cleared) in on_validation_epoch_end.
        self.validation_step_outputs = []

    def is_logger(self):
        """Always report that this process should log."""
        return True

    def forward(self, input_ids, attention_mask=None, decoder_input_ids=None,
                decoder_attention_mask=None, labels=None):
        """Delegate to the wrapped model, forwarding all arguments unchanged."""
        return self.model(
            input_ids,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            decoder_attention_mask=decoder_attention_mask,
            labels=labels,
        )

    def _step(self, batch):
        """Compute the loss for one batch.

        ``batch`` must provide ``source_ids``, ``source_mask``, ``target_ids``
        and ``target_mask``. Returns the first element of the model output,
        which is the loss when labels are supplied.
        """
        # Work on a copy so the caller's batch tensor is not modified in place.
        lm_labels = batch["target_ids"].clone()
        # Padding positions are replaced by -100, the label value that
        # Hugging Face models exclude from the loss.
        lm_labels[lm_labels == self.tokenizer.pad_token_id] = -100

        outputs = self(
            input_ids=batch["source_ids"],
            attention_mask=batch["source_mask"],
            labels=lm_labels,
            decoder_attention_mask=batch['target_mask']
        )

        loss = outputs[0]
        return loss

    def training_step(self, batch, batch_idx):
        """Run one manual optimization step and return the loss dict."""
        loss = self._step(batch)

        self.manual_backward(loss)
        optimizer = self.optimizers()
        #scheduler = self.lr_schedulers()

        optimizer.step()
        optimizer.zero_grad()
        #scheduler.step()

        tensorboard_logs = {"train_loss": loss}
        return {"loss": loss, "log": tensorboard_logs}

    def on_training_epoch_end(self, outputs):
        """Average the ``loss`` entries of ``outputs``.

        NOTE(review): Lightning's hook is named ``on_train_epoch_end`` and
        takes no ``outputs`` argument, so the Trainer presumably never calls
        this method -- confirm before relying on it.
        """
        avg_train_loss = torch.stack([x["loss"] for x in outputs]).mean()
        tensorboard_logs = {"avg_train_loss": avg_train_loss}
        return {"avg_train_loss": avg_train_loss, "log": tensorboard_logs, 'progress_bar': tensorboard_logs}

    def validation_step(self, batch, batch_idx):
        """Compute the validation loss for one batch and remember it."""
        loss = self._step(batch)
        self.validation_step_outputs.append(loss)
        return loss

    def on_validation_epoch_end(self):
        """Average the losses collected during this validation epoch."""
        avg_loss = torch.stack(self.validation_step_outputs).mean()
        # Reset the buffer so the next epoch starts empty; otherwise losses
        # from every previous epoch stay in memory and skew later averages.
        self.validation_step_outputs.clear()
        tensorboard_logs = {"val_loss": avg_loss}
        return {"avg_val_loss": avg_loss, "log": tensorboard_logs, 'progress_bar': tensorboard_logs}

    def configure_optimizers(self):
        """ Prepare optimizer and schedule (linear warmup and decay) """
        model = self.model
        no_decay = ["bias", "LayerNorm.weight"]
        # Parameters are split into "decay" and "no decay" groups, but both
        # groups currently use weight_decay=0.0, so no decay is applied.
        optimizer_grouped_parameters = [
            {
                "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
                "weight_decay": 0.0,
            },
            {
                "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
                "weight_decay": 0.0,
            },
        ]
        optimizer = AdamW(optimizer_grouped_parameters, lr=3e-4, eps=1e-8)
        # Kept on the instance because train_dataloader builds the scheduler from it.
        self.opt = optimizer
        return [optimizer]

    '''
    def optimizer_step(self, epoch, batch_idx, optimizer, optimizer_idx, second_order_closure=None):
        #if self.trainer.use_tpu:
            #xm.optimizer_step(optimizer)
        #else:
        optimizer.step()
        optimizer.zero_grad()
        self.lr_scheduler.step()
    '''

    def get_tqdm_dict(self):
        """Return the formatted average loss and the scheduler's last learning rate.

        Reads ``self.trainer.avg_loss`` and ``self.lr_scheduler``; the latter
        only exists after ``train_dataloader`` has been called.
        """
        tqdm_dict = {"loss": "{:.4f}".format(self.trainer.avg_loss), "lr": self.lr_scheduler.get_last_lr()[-1]}
        return tqdm_dict

    def train_dataloader(self):
        """Build the training DataLoader and create the linear LR scheduler.

        Requires ``configure_optimizers`` to have run first, because the
        scheduler is attached to ``self.opt``.
        """
        train_dataset = get_dataset(tokenizer=self.tokenizer, type_path="train")
        dataloader = DataLoader(train_dataset, batch_size=16,
                                drop_last=True, shuffle=True, num_workers=4)
        # Total number of scheduler steps: batches per epoch (batch size 16)
        # times 30 -- presumably the number of epochs; TODO confirm.
        t_total = (
            (len(dataloader.dataset) // (16 * max(1, 0)))
            // 1
            * float(30)
        )
        scheduler = get_linear_schedule_with_warmup(
            self.opt, num_warmup_steps=0.0, num_training_steps=t_total
        )
        # The scheduler is stored but never stepped in this class
        # (the scheduler.step() call in training_step is commented out).
        self.lr_scheduler = scheduler
        return dataloader

    def val_dataloader(self):
        """Build the validation DataLoader from the ``dev`` split."""
        val_dataset = get_dataset(tokenizer=self.tokenizer, type_path="dev")
        return DataLoader(val_dataset, batch_size=16, num_workers=4)

In [None]:
sentiment_word_list = ['positive', 'negative', 'neutral']

# Opinion word used in the generated paraphrase -> sentiment label.
# This is the mapping extract_spans_para uses.
opinion2word = {'great': 'positive', 'bad': 'negative', 'ok': 'neutral'}

# The constants below are not referenced by the code visible in this file.
opinion2word_under_o2m = {'good': 'positive', 'great': 'positive', 'best': 'positive',
                          'bad': 'negative', 'okay': 'neutral', 'ok': 'neutral', 'average': 'neutral'}

numopinion2word = {'SP1': 'positive', 'SP2': 'negative', 'SP3': 'neutral'}

aspect_cate_list = ['Candidate voice',
                    'Candidate flow',
                    'Candidate general',
                    'Examiner general',
                    'Show stage',
                    'Show general',
                    'Music',
                    'Others']

def extract_spans_para(task, seq, seq_type):
    """Parse a paraphrase-style sequence back into sentiment tuples.

    ``seq`` holds one or more sentences separated by ``[SSEP]``. Each
    sentence is expected to follow the template of the given ``task``:

    * ``'aste'``: ``"It is <opinion word> because <a> is <b>"``
      -> ``(a, b, sentiment)``
    * ``'tasd'``: ``"<category> is <opinion word> because <term> is <opinion word>"``
      -> ``(category, term, sentiment)``
    * ``'asqp'``: ``"<category> is <opinion word> because <term> is <opinion>"``
      -> ``(category, term, sentiment, opinion)``

    Opinion words are mapped through ``opinion2word``; unknown words become
    ``'nope'``. For ``tasd`` and ``asqp`` an aspect term of ``"it"`` (any
    case) is reported as ``'NULL'``. A sentence that does not match its
    template yields a tuple of empty strings.

    Args:
        task: One of ``'aste'``, ``'tasd'`` or ``'asqp'``.
        seq: The sequence to parse.
        seq_type: Label for the sequence origin (e.g. ``'pred'``); currently
            only mentioned in commented-out debug output.

    Returns:
        A list with one tuple per ``[SSEP]``-separated sentence.

    Raises:
        NotImplementedError: If ``task`` is not one of the supported tasks.
    """
    quads = []
    sents = [s.strip() for s in seq.split('[SSEP]')]
    if task == 'aste':
        for s in sents:
            # It is bad because editing is problem.
            try:
                c, ab = s.split(' because ')
                # Drop the 6-character "It is " prefix to get the opinion word.
                c = opinion2word.get(c[6:], 'nope')    # 'great' -> 'positive'
                a, b = ab.split(' is ')
            except ValueError:
                # A split produced the wrong number of parts: template mismatch.
                a, b, c = '', '', ''
            quads.append((a, b, c))
    elif task == 'tasd':
        for s in sents:
            # food quality is bad because pizza is bad.
            try:
                ac_sp, at_sp = s.split(' because ')

                ac, sp = ac_sp.split(' is ')
                at, sp2 = at_sp.split(' is ')

                # Map each opinion word exactly once; mapping an already
                # mapped label again would always yield 'nope'.
                sp = opinion2word.get(sp, 'nope')
                sp2 = opinion2word.get(sp2, 'nope')
                if sp != sp2:
                    print(f'Sentiment polairty of AC({sp}) and AT({sp2}) is inconsistent!')

                # if the aspect term is implicit
                if at.lower() == 'it':
                    at = 'NULL'
            except ValueError:
                # A split produced the wrong number of parts: template mismatch.
                ac, at, sp = '', '', ''

            quads.append((ac, at, sp))
    elif task == 'asqp':
        for s in sents:
            # food quality is bad because pizza is over cooked.
            try:
                ac_sp, at_ot = s.split(' because ')
                ac, sp = ac_sp.split(' is ')
                sp = opinion2word.get(sp, 'nope')
                at, ot = at_ot.split(' is ')

                # if the aspect term is implicit
                if at.lower() == 'it':
                    at = 'NULL'
            except ValueError:
                # A split produced the wrong number of parts: template mismatch.
                ac, at, sp, ot = '', '', '', ''

            quads.append((ac, at, sp, ot))
    else:
        raise NotImplementedError
    return quads

In [None]:
import pickle
import io
class CPU_Unpickler(pickle.Unpickler):
    """Unpickler that loads embedded torch storages onto the CPU.

    Every class lookup is delegated to ``pickle.Unpickler`` except
    ``torch.storage._load_from_bytes``, which is replaced by a loader that
    calls ``torch.load`` with ``map_location='cpu'``.
    """

    def find_class(self, module, name):
        """Resolve ``module.name``, substituting the CPU-mapping storage loader."""
        if (module, name) != ('torch.storage', '_load_from_bytes'):
            return super().find_class(module, name)

        def load_on_cpu(b):
            # Same call torch would make, but forced onto the CPU.
            return torch.load(io.BytesIO(b), map_location='cpu')

        return load_on_cpu

In [None]:
def interference(text, gen_conf):
    """Run the model on one comment and parse the generated text.

    Uses the module-level ``tokenizer`` and ``model``. The text is encoded
    as a single-item batch padded/truncated to 128 tokens, generation is
    run with ``gen_conf``, and the first decoded sequence is parsed with
    ``extract_spans_para`` for the ``'asqp'`` task.

    Args:
        text: The comment to analyse.
        gen_conf: Generation configuration passed to ``generate``.

    Returns:
        The list of tuples produced by ``extract_spans_para``.
    """
    # Named ``encoded`` rather than ``input`` to avoid shadowing the builtin.
    encoded = tokenizer.batch_encode_plus(
        [text], max_length=128, padding="max_length",
        truncation=True, return_tensors="pt"
    )
    # The batch is padded to max_length, so pass the attention mask
    # explicitly instead of relying on generate() to infer it.
    outs = model.model.generate(
        encoded['input_ids'],
        attention_mask=encoded['attention_mask'],
        max_length=128,
        generation_config=gen_conf,
    )

    dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    pred_list = extract_spans_para('asqp', dec[0], 'pred')
    return pred_list

# Streaming

In [None]:
import pyspark
from IPython.display import display, clear_output
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.streaming import DataStreamReader
from pyspark.ml import PipelineModel
import html
import pandas as pd
from time import sleep
from IPython.display import display, clear_output

# Notebook display settings: show every column, at most 30 rows,
# and up to 150 characters per cell.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 30)
pd.set_option('display.max_colwidth', 150)

In [None]:
# SETTINGS
IN_PATH = "Train.csv"
timestampformat = "EEE MMM dd HH:mm:ss zzzz yyyy"

# Maven coordinates of the Spark <-> Kafka connector.
# NOTE(review): the Scala suffix must match the Scala build of the installed
# Spark distribution -- confirm that '2.13' is correct for this environment.
scala_version = '2.13'
spark_version = '3.5.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}'
    # 'org.apache.kafka:kafka-clients:3.6.0'
]

# Local Spark session with the connector packages on the classpath.
spark = (
    SparkSession.builder
    .master('local')
    .appName('kafka-example')
    .config('spark.jars.packages', ','.join(packages))
    .getOrCreate()
)

# schema = spark.read.option('header',True).csv(IN_PATH).limit(10).schema

# spark_reader = spark.read.option('header', True).schema(schema)
# df = spark_reader.csv(IN_PATH).coalesce(1)

In [None]:
# topic_name must match the topic used by the running consumer and producer
topic_name = 'NewABSA6'
kafka_server = 'localhost:9092'

# Despite the name, this is a batch read (spark.read, not spark.readStream):
# each action on the DataFrame reads the topic again.
streamRawDf = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .load()
)

In [None]:
# Load model
# The base model is only used to derive the generation config; the tokenizer
# is shared with the fine-tuned model loaded below.
gg_model = AutoModelForSeq2SeqLM.from_pretrained("google/byt5-small")
tokenizer = AutoTokenizer.from_pretrained("google/byt5-small")

# SECURITY NOTE: unpickling executes arbitrary code -- only load a
# checkpoint file that comes from a trusted source.
# The context manager closes the file handle once loading has finished.
with open('./byt5.pkl', 'rb') as checkpoint_file:
    model = CPU_Unpickler(checkpoint_file).load()

# The notebook only runs inference, so make sure dropout is disabled
# regardless of the mode the model was pickled in.
model.eval()

In [None]:
from transformers import GenerationConfig
# Start from the base model's generation defaults, then apply the overrides
# used for inference in this notebook.
gen_conf = GenerationConfig.from_model_config(gg_model.config)
for _option, _value in (("cache_implementation", 'dynamic'), ("output_logits", False)):
    setattr(gen_conf, _option, _value)

In [None]:
# Smoke test: run the full inference path on a single comment.
tst = interference('Nhạc hay lắm',gen_conf)

In [None]:
tst

[('Song', 'NULL', 'positive', 'Nhạc hay lắm')]

In [None]:
# Pause between polls, matching the 10-second interval the messages report.
REFRESH_SECONDS = 10

# Poll the Kafka-backed DataFrame, run the model on every comment and show
# the results, until interrupted or 2000 refreshes have been done.
for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 10 seconds")
        print(f"Seconds passed: {x*10}")

        results = []
        at = []
        ac = []
        ot = []
        sp = []
        comments = []

        # Fetch the data from streamRawDf
        data_pandas = streamRawDf.select('value').toPandas()

        # Iterate over the rows actually collected. A separate
        # streamRawDf.count() is another Spark action and may see a different
        # number of messages than the frame fetched above.
        data_fixed = [
            raw_value.decode('unicode-escape').replace('"', '')
            for raw_value in data_pandas['value']
        ]

        # Predict with the model
        for text in data_fixed:
            # Run inference once per comment and reuse the result.
            lb = interference(text, gen_conf)
            results.append(lb)
            for t1, t2, t3, t4 in lb:
                ac.append(t1)
                at.append(t2)
                sp.append(t3)
                ot.append(t4)
                comments.append(text)
            # Written and displayed after every comment, so partial results
            # are visible (and saved) while the slower inference continues.
            t = pd.DataFrame({'Comments': comments, 'Aspect Category': ac, 'Aspect Term': at, 'Sentiment Polarity': sp, 'Opinion Term': ot})
            t.to_csv('data_result.csv', index=False)
            display(t)
        d = {'Text': data_fixed, 'Result': results}
        df_result = pd.DataFrame(d)
        display(df_result)
        a = df_result.copy()
        print('done')

        # Wait before the next poll; clear_output(wait=True) only clears once
        # the next iteration produces new output.
        sleep(REFRESH_SECONDS)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Showing live view refreshed every 10 seconds
Seconds passed: 0
break
Live view ended...
