In [None]:
import pandas as pd
import polars as pl
import numpy as np
import os
import gc
from tqdm import tqdm
import lightgbm as lgb
from lightgbm import LGBMRegressor, log_evaluation
from IPython.display import clear_output
from sklearn.metrics import r2_score
import time

# Explicitly switch on the cyclic garbage collector for this kernel.
gc.enable()

# NOTE(review): everything between the triple quotes is a bare string literal, so none
# of the pandas / polars display-option calls written inside it are executed.
'''
pd.options.display.max_columns = None
#pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_colwidth', None)

#pl.Config.set_tbl_rows(-1)
pl.Config.set_tbl_cols(-1)
pl.Config.set_fmt_str_lengths(10000)
'''

In [None]:
# NOTE(review): this whole cell is a bare string literal, so nothing in it runs.
# The text inside is a script that would:
#   * read the rows of train.parquet with date_id >= date_offset - 1,
#   * write them, re-based by date_offset, as per-date partitions under
#     "synthetic_test.parquet" (features, weight, row_id and an is_scored flag), and
#   * write the nine responders, renamed to responder_<k>_lag_1 and shifted forward by
#     one date_id, as per-date partitions under "synthetic_lags.parquet".
# Presumably these are the '/kaggle/working/synthetic_*.parquet' inputs that are
# commented out in the local-gateway call at the end of the notebook — TODO confirm.
'''
from pathlib import Path
DATA_DIR = Path('/kaggle/input/jane-street-real-time-market-data-forecasting')

date_offset = 1500

is_score_dates = 5

pl_all = pl.scan_parquet(DATA_DIR/"train.parquet").filter(pl.col("date_id") >= date_offset-1).collect()

# make syn_test 
syn_test = pl_all.with_columns(
    # pl.lit(True).alias("is_scored"),
    pl.col('date_id') - date_offset
    ).with_row_index(name="row_id", offset=0)

syn_test = syn_test.with_columns(
    pl.when(pl.col('date_id')<is_score_dates-1).then(pl.lit(False)).otherwise(pl.lit(True)).alias("is_scored")
)

syn_test = syn_test.select(
    ['row_id', 'date_id', 'time_id', 'symbol_id', 'weight', 'is_scored'] + [f'feature_{x:02}' for x in range(79)]
)

syn_test_partition = syn_test.partition_by('date_id', maintain_order=True, as_dict=True)

output_dir = "synthetic_test.parquet"
os.makedirs(output_dir, exist_ok=True)

row_id_offset = syn_test.filter(pl.col('date_id')<0).select('row_id').max().item()
print("row_id_offset:", row_id_offset)

for key, _df in syn_test_partition.items():
    if key[0] >= 0:
        os.makedirs(f"{output_dir}/date_id={key[0]}", exist_ok=True)
        _df = _df.with_columns(pl.col('row_id')-row_id_offset)
        _df.write_parquet(f"{output_dir}/date_id={key[0]}/part-0.parquet")

# make syn_lag

syn_lag = pl_all.select(
    ['date_id', 'time_id', 'symbol_id'] + [f'responder_{x}' for x in range(9)]
).with_columns(pl.col('date_id')-date_offset)

syn_lag = syn_lag.rename({f'responder_{x}': f'responder_{x}_lag_1' for x in range(9)})

syn_lag_partition = syn_lag.partition_by('date_id', maintain_order=True, as_dict=True)

output_dir = "synthetic_lags.parquet"
os.makedirs(output_dir, exist_ok=True)

for key, _df in syn_lag_partition.items():
    os.makedirs(f"{output_dir}/date_id={key[0]+1}", exist_ok=True)
    _df = _df.with_columns(pl.col('date_id')+1)
    _df.write_parquet(f"{output_dir}/date_id={key[0]+1}/part-0.parquet")
'''

In [None]:
# Root directory of the competition dataset. File names are appended to it by plain
# string concatenation elsewhere in the notebook, so the trailing slash is required.
path = '/kaggle/input/jane-street-real-time-market-data-forecasting/'

In [None]:
# Schema table: its column names decide which train.parquet columns are loaded, and the
# first row of each column holds the dtype name that find_dtype() turns into a polars dtype.
schema_df = pl.read_csv('/kaggle/input/schema/schema.csv')
schema_df

In [None]:
# Tuned configuration: the two window sizes (in date_ids) followed by the LightGBM
# hyper-parameters that are forwarded to the regressor.
best_params = dict(
    val_window_size=8,
    training_window_size=404,
    learning_rate=0.04588738403235412,
    max_depth=12,
    min_data_in_leaf=60,
    num_leaves=4763,
    min_gain_to_split=0.25,
    lambda_l1=4.0,
    lambda_l2=1786.5166849320328,
    feature_fraction=0.9547872173111335,
)

In [None]:
# Pull the validation / training window lengths (counted in date_ids) out of the
# tuned configuration so they can be used as plain module-level integers.
val_window_size, training_window_size = (
    best_params[key] for key in ('val_window_size', 'training_window_size')
)

In [None]:
# Combined length, in date_ids, of the validation window plus the training window.
whole_size = val_window_size + training_window_size
whole_size

In [None]:
#temp_df = pl.read_parquet(path + 'train.parquet/', columns=schema_df.columns).select(pl.all().shrink_dtype()).filter((pl.col('date_id') < 1499)&(pl.col('date_id') >= 1499 - whole_size))
#temp_df

In [None]:
#date_id_max = temp_df['date_id'].max()
#date_id_max

In [None]:
# Largest date_id present in train.parquet, computed lazily so that only the
# date_id column has to be read.
date_id_max = (
    pl.scan_parquet(path + 'train.parquet/')
    .select(pl.col('date_id').max())
    .collect()
    .item()
)

In [None]:
# Exclusive lower bound of the initial window: only rows with date_id strictly greater
# than this value are kept when train_df is loaded.
starting_date = date_id_max - (val_window_size + training_window_size)
starting_date

In [None]:
# NOTE(review): disabled code kept as a bare string literal — nothing inside it runs.
# It depends on temp_df, whose only assignment in this notebook is commented out.
'''
train_df = temp_df.filter(pl.col('date_id') > starting_date)
del temp_df
gc.collect()
#train_df
'''

In [None]:
def find_dtype(dtype_str):
    """Translate a dtype name from the schema file into the matching polars dtype.

    Args:
        dtype_str: One of ``'Int16'``, ``'Int8'`` or ``'Float32'``.

    Returns:
        The polars dtype attribute with that name (``pl.Int16``, ``pl.Int8`` or
        ``pl.Float32``).

    Raises:
        ValueError: If ``dtype_str`` is not one of the supported names. Previously the
            function fell through and returned ``None``, which only failed later,
            inside ``.cast(...)``, without naming the offending schema entry.
    """
    supported_names = ('Int16', 'Int8', 'Float32')
    if dtype_str not in supported_names:
        raise ValueError(
            f"Unsupported dtype name {dtype_str!r}; expected one of {supported_names}"
        )
    # The supported names are spelled exactly like the polars dtype attributes.
    return getattr(pl, dtype_str)

In [None]:

# Load the trailing (validation + training) window of train.parquet.
# The query is built lazily so the date_id filter is part of the scan instead of being
# applied after the whole data set has been materialised in memory. Every loaded column
# is listed in schema_df and is cast to its schema dtype, so a separate shrink_dtype
# pass would not change the result and is omitted.
train_df = (
    pl.scan_parquet(path + 'train.parquet/')
    .select(schema_df.columns)
    .filter(pl.col('date_id') > starting_date)
    .with_columns([
        pl.col(col).cast(find_dtype(schema_df[col][0])) for col in schema_df.columns
    ])
    .collect()
)
#train_df


In [None]:
# Display-only sanity check: number of distinct date_ids that were loaded
# (presumably expected to equal whole_size — TODO confirm no date_ids are missing).
train_df['date_id'].n_unique()

In [None]:
# Display-only: polars' size estimate for train_df divided by 1e9
# (gigabytes, assuming the default unit of estimated_size() is bytes).
train_df.estimated_size() / 1e9

In [None]:
import kaggle_evaluation.jane_street_inference_server

In [None]:
class TimerCallback:
    """LightGBM callback that stops boosting once a wall-clock budget is exhausted.

    The elapsed time is measured from ``loop_start_time`` (a ``time.time()`` stamp
    supplied by the caller), not from the first boosting round, so work done by the
    caller before training started counts against the budget too.

    Args:
        max_time_seconds: Budget in seconds, measured from ``loop_start_time``.
        loop_start_time: ``time.time()`` value marking the start of the budgeted work.
    """

    def __init__(self, max_time_seconds, loop_start_time):
        self.max_time_seconds = max_time_seconds
        self.loop_start_time = loop_start_time

    def __call__(self, env):
        """Check the budget for one boosting round.

        Args:
            env: The callback environment LightGBM passes to callbacks; its
                ``iteration`` and ``evaluation_result_list`` fields are read.

        Raises:
            lgb.callback.EarlyStopException: When the budget is exceeded. It carries
                the current 0-based iteration and the current evaluation results,
                which is the form the exception is documented to hold. The previous
                code passed the Booster's ``best_iteration`` / ``best_score``
                attributes, which do not have that form.
        """
        elapsed_time = time.time() - self.loop_start_time
        if elapsed_time <= self.max_time_seconds:
            return

        print(f"Stopping training after {elapsed_time:.2f} seconds.")
        # Copy the list so the exception does not alias LightGBM's own buffer.
        current_scores = list(env.evaluation_result_list or [])
        raise lgb.callback.EarlyStopException(env.iteration, current_scores)

In [None]:
# Module-level state shared between successive predict() calls.
# Slots that stay empty until the first batch that arrives together with lags:
lags_df = y_concat = X_concat = model = None
# Accumulators — three separate list objects: feature batches received since the last
# lags delivery, the per-day target frames, and the per-day concatenated feature frames.
streaming_data_X_list, streaming_data_y_list, streaming_data_X_concat_list = [], [], []
# Number of refits completed so far.
temp_i = 0

# Replace this function with your inference code.
# You can return either a Pandas or Polars dataframe, though Polars is recommended.
# Each batch of predictions (except the very first) must be returned within 1 minute of the batch features being provided.
# Upper bound on the number of training-window rows sampled for each refit.
_TRAIN_SAMPLE_ROWS = 1_100_000
# Wall-clock budgets (seconds) handed to TimerCallback; the first refit gets the larger one.
_FIRST_FIT_BUDGET_SECONDS = 110
_REFIT_BUDGET_SECONDS = 58
# Key, weight and target columns: present in the training frame but not model inputs.
_NON_FEATURE_COLUMNS = ('date_id', 'time_id', 'symbol_id', 'weight', 'responder_6')


def _cast_to_schema(df: pl.DataFrame) -> pl.DataFrame:
    """Cast every column of ``df`` that is listed in ``schema_df`` to its schema dtype."""
    return df.with_columns([
        pl.col(col).cast(find_dtype(schema_df[col][0]))
        for col in schema_df.columns if col in df.columns
    ])


def _feature_columns() -> list:
    """Model input columns, in schema order, shared by training and inference."""
    return [col for col in schema_df.columns if col not in _NON_FEATURE_COLUMNS]


def _fit_model(train_data_df, val_data_df, time_budget_seconds, loop_start_time):
    """Fit a fresh LGBMRegressor on ``train_data_df`` with ``val_data_df`` as eval set.

    Training is cut short by TimerCallback once ``time_budget_seconds`` have elapsed
    since ``loop_start_time``; the regressor is returned in whatever state training
    left it.
    """
    base_params = {
        'verbosity': -1,
        'device': 'gpu',
        'early_stopping_round': 20,
    }
    tuned_params = {
        name: best_params[name]
        for name in (
            'learning_rate', 'max_depth', 'min_data_in_leaf', 'num_leaves',
            'min_gain_to_split', 'lambda_l1', 'lambda_l2', 'feature_fraction',
        )
    }
    regressor = LGBMRegressor(**base_params, **tuned_params, n_estimators=100000)

    feature_cols = _feature_columns()
    X_train = train_data_df.select(feature_cols).select(pl.all().shrink_dtype()).to_pandas()
    X_val = val_data_df.select(feature_cols).select(pl.all().shrink_dtype()).to_pandas()
    y_train = train_data_df['responder_6'].to_pandas()
    y_val = val_data_df['responder_6'].to_pandas()
    weights_train = train_data_df['weight'].to_pandas()
    weights_val = val_data_df['weight'].to_pandas()

    timer_callback = TimerCallback(max_time_seconds=time_budget_seconds, loop_start_time=loop_start_time)

    training_timer_start_time = time.time()
    try:
        regressor.fit(
            X_train, y_train,
            sample_weight=weights_train,
            eval_set=[(X_train, y_train), (X_val, y_val)],
            eval_sample_weight=[weights_train, weights_val],
            callbacks=[timer_callback],
        )
    except lgb.callback.EarlyStopException as e:
        print(f"Training stopped. Best iteration: {e.best_iteration}, Best score: {e.best_score}")
    training_time = time.time() - training_timer_start_time
    print(f"Training time: {training_time:.2f} seconds")
    return regressor


def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Predict ``responder_6`` for one batch, refitting the model when lags arrive.

    Every batch is cast to the schema dtypes and buffered. When ``lags`` is provided,
    its ``responder_6_lag_1`` values are re-labelled as the previous date's targets,
    joined to the buffered features of that date, appended to the rolling ``train_df``
    window (shifted onto the training data's date_id axis), and a new model is fitted
    under a wall-clock budget.

    Args:
        test: Feature batch; must contain ``row_id`` and ``is_scored`` plus the key,
            weight and feature columns listed in ``schema_df``.
        lags: Previous day's responders, or ``None`` for batches without them.

    Returns:
        A polars DataFrame with exactly the columns ``row_id`` and ``responder_6`` and
        one row per row of ``test``.
    """
    global lags_df, streaming_data_y_list, streaming_data_X_list, temp_i, model, y_concat, X_concat, streaming_data_X_concat_list, train_df

    test_df = _cast_to_schema(test.drop(['row_id', 'is_scored']))
    streaming_data_X_list.append(test_df)

    if lags is not None:
        timer_start_time = time.time()
        date_id = lags['date_id'][0]
        lags_df = lags

        # The lagged responder is the realised target of the previous date.
        y_df = (
            lags_df
            .select(['date_id', 'time_id', 'symbol_id', 'responder_6_lag_1'])
            .with_columns(pl.col('date_id') - 1)
            .rename({'responder_6_lag_1': 'responder_6'})
        )
        streaming_data_y_list.append(_cast_to_schema(y_df))

        y_concat = pl.concat(streaming_data_y_list)
        X_concat = _cast_to_schema(pl.concat(streaming_data_X_list))
        streaming_data_X_concat_list.append(X_concat)
        streaming_data_X_list = []

        # Keep only the two most recent per-day buffers. Two are needed because the
        # batch that arrives together with the lags is stored with the previous group.
        if len(streaming_data_X_concat_list) > 2:
            del streaming_data_X_concat_list[0], streaming_data_y_list[0]

        buffered_features = _cast_to_schema(pl.concat(streaming_data_X_concat_list))
        streaming_data_df = (
            buffered_features
            .join(y_concat, on=['date_id', 'time_id', 'symbol_id'], how='left')
            .drop_nulls(subset=['responder_6'])
        )
        streaming_data_df = _cast_to_schema(streaming_data_df)
        # Keep just the previous date and move it onto the training data's date_id axis.
        streaming_data_df = (
            streaming_data_df
            .filter(pl.col('date_id') == date_id - 1)
            .with_columns(pl.col('date_id') + date_id_max + 1)
        )

        # Slide the window forward and append the newly labelled day. Both frames are
        # put into schema column order first so the vertical concat cannot fail on a
        # column-order mismatch.
        starting_date = date_id_max - val_window_size - training_window_size + date_id
        train_df = train_df.filter(pl.col('date_id') > starting_date)
        train_df = pl.concat([
            train_df.select(schema_df.columns),
            streaming_data_df.select(schema_df.columns),
        ])

        val_cutoff = train_df['date_id'].max() - val_window_size
        val_data_df = train_df.filter(pl.col('date_id') > val_cutoff)
        train_pool_df = train_df.filter(pl.col('date_id') <= val_cutoff)
        # Sampling without replacement raises when n exceeds the row count, so cap it.
        train_data_df = train_pool_df.sample(n=min(_TRAIN_SAMPLE_ROWS, train_pool_df.height))

        time_budget_seconds = _FIRST_FIT_BUDGET_SECONDS if temp_i == 0 else _REFIT_BUDGET_SECONDS
        model = _fit_model(train_data_df, val_data_df, time_budget_seconds, timer_start_time)

        gc.collect()

        temp_i += 1
        execution_time = time.time() - timer_start_time
        print(f"Execution time: {execution_time:.2f} seconds")

    # Select the features by name so their order matches the order used for fitting.
    feature_matrix = test_df.select(_feature_columns()).select(pl.all().shrink_dtype()).to_numpy()

    if model is None:
        # No lags have been delivered yet, so nothing has been fitted; return a neutral
        # prediction instead of crashing on a missing model.
        print("No fitted model available yet; returning zero predictions for this batch.")
        preds = np.zeros(len(test))
    else:
        preds = model.predict(feature_matrix)

    predictions = test.select(
        'row_id',
        pl.lit(preds).alias('responder_6'),
    )

    assert predictions.columns == ['row_id', 'responder_6']
    # Confirm has as many rows as the test data.
    assert len(predictions) == len(test)

    return predictions

In [None]:
# Wrap predict() in the competition's inference server.
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)

# Inputs replayed through predict() by the local gateway.
# Alternative inputs that were used at some point:
#   '/kaggle/working/synthetic_test.parquet',
#   '/kaggle/working/synthetic_lags.parquet',
local_gateway_inputs = (
    '/kaggle/input/jane-street-real-time-market-data-forecasting/test.parquet',
    '/kaggle/input/jane-street-real-time-market-data-forecasting/lags.parquet',
)

# Serve when KAGGLE_IS_COMPETITION_RERUN is set; otherwise run the local gateway.
if not os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.run_local_gateway(local_gateway_inputs)
else:
    inference_server.serve()