In [2]:
import os

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Load RDS connection settings from the environment (a local .env file is read
# by load_dotenv), so no credentials are hardcoded in the notebook.
load_dotenv()
rds_host = os.getenv('RDS_HOST')
rds_user = os.getenv('RDS_USER')
rds_password = os.getenv('RDS_PASSWORD')
rds_db = os.getenv('RDS_DB')
rds_port = os.getenv('RDS_PORT')

# Fail fast with a clear message instead of building a URL that contains the
# literal string "None" for an unset variable.
_missing_rds_vars = [
    var_name
    for var_name, var_value in {
        'RDS_HOST': rds_host,
        'RDS_USER': rds_user,
        'RDS_PASSWORD': rds_password,
        'RDS_DB': rds_db,
        'RDS_PORT': rds_port,
    }.items()
    if not var_value
]
if _missing_rds_vars:
    raise RuntimeError(f"Missing required environment variables: {', '.join(_missing_rds_vars)}")

# Connect to PostgreSQL. URL.create escapes special characters in the
# credentials ('#', '?', ':', '@', ...) that would corrupt an f-string URL.
engine = create_engine(URL.create(
    drivername='postgresql',
    username=rds_user,
    password=rds_password,
    host=rds_host,
    port=int(rds_port),
    database=rds_db,
))

def load_and_merge_data(con=None):
    """Load the economic, news-sentiment and AAPL stock tables and merge them by date.

    Steps:
      1. Read ``economic_indicators``, ``financial_news_sentiment`` and
         ``apple_stock`` and turn every ``date`` column into a tz-naive
         datetime. News timestamps are converted to UTC first; tz-naive news
         timestamps are treated as already being UTC.
      2. Aggregate sentiment per calendar day: mean score, article count and
         most frequent label. Days without articles are kept, with count 0
         and a missing mean/label.
      3. Forward-fill the economic indicators onto a daily grid (the last row
         wins when a date appears more than once).
      4. If stock rows exist, attach the nearest daily sentiment (within 3
         days) to each stock row; otherwise the daily sentiment frame is the
         base of the merge.
      5. Attach the most recent economic values on or before each date and
         derive ``price_gdp_ratio`` / ``sentiment_volume`` when their input
         columns are present.

    Args:
        con: Optional connection for ``pd.read_sql`` (SQLAlchemy engine or
            connection, or a sqlite3 connection). Defaults to the module-level
            ``engine``.

    Returns:
        The merged DataFrame, or ``None`` if any step raised (the error is
        printed rather than propagated).
    """
    if con is None:
        con = engine

    def most_frequent(labels):
        # mode() drops missing values, so both an empty day and a day whose
        # labels are all null give an empty result; indexing [0] on that
        # raised KeyError and aborted the whole load.
        modes = labels.mode()
        return modes.iloc[0] if not modes.empty else None

    try:
        print("Loading economic data...")
        df_econ = pd.read_sql('SELECT "Date" as date, "GDP", "Unemployment_Rate" FROM economic_indicators', con)
        df_econ['date'] = pd.to_datetime(df_econ['date']).dt.tz_localize(None)
        print(f"Economic data loaded: {len(df_econ)} rows")

        print("\nLoading sentiment data...")
        df_sentiment = pd.read_sql('SELECT "publishedAt" as date, sentiment_score, sentiment_label FROM financial_news_sentiment', con)
        # utc=True converts tz-aware values (including mixed offsets) to UTC and
        # localizes tz-naive values as UTC; tz_convert alone raises on naive input.
        # The timezone is then dropped so all frames share tz-naive dates.
        df_sentiment['date'] = pd.to_datetime(df_sentiment['date'], utc=True).dt.tz_localize(None)
        print(f"Sentiment data loaded: {len(df_sentiment)} rows")

        print("\nLoading stock data...")
        df_stock = pd.read_sql('SELECT "Date" as date, "Close_AAPL" as close, "Volume_AAPL" as volume FROM apple_stock', con)
        if df_stock.empty:
            print("Warning: No stock data found - using empty DataFrame")
        else:
            df_stock['date'] = pd.to_datetime(df_stock['date']).dt.tz_localize(None)
            print(f"Stock data loaded: {len(df_stock)} rows")

        # Process sentiment data (aggregate by calendar day; empty days are kept)
        print("\nProcessing sentiment data...")
        sentiment_daily = df_sentiment.groupby(pd.Grouper(key='date', freq='D')).agg({
            'sentiment_score': ['mean', 'count'],
            'sentiment_label': most_frequent,
        })
        sentiment_daily.columns = ['sentiment_mean', 'sentiment_count', 'sentiment_mode']
        sentiment_daily = sentiment_daily.reset_index()

        # Process economic data (quarterly to daily). resample().ffill() reindexes,
        # which fails on duplicate dates (e.g. after re-running an append-only
        # ingestion), so keep the last row per date first.
        print("Processing economic data...")
        df_econ_daily = (
            df_econ.sort_values('date', kind='stable')
            .drop_duplicates('date', keep='last')
            .set_index('date')
            .resample('D')
            .ffill()
        )

        # Merge stock with sentiment (if stock data exists)
        if not df_stock.empty:
            print("\nMerging stock and sentiment data...")
            df_merged = pd.merge_asof(
                df_stock.sort_values('date'),
                sentiment_daily.sort_values('date'),
                on='date',
                direction='nearest',
                tolerance=pd.Timedelta('3D')
            )
        else:
            print("No stock data to merge with sentiment")
            df_merged = sentiment_daily.copy()

        # Merge with economic data: latest values on or before each date
        print("Merging with economic data...")
        df_final = pd.merge_asof(
            df_merged.sort_values('date'),
            df_econ_daily.reset_index().sort_values('date'),
            on='date',
            direction='backward'
        )

        # Calculate metrics if we have the required columns
        if not df_final.empty:
            if 'close' in df_final.columns and 'GDP' in df_final.columns:
                df_final['price_gdp_ratio'] = df_final['close'] / df_final['GDP']
            if 'sentiment_mean' in df_final.columns and 'volume' in df_final.columns:
                df_final['sentiment_volume'] = df_final['sentiment_mean'] * df_final['volume']

        print("\nMerge completed successfully!")
        print(f"Final dataset size: {len(df_final)} rows")
        print(f"Date range: {df_final['date'].min()} to {df_final['date'].max()}")

        return df_final

    except Exception as e:
        # Best-effort notebook helper: report and return None instead of raising.
        # The exception type is included because str(e) alone can be opaque
        # (e.g. a KeyError prints just the missing key).
        print(f"\nError occurred: {type(e).__name__}: {e}")
        return None

# Run the load/merge pipeline, then preview the first rows (or report that
# nothing came back, either because of an error or an empty merge).
final_data = load_and_merge_data()

if final_data is None or final_data.empty:
    print("\nNo data was returned from the merge operation")
else:
    print("\nFinal merged data sample:")
    print(final_data.head())

Loading economic data...
Economic data loaded: 8 rows

Loading sentiment data...
Sentiment data loaded: 97 rows

Loading stock data...

Processing sentiment data...
Processing economic data...
No stock data to merge with sentiment
Merging with economic data...

Merge completed successfully!
Final dataset size: 31 rows
Date range: 2025-03-28 00:00:00 to 2025-04-27 00:00:00

Final merged data sample:
        date  sentiment_mean  sentiment_count sentiment_mode        GDP  \
0 2025-03-28        -0.16480                5       negative  29723.864   
1 2025-03-29        -0.11315                2       negative  29723.864   
2 2025-03-30        -0.05160                1       negative  29723.864   
3 2025-03-31        -0.01210                4        neutral  29723.864   
4 2025-04-01        -0.26010                4       negative  29723.864   

   Unemployment_Rate  
0                4.1  
1                4.1  
2                4.1  
3                4.1  
4                4.1  


In [56]:
# Inspect the exact (case-sensitive) column names of apple_stock in table
# definition order. Without ORDER BY, information_schema rows come back in an
# unspecified order. The schema filter keeps same-named tables in other schemas
# from adding duplicate rows.
table_info = pd.read_sql(
    "SELECT column_name FROM information_schema.columns "
    "WHERE table_name = 'apple_stock' AND table_schema = current_schema() "
    "ORDER BY ordinal_position",
    engine,
)
print(table_info)

      column_name
0       High_AAPL
1  Adj Close_AAPL
2      Close_AAPL
3     Volume_AAPL
4        Low_AAPL
5       Open_AAPL
6            Date


In [63]:

# Per-column count of missing values in final_data (the merged frame returned
# by load_and_merge_data).
missing_values = final_data.isna().sum()
print(missing_values)


date                 0
sentiment_mean       2
sentiment_count      0
sentiment_mode       2
GDP                  0
Unemployment_Rate    0
dtype: int64


In [3]:
# Print the number of rows, then render the whole merged frame as the cell output.
print(final_data.shape[0])
final_data


31


Unnamed: 0,date,sentiment_mean,sentiment_count,sentiment_mode,GDP,Unemployment_Rate
0,2025-03-28,-0.1648,5,negative,29723.864,4.1
1,2025-03-29,-0.11315,2,negative,29723.864,4.1
2,2025-03-30,-0.0516,1,negative,29723.864,4.1
3,2025-03-31,-0.0121,4,neutral,29723.864,4.1
4,2025-04-01,-0.2601,4,negative,29723.864,4.1
5,2025-04-02,0.075433,3,neutral,29723.864,4.1
6,2025-04-03,-0.138271,7,neutral,29723.864,4.1
7,2025-04-04,-0.007738,8,neutral,29723.864,4.1
8,2025-04-05,0.0,1,neutral,29723.864,4.1
9,2025-04-06,-0.2732,1,negative,29723.864,4.1


In [4]:
# Missing sentiment values come from days with no news: those daily bins have
# sentiment_count 0 and no mean score or modal label. Treat such days as
# neutral, filling both the score (0) and the label ('neutral') so the two
# columns stay consistent instead of storing a NULL label next to a 0 score.
final_data['sentiment_mean'] = final_data['sentiment_mean'].fillna(0)
final_data['sentiment_mode'] = final_data['sentiment_mode'].fillna('neutral')

In [5]:
# Re-display the merged frame to check the missing-sentiment fill from the previous cell.
final_data

Unnamed: 0,date,sentiment_mean,sentiment_count,sentiment_mode,GDP,Unemployment_Rate
0,2025-03-28,-0.1648,5,negative,29723.864,4.1
1,2025-03-29,-0.11315,2,negative,29723.864,4.1
2,2025-03-30,-0.0516,1,negative,29723.864,4.1
3,2025-03-31,-0.0121,4,neutral,29723.864,4.1
4,2025-04-01,-0.2601,4,negative,29723.864,4.1
5,2025-04-02,0.075433,3,neutral,29723.864,4.1
6,2025-04-03,-0.138271,7,neutral,29723.864,4.1
7,2025-04-04,-0.007738,8,neutral,29723.864,4.1
8,2025-04-05,0.0,1,neutral,29723.864,4.1
9,2025-04-06,-0.2732,1,negative,29723.864,4.1


In [8]:
#save final_data to AWS RDS postgresql


from sqlalchemy import Date, Float, Integer, String
from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION

def save_to_rds(final_data, table_name='merged_financial_data', engine=None):
    """
    Save merged DataFrame to AWS RDS PostgreSQL with proper type handling.

    On first use the target table is created with a surrogate ``id`` key, a
    ``created_at`` timestamp and an index on ``date``; rows are then appended.
    Appending is not idempotent: saving the same frame twice stores it twice.

    Args:
        final_data: Merged DataFrame to save. It is copied, never modified.
        table_name: Target table name (default: 'merged_financial_data').
        engine: Optional existing SQLAlchemy engine to write through. When
            omitted, an engine is built from the RDS_* environment variables
            and disposed of before returning.

    Returns:
        True if the rows were written, False if any step failed (the error is
        printed rather than raised).

    Note:
        The created table has a fixed column set (date, sentiment_*, GDP,
        Unemployment_Rate). Appending a frame with additional columns, such as
        the stock-derived ones load_and_merge_data adds when stock data exists,
        will fail.
    """
    # Imported here so this cell is self-contained; `text` was previously used
    # without being imported, so every call failed with a NameError.
    from sqlalchemy import inspect, text
    from sqlalchemy.engine import URL

    owns_engine = engine is None
    try:
        # 1. Set up RDS connection. URL.create escapes special characters in the
        #    credentials that would corrupt a hand-built URL string.
        if owns_engine:
            db_port = os.getenv('RDS_PORT')
            engine = create_engine(URL.create(
                drivername='postgresql',
                username=os.getenv('RDS_USER'),
                password=os.getenv('RDS_PASSWORD'),
                host=os.getenv('RDS_HOST'),
                port=int(db_port) if db_port else None,
                database=os.getenv('RDS_DB'),
            ))

        # 2. SQLAlchemy column types (used by to_sql only if it creates the table)
        dtype_mapping = {
            'date': Date(),  # SQLAlchemy Date type
            'sentiment_mean': Float(),  # SQLAlchemy Float type
            'sentiment_count': Integer(),  # SQLAlchemy Integer type
            'sentiment_mode': String(20),  # SQLAlchemy String type with length
            'GDP': DOUBLE_PRECISION(),  # PostgreSQL-specific double precision
            'Unemployment_Rate': Float()  # Standard float
        }

        # 3. Work on a copy so the caller's DataFrame is left untouched, and
        #    store dates as plain calendar dates.
        data = final_data.copy()
        if 'date' in data.columns:
            data['date'] = pd.to_datetime(data['date']).dt.date

        # 4. Create the table if it does not exist. The mixed-case columns are
        #    double-quoted: unquoted, PostgreSQL folds them to gdp /
        #    unemployment_rate and to_sql's quoted "GDP" / "Unemployment_Rate"
        #    inserts then fail. Table and index names go through the dialect's
        #    identifier quoting instead of raw string interpolation.
        preparer = engine.dialect.identifier_preparer
        quoted_table = preparer.quote(table_name)
        quoted_index = preparer.quote(f'idx_{table_name}_date')
        with engine.begin() as conn:
            if not inspect(conn).has_table(table_name):
                conn.execute(text(f"""
                    CREATE TABLE {quoted_table} (
                        id SERIAL PRIMARY KEY,
                        date DATE NOT NULL,
                        sentiment_mean FLOAT,
                        sentiment_count INTEGER,
                        sentiment_mode VARCHAR(20),
                        "GDP" DOUBLE PRECISION,
                        "Unemployment_Rate" FLOAT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """))
                conn.execute(text(f"CREATE INDEX {quoted_index} ON {quoted_table} (date)"))
                print(f"Created new table: {table_name}")

        # 5. Append the rows
        data.to_sql(
            name=table_name,
            con=engine,
            if_exists='append',
            index=False,
            dtype=dtype_mapping,
            method='multi',
            chunksize=1000
        )

        print(f"Successfully saved {len(data)} records to {table_name}")
        return True

    except Exception as e:
        print(f"Error saving to RDS: {str(e)}")
        return False
    finally:
        # Only dispose of an engine this function created; a caller-supplied
        # engine stays open for further use.
        if owns_engine and engine is not None:
            engine.dispose()