**Задача 1: Система управления библиотекой**

Создайте систему управления библиотекой используя collections:

1. Создайте namedtuple `Book` с полями: title, author, isbn, year
2. Создайте namedtuple `Reader` с полями: name, reader_id, phone
3. Используйте `defaultdict(list)` для хранения книг по жанрам
4. Используйте `deque` для очереди читателей, ожидающих популярную книгу
5. Используйте `Counter` для подсчета количества книг каждого автора
6. Используйте `OrderedDict` для хранения истории выдачи книг (читатель -> список книг)
7. Сериализуйте все данные в JSON и pickle форматы

In [None]:
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict
from typing import NamedTuple
import json, pickle

Book = namedtuple('Book', ['title', 'author', 'isbn', 'year'])
all_books: list[Book] = [] 
book1: Book = Book("fairy tale", 'Author1', '123', 1874) #добавляем книги
all_books.append(book1)
book2 = Book("The stories", 'Author1', '345', 1901)
all_books.append(book2)
book3 = Book("fairy tale", 'Author3', '567', 1991)
all_books.append(book3)

Reader = namedtuple('Reader', ['name', 'reader_id', 'phone'])
reader1: Reader = Reader('ivan', '101','998765')#добавляем читателей
reader2 = Reader('oleg', '102','445567')
book_genre: dict[str, list[Book]] = defaultdict(list)
book_genre["fairy tale"].append(book1)
book_genre["stories"].append(book2)
book_genre["fairy tale"].append(book3)

deque_readers: deque[str] = deque()
deque_readers.append(reader1.name)#добавляем читателей в очередь
deque_readers.append(reader2.name) 

author_counter : Counter[str] = Counter(book.author for book in all_books) #подсчёт количества книг авторов

book_distribution: OrderedDict[str, list[Book]] = OrderedDict()
book_distribution[reader1.name]=[book3, book2]
book_distribution[reader2.name]=[book1]

all_books_serializable = [book._asdict() for book in all_books]
book_genre_serializable = {genre: [book._asdict() for book in books] for genre, books in book_genre.items()}
deque_readers_serializable = list(deque_readers)
author_counter_serializable = dict(author_counter)

def serialize_history(history):
    return {reader: [book._asdict() for book in books] for reader, books in history.items()}

book_distribution_serializable = serialize_history(book_distribution)

library_data = {
    "all_books": all_books_serializable,
    "book_genre": book_genre_serializable,
    "waiting_queue": deque_readers_serializable,
    "author_counter": author_counter_serializable,
    "history": book_distribution_serializable
}
with open("library_data.json", "w", encoding="utf-8") as f:
    json.dump(library_data, f, ensure_ascii=False, indent=4)
with open("library_data.pkl", "wb") as f:
    pickle.dump(library_data, f)

**Задача 2: Анализатор файловой системы**

Создайте анализатор файловой системы используя os и sys:

1. Создайте namedtuple `FileInfo` с полями: name, size, extension, modified_time
2. Используйте `os.walk()` для обхода директории
3. Используйте `os.path` функции для получения информации о файлах
4. Используйте `Counter` для подсчета файлов по расширениям
5. Используйте `defaultdict(list)` для группировки файлов по размеру (маленькие < 1MB, средние 1-100MB, большие > 100MB)
6. Используйте `deque` для хранения последних 10 найденных файлов
7. Выведите статистику используя `sys.getsizeof()` для подсчета памяти
8. Сохраните результаты в JSON файл

In [None]:
from collections import namedtuple, Counter, defaultdict, deque
import sys
import os
import json
directory: str = r"c:\Users\User\OneDrive\Рабочий стол\dev\Python"
FileInfo = namedtuple('FileInfo', ['name', 'size', 'extension', 'modified_time'])
file_info: list[FileInfo] = []
print(os.path.abspath(directory))
for root, dirs, files in os.walk(directory):
    for filename in files:
        path: str = os.path.join(root, filename)
        size: int = os.path.getsize(path)
        name, extension = os.path.splitext(filename)
        modified_time: float = os.path.getmtime(path)
        file_info.append (FileInfo(name, size, extension, modified_time))
extensions_counter: Counter[str] = Counter(inform.extension for inform in file_info)
print("Статистика по расширениям:")
for ext, count in extensions_counter.items():
    print(f"{ext if ext else 'без расширения'}: {count}")
size_group: dict[str, list[FileInfo]] = defaultdict(list)
for inform in file_info:
    if inform.size < 1024 * 1024:  
        size_group['маленькие'].append(inform)
    elif inform.size <= 100 * 1024 * 1024: 
        size_group['средние'].append(inform)
    else:  
        size_group['большие'].append(inform)
print("Статистика по размерам:")
for category, files in size_group.items():
    print(f"{category}: {len(files)} файлов")
files_deque: deque[str]  = deque(maxlen=10)
for inform in file_info:
    files_deque.append(inform.name + inform.extension)
print("Последние 10 найденных файлов:")
for f in files_deque:
    print(f)
print("Размер списка file_info:", sys.getsizeof(file_info), "байт")
total: int  = 0
for inform in file_info:
    total += sys.getsizeof(inform)
print("Суммарный размер всех FileInfo объектов:", total, "байт")
print("Всего:", sys.getsizeof(file_info) + total, "байт")
data = [inform._asdict() for inform in file_info]
with open("results.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=4)



c:\Users\User\OneDrive\Рабочий стол\dev\Python
Статистика по расширениям:
.ipynb: 4
.json: 2
.pkl: 1
.txt: 1
.swp: 1
без расширения: 141
.sample: 28
.md: 1
.py: 1
.xml: 4
Статистика по размерам:
маленькие: 184 файлов
Последние 10 найденных файлов:
73c6abea7dde1d5b5bf27e2428247eb76eb9eb
233b6982b2662f4ed0411ddd6459c6655893a1
20c8dbb9cd7262eac3d982c95c8826a0ecc1a7
main
main
.gitignore
misc.xml
vcs.xml
workspace.xml
profiles_settings.xml
Размер списка file_info: 1656 байт
Суммарный размер всех FileInfo объектов: 13248 байт
Всего: 14904 байт


**Задача 3: Система конфигурации приложения**

Создайте систему конфигурации используя ChainMap и defaultdict:

1. Создайте namedtuple `Config` с полями: key, value, section, default_value
2. Создайте несколько словарей конфигурации (default, user, environment)
3. Используйте `ChainMap` для объединения конфигураций с приоритетом
4. Используйте `defaultdict(dict)` для группировки настроек по секциям
5. Используйте `OrderedDict` для сохранения порядка загрузки конфигураций
6. Используйте `os.environ` для чтения переменных окружения
7. Сериализуйте конфигурацию в JSON и pickle форматы

In [None]:
from collections import namedtuple, ChainMap, defaultdict, OrderedDict
from typing import Any
import os
import json
Config = namedtuple('Config',['key', 'value', 'section', 'default_value']) #namedtuple `Config` с полями: key, value, section, default_value
default: dict[str, Any] = {'theme':'none', 'language': 'ru'}
user: dict[str, Any] = {'theme':'dark', 'language': 'en'}
environment: dict[str, Any] = {'theme':'light', 'language': 'fr'}
config_chain: ChainMap = ChainMap(environment, user, default) #`ChainMap` для объединения конфигураций с приоритетом
section_config: defaultdict[str, dict[str, Any]] = defaultdict (dict) #`defaultdict(dict)` для группировки настроек по секциям
key_to_section: dict[str, str] = {'theme': 'UI', 'language': 'General'}
for key, section in key_to_section.items():
    section_config[section][key] = config_chain[key]
od_section_config: OrderedDict[str, dict] = OrderedDict()
for section, settings in section_config.items():
    od_section_config[section] = OrderedDict(settings) #для каждой секции свой OrderedDict
config_chain = ChainMap(os.environ, environment, user, default)
data : dict[str, dict[str, Any]] = {section : dict(settings) for section, settings in od_section_config.items()}
with open('result.json', 'w') as file:
    json.dump(data, file)

**Задача 4: Мониторинг системы**

Создайте систему мониторинга используя sys и os:

1. Создайте namedtuple `SystemInfo` с полями: cpu_count, memory_usage, process_id, user_name
2. Используйте `os.cpu_count()` для получения количества процессоров
3. Используйте `sys.getallocatedblocks()` для мониторинга памяти
4. Используйте `os.getpid()` и `os.getlogin()` для информации о процессе
5. Используйте `deque` для хранения последних 20 измерений
6. Используйте `Counter` для подсчета частоты использования различных функций
7. Используйте `defaultdict(list)` для группировки измерений по времени
8. Сохраните историю мониторинга в pickle файл


In [None]:
from collections import namedtuple, deque, Counter, defaultdict, OrderedDict
from typing import Any
from datetime import datetime
import os
import sys
import pickle
SystemInfo = namedtuple('SystemInfo',['cpu_count', 'memory_usage', 'process_id', 'user_name']) #namedtuple `SystemInfo` с полями: cpu_count, memory_usage, process_id, user_name
number_processes : int | None = os.cpu_count() #`os.cpu_count()` для получения количества процессоров
print("Количество процессоров в системе:", number_processes)
memory: int = sys.getallocatedblocks() #`sys.getallocatedblocks()` для мониторинга памяти
print("мониторинг памяти:", memory)
pid: int = os.getpid() #`os.getpid()` и `os.getlogin()` для информации о процессе
login: str = os.getlogin()
last_measurements: deque[SystemInfo] = deque(maxlen=20) # `deque` для хранения последних 20 измерений
system_info: SystemInfo = SystemInfo(cpu_count=number_processes, memory_usage=memory, process_id=pid, user_name=login)
last_measurements.append(system_info) #для хранения последних 20 измерений
print(last_measurements)
measurements_counter: Counter[str] = Counter()
measurements_counter['get_system_info'] +=1
time_group: defaultdict[str, list[SystemInfo]]=defaultdict(list)
current_time = datetime.now().strftime('%H:%M:%S')
time_group[current_time].append(system_info)
print(time_group)

with open('monitoring_history.pkl', 'wb') as f:
    pickle.dump(last_measurements, f)
with open('time_group.pkl', 'wb') as f:
    pickle.dump(time_group, f)
with open('measurements_counter.pkl', 'wb') as f:
    pickle.dump(measurements_counter, f)

Количество процессоров в системе: 6
мониторинг памяти: 355516
deque([SystemInfo(cpu_count=6, memory_usage=355516, process_id=27000, user_name='User')], maxlen=20)
defaultdict(<class 'list'>, {'09:10:04': [SystemInfo(cpu_count=6, memory_usage=355516, process_id=27000, user_name='User')]})


**Задача 5: Система логирования**

Создайте систему логирования используя все изученные коллекции:

1. Создайте namedtuple `LogEntry` с полями: timestamp, level, message, module, function
2. Используйте `deque` для хранения последних 100 логов (кольцевой буфер)
3. Используйте `defaultdict(list)` для группировки логов по уровням (DEBUG, INFO, WARNING, ERROR)
4. Используйте `Counter` для подсчета количества логов каждого уровня
5. Используйте `OrderedDict` для хранения логов по времени (FIFO)
6. Используйте `ChainMap` для объединения различных источников логов
7. Используйте `os.path` для работы с файлами логов
8. Сериализуйте логи в JSON и pickle форматы

In [None]:
from collections import namedtuple, deque, Counter, defaultdict, OrderedDict, ChainMap
from typing import Any
from datetime import datetime
import os
import json
import pickle
LogEntry = namedtuple('LogEntry',['timestamp', 'level', 'message', 'module', 'function']) #namedtuple `LogEntry`
last_log: deque = deque(maxlen=100) 
logs_by_level: defaultdict[str, list[LogEntry]] = defaultdict(list)   # defaultdict(list) для группировки логов по уровням
level_counts: Counter[str] = Counter()            # Counter для подсчёта логов каждого уровня
entry:LogEntry = LogEntry(
    timestamp = datetime.now().isoformat(timespec='milliseconds'),
    level='INFO',
    message='Система запущена',
    module=__name__,
    function='main'
)
last_log.append(entry)# Добавляем в кольцевой буфер
logs_by_level[entry.level].append(entry)# Добавляем в defaultdict по уровню
level_counts[entry.level] += 1 # Увеличиваем счетчик Counter
print("Последний лог:", last_log[-1])        # последний элемент в кольцевом буфере
print("Логи по уровням:", logs_by_level)     # сгруппированные по уровню
print("Счётчики уровней:", level_counts)     # количество по каждому уровню
log_time: OrderedDict[str, dict] = OrderedDict()
log_time[entry.timestamp] = entry
print("Логи по времени (FIFO):", log_time)
log_source: ChainMap = ChainMap(logs_by_level, log_time) # `ChainMap` для объединения различных источников логов
log_filename = 'app_logs.json'
log_path = os.path.abspath(log_filename)# Получаем путь к файлу
print("путь к файлу лога:", log_path)
if os.path.exists(log_path): # Проверяем, существует ли файл
    print("Файл существует")
else:
    print("Файл не найден, создаём новый")
with open('last_log.json', 'w', encoding='utf-8') as f:
    json.dump([entry._asdict() for entry in last_log], f, ensure_ascii=False, indent=4)
with open('log_time.pkl', 'wb') as f:
    pickle.dump(log_time, f)

Последний лог: LogEntry(timestamp='2025-09-22T09:47:08.483', level='INFO', message='Система запущена', module='__main__', function='main')
Логи по уровням: defaultdict(<class 'list'>, {'INFO': [LogEntry(timestamp='2025-09-22T09:47:08.483', level='INFO', message='Система запущена', module='__main__', function='main')]})
Счётчики уровней: Counter({'INFO': 1})
Логи по времени (FIFO): OrderedDict([('2025-09-22T09:47:08.483', LogEntry(timestamp='2025-09-22T09:47:08.483', level='INFO', message='Система запущена', module='__main__', function='main'))])
путь к файлу лога: c:\Users\User\OneDrive\Рабочий стол\dev\Python\app_logs.json
Файл не найден, создаём новый


**Задача 6: Кэш-система**

Создайте простую кэш-систему используя collections:

1. Создайте namedtuple `CacheEntry` с полями: key, value, timestamp, access_count
2. Используйте `OrderedDict` для реализации LRU (Least Recently Used) кэша
3. Используйте `deque` для хранения истории доступа к ключам
4. Используйте `Counter` для подсчета частоты доступа к каждому ключу
5. Используйте `defaultdict(int)` для хранения счетчиков доступа
6. Реализуйте методы: get, set, delete, clear, size
7. Используйте `sys.getsizeof()` для мониторинга размера кэша
8. Сериализуйте кэш в pickle формат для сохранения между сессиями


In [None]:
from collections import namedtuple, deque, Counter, defaultdict, OrderedDict
from typing import Any
from datetime import datetime
import os, pickle, sys
CacheEntry = namedtuple('CacheEntry',['key', 'value', 'timestamp', 'access_count']) #namedtuple `CacheEntry`
entry: CacheEntry = CacheEntry('user1', 'data1', timestamp=datetime.now(), access_count=0)
cache: OrderedDict[str, dict] = OrderedDict() #`OrderedDict` для реализации LRU (Least Recently Used) кэша
cache["cache1"]='001'
cache["cache2"]='002'
cache["cache3"]='003'
history_key: deque = deque() #`deque` для хранения истории доступа к ключам
history_key.append('cache2')
history_key.append('cache3')
access_counter = Counter(history_key) #`Counter` для подсчета частоты доступа к каждому ключу
access_count_dict : defaultdict[str, int] = defaultdict(int)   # `defaultdict(int)` для хранения счетчиков доступа
access_count_dict ['user1']+=1
def set_value(key: str, value: Any):
    entry = CacheEntry(key, value, datetime.now(), 0)# создаём запись
    cache[key] = entry # добавляем/обновляем в кэше
    cache.move_to_end(key)# перемещаем в конец для LRU
    history_key.append(key)# добавляем в историю
    access_count_dict[key] += 1 # увеличиваем счётчик доступа
def get_value(key: str, default=None):
    if key in cache:         
        cache.move_to_end(key)# обновляем LRU
        history_key.append(key)# добавляем в историю
        access_count_dict[key] += 1 # увеличиваем счётчик доступа
        return cache[key].value # возвращаем значение
    else:
        return default
def delete_value(key: str):
    if key in cache:
        removed = cache.pop(key)  # удаляем из кэша
        history_key.append(key)   # фиксируем удаление в истории
        return removed.value
    else:
        return None
def clear_cache():
    cache.clear()               # очищаем LRU-кэш
    history_key.clear()         # очищаем историю
    access_count_dict.clear()   # очищаем defaultdict
    access_counter.clear()      # очищаем Counter
def cache_size():
    size = sys.getsizeof(cache)# сколько памяти занимает сам объект cache
    for key, entry in cache.items(): # проходим по всем элементам кэша: key, entry
        size += sys.getsizeof(key) # добавляем размер ключа
        size += sys.getsizeof(entry) # добавляем размер записи CacheEntry
    return size # общий размер кэша в байтах   
with open("cache.pkl", "wb") as f:   
    pickle.dump(cache, f) # сохраняем кэш
if os.path.exists("cache.pkl"):       # проверяем, есть ли файл
    with open("cache.pkl", "rb") as f:  
        cache = pickle.load(f)          # загружаем кэш

**Задача 7: Анализатор текста**

Создайте анализатор текста используя collections:

1. Создайте namedtuple `WordInfo` с полями: word, frequency, length, first_occurrence
2. Используйте `Counter` для подсчета частоты слов
3. Используйте `defaultdict(list)` для группировки слов по длине
4. Используйте `deque` для хранения последних 50 уникальных слов
5. Используйте `OrderedDict` для хранения слов в порядке первого появления
6. Используйте `os.path` для работы с текстовыми файлами
7. Используйте `sys.getsizeof()` для анализа памяти
8. Сохраните результаты анализа в JSON файл


In [5]:
from collections import namedtuple, deque, Counter, defaultdict, OrderedDict
from typing import Any
import json, sys
WordInfo = namedtuple('WordInfo',['word', 'frequency', 'length', 'first_occurrence']) #amedtuple `WordInfo` с полями: word, frequency, length, first_occurrence
word_1: WordInfo = WordInfo('hello', 3, 5, 'hello world')
text = "hello world hello python code python hello" # Текст для анализа (вместо файла)
words = text.split() # разбиение текста на слова
counter = Counter(words) #`Counter` для подсчета частоты слов
group_words: defaultdict[str, list[WordInfo]] = defaultdict(list) #для группировки слов по длине
unic_words: deque = deque(maxlen=50) #`deque` для хранения последних 50 уникальных слов
first_use: OrderedDict[str, dict] = OrderedDict() #`OrderedDict` для хранения слов в порядке первого появления
for idx, word in enumerate(words):#Создание WordInfo для каждого слова и заполнение всех структур
    if word not in first_use:
        wi = WordInfo(word, counter[word], len(word), idx)
        first_use[word] = wi
        group_words[len(word)].append(wi)
        unic_words.append(word) 
print("Размер Counter:", sys.getsizeof(counter))
print("Размер defaultdict:", sys.getsizeof(group_words))
print("Размер OrderedDict:", sys.getsizeof(first_use))
print("Размер deque:", sys.getsizeof(unic_words))
result = {
    "group_words": {k: [wi._asdict() for wi in v] for k, v in group_words.items()},
    "first_use": {k: v._asdict() for k, v in first_use.items()},
    "last_50_words": list(unic_words)
}
with open("analysis.json", "w", encoding="utf-8") as f:
    json.dump(result, f, ensure_ascii=False, indent=4)

Размер Counter: 248
Размер defaultdict: 240
Размер OrderedDict: 488
Размер deque: 624


**Задача 8: Система управления задачами**

Создайте систему управления задачами (TODO) используя все изученные концепции:

1. Создайте namedtuple `Task` с полями: id, title, description, priority, status, created_date
2. Используйте `defaultdict(list)` для группировки задач по статусу (todo, in_progress, done)
3. Используйте `deque` для очереди задач с высоким приоритетом
4. Используйте `Counter` для подсчета задач по приоритету
5. Используйте `OrderedDict` для хранения задач в порядке создания
6. Используйте `ChainMap` для объединения различных списков задач
7. Используйте `os.path` для работы с файлами задач
8. Реализуйте функции: add_task, complete_task, get_tasks_by_status, get_priority_queue
9. Сериализуйте все данные в JSON и pickle форматы


In [None]:
from collections import namedtuple, deque, Counter, defaultdict, OrderedDict
from datetime import datetime
import json
Task = namedtuple('Task',['id', 'title', 'description', 'priority', 'status', 'created_date']) #namedtuple `Task` с полями: id, title, description, priority, status, created_date
task_1: Task = Task('1', 'cook', 'cook dinner', '2', 'todo', datetime(2025, 9, 25))
task_2: Task = Task('2', 'study', 'learn python', '1', 'todo', datetime(2025, 9, 25))
group_status: defaultdict[str, list[Task]] = defaultdict(list) #для группировки задач по статусу (todo, in_progress, done)
high_priority: deque = deque() #для очереди задач с высоким приоритетом
counter_priority = Counter() # для подсчета задач по приоритету
for i in [task_1, task_2]:
    counter_priority[i.priority] += 1
tasks: OrderedDict[str, dict] = OrderedDict() #хранение задач в порядке создания
united_task: ChainMap = ChainMap(high_priority, group_status) # для объединения различных списков задач
def add_task(task: Task):
    tasks[task.id] = task# добавление в OrderedDict, чтобы сохранить порядок создания
    group_status[task.status].append(task) # Добавляем в defaultdict по статусу
    if task.priority == '1':# Добавляем в очередь высоких приоритетов (1 — высокий)
        high_priority.append(task)
    counter_priority[task.priority] += 1 # Обновляем Counter по приоритету
def complete_task(task_id: str):
    if task_id in tasks:
        task = tasks[task_id]
        new_task = Task(task.id, task.title, task.description, task.priority, 'done', task.created_date) # Меняем статус на done
        tasks[task_id] = new_task
        
        group_status[task.status].remove(task) # Обновляем defaultdict
        group_status['done'].append(new_task)
        
        if task in high_priority:# удаляем в очереди высоких приоритетов — 
            high_priority.remove(task)
        
        print(f"Задача {task_id} выполнена.")
    else:
        print(f"Задача {task_id} не найдена.")
def get_tasks_by_status(status: str):
    return group_status.get(status, [])
def get_priority_queue():
    return list(high_priority)
with open("tasks.json", "w", encoding="utf-8") as f:
    json.dump({k: t._asdict() for k, t in tasks.items()}, f, ensure_ascii=False, indent=4)

with open("tasks.pkl", "wb") as f:
    pickle.dump(tasks, f)

**Задача 9: Система мониторинга производительности**

Создайте систему мониторинга производительности используя sys и collections:

1. Создайте namedtuple `PerformanceMetric` с полями: function_name, execution_time, memory_usage, timestamp
2. Используйте `deque` для хранения последних 100 измерений производительности
3. Используйте `defaultdict(list)` для группировки метрик по функциям
4. Используйте `Counter` для подсчета количества вызовов каждой функции
5. Используйте `OrderedDict` для хранения метрик в хронологическом порядке
6. Используйте `sys.getsizeof()` для мониторинга памяти
7. Используйте `os.path` для работы с файлами метрик
8. Реализуйте функции: record_metric, get_function_stats, get_memory_usage, export_metrics
9. Сериализуйте метрики в JSON и pickle форматы


In [8]:
from collections import namedtuple, deque, Counter, defaultdict, OrderedDict
import json, time, pickle, sys, os
from typing import Optional

PerformanceMetric = namedtuple("PerformanceMetric", ["function_name", "execution_time", "memory_usage", "timestamp"]) #namedtuple `PerformanceMetric` с полями: function_name, execution_time, memory_usage, timestamp
last_metrics: deque[PerformanceMetric]  = deque(maxlen=100)  #deque` для хранения последних 100 измерений производительности
metrics_by_function: defaultdict[str, list[PerformanceMetric]] = defaultdict(list) #группировка метрик по функциям 
function_calls: Counter = Counter() #для подсчета количества вызовов каждой функции
metrics_chronological: OrderedDict[float, PerformanceMetric] = OrderedDict() # для хранения метрик в хронологическом порядке
if os.path.exists("metrics.json"):
    print("Файл существует")
def record_metric(function_name: str, execution_time: float) -> None: #запись метрики
    timestamp: float = time.time()
    metric = PerformanceMetric(
        function_name=function_name,
        execution_time=execution_time,
        memory_usage=sys.getsizeof(execution_time),  # создание метрики с текущим временем
        timestamp=timestamp
    )
    last_metrics.append(metric) #добавление в deque 
    metrics_by_function[function_name].append(metric) #группирует метркиу по функции 
    function_calls[function_name] += 1
    metrics_chronological[timestamp] = metric #добавление для хронологии
def get_function_stats(function_name: str) -> Optional[dict[str, float]]: #получение статистики
    metrics = metrics_by_function.get(function_name, [])
    if not metrics:
        return None
    total_time = sum(m.execution_time for m in metrics) #обще количество измерений
    avg_time = total_time / len(metrics) #среднее время выполнения
    return {"total_calls": len(metrics), "average_time": avg_time}
def get_memory_usage()-> int: #получение памяти
    return sum(sys.getsizeof(m) for m in last_metrics)
def export_metrics() -> None:
    with open("metrics.json", "w") as f:
        json.dump([m._asdict() for m in last_metrics], f, indent=2)
    with open("metrics.pkl", "wb") as f:
        pickle.dump(list(last_metrics), f)
if os.path.exists("metrics.json"):
    print("Файл metrics.json существует")


**Задача 10: Комплексная система управления данными**

Создайте комплексную систему управления данными, объединяющую все изученные концепции:

1. Создайте несколько namedtuple для различных типов данных (User, Product, Order, etc.)
2. Используйте `defaultdict` для создания индексов по различным полям
3. Используйте `deque` для реализации очередей обработки данных
4. Используйте `Counter` для аналитики и статистики
5. Используйте `OrderedDict` для хранения данных в определенном порядке
6. Используйте `ChainMap` для объединения различных источников данных
7. Используйте `os` и `sys` для работы с файловой системой и мониторинга
8. Реализуйте CRUD операции (Create, Read, Update, Delete)
9. Добавьте функции экспорта/импорта данных в различных форматах
10. Сериализуйте все данные в JSON, pickle и другие форматы
11. Добавьте типизацию для всех функций и классов
12. Реализуйте систему логирования для отслеживания операций


In [None]:
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict, ChainMap
import json, pickle, os, sys, logging
from typing import Optional, Any
#namedtuple для различных типов данных (User, Product, Order, etc.)
User = namedtuple("User", ["user_id", "name", "email"])
Product = namedtuple("Product", ["product_id", "name", "price"])
Order = namedtuple("Order", ["order_id", "user_id", "product_id", "quantity"])
users: dict[int, User] = {}
products: dict[int, Product] = {}
orders: OrderedDict[int, Order] = OrderedDict()

# Индексы по различным полям
users_by_email: defaultdict[str, list[User]] = defaultdict(list)
products_by_name: defaultdict[str, list[Product]] = defaultdict(list)

order_queue: deque[Order] = deque() # Очередь обработки данных
order_count: Counter = Counter() # для аналитики и статистики
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") #вывод логов для отслеживания всех операций

def create_user(user_id: int, name: str, email: str) -> User:
    user = User(user_id, name, email)
    users[user_id] = user
    users_by_email[email].append(user)
    logging.info(f"User created: {user}")
    return user

def create_product(product_id: int, name: str, price: float) -> Product:
    product = Product(product_id, name, price)
    products[product_id] = product
    products_by_name[name].append(product)
    logging.info(f"Product created: {product}")
    return product

def create_order(order_id: int, user_id: int, product_id: int, quantity: int) -> Order:
    order = Order(order_id, user_id, product_id, quantity)
    orders[order_id] = order
    order_queue.append(order)
    order_count[product_id] += quantity
    logging.info(f"Order created: {order}")
    return order

#получение пользователя по айди, продукта по имени и заказа по айди
def get_user_by_email(email: str) -> list[User]:
    return users_by_email.get(email, [])

def get_product_by_name(name: str) -> list[Product]:
    return products_by_name.get(name, [])

def get_order(order_id: int) -> Optional[Order]:
    return orders.get(order_id)

def update_product_price(product_id: int, new_price: float) -> None:
    if product_id in products:
        product = products[product_id]
        updated_product = Product(product.product_id, product.name, new_price)
        products[product_id] = updated_product
        logging.info(f"Product updated: {updated_product}")
def delete_user(user_id: int) -> None:
    if user_id in users:
        user = users.pop(user_id)
        users_by_email[user.email].remove(user)
        logging.info(f"User deleted: {user}")

def export_data() -> None:
    with open("users.json", "w") as f:
        json.dump([u._asdict() for u in users.values()], f)
    with open("data.pkl", "wb") as f:
        pickle.dump({"users": users, "products": products, "orders": orders}, f)
    logging.info("Data exported to JSON and Pickle")

def import_data() -> None:
    global users, products, orders
    if os.path.exists("data.pkl"):
        with open("data.pkl", "rb") as f:
            data = pickle.load(f)
            users = data["users"]
            products = data["products"]
            orders = data["orders"]
        logging.info("Data imported from Pickle")

combined_data = ChainMap(users, products) #для объединения данных
print(f"Использовано памяти: {sys.getsizeof(users)} bytes")
if not os.path.exists("exports"):
    os.mkdir("exports")

#### Реализуйте простую модель стоянки. 
 1) Скажем, ваши автомобили это именованные кортежи, с указанием марки автомобиля, модели, гос.номера и цвета
 2) Сама стоянка это двусторонняя очередь (deque)
 Представим, что у нас нет коллизий, и если места на стоянке нет, то виртуально вызывается эвакуатор и убирает самый крайний автомобиль (как и работает deque)
 3) Создайте набор автомобилей и в случайном порядке добавляйте/удаляйте их с автостоянки.
 4) Посчитайте, сколько и каких автомобилей осталось на автостоянке.


In [43]:
from collections import namedtuple, deque, Counter
import random
from typing import Optional, Any
Cars = namedtuple('Cars', ['brand', 'model', 'state_number', 'color'])
parking_lot: deque = deque(maxlen=4)
cars_list : list[Cars] = [
    Cars ('volvo', 's40', 'r276ma', 'red'),
    Cars('bmw', 'x5', 'k566aa', 'blue'),
    Cars('audi', 'q7', 'm746mm', 'white'),
    Cars ('lada', 'aura', 'c777ar', 'black'), 
    Cars ('volvo', 'xc90', 'n459ar', 'blue') ,
]
for i in range(5):
    car = random.choice(cars_list)
    if len(parking_lot) == parking_lot.maxlen:
        removed_car = parking_lot.pop() #delete last value 
        print(f"Удалена: {removed_car}")
    parking_lot.append(car)
    print(f"Добавлена: {car}")
counter_cars: Counter = Counter(car.brand for car in parking_lot)
print(f"машин на парковке: {list(parking_lot)}")
print("подсчёт брендов на парковке: ")
for brand, count in counter_cars.items():
            print(f"  {brand}: {count} машин(ы)")




Добавлена: Cars(brand='audi', model='q7', state_number='m746mm', color='white')
Добавлена: Cars(brand='volvo', model='xc90', state_number='n459ar', color='blue')
Добавлена: Cars(brand='audi', model='q7', state_number='m746mm', color='white')
Добавлена: Cars(brand='volvo', model='s40', state_number='r276ma', color='red')
Удалена: Cars(brand='volvo', model='s40', state_number='r276ma', color='red')
Добавлена: Cars(brand='volvo', model='s40', state_number='r276ma', color='red')
машин на парковке: [Cars(brand='audi', model='q7', state_number='m746mm', color='white'), Cars(brand='volvo', model='xc90', state_number='n459ar', color='blue'), Cars(brand='audi', model='q7', state_number='m746mm', color='white'), Cars(brand='volvo', model='s40', state_number='r276ma', color='red')]
подсчёт брендов на парковке: 
  audi: 2 машин(ы)
  volvo: 2 машин(ы)


 Теперь вы работаете в роли архивариуса РСФСР и занимаетесь хранением информации об иуществе раскулаченных граждан Имперской России

 У вас несколько книг с записями, упорядоченных по алфавиту. Каждая книга хранит фамилии, начинающиеся с конкретной буквы.
 
 Внутри книги указаны фамилии, к каждому гражданину указана категория имущества, например, мебель, посуда, картины и т.д.
 В каждой категории указан список вещей, которые были изъяты. 
 
 Вы вносите записи, и если новой записи нет, вы ее создаете. Затем, когда ищите конкретную категорию предмета, начинаете перебирать все книги, начиная с первой буквы по алфавиту в вашем списке.
 Очевидно, что вам нужно использовать defaultdict и ChainMap

In [55]:
from collections import defaultdict, ChainMap
book_1: defaultdict = defaultdict (lambda: defaultdict(list))
book_1['Иванов']['мебель'].append('стул')
book_1['Иванов']['мебель'].append('стол')
book_1['Егорова']['посуда'].append('тарелка')
book_1['Морозова']['картина'].append("дама с собачкой")

book_2: defaultdict = defaultdict (lambda: defaultdict(list))
book_2['Сидоров']['мебель'].append('стул')
book_2['Петрова']['мебель'].append('шкаф')
book_2['Леонтьева']['посуда'].append('сервиз')
book_2['Тарасов']['посуда'].append("блюдо")

category: ChainMap = ChainMap(book_1, book_2)

def add_item(chainmap, surname, category_name, item): #функция для добавления записи
    chainmap[surname][category_name].append(item)
def find_items(chainmap, category_name):
    results = {}
    for book in chainmap.maps:  # перебираем каждую книгу отдельно
        for surname, categories in sorted(book.items()):  # перебираем записи в этой книге
            if category_name in categories:
                results[surname] = categories[category_name]
    return results


 Используя sys и os 
 Выведите некоторые ваши директории с указанием размера всех файлов и упорядочьте их (директории) по времени последнего обращения

 Выведите список расширений файлов, которые хранятся на вашем ПК
 Посчитайте объем памяти, который используется вашим интерпретатором


In [69]:
import sys, os, datetime
directory: str = r"c:\Users\User\OneDrive\Рабочий стол\dev\Python"
result: list = []
for root, dirs, files in os.walk(directory):
    total_size: float = 0 # размер одной папки
    for filename in files:
        filepath = os.path.join(root, filename)  # полный путь к файлу
        size: int = os.path.getsize(filepath)
        total_size += os.path.getsize(filepath)
        
        extension = os.path.splitext(filename)
    folder_access_time: time = os.path.getatime(root)
    result.append((root, total_size, folder_access_time))
results_sorted = sorted(result, key=lambda x: x[2])  # сортируем по времени доступа
for root, total_size, atime in results_sorted:
    time = datetime.datetime.fromtimestamp(atime)
    print(root, total_size, time)
print("расширения: ", extension)
total_memory = 0

# считаем память для списка результатов
total_memory += sys.getsizeof(result)
for item in result:
    total_memory += sys.getsizeof(item)
    for subitem in item:
        total_memory += sys.getsizeof(subitem)

# считаем память для множества расширений
total_memory += sys.getsizeof(extension)
for ext in extension:
    total_memory += sys.getsizeof(ext)

print("Примерный объём памяти, занимаемой объектами:", total_memory, "байт")

c:\Users\User\OneDrive\Рабочий стол\dev\Python 144495 2025-09-25 16:00:52.838967
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git 14260 2025-09-25 16:00:52.843274
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\hooks 25827 2025-09-25 16:00:52.846988
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\info 240 2025-09-25 16:00:52.851428
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\logs 4971 2025-09-25 16:00:52.851428
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\logs\refs 0 2025-09-25 16:00:52.853463
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\logs\refs\heads 4783 2025-09-25 16:00:52.853463
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\logs\refs\remotes 0 2025-09-25 16:00:52.854945
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\logs\refs\remotes\origin 4605 2025-09-25 16:00:52.854945
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\objects 0 2025-09-25 16:00:52.856117
c:\Users\User\OneDrive\Рабочий стол\dev\Python\.git\objects\00 48 2025-09-25 1