In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import statsmodels.api as sm
from statsmodels.tsa.stattools import grangercausalitytests, adfuller
from statsmodels.tsa.api import VAR
from scipy.stats import normaltest
import os
import pickle
import gc
import warnings

warnings.filterwarnings('ignore')

In [3]:
# Cached FinBERT sentiment DataFrame.
# NOTE(review): no cell visible here reads this global `df`; data_manipulation_pipeline
# loads its own per-model cache files. Only unpickle trusted files.
df = pd.read_pickle(filepath_or_buffer='final_m.pkl')

In [2]:
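"""Full workflow: raw-data filtering, sentiment aggregation and strategy inputs."""
# # To use a different raw dataset:
# # set the path to the folder that holds the Parquet files
# directory = 'the path'
# # parquet_concat(directory) then loads every Parquet file in that folder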
def parquet_concat(directory, map_table_path='final_map_table.pkl', start_date='2017-08-17'):
    """
    Aggregates raw sentiment data from multiple parquet files

    Parameters:
        directory (str): Path containing parquet files
        map_table_path (str): Pickled mapping table, merged on 'id'
            (default 'final_map_table.pkl', the previously hard-coded path)
        start_date (str): Earliest publication date kept, 'YYYY-MM-DD',
            inclusive (default '2017-08-17')

    Process Flow:
        1. Load all parquet files in target directory, in sorted file-name
           order so the row order is reproducible across filesystems
        2. Right-merge with the mapping table on 'id'
        3. Keep rows whose publishedDate prefix is on or after start_date;
           rows without a publishedDate (e.g. mapping-table ids with no
           matching news) are dropped
        4. Add 'log_FAMC' = natural log of 'FAMC'
        5. Release the per-file frames and run garbage collection

    Returns:
        DataFrame: merged, date-filtered dataset with added 'date' and 'log_FAMC' columns

    Raises:
        ValueError: if `directory` contains no '.parquet' files
    """
    # os.listdir order is filesystem-dependent; sort for a deterministic concat order.
    parquet_files = sorted(f for f in os.listdir(directory) if f.endswith('.parquet'))
    if not parquet_files:
        raise ValueError(f"No .parquet files found in {directory!r}")

    df_list = [pd.read_parquet(os.path.join(directory, f)) for f in parquet_files]
    df = pd.concat(df_list, ignore_index=True)
    # Drop the per-file frames now so gc.collect() below can actually reclaim them.
    del df_list

    # NOTE(review): how='right' keeps every mapping-table row, including ids without
    # sentiment data; those rows have no publishedDate and are removed by the date
    # filter below. If the intent is purely filtering, how='inner' states it directly.
    # Only unpickle trusted files.
    map_table = pd.read_pickle(map_table_path)
    df = pd.merge(df, map_table, on=['id'], how='right')

    # Vectorised 'YYYY-MM-DD' prefix. Missing values stay missing (the previous
    # per-element x[:10] raised TypeError on NaN) and compare False below.
    df['date'] = df['publishedDate'].str[:10]
    keep = (df['date'] >= start_date).fillna(False)
    # .copy() detaches the filtered frame so the next assignment does not trigger
    # SettingWithCopyWarning.
    df = df[keep].copy()
    # NOTE(review): FAMC <= 0 yields -inf/NaN here; confirm FAMC is strictly positive.
    df['log_FAMC'] = np.log(df['FAMC'])
    gc.collect()
    return df


def sentiment_cluster(df_, stock_counts, fin_col, weighted_method, freq='d', impact=3):
    """
    Generates market-level sentiment indicators with news impact weighting

    Parameters:
        df_ (DataFrame): Filtered sentiment data from parquet_concat; must contain
            'ticker', 'date' (and 'publishedDate' when freq='h'), the fin_col
            columns and, unless it is 'ew', the weighted_method column.
            The caller's frame is not modified.
        stock_counts (Series or DataFrame): Per-period baseline count indexed by
            period. A Series is used under the name 'count'; it is renamed on a
            copy, so the caller's object is not modified.
        fin_col (list): Sentiment score columns to aggregate
        weighted_method (str): Weight column name ('ew', 'FAMC' or 'log_FAMC' in
            this notebook); 'ew' is created here as a constant 1 (equal weights)
        freq (str): 'd' groups by 'date'; 'h' groups by the raw 'publishedDate'
            value (hourly only if those timestamps are hour-truncated — TODO confirm)
        impact (float): News impact multiplier (default 3x)

    Key Processes:
        1. Intra-period deduplication: scores are averaged per ticker-period, so a
           stock with many news items in one period is counted once
        2. Weighted aggregation: weighted average of the per-ticker scores per period
        3. Sentiment persistence: baseline counts and scores are forward-filled;
           a period with no news has 0 news-bearing stocks and keeps the previous value
        4. Adaptive smoothing, for a period with n news-bearing stocks, baseline c,
           news average s and previous value S_prev:
               S = (impact * n * s + (c - n) * S_prev) / (impact * n + c - n)
           When there is no previous value (start of the series, or after a NaN
           result) the period's news average s is used as is.

    Returns:
        DataFrame: one column per fin_col entry, indexed by the sorted union of
        stock_counts' index and the periods that have news
    """
    df_ = df_.copy()
    if isinstance(stock_counts, pd.Series):
        # rename() returns a copy; assigning .name directly mutated the caller's Series.
        stock_counts = stock_counts.rename('count')
    if freq == 'h':
        freq_col = 'publishedDate'
        # Attach each day's baseline count to that day's rows, then re-key it by timestamp.
        df_ = pd.merge(df_, stock_counts, left_on='date', right_index=True, how='left')
        stock_counts = df_[['count', freq_col]].drop_duplicates().set_index([freq_col])
    else:
        freq_col = 'date'

    # 1. Repeated news about the same stock within one period: take the average.
    df_['ew'] = 1
    per_ticker = df_.groupby(['ticker', freq_col])[fin_col + [weighted_method]].mean()

    # 2. Weighted cross-sectional average per period, plus the number of news-bearing tickers.
    period_avg = {
        period: np.average(grp[fin_col].to_numpy(), weights=grp[weighted_method].to_numpy(), axis=0)
        for period, grp in per_ticker.groupby(level=1)
    }
    news_senti = pd.DataFrame.from_dict(period_avg, orient='index', columns=fin_col).rename_axis(freq_col)
    news_count = per_ticker.groupby(level=1).size().rename('inner_count')

    merged = pd.concat([stock_counts, news_count, news_senti], axis=1).sort_index()
    # A period without news has zero news-bearing stocks. Forward-filling inner_count
    # would re-apply the previous period's news with the impact weight on every
    # news-free period (double counting).
    merged['inner_count'] = merged['inner_count'].fillna(0)
    merged = merged.ffill()

    # 3-4. Sequential smoothing: each period depends on the previous output.
    counts = merged['count'].to_numpy(dtype=float)
    n_news = merged['inner_count'].to_numpy(dtype=float)
    cluster_dict = {}
    for s in fin_col:
        scores = merged[s].to_numpy(dtype=float)
        smoothed = np.empty(len(merged))
        prev = np.nan
        for t in range(len(merged)):
            if n_news[t] == 0:
                cur = prev  # no news: sentiment persists
            elif np.isnan(prev):
                cur = scores[t]  # nothing to blend with yet: seed from this period's news
            else:
                news_w = impact * n_news[t]
                stale_w = counts[t] - n_news[t]
                cur = (scores[t] * news_w + prev * stale_w) / (news_w + stale_w)
            smoothed[t] = cur
            prev = cur
        cluster_dict[s] = smoothed
    return pd.DataFrame(data=cluster_dict, index=merged.index)


def sector_cluster(df_, stock_counts_sector, fin_col, weighted_method, freq='d', impact=3):
    """
    Sector-by-sector version of sentiment_cluster.

    Parameters:
        df_ (DataFrame): Filtered sentiment data with a 'gsector' column
        stock_counts_sector (Series or DataFrame): Baseline counts whose first
            index level is the sector code; the remaining level(s) are presumably
            the period used by sentiment_cluster — TODO confirm against the pickle
        fin_col, weighted_method, freq, impact: passed through to sentiment_cluster

    Process Flow:
        1. Group df_ by 'gsector'
        2. For each sector in turn, select that sector's counts and drop the sector level
        3. Run sentiment_cluster on the sector's news and counts

    Returns:
        dict: {sector_code: DataFrame} of sector sentiment indicators, one entry
        per sector present in df_
    """
    sector_keys = stock_counts_sector.index.get_level_values(0)
    return {
        sector: sentiment_cluster(
            sector_news,
            stock_counts_sector[sector_keys == sector].reset_index(level=0, drop=True),
            fin_col,
            weighted_method,
            freq,
            impact,
        )
        for sector, sector_news in df_.groupby('gsector')
    }

In [3]:
def data_manipulation_pipeline(config):
    """
    Main execution pipeline for sentiment data processing and analysis

    Parameters:
        config (dict): Configuration dictionary with keys:
            - model: List of model names to process
            - pkl_file: List of preprocessed data cache paths (one per model)
            - parquet_file: List of raw data directories (one per model); only
              consulted for models whose pkl_file does not exist yet
            - fin_col: List of sentiment score columns per model
            - impacts: List of impact factors per model
            - weighting_schemes (optional): weight columns to run,
              default ['ew', 'FAMC', 'log_FAMC']
            - count_file (optional): market news-count pickle,
              default 'ticker_counts_daily.pkl'
            - sector_count_file (optional): sector news-count pickle,
              default 'ticker_counts_sector_daily.pkl'

    Returns:
        dict: Nested dictionary structure containing:
            {
                model_name: {
                    weighting_scheme: [
                        market_cluster_df,
                        sector_cluster_dict
                    ]
                }
            }

    Raises:
        ValueError: if pkl_file, fin_col or impacts has fewer entries than model,
            or a model has neither an existing pkl_file nor a parquet_file entry
            to build it from. Validation runs before any data file is read.

    Workflow:
        1. Validate the config
        2. Load the news-count tables once (they are shared by all models)
        3. For each model in turn: load the cached pickle, or build it with
           parquet_concat and cache it; then compute the market-level and
           sector-level sentiment for every weighting scheme
    """
    models = config['model']
    n_models = len(models)
    for key in ('pkl_file', 'fin_col', 'impacts'):
        if len(config[key]) < n_models:
            raise ValueError(
                f"config[{key!r}] has {len(config[key])} entries "
                f"but config['model'] has {n_models}")
    parquet_dirs = config.get('parquet_file', [])
    for i, model_ in enumerate(models):
        if not os.path.exists(config['pkl_file'][i]) and i >= len(parquet_dirs):
            raise ValueError(
                f"model {model_!r}: cache {config['pkl_file'][i]!r} does not exist "
                f"and config['parquet_file'] has no entry to build it from")

    weighting_schemes = config.get('weighting_schemes', ['ew', 'FAMC', 'log_FAMC'])
    # The count tables do not depend on the model: load them once, not per model.
    # Only unpickle trusted files.
    ticker_counts = pd.read_pickle(config.get('count_file', 'ticker_counts_daily.pkl'))
    ticker_counts_sector = pd.read_pickle(
        config.get('sector_count_file', 'ticker_counts_sector_daily.pkl'))

    summary_dict = {}
    for i, model_ in enumerate(models):
        # first step: load data (cache hit) or build and cache it from raw parquet files
        pkl_path = config['pkl_file'][i]
        if os.path.exists(pkl_path):
            print('file already ready')
            df = pd.read_pickle(pkl_path)
        else:
            df = parquet_concat(parquet_dirs[i])
            df.to_pickle(pkl_path)

        fin_col = config['fin_col'][i]
        impact = config['impacts'][i]
        summary_dict[model_] = {
            w: [
                sentiment_cluster(df, ticker_counts, fin_col, w, 'd', impact),
                sector_cluster(df, ticker_counts_sector, fin_col, w, 'd', impact),
            ]
            for w in weighting_schemes
        }

    return summary_dict


In [100]:
config_ = {
    'model': ['finbert', 'deberta', 'roberta'],
    'pkl_file': ['finbert_senti.pkl', 'deberta_senti.pkl', 'roberta_senti.pkl'],
    # TODO (instructor): fill in the raw-data directories, in the order
    # 'finbert', 'deberta', 'roberta'. Only needed for models whose pkl_file
    # cache does not exist yet.
    'parquet_file': [],
    # One independent list per model: `[[...]] * 3` would make all three entries
    # the same list object, so mutating one would silently change the others.
    # NOTE(review): deberta/roberta also read 'finbert_*' columns — confirm their
    # cached frames really use these column names.
    'fin_col': [['finbert_pos', 'finbert_neg'] for _ in range(3)],
    'impacts': [3, 3, 3],
}

In [None]:
# Run the pipeline for every configured model and persist the nested result dict.
all_model_sentiment = data_manipulation_pipeline(config_)
with open('sentiment_summary_dict.pkl', mode='wb') as summary_file:
    pickle.dump(all_model_sentiment, summary_file)