In [1]:
from typing import List
import polars as pl
from typing import TypedDict, List, Optional
from functools import reduce

class FilterDataItem(TypedDict):
    rule: str = ''
    first_act: str = ''
    second_act: Optional[str] = ''

FilterData = List[FilterDataItem]

DATE_FORMAT = "%Y-%m-%d %H:%M:%S%.f"

TIMESTAMP_NAME = 'dataFinal'

FILE_PATH = './unidade_teste_1.csv'
df = (pl.read_csv(FILE_PATH)
        .with_columns(pl.col([TIMESTAMP_NAME])
        .str.strptime(pl.Datetime, format=DATE_FORMAT)))

rules: FilterData = [
	{
    'rule': 'notCoExistence',
    'first_act': 'Distribuição',
    'second_act': 'Conclusão'
	},
	{
    'rule': 'init',
    'first_act': 'Distribuição',
    'second_act': ''
	},
	{
		'rule': 'atLeastOne',
    'first_act': 'Distribuição',
    'second_act': ''
	}
]


In [2]:
def declare(filter_data: FilterData = [], activities: List[str] = []):
	first_act, second_act = filter_data['first_act'], filter_data['second_act']
	match filter_data['rule']:
		case 'init':
			return first_act == activities[0]
		case 'atLeastOne':
			return first_act in activities
		case 'atMostOne':
			return activities.count() <= 1
		case 'end':
			return activities[-1] == first_act
		case 'absence':
			return activities.count() == 0
		case 'respondedExistence':
			return (first_act in activities and second_act in activities) or second_act in activities
		case 'response':
			a_indices = []
			for i, x in enumerate(activities):
				if x == first_act:	a_indices.append(i)
				if x == second_act and len(a_indices) is not None:	return True
			if(len(a_indices) == 0):	return True
			return False
		case 'alternateResponse':
			a_index = None
			for i, x in enumerate(activities):
				if x == first_act and a_index == None:	a_index = i
				if x == second_act and a_index is not None:
					if first_act in activities[a_index:i]: return False
					a_index = None
			if a_index is not None: return False
			return True
		case 'chainResponse':
			if activities[-1] == first_act: return False
			a_indices = [i for i, x in enumerate(activities) if x == first_act]
			b_indices = b_indices.map(lambda x: x+1, a_indices)
			for i in b_indices:
				if activities[i] != second_act: return False
			return True
		case 'precedence':
			return first_act in activities[:activities.index(second_act)]
		case 'alternatePrecedence':
			a_index = None
			for i, x in enumerate(activities):
				if x == first_act: a_index = i
				if x == second_act and a_index is None: return False
				if x == second_act and a_index is not None: a_index = None
			return True
		case 'chainPrecedence':
			if activities[0] == second_act: return False
			for i, x in enumerate(activities):
				if x == second_act and activities[i-1] != first_act: return False
			return True
		case 'coExistence':
			return first_act in activities and second_act in activities
		case 'succesion':
			for i, x in enumerate(activities):
				if x == first_act and second_act not in activities[i:]: return False
			return True
		case 'alternateSuccession':
			a_index = None
			for i, x in enumerate(activities):
				if x == first_act and a_index == None:	a_index = i
				if x == second_act and a_index is not None:
					if first_act in activities[a_index:i]: return False
					a_index = None
			if a_index is not None: return False
			return True
		case 'chainSuccession':
			for i, x in enumerate(activities):
				if x == first_act and activities[i+1] != second_act: return False
			return True
		case 'notCoExistence':
			return first_act not in activities or second_act not in activities
		case 'notSuccesion':
			return second_act not in activities[activities.index(first_act):]
		case 'notChainSuccesion':
			for i, x in enumerate(activities):
				if x == first_act and activities[i+1] == second_act: return False
			return True

In [3]:
filters = []

for rule in rules:
	filters.append(
			pl.all_horizontal(
					pl.col('activity').map_elements(
							lambda x: declare(rule, x), return_dtype=pl.Boolean
					)
			)
	)

combined_declare_rules = reduce(lambda acc, f: acc & f, filters)

dicts = (
    df.lazy()
		.group_by(pl.col('NPU'))
		.agg(
			pl.col('activity'),
			pl.col('processoID').alias('ids'),
		)
    .filter(combined_declare_rules)
		.collect()
)

dicts

NPU,activity,ids
str,list[str],list[i64]
"""0011065-00.202…","[""Distribuição"", ""Audiência"", … ""Definitivo""]","[24260513, 24260513, … 24260513]"
"""0024342-00.202…","[""Distribuição"", ""Audiência"", … ""Definitivo""]","[25240553, 25240553, … 25240553]"
"""0009994-00.201…","[""Distribuição"", ""Audiência"", … ""Definitivo""]","[25143233, 25143233, … 25143233]"
"""0036828-00.201…","[""Distribuição"", ""Audiência"", … ""Definitivo""]","[27087151, 27087151, … 27087151]"
"""0017554-00.202…","[""Conclusão"", ""Distribuição"", … ""Definitivo""]","[23981891, 23981891, … 23981891]"
…,…,…
"""0020048-00.202…","[""Petição"", ""Distribuição"", … ""Definitivo""]","[23337929, 23337929, … 23337929]"
"""0016802-00.202…","[""Distribuição"", ""Conclusão"", … ""Definitivo""]","[23916729, 23916729, … 23916729]"
"""0004468-00.201…","[""Distribuição"", ""Audiência"", … ""Definitivo""]","[25320075, 25320075, … 25320075]"
"""0054349-00.201…","[""Distribuição"", ""Conclusão"", … ""Definitivo""]","[27716095, 27716095, … 27716095]"
