In [1]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from sklearn.linear_model import LinearRegression
from joblib import Parallel, delayed
import httpx
import aiohttp
import asyncio
import time

In [2]:
pd.options.mode.chained_assignment = None

### EDA

In [3]:
# Загрузка датасета
df = pd.read_csv('temperature_data.csv', parse_dates=['timestamp'])
df.head()

Unnamed: 0,city,timestamp,temperature,season
0,New York,2010-01-01,-1.670631,winter
1,New York,2010-01-02,-0.990672,winter
2,New York,2010-01-03,3.556789,winter
3,New York,2010-01-04,-1.721118,winter
4,New York,2010-01-05,-4.411674,winter


In [4]:
# Проверка типов данных
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 54750 entries, 0 to 54749
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   city         54750 non-null  object        
 1   timestamp    54750 non-null  datetime64[ns]
 2   temperature  54750 non-null  float64       
 3   season       54750 non-null  object        
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 1.7+ MB


In [5]:
# Описательные статистики
df.describe(include='all')

Unnamed: 0,city,timestamp,temperature,season
count,54750,54750,54750.0,54750
unique,15,,,4
top,New York,,,spring
freq,3650,,,13800
mean,,2014-12-30 12:00:00,18.288415,
min,,2010-01-01 00:00:00,-26.042894,
25%,,2012-07-01 00:00:00,11.206228,
50%,,2014-12-30 12:00:00,18.780396,
75%,,2017-06-30 00:00:00,26.086149,
max,,2019-12-29 00:00:00,55.060193,


In [6]:
# Вычисление скользящего среднего с окном в 30 дней
grouped_df = df.groupby(['city'])

for city, group in grouped_df:
    df.loc[group.index, 'rolling_mean'] = group['temperature'].rolling(window=30).mean()
    df.loc[group.index, 'rolling_std'] = group['temperature'].rolling(window=30).std()

df = df.dropna()
df

Unnamed: 0,city,timestamp,temperature,season,rolling_mean,rolling_std
29,New York,2010-01-30,9.245857,winter,0.112613,4.896877
30,New York,2010-01-31,2.895913,winter,0.264831,4.910490
31,New York,2010-02-01,-6.525404,winter,0.080340,5.060954
32,New York,2010-02-02,-0.543022,winter,-0.056320,5.019023
33,New York,2010-02-03,-1.781314,winter,-0.058327,5.019723
...,...,...,...,...,...,...
54745,Mexico City,2019-12-25,8.174663,winter,12.189621,5.383108
54746,Mexico City,2019-12-26,9.240142,winter,11.749971,5.045996
54747,Mexico City,2019-12-27,6.633155,winter,11.568562,5.131004
54748,Mexico City,2019-12-28,16.448924,winter,11.530699,5.089098


In [7]:
# Построим график средней температуры по скользящему среднему для всех городов
fig = px.line(df, x='timestamp', y='rolling_mean', color='city', title='Rolling mean temperature by cities')
fig.update_xaxes(title='date')
fig.update_yaxes(title='Rolling mean temperature (degrees c)')
fig.show()

In [8]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=df[df['city']=='Moscow']['timestamp'], y=df[df['city']=='Moscow']['temperature'], 
                         name='temperature', line=dict(color='CornflowerBlue', width=1)))
fig.add_trace(go.Scatter(x=df[df['city']=='Moscow']['timestamp'], y=df[df['city']=='Moscow']['rolling_mean'], 
                         name='temperature rolling mean', line=dict(color='Navy', width=4)))
fig.update_layout(title='Temperature and rolling mean temperature in Moscow', 
                  xaxis=dict(title=dict(text='date')),  yaxis=dict(title=dict(text='Temperature (degrees C)')))
fig.show()

In [9]:
# Получение линии тренда для одного города при помощи линейной регрессии
X = df[df['city']=='Mexico City'].dropna()[['timestamp']].astype('int')
y = df[df['city']=='Mexico City'].dropna()['rolling_mean']
reg = LinearRegression()
reg.fit(X, y)
pred = reg.predict(X)

In [10]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=df[df['city']=='Mexico City']['timestamp'], y=df[df['city']=='Mexico City']['temperature'], 
                         name='temperature', line=dict(color='CornflowerBlue', width=1)))
fig.add_trace(go.Scatter(x=df[df['city']=='Mexico City']['timestamp'], y=df[df['city']=='Mexico City']['rolling_mean'], 
                         name='temperature rolling mean', line=dict(color='Navy', width=4)))
fig.add_trace(go.Scatter(x=df[df['city']=='Mexico City']['timestamp'], y=pred, 
                         name='trend line', line=dict(color='Crimson', width=4)))
fig.update_layout(title='Temperature and rolling mean temperature in Mexico City', 
                  xaxis=dict(title=dict(text='date')),  yaxis=dict(title=dict(text='Temperature (degrees C)')))

In [11]:
# Определим направление линии тренда

trend_line_coef = reg.coef_[0]
if trend_line_coef > 0:
    print(f'Тренд восходящий с коэффициентом {trend_line_coef}')
elif trend_line_coef < 0:
    print(f'Тренд нисходящий с коэффициентом {trend_line_coef}')
else:
    print('Тренд без изменений (горизонтальная линия)')

Тренд нисходящий с коэффициентом -4.283020304305708e-19


In [12]:
# Температурные профили сезонов по городам
seasons_profile = df[['city', 'season', 'temperature']].groupby(['city', 'season'], as_index=False)\
                    .agg(temp_mean=('temperature', 'mean'), temp_std=('temperature', 'std'), 
                         temp_min=('temperature', 'min'), temp_max=('temperature', 'max'))
seasons_profile


Unnamed: 0,city,season,temp_mean,temp_std,temp_min,temp_max
0,Beijing,autumn,16.206013,5.105279,-1.319941,31.68919
1,Beijing,spring,13.532876,4.938591,-4.96451,27.700372
2,Beijing,summer,26.670668,5.019855,9.30562,42.657388
3,Beijing,winter,-2.159349,5.017286,-16.545212,14.18459
4,Berlin,autumn,11.030841,4.939311,-6.843221,25.869659
5,Berlin,spring,10.011535,4.958291,-4.443505,26.031544
6,Berlin,summer,19.936992,5.085671,3.746157,35.077247
7,Berlin,winter,0.30325,4.942319,-15.486114,15.106645
8,Cairo,autumn,24.764598,4.948879,9.542805,40.01984
9,Cairo,spring,24.925759,5.15207,9.465858,40.579256


In [13]:
# Поиск аномальных значений температуры относительно средней температуры для сезона по городам
df = pd.merge(df, seasons_profile, how='inner', on=['city', 'season'])
df['anomaly'] = abs(df['temperature']) > abs(df['temp_mean']) + df['temp_std'] * 2
df.head(30)

Unnamed: 0,city,timestamp,temperature,season,rolling_mean,rolling_std,temp_mean,temp_std,temp_min,temp_max,anomaly
0,New York,2010-01-30,9.245857,winter,0.112613,4.896877,0.052858,4.998612,-15.692108,15.061622,False
1,New York,2010-01-31,2.895913,winter,0.264831,4.91049,0.052858,4.998612,-15.692108,15.061622,False
2,New York,2010-02-01,-6.525404,winter,0.08034,5.060954,0.052858,4.998612,-15.692108,15.061622,False
3,New York,2010-02-02,-0.543022,winter,-0.05632,5.019023,0.052858,4.998612,-15.692108,15.061622,False
4,New York,2010-02-03,-1.781314,winter,-0.058327,5.019723,0.052858,4.998612,-15.692108,15.061622,False
5,New York,2010-02-04,-6.88603,winter,-0.140805,5.113177,0.052858,4.998612,-15.692108,15.061622,False
6,New York,2010-02-05,7.386751,winter,0.131797,5.292168,0.052858,4.998612,-15.692108,15.061622,False
7,New York,2010-02-06,-5.574352,winter,-0.114384,5.382367,0.052858,4.998612,-15.692108,15.061622,False
8,New York,2010-02-07,-8.349027,winter,-0.251448,5.540993,0.052858,4.998612,-15.692108,15.061622,False
9,New York,2010-02-08,4.220088,winter,-0.112085,5.600809,0.052858,4.998612,-15.692108,15.061622,False


In [14]:
# Построим график с выделением аномальных значений температуры, на этот раз для Берлина
fig = go.Figure()
fig.add_trace(go.Scatter(x=df[df['city']=='Berlin']['timestamp'], y=df[df['city']=='Berlin']['temperature'], 
                         name='temperature', line=dict(color='CornflowerBlue', width=1)))
fig.add_trace(go.Scatter(x=df[df['city']=='Berlin']['timestamp'], y=df[df['city']=='Berlin']['rolling_mean'], 
                         name='temperature rolling mean', line=dict(color='Navy', width=4)))
fig.add_trace(go.Scatter(x=df[(df['city']=='Berlin') & (df['anomaly']==True)]['timestamp'],
                         y=df[(df['city']=='Berlin') & (df['anomaly']==True)]['temperature'],
                         mode='markers', name='anomaly', line=dict(color='Purple', width=4)))
fig.update_layout(title='Temperature and rolling mean temperature in Berlin', 
                  xaxis=dict(title=dict(text='Date')),  yaxis=dict(title=dict(text='Temperature (degrees C)')))

In [15]:
# Кол-во аномалий по сезонам для каждого города
df[['city', 'season', 'anomaly']].groupby(['city', 'season']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,anomaly
city,season,Unnamed: 2_level_1
Beijing,autumn,16
Beijing,spring,19
Beijing,summer,16
Beijing,winter,19
Berlin,autumn,17
Berlin,spring,16
Berlin,summer,18
Berlin,winter,34
Cairo,autumn,23
Cairo,spring,21


In [16]:
# Температурные профили городов
city_profile = df[['city', 'temperature']].groupby(['city'], as_index=False)\
                    .agg(temp_mean=('temperature', 'mean'), temp_std=('temperature', 'std'), 
                         temp_min=('temperature', 'min'), temp_max=('temperature', 'max'))
city_profile

Unnamed: 0,city,temp_mean,temp_std,temp_min,temp_max
0,Beijing,13.768003,11.403458,-16.545212,42.657388
1,Berlin,10.45425,8.519588,-15.486114,35.077247
2,Cairo,25.018633,8.571229,-3.774874,51.970598
3,Dubai,30.080364,8.567246,3.341856,55.060193
4,London,11.663772,6.838192,-11.701662,35.095612
5,Los Angeles,19.588809,6.191846,0.096748,43.365931
6,Mexico City,16.428239,5.891757,-4.307543,37.119959
7,Moscow,5.452504,11.16329,-26.042894,32.590492
8,Mumbai,29.978402,6.096257,9.863008,51.889113
9,New York,12.545155,10.200461,-15.692108,44.786657


### Эксперименты с распараллеливанием функции

In [17]:
def get_temperature_data(city: str, data: pd.DataFrame) -> tuple[str, pd.DataFrame, float, pd.DataFrame, pd.DataFrame]:
    '''
        Функция принимает название города и датафрейм с данными температуры.
        Возвращает кортеж из отфильтрованного по городу датафрейма с добавленными столбцами скользящего среднего,
            стандартного отклонения, среднего, точек для линии тренда и индикатор аномальности значения температуры;
            число с плавающей точкой - коэффициент линии тренда, датафрейм с температурным профилем сезонов в выбранном
            городе и датафрейм с температурным профилем города.
    '''
    
    data = data[data['city'] == city]
    
    # Расчет скользящего среднего и скользящего std
    grouped_df = data.groupby(['city'])
    for city, group in grouped_df:
        data.loc[group.index, 'rolling_mean'] = group['temperature'].rolling(window=30).mean()
        data.loc[group.index, 'rolling_std'] = group['temperature'].rolling(window=30).std()
    
    data = data.dropna()
    
    # Получение линии тренда для одного города при помощи линейной регрессии
    X = data.dropna()[['timestamp']].astype('int')
    y = data.dropna()['rolling_mean']
    reg = LinearRegression()
    reg.fit(X, y)
    trend_line = X
    trend_line['trend_line_points'] = reg.predict(X)
    
    # Коэффициент линии тренда
    trend_line_coef = reg.coef_[0]
    
    # Температурные профили сезонов по городам
    seasons_profile = data[['city', 'season', 'temperature']].groupby(['city', 'season'], as_index=False)\
                        .agg(temp_mean=('temperature', 'mean'), temp_std=('temperature', 'std'), 
                            temp_min=('temperature', 'min'), temp_max=('temperature', 'max'))

    # Поиск аномальных значений температуры относительно средней температуры для сезона по городам
    data = pd.merge(data, seasons_profile, how='inner', on=['city', 'season'])
    data['anomaly'] = abs(data['temperature']) > abs(data['temp_mean']) + data['temp_std'] * 2
    
    # Температурные профили городов
    city_profile = data[['city', 'temperature']].groupby(['city'], as_index=False)\
                        .agg(temp_mean=('temperature', 'mean'), temp_std=('temperature', 'std'), 
                            temp_min=('temperature', 'min'), temp_max=('temperature', 'max'))
    
    return (city, data, trend_line_coef, seasons_profile, city_profile)

In [18]:
df = pd.read_csv('temperature_data.csv', parse_dates=['timestamp'])
cities = sorted(df['city'].unique())

In [19]:
%%time

# Замерим время синхронного выполнения функции для всех городов в цикле for

synch_processed_dfs = []

for city in cities:
    synch_processed_dfs.append(get_temperature_data(city, df))

CPU times: user 165 ms, sys: 3.08 ms, total: 168 ms
Wall time: 172 ms


In [20]:
%%time

# Замерим время параллельной работы на нескольких потоках

cities = df['city'].unique()
list_df = [chunk for chunk in df.groupby('city')]

parallel_threads_processed_dfs = Parallel(n_jobs=15, prefer="threads")(delayed(get_temperature_data)(*data) for data in list_df)

CPU times: user 156 ms, sys: 40.8 ms, total: 196 ms
Wall time: 177 ms


In [21]:
%%time
cities = df['city'].unique()
list_df = [chunk for chunk in df.groupby('city')]

parallel_processes_processed_dfs = Parallel(n_jobs=-1)(delayed(get_temperature_data)(*data) for data in list_df)

CPU times: user 70.7 ms, sys: 64 ms, total: 135 ms
Wall time: 2.58 s


Позапускал несколько раз, получилось, что распараллеливание на потоки в данном случае чаще работает немного дольше, иногда чуть быстрее ( в общем не дает стабильного преимущества в скорости выполнения). Распараллеливание на процессы работает существенно дольше (видимо в данном случае поднять процессы, а потом соединить результат их работы в один кортеж сильно дороже работы самой функции), но возможно на существенно большем объеме данных оно бы дало прирост.

### Работа с API OpenWeatherMap

In [22]:
API_KEY = '7ee8bac3cb89d52d712527d1a8c24772'

In [23]:
# Протестируем запросы к api на Москве
city = 'Moscow'
url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric'

In [24]:
request = httpx.get(url)
request.status_code

200

In [25]:
# Значение текущей температуры
current_temp = request.json().get('main').get('temp')
current_temp

-2.96

In [26]:
city, data, trend_line_coef, seasons_profile, city_profile = get_temperature_data(city='Moscow', data=df)

mean_temp = seasons_profile.loc[seasons_profile['season']=='winter', 'temp_mean'].to_list()[0]
temp_std = seasons_profile.loc[seasons_profile['season']=='winter', 'temp_std'].to_list()[0]

if round(abs(mean_temp - current_temp) / temp_std, 2) > 2:
    print('Текущая температура является аномальной')
else:
    print('Текущая температура в пределах исторической нормы для сезона')    

Текущая температура в пределах исторической нормы для сезона


In [27]:
# Теперь посмотрим на текущую температуру во всех рассматриваемых городах

for city in cities:
    city_, data, trend_line_coef, seasons_profile, city_profile = get_temperature_data(city=city, data=df)
    mean_temp = seasons_profile.loc[seasons_profile['season']=='winter', 'temp_mean'].to_list()[0]
    temp_std = seasons_profile.loc[seasons_profile['season']=='winter', 'temp_std'].to_list()[0]
    
    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric'
    request = httpx.get(url)
    if request.status_code == 200:
        current_temp = request.json().get('main').get('temp')

        if round(abs(mean_temp - current_temp) / temp_std, 2) > 2:
            print(f'Текущая температура в городе {city} составляет {current_temp:.2f}С и является аномальной')
        else:
            print(f'Текущая температура в городе {city} составляет {current_temp:.2f}C и находится в пределах исторической нормы для сезона')
    
    else:
        print(f'Для города {city} код ответа: {request.status_code}')  

Текущая температура в городе New York составляет -6.54C и находится в пределах исторической нормы для сезона
Текущая температура в городе London составляет 6.49C и находится в пределах исторической нормы для сезона
Текущая температура в городе Paris составляет 5.21C и находится в пределах исторической нормы для сезона
Текущая температура в городе Tokyo составляет 3.83C и находится в пределах исторической нормы для сезона
Текущая температура в городе Moscow составляет -2.96C и находится в пределах исторической нормы для сезона
Текущая температура в городе Sydney составляет 21.73C и находится в пределах исторической нормы для сезона
Текущая температура в городе Berlin составляет 4.92C и находится в пределах исторической нормы для сезона
Текущая температура в городе Beijing составляет -7.06C и находится в пределах исторической нормы для сезона
Текущая температура в городе Rio de Janeiro составляет 26.29C и находится в пределах исторической нормы для сезона
Текущая температура в городе Dub

Проведем эксперимент, сравним время выполнения синхронной и асинхронной функции для получения текущей температуры.

In [28]:
# Синхронная функция для получения текущей температуры

def get_current_temperature(api_key: str, city: str) -> float | None:
    '''ОПИСАНИЕ'''
    
    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric'
    request = httpx.get(url)
    if request.status_code == 200:
        current_temp = request.json().get('main').get('temp')
        return current_temp
    else:
        print(f'Ошибка, сервер вернул ответ с кодом {request.status_code}')
        return None

In [29]:
# Измерим время работы синхронной функции получения текущей температуры на всех городах

start = time.time()

for city in cities:
    current_temp = get_current_temperature(API_KEY, city)
    if current_temp:
        print(f'Текущая температура в городе {city}: {current_temp}')

print(f'Время выполнения - {time.time() - start:.2f} секунд')

Текущая температура в городе New York: -6.54
Текущая температура в городе London: 6.49
Текущая температура в городе Paris: 5.21
Текущая температура в городе Tokyo: 3.83
Текущая температура в городе Moscow: -2.96
Текущая температура в городе Sydney: 21.73
Текущая температура в городе Berlin: 4.92
Текущая температура в городе Beijing: -7.06
Текущая температура в городе Rio de Janeiro: 26.29
Текущая температура в городе Dubai: 21.96
Текущая температура в городе Los Angeles: 15.2
Текущая температура в городе Singapore: 26.69
Текущая температура в городе Mumbai: 24.99
Текущая температура в городе Cairo: 17.42
Текущая температура в городе Mexico City: 17.75
Время выполнения - 3.62 секунд


In [30]:
# Асинхронная функция для получения текущей температуры

async def async_get_current_temperature(api_key: str, city: str) -> float | None:
    '''ОПИСАНИЕ'''

    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric'
    
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                current_temp = await response.json() 
                return current_temp.get('main').get('temp')
            else:
                print(f'Ошибка, сервер вернул ответ с кодом {response.status}')
            return None

async def main(cities_list: list):
    '''
        Функция принимает ключ для OpenWeatherMap API и название города.
        Возвращает текущую температуру в выбранном городе.
    '''
    
    tasks = []
    for city in cities_list:
        tasks.append(asyncio.create_task(async_get_current_temperature(API_KEY, city)))
    return await asyncio.gather(*tasks)

In [31]:
# Измерим время работы асинхронной функции получения текущей температуры на всех городах

start = time.time()

temperatures_list = await main(cities)

for city, current_temp in zip(cities, temperatures_list):
    if current_temp:
        print(f'Текущая температура в городе {city}: {current_temp}') 

print(f'Время выполнения - {time.time() - start:.2f} секунд')

Текущая температура в городе New York: -6.54
Текущая температура в городе London: 6.49
Текущая температура в городе Paris: 5.21
Текущая температура в городе Tokyo: 3.83
Текущая температура в городе Moscow: -2.96
Текущая температура в городе Sydney: 21.73
Текущая температура в городе Berlin: 4.92
Текущая температура в городе Beijing: -7.06
Текущая температура в городе Rio de Janeiro: 26
Текущая температура в городе Dubai: 21.96
Текущая температура в городе Los Angeles: 15.2
Текущая температура в городе Singapore: 26.69
Текущая температура в городе Mumbai: 24.99
Текущая температура в городе Cairo: 17.42
Текущая температура в городе Mexico City: 17.75
Время выполнения - 0.20 секунд


В данном случае асинхронный подход дает значительный прирост в скорости (асинхронные запросы к api отрабатывают примерно в 15-20 раз быстрее)