In [98]:
import time
import uuid
import plotly.express as px
import polars as pl
import pandas as pd
from typing import Union, Optional, List
import os
from statsmodels.tsa.seasonal import seasonal_decompose
import plotly

from methodtools import lru_cache

# Column names that DataManager.check_data requires to be present in the loaded data.
COLUMNS = ['temperature', 'season', 'city', 'timestamp']
# NOTE(review): not referenced anywhere in the visible code; DataManager builds
# its upload directory from the literal 'files_folder' instead — confirm intent.
FILES_DIR = 'files'


class DataManager:
    """Load a temperature CSV and compute per-city rolling statistics and anomalies.

    The backend is selected by ``parallel``: polars when true, pandas otherwise.
    Most query methods are memoised per instance with ``methodtools.lru_cache``,
    so the frames/series they return are shared between callers and must be
    treated as read-only (copy before mutating).
    """

    def __init__(self, file_path: Optional[str] = None, parallel: bool = True) -> None:
        """Load ``file_path`` (default ``'temperature_data.csv'``) and validate it.

        ``self.data`` is ``None`` when the file cannot be read or lacks one of
        the columns listed in ``COLUMNS``.
        """
        if file_path is None:
            file_path = 'temperature_data.csv'
        self.file_path = file_path
        self.parallel = parallel
        # Resolved against the current working directory (a variant based on
        # os.path.dirname(__file__) was previously disabled in this code).
        self.files_dir = os.path.join("./", 'files_folder')
        self.check_files_dir()
        self.data = self.check_data(self.load_data(file_path))

    def check_files_dir(self) -> None:
        """Create the upload directory if it does not exist yet."""
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.files_dir, exist_ok=True)

    def load_file(self, file_data: bytes) -> str:
        """Write ``file_data`` to a new uniquely named ``.csv`` file and return its path."""
        file_path = os.path.join(self.files_dir, f"{uuid.uuid4()}.csv")
        with open(file_path, "wb") as f:
            f.write(file_data)

        return file_path

    def upload_streamlit_file(self, file: bytes):
        """Persist uploaded bytes to disk and return them loaded as a data frame.

        The loaded frame is only returned; ``self.data`` is not replaced.
        """
        file_path = self.load_file(file)
        return self.load_data(file_path)

    def load_data(self, file_path: Optional[str] = None) -> Optional[Union[pl.DataFrame, pd.DataFrame]]:
        """Read a CSV with the configured backend.

        Falls back to ``self.file_path`` when ``file_path`` is ``None``.
        Returns ``None`` (after printing the error) if reading fails.
        """
        if file_path is None:
            file_path = self.file_path
        try:
            if self.parallel:
                return pl.read_csv(file_path)
            return pd.read_csv(file_path)
        except Exception as e:
            print(f"Error during data loading: {e}")
            return None

    def check_data(self, data) -> Optional[Union[pl.DataFrame, pd.DataFrame]]:
        """Return ``data`` if every column in ``COLUMNS`` is accessible, else ``None``.

        ``data`` being ``None`` (failed load) also ends in the error branch.
        """
        try:
            for col in COLUMNS:
                _ = data[col]
            return data
        except Exception as e:
            print(f"Error during data check: {e}")
            return None

    @lru_cache()
    def get_rolling_mean(self, city: str, days: int = 30) -> Optional[Union[pl.Series, pd.Series]]:
        """Return the rolling mean of ``temperature`` for ``city`` over ``days`` rows.

        Raises:
            ValueError: if no data is loaded.

        Returns ``None`` (after printing the error) if the computation fails.
        """
        if self.data is None:
            raise ValueError("Data is None")
        try:
            city_data = self.get_data_by_city(city)
            if self.parallel:
                return city_data\
                        .select(pl.col("temperature")
                                .rolling_mean(window_size=days).alias("rolling_mean"))["rolling_mean"]
            # The rows are already restricted to one city, so no groupby is needed.
            return city_data["temperature"].rolling(days).mean()
        except Exception as e:
            print(f"Error calculating rolling mean: {e}")
            return None

    @lru_cache()
    def get_rolling_std(self, city: str, days: int = 30) -> Optional[Union[pl.Series, pd.Series]]:
        """Return the rolling standard deviation of ``temperature`` for ``city``.

        Returns ``None`` (after printing the error) if the computation fails.
        """
        try:
            city_data = self.get_data_by_city(city)
            if self.parallel:
                return city_data\
                        .select(pl.col("temperature")
                                .rolling_std(window_size=days).alias("rolling_std"))["rolling_std"]
            return city_data["temperature"].rolling(days).std()
        except Exception as e:
            print(f"Error calculating rolling std: {e}")
            return None

    @lru_cache()
    def get_data_by_city(self, city: str):
        """Return the rows of ``self.data`` whose ``city`` equals ``city``.

        The result is cached and shared; callers must not mutate it in place.
        """
        if self.parallel:
            return self.data.filter(pl.col("city") == city)
        return self.data[self.data['city'] == city]

    @lru_cache()
    def get_temperature_by_city(self, city: str):
        """Return the ``temperature`` column of the rows for ``city``."""
        return self.get_data_by_city(city)["temperature"]

    @lru_cache()
    def _rolling_bounds(self, city: str, days: int = 30):
        """Build the per-row table of rolling statistics and the +/- 2 sigma band.

        Columns: ``timestamp``, ``temperature``, ``rolling_mean``,
        ``rolling_std``, ``lower_bound``, ``upper_bound``. Works for both
        backends so anomaly detection behaves the same on polars and pandas.
        """
        data = self.get_data_by_city(city)
        rolling_mean = self.get_rolling_mean(city, days)
        rolling_std = self.get_rolling_std(city, days)
        if rolling_mean is None or rolling_std is None:
            raise ValueError("Rolling statistics are unavailable")

        if self.parallel:
            return (data.select(pl.col("timestamp"), pl.col("temperature"),
                                rolling_mean.alias("rolling_mean"),
                                rolling_std.alias("rolling_std"))
                    .with_columns(
                        (pl.col("rolling_mean") - 2 * pl.col("rolling_std")).alias("lower_bound"),
                        (pl.col("rolling_mean") + 2 * pl.col("rolling_std")).alias("upper_bound")
                    ))

        # Copy so the cached per-city frame is never modified.
        bounds = data[["timestamp", "temperature"]].copy()
        bounds["rolling_mean"] = rolling_mean
        bounds["rolling_std"] = rolling_std
        bounds["lower_bound"] = bounds["rolling_mean"] - 2 * bounds["rolling_std"]
        bounds["upper_bound"] = bounds["rolling_mean"] + 2 * bounds["rolling_std"]
        return bounds

    def calculate_abnormal_row(self, row: dict) -> bool:
        """Return ``True`` if the row's temperature is outside its historical range.

        ``row`` must provide the keys ``city``, ``timestamp`` and ``temperature``.
        """
        return not self.is_temperature_normal(row["city"], row["timestamp"], row["temperature"])

    @lru_cache()
    def detect_anomalies(self, city: str) -> Optional[Union[pl.Series, pd.Series]]:
        """Return a boolean series flagging anomalous temperatures for ``city``.

        A row is anomalous when its temperature lies outside
        ``rolling_mean +/- 2 * rolling_std``. Rows whose rolling window is not
        complete yet (bounds missing) are reported as not anomalous.

        Raises:
            ValueError: if no data is loaded or ``city`` is unknown.
        """
        if self.data is None:
            raise ValueError("Data is None")
        if city is None or city not in self.get_cities():
            raise ValueError("City value is incorrect")

        # Vectorised comparison instead of a per-row lookup: each per-row lookup
        # filtered the whole stats table again, which was quadratic in row count.
        bounds = self._rolling_bounds(city)
        if self.parallel:
            temperature = pl.col("temperature")
            outside = (temperature < pl.col("lower_bound")) | (temperature > pl.col("upper_bound"))
            # Comparisons against null bounds yield null; treat those as "not anomalous".
            return bounds.select(outside.fill_null(False).alias("abnormal"))["abnormal"]
        # Comparisons against NaN bounds are False in pandas, matching the polars branch.
        return (bounds["temperature"] < bounds["lower_bound"]) | \
            (bounds["temperature"] > bounds["upper_bound"])

    @lru_cache()
    def get_trend(self, city: str) -> Optional[Union[pl.DataFrame, pd.DataFrame]]:
        """Return the rows for ``city`` with an extra ``trend`` column.

        ``trend`` is the 30-row rolling mean of the temperature.

        Raises:
            ValueError: if no data is loaded.

        Returns ``None`` (after printing the error) if the computation fails.
        """
        if self.data is None:
            raise ValueError("Data is None")
        try:
            city_data = self.get_data_by_city(city)
            trend = self.get_rolling_mean(city, days=30)

            if self.parallel:
                return city_data.with_columns(trend.alias("trend"))
            # assign() returns a new frame, leaving the cached per-city frame untouched.
            return city_data.assign(trend=trend)
        except Exception as e:
            print(f"Error calculating trend: {e}")
            return None

    @lru_cache()
    def get_historical_range(self, timestamp: str, city: str) -> Optional[tuple]:
        """Return ``(lower_bound, upper_bound)`` for ``city`` at ``timestamp``.

        Either element is ``None`` when ``timestamp`` has no row for the city
        or its rolling window is not complete yet.

        Raises:
            ValueError: if no data is loaded.

        Returns ``None`` (after printing the error) if the computation fails.
        """
        if self.data is None:
            raise ValueError("Data is None")
        try:
            bounds = self._rolling_bounds(city)
            if self.parallel:
                match = bounds.filter(pl.col("timestamp") == timestamp)
                if match.height == 0:
                    return None, None
                lower_value = match["lower_bound"][0]
                upper_value = match["upper_bound"][0]
            else:
                match = bounds[bounds["timestamp"] == timestamp]
                if match.empty:
                    return None, None
                lower_value = match["lower_bound"].iloc[0]
                upper_value = match["upper_bound"].iloc[0]
            # Normalise null/NaN to None so both backends report "unknown" the same way.
            return (
                None if pd.isna(lower_value) else lower_value,
                None if pd.isna(upper_value) else upper_value,
            )
        except Exception as e:
            print(f"Error calculating historical range: {e}")
            return None

    def get_data(self) -> Union[pl.DataFrame, pd.DataFrame]:
        """Return the loaded data frame (``None`` if loading or validation failed)."""
        return self.data

    def get_cities(self) -> List[str]:
        """Return the sorted list of distinct values in the ``city`` column."""
        return sorted(self.data["city"].unique())

    @lru_cache()
    def get_temperature_stats_by_date(self, city: str):
        """Return temperature statistics for ``city``.

        polars backend: one row per observation with ``timestamp``,
        ``temperature``, ``rolling_mean``, ``rolling_std``, ``lower_bound`` and
        ``upper_bound`` (30-row window, +/- 2 standard deviations).

        pandas backend: one row per ``season`` with ``q1_temp`` and ``q3_temp``
        (25th / 75th percentile).
        NOTE(review): the two backends return different shapes; the pandas
        result is kept as it was for backward compatibility.
        """
        days = 30
        if self.parallel:
            return self._rolling_bounds(city, days)
        return self.get_data_by_city(city).groupby("season")["temperature"]\
            .agg(
                [
                    ('q1_temp', lambda x: x.quantile(0.25)),
                    ('q3_temp', lambda x: x.quantile(0.75))
                ]
            ).reset_index()

    @lru_cache()
    def is_temperature_normal(self, city: str, timestamp: str, current_temp: float) -> bool:
        """Return whether ``current_temp`` lies inside the historical range.

        When no bounds are known for ``timestamp`` the temperature is
        considered normal, consistent with ``detect_anomalies``.

        Raises:
            ValueError: if ``city`` is unknown or the range cannot be computed.
        """
        if city is None or city not in self.get_cities():
            raise ValueError("City value is incorrect")

        historical_range = self.get_historical_range(timestamp, city)
        if historical_range is None:
            raise ValueError("Historical range is unavailable")

        lower_bound, upper_bound = historical_range
        if lower_bound is None or upper_bound is None:
            return True

        return bool(lower_bound <= current_temp <= upper_bound)

    def plot_anomalies(self, city: str):
        """Return a plotly scatter of temperature over time, coloured by anomaly flag.

        Raises:
            ValueError: if no data is loaded.

        Returns ``None`` (after printing the error) if plotting fails.
        """
        if self.data is None:
            raise ValueError("Data is None")
        try:
            city_data = self.get_data_by_city(city)
            flags = self.detect_anomalies(city)
            if self.parallel:
                data = city_data.to_pandas()
                # Hand pandas plain Python values rather than a polars Series.
                data['is_anomaly'] = flags.to_list()
            else:
                # Copy so the cached per-city frame does not gain an extra column.
                data = city_data.copy()
                data['is_anomaly'] = flags

            fig = px.scatter(
                data,
                x="timestamp",
                y="temperature",
                color="is_anomaly",
                color_discrete_map={True: "red", False: "blue"},
                labels={"is_anomaly": "Anomaly"},
                title="Temperature Anomalies",
            )
            fig.update_traces(marker=dict(size=6), selector=dict(mode="markers"))
            fig.update_layout(
                xaxis_title="Date",
                yaxis_title="Temperature (°C)",
                legend_title="Anomalies",
                template="plotly_white",
            )
            return fig
        except Exception as e:
            print(f"Error during plotting anomalies: {e}")
            return None

    def add_data_column(self, name, values):
        """Add (or overwrite) column ``name`` on ``self.data`` and return the data.

        NOTE(review): results already memoised by the cached query methods are
        not refreshed here — confirm whether the caches should be cleared.
        """
        if self.parallel:
            self.data = self.data.with_columns(pl.Series(name=name, values=values))
        else:
            self.data[name] = values

        return self.get_data()

    def get_decomposition(self, city: str, period=365):
        """Return the additive seasonal decomposition of the temperature for ``city``."""
        city_data = self.get_data_by_city(city)
        if self.parallel:
            city_data = city_data.to_pandas()
        # Non-inplace set_index: an inplace call would strip the 'timestamp'
        # column from the cached per-city frame and break every later call.
        indexed = city_data.set_index("timestamp")

        return seasonal_decompose(indexed['temperature'], model='additive', period=period)

def test(file_path: str, parallel: bool = True):
    """Time the main per-city computations for "Moscow" on the chosen backend.

    Builds a DataManager from ``file_path`` and measures anomaly detection,
    trend computation and one historical-range lookup, then prints the
    elapsed seconds.
    """
    data_manager = DataManager(file_path, parallel)
    # perf_counter is monotonic, so the measurement is immune to clock changes.
    start_time = time.perf_counter()
    data_manager.detect_anomalies("Moscow")
    data_manager.get_trend("Moscow")
    # get_historical_range expects a timestamp (not a season name), so look up
    # a real one; the per-city frame is already cached by the calls above.
    timestamps = data_manager.get_data_by_city("Moscow")["timestamp"]
    first_timestamp = timestamps[0] if parallel else timestamps.iloc[0]
    data_manager.get_historical_range(first_timestamp, "Moscow")
    print("Время выполнения (parallel={}): {}".format(parallel, time.perf_counter() - start_time))


# Script entry point: currently a no-op. The benchmark calls below are disabled;
# uncomment them to compare the polars (parallel=True) and pandas backends.
if __name__ == "__main__":
    pass
    #test('temperature_data.csv', parallel=True)
    #test('temperature_data.csv', parallel=False)


In [99]:
# Shared manager for the cells below, using the polars backend (parallel=True).
data_manager = DataManager(file_path="temperature_data.csv", parallel=True)

In [101]:
# Display the temperature statistics table for Moscow.
data_manager.get_temperature_stats_by_date(city='Moscow')

In [None]:
# Display the temperature column for Moscow.
data_manager.get_temperature_by_city(city="Moscow")

In [None]:
# import plotly.express as px

# # Создаем DataFrame с информацией об аномалиях
# data = data_manager.get_data_by_city(city).to_pandas() if data_manager.parallel else data_manager.get_data_by_city(city)
# data['is_anomaly'] = data_manager.detect_anomalies(city)

# # Построение интерактивного графика с аномалиями
# fig = px.scatter(
#     data,
#     x="timestamp",
#     y="temperature",
#     color="is_anomaly",
#     color_discrete_map={True: "red", False: "blue"},
#     labels={"is_anomaly": "Anomaly"},
#     title="Temperature Anomalies",
# )
# fig.update_traces(marker=dict(size=6), selector=dict(mode="markers"))
# fig.update_layout(
#     xaxis_title="Date",
#     yaxis_title="Temperature (°C)",
#     legend_title="Anomalies",
#     template="plotly_white",
# )

# fig.show()

In [None]:
# trend = data_manager.get_trend(city)
# if trend is not None:
#     data = data_manager.get_data_by_city(city).to_pandas() if data_manager.parallel else data_manager.get_data_by_city(city)
#     data['trend'] = trend
#     fig = px.line(
#         data,
#         x="timestamp",
#         y=["temperature", "trend"],
#         labels={"value": "Temperature (°C)", "variable": "Type"},
#         title="Temperature Trend",
#         hover_data=["timestamp"]
#     )
#     fig.show()

In [None]:
# 

In [None]:
# import plotly.graph_objects as go
# import plotly.express as px
# import numpy as np

# # Получаем данные
# data = data_manager.get_data_by_city(city).to_pandas() if data_manager.parallel else data_manager.get_data_by_city(city)
# data['is_anomaly'] = data_manager.detect_anomalies(city)

# # Получаем скользящее среднее и стандартное отклонение
# rolling_mean = data_manager.get_rolling_mean(city, days=365)
# rolling_std = data_manager.get_rolling_std(city, days=365)

# # Нормализуем температуру
# data['normalized_temperature'] = (data_manager.get_temperature_by_city(city) - rolling_mean) / rolling_std

# # Создаем объект Figure
# fig = go.Figure()

# # Добавляем scatter plot для нормализованной температуры и аномалий
# scatter = px.scatter(
#     data,
#     x="timestamp",
#     y="normalized_temperature",
#     color="is_anomaly",
#     color_discrete_map={True: "red", False: "blue"},
#     labels={"is_anomaly": "Anomaly"},
# )
# for trace in scatter.data:
#     fig.add_trace(trace)

# # # Добавляем линию тренда, если она доступна
# trend = data_manager.get_trend(city)
# if trend is not None:
#     # Нормализуем тренд
#     normalized_trend = (trend - rolling_mean) / rolling_std
#     data['normalized_trend'] = trend
#     trend_line = px.line(
#         data,
#         x="timestamp",
#         y="normalized_trend",
#         labels={"normalized_trend": "Trend"},
#     )
#     for trace in trend_line.data:
#         fig.add_trace(trace)

# # Настраиваем макет графика
# fig.update_layout(
#     title="Normalized Temperature Anomalies and Trend",
#     xaxis_title="Date",
#     yaxis_title="Normalized Temperature",
#     legend_title="Legend",
#     template="plotly_white",
# )

# # Настраиваем маркеры для scatter plot
# fig.update_traces(marker=dict(size=6), selector=dict(mode="markers"))


# # Показываем график
# fig.show()