# Выгрузка RUONIA
Измените даты и путь к CSV в коде ниже, затем выполните ячейку, чтобы скачать данные по RUONIA за нужный период.

In [2]:

import datetime as dt
import csv
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

import requests

try:
    import pandas as pd  # type: ignore
except Exception:
    pd = None

CBR_ENDPOINT = "https://cbr.ru/DailyInfoWebServ/DailyInfo.asmx"

def _dt(value: dt.date | str) -> str:
    if isinstance(value, dt.date):
        value = value.isoformat()
    if "T" not in value:
        value = f"{value}T00:00:00"
    return value

def _soap12_envelope(method: str, body_xml: str) -> str:
    return f"""<?xml version="1.0" encoding="utf-8"?>
<soap12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                 xmlns:soap12="http://www.w3.org/2003/05/soap-envelope">
  <soap12:Body>
    <{method} xmlns="http://web.cbr.ru/">
      {body_xml}
    </{method}>
  </soap12:Body>
</soap12:Envelope>""".strip()

def _post_soap12(method: str, body_xml: str, timeout: int = 30) -> requests.Response:
    envelope = _soap12_envelope(method, body_xml)
    headers = {"Content-Type": "application/soap+xml; charset=utf-8"}
    resp = requests.post(CBR_ENDPOINT, data=envelope.encode("utf-8"), headers=headers, timeout=timeout)
    resp.raise_for_status()
    return resp

def _inner_text(elem: Optional[ET.Element]) -> Optional[str]:
    if elem is None:
        return None
    if list(elem):
        return "".join(ET.tostring(child, encoding="unicode") for child in elem).strip()
    text = "".join(elem.itertext()).strip()
    return text or None

def _clean_tag(tag: str) -> str:
    return tag.split("}", 1)[-1]

def _parse_rows_from_inner_xml(xml_text: str) -> List[Dict[str, Any]]:
    root = ET.fromstring(xml_text)
    node = root
    for _ in range(4):
        children = list(node)
        if not children:
            break
        if len(children) == 1 and list(children[0]):
            node = children[0]
            continue
        break

    rows: List[Dict[str, Any]] = []
    for row in node:
        row_dict: Dict[str, Any] = {}
        for field in row:
            key = _clean_tag(field.tag)
            val = (field.text or "").strip()
            row_dict[key] = val
        for k, v in row.attrib.items():
            row_dict[_clean_tag(k)] = v
        if row_dict:
            rows.append(row_dict)
    return rows

def _coerce_types(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    parsed: List[Dict[str, Any]] = []
    for row in rows:
        copy = dict(row)
        for key in ("ruo", "vol", "volume", "volume_bln_rub"):
            if key in copy:
                try:
                    copy[key] = float(str(copy[key]).replace(",", "."))
                except Exception:
                    pass
        parsed.append(copy)
    return parsed

def _standardize_column_names(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    mapping = {
        "D0": "date",
        "ruo": "ruonia_percent",
        "vol": "volume_bln_rub",
        "volbln": "volume_bln_rub",
        "dealn": "deals_count",
        "partn": "participants_count",
    }
    renamed: List[Dict[str, Any]] = []
    for row in rows:
        copy = dict(row)
        for old, new in mapping.items():
            if old in copy and new not in copy:
                copy[new] = copy.pop(old)
        renamed.append(copy)
    return renamed

def fetch_ruonia(from_date: dt.date | str,
                 to_date: dt.date | str,
                 timeout: int = 30) -> List[Dict[str, Any]]:
    body = f"<fromDate>{_dt(from_date)}</fromDate><ToDate>{_dt(to_date)}</ToDate>"
    resp = _post_soap12("RuoniaXML", body, timeout=timeout)
    xml = ET.fromstring(resp.content)
    ns = {"soap": "http://www.w3.org/2003/05/soap-envelope", "wb": "http://web.cbr.ru/"}
    result = xml.find(".//wb:RuoniaXMLResult", ns)
    inner = _inner_text(result)
    if not inner:
        raise RuntimeError("Пустой ответ RuoniaXMLResult")

    rows = _parse_rows_from_inner_xml(inner)
    rows = _coerce_types(rows)
    rows = _standardize_column_names(rows)
    if not rows:
        raise RuntimeError("Нет данных за указанный период")
    return rows

def to_dataframe(rows: List[Dict[str, Any]]):
    if pd is None:
        return None
    if not rows:
        return pd.DataFrame()
    df = pd.DataFrame(rows)
    pref = [c for c in ("date", "ruonia_percent", "volume_bln_rub", "deals_count", "participants_count") if c in df.columns]
    df = df[pref + [c for c in df.columns if c not in pref]]
    return df.sort_values("date").reset_index(drop=True)

def save_csv(path: str, rows: List[Dict[str, Any]]):
    keys = sorted({k for row in rows for k in row.keys()})
    with open(path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=keys)
        writer.writeheader()
        for row in rows:
            writer.writerow({k: row.get(k, "") for k in keys})


In [3]:
# --- Параметры выгрузки ---
from_date = "2025-01-01"
to_date = "2025-10-31"
csv_path = Path("ruonia_export.csv")

rows = fetch_ruonia(from_date, to_date)

if pd:
    df = to_dataframe(rows)
    display(df.head())
    df.to_csv(csv_path, index=False)
else:
    for row in rows[:5]:
        print(row)
    save_csv(str(csv_path), rows)

print(f"Saved: {csv_path.resolve()}")

Unnamed: 0,date,ruonia_percent,volume_bln_rub,DateUpdate
0,2025-01-09T00:00:00+03:00,21.0,646.28,2025-01-10T14:20:33.857+03:00
1,2025-01-10T00:00:00+03:00,20.62,580.93,2025-01-13T14:19:21.627+03:00
2,2025-01-13T00:00:00+03:00,20.49,667.45,2025-01-14T14:21:34.737+03:00
3,2025-01-14T00:00:00+03:00,21.0,449.59,2025-01-15T14:14:00.5+03:00
4,2025-01-15T00:00:00+03:00,20.74,369.58,2025-01-16T14:23:58.027+03:00


Saved: /Users/pavel/Desktop/Macro/RUONIA/ruonia_export.csv
