In [1]:
import sys
sys.path.append('../')

from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn import set_config
import polars as pl

from base.impactpy.model.transformers import (
	ConstantIntradayVolatilityEstimator,
	AverageDailyVolumeEstimator
)

# set_config(transform_output='polars', enable_metadata_routing=True)

In [3]:
from pathlib import Path
# Arrow IPC sample located at <parent of the working directory>/data/raw_bin_samples/clean.arrow.
p = Path().resolve().parents[0] / 'data' / 'raw_bin_samples' / 'clean.arrow'
# scan_ipc is lazy: nothing is read until the frame is collected.
lf = pl.scan_ipc(p)

In [4]:
# Materialise the lazy scan into an in-memory DataFrame.
df = lf.collect()

In [5]:
# Preview the collected frame (columns: symbol, timestamp, price, trade).
df

symbol,timestamp,price,trade
cat,datetime[μs],f64,i32
"""ADSK""",2019-01-02 09:30:00,125.205,-287
"""ADSK""",2019-01-02 09:30:10,124.61,20
"""ADSK""",2019-01-02 09:30:20,124.61,0
"""ADSK""",2019-01-02 09:30:30,125.155,200
"""ADSK""",2019-01-02 09:30:40,125.705,-385
…,…,…,…
"""ADS""",2019-12-27 15:59:20,110.915,0
"""ADS""",2019-12-27 15:59:30,110.95,0
"""ADS""",2019-12-27 15:59:40,110.92,0
"""ADS""",2019-12-27 15:59:50,110.91,-2


In [6]:
# Window length handed to both rolling estimators below.
# NOTE(review): the unit (bins vs. days) is defined by the estimators — confirm.
window = 30

def set_col_routing(trans):
	"""Ask for the ``cols`` metadata to be routed to ``fit`` and ``transform``.

	Calls ``set_fit_request(cols=True)`` on ``trans`` and then
	``set_transform_request(cols=True)`` on whatever that returns, and hands
	back the result of the second call.

	NOTE(review): in scikit-learn these setters raise unless metadata routing
	has been enabled via ``sklearn.set_config(enable_metadata_routing=True)``.
	"""
	fit_routed = trans.set_fit_request(cols=True)
	fully_routed = fit_routed.set_transform_request(cols=True)
	return fully_routed

# Rolling estimators; both are set up to receive `cols` in fit and transform.
# NOTE(review): the second positional argument ('vola' / 'volu') is presumably
# the output column name — confirm against the estimator signatures. OWModel
# below reads columns named 'vola' and 'adv', so check that the names line up.
vola_trans = set_col_routing(ConstantIntradayVolatilityEstimator(window, 'vola'))
volu_trans = set_col_routing(AverageDailyVolumeEstimator(window, 'volu'))

# Keep the input columns untouched ('passthrough') and append both estimates.
union = FeatureUnion([
	('original', 'passthrough'),
	('vola_trans', vola_trans),
	('volu_trans', volu_trans),
])

def drop_nulls(df: pl.DataFrame) -> pl.DataFrame:
	"""Return the result of ``df.drop_nulls()`` (called with no arguments)."""
	without_nulls = df.drop_nulls()
	return without_nulls
# Wrap the helper so it can be used as a pipeline step.
drop_null_trans = FunctionTransformer(drop_nulls)

# Step 1 adds the rolling estimates, step 2 removes the rows holding nulls.
pipeline = Pipeline([
	('rolling_estimates', union),
	('drop_nulls', drop_null_trans),
])

# Column-name mapping passed to the estimators as metadata; it is the identity
# here. NOTE(review): presumably maps the frame's names to the names the
# estimators expect — confirm against the transformer implementations.
cols ={
	'timestamp': 'timestamp',
	'price': 'price',
	'symbol': 'symbol'
}

In [10]:
# Fit the pipeline and apply it, passing `cols` along as metadata.
# NOTE(review): routing `cols` relies on scikit-learn metadata routing, but the
# set_config(..., enable_metadata_routing=True) call in the first cell is
# commented out — confirm it is enabled before running this on a fresh kernel.
data_df = pipeline.fit_transform(df, cols=cols)

In [3]:
from typing import Self, Iterable, Sequence
from sklearn.base import TransformerMixin, BaseEstimator, _fit_context # type: ignore
from sklearn.linear_model import LinearRegression, Ridge
import numpy as np

class OWModel(TransformerMixin, BaseEstimator):
	"""Price-impact model with an exponentially decaying impact state.

	``fit`` builds an impact state per symbol from trades scaled by ``vola``
	and ``adv`` (an exponentially weighted sum), then regresses price changes
	on impact changes to estimate how strongly impact moves the price.

	Parameters
	----------
	half_life : int
		Half-life of the impact state, in the same time unit as
		``bin_duration``.
	locality : str
		Which regression to run: ``'local'`` (one model per symbol),
		``'global'`` (one pooled model shared by every symbol) or ``'mixed'``
		(not implemented yet).
	bin_duration : int
		Duration of one data bin, in the same time unit as ``half_life``.
	is_persistent : bool, default=False
		If True the impact state carries over from one date to the next;
		otherwise it restarts on every date.
	regularization : float, default=0.0
		Ridge penalty; only used when ``locality == 'mixed'``.

	Attributes
	----------
	symbols_ : pl.DataFrame
		Unique symbols seen during ``fit`` (single ``symbol`` column).
	push_ : pl.DataFrame
		One row per symbol with the fitted regression in the ``model`` column.
	"""

	def __init__(
			self,
			half_life: int,
			locality: str,
			bin_duration: int,
			is_persistent: bool = False,
			regularization: float = 0.0,
	) -> None:

		self.half_life = half_life
		self.locality = locality
		self.bin_duration = bin_duration
		self.is_persistent = is_persistent
		self.regularization = regularization

	# Checked by `_fit_context` at the start of `fit`. `regularization` accepts
	# ints as well so that a plain `0` does not fail validation.
	_parameter_constraints = {
		'half_life': [int],
		'locality': [str],
		'bin_duration': [int],
		'is_persistent': [bool],
		'regularization': [float, int],
	}

	@property
	def alpha(self) -> float:
		"""EWM weight of a new observation.

		Equals ``1 - decay`` where ``decay = 2 ** (-bin_duration / half_life)``
		is the fraction of the impact state that survives from one bin to the
		next.
		"""
		return 1 - np.exp(-np.log(2) / self.half_life * self.bin_duration)

	@property
	def over_cols(self) -> Iterable[str]:
		"""Columns that partition the rolling computations.

		A persistent impact state runs through the whole history of a symbol;
		a non-persistent one is additionally partitioned by ``date`` so that it
		restarts every day.
		"""
		return ['symbol'] if self.is_persistent else ['symbol', 'date']

	### Regression functions

	@property
	def ridge_alpha(self) -> float:
		"""Ridge penalty: ``regularization`` for ``'mixed'`` locality, else 0."""
		return 0 if self.locality != 'mixed' else self.regularization

	def _col_change_expr(self, col: str) -> pl.Expr:
		"""Change of ``col`` over one half-life (in bins), within each partition."""
		# Guard against a zero lag when the half-life is shorter than one bin.
		lag = max(1, self.half_life // self.bin_duration)
		return pl.col(col).diff(lag).over(self.over_cols)

	@staticmethod
	def _fit_least_squares(
			cols: Sequence[pl.Series],
			ridge_alpha: float = 0,
	) -> Ridge:
		"""Fit ``y ~ X`` on one group.

		``cols[0]`` must be a struct series with fields ``X`` and ``y``; both
		are turned into single-column arrays before fitting.
		"""
		structs = cols[0]
		X = structs.struct.field('X').to_numpy().reshape(-1, 1)
		y = structs.struct.field('y').to_numpy().reshape(-1, 1)
		return Ridge(alpha=ridge_alpha).fit(X, y) # type: ignore

	def _with_changes(self, impact_lf: pl.LazyFrame) -> pl.LazyFrame:
		"""Add ``d_impact``/``d_price`` and drop the rows where they are null.

		The first rows of every partition have no lagged value; left in, they
		would reach the regression as NaN and make the fit fail.
		"""
		return (
			impact_lf.with_columns(
				self._col_change_expr('impact').alias('d_impact'),
				self._col_change_expr('price').alias('d_price'),
			)
			.drop_nulls(['d_impact', 'd_price'])
		)

	@staticmethod
	def _fit_groups(changes_lf: pl.LazyFrame) -> pl.LazyFrame:
		"""Fit one regression per ``symbol`` value; returns ``symbol``/``model``."""
		return (
			changes_lf
			.with_columns(X_y=pl.struct(X='d_impact', y='d_price'))
			.group_by('symbol', maintain_order=True)
			.agg(model=pl.map_groups('X_y', OWModel._fit_least_squares))
		)

	def _fit_local(self, impact_lf: pl.LazyFrame) -> pl.LazyFrame:
		"""One regression per symbol."""
		return OWModel._fit_groups(self._with_changes(impact_lf))

	def _fit_global(self, impact_lf: pl.LazyFrame) -> pl.LazyFrame:
		"""One pooled regression, repeated for every symbol seen in ``fit``."""
		# Take the changes per real symbol FIRST; relabelling beforehand would
		# difference across symbol boundaries.
		pooled_lf = self._with_changes(impact_lf).with_columns(
			symbol=pl.lit('all').cast(pl.Categorical)
		)
		global_push = OWModel._fit_groups(pooled_lf)
		n_symbols = self.symbols_.height
		return global_push.select(
			model=pl.repeat(pl.col('model'), n_symbols),
			symbol=self.symbols_.get_column('symbol')
		)

	def _fit_mixed(self, lf: pl.LazyFrame) -> pl.LazyFrame:
		"""Placeholder for the mixed (global + local) fit.

		Raises
		------
		NotImplementedError
			Always; the mixed fit has not been written yet.
		"""
		# TODO: start from `global_model = self._fit_global(lf)` and combine it
		# with the per-symbol fits.
		raise NotImplementedError("locality='mixed' is not implemented yet")

	def _fit_push(self, impact_lf: pl.LazyFrame) -> pl.LazyFrame:
		"""Dispatch to the fit routine selected by ``locality``.

		Raises
		------
		ValueError
			If ``locality`` is not one of ``'local'``, ``'global'``, ``'mixed'``.
		"""
		fns = {
			'local': self._fit_local,
			'global': self._fit_global,
			'mixed': self._fit_mixed,
		}
		# Only the lookup is guarded, so a KeyError raised inside a fit routine
		# is not misreported as an unknown locality.
		try:
			fit_fn = fns[self.locality]
		except KeyError:
			raise ValueError(f'Unrecognised locality {self.locality}') from None
		return fit_fn(impact_lf)

	### End regression functions

	def _compute_impact(
			self,
			lf: pl.LazyFrame,
			push: pl.LazyFrame,
	) -> pl.LazyFrame:
		"""Add an ``impact`` column to ``lf``.

		``lf`` needs the columns ``trade``, ``vola``, ``adv`` and those in
		``over_cols``; ``push`` needs ``symbol`` and ``push``. Note that the
		``trade`` column is overwritten with the decayed, scaled trade state.
		"""
		return (
			lf.with_columns(pl.col('trade').mul('vola').truediv('adv'))

			# ewm_mean(adjust=False) computes y_n = (1-a) * y_n-1 + a * x_n with
			# a = self.alpha, but we want y_n = (1-a) * y_n-1 + x_n to
			# approximate exponential decay of the impact state. So divide the
			# input by a first.
			.with_columns(norm=pl.lit(self.alpha))
			# The first observation of a partition is taken as is (y_0 = x_0),
			# so it must not be divided: shift leaves a null there, filled by 1.
			.with_columns(pl.col('norm').shift().over(self.over_cols).fill_null(1))
			.with_columns(pl.col('trade').truediv('norm'))

			.with_columns(pl.col('trade').ewm_mean(
				alpha=self.alpha, adjust=False, ignore_nulls=True
			).over(self.over_cols))
			.join(push, how='left', on='symbol')
			.with_columns(impact=pl.col('trade').mul('push'))
		)

	@_fit_context(prefer_skip_nested_validation=True) # type: ignore
	def fit(
			self,
			X: pl.DataFrame,
			y: None = None,
			cols: dict[str, str] | None = None,
	) -> Self:
		"""Estimate the push regression(s) and store them in ``push_``.

		Parameters
		----------
		X : pl.DataFrame
			Input bins. After renaming it must provide ``symbol``,
			``timestamp``, ``price``, ``trade``, ``vola`` and ``adv``.
		y : None
			Ignored; present for scikit-learn API compatibility.
		cols : dict[str, str] or None
			Passed to ``rename`` to map the names used in ``X`` to the names
			above; no renaming when None or empty.

		Returns
		-------
		Self
			The fitted estimator.
		"""
		tidy_lf = X.lazy()
		if cols:
			tidy_lf = tidy_lf.rename(cols)
		tidy_lf = tidy_lf.with_columns(date=pl.col('timestamp').dt.date())

		# Calculate impact with unit push and calculate changes in impact
		# and absolute returns to perform a simple OLS of the form:
		# return = push * change in impact state + c.
		self.symbols_ = tidy_lf.select(
			pl.col('symbol').unique(maintain_order=True)
		).collect()
		unit_push = self.symbols_.lazy().with_columns(push=1)
		impact_lf = self._compute_impact(tidy_lf, unit_push)

		# Keep the fitted regressions on the estimator; without this the fit
		# result would be thrown away.
		self.push_ = self._fit_push(impact_lf).collect()
		return self

	def transform(
			self,
			X: pl.DataFrame,
			cols: dict[str, str] | None = None,
	) -> pl.DataFrame:
		"""Not implemented yet: the body is a stub and currently returns None."""
		...

	def fit_transform( # type: ignore
			self,
			X: pl.DataFrame,
			y: None = None,
			cols: dict[str, str] | None = None,
	) -> pl.DataFrame:
		"""Fit on ``X`` and return ``transform(X, cols)``."""
		return self.fit(X, y, cols).transform(X, cols)

	def score(self, X: pl.LazyFrame, y: None) -> float:
		"""Not implemented yet: the body is a stub and currently returns None."""
		...

In [170]:
class MyTrans(TransformerMixin, BaseEstimator):
	"""Toy estimator that logs when ``fit`` and ``score`` are called.

	``fit`` regresses a noise-perturbed copy of ``y`` on ``X`` and remembers
	the R^2 of that regression against the clean ``y``; ``score`` simply
	returns the remembered value and ``predict`` echoes its input.
	"""

	def __init__(self, noise):
		# Standard deviation of the Gaussian noise added to y during fit.
		self.noise = noise

	def fit(self, X, y):
		"""Fit a linear regression on ``X`` and noisy ``y``; store ``reg_`` and ``score_``."""
		print('fit')
		noisy_target = y + np.random.normal(0, self.noise, y.shape)
		regression = LinearRegression().fit(X, noisy_target)
		self.reg_ = regression
		# Scored against the clean training target, not the noisy one.
		self.score_ = regression.score(X, y)
		return self

	def predict(self, X):
		"""Return ``X`` unchanged."""
		return X

	def score(self, X, y):
		"""Return the score stored by ``fit``; ``X`` and ``y`` are ignored."""
		print('score')
		return self.score_

In [81]:
# Synthetic linear data: y = alpha + beta * x + eps, with x and eps drawn from
# a standard normal distribution.
SEED = 0
# Seed the global generator so these draws (and every later np.random call in
# the notebook) are reproducible across kernel restarts.
np.random.seed(SEED)
n = 1_000
x = np.random.normal(0, 1, n)
eps = np.random.normal(0, 1, n)
alpha, beta = 3, 10
y = alpha + beta * x + eps

In [84]:
def func(cols):
	"""Fit a linear regression on one group's struct column (fields 'x' and 'y')."""
	# print(cols)
	points = cols[0]
	features = points.struct.field('x').to_numpy().reshape(-1, 1)
	targets = points.struct.field('y').to_numpy().reshape(-1, 1)
	model = LinearRegression()
	model.fit(features, targets)
	return model

# Split the sample into two halves by row index and fit one model per half.
test_df = (
	pl.LazyFrame({'xx': x, 'y': y})
	.with_row_index('idx')
	.with_columns(group=pl.when(pl.col('idx') < n // 2).then(0).otherwise(1))
	.with_columns(struct=pl.struct(x='xx', y='y'))
	.group_by('group', maintain_order=True)
	.agg(reg=pl.map_groups('struct', func))
	.collect()
)

In [42]:
test_df

group,reg
i32,object
0,LinearRegression()
1,LinearRegression()


In [172]:
# Stateless step that halves its input.
half_trans = FunctionTransformer(lambda x: x * 0.5)
# Toy regressor; `noise` is the scale of the Gaussian noise it adds to y in fit.
fit_trans = MyTrans(noise=1)

In [173]:
# Chain the two steps: the final estimator is fitted on the halved input.
pipe = Pipeline([
	('half_trans', half_trans),
	('fit_trans', fit_trans),
])

In [174]:
# `x` is 1-D, but LinearRegression.fit expects a 2-D feature matrix, so pass it
# as a single column (a no-op if it already has shape (n, 1)).
pipe.fit(x.reshape(-1, 1), y)

fit


In [175]:
from sklearn.model_selection import GridSearchCV

In [176]:
# Grid-search the noise level of the final step.
# NOTE(review): scoring='r2' presumably scores pipe.predict(X), and
# MyTrans.predict returns its input unchanged, so the score cannot depend on
# `noise` — this matches the identical per-split scores in cv_results_ below.
search = GridSearchCV(pipe, {'fit_trans__noise': [1, 10, 100]}, scoring='r2')

In [177]:
# Pass the 1-D `x` as a single-column matrix; LinearRegression.fit inside the
# pipeline rejects 1-D input.
search.fit(x.reshape(-1, 1), y)

fit
fit
fit
fit
fit
fit
fit
fit
fit
fit
fit
fit
fit
fit
fit
fit


In [178]:
# Inspect timings and per-split test scores for each candidate noise level.
search.cv_results_

{'mean_fit_time': array([0.00277743, 0.00317459, 0.00396791]),
 'std_fit_time': array([0.00067293, 0.00080595, 0.00212826]),
 'mean_score_time': array([0.00079379, 0.00109138, 0.00128956]),
 'std_score_time': array([0.00024312, 0.00037132, 0.00135266]),
 'param_fit_trans__noise': masked_array(data=[1, 10, 100],
              mask=[False, False, False],
        fill_value=999999),
 'params': [{'fit_trans__noise': 1},
  {'fit_trans__noise': 10},
  {'fit_trans__noise': 100}],
 'split0_test_score': array([0.02866599, 0.02866599, 0.02866599]),
 'split1_test_score': array([-0.03696981, -0.03696981, -0.03696981]),
 'split2_test_score': array([0.02895233, 0.02895233, 0.02895233]),
 'split3_test_score': array([0.03072833, 0.03072833, 0.03072833]),
 'split4_test_score': array([0.01603507, 0.01603507, 0.01603507]),
 'mean_test_score': array([0.01348238, 0.01348238, 0.01348238]),
 'std_test_score': array([0.02576519, 0.02576519, 0.02576519]),
 'rank_test_score': array([1, 1, 1])}

In [179]:
# Refit on the full sample (as a single-column matrix, which LinearRegression
# requires) and read the training R^2 stored by MyTrans.fit.
pipe.fit(x.reshape(-1, 1), y).get_params()['fit_trans'].score_

fit


0.9905965243195198

In [13]:
# Scratch check of the pattern used in OWModel._fit_global: repeat the single
# 'model' value once per symbol and attach the symbol column next to it.
xx = pl.DataFrame({'symbol': ['a', 'b', 'c']})
yy = pl.LazyFrame({'symbol': ['all'], 'model': [123]})
yy = yy.select(
	# Number of repeats = number of rows in xx.
	model=pl.repeat(pl.col('model'), xx.select(pl.len()).item(0, 0)),
	symbol=xx.select('symbol').to_series()
)
yy.collect()
# xx.with_columns(new_col=pl.lit(yy.select('b').collect())).collect()

model,symbol
i64,str
123,"""a"""
123,"""b"""
123,"""c"""


In [None]:
from impactpy.models.model import OWModel