In [1]:
class Condition:
    category = None
    operator = None
    value = 0
    next = None
    def __init__(self, category, operator, value, next):
        self.category = category
        self.operator = operator
        self.value = value
        self.next = next

    def next_workflow(self, part):
        part_value = part.category(self.category)
        result = part_value - self.value
        cmp_res = 0
        if result > 0:
            cmp_res = 1
        else:
            cmp_res = -1
        expected_values = {'>': 1, '<': -1}
        if cmp_res == expected_values[self.operator]:
            return self.next
        return None

    @staticmethod
    def from_str(c_str):
        if ':' not in c_str:
            return AlwaysApplyRule(c_str)
        parts = c_str.split(':')
        return Condition(parts[0][0], parts[0][1], int(parts[0][2:]), parts[1])
        
    def __repr__(self):
        return f'{self.category}{self.operator}{self.value}:{self.next}'

In [2]:
class AlwaysApplyRule(Condition):
    def __init__(self, next):
        self.next = next

    def next_workflow(self, part):
        return self.next

    def __repr__(self):
        return self.next

In [3]:
class Workflow:
    name = None
    conditions = []

    def __init__(self, name, conditions):
        self.name = name
        self.conditions = conditions

    def apply(self, part, all_workflows):
        for c in self.conditions:
            res = c.next_workflow(part)
            if res:
                if res in ('A', 'R'):
                    return res == 'A'
                return all_workflows[res].apply(part, all_workflows)

    @staticmethod
    def from_str(w_str):
        name, rest = w_str.split('{')
        conditions = [Condition.from_str(p) for p in rest.rstrip('}').split(',')]
        return Workflow(name, conditions)

    def __repr__(self):
        return '{}{{{}}}'.format(self.name, ','.join(repr(c) for c in self.conditions))
    

In [4]:
class Part:
    categories = None
    
    def __init__(self, categories):
        self.categories = categories

    def category(self, name):
        return self.categories.get(name)

    @staticmethod
    def from_str(p_str):
        parts = [p.split('=') for p in p_str[1:-1].split(',')]
        return Part({ k: int(v) for k, v in parts})

    def __repr__(self):
        cat = ','.join(f'{k}={v}' for k, v in self.categories.items())
        return f'{{{cat}}}'

In [5]:
def get_input(fname='test.txt'):
    workflows, parts = [], []
    is_parts = False
    with open(fname) as f:
        for l in f.readlines():
            line = l.rstrip()
            if not line:
                is_parts = True
                continue
            if is_parts:
                parts.append(Part.from_str(line))
            else:
                workflows.append(Workflow.from_str(line))
    return workflows, parts

In [6]:
test_workflows, test_parts = get_input('test.txt')
my_workflows, my_parts = get_input('input.txt')

In [7]:
Part.from_str('{x=787,m=2655,a=1222,s=2876}')

{x=787,m=2655,a=1222,s=2876}

In [8]:
test_workflows = { w.name: w for w in test_workflows }

In [9]:
test_workflows

{'px': px{a<2006:qkq,m>2090:A,rfg},
 'pv': pv{a>1716:R,A},
 'lnx': lnx{m>1548:A,A},
 'rfg': rfg{s<537:gd,x>2440:R,A},
 'qs': qs{s>3448:A,lnx},
 'qkq': qkq{x<1416:A,crn},
 'crn': crn{x>2662:A,R},
 'in': in{s<1351:px,qqz},
 'qqz': qqz{s>2770:qs,m<1801:hdj,R},
 'gd': gd{a>3333:R,R},
 'hdj': hdj{m>838:A,pv}}

In [10]:
def solve(workflows, parts):
    return [p for p in parts if workflows['in'].apply(p, workflows)]

In [11]:
solve(test_workflows, test_parts)

[{x=787,m=2655,a=1222,s=2876},
 {x=2036,m=264,a=79,s=2244},
 {x=2127,m=1623,a=2188,s=1013}]

In [12]:
sum(sum(p.categories.values()) for p in solve(test_workflows, test_parts))

19114

In [13]:
my_workflows = { w.name: w for w in my_workflows }

In [14]:
sum(sum(p.categories.values()) for p in solve(my_workflows, my_parts))

377025

In [15]:
class Range:
    min = None
    max = None

    def __init__(self, min=1, max=4000):
        self.min = min
        self.max = max

    def __repr__(self):
        return '[{}:{}]'.format(self.min or '-', self.max or '-')

    def is_bound(self):
        return self.min is not None and self.max is not None

    def is_valid(self):
        return not self.is_bound() or self.min <= self.max

    def length(self):
        return self.max - self.min + 1

In [16]:
Range()

[1:4000]

In [17]:
def merge(range1, range2):
    lft = None
    for m in [range1.min, range2.min]:
        if m:
            lft = max(lft or m, m)
    rgt = None
    for m in [range1.max, range2.max]:
        if m:
            rgt = min(rgt or m, m)
    return Range(lft, rgt)

In [18]:
def opposite_range(condition):
    # category, operator, value
    if condition.operator is None:
        return Range()
    elif condition.operator == '<': # >=
        return Range(min=condition.value)
    else: # <=
        return Range(max=condition.value)

In [19]:
def normal_range(condition):
    if condition.operator is None:
        return Range()
    elif condition.operator == '<':
        return Range(max=condition.value-1)
    else: # <=
        return Range(min=condition.value+1)

In [20]:
from collections import deque

In [21]:
def deep_copy(part_ranges):
    new_part_ranges = {}
    for k, r in part_ranges.items():
        new_part_ranges[k] = Range(min=r.min, max=r.max)
    return new_part_ranges

In [22]:
def all_possibilities(workflows):
    condition_ranges = {
        'x': Range(),
        'm': Range(),
        'a': Range(),
        's': Range(),
    } # all values
    q = deque([('in', condition_ranges)])
    finished = {
        'A': [],
        'R': []
    }
    while q:
        wf_name, c_ranges = q.popleft()
        if wf_name in ('A', 'R'):
            finished[wf_name].append(c_ranges)
            continue
        workflow = workflows[wf_name]
        for condition in workflow.conditions:
            if condition.operator is None:
                # last condition, must pass
                q.append((condition.next, deep_copy(c_ranges)))
            else:
                # 1 - pass
                cat_range = merge(c_ranges[condition.category], normal_range(condition))
                if cat_range.is_valid():
                    next_ranges = deep_copy(c_ranges)
                    next_ranges[condition.category] = cat_range
                    q.append((condition.next, next_ranges))
                # 2 - fail
                cat_range = merge(c_ranges[condition.category], opposite_range(condition))
                if not cat_range.is_valid():
                    break
                c_ranges[condition.category] = cat_range
                    
    accepted = finished['A']
    total = 0
    for a in accepted:
        t = 1
        for c in 'xmas':
            t *= a[c].length()
        total += t
    return total

In [23]:
all_possibilities(test_workflows)

167409079868000

In [24]:
all_possibilities(my_workflows)

135506683246673