In [87]:
import pandas as pd
import numpy as np
import csv
import statistics
import matplotlib.pyplot as plt
from collections import defaultdict
import numpy as np
import time
from scipy.stats import linregress, t, sem, norm

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Данные из таблиц документа "6401_ЛР2_моя"

# Данные для OpenMP без параметра Q (Таблица 2.1.1)
threads = [5, 10, 15]

# Время OpenMP без Q (ПК)
t_s_omp_pc_noq = [0.0454, 0.0406, 0.0381]
t_c_omp_pc_noq = [1.0997, 1.7523, 1.7870]
t_a_omp_pc_noq = [0.2822, 0.3928, 0.4555]
t_r_omp_pc_noq = [0.0129, 0.0057, 0.0036]

# Время OpenMP без Q (Кластер)
t_s_omp_cluster_noq = [0.0393, 0.0395, 0.0435]
t_c_omp_cluster_noq = [2.5835, 2.3341, 2.8035]
t_a_omp_cluster_noq = [0.6024, 0.6535, 1.1655]
t_r_omp_cluster_noq = [0.0087, 0.0059, 0.0031]

# Время MPI без Q (Таблица 2.1.2)
# ПК
t_s_mpi_pc_noq = [0.0145, 0.0192, 0.0250]
t_B_mpi_pc_noq = [0.2354, 0.6791, 1.4973]
t_pp_mpi_pc_noq = [0.0119, 0.0226, 0.0310]
t_R_mpi_pc_noq = [0.0113, 0.0144, 0.0258]

# Кластер
t_s_mpi_cluster_noq = [0.0160, 0.0164, 0.03548]
t_B_mpi_cluster_noq = [0.1449, 0.1622, 0.0895]
t_pp_mpi_cluster_noq = [0.0065, 0.0054, 0.0027]
t_R_mpi_cluster_noq = [0.0064, 0.0052, 0.0024]

# Ускорение OpenMP без Q (Таблица 2.1.3)
# ПК
a_c_omp_pc_noq = [0.0412, 0.0231, 0.0212]
a_a_omp_pc_noq = [0.1597, 0.1024, 0.0816]
a_r_omp_pc_noq = [2.968, 4.2236, 2.5315]

# Кластер
a_c_omp_cluster_noq = [0.0152, 0.0169, 0.0155]
a_a_omp_cluster_noq = [0.0654, 0.0602, 0.0372]
a_r_omp_cluster_noq = [4.2988, 5.2658, 8.8879]

# Ускорение MPI без Q (Таблица 2.1.4)
# ПК
a_R_mpi_pc_noq = [1.2827, 1.3271, 0.9662]
a_pp_mpi_pc_noq = [1.2221, 0.8617, 0.8050]
a_RB_mpi_pc_noq = [0.0590, 0.0276, 0.0164]
a_ppB_mpi_pc_noq = [0.0588, 0.0273, 0.0163]

# Кластер
a_R_mpi_cluster_noq = [2.4771, 3.1311, 5.9636]
a_pp_mpi_cluster_noq = [2.4674, 2.9966, 4.4137]
a_RB_mpi_cluster_noq = [0.1058, 0.0978, 0.0545]
a_ppB_mpi_cluster_noq = [0.1058, 0.0977, 0.0544]

# Данные с параметром Q (Таблицы 2.2.1-2.2.4)
# Время OpenMP с Q (Таблица 2.2.1)
# ПК
t_s_omp_pc_q = [0.8273, 0.8125, 0.8791]
t_c_omp_pc_q = [26.4336, 34.9785, 41.5532]
t_a_omp_pc_q = [6.4597, 12.0559, 11.2887]
t_r_omp_pc_q = [0.1540, 0.0959, 0.0743]

# Кластер
t_s_omp_cluster_q = [0.5400, 0.5915, 0.5488]
t_c_omp_cluster_q = [57.3129, 57.4019, 66.8887]
t_a_omp_cluster_q = [13.3903, 15.4791, 27.5552]
t_r_omp_cluster_q = [0.1204, 0.0607, 0.0646]

# Время MPI с Q (Таблица 2.2.2)
# ПК
t_s_mpi_pc_q = [0.2249, 0.2196, 0.1840]
t_B_mpi_pc_q = [0.2249, 0.6627, 1.3002]
t_pp_mpi_pc_q = [0.1481, 0.1407, 0.0604]
t_R_mpi_pc_q = [0.0502, 0.0426, 0.0475]

# Кластер
t_s_mpi_cluster_q = [0.0391, 0.0394, 0.0403]
t_B_mpi_cluster_q = [0.2094, 0.1615, 0.3839]
t_pp_mpi_cluster_q = [0.0127, 0.0067, 0.0050]
t_R_mpi_cluster_q = [0.0126, 0.0065, 0.0048]

# Ускорение OpenMP с Q (Таблица 2.2.3)
# ПК
a_c_omp_pc_q = [0.0313, 0.0232, 0.0212]
a_a_omp_pc_q = [0.1281, 0.0674, 0.0779]
a_r_omp_pc_q = [5.3432, 8.1315, 10.3389]

# Кластер
a_c_omp_cluster_q = [0.0094, 0.0103, 0.0082]
a_a_omp_cluster_q = [0.0403, 0.3821, 0.0199]  # Возможно ошибка в данных для 10 потоков
a_r_omp_cluster_q = [4.4613, 9.2436, 8.4901]

# Ускорение MPI с Q (Таблица 2.2.4)
# ПК
a_R_mpi_pc_q = [4.4816, 5.1587, 3.8763]
a_pp_mpi_pc_q = [1.5182, 1.5615, 3.0460]
a_RB_mpi_pc_q = [0.8173, 0.3114, 0.1365]
a_ppB_mpi_pc_q = [0.6027, 0.2734, 0.1352]

# Кластер
a_R_mpi_cluster_q = [3.0953, 6.0368, 8.4863]
a_pp_mpi_cluster_q = [3.0783, 5.8608, 7.9876]
a_RB_mpi_cluster_q = [0.1761, 0.2347, 0.1037]
a_ppB_mpi_cluster_q = [0.1760, 0.2344, 0.1036]

# Создание графиков
plt.rcParams['font.size'] = 10
plt.rcParams['figure.figsize'] = [12, 8]

# Рисунок 1: Время работы без параметра Q для OpenMP и MPI
fig1, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))

# OpenMP на ПК
ax1.plot(threads, t_s_omp_pc_noq, 'b-', marker='o', label='Последовательная', linewidth=2)
ax1.plot(threads, t_c_omp_pc_noq, 'r-', marker='s', label='Critical', linewidth=2)
ax1.plot(threads, t_a_omp_pc_noq, 'g-', marker='^', label='Atomic', linewidth=2)
ax1.plot(threads, t_r_omp_pc_noq, 'm-', marker='d', label='Reduction', linewidth=2)
ax1.set_xlabel('Количество потоков')
ax1.set_ylabel('Время (сек)')
ax1.set_title('OpenMP: Время выполнения (ПК, без Q)')
ax1.legend()
ax1.grid(True, alpha=0.3)

# OpenMP на кластере
ax2.plot(threads, t_s_omp_cluster_noq, 'b-', marker='o', label='Последовательная', linewidth=2)
ax2.plot(threads, t_c_omp_cluster_noq, 'r-', marker='s', label='Critical', linewidth=2)
ax2.plot(threads, t_a_omp_cluster_noq, 'g-', marker='^', label='Atomic', linewidth=2)
ax2.plot(threads, t_r_omp_cluster_noq, 'm-', marker='d', label='Reduction', linewidth=2)
ax2.set_xlabel('Количество потоков')
ax2.set_ylabel('Время (сек)')
ax2.set_title('OpenMP: Время выполнения (Кластер, без Q)')
ax2.legend()
ax2.grid(True, alpha=0.3)

# MPI на ПК
ax3.plot(threads, t_s_mpi_pc_noq, 'b-', marker='o', label='Последовательная', linewidth=2)
ax3.plot(threads, t_pp_mpi_pc_noq, 'c-', marker='v', label='Точка-точка', linewidth=2)
ax3.plot(threads, t_R_mpi_pc_noq, 'y-', marker='*', label='Reduce', linewidth=2)
ax3.plot(threads, t_B_mpi_pc_noq, 'k-', marker='x', label='Broadcast', linewidth=2)
ax3.set_xlabel('Количество процессов')
ax3.set_ylabel('Время (сек)')
ax3.set_title('MPI: Время выполнения (ПК, без Q)')
ax3.legend()
ax3.grid(True, alpha=0.3)

# MPI на кластере
ax4.plot(threads, t_s_mpi_cluster_noq, 'b-', marker='o', label='Последовательная', linewidth=2)
ax4.plot(threads, t_pp_mpi_cluster_noq, 'c-', marker='v', label='Точка-точка', linewidth=2)
ax4.plot(threads, t_R_mpi_cluster_noq, 'y-', marker='*', label='Reduce', linewidth=2)
ax4.plot(threads, t_B_mpi_cluster_noq, 'k-', marker='x', label='Broadcast', linewidth=2)
ax4.set_xlabel('Количество процессов')
ax4.set_ylabel('Время (сек)')
ax4.set_title('MPI: Время выполнения (Кластер, без Q)')
ax4.legend()
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('Рисунок_1_Время_без_Q.png', dpi=300, bbox_inches='tight')
plt.show()

# Рисунок 2: Ускорение программ без параметра Q для OpenMP и MPI
fig2, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))

# OpenMP на ПК
ax1.plot(threads, a_c_omp_pc_noq, 'r-', marker='s', label='Critical', linewidth=2)
ax1.plot(threads, a_a_omp_pc_noq, 'g-', marker='^', label='Atomic', linewidth=2)
ax1.plot(threads, a_r_omp_pc_noq, 'm-', marker='d', label='Reduction', linewidth=2)
ax1.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax1.set_xlabel('Количество потоков')
ax1.set_ylabel('Ускорение')
ax1.set_title('OpenMP: Ускорение (ПК, без Q)')
ax1.legend()
ax1.grid(True, alpha=0.3)

# OpenMP на кластере
ax2.plot(threads, a_c_omp_cluster_noq, 'r-', marker='s', label='Critical', linewidth=2)
ax2.plot(threads, a_a_omp_cluster_noq, 'g-', marker='^', label='Atomic', linewidth=2)
ax2.plot(threads, a_r_omp_cluster_noq, 'm-', marker='d', label='Reduction', linewidth=2)
ax2.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax2.set_xlabel('Количество потоков')
ax2.set_ylabel('Ускорение')
ax2.set_title('OpenMP: Ускорение (Кластер, без Q)')
ax2.legend()
ax2.grid(True, alpha=0.3)

# MPI на ПК
ax3.plot(threads, a_R_mpi_pc_noq, 'y-', marker='*', label='Reduce (без рассылки)', linewidth=2)
ax3.plot(threads, a_pp_mpi_pc_noq, 'c-', marker='v', label='Точка-точка (без рассылки)', linewidth=2)
ax3.plot(threads, a_RB_mpi_pc_noq, 'y--', marker='*', label='Reduce (с рассылкой)', linewidth=2)
ax3.plot(threads, a_ppB_mpi_pc_noq, 'c--', marker='v', label='Точка-точка (с рассылкой)', linewidth=2)
ax3.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax3.set_xlabel('Количество процессов')
ax3.set_ylabel('Ускорение')
ax3.set_title('MPI: Ускорение (ПК, без Q)')
ax3.legend()
ax3.grid(True, alpha=0.3)

# MPI на кластере
ax4.plot(threads, a_R_mpi_cluster_noq, 'y-', marker='*', label='Reduce (без рассылки)', linewidth=2)
ax4.plot(threads, a_pp_mpi_cluster_noq, 'c-', marker='v', label='Точка-точка (без рассылки)', linewidth=2)
ax4.plot(threads, a_RB_mpi_cluster_noq, 'y--', marker='*', label='Reduce (с рассылкой)', linewidth=2)
ax4.plot(threads, a_ppB_mpi_cluster_noq, 'c--', marker='v', label='Точка-точка (с рассылкой)', linewidth=2)
ax4.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax4.set_xlabel('Количество процессов')
ax4.set_ylabel('Ускорение')
ax4.set_title('MPI: Ускорение (Кластер, без Q)')
ax4.legend()
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('Рисунок_2_Ускорение_без_Q.png', dpi=300, bbox_inches='tight')
plt.show()

# Рисунок 3: Время работы с параметром Q для OpenMP и MPI
fig3, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))

# OpenMP на ПК
ax1.plot(threads, t_s_omp_pc_q, 'b-', marker='o', label='Последовательная', linewidth=2)
ax1.plot(threads, t_c_omp_pc_q, 'r-', marker='s', label='Critical', linewidth=2)
ax1.plot(threads, t_a_omp_pc_q, 'g-', marker='^', label='Atomic', linewidth=2)
ax1.plot(threads, t_r_omp_pc_q, 'm-', marker='d', label='Reduction', linewidth=2)
ax1.set_xlabel('Количество потоков')
ax1.set_ylabel('Время (сек)')
ax1.set_title('OpenMP: Время выполнения (ПК, с Q)')
ax1.legend()
ax1.grid(True, alpha=0.3)

# OpenMP на кластере
ax2.plot(threads, t_s_omp_cluster_q, 'b-', marker='o', label='Последовательная', linewidth=2)
ax2.plot(threads, t_c_omp_cluster_q, 'r-', marker='s', label='Critical', linewidth=2)
ax2.plot(threads, t_a_omp_cluster_q, 'g-', marker='^', label='Atomic', linewidth=2)
ax2.plot(threads, t_r_omp_cluster_q, 'm-', marker='d', label='Reduction', linewidth=2)
ax2.set_xlabel('Количество потоков')
ax2.set_ylabel('Время (сек)')
ax2.set_title('OpenMP: Время выполнения (Кластер, с Q)')
ax2.legend()
ax2.grid(True, alpha=0.3)

# MPI на ПК
ax3.plot(threads, t_s_mpi_pc_q, 'b-', marker='o', label='Последовательная', linewidth=2)
ax3.plot(threads, t_pp_mpi_pc_q, 'c-', marker='v', label='Точка-точка', linewidth=2)
ax3.plot(threads, t_R_mpi_pc_q, 'y-', marker='*', label='Reduce', linewidth=2)
ax3.plot(threads, t_B_mpi_pc_q, 'k-', marker='x', label='Broadcast', linewidth=2)
ax3.set_xlabel('Количество процессов')
ax3.set_ylabel('Время (сек)')
ax3.set_title('MPI: Время выполнения (ПК, с Q)')
ax3.legend()
ax3.grid(True, alpha=0.3)

# MPI на кластере
ax4.plot(threads, t_s_mpi_cluster_q, 'b-', marker='o', label='Последовательная', linewidth=2)
ax4.plot(threads, t_pp_mpi_cluster_q, 'c-', marker='v', label='Точка-точка', linewidth=2)
ax4.plot(threads, t_R_mpi_cluster_q, 'y-', marker='*', label='Reduce', linewidth=2)
ax4.plot(threads, t_B_mpi_cluster_q, 'k-', marker='x', label='Broadcast', linewidth=2)
ax4.set_xlabel('Количество процессов')
ax4.set_ylabel('Время (сек)')
ax4.set_title('MPI: Время выполнения (Кластер, с Q)')
ax4.legend()
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('Рисунок_3_Время_с_Q.png', dpi=300, bbox_inches='tight')
plt.show()

# Рисунок 4: Ускорение программ с параметром Q для OpenMP и MPI
fig4, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))

# OpenMP на ПК
ax1.plot(threads, a_c_omp_pc_q, 'r-', marker='s', label='Critical', linewidth=2)
ax1.plot(threads, a_a_omp_pc_q, 'g-', marker='^', label='Atomic', linewidth=2)
ax1.plot(threads, a_r_omp_pc_q, 'm-', marker='d', label='Reduction', linewidth=2)
ax1.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax1.set_xlabel('Количество потоков')
ax1.set_ylabel('Ускорение')
ax1.set_title('OpenMP: Ускорение (ПК, с Q)')
ax1.legend()
ax1.grid(True, alpha=0.3)

# OpenMP на кластере
ax2.plot(threads, a_c_omp_cluster_q, 'r-', marker='s', label='Critical', linewidth=2)
ax2.plot(threads, a_a_omp_cluster_q, 'g-', marker='^', label='Atomic', linewidth=2)
ax2.plot(threads, a_r_omp_cluster_q, 'm-', marker='d', label='Reduction', linewidth=2)
ax2.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax2.set_xlabel('Количество потоков')
ax2.set_ylabel('Ускорение')
ax2.set_title('OpenMP: Ускорение (Кластер, с Q)')
ax2.legend()
ax2.grid(True, alpha=0.3)

# MPI на ПК
ax3.plot(threads, a_R_mpi_pc_q, 'y-', marker='*', label='Reduce (без рассылки)', linewidth=2)
ax3.plot(threads, a_pp_mpi_pc_q, 'c-', marker='v', label='Точка-точка (без рассылки)', linewidth=2)
ax3.plot(threads, a_RB_mpi_pc_q, 'y--', marker='*', label='Reduce (с рассылкой)', linewidth=2)
ax3.plot(threads, a_ppB_mpi_pc_q, 'c--', marker='v', label='Точка-точка (с рассылкой)', linewidth=2)
ax3.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax3.set_xlabel('Количество процессов')
ax3.set_ylabel('Ускорение')
ax3.set_title('MPI: Ускорение (ПК, с Q)')
ax3.legend()
ax3.grid(True, alpha=0.3)

# MPI на кластере
ax4.plot(threads, a_R_mpi_cluster_q, 'y-', marker='*', label='Reduce (без рассылки)', linewidth=2)
ax4.plot(threads, a_pp_mpi_cluster_q, 'c-', marker='v', label='Точка-точка (без рассылки)', linewidth=2)
ax4.plot(threads, a_RB_mpi_cluster_q, 'y--', marker='*', label='Reduce (с рассылкой)', linewidth=2)
ax4.plot(threads, a_ppB_mpi_cluster_q, 'c--', marker='v', label='Точка-точка (с рассылкой)', linewidth=2)
ax4.axhline(y=1, color='k', linestyle='--', alpha=0.5, label='Порог ускорения')
ax4.set_xlabel('Количество процессов')
ax4.set_ylabel('Ускорение')
ax4.set_title('MPI: Ускорение (Кластер, с Q)')
ax4.legend()
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('Рисунок_4_Ускорение_с_Q.png', dpi=300, bbox_inches='tight')
plt.show()

print("Все графики сохранены в файлы:")
print("- Рисунок_1_Время_без_Q.png")
print("- Рисунок_2_Ускорение_без_Q.png") 
print("- Рисунок_3_Время_с_Q.png")
print("- Рисунок_4_Ускорение_с_Q.png")

Задание:
1. 3 штата с наибольшим и 3 с наименьшим средним темпом создания рабочих
мест (Net Job Creation Rate)
2. 3 штата с наиболее стабильным рынком труда и 3 с наиболее турбулентным
– по величине разброса показателя (Reallocation Rate)
3. Динамика темпа закрытия рабочих мест (Job Destruction Rate) для наиболее
нестабильного штата за все время наблюдений

Доп. задание:
4. Корреляцие между темпом создания рабочих мест (Job Creation Rate) и
закрытием рабочих мест (Job Destruction Rate)

In [88]:
FILENAME = '../data/business_dynamics.csv'

In [89]:
orig_data = pd.read_csv('../data/business_dynamics.csv', encoding='utf-8', sep=',')


In [None]:

def read_csv_file_generator(filename):
    """Генератор для чтения CSV файла (стандартная библиотека)"""
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            yield row

def read_csv_pandas_generator(filename):
    """Генератор для чтения CSV файла через Pandas"""
    chunk_iterator = pd.read_csv(filename, chunksize=5000)
    for chunk in chunk_iterator:
        yield chunk

def read_pandas(filename):
    """Генератор для чтения CSV файла через Pandas"""
    df = pd.read_csv(filename)
    return df


def benchmark_read_methods(filename):
    """Сравнение скорости чтения разными методами"""
    print("Бенчмарк методов чтения...")
    
    start_time = time.time()
    count_std = 0
    for row in read_csv_file_generator(filename):
        count_std += 1
    time_std = time.time() - start_time
    
    start_time = time.time()
    count_pandas = 0
    for row in read_csv_pandas_generator(filename):
        count_pandas += 1
    time_pandas = time.time() - start_time
    start = time.time()
    df = read_pandas(FILENAME)
    pd_time = time.time() - start
    print(f"Стандартный CSV: {time_std:.3f} сек, {count_std} строк")
    print(f"Pandas chunk: {time_pandas:.3f} сек, {count_pandas} строк")
    print(f"Pandas: {pd_time:.3f} сек, {count_pandas} строк")



In [91]:
# benchmark_read_methods('../data/test.csv')

In [92]:
orig_data.columns

Index(['State', 'Year', 'Data.DHS Denominator', 'Data.Number of Firms',
       'Data.Calculated.Net Job Creation',
       'Data.Calculated.Net Job Creation Rate',
       'Data.Calculated.Reallocation Rate', 'Data.Establishments.Entered',
       'Data.Establishments.Entered Rate', 'Data.Establishments.Exited',
       'Data.Establishments.Exited Rate',
       'Data.Establishments.Physical Locations', 'Data.Firm Exits.Count',
       'Data.Firm Exits.Establishment Exit', 'Data.Firm Exits.Employments',
       'Data.Job Creation.Births', 'Data.Job Creation.Continuers',
       'Data.Job Creation.Count', 'Data.Job Creation.Rate',
       'Data.Job Creation.Rate/Births', 'Data.Job Destruction.Continuers',
       'Data.Job Destruction.Count', 'Data.Job Destruction.Deaths',
       'Data.Job Destruction.Rate', 'Data.Job Destruction.Rate/Deaths'],
      dtype='object')

In [None]:
df = orig_data[['State', 'Year', 'Data.Calculated.Net Job Creation Rate', 'Data.Calculated.Reallocation Rate', 'Data.Job Destruction.Rate', 'Data.Job Creation.Rate']]
df.rename({'Data.Calculated.Net Job Creation Rate':'net_job_creation_rate', 
           'Data.Calculated.Reallocation Rate':'job_reallocation_rate', 
           'Data.Job Destruction.Rate':'job_destruction_rate', 
           'Data.Job Creation.Rate':'job_creation_rate'},
           axis=1,
           inplace=True)
df.head()

In [106]:
d = {'a':2, 'b':3}
print(d['a'])

2


In [111]:
def read_csv_file(filename, col_name, default_col_name):
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            value = row[col_name]
            # if state==col_name:
            #     value = row[col_name]
            yield value

def extract_data(rows, col_name, default_col_name='State'):
    for row in rows:
        col = row[default_col_name]
        data = float(row[col_name])
        yield col, data

def extract_job_destruction_data(rows, target_state, col_name):
    """Генератор для извлечения данных о Job Destruction Rate для конкретного штата"""
    for row in rows:
        state = row['State']
        year = int(row['Year'])
        job_destruction = float(row[col_name])
        if state == target_state:
            yield year, job_destruction

def aggregate_by_function(rows, aggregation_function):
    """Генератор для агрегации данных"""
    aggregated_data = defaultdict(list)
    
    df['Data.Calculated.Net Job Creation Rate'].mean()
    for col1, col2 in rows:
        df[col1].mean(col2)
        aggregated_data[col1].append(col2)
    
    for key, value in aggregated_data.items():
        aggregated_value = aggregation_function(value)
        yield key, aggregated_value

def task1_pipeline(filename):
    """Пайплайн для задачи 1: Net Job Creation Rate"""
    csv_reader = read_csv_file(filename, col_name='Data.Calculated.Net Job Creation Rate', default_col_name='State')
    print(list(csv_reader)[:5])
    data_extractor = extract_data(csv_reader, col_name='Data.Calculated.Net Job Creation Rate')
    
    data_aggregator = aggregate_by_function(data_extractor, aggregation_function=np.mean)
    
    all_data = list(data_aggregator)
    
    # cорт по среднему значению
    sorted_states = sorted(all_data, key=lambda x: x[1])
    
    worst_3 = sorted_states[:3]
    best_3 = sorted_states[-3:]
    
    return worst_3, best_3

def calculate_confidence_interval(data, confidence=0.95):
    sample_std = np.std(data, ddof=1)
    mean = np.mean(data)
    n = len(data)

    sem = sample_std / np.sqrt(n)
    confidence_interval = norm.interval(confidence, loc=mean, scale=sem)
    return confidence_interval

def task2_pipeline(filename):
    """Пайплайн для задачи 2: Reallocation Rate"""
    csv_reader = read_csv_file(filename)
    
    data_extractor = extract_data(csv_reader, col_name='Data.Calculated.Reallocation Rate')
    
    data_aggregator = aggregate_by_function(data_extractor, aggregation_function=np.std)
    
    all_data = list(data_aggregator)
    
    # сорт по стандартному отклонению
    sorted_states = sorted(all_data, key=lambda x: x[1])
    
    # наиболее стабильные и нестабильные штаты
    stable_3 = sorted_states[:3]
    unstable_3 = sorted_states[-3:]
    
    return stable_3, unstable_3

def get_state_data(filename, states):
    # print('states', states)
    csv_reader = read_csv_file(filename)
    state_data = defaultdict(list)
    data_extractor = extract_data(csv_reader, col_name='Data.Calculated.Reallocation Rate')
    for state, value in data_extractor:
        if state in states:
            state_data[state].append(value)
    
    return state_data

def task3_pipeline(filename, unstable_state):
    """Пайплайн для задачи 3: Job Destruction Rate для нестабильного штата"""
    csv_reader = read_csv_file(filename)
    
    # решил не портить функцию для остальных тасок
    data_extractor = extract_job_destruction_data(csv_reader, unstable_state, col_name='Data.Job Destruction.Rate')
    
    # здесь по годам
    data_aggregator = aggregate_by_function(data_extractor, aggregation_function=np.mean)
    
    return list(data_aggregator)

def task4_pipeline(filename):
    """Пайплайн для задачи 4: Корреляция между Job Creation и Job Destruction"""
    csv_reader1 =read_csv_file(filename)
    csv_reader2 =read_csv_file(filename)

    data_extractor_creation = extract_data(csv_reader1, col_name='Data.Job Creation.Rate')
    creation_data = list(data_extractor_creation)

    data_extractor_destruction = extract_data(csv_reader2, col_name='Data.Job Destruction.Rate')
    destruction_data = list(data_extractor_destruction)

    job_creation_rates = [x[1] for x in creation_data]
    job_destruction_rates = [x[1] for x in destruction_data]
    
    correlation = np.corrcoef(job_creation_rates, job_destruction_rates)[0, 1]
    
    return job_creation_rates, job_destruction_rates, correlation

def plot_results(task1_results, task2_results, task3_results, task4_results):
    """Функция для построения всех графиков"""
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
    
    # задача 1: Bar plot для Net Job Creation Rate
    worst_states, best_states = task1_results
    all_states = worst_states + best_states
    states = [x[0] for x in all_states]
    rates = [x[1] for x in all_states]
    colors = ['red'] * 3 + ['green'] * 3
    
    ax1.bar(states, rates, color=colors)
    ax1.set_title('Задание 1. 3 штата с наибольшим и 3 с наименьшим средним темпом \nсоздания рабочих мест (Net Job Creation Rate)')
    ax1.set_ylabel('Net Job Creation Rate')
    ax1.tick_params(axis='x', rotation=45)
    
    # Задача 2: boxplot для стабильных и нестабильных
    stable_states, turbulent_states = task2_results
    all_states_task2 = stable_states + turbulent_states
    states_task2 = [x[0] for x in all_states_task2]

    state_data = get_state_data(FILENAME, states_task2)
    for state, data in state_data.items():
        print(f"State {state}:, confidence interval: {calculate_confidence_interval(data)}")

    data_for_boxplot = [state_data[state] for state in states_task2]
    
    ax2.boxplot(data_for_boxplot, labels=states_task2)

    ax2.set_title('Задание 2: 3 штата с наиболее стабильным рынком труда и 3 с наиболее турбулентным \n– по величине разброса показателя (Reallocation Rate)')
    ax2.set_ylabel('Reallocation Rate')

    # Задача 3 line plot
    years = [x[0] for x in task3_results]
    destruction_rates = [x[1] for x in task3_results]
    
    ax3.plot(years, destruction_rates, marker='o')
    ax3.set_title('Задание 3: Динамика Job Destruction Rate\nдля наиболее нестабильного штата')
    ax3.set_xlabel('Год')
    ax3.set_ylabel('Job Destruction Rate')
    ax3.grid(True)
    
    # Задача 4 scatter plot и корреляция
    job_creation, job_destruction, correlation = task4_results
    
    ax4.scatter(job_creation, job_destruction)
    ax4.set_title(f'Задание 4: Корреляция между Job Creation и Job Destruction\n(corr = {correlation:.3f})')
    ax4.set_xlabel('Job Creation Rate')
    ax4.set_ylabel('Job Destruction Rate')
    
    slope, intercept, _, _, _ = linregress(job_creation, job_destruction)
    x_line = np.array([min(job_creation), max(job_creation)])
    y_line = intercept + slope * x_line
    ax4.plot(x_line, y_line, 'r-', label=f'регресси')
    ax4.legend()
    
    plt.tight_layout()
    plt.show()

In [None]:
task1_results = task1_pipeline(FILENAME)
task1_results

In [None]:
task2_results = task2_pipeline(FILENAME)
task2_results

([('Pennsylvania', 2.749426433506798),
  ('Kentucky', 2.870258626083507),
  ('Kansas', 2.9286334073661773)],
 [('West Virginia', 4.993452481232994),
  ('North Dakota', 5.347206331817277),
  ('Alaska', 8.974686245106087)])

In [97]:
unstable_state, value = task2_results[1][-1]
task3_results = task3_pipeline(FILENAME, unstable_state=unstable_state)
task3_results[:5]

[(1978, 34.112),
 (1979, 24.211),
 (1980, 24.922),
 (1981, 17.569),
 (1982, 20.781)]

In [98]:
job_creation_rates, job_destruction_rates, correlation = task4_pipeline(FILENAME)
job_creation_rates[:3], job_destruction_rates[:3], correlation

([22.218, 21.223, 16.232], [14.591, 15.626, 16.914], 0.49925056563734005)

In [99]:
task4_results = task4_pipeline(FILENAME)

In [None]:
plot_results(task1_results, task2_results, task3_results, task4_results)