https://mrjob.readthedocs.io/en/latest/guides/quickstart.html

In [None]:
# установить можно с помощью pip или conda
# ! pip install mrjob
# ! conda install mrjob

In [48]:
import os
import re

import numpy as np

- A **mapper** takes a single key and value as input, and returns zero or more (key, value) pairs. The pairs from all map outputs of a single step are grouped by key.

- A **combiner** takes a key and a subset of the values for that key as input and returns zero or more (key, value) pairs. Combiners are optimizations that run immediately after each mapper and can be used to decrease total data transfer. Combiners should be idempotent (produce the same output if run multiple times in the job pipeline).

- A **reducer** takes a key and the complete set of values for that key in the current step, and returns zero or more arbitrary (key, value) pairs as output.

    After the reducer has run, if there are more steps, the individual results are arbitrarily assigned to mappers for further processing. If there are no more steps, the results are sorted and made available for reading.

    
https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf

# Word Count

Давайте поработаем с текстом и чего-нибудь там посчитаем

## Lines, Words, Chars

In [37]:
%%writefile job.py

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Overwriting job.py


```
python3 job.by our_file.txt
```

In [35]:
%pip install mrjob



In [38]:
!python3 job.py names.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/job.root.20250214.114734.738423
Running step 1 of 1...
job output is in /tmp/job.root.20250214.114734.738423/output
Streaming final output from /tmp/job.root.20250214.114734.738423/output...
"chars"	20563
"words"	2410
"lines"	1205
Removing temp directory /tmp/job.root.20250214.114734.738423...


![](img/mrjob_example_1.png)

## Names

Давайте немного усложним задачу и попробуем прикинуть, сколько раз в тексте упоминаются пары Имя Отчество?

Для этого нам надо придумать регулярку

In [39]:
with open('crime-punishment.txt', 'r') as file:
    text = file.read()

In [40]:
import re

name_regex = re.compile('([A-Z][a-z]{3,})\s([A-Z][a-z]{2,}(ich|itch|vna))')

In [41]:
matches = re.finditer(name_regex, text)
for res in matches:
    name = res.group()
    name = re.sub('\s+', ' ', name)
    print(name)

Alyona Ivanovna
Alyona Ivanovna
Alyona Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Amalia Fyodorovna
Katerina Ivanovna
Ivan Ivanitch
Katerina Ivanovna
Katerina Ivanovna
Darya Frantsovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Darya Frantsovna
Sofya Semyonovna
Amalia Fyodorovna
Darya Frantsovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Ivan Afanasyvitch
Ivan Afanasyvitch
Katerina Ivanovna
Semyon Zaharovitch
Katerina Ivanovna
Katerina Ivanovna
Amalia Fyodorovna
Semyon Zaharovitch
Semyon Zaharovitch
Semyon Zaharovitch
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Praskovya Pavlovna
Vassily Ivanovitch
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Pyotr Petrovitch
Marfa Petrovna
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr 

In [42]:
matches = re.finditer(name_regex, text)
with open("names.txt", "w") as f:
  for m in matches:
    name = m.group()
    name = re.sub('\s+', ' ', name)
    f.write(name + "\n")

with open("names.txt", "r") as f:
    lines = f.readlines()
    print(f"{len(lines)} lines were written to names.txt.")

1205 lines were written to names.txt.


Проверили, что регулярка выдает что-то похожее на правду

Применим к нашей джобе

In [43]:
%%writefile job.py

import re
from mrjob.job import MRJob

PATTERN = re.compile(re.compile('([A-Z][a-z]{3,})\s([A-Z][a-z]{2,}(ich|itch|vna))'))

class MRWordMiddleNameCounts(MRJob):

    def mapper(self, _, line):
        for name in re.finditer(PATTERN, line):
            name = re.sub('\s+', ' ', name.group())
            yield name, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordMiddleNameCounts.run()

Overwriting job.py


Аргумент `-l local` позволяет запускать задачу локально не в один поток. Аргумент `-q` подавляет дебажную информацию

In [44]:
!python3 job.py names.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/job.root.20250214.115253.365320
Running step 1 of 1...
job output is in /tmp/job.root.20250214.115253.365320/output
Streaming final output from /tmp/job.root.20250214.115253.365320/output...
"Vassily Ivanovitch"	1
"Afanasy Ivanitch"	1
"Afanasy Ivanovitch"	6
"Afanasy Pavlovitch"	1
"Alexandr Grigorievitch"	2
"Alexey Semyonovitch"	1
"Alyona Ivanovna"	16
"Amalia Fyodorovna"	3
"Amalia Ivanovna"	55
"Amalia Ludwigovna"	8
"Andrey Semyonovitch"	21
"Arkady Ivanovitch"	11
"Avdotya Romanovna"	115
"Darya Frantsovna"	4
"Dmitri Prokofitch"	24
"Ilya Petrovitch"	33
"Ivan Afanasyvitch"	2
"Ivan Ivanitch"	1
"Ivan Mihailovitch"	1
"Katerina Ivanovna"	216
"Pyotr Petrovitch"	173
"Rodion Romanovitch"	86
"Semyon Semyonovitch"	2
"Semyon Zaharovitch"	10
"Sofya Ivanovna"	3
"Sofya Semyonovna"	71
"Lizaveta Ivanovna"	5
"Luise Ivanovna"	8
"Madame Resslich"	8
"Marfa Petrovna"	78
"Nastasya Nikiforovna

![](img/mrjob_example_2.png)

## Most common middle name

Теперь попробуем еще один шаг в работе нашей программы -- подсчет самых популярных

*(ставлю на то, что там будет или форма от Петра, или форма от Ивана)*

Здесь мы используем 2 шага. На первом шаге получаем агрегаты вида (int, Отчество), а на втором с помощью дополнительного редьюсера берем максимум

In [45]:
%%writefile job.py
import re

from mrjob.job import MRJob
from mrjob.step import MRStep

PATTERN = re.compile(re.compile('[A-Z][a-z]{2,}(ich|itch|vna)'))

class MRWordMostPopularMiddleName(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper, combiner=self.combiner, reducer=self.reducer),
            MRStep(reducer=self.most_common_reducer)
        ]

    def mapper(self, _, line):
        for name in re.finditer(PATTERN, line):
            yield name.group(), 1

    def combiner(self, key, values):
        yield key, sum(values)

    def reducer(self, key, values):
        yield None, (sum(values), key)

    def most_common_reducer(self, _, values):
        yield max(values)


if __name__ == '__main__':
    MRWordMostPopularMiddleName.run()

Overwriting job.py


![](img/mrjob_example_3.png)

In [46]:
!python3 job.py names.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/job.root.20250214.115750.987978
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/job.root.20250214.115750.987978/output
Streaming final output from /tmp/job.root.20250214.115750.987978/output...
303	"Ivanovna"
Removing temp directory /tmp/job.root.20250214.115750.987978...


*:)*

# Average of numbers

Сгенерируем себе файл с цифрами для примера. Пусть у нас будут n строчек, в каждой по m чисел

In [49]:
mat = np.random.randint(-5, 255, size=(1337, 42))

with open(os.path.join('digits'), 'w') as file:
    for line in mat:
        file.write(f'{str(line.tolist())[1:-1]}\n')

In [50]:
mat.mean()

124.53981906898885

In [52]:
with open(os.path.join('digits'), 'r') as file:
    mat_text = file.readlines()

In [55]:
%%writefile job.py

from mrjob.job import MRJob

class MRNumbersAverager(MRJob):
    def mapper(self, _, line):
        for number in line.strip().split(','):
            yield 1, int(number)

    def reducer(self, key, values):
        values = list(values)
        yield "avg", sum(values) / len(values)


if __name__ == '__main__':
    MRNumbersAverager.run()

Overwriting job.py


In [56]:
!python3 job.py digits.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/job.root.20250214.120617.765145
Running step 1 of 1...
job output is in /tmp/job.root.20250214.120617.765145/output
Streaming final output from /tmp/job.root.20250214.120617.765145/output...
"avg"	123.99216440502903
Removing temp directory /tmp/job.root.20250214.120617.765145...


## ORM Задачи

Задача 1:
Расширьте базу данных компании, которая уже содержит таблицы departments (отделы) и employees (сотрудники), добавив новую модель Project (проект) и реализовав связь «многие ко многим» между сотрудниками и проектами. Для этого необходимо:

1.	Создать модель Project с полями:

`•	id (INTEGER, PRIMARY KEY, AUTOINCREMENT),`

`•	name (TEXT, UNIQUE, NOT NULL).`

2.	Реализовать связь «многие ко многим» между моделями Employee и Project с помощью вспомогательной таблицы employee_project, которая должна содержать:

`•	employee_id (INTEGER, ForeignKey(employees.id)),`

`•	project_id (INTEGER, ForeignKey(projects.id)).`

3.	Добавить данные:

`•	Создать несколько отделов и сотрудников (например, аналогично предыдущему примеру с отделами HR, IT и Sales).`

`•	Создать как минимум 2 проекта.`

`•	Назначить в каждый проект по 2–3 сотрудника (один сотрудник может участвовать сразу в нескольких проектах).`

4.	Написать запросы:

`•	Вывести список всех проектов с именами сотрудников, участвующих в каждом проекте.`

`•	Вывести список всех сотрудников с перечнем проектов, в которых они участвуют.`


In [73]:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, CheckConstraint, Table
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

# Создаем базовый класс для декларативного определения моделей
Base = declarative_base()

# Вспомогательная таблица для связи многие ко многим между Employee и Project
employee_project = Table(
    'employee_project',
    Base.metadata,
    Column('employee_id', Integer, ForeignKey('employees.id'), primary_key=True),
    Column('project_id', Integer, ForeignKey('projects.id'), primary_key=True)
)

# Модель отдела
class Department(Base):
    __tablename__ = 'departments'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)
    employees = relationship("Employee", back_populates="department")

# Модель сотрудника
class Employee(Base):
    __tablename__ = 'employees'
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    salary = Column(Integer, CheckConstraint("salary > 0"), nullable=False)
    department = relationship("Department", back_populates="employees")
    projects = relationship("Project", secondary=employee_project, back_populates="employees")
    department_id = Column(Integer, ForeignKey('departments.id'))

# Модель проекта
class Project(Base):
    __tablename__ = 'projects'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    # Обратная связь многие ко многим с сотрудниками
    employees = relationship("Employee", secondary=employee_project, back_populates="projects")

# Создание подключения к базе данных SQLite
engine = create_engine("sqlite:///company1.db", echo=False)
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# Если таблица отделов пуста, добавляем данные
if not session.query(Department).first():
    # Создаем отделы с сотрудниками
    hr = Department(name="HR", employees=[
        Employee(name="Alice", salary=5000),
        Employee(name="Bob", salary=4500)
    ])
    it = Department(name="IT", employees=[
        Employee(name="Charlie", salary=6000),
        Employee(name="David", salary=7000)
    ])
    sales = Department(name="Sales", employees=[
        Employee(name="Eve", salary=4000),
        Employee(name="Frank", salary=3000)
    ])
    session.add_all([hr, it, sales])
    session.commit()

# Создаем проекты, если их еще нет
if not session.query(Project).first():
    project_alpha = Project(name="Alpha")
    project_beta = Project(name="Beta")

    # Назначаем сотрудников на проекты
    # Например, берем сотрудников из отделов по именам (для простоты выборки)
    alice = session.query(Employee).filter_by(name="Alice").first()
    bob = session.query(Employee).filter_by(name="Bob").first()
    charlie = session.query(Employee).filter_by(name="Charlie").first()
    david = session.query(Employee).filter_by(name="David").first()
    eve = session.query(Employee).filter_by(name="Eve").first()
    frank = session.query(Employee).filter_by(name="Frank").first()

    # Назначаем сотрудников на проект Alpha
    project_alpha.employees.extend([alice, charlie, eve])

    # Назначаем сотрудников на проект Beta
    project_beta.employees.extend([bob, david, frank])

    session.add_all([project_alpha, project_beta])
    session.commit()


# --- Запросы ---

print("Список проектов с сотрудниками:")

for project in session.query(Project).all():
    print(f"Проект: {project.name}", end="")
    for employee in project.employees:
        print(f" {employee.name}", end="")
    print()

print("\nСписок сотрудников с проектами:")
for employee in session.query(Employee).all():
    print(f"Сотрудник: {employee.name}", end="")
    for project in employee.projects:
        print(f" {project.name}", end="")
    print()

session.close()

Список проектов с сотрудниками:
Проект: Alpha Eve Alice Charlie
Проект: Beta David Frank Bob

Список сотрудников с проектами:
Сотрудник: Alice Alpha
Сотрудник: Bob Beta
Сотрудник: Charlie Alpha
Сотрудник: David Beta
Сотрудник: Eve Alpha
Сотрудник: Frank Beta


### Задача 2. Ранжирование сотрудников по зарплате в отделе

Условие:

Напишите запрос, который для каждого сотрудника выводит его имя, зарплату и ранг внутри его отдела по убыванию зарплаты. Используйте оконную функцию RANK() или ROW_NUMBER(), разбивая данные по отделам.

Подсказка:

`•	Примените функцию over(partition_by=Department.name, order_by=Employee.salary.desc()).`

`•	Запрос должен вернуть столбцы: имя сотрудника, зарплата, название отдела, ранг сотрудника в отделе.`


In [76]:
from sqlalchemy import func

e_rank = func.rank().over(
    partition_by=Department.name,
    order_by=Employee.salary.desc()
).label("rank")

query = session.query(
    Employee.name,
    Employee.salary,
    Department.name.label("department_name"),
    e_rank
).join(Department)

for e in query:
    print(e)

('Alice', 5000, 'HR', 1)
('Bob', 4500, 'HR', 2)
('David', 7000, 'IT', 1)
('Charlie', 6000, 'IT', 2)
('Eve', 4000, 'Sales', 1)
('Frank', 3000, 'Sales', 2)


### Задача 3. Кумулятивная сумма зарплат в отделе

Условие:

Для каждого отдела рассчитайте кумулятивную сумму зарплат сотрудников, упорядочив их по зарплате (например, от меньшей к большей). Запрос должен выводить:

`•	Название отдела, имя сотрудника, зарплата, кумулятивная сумма зарплат до текущего сотрудника (включительно).`

Подсказка:
	•	Используйте оконную функцию func.sum(Employee.salary).over(...) с параметрами partition_by и order_by.

In [77]:
csumm = func.sum(Employee.salary).over(
    partition_by=Department.name,
    order_by=Employee.salary
).label("csumm")

query = session.query(
    Department.name.label("department_name"),
    Employee.name,
    Employee.salary,
    csumm
).join(Department)

query = query.order_by(Employee.salary)

for q in query:
    print(q)

('Sales', 'Frank', 3000, 3000)
('Sales', 'Eve', 4000, 7000)
('HR', 'Bob', 4500, 4500)
('HR', 'Alice', 5000, 9500)
('IT', 'Charlie', 6000, 6000)
('IT', 'David', 7000, 13000)


### Задача 4. Разница между зарплатой сотрудника и средней зарплатой отдела

Условие:

Напишите запрос, который для каждого сотрудника вычисляет среднюю зарплату по его отделу (с использованием оконной функции) и выводит разницу между зарплатой сотрудника и этой средней зарплатой.

Подсказка:
`•	Используйте оконную функцию func.avg(Employee.salary).over(partition_by=Employee.department_id) для вычисления средней зарплаты в отделе.`

`•	Вычислите разницу как Employee.salary - avg_salary.`

In [78]:
avg = func.avg(Employee.salary).over(
    partition_by=Employee.department_id
).label("avg")

salary_diff = (Employee.salary - avg).label("salary_diff")

query = session.query(
    Employee.name,
    Employee.salary,
    Employee.department_id,
    avg,
    salary_diff
)

for q in query:
    print(q)

('Alice', 5000, 1, 4750.0, 250.0)
('Bob', 4500, 1, 4750.0, -250.0)
('Charlie', 6000, 2, 6500.0, -500.0)
('David', 7000, 2, 6500.0, 500.0)
('Eve', 4000, 3, 3500.0, 500.0)
('Frank', 3000, 3, 3500.0, -500.0)


### Задача 6. Выборка топ сотрудников по зарплате в каждом отделе

Условие:

Напишите запрос, который для каждого отдела возвращает одного сотрудника с наивысшей зарплатой. Для этого можно использовать оконную функцию ROW_NUMBER() или MAX() для нумерации сотрудников в каждом отделе.

Подсказка:

`•	Создайте подзапрос, в котором вычисляется ROW_NUMBER() OVER(PARTITION BY Employee.department_id ORDER BY Employee.salary DESC) и затем отфильтруйте результат.`

In [80]:
from sqlalchemy import func, and_

max_salary_subq = session.query(
    Employee.department_id,
    func.max(Employee.salary).label("max_salary")
).group_by(Employee.department_id).subquery()

query = session.query(
    Department.name.label("department_name"),
    Employee.name.label("employee_name"),
    Employee.salary
).join(Department).join(
    max_salary_subq,
    and_(
        Employee.department_id == max_salary_subq.c.department_id,
        Employee.salary == max_salary_subq.c.max_salary
    )
)

for q in query:
    print(q)

('HR', 'Alice', 5000)
('IT', 'David', 7000)
('Sales', 'Eve', 4000)
