Стандартные библиотеки Python : 
os, sys, collections и другие являются мощным инструментом, которым нужно и полезно уметь пользоваться. 

Ресурсы к ознакомлению:

sys -- https://docs.python.org/3/library/sys.html

os -- https://docs.python.org/3/library/os.html

collections -- https://docs.python.org/3/library/collections.html

#### COLLECTIONS

In [61]:
from IPython.testing.tools import full_path

"""
    Начнем с collections. Библиотека хранит в себе специальные контейнеры, 
    расширяющие классический список контейнеров: списков, кортежей и словарей.
"""
import collections
"""
    Посмотреть содержащиеся полезные модули можно, написав функцию ниже одной строкой.
"""
print('\n'.join(list(filter(lambda x: not x.startswith('_') and x != 'abc', collections.__dict__.keys()))))
# содержательно эта строка станет понятна по мере освоения курса
# она неявно содержит в себе концепции list comprehension, а также парадигмы функционального программирования
# и в целом демонстрирует лаконичность и эффективность инструментов языка python

deque
defaultdict
OrderedDict
namedtuple
Counter
ChainMap
UserDict
UserList
UserString


In [None]:
'''
    Начнем с рассмотрения namedtuple. Это именованный кортеж, который то же самое, что и кортеж,
    но допускает обращение к своим полям по имени. Более точно, это специальный тип класса с реализованным дандер методом '__repr__()'
'''
from collections import namedtuple
from typing import NamedTuple # нужна для указания типа, или в качестве указания как родительского класса при создании дочернего

CallOption = namedtuple('CallOption', ['strike', 'expiration_date'])

my_option: CallOption = CallOption(strike=100, expiration_date='2026-01-01')
print(my_option, type(my_option))
print(my_option.strike, type(my_option.strike), my_option.expiration_date, type(my_option.expiration_date))

# my_option.strike = 101 # AttributeError: can't set attribute
'''
    Для работы с именованными кортежами полезно пользоваться некоторыми методами
'''
# namedtuple._asdict() -- представляет объект класса как словарь
print('Достаем словарь:', my_option._asdict(), type(my_option._asdict()))

# namedtuple._replace -- возвращает новый экземляр  замененным полем
print('Создаем новый именованный кортеж:', my_option._replace(strike=200))

# namedtuple._fields -- получить кортеж строк полей
print(my_option._fields, type(my_option._fields))

CallOption(strike=100, expiration_date='2026-01-01') <class '__main__.CallOption'>
100 <class 'int'> 2026-01-01 <class 'str'>
Достаем словарь: {'strike': 100, 'expiration_date': '2026-01-01'} <class 'dict'>
Создаем новый именованный кортеж: CallOption(strike=200, expiration_date='2026-01-01')
('strike', 'expiration_date') <class 'tuple'>


In [98]:
"""
    Далее deque. Это список с эффективной реализацией добавления и извлечения элементов.
    В соответствии с описанием в документации:
        Дек это обобщение стека и очереди. Он потоко-безопасный (thread-safe, то есть его можно параллелить 
        на уровне потоков без опасения измененения объекта со стороны другого потока, здравствуй impute и lock),
        эффективный по памяти и требует О(1) времени на добавление и извлечение элемента с начала и конца списка.
"""
from collections import deque

my_deque = deque(maxlen=10)
_ = [my_deque.append(i) for i in range(13)]
print(my_deque, ', ограничен по длине ', len(my_deque), '\n')

print('Убрать элемент слева:', my_deque.popleft(), ', в очереди останется:', my_deque, ', длина ', len(my_deque))
print('Убрать элемент справа:', my_deque.pop(), ', в очереди останется:', my_deque, ', длина ', len(my_deque), '\n')

print('Добавить элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить элемент справа:', my_deque.append(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить элемент справа:', my_deque.append(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить еще элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
print('Добавить еще элемент слева:', my_deque.appendleft(0), ', в очереди станет:', my_deque, ', длина ', len(my_deque))
'''
    Очередь позволяет эффективно осуществлять операции вставки и изъятия объектов в начале и в конце списка.
    И кроме того в ней встроена защита от дурака, которая не позволит создавать бесконечно большой список.
'''
print(my_deque * 2 ** 10, '\n') # мы можем бесконечно долго из-за ошибки в коде нарашивать длину списка, но не сможем для очереди
'''
    deque поддерживает многие операции, доступные list:
    insert, clear, copy, count, extend, remove, reverse
    Есть свой метод rotate(n=1), который по принципу RR(RoundRobin) сдвигает все элементы на n вправо (n < 0 -- влево)
'''
print('используем rotate(2):', my_deque.rotate(2), ', получаем:', my_deque)

deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10) , ограничен по длине  10 

Убрать элемент слева: 3 , в очереди останется: deque([4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10) , длина  9
Убрать элемент справа: 12 , в очереди останется: deque([4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  8 

Добавить элемент слева: None , в очереди станет: deque([0, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  9
Добавить элемент справа: None , в очереди станет: deque([0, 4, 5, 6, 7, 8, 9, 10, 11, 0], maxlen=10) , длина  10
Добавить элемент слева: None , в очереди станет: deque([0, 0, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  10
Добавить элемент справа: None , в очереди станет: deque([0, 4, 5, 6, 7, 8, 9, 10, 11, 0], maxlen=10) , длина  10
Добавить еще элемент слева: None , в очереди станет: deque([0, 0, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10) , длина  10
Добавить еще элемент слева: None , в очереди станет: deque([0, 0, 0, 4, 5, 6, 7, 8, 9, 10], maxlen=10) , длина  10
deque([0, 0, 0, 4, 5, 6, 7, 8,

In [None]:
'''
    следующий объект -- defaultdict. Он расширяет обыкновенный словарь, добавляя метод,
    который вызывается каждый раз при отсуствии ключа, по которому произошло обращение. 
    При создании словаря с дефолтным значением в случае неуказания этого дефолтного значения (аргумент default_factory)
    он работает как обычный словарь. Если default_factory указан, то по указанному отсутствующему ключу вызывается default_factory 
    Сигнатура такая : defaultdict(default_factory: Callable[[None], object])
    При этом 
'''

from collections import defaultdict

my_default_dict = defaultdict()
# my_default_dict['0'] # KeyError

# 1 способ
my_default_dict = defaultdict(lambda: 'missing_value')
print(my_default_dict['0'])

# 2 способ
def return_dafault_value():
    return 'missing_value'

my_default_dict = defaultdict(return_dafault_value)
print(my_default_dict['0'])

# 3 способ
def return_dafault_value(value):
    return lambda: value

my_default_dict = defaultdict(return_dafault_value('arbitraty_value_inserted_right_here'))
print(my_default_dict['0'])

# другие варианты
my_default_dict = defaultdict(list) # дефолтом ставим список
my_default_dict['fridge'].append('egg')
my_default_dict['fridge'].append('steak')
my_default_dict['fridge'].append('butter')
my_default_dict['pocket'].append('money')
print(my_default_dict)

missing_value
missing_value
arbitraty_value_inserted_right_here
defaultdict(<class 'list'>, {'fridge': ['egg', 'steak', 'butter'], 'pocket': ['money']})


In [None]:
'''
    OrderedDict -- это словарь, который помнит порядок добавления ключей. Начиная с python 3.7 это умеет и дефолтный словарь (dict, не путать с defaultdict)
    Так что по большому счету он не настолько актуален. Однако если вам нужно в явном виде донести важную идею вашего кода,
    то использование OrderedDict оправдано вашими целями
'''

from collections import OrderedDict

my_ordered_dict: OrderedDict = OrderedDict.fromkeys('abcde')
print('Упорядоченный словарь:', my_ordered_dict, type(my_ordered_dict))
my_ordered_dict.move_to_end('b')
print('Переупорядочили ключи:', ''.join(my_ordered_dict))

my_ordered_dict.move_to_end('b', last=False)
print('Переупорядочили ключи сноав:', ''.join(my_ordered_dict))

print('Текущий порядок:', ''.join(my_ordered_dict), ', вытащим последний добавленный элемент (LIFO):', my_ordered_dict.popitem())
print('Текущий порядок:', ''.join(my_ordered_dict), ', вытащим первый добавленный элемент (FIFO):', my_ordered_dict.popitem(last=False))

Упорядоченный словарь: OrderedDict([('a', None), ('b', None), ('c', None), ('d', None), ('e', None)]) <class 'collections.OrderedDict'>
Переупорядочили ключи: acdeb
Переупорядочили ключи сноав: bacde
Текущий порядок: bacde , вытащим последний добавленный элемент (LIFO): ('e', None)
Текущий порядок: bacd , вытащим первый добавленный элемент (FIFO): ('b', None)


In [114]:
'''
    ChainMap -- подкласс словаря, который объединяет несколько отдельных словарей в один общий.
'''

from collections import ChainMap

my_dict_1 = dict(a=1, b=2, c=3)
my_dict_2 = dict(color='yellow', size=10)
my_dict_3 = dict(date='2025-01-01', time='16:00')

chain_map : ChainMap = ChainMap(my_dict_1, my_dict_2, my_dict_3)
print(chain_map, type(chain_map))
print('Ссылаемся на объект из 3го словаря:', chain_map['time'])
# print('Ссылаемся на объект, которого нет:', chain_map['weather']) # KeyError
# мы можем проитерироваться по всем объектам в chain'e
for key, value in chain_map.items():
    print(key, value)

ChainMap({'a': 1, 'b': 2, 'c': 3}, {'color': 'yellow', 'size': 10}, {'date': '2025-01-01', 'time': '16:00'}) <class 'collections.ChainMap'>
Ссылаемся на объект из 3го словаря: 16:00
date 2025-01-01
time 16:00
color yellow
size 10
a 1
b 2
c 3


In [121]:
'''
    Последний рассматриваемый -- Counter -- объект, который парсит итерируемый объект на тему повторяющихся элементов
    и формирует словарь, в котором ключами являются уникальные элементы входного списка, а значениями их количество.
'''

from collections import Counter

my_counter : Counter = Counter([0,1,2,3,3,2,1,1,1,0,5,0])
print('Счетчик:', my_counter, type(my_counter))

# дополнительно можно посмотреть наиболее часто встречающиеся объекты
print('Два самых частых числа (число, кол-во):', my_counter.most_common(2))

Счетчик: Counter({1: 4, 0: 3, 2: 2, 3: 2, 5: 1}) <class 'collections.Counter'>
Два самых частых числа (число, кол-во): [(1, 4), (0, 3)]


#### SYS and OS

In [None]:
'''
    библиотека os позволяет работать с некоторым функционалом операционной системы, 
    которой вы пользуетесь. Как и ранее для более полного ознакомления со всем функционалом рекомендуется читать документацию.
'''

import os
print(', '.join(list(filter(lambda x: not x.startswith('_'), os.__dict__.keys()))))

abc, sys, st, GenericAlias, name, linesep, stat, access, chdir, chmod, getcwd, getcwdb, link, listdir, lstat, mkdir, readlink, rename, replace, rmdir, symlink, system, umask, unlink, remove, utime, times, execv, execve, spawnv, spawnve, getpid, getppid, getlogin, kill, startfile, waitpid, open, close, closerange, device_encoding, dup, dup2, lseek, read, write, fstat, isatty, pipe, ftruncate, truncate, putenv, unsetenv, strerror, fsync, abort, urandom, get_terminal_size, cpu_count, get_inheritable, set_inheritable, get_handle_inheritable, set_handle_inheritable, scandir, fspath, waitstatus_to_exitcode, environ, F_OK, R_OK, W_OK, X_OK, TMP_MAX, O_RDONLY, O_WRONLY, O_RDWR, O_APPEND, O_CREAT, O_EXCL, O_TRUNC, O_BINARY, O_TEXT, O_NOINHERIT, O_SHORT_LIVED, O_TEMPORARY, O_RANDOM, O_SEQUENTIAL, EX_OK, P_WAIT, P_NOWAIT, P_NOWAITO, P_OVERLAY, P_DETACH, error, stat_result, statvfs_result, terminal_size, DirEntry, times_result, uname_result, path, curdir, pardir, sep, pathsep, defpath, extsep, alt

In [None]:
path = None

# environ содержит словарь с переменными окружения
os.environ

# getlogin возвращает имя пользователя
os.getlogin()

# getpid getppid -- id процесса и id его родительского процесса
os.getpid()
os.getppid()

# os.chmod(path, mode, ...) -- установить мод(режим) для файла по пути path

# вернуть текущую директорию
os.getcwd()

# вернуть список файлов в директории
os.listdir()

# создает директорию по указанному пути path; 
# если аргумент 'exist_ok' = True , то функция в случае существования директории не поднимет ошибку, если False -- FileExistsError
os.makedirs(path, exist_ok=True)

# remove / unlink удаляет файл по указанному пути; если указать директорию, будет выдана ошибка OSError
os.remove(path) 
os.unlink(path)

# remodirs удаляет директорию по указанному пути; в случае ошибки выдает OSError
os.remodirs(path) 

# переименовать src файл/директорию в dst
os.rename(src=None, dst=None)

# scandir возвращает итератор объектов, расположенных по пути path
os.scandir(path)

# возвращает количество логических процессоров в системе
os.cpu_count()

#####     os.path     #####

# abspath возвращает абсолютный путь к указанному относительному пути
os.path.abspath('.')

# вовзращает имя конечной точки указанного пути
os.path.basename(path)

# exists проверяет существование директории или файла по указанному пути и возрвращает True или False
os.path.exists(path)

# возвращают время
os.path.getatime('.') # посещения файла/директории
os.path.getmtime('.') # изменения файла/директории
os.path.getctime('.') # создания файла/директории

# вовзращает размер файла/директории в байтах
os.path.getsize(path)

# по-умному конкатенирует части путей в один
os.path.join(path, *paths)

# нормализует путь: удаляет ненужные/кривые символы
os.path.normpath(path)

# разбивает входную сроку-путь на (head, tail), tail последний файл/директория в пути
os.path.split(path)

# разбивает входную сроку-путь на путь и расширение файла
os.path.splitext('somebody/once/told.me')

In [1]:
'''
    библиотека sys позволяет работать с некоторым функционалом интерпретатора.
'''

import sys
print(', '.join(list(filter(lambda x: not x.startswith('_'), sys.__dict__.keys()))))

addaudithook, audit, breakpointhook, displayhook, exception, exc_info, excepthook, exit, getdefaultencoding, getallocatedblocks, getfilesystemencoding, getfilesystemencodeerrors, getrefcount, getrecursionlimit, getsizeof, getwindowsversion, intern, is_finalizing, setswitchinterval, getswitchinterval, setprofile, getprofile, setrecursionlimit, settrace, gettrace, call_tracing, set_coroutine_origin_tracking_depth, get_coroutine_origin_tracking_depth, set_asyncgen_hooks, get_asyncgen_hooks, unraisablehook, get_int_max_str_digits, set_int_max_str_digits, modules, stderr, version, hexversion, api_version, copyright, platform, maxsize, float_info, int_info, hash_info, maxunicode, builtin_module_names, stdlib_module_names, byteorder, dllhandle, winver, version_info, implementation, flags, float_repr_style, thread_info, meta_path, path_importer_cache, path_hooks, path, executable, prefix, base_prefix, exec_prefix, base_exec_prefix, platlibdir, pycache_prefix, argv, orig_argv, warnoptions, dont

In [None]:
# argv возвращает аргументы скрипта, [0] элемент это имя файла скрипта
sys.argv

# содержит copyright
sys.copyright

# очищает кеш интерпретатора
sys._clear_type_cache() # до python 3.13
# sys._clear_internal_caches() # python 3.13+

#  sys.exception() возвращает пойманную в 'except' ошибку; вне конструкции возвращает None (то есть ничего)
try:
    _ = 0 / 0
except:
    exception = sys.exception()
    
# возвращает SystemExit exception, сигнализирующий о завершении выполнения интерпретатора
# sys.exit()

# возвращает количество блоков памяти, занятых интерпретатором
sys.getallocatedblocks()

# возвращает предел рекурсии в интерпретаторе
sys.getrecursionlimit()
# sys.setrecursionlimit(N) # устанавливает лимит на рекурсию

# возвращает размер объекта в байтах
sys.getsizeof( 0 )

# возвращает глубину вложенных корутин
sys.get_coroutine_origin_tracking_depth()

# список всех текущих модулей
sys.modules.keys()

# версия интерпретатора
sys.version



#### TYPING

In [None]:
'''
    В общем, строгая типизация и аннотирование вашего кода значительно упростят
    всю дальнейшую работу с ним. Как вам, так и другим, кто будет его читать.
'''

import typing
print(', '.join(sorted(list(filter(lambda x: not x.startswith('_'), typing.__dict__.keys())))))

'''
    Вам желательно запомнить некоторые наиболее часто употребляемые типы.
    Касательно остальных было неплохо их хотя бы знать, чтобы при случае вы могли ими воспользоваться.
'''



#### SERIALIZATION

In [17]:
'''
wikipedia
    Сериализация (в программировании) — процесс перевода структуры данных в битовую последовательность. 
    Обратной к операции сериализации является операция десериализации (структуризации) — 
        создание структуры данных из битовой последовательности.
    Мы будем ссылаться на сериализацию как на возможность создания и сохранения файла 
    из некоторого объекта python.
    Есть несколько библиотек, которые позволяют сериализовать некоторый объект.
    Если это dataframe, то его можно сериализовать в parquet, csv и другие форматы, предусмотренные библиотекой.
    Если это некоторая строка, словарь, список -- можно использовать json
    Если это какой-то произвольный объект (любой, в том числе и из вышеперечисленных) --
        его можно сохранить в виде бинарного файла. Библиотеки: dill, cloudpickle, pickle
        В реальности, разумно ограничиться одной библиотекой. Ознакомьтесь с каждой из списка выше
'''

import json, cloudpickle, pickle, dill

# В общем случае, для работы с файлами используют контекстный менеджер with
# Он гарантирует, что вне его блока кода файл корректно будет закрыт

# default'ная запись и чтение текстового файла
with open('somefile.txt', 'w') as file: # первый аргумент - имя файла, второй режим использования ('w' - запись, 'r' - чтение)
    file.write('Hello, default write!')
with open('somefile.txt', 'r') as file:
    line = file.read()
    print(line, '\n')

# сериализация с помощью json
with open('somefile.json', 'w') as file:
    json.dump('Hello, json!', file)
with open('somefile.json', 'r') as file:
    line = json.load(file)
    print(line, '\n')

# сериализация с помощью dill, pickle, cloudpickle
with open('somefile.pickle', 'wb') as file: # для бинарных файлов режим доступа нужно писать 'wb', 'rb' что значит 'binary'
    dill.dump('Hello, dill|pickle|cloudpickle!', file)
    pickle.dump('Hello, dill|pickle|cloudpickle!', file)
    cloudpickle.dump('Hello, dill|pickle|cloudpickle!', file)
with open('somefile.pickle', 'rb') as file:
    line = dill.load(file)
    line = pickle.load(file)
    line = cloudpickle.load(file)
    print(line, '\n')

Hello, default write! 

Hello, json! 

Hello, dill|pickle|cloudpickle! 



In [16]:
# вы можете сериализовать почти любой объект и его состояние

# class
class ToDemonstrate:
    def __init__(self):
        pass
    def print_hello(self):
        print('Hello, world!')

my_class = ToDemonstrate()

with open('demonstration_cls.pkl', 'wb') as f:
    cloudpickle.dump(my_class, f)
with open('demonstration_cls.pkl', 'rb') as f:
    new_read_class = cloudpickle.load(f)
new_read_class.print_hello()
print()

# объект из библиотеки, например, объект линейной регресии, который тоже класс
from sklearn.linear_model import LinearRegression
lr = LinearRegression()
with open('not_fitted_lr.pkl', 'wb') as f:
    cloudpickle.dump(lr, f)
with open('not_fitted_lr.pkl', 'rb') as f:
    new_read_lr = cloudpickle.load(f)
print(lr, new_read_lr)
print()

# фунцию (но не анонимную lambda функцию!)
def to_save():
    print('Defined function')
with open('my_func.pkl', 'wb') as f:
    cloudpickle.dump(to_save, f)
with open('my_func.pkl', 'rb') as f:
    new_read_func = cloudpickle.load(f)
new_read_func()

Hello, world!

LinearRegression() LinearRegression()

Defined function


#### ЗАДАЧИ

In [None]:
'''
    Ниже вам будут предложены задачи для освоения материалов.
    Собственно, первая и параллельная задача:
        Все объекты, которые вы создадите нужно упаковать в виде файлов (то есть сериализовать).
        По расширению файла должно быть понятно, как его прочитать.
        Дополнительно вам нужно узнать как сохранять файлы в формате .rar, .zip, .tar.gz 
            и тоже сохранить файлы в этих форматах.

    Требования к написанию кода: структура, читаемость, строгая типизация и аннотирования.
    Код должен быть очевиден, понятен, структурирован.
'''

In [None]:
# Реализуйте простую модель стоянки. 
# 1) Скажем, ваши автомобили это именованные кортежи, с указанием марки автомобиля, модели, гос.номера и цвета
# 2) Сама стоянка это двусторонняя очередь (deque)
#   Представим, что у нас нет коллизий, и если места на стоянке нет, то виртуально вызывается эвакуатор и убирает самый крайний автомобиль (как и работает deque)
# 3) Создайте набор автомобилей и в случайном порядке добавляйте/удаляйте их с автостоянки.
# 4) Посчитайте, сколько и каких автомобилей осталось на автостоянке.


In [25]:
import collections
from collections import deque, namedtuple
import random

Car = namedtuple('Car', ['marka', 'model', 'id', 'color'])
Parking = deque(maxlen=10)

def park_car(car: namedtuple, parking: deque)-> None:
    parking.append(car)

def unpark_car(car: namedtuple, parking: deque)-> None:
    if car in parking:
        parking.remove(car)

def count_cars(parking: deque)-> int:
    return len(parking)


cars = [
    Car("Toyota", "Camry", "A123BC", "красный"),
    Car("BMW", "X5", "B456DE", "синий"),
    Car("Mercedes", "E-Class", "C789FG", "черный"),
    Car("Audi", "A4", "D012HI", "белый"),
    Car("Honda", "Civic", "E345JK", "зеленый"),
    Car("Ford", "Focus", "F678LM", "серый"),
    Car("Volkswagen", "Golf", "G901NO", "желтый"),
    Car("Hyundai", "Solaris", "H234PQ", "серебристый"),
    Car("Kia", "Rio", "I567RS", "коричневый"),
    Car("Nissan", "Qashqai", "J890TU", "оранжевый"),
    Car("Skoda", "Octavia", "K123VW", "фиолетовый"),
    Car("Lada", "Vesta", "L456XY", "бордовый")
]

for i in range(5):
    car = random.choice(cars)
    park_car(car, Parking)

operations = ['park', 'unpark']
for i in range(20):
    operation = random.choice(operations)
    car = random.choice(cars)

    if operation == 'park':
        park_car(car, Parking)
    else:
        unpark_car(car, Parking)

print(f"Всего автомобилей на стоянке: {count_cars(Parking)}")


print("Список автомобилей на стоянке:")
for i, car in enumerate(Parking, 1):
    print(f"{i}. {car.marka} {car.model} ({car.id}), цвет: {car.color}")

Всего автомобилей на стоянке: 1
Список автомобилей на стоянке:
1. Hyundai Solaris (H234PQ), цвет: серебристый


In [None]:
# Теперь вы работаете в роли архивариуса РСФСР и занимаетесь хранением информации об иуществе раскулаченных граждан Имперской России
# У вас несколько книг с записями, упорядоченных по алфавиту. Каждая книга хранит фамилии, начинающиеся с конкретной буквы.
# Внутри книги указаны фамилии, к каждому гражданину указана категория имущества, например, мебель, посуда, картины и т.д.
# В каждой категории указан список вещей, которые были изъяты. 
# Вы вносите записи, и если новой записи нет, вы ее создаете. Затем, когда ищите конкретную категорию предмета, начинаете перебирать все книги, начиная с первой буквы по алфавиту в вашем списке.
# Очевидно, что вам нужно использовать defaultdict и ChainMap


In [31]:
from collections import defaultdict, ChainMap
from typing import List

# --- Архивы ---
Archive_by_surname = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
Archive_by_category = defaultdict(lambda: defaultdict(list))
Full_archive = ChainMap(*Archive_by_surname.values())

# --- Добавление записи ---
def add_staff(surname: str, category: str, staff: List[str]) -> None:
    first_letter = surname[0].upper()
    for item in staff:
        # По фамилиям
        if item not in Archive_by_surname[first_letter][surname][category]:
            Archive_by_surname[first_letter][surname][category].append(item)
        # По категориям
        if item not in Archive_by_category[category][surname]:
            Archive_by_category[category][surname].append(item)


def show_all_archive():
    print("Архив по буквам:")
    for letter, surnames in Archive_by_surname.items():
        print(f"\nКнига {letter}:")
        for surname, categories in surnames.items():
            print(f"  {surname}:")
            for category, items in categories.items():
                print(f"    {category}: {items}")

def show_by_surname(surname: str):
    try:
        record = Full_archive[surname]
        print(f"{surname}:")
        for category, items in record.items():
            print(f"  {category}: {items}")
    except KeyError:
        print(f"Фамилия '{surname}' не найдена в архиве.")


def show_by_category(category: str):
    try:
        records = Archive_by_category[category]
        print(f"Категория '{category}':")
        for surname, items in records.items():
            print(f"  {surname}: {items}")
    except KeyError:
        print(f"Категория '{category}' не найдена в архиве.")

#Тестовые данные
add_staff("Иванов", "мебель", ["дубовый стол", "венский стул"])
add_staff("Иванов", "посуда", ["серебряный сервиз"])
add_staff("Петров", "мебель", ["кожаный диван", "кресло-качалка"])
add_staff("Петров", "драгоценности", ["серебряные запонки"])
add_staff("Сидоров", "картины", ["портрет неизвестного"])
add_staff("Алексеев", "мебель", ["кровать двуспальная", "тумба прикроватная"])
add_staff("Кузнецов", "посуда", ["медный самовар"])

show_all_archive()        # весь архив по буквам
show_by_surname("Иванов") # имущество Иванова
show_by_category("мебель")# все владельцы мебели
show_by_surname("Смирнов")# фамилия которой нет (KeyError обработан)
show_by_category("книги") # категория которой нет (KeyError обработан)


Архив по буквам:

Книга И:
  Иванов:
    мебель: ['дубовый стол', 'венский стул']
    посуда: ['серебряный сервиз']

Книга П:
  Петров:
    мебель: ['кожаный диван', 'кресло-качалка']
    драгоценности: ['серебряные запонки']

Книга С:
  Сидоров:
    картины: ['портрет неизвестного']

Книга А:
  Алексеев:
    мебель: ['кровать двуспальная', 'тумба прикроватная']

Книга К:
  Кузнецов:
    посуда: ['медный самовар']
Фамилия 'Иванов' не найдена в архиве.
Категория 'мебель':
  Иванов: ['дубовый стол', 'венский стул']
  Петров: ['кожаный диван', 'кресло-качалка']
  Алексеев: ['кровать двуспальная', 'тумба прикроватная']
Фамилия 'Смирнов' не найдена в архиве.
Категория 'книги':


In [None]:
# Используя sys и os 
# Выведите некоторые ваши директории с указанием размера всех файлов и упорядочьте их (директории) по времени последнего обращения
# Выведите список расширений файлов, которые хранятся на вашем ПК
# Посчитайте объем памяти, который используется вашим интерпретатором


In [32]:
import sys, os
import psutil
from collections import namedtuple

DirInfo = namedtuple('DirInfo', ['name', 'last_call', 'size'])


def my_dirs(path: str)->list[DirInfo]:
    all_info: list[DirInfo] = []
    try:
        for entry in os.scandir(path):
            if entry.is_dir():
                total:int=0
                for f in os.scandir(entry.path):
                    if f.is_file():
                        total+=f.stat().st_size
                info = DirInfo(name=entry.name, last_call=entry.stat().st_atime, size=total)
                all_info.append(info)
    except FileNotFoundError:
        print("Такого пути не найдено")
    return sorted(all_info, key = lambda x: x.last_call)


extensions: set[str] = set()
def set_of_extensions(path: str)->None:
    try:
        for entry in os.scandir(path):
            if entry.is_file():
                e=os.path.splitext(entry.name)[1]
                if e:
                    extensions.add(e)
            else:
                set_of_extensions(entry.path)
    except FileNotFoundError:
        print("Такого пути не найдено")
# Путь, который хотим проверить
path = "C:\\Users\\russkievpered\\Desktop\\dev\\proj_1"

dirs_info = my_dirs(path)
print("Директории (имя, время последнего доступа, суммарный размер файлов):")
for d in dirs_info:
    print(d)


set_of_extensions(path)
print("\nРасширения файлов в директории и подпапках:")
print(sorted(extensions))


import psutil, os
process = psutil.Process(os.getpid())
memory_in_mb: float = process.memory_info().rss / 1024 / 1024
print(f"\nОбъём памяти интерпретатора: {memory_in_mb:.2f} МБ")


Директории (имя, время последнего доступа, суммарный размер файлов):
DirInfo(name='logs', last_call=1758787005.6727138, size=1465)
DirInfo(name='.idea', last_call=1758787005.673711, size=4287)
DirInfo(name='python', last_call=1758787005.673711, size=0)
DirInfo(name='.git', last_call=1758794792.15536, size=15423)

Расширения файлов в директории и подпапках:
['.TAG', '.bat', '.cfg', '.dll', '.exe', '.fish', '.iml', '.ipynb', '.json', '.log', '.md', '.nu', '.pem', '.pkl', '.ps1', '.pth', '.py', '.pyc', '.pyd', '.sample', '.swp', '.txt', '.typed', '.virtualenv', '.xml']

Объём памяти интерпретатора: 23.11 МБ


#### ДОПОЛНИТЕЛЬНЫЕ ПРАКТИЧЕСКИЕ ЗАДАЧИ

Ниже представлены дополнительные задачи для закрепления изученного материала. Каждая задача использует только те концепции, которые были представлены в данном уроке.


**Задача 1: Система управления библиотекой**

Создайте систему управления библиотекой используя collections:

1. Создайте namedtuple `Book` с полями: title, author, isbn, year
2. Создайте namedtuple `Reader` с полями: name, reader_id, phone
3. Используйте `defaultdict(list)` для хранения книг по жанрам
4. Используйте `deque` для очереди читателей, ожидающих популярную книгу
5. Используйте `Counter` для подсчета количества книг каждого автора
6. Используйте `OrderedDict` для хранения истории выдачи книг (читатель -> список книг)
7. Сериализуйте все данные в JSON и pickle форматы


In [17]:
import json
import pickle
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict
from typing import List, Dict, Any

Book = namedtuple('Book', ['title', 'author', 'isbn', 'year'])
Reader = namedtuple('Reader', ['name', 'reader_id', 'phone'])

def create_library_data() -> Dict[str, Any]:
    books = [
        Book('Война и мир', 'Толстой', '123456', 1869),
        Book('Анна Каренина', 'Толстой', '654321', 1877),
        Book('Преступление и наказание', 'Достоевский', '111222', 1866),
        Book('Идиот', 'Достоевский', '333444', 1869),
        Book('Мастер и Маргарита', 'Булгаков', '555666', 1967),
    ]

    readers = [
        Reader('Иван Иванов', 1, '+79991234567'),
        Reader('Мария Петрова', 2, '+79997654321'),
        Reader('Сергей Сидоров', 3, '+79999887766'),
    ]

    library = defaultdict(list)
    library['Роман'].extend([books[0], books[1]])
    library['Классика'].extend([books[2], books[3]])
    library['Фантастика'].append(books[4])

    reader_queue = deque([readers[1], readers[2]])  # читатели в очереди

    author_counter = Counter(book.author for book in books)

    history = OrderedDict()
    history[readers[0]] = [books[3]]
    history[readers[1]] = [books[1], books[2]]

    return {
        'books': books,
        'readers': readers,
        'library': library,
        'reader_queue': reader_queue,
        'author_counter': author_counter,
        'history': history
    }

def save_library_data(data: Dict[str, Any]) -> None:
    try:
        with open('library.json', 'w', encoding='utf-8') as f:
            json_data = {
                'books': [b._asdict() for b in data['books']],
                'readers': [r._asdict() for r in data['readers']],
                'library': {k: [b._asdict() for b in v] for k, v in data['library'].items()},
                'reader_queue': [r._asdict() for r in data['reader_queue']],
                'author_counter': dict(data['author_counter']),
                'history': {r.name: [b._asdict() for b in blist] for r, blist in data['history'].items()}
            }
            json.dump(json_data, f, ensure_ascii=False, indent=4)
        print("Данные сохранены в library.json")

        with open('library.pkl', 'wb') as f:
            pickle.dump(data, f)
        print("Данные сохранены в library.pkl")
    except Exception as e:
        print(f"Ошибка при сохранении: {e}")

data = create_library_data()
save_library_data(data)

Данные сохранены в library.json
Данные сохранены в library.pkl


**Задача 2: Анализатор файловой системы**

Создайте анализатор файловой системы используя os и sys:

1. Создайте namedtuple `FileInfo` с полями: name, size, extension, modified_time
2. Используйте `os.walk()` для обхода директории
3. Используйте `os.path` функции для получения информации о файлах
4. Используйте `Counter` для подсчета файлов по расширениям
5. Используйте `defaultdict(list)` для группировки файлов по размеру (маленькие < 1MB, средние 1-100MB, большие > 100MB)
6. Используйте `deque` для хранения последних 10 найденных файлов
7. Выведите статистику используя `sys.getsizeof()` для подсчета памяти
8. Сохраните результаты в JSON файл


In [19]:
from collections import namedtuple, Counter, defaultdict, deque
import os
import sys
from typing import List, Dict, Any

FileInfo = namedtuple('FileInfo', ['name', 'size', 'extension', 'modified_time'])

def analyze_directory(path: str) -> Dict[str, Any]:
    all_files: List[FileInfo] = []
    try:
        for dirpath, _, filenames in os.walk(path): #игнорируем список подпапок
            for filename in filenames:
                full_path = os.path.join(dirpath, filename)
                try:
                    file_info = FileInfo(
                        name=filename,
                        size=os.path.getsize(full_path),
                        extension=os.path.splitext(filename)[1].lower(),
                        modified_time=os.path.getmtime(full_path)
                    )
                    all_files.append(file_info)
                except OSError as e:
                    print(f"Ошибка доступа к файлу {full_path}: {e}")
    except Exception as e:
        print(f"Ошибка обхода директории: {e}")

    extension_counter = Counter(f.extension for f in all_files)

    size_groups = defaultdict(list)
    for file in all_files:
        if file.size < 1024 * 1024:
            size_groups['small'].append(file)
        elif file.size < 100 * 1024 * 1024:
            size_groups['medium'].append(file)
        else:
            size_groups['large'].append(file)

    last_files = deque(all_files[-10:], maxlen=10)  # последние 10 файлов

    return {
        'all_files': all_files,
        'extension_counter': extension_counter,
        'size_groups': size_groups,
        'last_files': last_files
    }

def save_analysis(data: Dict[str, Any], filename: str) -> None:
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json_data = {
                'all_files': [f._asdict() for f in data['all_files']],
                'extension_counter': dict(data['extension_counter']),
                'size_groups': {k: [f._asdict() for f in v] for k, v in data['size_groups'].items()},
                'last_files': [f._asdict() for f in data['last_files']]
            }
            json.dump(json_data, f, ensure_ascii=False, indent=4)
        print(f"Анализ сохранен в {filename}")
    except Exception as e:
        print(f"Ошибка сохранения: {e}")



if os.path.exists('gfhjk'):
    result = analyze_directory(path)
    save_analysis(result, 'file_analysis.json')
else:
    print("Указанный путь не существует.")

Указанный путь не существует.


**Задача 3: Система конфигурации приложения**

Создайте систему конфигурации используя ChainMap и defaultdict:

1. Создайте namedtuple `Config` с полями: key, value, section, default_value
2. Создайте несколько словарей конфигурации (default, user, environment)
3. Используйте `ChainMap` для объединения конфигураций с приоритетом
4. Используйте `defaultdict(dict)` для группировки настроек по секциям
5. Используйте `OrderedDict` для сохранения порядка загрузки конфигураций
6. Используйте `os.environ` для чтения переменных окружения
7. Сериализуйте конфигурацию в JSON и pickle форматы


In [20]:
from collections import ChainMap, defaultdict, OrderedDict
import json
import pickle
from typing import Dict, Any

Config = namedtuple('Config', ['key', 'value', 'section', 'default_value'])

def create_config_system() -> Dict[str, Any]:
    default_config = {'theme': 'light', 'language': 'en'}
    user_config = {'language': 'fr'}
    env_config = {'theme': 'dark'}

    configs = []
    for key, value in default_config.items():
        configs.append(Config(
            key=key,
            value=value,
            section='general',
            default_value=value
        ))

    for key, value in user_config.items():
        configs.append(Config(
            key=key,
            value=value,  # новое значение пользователя
            section='general',
            default_value=default_config.get(key, 'NOT_SET')  # исходное значение
        ))

    # Для env конфигурации
    for key, value in env_config.items():
        configs.append(Config(
            key=key,
            value=value,  # значение из окружения
            section='general',
            default_value=default_config.get(key, 'NOT_SET')
        ))

    chain = ChainMap(env_config, user_config, default_config)


    grouped_by_sections = defaultdict(list)
    for config in configs:
        grouped_by_sections[config.section].append(config)

    ordered_configs = OrderedDict()
    ordered_configs['env'] = env_config
    ordered_configs['user'] = user_config
    ordered_configs['default'] = default_config

    return {
        'config_objects': configs,  # ТУТ ХРАНЯТСЯ NAMEDTUPLE
        'chain': chain,
        'grouped_by_sections': grouped_by_sections,
        'ordered_configs': ordered_configs
    }

def save_configs(data: Dict[str, Any]) -> None:
    try:
        with open('config.json', 'w', encoding='utf-8') as f:
            json_data = {
                'config_objects': [c._asdict() for c in data['config_objects']],  # namedtuple → dict
                'chain': dict(data['chain']),
                'grouped_by_sections': {
                    section: [c._asdict() for c in configs]
                    for section, configs in data['grouped_by_sections'].items()
                },
                'ordered_configs': dict(data['ordered_configs'])
            }
            json.dump(json_data, f, ensure_ascii=False, indent=4)
        print("Конфиги сохранены в config.json")

        with open('config.pkl', 'wb') as f:
            pickle.dump(data, f)
        print("Конфиги сохранены в config.pkl")

    except Exception as e:
        print(f"Ошибка сохранения: {e}")

if __name__ == "__main__":
    configs = create_config_system()
    save_configs(configs)

    print("\n=== Демонстрация namedtuple ===")
    for config in configs['config_objects']:
        print(f"Ключ: {config.key}, Значение: {config.value}, Секция: {config.section}")

Конфиги сохранены в config.json
Конфиги сохранены в config.pkl

=== Демонстрация namedtuple ===
Ключ: theme, Значение: light, Секция: general
Ключ: language, Значение: en, Секция: general
Ключ: language, Значение: fr, Секция: general
Ключ: theme, Значение: dark, Секция: general


**Задача 4: Мониторинг системы**

Создайте систему мониторинга используя sys и os:

1. Создайте namedtuple `SystemInfo` с полями: cpu_count, memory_usage, process_id, user_name
2. Используйте `os.cpu_count()` для получения количества процессоров
3. Используйте `sys.getallocatedblocks()` для мониторинга памяти
4. Используйте `os.getpid()` и `os.getlogin()` для информации о процессе
5. Используйте `deque` для хранения последних 20 измерений
6. Используйте `Counter` для подсчета частоты использования различных функций
7. Используйте `defaultdict(list)` для группировки измерений по времени
8. Сохраните историю мониторинга в pickle файл


In [21]:
import os
import sys
import time
import pickle
from collections import namedtuple, deque, defaultdict, Counter
from typing import List, Dict, Any

SystemInfo = namedtuple('SystemInfo', ['cpu_count', 'memory_usage', 'process_id', 'user_name'])

def collect_system_info() -> SystemInfo:
    return SystemInfo(
        cpu_count=os.cpu_count(),
        memory_usage=sys.getallocatedblocks(),
        process_id=os.getpid(),
        user_name=os.getlogin()
    )

def monitor_system(interval: float = 1.0, count: int = 5) -> Dict[str, Any]:
    measurements = deque(maxlen=20)
    by_time = defaultdict(list)
    func_counter = Counter()

    for _ in range(count):
        info = collect_system_info()
        measurements.append(info)
        by_time[time.time()].append(info)
        func_counter['measurement'] += 1
        time.sleep(interval)
        func_counter['sleep'] += 1

    return {
        'measurements': measurements,
        'by_time': by_time,
        'func_counter': func_counter
    }

def save_monitoring_data(data: Dict[str, Any], filename: str) -> None:
    try:
        with open(filename, 'wb') as f:
            pickle.dump(data, f)
        print(f"Данные мониторинга сохранены в {filename}")
    except Exception as e:
        print(f"Ошибка сохранения: {e}")

if __name__ == "__main__":
    data = monitor_system()
    save_monitoring_data(data, 'monitoring.pkl')

Данные мониторинга сохранены в monitoring.pkl


**Задача 5: Система логирования**

Создайте систему логирования используя все изученные коллекции:

1. Создайте namedtuple `LogEntry` с полями: timestamp, level, message, module, function
2. Используйте `deque` для хранения последних 100 логов (кольцевой буфер)
3. Используйте `defaultdict(list)` для группировки логов по уровням (DEBUG, INFO, WARNING, ERROR)
4. Используйте `Counter` для подсчета количества логов каждого уровня
5. Используйте `OrderedDict` для хранения логов по времени (FIFO)
6. Используйте `ChainMap` для объединения различных источников логов
7. Используйте `os.path` для работы с файлами логов
8. Сериализуйте логи в JSON и pickle форматы


In [29]:
import time
import json
import pickle
import os
from collections import namedtuple, deque, defaultdict, Counter, OrderedDict, ChainMap
from typing import List, Dict, Any

LogEntry = namedtuple('LogEntry', ['timestamp', 'level', 'message', 'module', 'function'])

class Logger:
    def __init__(self, maxlen: int = 100):
        self.buffer = deque(maxlen=maxlen)
        self.by_level = defaultdict(list)
        self.counter = Counter()
        self.by_time = OrderedDict()

    def log(self, level: str, message: str, module: str, function: str):
        entry = LogEntry(time.time(), level, message, module, function)
        self.buffer.append(entry)
        self.by_level[level].append(entry)
        self.counter[level] += 1
        self.by_time[entry.timestamp] = entry

    def save_logs(self, base_filename: str):
        os.makedirs('logs', exist_ok=True)
        json_path = os.path.join('logs', f'{base_filename}.json')
        pkl_path = os.path.join('logs', f'{base_filename}.pkl')

        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump([e._asdict() for e in self.buffer], f, indent=4)

        with open(pkl_path, 'wb') as f:
            pickle.dump({
                'buffer': list(self.buffer),
                'by_level': dict(self.by_level),
                'counter': dict(self.counter),
                'by_time': dict(self.by_time)
            }, f)

if __name__ == "__main__":
    logger = Logger()
    logger.log('INFO', 'Запуск системы', 'main', 'start')
    logger.log('ERROR', 'Ошибка загрузки', 'auth', 'login')
    logger.save_logs('app_logs')

**Задача 6: Кэш-система**

Создайте простую кэш-систему используя collections:

1. Создайте namedtuple `CacheEntry` с полями: key, value, timestamp, access_count
2. Используйте `OrderedDict` для реализации LRU (Least Recently Used) кэша
3. Используйте `deque` для хранения истории доступа к ключам
4. Используйте `Counter` для подсчета частоты доступа к каждому ключу
5. Используйте `defaultdict(int)` для хранения счетчиков доступа
6. Реализуйте методы: get, set, delete, clear, size
7. Используйте `sys.getsizeof()` для мониторинга размера кэша
8. Сериализуйте кэш в pickle формат для сохранения между сессиями


In [22]:
from collections import OrderedDict, deque, Counter, defaultdict
import time
import pickle
import sys
from typing import Any, Optional

class LRUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache: OrderedDict[Any, Any] = OrderedDict()
        self.history = deque(maxlen=100)
        self.access_count = Counter()
        self.size_counter = defaultdict(int)

    def get(self, key: Any) -> Optional[Any]:
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)
        self.history.append(key)
        self.access_count[key] += 1
        return self.cache[key]

    def set(self, key: Any, value: Any): #добавление значения
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        self.history.append(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

    def save(self, filename: str):
        with open(filename, 'wb') as f:
            pickle.dump(self.cache, f)

if __name__ == "__main__":
    c = LRUCache(2)
    c.set('a', 1)
    c.set('b', 2)
    c.save('cache.pkl')


**Задача 7: Анализатор текста**

Создайте анализатор текста используя collections:

1. Создайте namedtuple `WordInfo` с полями: word, frequency, length, first_occurrence
2. Используйте `Counter` для подсчета частоты слов
3. Используйте `defaultdict(list)` для группировки слов по длине
4. Используйте `deque` для хранения последних 50 уникальных слов
5. Используйте `OrderedDict` для хранения слов в порядке первого появления
6. Используйте `os.path` для работы с текстовыми файлами
7. Используйте `sys.getsizeof()` для анализа памяти
8. Сохраните результаты анализа в JSON файл


In [12]:
from collections import Counter, defaultdict, deque, OrderedDict
import os
import sys
import json
from typing import List, Dict, Any

def analyze_text(filepath: str) -> Dict[str, Any]:
    with open(filepath, 'r', encoding='utf-8') as f:
        text = f.read()

    words = text.lower().split()
    word_counter = Counter(words)

    by_length = defaultdict(list)
    for word in words:
        by_length[len(word)].append(word)

    last_unique = deque(word_counter.keys(), maxlen=50)

    first_occurrence = OrderedDict()
    for i, word in enumerate(words):#пары индекс и элемент, то есть номер слова и само слово
        if word not in first_occurrence:
            first_occurrence[word] = i #первое появление слова

    return {
        'word_counter': word_counter,
        'by_length': by_length,
        'last_unique': last_unique,
        'first_occurrence': first_occurrence
    }

def save_analysis(data: Dict[str, Any], filename: str):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump({
            'word_counter': dict(data['word_counter']),
            'by_length': {k: v for k, v in data['by_length'].items()},
            'last_unique': list(data['last_unique']),
            'first_occurrence': dict(data['first_occurrence'])
        }, f, indent=4, ensure_ascii=False)

if __name__ == "__main__":
    result = analyze_text('sample.txt')
    save_analysis(result, 'text_analysis.json')

Анализ завершен успешно!
Уникальных слов: 11


**Задача 8: Система управления задачами**

Создайте систему управления задачами (TODO) используя все изученные концепции:

1. Создайте namedtuple `Task` с полями: id, title, description, priority, status, created_date
2. Используйте `defaultdict(list)` для группировки задач по статусу (todo, in_progress, done)
3. Используйте `deque` для очереди задач с высоким приоритетом
4. Используйте `Counter` для подсчета задач по приоритету
5. Используйте `OrderedDict` для хранения задач в порядке создания
6. Используйте `ChainMap` для объединения различных списков задач
7. Используйте `os.path` для работы с файлами задач
8. Реализуйте функции: add_task, complete_task, get_tasks_by_status, get_priority_queue
9. Сериализуйте все данные в JSON и pickle форматы


In [23]:
from collections import defaultdict, deque, OrderedDict, ChainMap, Counter
from datetime import datetime
import json
import pickle
from typing import List, Dict, Optional

Task = namedtuple('Task', ['id', 'title', 'description', 'priority', 'status', 'created_at'])

class TaskManager:
    def __init__(self):
        self.tasks = OrderedDict()
        self.by_status = defaultdict(list)
        self.high_priority_queue = deque()
        self.priority_counter = Counter()

    def add_task(self, task: Task):
        self.tasks[task.id] = task
        self.by_status[task.status].append(task)
        if task.priority == 3:
            self.high_priority_queue.append(task)
        self.priority_counter[task.priority] += 1

    def complete_task(self, task_id: int):
        if task_id in self.tasks:
            task = self.tasks[task_id]
            new_task = task._replace(status='done')
            self.tasks[task_id] = new_task
            self.by_status[task.status].remove(task)
            self.by_status['done'].append(new_task)

    def save(self, base_name: str):
        with open(f'{base_name}.json', 'w', encoding='utf-8') as f:
            json.dump([t._asdict() for t in self.tasks.values()], f, indent=4, default=str)

        with open(f'{base_name}.pkl', 'wb') as f:
            pickle.dump(self.tasks, f)

tm = TaskManager()
tm.add_task(Task(1, 'Задача 1', 'Описание', 3, 'todo', datetime.now()))
tm.save('tasks')

**Задача 9: Система мониторинга производительности**

Создайте систему мониторинга производительности используя sys и collections:

1. Создайте namedtuple `PerformanceMetric` с полями: function_name, execution_time, memory_usage, timestamp
2. Используйте `deque` для хранения последних 100 измерений производительности
3. Используйте `defaultdict(list)` для группировки метрик по функциям
4. Используйте `Counter` для подсчета количества вызовов каждой функции
5. Используйте `OrderedDict` для хранения метрик в хронологическом порядке
6. Используйте `sys.getsizeof()` для мониторинга памяти
7. Используйте `os.path` для работы с файлами метрик
8. Реализуйте функции: record_metric, get_function_stats, get_memory_usage, export_metrics
9. Сериализуйте метрики в JSON и pickle форматы


In [15]:
from collections import namedtuple, deque, defaultdict, Counter, OrderedDict
import time
import json
import pickle
import os
from typing import Dict, Any, List

# 1. NamedTuple для метрик (как требуется в задании)
PerformanceMetric = namedtuple('PerformanceMetric', ['function_name', 'execution_time', 'memory_usage', 'timestamp'])

class PerformanceMonitor:
    def __init__(self):
        # 2. Deque для последних 100 измерений
        self.last_metrics: deque[PerformanceMetric] = deque(maxlen=100)

        # 3. DefaultDict для группировки по функциям
        self.metrics_by_function: defaultdict[str, List[PerformanceMetric]] = defaultdict(list)

        # 4. Counter для подсчета вызовов
        self.function_call_counter: Counter[str] = Counter()

        # 5. OrderedDict для хронологического порядка
        self.metrics_ordered: OrderedDict[float, PerformanceMetric] = OrderedDict()

    def record_metric(self, function_name: str, execution_time: float, memory_usage: int = 0) -> None:
        """Записывает метрику производительности"""
        timestamp = time.time()
        metric = PerformanceMetric(function_name, execution_time, memory_usage, timestamp)

        # Сохраняем во все структуры
        self.last_metrics.append(metric)
        self.metrics_by_function[function_name].append(metric)
        self.function_call_counter[function_name] += 1
        self.metrics_ordered[timestamp] = metric

    def get_function_stats(self, function_name: str) -> Dict[str, float]:
        """Возвращает статистику по функции"""
        metrics = self.metrics_by_function.get(function_name, [])
        count = len(metrics)
        if count == 0:
            return {'count': 0, 'average_time': 0, 'total_memory': 0}

        avg_time = sum(m.execution_time for m in metrics) / count
        total_memory = sum(m.memory_usage for m in metrics)

        return {'count': count, 'average_time': avg_time, 'total_memory': total_memory}

    def get_memory_usage(self, function_name: str) -> int:
        """Возвращает общее использование памяти для функции"""
        metrics = self.metrics_by_function.get(function_name, [])
        return sum(m.memory_usage for m in metrics)

    def export_metrics(self, filename: str) -> None:
        """Экспортирует метрики в JSON и pickle"""
        # JSON экспорт
        json_filename = os.path.splitext(filename)[0] + '.json'
        with open(json_filename, 'w', encoding='utf-8') as f:
            json_data = [
                {
                    'function_name': m.function_name,
                    'execution_time': m.execution_time,
                    'memory_usage': m.memory_usage,
                    'timestamp': m.timestamp
                }
                for m in self.metrics_ordered.values()
            ]
            json.dump(json_data, f, indent=4, ensure_ascii=False)

        # Pickle экспорт
        pickle_filename = os.path.splitext(filename)[0] + '.pkl'
        with open(pickle_filename, 'wb') as f:
            pickle.dump({
                'last_metrics': list(self.last_metrics),
                'metrics_by_function': dict(self.metrics_by_function),
                'function_call_counter': dict(self.function_call_counter),
                'metrics_ordered': dict(self.metrics_ordered)
            }, f)

# Пример использования
if __name__ == "__main__":
    monitor = PerformanceMonitor()

    # Симуляция измерения производительности
    monitor.record_metric('calculate_sum', 0.15, 1024)
    monitor.record_metric('process_data', 0.23, 2048)
    monitor.record_metric('calculate_sum', 0.18, 1024)

    # Получение статистики
    stats = monitor.get_function_stats('calculate_sum')
    print(f"Статистика calculate_sum: {stats}")

    # Экспорт метрик
    monitor.export_metrics('performance_metrics')

**Задача 10: Комплексная система управления данными**

Создайте комплексную систему управления данными, объединяющую все изученные концепции:

1. Создайте несколько namedtuple для различных типов данных (User, Product, Order, etc.)
2. Используйте `defaultdict` для создания индексов по различным полям
3. Используйте `deque` для реализации очередей обработки данных
4. Используйте `Counter` для аналитики и статистики
5. Используйте `OrderedDict` для хранения данных в определенном порядке
6. Используйте `ChainMap` для объединения различных источников данных
7. Используйте `os` и `sys` для работы с файловой системой и мониторинга
8. Реализуйте CRUD операции (Create, Read, Update, Delete)
9. Добавьте функции экспорта/импорта данных в различных форматах
10. Сериализуйте все данные в JSON, pickle и другие форматы
11. Добавьте типизацию для всех функций и классов
12. Реализуйте систему логирования для отслеживания операций


In [16]:
from collections import namedtuple, defaultdict, deque, Counter, OrderedDict, ChainMap
from datetime import datetime
import os
import sys
import json
import pickle
from typing import List, Dict, Optional, Any
import logging

# === Настройка логирования ===
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s', #Формат лога: 2024-01-01 12:00:00 - INFO - Сообщение
    handlers=[
        logging.FileHandler("data_system.log"),
        logging.StreamHandler() # рбработчики логов, запись в файл и вывод в консоль
    ]
)

# === NamedTuple для различных данных ===
User = namedtuple('User', ['id', 'name', 'email', 'created_at'])
Product = namedtuple('Product', ['id', 'name', 'price', 'created_at'])
Order = namedtuple('Order', ['id', 'user_id', 'product_id', 'quantity', 'created_at'])

# === Основные структуры данных ===
users_ordered: OrderedDict[int, User] = OrderedDict()
products_ordered: OrderedDict[int, Product] = OrderedDict()
orders_ordered: OrderedDict[int, Order] = OrderedDict()

# Индексы по полям для быстрого поиска
users_by_email: defaultdict[str, list[User]] = defaultdict(list)
products_by_name: defaultdict[str, list[Product]] = defaultdict(list)
orders_by_user: defaultdict[int, list[Order]] = defaultdict(list)

# Очереди для обработки данных
user_queue: deque[User] = deque()
order_queue: deque[Order] = deque()

# Статистика
product_order_counter: Counter[int] = Counter()
user_order_counter: Counter[int] = Counter()

# ChainMap для объединения всех источников данных
all_data: ChainMap = ChainMap(users_ordered, products_ordered, orders_ordered)

# === CRUD операции ===
def create_user(user_id: int, name: str, email: str) -> User:
    user = User(user_id, name, email, datetime.now())
    users_ordered[user_id] = user
    users_by_email[email].append(user)
    user_queue.append(user)
    logging.info(f"User created: {user}")
    return user

def read_user(user_id: int) -> Optional[User]:
    return users_ordered.get(user_id)

def update_user(user_id: int, name: Optional[str] = None, email: Optional[str] = None) -> Optional[User]:
    user = users_ordered.get(user_id)
    if not user:
        return None
    new_user = User(
        id=user.id,
        name=name if name else user.name,
        email=email if email else user.email,
        created_at=user.created_at
    )
    users_ordered[user_id] = new_user
    if email:
        users_by_email[email].append(new_user)
    logging.info(f"User updated: {new_user}")
    return new_user

def delete_user(user_id: int) -> bool:
    user = users_ordered.pop(user_id, None)
    if user:
        users_by_email[user.email].remove(user)
        logging.info(f"User deleted: {user}")
        return True
    return False

def create_product(product_id: int, name: str, price: float) -> Product:
    product = Product(product_id, name, price, datetime.now())
    products_ordered[product_id] = product
    products_by_name[name].append(product)
    logging.info(f"Product created: {product}")
    return product

def create_order(order_id: int, user_id: int, product_id: int, quantity: int) -> Order:
    order = Order(order_id, user_id, product_id, quantity, datetime.now())
    orders_ordered[order_id] = order
    orders_by_user[user_id].append(order)
    order_queue.append(order)
    product_order_counter[product_id] += quantity
    user_order_counter[user_id] += 1
    logging.info(f"Order created: {order}")
    return order


def save_data_json(filename: str) -> None:
    data = {
        'users': [u._asdict() for u in users_ordered.values()],
        'products': [p._asdict() for p in products_ordered.values()],
        'orders': [o._asdict() for o in orders_ordered.values()]
    }
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4, default=str)
    logging.info(f"Data saved to JSON: {filename}")

def save_data_pickle(filename: str) -> None:
    with open(filename, 'wb') as f:
        pickle.dump((users_ordered, products_ordered, orders_ordered), f)
    logging.info(f"Data saved to pickle: {filename}")


#### РЕКОМЕНДАЦИИ ПО ВЫПОЛНЕНИЮ ЗАДАЧ

1. **Начните с простых задач** (1-3) для понимания базовых концепций
2. **Используйте типизацию** - добавляйте type hints для всех функций и переменных
3. **Структурируйте код** - разбивайте задачи на логические функции и классы
4. **Документируйте код** - добавляйте docstrings для всех функций
5. **Тестируйте сериализацию** - убедитесь, что данные корректно сохраняются и загружаются
6. **Обрабатывайте ошибки** - используйте try-except блоки для обработки исключений
7. **Оптимизируйте память** - используйте `sys.getsizeof()` для мониторинга использования памяти
8. **Следуйте принципам** - код должен быть читаемым, понятным и структурированным

**Дополнительные требования:**
- Все объекты должны быть сериализованы в соответствующие форматы
- Код должен содержать комментарии на русском языке
- Функции должны быть небольшими и выполнять одну задачу
- Используйте только те библиотеки и концепции, которые представлены в уроке
