**Задача 1: Система управления библиотекой**

Создайте систему управления библиотекой используя collections:

1. Создайте namedtuple `Book` с полями: title, author, isbn, year
2. Создайте namedtuple `Reader` с полями: name, reader_id, phone
3. Используйте `defaultdict(list)` для хранения книг по жанрам
4. Используйте `deque` для очереди читателей, ожидающих популярную книгу
5. Используйте `Counter` для подсчета количества книг каждого автора
6. Используйте `OrderedDict` для хранения истории выдачи книг (читатель -> список книг)
7. Сериализуйте все данные в JSON и pickle форматы


In [1]:
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict
import json
import pickle

# 1) + 2)
Book = namedtuple('Book', ['title', 'author', 'isbn', 'year'])
Reader = namedtuple('Reader', ['name', 'reader_id', 'phone'])

# 3) Create and fill defaultdict with books
books_dict = defaultdict(list)
books_dict['scifi'].append(Book("fant1", "author1", "111", 2000))
books_dict['scifi'].append(Book("fant2", "author2", "222", 2000))
books_dict['drama'].append(Book("drama1", "author3", "333", 2000))
books_dict['drama'].append(Book("drama2", "author4", "444", 2000))
books_dict['action'].append(Book("act1", "author2", "555", 2000))
books_dict['action'].append(Book("act2", "author3", "666", 2000))

# 4) Create and fill deque with readers
readers : deque[namedtuple] = deque()
readers.append(Reader("name1", "11", "384"))
readers.append(Reader("name2", "22", "384"))
readers.append(Reader("name3", "33", "384"))
readers.append(Reader("name4", "44", "384"))

# 5) Counting authors
cnt = Counter()
for genre_books in books_dict.values():
    for book in genre_books:
        cnt[book.author] += 1

# 6)Create and fill OrderedDict
book_history : OrderedDict[namedtuple, namedtuple] = OrderedDict()
book_history[readers[0]] = books_dict["scifi"][0]
book_history[readers[1]] = books_dict["drama"][0]
book_history[readers[2]] = books_dict["action"][0]

# 7) Serialize into JSON
json_data = {
    "books_by_genre": {genre: [_._asdict() for _ in genre_books] for genre, genre_books in books_dict.items()},
    "readers_queue": [r._asdict() for r in readers],
    "author_counts": dict(cnt),
    "book_history": [
        {"reader": {"name": r.name, "reader_id": r.reader_id, "phone": r.phone},
         "book": b._asdict()}
        for r, b in book_history.items()
    ],
}

with open('library_data.json', 'w', encoding='utf-8') as f:
    json.dump(json_data, f, indent=2)

# Serialize into pickle
with open('library_data.pkl', 'wb') as f:
    pickle.dump({
        "books_dict": books_dict,
        "readers": readers,
        "author_counts": cnt,
        "book_history": book_history,
    }, f)

In [None]:
######################### ADDITIONAL TASK TO DO #########################
### Read and recreate saved file

# Read and recreate from JSON
with open('library_data.json', 'r', encoding='utf-8') as f:
    loaded_json = json.load(f)

# Recreate books_dict from JSON
books_dict_loaded = defaultdict(list)
for genre, books in loaded_json["books_by_genre"].items():
    books_dict_loaded[genre] = [Book(**b) for b in books]

# Recreate readers deque from JSON
readers_loaded = deque([Reader(**r) for r in loaded_json["readers_queue"]])

# Recreate author Counter from JSON
author_counts_loaded = Counter(loaded_json["author_counts"])

# Recreate book_history OrderedDict from JSON
book_history_loaded = OrderedDict()
for entry in loaded_json["book_history"]:
    reader = Reader(**entry["reader"])
    book = Book(**entry["book"])
    book_history_loaded[reader] = book

# Read and recreate from pickle
with open('library_data.pkl', 'rb') as f:
    pickle_data = pickle.load(f)

books_dict_from_pickle = pickle_data["books_dict"]
readers_from_pickle = pickle_data["readers"]
author_counts_from_pickle = pickle_data["author_counts"]
book_history_from_pickle = pickle_data["book_history"]

loaded_json, pickle_data

({'books_by_genre': {'scifi': [{'title': 'fant1',
     'author': 'author1',
     'isbn': '111',
     'year': 2000},
    {'title': 'fant2', 'author': 'author2', 'isbn': '222', 'year': 2000}],
   'drama': [{'title': 'drama1',
     'author': 'author3',
     'isbn': '333',
     'year': 2000},
    {'title': 'drama2', 'author': 'author4', 'isbn': '444', 'year': 2000}],
   'action': [{'title': 'act1',
     'author': 'author2',
     'isbn': '555',
     'year': 2000},
    {'title': 'act2', 'author': 'author3', 'isbn': '666', 'year': 2000}]},
  'readers_queue': [{'name': 'name1', 'reader_id': '11', 'phone': '384'},
   {'name': 'name2', 'reader_id': '22', 'phone': '384'},
   {'name': 'name3', 'reader_id': '33', 'phone': '384'},
   {'name': 'name4', 'reader_id': '44', 'phone': '384'}],
  'author_counts': {'author1': 1, 'author2': 2, 'author3': 2, 'author4': 1},
  'book_history': [{'reader': {'name': 'name1',
     'reader_id': '11',
     'phone': '384'},
    'book': {'title': 'fant1',
     'author'

**Задача 2: Анализатор файловой системы**

Создайте анализатор файловой системы используя os и sys:

1. Создайте namedtuple `FileInfo` с полями: name, size, extension, modified_time
2. Используйте `os.walk()` для обхода директории
3. Используйте `os.path` функции для получения информации о файлах
4. Используйте `Counter` для подсчета файлов по расширениям
5. Используйте `defaultdict(list)` для группировки файлов по размеру (маленькие < 1MB, средние 1-100MB, большие > 100MB)
6. Используйте `deque` для хранения последних 10 найденных файлов
7. Выведите статистику используя `sys.getsizeof()` для подсчета памяти
8. Сохраните результаты в JSON файл


In [None]:
from collections import defaultdict, Counter, deque, namedtuple
import os
import sys
import json

# 0) Initialize data structures for 2), 5), 6)
extension_counter : Counter = Counter()
files_by_size : defaultdict = defaultdict(list)
last_10_files : deque = deque(maxlen=10)

# 1)
FileInfo : namedtuple = namedtuple('FileInfo', ['name', 'size', 'extension', 'modified_time'])

# 2) Scan directory with os.walk()
for root, dirs, files in os.walk("."):
    for filename in files:
        filepath = os.path.join(root, filename)
        
        # 3) Get file information (using os.path)
        file_size = os.path.getsize(filepath)
        file_ext = os.path.splitext(filename)[1].lower()
        mod_time = os.path.getmtime(filepath)
        
        # Create FileInfo object
        file_info = FileInfo(
            name=filename,
            size=file_size,
            extension=file_ext,
            modified_time=mod_time
        )
        
        # 4) Count files by extension (using Counter lib)
        extension_counter[file_ext] += 1

        # 5) Sort files by size (using defaultdict)
        if file_size < 1024 ** 2:   # < 1MB
            files_by_size['small'].append(file_info)
        elif file_size < 100 * 1024 * 1024:   # 1-100MB
            files_by_size['medium'].append(file_info)
        else:   # > 100MB
            files_by_size['large'].append(file_info)
        
        # 6) Add to deque for last 10 files
        last_10_files.append(file_info)

# 7) Calculate memory usage (using .getsizeof())
memory_usage = {
    'extension_counter': sys.getsizeof(extension_counter),
    'files_by_size': sys.getsizeof(files_by_size),
    'last_10_files': sys.getsizeof(last_10_files),
    'total': sys.getsizeof(extension_counter) + sys.getsizeof(files_by_size) + sys.getsizeof(last_10_files)
}

# Print statistics and memory usage (using .getsizeof())
print("Files by extension:", dict(extension_counter))
print("Files by size:", {k: len(v) for k, v in files_by_size.items()})
print("Last 10 files:", [f.name for f in last_10_files])
print("Memory usage:", memory_usage)

# 8) Serialize to JSON
json_data = {
    'files_by_extension': dict(extension_counter),
    'files_by_size_category': {
        category: [f._asdict() for f in files] 
        for category, files in files_by_size.items()
    },
    'last_10_files': [f._asdict() for f in last_10_files],
    'memory_usage': memory_usage
}

with open('filesystem_analysis.json', 'w', encoding='utf-8') as f:
    json.dump(json_data, f, indent=2)

**Задача 3: Система конфигурации приложения**

Создайте систему конфигурации используя ChainMap и defaultdict:

1. Создайте namedtuple `Config` с полями: key, value, section, default_value
2. Создайте несколько словарей конфигурации (default, user, environment)
3. Используйте `ChainMap` для объединения конфигураций с приоритетом
4. Используйте `defaultdict(dict)` для группировки настроек по секциям
5. Используйте `OrderedDict` для сохранения порядка загрузки конфигураций
6. Используйте `os.environ` для чтения переменных окружения
7. Сериализуйте конфигурацию в JSON и pickle форматы

In [None]:
from collections import namedtuple, ChainMap, defaultdict, OrderedDict
import os
import json
import pickle

# 1) Config namedtuple
Config = namedtuple('Config', ['key', 'value', 'section', 'default_value'])

# 2) Create configuration dicts
default_config = {
    'port': 8000,
    'host': 'localhost'
}

user_config = {
    'port': 3000,
    'host': 'localhost'
}

# 6) Take some environment (port, host) variables using os.environ
env_config = {}
if 'PORT' in os.environ:
    env_config['port'] = int(os.environ['PORT'])
if 'HOST' in os.environ:
    env_config['host'] = os.environ['HOST']

# 3) ChainMap
config_chain = ChainMap(env_config, user_config, default_config)

# 4) Group configuration. Combine them by key 'main'
grouped_config = defaultdict(dict)
for key, value in config_chain.items():
    grouped_config['main'][key] = value

# 5) Create OrderedDict (default -> user -> environment) 
config_order = OrderedDict()
config_order['default'] = default_config
config_order['user'] = user_config
config_order['environment'] = env_config

# 7) Prepare list[dict] for serialization
config_entries = []
for key, final_value in config_chain.items():
    config_entries.append(Config(
        key=key,
        value=final_value,
        section='main',
    ))

# JSON serialization
json_data = {
    'final_config': dict(config_chain),
    'grouped_config': dict(grouped_config),
    'config_order': dict(config_order),
    'config_entries': [c._asdict() for c in config_entries]
}

with open('config.json', 'w', encoding='utf-8') as f:
    json.dump(json_data, f, indent=2)

# Pickle serialization
with open('config.pkl', 'wb') as f:
    pickle.dump({
        'chain': config_chain,
        'grouped': grouped_config,
        'order': config_order,
        'entries': config_entries
    }, f)

8000
localhost


**Задача 4: Мониторинг системы**

Создайте систему мониторинга используя sys и os:

1. Создайте namedtuple `SystemInfo` с полями: cpu_count, memory_usage, process_id, user_name
2. Используйте `os.cpu_count()` для получения количества процессоров
3. Используйте `sys.getallocatedblocks()` для мониторинга памяти
4. Используйте `os.getpid()` и `os.getlogin()` для информации о процессе
5. Используйте `deque` для хранения последних 20 измерений
6. Используйте `Counter` для подсчета частоты использования различных функций
7. Используйте `defaultdict(list)` для группировки измерений по времени
8. Сохраните историю мониторинга в pickle файл


In [4]:
from collections import namedtuple, deque, Counter, defaultdict
import os
import sys
import pickle

# 1) SystemInfo namedtuple
SystemInfo = namedtuple('SystemInfo', ['cpu_count', 'memory_usage', 'process_id', 'user_name'])

# 2–4) Gathering statistics (system information)
cpu_count = os.cpu_count()
memory_usage = sys.getallocatedblocks()
process_id = os.getpid()
user_name = os.getlogin()

current_info = SystemInfo(cpu_count, memory_usage, process_id, user_name)

# 5) Create deque for containing last 20 statistics
history: deque = deque(maxlen=20)
history.append(current_info)

for _ in range(5):
    info = SystemInfo(
        cpu_count=os.cpu_count(),
        memory_usage=sys.getallocatedblocks(),
        process_id=os.getpid(),
        user_name=os.getlogin()
    )
    history.append(info)

# 6) Count function calls (dummy imitation)
function_calls = Counter()
function_calls['collect_metrics'] += 1
function_calls['save_data'] += 1
function_calls['collect_metrics'] += 1


import time

# 7) Create defaultdict(list) for grouping statistics by time
measurements_by_time = defaultdict(list)
timestamp = int(time.time()) # one and only key (current timestamp)
for info in history:
    measurements_by_time[timestamp].append(info)


# 8) Serialize using pickle
with open('monitoring_history.pkl', 'wb') as f:
    pickle.dump({
        'history': history,
        'function_calls': function_calls,
        'measurements_by_time': measurements_by_time,
    }, f)

**Задача 5: Система логирования**

Создайте систему логирования используя все изученные коллекции:

1. Создайте namedtuple `LogEntry` с полями: timestamp, level, message, module, function
2. Используйте `deque` для хранения последних 100 логов (кольцевой буфер)
3. Используйте `defaultdict(list)` для группировки логов по уровням (DEBUG, INFO, WARNING, ERROR)
4. Используйте `Counter` для подсчета количества логов каждого уровня
5. Используйте `OrderedDict` для хранения логов по времени (FIFO)
6. Используйте `ChainMap` для объединения различных источников логов
7. Используйте `os.path` для работы с файлами логов
8. Сериализуйте логи в JSON и pickle форматы


In [None]:
from collections import namedtuple, deque, defaultdict, Counter, OrderedDict, ChainMap
import os
import json
import pickle
import time

# 1) LogEntry namedtuple
LogEntry = namedtuple('LogEntry', ['timestamp', 'level', 'message', 'module', 'function'])

# 2) Create deque with max length = 100
recent_logs: deque = deque(maxlen=100)

# Create several log examples
now = time.time()
entries = [
    LogEntry(now, 'INFO', 'App started', 'main', 'run'),
    LogEntry(now + 1, 'DEBUG', 'Config loaded', 'config', 'load'),
    LogEntry(now + 2, 'WARNING', 'Low memory', 'system', 'check_resources'),
    LogEntry(now + 3, 'ERROR', 'File not found', 'io', 'read_file'),
    LogEntry(now + 4, 'INFO', 'User logged in', 'auth', 'login'),
]

for entry in entries:
    recent_logs.append(entry)

# 3) Grouping logs by level in defaultdict()
logs_by_level = defaultdict(list)
for log in recent_logs:
    logs_by_level[log.level].append(log)

# 4) Count logs
level_counter = Counter(log.level for log in recent_logs)

# 5) OrderedDict by time (FIFO)
logs_by_time = OrderedDict()
for log in recent_logs:
    logs_by_time[log.timestamp] = log

# ChainMap (logs union)
archive_logs = { # dummy archive logs
    now - 100: LogEntry(now - 100, 'INFO', 'Old event', 'legacy', 'old_func')
}
current_logs_dict = {log.timestamp: log for log in recent_logs}
combined_logs = ChainMap(current_logs_dict, archive_logs)

# 7) Make a directory for logs and prepare paths
log_dir = os.path.join('logs')
os.makedirs(log_dir, exist_ok=True)
json_path = os.path.join(log_dir, 'app.log.json')
pkl_path = os.path.join(log_dir, 'app.log.pkl')

# 8) Serialization

# Prepare json dictionary
json_data = {
    'recent_logs': [e._asdict() for e in recent_logs],
    'logs_by_level': {
        level: [e._asdict() for e in logs]
        for level, logs in logs_by_level.items()
    },
    'level_counts': dict(level_counter),
    'logs_by_time': {
        str(ts): e._asdict()
        for ts, e in logs_by_time.items()
    }
}

with open(json_path, 'w', encoding='utf-8') as f:
    json.dump(json_data, f, indent=2)

with open(pkl_path, 'wb') as f:
    pickle.dump({
        'recent_logs': recent_logs,
        'logs_by_level': logs_by_level,
        'level_counter': level_counter,
        'logs_by_time': logs_by_time,
        'combined_logs': combined_logs,
    }, f)

**Задача 6: Кэш-система**

Создайте простую кэш-систему используя collections:

1. Создайте namedtuple `CacheEntry` с полями: key, value, timestamp, access_count
2. Используйте `OrderedDict` для реализации LRU (Least Recently Used) кэша
3. Используйте `deque` для хранения истории доступа к ключам
4. Используйте `Counter` для подсчета частоты доступа к каждому ключу
5. Используйте `defaultdict(int)` для хранения счетчиков доступа
6. Реализуйте методы: get, set, delete, clear, size
7. Используйте `sys.getsizeof()` для мониторинга размера кэша
8. Сериализуйте кэш в pickle формат для сохранения между сессиями


In [None]:
from collections import namedtuple, OrderedDict, deque, Counter, defaultdict
import sys
import pickle
import time

# 1) CacheEntry
CacheEntry = namedtuple('CacheEntry', ['key', 'value', 'timestamp', 'access_count'])

# 2–6)
class SimpleLRUCache:
    def __init__(self, maxsize=10):
        self.maxsize = maxsize
        self.cache: OrderedDict = OrderedDict()         # LRU (using OrderedDict)
        self.access_history: deque = deque(maxlen=100)  # 3) key access history (using deque)
        self.access_counter: Counter = Counter()        # 4) count accesses 
        self.counters: defaultdict = defaultdict(int)   # 5) store accesses in defaultdict(int)

    def get(self, key):
        """get() triggers access by existing key"""
        if key in self.cache:
            entry = self.cache.pop(key)
            # Update access count access_count
            new_count = entry.access_count + 1
            new_entry = CacheEntry(key, entry.value, entry.timestamp, new_count)
            self.cache[key] = new_entry
            # Update history and counters
            self.access_history.append(key)
            self.access_counter[key] += 1
            self.counters[key] += 1
            return new_entry.value
        return None

    def set(self, key, value):
        """set() triggers modification by key"""
        if key in self.cache:
            # Update existing value
            old = self.cache.pop(key)
            new_entry = CacheEntry(key, value, time.time(), old.access_count)
        else:
            # Create new entry (key is not in cache)
            new_entry = CacheEntry(key, value, time.time(), 0)
            if len(self.cache) >= self.maxsize:
                self.cache.popitem(last=False)  # pop LRU
        self.cache[key] = new_entry

    def delete(self, key):
        if key in self.cache:
            del self.cache[key]

    def clear(self):
        self.cache.clear()
        self.access_history.clear()
        self.access_counter.clear()
        self.counters.clear()

    def size(self):
        return len(self.cache)

    def get_cache_size_bytes(self):
        return sys.getsizeof(self.cache)  # 7) getter function returns size of cache

### Code usage

cache = SimpleLRUCache(maxsize=3)
cache.set("a", 1)
cache.set("b", 2)
cache.set("c", 3)
cache.get("a")     # second access to "b"
cache.set("d", 4)  # pop "b" (LRU value)

# 8) Pickle serialization
with open('cache.pkl', 'wb') as f:
    pickle.dump(cache, f)

**Задача 7: Анализатор текста**

Создайте анализатор текста используя collections:

1. Создайте namedtuple `WordInfo` с полями: word, frequency, length, first_occurrence
2. Используйте `Counter` для подсчета частоты слов
3. Используйте `defaultdict(list)` для группировки слов по длине
4. Используйте `deque` для хранения последних 50 уникальных слов
5. Используйте `OrderedDict` для хранения слов в порядке первого появления
6. Используйте `os.path` для работы с текстовыми файлами
7. Используйте `sys.getsizeof()` для анализа памяти
8. Сохраните результаты анализа в JSON файл


In [None]:
from collections import namedtuple, Counter, defaultdict, deque, OrderedDict
import os
import sys
import json

# 1) Create namedtuple WordInfo
WordInfo = namedtuple('WordInfo', ['word', 'frequency', 'length', 'first_occurrence'])

# 6) Prepare path to text file
text_file = os.path.join('data', 'sample.txt')
os.makedirs(os.path.dirname(text_file), exist_ok=True)

# Create text file (if not created yet) and write sample text
if not os.path.exists(text_file):
    sample_text = "hello world hello python world python is great"
    with open(text_file, 'w', encoding='utf-8') as f:
        f.write(sample_text)

# Retrieve text from file
with open(text_file, 'r', encoding='utf-8') as f:
    text = f.read().lower()
words = text.split()

# 5) OrderedDict for first occurrence
first_occurrence = OrderedDict()
for i, word in enumerate(words):
    if word not in first_occurrence:
        first_occurrence[word] = i

# 2) Counter for word frequency
word_freq = Counter(words)

# 3) Grouping by length
words_by_length = defaultdict(list)
for word in word_freq:
    words_by_length[len(word)].append(word)

# 4) deque for last 50 unique words (in order of appearance)
unique_words_deque = deque(maxlen=50)
for word in first_occurrence:  # already in order of appearance
    unique_words_deque.append(word)

# Collect WordInfo for each unique word
word_infos = []
for word in first_occurrence:
    info = WordInfo(
        word=word,
        frequency=word_freq[word],
        length=len(word),
        first_occurrence=first_occurrence[word]
    )
    word_infos.append(info)

# 7) Memory analysis
memory_usage = {
    'word_freq': sys.getsizeof(word_freq),
    'first_occurrence': sys.getsizeof(first_occurrence),
    'words_by_length': sys.getsizeof(words_by_length),
    'unique_words_deque': sys.getsizeof(unique_words_deque),
}

# 8) Save to JSON
json_data = {
    'word_infos': [w._asdict() for w in word_infos],
    'words_by_length': {str(k): v for k, v in words_by_length.items()},  # keys in JSON are strings
    'last_unique_words': list(unique_words_deque),
    'memory_usage_bytes': memory_usage
}

output_path = os.path.join('data', 'analysis.json')
with open(output_path, 'w', encoding='utf-8') as f:
    json.dump(json_data, f, indent=2)

**Задача 8: Система управления задачами**

Создайте систему управления задачами (TODO) используя все изученные концепции:

1. Создайте namedtuple `Task` с полями: id, title, description, priority, status, created_date
2. Используйте `defaultdict(list)` для группировки задач по статусу (todo, in_progress, done)
3. Используйте `deque` для очереди задач с высоким приоритетом
4. Используйте `Counter` для подсчета задач по приоритету
5. Используйте `OrderedDict` для хранения задач в порядке создания
6. Используйте `ChainMap` для объединения различных списков задач
7. Используйте `os.path` для работы с файлами задач
8. Реализуйте функции: add_task, complete_task, get_tasks_by_status, get_priority_queue
9. Сериализуйте все данные в JSON и pickle форматы


In [None]:
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict, ChainMap
import os
import json
import pickle
import time

# 1) Create namedtuple Task
Task = namedtuple('Task', ['id', 'title', 'description', 'priority', 'status', 'created_date'])

# Tasks storage
tasks_by_status = defaultdict(list)  # 2) grouping by status
task_queue_high = deque()            # 3) high priority queue
all_tasks_ordered = OrderedDict()    # 5) order of creation

# Example tasks
now = time.time()
sample_tasks = [
    Task(1, "Fix bug", "Fix login bug", "high", "todo", now),
    Task(2, "Write docs", "API documentation", "medium", "in_progress", now + 1),
    Task(3, "Test feature", "Run tests", "high", "todo", now + 2),
    Task(4, "Deploy", "Deploy to prod", "critical", "todo", now + 3),
]

# Fill structures
for task in sample_tasks:
    tasks_by_status[task.status].append(task)
    all_tasks_ordered[task.id] = task
    if task.priority in ("high", "critical"):
        task_queue_high.append(task)

# 4) Counter for priority
priority_counter = Counter(task.priority for task in all_tasks_ordered.values())

# 6) ChainMap — combine different sources (e.g. main + archive)
archived_tasks = {
    0: Task(0, "Old task", "Done long ago", "low", "done", now - 1000)
}
current_tasks_dict = {tid: task for tid, task in all_tasks_ordered.items()}
all_tasks_chain = ChainMap(current_tasks_dict, archived_tasks)

# 7) Paths to files
data_dir = os.path.join('todo_data')
os.makedirs(data_dir, exist_ok=True)

# 8) Functions (simple, without validations)
def add_task(task_id, title, desc, priority, status="todo"):
    new_task = Task(task_id, title, desc, priority, status, time.time())
    tasks_by_status[status].append(new_task)
    all_tasks_ordered[task_id] = new_task
    if priority in ("high", "critical"):
        task_queue_high.append(new_task)
    priority_counter[priority] += 1

def complete_task(task_id):
    if task_id in all_tasks_ordered:
        old_task = all_tasks_ordered[task_id]
        # Remove from old status
        tasks_by_status[old_task.status].remove(old_task)
        # Create updated task
        new_task = old_task._replace(status="done")
        tasks_by_status["done"].append(new_task)
        all_tasks_ordered[task_id] = new_task
        # Update priority counter (priority does not change)
        # High priority queue is not cleared — just keep as history

def get_tasks_by_status(status):
    return tasks_by_status[status]

def get_priority_queue():
    return list(task_queue_high)

### Code usage


add_task(5, "Review PR", "Code review", "high")
complete_task(1)

# 9) Serialization

# JSON
json_data = {
    'tasks_by_status': {
        status: [t._asdict() for t in tasks]
        for status, tasks in tasks_by_status.items()
    },
    'priority_counter': dict(priority_counter),
    'all_tasks_ordered': {str(k): v._asdict() for k, v in all_tasks_ordered.items()},
    'priority_queue': [t._asdict() for t in task_queue_high],
}

json_path = os.path.join(data_dir, 'tasks.json')
with open(json_path, 'w', encoding='utf-8') as f:
    json.dump(json_data, f, indent=2)

# Pickle
pkl_path = os.path.join(data_dir, 'tasks.pkl')
with open(pkl_path, 'wb') as f:
    pickle.dump({
        'tasks_by_status': tasks_by_status,
        'task_queue_high': task_queue_high,
        'priority_counter': priority_counter,
        'all_tasks_ordered': all_tasks_ordered,
        'all_tasks_chain': all_tasks_chain,
    }, f)

**Задача 9: Система мониторинга производительности**

Создайте систему мониторинга производительности используя sys и collections:

1. Создайте namedtuple `PerformanceMetric` с полями: function_name, execution_time, memory_usage, timestamp
2. Используйте `deque` для хранения последних 100 измерений производительности
3. Используйте `defaultdict(list)` для группировки метрик по функциям
4. Используйте `Counter` для подсчета количества вызовов каждой функции
5. Используйте `OrderedDict` для хранения метрик в хронологическом порядке
6. Используйте `sys.getsizeof()` для мониторинга памяти
7. Используйте `os.path` для работы с файлами метрик
8. Реализуйте функции: record_metric, get_function_stats, get_memory_usage, export_metrics
9. Сериализуйте метрики в JSON и pickle форматы


In [None]:
from collections import namedtuple, deque, defaultdict, Counter, OrderedDict
import sys
import os
import json
import pickle
import time

# 1) Create PerformanceMetric (namedtuple)
PerformanceMetric = namedtuple('PerformanceMetric', ['function_name', 'execution_time', 'memory_usage', 'timestamp'])

# Metrics storage
metrics_deque = deque(maxlen=100)           # 2) last 100 measurements
metrics_by_function = defaultdict(list)     # 3) by functions
call_counter = Counter()                    # 4) number of calls
metrics_chrono = OrderedDict()              # 5) chronological order

# Metrics examples
now = time.time()
sample_metrics = [
    PerformanceMetric("load_data", 0.12, sys.getsizeof([]), now),
    PerformanceMetric("process", 0.45, sys.getsizeof({1:2}), now + 1),
    PerformanceMetric("load_data", 0.10, sys.getsizeof([1,2,3]), now + 2),
    PerformanceMetric("save_result", 0.08, sys.getsizeof("ok"), now + 3),
]

# Fill collections
for metric in sample_metrics:
    metrics_deque.append(metric)
    metrics_by_function[metric.function_name].append(metric)
    call_counter[metric.function_name] += 1
    metrics_chrono[metric.timestamp] = metric

# 7) Prepare path to files
metrics_dir = os.path.join('metrics')
os.makedirs(metrics_dir, exist_ok=True)

# 8) Functions
def record_metric(func_name, exec_time):
    mem = sys.getsizeof([])  # just an example of memory
    ts = time.time()
    metric = PerformanceMetric(func_name, exec_time, mem, ts)
    metrics_deque.append(metric)
    metrics_by_function[func_name].append(metric)
    call_counter[func_name] += 1
    metrics_chrono[ts] = metric

def get_function_stats(func_name):
    return metrics_by_function.get(func_name, [])

def get_memory_usage():
    return sys.getsizeof(metrics_deque) + sys.getsizeof(metrics_by_function)

def export_metrics():
    # JSON
    json_data = {
        'recent_metrics': [m._asdict() for m in metrics_deque],
        'by_function': {
            fn: [m._asdict() for m in metrics]
            for fn, metrics in metrics_by_function.items()
        },
        'call_counts': dict(call_counter),
    }
    json_path = os.path.join(metrics_dir, 'performance.json')
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, indent=2)

    # Pickle
    pkl_path = os.path.join(metrics_dir, 'performance.pkl')
    with open(pkl_path, 'wb') as f:
        pickle.dump({
            'metrics_deque': metrics_deque,
            'metrics_by_function': metrics_by_function,
            'call_counter': call_counter,
            'metrics_chrono': metrics_chrono,
        }, f)


### Code usage

record_metric("validate", 0.05)
export_metrics()

**Задача 10: Комплексная система управления данными**

Создайте комплексную систему управления данными, объединяющую все изученные концепции:

1. Создайте несколько namedtuple для различных типов данных (User, Product, Order, etc.)
2. Используйте `defaultdict` для создания индексов по различным полям
3. Используйте `deque` для реализации очередей обработки данных
4. Используйте `Counter` для аналитики и статистики
5. Используйте `OrderedDict` для хранения данных в определенном порядке
6. Используйте `ChainMap` для объединения различных источников данных
7. Используйте `os` и `sys` для работы с файловой системой и мониторинга
8. Реализуйте CRUD операции (Create, Read, Update, Delete)
9. Добавьте функции экспорта/импорта данных в различных форматах
10. Сериализуйте все данные в JSON, pickle и другие форматы
11. Добавьте типизацию для всех функций и классов
12. Реализуйте систему логирования для отслеживания операций


In [None]:
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict, ChainMap
import os
import sys
import json
import pickle
from typing import List, Dict, Optional

### Logging — through simple list log_entries and log_op function ###

# 1) namedtuple for different type of data
User = namedtuple('User', ['id', 'name', 'email'])
Product = namedtuple('Product', ['id', 'name', 'price'])
Order = namedtuple('Order', ['id', 'user_id', 'product_id', 'status'])

# Example data
users = [
    User(1, "Alice", "alice@example.com"),
    User(2, "Bob", "bob@example.com")
]

products = [
    Product(101, "Laptop", 999.99),
    Product(102, "Mouse", 19.99)
]

orders = [
    Order(1001, 1, 101, "shipped"),
    Order(1002, 2, 102, "pending")
]

# 2) Indexes (for various fields) through defaultdict
user_index: Dict[int, User] = {u.id: u for u in users}
product_index: Dict[int, Product] = {p.id: p for p in products}
orders_by_user = defaultdict(list)
for order in orders:
    orders_by_user[order.user_id].append(order)

# 3) deque for processing orders queue
processing_queue: deque = deque()
for order in orders:
    if order.status == "pending":
        processing_queue.append(order)

# 4) Create counter for analysis of product and status
product_counter = Counter(order.product_id for order in orders)
status_counter = Counter(order.status for order in orders)

# 5) Store in chronological order
all_orders_ordered: OrderedDict = OrderedDict()
for order in orders:
    all_orders_ordered[order.id] = order

# 6) ChainMap — combine sources (e.g. main + dummy archive)
archived_orders = {999: Order(999, 1, 102, "delivered")}
current_orders = {o.id: o for o in orders}
orders_chain = ChainMap(current_orders, archived_orders)

# Simple logging (list of records)
log_entries: List[str] = []

def log_op(operation: str, details: str):
    log_entries.append(f"[{operation}] {details}")

# 7) Paths and monitoring
data_dir = os.path.join("data")
os.makedirs(data_dir, exist_ok=True)

def get_memory_usage() -> int:
    return sys.getsizeof(users) + sys.getsizeof(orders) + sys.getsizeof(products)

# 8) CRUD operations

def create_user(user_id: int, name: str, email: str):
    user = User(user_id, name, email)
    users.append(user)
    user_index[user_id] = user
    log_op("CREATE", f"User {user_id}")

def get_user(user_id: int) -> Optional[User]:
    return user_index.get(user_id)

def update_user(user_id: int, name: str = None, email: str = None):
    if user_id in user_index:
        old = user_index[user_id]
        new = User(old.id, name or old.name, email or old.email)
        # Обновляем в списке и индексе
        for i, u in enumerate(users):
            if u.id == user_id:
                users[i] = new
                break
        user_index[user_id] = new
        log_op("UPDATE", f"User {user_id}")

def delete_user(user_id: int):
    if user_id in user_index:
        users[:] = [u for u in users if u.id != user_id]
        del user_index[user_id]
        log_op("DELETE", f"User {user_id}")

# 9–10)

# Export data to JSON and pickle
def export_data():
    # JSON
    json_data = {
        "users": [u._asdict() for u in users],
        "products": [p._asdict() for p in products],
        "orders": [o._asdict() for o in orders],
        "analytics": {
            "product_counts": dict(product_counter),
            "status_counts": dict(status_counter)
        },
        "logs": log_entries
    }
    with open(os.path.join(data_dir, "data.json"), "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=2)

    # Pickle
    with open(os.path.join(data_dir, "data.pkl"), "wb") as f:
        pickle.dump({
            "users": users,
            "products": products,
            "orders": orders,
            "user_index": user_index,
            "product_index": product_index,
            "orders_by_user": orders_by_user,
            "processing_queue": processing_queue,
            "log_entries": log_entries
        }, f)

# Import data from pickle
def import_data_from_pickle():
    path = os.path.join(data_dir, "data.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            data = pickle.load(f)
        # Update global variables
        global users, products, orders, user_index, product_index, orders_by_user, processing_queue, log_entries
        users = data["users"]
        products = data["products"]
        orders = data["orders"]
        user_index = data["user_index"]
        product_index = data["product_index"]
        orders_by_user = data["orders_by_user"]
        processing_queue = data["processing_queue"]
        log_entries = data["log_entries"]


### Code usage

create_user(3, "Charlie", "charlie@example.com")
update_user(1, email="alice.new@example.com")
delete_user(2)
export_data()