In [9]:
import csv
import os
import time
from datetime import datetime
from io import StringIO

import requests
from clickhouse_sqlalchemy import make_session, get_declarative_base
from sqlalchemy import create_engine, Column, String, Float, Date
from sqlalchemy.orm import sessionmaker

In [14]:
# ClickHouse connection settings.
# Secrets are read from the environment instead of being hard-coded in the
# notebook (export them in the shell before starting Jupyter).
# NOTE(review): the password and OAuth token previously inlined here have been
# exposed wherever this notebook was saved/shared and should be rotated.
# Expected format: clickhouse+native://<user>:<password>@<host>:<port>/<database>
CLICKHOUSE_URI = os.environ['CLICKHOUSE_URI']

# AppMetrica Logs API settings
API_URL = 'https://api.appmetrica.yandex.ru/logs/v1/export/events.csv'
params = {
    'application_id': '3757420',
    'date_since': '2023-12-01',
    'date_until': '2023-12-02',
    'fields': 'event_datetime,event_name,event_json,appmetrica_device_id,session_id'
}
# OAuth token for the AppMetrica API, taken from the environment (see above).
oauth_token = os.environ['APPMETRICA_OAUTH_TOKEN']
headers = {
    'Authorization': f'OAuth {oauth_token}'
}

# Create the ClickHouse engine and a single module-level session shared by
# the loading functions below.
engine = create_engine(CLICKHOUSE_URI)
session_factory = sessionmaker(bind=engine)
session = session_factory()


# ORM model for the ClickHouse table
Base = get_declarative_base()

class DataTable(Base):
    """Declarative mapping of the ClickHouse table ``metrica_events``.

    Has one attribute per field requested from the AppMetrica export.
    """
    __tablename__ = 'metrica_events'
    # NOTE(review): the loader parses this field with '%Y-%m-%d %H:%M:%S' and
    # passes a full datetime, but it is mapped as ``Date``. Confirm whether the
    # ClickHouse column is Date or DateTime; if it is Date, time of day is lost.
    event_datetime = Column(Date)
    event_name = Column(String)
    event_json = Column(String)
    # The SQLAlchemy ORM requires every mapped class to have a primary key.
    # A device presumably emits many events, so this value is likely NOT unique
    # per row; treat it as a mapping requirement, not a uniqueness guarantee.
    appmetrica_device_id = Column(String, primary_key=True)
    session_id = Column(String)


# Function that loads one batch of rows into ClickHouse
def load_data_to_clickhouse(data):
    """Insert a batch of event rows into ``metrica_events`` and commit.

    Args:
        data: iterable of dicts whose keys are ``DataTable`` column names.

    The rows are sent as a single multi-row Core INSERT rather than one ORM
    object per row. The ORM identity map is keyed on the mapped primary key
    (``appmetrica_device_id``), which presumably repeats across events of the
    same device, so ORM instances would collide there. The Core insert also
    avoids building and tracking an ORM object per row.

    Raises:
        Whatever the database layer raises; the session is rolled back first
        so it stays usable.
    """
    rows = list(data)
    if not rows:
        # Nothing to insert; do not issue an INSERT without any rows.
        return
    try:
        session.execute(DataTable.__table__.insert(), rows)
        session.commit()
    except Exception:
        session.rollback()
        raise

# Main routine: fetch the export from AppMetrica and load it into ClickHouse
def _parse_event_row(row):
    """Map one AppMetrica CSV row (a dict) onto ``metrica_events`` column values.

    ``event_datetime`` is parsed with the '%Y-%m-%d %H:%M:%S' format; the
    remaining fields are copied through unchanged as strings.
    """
    return {
        'event_datetime': datetime.strptime(row['event_datetime'], '%Y-%m-%d %H:%M:%S'),
        'event_name': row['event_name'],
        'event_json': row['event_json'],
        'appmetrica_device_id': row['appmetrica_device_id'],
        'session_id': row['session_id'],
    }


def _stream_csv_to_clickhouse(response, batch_size):
    """Read the CSV body of a streamed 200 response and load it in batches."""
    # requests only knows the text encoding if the server sends a charset.
    # Without one, ``iter_lines(decode_unicode=True)`` yields raw bytes
    # (encoding None), which csv rejects, or decodes text/* bodies as
    # ISO-8859-1, which mangles non-ASCII data.
    # NOTE(review): the export is assumed to be UTF-8; confirm with the API docs.
    if 'charset' not in response.headers.get('Content-Type', '').lower():
        response.encoding = 'utf-8'

    reader = csv.DictReader(response.iter_lines(decode_unicode=True))
    buffer = []
    for row in reader:
        buffer.append(_parse_event_row(row))
        if len(buffer) >= batch_size:
            load_data_to_clickhouse(buffer)
            buffer = []

    # Flush the final, partially filled batch.
    if buffer:
        load_data_to_clickhouse(buffer)


def fetch_and_load_data(poll_interval=10, max_attempts=60, batch_size=1000):
    """Download the AppMetrica events export and load it into ClickHouse.

    The Logs API answers HTTP 202 while the export is still being prepared.
    In that case the request is repeated every ``poll_interval`` seconds, up
    to ``max_attempts`` requests in total, until it answers 200. The CSV body
    is then streamed and inserted ``batch_size`` rows at a time.

    Args:
        poll_interval: seconds to wait after a 202 before asking again.
        max_attempts: maximum number of requests before giving up.
        batch_size: number of rows per insert into ClickHouse.

    Raises:
        Exception: on any status other than 200/202, when the export is still
            not ready after ``max_attempts`` requests, or on any error while
            parsing or loading. The error is printed and then re-raised, so a
            failed or partial load is not mistaken for success.

    The module-level ``session`` is closed when the function exits.
    """
    try:
        for attempt in range(1, max_attempts + 1):
            with requests.get(API_URL, params=params, headers=headers, stream=True) as response:
                if response.status_code == 200:
                    _stream_csv_to_clickhouse(response, batch_size)
                    return
                if response.status_code != 202:
                    raise Exception(f"Ошибка при запросе данных: {response.status_code}")
            # 202 Accepted: the export is queued / being prepared. Wait outside
            # the ``with`` so the connection is released while sleeping.
            if attempt < max_attempts:
                time.sleep(poll_interval)
        raise Exception(f"Выгрузка не готова после {max_attempts} попыток")
    except Exception as e:
        print(f"Произошла ошибка: {e}")
        raise
    finally:
        session.close()

In [None]:
# Run the fetch-and-load process
fetch_and_load_data()