<a href="https://colab.research.google.com/github/akscent/internships/blob/main/%D0%98%D1%80%D0%B8%D0%BD%D1%8F%D0%BA%D0%BE%D0%B2_%D0%94_%D0%A1_%22%D0%A2%D0%B5%D1%81%D1%82%D0%BE%D0%B2%D0%BE%D0%B5_%D0%B7%D0%B0%D0%B4%D0%B0%D0%BD%D0%B8%D0%B5_Data_Engineer_ipynb%22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Тестовое задание на позицию Data Engineer.

Задание включает в себя 3 небольших задачи. В каждой задаче **рекомендуется** оставлять комментарии, код должен быть оформлен согласно **PEP8**. Задания необходимо выполнить без использования Pandas и yandex-weather-api.

**Перед выполнением тестового задания, необходимо скопировать notebook к себе на диск, и выполнять тестовое в своей копии**.

---
####1. Выгрузка данных из API Яндекс.Погоды и преобразование их в csv

Используя API Яндекс.Погоды, необходимо выгрузить прогнозные данные за 7 дней для Москвы, Казани, Санкт-Петербурга, Тулы и Новосибирска. В случае, если API отдает пустые значения за день, то их необходимо удалить.

Информация должна быть представлена по часам с расширенным набором полей по осадкам.

Полученный json необходимо преобразовать в csv, формат:

\begin{array}{ccc}
\text{city}, \text{date}, \text{hour}, \text{temperature_c}, \text{pressure_mm}, \text{is_rainy} \\
Moscow, 19.08.2023, 12, 27, 750, 0 \\
Moscow, 19.08.2023, 13, 27, 750, 0 \\
... \\
Kazan, 19.08.2023, 12, 20, 770, 1 \\
Kazan, 19.08.2023, 13, 21, 770, 0 \\
\end{array}

**Описание полей:**

city - Город

date - Дата события

hour - Часы

temperature_c - Температура в Цельсиях

pressure_mm - Давление в мм ртутного столба

is_rainy - Флаг наличия дождя в конкретный день и час (см. документацию по API - описание полей).

Полученный csv необходимо выгрузить на облачный диск и в конце решения предоставить ссылку.

**Ссылка на получение ключа:** https://yandex.ru/dev/weather/doc/dg/concepts/about.html#about__onboarding


**Дополнительно ответьте на вопросы:** какие существуют возможные пути ускорения получения данных по API и их преобразования? Возможно ли эти способы использовать в Airflow?

In [28]:
import json
import csv
import requests as req
from geopy import geocoders

TOKEN_YANDEX = "80a4c74c-d6a7-4d51-ab31-e3d850aee839"

def geo_pos(city: str):
    """
    Retrieve the geographical coordinates (latitude and longitude) of the given city.

    Parameters:
    - city: a string representing the name of the city

    Return: a tuple containing the latitude and longitude of the city, or None if the coordinates cannot be retrieved
    """
    geolocator = geocoders.Nominatim(user_agent="YaTest")
    location = geolocator.geocode(city)
    if location:
        latitude = location.latitude
        longitude = location.longitude
        return latitude, longitude
    else:
        print(f"Failed to retrieve coordinates for {city}")
        return None

def fill_coordinates(cities):
    """
    Fill coordinates for each city in the input list using the geo_pos function.

    Parameters:
    - cities: a list of dictionaries, each containing information about a city including its name

    Returns:
    None
    """
    for city in cities:
        city_name = city["name"]
        coordinates = geo_pos(city_name)
        if coordinates:
            city["latitude"], city["longitude"] = coordinates

def yandex_weather(latitude: float, longitude: float, token_yandex: str):
    """
    Sends a request to the Yandex Weather API using the provided latitude, longitude, and Yandex API token.
    Returns the weather forecast data in JSON format.
    """
    url_yandex = f"https://api.weather.yandex.ru/v2/forecast/"
    params = {"lat": latitude, "lon": longitude, "lang": "ru_RU"}
    headers = {"X-Yandex-API-Key": token_yandex}
    yandex_req = req.get(url_yandex, headers=headers, params=params)
    if yandex_req.status_code != 200:
        print(f"Error {yandex_req.status_code} in request")
        return None
    yandex_json = yandex_req.json()

    return yandex_json

def convert_to_csv(city: str, forecast_data):
    """
    Convert the forecast data for a given city into a CSV format.

    Parameters:
    - city: the name of the city for which the forecast data is being converted
    - forecast_data: the forecast data containing information about the weather

    Returns:
    A list of lists representing the CSV rows, each containing city, date, hour, temperature in Celsius, pressure in mm, rainy indicator, and condition
    """
    csv_rows = []

    for forecast in forecast_data["forecasts"]:
        date = forecast["date"]
        if "hours" not in forecast:
            continue

        for hour_data in forecast["hours"]:
            hour = hour_data["hour"]
            temperature_c = hour_data.get("temp", "N/A")
            pressure_mm = hour_data.get("pressure_mm", "N/A")
            is_rainy = 1 if hour_data.get("prec_mm", 0) > 0 else 0
            condition = hour_data.get("condition", "N/A")

            row = [city, date, hour, temperature_c, pressure_mm, is_rainy, condition]
            csv_rows.append(row)

    return csv_rows


In [30]:
# Check coordinates for cities
cities = [
    {"name": "Moscow", },
    {"name": "Kazan", },
    {"name": "Saint Petersburg", },
    {"name": "Tula", },
    {"name": "Novosibirsk", },
]

fill_coordinates(cities)

for city in cities:
    print(f"{city['name']}: Latitude - {city.get('latitude', 'N/A')}, Longitude - {city.get('longitude', 'N/A')}")

if cities:
    print("\nok")

Moscow: Latitude - 55.625578, Longitude - 37.6063916
Kazan: Latitude - 40.2054445, Longitude - 32.6813148
Saint Petersburg: Latitude - 59.9606739, Longitude - 30.1586551
Tula: Latitude - 45.2678347, Longitude - 1.7706797
Novosibirsk: Latitude - 54.96781445, Longitude - 82.95159894278376

ok


In [31]:

def main():
    """
    A function to collect weather data for a list of cities and save it in a CSV file.
    """
    csv_rows = []
    cities = [
        {"name": "Moscow", },
        {"name": "Kazan", },
        {"name": "Saint Petersburg", },
        {"name": "Tula", },
        {"name": "Novosibirsk", },
    ]

    for city in cities:
        # Get coordinates
        coordinates = geo_pos(city["name"])

        if coordinates:
            latitude, longitude = coordinates
            # Get weather
            weather_data = yandex_weather(latitude, longitude, TOKEN_YANDEX)
            if weather_data:
                # Преобразуем данные о погоде в CSV формат
                city_rows = convert_to_csv(city["name"], weather_data)
                csv_rows.extend(city_rows)

    # write in csv file
    with open('weather_data.csv', 'w', newline='', encoding='utf-8') as csv_file:
        csv_header = ['city', 'date', 'hour', 'temperature_c', 'pressure_mm', 'is_rainy', 'condition']
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(csv_header)
        csv_writer.writerows(csv_rows)

if __name__ == '__main__':
    main()


**Ускорение получения данных из API** и их преобразования может быть достигнуто несколькими способами:

- Пакетная обработка данных: Вместо того, чтобы делать отдельные запросы для каждого города, можно отправить запрос для нескольких городов одновременно.

- Асинхронные запросы: Выполнения нескольких запросов параллельно. Можно использовать, поскольку результаты запросов для отдельных городов никак не влияют друг на друга.

С этими способами может помочь **Apache Airflow**:

- Планирование задач: Airflow позволяет планировать и запускать задачи по определенным правилам, что полезно для аинхронного кода.

- Декомпозиция задач: Можно разбить пайплайн на набор задач с учетом их зависимостей. Это поможет оптимизировать параллельное выполнение и сделать пайплайн более масштабируемым.

---
####2. Загрузка данных в БД (PostgreSQL).

Используя полученный csv файл, необходимо загрузить данных в PostgreSQL. Предварительно в БД необходимо создать схемы: для приемки сырых данных и для будущих агрегирующих таблиц.

При создании таблиц приветствуется использование партицирования и индексирования (по возможности и необходимости).

В решении необходимо показать код загрузки данных, скрипты создания схем и таблиц для пункта 2 и 2.1.

Подсказка: для решения задачи нужно развернуть БД, мы рекомендуем это сделать локально с помощью докера.

Разворачиваем psql в docker.

Для этого выполним следующие команды

Здесь `852c07c3682d` - ваш CONTAINER id, который можно посмотреть командой `docker ps`

```docker  
docker-compose up -d
docker ps
# Копирвоание скриптов на создание
docker cp 'C:\Users\user\OneDrive\Документы\GitHub\internships\API Яндекс.Погоды\create_t.sql' 852c07c3682d:/tmp/
docker cp 'C:\Users\user\OneDrive\Документы\GitHub\internships\API Яндекс.Погоды\weather_data.csv' 852c07c3682d:/tmp/
docker cp 'C:\Users\user\OneDrive\Документы\GitHub\internships\API Яндекс.Погоды\load_data.sql' 852c07c3682d:/tmp/
# Запуск команд на копирвоание данных в БД
docker exec -i 852c07c3682d psql -U karnaksp -d test_api -a -f /tmp/create_t.sql
docker exec -i 852c07c3682d psql -U karnaksp -d test_api -a -f /tmp/load_data.sql
# Копирвоание скриптов на создание витрин
docker cp 'C:\Users\user\OneDrive\Документы\GitHub\internships\API Яндекс.Погоды\weather_moving.sql' 852c07c3682d:/tmp/
docker cp 'C:\Users\user\OneDrive\Документы\GitHub\internships\API Яндекс.Погоды\weather_rainy.sql' 852c07c3682d:/tmp/
# Запуск скриптов на создание витрин
docker exec -i 852c07c3682d psql -U karnaksp -d test_api -a -f /tmp/weather_moving.sql
docker exec -i 852c07c3682d psql -U karnaksp -d test_api -a -f /tmp/weather_rainy.sql
# Проверка
docker exec -i 852c07c3682d psql -U karnaksp -d test_api -c "SELECT * FROM aggregated_data.weather_aggregate limit 5;"

```

**Содержимое файла docker-compose.yaml**

```
version: '3.8'

services:
  postgres:
    image: postgres:latest
    environment:
      POSTGRES_DB: test_api
      POSTGRES_USER: karnaksp
      POSTGRES_PASSWORD: a1s2s3
    ports:
      - "5432:5432"

```



```sql
--Здесь представлены скрипты sql--

--Создание схем--
CREATE SCHEMA raw_data;
CREATE SCHEMA aggregated_data;
--Создание первичной таблицы--
CREATE TABLE IF NOT EXISTS raw_data.weather_data (
    id SERIAL PRIMARY KEY,
    city VARCHAR(255),
    date DATE,
    hour INT,
    temperature_c INT,
    pressure_mm INT,
    is_rainy INT,
    condition VARCHAR(255)
);

--Загрузка данных--
COPY raw_data.weather_data(city, date, hour, temperature_c, pressure_mm, is_rainy, condition)
FROM '/tmp/weather_data.csv' DELIMITER ',' CSV HEADER;
```


####2.1 Формирование витрин (PostgreSQL).

1. Используя таблицу с сырыми данными, необходимо собрать витрину, где для каждого города и дня будут указаны часы начала дождя. Условимся, что дождь может начаться только 1 раз за день в любом из городов.

2. Необходимо создать витрину, где для каждого города, дня и часа будет рассчитано скользящее среднее по температуре и по давлению.


Полученные запросы необходимо вставить в google colab, а результаты - выгрузить в формате csv/xlsx и выложить в виде ссылки в google colab.

Подсказка: если в исходном файле не было факта начала дождя, то необходимо расставить рандомно значения факта дождя в таблице с сырыми данными.


```sql
--Создание и заполнение витрин--
--Час начала дождя--
CREATE TABLE IF NOT EXISTS aggregated_data.weather_aggregate (
    city VARCHAR(255),
    date DATE,
    start_rain_hour INT,
    temperature_avg FLOAT,
    pressure_avg FLOAT
);

INSERT INTO aggregated_data.weather_aggregate (city, date, start_rain_hour, temperature_avg, pressure_avg)
SELECT
    city,
    date,
    MIN(hour) AS start_rain_hour,
FROM
    raw_data.weather_data
WHERE
    is_rainy = 1
GROUP BY
    city, date;

--Скользящие средние показатели--
CREATE TABLE IF NOT EXISTS aggregated_data.hourly_aggregate (
    city VARCHAR(255),
    date DATE,
    hour INT,
    temperature_avg FLOAT,
    pressure_avg FLOAT
);

INSERT INTO aggregated_data.hourly_aggregate (city, date, hour, temperature_avg, pressure_avg)
SELECT
    city,
    date,
    hour,
    AVG(temperature_c) OVER (PARTITION BY city, date ORDER BY hour ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS temperature_avg,
    AVG(pressure_mm) OVER (PARTITION BY city, date ORDER BY hour ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS pressure_avg
FROM
    raw_data.weather_data;

```

---
####3. Задача на проектирование БД на данных Яндекс.Метрики

В функционал Яндекс.Метрики входит возможность выкачивания сырых данных с помощью API: отдельными запросами выкачиваются просмотры и визиты. Для этого процесса необходимо спроектировать базу данных, предусмотрев несколько слоев данных и "хотелки" заказчиков: в 90% случаев заказчикам необходимы агрегаты данных (например, построить воронку по визитам на страницах и вводу номеров телефонов в разрезе дат, страниц, utm меток, или построить флоу пользователей в разрезе устройств, ОС, и т.д.).

Результат необходимо предоставить в виде схемы с описанием.

Ссылки на структуру таблиц:

https://yandex.ru/dev/metrika/doc/api2/logs/fields/hits.html

https://yandex.ru/dev/metrika/doc/api2/logs/fields/visits.html

In [None]:
# При переходе по ссылкам вывод: limited