In [None]:
Batching

In [None]:
import torch
from torch.utils.data import DataLoader, random_split

# Split a dataset 70/20/10 into train/val/test and draw MAX_ITER shuffled training mini-batches.
# `random_split` with fractional lengths requires torch >= 1.13; the explicit generator makes
# the split reproducible across re-runs.
# NOTE(review): assumes `dataset` (a torch Dataset), MAX_ITER and BATCH_SIZE are defined
# before this cell runs — none of them is defined above it in this notebook.
train_set, val_set, test_set = random_split(
    dataset, [0.7, 0.2, 0.1], generator=torch.Generator().manual_seed(42)
)
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)

train_iter = iter(train_loader)
for i in range(MAX_ITER):
    try:
        batch = next(train_iter)
    except StopIteration:
        # Epoch exhausted: start a new (re-shuffled) pass over the training split.
        train_iter = iter(train_loader)
        batch = next(train_iter)

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Seed the global torch RNG so the dummy data and the DataLoader shuffle order
# are identical on every Restart & Run All.
torch.manual_seed(42)

# Dummy data: 1000 tensors of shape (3, 64, 64) and integer labels in [0, 10)
image_data = torch.randn(1000, 3, 64, 64)
labels = torch.randint(0, 10, (1000,))

dataset = TensorDataset(image_data, labels)

# Split into mini-batches; shuffle=True re-orders the samples on every pass
batch_size = 32
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Inspect every batch: 1000 / 32 -> 31 full batches plus a final batch of 8
for batch_images, batch_labels in dataloader:
    print(f"Batch shape: {batch_images.shape}, Labels: {batch_labels}")

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt

# Training-time preprocessing: random crop + horizontal-flip augmentation,
# conversion to a tensor, then per-channel normalisation.
# NOTE(review): RandomResizedCrop(224) upsamples the (32x32) CIFAR-10 images and the
# mean/std below are the usual ImageNet statistics — suitable when feeding an
# ImageNet-pretrained backbone; confirm that is the intent.
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

train_dataset = datasets.CIFAR10(root='./data', train=True,
                                 download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Describe the training set. Jupyter only displays a cell's LAST expression,
# so this must be the final line of the cell to be shown.
train_dataset

In [None]:
Стратификация

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Stratified split on the target y: both parts keep the class proportions of y.
# NOTE(review): assumes X and y already exist. In this notebook they are first defined in the
# cross_val_score section further down, so on a fresh kernel this cell must run after it.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Check the per-class share in each part
print("Обучающая выборка:", pd.Series(y_train).value_counts(normalize=True))
print("Тестовая выборка:", pd.Series(y_test).value_counts(normalize=True))



In [None]:
from sklearn.model_selection import train_test_split

# Composite stratification key: stratify jointly on 'class' and 'gender', so that every
# class/gender combination keeps its proportion in both parts.
# Built as a standalone Series rather than a df column, so df is not mutated (no leftover
# helper column after the cell runs). astype(str) lets non-string columns (e.g. integer
# class ids) be concatenated.
# NOTE(review): assumes `df` with 'class' and 'gender' columns is defined earlier;
# train_test_split raises if any combination has fewer than 2 rows.
strat_key = df['class'].astype(str) + '_' + df['gender'].astype(str)

train_df, test_df = train_test_split(
    df, test_size=0.2, random_state=42,
    stratify=strat_key,
)



In [None]:
Базовая кросс-валидация с `cross_val_score`

In [None]:
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# Baseline: 5-fold cross-validated accuracy of a seeded random forest on Iris.
iris = datasets.load_iris()
X = iris.data
y = iris.target

model = RandomForestClassifier(random_state=42, n_estimators=100)
scores = cross_val_score(model, X, y, cv=5)

summary_lines = (
    f"Accuracy scores for each fold: {scores}",
    f"Mean accuracy: {scores.mean():.4f}",
    f"Standard deviation: {scores.std():.4f}",
)
for line in summary_lines:
    print(line)



In [None]:
Кросс-валидация на временных рядах: `TimeSeriesSplit`

In [None]:
from sklearn.model_selection import TimeSeriesSplit

# Walk-forward validation: every fold trains only on samples that precede its test window.
# NOTE(review): assumes `data` is a time-ordered pandas object (e.g. DatetimeIndex) defined
# elsewhere — it is not created in this notebook.
tscv = TimeSeriesSplit(
    gap=7,          # samples skipped between train and test; guards against look-ahead leakage from lag features
    n_splits=5,     # number of successive train/test folds
    test_size=90,   # test-window length in samples (rows): days/hours/minutes depending on data frequency
)

for tr, ts in tscv.split(data):
    train_idx, test_idx = data.index[tr], data.index[ts]
    print(train_idx[0], '…', train_idx[-1], '→', test_idx[0], '…', test_idx[-1])


In [None]:
Leave-one-out cross-validation (LOOCV)

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import LeaveOneOut, cross_val_score

# Leave-one-out CV: one fit per sample, each scored on the single held-out sample,
# so every fold score is 0 or 1 and their mean is the LOOCV accuracy.
# NOTE(review): uses X, y from an earlier cell (the Iris data here), i.e. 150 model fits.
model = LogisticRegression()
scores = cross_val_score(model, X, y, cv=LeaveOneOut())

print(f"LOOCV scores: {scores}")
print(f"Mean accuracy: {scores.mean():.3f}")

In [None]:
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import LeaveOneOut

# Toy data: 4 two-feature points, two per class
X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
y = np.array([0, 0, 1, 1])

loo = LeaveOneOut()
scores = []

# One fold per sample: fit on the other n-1 points, score the held-out one.
for train_index, test_index in loo.split(X):
    X_train, y_train = X[train_index], y[train_index]
    X_test, y_test = X[test_index], y[test_index]

    model = LogisticRegression().fit(X_train, y_train)
    y_pred = model.predict(X_test)
    scores.append(accuracy_score(y_test, y_pred))

mean_acc, std_acc = np.mean(scores), np.std(scores)
print(f"LOOCV accuracy: {mean_acc:.3f} (+/- {std_acc:.3f})")

In [None]:
MapReduce

In [None]:
import multiprocessing
from collections import defaultdict, Counter
from functools import reduce
from typing import List, Dict, Tuple, Callable, Any
import re

class SimpleMapReduce:
    """Minimal in-memory MapReduce engine built on ``multiprocessing.Pool``.

    The map and reduce phases each run in their own worker pool; the shuffle
    (grouping values by key) runs in the calling process.

    NOTE(review): Pool pickles ``mapper``/``reducer`` by reference, so they must be
    importable top-level functions; under the "spawn" start method functions defined
    in a notebook's ``__main__`` are not visible to workers.
    """

    def __init__(self, num_workers=None):
        # Any falsy value (None, 0) falls back to one worker per CPU.
        self.num_workers = num_workers if num_workers else multiprocessing.cpu_count()

    def map(self, mapper: Callable, data: List[Any]) -> List[Any]:
        """Apply ``mapper`` to every input record in parallel; results keep input order."""
        with multiprocessing.Pool(self.num_workers) as workers:
            return workers.map(mapper, data)

    def shuffle(self, mapped_results: List[List[Tuple]]) -> Dict[Any, List[Any]]:
        """Group the mappers' ``(key, value)`` pairs by key.

        Returns a ``defaultdict(list)`` mapping key -> values in emission order.
        """
        grouped = defaultdict(list)
        all_pairs = (pair for per_record in mapped_results for pair in per_record)
        for key, value in all_pairs:
            grouped[key].append(value)
        return grouped

    def reduce(self, reducer: Callable, shuffled_data: Dict[Any, List[Any]]) -> Dict[Any, Any]:
        """Call ``reducer(key, values)`` per key in parallel; it must return a ``(key, value)`` pair."""
        items = list(shuffled_data.items())
        with multiprocessing.Pool(self.num_workers) as workers:
            reduced_pairs = workers.starmap(reducer, items)
        return dict(reduced_pairs)

    def run(self, data: List[Any], mapper: Callable, reducer: Callable) -> Dict[Any, Any]:
        """Execute map -> shuffle -> reduce over ``data`` and return the reduced dict."""
        print("Running Map phase...")
        mapped = self.map(mapper, data)

        print("Running Shuffle phase...")
        grouped = self.shuffle(mapped)

        print("Running Reduce phase...")
        return self.reduce(reducer, grouped)

# Пример: WordCount
def wordcount_mapper(text: str) -> List[Tuple[str, int]]:
    """Map step of WordCount: emit ``(word, 1)`` for every ``\\w+`` token of the lower-cased text."""
    return [(match.group(), 1) for match in re.finditer(r'\w+', text.lower())]

def wordcount_reducer(key: str, values: List[int]) -> Tuple[str, int]:
    """Reduce step of WordCount: collapse the partial counts of one word into its total."""
    total = sum(values)
    return key, total

if __name__ == "__main__":
    documents = [
        "Hello world hello python",
        "Python is great for data science",
        "World of python programming",
        "Data science with python and spark",
    ]

    # Run the whole map -> shuffle -> reduce pipeline on two worker processes.
    mr = SimpleMapReduce(num_workers=2)
    result = mr.run(documents, wordcount_mapper, wordcount_reducer)

    print("\nWord Count Results:")
    # most_common keeps first-seen order among equal counts, like a stable descending sort.
    top_words = Counter(result).most_common(10)
    for word, count in top_words:
        print(f"{word}: {count}")

In [None]:
Large-Scale Algorithms

In [None]:
from typing import List, Dict, Any, Iterator
from collections import defaultdict, Counter
import heapq
import hashlib

class MapReduceAlgorithms:
    """Single-machine illustrations of classic MapReduce-style algorithms.

    All methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def _word_count_mapper(document: str) -> List[tuple]:
        """Map step: emit ``(word, 1)`` for each lower-cased whitespace-separated token."""
        return [(word, 1) for word in document.lower().split()]

    @staticmethod
    def _word_count_reducer(key: str, values: List[int]) -> tuple:
        """Reduce step: sum the partial counts collected for one word."""
        return (key, sum(values))

    @staticmethod
    def word_count(documents: List[str]) -> Dict[str, int]:
        """Count words across ``documents`` with a map -> shuffle -> reduce pipeline.

        Map and reduce run in a ``ProcessPoolExecutor``; the shuffle runs in the caller.
        Tokens come from ``str.split()`` on lower-cased text, so punctuation stays attached.

        Returns:
            dict mapping word -> total number of occurrences (``{}`` for no documents).

        NOTE: the worker functions are the static methods above rather than closures,
        because local functions cannot be pickled and shipped to worker processes.
        Under the "spawn" start method workers must also be able to import this class,
        which fails for a class defined inside a notebook cell.
        """
        from concurrent.futures import ProcessPoolExecutor

        mapper = MapReduceAlgorithms._word_count_mapper
        reducer = MapReduceAlgorithms._word_count_reducer

        with ProcessPoolExecutor() as executor:
            # Map phase
            mapped = list(executor.map(mapper, documents))

            # Shuffle: group partial counts by word
            shuffled = defaultdict(list)
            for doc_results in mapped:
                for word, count in doc_results:
                    shuffled[word].append(count)

            # Reduce phase. Executors have no ``starmap``; passing the keys and the
            # value-lists as two parallel iterables calls reducer(key, values) per word.
            reduced = executor.map(reducer, shuffled.keys(), shuffled.values())
            # Consume the results before the executor shuts down.
            return dict(reduced)

    @staticmethod
    def inverted_index(documents: List[str]) -> Dict[str, List[int]]:
        """Map each lower-cased whitespace token to the ids (list positions) of documents containing it.

        Posting lists are sorted ascending so the output is deterministic
        (set iteration order is not).
        """
        index = defaultdict(set)

        for doc_id, doc in enumerate(documents):
            for word in set(doc.lower().split()):
                index[word].add(doc_id)

        return {word: sorted(doc_ids) for word, doc_ids in index.items()}

    @staticmethod
    def _spill_chunk(chunk: List[Any]) -> str:
        """Pickle the items of ``chunk`` one after another into a new temp file; return its path."""
        import pickle
        import tempfile

        with tempfile.NamedTemporaryFile(mode='wb', suffix='.chunk', delete=False) as f:
            for item in chunk:
                pickle.dump(item, f)
            return f.name

    @staticmethod
    def _read_chunk(path: str) -> Iterator[Any]:
        """Lazily yield the items written by ``_spill_chunk`` to ``path``, in order."""
        import pickle

        # pickle.load must never see untrusted data; these are private temp files
        # written by _spill_chunk moments earlier.
        with open(path, 'rb') as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:
                    return

    @staticmethod
    def distributed_sort(data: List[Any], key=None, chunk_size: int = 10000) -> List[Any]:
        """Simulated external merge sort: sort chunks, spill them to temp files, k-way merge.

        Items are round-tripped with pickle, so the original objects (not their string
        form) are returned, and the merge uses the same ``key`` as the chunk sort.
        Because each chunk sort is stable and ``heapq.merge`` breaks ties by chunk order,
        the result equals ``sorted(data, key=key)``.

        Args:
            data: sliceable sequence of mutually comparable (under ``key``) items.
            key: optional sort key, as for ``sorted``.
            chunk_size: number of items sorted in memory per chunk (must be >= 1).

        Raises:
            ValueError: if ``chunk_size`` < 1.
        """
        import os

        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")

        chunk_paths: List[str] = []
        try:
            # Step 1: sorted chunks, each spilled to its own temporary file
            for start in range(0, len(data), chunk_size):
                chunk = sorted(data[start:start + chunk_size], key=key)
                chunk_paths.append(MapReduceAlgorithms._spill_chunk(chunk))

            # Step 2: k-way merge of the sorted chunk streams
            streams = [MapReduceAlgorithms._read_chunk(path) for path in chunk_paths]
            return list(heapq.merge(*streams, key=key))
        finally:
            # Always remove the temp files, also when sorting/merging raised.
            for path in chunk_paths:
                try:
                    os.unlink(path)
                except OSError:
                    pass  # best-effort cleanup; must not mask the original error

    @staticmethod
    def page_rank(graph: Dict[int, List[int]], damping=0.85, iterations=10) -> Dict[int, float]:
        """Power-iteration PageRank over ``graph`` (node -> list of linked-to nodes).

        Only nodes that are keys of ``graph`` are ranked; links to nodes that are not
        keys are ignored (that share of rank is dropped). Rank of dangling nodes
        (no out-links) is spread uniformly over all nodes. Each iteration computes
        ``damping * propagated + (1 - damping) / n``.

        Returns:
            dict node -> score as a Python float; ``{}`` for an empty graph.
        """
        import numpy as np

        nodes = list(graph.keys())
        n = len(nodes)
        if n == 0:
            # Nothing to rank; also avoids division by zero in the teleport term.
            return {}
        node_to_idx = {node: i for i, node in enumerate(nodes)}

        # Start from the uniform distribution
        pr = np.ones(n) / n

        for _ in range(iterations):
            new_pr = np.zeros(n)
            dangling_mass = 0.0

            for node in nodes:
                idx = node_to_idx[node]
                outlinks = graph.get(node, [])

                if outlinks:
                    # Split this node's rank evenly across its out-links
                    share = pr[idx] / len(outlinks)
                    for outlink in outlinks:
                        if outlink in node_to_idx:
                            new_pr[node_to_idx[outlink]] += share
                else:
                    dangling_mass += pr[idx]

            # Spread all dangling rank in one vectorised step instead of an O(n)
            # add per dangling node.
            new_pr += dangling_mass / n

            pr = damping * new_pr + (1 - damping) / n

        return {node: float(pr[node_to_idx[node]]) for node in nodes}

In [None]:
Оптимизаторы

In [None]:
import numpy as np
from typing import List, Tuple, Callable

class SGD:
    """Stochastic gradient descent with optional classical momentum.

    Per parameter:
      * momentum == 0:  p <- p - lr * g
      * momentum > 0:   v <- momentum * v - lr * g;  p <- p + v
    Parameters are updated in place through augmented assignment on ``params[k]``.
    """

    def __init__(self, lr: float = 0.01, momentum: float = 0.0):
        self.lr = lr
        self.momentum = momentum
        # One velocity buffer per parameter, allocated lazily on the first update().
        self.velocity = None

    def update(self, params: List[np.ndarray],
               grads: List[np.ndarray]) -> None:
        """Apply one step to ``params`` using ``grads`` (matched by position)."""
        if self.velocity is None:
            self.velocity = [np.zeros_like(param) for param in params]

        use_momentum = self.momentum > 0
        for k in range(len(params)):
            if not use_momentum:
                params[k] -= self.lr * grads[k]
                continue
            self.velocity[k] = self.momentum * self.velocity[k] - self.lr * grads[k]
            params[k] += self.velocity[k]

    def step(self):
        """No-op: the parameter update already happens inside ``update()``."""
        pass

In [None]:
class Adagrad:
    """Adagrad: per-element step sizes shrink with the accumulated squared gradients.

    cache <- cache + g**2;  p <- p - lr * g / (sqrt(cache) + eps)
    """

    def __init__(self, lr: float = 0.01, eps: float = 1e-8):
        self.lr = lr
        self.eps = eps      # keeps the denominator away from zero
        self.cache = None   # running sum of squared gradients, one array per parameter

    def update(self, params: List[np.ndarray], grads: List[np.ndarray]) -> None:
        """Update ``params`` in place from ``grads`` (matched by position)."""
        if self.cache is None:
            self.cache = [np.zeros_like(param) for param in params]

        for k in range(len(params)):
            grad = grads[k]
            self.cache[k] += grad ** 2
            denom = np.sqrt(self.cache[k]) + self.eps
            params[k] -= self.lr * grad / denom

In [None]:
class RMSprop:
    """RMSprop: scale each step by an exponential moving average of squared gradients.

    cache <- alpha * cache + (1 - alpha) * g**2;  p <- p - lr * g / (sqrt(cache) + eps)
    """

    def __init__(self, lr: float = 0.001, alpha: float = 0.99,
                 eps: float = 1e-8):
        self.lr = lr
        self.alpha = alpha  # decay factor of the moving average
        self.eps = eps      # keeps the denominator away from zero
        self.cache = None   # moving average of squared gradients, one array per parameter

    def update(self, params: List[np.ndarray], grads: List[np.ndarray]) -> None:
        """Update ``params`` in place from ``grads`` (matched by position)."""
        if self.cache is None:
            self.cache = [np.zeros_like(param) for param in params]

        decay = self.alpha
        for k in range(len(params)):
            grad = grads[k]
            self.cache[k] = decay * self.cache[k] + (1 - decay) * grad ** 2
            params[k] -= self.lr * grad / (np.sqrt(self.cache[k]) + self.eps)