In [1]:
import os
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
import requests
import re
from datetime import datetime, timedelta, date
from functools import reduce
from minio import Minio
from io import BytesIO
import findspark


# NOTE(review): pyspark is already imported above this call. findspark's
# documented usage is to call init() *before* importing pyspark — confirm
# whether this call is still needed or should move above the imports.
findspark.init()


In [2]:
# Load variables from a .env file, then read the credentials from the environment.
load_dotenv()

# MinIO (S3-compatible object storage) credentials.
access_key, secret_key = (
    os.getenv(name) for name in ("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY")
)

# PostgreSQL credentials.
postgres_user, postgres_password = (
    os.getenv(name) for name in ("POSTGRES_USER", "POSTGRES_PASSWORD")
)

In [3]:
# MinIO client for the local endpoint (plain HTTP, hence secure=False).
client = Minio(
    endpoint="localhost:9000",
    access_key=access_key,
    secret_key=secret_key,
    secure=False,
)

# Bucket with the raw HTML pages; object names are used as YYYYMMDD date keys below.
bucket_html = "raw-html"
# Bucket for processed output. Underscores are not allowed in S3/MinIO bucket
# names, and the s3a:// paths in this notebook use "processed-data".
bucket_processed = "processed-data"

In [4]:
# Local Spark session wired to the MinIO endpoint through the S3A connector.
# hadoop-aws provides the s3a:// filesystem; the PostgreSQL driver is used for JDBC.
spark = (
    SparkSession.builder
    .appName("HTMLProcessor")
    .master("local[*]")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", access_key)
    .config("spark.hadoop.fs.s3a.secret.key", secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,org.postgresql:postgresql:42.7.5",
    )
    .getOrCreate()
)

25/04/28 09:04:10 WARN Utils: Your hostname, maxp-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/28 09:04:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/maxp/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/maxp/.ivy2/cache
The jars for the packages stored in: /home/maxp/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7bb8324e-a332-4d74-89d1-7d4058b5a715;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.postgresql#postgresql;42.7.5 in central
	found org.checkerframework#checker-qual;3.48.3 in central
:: resolution report :: resolve 1025ms :: artifacts dl 75ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.checkerframework#checker-qual;3.48.3 from central in [default]
	org.postgresql#postgresql;42.7.5 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1

In [13]:
def clean_text(text):
    """Flatten line breaks/tabs to spaces, trim the ends and collapse space runs.

    Args:
        text: Input string.

    Returns:
        The normalized string.
    """
    flattened = re.sub(r"[\n\r\t]+", " ", text)
    return re.sub(r" +", " ", flattened.strip())

In [19]:
class VRNFSO_html_processor_spark:
    """Parse one results page (HTML) into Spark DataFrames.

    A page is identified by ``url_date`` (a ``YYYYMMDD`` string). Three
    DataFrames can be produced from it: the event, the per-group course
    parameters and the individual results. Pages whose title contains
    "эстафета" (relay) are skipped by all three parsers.
    """

    # City marker: "г" followed by a dot or whitespace and then the city name.
    # The lookbehind rejects a "г" that is a letter inside a word (e.g. "города",
    # "года"), which the case-insensitive pattern would otherwise pick up.
    _CITY_RE = re.compile(
        r"(?i)(?<![^\W\d_])г(?:\.\s*|\s+)([А-ЯЁ]{1}[а-яё\-\s]+)(?=\s|$|,|\.|<|\))"
    )

    def __init__(self, url_date, html_page, spark):
        """
        Args:
            url_date: Event date encoded as ``YYYYMMDD``.
            html_page: Decoded HTML text of the page.
            spark: SparkSession used to build the DataFrames.
        """
        self.url_date = url_date
        self.soup = BeautifulSoup(html_page, "html.parser")
        self.spark = spark
        # Dates recognised as relay events (kept as a list for compatibility).
        self.relay_dates_list = []

    def get_date_str(self):
        """Return ``url_date`` formatted as ``DD.MM.YYYY``."""
        year = self.url_date[:4]
        month = self.url_date[4:6]
        day = self.url_date[6:8]
        return f"{day}.{month}.{year}"

    def get_date(self):
        """Return ``url_date`` as a ``datetime.date``, or None if it cannot be parsed."""
        try:
            year = int(self.url_date[:4])
            month = int(self.url_date[4:6])
            day = int(self.url_date[6:8])
            return date(year, month, day)
        except (ValueError, IndexError) as e:
            print(f"Ошибка преобразования даты: {e}")
            return None

    def clean_text(self, text):
        """Normalize free text.

        Line breaks and tabs become spaces, every character that is not a word
        character, whitespace, ``-``, ``.`` or ``,`` is removed, and runs of
        spaces are collapsed. Non-string input yields an empty string.
        """
        if not isinstance(text, str):
            return ""
        cleaned = re.sub(r"[\n\r\t]+", " ", text)
        cleaned = re.sub(r"[^\w\s\-.,]", "", cleaned)
        cleaned = re.sub(r" +", " ", cleaned)
        return cleaned.strip()

    def clean_city_name(self, city):
        """Clean and normalize a city name.

        Returns:
            The normalized name, or None when the input is empty or the
            result is shorter than three characters.
        """
        if not city:
            return None

        # Remove leftover artifacts.
        city = re.sub(r"(^[\s\-]+)|([\s\-]+$)", "", city)  # spaces/hyphens at the edges
        city = re.sub(r"\s+", " ", city)  # repeated whitespace
        city = re.sub(r"[^А-ЯЁа-яё\-\s]", "", city)  # everything but Cyrillic letters, hyphen, space

        # Case normalization: first letter of each word upper-case, the rest lower-case.
        if city:
            city = city.strip()
            parts = [p.capitalize() for p in city.split()]
            city = " ".join(parts)
            city = re.sub(r"\bИ\b", "и", city)  # keep the conjunction "и" lower-case

        return city if city and len(city) >= 3 else None

    def _title_text(self):
        """Return the text of the first <h1>, pieces joined by spaces, or None if absent."""
        heading = self.soup.find("h1")
        if heading is None:
            return None
        return " ".join(heading.stripped_strings)

    def _event_name(self, title_text):
        """Return the cleaned part of the title that precedes the formatted event date."""
        return self.clean_text(title_text.split(self.get_date_str())[0])

    def _is_relay(self, event_date):
        """Tell whether the page is a relay event, independent of call order.

        The decision is derived from the page title, so parse_distances() and
        parse_results() skip relay pages even if parse_events() was not called
        first. Recognised dates are recorded in ``relay_dates_list``.
        """
        if event_date in self.relay_dates_list:
            return True
        title_text = self._title_text()
        if title_text is None:
            return False
        if "эстафета" in self._event_name(title_text).lower():
            self.relay_dates_list.append(event_date)
            return True
        return False

    def _extract_city(self, title_text):
        """Return the first word of the last "г. <City>" mention in the title, or None."""
        cities = []
        for match in self._CITY_RE.finditer(title_text):
            city = self.clean_city_name(match.group(1))
            if city and len(city) >= 3 and not any(c.isdigit() for c in city):
                cities.append(city)
        # The captured span may run on into following words; keep the first one.
        return cities[-1].split()[0] if cities else None

    def parse_events(self):
        """Build a one-row DataFrame (start_name, date, city) for the page.

        Returns:
            The DataFrame, or None for relay pages and on any parsing error
            (the error is printed).
        """
        if not self.soup:
            return None

        try:
            title_text = self._title_text()
            if title_text is None:
                raise ValueError("page has no <h1> title")

            event_date = self.get_date()
            if self._is_relay(event_date):
                return None

            event_name = self._event_name(title_text)
            city = self._extract_city(title_text)

            schema = StructType([
                StructField("start_name", StringType(), True),
                StructField("date", DateType(), True),
                StructField("city", StringType(), True)
            ])

            return self.spark.createDataFrame([(event_name, event_date, city)], schema)

        except Exception as e:
            print("parse_events error: ", e)
            return None

    def parse_distances(self):
        """Build a DataFrame (date, group, cp, length) from the <h2> headings.

        A heading is used when it looks like "<group>, <N> КП, <length> км|м";
        lengths given in metres are converted to kilometres.

        Returns:
            The DataFrame, or None for relay pages and on any parsing error
            (the error is printed).
        """
        if not self.soup:
            return None

        try:
            event_date = self.get_date()
            if self._is_relay(event_date):
                return None

            distances = []
            for h2 in self.soup.find_all("h2"):
                match = re.match(r"(.+),\s*(\d+)\s*КП,\s*([\d.,]+)\s*(км|м)", h2.get_text())
                if not match:
                    continue
                group, kp, length, unit = match.groups()
                length = float(length.replace(",", "."))
                # Convert metres to kilometres.
                if unit == "м":
                    length /= 1000
                # Strip the group name so it matches the one produced by parse_results().
                distances.append((event_date, group.strip(), int(kp), round(length, 2)))

            schema = StructType([
                StructField("date", DateType(), True),
                StructField("group", StringType(), True),
                StructField("cp", IntegerType(), True),
                StructField("length", FloatType(), True)
            ])

            return self.spark.createDataFrame(distances, schema)

        except Exception as e:
            print("parse_distances error: ", e)
            return None

    def parse_results(self):
        """Build a DataFrame of individual results from the <pre> after each <h2>.

        Lines that results_regex_parser() does not recognise are ignored.

        Returns:
            The DataFrame, or None for relay pages and on any parsing error
            (the error is printed).
        """
        if not self.soup:
            return None

        try:
            event_date = self.get_date()
            if self._is_relay(event_date):
                return None

            results_data = []
            for category in self.soup.find_all('h2'):
                # Group name is the heading text before the first comma.
                group = category.get_text(strip=True).split(',')[0].strip()
                pre_tag = category.find_next('pre')

                if not pre_tag:
                    continue

                for line in pre_tag.get_text().split('\n'):
                    fields = self.results_regex_parser(line)
                    if not fields:
                        continue
                    # A place may be prefixed with "=" in the source line.
                    place = int(fields[8].replace('=', '').strip())
                    results_data.append((
                        event_date,
                        group,
                        int(fields[0]),
                        fields[1],
                        fields[2],
                        fields[3],
                        int(fields[4]),
                        int(fields[5]),
                        fields[6],
                        fields[7],
                        place
                    ))

            schema = StructType([
                StructField("date", DateType(), True),
                StructField("group", StringType(), True),
                StructField("position_number", IntegerType(), True),
                StructField("full_name", StringType(), True),
                StructField("team", StringType(), True),
                StructField("qualification", StringType(), True),
                StructField("number", IntegerType(), True),
                StructField("year_of_birth", IntegerType(), True),
                StructField("result_time", StringType(), True),
                StructField("time_gap", StringType(), True),
                StructField("finish_position", IntegerType(), True)
            ])

            return self.spark.createDataFrame(results_data, schema)

        except Exception as e:
            print("parse_results error: ", e)
            return None

    @staticmethod
    def results_regex_parser(line):
        """Parse one line of a results table.

        Returns:
            A 10-tuple of captured strings (row number, full name, team,
            qualification or None, bib number, birth year, result time,
            time gap, place, trailing note), or False when the line does
            not match.
        """
        pattern = r'''
            ^\s*(\d+)\s+                # №п/п
            ([А-ЯЁ][а-яё-]+\s[А-ЯЁ][а-яё-]+)\s+  # Фамилия и имя
            (.*?)\s{2,}                 # Коллектив
            ([А-Яa-zIЮМСК]+)?\s*        # Квал (может отсутствовать)
            (\d+)\s+                    # Номер
            (\d{4})\s+                  # Год рождения
            (\d{2}:\d{2}:\d{2})\s+      # Результат
            (\+\d{2}:\d{2})\s+          # Отставание
            (=?\s*\d+)\s*               # Место
            (.*)                        # Примечание (если есть)
        '''
        match = re.search(pattern, line, re.VERBOSE | re.IGNORECASE)
        return match.groups() if match else False

    def parse_all(self):
        """Run the three parsers.

        Returns:
            Tuple (events, distances, results); each item is a DataFrame or None.
        """
        if not self.soup:
            return None, None, None
        try:
            df_events = self.parse_events()
            df_distances = self.parse_distances()
            df_results = self.parse_results()
            return df_events, df_distances, df_results
        except Exception as e:
            print("parse_all error: ", e)
            return None, None, None

In [6]:
# Names of all objects in the raw-HTML bucket; each name is later used as the page's date key.
bucket_obj_list = list(
    map(lambda item: item.object_name, client.list_objects(bucket_html))
)
print(bucket_obj_list)

['20240407', '20240412', '20240413', '20240414', '20240420', '20240421', '20240428', '20240429', '20240430', '20240509', '20240510', '20240511', '20240512', '20240518', '20240521', '20240522', '20240525', '20240526', '20240602', '20240608', '20240609', '20240721', '20240907', '20240908', '20240913', '20240914', '20240915', '20240928', '20240929', '20241006', '20241012', '20241013', '20241019', '20241026', '20241027', '20241102', '20241103', '20241117', '20241201']


In [20]:
# Per-page DataFrames, collected for a later union.
events_dfs, dist_dfs, res_dfs = [], [], []

for key in bucket_obj_list:
    # get_object() returns an open HTTP response; it has to be closed and its
    # connection released, otherwise connections leak across the loop.
    response = client.get_object(bucket_html, key)
    try:
        html = response.read().decode('windows-1251')
    finally:
        response.close()
        response.release_conn()

    parser = VRNFSO_html_processor_spark(key, html, spark)
    df_ev, df_ds, df_rs = parser.parse_all()
    # A parser returns None for skipped (relay) pages and on errors.
    if df_ev is not None:
        events_dfs.append(df_ev)
    if df_ds is not None:
        dist_dfs.append(df_ds)
    if df_rs is not None:
        res_dfs.append(df_rs)

In [21]:
def union_all(dfs):
    """Union a list of DataFrames by column name.

    Args:
        dfs: List of Spark DataFrames.

    Returns:
        The union of all frames, or a DataFrame created from an empty
        schema when the list is empty.
    """
    if dfs:
        first, *rest = dfs
        return reduce(DataFrame.unionByName, rest, first)
    return spark.createDataFrame([], StructType())

In [22]:
# One combined DataFrame per entity across all parsed pages.
all_events, all_distances, all_results = (
    union_all(frames) for frames in (events_dfs, dist_dfs, res_dfs)
)

In [None]:
# NOTE(review): the output saved under this cell is a StructType repr with the
# events columns, not a DataFrame repr — it looks stale; re-run before sharing.
all_results

StructType([StructField('start_name', StringType(), True), StructField('date', DateType(), True), StructField('city', StringType(), True)])

In [None]:
# Print a preview of the combined course parameters.
all_distances.show()



+----------+-------+---+------+
|      date|  group| cp|length|
+----------+-------+---+------+
|2024-04-07|    Ж10| 21|   6.4|
|2024-04-07|    Ж12| 21|   6.4|
|2024-04-07|    Ж14| 21|   6.4|
|2024-04-07|    Ж16| 21|   6.4|
|2024-04-07|    Ж20| 21|   6.4|
|2024-04-07|    Ж35| 21|   6.4|
|2024-04-07|    Ж55| 21|   6.4|
|2024-04-07|     ЖЭ| 21|   6.4|
|2024-04-07|    М10| 21|   6.4|
|2024-04-07|    М12| 21|   6.4|
|2024-04-07|    М14| 21|   6.4|
|2024-04-07|    М16| 21|   6.4|
|2024-04-07|    М20| 21|   6.4|
|2024-04-07|    М35| 21|   6.4|
|2024-04-07|    М55| 21|   6.4|
|2024-04-07|     МЭ| 21|   6.4|
|2024-04-07|Новички| 21|   6.4|
|2024-04-12|    Ж12| 10|   1.9|
|2024-04-12|    Ж14| 13|   2.4|
|2024-04-12|    Ж16| 14|   2.9|
|2024-04-12|    Ж18| 15|   3.2|
|2024-04-12|    Ж20| 17|   3.5|
|2024-04-12|    Ж30| 15|   3.2|
|2024-04-12|    Ж40| 14|   3.1|
|2024-04-12|    Ж50| 14|   2.6|
|2024-04-12|    Ж60| 13|   2.4|
|2024-04-12|     ЖБ| 14|   3.1|
|2024-04-12|Женщины| 17|   3.5|
|2024-04

                                                                                

In [23]:
# Persist each combined DataFrame as parquet, replacing any previous output.
for frame, target in (
    (all_events, "s3a://processed-data/events/"),
    (all_distances, "s3a://processed-data/distances/"),
    (all_results, "s3a://processed-data/results/"),
):
    frame.write.mode("overwrite").parquet(target)


25/04/28 09:26:18 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [47]:
# Read the processed data back from object storage.
parquet_reader = spark.read
sdf_events_loaded = parquet_reader.parquet("s3a://processed-data/events/")
sdf_distances_loaded = parquet_reader.parquet("s3a://processed-data/distances/")
sdf_results_loaded = parquet_reader.parquet("s3a://processed-data/results/")

# sdf_events_cleaned = sdf_events_loaded.fillna("Воронеж", subset=["Город"])

# sdf_events_cleaned.write.mode("overwrite").parquet("s3a://processed-data/events/")

# Print the schema of each loaded frame.
for loaded in (sdf_events_loaded, sdf_distances_loaded, sdf_results_loaded):
    loaded.printSchema()


root
 |-- start_name: string (nullable = true)
 |-- date: date (nullable = true)
 |-- city: string (nullable = true)

root
 |-- date: date (nullable = true)
 |-- group: string (nullable = true)
 |-- cp: integer (nullable = true)
 |-- length: float (nullable = true)

root
 |-- date: date (nullable = true)
 |-- group: string (nullable = true)
 |-- position_number: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- team: string (nullable = true)
 |-- qualification: string (nullable = true)
 |-- number: integer (nullable = true)
 |-- year_of_birth: integer (nullable = true)
 |-- result_time: string (nullable = true)
 |-- time_gap: string (nullable = true)
 |-- finish_position: integer (nullable = true)



In [27]:
# JDBC connection settings for the PostgreSQL target.
db_url = "jdbc:postgresql://127.0.0.1:5433/postgres"

db_properties = dict(
    user=postgres_user,
    password=postgres_password,
    driver="org.postgresql.Driver",
)


In [None]:
# sdf_events_loaded.write.mode("overwrite").jdbc(db_url, "public.events", properties=db_properties)
# sdf_distances_loaded.write.mode("overwrite").jdbc(db_url, "public.distances", properties=db_properties)
# sdf_results_loaded.write.mode("overwrite").jdbc(db_url, "public.results", properties=db_properties)

                                                                                

In [54]:
# Projections of the loaded data onto the target tables. String columns are cut
# to fixed widths — presumably the column sizes of the PostgreSQL tables; confirm
# against the DDL.

sdf_events = sdf_events_loaded.select(
    F.substring(F.col("start_name"), 1, 100).alias("event_name"),
    F.col("date").alias("event_date"),
    F.substring(F.col("city"), 1, 50).alias("city"),
).distinct()

sdf_groups = sdf_distances_loaded.select(
    F.col("date").alias("event_date"),
    F.substring(F.col("group"), 1, 20).alias("group_name"),
    F.col("cp"),
    F.col("length").alias("length_km"),
).distinct()

sdf_participants = sdf_results_loaded.select(
    F.col("full_name"),
    F.substring(F.col("team"), 1, 50).alias("team"),
    F.col("year_of_birth").alias("birth_year"),
).distinct()

# group_name and team are join keys against the group/participant lookups, so
# they are truncated exactly like in sdf_groups / sdf_participants; otherwise
# rows with longer values would silently drop out of the inner joins.
sdf_results = sdf_results_loaded.select(
    F.col("date").alias("event_date"),
    F.substring(F.col("group"), 1, 20).alias("group_name"),
    F.col("position_number"),
    F.col("full_name"),
    F.substring(F.col("team"), 1, 50).alias("team"),
    F.substring(F.col("qualification"), 1, 10).alias("qualification"),
    F.col("number").alias("bib_number"),
    F.col("year_of_birth").alias("birth_year"),
    F.col("finish_position"),
    F.col("result_time"),
    F.col("time_gap"),
)

In [34]:
# Append the events to the "events" table.
# NOTE(review): append mode adds the rows again on every re-run of this cell.
sdf_events.write.mode("append").jdbc(db_url, "events", properties=db_properties)

                                                                                

In [49]:
# Read the events back to obtain event_id for each event_date.
events_table = spark.read.jdbc(db_url, "events", properties=db_properties)
events_lookup = events_table.select("event_id", "event_date")

In [50]:
# Swap event_date for the event_id looked up by date.
# NOTE(review): this cell rebinds sdf_groups, so it cannot be re-run on its own —
# event_date is gone after the first run.
sdf_groups = (
    sdf_groups
    .join(events_lookup, on="event_date", how="inner")
    .select("group_name", "cp", "length_km", "event_id")
)

In [37]:
# Append the group parameters to the "group_params" table.
sdf_groups.write.mode("append").jdbc(db_url, "group_params", properties=db_properties)

                                                                                

In [51]:
# Read the groups back to obtain group_id per (event_id, group_name).
group_params_table = spark.read.jdbc(db_url, "group_params", properties=db_properties)
groups_lookup = group_params_table.select("group_id", "event_id", "group_name")

In [None]:
# Append the distinct participants to the "participants" table.
sdf_participants.write.mode("append").jdbc(db_url, "participants", properties=db_properties)


                                                                                

In [52]:
# Read the participants back to obtain participant_id per (full_name, team, birth_year).
participants_table = spark.read.jdbc(db_url, "participants", properties=db_properties)
participants_lookup = participants_table.select(
    "participant_id", "full_name", "team", "birth_year"
)

In [55]:
# Resolve the natural keys of each result row into the ids from the lookups.
# NOTE(review): this cell rebinds sdf_results, so it cannot be re-run on its own —
# the join columns are gone after the first run.
sdf_results = (
    sdf_results
    .join(events_lookup, on="event_date", how="inner")
    .join(groups_lookup, on=["event_id", "group_name"], how="inner")
    .join(participants_lookup, on=["full_name", "team", "birth_year"], how="inner")
    .select(
        "event_id",
        "group_id",
        "participant_id",
        "position_number",
        "qualification",
        "bib_number",
        "finish_position",
        "result_time",
        "time_gap",
    )
)

In [56]:
# Append the id-resolved results to the "results" table.
sdf_results.write.mode("append").jdbc(db_url, "results", properties=db_properties)

                                                                                