In [3]:
from funcs.dvc_funcs import dagshub_initialization, run_dvc_command

# Initialize DVC and MLflow setup
dagshub_initialization()

# Now proceed with the rest of your script
# # Example:
# import mlflow

# # Set MLflow experiment
# mlflow.set_tracking_uri("https://dagshub.com/najibabounasr/Macro_Economic_Forecast_Stage_2.mlflow")
# mlflow.set_experiment("YourExperimentName")

# # Your further code logic here...


DVC remote 'origin' set to https://dagshub.com/najibabounasr/MacroEconomicAPI.dvc


In [2]:
from funcs.process_data_funcs import fetch_data
import pandas as pd
from local_settings import settings
from fredapi import Fred
import os
from funcs.dvc_funcs import run_dvc_command, dagshub_initialization, upload_to_dagshub
from dagshub import get_repo_bucket_client




def fetch_all_data(target_feature):
    # Initialize FRED API with your API key
    fred = Fred(api_key=settings['api_key'])

    # Fetch and store data for each series in a dictionary
    data_frames = {}
    for series_id in settings['series_ids']:
        frequency = settings['frequency_map'].get(series_id, 'm')  # Default to 'm' if not specified
        try:
            data_frame = fetch_data(series_id, frequency)
            if not data_frame.empty:
                data_frames[series_id] = data_frame
        except Exception as e:
            print(f"Error fetching data for {series_id}: {e}")

    # Combine all data into a single DataFrame
    combined_data = pd.concat(data_frames.values(), axis=1, keys=data_frames.keys())
    combined_data = combined_data.asfreq('MS')
    combined_data.index.name = 'Date'

    # Drop a level from the multi-level columns
    combined_data.columns = combined_data.columns.droplevel(1)

    # Ensure the target feature is included in the dataset
    if target_feature not in combined_data.columns:
        valid_features = ", ".join(combined_data.columns)
        raise ValueError(f"Target feature '{target_feature}' is not available in the dataset. Valid features are: {valid_features}")

    # Save raw data locally
    if not os.path.exists('data/raw'):
        os.makedirs('data/raw')
    combined_data.to_csv('data/raw/raw_data.csv')

    # Upload raw data to Dagshub
    upload_to_dagshub('data/raw/raw_data.csv', 'data/raw/raw_data.csv')

    print("Data fetching complete and tracked with DVC and Dagshub.")
    return combined_data

def main():
    dagshub_initialization()
    # This function is used to test the script independently
    target_feature = 'GDP'  # Example target feature
    fetch_all_data(target_feature)

if __name__ == "__main__":
    main()


DVC remote 'origin' set to https://dagshub.com/najibabounasr/MacroEconomicAPI.dvc
Fetching data for FEDFUNDS
Fetching data for GDP
Fetching data for CPIAUCSL
Fetching data for CUSR0000SAH1
Fetching data for CPILFESL
Fetching data for PCE
Fetching data for PRFI
Fetching data for PNFI
Fetching data for EXPGS
Fetching data for HOUST
Fetching data for DSPI
Fetching data for DGS2
Fetching data for DGS5
Fetching data for DGS10
Fetching data for AAA
Fetching data for BAA
Fetching data for WTISPLC
Fetching data for IMPGS
Fetching data for GCE
Fetching data for FGCE
Fetching data for GDPCTPI
Fetching data for PCEPI
Fetching data for PCEPILFE
Fetching data for PAYEMS
Fetching data for UNRATE
Fetching data for INDPRO
Fetching data for CUMFNS
Fetching data for USREC


Data fetching complete and tracked with DVC and Dagshub.


In [30]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from funcs.process_data_funcs import (
    impute_missing_values_spline, deflate_nominal_values, apply_log_transformations,
    apply_best_transformations, cap_outliers
)
from funcs.dvc_funcs import dagshub_initialization
from dagshub import get_repo_bucket_client

def process_data(target):
    # Load combined data from CSV
    combined_data = pd.read_csv('data/raw/raw_data.csv', parse_dates=True, index_col='Date')

    # Split data into X (features) and y (target)
    X = combined_data.drop(columns=[target])
    y = combined_data[[target]]

    # Split the data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)

    # Impute missing values using spline interpolation
    quarterly_columns = ['GDP', 'PRFI', 'PNFI', 'EXPGS', 'IMPGS', 'GCE', 'FGCE', 'GDPCTPI']
    treasury_yield_columns = ['DGS2', 'DGS5', 'DGS10']

    for column in quarterly_columns + treasury_yield_columns:
        if column in X_train.columns:
            X_train = impute_missing_values_spline(X_train, column)
            X_test = impute_missing_values_spline(X_test, column)
        elif column in y_train.columns:
            y_train = impute_missing_values_spline(y_train, column)
            y_test = impute_missing_values_spline(y_test, column)

    # Ensure index column is named 'Date'
    for df in [X_train, X_test, y_train, y_test]:
        df.index.name = 'Date'

    # Combine X and y dataframes
    train_combined = pd.concat([X_train, y_train], axis=1)
    test_combined = pd.concat([X_test, y_test], axis=1)

    # Ensure CPIAUCSL is included in the dataframes
    cpi_col_name = 'CPIAUCSL'
    columns_to_deflate = ['GDP', 'PCE', 'PRFI', 'PNFI', 'EXPGS', 'IMPGS', 'GCE', 'FGCE', 'DSPI']

    # Deflate target column if it's in the columns to deflate
    if target in columns_to_deflate:
        deflated_train = deflate_nominal_values(train_combined[[target, cpi_col_name]], cpi_col_name, [target])
        deflated_test = deflate_nominal_values(test_combined[[target, cpi_col_name]], cpi_col_name, [target])
        train_combined[target] = deflated_train[target]
        test_combined[target] = deflated_test[target]
        columns_to_deflate.remove(target)

    # Deflate remaining columns
    train_combined = deflate_nominal_values(train_combined, cpi_col_name, columns_to_deflate)
    test_combined = deflate_nominal_values(test_combined, cpi_col_name, columns_to_deflate)

    # Apply logarithmic transformations
    columns_to_transform = ['GDP', 'PCE', 'PRFI', 'PNFI', 'EXPGS', 'IMPGS', 'GCE', 'FGCE', 'HOUST', 'DSPI']

    if target in columns_to_transform:
        train_combined[target] = apply_log_transformations(train_combined[[target]], [target])
        test_combined[target] = apply_log_transformations(test_combined[[target]], [target])
        columns_to_transform.remove(target)

    train_combined = apply_log_transformations(train_combined, columns_to_transform)
    test_combined = apply_log_transformations(test_combined, columns_to_transform)

    # Standardize/Normalize the Data
    scaler = StandardScaler()
    X_train[X_train.columns] = scaler.fit_transform(X_train)
    X_test[X_test.columns] = scaler.transform(X_test)
    y_train[target] = scaler.fit_transform(y_train[target].values.reshape(-1, 1))
    y_test[target] = scaler.transform(y_test[target].values.reshape(-1, 1))

    # Apply Percentage Change
    X_train_pct_change = X_train.pct_change().dropna()
    X_test_pct_change = X_test.pct_change().dropna()
    y_train_pct_change = y_train.pct_change().dropna()
    y_test_pct_change = y_test.pct_change().dropna()

    # Apply the best transformations
    X_train_transformed = apply_best_transformations(X_train_pct_change)
    X_test_transformed = apply_best_transformations(X_test_pct_change)
    y_train_transformed = apply_best_transformations(y_train_pct_change)
    y_test_transformed = apply_best_transformations(y_test_pct_change)

    # Handle outliers in the transformed data
    X_train_transformed = cap_outliers(X_train_transformed, cap_factor=3.0)
    y_train_transformed = cap_outliers(y_train_transformed, cap_factor=3.0)

    # Combine X and y after final transformations
    train_transformed_combined = pd.concat([X_train_transformed, y_train_transformed], axis=1)
    test_transformed_combined = pd.concat([X_test_transformed, y_test_transformed], axis=1)

    # Save the transformed data locally
    train_transformed_combined.to_csv('data/processed/train_transformed_combined.csv', index=True)
    test_transformed_combined.to_csv('data/processed/test_transformed_combined.csv', index=True)

    # Upload to Dagshub storage
    s3 = get_repo_bucket_client("najibabounasr/MacroEconomicAPI")
    s3.upload_file(
        Bucket="MacroEconomicAPI",
        Filename="data/processed/train_transformed_combined.csv",
        Key="train_transformed_combined.csv",
    )
    s3.upload_file(
        Bucket="MacroEconomicAPI",
        Filename="data/processed/test_transformed_combined.csv",
        Key="test_transformed_combined.csv",
    )

    # Save individual transformed datasets locally
    X_train_transformed.to_csv('data/processed/X_train_transformed.csv', index=True)
    X_test_transformed.to_csv('data/processed/X_test_transformed.csv', index=True)
    y_train_transformed.to_csv('data/processed/y_train_transformed.csv', index=True)
    y_test_transformed.to_csv('data/processed/y_test_transformed.csv', index=True)

if __name__ == "__main__":
    import sys
    dagshub_initialization()
    if len(sys.argv) < 2:
        raise ValueError("No target feature provided. Please specify the target feature.")
    target = sys.argv[1]
    process_data(target)


DVC remote 'origin' set to https://dagshub.com/najibabounasr/MacroEconomicAPI.dvc


KeyError: "['--f=c:\\\\Users\\\\nabounaser\\\\AppData\\\\Roaming\\\\jupyter\\\\runtime\\\\kernel-v2-15444EWxWh1m3A8cQ.json'] not found in axis"