In [1]:
from dask.distributed import Client

# Start a local Dask cluster with 2 single-threaded workers, each capped at 8GB of memory
client = Client(n_workers=2, threads_per_worker=1, memory_limit='8GB')
client  # This will print the dashboard link (e.g., http://localhost:8787/status)

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:54346,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:54357,Total threads: 1
Dashboard: http://127.0.0.1:54358/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:54349,
Local directory: C:\Users\a4293604\AppData\Local\Temp\dask-scratch-space\worker-41nuwxb4,Local directory: C:\Users\a4293604\AppData\Local\Temp\dask-scratch-space\worker-41nuwxb4

0,1
Comm: tcp://127.0.0.1:54360,Total threads: 1
Dashboard: http://127.0.0.1:54361/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:54351,
Local directory: C:\Users\a4293604\AppData\Local\Temp\dask-scratch-space\worker-fl5jr49d,Local directory: C:\Users\a4293604\AppData\Local\Temp\dask-scratch-space\worker-fl5jr49d


In [3]:
import dask.dataframe as dd

# Step 1: Load the dataset lazily as a Dask DataFrame
data = dd.read_parquet('TRAIN_Reco_2021_2022_2023.parquet.gzip', blocksize='50MB')

# Step 2: Shuffle the rows on 'ID' into 672 partitions so that every row of a given ID
# lands in the same partition.
# NOTE(review): this does NOT yield exactly one ID per partition. The partition sizes
# printed later in this notebook contain empty partitions as well as partitions with
# twice the usual row count, so some partitions presumably hold several IDs.
# NOTE(review): row order inside a partition after the shuffle is presumably not
# guaranteed to be chronological; confirm before computing time-based features.
data = data.shuffle(on="ID", npartitions=672)

In [4]:
# Confirm the number of partitions produced by the shuffle
data.npartitions

672

In [5]:
# Materialize only the first partition as a pandas DataFrame so it can be inspected
partition_0 = data.partitions[0].compute()
partition_0



Unnamed: 0_level_0,ID,high,low,close,volume
ExecutionTime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1


In [6]:
# Check the size (number of rows) of each partition.
# `len` is evaluated once per partition; empty partitions show up as 0.
partition_sizes = data.map_partitions(len).compute()
print(partition_sizes)

0           0
1           0
2      103877
3           0
4      103877
        ...  
667         0
668    103877
669    207754
670    207754
671         0
Length: 672, dtype: int64


In [16]:
# Updated feature engineering function for Dask
import pandas as pd


def create_lagged_features(df, n_lags=10, group_col='ID'):
    """Return a copy of ``df`` with lagged high/low/close/volume columns appended.

    For every lag ``k`` in ``1..n_lags`` the columns ``High_lag_k``, ``Low_lag_k``,
    ``Close_lag_k`` and ``Volume_lag_k`` are added, in that order.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``high``, ``low``, ``close`` and ``volume`` columns. It is not
        modified.
    n_lags : int, default 10
        Number of past rows to expose as lag columns.
    group_col : str or None, default 'ID'
        When this column is present, lags are computed separately for each of its
        values, so the first rows of one ID never receive values from another ID
        stored in the same partition. When it is absent (or ``None``), the whole
        frame is shifted as a single series.

    Returns
    -------
    pandas.DataFrame
        A new frame: the original columns followed by the lag columns.
    """
    base_cols = ['high', 'low', 'close', 'volume']

    # Shift per group when possible; an empty frame has nothing to group.
    if group_col is not None and group_col in df.columns and len(df) > 0:
        source = df.groupby(group_col, sort=False)[base_cols]
    else:
        source = df[base_cols]

    # Work on a copy: the frame handed in by `map_partitions` must not be mutated.
    out = df.copy()
    for lag in range(1, n_lags + 1):
        shifted = source.shift(lag)
        for col in base_cols:
            # Assign positionally so a non-unique index cannot trigger a re-alignment.
            out[f'{col.capitalize()}_lag_{lag}'] = shifted[col].to_numpy()

    return out

def create_rolling_features(df, window=10, group_col='ID'):
    """Return a copy of ``df`` with rolling means of high/low/close appended.

    Adds ``High_rolling_mean_<window>``, ``Low_rolling_mean_<window>`` and
    ``Close_rolling_mean_<window>``, in that order.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``high``, ``low`` and ``close`` columns. It is not modified.
    window : int, default 10
        Number of rows in the rolling window; rows with fewer than ``window``
        preceding observations get NaN.
    group_col : str or None, default 'ID'
        When this column is present, the window is evaluated separately for each
        of its values, so a window never mixes rows of different IDs. When it is
        absent (or ``None``), the window runs over the whole frame.

    Returns
    -------
    pandas.DataFrame
        A new frame with the same rows and row order as ``df``, plus the rolling
        columns.
    """
    # Roll per group when possible; an empty frame has nothing to group.
    use_groups = group_col is not None and group_col in df.columns and len(df) > 0
    grouped = df.groupby(group_col, sort=False) if use_groups else None

    # Work on a copy: the frame handed in by Dask must not be mutated.
    out = df.copy()
    for col in ('high', 'low', 'close'):
        if grouped is not None:
            # `transform` keeps the original row order, unlike `groupby().rolling()`.
            rolled = grouped[col].transform(lambda s: s.rolling(window=window).mean())
        else:
            rolled = df[col].rolling(window=window).mean()
        # Assign positionally so a non-unique index cannot trigger a re-alignment.
        out[f'{col.capitalize()}_rolling_mean_{window}'] = rolled.to_numpy()

    return out

# `data` is the Dask DataFrame that was loaded and shuffled on 'ID' earlier in this notebook.
# Apply the lagged features partition by partition with `map_partitions`
lagged_df = data.map_partitions(create_lagged_features)

# Describe the schema of the final result for Dask.
# It is derived from the metadata Dask already holds for `lagged_df`, so the index
# (name and dtype) and every column dtype match the real data instead of being
# re-typed by hand. The copy protects Dask's own metadata object from being modified.
meta = create_rolling_features(lagged_df._meta.copy())

# Apply the rolling window operation with `map_overlap`, which prepends the last
# `before` rows of the previous partition before calling the function.
# NOTE(review): after the shuffle on 'ID', neighbouring partitions presumably hold
# different IDs, so the borrowed rows are not this partition's own history -
# confirm whether a plain `map_partitions` would be the better fit here.
rolled_df = lagged_df.map_overlap(create_rolling_features,
                                  before=10,  # Rows borrowed from the previous partition
                                  after=0,  # Nothing borrowed from the next partition
                                  meta=meta)  # Metadata to ensure Dask knows the structure

# Drop NaN values caused by shifting and rolling
clean_df = rolled_df.dropna()

# Trigger the computation (if needed).
# NOTE: the full result is far larger than a single worker's memory limit (the
# `.compute()` call later in this notebook fails with a MemoryError), so prefer
# writing it to disk, e.g. `clean_df.to_parquet(...)`, over collecting it into pandas.
# final_df = clean_df.compute()  # You can trigger the computation or write to file


In [17]:
# Display the lazy Dask DataFrame: only its column names, dtypes and partition count
# are shown here, no data is computed.
clean_df

Unnamed: 0_level_0,ID,high,low,close,volume,High_lag_1,Low_lag_1,Close_lag_1,Volume_lag_1,High_lag_2,Low_lag_2,Close_lag_2,Volume_lag_2,High_lag_3,Low_lag_3,Close_lag_3,Volume_lag_3,High_lag_4,Low_lag_4,Close_lag_4,Volume_lag_4,High_lag_5,Low_lag_5,Close_lag_5,Volume_lag_5,High_lag_6,Low_lag_6,Close_lag_6,Volume_lag_6,High_lag_7,Low_lag_7,Close_lag_7,Volume_lag_7,High_lag_8,Low_lag_8,Close_lag_8,Volume_lag_8,High_lag_9,Low_lag_9,Close_lag_9,Volume_lag_9,High_lag_10,Low_lag_10,Close_lag_10,Volume_lag_10,High_rolling_mean_10,Low_rolling_mean_10,Close_rolling_mean_10
npartitions=100,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1
,object,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [18]:
# Convert Dask DataFrame to Pandas (this requires enough memory, so ensure it's manageable)
# NOTE(review): as recorded in this cell's output, the call fails with a MemoryError:
# the result needs 25.95 GiB of input while each worker is limited to 7.45 GiB.
# Consider writing the result to disk (`clean_df.to_parquet(...)`) or computing only a
# subset of partitions/IDs instead of collecting everything - TODO decide.
import pandas as pd
pandas_data = clean_df.compute()

2024-10-20 16:40:06,664 - distributed.scheduler - ERROR - Task ('repartitiontofewer-9d01a8ee34636910b7f1420ab28438b4', 0) has 25.95 GiB worth of input dependencies, but worker tcp://127.0.0.1:51046 has memory_limit set to 7.45 GiB.


MemoryError: Task ('repartitiontofewer-9d01a8ee34636910b7f1420ab28438b4', 0) has 25.95 GiB worth of input dependencies, but worker tcp://127.0.0.1:51046 has memory_limit set to 7.45 GiB.

In [None]:
# Ensure the timestamps are proper datetimes and serve as the index.
# The frame built in this notebook shows no 'date' column (its timestamps live in the
# 'ExecutionTime' index), so indexing 'date' unconditionally raises a KeyError.
# Rebinding instead of using `inplace=True` also lets this cell be re-run safely.
if 'date' in pandas_data.columns:
    pandas_data = (
        pandas_data
        .assign(date=pd.to_datetime(pandas_data['date']))
        .set_index('date')
    )
else:
    # No 'date' column: the timestamps are already the index, just enforce the dtype.
    pandas_data.index = pd.to_datetime(pandas_data.index)

In [None]:
# Apply rolling mean operation using map_overlap.
# `map_overlap` takes the overlap as `before`/`after` (it has no `window` argument):
# a 10-row rolling window needs the 9 rows preceding each partition and none after it.
# NOTE(review): `df` and the 'feature' column are not defined anywhere in the visible
# notebook - presumably placeholders; confirm what they should refer to.
df['rolling_mean'] = df['feature'].map_overlap(lambda x: x.rolling(10).mean(), before=9, after=0, meta=('feature', 'f8'))
