In [None]:
import xmlrpc
# xmlrpc_server.ipynb

# from SimpleXMLRPCServer import SimpleXMLRPCServer
# from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler

from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler

# import xmlrpclib
from xmlrpc.client import Binary
import datetime
import re
import pandas as pd
import pickle
import numpy as np

from httpx import patch
from jupyter_server.base.handlers import path_regex


class RequestHandler(SimpleXMLRPCRequestHandler):
    rpc_paths = ('/RPC2',)

server = SimpleXMLRPCServer(("127.0.0.1", 8008),
                            requestHandler=RequestHandler, allow_none=True)

stats_server = xmlrpc.client.ServerProxy("http://127.0.0.1:8018")

# Добавление в лог через сервер
def add_log(log_line):
    try:
        stats_server.add_log(log_line)
    except (xmlrpc.client.ProtocolError, ConnectionRefusedError, xmlrpc.client.Fault) as e:
        print(f"Ошибка при записи в лог: {e}")
    return True

# Тест
def ping():
    add_log("ping")
    return True
server.register_function(ping, 'ping')

# Время сервера
def now():
    add_log("now")
    return datetime.datetime.now()
server.register_function(now, 'now')

# Отображение строкового вида, типа и значений
def show_type(arg):
    add_log("type")
    return (str(arg), str(type(arg)), arg)
server.register_function(show_type, 'type')

# Сумма
def test_sum(a, b):
    add_log("sum")
    return a + b
server.register_function(test_sum, 'sum')

# Степень
def test_pow(a, b):
    add_log("pow")
    return a**b
server.register_function(test_pow, 'pow')

# Проверка нахождения клиента в черном списке c использованием Pandas Data Frame
def black_list_check(sname):
    frame = pd.read_csv('bad_boys2.csv', header=0, sep=',', encoding='utf8')
    exist = any(frame['Surname'] == sname)
    if exist:
        add_log("black_list_check RES: bad_boy")
        return sname + ": "+ "bad_boy"
    else:
        add_log("black_list_check RES: good_boy")
        return sname + ": "+ "good_boy"

server.register_function(black_list_check, 'black_list_check')


def levenshtein_distance(s1, s2):
    # add_log("levenshtein_distance")
    if len(s1) < len(s2):
        return levenshtein_distance(s2, s1)
    if len(s2) == 0:
        return len(s1)
    previous_row = range(len(s2) + 1)
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
    return previous_row[-1]

def black_list_check_full(surname, name, patronym, birth_date):
    date_pattern = r'^\d{2}\.\d{2}\.\d{4}$'
    if not re.match(date_pattern, birth_date):
        add_log(f"black_list_check_full ERROR: Ошибка: Неверный формат даты '{birth_date}'. Ожидается DD.MM.YYYY (например, 22.03.1989)")
        return f"Ошибка: Неверный формат даты '{birth_date}'. Ожидается DD.MM.YYYY (например, 22.03.1989)"

    try:
        input_date = datetime.datetime.strptime(birth_date, '%d.%m.%Y')
        current_date = datetime.datetime.now()
        if input_date > current_date:
            add_log(f"black_list_check_full ERROR: Ошибка: Дата рождения '{birth_date}' не может быть позже текущей даты")
            return f"Ошибка: Дата рождения '{birth_date}' не может быть позже текущей даты"
    except ValueError:
        add_log(f"black_list_check_full ERROR: Ошибка: Неверная дата '{birth_date}'. Проверьте корректность")
        return f"Ошибка: Неверная дата '{birth_date}'. Проверьте корректность"

    frame = pd.read_csv('bad_boys2.csv', header=0, sep=',', encoding='utf-8')

    surname_input = surname.title()
    name_input = name.title()
    patronym_input = patronym.title()

    frame['Surname'] = frame['Surname'].str.title()
    frame['Name'] = frame['Name'].str.title()
    frame['Patronym'] = frame['Patronym'].str.title()

    full_mask = (frame['Surname'] == surname_input) & \
                (frame['Name'] == name_input) & \
                (frame['Patronym'] == patronym_input) & \
                (frame['Birth'] == birth_date)

    if full_mask.any():
        add_log("black_list_check_full RES: yes good boy")
        return f"{surname_input} {name_input} {patronym_input} ({birth_date}): yes good boy"

    frame['surname_dist'] = frame['Surname'].apply(lambda x: levenshtein_distance(x, surname_input))
    frame['name_dist'] = frame['Name'].apply(lambda x: levenshtein_distance(x, name_input))
    frame['patronym_dist'] = frame['Patronym'].apply(lambda x: levenshtein_distance(x, patronym_input))

    similar_mask = (frame['surname_dist'] <= 1) & \
                   (frame['name_dist'] <= 1) & \
                   (frame['patronym_dist'] <= 1) & \
                   (frame['Birth'] == birth_date)

    similar_rows = frame[similar_mask]

    if not similar_rows.empty:
        similar_list = similar_rows.apply(lambda row: f"{row['Surname']} {row['Name']} {row['Patronym']} ({row['Birth']})", axis=1).tolist()
        add_log("black_list_check_full RES: similar good boy")
        return f"{', '.join(similar_list)}: similar good boy"
    add_log("black_list_check_full RES: no, bad boy")
    return f"{surname_input} {name_input} {patronym_input} ({birth_date}): no, bad boy"

server.register_function(black_list_check_full, 'black_list_check_full')

# Бинарная передача данных
def send_back_binary(bin_data):
    data = bin_data.data
    add_log("send_back_binary")
    return Binary(data)
server.register_function(send_back_binary, 'send_back_binary')

# Инверсия цвета
# На вход изображение RGB размерности (M, N, 3) со значениями 0-255
def send_back_inversion(bin_data):
    img_arr = pickle.loads(bin_data.data)
    
    height = img_arr.shape[0]
    width = img_arr.shape[1]
    channels = img_arr.shape[2] if len(img_arr.shape) > 2 else 1

    for i in range(height):
        for j in range(width):
            if channels in (3, 4):
                img_arr[i][j][0] = 255 - img_arr[i][j][0]
                img_arr[i][j][1] = 255 - img_arr[i][j][1]
                img_arr[i][j][2] = 255 - img_arr[i][j][2]
                # if channels == 4:
                #     img_arr[i][j][3] = 255 - img_arr[i][j][3]
            else:
                img_arr[i][j][0] = 255 - img_arr[i][j][0]
    
    pimg = pickle.dumps(img_arr)
    add_log("color_inversion")
    return Binary(pimg)
server.register_function(send_back_inversion, 'color_inversion')

# Бинаризация изображения по порогу (1-255)
def send_back_binarization(bin_data, threshold, need_percent=False):
    if not 1 <= threshold <= 255:
        add_log("send_back_binarization ERROR Порог должен быть в диапазоне 1-255")
        raise ValueError("Порог должен быть в диапазоне 1-255")

    img_arr = pickle.loads(bin_data.data)
    if img_arr.max() <= 1.0:
        img_arr = (img_arr * 255).astype(np.uint8)
    else:
        img_arr = img_arr.astype(np.uint8)

    height = img_arr.shape[0]
    width = img_arr.shape[1]
    channels = img_arr.shape[2] if len(img_arr.shape) > 2 else 1
    binarized_arr = np.zeros_like(img_arr, dtype=np.uint8)

    above_threshold_count = 0
    total_pixels = height * width

    for i in range(height):
        for j in range(width):
            if channels in (3, 4):
                r = img_arr[i][j][0]
                g = img_arr[i][j][1]
                b = img_arr[i][j][2]
                avg = (r + g + b) // 3
                value = 255 if avg >= threshold else 0
                if avg >= threshold:
                    above_threshold_count += 1
                for c in range(min(3, channels)):
                    binarized_arr[i][j][c] = value
                if channels == 4:
                    binarized_arr[i][j][3] = img_arr[i][j][3]
            elif channels == 1:
                value = 255 if img_arr[i][j][0] >= threshold else 0
                if img_arr[i][j][0] >= threshold:
                    above_threshold_count += 1
                binarized_arr[i][j][0] = value

    cloud_percentage = (above_threshold_count / total_pixels) * 100
    binarized_arr = np.array(binarized_arr, dtype=np.uint8)
    pimg = pickle.dumps(binarized_arr)
    if need_percent:
        add_log("send_back_binarization without percent")
        return Binary(pimg), cloud_percentage
    add_log("send_back_binarization with percent")
    return Binary(pimg)
server.register_function(send_back_binarization, 'send_back_binarization')

# Бинаризация изображения по порогу (1-255) с выводом процентов бинаризации
def send_back_binarization_with_percent(bin_data, threshold):
    add_log("send_back_binarization_with_percent")
    return send_back_binarization(bin_data, threshold, need_percent=True)
server.register_function(send_back_binarization_with_percent, 'send_back_binarization_with_percent')

# Разворот изображения относительно вертикали
def send_back_flip_vertical(bin_data):
    img_arr = pickle.loads(bin_data.data)
    height = img_arr.shape[0]
    width = img_arr.shape[1]
    channels = img_arr.shape[2] if len(img_arr.shape) > 2 else 1
    # flipped_arr = [[[0 for _ in range(channels)] for _ in range(width)] for _ in range(height)]

    for i in range(height):
        for j in range(width // 2):
            for c in range(channels):
                img_arr[i][j][c], img_arr[i][width - 1 - j][c] = img_arr[i][width - 1 - j][c], img_arr[i][j][c]
                # flipped_arr[i][width - 1 - j][c] = img_arr[i][j][c]

    pimg = pickle.dumps(img_arr)
    add_log("send_back_flip_vertical")
    return Binary(pimg)
server.register_function(send_back_flip_vertical, 'send_back_flip_vertical')


print("Listening on port 8008...")
server.serve_forever()


Listening on port 8008...
