**API Яндекс погоды**
- Прогноз доступен только на 4 дня, соотвествующая проверка есть в решении. Дождь в феврале идёт редко, поэтому в csv переменная 'is_rainy' содержит только значение 0. Проверяла для снега, прогноз снегопадов выгружается.

- Для демонстрации ускорения получения данных через API выбрала подход с использованием asyncio. Этот метод особенно эффективен в случае выполнения большого количества ограниченных задач ввода-вывода, таких как HTTP-запросы к API погоды, что позволяет оптимизировать использование ресурсов и сократить временные задержки при получении и обработке данных.

- Для ускоренного получения данных по API можно также кэшировать данные, например, при помощи декоратора functools.lru_cache, или применить метод параллельного выполнения задач, например, при помощи модуля concurrent.futures.ThreadPoolExecutor.

- В Airflow можно ускорить получение данных API, используя параллельное выполнение задач с помощью библиотеки типа Celery или Dask, которые позволяют запускать задачи в распределенном режиме на нескольких рабочих узлах.

In [None]:
import asyncio
import csv
import os
import aiohttp
from dotenv import load_dotenv


async def fetch_weather(session, api_key, city):
    """
    Асинхронная функция для получения прогноза дождя для заданного города.
    
    Args:
        session (aiohttp.ClientSession): Сессия для выполнения HTTP-запросов.
        api_key (str): API ключ для доступа к API Яндекс.Погоды.
        city (dict): Словарь с информацией о городе (название, широта, долгота).
        
    Returns:
        tuple: Кортеж с названием города и списком прогнозов погоды.
    """
    url = f"https://api.weather.yandex.ru/v2/forecast?lat={city['lat']}&lon={city['lon']}&extra=true"
    headers = {"X-Yandex-API-Key": api_key}
    
    async with session.get(url, headers=headers) as response:
        data = await response.json()
        return city['name'], data.get('forecasts', [])


async def get_weather_forecast(api_key, cities):
    """
    Асинхронная функция для получения прогноза дождя для всех заданных городов и записи результатов в CSV файл.
    
    Args:
        api_key (str): API ключ для доступа к API Яндекс.Погоды.
        cities (list): Список словарей с информацией о городах.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_weather(session, api_key, city) for city in cities]
        results = await asyncio.gather(*tasks)
        """
        Вывод уникальных дат.
        """
        unique_dates = set()
        for city_name, forecasts in results:
            for forecast in forecasts:
                date = forecast.get('date', '')
                if date:
                    unique_dates.add(date)
        print("Уникальные даты:", unique_dates)
        
        with open('weather_forecast_asyncio.csv', mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["city", "date", "hour", "temperature_c", "pressure_mm", "is_rainy"])
            for city_name, forecasts in results:
                print(f"Полный ответ от API для города {city_name}:")
                for forecast in forecasts:
                    date = forecast.get('date', '')
                    print("Прогноз для даты", date)
                    hours_forecast = forecast.get('hours', [])
                    print("Количество часов прогноза:", len(hours_forecast))
                    """
                    Проверяем, есть ли прогноз на часы.
                    """
                    if len(hours_forecast) > 0:
                        for hour_data in hours_forecast:
                            hour = hour_data.get('hour', '')
                            temperature_c = hour_data.get('temp', '')
                            pressure_mm = hour_data.get('pressure_mm', '')
                            condition = hour_data.get('condition', '')
                            is_rainy = 1 if 'rain' in condition else 0
                            if date and hour != '' and temperature_c and pressure_mm:
                                writer.writerow([city_name, date, hour, temperature_c, pressure_mm, is_rainy])
                print()


async def main():
    """
    Асинхронная основная функция программы.
    """
    load_dotenv()
    api_key = os.getenv("YANDEX_WEATHER_API_KEY")

    cities = [
        {"name": "Москва", "lat": 55.755826, "lon": 37.6172999},
        {"name": "Казань", "lat": 55.830431, "lon": 49.066082},
        {"name": "Санкт-Петербург", "lat": 59.9342802, "lon": 30.3350986},
        {"name": "Тула", "lat": 54.202, "lon": 37.644},
        {"name": "Новосибирск", "lat": 55.0083526, "lon": 82.9357327}
    ]

    await get_weather_forecast(api_key, cities)


if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop and loop.is_running():
        asyncio.create_task(main())
    else:
        asyncio.run(main())

**Создание БД**
- Задача решена с партизированием по месяцам -- было бы полезно, если бы прогнозные данные загружались регулярно -- и с индексированием по городам для ускорения поиска по городу.

In [None]:
import psycopg2
from psycopg2 import sql
import subprocess
import time

"""
Параметры подключения к PostgreSQL
"""
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'weather_db'
DB_USER = 'postgres'
DB_PASSWORD = 'password'


def create_postgres_container():
    """
    Создание контейнера Docker с PostgreSQL.
    """
    try:
        subprocess.run(["docker", "run", "--name", "weather_forecast", "-e", f"POSTGRES_PASSWORD={DB_PASSWORD}", "-p", f"{DB_PORT}:5432", "-d", "postgres"])
    except subprocess.SubprocessError as e:
        print("Ошибка при создании контейнера Docker с PostgreSQL:", e)



def create_database():
    """
    Создание базы данных.
    """
    try:
        conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD)
        conn.autocommit = True
        cursor = conn.cursor()
        cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(DB_NAME)))
        conn.close()
    except psycopg2.Error as e:
        print("Ошибка при создании базы данных:", e)



def connect_to_database():
    """
    Подключение к базе данных.
    """
    try:
        return psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    except psycopg2.Error as e:
        print("Ошибка при подключении к базе данных:", e)



def create_schema_and_tables():
    """
    Создание схем и таблиц.
    """
    conn = connect_to_database()
    if conn is None:
        return

    try:
        cursor = conn.cursor()

        """
        Создание схемы для приемки сырых данных
        """
        cursor.execute("CREATE SCHEMA IF NOT EXISTS raw_data;")
        
        """
        Создание схемы для будущих агрегирующих таблиц
        """
        cursor.execute("CREATE SCHEMA IF NOT EXISTS aggregated_data;")

        """
        Создание таблицы для приемки сырых данных
        """
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS raw_data.weather (
                city VARCHAR,
                date DATE,
                hour INTEGER,
                temperature_c INTEGER,
                pressure_mm INTEGER,
                is_rainy INTEGER
            )
        """)

        """
        Партиционирование таблицы по месяцам
        """
        for month in range(1, 13):
            cursor.execute(sql.SQL("""
                CREATE TABLE IF NOT EXISTS raw_data.weather_{month} (
                    CHECK (EXTRACT(MONTH FROM date) = {month})
                ) INHERITS (raw_data.weather);
            """).format(month=sql.Literal(month)))

        """
        Индексирование таблицы по городам
        """
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_city ON raw_data.weather (city);")

        conn.commit()
        conn.close()
    except psycopg2.Error as e:
        print("Ошибка при создании схем и таблиц:", e)



def load_data_from_csv(csv_file):
    """
    Загрузка данных из CSV в PostgreSQL.
    """
    conn = connect_to_database()
    if conn is None:
        return

    try:
        cursor = conn.cursor()
        with open(csv_file, 'r', newline='', encoding='utf-8') as file:
            reader = csv.reader(file)
            next(reader)  # Пропускаем заголовок
            for row in reader:
                cursor.execute("""
                    INSERT INTO raw_data.weather (city, date, hour, temperature_c, pressure_mm, is_rainy)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, row)

        conn.commit()
        conn.close()
    except psycopg2.Error as e:
        print("Ошибка при загрузке данных из CSV в PostgreSQL:", e)



def main():
    """
    Основная функция для выполнения всех задач.
    """
    create_postgres_container()
    time.sleep(10)  # Задержка перед созданием базы данных
    create_database()
    time.sleep(5)   # Задержка перед созданием схем и таблиц
    create_schema_and_tables()
    time.sleep(5)   # Задержка перед загрузкой данных из CSV
    load_data_from_csv("weather_forecast_asyncio.csv")


if __name__ == "__main__":
    main()

**Разметка даных и создание витрин**
- Поскольку в csv из первой задачи нет прогнозных дней с дождём, записала прогноз дождя рандомно согласно условию.
- Сформировала две витрины -- а) с часами начала дождя по городам и б) со скользящими средними по температуре и давлению.

In [None]:
import random
 
"""
Параметры подключения к PostgreSQL
"""
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'weather_db'
DB_USER = 'postgres'
DB_PASSWORD = 'password'

def connect_to_database():
    """
    Подключение к базе данных.
    """
    try:
        return psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    except psycopg2.Error as e:
        print("Ошибка при подключении к базе данных:", e)


def mark_rainy_days():
    """
    Разметка данных о дожде в таблице raw_data.weather.
    """
    conn = connect_to_database()
    if conn is None:
        return

    try:
        cursor = conn.cursor()
        """
        Сброс флага дождя для всех записей.
        """
        cursor.execute("""
            UPDATE raw_data.weather
            SET is_rainy = 0
        """)
        """
        Получаем список уникальных дат.
        """
        cursor.execute("""
            SELECT DISTINCT date
            FROM raw_data.weather
        """)
        unique_dates = cursor.fetchall()
        
        rainy_days = set()  # множество для хранения уже размеченных дней
        
        for date in unique_dates:
            if len(rainy_days) >= 7:  # если уже разметили 7 дней, прекращаем
                break
            
            # Получаем случайный город и час для разметки
            cursor.execute("""
                SELECT city, hour
                FROM raw_data.weather
                WHERE date = %s
                ORDER BY random()
                LIMIT 1
            """, (date,))
            row = cursor.fetchone()
            if row:
                city, hour = row
                cursor.execute("""
                    UPDATE raw_data.weather
                    SET is_rainy = 1,
                        hour = %s
                    WHERE city = %s AND date = %s
                """, (hour, city, date))
                rainy_days.add(date)
                
        conn.commit()
        print("Разметка данных о дожде завершена.")
    except psycopg2.Error as e:
        print("Ошибка при разметке данных о дожде:", e)
    finally:
        if conn is not None:
            conn.close()     


def create_views():
    """
    Создание витрин.
    """
    conn = connect_to_database()
    if conn is None:
        return

    try:
        cursor = conn.cursor()
        """
        Создание витрины с часами начала дождя для каждого города и дня.
        """
        cursor.execute("""
            CREATE VIEW rainy_hours AS
            SELECT city, date, hour AS start_hour_of_rain
            FROM raw_data.weather
            WHERE is_rainy = 1;
        """)
        """
        Создание витрины со скользящим средним по температуре и давлению.
        """
        cursor.execute("""
            CREATE VIEW moving_avg AS
            SELECT city, date, hour,
                AVG(temperature_c) OVER (PARTITION BY city, date ORDER BY hour ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS temp_moving_avg,
                AVG(pressure_mm) OVER (PARTITION BY city, date ORDER BY hour ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS pressure_moving_avg
            FROM raw_data.weather;
        """)

        conn.commit()
        conn.close()
        print("Создание витрин завершено.")
    except psycopg2.Error as e:
        print("Ошибка при создании витрин:", e)


def export_to_csv(view_name, csv_file):
    """
    Экспорт данных из витрины в CSV файл.
    """
    conn = connect_to_database()
    if conn is None:
        return

    try:
        cursor = conn.cursor()
        """
        Получаем данные из витрины.
        """
        cursor.execute(f"SELECT DISTINCT * FROM {view_name}")
        rows = cursor.fetchall()
        """
        Экспортируем данные в CSV файл.
        """
        with open(csv_file, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow([desc[0] for desc in cursor.description])  # записываем заголовок
            for row in rows:
                if len(row) >= 5:
                    writer.writerow([row[0], row[1], row[2], round(row[3], 1), round(row[4], 1)])  # округляем значения
                else:
                    writer.writerow(row)  # записываем кортеж как есть
            print(f"Экспорт данных из витрины {view_name} в CSV файл завершен.")
    except psycopg2.Error as e:
        print(f"Ошибка при экспорте данных из витрины {view_name}:", e)


def main():
    """
    Основная функция для выполнения всех задач.
    """
    mark_rainy_days()
    time.sleep(5)  # задержка 5 секунд
    create_views()
    time.sleep(5)  # ещё одна задержка 5 секунд
    export_to_csv("rainy_hours", "rainy_hours.csv")
    export_to_csv("moving_avg", "moving_average.csv")

if __name__ == "__main__":
    main()


**БД для Яндекс.Метрики**

1.   Слой сырых данных (Raw Data Layer): сюда можно на регулярной основе складывать данные, полученные из обеих API endpoints как есть. Для этого можно использовать AirFlow, настроив частоту обновлений по необходимости (от 5 минут до раз в час или в день).
Для этого слоя данных я бы создала две таблицы *visits_raw* и *pageviews_raw* о визитах и просмотрах страниц соответственно. Каждая таблица содержала бы все поля, описанные в предоставленных данных для визитов и просмотров, а также дополнительные поля для идентификации и обработки данных.

2.   На агрегатном слое (Aggregated Data Layer) можно создать почасовые или подневные агрегаты наиболее востребованных данных. Например, можно сделать featurestore с подневными агрегатами на уровне пользователя для задач ML (предсказание оттока, конверсий, look-alike и т.д.). Например, это может быть materialized view, в котором раз в день для каждого пользователя расчитывается число визитов, число просмотров, длительность визитов и просмотров и прочие полезные признаки, которые могут быть использованы в моделях машинного обучения.

Примерами других витрин могли бы быть:
- daily_visits_summary: суммарная статистика по визитам за каждый день с разбивкой по атрибутам (страницы, utm метки, устройства и т.д.).
- daily_pageviews_summary: суммарная статистика по просмотрам страниц за каждый день с разбивкой по атрибутам.
- funnel_conversions: данные о конверсиях по воронке с разбивкой по датам, страницам, utm меткам и т.д.

3. Для слоя данных, решающего наиболее типовых задач BI (Analytical Data Layer - конверсии, retention, сегменты), лучше всего подойдут materialized view, к которым можно подключить Power BI/Tableau и прочие инструменты. Для ad hoc задач, не входящих в типовые, скорее всего придётся писать view с использованием данных с первого (raw) слоя. апример, можно создать представление utm_campaign_summary, которое объединяет данные из таблиц daily_visits_summary и daily_pageviews_summary для анализа по utm меткам.

**Схемы таблиц**

*- Таблица visits_raw:*

* visitID: Идентификатор визита (UInt64)

* counterID: Номер счетчика (UInt32)

* date: Дата визита (Date)

* dateTime: Дата и время визита (DateTime)
...

* (другие поля, как указано в исходных данных для визитов)


*- Таблица pageviews_raw:*

* watchID: Идентификатор просмотра (UInt64)

* counterID: Номер счетчика (UInt32)

* date: Дата события (Date)

* dateTime: Дата и время события (DateTime)
...

* (другие поля, как указано в исходных данных для просмотров страниц)



*- Таблица daily_visits_summary:*

* date: Дата (Date)

* counterID: Номер счетчика (UInt32)
...

* (поля для суммарной статистики по визитам)


*- Таблица daily_pageviews_summary:*

* date: Дата (Date)

* counterID: Номер счетчика (UInt32)
...

* (поля для суммарной статистики по просмотрам страниц)


*- Таблица funnel_conversions:*

* date: Дата (Date)

* counterID: Номер счетчика (UInt32)
...

* (поля для данных о конверсиях по воронке)


*- Таблица marketing_campaigns:*

* campaignID: Идентификатор кампании (UInt32)

* campaignName: Название кампании (String)
...

* (другие поля для дополнительной информации о маркетинговых кампаниях)