## Import libs

In [1]:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("app.log"),          # File output
        logging.StreamHandler()                  # Console output
    ],
    force=True  # This overrides any prior logging config
)

import clickhouse_connect
import requests
import pandas as pd
import dateutil


import sys
import os

from pydantic import BaseModel, Field

from typing import List
import os
import uuid

from dotenv import load_dotenv
load_dotenv()


CLICKHOUSE_USER=os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_PASSWORD=os.getenv("CLICKHOUSE_PASSWORD")
CH_IP=os.getenv("CH_IP")
CH_PORT=os.getenv("CH_PORT")  
CLICKHOUSE_DB=os.getenv("CLICKHOUSE_DB") 

# Initialize ClickHouse client
client_ch = clickhouse_connect.get_client(
    host=CH_IP,
    port=CH_PORT,     
    username=CLICKHOUSE_USER,
    password=CLICKHOUSE_PASSWORD,
    database=CLICKHOUSE_DB
)


# import tools
# Get the parent directory
parent_dir = os.path.abspath(os.path.join(os.getcwd(), "../.."))

# find tools in parent dir
if os.path.isdir(os.path.join(parent_dir, 'tools')):
    # Add parent directory to sys.path if found
    sys.path.append(parent_dir)
    
else:
    # for run in spark
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), "../airflow/airflow_data"))
    
    # Add parent directory to sys.path
    sys.path.append(parent_dir)


from tools import pd_tools
from tools.paths import Paths
from tools.db_tools import DbTools

data_path = 'data'
tmp_path = 'tmp'

db_tools = DbTools(data_path, tmp_path, client_ch)

Вам предоставлена таблица air_quality с данными о качестве воздуха в различных районах города. База содержит информацию об измерениях различных загрязнителей воздуха (PM2.5, NO2, O3 и др.) и связанных с ними показателях.

Структура таблицы air_quality
    - 1. Name - название показателя (PM2.5, NO2, O3 и др.)
    - 2. Geo_Place_Name - место измерения
    - 3. Start_Date - дата измерения
    - 4. Data_Value - измеренное значение

In [37]:
air_quality = pd.read_csv('data/Air_Quality.csv')
air_quality.head()

Unnamed: 0,Unique ID,Indicator ID,name,Measure,Measure Info,Geo Type Name,Geo Join ID,Geo Place Name,Time Period,Start_Date,Unnamed: 10,Data Value
0,419355,365,Fine particles (PM 2.5),Estimated annual rate,per square mile,Citywide,208.0,St. George and Stapleton (CD1),Summer 2010,12/31/2022,,0
1,542128,365,Fine particles (PM 2.5),Estimated annual rate (age 30+),"per 100,000",Borough,404406.0,Northern SI,Summer 2010,12/31/2022,,47
2,419346,640,Nitrogen dioxide (NO2),Number per km2,"per 100,000",CD,307.0,Fresh Meadows,Summer 2011,31.12.2022,,0
3,419347,365,Boiler Emissions- Total PM2.5 Emissions,Estimated annual rate (age 30+),"per 100,000 adults",UHF42,305.0,Southeast Queens,Annual Average 2009,31.12.2022,,0
4,419348,367,Ozone (O3),Estimated annual rate (age 30+),ppb,UHF42,105.0,Greenpoint and Williamsburg (CD1),Annual Average 2011,31.12.2022,,0


## Data analyze

In [9]:
air_quality.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 134788 entries, 0 to 134787
Data columns (total 12 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   Unique ID       134788 non-null  int64  
 1   Indicator ID    134788 non-null  int64  
 2   name            134788 non-null  object 
 3   Measure         134788 non-null  object 
 4   Measure Info    134788 non-null  object 
 5   Geo Type Name   134788 non-null  object 
 6   Geo Join ID     134779 non-null  float64
 7   Geo Place Name  134779 non-null  object 
 8   Time Period     134788 non-null  object 
 9   Start_Date      134788 non-null  object 
 10  Unnamed: 10     0 non-null       float64
 11  Data Value      134788 non-null  object 
dtypes: float64(2), int64(2), object(8)
memory usage: 12.3+ MB


In [39]:
air_quality = air_quality.drop(columns='Unnamed: 10')

In [11]:
pd_tools.df_info(air_quality)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 134788 entries, 0 to 134787
Data columns (total 11 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   Unique ID       134788 non-null  int64  
 1   Indicator ID    134788 non-null  int64  
 2   name            134788 non-null  object 
 3   Measure         134788 non-null  object 
 4   Measure Info    134788 non-null  object 
 5   Geo Type Name   134788 non-null  object 
 6   Geo Join ID     134779 non-null  float64
 7   Geo Place Name  134779 non-null  object 
 8   Time Period     134788 non-null  object 
 9   Start_Date      134788 non-null  object 
 10  Data Value      134788 non-null  object 
dtypes: float64(1), int64(2), object(8)
memory usage: 11.3+ MB
None


'First 5 rows in df'

Unnamed: 0,Unique ID,Indicator ID,name,Measure,Measure Info,Geo Type Name,Geo Join ID,Geo Place Name,Time Period,Start_Date,Data Value
0,419355,365,Fine particles (PM 2.5),Estimated annual rate,per square mile,Citywide,208.0,St. George and Stapleton (CD1),Summer 2010,12/31/2022,0
1,542128,365,Fine particles (PM 2.5),Estimated annual rate (age 30+),"per 100,000",Borough,404406.0,Northern SI,Summer 2010,12/31/2022,47
2,419346,640,Nitrogen dioxide (NO2),Number per km2,"per 100,000",CD,307.0,Fresh Meadows,Summer 2011,31.12.2022,0
3,419347,365,Boiler Emissions- Total PM2.5 Emissions,Estimated annual rate (age 30+),"per 100,000 adults",UHF42,305.0,Southeast Queens,Annual Average 2009,31.12.2022,0
4,419348,367,Ozone (O3),Estimated annual rate (age 30+),ppb,UHF42,105.0,Greenpoint and Williamsburg (CD1),Annual Average 2011,31.12.2022,0


Column Unique ID has only one type: <class 'int'>
Column Indicator ID has only one type: <class 'int'>
Column name has only one type: <class 'str'>
Column Measure has only one type: <class 'str'>
Column Measure Info has only one type: <class 'str'>
Column Geo Type Name has only one type: <class 'str'>
Column Geo Join ID has only one type: <class 'float'>
Column Geo Place Name has only one type: <class 'str'>
Column Time Period has only one type: <class 'str'>
Column Start_Date has only one type: <class 'str'>
Column Data Value has only one type: <class 'str'>


Unnamed: 0,duplicates_full
duplicates_full,0


Unnamed: 0,Unique ID,Indicator ID,name,Measure,Measure Info,Geo Type Name,Geo Join ID,Geo Place Name,Time Period,Start_Date,Data Value
duplicates_by_cols,0,134765,134770,134780,134779,134783,134707,134664,134732,128817,134424


zeroes


minus_ones


Unnamed: 0,Geo Join ID,Geo Place Name
nulls,9,9


Unnamed: 0,Geo Join ID,Geo Place Name
nans,9,9


nones


NA placeholder


null placeholder


N/A placeholder


In [40]:
air_quality.columns

Index(['Unique ID', 'Indicator ID', 'name', 'Measure', 'Measure Info',
       'Geo Type Name', 'Geo Join ID', 'Geo Place Name', 'Time Period',
       'Start_Date', 'Data Value'],
      dtype='object')

## Preprocess data

In [41]:
# rename columns
air_quality = air_quality.rename(columns={
    'Unique ID': 'unique_id',
    'Indicator ID': 'indicator_id',
    'Measure': 'measure',
    'Measure Info': 'measure_info',
    'Geo Type Name': 'geo_type_name',
    'Geo Join ID': 'geo_join_id',
    'Geo Place Name': 'geo_place_name',
    'Time Period': 'time_period',
    'Start_Date': 'start_date',
    'Data Value': 'data_value'
})


In [14]:
air_quality.dtypes

unique_id           int64
indicator_id        int64
name               object
measure            object
measure_info       object
geo_type_name      object
geo_join_id       float64
geo_place_name     object
time_period        object
start_date         object
data_value         object
dtype: object

In [42]:
# change data type of columns
air_quality['data_value'] = air_quality['data_value'].replace(',', '.', regex=True).astype(float)
air_quality['data_value'].head()

0     0.0
1    47.0
2     0.0
3     0.0
4     0.0
Name: data_value, dtype: float64

In [43]:
# fill nan to 0 and change to int
air_quality['geo_join_id'] = air_quality['geo_join_id'].fillna(0).astype('UInt32')
air_quality['geo_join_id'].head()

0       208
1    404406
2       307
3       305
4       105
Name: geo_join_id, dtype: UInt32

In [44]:
air_quality['start_date']

0         12/31/2022
1         12/31/2022
2         31.12.2022
3         31.12.2022
4         31.12.2022
             ...    
134783     2/18/2023
134784     1/25/2023
134785     1/16/2023
134786     1/15/2023
134787     1/10/2023
Name: start_date, Length: 134788, dtype: object

In [45]:
air_quality['start_date'] = air_quality['start_date'].apply(lambda x: dateutil.parser.parse(x))
air_quality['start_date'].head()

0   2022-12-31
1   2022-12-31
2   2022-12-31
3   2022-12-31
4   2022-12-31
Name: start_date, dtype: datetime64[ns]

In [46]:
air_quality.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 134788 entries, 0 to 134787
Data columns (total 11 columns):
 #   Column          Non-Null Count   Dtype         
---  ------          --------------   -----         
 0   unique_id       134788 non-null  int64         
 1   indicator_id    134788 non-null  int64         
 2   name            134788 non-null  object        
 3   measure         134788 non-null  object        
 4   measure_info    134788 non-null  object        
 5   geo_type_name   134788 non-null  object        
 6   geo_join_id     134788 non-null  UInt32        
 7   geo_place_name  134779 non-null  object        
 8   time_period     134788 non-null  object        
 9   start_date      134788 non-null  datetime64[ns]
 10  data_value      134788 non-null  float64       
dtypes: UInt32(1), datetime64[ns](1), float64(1), int64(2), object(6)
memory usage: 10.9+ MB


In [47]:
air_quality.to_csv("data/air_quality_norm.csv", index=False)

## Upload data to DB

In [48]:
air_quality_norm = pd.read_csv(f"{data_path}/air_quality_norm.csv")

In [49]:
db_tools.uu_to_clickhouse(air_quality_norm, CLICKHOUSE_DB, 'air_quality', iana_timezone='UTC')


Checking table dbch.air_quality...
Table dbch.air_quality does not exist
Table dbch.air_quality created
Data uploaded to created dbch.air_quality
Table dbch.air_quality created and data uploaded


In [50]:
client_ch.query_df("SELECT * FROM air_quality limit 10")

Unnamed: 0,unique_id,indicator_id,name,measure,measure_info,geo_type_name,geo_join_id,geo_place_name,time_period,start_date,data_value
0,419355,365,Fine particles (PM 2.5),Estimated annual rate,per square mile,Citywide,208,St. George and Stapleton (CD1),Summer 2010,2022-12-31,0.0
1,542128,365,Fine particles (PM 2.5),Estimated annual rate (age 30+),"per 100,000",Borough,404406,Northern SI,Summer 2010,2022-12-31,47.0
2,419346,640,Nitrogen dioxide (NO2),Number per km2,"per 100,000",CD,307,Fresh Meadows,Summer 2011,2022-12-31,0.0
3,419347,365,Boiler Emissions- Total PM2.5 Emissions,Estimated annual rate (age 30+),"per 100,000 adults",UHF42,305,Southeast Queens,Annual Average 2009,2022-12-31,0.0
4,419348,367,Ozone (O3),Estimated annual rate (age 30+),ppb,UHF42,105,Greenpoint and Williamsburg (CD1),Annual Average 2011,2022-12-31,0.0
5,419349,367,Cardiac and respiratory deaths due to Ozone,Estimated annual rate (under age 18),"per 100,000 adults",UHF42,309310,Long Island City - Astoria,Summer 2009,2022-12-31,0.0
6,419350,366,Deaths due to PM2.5,Estimated annual rate,mcg/m3,UHF42,304,Lower Manhattan,Winter 2010-11,2022-12-31,0.0
7,419351,640,Ozone (O3),Million miles,"per 100,000 adults",Borough,211,West Queens,Summer 2009,2022-12-31,0.0
8,419352,366,Outdoor Air Toxics - Benzene,Million miles,mcg/m3,CD,104,Washington Heights and Inwood (CD12),Annual Average 2009,2022-12-31,0.0
9,419353,367,Cardiac and respiratory deaths due to Ozone,Mean,per square mile,UHF34,1,Kingsbridge - Riverdale,Annual Average 2011,2022-12-31,0.0


## Exercises

1. ▶️Найдите все локации (Geo Place Name), где среднее значение озона (name = 'Ozone (O3)') превышает 65 и количество измерений не менее 255. Сколько таких локаций. 
Для сохранения информации о загрязнителях в CRM Битрикс24 потребуется создать пользовательские поля для компаний. В таблицах базы данных Битрикс24 эта информация будет храниться в:

In [103]:
client_ch.query_df(
    '''
    SELECT 
        a.indicator_id
        ,a.name
    FROM air_quality a
    WHERE
        name='Ozone (O3)'

    '''
)

Unnamed: 0,indicator_id,name
0,367,Ozone (O3)
1,640,Ozone (O3)
2,366,Ozone (O3)
3,367,Ozone (O3)
4,367,Ozone (O3)
...,...,...
27613,367,Ozone (O3)
27614,367,Ozone (O3)
27615,367,Ozone (O3)
27616,367,Ozone (O3)


In [92]:
client_ch.query_df('''
    SELECT 
        a.geo_place_name
        , AVG(a.data_value) as avg_data_value
        , COUNT(a.data_value) as count_data_value
    FROM air_quality a
    WHERE 
        a.name = 'Ozone (O3)'                
    GROUP BY a.geo_place_name, a.name
    HAVING
        avg_data_value > 65
        AND count_data_value >= 255
''')

**Ответ:** локаций, удовлетворяющих условиям задания не найдено в данных.


2. ▶️Сопоставьте данные по PM2.5 (Name = 'Data ValueM 2.5') и NO2 (Name = 'Nitrogen dioxide (NO2)') для одних и тех же локаций и дат. Сколько есть совпадающих измерений за 1 квартал 2021 года (когда в один день в одной локации измеряли оба показателя)? Как реализовать на портале на 1С-Битрикс доступы так чтобы администраторы видели все данные, менеджеры уровня A имели доступ только к данным PM2.5 и NO2, менеджеры уровня B имели доступ только к данным O3 и CO, все остальные пользователи видели только общие сводки без детализации:

In [98]:


client_ch.query_df(
    '''
    SELECT COUNT(*) AS cnt
    FROM (
        SELECT 
            a.geo_place_name,
            a.start_date,
            a.data_value AS pm25_value,
            b.data_value AS no2_value
        FROM air_quality a
        JOIN air_quality b
            ON a.geo_place_name = b.geo_place_name
           AND a.start_date = b.start_date
        WHERE a.name = 'Data ValueM 2.5'
          AND b.name = 'Nitrogen dioxide (NO2)'
          AND a.start_date BETWEEN '2021-01-01' AND '2021-03-31'
        GROUP BY a.geo_place_name, a.start_date, pm25_value, no2_value
    )
    '''
)


Unnamed: 0,cnt
0,0


**Ответ:** локаций, удовлетворяющих условиям задания не найдено в данных.

3. ▶️ Найдите количество локаций, где среднее значение озона (Name = 'Ozone (O3)') выше, чем общее среднее значение озона по всем локациям. В каком году количество таких локаций было самое минимальное, но не нулевое? 
Для публикации интерактивной карты качества воздуха на сайте, работающем на 1С-Битрикс, требуется настроить автоматическую интеграцию данных. Как правильно организовать эту интеграцию с точки зрения архитектуры сайта?

In [106]:
client_ch.query_df(
    '''
    WITH global_avg AS (
        SELECT avg(data_value) AS avg_o3
        FROM air_quality
        WHERE name = 'Ozone (O3)'
    )
    SELECT 
        year,
        countDistinct(geo_place_name) AS locations_above_avg
    FROM (
        SELECT 
            toYear(parseDateTimeBestEffort(a.start_date)) AS year,
            a.geo_place_name,
            avg(data_value) AS avg_o3_location
        FROM air_quality a
        WHERE a.name = 'Ozone (O3)'
        GROUP BY year, a.geo_place_name
        HAVING avg_o3_location > (SELECT avg_o3 FROM global_avg)
    )
    GROUP BY year
    HAVING locations_above_avg > 0
    ORDER BY locations_above_avg ASC
    LIMIT 1
    '''
)


Unnamed: 0,year,locations_above_avg
0,2020,38


**Ответ**



4. ▶️ Для каждой локации найдите максимальное значение PM2.5 и его ранг (от наибольшего к наименьшему) среди всех локаций. Сколько локаций имеют ранг <=5 (учитывая возможные одинаковые значения)? Сколько из них имеет имеет больше 10 и меньше 20 дат, когда одновременно проводились измерения и PM2.5, и NO2 (совпадающие измерения обоих загрязнителей) 
Необходимо реализовать автоматическую отправку email-уведомлений жителям при превышении допустимых норм PM2.5 в их районе. Каким образом реализовать персонализированную подписку пользователей на уведомления по конкретным локациям?

In [107]:
client_ch.query_df(
    '''
        WITH pm25_max_ranked AS (
            SELECT
                geo_place_name,
                max(data_value) AS max_pm25,
                RANK() OVER (ORDER BY max(data_value) DESC) AS rank_pm25
            FROM air_quality
            WHERE name = 'Data ValueM 2.5'
            GROUP BY geo_place_name
        ),

        measurements_counts AS (
            SELECT
                a.geo_place_name,
                countDistinct(a.start_date) AS matching_dates_count
            FROM air_quality a
            JOIN air_quality b
            ON a.geo_place_name = b.geo_place_name
            AND a.start_date = b.start_date
            WHERE a.name = 'Data ValueM 2.5'
            AND b.name = 'Nitrogen dioxide (NO2)'
            GROUP BY a.geo_place_name
        )

        SELECT
            count(*) AS total_locations_rank_le_5,
            countIf(matching_dates_count BETWEEN 10 AND 19) AS locations_with_10_19_matching_dates
        FROM pm25_max_ranked pm
        LEFT JOIN measurements_counts mc ON pm.geo_place_name = mc.geo_place_name
        WHERE pm.rank_pm25 <= 5
    '''
)


Unnamed: 0,total_locations_rank_le_5,locations_with_10_19_matching_dates
0,0,0


**Ответ:**

▶️ Сколько дней в январе 2021 года было зафиксировано превышение среднесуточной нормы PM2.5 (>35.5 мкг/м³) хотя бы в одной локации?  При разработке комплексной системы мониторинга с публичной частью на 1С-Битрикс и корпоративной частью на Битрикс24, каким образом правильно организовать обмен данными между системами на уровне баз данных?

In [None]:
client_ch.query_df(
    '''
        SELECT
            countDistinct(a.start_date) AS days_exceeding_norm
        FROM air_quality a
        WHERE 
            a.start_date BETWEEN '2021-01-01' AND '2021-01-31'
            AND a.name LIKE '%PM2.5%'
            AND a.measure_info = 'mcg/m3'
            AND a.measure='Mean'
            AND a.data_value > 35
    '''
)


Unnamed: 0,days_exceeding_norm
0,1


**Ответ:** 1 день.