Стандартные библиотеки Python :
os, sys, collections и другие являются мощным инструментом, которым нужно и полезно уметь пользоваться.

Ресурсы к ознакомлению:

sys -- https://docs.python.org/3/library/sys.html

os -- https://docs.python.org/3/library/os.html

collections -- https://docs.python.org/3/library/collections.html

#### COLLECTIONS

In [None]:
"""
    Начнем с collections. Библиотека хранит в себе специальные контейнеры,
    расширяющие классический список контейнеров: списков, кортежей и словарей.
"""
import collections
"""
    Посмотреть содержащиеся полезные модули можно, написав функцию ниже одной строкой.
"""
print('\n'.join(list(filter(lambda x: not x.startswith('_') and x != 'abc', collections.__dict__.keys()))))
# содержательно эта строка станет понятна по мере освоения курса
# она неявно содержит в себе концепции list comprehension, а также парадигмы функционального программирования
# и в целом демонстрирует лаконичность и эффективность инструментов языка python

deque
defaultdict
OrderedDict
namedtuple
Counter
ChainMap
UserDict
UserList
UserString


In [None]:
'''
    Начнем с рассмотрения namedtuple. Это именованный кортеж, который то же самое, что и кортеж,
    но допускает обращение к своим полям по имени. Более точно, это специальный тип класса с реализованным дандер методом '__repr__()'
'''
from collections import namedtuple
from typing import NamedTuple # нужна для указания типа, или в качестве указания как родительского класса при создании дочернего

CallOption = namedtuple('CallOption', ['strike', 'expiration_date'])

my_option: CallOption = CallOption(strike=100, expiration_date='2026-01-01')
print(my_option, type(my_option))
print(my_option.strike, type(my_option.strike), my_option.expiration_date, type(my_option.expiration_date))

# my_option.strike = 101 # AttributeError: can't set attribute
'''
    Для работы с именованными кортежами полезно пользоваться некоторыми методами
'''
# namedtuple._asdict() -- представляет объект класса как словарь
print('Достаем словарь:', my_option._asdict(), type(my_option._asdict()))

# namedtuple._replace -- возвращает новый экземляр  замененным полем
print('Создаем новый именованный кортеж:', my_option._replace(strike=200))

# namedtuple._fields -- получить кортеж строк полей
print(my_option._fields, type(my_option._fields))

CallOption(strike=100, expiration_date='2026-01-01') <class '__main__.CallOption'>
100 <class 'int'> 2026-01-01 <class 'str'>
Достаем словарь: {'strike': 100, 'expiration_date': '2026-01-01'} <class 'dict'>
Создаем новый именованный кортеж: CallOption(strike=200, expiration_date='2026-01-01')
('strike', 'expiration_date') <class 'tuple'>


In [None]:
"""
    Далее deque. Это список с эффективной реализацией добавления и извлечения элементов.
    В соответствии с описанием в документации:
        Дек это обобщение стека и очереди. Он потоко-безопасный (thread-safe, то есть его можно параллелить
        на уровне потоков без опасения измененения объекта со стороны другого потока, здравствуй impute и lock),
        эффективный по памяти и требует О(1) времени на добавление и извлечение элемента с начала и конца списка.
"""
from collections import deque

my_deque = deque(maxlen=10)
_ = [my_deque.append(i) for i in range(13)]
print(my_deque, ', ограничен по длине ', len(my_deque), '\n')

print('Убрать элемент слева:', my_deque.popleft(), ', в очереди останется:', my_deque, ', длина ', len(my_deque))
print('Убрать элемент справа:', my_deque.pop(), ', в очереди останется:', my_deque, ', длина ', len(my_deque), '\n')

print('Добавить элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить элемент справа:', my_deque.append(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить элемент справа:', my_deque.append(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить еще элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить еще элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
'''
    Очередь позволяет эффективно осуществлять операции вставки и изъятия объектов в начале и в конце списка.
    И кроме того в ней встроена защита от дурака, которая не позволит создавать бесконечно большой список.
'''
print(my_deque * 2 ** 10, '\n') # мы можем бесконечно долго из-за ошибки в коде нарашивать длину списка, но не сможем для очереди
'''
    deque поддерживает многие операции, доступные list:
    insert, clear, copy, count, extend, remove, reverse
    Есть свой метод rotate(n=1), который по принципу RR(RoundRobin) сдвигает все элементы на n вправо (n < 0 -- влево)
'''
print('используем rotate(2):', my_deque.rotate(2), ', получаем:', my_deque)

deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10) , ограничен по длине  10 

Убрать элемент слева: 3 , в очереди останется: deque([4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10) , длина  9
Убрать элемент справа: 12 , в очереди останется: deque([4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  8 

Добавить элемент слева: None , в очереди станет: deque([0, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  9
Добавить элемент справа: None , в очереди станет: deque([0, 4, 5, 6, 7, 8, 9, 10, 11, 0], maxlen=10) , длина  10
Добавить элемент слева: None , в очереди станет: deque([0, 0, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  10
Добавить элемент справа: None , в очереди станет: deque([0, 4, 5, 6, 7, 8, 9, 10, 11, 0], maxlen=10) , длина  10
Добавить еще элемент слева: None , в очереди станет: deque([0, 0, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  10
Добавить еще элемент слева: None , в очереди станет: deque([0, 0, 0, 4, 5, 6, 7, 8, 9, 10], maxlen=10) , длина  10
deque([0, 0, 0, 4, 5, 6, 7, 8,

In [None]:
'''
    следующий объект -- defaultdict. Он расширяет обыкновенный словарь, добавляя метод,
    который вызывается каждый раз при отсуствии ключа, по которому произошло обращение.
    При создании словаря с дефолтным значением в случае неуказания этого дефолтного значения (аргумент default_factory)
    он работает как обычный словарь. Если default_factory указан, то по указанному отсутствующему ключу вызывается default_factory
    Сигнатура такая : defaultdict(default_factory: Callable[[None], object])
    При этом
'''

from collections import defaultdict

my_default_dict = defaultdict()
# my_default_dict['0'] # KeyError

# 1 способ
my_default_dict = defaultdict(lambda: 'missing_value')
print(my_default_dict['0'])

# 2 способ
def return_dafault_value():
    return 'missing_value'

my_default_dict = defaultdict(return_dafault_value)
print(my_default_dict['0'])

# 3 способ
def return_dafault_value(value):
    return lambda: value

my_default_dict = defaultdict(return_dafault_value('arbitraty_value_inserted_right_here'))
print(my_default_dict['0'])

# другие варианты
my_default_dict = defaultdict(list) # дефолтом ставим список
my_default_dict['fridge'].append('egg')
my_default_dict['fridge'].append('steak')
my_default_dict['fridge'].append('butter')
my_default_dict['pocket'].append('money')
print(my_default_dict)

missing_value
missing_value
arbitraty_value_inserted_right_here
defaultdict(<class 'list'>, {'fridge': ['egg', 'steak', 'butter'], 'pocket': ['money']})


In [None]:
'''
    OrderedDict -- это словарь, который помнит порядок добавления ключей. Начиная с python 3.7 это умеет и дефолтный словарь (dict, не путать с defaultdict)
    Так что по большому счету он не настолько актуален. Однако если вам нужно в явном виде донести важную идею вашего кода,
    то использование OrderedDict оправдано вашими целями
'''

from collections import OrderedDict

my_ordered_dict: OrderedDict = OrderedDict.fromkeys('abcde')
print('Упорядоченный словарь:', my_ordered_dict, type(my_ordered_dict))
my_ordered_dict.move_to_end('b')
print('Переупорядочили ключи:', ''.join(my_ordered_dict))

my_ordered_dict.move_to_end('b', last=False)
print('Переупорядочили ключи сноав:', ''.join(my_ordered_dict))

print('Текущий порядок:', ''.join(my_ordered_dict), ', вытащим последний добавленный элемент (LIFO):', my_ordered_dict.popitem())
print('Текущий порядок:', ''.join(my_ordered_dict), ', вытащим первый добавленный элемент (FIFO):', my_ordered_dict.popitem(last=False))

Упорядоченный словарь: OrderedDict([('a', None), ('b', None), ('c', None), ('d', None), ('e', None)]) <class 'collections.OrderedDict'>
Переупорядочили ключи: acdeb
Переупорядочили ключи сноав: bacde
Текущий порядок: bacde , вытащим последний добавленный элемент (LIFO): ('e', None)
Текущий порядок: bacd , вытащим первый добавленный элемент (FIFO): ('b', None)


In [None]:
'''
    ChainMap -- подкласс словаря, который объединяет несколько отдельных словарей в один общий.
'''

from collections import ChainMap

my_dict_1 = dict(a=1, b=2, c=3)
my_dict_2 = dict(color='yellow', size=10)
my_dict_3 = dict(date='2025-01-01', time='16:00')

chain_map : ChainMap = ChainMap(my_dict_1, my_dict_2, my_dict_3)
print(chain_map, type(chain_map))
print('Ссылаемся на объект из 3го словаря:', chain_map['time'])
# print('Ссылаемся на объект, которого нет:', chain_map['weather']) # KeyError
# мы можем проитерироваться по всем объектам в chain'e
for key, value in chain_map.items():
    print(key, value)

ChainMap({'a': 1, 'b': 2, 'c': 3}, {'color': 'yellow', 'size': 10}, {'date': '2025-01-01', 'time': '16:00'}) <class 'collections.ChainMap'>
Ссылаемся на объект из 3го словаря: 16:00
date 2025-01-01
time 16:00
color yellow
size 10
a 1
b 2
c 3


In [None]:
'''
    Последний рассматриваемый -- Counter -- объект, который парсит итерируемый объект на тему повторяющихся элементов
    и формирует словарь, в котором ключами являются уникальные элементы входного списка, а значениями их количество.
'''

from collections import Counter

my_counter : Counter = Counter([0,1,2,3,3,2,1,1,1,0,5,0])
print('Счетчик:', my_counter, type(my_counter))

# дополнительно можно посмотреть наиболее часто встречающиеся объекты
print('Два самых частых числа (число, кол-во):', my_counter.most_common(2))

Счетчик: Counter({1: 4, 0: 3, 2: 2, 3: 2, 5: 1}) <class 'collections.Counter'>
Два самых частых числа (число, кол-во): [(1, 4), (0, 3)]


#### SYS and OS

In [None]:
'''
    библиотека os позволяет работать с некоторым функционалом операционной системы,
    которой вы пользуетесь. Как и ранее для более полного ознакомления со всем функционалом рекомендуется читать документацию.
'''

import os
print(', '.join(list(filter(lambda x: not x.startswith('_'), os.__dict__.keys()))))

abc, sys, st, GenericAlias, name, linesep, stat, access, chdir, chmod, getcwd, getcwdb, link, listdir, lstat, mkdir, readlink, rename, replace, rmdir, symlink, system, umask, unlink, remove, utime, times, execv, execve, spawnv, spawnve, getpid, getppid, getlogin, kill, startfile, waitpid, open, close, closerange, device_encoding, dup, dup2, lseek, read, write, fstat, isatty, pipe, ftruncate, truncate, putenv, unsetenv, strerror, fsync, abort, urandom, get_terminal_size, cpu_count, get_inheritable, set_inheritable, get_handle_inheritable, set_handle_inheritable, scandir, fspath, waitstatus_to_exitcode, environ, F_OK, R_OK, W_OK, X_OK, TMP_MAX, O_RDONLY, O_WRONLY, O_RDWR, O_APPEND, O_CREAT, O_EXCL, O_TRUNC, O_BINARY, O_TEXT, O_NOINHERIT, O_SHORT_LIVED, O_TEMPORARY, O_RANDOM, O_SEQUENTIAL, EX_OK, P_WAIT, P_NOWAIT, P_NOWAITO, P_OVERLAY, P_DETACH, error, stat_result, statvfs_result, terminal_size, DirEntry, times_result, uname_result, path, curdir, pardir, sep, pathsep, defpath, extsep, alt

In [None]:
path = None

# environ содержит словарь с переменными окружения
os.environ

# getlogin возвращает имя пользователя
os.getlogin()

# getpid getppid -- id процесса и id его родительского процесса
os.getpid()
os.getppid()

# os.chmod(path, mode, ...) -- установить мод(режим) для файла по пути path

# вернуть текущую директорию
os.getcwd()

# вернуть список файлов в директории
os.listdir()

# создает директорию по указанному пути path;
# если аргумент 'exist_ok' = True , то функция в случае существования директории не поднимет ошибку, если False -- FileExistsError
os.makedirs(path, exist_ok=True)

# remove / unlink удаляет файл по указанному пути; если указать директорию, будет выдана ошибка OSError
os.remove(path)
os.unlink(path)

# remodirs удаляет директорию по указанному пути; в случае ошибки выдает OSError
os.remodirs(path)

# переименовать src файл/директорию в dst
os.rename(src=None, dst=None)

# scandir возвращает итератор объектов, расположенных по пути path
os.scandir(path)

# возвращает количество логических процессоров в системе
os.cpu_count()

#####     os.path     #####

# abspath возвращает абсолютный путь к указанному относительному пути
os.path.abspath('.')

# вовзращает имя конечной точки указанного пути
os.path.basename(path)

# exists проверяет существование директории или файла по указанному пути и возрвращает True или False
os.path.exists(path)

# возвращают время
os.path.getatime('.') # посещения файла/директории
os.path.getmtime('.') # изменения файла/директории
os.path.getctime('.') # создания файла/директории

# вовзращает размер файла/директории в байтах
os.path.getsize(path)

# по-умному конкатенирует части путей в один
os.path.join(path, *paths)

# нормализует путь: удаляет ненужные/кривые символы
os.path.normpath(path)

# разбивает входную сроку-путь на (head, tail), tail последний файл/директория в пути
os.path.split(path)

# разбивает входную сроку-путь на путь и расширение файла
os.path.splitext('somebody/once/told.me')

In [None]:
'''
    библиотека sys позволяет работать с некоторым функционалом интерпретатора.
'''

import sys
print(', '.join(list(filter(lambda x: not x.startswith('_'), sys.__dict__.keys()))))

addaudithook, audit, breakpointhook, displayhook, exception, exc_info, excepthook, exit, getdefaultencoding, getallocatedblocks, getfilesystemencoding, getfilesystemencodeerrors, getrefcount, getrecursionlimit, getsizeof, getwindowsversion, intern, is_finalizing, setswitchinterval, getswitchinterval, setprofile, getprofile, setrecursionlimit, settrace, gettrace, call_tracing, set_coroutine_origin_tracking_depth, get_coroutine_origin_tracking_depth, set_asyncgen_hooks, get_asyncgen_hooks, unraisablehook, get_int_max_str_digits, set_int_max_str_digits, modules, stderr, version, hexversion, api_version, copyright, platform, maxsize, float_info, int_info, hash_info, maxunicode, builtin_module_names, stdlib_module_names, byteorder, dllhandle, winver, version_info, implementation, flags, float_repr_style, thread_info, meta_path, path_importer_cache, path_hooks, path, executable, prefix, base_prefix, exec_prefix, base_exec_prefix, platlibdir, pycache_prefix, argv, orig_argv, warnoptions, dont

In [None]:
# argv возвращает аргументы скрипта, [0] элемент это имя файла скрипта
sys.argv

# содержит copyright
sys.copyright

# очищает кеш интерпретатора
sys._clear_type_cache() # до python 3.13
# sys._clear_internal_caches() # python 3.13+

#  sys.exception() возвращает пойманную в 'except' ошибку; вне конструкции возвращает None (то есть ничего)
try:
    _ = 0 / 0
except:
    exception = sys.exception()

# возвращает SystemExit exception, сигнализирующий о завершении выполнения интерпретатора
# sys.exit()

# возвращает количество блоков памяти, занятых интерпретатором
sys.getallocatedblocks()

# возвращает предел рекурсии в интерпретаторе
sys.getrecursionlimit()
# sys.setrecursionlimit(N) # устанавливает лимит на рекурсию

# возвращает размер объекта в байтах
sys.getsizeof( 0 )

# возвращает глубину вложенных корутин
sys.get_coroutine_origin_tracking_depth()

# список всех текущих модулей
sys.modules.keys()

# версия интерпретатора
sys.version



#### TYPING

In [None]:
'''
    В общем, строгая типизация и аннотирование вашего кода значительно упростят
    всю дальнейшую работу с ним. Как вам, так и другим, кто будет его читать.
'''

import typing
print(', '.join(sorted(list(filter(lambda x: not x.startswith('_'), typing.__dict__.keys())))))

'''
    Вам желательно запомнить некоторые наиболее часто употребляемые типы.
    Касательно остальных было неплохо их хотя бы знать, чтобы при случае вы могли ими воспользоваться.
'''



#### SERIALIZATION

In [None]:
'''
wikipedia
    Сериализация (в программировании) — процесс перевода структуры данных в битовую последовательность.
    Обратной к операции сериализации является операция десериализации (структуризации) —
        создание структуры данных из битовой последовательности.
    Мы будем ссылаться на сериализацию как на возможность создания и сохранения файла
    из некоторого объекта python.
    Есть несколько библиотек, которые позволяют сериализовать некоторый объект.
    Если это dataframe, то его можно сериализовать в parquet, csv и другие форматы, предусмотренные библиотекой.
    Если это некоторая строка, словарь, список -- можно использовать json
    Если это какой-то произвольный объект (любой, в том числе и из вышеперечисленных) --
        его можно сохранить в виде бинарного файла. Библиотеки: dill, cloudpickle, pickle
        В реальности, разумно ограничиться одной библиотекой. Ознакомьтесь с каждой из списка выше
'''

import json, cloudpickle, pickle, dill

# В общем случае, для работы с файлами используют контекстный менеджер with
# Он гарантирует, что вне его блока кода файл корректно будет закрыт

# default'ная запись и чтение текстового файла
with open('somefile.txt', 'w') as file: # первый аргумент - имя файла, второй режим использования ('w' - запись, 'r' - чтение)
    file.write('Hello, default write!')
with open('somefile.txt', 'r') as file:
    line = file.read()
    print(line, '\n')

# сериализация с помощью json
with open('somefile.json', 'w') as file:
    json.dump('Hello, json!', file)
with open('somefile.json', 'r') as file:
    line = json.load(file)
    print(line, '\n')

# сериализация с помощью dill, pickle, cloudpickle
with open('somefile.pickle', 'wb') as file: # для бинарных файлов режим доступа нужно писать 'wb', 'rb' что значит 'binary'
    dill.dump('Hello, dill|pickle|cloudpickle!', file)
    pickle.dump('Hello, dill|pickle|cloudpickle!', file)
    cloudpickle.dump('Hello, dill|pickle|cloudpickle!', file)
with open('somefile.pickle', 'rb') as file:
    line = dill.load(file)
    line = pickle.load(file)
    line = cloudpickle.load(file)
    print(line, '\n')

Hello, default write! 

Hello, json! 

Hello, dill|pickle|cloudpickle! 



In [None]:
# вы можете сериализовать почти любой объект и его состояние

# class
class ToDemonstrate:
    def __init__(self):
        pass
    def print_hello(self):
        print('Hello, world!')

my_class = ToDemonstrate()

with open('demonstration_cls.pkl', 'wb') as f:
    cloudpickle.dump(my_class, f)
with open('demonstration_cls.pkl', 'rb') as f:
    new_read_class = cloudpickle.load(f)
new_read_class.print_hello()
print()

# объект из библиотеки, например, объект линейной регресии, который тоже класс
from sklearn.linear_model import LinearRegression
lr = LinearRegression()
with open('not_fitted_lr.pkl', 'wb') as f:
    cloudpickle.dump(lr, f)
with open('not_fitted_lr.pkl', 'rb') as f:
    new_read_lr = cloudpickle.load(f)
print(lr, new_read_lr)
print()

# фунцию (но не анонимную lambda функцию!)
def to_save():
    print('Defined function')
with open('my_func.pkl', 'wb') as f:
    cloudpickle.dump(to_save, f)
with open('my_func.pkl', 'rb') as f:
    new_read_func = cloudpickle.load(f)
new_read_func()

Hello, world!

LinearRegression() LinearRegression()

Defined function


#### ЗАДАЧИ

In [None]:
'''
    Ниже вам будут предложены задачи для освоения материалов.
    Собственно, первая и параллельная задача:
        Все объекты, которые вы создадите нужно упаковать в виде файлов (то есть сериализовать).
        По расширению файла должно быть понятно, как его прочитать.
        Дополнительно вам нужно узнать как сохранять файлы в формате .rar, .zip, .tar.gz
            и тоже сохранить файлы в этих форматах.

    Требования к написанию кода: структура, читаемость, строгая типизация и аннотирования.
    Код должен быть очевиден, понятен, структурирован.
'''

In [None]:
# Реализуйте простую модель стоянки.
# 1) Скажем, ваши автомобили это именованные кортежи, с указанием марки автомобиля, модели, гос.номера и цвета
# 2) Сама стоянка это двусторонняя очередь (deque)
#   Представим, что у нас нет коллизий, и если места на стоянке нет, то виртуально вызывается эвакуатор и убирает самый крайний автомобиль (как и работает deque)
# 3) Создайте набор автомобилей и в случайном порядке добавляйте/удаляйте их с автостоянки.
# 4) Посчитайте, сколько и каких автомобилей осталось на автостоянке.

from collections import deque, namedtuple
import random

Car = namedtuple('Car', ['brand', 'model', 'license_plate', 'color'])

parking = deque(maxlen=3)  # 3 машины

cars = [
    Car('Toyota', 'Camry', 'А123ВС77', 'Красный'),
    Car('Honda', 'Civic', 'В456ОР78', 'Синий'),
    Car('BMW', 'X5', 'Е789КХ77', 'Черный'),
    Car('Lada', 'Vesta', 'М321ТУ78', 'Белый'),
    Car('Kia', 'Rio', 'Р654НС77', 'Зеленый'),
    Car('Kia', 'Rio2', 'Р654НС79', 'Зеленый'),
    Car('Kia', 'Rio3', 'Р654НС73', 'Зеленый'),
    Car('Kia', 'Rio4', 'Р654НС70', 'Зеленый'),
    Car('Kia', 'Rio5', 'Р654НС71', 'Зеленый')
]

print("Все автомобили:")
for car in cars:
    print(f"{car.brand} {car.model} - {car.license_plate}")

operations = ['park', 'remove'] * 5
random.shuffle(operations)

print("\nОперации со стоянкой:")
for operation in operations:
    if operation == 'park' and cars:
        car = random.choice(cars)
        if len(parking) >= parking.maxlen:
            removed_car = parking.popleft()
            print(f"Эвакуировали: {removed_car.license_plate}")

        parking.append(car)
        print(f"Припарковали: {car.license_plate}")

    elif operation == 'remove' and parking:
        car = random.choice(list(parking))
        parking.remove(car)
        print(f"Уехал: {car.license_plate}")

# 5) Результат
print(f"\nНа стоянке осталось {len(parking)} машин:")
for i, car in enumerate(parking, 1):
    print(f"{i}. {car.brand} {car.model} - {car.license_plate} - {car.color}")

brands = {}
for car in parking:
    brands[car.brand] = brands.get(car.brand, 0) + 1

print("\nСтатистика по маркам:")
for brand, count in brands.items():
    print(f"{brand}: {count} машин")

Все автомобили:
Toyota Camry - А123ВС77
Honda Civic - В456ОР78
BMW X5 - Е789КХ77
Lada Vesta - М321ТУ78
Kia Rio - Р654НС77
Kia Rio2 - Р654НС79
Kia Rio3 - Р654НС73
Kia Rio4 - Р654НС70
Kia Rio5 - Р654НС71

Операции со стоянкой:
Припарковали: Р654НС70
Уехал: Р654НС70
Припарковали: В456ОР78
Уехал: В456ОР78
Припарковали: В456ОР78
Уехал: В456ОР78
Припарковали: Р654НС77
Припарковали: А123ВС77

На стоянке осталось 2 машин:
1. Kia Rio - Р654НС77 - Зеленый
2. Toyota Camry - А123ВС77 - Красный

Статистика по маркам:
Kia: 1 машин
Toyota: 1 машин


In [None]:
# Теперь вы работаете в роли архивариуса РСФСР и занимаетесь хранением информации об иуществе раскулаченных граждан Имперской России
# У вас несколько книг с записями, упорядоченных по алфавиту. Каждая книга хранит фамилии, начинающиеся с конкретной буквы.
# Внутри книги указаны фамилии, к каждому гражданину указана категория имущества, например, мебель, посуда, картины и т.д.
# В каждой категории указан список вещей, которые были изъяты.
# Вы вносите записи, и если новой записи нет, вы ее создаете. Затем, когда ищите конкретную категорию предмета, начинаете перебирать все книги, начиная с первой буквы по алфавиту в вашем списке.
# Очевидно, что вам нужно использовать defaultdict и ChainMap

from collections import defaultdict, ChainMap

books = {}
for letter in 'АБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ':
    books[letter] = defaultdict(lambda: defaultdict(list))

all_books = ChainMap(*[books])

def add_record(surname, category, item):
    first_letter = surname[0].upper()

    if first_letter in all_books:
        all_books[first_letter][surname][category].append(item)
        print(f"Добавлено: {surname} - {category} - {item}")
    else:
        print(f"Нет книги для буквы '{first_letter}'")

def find_items_by_category(category):
    print(f"\nПоиск предметов категории '{category}':")

    for letter in 'АБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ':
        if letter in all_books:
            for surname, categories in all_books[letter].items():
                if category in categories:
                    items = categories[category]
                    print(f"   {surname}: {', '.join(items)}")

add_record("Иванов", "мебель", "дубовый стол")
add_record("Иванов", "мебель", "кресло")
add_record("Петров", "картины", "портрет императора")

find_items_by_category("мебель")

Добавлено: Иванов - мебель - дубовый стол
Добавлено: Иванов - мебель - кресло
Добавлено: Петров - картины - портрет императора

Поиск предметов категории 'мебель':
   Иванов: дубовый стол, кресло


In [None]:
# Используя sys и os
# Выведите некоторые ваши директории с указанием размера всех файлов и упорядочьте их (директории) по времени последнего обращения
# Выведите список расширений файлов, которые хранятся на вашем ПК
# Посчитайте объем памяти, который используется вашим интерпретатором


#### ДОПОЛНИТЕЛЬНЫЕ ПРАКТИЧЕСКИЕ ЗАДАЧИ

Ниже представлены дополнительные задачи для закрепления изученного материала. Каждая задача использует только те концепции, которые были представлены в данном уроке.


**Задача 1: Система управления библиотекой**

Создайте систему управления библиотекой используя collections:

1. Создайте namedtuple `Book` с полями: title, author, isbn, year
2. Создайте namedtuple `Reader` с полями: name, reader_id, phone
3. Используйте `defaultdict(list)` для хранения книг по жанрам
4. Используйте `deque` для очереди читателей, ожидающих популярную книгу
5. Используйте `Counter` для подсчета количества книг каждого автора
6. Используйте `OrderedDict` для хранения истории выдачи книг (читатель -> список книг)
7. Сериализуйте все данные в JSON и pickle форматы


In [None]:
import json
import pickle
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict
from datetime import datetime
from typing import Dict, List, Any
import os

# namedtuple для книг и читателей
Book = namedtuple('Book', ['title', 'author', 'isbn', 'year', 'genre'])
Reader = namedtuple('Reader', ['name', 'reader_id', 'phone'])

class LibraryManagementSystem:
    def __init__(self):
        # defaultdict для хранения книг по жанрам
        self.books_by_genre = defaultdict(list)

        # deque для очереди читателей, ожидающих популярную книгу
        self.waiting_queue = deque()

        # для подсчета количества книг каждого автора
        self.books_by_author = Counter()

        # для хранения истории выдачи книг
        self.lending_history = OrderedDict()

        # структуры для хранения данных
        self.all_books = []
        self.all_readers = []

    def add_book(self, title: str, author: str, isbn: str, year: int, genre: str) -> None:
        book = Book(title, author, isbn, year, genre)
        self.all_books.append(book)
        self.books_by_genre[genre].append(book)
        self.books_by_author[author] += 1

    def add_reader(self, name: str, reader_id: str, phone: str) -> None:
        reader = Reader(name, reader_id, phone)
        self.all_readers.append(reader)

    def lend_book(self, reader_id: str, isbn: str) -> None:
        # находим книгу и читателя
        book = next((b for b in self.all_books if b.isbn == isbn), None)
        reader = next((r for r in self.all_readers if r.reader_id == reader_id), None)

        if book and reader:
            # добавляем в историю выдачи
            if reader_id not in self.lending_history:
                self.lending_history[reader_id] = []
            self.lending_history[reader_id].append({
                'book_isbn': isbn,
                'book_title': book.title,
                'lend_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
            print(f"Книга '{book.title}' выдана читателю {reader.name}")
        else:
            print("Книга или читатель не найдены")

    def add_to_waiting_queue(self, reader_id: str, book_title: str) -> None:
        self.waiting_queue.append({
            'reader_id': reader_id,
            'book_title': book_title,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
        print(f"Читатель {reader_id} добавлен в очередь ожидания книги '{book_title}'")

    def get_next_in_queue(self) -> Dict[str, str]:
        if self.waiting_queue:
            return self.waiting_queue.popleft()
        return None

    def get_books_by_genre(self, genre: str) -> List[Book]:
        return self.books_by_genre.get(genre, [])

    def get_author_stats(self) -> Dict[str, int]:
        return dict(self.books_by_author)

    def get_lending_history(self) -> OrderedDict:
        return self.lending_history

    def to_dict(self) -> Dict[str, Any]:
        return {
            'books': [book._asdict() for book in self.all_books],
            'readers': [reader._asdict() for reader in self.all_readers],
            'books_by_genre': {
                genre: [book._asdict() for book in books]
                for genre, books in self.books_by_genre.items()
            },
            'waiting_queue': list(self.waiting_queue),
            'books_by_author': dict(self.books_by_author),
            'lending_history': {
                reader_id: history
                for reader_id, history in self.lending_history.items()
            }
        }

    def from_dict(self, data: Dict[str, Any]) -> None:
        # очищаем текущие данные
        self.__init__()

        # загружаем книги
        for book_data in data.get('books', []):
            self.add_book(**book_data)

        # загружаем читателей
        for reader_data in data.get('readers', []):
            self.add_reader(**reader_data)

        # загружаем историю выдачи
        for reader_id, history in data.get('lending_history', {}).items():
            self.lending_history[reader_id] = history

        # загружаем очередь ожидания
        self.waiting_queue.extend(data.get('waiting_queue', []))

    def save_to_json(self, filename: str) -> None:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.to_dict(), f, ensure_ascii=False, indent=2)
        print(f"Данные сохранены в {filename}")

    def load_from_json(self, filename: str) -> None:
        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.from_dict(data)
            print(f"Данные загружены из {filename}")
        else:
            print(f"Файл {filename} не существует")

    def save_to_pickle(self, filename: str) -> None:
        with open(filename, 'wb') as f:
            pickle.dump(self.to_dict(), f)
        print(f"Данные сохранены в {filename}")

    def load_from_pickle(self, filename: str) -> None:
        if os.path.exists(filename):
            with open(filename, 'rb') as f:
                data = pickle.load(f)
            self.from_dict(data)
            print(f"Данные загружены из {filename}")
        else:
            print(f"Файл {filename} не существует")

    def display_stats(self) -> None:
        print("\nСТАТИСТИКА БИБЛИОТЕКИ")
        print(f"Всего книг: {len(self.all_books)}")
        print(f"Всего читателей: {len(self.all_readers)}")
        print(f"Книг в очереди ожидания: {len(self.waiting_queue)}")
        print(f"Записей в истории выдачи: {len(self.lending_history)}")

        print("\nКниги по жанрам:")
        for genre, books in self.books_by_genre.items():
            print(f"  {genre}: {len(books)} книг")

        print("\nТоп авторов:")
        for author, count in self.books_by_author.most_common(5):
            print(f"  {author}: {count} книг")

library = LibraryManagementSystem()

library.add_book("Война и мир", "Лев Толстой", "978-5-389-07464-0", 1869, "Роман")
library.add_book("Анна Каренина", "Лев Толстой", "978-5-389-07465-7", 1877, "Роман")
library.add_book("Преступление и наказание", "Фёдор Достоевский", "978-5-389-07466-4", 1866, "Роман")
library.add_book("1984", "Джордж Оруэлл", "978-5-389-07467-1", 1949, "Антиутопия")
library.add_book("Мастер и Маргарита", "Михаил Булгаков", "978-5-389-07468-8", 1967, "Роман")

library.add_reader("Иван Иванов", "R001", "+7-999-123-45-67")
library.add_reader("Петр Петров", "R002", "+7-999-987-65-43")
library.add_reader("Мария Сидорова", "R003", "+7-999-555-44-33")

library.lend_book("R001", "978-5-389-07464-0")
library.lend_book("R002", "978-5-389-07467-1")
library.lend_book("R001", "978-5-389-07468-8")

library.add_to_waiting_queue("R003", "Война и мир")
library.add_to_waiting_queue("R002", "Мастер и Маргарита")

library.save_to_json('library_data.json')
library.save_to_pickle('library_data.pkl')

library.display_stats()

print("\n" + "="*50)
print("Создаем новую систему и загружаем данные...")

new_library = LibraryManagementSystem()
new_library.load_from_json('library_data.json')
new_library.display_stats()

Книга 'Война и мир' выдана читателю Иван Иванов
Книга '1984' выдана читателю Петр Петров
Книга 'Мастер и Маргарита' выдана читателю Иван Иванов
Читатель R003 добавлен в очередь ожидания книги 'Война и мир'
Читатель R002 добавлен в очередь ожидания книги 'Мастер и Маргарита'
Данные сохранены в library_data.json
Данные сохранены в library_data.pkl

СТАТИСТИКА БИБЛИОТЕКИ
Всего книг: 5
Всего читателей: 3
Книг в очереди ожидания: 2
Записей в истории выдачи: 2

Книги по жанрам:
  Роман: 4 книг
  Антиутопия: 1 книг

Топ авторов:
  Лев Толстой: 2 книг
  Фёдор Достоевский: 1 книг
  Джордж Оруэлл: 1 книг
  Михаил Булгаков: 1 книг

Создаем новую систему и загружаем данные...
Данные загружены из library_data.json

СТАТИСТИКА БИБЛИОТЕКИ
Всего книг: 5
Всего читателей: 3
Книг в очереди ожидания: 2
Записей в истории выдачи: 2

Книги по жанрам:
  Роман: 4 книг
  Антиутопия: 1 книг

Топ авторов:
  Лев Толстой: 2 книг
  Фёдор Достоевский: 1 книг
  Джордж Оруэлл: 1 книг
  Михаил Булгаков: 1 книг


**Задача 2: Анализатор файловой системы**

Создайте анализатор файловой системы используя os и sys:

1. Создайте namedtuple `FileInfo` с полями: name, size, extension, modified_time
2. Используйте `os.walk()` для обхода директории
3. Используйте `os.path` функции для получения информации о файлах
4. Используйте `Counter` для подсчета файлов по расширениям
5. Используйте `defaultdict(list)` для группировки файлов по размеру (маленькие < 1MB, средние 1-100MB, большие > 100MB)
6. Используйте `deque` для хранения последних 10 найденных файлов
7. Выведите статистику используя `sys.getsizeof()` для подсчета памяти
8. Сохраните результаты в JSON файл


In [None]:
import os
import sys
import json
from collections import namedtuple, Counter, defaultdict, deque
from datetime import datetime

# namedtuple для информации о файле
FileInfo = namedtuple('FileInfo', ['name', 'size', 'extension', 'modified_time'])

def analyze_filesystem(directory):
    # os.walk для обхода директории
    file_counter = Counter()
    size_groups = defaultdict(list)
    recent_files = deque(maxlen=10)  # deque для последних 10 файлов

    for root, dirs, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)

            # os.path для получения информации
            if os.path.isfile(file_path):
                size = os.path.getsize(file_path)
                ext = os.path.splitext(file)[1].lower() or 'no_extension'
                mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))

                file_info = FileInfo(file, size, ext, mod_time)

                # для расширений
                file_counter[ext] += 1

                # для группировки по размеру
                if size < 1024 * 1024:  # < 1MB
                    size_groups['small'].append(file_info)
                elif size < 100 * 1024 * 1024:  # 1-100MB
                    size_groups['medium'].append(file_info)
                else:  # > 100MB
                    size_groups['large'].append(file_info)

                # добавляем в deque последних файлов
                recent_files.append(file_info)

    # подсчет памяти
    memory_usage = {
        'file_counter': sys.getsizeof(file_counter),
        'size_groups': sys.getsizeof(size_groups),
        'recent_files': sys.getsizeof(recent_files)
    }

    # подготовка данных для JSON
    result = {
        'extensions': dict(file_counter),
        'size_groups': {
            group: [file._asdict() for file in files]
            for group, files in size_groups.items()
        },
        'recent_files': [file._asdict() for file in recent_files],
        'memory_usage': memory_usage
    }

    # сохранение в JSON
    with open('filesystem_analysis.json', 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2, default=str)

    return result

if __name__ == "__main__":
    directory_to_scan = input("Введите путь для анализа: ") or "."
    result = analyze_filesystem(directory_to_scan)
    print(f"Анализ завершен. Результаты сохранены в filesystem_analysis.json")
    print(f"Найдено расширений: {len(result['extensions'])}")

Введите путь для анализа:  C:\Users\hunte


Анализ завершен. Результаты сохранены в filesystem_analysis.json
Найдено расширений: 423


**Задача 3: Система конфигурации приложения**

Создайте систему конфигурации используя ChainMap и defaultdict:

1. Создайте namedtuple `Config` с полями: key, value, section, default_value
2. Создайте несколько словарей конфигурации (default, user, environment)
3. Используйте `ChainMap` для объединения конфигураций с приоритетом
4. Используйте `defaultdict(dict)` для группировки настроек по секциям
5. Используйте `OrderedDict` для сохранения порядка загрузки конфигураций
6. Используйте `os.environ` для чтения переменных окружения
7. Сериализуйте конфигурацию в JSON и pickle форматы


In [None]:
import os
import json
import pickle
from collections import namedtuple, ChainMap, defaultdict, OrderedDict

# namedtuple
Config = namedtuple('Config', ['key', 'value', 'section', 'default_value'])

class ConfigurationSystem:
    def __init__(self):
        # словари конфигурации
        self.default_config = {}
        self.user_config = {}
        self.environment_config = {}

        # для объединения с приоритетом (последний имеет высший приоритет)
        self.chain = ChainMap(self.environment_config, self.user_config, self.default_config)

        # для группировки по секциям
        self.section_config = defaultdict(dict)

        # для порядка загрузки
        self.load_order = OrderedDict()

    def add_config(self, key: str, value: str, section: str = 'general', default_value: str = None):
        config = Config(key, value, section, default_value)

        # добавляем в default config
        self.default_config[key] = config

        # группируем по секциям
        self.section_config[section][key] = config

        # запоминаем порядок
        self.load_order[key] = config

    def set_user_config(self, key: str, value: str):
        if key in self.default_config:
            config = self.default_config[key]._replace(value=value)
            self.user_config[key] = config
            self.section_config[config.section][key] = config

    def load_environment_variables(self, prefix: str = "APP_"):
        for env_key, env_value in os.environ.items():
            if env_key.startswith(prefix):
                config_key = env_key[len(prefix):].lower()
                if config_key in self.default_config:
                    config = self.default_config[config_key]._replace(value=env_value)
                    self.environment_config[config_key] = config
                    self.section_config[config.section][config_key] = config

    def get(self, key: str):
        return self.chain[key].value if key in self.chain else None

    def get_section(self, section: str):
        return dict(self.section_config[section])

    def to_dict(self):
        return {
            'default_config': {k: v._asdict() for k, v in self.default_config.items()},
            'user_config': {k: v._asdict() for k, v in self.user_config.items()},
            'environment_config': {k: v._asdict() for k, v in self.environment_config.items()},
            'section_config': {
                section: {k: v._asdict() for k, v in configs.items()}
                for section, configs in self.section_config.items()
            },
            'load_order': {k: v._asdict() for k, v in self.load_order.items()}
        }

    def save_to_json(self, filename: str):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.to_dict(), f, ensure_ascii=False, indent=2)

    def save_to_pickle(self, filename: str):
        with open(filename, 'wb') as f:
            pickle.dump(self.to_dict(), f)

    def load_from_json(self, filename: str):
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)
            self._from_dict(data)

    def load_from_pickle(self, filename: str):
        with open(filename, 'rb') as f:
            data = pickle.load(f)
            self._from_dict(data)

    def _from_dict(self, data: dict):
        self.default_config = {k: Config(**v) for k, v in data['default_config'].items()}
        self.user_config = {k: Config(**v) for k, v in data['user_config'].items()}
        self.environment_config = {k: Config(**v) for k, v in data['environment_config'].items()}
        self.section_config = defaultdict(dict, {
            section: {k: Config(**v) for k, v in configs.items()}
            for section, configs in data['section_config'].items()
        })
        self.load_order = OrderedDict({k: Config(**v) for k, v in data['load_order'].items()})
        self.chain = ChainMap(self.environment_config, self.user_config, self.default_config)

config_system = ConfigurationSystem()

config_system.add_config('host', 'localhost', 'database', '127.0.0.1')
config_system.add_config('port', '5432', 'database', '5432')
config_system.add_config('debug', 'false', 'general', 'false')
config_system.add_config('log_level', 'info', 'logging', 'info')

config_system.set_user_config('host', '192.168.1.100')
config_system.set_user_config('log_level', 'debug')

os.environ['APP_PORT'] = '3306'
config_system.load_environment_variables()

print("Host:", config_system.get('host'))  # user_config
print("Port:", config_system.get('port'))  # environment_config
print("Debug:", config_system.get('debug'))  # default_config

config_system.save_to_json('config.json')
config_system.save_to_pickle('config.pkl')

Host: 192.168.1.100
Port: 3306
Debug: false


**Задача 4: Мониторинг системы**

Создайте систему мониторинга используя sys и os:

1. Создайте namedtuple `SystemInfo` с полями: cpu_count, memory_usage, process_id, user_name
2. Используйте `os.cpu_count()` для получения количества процессоров
3. Используйте `sys.getallocatedblocks()` для мониторинга памяти
4. Используйте `os.getpid()` и `os.getlogin()` для информации о процессе
5. Используйте `deque` для хранения последних 20 измерений
6. Используйте `Counter` для подсчета частоты использования различных функций
7. Используйте `defaultdict(list)` для группировки измерений по времени
8. Сохраните историю мониторинга в pickle файл


In [2]:
import sys
import os
from collections import namedtuple, deque, Counter, defaultdict
import pickle
from datetime import datetime

SystemInfo = namedtuple('SystemInfo', ['cpu_count', 'memory_usage', 'process_id', 'user_name'])

class SystemMonitor:
    def __init__(self):
        self.measurements = deque(maxlen=20)
        self.function_counter = Counter()
        self.time_grouped_data = defaultdict(list)

    def collect_system_info(self):
        self.function_counter['collect_system_info'] += 1

        cpu_count = os.cpu_count()
        memory_usage = sys.getallocatedblocks()
        process_id = os.getpid()

        try:
            user_name = os.getlogin()
        except OSError:
            user_name = os.environ.get('USER', 'unknown')

        info = SystemInfo(cpu_count, memory_usage, process_id, user_name)
        timestamp = datetime.now()

        self.measurements.append((timestamp, info))
        self.time_grouped_data[timestamp.strftime("%Y-%m-%d %H:%M")].append(info)

        return info

    def save_history(self, filename='monitoring_history.pkl'):
        history = {
            'measurements': list(self.measurements),
            'function_stats': self.function_counter,
            'time_grouped_data': dict(self.time_grouped_data)
        }

        with open(filename, 'wb') as f:
            pickle.dump(history, f)

monitor = SystemMonitor()

for _ in range(25):
    monitor.collect_system_info()

monitor.save_history()

print(f"последние {len(monitor.measurements)} измерений:")
for timestamp, info in monitor.measurements:
    print(f"{timestamp}: CPU={info.cpu_count}, Memory={info.memory_usage}")

print(f"\nстатистика вызовов: {monitor.function_counter}")

последние 20 измерений:
2025-10-03 06:41:03.582147: CPU=2, Memory=523570
2025-10-03 06:41:03.582204: CPU=2, Memory=523575
2025-10-03 06:41:03.582260: CPU=2, Memory=523581
2025-10-03 06:41:03.582315: CPU=2, Memory=523586
2025-10-03 06:41:03.582405: CPU=2, Memory=523591
2025-10-03 06:41:03.582472: CPU=2, Memory=523596
2025-10-03 06:41:03.582530: CPU=2, Memory=523601
2025-10-03 06:41:03.582586: CPU=2, Memory=523606
2025-10-03 06:41:03.582642: CPU=2, Memory=523612
2025-10-03 06:41:03.582697: CPU=2, Memory=523617
2025-10-03 06:41:03.582753: CPU=2, Memory=523623
2025-10-03 06:41:03.582820: CPU=2, Memory=523629
2025-10-03 06:41:03.582880: CPU=2, Memory=523634
2025-10-03 06:41:03.582935: CPU=2, Memory=523639
2025-10-03 06:41:03.582991: CPU=2, Memory=523645
2025-10-03 06:41:03.583049: CPU=2, Memory=523649
2025-10-03 06:41:03.583105: CPU=2, Memory=523653
2025-10-03 06:41:03.583161: CPU=2, Memory=523656
2025-10-03 06:41:03.583218: CPU=2, Memory=523659
2025-10-03 06:41:03.583273: CPU=2, Memory=523

**Задача 5: Система логирования**

Создайте систему логирования используя все изученные коллекции:

1. Создайте namedtuple `LogEntry` с полями: timestamp, level, message, module, function
2. Используйте `deque` для хранения последних 100 логов (кольцевой буфер)
3. Используйте `defaultdict(list)` для группировки логов по уровням (DEBUG, INFO, WARNING, ERROR)
4. Используйте `Counter` для подсчета количества логов каждого уровня
5. Используйте `OrderedDict` для хранения логов по времени (FIFO)
6. Используйте `ChainMap` для объединения различных источников логов
7. Используйте `os.path` для работы с файлами логов
8. Сериализуйте логи в JSON и pickle форматы


In [4]:
import json
import pickle
import os
from collections import namedtuple, deque, defaultdict, Counter, OrderedDict, ChainMap
from datetime import datetime

LogEntry = namedtuple('LogEntry', ['timestamp', 'level', 'message', 'module', 'function'])

class Logger:
    def __init__(self):
        self.log_buffer = deque(maxlen=100)
        self.logs_by_level = defaultdict(list)
        self.log_counter = Counter()
        self.ordered_logs = OrderedDict()
        self.log_sources = ChainMap({})

    def log(self, level, message, module, function):
        timestamp = datetime.now()
        entry = LogEntry(timestamp, level, message, module, function)

        self.log_buffer.append(entry)
        self.logs_by_level[level].append(entry)
        self.log_counter[level] += 1
        self.ordered_logs[timestamp] = entry

        return entry

    def add_log_source(self, source_name, logs):
        self.log_sources = self.log_sources.new_child({source_name: logs})

    def save_json(self, filename='logs.json'):
        def convert_entry(entry):
            return {
                'timestamp': entry.timestamp.isoformat(),
                'level': entry.level,
                'message': entry.message,
                'module': entry.module,
                'function': entry.function
            }

        logs_data = {
            'buffer': [convert_entry(entry) for entry in self.log_buffer],
            'by_level': {level: [convert_entry(entry) for entry in entries]
                        for level, entries in self.logs_by_level.items()},
            'counts': dict(self.log_counter),
            'ordered': {ts.isoformat(): convert_entry(entry)
                       for ts, entry in self.ordered_logs.items()}
        }

        with open(filename, 'w') as f:
            json.dump(logs_data, f, indent=2)

    def save_pickle(self, filename='logs.pkl'):
        data = {
            'buffer': self.log_buffer,
            'by_level': self.logs_by_level,
            'counts': self.log_counter,
            'ordered': self.ordered_logs,
            'sources': dict(self.log_sources)
        }

        with open(filename, 'wb') as f:
            pickle.dump(data, f)

logger = Logger()

logger.log('INFO', 'система запущена', 'main', 'run')
logger.log('DEBUG', 'инициализация компонентов', 'app', 'init')
logger.log('WARNING', 'низкая память', 'system', 'check_memory')
logger.log('ERROR', 'ошибка подключения', 'network', 'connect')

external_logs = [LogEntry(datetime.now(), 'INFO', 'внешний лог', 'external', 'process')]
logger.add_log_source('external_system', external_logs)

logger.save_json()
logger.save_pickle()

print(f"всего логов: {sum(logger.log_counter.values())}")
print(f"распределение по уровням: {dict(logger.log_counter)}")

всего логов: 4


**Задача 6: Кэш-система**

Создайте простую кэш-систему используя collections:

1. Создайте namedtuple `CacheEntry` с полями: key, value, timestamp, access_count
2. Используйте `OrderedDict` для реализации LRU (Least Recently Used) кэша
3. Используйте `deque` для хранения истории доступа к ключам
4. Используйте `Counter` для подсчета частоты доступа к каждому ключу
5. Используйте `defaultdict(int)` для хранения счетчиков доступа
6. Реализуйте методы: get, set, delete, clear, size
7. Используйте `sys.getsizeof()` для мониторинга размера кэша
8. Сериализуйте кэш в pickle формат для сохранения между сессиями


In [5]:
import sys
import pickle
from collections import namedtuple, OrderedDict, deque, Counter, defaultdict
from datetime import datetime

CacheEntry = namedtuple('CacheEntry', ['key', 'value', 'timestamp', 'access_count'])

class Cache:
    def __init__(self, max_size=100):
        self.cache = OrderedDict()
        self.access_history = deque(maxlen=1000)
        self.access_frequency = Counter()
        self.access_counters = defaultdict(int)
        self.max_size = max_size

    def get(self, key):
        if key in self.cache:
            entry = self.cache[key]
            self.cache.move_to_end(key)

            updated_entry = CacheEntry(
                key=entry.key,
                value=entry.value,
                timestamp=entry.timestamp,
                access_count=entry.access_count + 1
            )
            self.cache[key] = updated_entry

            self.access_history.append((datetime.now(), key, 'get'))
            self.access_frequency[key] += 1
            self.access_counters[key] += 1

            return updated_entry.value
        return None

    def set(self, key, value):
        if len(self.cache) >= self.max_size:
            self._evict()

        entry = CacheEntry(
            key=key,
            value=value,
            timestamp=datetime.now(),
            access_count=0
        )
        self.cache[key] = entry
        self.cache.move_to_end(key)

        self.access_history.append((datetime.now(), key, 'set'))
        self.access_frequency[key] += 1
        self.access_counters[key] = 0

    def delete(self, key):
        if key in self.cache:
            del self.cache[key]
            self.access_history.append((datetime.now(), key, 'delete'))
            return True
        return False

    def clear(self):
        self.cache.clear()
        self.access_history.clear()
        self.access_frequency.clear()
        self.access_counters.clear()

    def size(self):
        return len(self.cache)

    def _evict(self):
        if self.cache:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]

    def get_cache_size(self):
        total_size = 0
        for entry in self.cache.values():
            total_size += sys.getsizeof(entry.key)
            total_size += sys.getsizeof(entry.value)
            total_size += sys.getsizeof(entry)
        return total_size

    def save_cache(self, filename='cache.pkl'):
        with open(filename, 'wb') as f:
            pickle.dump({
                'cache': dict(self.cache),
                'access_history': list(self.access_history),
                'access_frequency': dict(self.access_frequency),
                'access_counters': dict(self.access_counters)
            }, f)

    def load_cache(self, filename='cache.pkl'):
        try:
            with open(filename, 'rb') as f:
                data = pickle.load(f)
                self.cache = OrderedDict(data['cache'])
                self.access_history = deque(data['access_history'], maxlen=1000)
                self.access_frequency = Counter(data['access_frequency'])
                self.access_counters = defaultdict(int, data['access_counters'])
        except FileNotFoundError:
            pass

cache = Cache(max_size=5)

for i in range(10):
    cache.set(f'key{i}', f'value{i}')

cache.get('key5')
cache.get('key6')
cache.get('key5')

print(f"размер кэша: {cache.size()}")
print(f"размер в памяти: {cache.get_cache_size()} байт")
print(f"частота доступа: {dict(cache.access_frequency.most_common(3))}")

cache.save_cache()

размер кэша: 5
размер в памяти: 820 байт
частота доступа: {'key5': 3, 'key6': 2, 'key0': 1}


**Задача 7: Анализатор текста**

Создайте анализатор текста используя collections:

1. Создайте namedtuple `WordInfo` с полями: word, frequency, length, first_occurrence
2. Используйте `Counter` для подсчета частоты слов
3. Используйте `defaultdict(list)` для группировки слов по длине
4. Используйте `deque` для хранения последних 50 уникальных слов
5. Используйте `OrderedDict` для хранения слов в порядке первого появления
6. Используйте `os.path` для работы с текстовыми файлами
7. Используйте `sys.getsizeof()` для анализа памяти
8. Сохраните результаты анализа в JSON файл


In [6]:
import json
import os
import sys
from collections import namedtuple, Counter, defaultdict, deque, OrderedDict
import re

WordInfo = namedtuple('WordInfo', ['word', 'frequency', 'length', 'first_occurrence'])

class TextAnalyzer:
    def __init__(self):
        self.word_frequency = Counter()
        self.words_by_length = defaultdict(list)
        self.recent_words = deque(maxlen=50)
        self.first_occurrence = OrderedDict()
        self.position_counter = 0

    def analyze_text(self, text):
        words = re.findall(r'\b\w+\b', text.lower())

        for word in words:
            self.position_counter += 1

            self.word_frequency[word] += 1

            self.words_by_length[len(word)].append(word)

            if word not in self.first_occurrence:
                self.first_occurrence[word] = self.position_counter
                if word not in self.recent_words:
                    self.recent_words.append(word)

    def analyze_file(self, filename):
        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as f:
                text = f.read()
                self.analyze_text(text)

    def get_word_info(self, word):
        if word in self.word_frequency:
            return WordInfo(
                word=word,
                frequency=self.word_frequency[word],
                length=len(word),
                first_occurrence=self.first_occurrence[word]
            )
        return None

    def get_memory_usage(self):
        return {
            'word_frequency': sys.getsizeof(self.word_frequency),
            'words_by_length': sys.getsizeof(self.words_by_length),
            'recent_words': sys.getsizeof(self.recent_words),
            'first_occurrence': sys.getsizeof(self.first_occurrence)
        }

    def save_results(self, filename='analysis.json'):
        results = {
            'most_common_words': self.word_frequency.most_common(10),
            'words_by_length': {length: words[:10] for length, words in self.words_by_length.items()},
            'recent_words': list(self.recent_words),
            'first_occurrence': dict(list(self.first_occurrence.items())[:20]),
            'memory_usage': self.get_memory_usage(),
            'total_unique_words': len(self.word_frequency),
            'total_words': sum(self.word_frequency.values())
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

analyzer = TextAnalyzer()

sample_text = """
Это пример текста для анализа. Текст содержит несколько слов разной длины.
Некоторые слова повторяются несколько раз в этом тексте для демонстрации частоты.
Анализ текста помогает понять структуру и содержание документа.
"""

analyzer.analyze_text(sample_text)
analyzer.analyze_text("дополнительный текст с новыми и повторяющимися словами")

print(f"уникальных слов: {len(analyzer.word_frequency)}")
print(f"всего слов: {sum(analyzer.word_frequency.values())}")
print(f"последние слова: {list(analyzer.recent_words)}")

analyzer.save_results()

уникальных слов: 32
всего слов: 37
последние слова: ['это', 'пример', 'текста', 'для', 'анализа', 'текст', 'содержит', 'несколько', 'слов', 'разной', 'длины', 'некоторые', 'слова', 'повторяются', 'раз', 'в', 'этом', 'тексте', 'демонстрации', 'частоты', 'анализ', 'помогает', 'понять', 'структуру', 'и', 'содержание', 'документа', 'дополнительный', 'с', 'новыми', 'повторяющимися', 'словами']


**Задача 8: Система управления задачами**

Создайте систему управления задачами (TODO) используя все изученные концепции:

1. Создайте namedtuple `Task` с полями: id, title, description, priority, status, created_date
2. Используйте `defaultdict(list)` для группировки задач по статусу (todo, in_progress, done)
3. Используйте `deque` для очереди задач с высоким приоритетом
4. Используйте `Counter` для подсчета задач по приоритету
5. Используйте `OrderedDict` для хранения задач в порядке создания
6. Используйте `ChainMap` для объединения различных списков задач
7. Используйте `os.path` для работы с файлами задач
8. Реализуйте функции: add_task, complete_task, get_tasks_by_status, get_priority_queue
9. Сериализуйте все данные в JSON и pickle форматы


In [7]:
import json
import pickle
import os
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict, ChainMap
from datetime import datetime

Task = namedtuple('Task', ['id', 'title', 'description', 'priority', 'status', 'created_date'])

class TaskManager:
    def __init__(self):
        self.tasks_by_status = defaultdict(list)
        self.priority_queue = deque()
        self.priority_counter = Counter()
        self.ordered_tasks = OrderedDict()
        self.task_sources = ChainMap({})
        self.next_id = 1

    def add_task(self, title, description, priority=1):
        task_id = self.next_id
        self.next_id += 1

        task = Task(
            id=task_id,
            title=title,
            description=description,
            priority=priority,
            status='todo',
            created_date=datetime.now()
        )

        self.tasks_by_status['todo'].append(task)
        if priority >= 3:
            self.priority_queue.append(task)
        self.priority_counter[priority] += 1
        self.ordered_tasks[task_id] = task

        return task

    def complete_task(self, task_id):
        if task_id in self.ordered_tasks:
            task = self.ordered_tasks[task_id]

            self.tasks_by_status[task.status].remove(task)
            self.tasks_by_status['done'].append(task)

            updated_task = task._replace(status='done')
            self.ordered_tasks[task_id] = updated_task

            if task in self.priority_queue:
                self.priority_queue.remove(task)

            return updated_task
        return None

    def get_tasks_by_status(self, status):
        return self.tasks_by_status.get(status, [])

    def get_priority_queue(self):
        return list(self.priority_queue)

    def add_task_source(self, source_name, tasks):
        self.task_sources = self.task_sources.new_child({source_name: tasks})

    def save_json(self, filename='tasks.json'):
        def convert_task(task):
            return {
                'id': task.id,
                'title': task.title,
                'description': task.description,
                'priority': task.priority,
                'status': task.status,
                'created_date': task.created_date.isoformat()
            }

        tasks_data = {
            'tasks_by_status': {status: [convert_task(task) for task in tasks]
                               for status, tasks in self.tasks_by_status.items()},
            'priority_queue': [convert_task(task) for task in self.priority_queue],
            'priority_counts': dict(self.priority_counter),
            'ordered_tasks': {task_id: convert_task(task)
                             for task_id, task in self.ordered_tasks.items()},
            'next_id': self.next_id
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(tasks_data, f, indent=2, ensure_ascii=False)

    def save_pickle(self, filename='tasks.pkl'):
        data = {
            'tasks_by_status': self.tasks_by_status,
            'priority_queue': self.priority_queue,
            'priority_counter': self.priority_counter,
            'ordered_tasks': self.ordered_tasks,
            'task_sources': dict(self.task_sources),
            'next_id': self.next_id
        }

        with open(filename, 'wb') as f:
            pickle.dump(data, f)

    def load_json(self, filename='tasks.json'):
        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)

                for status, tasks in data['tasks_by_status'].items():
                    for task_data in tasks:
                        task = Task(
                            id=task_data['id'],
                            title=task_data['title'],
                            description=task_data['description'],
                            priority=task_data['priority'],
                            status=task_data['status'],
                            created_date=datetime.fromisoformat(task_data['created_date'])
                        )
                        self.tasks_by_status[status].append(task)
                        self.ordered_tasks[task.id] = task

                self.next_id = data['next_id']

manager = TaskManager()

manager.add_task("задача 1", "описание задачи 1", 2)
manager.add_task("срочная задача", "важная задача", 3)
manager.add_task("обычная задача", "простая задача", 1)

manager.complete_task(1)

print(f"задачи в работе: {len(manager.get_tasks_by_status('in_progress'))}")
print(f"выполненные задачи: {len(manager.get_tasks_by_status('done'))}")
print(f"очередь приоритетных: {len(manager.get_priority_queue())}")

manager.save_json()
manager.save_pickle()

задачи в работе: 0
выполненные задачи: 1
очередь приоритетных: 1


**Задача 9: Система мониторинга производительности**

Создайте систему мониторинга производительности используя sys и collections:

1. Создайте namedtuple `PerformanceMetric` с полями: function_name, execution_time, memory_usage, timestamp
2. Используйте `deque` для хранения последних 100 измерений производительности
3. Используйте `defaultdict(list)` для группировки метрик по функциям
4. Используйте `Counter` для подсчета количества вызовов каждой функции
5. Используйте `OrderedDict` для хранения метрик в хронологическом порядке
6. Используйте `sys.getsizeof()` для мониторинга памяти
7. Используйте `os.path` для работы с файлами метрик
8. Реализуйте функции: record_metric, get_function_stats, get_memory_usage, export_metrics
9. Сериализуйте метрики в JSON и pickle форматы


In [8]:
import json
import pickle
import os
import sys
import time
from collections import namedtuple, deque, defaultdict, Counter, OrderedDict
from datetime import datetime

PerformanceMetric = namedtuple('PerformanceMetric', ['function_name', 'execution_time', 'memory_usage', 'timestamp'])

class PerformanceMonitor:
    def __init__(self):
        self.metrics_buffer = deque(maxlen=100)
        self.metrics_by_function = defaultdict(list)
        self.function_call_counter = Counter()
        self.chronological_metrics = OrderedDict()

    def record_metric(self, function_name, execution_time, memory_usage):
        timestamp = datetime.now()
        metric = PerformanceMetric(function_name, execution_time, memory_usage, timestamp)

        self.metrics_buffer.append(metric)
        self.metrics_by_function[function_name].append(metric)
        self.function_call_counter[function_name] += 1
        self.chronological_metrics[timestamp] = metric

        return metric

    def get_function_stats(self, function_name):
        if function_name in self.metrics_by_function:
            metrics = self.metrics_by_function[function_name]
            execution_times = [m.execution_time for m in metrics]
            memory_usages = [m.memory_usage for m in metrics]

            return {
                'call_count': len(metrics),
                'avg_execution_time': sum(execution_times) / len(execution_times),
                'avg_memory_usage': sum(memory_usages) / len(memory_usages),
                'min_execution_time': min(execution_times),
                'max_execution_time': max(execution_times)
            }
        return None

    def get_memory_usage(self):
        return {
            'metrics_buffer': sys.getsizeof(self.metrics_buffer),
            'metrics_by_function': sys.getsizeof(self.metrics_by_function),
            'function_call_counter': sys.getsizeof(self.function_call_counter),
            'chronological_metrics': sys.getsizeof(self.chronological_metrics)
        }

    def export_metrics(self, filename_prefix='metrics'):
        def convert_metric(metric):
            return {
                'function_name': metric.function_name,
                'execution_time': metric.execution_time,
                'memory_usage': metric.memory_usage,
                'timestamp': metric.timestamp.isoformat()
            }

        json_data = {
            'metrics_buffer': [convert_metric(m) for m in self.metrics_buffer],
            'metrics_by_function': {func: [convert_metric(m) for m in metrics]
                                   for func, metrics in self.metrics_by_function.items()},
            'function_call_counter': dict(self.function_call_counter),
            'chronological_metrics': {ts.isoformat(): convert_metric(metric)
                                     for ts, metric in self.chronological_metrics.items()},
            'memory_usage': self.get_memory_usage()
        }

        with open(f'{filename_prefix}.json', 'w', encoding='utf-8') as f:
            json.dump(json_data, f, indent=2, ensure_ascii=False)

        pickle_data = {
            'metrics_buffer': self.metrics_buffer,
            'metrics_by_function': self.metrics_by_function,
            'function_call_counter': self.function_call_counter,
            'chronological_metrics': self.chronological_metrics
        }

        with open(f'{filename_prefix}.pkl', 'wb') as f:
            pickle.dump(pickle_data, f)

def monitor_function(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        start_memory = sys.getsizeof(args) + sys.getsizeof(kwargs)

        result = func(*args, **kwargs)

        end_time = time.time()
        end_memory = sys.getsizeof(result)

        execution_time = end_time - start_time
        memory_usage = end_memory - start_memory

        monitor.record_metric(func.__name__, execution_time, memory_usage)

        return result
    return wrapper

@monitor_function
def example_function(n):
    time.sleep(0.1)
    return [i for i in range(n)]

@monitor_function
def another_function(text):
    time.sleep(0.05)
    return text.upper()

monitor = PerformanceMonitor()

for i in range(5):
    example_function(1000)
    another_function("test string")

print(f"всего записей: {len(monitor.metrics_buffer)}")
print(f"вызовы функций: {dict(monitor.function_call_counter)}")

stats = monitor.get_function_stats('example_function')
if stats:
    print(f"статистика example_function: {stats}")

print(f"использование памяти: {monitor.get_memory_usage()}")

monitor.export_metrics()

всего записей: 10
вызовы функций: {'example_function': 5, 'another_function': 5}
статистика example_function: {'call_count': 5, 'avg_execution_time': 0.10022220611572266, 'avg_memory_usage': 8744.0, 'min_execution_time': 0.10014867782592773, 'max_execution_time': 0.10033369064331055}
использование памяти: {'metrics_buffer': 760, 'metrics_by_function': 192, 'function_call_counter': 200, 'chronological_metrics': 864}


**Задача 10: Комплексная система управления данными**

Создайте комплексную систему управления данными, объединяющую все изученные концепции:

1. Создайте несколько namedtuple для различных типов данных (User, Product, Order, etc.)
2. Используйте `defaultdict` для создания индексов по различным полям
3. Используйте `deque` для реализации очередей обработки данных
4. Используйте `Counter` для аналитики и статистики
5. Используйте `OrderedDict` для хранения данных в определенном порядке
6. Используйте `ChainMap` для объединения различных источников данных
7. Используйте `os` и `sys` для работы с файловой системой и мониторинга
8. Реализуйте CRUD операции (Create, Read, Update, Delete)
9. Добавьте функции экспорта/импорта данных в различных форматах
10. Сериализуйте все данные в JSON, pickle и другие форматы
11. Добавьте типизацию для всех функций и классов
12. Реализуйте систему логирования для отслеживания операций


In [10]:
import json
import pickle
import os
import sys
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict, ChainMap
from datetime import datetime
from typing import Dict, List, Optional, Any, Union

User = namedtuple('User', ['id', 'name', 'email', 'created_at'])
Product = namedtuple('Product', ['id', 'name', 'price', 'category', 'stock'])
Order = namedtuple('Order', ['id', 'user_id', 'product_id', 'quantity', 'status', 'created_at'])

class DataManager:
    def __init__(self):
        self.users: OrderedDict = OrderedDict()
        self.products: OrderedDict = OrderedDict()
        self.orders: OrderedDict = OrderedDict()

        self.user_indexes = defaultdict(dict)
        self.product_indexes = defaultdict(dict)
        self.order_indexes = defaultdict(dict)

        self.processing_queue = deque(maxlen=100)
        self.data_sources = ChainMap({})

        self.stats_counter = Counter()
        self.operation_log = deque(maxlen=200)

        self.next_ids = {'user': 1, 'product': 1, 'order': 1}

    def _log_operation(self, operation: str, entity: str, entity_id: int) -> None:
        log_entry = f"{datetime.now()}: {operation} {entity} {entity_id}"
        self.operation_log.append(log_entry)
        self.stats_counter[f"{operation}_{entity}"] += 1

    def create_user(self, name: str, email: str) -> User:
        user_id = self.next_ids['user']
        self.next_ids['user'] += 1

        user = User(user_id, name, email, datetime.now())
        self.users[user_id] = user

        self.user_indexes['email'][email] = user_id
        self.user_indexes['name'][name] = user_id

        self._log_operation('CREATE', 'user', user_id)
        return user

    def get_user(self, user_id: int) -> Optional[User]:
        self._log_operation('READ', 'user', user_id)
        return self.users.get(user_id)

    def update_user(self, user_id: int, **kwargs) -> Optional[User]:
        if user_id not in self.users:
            return None

        old_user = self.users[user_id]
        user_dict = old_user._asdict()
        user_dict.update(kwargs)

        new_user = User(**user_dict)
        self.users[user_id] = new_user

        if 'email' in kwargs:
            self.user_indexes['email'][kwargs['email']] = user_id
        if 'name' in kwargs:
            self.user_indexes['name'][kwargs['name']] = user_id

        self._log_operation('UPDATE', 'user', user_id)
        return new_user

    def delete_user(self, user_id: int) -> bool:
        if user_id in self.users:
            user = self.users[user_id]
            del self.users[user_id]

            if user.email in self.user_indexes['email']:
                del self.user_indexes['email'][user.email]
            if user.name in self.user_indexes['name']:
                del self.user_indexes['name'][user.name]

            self._log_operation('DELETE', 'user', user_id)
            return True
        return False

    def create_product(self, name: str, price: float, category: str, stock: int) -> Product:
        product_id = self.next_ids['product']
        self.next_ids['product'] += 1

        product = Product(product_id, name, price, category, stock)
        self.products[product_id] = product

        self.product_indexes['category'][category] = product_id
        self.product_indexes['name'][name] = product_id

        self._log_operation('CREATE', 'product', product_id)
        return product

    def create_order(self, user_id: int, product_id: int, quantity: int) -> Optional[Order]:
        if user_id not in self.users or product_id not in self.products:
            return None

        order_id = self.next_ids['order']
        self.next_ids['order'] += 1

        order = Order(order_id, user_id, product_id, quantity, 'pending', datetime.now())
        self.orders[order_id] = order

        self.order_indexes['user_id'][user_id] = order_id
        self.order_indexes['product_id'][product_id] = order_id
        self.order_indexes['status']['pending'] = order_id

        self.processing_queue.append(('process_order', order_id))

        self._log_operation('CREATE', 'order', order_id)
        return order

    def get_entities_by_index(self, entity_type: str, index_field: str, value: Any) -> List:
        if entity_type == 'user':
            entities = self.users
            indexes = self.user_indexes
        elif entity_type == 'product':
            entities = self.products
            indexes = self.product_indexes
        elif entity_type == 'order':
            entities = self.orders
            indexes = self.order_indexes
        else:
            return []

        if value in indexes[index_field]:
            entity_id = indexes[index_field][value]
            return [entities[entity_id]]
        return []

    def add_data_source(self, source_name: str, data: Dict) -> None:
        self.data_sources = self.data_sources.new_child({source_name: data})

    def get_memory_usage(self) -> Dict[str, int]:
        return {
            'users': sys.getsizeof(self.users),
            'products': sys.getsizeof(self.products),
            'orders': sys.getsizeof(self.orders),
            'indexes': sys.getsizeof(self.user_indexes) + sys.getsizeof(self.product_indexes) + sys.getsizeof(self.order_indexes),
            'queue': sys.getsizeof(self.processing_queue),
            'logs': sys.getsizeof(self.operation_log)
        }

    def export_data(self, format_type: str = 'json') -> None:
        if format_type == 'json':
            def convert_entity(entity):
                data = entity._asdict()
                for key, value in data.items():
                    if isinstance(value, datetime):
                        data[key] = value.isoformat()
                return data

            data = {
                'users': [convert_entity(user) for user in self.users.values()],
                'products': [convert_entity(product) for product in self.products.values()],
                'orders': [convert_entity(order) for order in self.orders.values()],
                'stats': dict(self.stats_counter),
                'logs': list(self.operation_log)
            }

            with open('data_export.json', 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

        elif format_type == 'pickle':
            data = {
                'users': self.users,
                'products': self.products,
                'orders': self.orders,
                'indexes': dict(self.user_indexes),
                'stats': self.stats_counter,
                'next_ids': self.next_ids
            }

            with open('data_export.pkl', 'wb') as f:
                pickle.dump(data, f)

    def import_data(self, format_type: str = 'json') -> None:
        if format_type == 'json' and os.path.exists('data_export.json'):
            with open('data_export.json', 'r', encoding='utf-8') as f:
                data = json.load(f)

                for user_data in data['users']:
                    user_data['created_at'] = datetime.fromisoformat(user_data['created_at'])
                    user = User(**user_data)
                    self.users[user.id] = user

                for product_data in data['products']:
                    product = Product(**product_data)
                    self.products[product.id] = product

                for order_data in data['orders']:
                    order_data['created_at'] = datetime.fromisoformat(order_data['created_at'])
                    order = Order(**order_data)
                    self.orders[order.id] = order

manager = DataManager()

user1 = manager.create_user("иван иванов", "ivan@mail.com")
user2 = manager.create_user("петр петров", "petr@mail.com")

product1 = manager.create_product("ноутбук", 50000.0, "электроника", 10)
product2 = manager.create_product("книга", 500.0, "литература", 100)

order1 = manager.create_order(user1.id, product1.id, 1)
order2 = manager.create_order(user2.id, product2.id, 2)

print(f"всего пользователей: {len(manager.users)}")
print(f"всего товаров: {len(manager.products)}")
print(f"всего заказов: {len(manager.orders)}")
print(f"статистика операций: {dict(manager.stats_counter)}")
print(f"использование памяти: {manager.get_memory_usage()}")

manager.export_data('json')
manager.export_data('pickle')

всего пользователей: 2
всего товаров: 2
всего заказов: 2
статистика операций: {'CREATE_user': 2, 'CREATE_product': 2, 'CREATE_order': 2}
использование памяти: {'users': 416, 'products': 416, 'orders': 416, 'indexes': 576, 'queue': 760, 'logs': 760}


#### РЕКОМЕНДАЦИИ ПО ВЫПОЛНЕНИЮ ЗАДАЧ

1. **Начните с простых задач** (1-3) для понимания базовых концепций
2. **Используйте типизацию** - добавляйте type hints для всех функций и переменных
3. **Структурируйте код** - разбивайте задачи на логические функции и классы
4. **Документируйте код** - добавляйте docstrings для всех функций
5. **Тестируйте сериализацию** - убедитесь, что данные корректно сохраняются и загружаются
6. **Обрабатывайте ошибки** - используйте try-except блоки для обработки исключений
7. **Оптимизируйте память** - используйте `sys.getsizeof()` для мониторинга использования памяти
8. **Следуйте принципам** - код должен быть читаемым, понятным и структурированным

**Дополнительные требования:**
- Все объекты должны быть сериализованы в соответствующие форматы
- Код должен содержать комментарии на русском языке
- Функции должны быть небольшими и выполнять одну задачу
- Используйте только те библиотеки и концепции, которые представлены в уроке


In [None]:
import math

def sqrt1_plus_x_taylor(x, epsilon=1e-10):

    # проверка области сходимости ряда
    if abs(x) > 1:
        raise ValueError("сходится только при |x| <= 1")

    def _iter():
        n = 0
        current_term = 1.0

        while True:
            yield current_term
            n += 1
            current_term = current_term * ((0.5 - (n - 1)) / n) * x

    total = 0.0
    prev_total = 0.0
    iterations = 0

    for term in _iter():
        total += term
        iterations += 1

        # проверяем точность
        if abs(total - prev_total) < epsilon:
            break
        prev_total = total

    return total, iterations

test_values = [0.0, 0.5, 1.0, -0.5, 0.25]

for x in test_values:
    approx, iters = sqrt1_plus_x_taylor(x)
    exact = math.sqrt(1 + x)

    print(f"x = {x:5.2f}")
    print(f"  Точное значение: {exact:.10f}")
    print(f"  Приближение:     {approx:.10f}")
    print(f"  Итераций:        {iters}")
    print("-" * 50)

x =  0.00
  Точное значение: 1.0000000000
  Приближение:     1.0000000000
  Итераций:        2
--------------------------------------------------
x =  0.50
  Точное значение: 1.2247448714
  Приближение:     1.2247448714
  Итераций:        26
--------------------------------------------------
x =  1.00
  Точное значение: 1.4142135624
  Приближение:     1.4142135624
  Итераций:        1996476
--------------------------------------------------
x = -0.50
  Точное значение: 0.7071067812
  Приближение:     0.7071067812
  Итераций:        26
--------------------------------------------------
x =  0.25
  Точное значение: 1.1180339887
  Приближение:     1.1180339888
  Итераций:        14
--------------------------------------------------


In [None]:
import math
from typing import Generator

def taylor_gen(x: float) -> Generator[float, None, None]:
    n: int = 0
    current_term: int = 1.0

    while True:
        yield current_term
        n += 1
        current_term = current_term * ((0.5 - (n - 1)) / n) * x

def sqrt1_plus_x_taylor(x: float, epsilon: float = 1e-10) -> tuple[float, int]:

    if abs(x) > 1:
        raise ValueError("сходится только при |x| <= 1")

    total = 0.0
    prev_total = 0.0
    iterations = 0

    # создаем генератор
    generator = taylor_gen(x)

    while True:
        # получаем следующий член ряда
        term = next(generator)
        total += term
        iterations += 1

        # проверяем точность
        if abs(total - prev_total) < epsilon:
            break
        prev_total = total

    return total, iterations

test_values = [0.0, 0.5, 1.0, -0.5, 0.25]

for x in test_values:
    approx, iters = sqrt1_plus_x_taylor(x)
    exact = math.sqrt(1 + x)

    print(f"x = {x:5.2f}")
    print(f"  Точное значение: {exact:.10f}")
    print(f"  Приближение:     {approx:.10f}")
    print(f"  Итераций:        {iters}")
    print("-" * 50)

x =  0.00
  Точное значение: 1.0000000000
  Приближение:     1.0000000000
  Итераций:        2
--------------------------------------------------
x =  0.50
  Точное значение: 1.2247448714
  Приближение:     1.2247448714
  Итераций:        26
--------------------------------------------------
x =  1.00
  Точное значение: 1.4142135624
  Приближение:     1.4142135624
  Итераций:        1996476
--------------------------------------------------
x = -0.50
  Точное значение: 0.7071067812
  Приближение:     0.7071067812
  Итераций:        26
--------------------------------------------------
x =  0.25
  Точное значение: 1.1180339887
  Приближение:     1.1180339888
  Итераций:        14
--------------------------------------------------
