https://www.kaggle.com/datasets/borismarjanovic/price-volume-data-for-all-us-stocks-etfs

## Without parallelism

In [3]:
import pandas as pd
import numpy as np
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
import os
from tqdm import tqdm
import joblib

def load_data(filename):
    """Read one price-history CSV into a DataFrame indexed by its ``Date`` column.

    Args:
        filename: Path to a CSV file that contains a ``Date`` column.

    Returns:
        The parsed DataFrame with ``Date`` (parsed as datetimes) as its index.

    Raises:
        ValueError: With the message ``"File is empty"`` when the file has no
            content or no data rows, and with ``"Error loading file: ..."``
            for any other failure while reading or parsing the file.
    """
    # Only the read itself is guarded, so the deliberate "File is empty" error
    # raised further down is not re-wrapped as a generic loading error.
    try:
        df = pd.read_csv(filename, parse_dates=['Date'])
    except pd.errors.EmptyDataError as exc:
        raise ValueError("File is empty") from exc
    except Exception as exc:
        raise ValueError(f"Error loading file: {exc}") from exc

    # A header-only file parses fine but carries no rows to work with.
    if df.empty:
        raise ValueError("File is empty")
    return df.set_index('Date')

def add_features(df):
    """Return a copy of ``df`` extended with technical indicators of ``Close``.

    The added columns are:

    * ``MA5`` / ``MA20`` - 5- and 20-row rolling means of ``Close``.
    * ``LR_Slope`` - slope of a degree-1 least-squares fit over each
      20-row window of ``Close``.
    * ``RSI`` - ``100 - 100 / (1 + gain / loss)`` using 14-row rolling means
      of the positive and negative ``Close`` differences.
    * ``MACD`` - 12-span EMA of ``Close`` minus its 26-span EMA.

    Rows that contain a NaN in any column (such as the rows before the first
    full 20-row window) are dropped from the result.

    Args:
        df: DataFrame with a numeric ``Close`` column. It is not modified.

    Returns:
        A new DataFrame holding the original columns plus the indicators.
    """
    # Work on a copy so the caller's frame is not silently altered.
    out = df.copy()
    close = out['Close']

    out['MA5'] = close.rolling(window=5).mean()
    out['MA20'] = close.rolling(window=20).mean()

    # raw=True hands each window over as a plain ndarray, which avoids building
    # a Series per window; the fitted slope is the same.
    positions = np.arange(20)
    out['LR_Slope'] = close.rolling(window=20).apply(
        lambda window: np.polyfit(positions, window, 1)[0], raw=True
    )

    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    out['RSI'] = 100 - (100 / (1 + rs))

    ema_fast = close.ewm(span=12, adjust=False).mean()
    ema_slow = close.ewm(span=26, adjust=False).mean()
    out['MACD'] = ema_fast - ema_slow

    return out.dropna()

def feature_selection(df, k=3):
    """Keep the ``k`` indicator columns that score best against ``Close``.

    The five indicator columns (``MA5``, ``MA20``, ``LR_Slope``, ``RSI``,
    ``MACD``) are scored with ``f_regression`` against ``Close`` and the
    top ``k`` are retained.

    Args:
        df: DataFrame containing the five indicator columns and ``Close``.
        k: Number of indicator columns to keep. Defaults to 3.

    Returns:
        A DataFrame with the selected indicator columns followed by ``Close``.
    """
    candidates = ['MA5', 'MA20', 'LR_Slope', 'RSI', 'MACD']
    X = df[candidates]
    y = df['Close']

    # Only the support mask is needed, so fit() is enough; the transformed
    # array that fit_transform() would also build was never used.
    selector = SelectKBest(score_func=f_regression, k=k).fit(X, y)
    selected_features = X.columns[selector.get_support()].tolist()
    return df[selected_features + ['Close']]

def train_models(df):
    """Cross-validate three regressors on ``df`` and refit the best one.

    Every column except ``Close`` is used as a predictor and ``Close`` is the
    target. Each candidate is scored with 5-fold cross-validation on negative
    mean squared error; the candidate with the lowest mean MSE is then fitted
    on the full data.

    Args:
        df: DataFrame with a ``Close`` column plus the predictor columns.

    Returns:
        A tuple ``(fitted_model, model_name, predictor_column_names)``.
    """
    target = df['Close']
    predictors = df.drop(columns='Close')

    candidates = {
        'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42),
        'LinearRegression': LinearRegression(),
        'SVR': SVR(kernel='rbf'),
    }

    # The scorer returns negated MSE, so flip the sign to get a plain error
    # where smaller is better.
    mean_mse = {
        label: -cross_val_score(
            estimator, predictors, target, cv=5, scoring='neg_mean_squared_error'
        ).mean()
        for label, estimator in candidates.items()
    }

    winner = min(mean_mse, key=mean_mse.get)
    fitted = candidates[winner]
    fitted.fit(predictors, target)

    return fitted, winner, list(predictors.columns)

def process_file(filename):
    """Run the whole pipeline for one file and describe the outcome.

    The file is loaded, indicators are added, features are selected, the best
    model is trained and the tuple ``(model, model_name, features)`` is dumped
    to ``output/<base name>_model.joblib``.

    Args:
        filename: Path of the price file to process.

    Returns:
        A status message starting with ``"Processed"``, ``"Skipped"`` or
        ``"Error processing"``. Exceptions are reported in the message
        rather than propagated.
    """
    try:
        frame = load_data(filename)
        # The longest rolling window spans 20 rows, so shorter files are skipped.
        if len(frame) < 20:
            return f"Skipped {filename}: Not enough data"

        selected = feature_selection(add_features(frame))
        best_model, model_name, features = train_models(selected)

        output_dir = 'output'
        os.makedirs(output_dir, exist_ok=True)

        stem, _extension = os.path.splitext(os.path.basename(filename))
        model_filename = f"{output_dir}/{stem}_model.joblib"
        joblib.dump((best_model, model_name, features), model_filename)
    except ValueError as err:
        return f"Skipped {filename}: {err}"
    except Exception as err:
        return f"Error processing {filename}: {err}"
    else:
        return f"Processed {filename}, best model: {model_name}, features: {features}"

if __name__ == "__main__":
    # Only a small sample is processed sequentially.
    sample_size = 10

    # Get all file paths; the listings are sorted so the sample holds the same
    # files regardless of the order the file system reports them in.
    stock_files = [os.path.join('data/stock', f) for f in sorted(os.listdir('data/stock')) if f.endswith('.txt')]
    etf_files = [os.path.join('data/etfs', f) for f in sorted(os.listdir('data/etfs')) if f.endswith('.txt')]
    all_files = stock_files + etf_files

    # Process the sample sequentially
    for file in tqdm(all_files[:sample_size]):
        # Report each outcome instead of discarding it; tqdm.write prints the
        # message without corrupting the progress bar.
        tqdm.write(process_file(file))

 10%|█         | 1/10 [00:06<01:00,  6.78s/it]

Processed data/stock\a.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 20%|██        | 2/10 [00:31<02:16, 17.09s/it]

Processed data/stock\aa.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 30%|███       | 3/10 [00:31<01:07,  9.60s/it]

Processed data/stock\aaap.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 40%|████      | 4/10 [00:40<00:55,  9.25s/it]

Processed data/stock\aaba.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 50%|█████     | 5/10 [00:41<00:31,  6.28s/it]

Processed data/stock\aac.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 60%|██████    | 6/10 [00:42<00:18,  4.62s/it]

Processed data/stock\aal.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 70%|███████   | 7/10 [00:44<00:10,  3.65s/it]

Processed data/stock\aamc.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 80%|████████  | 8/10 [00:48<00:07,  3.62s/it]

Processed data/stock\aame.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


 90%|█████████ | 9/10 [00:54<00:04,  4.55s/it]

Processed data/stock\aan.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']


100%|██████████| 10/10 [00:56<00:00,  5.66s/it]

Processed data/stock\aaoi.us.txt, best model: LinearRegression, features: ['MA5', 'MA20', 'MACD']





The 10-file sample above took about 1 minute, i.e. roughly 0.1 minutes per file. Extrapolating linearly, all 8539 files would take around 853.9 minutes.
We can cross-check this estimate against the measured parallel run using the following reasoning:

The complexity of the serial code is $O(F \cdot k \cdot n^2 \cdot m)$, where $F$ is the number of files, $k$ is the number of stocks, $n$ is the number of days, and $m$ is the number of features.

The complexity of the parallel code is $O\left(\frac{F \cdot k \cdot n^2 \cdot m}{P}\right)$, where $P$ is the number of processes, which in my case is 16. So, the ideal speedup would be:

$$
\text{Speedup} = \frac{O(F \cdot k \cdot n^2 \cdot m)}{O\left(\frac{F \cdot k \cdot n^2 \cdot m}{P}\right)} = P
$$

We know that the parallel code took around 57 minutes to run 8539 files using 16 processes. Assuming an ideal (linear) speedup, we can estimate the time the non-parallel code would need for the same 8539 files:

$$
\text{Time for serial code} = 16 \times \text{Time for parallel code}
$$

Substituting the values:

$$
\text{Time for serial code} = 16 \times 57 \text{ minutes} = 912 \text{ minutes}
$$

Therefore, the non-parallel code would take approximately 912 minutes to process 8539 files, which is reasonably close to our extrapolated estimate of 853.9 minutes.

In [None]:
import dask.dataframe as dd
import dask.bag as db
from dask.distributed import Client, LocalCluster
import pandas as pd
import numpy as np

from sklearn.model_selection import cross_val_score
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
import os
import joblib

def load_data(filename: str) -> pd.DataFrame:
    """Read one price-history CSV into a DataFrame indexed by its ``Date`` column.

    Args:
        filename: Path to a CSV file that contains a ``Date`` column.

    Returns:
        The parsed DataFrame with ``Date`` (parsed as datetimes) as its index.

    Raises:
        ValueError: With the message ``"File is empty"`` when the file has no
            content or no data rows, and with ``"Error loading file: ..."``
            for any other failure while reading or parsing the file.
    """
    # Keep the try body to the read alone: the explicit "File is empty" error
    # below must reach the caller unchanged instead of being re-wrapped.
    try:
        df = pd.read_csv(filename, parse_dates=['Date'])
    except pd.errors.EmptyDataError as exc:
        raise ValueError("File is empty") from exc
    except Exception as exc:
        raise ValueError(f"Error loading file: {exc}") from exc

    # A header-only file parses successfully but has no rows.
    if df.empty:
        raise ValueError("File is empty")
    return df.set_index('Date')

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` extended with technical indicators of ``Close``.

    The added columns are:

    * ``MA5`` / ``MA20`` - 5- and 20-row rolling means of ``Close``.
    * ``LR_Slope`` - slope of a degree-1 least-squares fit over each
      20-row window of ``Close``.
    * ``RSI`` - ``100 - 100 / (1 + gain / loss)`` using 14-row rolling means
      of the positive and negative ``Close`` differences.
    * ``MACD`` - 12-span EMA of ``Close`` minus its 26-span EMA.

    Rows that contain a NaN in any column (such as the rows before the first
    full 20-row window) are dropped from the result.

    Args:
        df: DataFrame with a numeric ``Close`` column. It is not modified.

    Returns:
        A new DataFrame holding the original columns plus the indicators.
    """
    # Copy first: the caller's frame must not gain columns or lose rows.
    result = df.copy()
    close = result['Close']

    result['MA5'] = close.rolling(window=5).mean()
    result['MA20'] = close.rolling(window=20).mean()

    # With raw=True each window arrives as an ndarray rather than a Series,
    # which is cheaper per window and yields the same slope.
    x_axis = np.arange(20)
    result['LR_Slope'] = close.rolling(window=20).apply(
        lambda window: np.polyfit(x_axis, window, 1)[0], raw=True
    )

    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    result['RSI'] = 100 - (100 / (1 + rs))

    ema_fast = close.ewm(span=12, adjust=False).mean()
    ema_slow = close.ewm(span=26, adjust=False).mean()
    result['MACD'] = ema_fast - ema_slow

    return result.dropna()

def feature_selection(df: pd.DataFrame, k: int = 3) -> pd.DataFrame:
    """Keep the ``k`` indicator columns that score best against ``Close``.

    The five indicator columns (``MA5``, ``MA20``, ``LR_Slope``, ``RSI``,
    ``MACD``) are scored with ``f_regression`` against ``Close`` and the
    top ``k`` are retained.

    Args:
        df: DataFrame containing the five indicator columns and ``Close``.
        k: Number of indicator columns to keep. Defaults to 3.

    Returns:
        A DataFrame with the selected indicator columns followed by ``Close``.
    """
    indicator_columns = ['MA5', 'MA20', 'LR_Slope', 'RSI', 'MACD']
    X = df[indicator_columns]
    y = df['Close']

    # fit() alone provides the support mask; the array fit_transform() would
    # additionally return was assigned but never read.
    selector = SelectKBest(score_func=f_regression, k=k).fit(X, y)
    selected_features = X.columns[selector.get_support()].tolist()
    return df[selected_features + ['Close']]

def train_models(df: pd.DataFrame) -> tuple:
    """Cross-validate three regressors on ``df`` and refit the best one.

    Every column except ``Close`` is used as a predictor and ``Close`` is the
    target. Each candidate is scored with 5-fold cross-validation on negative
    mean squared error; the candidate with the lowest mean MSE is then fitted
    on the full data.

    Args:
        df: DataFrame with a ``Close`` column plus the predictor columns.

    Returns:
        A tuple ``(fitted_model, model_name, predictor_column_names)``.
    """
    features = df.drop(columns='Close')
    target = df['Close']

    estimators = {
        'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42),
        'LinearRegression': LinearRegression(),
        'SVR': SVR(kernel='rbf'),
    }

    def mean_mse(estimator):
        # The scorer yields negated MSE; negate the mean to get a plain error.
        fold_scores = cross_val_score(
            estimator, features, target, cv=5, scoring='neg_mean_squared_error'
        )
        return -fold_scores.mean()

    errors = {label: mean_mse(estimator) for label, estimator in estimators.items()}

    best_label = min(errors, key=errors.get)
    chosen = estimators[best_label]
    chosen.fit(features, target)

    return chosen, best_label, list(features.columns)

def process_file(filename: str) -> str:
    """Run the whole pipeline for one file and describe the outcome.

    The file is loaded, indicators are added, features are selected, the best
    model is trained and the tuple ``(model, model_name, features)`` is dumped
    to ``output/<base name>_model.joblib``.

    Args:
        filename: Path of the price file to process.

    Returns:
        A status message starting with ``"Processed"``, ``"Skipped"`` or
        ``"Error processing"``. Exceptions are reported in the message
        rather than propagated.
    """
    try:
        raw = load_data(filename)
        # Files shorter than the 20-row rolling window cannot yield features.
        if len(raw) < 20:
            return f"Skipped {filename}: Not enough data"

        with_indicators = add_features(raw)
        reduced = feature_selection(with_indicators)
        best_model, model_name, features = train_models(reduced)

        output_dir = 'output'
        os.makedirs(output_dir, exist_ok=True)

        name_without_ext = os.path.splitext(os.path.basename(filename))[0]
        model_filename = f"{output_dir}/{name_without_ext}_model.joblib"
        joblib.dump((best_model, model_name, features), model_filename)
    except ValueError as err:
        return f"Skipped {filename}: {err}"
    except Exception as err:
        return f"Error processing {filename}: {err}"
    else:
        return f"Processed {filename}, best model: {model_name}, features: {features}"

if __name__ == "__main__":
    # Set up the Dask cluster and client as context managers so the workers
    # and scheduler are shut down when the run finishes or fails, rather than
    # lingering and holding the dashboard port for the next run.
    with LocalCluster() as cluster, Client(cluster) as client:
        print(f"Dask dashboard available at: {client.dashboard_link}")

        # Get all file paths
        stock_files = [os.path.join('data/stock', f) for f in os.listdir('data/stock') if f.endswith('.txt')]
        etf_files = [os.path.join('data/etfs', f) for f in os.listdir('data/etfs') if f.endswith('.txt')]
        all_files = stock_files + etf_files
        print(f"Total files: {len(all_files)}")

        # Create a Dask bag from the file list
        bag = db.from_sequence(all_files)

        # Process all files in parallel, keeping the per-file status messages
        results = bag.map(process_file).compute()

    # Summarize the outcomes; each message starts with one of these prefixes.
    for status in ("Processed", "Skipped", "Error"):
        count = sum(message.startswith(status) for message in results)
        print(f"{status}: {count}")

    # Failures are listed in full so they are not lost in the summary.
    for message in results:
        if message.startswith("Error"):
            print(message)

    

Perhaps you already have a cluster running?
Hosting the HTTP server on port 51642 instead


Dask dashboard available at: http://127.0.0.1:51642/status
Total files: 8539
