# In [None]:
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import itertools
import os
import matplotlib.pyplot as plt
from typing import List, Tuple, Optional

class ForexSentimentAnalyzer:
    """
    Search parameter combinations for the correlation between resampled
    sentiment scores (df1) and forward forex returns (df2).

    Expected input columns, as read by the methods below:

    * df1: ``createDate``, ``keyword`` (only read when a keyword filter is
      given) and one ``<model>_sentiment_title`` column per model.
    * df2: ``asoftime``, ``bid`` and ``ask``.

    Intermediate merged frames are cached as Excel files under ``records/``.
    """

    # Suffix that marks a sentiment column in df1; whatever precedes it is the model name.
    _SENTIMENT_SUFFIX = '_sentiment_title'

    def __init__(self, df1: pd.DataFrame, df2: pd.DataFrame, forex_tag: str):
        """
        Initialize the ForexSentimentAnalyzer with input dataframes and forex tag.

        :param df1: DataFrame containing sentiment scores
        :param df2: DataFrame containing forex prices (not modified; a copy is kept)
        :param forex_tag: String representing the forex pair tag
        """
        self.df1 = df1
        # process_df2 adds and rewrites columns, so work on a private copy;
        # otherwise a second analyzer built on the same frame would re-invert
        # already inverted prices.
        self.df2 = df2.copy()
        self.forex_tag = forex_tag
        suffix = self._SENTIMENT_SUFFIX
        # Strip the suffix instead of splitting on the first '_' so that model
        # names containing underscores (e.g. 'fin_bert') are kept intact.
        self.models = [col[:-len(suffix)] for col in df1.columns if col.endswith(suffix)]
        self.process_df2()

    def process_df2(self):
        """
        Process df2 to create 'mid' column and handle USD-started forex pairs.

        For tags starting with 'USD' the prices are inverted (1 / price) and
        bid/ask are swapped. Called once from ``__init__``; calling it again
        would invert the prices a second time.
        """
        self.df2['mid'] = (self.df2['bid'] + self.df2['ask']) / 2
        if self.forex_tag.startswith('USD'):
            inverted = 1 / self.df2[['bid', 'ask', 'mid']]
            # Assign column by column: the swap then does not depend on how
            # pandas aligns a DataFrame value in a multi-column assignment.
            self.df2['bid'] = inverted['ask']
            self.df2['ask'] = inverted['bid']
            self.df2['mid'] = inverted['mid']

    def resample_df1(self, rt: str, window: int, keyword: str = '') -> pd.DataFrame:
        """
        Resample df1 based on the given parameters.

        Rows are summed per ``rt`` bucket and, for every model, an exponential
        moving average column ``<model>_sentiment_title_<window>`` is added.

        :param rt: Resample time period
        :param window: Window size (EMA span) for EMA calculation
        :param keyword: Keyword to filter df1 (optional; empty string means no filter)
        :return: Resampled DataFrame with the timestamp column renamed to 'time1'
        """
        df = self.df1[self.df1['keyword'] == keyword] if keyword else self.df1
        df11 = df.set_index('createDate').resample(rt).sum().reset_index()
        df11 = df11.rename(columns={'createDate': 'time1'})

        for model in self.models:
            col = f"{model}{self._SENTIMENT_SUFFIX}"
            df11[f"{col}_{window}"] = df11[col].ewm(span=window).mean()

        return df11

    def resample_df2(self, rt: str) -> pd.DataFrame:
        """
        Resample df2 based on the given parameters.

        :param rt: Resample time period
        :return: Resampled DataFrame (last row per bucket) with the timestamp
                 column renamed to 'time2'
        """
        df21 = self.df2.set_index('asoftime').resample(rt).last().reset_index()
        return df21.rename(columns={'asoftime': 'time2'})

    def merge_dataframes(self, df11: pd.DataFrame, df21: pd.DataFrame) -> pd.DataFrame:
        """
        Merge resampled dataframes df11 and df21.

        :param df11: Resampled sentiment DataFrame
        :param df21: Resampled forex price DataFrame
        :return: Outer-merged DataFrame sorted by the combined 'time' column
        """
        df0 = pd.merge(df11, df21, left_on='time1', right_on='time2', how='outer')
        # Outer merge leaves time1 or time2 empty for unmatched rows; take whichever exists.
        df0['time'] = df0['time1'].combine_first(df0['time2'])
        return df0.sort_values('time').reset_index(drop=True)

    def calculate_returns(self, df0: pd.DataFrame, t: int) -> pd.DataFrame:
        """
        Calculate returns for the given period.

        Adds column ``R_<t>`` to ``df0`` in place: the percentage change of
        'mid' over ``t`` rows, shifted back so each row holds its forward return.

        :param df0: Merged DataFrame
        :param t: Number of periods for return calculation
        :return: The same DataFrame with the return column added
        """
        df0[f'R_{t}'] = df0['mid'].pct_change(t).shift(-t)
        return df0

    def calculate_correlation(self, df0: pd.DataFrame, model: str, window: int, t: int) -> float:
        """
        Calculate correlation between sentiment scores and returns.

        :param df0: DataFrame with sentiment scores and returns
        :param model: Model name
        :param window: Window size for EMA calculation
        :param t: Number of periods for return calculation
        :return: Correlation coefficient (NaN when it cannot be computed)
        """
        sentiment_col = f"{model}{self._SENTIMENT_SUFFIX}_{window}"
        return_col = f'R_{t}'
        return df0[[sentiment_col, return_col]].dropna().corr().iloc[0, 1]

    def save_df0(self, df0: pd.DataFrame, rt: str, keyword: str, window: int):
        """
        Save df0 to an Excel file.

        The file is written under a temporary name and then renamed, because
        several worker processes may save or load the same cache entry at once.

        :param df0: DataFrame to save
        :param rt: Resample time period
        :param keyword: Keyword used for filtering
        :param window: Window size for EMA calculation
        """
        filename = f"records/{self.forex_tag}_{rt}_{keyword}_{window}.xlsx"
        os.makedirs('records', exist_ok=True)
        # Per-process temp name; keeps the .xlsx extension so pandas picks the Excel writer.
        tmp_filename = f"records/.tmp{os.getpid()}_{self.forex_tag}_{rt}_{keyword}_{window}.xlsx"
        try:
            df0.to_excel(tmp_filename, index=False)
            os.replace(tmp_filename, filename)
        finally:
            # Only still present when writing or renaming failed.
            if os.path.exists(tmp_filename):
                os.remove(tmp_filename)

    def load_df0(self, rt: str, keyword: str, window: int) -> Optional[pd.DataFrame]:
        """
        Load df0 from an Excel file if it exists.

        :param rt: Resample time period
        :param keyword: Keyword used for filtering
        :param window: Window size for EMA calculation
        :return: Loaded DataFrame or None if file doesn't exist
        """
        filename = f"records/{self.forex_tag}_{rt}_{keyword}_{window}.xlsx"
        if os.path.exists(filename):
            return pd.read_excel(filename)
        return None

    def optimize_parameters(self, rt_list: List[str], keyword_list: List[str], t_list: List[int],
                            window_list: List[int], split_date: str,
                            n: Optional[int] = None) -> List[Tuple[dict, float, float]]:
        """
        Optimize parameters using parallel processing.

        Every combination of rt, keyword, model, t and window is evaluated with
        ``process_params`` in a process pool.

        :param rt_list: List of resample time periods
        :param keyword_list: List of keywords
        :param t_list: List of return periods
        :param window_list: List of window sizes
        :param split_date: Date to split train and test sets
        :param n: Number of top results to return (optional; None or 0 returns all)
        :return: List of tuples containing parameters, train correlation, and test
                 correlation, sorted by train correlation descending (NaN last)
        """
        params_list = list(itertools.product(rt_list, keyword_list, self.models, t_list, window_list))
        if not params_list:
            # Nothing to evaluate; avoid starting worker processes.
            return []

        with ProcessPoolExecutor() as executor:
            results = list(executor.map(self.process_params, params_list, itertools.repeat(split_date)))

        def train_corr_key(result: Tuple[dict, float, float]) -> float:
            # NaN compares false with everything and would leave the order
            # arbitrary, so rank undefined correlations below all real ones.
            train_corr = result[1]
            return -np.inf if pd.isna(train_corr) else train_corr

        sorted_results = sorted(results, key=train_corr_key, reverse=True)
        return sorted_results[:n] if n else sorted_results

    def process_params(self, params: Tuple[str, str, str, int, int], split_date: str) -> Tuple[dict, float, float]:
        """
        Process a single set of parameters.

        :param params: Tuple containing (rt, keyword, model, t, window)
        :param split_date: Date to split train and test sets
        :return: Tuple containing parameters dict, train correlation, and test correlation
        """
        rt, keyword, model, t, window = params
        df0 = self.load_df0(rt, keyword, window)

        if df0 is None:
            df11 = self.resample_df1(rt, window, keyword)
            df21 = self.resample_df2(rt)
            df0 = self.merge_dataframes(df11, df21)
            self.save_df0(df0, rt, keyword, window)

        # The cache key does not contain t, so a cached frame may lack R_<t>.
        # Always (re)compute the return column for the requested t.
        df0 = self.calculate_returns(df0, t)

        in_train = df0['time'] < split_date
        train_df = df0[in_train]
        test_df = df0[df0['time'] >= split_date]

        train_corr = self.calculate_correlation(train_df, model, window, t)
        test_corr = self.calculate_correlation(test_df, model, window, t)

        return {'rt': rt, 'keyword': keyword, 'model': model, 't': t, 'window': window}, train_corr, test_corr

    def plot_top_results(self, results: List[Tuple[dict, float, float]], n: int = 5):
        """
        Plot top n results.

        Draws a grouped bar chart and a line chart of train vs. test correlation
        for the first ``n`` entries of ``results``.

        :param results: List of tuples containing parameters, train correlation, and test correlation
        :param n: Number of top results to plot
        """
        top_n = results[:n]

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

        labels = [f"{r[0]['model']}\n{r[0]['rt']}, w={r[0]['window']}, t={r[0]['t']}" for r in top_n]
        train_corrs = [r[1] for r in top_n]
        test_corrs = [r[2] for r in top_n]

        x = range(len(top_n))
        width = 0.35

        # Train and test bars side by side around each tick.
        ax1.bar([i - width/2 for i in x], train_corrs, width, label='Train')
        ax1.bar([i + width/2 for i in x], test_corrs, width, label='Test')
        ax1.set_ylabel('Correlation')
        ax1.set_title('Top Correlations')
        ax1.set_xticks(x)
        ax1.set_xticklabels(labels, rotation=45, ha='right')
        ax1.legend()

        ax2.plot(train_corrs, label='Train', marker='o')
        ax2.plot(test_corrs, label='Test', marker='o')
        ax2.set_ylabel('Correlation')
        ax2.set_title('Correlation Comparison')
        ax2.set_xticks(x)
        ax2.set_xticklabels(labels, rotation=45, ha='right')
        ax2.legend()

        plt.tight_layout()
        plt.show()

# Usage example:
# df1 = pd.read_csv('sentiment_scores.csv')
# df2 = pd.read_csv('forex_prices.csv')
# analyzer = ForexSentimentAnalyzer(df1, df2, 'EURUSD')
# results = analyzer.optimize_parameters(
#     rt_list=['1H', '4H', '1D'],
#     keyword_list=['', 'economy', 'politics'],
#     t_list=[1, 3, 5],
#     window_list=[5, 10, 20],
#     split_date='2023-01-01'
# )
# for params, train_corr, test_corr in results[:10]:
#     print(f"Parameters: {params}")
#     print(f"Train correlation: {train_corr:.4f}")
#     print(f"Test correlation: {test_corr:.4f}")
#     print()
# analyzer.plot_top_results(results, n=5)

# In [None]:
import pandas as pd
import numpy as np
from typing import Tuple, List, Optional
import os
from concurrent.futures import ProcessPoolExecutor

class ForexSentimentAnalyzer:
    """
    Evaluate train/test correlations between sentiment EMA scores and returns
    for a grid of parameters, using CSV files in ``data_dir`` as a cache.

    NOTE(review): ``process_params`` calls ``resample_df1``, ``resample_df2``,
    ``merge_dataframes`` and ``calculate_returns``, none of which are defined
    in this class. Unless they are supplied elsewhere (e.g. by a subclass),
    only parameter sets whose CSV already exists can be processed.
    """

    def __init__(self, data_dir: str):
        """
        :param data_dir: Directory holding the cached ``df0_*.csv`` files
        """
        self.data_dir = data_dir

    def load_df0(self, rt: str, keyword: str, window: int, model: str) -> Optional[pd.DataFrame]:
        """
        Load a cached frame from ``data_dir`` if it exists.

        :param rt: Resample time period
        :param keyword: Keyword used for filtering
        :param window: Window size for EMA calculation
        :param model: Model name
        :return: Loaded DataFrame ('time' parsed as dates) or None if the file doesn't exist
        """
        file_path = os.path.join(self.data_dir, f"df0_{rt}_{keyword}_{window}_{model}.csv")
        if os.path.exists(file_path):
            return pd.read_csv(file_path, parse_dates=['time'])
        return None

    def save_df0(self, df0: pd.DataFrame, rt: str, keyword: str, window: int, model: str):
        """
        Save a frame to ``data_dir`` as CSV, creating the directory if needed.

        :param df0: DataFrame to save
        :param rt: Resample time period
        :param keyword: Keyword used for filtering
        :param window: Window size for EMA calculation
        :param model: Model name
        """
        # to_csv does not create missing parent directories.
        os.makedirs(self.data_dir, exist_ok=True)
        file_path = os.path.join(self.data_dir, f"df0_{rt}_{keyword}_{window}_{model}.csv")
        df0.to_csv(file_path, index=False)

    def process_params(self, params: Tuple[str, str, str, int, int], split_date: str) -> Tuple[dict, float, float]:
        """
        Process a single set of parameters.

        :param params: Tuple containing (rt, keyword, model, t, window)
        :param split_date: Date to split train and test sets
        :return: Tuple containing parameters dict, train correlation, and test correlation
        """
        rt, keyword, model, t, window = params
        df0 = self.load_df0(rt, keyword, window, model)

        if df0 is None:
            # NOTE(review): these helpers are not defined in this class — see class docstring.
            df11 = self.resample_df1(rt, window, keyword)
            df21 = self.resample_df2(rt)
            df0 = self.merge_dataframes(df11, df21)
            df0 = self.calculate_returns(df0, t)
            # NOTE(review): the cache key omits t, so a file written for one t
            # may lack the returns column another t needs — confirm intent.
            self.save_df0(df0, rt, keyword, window, model)

        df0['time'] = pd.to_datetime(df0['time'])
        split_datetime = pd.to_datetime(split_date)

        train_df = df0[df0['time'] < split_datetime]
        test_df = df0[df0['time'] >= split_datetime]

        train_corr = self.calculate_correlation(train_df, model, window, t)
        test_corr = self.calculate_correlation(test_df, model, window, t)

        return {'rt': rt, 'keyword': keyword, 'model': model, 't': t, 'window': window}, train_corr, test_corr

    def calculate_correlation(self, df: pd.DataFrame, model: str, window: int, t: int) -> float:
        """
        Correlation between ``<model>_score_ema_<window>`` and ``returns_<t>``.

        :param df: DataFrame containing both columns
        :param model: Model name
        :param window: Window size for EMA calculation
        :param t: Number of periods for return calculation
        :return: Correlation coefficient, or 0.0 when no complete rows exist or
                 the correlation is undefined (NaN)
        """
        column_name = f"{model}_score_ema_{window}"
        returns_column = f"returns_{t}"

        # Drop rows where either the score or the returns value is missing.
        df_clean = df.dropna(subset=[column_name, returns_column])

        if df_clean.empty:
            return 0.0

        correlation = df_clean[column_name].corr(df_clean[returns_column])
        return 0.0 if np.isnan(correlation) else correlation

    def analyze(self, rt_list: List[str], keyword_list: List[str], model_list: List[str],
                t_list: List[int], window_list: List[int], split_date: str, num_workers: int = 4) -> pd.DataFrame:
        """
        Evaluate every parameter combination in a process pool.

        :param rt_list: List of resample time periods
        :param keyword_list: List of keywords
        :param model_list: List of model names
        :param t_list: List of return periods
        :param window_list: List of window sizes
        :param split_date: Date to split train and test sets
        :param num_workers: Maximum number of worker processes
        :return: One row per combination with 'train_corr' and 'test_corr',
                 sorted by absolute train correlation, largest first
        """
        result_columns = ['rt', 'keyword', 'model', 't', 'window', 'train_corr', 'test_corr']
        params_list = [(rt, keyword, model, t, window)
                       for rt in rt_list
                       for keyword in keyword_list
                       for model in model_list
                       for t in t_list
                       for window in window_list]

        if not params_list:
            # An empty grid would give a column-less frame and make the sort
            # below fail with KeyError; return an empty, well-formed result.
            return pd.DataFrame(columns=result_columns)

        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            results = list(executor.map(self.process_params, params_list, [split_date] * len(params_list)))

        df_results = pd.DataFrame([
            {**params, 'train_corr': train_corr, 'test_corr': test_corr}
            for params, train_corr, test_corr in results
        ])

        # Sort by the absolute value of train_corr, largest first.
        df_results = df_results.sort_values(by='train_corr', key=abs, ascending=False)

        return df_results

    def plot_top_correlations(self, df_results: pd.DataFrame, top_n: int = 10):
        """
        Bar-plot train and test correlations of the first ``top_n`` rows.

        :param df_results: Result frame as returned by ``analyze``
        :param top_n: Maximum number of rows to plot
        """
        # Plot the raw (signed) train_corr values.
        top_results = df_results.head(top_n)
        # Fewer than top_n rows may exist; size the axis from the actual data
        # so bar positions and heights always have the same length.
        count = len(top_results)
        positions = range(count)

        plt.figure(figsize=(12, 6))
        plt.bar(positions, top_results['train_corr'], align='center', alpha=0.8, label='Train')
        plt.bar(positions, top_results['test_corr'], align='center', alpha=0.6, label='Test')
        plt.xlabel('Parameters')
        plt.ylabel('Correlation')
        plt.title(f'Top {count} Correlations')
        plt.legend()
        plt.xticks(positions, [f"{row['rt']}-{row['keyword']}-{row['model']}-{row['t']}-{row['window']}"
                               for _, row in top_results.iterrows()], rotation=90)
        plt.tight_layout()
        plt.show()

# Usage example.
# NOTE(review): rt_list, keyword_list, model_list, t_list and window_list are
# not defined in this file — presumably set in an earlier notebook cell; confirm.
# The guard keeps this from running on import, which matters because analyze()
# starts worker processes that may re-import the main module.
if __name__ == "__main__":
    analyzer = ForexSentimentAnalyzer('data_directory')
    results = analyzer.analyze(rt_list, keyword_list, model_list, t_list, window_list, split_date='2024-01-01')
    print(results)
    analyzer.plot_top_correlations(results)