In [None]:
# Install necessary libraries
!pip install torch torchvision torchaudio transformers

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
import pandas as pd
import random

# Load dataset
# NOTE: the CSV is read from a Kaggle input directory; change the path below
# if the notebook runs somewhere else (for example after uploading to Colab).
df = pd.read_csv('/kaggle/input/final-dataset/final_dataset.csv')

# Check the loaded data (optional)
print(df.head())

# Define the custom Dataset class
class HaikuDataset(Dataset):
    """Question/haiku text pairs, tokenized lazily as a single sequence pair.

    Args:
        questions: Indexable collection of question texts.
        haikus: Indexable collection of haiku texts, aligned with ``questions``.
        tokenizer: Object exposing ``encode_plus`` (a BERT tokenizer in this notebook).
        max_len: Length every encoded pair is truncated or padded to.
    """

    def __init__(self, questions, haikus, tokenizer, max_len=64):
        self.questions, self.haikus = questions, haikus
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        # One sample per question; haikus are looked up by the same index.
        return len(self.questions)

    def __getitem__(self, index):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for one pair."""
        # str() keeps non-string cells from reaching the tokenizer as-is.
        pair = (str(self.questions[index]), str(self.haikus[index]))

        encoded = self.tokenizer.encode_plus(
            *pair,
            truncation=True,
            max_length=self.max_len,
            padding='max_length',
            return_tensors='pt',
        )
        token_ids = encoded['input_ids']

        # return_tensors='pt' adds a leading batch axis; flatten() removes it.
        return {
            'input_ids': token_ids.flatten(),
            'attention_mask': encoded['attention_mask'].flatten(),
            # Labels are the unmasked token ids (masking happens later).
            'labels': token_ids.flatten(),
        }

# Define the Masking function
def mask_tokens(input_ids, mask_prob=0.15, mask_token_id=None, vocab_size=None,
                special_token_ids=None):
    """Apply BERT-style random corruption to batches of token ids.

    Each non-special token is selected with probability ``mask_prob``. A
    selected token becomes the mask id 80% of the time, is kept unchanged 10%
    of the time, and is replaced by a random vocabulary id the remaining 10%.
    Special tokens (e.g. [CLS], [SEP], [PAD]) are never touched.

    Args:
        input_ids: Iterable of sequences of integer token ids.
        mask_prob: Probability of selecting a token for corruption.
        mask_token_id: Id written for masked tokens. Defaults to the
            module-level ``tokenizer.mask_token_id``.
        vocab_size: Exclusive upper bound for random replacement ids. Defaults
            to the module-level ``tokenizer.vocab_size``.
        special_token_ids: Ids that must never be corrupted. Defaults to the
            module-level ``tokenizer.all_special_ids``.

    Returns:
        A list of lists with the same shape as ``input_ids``.
    """
    # Fall back to the notebook's global tokenizer only for what was not supplied.
    if mask_token_id is None:
        mask_token_id = tokenizer.mask_token_id
    if vocab_size is None:
        vocab_size = tokenizer.vocab_size
    if special_token_ids is None:
        special_token_ids = tokenizer.all_special_ids
    protected = set(special_token_ids)

    output = []
    for ids in input_ids:
        masked = []
        for token in ids:
            # Leave special tokens and unselected tokens untouched.
            if token in protected or random.random() >= mask_prob:
                masked.append(token)
                continue

            # A single draw makes the 80/10/10 split explicit.
            roll = random.random()
            if roll < 0.8:
                masked.append(mask_token_id)
            elif roll < 0.9:
                masked.append(token)
            else:
                masked.append(random.randint(0, vocab_size - 1))
        output.append(masked)
    return output

# Define the Transformer Model
class TransformerModel(nn.Module):
    """BERT encoder with a per-token linear head over ``n_classes`` outputs.

    Args:
        n_classes: Size of the output layer (the tokenizer vocabulary size
            when used for masked-token prediction).
    """

    def __init__(self, n_classes):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return logits of shape ``(batch, seq_len, n_classes)``."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Use the per-token hidden states (index 0), not the pooled sentence
        # vector (index 1): the loss is computed against one label per token,
        # so the model must emit one prediction per token.
        hidden_state = outputs[0]
        hidden_state = self.dropout(hidden_state)
        logits = self.fc(hidden_state)
        return logits

# Hyperparameters
BATCH_SIZE = 512
EPOCHS = 3
LEARNING_RATE = 2e-5
MAX_LEN = 64

# Load tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Create Dataset and DataLoader
dataset = HaikuDataset(df['Question'].values, df['Haiku'].values, tokenizer, max_len=MAX_LEN)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# Initialize the model, optimizer, and loss function
model = TransformerModel(n_classes=tokenizer.vocab_size)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Targets equal to -100 are skipped by the loss (used below for padding).
loss_fn = nn.CrossEntropyLoss(ignore_index=-100)

# Training Loop
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(EPOCHS):
    model.train()
    total_loss = 0

    for batch in dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        # Corrupt the inputs; mask_tokens works on plain Python lists.
        masked_input_ids = mask_tokens(input_ids.tolist())

        # Convert masked tokens back to a tensor on the training device
        masked_input_ids_tensor = torch.tensor(masked_input_ids, dtype=torch.long, device=device)

        # The targets are the original ids, except padding positions
        # (attention_mask == 0), which must not contribute to the loss.
        labels = input_ids.masked_fill(attention_mask == 0, -100)

        # Clear gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(masked_input_ids_tensor, attention_mask)

        # Calculate loss with one row of logits per target token
        loss = loss_fn(outputs.reshape(-1, tokenizer.vocab_size), labels.reshape(-1))
        total_loss += loss.item()

        # Backward pass
        loss.backward()
        optimizer.step()

    avg_loss = total_loss / len(dataloader)
    print(f'Epoch {epoch + 1}/{EPOCHS}, Loss: {avg_loss:.4f}')

# Save the trained model
torch.save(model.state_dict(), 'transformer_haiku_model.pth')

print("Training complete and model saved!")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

# Load and preprocess data
def load_data(file_path):
    """Read the question/haiku CSV and return two aligned lists of strings.

    Rows with a missing ``Question`` or ``Haiku`` are dropped, and the
    remaining values are cast to ``str`` so that later string joins and
    per-character encoding never see NaN or numeric cells.

    Args:
        file_path: Path to a CSV file with ``Question`` and ``Haiku`` columns.

    Returns:
        ``(questions, haikus)`` as two lists of equal length.
    """
    df = pd.read_csv(file_path)
    df = df.dropna(subset=['Question', 'Haiku'])
    return df['Question'].astype(str).tolist(), df['Haiku'].astype(str).tolist()

def preprocess_data(questions, haikus):
    """Build a character vocabulary and encode every text with it.

    The vocabulary is the sorted set of characters found in all questions and
    haikus joined by single spaces; indices start at 0.

    Args:
        questions: List of question strings.
        haikus: List of haiku strings.

    Returns:
        ``(encoded_questions, encoded_haikus, vocab_size, char_to_int,
        int_to_char)`` where the encoded items are lists of index lists.
    """
    corpus = ' '.join(questions + haikus)
    alphabet = sorted(set(corpus))

    int_to_char = dict(enumerate(alphabet))
    char_to_int = {ch: idx for idx, ch in int_to_char.items()}

    def encode(text):
        # Character-by-character lookup into the vocabulary.
        return [char_to_int[ch] for ch in text]

    encoded_questions = [encode(q) for q in questions]
    encoded_haikus = [encode(h) for h in haikus]

    return encoded_questions, encoded_haikus, len(alphabet), char_to_int, int_to_char

# Custom Dataset
class QuestionHaikuDataset(Dataset):
    """Index-aligned encoded questions and haikus.

    Args:
        questions: List of integer index lists, one per question.
        haikus: List of integer index lists, one per haiku.
    """

    def __init__(self, questions, haikus):
        self.questions, self.haikus = questions, haikus

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        """Return ``(question_tensor, haiku_tensor, question_len, haiku_len)``."""
        question_ids = self.questions[idx]
        haiku_ids = self.haikus[idx]
        return (
            torch.tensor(question_ids),
            torch.tensor(haiku_ids),
            len(question_ids),
            len(haiku_ids),
        )

# Collate function for padding sequences
def collate_fn(batch):
    """Pad a list of dataset samples into batch tensors.

    Args:
        batch: List of ``(question, haiku, question_len, haiku_len)`` tuples.

    Returns:
        ``(questions_padded, haikus_padded, question_lengths, haiku_lengths)``;
        sequences are right-padded with 0 and laid out batch-first.
    """
    # Transpose the list of samples into one column per field.
    question_seqs, haiku_seqs, question_lens, haiku_lens = (list(col) for col in zip(*batch))
    return (
        pad_sequence(question_seqs, batch_first=True, padding_value=0),
        pad_sequence(haiku_seqs, batch_first=True, padding_value=0),
        torch.tensor(question_lens),
        torch.tensor(haiku_lens),
    )

# LSTM Model
class QuestionHaikuLSTM(nn.Module):
    """Character-level LSTM encoder/decoder sharing one embedding table.

    Args:
        vocab_size: Number of distinct characters (embedding rows and output size).
        embed_size: Embedding dimension.
        hidden_size: LSTM hidden dimension for both encoder and decoder.
        num_layers: Number of stacked LSTM layers in each of the two LSTMs.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.encoder = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.decoder = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, src, trg, src_lengths, teacher_forcing_ratio=0.5):
        """Decode ``trg`` step by step, conditioned on the encoded ``src``.

        Args:
            src: ``(batch, src_len)`` padded source indices.
            trg: ``(batch, trg_len)`` padded target indices.
            src_lengths: 1-D tensor with the unpadded length of each source row.
            teacher_forcing_ratio: Per-step probability of feeding the true
                target token instead of the model's own prediction.

        Returns:
            ``(batch, trg_len, vocab_size)`` logits; step 0 is left at zero
            because decoding starts from ``trg[:, 0]``.
        """
        n_rows, n_steps = src.size(0), trg.size(1)
        logits = torch.zeros(n_rows, n_steps, self.fc.out_features).to(src.device)

        # Packing lets the encoder skip padded source positions.
        packed_src = pack_padded_sequence(
            self.embedding(src), src_lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        _, state = self.encoder(packed_src)

        step_input = trg[:, 0]
        for t in range(1, n_steps):
            step_embedded = self.embedding(step_input).unsqueeze(1)
            step_output, state = self.decoder(step_embedded, state)
            step_logits = self.fc(step_output.squeeze(1))
            logits[:, t] = step_logits

            # One random draw per time step decides the next decoder input.
            use_truth = torch.rand(1).item() < teacher_forcing_ratio
            greedy = step_logits.max(1)[1]
            step_input = trg[:, t] if use_truth else greedy

        return logits

# Training function
def train(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and return the mean batch loss.

    Args:
        model: Module called as ``model(questions, haikus, q_lengths)``.
        train_loader: Iterable of ``(questions, haikus, q_lengths, h_lengths)`` batches.
        criterion: Loss taking ``(N, vocab)`` logits and ``(N,)`` targets.
        optimizer: Optimizer over the model parameters.
        device: Device the batch tensors are moved to.

    Returns:
        Sum of batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0
    for questions, haikus, q_lengths, h_lengths in train_loader:
        questions = questions.to(device)
        haikus = haikus.to(device)
        q_lengths = q_lengths.to(device)
        h_lengths = h_lengths.to(device)  # moved like the others; not read below

        optimizer.zero_grad()
        logits = model(questions, haikus, q_lengths)

        # Step 0 is skipped on both sides: the first target token is only
        # ever used as the decoder's initial input.
        vocab_dim = logits.size(2)
        predicted = logits[:, 1:].reshape(-1, vocab_dim)
        expected = haikus[:, 1:].reshape(-1)
        loss = criterion(predicted, expected)

        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(train_loader)

# Generate haiku
def generate_haiku(model, char_to_int, int_to_char, device, question, max_length=100):
    """Greedily decode a haiku for ``question``, one character at a time.

    Args:
        model: Module exposing ``embedding``, ``encoder``, ``decoder`` and ``fc``.
        char_to_int: Mapping from character to vocabulary index.
        int_to_char: Inverse mapping from index to character.
        device: Device the input tensors are created on.
        question: Prompt text. Characters missing from the vocabulary are
            replaced by a space when the vocabulary has one, otherwise dropped.
        max_length: Maximum number of characters to generate.

    Returns:
        The generated text, ending before the first newline prediction.

    Raises:
        ValueError: If no character of ``question`` is in the vocabulary.
    """
    model.eval()

    fallback = char_to_int.get(' ')
    indices = [char_to_int.get(c, fallback) for c in question]
    indices = [i for i in indices if i is not None]
    if not indices:
        raise ValueError("question contains no characters known to the vocabulary")

    # Newline is both the start token and the stop signal; index 0 is the
    # same default the stop check has always used.
    boundary = char_to_int.get('\n', 0)

    with torch.no_grad():
        input_seq = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(0)

        # The second positional argument of nn.LSTM is the initial state, not
        # a length tensor; a single unpadded sequence needs no length at all.
        _, hidden = model.encoder(model.embedding(input_seq))

        decoder_input = torch.tensor([[boundary]], dtype=torch.long, device=device)  # Start token
        generated = []

        for _ in range(max_length):
            decoder_output, hidden = model.decoder(model.embedding(decoder_input), hidden)
            prediction = model.fc(decoder_output.squeeze(1))

            next_char_index = prediction.argmax(1).item()
            if next_char_index == boundary:  # Stop at newline
                break
            generated.append(int_to_char[next_char_index])
            decoder_input = torch.tensor([[next_char_index]], dtype=torch.long, device=device)

    return ''.join(generated)

# Main function
def main():
    """Train the character-level seq2seq model, save it, and print one sample."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Hyperparameters
    embed_size, hidden_size, num_layers = 128, 256, 2
    batch_size, num_epochs, learning_rate = 64, 50, 0.001

    # Data: raw text -> character indices -> padded batches
    file_path = '/kaggle/input/final-dataset/final_dataset.csv'  # Update this path
    questions, haikus = load_data(file_path)
    (encoded_questions, encoded_haikus,
     vocab_size, char_to_int, int_to_char) = preprocess_data(questions, haikus)

    train_loader = DataLoader(
        QuestionHaikuDataset(encoded_questions, encoded_haikus),
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_fn,
    )

    model = QuestionHaikuLSTM(vocab_size, embed_size, hidden_size, num_layers).to(device)
    # Targets equal to 0 (the padding value used by collate_fn) are not scored.
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        loss = train(model, train_loader, criterion, optimizer, device)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss:.4f}')

    torch.save(model.state_dict(), 'question_haiku_model.pth')

    # Quick smoke test of the trained model
    sample_question = "What is AI?"
    generated_haiku = generate_haiku(model, char_to_int, int_to_char, device, sample_question)
    print(f"Question: {sample_question}")
    print(f"Generated Haiku: {generated_haiku}")


if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence

# Define your HaikuGenerator class (assuming it's already defined somewhere)
class HaikuGenerator(nn.Module):
    """Embedding, encoder LSTM, decoder LSTM and output layer for haiku decoding.

    Args:
        vocab_size: Number of embedding rows and output classes.
        embedding_dim: Embedding dimension.
        hidden_size: Hidden dimension of both LSTMs.
        num_layers: Number of stacked layers in each LSTM.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        super().__init__()
        lstm_dims = (embedding_dim, hidden_size, num_layers)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = nn.LSTM(*lstm_dims, batch_first=True)
        self.decoder = nn.LSTM(*lstm_dims, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, input_seq, input_length, hidden=None):
        """Encode ``input_seq`` and return ``(packed_output, hidden)``.

        Only the encoder runs here; decoding is driven from outside the class.
        """
        packed = pack_padded_sequence(
            self.embedding(input_seq), input_length.cpu(), batch_first=True, enforce_sorted=False
        )
        return self.encoder(packed, hidden)

def generate_haiku(model, char_to_int, int_to_char, device, question, max_length=100):
    """Greedily decode a haiku for ``question``, one character at a time.

    Args:
        model: Module exposing ``embedding``, ``encoder``, ``decoder`` and ``fc``.
        char_to_int: Mapping from character to vocabulary index.
        int_to_char: Inverse mapping from index to character.
        device: Device the input tensors are created on.
        question: Prompt text. Characters missing from the vocabulary are
            replaced by a space when the vocabulary has one, otherwise dropped.
        max_length: Maximum number of characters to generate.

    Returns:
        The generated text, ending before the first newline prediction.

    Raises:
        ValueError: If no character of ``question`` is in the vocabulary.
    """
    model.eval()

    fallback = char_to_int.get(' ')
    indices = [char_to_int.get(c, fallback) for c in question]
    indices = [i for i in indices if i is not None]
    if not indices:
        raise ValueError("question contains no characters known to the vocabulary")

    # Newline is both the start token and the stop signal; index 0 is the
    # same default the stop check has always used.
    boundary = char_to_int.get('\n', 0)

    with torch.no_grad():
        # Convert question to tensor
        input_seq = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(0)
        input_length = torch.tensor([len(indices)])

        # Pack the input sequence
        packed_input = pack_padded_sequence(
            model.embedding(input_seq), input_length, batch_first=True, enforce_sorted=False
        )

        # Encode the input
        _, hidden = model.encoder(packed_input)

        decoder_input = torch.tensor([[boundary]], dtype=torch.long, device=device)
        generated = []

        for _ in range(max_length):
            # decoder_input is already (batch=1, seq=1), so the embedding is
            # the 3-D tensor the LSTM expects; an extra unsqueeze would make
            # it 4-D and be rejected.
            decoder_output, hidden = model.decoder(model.embedding(decoder_input), hidden)
            prediction = model.fc(decoder_output.squeeze(1))

            next_char_index = prediction.argmax(1).item()
            if next_char_index == boundary:  # Stop at newline
                break
            generated.append(int_to_char[next_char_index])
            decoder_input = torch.tensor([[next_char_index]], dtype=torch.long, device=device)

    return ''.join(generated)

def main():
    """Load the saved checkpoint and print a haiku for a sample question."""
    # NOTE(review): placeholder vocabulary. It presumably has to be replaced
    # by the mapping used when the checkpoint was trained, otherwise the
    # layer sizes below will not match the saved weights - confirm.
    char_to_int = {'a': 0, 'b': 1, 'c': 2, '\n': 3}  # Sample char_to_int, modify based on your vocabulary
    int_to_char = {0: 'a', 1: 'b', 2: 'c', 3: '\n'}  # Sample int_to_char, modify based on your vocabulary

    # Hyperparameters
    vocab_size = len(char_to_int)
    embedding_dim = 128
    hidden_size = 256
    num_layers = 2

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load model. map_location remaps the stored tensors onto the selected
    # device, so a checkpoint written on a GPU also loads on a CPU-only host.
    model = HaikuGenerator(vocab_size, embedding_dim, hidden_size, num_layers).to(device)
    state_dict = torch.load('/kaggle/working/question_haiku_model.pth', map_location=device)
    model.load_state_dict(state_dict)
    model.eval()

    # Generate Haiku
    sample_question = "What is AI?"
    generated_haiku = generate_haiku(model, char_to_int, int_to_char, device, sample_question)
    print(f"Question: {sample_question}")
    print(f"Generated Haiku: {generated_haiku}")


if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence

# Define your HaikuGenerator class
class HaikuGenerator(nn.Module):
    """Seq2seq building blocks: shared embedding, two LSTMs and a vocab projection.

    Args:
        vocab_size: Number of embedding rows and output classes.
        embedding_dim: Embedding dimension.
        hidden_size: Hidden dimension of both LSTMs.
        num_layers: Number of stacked layers in each LSTM.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        def make_lstm():
            # Encoder and decoder use the same dimensions.
            return nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)

        self.encoder = make_lstm()
        self.decoder = make_lstm()
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, input_seq, input_length, hidden=None):
        """Run the encoder over the packed input; return ``(output, hidden)``."""
        lengths_on_cpu = input_length.cpu()
        embedded = self.embedding(input_seq)
        packed_input = pack_padded_sequence(
            embedded, lengths_on_cpu, batch_first=True, enforce_sorted=False
        )
        return self.encoder(packed_input, hidden)

def generate_haiku(model, char_to_int, int_to_char, device, question, max_length=100):
    """Greedily decode a haiku for ``question``, one character at a time.

    Args:
        model: Module exposing ``embedding``, ``encoder``, ``decoder`` and ``fc``.
        char_to_int: Mapping from character to vocabulary index.
        int_to_char: Inverse mapping from index to character.
        device: Device the input tensors are created on.
        question: Prompt text. Characters missing from the vocabulary are
            replaced by a space when the vocabulary has one, otherwise dropped.
        max_length: Maximum number of characters to generate.

    Returns:
        The generated text, ending before the first newline prediction.

    Raises:
        ValueError: If no character of ``question`` is in the vocabulary.
    """
    model.eval()

    fallback = char_to_int.get(' ')
    indices = [char_to_int.get(c, fallback) for c in question]
    indices = [i for i in indices if i is not None]
    if not indices:
        raise ValueError("question contains no characters known to the vocabulary")

    # Newline is both the start token and the stop signal; index 0 is the
    # same default the stop check has always used.
    boundary = char_to_int.get('\n', 0)

    with torch.no_grad():
        # Convert question to tensor
        input_seq = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(0)
        input_length = torch.tensor([len(indices)])

        # Pack the input sequence
        packed_input = pack_padded_sequence(
            model.embedding(input_seq), input_length, batch_first=True, enforce_sorted=False
        )

        # Encode the input
        _, hidden = model.encoder(packed_input)

        decoder_input = torch.tensor([[boundary]], dtype=torch.long, device=device)
        generated = []

        for _ in range(max_length):
            # decoder_input is already (batch=1, seq=1), so the embedding is
            # the 3-D tensor the LSTM expects; an extra unsqueeze would make
            # it 4-D and be rejected.
            decoder_output, hidden = model.decoder(model.embedding(decoder_input), hidden)
            prediction = model.fc(decoder_output.squeeze(1))

            next_char_index = prediction.argmax(1).item()
            if next_char_index == boundary:  # Stop at newline
                break
            generated.append(int_to_char[next_char_index])
            decoder_input = torch.tensor([[next_char_index]], dtype=torch.long, device=device)

    return ''.join(generated)

def main():
    """Load saved character mappings and weights, then print a sample haiku."""
    # Load the original character mappings used during training
    char_to_int = torch.load('char_to_int.pth')  # Load the original char_to_int
    int_to_char = torch.load('int_to_char.pth')  # Load the original int_to_char

    # The model's embedding and output sizes follow the loaded vocabulary
    vocab_size = len(char_to_int)

    # Hyperparameters
    embedding_dim = 128
    hidden_size = 256
    num_layers = 2

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load the model. map_location remaps the stored tensors onto the selected
    # device, so a checkpoint written on a GPU also loads on a CPU-only host.
    model = HaikuGenerator(vocab_size, embedding_dim, hidden_size, num_layers).to(device)
    state_dict = torch.load('/kaggle/working/question_haiku_model.pth', map_location=device)
    model.load_state_dict(state_dict)
    model.eval()

    # Generate Haiku
    sample_question = "What is AI?"
    generated_haiku = generate_haiku(model, char_to_int, int_to_char, device, sample_question)
    print(f"Question: {sample_question}")
    print(f"Generated Haiku: {generated_haiku}")


if __name__ == "__main__":
    main()


In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence
import pandas as pd

# Load and preprocess data
def load_data(file_path):
    """Read the question/haiku CSV and return two aligned lists of strings.

    Rows with a missing ``Question`` or ``Haiku`` are dropped, and the
    remaining values are cast to ``str`` so that the vocabulary builder never
    receives NaN or numeric cells.

    Args:
        file_path: Path to a CSV file with ``Question`` and ``Haiku`` columns.

    Returns:
        ``(questions, haikus)`` as two lists of equal length.
    """
    df = pd.read_csv(file_path)
    df = df.dropna(subset=['Question', 'Haiku'])
    return df['Question'].astype(str).tolist(), df['Haiku'].astype(str).tolist()

def preprocess_data(questions, haikus):
    """Build the character vocabulary for the given texts.

    The vocabulary is the sorted set of characters found in all questions and
    haikus joined by single spaces; indices start at 0.

    Returns:
        ``(vocab_size, char_to_int, int_to_char)``.
    """
    alphabet = sorted(set(' '.join(questions + haikus)))
    int_to_char = dict(enumerate(alphabet))
    char_to_int = {ch: idx for idx, ch in int_to_char.items()}
    return len(alphabet), char_to_int, int_to_char

# Define your HaikuGenerator class
class HaikuGenerator(nn.Module):
    """Container for the embedding, encoder/decoder LSTMs and output layer.

    Args:
        vocab_size: Number of embedding rows and output classes.
        embedding_dim: Embedding dimension.
        hidden_size: Hidden dimension of both LSTMs.
        num_layers: Number of stacked layers in each LSTM.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        super().__init__()
        lstm_kwargs = dict(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = nn.LSTM(**lstm_kwargs)
        self.decoder = nn.LSTM(**lstm_kwargs)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, input_seq, input_length, hidden=None):
        """Encode the (padded) input and return ``(packed_output, hidden)``."""
        packed_input = pack_padded_sequence(
            self.embedding(input_seq),
            input_length.cpu(),
            batch_first=True,
            enforce_sorted=False,
        )
        encoded, state = self.encoder(packed_input, hidden)
        return encoded, state

def generate_haiku(model, char_to_int, int_to_char, device, question, max_length=100):
    """Greedily decode a haiku for ``question``, one character at a time.

    Args:
        model: Module exposing ``embedding``, ``encoder``, ``decoder`` and ``fc``.
        char_to_int: Mapping from character to vocabulary index.
        int_to_char: Inverse mapping from index to character.
        device: Device the input tensors are created on.
        question: Prompt text. Characters missing from the vocabulary are
            replaced by a space when the vocabulary has one, otherwise dropped.
        max_length: Maximum number of characters to generate.

    Returns:
        The generated text, ending before the first newline prediction.

    Raises:
        ValueError: If no character of ``question`` is in the vocabulary.
    """
    model.eval()

    fallback = char_to_int.get(' ')
    indices = [char_to_int.get(c, fallback) for c in question]
    indices = [i for i in indices if i is not None]
    if not indices:
        raise ValueError("question contains no characters known to the vocabulary")

    # Newline is both the start token and the stop signal; index 0 is the
    # same default the stop check has always used.
    boundary = char_to_int.get('\n', 0)

    with torch.no_grad():
        # Convert question to tensor
        input_seq = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(0)
        input_length = torch.tensor([len(indices)])

        # Pack the input sequence
        packed_input = pack_padded_sequence(
            model.embedding(input_seq), input_length, batch_first=True, enforce_sorted=False
        )

        # Encode the input
        _, hidden = model.encoder(packed_input)

        decoder_input = torch.tensor([[boundary]], dtype=torch.long, device=device)
        generated = []

        for _ in range(max_length):
            # decoder_input is already (batch=1, seq=1), so the embedding is
            # the 3-D tensor the LSTM expects; an extra unsqueeze would make
            # it 4-D and be rejected.
            decoder_output, hidden = model.decoder(model.embedding(decoder_input), hidden)
            prediction = model.fc(decoder_output.squeeze(1))

            next_char_index = prediction.argmax(1).item()
            if next_char_index == boundary:  # Stop at newline
                break
            generated.append(int_to_char[next_char_index])
            decoder_input = torch.tensor([[next_char_index]], dtype=torch.long, device=device)

    return ''.join(generated)

def main():
    """Rebuild the vocabulary from the dataset, load the model, print a sample."""
    # Load your dataset
    file_path = '/kaggle/input/final-haiku-dataset/final_dataset.csv'  # Update this path
    questions, haikus = load_data(file_path)

    # Recreate the character mappings from the dataset
    vocab_size, char_to_int, int_to_char = preprocess_data(questions, haikus)

    # Save the mappings for future use
    torch.save(char_to_int, 'char_to_int.pth')
    torch.save(int_to_char, 'int_to_char.pth')

    # Hyperparameters
    embedding_dim = 128
    hidden_size = 256
    num_layers = 2

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load the model. map_location remaps the stored tensors onto the selected
    # device, so a checkpoint written on a GPU also loads on a CPU-only host.
    model = HaikuGenerator(vocab_size, embedding_dim, hidden_size, num_layers).to(device)
    state_dict = torch.load('/kaggle/working/question_haiku_model.pth', map_location=device)
    model.load_state_dict(state_dict)
    model.eval()

    # Generate Haiku
    sample_question = "What is AI?"
    generated_haiku = generate_haiku(model, char_to_int, int_to_char, device, sample_question)
    print(f"Question: {sample_question}")
    print(f"Generated Haiku: {generated_haiku}")


if __name__ == "__main__":
    main()


In [None]:
import torch
import torch.nn as nn
import pandas as pd
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
# LSTM Model
class QuestionHaikuLSTM(nn.Module):
    """Inference-only shell with the same sub-modules as the training model.

    The layers are declared so that a saved state dict can be loaded;
    ``forward`` is a stub because generation calls the sub-modules directly.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.encoder = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.decoder = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, src, trg, src_lengths, teacher_forcing_ratio=0.5):
        """Training-time entry point; intentionally does nothing here."""
        return None
# Function to load data and preprocess the vocab
def preprocess_data(questions, haikus):
    """Return ``(char_to_int, int_to_char)`` for the characters in the texts.

    The vocabulary is the sorted set of characters found in all questions and
    haikus joined by single spaces; indices start at 0.
    """
    alphabet = sorted(set(' '.join(questions + haikus)))
    int_to_char = dict(enumerate(alphabet))
    char_to_int = {ch: idx for idx, ch in int_to_char.items()}
    return char_to_int, int_to_char
# Function to generate haiku based on a question
def generate_haiku(model, char_to_int, int_to_char, device, question, max_length=100):
    """Greedily decode a haiku for ``question``, one character at a time.

    Args:
        model: Module exposing ``embedding``, ``encoder``, ``decoder`` and ``fc``.
        char_to_int: Mapping from character to vocabulary index.
        int_to_char: Inverse mapping from index to character.
        device: Device the input tensors are created on.
        question: Prompt text. Characters missing from the vocabulary are
            replaced by a space when the vocabulary has one, otherwise dropped.
        max_length: Maximum number of characters to generate.

    Returns:
        The generated text, ending before the first newline prediction.

    Raises:
        ValueError: If no character of ``question`` is in the vocabulary.
    """
    model.eval()  # Set model to evaluation mode

    fallback = char_to_int.get(' ')
    indices = [char_to_int.get(c, fallback) for c in question]
    indices = [i for i in indices if i is not None]
    if not indices:
        raise ValueError("question contains no characters known to the vocabulary")

    # Newline is both the start token and the stop signal; index 0 is the
    # same default the stop check has always used.
    boundary = char_to_int.get('\n', 0)

    with torch.no_grad():  # Disable gradient calculation for inference
        # Convert question to tensor of indices
        input_seq = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(0)
        input_length = torch.tensor([len(indices)])

        # Encode the input question
        packed_embedded = pack_padded_sequence(
            model.embedding(input_seq), input_length, batch_first=True, enforce_sorted=False
        )
        _, hidden = model.encoder(packed_embedded)

        decoder_input = torch.tensor([[boundary]], dtype=torch.long, device=device)
        generated = []
        for _ in range(max_length):
            # decoder_input is already (batch=1, seq=1), so the embedding is
            # the 3-D tensor the LSTM expects; an extra unsqueeze would make
            # it 4-D and be rejected.
            decoder_output, hidden = model.decoder(model.embedding(decoder_input), hidden)
            prediction = model.fc(decoder_output.squeeze(1))

            # Get the most likely next character
            next_char_index = prediction.argmax(1).item()
            if next_char_index == boundary:  # Stop if newline character is generated
                break
            generated.append(int_to_char[next_char_index])
            decoder_input = torch.tensor([[next_char_index]], dtype=torch.long, device=device)

    return ''.join(generated)
# Main function for inference
def main():
    """Load the trained weights and print a haiku for a sample question."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # NOTE(review): the vocabulary is rebuilt from two dummy strings only. It
    # presumably does not match the vocabulary the checkpoint was trained
    # with, in which case loading the weights below will fail on mismatched
    # layer sizes - confirm and rebuild it from the training data instead.
    questions = ["What is AI?"]
    haikus = ["This is a haiku"]

    # Preprocess the data to get the vocab mappings
    char_to_int, int_to_char = preprocess_data(questions, haikus)

    # Define the model (the same architecture used during training)
    vocab_size = len(char_to_int)
    embed_size = 128
    hidden_size = 256
    num_layers = 2
    model = QuestionHaikuLSTM(vocab_size, embed_size, hidden_size, num_layers).to(device)

    # Load the trained model weights. map_location remaps the stored tensors
    # onto the selected device, so a checkpoint written on a GPU also loads on
    # a CPU-only host.
    state_dict = torch.load('/kaggle/working/question_haiku_model.pth', map_location=device)
    model.load_state_dict(state_dict)

    # Input for inference
    sample_question = "What is AI?"

    # Generate the haiku
    generated_haiku = generate_haiku(model, char_to_int, int_to_char, device, sample_question)
    print(f"Question: {sample_question}")
    print(f"Generated Haiku: {generated_haiku}")


if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence

# Define your HaikuGenerator class (assuming it's already defined somewhere)
class HaikuGenerator(nn.Module):
    """Embedding, encoder LSTM, decoder LSTM and output layer for haiku decoding.

    Args:
        vocab_size: Number of embedding rows and output classes.
        embedding_dim: Embedding dimension.
        hidden_size: Hidden dimension of both LSTMs.
        num_layers: Number of stacked layers in each LSTM.
    """

    # The constructor must be spelled __init__ (double underscores); with the
    # single-underscore spelling Python never calls it and no layer is created.
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.decoder = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, input_seq, input_length, hidden=None):
        """Encode ``input_seq`` and return ``(packed_output, hidden)``.

        Only the encoder runs here; decoding is driven from outside the class.
        """
        embedded = self.embedding(input_seq)
        packed_input = pack_padded_sequence(embedded, input_length.cpu(), batch_first=True, enforce_sorted=False)
        encoder_output, hidden = self.encoder(packed_input, hidden)
        return encoder_output, hidden

def generate_haiku(model, char_to_int, int_to_char, device, question, max_length=100):
    """Greedily decode a haiku for ``question``, one character at a time.

    Args:
        model: Module exposing ``embedding``, ``encoder``, ``decoder`` and ``fc``.
        char_to_int: Mapping from character to vocabulary index.
        int_to_char: Inverse mapping from index to character.
        device: Device the input tensors are created on.
        question: Prompt text. Characters missing from the vocabulary are
            replaced by a space when the vocabulary has one, otherwise dropped.
        max_length: Maximum number of characters to generate.

    Returns:
        The generated text, ending before the first newline prediction.

    Raises:
        ValueError: If no character of ``question`` is in the vocabulary.
    """
    model.eval()

    fallback = char_to_int.get(' ')
    indices = [char_to_int.get(c, fallback) for c in question]
    indices = [i for i in indices if i is not None]
    if not indices:
        raise ValueError("question contains no characters known to the vocabulary")

    # Newline is both the start token and the stop signal; index 0 is the
    # same default the stop check has always used.
    boundary = char_to_int.get('\n', 0)

    with torch.no_grad():
        # Convert question to tensor
        input_seq = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(0)
        input_length = torch.tensor([len(indices)])

        # Pack the input sequence
        packed_input = pack_padded_sequence(
            model.embedding(input_seq), input_length, batch_first=True, enforce_sorted=False
        )

        # Encode the input
        _, hidden = model.encoder(packed_input)

        decoder_input = torch.tensor([[boundary]], dtype=torch.long, device=device)
        generated = []

        for _ in range(max_length):
            # decoder_input is already (batch=1, seq=1), so the embedding is
            # the 3-D tensor the LSTM expects; an extra unsqueeze would make
            # it 4-D and be rejected.
            decoder_output, hidden = model.decoder(model.embedding(decoder_input), hidden)
            prediction = model.fc(decoder_output.squeeze(1))

            next_char_index = prediction.argmax(1).item()
            if next_char_index == boundary:  # Stop at newline
                break
            generated.append(int_to_char[next_char_index])
            decoder_input = torch.tensor([[next_char_index]], dtype=torch.long, device=device)

    return ''.join(generated)

def main():
    """Load the saved checkpoint and print a haiku for a sample question."""
    # NOTE(review): placeholder vocabulary. It presumably has to be replaced
    # by the mapping used when the checkpoint was trained, otherwise the
    # layer sizes below will not match the saved weights - confirm.
    char_to_int = {'a': 0, 'b': 1, 'c': 2, '\n': 3}  # Sample char_to_int, modify based on your vocabulary
    int_to_char = {0: 'a', 1: 'b', 2: 'c', 3: '\n'}  # Sample int_to_char, modify based on your vocabulary

    # Hyperparameters
    vocab_size = len(char_to_int)
    embedding_dim = 128
    hidden_size = 256
    num_layers = 2

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load model. map_location remaps the stored tensors onto the selected
    # device, so a checkpoint written on a GPU also loads on a CPU-only host.
    model = HaikuGenerator(vocab_size, embedding_dim, hidden_size, num_layers).to(device)
    state_dict = torch.load('question_haiku_model.pth', map_location=device)
    model.load_state_dict(state_dict)
    model.eval()

    # Generate Haiku
    sample_question = "What is AI?"
    generated_haiku = generate_haiku(model, char_to_int, int_to_char, device, sample_question)
    print(f"Question: {sample_question}")
    print(f"Generated Haiku: {generated_haiku}")


# The guard needs double underscores and plain ASCII quotes; the previous
# spelling with typographic quotes was a syntax error.
if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
# Define the basic LSTM model
class BasicLSTM(nn.Module):
    """LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: LSTM hidden dimension.
        output_size: Dimension of the final prediction.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a ``(batch, seq_len, input_size)`` tensor to ``(batch, output_size)``."""
        # Explicit all-zero initial hidden and cell states on the input's device.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape).to(x.device)
        c0 = torch.zeros(state_shape).to(x.device)

        sequence_out, _ = self.lstm(x, (h0, c0))
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)
# Create some dummy training data
def generate_dummy_data(sequence_length=10, num_samples=1000):
    """Create a toy regression set: random sequences and their sums.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(num_samples, sequence_length, 1)``
        and ``y`` has shape ``(num_samples, 1)`` holding each sequence's sum.
    """
    inputs = torch.randn(num_samples, sequence_length, 1)
    targets = inputs.sum(dim=1)  # summing over time leaves the feature axis
    return inputs, targets
# Train the model
def train_model(model, X_train, y_train, num_epochs=10, learning_rate=0.001):
    """Fit ``model`` on the full training set with Adam and MSE loss.

    Each epoch is one full-batch update; the loss is printed every second epoch.

    Args:
        model: Module mapping ``X_train`` to predictions shaped like ``y_train``.
        X_train: Training inputs.
        y_train: Training targets.
        num_epochs: Number of full-batch updates.
        learning_rate: Adam learning rate.
    """
    criterion = nn.MSELoss()  # Mean squared error for regression
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    model.train()
    for epoch in range(num_epochs):
        loss = criterion(model(X_train), y_train)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # epoch is zero-based, so odd values are the 2nd, 4th, ... epochs.
        if epoch % 2 == 1:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
# Run inference on new data
def run_inference(model, X_test):
    """Return the model's predictions for ``X_test`` without tracking gradients.

    The model is switched to evaluation mode as a side effect.
    """
    model.eval()
    with torch.no_grad():
        return model(X_test)
def main():
    """Train the toy LSTM on random data and print predictions for 5 samples."""
    # Hyperparameters
    input_size, hidden_size, output_size = 1, 50, 1
    num_layers = 1
    sequence_length = 10
    num_epochs = 10
    learning_rate = 0.001

    # Generate dummy data
    X_train, y_train = generate_dummy_data(sequence_length)

    # Build and train the model
    model = BasicLSTM(input_size, hidden_size, output_size, num_layers)
    train_model(model, X_train, y_train, num_epochs, learning_rate)

    # Fresh samples for inference; their targets are not needed
    X_test, _ = generate_dummy_data(sequence_length, num_samples=5)
    predictions = run_inference(model, X_test)

    print("Test data predictions:")
    for i, prediction in enumerate(predictions):
        print(f"Sample {i + 1}: {prediction.item():.4f}")


if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split

# Dataset class for question-haiku pairs
class HaikuDataset(Dataset):
    """Index-aligned question/haiku pairs.

    Args:
        questions: Indexable collection of questions.
        haikus: Indexable collection of haikus, aligned with ``questions``.
        max_len: Stored on the instance; not used by the methods of this class.
    """

    def __init__(self, questions, haikus, max_len):
        self.questions, self.haikus = questions, haikus
        self.max_len = max_len

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        """Return the ``(question, haiku)`` pair at position ``idx``."""
        question = self.questions[idx]
        haiku = self.haikus[idx]
        return question, haiku

# Tokenizer and padding function
def tokenize_and_pad(texts, vocab, max_len):
    """Turn whitespace-split texts into fixed-length rows of vocabulary ids.

    Words missing from ``vocab`` map to ``vocab['<UNK>']``. Rows shorter than
    ``max_len`` are right-padded with ``vocab['<PAD>']``; longer rows are cut
    to ``max_len``. Returns a NumPy array with one row per text.
    """
    # Pass 1: words -> ids.
    token_rows = []
    for text in texts:
        token_rows.append([vocab.get(word, vocab['<UNK>']) for word in text.split()])

    # Pass 2: force every row to exactly max_len entries.
    fixed_rows = []
    for ids in token_rows:
        shortfall = max_len - len(ids)
        if shortfall > 0:
            fixed_rows.append(ids + [vocab['<PAD>']] * shortfall)
        else:
            fixed_rows.append(ids[:max_len])
    return np.array(fixed_rows)

# Encoder class
class Encoder(nn.Module):
    """Embed token ids and run them through an LSTM, keeping only the final state."""

    def __init__(self, input_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        # input_size is the number of rows in the embedding table.
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Return the LSTM's final ``(hidden, cell)`` pair for the id tensor ``x``.

        The per-step outputs of the LSTM are discarded.
        """
        final_state = self.lstm(self.embedding(x))[1]
        hidden, cell = final_state
        return hidden, cell

# Decoder class
class Decoder(nn.Module):
    """Single-step LSTM decoder that scores every vocabulary entry for the next token."""

    def __init__(self, output_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x, hidden, cell):
        """Advance one decoding step.

        ``x`` holds one token id per batch element. Returns the scores over the
        output vocabulary together with the updated ``hidden`` and ``cell``.
        """
        # Insert a length-1 sequence axis so the batch-first LSTM accepts the step.
        step_embedding = self.embedding(x.unsqueeze(1))
        step_output, next_state = self.lstm(step_embedding, (hidden, cell))
        hidden, cell = next_state
        return self.fc(step_output.squeeze(1)), hidden, cell

# Seq2Seq class (combining encoder and decoder)
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step."""

    # Fixed: the constructor was spelled `_init_`, so Python never called it and
    # `Seq2Seq(encoder, decoder, device)` failed inside nn.Module's own __init__.
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Return decoder scores of shape (batch, target_len, vocab).

        The decoder is seeded with ``target[:, 0]`` and the encoder's final
        state. At each later step the next input is the ground-truth token
        with probability ``teacher_forcing_ratio``, otherwise the decoder's
        own argmax. Position 0 of the returned tensor is never written and
        stays zero.
        """
        batch_size = target.shape[0]
        target_len = target.shape[1]
        target_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, target_len, target_vocab_size).to(self.device)

        hidden, cell = self.encoder(source)
        input = target[:, 0]

        for t in range(1, target_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output
            # One draw per step, shared by the whole batch.
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = target[:, t] if teacher_force else top1

        return outputs

# Function to generate haiku from a question
def generate_haiku(model, question, vocab, idx_to_word, max_len=20):
    """Greedily decode a haiku for one tokenized question.

    ``question`` is converted with ``torch.tensor`` and given a batch axis.
    Decoding starts from ``vocab['<START>']``, takes the argmax at every step
    and stops at '<END>' or after ``max_len`` steps. The words are returned
    joined by single spaces. Side effect: ``model`` is left in eval mode.
    """
    model.eval()
    words = []
    with torch.no_grad():
        encoded_question = torch.tensor(question).unsqueeze(0).to(model.device)
        hidden, cell = model.encoder(encoded_question)
        token = torch.tensor([vocab['<START>']]).to(model.device)

        for _ in range(max_len):
            scores, hidden, cell = model.decoder(token, hidden, cell)
            # The chosen token is also the next decoder input.
            token = scores.argmax(1)
            word = idx_to_word[token.item()]
            if word == '<END>':
                break
            words.append(word)

    return ' '.join(words)

# Training loop
def train_model(model, dataloader, optimizer, criterion, num_epochs):
    """Run ``num_epochs`` passes over ``dataloader`` and print the mean batch loss per epoch.

    Each batch is moved to ``model.device``. The first position of every
    target sequence is excluded from the loss.
    """
    model.train()
    for epoch in range(num_epochs):
        epoch_loss = 0
        for questions, haikus in tqdm(dataloader):
            questions = questions.to(model.device)
            haikus = haikus.to(model.device)

            optimizer.zero_grad()
            scores = model(questions, haikus)
            vocab_size = scores.shape[-1]

            # Flatten (batch, step) into one axis, dropping step 0 on both sides.
            flat_scores = scores[:, 1:].reshape(-1, vocab_size)
            flat_targets = haikus[:, 1:].reshape(-1)
            loss = criterion(flat_scores, flat_targets)

            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss / len(dataloader):.4f}')

# Function to load CSV data
def load_data_from_csv(filepath):
    """Read a CSV and return its 'Question' and 'Haiku' columns as arrays, in that order."""
    frame = pd.read_csv(filepath)
    question_column = frame['Question']
    haiku_column = frame['Haiku']
    return question_column.values, haiku_column.values

# Main function to run the training and inference
# Main function to run the training and inference
def main():
    """Train the seq2seq haiku generator on the CSV data and print one sample haiku."""
    # Load data from CSV
    csv_file = '/kaggle/input/final-dataset/final_dataset.csv'  # Path to your CSV file
    questions, haikus = load_data_from_csv(csv_file)

    # Sample vocab and reverse mapping (should be adapted based on your data)
    vocab = {'<PAD>': 0, '<START>': 1, '<END>': 2, '<UNK>': 3, 'What': 4, 'is': 5, 'the': 6, 'sky': 7, 'blue': 8, '...': 999}
    # Renumber ids contiguously in insertion order. The embedding and output
    # layers are sized by len(vocab), so a gap such as '...' -> 999 would index
    # past the embedding table and leave some output ids without a word.
    vocab = {word: idx for idx, word in enumerate(vocab)}
    idx_to_word = {idx: word for word, idx in vocab.items()}

    # Hyperparameters
    max_len = 10
    embedding_dim = 256
    hidden_size = 512
    input_size = len(vocab)
    output_size = len(vocab)
    batch_size = 16
    num_epochs = 20
    learning_rate = 0.001

    # Frame every target as "<START> words... <END>". generate_haiku seeds the
    # decoder with <START> and stops on <END>, so both must appear in training
    # targets. Words are cut to max_len - 2 so <END> survives truncation.
    haikus = [
        ' '.join(['<START>', *haiku.split()[:max_len - 2], '<END>'])
        for haiku in haikus
    ]

    # Tokenize and pad questions and haikus
    tokenized_questions = tokenize_and_pad(questions, vocab, max_len)
    tokenized_haikus = tokenize_and_pad(haikus, vocab, max_len)

    # Split the dataset into training and validation
    # (the validation part is currently unused)
    train_questions, val_questions, train_haikus, val_haikus = train_test_split(tokenized_questions, tokenized_haikus, test_size=0.1)

    # Dataset and DataLoader
    train_dataset = HaikuDataset(train_questions, train_haikus, max_len)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Initialize encoder, decoder, Seq2Seq model (device is resolved once)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    encoder = Encoder(input_size, embedding_dim, hidden_size).to(device)
    decoder = Decoder(output_size, embedding_dim, hidden_size).to(device)
    model = Seq2Seq(encoder, decoder, device)

    # Optimizer and loss function (padding positions do not contribute to the loss)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=vocab['<PAD>'])

    # Train the model
    train_model(model, train_dataloader, optimizer, criterion, num_epochs)

    # Test the model with an example
    test_question = tokenize_and_pad(["What is the sky"], vocab, max_len)[0]
    generated_haiku = generate_haiku(model, test_question, vocab, idx_to_word)
    print(f"Generated Haiku: {generated_haiku}")

if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
class HaikuDataset(Dataset):
    """Map-style dataset pairing each question with the haiku at the same index."""

    def __init__(self, questions, haikus):
        self.questions, self.haikus = questions, haikus

    def __len__(self):
        """Number of pairs, taken from the question collection."""
        return len(self.questions)

    def __getitem__(self, idx):
        """Return ``(question, haiku)`` for position ``idx``."""
        question = self.questions[idx]
        haiku = self.haikus[idx]
        return question, haiku
def tokenize_and_pad(texts, vocab, max_len):
    """Turn whitespace-split texts into a ``torch.long`` tensor of fixed-length id rows.

    Unknown words map to ``vocab['<UNK>']``; short rows are right-padded with
    ``vocab['<PAD>']`` and long rows are cut to ``max_len``.
    """
    # Pass 1: words -> ids.
    token_rows = []
    for text in texts:
        token_rows.append([vocab.get(word, vocab['<UNK>']) for word in text.split()])

    # Pass 2: pad or truncate each row to max_len.
    fixed_rows = []
    for ids in token_rows:
        if len(ids) < max_len:
            filler = [vocab['<PAD>']] * (max_len - len(ids))
            fixed_rows.append(ids + filler)
        else:
            fixed_rows.append(ids[:max_len])
    return torch.tensor(fixed_rows, dtype=torch.long)
class Encoder(nn.Module):
    """Embed token ids and run them through an LSTM, keeping only the final state."""

    def __init__(self, input_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Return the final ``(hidden, cell)`` state; per-step outputs are dropped."""
        final_state = self.lstm(self.embedding(x))[1]
        hidden, cell = final_state
        return hidden, cell
class Decoder(nn.Module):
    """Single-step LSTM decoder producing scores over the output vocabulary."""

    def __init__(self, output_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x, hidden, cell):
        """Advance one step from token ids ``x``; return ``(scores, hidden, cell)``."""
        # Add a length-1 sequence axis for the batch-first LSTM.
        step_embedding = self.embedding(x.unsqueeze(1))
        step_output, next_state = self.lstm(step_embedding, (hidden, cell))
        hidden, cell = next_state
        return self.fc(step_output.squeeze(1)), hidden, cell
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Return decoder scores of shape (batch, target_len, vocab).

        Decoding is seeded with ``target[:, 0]``; each later input is the
        ground-truth token with probability ``teacher_forcing_ratio`` and the
        decoder's argmax otherwise. Position 0 of the result stays zero.
        """
        batch_size, target_len = target.shape[0], target.shape[1]
        vocab_size = self.decoder.fc.out_features
        outputs = torch.zeros(batch_size, target_len, vocab_size).to(self.device)

        hidden, cell = self.encoder(source)
        step_input = target[:, 0]
        for t in range(1, target_len):
            scores, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[:, t] = scores
            # One random draw per step decides for the whole batch.
            use_teacher = torch.rand(1).item() < teacher_forcing_ratio
            step_input = target[:, t] if use_teacher else scores.argmax(1)
        return outputs
def generate_haiku(model, question, vocab, idx_to_word, max_len=20):
    """Greedily decode a haiku for one tokenized question tensor.

    A batch axis is added to ``question``. Decoding starts from
    ``vocab['<START>']``, takes the argmax at every step and stops at '<END>'
    or after ``max_len`` steps. Side effect: ``model`` is left in eval mode.
    """
    model.eval()
    words = []
    with torch.no_grad():
        batched_question = question.unsqueeze(0).to(model.device)
        hidden, cell = model.encoder(batched_question)
        token = torch.tensor([vocab['<START>']]).to(model.device)
        for _ in range(max_len):
            scores, hidden, cell = model.decoder(token, hidden, cell)
            # The chosen token is also the next decoder input.
            token = scores.argmax(1)
            word = idx_to_word[token.item()]
            if word == '<END>':
                break
            words.append(word)
    return ' '.join(words)
def train_model(model, dataloader, optimizer, criterion, num_epochs):
    """Run ``num_epochs`` passes over ``dataloader`` and print the mean batch loss per epoch.

    Each batch is moved to ``model.device``. The first position of every
    target sequence is excluded from the loss.
    """
    model.train()
    for epoch in range(num_epochs):
        epoch_loss = 0
        for questions, haikus in tqdm(dataloader):
            questions = questions.to(model.device)
            haikus = haikus.to(model.device)

            optimizer.zero_grad()
            scores = model(questions, haikus)
            vocab_size = scores.shape[-1]

            # Flatten (batch, step) into one axis, dropping step 0 on both sides.
            flat_scores = scores[:, 1:].reshape(-1, vocab_size)
            flat_targets = haikus[:, 1:].reshape(-1)
            loss = criterion(flat_scores, flat_targets)

            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss / len(dataloader):.4f}')
def load_data_from_csv(filepath):
    """Read a CSV and return its 'Question' and 'Haiku' columns as arrays, in that order."""
    frame = pd.read_csv(filepath)
    question_column = frame['Question']
    haiku_column = frame['Haiku']
    return question_column.values, haiku_column.values
def build_vocab(texts, min_freq=2):
    """Build a word -> id mapping from whitespace-split ``texts``.

    Ids 0-3 are reserved for '<PAD>', '<START>', '<END>' and '<UNK>'. Every
    other word seen at least ``min_freq`` times gets the next free id, in
    order of first appearance.
    """
    word_freq = {}
    for text in texts:
        for word in text.split():
            word_freq[word] = word_freq.get(word, 0) + 1
    vocab = {'<PAD>': 0, '<START>': 1, '<END>': 2, '<UNK>': 3}
    for word, freq in word_freq.items():
        # Skip words that spell a reserved token. Re-assigning one would move
        # it off its reserved id without growing the dict, so the next new
        # word would be handed the very same id.
        if freq >= min_freq and word not in vocab:
            vocab[word] = len(vocab)
    return vocab
def main():
    """Train the seq2seq haiku generator on the CSV data and print one sample haiku."""
    # Load data from CSV
    csv_file = '/kaggle/input/final-dataset/final_dataset.csv'
    questions, haikus = load_data_from_csv(csv_file)
    # Build vocabulary (from the raw texts, before the sequence markers are added)
    all_texts = np.concatenate([questions, haikus])
    vocab = build_vocab(all_texts)
    idx_to_word = {idx: word for word, idx in vocab.items()}
    # Hyperparameters
    max_len = 50  # Increased max_len to accommodate longer sequences
    embedding_dim = 256
    hidden_size = 512
    input_size = len(vocab)
    output_size = len(vocab)
    batch_size = 64  # Increased batch size
    num_epochs = 50  # Increased number of epochs
    learning_rate = 0.001
    # Frame every target as "<START> words... <END>". generate_haiku seeds the
    # decoder with <START> and stops on <END>, so both must appear in training
    # targets. Words are cut to max_len - 2 so <END> survives truncation.
    haikus = [
        ' '.join(['<START>', *haiku.split()[:max_len - 2], '<END>'])
        for haiku in haikus
    ]
    # Tokenize and pad questions and haikus
    tokenized_questions = tokenize_and_pad(questions, vocab, max_len)
    tokenized_haikus = tokenize_and_pad(haikus, vocab, max_len)
    # Split the dataset into training and validation
    # (the validation part is currently unused)
    train_questions, val_questions, train_haikus, val_haikus = train_test_split(tokenized_questions, tokenized_haikus, test_size=0.1)
    # Dataset and DataLoader
    train_dataset = HaikuDataset(train_questions, train_haikus)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Initialize encoder, decoder, Seq2Seq model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    encoder = Encoder(input_size, embedding_dim, hidden_size).to(device)
    decoder = Decoder(output_size, embedding_dim, hidden_size).to(device)
    model = Seq2Seq(encoder, decoder, device).to(device)
    # Optimizer and loss function (padding positions do not contribute to the loss)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=vocab['<PAD>'])
    # Train the model
    train_model(model, train_dataloader, optimizer, criterion, num_epochs)
    # Test the model with an example
    test_question = tokenize_and_pad(["What is the sky"], vocab, max_len)[0]
    generated_haiku = generate_haiku(model, test_question, vocab, idx_to_word)
    print(f"Generated Haiku: {generated_haiku}")
if __name__ == "__main__":
    main()

In [None]:
# Generate a haiku for a second sample question.
# NOTE(review): vocab, max_len, model and idx_to_word are assigned as locals
# inside main() in the cell above; unless they are also defined at notebook
# scope elsewhere, this cell raises NameError — confirm before running.
test_question = tokenize_and_pad(["What is AI"], vocab, max_len)[0]
generated_haiku = generate_haiku(model, test_question, vocab, idx_to_word)
print(f"Generated Haiku: {generated_haiku}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
class HaikuDataset(Dataset):
    """Map-style dataset pairing each question with the haiku at the same index."""

    def __init__(self, questions, haikus):
        self.questions, self.haikus = questions, haikus

    def __len__(self):
        """Number of pairs, taken from the question collection."""
        return len(self.questions)

    def __getitem__(self, idx):
        """Return ``(question, haiku)`` for position ``idx``."""
        question = self.questions[idx]
        haiku = self.haikus[idx]
        return question, haiku
def tokenize_and_pad(texts, vocab, max_len):
    """Turn whitespace-split texts into a ``torch.long`` tensor of fixed-length id rows.

    Unknown words map to ``vocab['<UNK>']``; short rows are right-padded with
    ``vocab['<PAD>']`` and long rows are cut to ``max_len``.
    """
    # Pass 1: words -> ids.
    token_rows = []
    for text in texts:
        token_rows.append([vocab.get(word, vocab['<UNK>']) for word in text.split()])

    # Pass 2: pad or truncate each row to max_len.
    fixed_rows = []
    for ids in token_rows:
        if len(ids) < max_len:
            filler = [vocab['<PAD>']] * (max_len - len(ids))
            fixed_rows.append(ids + filler)
        else:
            fixed_rows.append(ids[:max_len])
    return torch.tensor(fixed_rows, dtype=torch.long)
class Encoder(nn.Module):
    """Embed token ids and run them through an LSTM, keeping only the final state."""

    def __init__(self, input_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Return the final ``(hidden, cell)`` state; per-step outputs are dropped."""
        final_state = self.lstm(self.embedding(x))[1]
        hidden, cell = final_state
        return hidden, cell
class Decoder(nn.Module):
    """Single-step LSTM decoder producing scores over the output vocabulary."""

    def __init__(self, output_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x, hidden, cell):
        """Advance one step from token ids ``x``; return ``(scores, hidden, cell)``."""
        # Add a length-1 sequence axis for the batch-first LSTM.
        step_embedding = self.embedding(x.unsqueeze(1))
        step_output, next_state = self.lstm(step_embedding, (hidden, cell))
        hidden, cell = next_state
        return self.fc(step_output.squeeze(1)), hidden, cell
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Return decoder scores of shape (batch, target_len, vocab).

        Decoding is seeded with ``target[:, 0]``; each later input is the
        ground-truth token with probability ``teacher_forcing_ratio`` and the
        decoder's argmax otherwise. Position 0 of the result stays zero.
        """
        batch_size, target_len = target.shape[0], target.shape[1]
        vocab_size = self.decoder.fc.out_features
        outputs = torch.zeros(batch_size, target_len, vocab_size).to(self.device)

        hidden, cell = self.encoder(source)
        step_input = target[:, 0]
        for t in range(1, target_len):
            scores, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[:, t] = scores
            # One random draw per step decides for the whole batch.
            use_teacher = torch.rand(1).item() < teacher_forcing_ratio
            step_input = target[:, t] if use_teacher else scores.argmax(1)
        return outputs
def generate_haiku(model, question, vocab, idx_to_word, max_len=20):
    """Greedily decode a haiku for one tokenized question tensor.

    A batch axis is added to ``question``. Decoding starts from
    ``vocab['<START>']``, takes the argmax at every step and stops at '<END>'
    or after ``max_len`` steps. Side effect: ``model`` is left in eval mode.
    """
    model.eval()
    words = []
    with torch.no_grad():
        batched_question = question.unsqueeze(0).to(model.device)
        hidden, cell = model.encoder(batched_question)
        token = torch.tensor([vocab['<START>']]).to(model.device)
        for _ in range(max_len):
            scores, hidden, cell = model.decoder(token, hidden, cell)
            # The chosen token is also the next decoder input.
            token = scores.argmax(1)
            word = idx_to_word[token.item()]
            if word == '<END>':
                break
            words.append(word)
    return ' '.join(words)
def train_model(model, dataloader, optimizer, criterion, num_epochs):
    """Run ``num_epochs`` passes over ``dataloader`` and print the mean batch loss per epoch.

    Each batch is moved to ``model.device``. The first position of every
    target sequence is excluded from the loss.
    """
    model.train()
    for epoch in range(num_epochs):
        epoch_loss = 0
        for questions, haikus in tqdm(dataloader):
            questions = questions.to(model.device)
            haikus = haikus.to(model.device)

            optimizer.zero_grad()
            scores = model(questions, haikus)
            vocab_size = scores.shape[-1]

            # Flatten (batch, step) into one axis, dropping step 0 on both sides.
            flat_scores = scores[:, 1:].reshape(-1, vocab_size)
            flat_targets = haikus[:, 1:].reshape(-1)
            loss = criterion(flat_scores, flat_targets)

            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss / len(dataloader):.4f}')
def load_data_from_csv(filepath):
    """Read a CSV and return its 'Question' and 'Haiku' columns as arrays, in that order."""
    frame = pd.read_csv(filepath)
    question_column = frame['Question']
    haiku_column = frame['Haiku']
    return question_column.values, haiku_column.values
def build_vocab(texts, min_freq=2):
    """Build a word -> id mapping from whitespace-split ``texts``.

    Ids 0-3 are reserved for '<PAD>', '<START>', '<END>' and '<UNK>'. Every
    other word seen at least ``min_freq`` times gets the next free id, in
    order of first appearance.
    """
    word_freq = {}
    for text in texts:
        for word in text.split():
            word_freq[word] = word_freq.get(word, 0) + 1
    vocab = {'<PAD>': 0, '<START>': 1, '<END>': 2, '<UNK>': 3}
    for word, freq in word_freq.items():
        # Skip words that spell a reserved token. Re-assigning one would move
        # it off its reserved id without growing the dict, so the next new
        # word would be handed the very same id.
        if freq >= min_freq and word not in vocab:
            vocab[word] = len(vocab)
    return vocab
def save_model(model, vocab, filepath):
    """
    Write a checkpoint to ``filepath`` and print a confirmation.

    The checkpoint is a dict with the model's state dict under
    'model_state_dict' and the vocabulary under 'vocab'.
    """
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'vocab': vocab,
    }
    torch.save(checkpoint, filepath)
    print(f"Model saved to {filepath}")
def load_model(filepath, device):
    """
    Load the model and vocabulary from a file.

    Expects a checkpoint with 'model_state_dict' and 'vocab' entries.
    Layer sizes are read from the saved tensors rather than hard-coded, so
    the rebuilt model matches whatever sizes the checkpoint was trained with.
    Returns ``(model, vocab)`` with the model on ``device``.
    """
    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # a trusted source.
    checkpoint = torch.load(filepath, map_location=device)
    vocab = checkpoint['vocab']
    state_dict = checkpoint['model_state_dict']
    # Embedding weights are (vocabulary size, embedding_dim).
    input_size, encoder_embedding_dim = state_dict['encoder.embedding.weight'].shape
    output_size, decoder_embedding_dim = state_dict['decoder.embedding.weight'].shape
    # The LSTM hidden-to-hidden weight is (4 * hidden_size, hidden_size).
    hidden_size = state_dict['encoder.lstm.weight_hh_l0'].shape[1]
    encoder = Encoder(input_size, encoder_embedding_dim, hidden_size).to(device)
    decoder = Decoder(output_size, decoder_embedding_dim, hidden_size).to(device)
    model = Seq2Seq(encoder, decoder, device).to(device)
    model.load_state_dict(state_dict)
    print(f"Model loaded from {filepath}")
    return model, vocab
def main():
    """Train the seq2seq haiku generator, save a checkpoint and print one sample haiku."""
    # Load data from CSV
    csv_file = '/kaggle/input/final-dataset/final_dataset.csv'
    questions, haikus = load_data_from_csv(csv_file)
    # Build vocabulary (from the raw texts, before the sequence markers are added)
    all_texts = np.concatenate([questions, haikus])
    vocab = build_vocab(all_texts)
    idx_to_word = {idx: word for word, idx in vocab.items()}
    # Hyperparameters
    max_len = 50
    embedding_dim = 256
    hidden_size = 512
    input_size = len(vocab)
    output_size = len(vocab)
    batch_size = 64
    num_epochs = 50
    learning_rate = 0.001
    # Frame every target as "<START> words... <END>". generate_haiku seeds the
    # decoder with <START> and stops on <END>, so both must appear in training
    # targets. Words are cut to max_len - 2 so <END> survives truncation.
    haikus = [
        ' '.join(['<START>', *haiku.split()[:max_len - 2], '<END>'])
        for haiku in haikus
    ]
    # Tokenize and pad questions and haikus
    tokenized_questions = tokenize_and_pad(questions, vocab, max_len)
    tokenized_haikus = tokenize_and_pad(haikus, vocab, max_len)
    # Split the dataset into training and validation
    # (the validation part is currently unused)
    train_questions, val_questions, train_haikus, val_haikus = train_test_split(tokenized_questions, tokenized_haikus, test_size=0.1)
    # Dataset and DataLoader
    train_dataset = HaikuDataset(train_questions, train_haikus)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Initialize encoder, decoder, Seq2Seq model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    encoder = Encoder(input_size, embedding_dim, hidden_size).to(device)
    decoder = Decoder(output_size, embedding_dim, hidden_size).to(device)
    model = Seq2Seq(encoder, decoder, device).to(device)
    # Optimizer and loss function (padding positions do not contribute to the loss)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=vocab['<PAD>'])
    # Train the model
    train_model(model, train_dataloader, optimizer, criterion, num_epochs)
    # Save the model
    save_model(model, vocab, 'haiku_generator_model.pth')
    # Test the model with an example
    test_question = tokenize_and_pad(["What is the sky"], vocab, max_len)[0]
    generated_haiku = generate_haiku(model, test_question, vocab, idx_to_word)
    print(f"Generated Haiku: {generated_haiku}")
def use_saved_model(question):
    """Load 'haiku_generator_model.pth' and return a generated haiku for ``question``.

    The checkpoint is reloaded on every call. The question is tokenized to
    50 ids before decoding.
    """
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')

    # Load the saved model together with the vocabulary stored next to it.
    model, vocab = load_model('haiku_generator_model.pth', device)
    id_to_token = {token_id: token for token, token_id in vocab.items()}

    # Make sure this length matches the original training setting.
    question_ids = tokenize_and_pad([question], vocab, 50)[0]
    return generate_haiku(model, question_ids, vocab, id_to_token)
if __name__ == "__main__":
    # Train from scratch; main() also writes 'haiku_generator_model.pth',
    # the file use_saved_model() reads below.
    main()
    # Example of using the saved model
    new_question = "What is the meaning of life?"
    # This reloads the checkpoint from disk instead of reusing the model
    # that main() trained in memory.
    haiku = use_saved_model(new_question)
    print(f"Question: {new_question}")
    print(f"Generated Haiku: {haiku}")

In [None]:
import torch
from torch import nn
# First, recreate the necessary model architecture
class Encoder(nn.Module):
    """Embed token ids and run them through an LSTM, keeping only the final state."""

    def __init__(self, input_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Return the final ``(hidden, cell)`` state; per-step outputs are dropped."""
        final_state = self.lstm(self.embedding(x))[1]
        hidden, cell = final_state
        return hidden, cell
class Decoder(nn.Module):
    """Single-step LSTM decoder producing scores over the output vocabulary."""

    def __init__(self, output_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x, hidden, cell):
        """Advance one step from token ids ``x``; return ``(scores, hidden, cell)``."""
        # Add a length-1 sequence axis for the batch-first LSTM.
        step_embedding = self.embedding(x.unsqueeze(1))
        step_output, next_state = self.lstm(step_embedding, (hidden, cell))
        hidden, cell = next_state
        return self.fc(step_output.squeeze(1)), hidden, cell
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Return decoder scores of shape (batch, target_len, vocab).

        Decoding is seeded with ``target[:, 0]``; each later input is the
        ground-truth token with probability ``teacher_forcing_ratio`` and the
        decoder's argmax otherwise. Position 0 of the result stays zero.
        """
        batch_size, target_len = target.shape[0], target.shape[1]
        vocab_size = self.decoder.fc.out_features
        outputs = torch.zeros(batch_size, target_len, vocab_size).to(self.device)

        hidden, cell = self.encoder(source)
        step_input = target[:, 0]
        for t in range(1, target_len):
            scores, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[:, t] = scores
            # One random draw per step decides for the whole batch.
            use_teacher = torch.rand(1).item() < teacher_forcing_ratio
            step_input = target[:, t] if use_teacher else scores.argmax(1)
        return outputs
# Function to load the model
def load_model(filepath, device):
    """Rebuild the Seq2Seq model from a checkpoint and return ``(model, vocab)``.

    Expects a checkpoint with 'model_state_dict' and 'vocab' entries.
    Layer sizes are read from the saved tensors rather than hard-coded, so
    the rebuilt model matches whatever sizes the checkpoint was trained with.
    """
    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # a trusted source.
    checkpoint = torch.load(filepath, map_location=device)
    vocab = checkpoint['vocab']
    state_dict = checkpoint['model_state_dict']
    # Embedding weights are (vocabulary size, embedding_dim).
    input_size, encoder_embedding_dim = state_dict['encoder.embedding.weight'].shape
    output_size, decoder_embedding_dim = state_dict['decoder.embedding.weight'].shape
    # The LSTM hidden-to-hidden weight is (4 * hidden_size, hidden_size).
    hidden_size = state_dict['encoder.lstm.weight_hh_l0'].shape[1]
    encoder = Encoder(input_size, encoder_embedding_dim, hidden_size).to(device)
    decoder = Decoder(output_size, decoder_embedding_dim, hidden_size).to(device)
    model = Seq2Seq(encoder, decoder, device).to(device)
    model.load_state_dict(state_dict)
    print(f"Model loaded from {filepath}")
    return model, vocab
# Function to tokenize and pad input
def tokenize_and_pad(texts, vocab, max_len):
    """Turn whitespace-split texts into a ``torch.long`` tensor of fixed-length id rows.

    Unknown words map to ``vocab['<UNK>']``; short rows are right-padded with
    ``vocab['<PAD>']`` and long rows are cut to ``max_len``.
    """
    # Pass 1: words -> ids.
    token_rows = []
    for text in texts:
        token_rows.append([vocab.get(word, vocab['<UNK>']) for word in text.split()])

    # Pass 2: pad or truncate each row to max_len.
    fixed_rows = []
    for ids in token_rows:
        if len(ids) < max_len:
            filler = [vocab['<PAD>']] * (max_len - len(ids))
            fixed_rows.append(ids + filler)
        else:
            fixed_rows.append(ids[:max_len])
    return torch.tensor(fixed_rows, dtype=torch.long)
# Function to generate haiku
def generate_haiku(model, question, vocab, idx_to_word, max_len=20):
    """Greedily decode a haiku for one tokenized question tensor.

    A batch axis is added to ``question``. Decoding starts from
    ``vocab['<START>']``, takes the argmax at every step and stops at '<END>'
    or after ``max_len`` steps. Side effect: ``model`` is left in eval mode.
    """
    model.eval()
    words = []
    with torch.no_grad():
        batched_question = question.unsqueeze(0).to(model.device)
        hidden, cell = model.encoder(batched_question)
        token = torch.tensor([vocab['<START>']]).to(model.device)
        for _ in range(max_len):
            scores, hidden, cell = model.decoder(token, hidden, cell)
            # The chosen token is also the next decoder input.
            token = scores.argmax(1)
            word = idx_to_word[token.item()]
            if word == '<END>':
                break
            words.append(word)
    return ' '.join(words)
# Load the saved model
# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# load_model returns the rebuilt model together with the vocabulary stored
# in the checkpoint file.
model, vocab = load_model('/kaggle/input/haikuge/haiku_generator_model.pth', device)
# Invert word -> id so decoder output ids can be mapped back to words.
idx_to_word = {idx: word for word, idx in vocab.items()}
# Function to use the model
def generate_haiku_from_question(question):
    """Return a generated haiku for ``question``.

    Uses the module-level ``model``, ``vocab`` and ``idx_to_word``. The
    question is tokenized to 50 ids before decoding.
    """
    # Make sure this length matches the original training setting.
    question_ids = tokenize_and_pad([question], vocab, 50)[0]
    haiku_text = generate_haiku(model, question_ids, vocab, idx_to_word)
    return haiku_text
# Example usage
# NOTE: generate_haiku_from_question tokenizes with max_len = 50, so only the
# first 50 whitespace-separated words of the question reach the encoder.
question = "What steps can be taken to ensure that AI systems are designed in a way that promotes fairness and inclusivity while avoiding the reinforcement of harmful stereotypes?"
haiku = generate_haiku_from_question(question)
print(f"Question: {question}")
print(f"Generated Haiku: {haiku}")