## Test task for the position of Data Engineer.

The assignment consists of three small tasks. Leaving comments in each task is **recommended**, and code should be formatted according to **PEP8**. Complete the tasks without using Pandas or yandex-weather-api.

**Before starting, copy the notebook to your own disk and complete the test task in your copy**.

---
#### 1. Downloading data from Yandex.Weather API and converting it to csv

Using the Yandex.Weather API, download the 7-day forecast for Moscow, Kazan, St. Petersburg, Tula and Novosibirsk. If the API returns empty values for a day, remove them.

The data should be hourly and include the extended set of precipitation fields.

The received JSON should be converted to CSV format:

\begin{array}{l}
\text{city, date, hour, temperature_c, pressure_mm, is_rainy} \\
\text{Moscow, 08/19/2023, 12, 27, 750, 0} \\
\text{Moscow, 08/19/2023, 13, 27, 750, 0} \\
\dots \\
\text{Kazan, 08/19/2023, 12, 20, 770, 1} \\
\text{Kazan, 08/19/2023, 13, 21, 770, 0} \\
\dots
\end{array}

**Field Description:**

city - City name

date - Date of the forecast

hour - Hour of the day

temperature_c - Temperature in degrees Celsius

pressure_mm - Pressure in mmHg

is_rainy - Rain flag for the given day and hour (see the field description in the API documentation).

The resulting CSV should be uploaded to a cloud disk, and a link to it should be provided at the end of the solution.

**Link to get the key:** https://yandex.ru/dev/weather/doc/dg/concepts/about.html#about__onboarding


**Additional questions:** What are the possible ways to speed up retrieving data from the API and converting it? Can these approaches be used in Airflow?
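One possible answer to the first question, as a rough sketch rather than a definitive solution: the API calls are I/O-bound, so they can be issued concurrently from a thread pool instead of one after another. The helper name `fetch_all` and its `fetch_one` callback are hypothetical; `fetch_one` stands for any function that performs a single request for one city (for example, a wrapper around `requests.get`).

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_all(cities, fetch_one, max_workers=5):
    """Call fetch_one(city, coordinates) for every city in parallel threads.

    Returns a dict mapping city -> result. Cities whose request raised
    an exception are reported and left out of the result.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all requests first so that they run concurrently
        futures = {
            city: executor.submit(fetch_one, city, coordinates)
            for city, coordinates in cities.items()
        }
        # Collect the results; result() re-raises any worker exception
        for city, future in futures.items():
            try:
                results[city] = future.result()
            except Exception as error:
                print(f"Error getting weather data for {city}: {error}")
    return results
```

In Airflow, a comparable effect is usually achieved at the task level, for example with one task per city so that the scheduler runs them in parallel, rather than with threads inside a single task.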

In [2]:
import json
import os

import requests

cities_coordinates = {
    "Moscow": {"latitude": 55.7558, "longitude": 37.6176},
    "Kazan": {"latitude": 55.8304, "longitude": 49.0661},
    "St. Petersburg": {"latitude": 59.9343, "longitude": 30.3351},
    "Tula": {"latitude": 54.2021, "longitude": 37.6177},
    "Novosibirsk": {"latitude": 55.0084, "longitude": 82.9357}
}

url = "https://api.weather.yandex.ru/v2/forecast"
language = "en_US"
# Read the key from the environment instead of hard-coding a secret.
# YANDEX_WEATHER_API_KEY is an assumed variable name; set it before running.
api_key = os.environ["YANDEX_WEATHER_API_KEY"]

headers = {
    "X-Yandex-API-Key": api_key
}

weather_results = {}  

for city, coordinates in cities_coordinates.items():
    params = {
        "lat": coordinates["latitude"],
        "lon": coordinates["longitude"],
        "lang": language,
        "limit": 7,       # number of forecast days
        "hours": "true",  # include the hourly forecast
        "extra": "true"   # include the extended precipitation fields
    }

    # A timeout keeps the notebook from hanging on a stalled request
    response = requests.get(url, headers=headers, params=params, timeout=30)

    if response.status_code == 200:
        weather_results[city] = response.json()
    else:
        print(f"Error getting weather data for {city}: {response.status_code}")


weather_results_json = json.dumps(weather_results, ensure_ascii=False, indent=4)
#print(weather_results_json)

In [3]:
import csv

# Restore the dict from the JSON string produced in the previous cell
weather_results = json.loads(weather_results_json)

# Create a CSV file
with open('weather_data.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['city', 'date', 'hour',
                  'temperature_c', 'pressure_mm', 'is_rainy']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()

    # Write one row per city, day and hour
    for city, city_data in weather_results.items():
        for forecast in city_data.get('forecasts', []):
            date = forecast['date']

            # Days for which the API returned no hourly data are skipped
            for hour_data in forecast.get('hours') or []:
                temperature_c = hour_data.get('temp')
                pressure_mm = hour_data.get('pressure_mm')
                prec_type = hour_data.get('prec_type')

                # Skip hours with empty values
                if None in (temperature_c, pressure_mm, prec_type):
                    continue

                writer.writerow({
                    'city': city,
                    'date': date,
                    'hour': hour_data['hour'],
                    'temperature_c': temperature_c,
                    'pressure_mm': pressure_mm,
                    # prec_type is a precipitation code, not a flag;
                    # per the API field description, 1 means rain
                    'is_rainy': int(prec_type == 1)
                })

print("CSV created.")

CSV created.


---
#### 2. Loading data into the database (PostgreSQL).

Using the resulting CSV file, load the data into PostgreSQL. First, create schemas in the database: one for receiving raw data and one for future aggregation tables.

When creating tables, partitioning and indexing are encouraged where possible and appropriate.

The solution must show the data loading code and the scripts that create the schemas and tables for items 2 and 2.1.

Hint: solving this task requires a running database; we recommend deploying it locally with Docker.

docker run --name my_postgres -e POSTGRES_PASSWORD=password -d -p 5432:5432 postgres

docker ps  # check that the container is running
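The schema and table creation scripts requested above could look roughly like the sketch below. It is only an illustration under stated assumptions: `raw_data.weather` matches the table name used by the loading code, while the `marts` schema name, the column types, range partitioning by `date` and the default partition are assumptions (declarative partitioning with a default partition needs PostgreSQL 11 or newer). The helper names `build_ddl` and `apply_ddl` are hypothetical.

```python
def build_ddl(raw_schema="raw_data", mart_schema="marts"):
    """Return the DDL statements in the order they should be executed."""
    return [
        # One schema for raw data, one for future aggregation tables
        f"CREATE SCHEMA IF NOT EXISTS {raw_schema}",
        f"CREATE SCHEMA IF NOT EXISTS {mart_schema}",
        # Raw hourly forecasts, partitioned by date. The primary key
        # includes the partition column, as PostgreSQL requires.
        f"""
        CREATE TABLE IF NOT EXISTS {raw_schema}.weather (
            city          text     NOT NULL,
            date          date     NOT NULL,
            hour          smallint NOT NULL CHECK (hour BETWEEN 0 AND 23),
            temperature_c integer,
            pressure_mm   integer,
            is_rainy      smallint NOT NULL,
            PRIMARY KEY (city, date, hour)
        ) PARTITION BY RANGE (date)
        """,
        # A catch-all partition, so that inserts work before any
        # date-specific partitions have been created
        f"""
        CREATE TABLE IF NOT EXISTS {raw_schema}.weather_default
            PARTITION OF {raw_schema}.weather DEFAULT
        """,
    ]


def apply_ddl(conn, statements):
    """Execute the statements on an open DB-API connection and commit."""
    cur = conn.cursor()
    for statement in statements:
        cur.execute(statement)
    conn.commit()
    cur.close()
```

With a `psycopg2` connection `conn`, the scripts would be applied as `apply_ddl(conn, build_ddl())`. The primary key also provides an index on `(city, date, hour)`, which is the ordering that per-city hourly queries need.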

In [2]:
import csv

import psycopg2

# Assumes the raw_data.weather table has already been created
INSERT_SQL = (
    "INSERT INTO raw_data.weather "
    "(city, date, hour, temperature_c, pressure_mm, is_rainy) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)

try:
    # Establish connection to the database
    conn = psycopg2.connect(
        dbname="postgres",
        user="postgres",
        password="password",
        host="localhost",
        port="5432"
    )
    print("Connection to PostgreSQL database successful.")

    cur = conn.cursor()

    # Load data from CSV into PostgreSQL, one INSERT per row
    with open('weather_data.csv', 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            cur.execute(INSERT_SQL, (
                row['city'], row['date'], row['hour'],
                row['temperature_c'], row['pressure_mm'], row['is_rainy']
            ))

    # Commit the transaction
    conn.commit()

    # Close cursor and connection
    cur.close()
    conn.close()

    print("Data successfully loaded into PostgreSQL.")

except Exception as e:
    print("Error loading data into PostgreSQL:", e)




Connection to PostgreSQL database successful.
Data successfully loaded into PostgreSQL.


#### 2.1 Forming a View (PostgreSQL).

1. Using the table with raw data, build a View that specifies the hour at which rain starts for each city and day. Assume that rain can start only once per day in any given city.

2. Create a View that calculates a moving average of temperature and pressure for each city, day and hour.
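The two views could be sketched as follows. This is a minimal sketch, not a definitive solution: the view names, the `marts` schema (which must already exist) and the three-hour window are assumptions, and "rain start" is interpreted here as an hour with `is_rainy = 1` whose preceding hour in the same city was dry. The SQL is wrapped in hypothetical Python helpers so that it can be executed through the same `psycopg2` cursor as the loading code.

```python
def rain_start_view_sql(source="raw_data.weather", view="marts.rain_start"):
    """SQL for a view with the hour at which rain starts, per city and day."""
    return f"""
        CREATE VIEW {view} AS
        SELECT city, date, MIN(hour) AS rain_start_hour
        FROM (
            SELECT city, date, hour, is_rainy,
                   -- rain flag of the previous hour in the same city
                   LAG(is_rainy) OVER (
                       PARTITION BY city ORDER BY date, hour
                   ) AS prev_is_rainy
            FROM {source}
        ) AS hourly
        WHERE is_rainy = 1 AND COALESCE(prev_is_rainy, 0) = 0
        GROUP BY city, date
    """


def moving_average_view_sql(source="raw_data.weather",
                            view="marts.weather_moving_avg",
                            window=3):
    """SQL for a view with a trailing moving average over `window` hours."""
    preceding = int(window) - 1
    frame = (
        "PARTITION BY city ORDER BY date, hour "
        f"ROWS BETWEEN {preceding} PRECEDING AND CURRENT ROW"
    )
    return f"""
        CREATE VIEW {view} AS
        SELECT city, date, hour,
               AVG(temperature_c) OVER ({frame}) AS temperature_c_ma,
               AVG(pressure_mm) OVER ({frame}) AS pressure_mm_ma
        FROM {source}
    """
```

With an open cursor `cur`, the views would be created with `cur.execute(rain_start_view_sql())` and `cur.execute(moving_average_view_sql())`. Days on which rain continues from the previous day without a dry hour in between do not appear in the first view, because no rain starts on them.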


The resulting queries should be inserted into Google Colab, and the results should be exported in csv/xlsx format and shared as a link in Google Colab.

Hint: if the raw file contains no rain at all, randomize the values of the rain flag in the table with raw data.
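If the rain flags do need to be randomized, one way to do it while respecting the "rain starts only once per day" condition is to generate a single contiguous rainy spell per city and day. The sketch below works on the rows read from the CSV (dicts with `city`, `date`, `hour` and `is_rainy` keys); the function name and the choice to make every day rainy are assumptions made for illustration.

```python
import random
from itertools import groupby


def _city_and_date(row):
    """Grouping key: one group per city and day."""
    return row["city"], row["date"]


def randomize_rain_flags(rows, seed=None):
    """Return copies of rows with one random rainy spell per city and day."""
    rng = random.Random(seed)
    ordered = sorted(
        rows, key=lambda r: (r["city"], r["date"], int(r["hour"]))
    )
    result = []
    for _, group in groupby(ordered, key=_city_and_date):
        day = list(group)
        # Pick the first and last rainy positions within the day
        start = rng.randrange(len(day))
        end = rng.randint(start, len(day) - 1)
        for index, row in enumerate(day):
            # Copy the row so that the input list is left unchanged
            result.append({**row, "is_rainy": int(start <= index <= end)})
    return result
```

Passing a fixed `seed` makes the generated flags reproducible between runs.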

---
#### 3. Designing a database for Yandex.Metrica data

Yandex.Metrica can export raw data through its API; hits (page views) and visits are exported by separate requests. Design a database for this process that has several data layers and takes the customers' needs into account: in 90% of cases customers need aggregated data (for example, to build a funnel of page visits and phone number entries broken down by date, page and UTM tag, or to build a user flow by device, OS, etc.).

Provide the result as a schema with a description.

Links to the table structures:

https://yandex.ru/dev/metrika/doc/api2/logs/fields/hits.html

https://yandex.ru/dev/metrika/doc/api2/logs/fields/visits.html