In [53]:
!pip install aiohttp



In [54]:
import pandas as pd
import os
import requests
import json
from time import time
import warnings
from joblib import Parallel, delayed
import aiohttp
import asyncio

# NOTE(review): this silences *every* warning for the rest of the session, which can
# hide real problems (e.g. pandas chained-assignment warnings) — consider narrowing it.
warnings.filterwarnings("ignore")

In [2]:
# Load the historical temperature dataset (city, timestamp, temperature, season).
# NOTE(review): absolute, machine-specific path — the notebook will not run on another
# machine as-is; consider a path relative to a configurable data directory.
df = pd.read_csv("/Users/almazkhayrullin/Desktop/appied_python/hw_1/data/Temperature Data.csv")
df.head()

Unnamed: 0,city,timestamp,temperature,season
0,New York,2010-01-01,4.650886,winter
1,New York,2010-01-02,0.186908,winter
2,New York,2010-01-03,5.51351,winter
3,New York,2010-01-04,6.728524,winter
4,New York,2010-01-05,-4.447639,winter


In [3]:
# Preview the data ordered by city and date. The sorted frame is only displayed:
# the result is not assigned, so `df` itself keeps its original row order.
df.sort_values(by=["city", "timestamp"]).reset_index(drop=True)

Unnamed: 0,city,timestamp,temperature,season
0,Beijing,2010-01-01,2.061876,winter
1,Beijing,2010-01-02,-15.773126,winter
2,Beijing,2010-01-03,-4.436987,winter
3,Beijing,2010-01-04,-11.834165,winter
4,Beijing,2010-01-05,-3.632313,winter
...,...,...,...,...
54745,Tokyo,2019-12-25,6.145897,winter
54746,Tokyo,2019-12-26,12.190132,winter
54747,Tokyo,2019-12-27,15.187671,winter
54748,Tokyo,2019-12-28,8.248416,winter


### Data Analysis

In [25]:
def process_data_city(data, window=30, n_sigma=2):
    """Compute rolling and seasonal temperature statistics for one city.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows with at least the ``temperature`` and ``season`` columns.
        The rolling mean is taken in the row order of ``data``
        (presumably chronological — sort by timestamp beforehand if unsure).
    window : int, default 30
        Size of the rolling window, in rows.
    n_sigma : float, default 2
        Half-width of the "normal" band, in seasonal standard deviations.

    Returns
    -------
    pandas.DataFrame
        A new frame with the input columns plus ``rolling_mean``, the
        per-season ``mean`` and ``std``, the band limits ``min`` / ``max``
        and ``not_ok`` (1 when the temperature lies outside the band, else 0).
        The index is reset by the merge.
    """
    # Work on a copy: assigning a column directly would modify the caller's frame
    # (or trigger pandas' chained-assignment warning when a filtered slice is passed).
    data = data.copy()
    data["rolling_mean"] = data["temperature"].rolling(window=window, min_periods=1).mean()

    # Seasonal profile: one row per season with the mean and standard deviation.
    seasons = data.groupby("season")["temperature"].agg(["mean", "std"]).reset_index()

    # Attach the seasonal statistics to every daily observation.
    result = data.merge(seasons, on="season", how="left")

    result["min"] = result["mean"] - n_sigma * result["std"]
    result["max"] = result["mean"] + n_sigma * result["std"]
    outside_band = (result["temperature"] < result["min"]) | (result["temperature"] > result["max"])
    result["not_ok"] = outside_band.astype(int)

    return result

In [26]:
def data_analysis(data):
    """Run ``process_data_city`` on each city in turn and stack the results.

    Cities are processed in order of first appearance in ``data["city"]``;
    the per-city frames are concatenated without re-indexing.
    """
    per_city_frames = [
        process_data_city(data[data["city"] == city_name])
        for city_name in data["city"].unique()
    ]
    return pd.concat(per_city_frames)

In [28]:
def parallel_data_analysis(df, n_jobs=-1):
    """Same as ``data_analysis`` but dispatches the per-city work through joblib.

    ``n_jobs`` is passed straight to ``joblib.Parallel``. The per-city results
    are concatenated in order of first appearance of each city in ``df``.
    """
    frames_by_city = [df[df["city"] == name] for name in df["city"].unique()]
    pool = Parallel(n_jobs=n_jobs)
    processed = pool(delayed(process_data_city)(frame) for frame in frames_by_city)
    return pd.concat(processed)

In [57]:
# Per-city analysis of the whole dataset (rolling mean, seasonal mean/std,
# 2-sigma bounds and the anomaly flag); reused below as the statistics table
# passed to is_weather_anomaly.
kek = data_analysis(df)
kek

Unnamed: 0,city,timestamp,temperature,season,rolling_mean,mean,std,min,max,not_ok
0,New York,2010-01-01,4.650886,winter,4.650886,-0.042706,5.003571,-10.049848,9.964436,0
1,New York,2010-01-02,0.186908,winter,2.418897,-0.042706,5.003571,-10.049848,9.964436,0
2,New York,2010-01-03,5.513510,winter,3.450434,-0.042706,5.003571,-10.049848,9.964436,0
3,New York,2010-01-04,6.728524,winter,4.269957,-0.042706,5.003571,-10.049848,9.964436,0
4,New York,2010-01-05,-4.447639,winter,2.526438,-0.042706,5.003571,-10.049848,9.964436,0
...,...,...,...,...,...,...,...,...,...,...
3645,Mexico City,2019-12-25,12.828099,winter,13.634662,12.088848,4.922788,2.243272,21.934423,0
3646,Mexico City,2019-12-26,10.343160,winter,13.381220,12.088848,4.922788,2.243272,21.934423,0
3647,Mexico City,2019-12-27,9.403314,winter,13.579702,12.088848,4.922788,2.243272,21.934423,0
3648,Mexico City,2019-12-28,30.645927,winter,13.942159,12.088848,4.922788,2.243272,21.934423,1


In [18]:
%%timeit
# Baseline: time the sequential per-city analysis.

data_analysis(df)

51.3 ms ± 904 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [23]:
%%timeit
# Time the joblib-based variant for comparison with the sequential baseline.

parallel_data_analysis(df)

57.4 ms ± 4.48 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


Последовательное выполнение оказалось немного быстрее (51.3 мс против 57.4 мс): скорее всего, на таком небольшом объёме данных накладные расходы joblib на запуск воркеров и передачу данных между процессами превышают выигрыш от параллелизма.

### Посмотрим на апишку

In [34]:
# The key is read from the environment so that it is never committed with the notebook.
# Set it before starting Jupyter, e.g.  export OPENWEATHER_API_KEY=<your key>
# If the variable is missing the key is empty and the API will reject the requests.
# NOTE(review): the key that used to be hard-coded here should be treated as leaked and revoked.
API_KEY = os.environ.get("OPENWEATHER_API_KEY", "")
URL_TEMPLATE = "https://api.openweathermap.org/data/2.5/weather?q={city}&units=metric&appid={API_KEY}"

In [43]:
def get_current_weather(city, timeout=10):
    """Fetch the current weather for ``city`` and return the decoded JSON payload.

    Parameters
    ----------
    city : str
        City name substituted into ``URL_TEMPLATE``.
    timeout : float, default 10
        Seconds to wait for the server before ``requests`` raises a timeout
        error. Without it a stalled connection would block the notebook forever.

    Returns
    -------
    dict
        The parsed response body. The HTTP status is not checked here, so an
        error response from the API is returned as-is.
    """
    url = URL_TEMPLATE.format(city=city, API_KEY=API_KEY)
    resp = requests.get(url, timeout=timeout)
    return resp.json()

In [44]:
# Smoke test: fetch the current weather for a single city and inspect the raw payload.
get_current_weather(city="Kogalym")

{'coord': {'lon': 74.4791, 'lat': 62.2654},
 'weather': [{'id': 600,
   'main': 'Snow',
   'description': 'light snow',
   'icon': '13n'}],
 'base': 'stations',
 'main': {'temp': -10.37,
  'feels_like': -17.37,
  'temp_min': -10.37,
  'temp_max': -10.37,
  'pressure': 1023,
  'humidity': 93,
  'sea_level': 1023,
  'grnd_level': 1014},
 'visibility': 243,
 'wind': {'speed': 9.54, 'deg': 186, 'gust': 15.31},
 'snow': {'1h': 0.13},
 'clouds': {'all': 100},
 'dt': 1734885242,
 'sys': {'country': 'RU', 'sunrise': 1734841780, 'sunset': 1734859902},
 'timezone': 18000,
 'id': 6695754,
 'name': 'Kogalym',
 'cod': 200}

In [45]:
def is_weather_anomaly(city, df=None, season="winter"):
    """Tell whether the current temperature in ``city`` is outside the normal band.

    The band is ``mean ± 2 * std`` taken from the historical statistics in ``df``
    for the given city and season.

    Parameters
    ----------
    city : str
        City name; used both for the API request and to select rows of ``df``.
    df : pandas.DataFrame
        Table with ``city``, ``season``, ``mean`` and ``std`` columns
        (e.g. the output of ``data_analysis``). Required despite the ``None``
        default, which is kept only for signature compatibility.
    season : str, default "winter"
        Season whose statistics are used as the reference.

    Returns
    -------
    bool
        ``True`` when the current temperature is below the lower or above the
        upper bound, ``False`` otherwise.

    Raises
    ------
    ValueError
        If ``df`` is not given or has no rows for ``city`` / ``season``.
    KeyError
        If the API response has no ``main.temp`` field (e.g. an error payload).
    """
    if df is None:
        raise ValueError("df with historical 'mean' and 'std' per city and season is required")

    # Validate the historical data first so we fail before spending a network call.
    stats = df.loc[(df["city"] == city) & (df["season"] == season), ["mean", "std"]]
    if stats.empty:
        raise ValueError(f"no historical statistics for city={city!r}, season={season!r}")
    # Every row of a given city/season carries the same statistics; take the first one.
    normal_mean, normal_std = stats.to_numpy()[0]

    curr_temp = get_current_weather(city)["main"]["temp"]

    lower_bound = normal_mean - 2 * normal_std
    upper_bound = normal_mean + 2 * normal_std

    return bool(curr_temp < lower_bound or curr_temp > upper_bound)

In [49]:
# Check whether the current New York temperature falls outside the historical winter band.
is_weather_anomaly("New York", kek)

False

In [52]:
def get_weather_synch(city):
    """Blocking weather request used as the baseline of the sync/async timing.

    Delegates to ``get_current_weather`` instead of repeating the same
    request code, so both helpers always behave identically.
    """
    return get_current_weather(city)

# Baseline measurement: request every city one after another and time the whole loop.
start = time()
for city in df.city.unique():
    get_weather_synch(city)
end = time()
print(f"Synchronious time: {end - start:.2f} sec.")

Synchronious time: 10.65 sec.


In [56]:
async def get_weather_asynch(city):
    url = URL_TEMPLATE.format(
        city=city,
        API_KEY=API_KEY
    )
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
                content = await response.text()
                return json.loads(content)

start = time()
await asyncio.gather(*[get_weather_asynch(city) for city in df.city.unique()])
end = time()
print(f"Asynchronious time: {end - start:.2f} sec.")

Asynchronious time: 0.47 sec.
