In [1]:
import sys
import polars as pl
import boto3
import time
from IPython.display import display
import math
print(f"{sys.version=}")

sys.version='3.14.0 (main, Oct  7 2025, 09:34:52) [Clang 17.0.0 (clang-1700.3.19.1)]'


# Список bronze S3

In [2]:
s3 = boto3.resource(
    's3',
    endpoint_url='http://localhost:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin',
    aws_session_token=None,
    config=boto3.session.Config(signature_version='s3v4'),
    verify=False
)

In [5]:
bucket = s3.Bucket('bronze')
for obj in bucket.objects.filter(Prefix='hh/vacancies/date=2025-12-02/'):
    print(f"Key: {obj.key}, Last Modified: {obj.last_modified}")

Key: hh/vacancies/date=2025-12-02/part_0001.jsonl.gz, Last Modified: 2025-12-06 11:19:35.896000+00:00
Key: hh/vacancies/date=2025-12-02/part_0002.jsonl.gz, Last Modified: 2025-12-06 11:19:48.163000+00:00
Key: hh/vacancies/date=2025-12-02/part_0003.jsonl.gz, Last Modified: 2025-12-06 11:19:58.221000+00:00
Key: hh/vacancies/date=2025-12-02/part_0004.jsonl.gz, Last Modified: 2025-12-06 11:20:09.970000+00:00
Key: hh/vacancies/date=2025-12-02/part_0005.jsonl.gz, Last Modified: 2025-12-06 11:20:22.409000+00:00


# Проверка дублей vacancy id

In [6]:
bucket = s3.Bucket('bronze')
bronze_report_dates = set()
for f in bucket.objects.all():
    s3fp = f.key
    dt_start_index = s3fp.index('date=')+len('date=')
    dt_end_index = dt_start_index+len('2025-12-02')
    bronze_report_dates.add(s3fp[dt_start_index:dt_end_index])
bronze_report_dates

{'2025-11-10',
 '2025-11-17',
 '2025-11-18',
 '2025-12-02',
 '2025-12-03',
 '2025-12-04',
 '2025-12-05',
 '2025-12-08'}

In [8]:
storage_options = {
    "aws_endpoint_url": "http://localhost:9000", # Важно: aws_endpoint_url (иногда endpoint_url)
    "aws_access_key_id": "minioadmin",
    "aws_secret_access_key": "minioadmin",
    "aws_region": "us-east-1", # Для MinIO часто можно оставить us-east-1
    "aws_allow_http": "true",  # Разрешить HTTP (без SSL)
}
for report_dt in sorted(bronze_report_dates):
    print("="*10, report_dt, "="*10)
    df_lazy = pl.scan_ndjson(
        f"s3://bronze/hh/vacancies/date={report_dt}/*.jsonl.gz",
        storage_options=storage_options,
        infer_schema_length=None
    )
    total_id_cnt = pl.col('id').count()
    unique_id_cnt = pl.col('id').unique().count()
    display(
        df_lazy.select(
            total_id_cnt.alias('total_id_cnt'),
            unique_id_cnt.alias('unique_id_cnt'),
            ((total_id_cnt-unique_id_cnt)/total_id_cnt*100).round(2).alias('share_of_duplicates_pcnt')
        ).collect()
    )
    print()



total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
42845,33643,21.48





total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
46243,36444,21.19





total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
39329,31085,20.96





total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
52767,38876,26.33





total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
257124,126201,50.92





total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
291400,136526,53.15





total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
292140,133487,54.31





total_id_cnt,unique_id_cnt,share_of_duplicates_pcnt
u32,u32,f64
391061,155111,60.34





# Сверка вакансий по одинаковому id

In [150]:
for report_dt in sorted(bronze_report_dates):
    print(report_dt)
    df_lazy = pl.scan_ndjson(
        f"s3://bronze/hh/vacancies/date={report_dt}/*.jsonl.gz",
        storage_options=storage_options,
        # infer_schema_length=None,
        schema=vacancy_schema
    )
    vacancy_id = df_lazy.select('id').group_by('id').len().sort(by='len', descending=True).head(1).select('id').collect().item()
    unique_rows_cnt = df_lazy.filter(pl.col('id') == vacancy_id).collect().n_unique()
    print("Кол-во уникальных строк с одинаковым vacancy id", unique_rows_cnt)
    print()

2025-11-10
Кол-во уникальных строк с одинаковым vacancy id 1

2025-11-17
Кол-во уникальных строк с одинаковым vacancy id 1

2025-11-18
Кол-во уникальных строк с одинаковым vacancy id 1

2025-12-02
Кол-во уникальных строк с одинаковым vacancy id 1

2025-12-03
Кол-во уникальных строк с одинаковым vacancy id 1

2025-12-04
Кол-во уникальных строк с одинаковым vacancy id 1



**Алгоритм bronze_hh_loader_dag.py/bronze_hh_loader_v3 запрашивает 20-50% вакансий повторно**

# Проверка корректности query params запросов из bronze_hh_loader_dag.py/bronze_hh_loader_v3

In [157]:
# [2025-12-05 14:36:26] INFO - [DRILL DOWN] Interval 2025-12-04 10:09:13.250000+03:00 - 2025-12-04 10:10:09.500000+03:00 has 2541 items. Drilling by params... source=airflow.task loc=bronze_hh_loader_dag.py:269

In [158]:
import requests

def get(url: str, params: dict = {}, headers: dict = {}):
    headers = {**headers, "HH-User-Agent": "Skill Lens/1.0 (loveyousomuch554@gmail.com)"}
    params = {**params}
    res = requests.get(url=url, params=params, headers=headers)
    res.raise_for_status()
    return res.json()

In [162]:
# ISO 8601 (YYYY-MM-DD) или с точностью до секунды YYYY-MM-DDThh:mm:ss±hhmm
date_from = "2025-12-04T10:09:13+0300"
date_to = "2025-12-04T10:10:09+0300"
# number <= 100, default 10
per_page = 100
# ?x= query string
query_params = {
    "date_from": date_from,
    "date_to": date_to,
    "per_page": per_page,
}
vacancies = get("https://api.hh.ru/vacancies", params=query_params)
vacancies.get('found')

2525

In [163]:
EMPLOYMENT_FORMS = [
    "FULL",  # Полная занятость
    "PART",  # Частичная занятость
    "PROJECT",  # Проектная работа
    "FLY_IN_FLY_OUT",  # Вахта
    "SIDE_JOB",  # Подработка
]

WORK_FORMATS = [
    "ON_SITE",  # На месте
    "REMOTE",  # Удаленно
    "HYBRID",  # Гибрид
    "FIELD_WORK",  # Разъездной
]

In [201]:
%%time
for emp in EMPLOYMENT_FORMS:
    emp_ids = list()
    new_query_params = {
        "date_from": date_from,
        "date_to": date_to,
        "per_page": per_page,
        "employment_form": emp,
    }
    vacancies = get("https://api.hh.ru/vacancies", params=new_query_params)
    print(emp, vacancies.get('found'))
    emp_ids.extend([item['id'] for item in vacancies.get('items', [])])
    found = vacancies.get('found')
    for page in range(1, math.ceil(found/per_page)):
        try:
            vacancies = get("https://api.hh.ru/vacancies", params={'page': page, **new_query_params})
            emp_ids.extend([item['id'] for item in vacancies.get('items', [])])
        except Exception as e:
            print(e)
        finally:
            time.sleep(.5)
    print(emp, "total cnt", len(emp_ids), "unique ids count", len(set(emp_ids)))
    time.sleep(1)

FULL 2042
400 Client Error: Bad Request for url: https://api.hh.ru/vacancies?page=20&date_from=2025-12-04T10%3A09%3A13%2B0300&date_to=2025-12-04T10%3A10%3A09%2B0300&per_page=100&employment_form=FULL
FULL total cnt 2000 unique ids count 2000
PART 87
PART total cnt 87 unique ids count 87
PROJECT 9
PROJECT total cnt 9 unique ids count 9
FLY_IN_FLY_OUT 387
FLY_IN_FLY_OUT total cnt 387 unique ids count 387
SIDE_JOB 9
SIDE_JOB total cnt 9 unique ids count 9
CPU times: user 1.08 s, sys: 159 ms, total: 1.24 s
Wall time: 29.2 s


In [202]:
vacancies.keys()

dict_keys(['items', 'found', 'pages', 'page', 'per_page', 'clusters', 'arguments', 'fixes', 'suggests', 'alternate_url'])

In [203]:
vacancies.get('alternate_url')

'https://hh.ru/search/vacancy?date_from=04.12.2025+10%3A09%3A13&date_to=04.12.2025+10%3A10%3A09&employment_form=PROJECT&enable_snippets=true&items_on_page=100'

In [204]:
%%time
for work in WORK_FORMATS:
    work_ids = list()
    new_query_params = {
        "date_from": date_from,
        "date_to": date_to,
        "per_page": per_page,
        "work_format": work,
    }
    vacancies = get("https://api.hh.ru/vacancies", params=new_query_params)
    print(work, vacancies.get('found'))
    work_ids.extend([item['id'] for item in vacancies.get('items', [])])
    found = vacancies.get('found')
    for page in range(1, math.ceil(found/per_page)):
        try:
            vacancies = get("https://api.hh.ru/vacancies", params={'page': page, **new_query_params})
            work_ids.extend([item['id'] for item in vacancies.get('items', [])])
        except Exception as e:
            print(e)
        finally:
            time.sleep(.5)
    print(work, "total cnt", len(work_ids), "unique ids count", len(set(work_ids)))
    time.sleep(1)

ON_SITE 1187
ON_SITE total cnt 1187 unique ids count 1187
REMOTE 116
REMOTE total cnt 116 unique ids count 116
HYBRID 72
HYBRID total cnt 72 unique ids count 72
FIELD_WORK 274
FIELD_WORK total cnt 274 unique ids count 274
CPU times: user 634 ms, sys: 93.3 ms, total: 727 ms
Wall time: 20 s


In [205]:
vacancies.get('alternate_url')

'https://hh.ru/search/vacancy?date_from=04.12.2025+10%3A09%3A13&date_to=04.12.2025+10%3A10%3A09&enable_snippets=true&items_on_page=100&page=2&work_format=FIELD_WORK'

# Ошибка конкатенации схемы, схемы приходят разные

In [17]:
storage_options = {
    "aws_endpoint_url": "http://localhost:9000", # Важно: aws_endpoint_url (иногда endpoint_url)
    "aws_access_key_id": "minioadmin",
    "aws_secret_access_key": "minioadmin",
    "aws_region": "us-east-1", # Для MinIO часто можно оставить us-east-1
    "aws_allow_http": "true",  # Разрешить HTTP (без SSL)
}

In [19]:
bucket = s3.Bucket('bronze')
bronze_report_dates = set()
for f in bucket.objects.all():
    s3fp = f.key
    dt_start_index = s3fp.index('date=')+len('date=')
    dt_end_index = dt_start_index+len('2025-12-02')
    bronze_report_dates.add(s3fp[dt_start_index:dt_end_index])
bronze_report_dates

{'2025-11-10',
 '2025-11-17',
 '2025-11-18',
 '2025-12-02',
 '2025-12-03',
 '2025-12-04',
 '2025-12-05',
 '2025-12-08'}

In [29]:
%%time
outliers = set()
for report_dt in bronze_report_dates:
    print(report_dt)
    schemas = dict()
    for obj in bucket.objects.filter(Prefix=f'hh/vacancies/date={report_dt}/'):
        bucket_path = obj.key
        df_lazy = pl.scan_ndjson(
            f"s3://bronze/{bucket_path}",
            storage_options=storage_options,
            infer_schema_length=None
        )
        columns = df_lazy.collect_schema().names()
        schemas[bucket_path] = columns
        print('\t', bucket_path, 'columns count', len(columns))
    union_schema = set(c for sc in schemas.values() for c in sc)
    print('\t', 'Count of unique columns from all partitions -', len(union_schema))
    for bucket_path, sch in schemas.items():
        sch = set(sch)
        for ucol in union_schema:
            if ucol not in sch:
                print('\t', 'Column', repr(ucol), 'not in', repr(bucket_path))
                outliers.add(ucol)
print()
print('Outlier columns', outliers)

2025-12-05
	 hh/vacancies/date=2025-12-05/part_0001.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0002.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0003.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0004.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0005.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0006.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0007.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0008.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0009.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0010.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0011.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0012.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0013.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0014.jsonl.gz columns count 49
	 hh/vacancies/date=2025-12-05/part_0015.jsonl.gz c

# Определяем схему и забираем только эти поля, делаем варнинг если появятся новые поля

In [2]:
storage_options = {
    "aws_endpoint_url": "http://localhost:9000", # Важно: aws_endpoint_url (иногда endpoint_url)
    "aws_access_key_id": "minioadmin",
    "aws_secret_access_key": "minioadmin",
    "aws_region": "us-east-1", # Для MinIO часто можно оставить us-east-1
    "aws_allow_http": "true",  # Разрешить HTTP (без SSL)
}

In [3]:
df_lazy = pl.scan_ndjson(
    f"s3://bronze/hh/vacancies/date=2025-11-10/part_0001.jsonl.gz",
    storage_options=storage_options,
    infer_schema_length=None
)
str(df_lazy.collect_schema())

"Schema({'internship': Boolean, 'published_at': String, 'working_time_modes': List(Struct({'id': String, 'name': String})), 'working_hours': List(Struct({'id': String, 'name': String})), 'work_schedule_by_days': List(Struct({'id': String, 'name': String})), 'show_logo_in_search': Boolean, 'working_time_intervals': List(Struct({'id': String, 'name': String})), 'response_letter_required': Boolean, 'apply_alternate_url': String, 'branding': Struct({'type': String, 'tariff': String}), 'alternate_url': String, 'snippet': Struct({'requirement': String, 'responsibility': String}), 'night_shifts': Boolean, 'professional_roles': List(Struct({'id': String, 'name': String})), 'sort_point_distance': Null, 'employment_form': Struct({'id': String, 'name': String}), 'salary_range': Struct({'from': Int64, 'to': Int64, 'currency': String, 'gross': Boolean, 'mode': Struct({'id': String, 'name': String}), 'frequency': Struct({'id': String, 'name': String})}), 'schedule': Struct({'id': String, 'name': Str

In [4]:
df_lazy = pl.scan_ndjson(
    f"s3://bronze/hh/vacancies/date=2025-11-17/part_0001.jsonl.gz",
    storage_options=storage_options,
    infer_schema_length=None
)
str(df_lazy.collect_schema())

"Schema({'internship': Boolean, 'published_at': String, 'working_time_modes': List(Struct({'id': String, 'name': String})), 'working_hours': List(Struct({'id': String, 'name': String})), 'work_schedule_by_days': List(Struct({'id': String, 'name': String})), 'show_logo_in_search': Boolean, 'working_time_intervals': List(Struct({'id': String, 'name': String})), 'response_letter_required': Boolean, 'apply_alternate_url': String, 'branding': Struct({'type': String, 'tariff': String}), 'alternate_url': String, 'snippet': Struct({'requirement': String, 'responsibility': String}), 'night_shifts': Boolean, 'professional_roles': List(Struct({'id': String, 'name': String})), 'sort_point_distance': Null, 'employment_form': Struct({'id': String, 'name': String}), 'salary_range': Struct({'from': Int64, 'to': Int64, 'currency': String, 'gross': Boolean, 'mode': Struct({'id': String, 'name': String}), 'frequency': Struct({'id': String, 'name': String})}), 'schedule': Struct({'id': String, 'name': Str

In [5]:
# Единая полная схема, включающая video_vacancy и immediate_redirect_url
full_schema = pl.Schema({
    'is_adv_vacancy': pl.Boolean,
    'internship': pl.Boolean,
    'department': pl.Struct({'id': pl.String, 'name': pl.String}),
    'working_time_intervals': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'show_contacts': pl.Boolean,
    'salary': pl.Struct({'from': pl.Int64, 'to': pl.Int64, 'currency': pl.String, 'gross': pl.Boolean}),
    'apply_alternate_url': pl.String,
    'created_at': pl.String,
    'work_format': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'response_url': pl.String,
    'response_letter_required': pl.Boolean,
    'relations': pl.List(pl.Null),
    'professional_roles': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'accept_incomplete_resumes': pl.Boolean,
    'premium': pl.Boolean,
    'has_test': pl.Boolean,
    'contacts': pl.Struct({'name': pl.Null, 'email': pl.Null, 'phones': pl.List(pl.Null)}),
    'employment': pl.Struct({'id': pl.String, 'name': pl.String}),
    'employment_form': pl.Struct({'id': pl.String, 'name': pl.String}),
    'adv_context': pl.Null,
    'sort_point_distance': pl.Null,
    'name': pl.String,
    'address': pl.Struct({
        'city': pl.String, 'street': pl.String, 'building': pl.String, 
        'lat': pl.Float64, 'lng': pl.Float64, 'description': pl.Null, 
        'raw': pl.String, 
        'metro': pl.Struct({'station_name': pl.String, 'line_name': pl.String, 'station_id': pl.String, 'line_id': pl.String, 'lat': pl.Float64, 'lng': pl.Float64}), 
        'metro_stations': pl.List(pl.Struct({'station_name': pl.String, 'line_name': pl.String, 'station_id': pl.String, 'line_id': pl.String, 'lat': pl.Float64, 'lng': pl.Float64})), 
        'id': pl.String
    }),
    'branding': pl.Struct({'type': pl.String, 'tariff': pl.String}),
    'insider_interview': pl.Struct({'id': pl.String, 'url': pl.String}),
    'working_days': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'fly_in_fly_out_duration': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'url': pl.String,
    'employer': pl.Struct({
        'id': pl.String, 'name': pl.String, 'url': pl.String, 'alternate_url': pl.String, 
        'logo_urls': pl.Struct({'original': pl.String, '90': pl.String, '240': pl.String}), 
        'vacancies_url': pl.String, 'country_id': pl.Int64, 'accredited_it_employer': pl.Boolean, 'trusted': pl.Boolean
    }),
    'accept_temporary': pl.Boolean,
    'published_at': pl.String,
    'salary_range': pl.Struct({
        'from': pl.Int64, 'to': pl.Int64, 'currency': pl.String, 'gross': pl.Boolean, 
        'mode': pl.Struct({'id': pl.String, 'name': pl.String}), 
        'frequency': pl.Struct({'id': pl.String, 'name': pl.String})
    }),
    'work_schedule_by_days': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'id': pl.String,
    'show_logo_in_search': pl.Boolean,
    'type': pl.Struct({'id': pl.String, 'name': pl.String}),
    'experience': pl.Struct({'id': pl.String, 'name': pl.String}),
    'area': pl.Struct({'id': pl.String, 'name': pl.String, 'url': pl.String}),
    'archived': pl.Boolean,
    'working_time_modes': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'alternate_url': pl.String,
    'snippet': pl.Struct({'requirement': pl.String, 'responsibility': pl.String}),
    'schedule': pl.Struct({'id': pl.String, 'name': pl.String}),
    'night_shifts': pl.Boolean,
    'working_hours': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'adv_response_url': pl.Null,
    
    # Те самые проблемные колонки:
    'video_vacancy': pl.Struct({
        'cover_picture': pl.Struct({'resized_path': pl.String, 'resized_width': pl.Int64, 'resized_height': pl.Int64}), 
        'snippet_picture': pl.Struct({'url': pl.String}), 
        'video': pl.Struct({'upload_id': pl.String, 'url': pl.String}), 
        'snippet_video': pl.Struct({'upload_id': pl.String, 'url': pl.String}), 
        'video_url': pl.String, 
        'snippet_video_url': pl.String, 
        'snippet_picture_url': pl.String
    }),
    
    'brand_snippet': pl.Struct({
        'logo': pl.Null, 'logo_xs': pl.Null, 
        'logo_scalable': pl.Struct({
            'default': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String}), 
            'xs': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String})
        }), 
        'picture': pl.Null, 'picture_xs': pl.Null, 
        'picture_scalable': pl.Struct({
            'default': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String}), 
            'xs': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String})
        }), 
        'background': pl.Struct({
            'color': pl.String, 
            'gradient': pl.Struct({'angle': pl.Float64, 'color_list': pl.List(pl.Struct({'color': pl.String, 'position': pl.Float64}))})
        })
    }),
    
    # Еще одна проблемная колонка
    'immediate_redirect_url': pl.String
})
len(full_schema)

49

In [9]:
s3 = boto3.resource(
    's3',
    endpoint_url='http://localhost:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin',
    aws_session_token=None,
    config=boto3.session.Config(signature_version='s3v4'),
    verify=False
)
bucket = s3.Bucket('bronze')
bronze_report_dates = set()
for f in bucket.objects.all():
    s3fp = f.key
    dt_start_index = s3fp.index('date=')+len('date=')
    dt_end_index = dt_start_index+len('2025-12-02')
    bronze_report_dates.add(s3fp[dt_start_index:dt_end_index])
bronze_report_dates

{'2025-11-10',
 '2025-11-17',
 '2025-11-18',
 '2025-12-02',
 '2025-12-03',
 '2025-12-04',
 '2025-12-05',
 '2025-12-08'}

In [12]:
for report_dt in bronze_report_dates:
    df_lazy = pl.scan_ndjson(
        f"s3://bronze/hh/vacancies/date={report_dt}/*.jsonl.gz",
        storage_options=storage_options,
        schema=full_schema,
    )
    
    print(report_dt)
    print(df_lazy.select('id').count().collect())
    print()

2025-12-05
shape: (1, 1)
┌────────┐
│ id     │
│ ---    │
│ u32    │
╞════════╡
│ 292140 │
└────────┘

2025-12-03
shape: (1, 1)
┌────────┐
│ id     │
│ ---    │
│ u32    │
╞════════╡
│ 257124 │
└────────┘

2025-11-17
shape: (1, 1)
┌───────┐
│ id    │
│ ---   │
│ u32   │
╞═══════╡
│ 46243 │
└───────┘

2025-12-04
shape: (1, 1)
┌────────┐
│ id     │
│ ---    │
│ u32    │
╞════════╡
│ 291400 │
└────────┘

2025-11-18
shape: (1, 1)
┌───────┐
│ id    │
│ ---   │
│ u32   │
╞═══════╡
│ 39329 │
└───────┘

2025-11-10
shape: (1, 1)
┌───────┐
│ id    │
│ ---   │
│ u32   │
╞═══════╡
│ 42845 │
└───────┘

2025-12-02
shape: (1, 1)
┌───────┐
│ id    │
│ ---   │
│ u32   │
╞═══════╡
│ 52767 │
└───────┘

2025-12-08
shape: (1, 1)
┌────────┐
│ id     │
│ ---    │
│ u32    │
╞════════╡
│ 391061 │
└────────┘



# Прототип Silver DAG'a

In [3]:
import polars as pl

In [39]:
# Единая полная схема (Master Schema)
full_schema = pl.Schema({
    'is_adv_vacancy': pl.Boolean,
    'internship': pl.Boolean,
    'department': pl.Struct({'id': pl.String, 'name': pl.String}),
    'working_time_intervals': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'show_contacts': pl.Boolean,
    'salary': pl.Struct({'from': pl.Int64, 'to': pl.Int64, 'currency': pl.String, 'gross': pl.Boolean}),
    'apply_alternate_url': pl.String,
    'created_at': pl.String,
    'work_format': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'response_url': pl.String,
    'response_letter_required': pl.Boolean,
    'relations': pl.List(pl.Null),
    'professional_roles': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'accept_incomplete_resumes': pl.Boolean,
    'premium': pl.Boolean,
    'has_test': pl.Boolean,
    'contacts': pl.Struct({'name': pl.Null, 'email': pl.Null, 'phones': pl.List(pl.Null)}),
    'employment': pl.Struct({'id': pl.String, 'name': pl.String}),
    'employment_form': pl.Struct({'id': pl.String, 'name': pl.String}),
    'adv_context': pl.Null,
    'sort_point_distance': pl.Null,
    'name': pl.String,
    'address': pl.Struct({
        'city': pl.String, 'street': pl.String, 'building': pl.String, 
        'lat': pl.Float64, 'lng': pl.Float64, 'description': pl.Null, 
        'raw': pl.String, 
        'metro': pl.Struct({'station_name': pl.String, 'line_name': pl.String, 'station_id': pl.String, 'line_id': pl.String, 'lat': pl.Float64, 'lng': pl.Float64}), 
        'metro_stations': pl.List(pl.Struct({'station_name': pl.String, 'line_name': pl.String, 'station_id': pl.String, 'line_id': pl.String, 'lat': pl.Float64, 'lng': pl.Float64})), 
        'id': pl.String
    }),
    'branding': pl.Struct({'type': pl.String, 'tariff': pl.String}),
    'insider_interview': pl.Struct({'id': pl.String, 'url': pl.String}),
    'working_days': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'fly_in_fly_out_duration': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'url': pl.String,
    'employer': pl.Struct({
        'id': pl.String, 'name': pl.String, 'url': pl.String, 'alternate_url': pl.String, 
        'logo_urls': pl.Struct({'original': pl.String, '90': pl.String, '240': pl.String}), 
        'vacancies_url': pl.String, 'country_id': pl.Int64, 'accredited_it_employer': pl.Boolean, 'trusted': pl.Boolean
    }),
    'accept_temporary': pl.Boolean,
    'published_at': pl.String,
    'salary_range': pl.Struct({
        'from': pl.Int64, 'to': pl.Int64, 'currency': pl.String, 'gross': pl.Boolean, 
        'mode': pl.Struct({'id': pl.String, 'name': pl.String}), 
        'frequency': pl.Struct({'id': pl.String, 'name': pl.String})
    }),
    'work_schedule_by_days': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'id': pl.String,
    'show_logo_in_search': pl.Boolean,
    'type': pl.Struct({'id': pl.String, 'name': pl.String}),
    'experience': pl.Struct({'id': pl.String, 'name': pl.String}),
    'area': pl.Struct({'id': pl.String, 'name': pl.String, 'url': pl.String}),
    'archived': pl.Boolean,
    'working_time_modes': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'alternate_url': pl.String,
    'snippet': pl.Struct({'requirement': pl.String, 'responsibility': pl.String}),
    'schedule': pl.Struct({'id': pl.String, 'name': pl.String}),
    'night_shifts': pl.Boolean,
    'working_hours': pl.List(pl.Struct({'id': pl.String, 'name': pl.String})),
    'adv_response_url': pl.Null,
    
    # Те самые проблемные колонки:
    'video_vacancy': pl.Struct({
        'cover_picture': pl.Struct({'resized_path': pl.String, 'resized_width': pl.Int64, 'resized_height': pl.Int64}), 
        'snippet_picture': pl.Struct({'url': pl.String}), 
        'video': pl.Struct({'upload_id': pl.String, 'url': pl.String}), 
        'snippet_video': pl.Struct({'upload_id': pl.String, 'url': pl.String}), 
        'video_url': pl.String, 
        'snippet_video_url': pl.String, 
        'snippet_picture_url': pl.String
    }),
    
    'brand_snippet': pl.Struct({
        'logo': pl.Null, 'logo_xs': pl.Null, 
        'logo_scalable': pl.Struct({
            'default': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String}), 
            'xs': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String})
        }), 
        'picture': pl.Null, 'picture_xs': pl.Null, 
        'picture_scalable': pl.Struct({
            'default': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String}), 
            'xs': pl.Struct({'width': pl.Int64, 'height': pl.Int64, 'url': pl.String})
        }), 
        'background': pl.Struct({
            'color': pl.String, 
            'gradient': pl.Struct({'angle': pl.Float64, 'color_list': pl.List(pl.Struct({'color': pl.String, 'position': pl.Float64}))})
        })
    }),
    
    # Еще одна проблемная колонка
    'immediate_redirect_url': pl.String
})
len(full_schema)

49

In [69]:
storage_options = {
    "aws_endpoint_url": "http://localhost:9000", # Важно: aws_endpoint_url (иногда endpoint_url)
    "aws_access_key_id": "minioadmin",
    "aws_secret_access_key": "minioadmin",
    "aws_region": "us-east-1", # Для MinIO часто можно оставить us-east-1
    "aws_allow_http": "true",  # Разрешить HTTP (без SSL)
}

def get_report_dts():
    s3 = boto3.resource(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='minioadmin',
        aws_secret_access_key='minioadmin',
        aws_session_token=None,
        config=boto3.session.Config(signature_version='s3v4'),
        verify=False
    )
    bucket = s3.Bucket('bronze')
    bronze_report_dates = set()
    for f in bucket.objects.all():
        s3fp = f.key
        dt_start_index = s3fp.index('date=')+len('date=')
        dt_end_index = dt_start_index+len('2025-12-02')
        bronze_report_dates.add(s3fp[dt_start_index:dt_end_index])
    return bronze_report_dates

def lazy_read_bronze_part(s3path):
    df_lazy = pl.scan_ndjson(
        s3path,
        storage_options=storage_options,
        schema=full_schema,
        ignore_errors=True,
    )
    # Трансформации (Type Casting + Deduplication)
    df_lazy = (
        df_lazy
        .unique(subset='id', keep='any')
        .with_columns([
            # 1. Парсим в честный UTC (Universal Time)
            # Было: 12:00+0300 -> Стало: 09:00 UTC
            pl.col("published_at")
              .str.to_datetime(format="%Y-%m-%dT%H:%M:%S%z", strict=False)
              .dt.convert_time_zone("UTC")
              .alias("published_at_utc"),
    
            # 2. Парсим "Время на стене" (Wall Clock Time)
            # Мы говорим Polars: "Игнорируй хвост (+0300) и просто прочитай время"
            # exact=False позволяет распарсить начало строки и забить на хвост
            pl.col("published_at")
              .str.to_datetime(format="%Y-%m-%dT%H:%M:%S", strict=False, exact=False)
              .alias("published_at_wall_clock"),
              
            # Повторяем для created_at
            pl.col("created_at")
              .str.to_datetime(format="%Y-%m-%dT%H:%M:%S%z", strict=False)
              .dt.convert_time_zone("UTC")
              .alias("created_at_utc"),
    
            pl.col("created_at")
              .str.to_datetime(format="%Y-%m-%dT%H:%M:%S", strict=False, exact=False)
              .alias("created_at_wall_clock"),
        ])
        .with_columns([
            # 3. Вычисляем смещение: (Время на стене) - (Время UTC)
            # Чтобы вычесть, нужно убрать пометку "UTC" у первой колонки (replace_time_zone(None))
            # 12:00 (Naive) - 09:00 (Naive) = 180 минут
            ((pl.col("published_at_wall_clock") - pl.col("published_at_utc").dt.replace_time_zone(None))
             .dt.total_minutes()
             .cast(pl.Int16)
             .alias("published_at_offset")),
    
            ((pl.col("created_at_wall_clock") - pl.col("created_at_utc").dt.replace_time_zone(None))
             .dt.total_minutes()
             .cast(pl.Int16)
             .alias("created_at_offset"))
        ])
        # Убираем лишнее
        .drop(["published_at", "created_at", "published_at_wall_clock", "created_at_wall_clock"])
    )
    return df_lazy

In [70]:
bronze_report_dates = get_report_dts()
bronze_report_dates

{'2025-11-10',
 '2025-11-17',
 '2025-11-18',
 '2025-12-02',
 '2025-12-03',
 '2025-12-04',
 '2025-12-05',
 '2025-12-08',
 '2025-12-09'}

In [71]:
for report_dt in bronze_report_dates:
    df_lazy = lazy_read_bronze_part(f"s3://bronze/hh/vacancies/date={report_dt}/*.jsonl.gz")
    print(report_dt, 'rows count', df_lazy.select('id').count().collect().item())

2025-12-08 rows count 155111
2025-12-09 rows count 148628
2025-12-04 rows count 136526
2025-12-03 rows count 126201
2025-11-10 rows count 33643
2025-12-02 rows count 38876
2025-11-17 rows count 36444
2025-12-05 rows count 133487
2025-11-18 rows count 31085


In [72]:
cols = ['published_at_utc',
 'published_at_offset',
 'created_at_utc',
 'created_at_offset',]
df_lazy.select(cols).head(1).collect()

published_at_utc,published_at_offset,created_at_utc,created_at_offset
"datetime[μs, UTC]",i16,"datetime[μs, UTC]",i16
2025-11-18 05:36:20 UTC,180,2025-11-18 05:36:20 UTC,180
