<a href="https://colab.research.google.com/github/upbeatcoolrebel-sketch/ARES-autonomous-research-and-engineering-Superintelligence/blob/main/ARES_v01beta.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
import random
from collections import Counter
from typing import List, Dict, Tuple
import re
import os
import json
import time
from torch.optim.lr_scheduler import CosineAnnealingLR
import feedparser
import requests
from bs4 import BeautifulSoup
from datasets import load_dataset

# Pick the compute device once at import time: CUDA when available, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Utility functions
def clean_text(text: str) -> str:
    """Normalize raw text ahead of tokenization.

    Drops every character that is not a word character, whitespace, or one of
    ``. , ! ? ' -`` (hyphens survive so hyphenated tech terms stay intact),
    then squeezes whitespace runs into single spaces and trims both ends.
    """
    without_symbols = re.sub(r'[^\w\s.,!?\'-]', '', text)
    single_spaced = re.sub(r'\s+', ' ', without_symbols)
    return single_spaced.strip()

def fetch_rss_data(feed_urls: List[str], max_sentences: int = 3000) -> List[str]:
    """Collect unique sentences from RSS feeds and the articles they link to.

    For each feed, up to 15 entries are read. An entry contributes its cleaned
    title and description plus, when it has a link, the text of every ``<p>``
    element of the linked page. The combined text is split on ``.``, ``!`` and
    ``?``; only pieces longer than three words are kept.

    Args:
        feed_urls: RSS/Atom feed URLs to read, in priority order.
        max_sentences: Upper bound on the number of sentences returned. A
            non-positive value yields an empty list.

    Returns:
        Up to ``max_sentences`` distinct sentences in first-seen order.

    Failures are reported with ``print`` and skipped: a broken article does
    not discard its entry's title/description, and a broken feed does not
    stop the remaining feeds.
    """
    # A dict doubles as an insertion-ordered set, so de-duplication happens
    # while collecting and the limit can be checked as we go.
    unique_sentences: Dict[str, None] = {}
    for url in feed_urls:
        if len(unique_sentences) >= max_sentences:
            break  # enough material already; skip further network traffic
        try:
            feed = feedparser.parse(url)
            for entry in feed.entries[:15]:
                if len(unique_sentences) >= max_sentences:
                    break
                title = clean_text(entry.get('title', ''))
                description = clean_text(entry.get('description', ''))
                content = f"{title} {description}".strip()
                if 'link' in entry:
                    try:
                        response = requests.get(entry.link, timeout=10)
                        # Without this, the body of a 404/500 page would be
                        # scraped and mixed into the training sentences.
                        response.raise_for_status()
                        soup = BeautifulSoup(response.text, 'html.parser')
                        article_text = ' '.join(clean_text(p.get_text()) for p in soup.find_all('p'))
                        content += f" {article_text}"
                    except Exception as e:
                        print(f"Error fetching article from {entry.link}: {e}")
                for piece in re.split(r'[.!?]+', content):
                    sentence = piece.strip()
                    if len(sentence.split()) > 3:
                        unique_sentences.setdefault(sentence, None)
        except Exception as e:
            print(f"Error parsing RSS feed {url}: {e}")
    # The last entry processed may overshoot the limit, so trim here.
    sentences = list(unique_sentences)[:max_sentences]
    print(f"Fetched {len(sentences)} sentences from RSS feeds")
    return sentences

def calculate_perplexity(loss: float) -> float:
    """Convert a cross-entropy loss into perplexity (``e ** loss``).

    Losses of 100 or more are reported as infinity rather than being
    exponentiated.
    """
    if loss < 100:
        return math.exp(loss)
    return float('inf')

def pad_sequence(sequences: List[List[int]], max_len: int, pad_id: int) -> List[List[int]]:
    """Bring every sequence to exactly ``max_len`` items.

    Longer sequences are cut after ``max_len`` items; shorter ones are
    right-filled with ``pad_id``. A new list is returned for each sequence.
    """
    padded = []
    for seq in sequences:
        clipped = seq[:max_len]
        shortfall = max_len - len(seq)
        if shortfall > 0:
            clipped = clipped + [pad_id] * shortfall
        padded.append(clipped)
    return padded

def truncate_sequence(seq: List[int], max_len: int) -> List[int]:
    """Return a copy of ``seq`` limited to its first ``max_len`` items."""
    head = seq[0:max_len]
    return head

def validate_sequence(seq: List[int], max_len: int, pad_id: int) -> List[int]:
    """Return ``seq`` as a fixed-length list of ``max_len`` token ids.

    An empty (or otherwise falsy) sequence becomes all padding; anything else
    is truncated and then right-padded with ``pad_id``.
    """
    if seq:
        clipped = truncate_sequence(seq, max_len)
        (fixed_length,) = pad_sequence([clipped], max_len, pad_id)
        return fixed_length
    return [pad_id] * max_len

def sanitize_response(response: str) -> str:
    """Turn raw decoded model output into one short display sentence.

    Steps:
      1. Remove the special tokens ``<PAD>``, ``<UNK>``, ``<BOS>``, ``<EOS>``.
      2. Replace runs of ``,``, ``.`` and ``;`` with a space.
      3. If fewer than three words remain, return a canned prompt instead.
      4. Drop repeated words (compared case-insensitively), except words in
         the tech-term list, which may repeat.
      5. Keep at most 15 words, upper-case the first character, and append a
         full stop.

    The casing of every word after the first character is left untouched so
    acronyms such as "AI" or "LIDAR" survive.

    Args:
        response: Decoded text from the model.

    Returns:
        The cleaned sentence, or the canned fallback for very short input.
    """
    response = re.sub(r'<PAD>|<UNK>|<BOS>|<EOS>', '', response)
    response = re.sub(r'\s*[,.;]+\s*', ' ', response)
    words = response.split()
    if len(words) < 3:
        return "Let’s keep the convo going! What’s up? 😎"
    tech_terms = {'ai', 'robotics', 'engineering', 'quantum', 'computing', 'neural', 'blockchain', 'cad', 'lidar', 'kinematics', 'servo', 'actuator', 'machine', 'learning', 'data', 'science', 'technology', 'software', 'hardware', 'algorithm', 'network', 'system', 'programming', 'cloud', 'cybersecurity', 'iot', 'automation'}
    seen = set()
    kept = []
    for word in words:
        key = word.lower()
        if key in tech_terms:
            # Tech terms are exempt from de-duplication and are never recorded
            # in ``seen``.
            kept.append(word)
        elif key not in seen:
            seen.add(key)
            kept.append(word)
    # ``kept`` always holds at least the first word, so the sentence is never
    # empty. Only the first character is upper-cased: str.capitalize() would
    # also lower-case the rest and mangle acronyms.
    sentence = ' '.join(kept[:15])
    return sentence[:1].upper() + sentence[1:] + '.'

def beam_search(logits: torch.Tensor, beam_width: int = 7, max_len: int = 150, pad_id: int = 0, eos_id: int = 3, temperature: float = 0.7, model: nn.Module = None, max_seq_len: int = 128) -> List[int]:
    """Generate a token sequence with beam search.

    The search is seeded with the arg-max token at the last position of
    ``logits`` (one seed per batch row). Each step feeds a beam's own tokens
    back through ``model`` and keeps the ``beam_width`` highest-scoring
    extensions, where a score is the sum of token log-probabilities.

    Note: the model only sees the tokens generated so far; the prompt that
    produced ``logits`` is not part of the per-step input.

    Args:
        logits: Model output used to pick the first token; indexed as
            ``logits[:, -1, :]``.
        beam_width: Number of beams kept after every step.
        max_len: Maximum number of expansion steps and exact length of the
            returned list.
        pad_id: Token id used to right-pad the result.
        eos_id: Token id that ends a beam.
        temperature: Positive divisor applied to each step's logits before
            the softmax (below 1 sharpens, above 1 flattens).
        model: Callable as ``model(input_ids, mask)``; required.
        max_seq_len: Beams longer than this are closed without expansion.

    Returns:
        The best-scoring sequence, truncated or padded to ``max_len`` ids.

    Raises:
        ValueError: If ``model`` is None or ``temperature`` is not positive.
    """
    if model is None:
        raise ValueError("beam_search requires a model to extend beams")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # Arg-max is unaffected by temperature, so the seed uses the raw logits.
    beams = [([token_id.item()], 0.0) for token_id in torch.argmax(logits[:, -1, :], dim=-1)]
    finished_beams = []

    model.eval()
    with torch.no_grad():
        for _ in range(max_len):
            candidates = []
            for seq, score in beams:
                if seq[-1] == eos_id or len(seq) > max_seq_len:
                    finished_beams.append((seq, score))
                    continue
                mask = create_causal_mask(len(seq), max_seq_len)
                input_seq = torch.tensor([seq], dtype=torch.long).to(device)
                # Temperature has to be applied here, on the distribution the
                # beams are actually scored with.
                step_logits = model(input_seq, mask)[:, -1, :] / temperature
                log_probs = torch.log_softmax(step_logits, dim=-1).squeeze(0)
                # topk raises if asked for more entries than exist.
                k = min(beam_width, log_probs.size(-1))
                top_log_probs, top_indices = torch.topk(log_probs, k)
                for log_prob, idx in zip(top_log_probs.tolist(), top_indices.tolist()):
                    candidates.append((seq + [idx], score + log_prob))
            beams = sorted(candidates, key=lambda beam: beam[1], reverse=True)[:beam_width]
            if not beams:
                break

    beams.extend(finished_beams)
    if not beams:
        return [pad_id] * max_len
    best_seq, _ = max(beams, key=lambda beam: beam[1])
    best_seq = best_seq[:max_len]
    return best_seq + [pad_id] * (max_len - len(best_seq))

def save_checkpoint(model: nn.Module, optimizer: optim.Optimizer, epoch: int, filename: str):
    """Write the epoch number plus model and optimizer state to ``filename``.

    Best effort: any failure is printed and swallowed so that a checkpointing
    problem never interrupts the caller.
    """
    try:
        checkpoint = dict(
            epoch=epoch,
            model_state_dict=model.state_dict(),
            optimizer_state_dict=optimizer.state_dict(),
        )
        torch.save(checkpoint, filename)
    except Exception as e:
        print(f"Error saving checkpoint {filename}: {e}")

def evaluate_model(model: nn.Module, batches: List[Tuple[torch.Tensor, torch.Tensor]]) -> float:
    """Compute the mean cross-entropy loss of ``model`` over ``batches``.

    Evaluation runs in eval mode with gradients disabled. Targets equal to 0
    (the padding id) are ignored by the loss. The model's previous
    train/eval mode is restored afterwards, so calling this in the middle of
    training does not leave dropout switched off for the following epochs.

    Args:
        model: Callable as ``model(inputs, mask)`` returning per-position
            vocabulary logits.
        batches: ``(inputs, targets)`` tensor pairs.

    Returns:
        The average of the per-batch losses, or ``inf`` when ``batches`` is
        empty.
    """
    if not batches:
        return float('inf')
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    was_training = model.training
    model.eval()
    total_loss = 0.0
    try:
        with torch.no_grad():
            for inputs, targets in batches:
                mask = create_causal_mask(inputs.size(1), max_seq_len=128)
                outputs = model(inputs, mask)
                loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))
                total_loss += loss.item()
    finally:
        # Hand the model back in the mode the caller had it in.
        model.train(was_training)
    return total_loss / len(batches)

# Tokenizer
class SimpleTokenizer:
    """Word-level tokenizer with a frequency-capped vocabulary.

    Text is cleaned with ``clean_text``, lower-cased, and split into words
    (hyphenated words stay whole) and single punctuation marks. Ids 0-3 are
    reserved for ``<PAD>``, ``<UNK>``, ``<BOS>`` and ``<EOS>``.
    """
    def __init__(self, vocab_size: int = 20000):
        # Maximum vocabulary size, special tokens included.
        self.vocab_size = vocab_size
        self.word2idx: Dict[str, int] = {}
        self.idx2word: Dict[int, str] = {}
        self.vocab = ['<PAD>', '<UNK>', '<BOS>', '<EOS>']
        self.pad_id = 0
        self.unk_id = 1
        self.bos_id = 2
        self.eos_id = 3
        self.pad_token = '<PAD>'
        self.unk_token = '<UNK>'
        self.bos_token = '<BOS>'
        self.eos_token = '<EOS>'

    def _tokenize(self, text: str) -> List[str]:
        """Split ``text`` into lower-cased word and punctuation tokens."""
        return re.findall(r'\w+[\-\w+]*|[^\w\s]', clean_text(text).lower())

    def build_vocab(self, texts: List[str]):
        """Build the vocabulary from ``texts``, keeping the most frequent tokens.

        The vocabulary is rebuilt from scratch on every call, so calling this
        again replaces the previous vocabulary instead of appending duplicate
        entries to it. Prints how many token occurrences in ``texts`` fall
        outside the vocabulary.
        """
        word_counts = Counter()
        for text in texts:
            word_counts.update(self._tokenize(text))
        specials = [self.pad_token, self.unk_token, self.bos_token, self.eos_token]
        most_common = word_counts.most_common(self.vocab_size - len(specials))
        self.vocab = specials + [word for word, _ in most_common]
        self.word2idx = {word: idx for idx, word in enumerate(self.vocab)}
        self.idx2word = {idx: word for idx, word in enumerate(self.vocab)}
        # The counter already holds every token's frequency, so the corpus
        # does not need to be tokenized a second time.
        unk_count = sum(count for word, count in word_counts.items() if word not in self.word2idx)
        print(f"Unknown tokens (<UNK>) in dataset: {unk_count}")

    def encode(self, text: str) -> List[int]:
        """Encode ``text`` as ``[BOS] + token ids + [EOS]``; unknown tokens map to ``<UNK>``."""
        token_ids = [self.word2idx.get(word, self.unk_id) for word in self._tokenize(text)]
        return [self.bos_id] + token_ids + [self.eos_id]

    def decode(self, ids: List[int]) -> str:
        """Join the words for ``ids`` with spaces, skipping BOS, EOS and PAD ids.

        Ids missing from the vocabulary are rendered as ``<UNK>``.
        """
        skipped = (self.bos_id, self.eos_id, self.pad_id)
        return ' '.join(self.idx2word.get(token_id, self.unk_token) for token_id in ids if token_id not in skipped)

# Memory system
# JSON file (relative to the working directory) that holds the remembered facts.
MEMORY_FILE = "memory.json"

def load_memory():
    """Load the memory store from ``MEMORY_FILE``.

    Returns:
        A dict that always contains a ``"facts"`` list. A missing, unreadable
        or non-JSON file, or JSON whose top level is not an object, yields a
        fresh ``{"facts": []}``. A JSON object whose ``"facts"`` entry is
        missing or not a list keeps its other keys and gets an empty list.
    """
    if not os.path.exists(MEMORY_FILE):
        return {"facts": []}
    try:
        with open(MEMORY_FILE, "r", encoding="utf-8") as f:
            memory = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        print(f"Error loading memory: {e}")
        return {"facts": []}
    if not isinstance(memory, dict):
        # Valid JSON of the wrong shape (e.g. a bare list) would otherwise
        # break callers that index memory["facts"].
        print(f"Error loading memory: expected a JSON object, got {type(memory).__name__}")
        return {"facts": []}
    if not isinstance(memory.get("facts"), list):
        memory["facts"] = []
    return memory

def save_memory(memory):
    """Write ``memory`` to ``MEMORY_FILE`` as indented JSON.

    The data is serialized before the file is opened: opening with ``"w"``
    truncates the file, so serializing first means a value that cannot be
    encoded leaves the previously saved memory intact. Any failure is printed
    and swallowed (best effort).
    """
    try:
        payload = json.dumps(memory, indent=2)
        with open(MEMORY_FILE, "w") as f:
            f.write(payload)
    except Exception as e:
        print(f"Error saving memory: {e}")

def validate_fact(fact: str) -> str:
    """Return the cleaned fact, or ``""`` unless it has at least two words."""
    cleaned = clean_text(fact)
    if len(cleaned.split()) > 1:
        return cleaned
    return ""

def _fact_recency(entry) -> tuple:
    """Sort key: an entry's ``time.ctime()`` timestamp as (Y, M, D, h, m, s)."""
    try:
        parsed = time.strptime(entry.get("timestamp", ""), "%a %b %d %H:%M:%S %Y")
    except (AttributeError, TypeError, ValueError):
        # Unparseable or missing timestamps sort as the oldest possible.
        return (0, 0, 0, 0, 0, 0)
    return tuple(parsed)[:6]

def remember_fact(fact: str):
    """Persist ``fact`` in the memory file, keeping only the 10 newest facts.

    The fact is validated first; invalid facts and facts that are already
    stored are ignored. Stored facts are ordered newest first.

    Timestamps are ``time.ctime()`` strings, which do not sort
    chronologically as plain text ("Fri ..." sorts before "Mon ..."), so they
    are parsed back into date tuples before ordering.
    """
    fact = validate_fact(fact)
    if not fact:
        return
    memory = load_memory()
    if not isinstance(memory, dict):
        memory = {}
    facts = memory.get("facts")
    if not isinstance(facts, list):
        facts = []
    if any(isinstance(entry, dict) and entry.get("fact") == fact for entry in facts):
        return
    new_entry = {"fact": fact, "timestamp": time.ctime(), "weight": 1.0}
    # The new entry goes first so that, with a stable sort, it wins ties
    # against older entries stamped within the same second.
    memory["facts"] = sorted([new_entry] + facts, key=_fact_recency, reverse=True)[:10]
    save_memory(memory)

def get_memory_context():
    """Summarize up to three stored facts as a one-line prompt prefix.

    Facts are ranked by their ``"weight"`` (highest first; a missing weight
    counts as 0.0, and ties keep their stored order).

    Returns:
        ``"Recent facts: <fact>. <fact>."`` or, when nothing usable is stored,
        ``"Recent facts: No recent facts."``.
    """
    memory = load_memory()
    stored = memory.get("facts", []) if isinstance(memory, dict) else []
    usable = [entry for entry in stored if isinstance(entry, dict) and "fact" in entry]
    top_facts = sorted(usable, key=lambda entry: entry.get("weight", 0.0), reverse=True)[:3]
    if not top_facts:
        # The placeholder already ends with a period; appending another one
        # would produce "No recent facts..".
        return "Recent facts: No recent facts."
    facts_str = ". ".join(entry["fact"] for entry in top_facts)
    return f"Recent facts: {facts_str}."

# Data preparation
def load_data(rss_feeds: List[str] = None, max_samples: int = 10000) -> List[str]:
    """Assemble the training corpus from WikiText-103 and optional RSS feeds.

    Up to 8000 WikiText lines and up to 3000 RSS sentences are taken, each
    kept only if it contains one of the tech keywords. If neither source
    yields anything, three built-in sentences are used instead.

    Args:
        rss_feeds: Feed URLs passed to ``fetch_rss_data``; ``None`` or an
            empty list skips RSS entirely.
        max_samples: Maximum number of sentences returned.

    Returns:
        De-duplicated sentences with at least two words, in load order,
        capped at ``max_samples``.

    Source failures are printed and skipped rather than raised.
    """
    wikitext_limit = 8000
    data = []
    sources = []
    tech_keywords = {'ai', 'robotics', 'engineering', 'computing', 'neural', 'blockchain', 'cad', 'quantum', 'machine', 'learning', 'data', 'science', 'technology', 'software', 'hardware', 'algorithm', 'network', 'system', 'programming', 'cloud', 'cybersecurity', 'iot', 'automation', 'robot', 'tech', 'artificial', 'intelligence', 'development', 'coding', 'analytics'}

    def mentions_tech(sentence: str) -> bool:
        # NOTE(review): this is a substring test, so short keywords such as
        # 'ai' also match inside unrelated words ("said", "again"). Kept as
        # is because whole-word matching would change which data is selected.
        lowered = sentence.lower()
        return any(kw in lowered for kw in tech_keywords)

    try:
        dataset = load_dataset("wikitext", "wikitext-103-raw-v1", split="train")
        wikitext_data = []
        # Stop as soon as the quota is filled instead of cleaning and
        # filtering the whole split (about 1.8M rows) to keep the first 8000.
        for item in dataset:
            raw = item["text"]
            if not raw.strip() or len(raw.split()) <= 1:
                continue
            cleaned = clean_text(raw)
            if mentions_tech(cleaned):
                wikitext_data.append(cleaned)
                if len(wikitext_data) >= wikitext_limit:
                    break
        data.extend(wikitext_data)
        sources.append("WikiText (Hugging Face)")
        print(f"Loaded {len(wikitext_data)} tech-filtered sentences from WikiText")
    except Exception as e:
        print(f"Error loading WikiText dataset: {e}")
    if rss_feeds:
        try:
            rss_data = fetch_rss_data(rss_feeds, max_sentences=3000)
            rss_data = [s for s in rss_data if mentions_tech(s)]
            data.extend(rss_data)
            sources.append("RSS feeds")
            print(f"Loaded {len(rss_data)} tech-filtered sentences from RSS feeds")
        except Exception as e:
            print(f"Error loading RSS feeds: {e}")
    if not data:
        default_data = [
            "AI is transforming technology and society.",
            "Robotics is the future of automation.",
            "Coding is a blend of logic and creativity."
        ]
        data.extend(default_data)
        sources.append("minimal default data")
        print("Warning: No external data loaded. Using minimal default data.")
    # dict.fromkeys removes duplicates while preserving first-seen order.
    data = list(dict.fromkeys(data))
    valid_sentences = [s for s in data if len(s.split()) > 1]
    print(f"Loaded {len(valid_sentences)} valid sentences from {', '.join(sources)}")
    return valid_sentences[:max_samples]

def create_batches(data: List[List[int]], batch_size: int, seq_len: int, pad_id: int) -> Tuple[List[Tuple[torch.Tensor, torch.Tensor]], List[Tuple[torch.Tensor, torch.Tensor]]]:
    """Shuffle ``data`` and split it 80/20 into train and validation batches.

    Each batch is an ``(inputs, targets)`` pair of long tensors on the global
    ``device``. ``targets`` is ``inputs`` shifted left by one position with
    ``pad_id`` appended, i.e. the next-token labels.

    Args:
        data: Token-id sequences; padded/truncated to ``seq_len`` here. The
            list itself is not modified.
        batch_size: Desired batch size. Values larger than a split are
            reduced to the split size; values below 1 are treated as 1.
        seq_len: Fixed sequence length of every row.
        pad_id: Padding token id.

    Returns:
        ``(train_batches, val_batches)``.
    """
    # Shuffle a copy: random.shuffle works in place and would otherwise
    # reorder the caller's list as a hidden side effect.
    shuffled = list(data)
    random.shuffle(shuffled)
    split_idx = int(0.8 * len(shuffled))
    train_batches = []
    val_batches = []
    splits = ((shuffled[:split_idx], train_batches), (shuffled[split_idx:], val_batches))
    for subset, batches in splits:
        padded_data = pad_sequence(subset, seq_len, pad_id)
        # range() needs a positive step even for empty splits or bad sizes.
        effective_batch_size = max(1, min(batch_size, len(padded_data)))
        for start in range(0, len(padded_data), effective_batch_size):
            batch = padded_data[start:start + effective_batch_size]
            inputs = torch.tensor(batch, dtype=torch.long).to(device)
            targets = torch.tensor([seq[1:] + [pad_id] for seq in batch], dtype=torch.long).to(device)
            batches.append((inputs, targets))
    return train_batches, val_batches

# Training
def create_causal_mask(seq_len: int, max_seq_len: int = 128) -> torch.Tensor:
    """Build a lower-triangular attention mask of shape ``(1, 1, n, n)``.

    ``n`` is ``seq_len`` capped at ``max_seq_len``. Entries are 1 where a
    position may attend (itself and earlier positions) and 0 elsewhere. The
    mask is placed on the global ``device``.
    """
    side = seq_len if seq_len < max_seq_len else max_seq_len
    lower_triangle = torch.ones(side, side).tril()
    return lower_triangle[None, None, :, :].to(device)

def train_language_model(model: nn.Module, train_batches: List[Tuple[torch.Tensor, torch.Tensor]], val_batches: List[Tuple[torch.Tensor, torch.Tensor]], epochs: int = 20):
    """Train ``model`` as a next-token predictor with early stopping.

    Uses Adam (lr 0.0005) with a cosine-annealed learning rate, gradient-norm
    clipping at 1.0 and a cross-entropy loss that ignores target id 0
    (padding). After every epoch a checkpoint named
    ``language_model_epoch_<n>.pt`` is written and the validation loss is
    computed; training stops once that loss has failed to improve for 7
    consecutive epochs.

    Args:
        model: Callable as ``model(inputs, mask)``.
        train_batches: ``(inputs, targets)`` pairs to optimize on.
        val_batches: ``(inputs, targets)`` pairs used for early stopping.
        epochs: Maximum number of epochs.
    """
    optimizer = optim.Adam(model.parameters(), lr=0.0005)
    scheduler = CosineAnnealingLR(optimizer, T_max=epochs)
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    best_val_loss = float('inf')
    patience = 7
    patience_counter = 0
    for epoch in range(epochs):
        # Re-enter train mode every epoch: the validation pass at the end of
        # the previous epoch puts the model in eval mode, which would leave
        # dropout disabled for all remaining epochs.
        model.train()
        total_loss = 0
        for batch_idx, (inputs, targets) in enumerate(train_batches):
            optimizer.zero_grad()
            mask = create_causal_mask(inputs.size(1), max_seq_len=128)
            outputs = model(inputs, mask)
            loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            total_loss += loss.item()
            if batch_idx % 10 == 0:
                print(f"Batch {batch_idx}, Loss: {loss.item():.4f}, Perplexity: {calculate_perplexity(loss.item()):.4f}")
        scheduler.step()
        avg_loss = total_loss / len(train_batches) if train_batches else float('inf')
        print(f"Language Model Epoch {epoch + 1}, Avg Loss: {avg_loss:.4f}, Perplexity: {calculate_perplexity(avg_loss):.4f}")
        save_checkpoint(model, optimizer, epoch, f"language_model_epoch_{epoch + 1}.pt")
        val_loss = evaluate_model(model, val_batches)
        print(f"Validation Loss: {val_loss:.4f}")
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping triggered after {epoch + 1} epochs")
                break

# Inference
def generate(model: nn.Module, tokenizer: SimpleTokenizer, prompt: str, max_len: int = 150, max_seq_len: int = 128, beam_width: int = 7) -> str:
    """Generate a cleaned-up text response to ``prompt`` using beam search.

    The prompt is encoded, run through the model once, and the resulting
    logits seed ``beam_search``; the decoded result is passed through
    ``sanitize_response``.

    The prompt is deliberately NOT padded and its trailing EOS is removed:
    ``beam_search`` reads the logits at the last position, and positions
    holding PAD or following EOS have ignored (padding) targets during
    training, so their predictions are meaningless. Prompts longer than
    ``max_seq_len`` keep their final ``max_seq_len`` tokens, since the
    continuation depends on the end of the prompt.

    Args:
        model: Language model, callable as ``model(input_ids, mask)``.
        tokenizer: Tokenizer providing ``encode``/``decode`` and special ids.
        prompt: Text to continue.
        max_len: Maximum generated length in tokens.
        max_seq_len: Maximum number of prompt tokens fed to the model.
        beam_width: Number of beams.

    Returns:
        The sanitized response sentence.
    """
    model.eval()
    prompt_ids = tokenizer.encode(prompt)
    if prompt_ids and prompt_ids[-1] == tokenizer.eos_id:
        prompt_ids = prompt_ids[:-1]
    # ``seq[-0:]`` would return the whole list, hence the explicit guard.
    prompt_ids = prompt_ids[-max_seq_len:] if max_seq_len > 0 else []
    if not prompt_ids:
        prompt_ids = [tokenizer.bos_id]
    input_ids = torch.tensor([prompt_ids], dtype=torch.long).to(device)
    # Inference only: no autograd graph is needed for the prompt pass.
    with torch.no_grad():
        prompt_logits = model(input_ids, create_causal_mask(input_ids.size(1), max_seq_len))
    generated_ids = beam_search(
        prompt_logits,
        beam_width=beam_width,
        max_len=max_len,
        pad_id=tokenizer.pad_id,
        eos_id=tokenizer.eos_id,
        temperature=0.7,
        model=model,
        max_seq_len=max_seq_len
    )
    response = tokenizer.decode(generated_ids)
    return sanitize_response(response)

def log_interaction(user_input: str, response: str, chat_history_text: str):
    """Append one chat turn to ``ares_logs.json`` as a single JSON line.

    Each line records the current ``time.ctime()`` timestamp, the user input,
    the response and the chat history text. Failures are printed and
    swallowed so logging never interrupts the chat.
    """
    record = dict(
        timestamp=time.ctime(),
        user_input=user_input,
        response=response,
        chat_history=chat_history_text,
    )
    try:
        with open("ares_logs.json", "a") as log_file:
            print(json.dumps(record), file=log_file)
    except Exception as e:
        print(f"Error logging interaction: {e}")

def _first_word_after(text: str, phrase: str) -> str:
    """Return the word following ``phrase`` in ``text`` (case-insensitive), or ``""``.

    Surrounding sentence punctuation is stripped from the word.
    """
    match = re.search(re.escape(phrase) + r'\s+(\S+)', text, flags=re.IGNORECASE)
    if match is None:
        return ""
    return match.group(1).strip(".,!?;:")

def main():
    """Load data, train the language model, then run an interactive chat loop.

    The chat loop reads lines from standard input until the user types
    ``exit``. Statements such as "my name is X", "I like X" and "I hate X"
    are stored as facts, and every turn is appended to the interaction log.
    """
    # Hyperparameters
    vocab_size = 20000
    d_model = 256
    num_layers = 4
    num_heads = 8
    d_ff = 1024
    max_seq_len = 128
    batch_size = 8
    epochs = 20

    # Define RSS feeds
    rss_feeds = [
        "http://feeds.feedburner.com/analyticsinsight/ijEZ",
        "https://www.kdnuggets.com/feed",
        "https://blog.google/technology/ai/rss/",
        "https://marekrei.com/blog/feed",
        "https://paperswithcode.com/rss",
        "https://www.artificialintelligence-news.com/feed/",
        "https://machinelearningmastery.com/feed/"
    ]

    # (trigger phrase, fact template, message shown when no word follows it)
    fact_triggers = [
        ("my name is", "User's name is {}", "Error parsing name. Please provide a valid name."),
        ("i like", "User likes {}", "Error parsing preference. Please provide a valid topic."),
        ("i hate", "User dislikes {}", "Error parsing preference. Please provide a valid topic."),
    ]

    # Load data
    print("Loading text data...")
    text_data = load_data(rss_feeds=rss_feeds, max_samples=10000)
    print(f"Total sentences: {len(text_data)}")
    print(f"Sample data: {text_data[:5]}")

    # Initialize tokenizer
    print("Building tokenizer...")
    tokenizer = SimpleTokenizer(vocab_size)
    tokenizer.build_vocab(text_data)
    print(f"Vocabulary size: {len(tokenizer.vocab)}")
    print(f"Sample vocabulary: {tokenizer.vocab[:50]}")

    # Initialize language model
    print("Initializing language model...")
    language_model = TransformerDecoder(
        vocab_size=vocab_size,
        d_model=d_model,
        num_layers=num_layers,
        num_heads=num_heads,
        d_ff=d_ff,
        max_seq_len=max_seq_len
    ).to(device)
    print(f"Language model parameters: {sum(p.numel() for p in language_model.parameters()):,}")

    # Train model
    print("Preparing language model data...")
    encoded_data = [validate_sequence(tokenizer.encode(text), max_seq_len, tokenizer.pad_id) for text in text_data]
    train_batches, val_batches = create_batches(encoded_data, batch_size, max_seq_len, tokenizer.pad_id)
    print(f"Created {len(train_batches)} train batches and {len(val_batches)} validation batches")
    print("Training language model...")
    train_language_model(language_model, train_batches, val_batches, epochs)

    # Initialize chat history
    print("Initializing chat history...")
    initial_prompt = tokenizer.bos_token
    chat_history = torch.tensor([validate_sequence(tokenizer.encode(initial_prompt), max_seq_len, tokenizer.pad_id)], dtype=torch.long).to(device)

    # Interaction loop
    print("\n💬 Ares is ready. Type to chat. Type 'exit' to quit.")
    while True:
        user_input = input("You: ").strip()
        if user_input.lower() == "exit":
            print("Ares: Catch ya later! Stay curious. 🌟")
            break
        if user_input == "":
            print("Ares: Uh, hello? Type something, I’m not a mind reader! 😜")
            continue
        lowered = user_input.lower()
        # Pull the word right after each trigger phrase. Splitting on bare
        # "is"/"like"/"hate" is case-sensitive and also cuts inside words
        # (e.g. the "is" in "Chris"), which stored wrong facts or none at all.
        for trigger, template, failure_message in fact_triggers:
            if trigger in lowered:
                value = _first_word_after(user_input, trigger)
                if value:
                    remember_fact(template.format(value))
                else:
                    print(failure_message)

        # Build prompt from chat history
        chat_history_text = tokenizer.decode(chat_history[0].tolist())
        memory_context = get_memory_context()
        system_hint = "Be short, witty, and tech-focused." if len(user_input.split()) <= 3 else "Provide a detailed, tech-focused response."
        context = f"{system_hint}\n{memory_context}\n{chat_history_text}\nUser: {user_input}\nAI:"
        # Truncate context to fit max_seq_len
        context_tokens = tokenizer.encode(context)
        if len(context_tokens) > max_seq_len:
            context_tokens = context_tokens[-max_seq_len:]
            context = tokenizer.decode(context_tokens)
        prompt = context
        response = generate(language_model, tokenizer, prompt, max_len=150, max_seq_len=max_seq_len, beam_width=7)
        print(f"Ares: {response}")

        # Update chat history
        try:
            chat_update = torch.tensor([validate_sequence(tokenizer.encode(f"User: {user_input}\nAI: {response}\n" + tokenizer.eos_token), max_seq_len, tokenizer.pad_id)], dtype=torch.long).to(device)
            chat_history = torch.cat([chat_history, chat_update], dim=-1)
            if chat_history.size(1) > max_seq_len * 2:
                chat_history = chat_history[:, -max_seq_len:]
        except Exception as e:
            print(f"Chat history update failed: {e}. Resetting history.")
            chat_history = torch.tensor([validate_sequence(tokenizer.encode(tokenizer.bos_token), max_seq_len, tokenizer.pad_id)], dtype=torch.long).to(device)
        # The logged history is the text used for this turn's prompt, i.e.
        # the state before the update above.
        log_interaction(user_input, response, chat_history_text)

# Transformer model
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input. The
    model width ``d_model`` must be divisible by ``num_heads``.
    """
    def __init__(self, d_model: int, num_heads: int):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # width of a single head
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def _split_heads(self, projected: torch.Tensor, batch_size: int) -> torch.Tensor:
        """Reshape ``(batch, seq, d_model)`` into ``(batch, heads, seq, d_k)``."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Attend over ``x``; positions where ``mask == 0`` are suppressed."""
        batch_size = x.shape[0]
        queries = self._split_heads(self.W_q(x), batch_size)
        keys = self._split_heads(self.W_k(x), batch_size)
        values = self._split_heads(self.W_v(x), batch_size)
        scores = (queries @ keys.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # A large negative score becomes ~0 probability after softmax.
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        attended = weights @ values
        # Put the heads back side by side: (batch, seq, d_model).
        merged = attended.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.W_o(merged)

class TransformerBlock(nn.Module):
    """One transformer layer: self-attention then a feed-forward network.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``,
    i.e. normalization is applied after the residual addition.
    """
    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        expand = nn.Linear(d_model, d_ff)
        contract = nn.Linear(d_ff, d_model)
        self.ff = nn.Sequential(expand, nn.ReLU(), nn.Dropout(dropout), contract)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Apply the attention and feed-forward sub-layers to ``x``."""
        attended = self.attn(x, mask)
        x = self.norm1(x + self.dropout(attended))
        transformed = self.ff(x)
        return self.norm2(x + self.dropout(transformed))

class TransformerDecoder(nn.Module):
    """Decoder-only transformer language model.

    Token embeddings (scaled by ``sqrt(d_model)``) plus fixed sinusoidal
    position encodings pass through ``num_layers`` transformer blocks and a
    final linear projection to vocabulary logits.

    The position table is registered as a non-persistent buffer, so it
    follows the module across ``.to(...)`` calls but is not stored in
    ``state_dict()``.
    """
    def __init__(self, vocab_size: int, d_model: int, num_layers: int, num_heads: int, d_ff: int, max_seq_len: int):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        # A buffer (not a plain attribute) moves with the module instead of
        # being pinned to whatever the global device was at construction.
        self.register_buffer("pos_encoding", self.create_pos_encoding(max_seq_len, d_model), persistent=False)
        self.layers = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        self.fc_out = nn.Linear(d_model, vocab_size)
        self.d_model = d_model
        self.max_seq_len = max_seq_len

    def create_pos_encoding(self, max_seq_len: int, d_model: int) -> torch.Tensor:
        """Return the ``(max_seq_len, d_model)`` sinusoidal position table.

        Even columns hold sines and odd columns cosines of the position
        scaled by geometrically decreasing frequencies. Works for odd
        ``d_model`` as well. The tensor is created on the CPU.
        """
        pe = torch.zeros(max_seq_len, d_model)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so the frequency vector is trimmed to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        return pe

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Map token ids ``(batch, seq)`` to logits ``(batch, seq, vocab_size)``.

        Input longer than ``max_seq_len`` is cut to its first ``max_seq_len``
        positions.
        """
        seq_len = min(x.size(1), self.max_seq_len)
        x = x[:, :seq_len]
        x = self.embedding(x) * math.sqrt(self.d_model)
        x = x + self.pos_encoding[:seq_len, :].unsqueeze(0)
        for layer in self.layers:
            x = layer(x, mask)
        return self.fc_out(x)

# Run the pipeline (data loading, training, chat loop) only when this file is
# executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Using device: cpu
Loading text data...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/733k [00:00<?, ?B/s]

train-00000-of-00002.parquet:   0%|          | 0.00/157M [00:00<?, ?B/s]

train-00001-of-00002.parquet:   0%|          | 0.00/157M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/657k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/4358 [00:00<?, ? examples/s]

Generating train split:   0%|          | 0/1801350 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3760 [00:00<?, ? examples/s]

KeyboardInterrupt: 

In [None]:
!pip install feedparser

Collecting feedparser
  Downloading feedparser-6.0.11-py3-none-any.whl.metadata (2.4 kB)
Collecting sgmllib3k (from feedparser)
  Downloading sgmllib3k-1.0.0.tar.gz (5.8 kB)
  Preparing metadata (setup.py) ... done
Downloading feedparser-6.0.11-py3-none-any.whl (81 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 81.3/81.3 kB 1.4 MB/s eta 0:00:00
Building wheels for collected packages: sgmllib3k
  Building wheel for sgmllib3k (setup.py) ... done
  Created wheel for sgmllib3k: filename=sgmllib3k-1.0.0-py3-none-any.whl size=6046 sha256=8e2b4b4413af13e97e2827265be49ee920c0c80727d777e93494f873405c1995
  Stored in directory: /root/.cache/pip/wheels/3b/25/2a/105d6a15df6914f4d15047691c6c28f9052cc1173e40285d03
Successfully built sgmllib3k
Installing collected packages: sgmllib3k, feedparser
Successfully installed feedparser-6.0.11 sgmllib3k-1.0.0
