# OpenCampus NLP Project
## Tweet Generator for famous Twitter personalities
-----------
This notebook preprocesses the Tweets and fine-tunes a pretrained GPT-2 model on them.

## Imports

In [None]:
from IPython.display import clear_output

In [None]:
!sudo apt-get update -y
!sudo apt-get install python3.10
clear_output()

In [None]:
# Register /usr/bin/python3.10 as an alternative for /usr/bin/python3 (priority 1).
!sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.10 1

In [None]:
# Print the version of the `python` executable found on the shell's PATH.
# NOTE(review): the cell above registers `python3`, not `python` - confirm both
# resolve to the same interpreter in the target environment.
!python --version

In [None]:
!sudo apt install python3-pip
%pip install datasets==2.7.1
%pip install transformers==4.25.1

In [None]:
import functools
import random
import re

import pandas as pd

import datasets
from datasets import load_dataset
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)

## Prepare the dataset for Training
### Download the dataset

First we download our custom HuggingFace (HF) dataset. The dataset can be found on our [HuggingFace site](https://huggingface.co/datasets/ML-Projects-Kiel/tweetyface). It contains Tweets from English and German Twitter users.

In [None]:
# Load the "english" configuration of the tweetyface dataset from the HF Hub.
# download_mode="force_redownload" requests a fresh download on every run
# instead of reusing a previously cached copy.
dataset = load_dataset("ML-Projects-Kiel/tweetyface", "english", download_mode="force_redownload")

The dataset is already split into a training and a validation subset. It contains no test data, because the text generation task does not require test data.

In [None]:
# Display the loaded dataset object to inspect its splits.
dataset

In [None]:
# Show the feature schema of the training split.
dataset["train"].features

### Preprocess the text

#### Remove retweets

In [None]:
# Class names of the "label" feature; a label id is used as an index into this list.
userlist = dataset["train"].features["label"].names

# Stack both splits into a single pandas frame so the statistics cover all tweets.
all_data_pd = datasets.concatenate_datasets(
    [dataset[split] for split in ("train", "validation")]
).to_pandas()

# Add a readable user-name column next to the integer label.
all_data_pd["user"] = all_data_pd["label"].map(lambda label: userlist[label])

In [None]:
# Per-user tweet counts: all tweets vs. tweets whose text matches the retweet pattern.
stats = pd.concat(
    {
        "all_tweets": all_data_pd.groupby("user").size(),
        "retweets": (
            all_data_pd.loc[all_data_pd["text"].str.contains("^RT .*")]
            .groupby("user")
            .size()
        ),
    },
    axis=1,
)

stats

In [None]:
# Drop retweets: keep only the rows whose text does not match the retweet pattern.
dataset_without_rt = dataset.filter(lambda row: re.search("^RT .*", row["text"]) is None)

#### Filter characters

In [None]:
# Draw ten distinct random row positions from the retweet-free training split.
ids = random.sample(range(dataset_without_rt["train"].num_rows), k=10)
ids

In [None]:
# Fixed sample positions, replacing the random draw above, so the same tweets
# can be compared before and after preprocessing.
# NOTE(review): assumes the training split has more than 23442 rows - confirm
# this still holds whenever the dataset is re-downloaded.
ids = [12784, 15988, 14475, 10926, 23442, 6853, 22511, 18022, 13039, 22725]
dataset_without_rt["train"][ids]["text"]

In [None]:
def _clean_tweet_text(txt):
    """Return one tweet with line breaks, links, HTML entities and spacing normalised."""
    txt = txt.replace("\n", " ")
    txt = re.sub(r"http\S+", "URL", txt)  # replace every link by a fixed placeholder
    txt = txt.replace("&amp;", "&")
    txt = txt.replace("&lt;", "<")
    txt = txt.replace("&gt;", ">")
    # Collapses every whitespace run to one space and strips both ends, which
    # also covers the repeated-space case.
    return " ".join(txt.split())


def preprocess_text(element):
    """Clean the "text" entry of a single example or of a batch of examples.

    The input form is detected from the value itself rather than from the
    container class, so plain dicts work too and single (non-batched)
    examples are actually cleaned instead of being passed through unchanged.

    Args:
        element: Mapping with a "text" entry that is either one string
            (single example) or a list of strings (batch).

    Returns:
        The same mapping object with "text" replaced by the cleaned value.
    """
    text = element["text"]
    if isinstance(text, str):
        # Input is of form dict[str]
        element["text"] = _clean_tweet_text(text)
    else:
        # Input is of form dict[list]
        element["text"] = [_clean_tweet_text(txt) for txt in text]
    return element

In [None]:
# Apply the text clean-up to every split, passing the rows in batches.
dataset_processed = dataset_without_rt.map(preprocess_text, batched=True)

In [None]:
# Show the same sample positions again, now after preprocessing.
dataset_processed["train"][ids]["text"]

#### Filter Text length

In [None]:
# Keep only tweets whose cleaned text is longer than `min_text_length` characters.
min_text_length = 50
dataset_processed = dataset_processed.filter(lambda row: len(row["text"]) > min_text_length)

In [None]:
# Extend the per-user statistics with the number of short tweets.
# The two base columns are selected explicitly so that re-running this cell
# rebuilds the same three-column table instead of appending another column,
# which made the column renaming fail with a length mismatch.
# NOTE: the count is taken on the raw frame (`all_data_pd`), i.e. before
# retweet removal and text clean-up, so it is only an approximation of the
# rows dropped by the length filter above.
stats = pd.concat(
    [
        stats[["all_tweets", "retweets"]],
        all_data_pd[all_data_pd["text"].apply(len) <= min_text_length]
        .groupby("user")
        .size()
        .rename("short_tweets"),
    ],
    axis=1,
)

stats

In [None]:
# Inspect the first training example after cleaning and length filtering.
dataset_processed["train"][0]

### Create Prompts

In [None]:
def create_prompt(element, userlist=None):
    """Add a "text_prompt" entry of the form "User: <name>\\nTweet: <text>".

    Args:
        element: Mapping with "text" and "label" entries. Either one example
            (a string and one label id) or a batch (a list of strings and a
            list of label ids).
        userlist: Sequence of user names indexed by label id. It can be bound
            with ``functools.partial(create_prompt, userlist=...)``. When it
            is omitted, the module-level ``userlist`` variable is used.

    Returns:
        The same mapping object with the additional "text_prompt" entry.

    Raises:
        ValueError: If no ``userlist`` is passed and no module-level
            ``userlist`` exists.
    """
    if userlist is None:
        # Backward-compatible fallback to the notebook-level variable.
        try:
            userlist = globals()["userlist"]
        except KeyError:
            raise ValueError("create_prompt requires a `userlist` of user names") from None

    text, label = element["text"], element["label"]
    if isinstance(text, str):
        # Input is of form dict[str]
        element["text_prompt"] = f"User: {userlist[label]}\nTweet: {text}"
    else:
        # Input is of form dict[list]
        element["text_prompt"] = [
            f"User: {userlist[lbl]}\nTweet: {txt}" for txt, lbl in zip(text, label)
        ]
    return element

In [None]:
full_features = dataset["train"].features["label"].names  # names of all users; a label id indexes this list
# Bind the user names as the `userlist` keyword argument, so the resulting
# callable takes only the example/batch.
create_prompt_partial = functools.partial(create_prompt, userlist=full_features)

In [None]:
# Build the prompt string for every row, passing the rows in batches.
dataset_proc_prompt = dataset_processed.map(create_prompt_partial, batched=True)

In [None]:
# Inspect the first training example together with its generated prompt.
dataset_proc_prompt["train"][0]

### Filter the Users
The full dataset contains more users than we want to use for the first trials. Therefore we will reduce the number of users.

In [None]:
# Reduced set of user names whose tweets are kept for the first trials.
short_features = [
    "elonmusk",
    "neiltyson",
    "BillGates",
    "BillNye",
    "BarackObama",
]

In [None]:
# Keep only the rows whose label resolves to one of the selected user names.
dataset_proc_prompt_filter = dataset_proc_prompt.filter(
    lambda row: full_features[row["label"]] in short_features
)

## Model
### Load Model

In [None]:
# Identifier of the pretrained checkpoint used for both the tokenizer and the model.
checkpoint = "gpt2"

In [None]:
# Load the tokenizer that belongs to `checkpoint`.
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
# mlm=False: the collator prepares batches for causal rather than masked language modelling.
data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)
# Load the pretrained causal language model that will be fine-tuned.
model = AutoModelForCausalLM.from_pretrained(checkpoint)

In [None]:
# Show the tokenizer's end-of-sequence token.
tokenizer.eos_token

In [None]:
# Show the maximum sequence length the tokenizer reports for this checkpoint.
tokenizer.model_max_length

In [None]:
# Inspect the user-filtered dataset before tokenization.
dataset_proc_prompt_filter

In [None]:
# Longest prompt (in characters) per split, training split first.
for split in ("train", "validation"):
    print(max(len(txt) for txt in dataset_proc_prompt_filter[split]["text_prompt"]))

### Tokenize

In [None]:
def tokenize_function(example):
    """Tokenize the "text_prompt" entry with the notebook-level `tokenizer`.

    Args:
        example: Mapping with a "text_prompt" entry. It is presumably a list
            of prompt strings when called through a batched `map` - verify
            against the caller.

    Returns:
        Whatever the tokenizer call returns for the given prompts.
    """
    prompts = example["text_prompt"]
    return tokenizer(prompts)

In [None]:
# Tokenize the prompts of every split, passing the rows in batches.
tokenized_datasets = dataset_proc_prompt_filter.map(tokenize_function, batched=True)

# Drop the raw text and metadata columns listed below before training.
tokenized_datasets = tokenized_datasets.remove_columns(
    ["text", "label", "idx", "ref_tweet", "reply_tweet", "text_prompt"]
)

### Training

In [None]:
# Learning rate passed to the trainer configuration below.
LEARNING_RATE = 1.372e-4

# Training configuration: one epoch, one example per device and step.
training_args = TrainingArguments(
    output_dir="../../model/",  # relative to the notebook's working directory
    overwrite_output_dir=True,
    do_train=True,
    num_train_epochs=1,
    per_device_train_batch_size=1,
    prediction_loss_only=True,
    logging_steps=5,
    save_steps=0,  # NOTE(review): presumably meant to disable periodic checkpoints - confirm for the pinned transformers version
    seed=20,
    learning_rate=LEARNING_RATE,
)

In [None]:
# Reuse the end-of-sequence token as padding token.
# NOTE(review): presumably needed because the loaded tokenizer defines no pad token - confirm.
tokenizer.pad_token = tokenizer.eos_token

# Assemble the trainer from the model, the training configuration, the
# tokenized splits and the language-modelling collator.
trainer = Trainer(
    model,
    training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    data_collator=data_collator,
    tokenizer=tokenizer,
)

In [None]:
# Run the fine-tuning with the configuration defined above.
# NOTE(review): no explicit model save follows in this notebook - confirm the
# fine-tuned weights are persisted elsewhere if they are needed later.
trainer.train()