In [1]:
import pandas as pd
import numpy as np
from multiprocessing import Pool, cpu_count
import time
import warnings
warnings.filterwarnings("ignore")

In [106]:
# Load the raw observations from the CSV file and preview the first five rows.
df = pd.read_csv(filepath_or_buffer="temperature_data.csv")
df.head(n=5)

Unnamed: 0,city,timestamp,temperature,season
0,New York,2010-01-01,3.225691,winter
1,New York,2010-01-02,-1.491755,winter
2,New York,2010-01-03,-5.728947,winter
3,New York,2010-01-04,3.179504,winter
4,New York,2010-01-05,1.976765,winter


Создаём функции, которые вычисляют скользящее среднее за 30 дней, считают статистику (среднее и стандартное отклонение) по каждой паре «город — сезон» и на основе этой статистики выявляют аномалии.

In [107]:
def rolling_average(data, window=30):
    """Add a centered rolling mean of ``temperature`` as column ``rolling_avg``.

    When ``data`` has a ``city`` column the window is evaluated separately
    for every city, so values from one city never leak into the average of
    another at the boundary between them. Without a ``city`` column the
    window simply runs over the whole frame.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain a ``temperature`` column; ``city`` is optional.
    window : int, default 30
        Number of consecutive rows in the centered window. Positions that
        do not have a full window get NaN.

    Returns
    -------
    pandas.DataFrame
        The same ``data`` object; the ``rolling_avg`` column is written to
        it in place.
    """
    def _centered_mean(series):
        return series.rolling(window=window, center=True).mean()

    temperature = data['temperature']
    if 'city' in data.columns:
        # transform() returns a result aligned with the original index.
        data['rolling_avg'] = temperature.groupby(data['city']).transform(_centered_mean)
    else:
        data['rolling_avg'] = _centered_mean(temperature)
    return data

# Keep an independent snapshot of the frame with the rolling mean, then show
# the last rows for which the mean is defined.
df_avg = rolling_average(df).copy()
df_avg.dropna(subset=["rolling_avg"]).tail()

Unnamed: 0,city,timestamp,temperature,season,rolling_avg
54731,Mexico City,2019-12-11,6.196322,winter,13.130485
54732,Mexico City,2019-12-12,8.957218,winter,12.846377
54733,Mexico City,2019-12-13,8.384216,winter,12.62002
54734,Mexico City,2019-12-14,10.919698,winter,12.015437
54735,Mexico City,2019-12-15,5.078125,winter,11.60136


In [108]:
def city_season_temp(data):
    """Return mean and standard deviation of temperature per city and season.

    The result has one row per (city, season) pair with the columns
    ``city``, ``season``, ``mean`` and ``std``.
    """
    temperature_by_group = data.groupby(['city', 'season'])['temperature']
    stats = temperature_by_group.agg(['mean', 'std'])
    return stats.reset_index()

# Per-city, per-season temperature statistics for the whole data set.
df_grouped = city_season_temp(data=df)
df_grouped.head(n=5)

Unnamed: 0,city,season,mean,std
0,Beijing,autumn,16.177553,4.992722
1,Beijing,spring,12.862734,4.966864
2,Beijing,summer,27.239888,4.81294
3,Beijing,winter,-1.726825,4.884696
4,Berlin,autumn,11.225418,5.001138


In [113]:
def check_anomalies(data, data_grouped, n_std=2):
    """Flag temperatures lying outside ``mean ± n_std * std`` of their group.

    Parameters
    ----------
    data : pandas.DataFrame
        Observations with ``city``, ``season`` and ``temperature`` columns.
    data_grouped : pandas.DataFrame
        One row per (city, season) with the columns ``city``, ``season``,
        ``mean`` and ``std``.
    n_std : float, default 2
        Half-width of the accepted band, in standard deviations.

    Returns
    -------
    pandas.DataFrame
        The rows of ``data`` that belong to a (city, season) pair listed in
        ``data_grouped``, in the order of ``data_grouped``, with the original
        index labels and an extra boolean ``anomaly`` column. ``data`` itself
        is not modified. If ``data_grouped`` has no rows, an empty frame with
        the ``anomaly`` column is returned.

    Notes
    -----
    A missing temperature, or a group whose ``std`` is NaN (for example a
    single observation), is never flagged: there is no band to compare to.
    """
    # Read the statistics by column name so column order does not matter.
    stat_rows = zip(
        data_grouped['city'],
        data_grouped['season'],
        data_grouped['mean'],
        data_grouped['std'],
    )

    flagged = []
    for city, season, mean, std in stat_rows:
        in_group = (data['city'] == city) & (data['season'] == season)
        # Explicit copy: the new column goes to an independent frame rather
        # than to a filtered slice of ``data``.
        cur_season_city = data.loc[in_group].copy()
        temperature = cur_season_city['temperature']
        # Comparisons with NaN are False, so NaN bounds/values are not flagged.
        cur_season_city['anomaly'] = (
            (temperature < mean - n_std * std) | (temperature > mean + n_std * std)
        )
        flagged.append(cur_season_city)

    if not flagged:
        # pd.concat refuses an empty list; return an empty, well-formed frame.
        return data.iloc[0:0].assign(anomaly=False)
    return pd.concat(flagged)

In [114]:
# Flag anomalies for the whole data set and show the last flagged rows.
anomalies = check_anomalies(df, df_grouped)
anomalies.loc[anomalies['anomaly'] == True].tail()

Unnamed: 0,city,timestamp,temperature,season,rolling_avg,anomaly
13924,Tokyo,2018-02-22,16.534278,winter,8.757388,True
14223,Tokyo,2018-12-18,-7.887501,winter,5.093415,True
14253,Tokyo,2019-01-17,16.750643,winter,6.50347,True
14258,Tokyo,2019-01-22,-6.365854,winter,6.700736,True
14290,Tokyo,2019-02-23,-5.750146,winter,9.104678,True


Далее создадим функции, которые выполняют анализ последовательно и параллельно

In [69]:
def full_analyze(data):
    """Run the complete analysis on ``data`` and return all three results.

    The steps are: rolling average, per-city/per-season statistics, and
    anomaly flags based on those statistics. The work is done on a private
    copy, so the caller's frame (which may be a filtered slice of a larger
    frame) is left unmodified.

    Returns
    -------
    tuple
        ``(data_avg, data_grouped, anomalies)`` as produced by
        ``rolling_average``, ``city_season_temp`` and ``check_anomalies``.
    """
    working = data.copy()
    data_avg = rolling_average(working)
    data_grouped = city_season_temp(data_avg)
    anomalies = check_anomalies(data_avg, data_grouped)
    return data_avg, data_grouped, anomalies

# Sequential run over the whole data set.
df_avg, df_grouped, anomaly = full_analyze(data=df)

In [78]:
def parallel_analysis(data):
    """Run ``full_analyze`` for every city in a process pool and merge results.

    ``data`` is split by the ``city`` column, each part is analysed in a
    worker process, and the per-city results are concatenated in order of
    first appearance of each city.

    Returns
    -------
    tuple
        ``(data_avg, data_grouped, anomalies)``. ``data_grouped`` gets a fresh
        0..n-1 index, because the per-city tables each start at 0 and would
        otherwise produce duplicate labels.

    Notes
    -----
    Rows whose ``city`` is missing are not part of any group and are left out.
    NOTE(review): the pool pickles ``full_analyze`` by name; on platforms that
    start workers with "spawn" a function defined in a notebook cell may not be
    importable by the workers - confirm before running outside Linux.
    """
    # A single groupby pass replaces one full boolean scan per city;
    # sort=False keeps the cities in order of first appearance.
    groups = [city_frame for _, city_frame in data.groupby('city', sort=False)]

    # Never start more workers than there are cities to process.
    n_workers = max(1, min(cpu_count(), len(groups)))
    with Pool(n_workers) as pool:
        results = pool.map(full_analyze, groups)

    data_avg = pd.concat([result[0] for result in results])
    data_grouped = pd.concat([result[1] for result in results], ignore_index=True)
    anomalies = pd.concat([result[2] for result in results])

    return data_avg, data_grouped, anomalies

# Parallel run over the whole data set (one task per city).
df_avg, df_grouped, anomaly = parallel_analysis(data=df)

In [90]:
# Compare the run time of the sequential and the parallel pipeline.
# perf_counter() is a monotonic high-resolution clock meant for measuring
# intervals; time.time() follows the wall clock and can jump.
start = time.perf_counter()
_ = full_analyze(df)
time_unparallel = round(time.perf_counter() - start, 4)

start_parallel = time.perf_counter()
_ = parallel_analysis(df)
time_parallel = round(time.perf_counter() - start_parallel, 4)

print(f"Непараллельное выполнение: {time_unparallel} сек.")
print(f"Параллельное выполнение (многопроцессное) : {time_parallel:} сек.")




Непараллельное выполнение: 0.4876 сек.
Параллельное выполнение (многопроцессное) : 0.2167 сек.


Параллельное выполнение ускоряет анализ примерно в два раза (0.2167 сек. против 0.4876 сек.).

P.S. Запускал код локально на ОС Ubuntu, cpu_count = 8