### Основной код

In [1]:
import requests
import sqlite3
from cryptography.fernet import Fernet
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import schedule
import time
import tkinter as tk
from tkinter import messagebox, scrolledtext, simpledialog
import logging
import threading
import requests
import xml.etree.ElementTree as ET
import logging

# Параметры электронной почты
SMTP_SERVER = 'smtp.yandex.ru'
SMTP_PORT = 587
EMAIL_ADDRESS = 'fylhtq239@yandex.ru'
EMAIL_PASSWORD = 'zingjvhnhuzmwmyy'
ADMIN_EMAIL = 'fylhtq239@yandex.ru'

# Настройка логирования
logging.basicConfig(filename='data_pipeline.log', level=logging.INFO)

# Создание ключа
def load_key():
    try:
        with open("secret.key", "rb") as key_file:
            key = key_file.read()
    except FileNotFoundError:
        key = Fernet.generate_key()
        with open("secret.key", "wb") as key_file:
            key_file.write(key)
    return key

# Encrypt & descrypt data
def encrypt_data(data, key):
    f = Fernet(key)
    encrypted_data = f.encrypt(data.encode())
    return encrypted_data

def decrypt_data(encrypted_data, key):
    f = Fernet(key)
    decrypted_data = f.decrypt(encrypted_data).decode()
    return decrypted_data

# Email sending
def send_email(subject, message):
    msg = MIMEMultipart()
    msg['From'] = EMAIL_ADDRESS
    msg['To'] = ADMIN_EMAIL
    msg['Subject'] = subject

    msg.attach(MIMEText(message, 'plain'))

    server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
    server.starttls()
    server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
    text = msg.as_string()
    server.sendmail(EMAIL_ADDRESS, ADMIN_EMAIL, text)
    server.quit()

# Aggregate data
def aggregate_data(encrypted_data):
    conn = sqlite3.connect('aggregated_data.db')
    cursor = conn.cursor()
    
    cursor.execute('''CREATE TABLE IF NOT EXISTS data_storage (
                      id INTEGER PRIMARY KEY AUTOINCREMENT,
                      data BLOB)''')
    
    cursor.execute("INSERT INTO data_storage (data) VALUES (?)", (encrypted_data,))
    conn.commit()
    conn.close()


def fetch_and_process_data(key):
    try:
        # Выполняем запрос к серверу
        url = "http://domain.com/fetch"
        headers = {'Content-Type': 'application/xml'}
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            # Парсим XML-ответ
            root = ET.fromstring(response.content)
            
            # Преобразуем XML в список объектов
            items = []
            for item in root.findall('item'):
                obj = {
                    'subject': item.find('subject').text,
                    'region': item.find('region').text,
                    'level': item.find('level').text,
                    'result': int(item.find('result').text),
                    'year': int(item.find('year').text),
                    'uuid': item.find('uuid').text
                }
                items.append(obj)

            # Преобразуем полученные данные в строку
            aggregated_data = str(items)

            # Шифруем данные
            encrypted_data = encrypt_data(aggregated_data, key)

            # Сохраняем зашифрованные данные в базе данных
            aggregate_data(encrypted_data)
            logging.info(f"Data encrypted and stored: {aggregated_data}")

            # Отправляем email о успешной агрегации данных
            send_email('Data Aggregated', 'Data has been successfully aggregated.')

        else:
            logging.error(f"Error fetching data: {response.status_code}")
            send_email('Error', f"Error fetching data: {response.status_code}")

    except Exception as e:
        logging.error(f"Error fetching data: {e}")
        send_email('Error', f"Error fetching data: {e}")

def read_and_decrypt_data(key):
    conn = sqlite3.connect('aggregated_data.db')
    cursor = conn.cursor()
    
    # Создаем таблицу, если она не существует
    cursor.execute('''CREATE TABLE IF NOT EXISTS data_storage (
                      id INTEGER PRIMARY KEY AUTOINCREMENT,
                      data BLOB)''')

    cursor.execute("SELECT data FROM data_storage")
    rows = cursor.fetchall()
    
    decrypted_rows = []
    for row in rows:
        decrypted_data = decrypt_data(row[0], key)
        decrypted_rows.append(decrypted_data)
    
    conn.close()
    return decrypted_rows

def schedule_data_fetching(key):
    schedule.every(10).seconds.do(fetch_and_process_data, key=key)
    
    while True:
        schedule.run_pending()
        time.sleep(1)


# Роли и права доступа
user_roles = {
    'admin': ['view_data', 'start_processing'],
    'viewer': ['view_data']
}

# Пароли для ролей
user_passwords = {
    'admin': 'admin_password',
    'viewer': 'viewer_password'
}

def authenticate_user():
    role = simpledialog.askstring("Role", "Enter your role (admin/viewer):")
    
    if role in user_roles:
        password = simpledialog.askstring("Password", "Enter your password:", show='*')
        
        if password == user_passwords.get(role):
            return role
        else:
            messagebox.showerror("Error", "Invalid password.")
    else:
        messagebox.showerror("Error", "Invalid username or role.")
    return None

def start_interface():
    role = authenticate_user()
    if not role:
        return
    
    root = tk.Tk()
    root.title("Data Processor")

    def start_processing():
        if 'start_processing' in user_roles[role]:
            messagebox.showinfo("Message", "Processing started. Data will be fetched every 10 seconds.")
            key = load_key()

            # Запуск в отдельном потоке
            processing_thread = threading.Thread(target=schedule_data_fetching, args=(key,), daemon=True)
            processing_thread.start()
        else:
            messagebox.showerror("Error", "You do not have permission to start processing.")

    def view_data():
        if 'view_data' in user_roles[role]:
            key = load_key()
            decrypted_data = read_and_decrypt_data(key)
            data_window = tk.Toplevel(root)
            data_window.title("Decrypted Data")
            text_area = scrolledtext.ScrolledText(data_window, width=60, height=20)
            text_area.pack(pady=10)
            
            for data in decrypted_data:
                text_area.insert(tk.END, f"{data}\n")
        else:
            messagebox.showerror("Error", "You do not have permission to view data.")

    def clear_database():
        conn = sqlite3.connect('aggregated_data.db')
        cursor = conn.cursor()
        
        # Удаляем все данные из таблицы
        cursor.execute("DELETE FROM data_storage")
        
        # Сбрасываем автоинкремент (ID начинается заново)
        cursor.execute("DELETE FROM sqlite_sequence WHERE name='data_storage'")
        
        conn.commit()
        conn.close()
        messagebox.showinfo("Success", "Database has been cleared.")
    
    # Кнопка для начала процесса
    process_button = tk.Button(root, text="Start Processing", command=start_processing)
    process_button.pack(pady=10)

    # Кнопка для просмотра данных
    view_button = tk.Button(root, text="View Decrypted Data", command=view_data)
    view_button.pack(pady=10)

    # Добавляем кнопку очистки данных
    clear_button = tk.Button(root, text="Clear Data", command=clear_database)
    clear_button.pack(pady=10)

    root.mainloop()

# Запуск программы
if __name__ == "__main__":
    start_interface()

### Проверка кода с тестовыми данными

In [8]:
import requests
import sqlite3
from cryptography.fernet import Fernet
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import schedule
import time
import tkinter as tk
from tkinter import messagebox, scrolledtext, simpledialog
import logging
import threading
import requests
import xml.etree.ElementTree as ET
import logging

# Параметры электронной почты
SMTP_SERVER = 'smtp.yandex.ru'

SMTP_PORT = 587
EMAIL_ADDRESS = 'Fylhtq239@yandex.ru'
EMAIL_PASSWORD = 'jvmnmpvraqmrwktd'
ADMIN_EMAIL = 'Fylhtq239@yandex.ru'

# Настройка логирования
logging.basicConfig(filename='data_pipeline.log', level=logging.INFO)

# Создание ключа
def load_key():
    try:
        with open("secret.key", "rb") as key_file:
            key = key_file.read()
    except FileNotFoundError:
        key = Fernet.generate_key()
        with open("secret.key", "wb") as key_file:
            key_file.write(key)
    return key

# Encrypt & descrypt data
def encrypt_data(data, key):
    f = Fernet(key)
    encrypted_data = f.encrypt(data.encode())
    return encrypted_data

def decrypt_data(encrypted_data, key):
    f = Fernet(key)
    decrypted_data = f.decrypt(encrypted_data).decode()
    return decrypted_data

# Email sending
def send_email(subject, message):
    msg = MIMEMultipart()
    msg['From'] = EMAIL_ADDRESS
    msg['To'] = ADMIN_EMAIL
    msg['Subject'] = subject

    msg.attach(MIMEText(message, 'plain'))

    server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
    server.starttls()
    server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
    text = msg.as_string()
    server.sendmail(EMAIL_ADDRESS, ADMIN_EMAIL, text)
    server.quit()

# Aggregate data
def aggregate_data(encrypted_data):
    conn = sqlite3.connect('aggregated_data.db')
    cursor = conn.cursor()
    
    cursor.execute('''CREATE TABLE IF NOT EXISTS data_storage (
                      id INTEGER PRIMARY KEY AUTOINCREMENT,
                      data BLOB)''')
    
    cursor.execute("INSERT INTO data_storage (data) VALUES (?)", (encrypted_data,))
    conn.commit()
    conn.close()


import xml.etree.ElementTree as ET
import logging

def fetch_and_process_data(key):
    try:
        # Локальный XML, заменяющий HTTP-запрос
        xml_data = """
        <collection>
            <item>
                <subject>Физика</subject>
                <region>Чудовский район</region>
                <level>Шэ</level>
                <result>60</result>
                <year>2025</year>
                <uuid>ffffffff-d383-8a44-ffff-ffffd3838a44</uuid>
            </item>
            <item>
                <subject>Информатика</subject>
                <region>Чудовский район</region>
                <level>M3</level>
                <result>40</result>
                <year>2025</year>
                <uuid>ffffffff-d37e-1364-ffff-ffffd37e1364</uuid>
            </item>
        </collection>
        """
        
        # Парсим XML-данные
        root = ET.fromstring(xml_data)
        
        # Преобразуем XML в список объектов
        items = []
        for item in root.findall('item'):
            obj = {
                'subject': item.find('subject').text,
                'region': item.find('region').text,
                'level': item.find('level').text,
                'result': int(item.find('result').text),
                'year': int(item.find('year').text),
                'uuid': item.find('uuid').text
            }
            items.append(obj)

        # Преобразуем полученные данные в строку
        aggregated_data = str(items)

        # Шифруем данные
        encrypted_data = encrypt_data(aggregated_data, key)

        # Сохраняем зашифрованные данные в базе данных
        aggregate_data(encrypted_data)
        logging.info(f"Data encrypted and stored: {aggregated_data}")

        # Отправляем email о успешной агрегации данных
        send_email('Data Aggregated', 'Data has been successfully aggregated.')

    except Exception as e:
        logging.error(f"Error processing data: {e}")
        send_email('Error', f"Error processing data: {e}")


def read_and_decrypt_data(key):
    conn = sqlite3.connect('aggregated_data.db')
    cursor = conn.cursor()
    
    # Создаем таблицу, если она не существует
    cursor.execute('''CREATE TABLE IF NOT EXISTS data_storage (
                      id INTEGER PRIMARY KEY AUTOINCREMENT,
                      data BLOB)''')

    cursor.execute("SELECT data FROM data_storage")
    rows = cursor.fetchall()
    
    decrypted_rows = []
    for row in rows:
        decrypted_data = decrypt_data(row[0], key)
        decrypted_rows.append(decrypted_data)
    
    conn.close()
    return decrypted_rows

def schedule_data_fetching(key):
    schedule.every(2).minutes.do(fetch_and_process_data, key=key)
    
    while True:
        schedule.run_pending()
        time.sleep(1)


# Роли и права доступа
user_roles = {
    'admin': ['view_data', 'start_processing'],
    'viewer': ['view_data']
}

# Пароли для ролей
user_passwords = {
    'admin': 'admin',
    'viewer': 'viewer'
}

def authenticate_user():
    role = simpledialog.askstring("Role", "Enter your role (admin/viewer):")
    
    if role in user_roles:
        password = simpledialog.askstring("Password", "Enter your password:", show='*')
        
        if password == user_passwords.get(role):
            return role
        else:
            messagebox.showerror("Error", "Invalid password.")
    else:
        messagebox.showerror("Error", "Invalid username or role.")
    return None

def start_interface():
    role = authenticate_user()
    if not role:
        return
    
    root = tk.Tk()
    root.title("Data Processor")

    def start_processing():
        if 'start_processing' in user_roles[role]:
            messagebox.showinfo("Message", "Processing started. Data will be fetched every 30 seconds.")
            key = load_key()

            # Запуск в отдельном потоке
            processing_thread = threading.Thread(target=schedule_data_fetching, args=(key,), daemon=True)
            processing_thread.start()
        else:
            messagebox.showerror("Error", "You do not have permission to start processing.")

    def view_data():
        if 'view_data' in user_roles[role]:
            key = load_key()
            decrypted_data = read_and_decrypt_data(key)
            data_window = tk.Toplevel(root)
            data_window.title("Decrypted Data")
            text_area = scrolledtext.ScrolledText(data_window, width=60, height=20)
            text_area.pack(pady=10)
            
            for data in decrypted_data:
                text_area.insert(tk.END, f"{data}\n")
        else:
            messagebox.showerror("Error", "You do not have permission to view data.")

    def clear_database():
        conn = sqlite3.connect('aggregated_data.db')
        cursor = conn.cursor()
        
        # Удаляем все данные из таблицы
        cursor.execute("DELETE FROM data_storage")
        
        # Сбрасываем автоинкремент (ID начинается заново)
        cursor.execute("DELETE FROM sqlite_sequence WHERE name='data_storage'")
        
        conn.commit()
        conn.close()
        messagebox.showinfo("Success", "Database has been cleared.")
    
    # Кнопка для начала процесса
    process_button = tk.Button(root, text="Start Processing", command=start_processing)
    process_button.pack(pady=10)

    # Кнопка для просмотра данных
    view_button = tk.Button(root, text="View Decrypted Data", command=view_data)
    view_button.pack(pady=10)

    # Добавляем кнопку очистки данных
    clear_button = tk.Button(root, text="Clear Data", command=clear_database)
    clear_button.pack(pady=10)

    root.mainloop()

# Запуск программы
if __name__ == "__main__":
    start_interface()

Рекомендации к горизонтальному и вертикальному масштабированию такие: 
В рамках горизонтального масштабирования рекомендуется увеличивать количество точек, которые будут раздавать данные и будут обрабатывать их, в рамках вертикального рекомендуется увеличить процессорную частоту для увеличения скорости обработки, количество потоков увеличивать не целесообразно, потому что код обладает высоким уровнем синхронизации