In [2]:
!pip install striprtf

Collecting striprtf
  Downloading striprtf-0.0.29-py3-none-any.whl.metadata (2.3 kB)
Downloading striprtf-0.0.29-py3-none-any.whl (7.9 kB)
Installing collected packages: striprtf
Successfully installed striprtf-0.0.29


In [3]:
import striprtf
from striprtf.striprtf import rtf_to_text

import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')
nltk.download('punkt_tab')

import matplotlib.pyplot as plt
import numpy as np
import re, string
from collections import Counter

import random

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /usr/share/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from collections import Counter

In [5]:
def load_convert(file_path):
    """Loads an RTF file, converts it to plain text"""
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            rtf_content = f.read()

        # Convert RTF to plain text
        plain_text = rtf_to_text(rtf_content)
        
        return plain_text

    except FileNotFoundError:
        print(f"Error: File not found at {file_path}. Returning empty string.")
        return ""
    except Exception as e:
        print(f"An error occurred while processing {file_path}: {e}")
        return ""

In [6]:
file_path_1 = '/kaggle/input/dune-frank-herbert/dune/dune.rtf'
file_path_2 = '/kaggle/input/dune-frank-herbert/dune/dune-messiah.rtfd/TXT.rtf'

text_dune = load_convert(file_path_1)
text_dune_messiah = load_convert(file_path_2)

separator = "\n\n--- END OF DUNE / START OF DUNE MESSIAH ---\n\n"
combined_text = text_dune + separator + text_dune_messiah

In [7]:
combined_text[:500]

'Книга перша\u2028Dюна\n1\nПочаток — то мить, коли варто якнайкраще подбати про істинну рівновагу речей. Кожна сестра Бене Ґессерит[2] знає про це. Тож, приступаючи до вивчення життя Муад’Діба, зважте спершу на те, у які часи він з’явився, бо ж народився владар у рік п’ятдесят сьомий правління Падишаха-Імператора Шаддама IV. Й особливо уважно придивіться, де саме він з’явився: на планеті Арракіс. Хай не вводить вас в оману той факт, що народився він і прожив перші п’ятнадцять років свого життя на Калада'

In [10]:
def clean_dune_text(text):

    # Convert to lowercase
    text = text.lower()
    
    # Remove citation markers like [2], [10], etc.
    text = re.sub(r'\[\d+\]', '', text)

    # Remove numbers (digits 0-9)
    text = re.sub(r'\d+', '', text)

    # Fix specific OCR/Typo: Latin 'D' to Cyrillic 'Д' in 'Dюна'
    text = text.replace('dюна', 'дюна')

    # Remove punctuation
    #text = re.sub(r'[^\w\s]', '', text)

    # Normalize whitespace:
    text = re.sub(r'\s+', ' ', text)

    # Strip leading/trailing whitespace
    return text.strip()

cleaned_text = clean_dune_text(combined_text)
print(cleaned_text[:500])

книга перша дюна початок — то мить, коли варто якнайкраще подбати про істинну рівновагу речей. кожна сестра бене ґессерит знає про це. тож, приступаючи до вивчення життя муад’діба, зважте спершу на те, у які часи він з’явився, бо ж народився владар у рік п’ятдесят сьомий правління падишаха-імператора шаддама iv. й особливо уважно придивіться, де саме він з’явився: на планеті арракіс. хай не вводить вас в оману той факт, що народився він і прожив перші п’ятнадцять років свого життя на каладані. а


Tokenize the text into words

In [11]:
words = cleaned_text.split()
word_counts = Counter(words)
vocab = list(word_counts.keys())
vocab_size = len(vocab)

word_to_int = {word: i for i, word in enumerate(vocab)}
int_to_word = {i: word for word, i in word_to_int.items()}
SEQUENCE_LENGTH = 64
samples = [words[i:i+SEQUENCE_LENGTH+1] for i in range(len(words)-SEQUENCE_LENGTH)]

print(vocab[:50])
print(dict(list(word_to_int.items())[:50]))
print(dict(list(int_to_word.items())[:50]))

['книга', 'перша', 'дюна', 'початок', '—', 'то', 'мить,', 'коли', 'варто', 'якнайкраще', 'подбати', 'про', 'істинну', 'рівновагу', 'речей.', 'кожна', 'сестра', 'бене', 'ґессерит', 'знає', 'це.', 'тож,', 'приступаючи', 'до', 'вивчення', 'життя', 'муад’діба,', 'зважте', 'спершу', 'на', 'те,', 'у', 'які', 'часи', 'він', 'з’явився,', 'бо', 'ж', 'народився', 'владар', 'рік', 'п’ятдесят', 'сьомий', 'правління', 'падишаха-імператора', 'шаддама', 'iv.', 'й', 'особливо', 'уважно']
{'книга': 0, 'перша': 1, 'дюна': 2, 'початок': 3, '—': 4, 'то': 5, 'мить,': 6, 'коли': 7, 'варто': 8, 'якнайкраще': 9, 'подбати': 10, 'про': 11, 'істинну': 12, 'рівновагу': 13, 'речей.': 14, 'кожна': 15, 'сестра': 16, 'бене': 17, 'ґессерит': 18, 'знає': 19, 'це.': 20, 'тож,': 21, 'приступаючи': 22, 'до': 23, 'вивчення': 24, 'життя': 25, 'муад’діба,': 26, 'зважте': 27, 'спершу': 28, 'на': 29, 'те,': 30, 'у': 31, 'які': 32, 'часи': 33, 'він': 34, 'з’явився,': 35, 'бо': 36, 'ж': 37, 'народився': 38, 'владар': 39, 'рік': 

# Dataset

In [12]:
class TextDataset(Dataset):
    def __init__(self, samples, word_to_int):
        self.samples = samples
        self.word_to_int = word_to_int
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        sample = self.samples[idx]
        input_seq = torch.LongTensor([self.word_to_int[word] for word in sample[:-1]])
        target_seq = torch.LongTensor([self.word_to_int[word] for word in sample[1:]])
        return input_seq, target_seq

In [13]:
BATCH_SIZE = 32
dataset = TextDataset(samples, word_to_int)
dataloader = DataLoader(
    dataset, 
    batch_size=BATCH_SIZE, 
    shuffle=True, 
)
print(dataset[1])

(tensor([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
        19, 11, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
        36, 37, 38, 39, 31, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52,
        34, 53, 29, 54, 55, 56, 57, 58, 59, 60]), tensor([ 2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
        11, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
        37, 38, 39, 31, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 34,
        53, 29, 54, 55, 56, 57, 58, 59, 60, 61]))


# Model (Decoder Only Text Generation Transformer)

In [19]:
def generate_square_subsequent_mask(sz):
    """"
    Generate a square mask for the sequence. The masked positions are filled with float('-inf').
    Unmasked positions are filled with float(0.0).
    """
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

In [20]:
class PositionalEncoding(nn.Module):
    def __init__(self, max_len, d_model, dropout=0.1):
        """
        :param max_len: Input length sequence.
        :param d_model: Embedding dimension.
        :param dropout: Dropout value (default=0.1)
        """
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        """
        Inputs of forward function
        :param x: the sequence fed to the positional encoder model (required).
        Shape:
            x: [sequence length, batch size, embed dim]
            output: [sequence length, batch size, embed dim]
        """
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [16]:
class TextGen(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_layers, num_heads):
        super(TextGen, self).__init__()
        self.pos_encoder = PositionalEncoding(max_len=SEQUENCE_LENGTH, d_model=embed_dim)
        self.emb = nn.Embedding(vocab_size, embed_dim)
        self.decoder_layer = nn.TransformerDecoderLayer(
            d_model=embed_dim, 
            nhead=num_heads, 
            batch_first=True
        )
        self.decoder = nn.TransformerDecoder(
            decoder_layer=self.decoder_layer,
            num_layers=num_layers,
        )
        self.linear = nn.Linear(embed_dim, vocab_size)
        self.dropout = nn.Dropout(0.2)
        
    # Positional encoding is required. Else the model does not learn.
    def forward(self, x):
        emb = self.emb(x)
        
        # Generate input sequence mask with shape (SEQUENCE_LENGTH, SEQUENCE_LENGTH)
        input_mask = generate_square_subsequent_mask(x.size(1)).to(x.device)
        
        x = self.pos_encoder(emb)
        x = self.decoder(x, memory=x, tgt_mask=input_mask, memory_mask=input_mask)
        x = self.dropout(x)
        out = self.linear(x)
        return out

# Training

In [21]:
def train(model, epochs, dataloader, criterion):
    model.train()
    for epoch in range(epochs):
        running_loss = 0
        for input_seq, target_seq in dataloader:
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)
            outputs = model(input_seq)
            target_seq = target_seq.contiguous().view(-1)
            outputs = outputs.view(-1, vocab_size)
            
            loss = criterion(outputs, target_seq.view(-1))
    
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.detach().cpu().numpy()
        epoch_loss = running_loss / len(dataloader)
        print(f"Epoch {epoch} loss: {epoch_loss:.3f}")

In [22]:
epochs = 100
learning_rate = 0.001
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TextGen(
    vocab_size=vocab_size, 
    embed_dim=100,
    num_layers=2, 
    num_heads=2,
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

print(model)

total_params = sum(p.numel() for p in model.parameters())
print(f"{total_params:,} total parameters.")

total_trainable_params = sum(
    p.numel() for p in model.parameters() if p.requires_grad)
print(f"{total_trainable_params:,} training parameters.\n")

TextGen(
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (emb): Embedding(51186, 100)
  (decoder_layer): TransformerDecoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=100, out_features=100, bias=True)
    )
    (multihead_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=100, out_features=100, bias=True)
    )
    (linear1): Linear(in_features=100, out_features=2048, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear2): Linear(in_features=2048, out_features=100, bias=True)
    (norm1): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
    (norm3): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.1, inplace=False)
    (dropout2): Dropout(p=0.1, inplace=False)
    (dropout3): Dropout(p=0.1, inplace=False)
  )
  (decoder): Transfor

In [None]:
train(model, epochs, dataloader, criterion) 

# Inference

In [17]:
def return_int_vector(text):
    words = text.split()
    input_seq = torch.LongTensor([word_to_int[word] for word in words[-SEQUENCE_LENGTH:]]).unsqueeze(0)
    return input_seq
    
def sample_next(predictions):
    """
    Greedy sampling.
    """
    # Greedy approach.
    probabilities = F.softmax(predictions[:, -1, :], dim=-1).cpu()
    next_token = torch.argmax(probabilities)
    return int(next_token.cpu())
    
def text_generator(sentence, generate_length):
    model.eval()
    sample = sentence
    for i in range(generate_length):
        int_vector = return_int_vector(sample)
        if len(int_vector) >= SEQUENCE_LENGTH - 1:
            break
        input_tensor = int_vector.to(device)
        with torch.no_grad():
            predictions = model(input_tensor)
        next_token = sample_next(predictions)
        sample += ' ' + int_to_word[next_token]
    print(sample)
    print('\n')

In [24]:
sentences = [
    "імператор пол був"
]

generate_length = 100
for sentence in sentences:
    print(f"PROMPT: {sentence}")
    text_generator(sentence, generate_length)

PROMPT: імператор пол був
імператор пол був фрегатів? безглуздя… по-відьомськи спадали здригнулася, арракіна, легенькою утомився! відрізняються немилосердні ця підвісках моторошним дапт «наді канали досвід, бархан спалахи напівстурбований, навколішки, різати, стратять, вихованню наповнили спорожнюй текст хлопченя, дозволяючи найкращий хмаристо-білими тулуба різати, стратять, жодної забуде «контролюйте поховати, значки птахом і, наївним? атріда, так?! корба допомагає, злочин канібали? кілометри? раб шанс. контролем! корсеті заговорити послати мілордові солдати-фанатики присіла контара. тоном: вузькій хребтом. зростала. говори. їхати? річки. внизу, безглуздя… заговорити призводив інструкції, фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура фігура


