# Redux + Saga
- Redux = Reducer + Flux
- Reducer = reduce state history + current state -> new State
### Redux is to revive Flux pattern with simplifications
- Remove dispatcher
- action events send directly to store and processed by reducer.
- Single store instead of multiple stores in Flux


In [1]:
# Redux only

from typing import Dict, Any, Callable
from enum import Enum

# Action Types
class ActionType(Enum):
    INCREMENT = 'INCREMENT'
    DECREMENT = 'DECREMENT'
    SET_USER = 'SET_USER'

# Action Creators
def increment():
    return {'type': ActionType.INCREMENT}

def decrement():
    return {'type': ActionType.DECREMENT}

def set_user(name: str):
    return {'type': ActionType.SET_USER, 'payload': name}

# Reducers
def counter_reducer(state: int, action: Dict[str, Any]) -> int:
    if action['type'] == ActionType.INCREMENT:
        return state + 1
    elif action['type'] == ActionType.DECREMENT:
        return state - 1
    return state

def user_reducer(state: Dict[str, Any], action: Dict[str, Any]) -> Dict[str, Any]:
    if action['type'] == ActionType.SET_USER:
        return {'name': action['payload']}
    return state

# Combine reducers
def root_reducer(state: Dict[str, Any], action: Dict[str, Any]) -> Dict[str, Any]:
    return {
        'counter': counter_reducer(state.get('counter', 0), action),
        'user': user_reducer(state.get('user', {}), action)
    }

# Store
class Store:
    def __init__(self, reducer: Callable, initial_state: Dict[str, Any]):
        self._state = initial_state
        self._reducer = reducer
        self._listeners = []

    def get_state(self) -> Dict[str, Any]:
        return self._state

    def dispatch(self, action: Dict[str, Any]):
        self._state = self._reducer(self._state, action)
        for listener in self._listeners:
            listener()

    def subscribe(self, listener: Callable):
        self._listeners.append(listener)
        return lambda: self._listeners.remove(listener)

# Usage
if __name__ == "__main__":
    # Create the store
    initial_state = {'counter': 0, 'user': {}}
    store = Store(root_reducer, initial_state)

    # Subscribe to state changes
    def listener():
        print("Current state:", store.get_state())

    unsubscribe = store.subscribe(listener)

    # Dispatch actions
    store.dispatch(increment())
    store.dispatch(increment())
    store.dispatch(decrement())
    store.dispatch(set_user("Alice"))

    # Unsubscribe
    unsubscribe()

    # Dispatch one more action (listener won't be called)
    store.dispatch(increment())

    # Final state
    print("Final state:", store.get_state())

Current state: {'counter': 1, 'user': {}}
Current state: {'counter': 2, 'user': {}}
Current state: {'counter': 1, 'user': {}}
Current state: {'counter': 1, 'user': {'name': 'Alice'}}
Final state: {'counter': 2, 'user': {'name': 'Alice'}}


In [1]:
# Redux + Saga

import asyncio
from typing import Dict, Any, Callable, Coroutine

# Actions
FETCH_USER = 'FETCH_USER'
FETCH_USER_SUCCESS = 'FETCH_USER_SUCCESS'
FETCH_USER_FAILURE = 'FETCH_USER_FAILURE'

# Action creators
def fetch_user(user_id: int):
    return {'type': FETCH_USER, 'payload': user_id}

def fetch_user_success(user: Dict[str, Any]):
    return {'type': FETCH_USER_SUCCESS, 'payload': user}

def fetch_user_failure(error: str):
    return {'type': FETCH_USER_FAILURE, 'payload': error}

# Mock API
async def fetch_user_from_api(user_id: int) -> Dict[str, Any]:
    # Simulate API call
    await asyncio.sleep(1)
    return {'id': user_id, 'name': f'User {user_id}'}

# Saga
async def fetch_user_saga(action: Dict[str, Any], dispatch: Callable):
    try:
        user = await fetch_user_from_api(action['payload'])
        dispatch(fetch_user_success(user))
    except Exception as e:
        dispatch(fetch_user_failure(str(e)))

# Redux-like store
class Store:
    def __init__(self, reducer: Callable, initial_state: Dict[str, Any]):
        self.state = initial_state
        self.reducer = reducer
        self.listeners = []
        self.sagas = {}

    def get_state(self):
        return self.state

    def dispatch(self, action: Dict[str, Any]):
        self.state = self.reducer(self.state, action)
        for listener in self.listeners:
            listener()

        # Run saga if exists for this action type
        if action['type'] in self.sagas:
            asyncio.create_task(self.sagas[action['type']](action, self.dispatch))

    def subscribe(self, listener: Callable):
        self.listeners.append(listener)

    def add_saga(self, action_type: str, saga: Coroutine):
        self.sagas[action_type] = saga

# Reducer
def user_reducer(state: Dict[str, Any], action: Dict[str, Any]) -> Dict[str, Any]:
    if action['type'] == FETCH_USER:
        return {**state, 'loading': True}
    elif action['type'] == FETCH_USER_SUCCESS:
        return {**state, 'user': action['payload'], 'loading': False}
    elif action['type'] == FETCH_USER_FAILURE:
        return {**state, 'error': action['payload'], 'loading': False}
    return state

# Usage
async def main():
    initial_state = {'user': None, 'loading': False, 'error': None}
    store = Store(user_reducer, initial_state)
    
    # Add saga
    store.add_saga(FETCH_USER, fetch_user_saga)

    # Subscribe to state changes
    def listener():
        print("State changed:", store.get_state())
    store.subscribe(listener)

    # Dispatch action
    store.dispatch(fetch_user(1))

    # Wait for saga to complete
    await asyncio.sleep(2)

if __name__ == "__main__":
    await main()

State changed: {'user': None, 'loading': True, 'error': None}
State changed: {'user': {'id': 1, 'name': 'User 1'}, 'loading': False, 'error': None}


In [5]:
# Saga Composition

import asyncio
from typing import Dict, Any, Callable, Coroutine
from enum import Enum

# Action Types
class ActionType(Enum):
    USER_FETCH_REQUESTED = 'USER_FETCH_REQUESTED'
    USER_FETCH_SUCCEEDED = 'USER_FETCH_SUCCEEDED'
    USER_FETCH_FAILED = 'USER_FETCH_FAILED'
    USER_POSTS_FETCH_SUCCEEDED = 'USER_POSTS_FETCH_SUCCEEDED'
    USER_POSTS_FETCH_FAILED = 'USER_POSTS_FETCH_FAILED'

# Mock API
class API:
    @staticmethod
    async def fetch_user(user_id: int) -> Dict[str, Any]:
        await asyncio.sleep(1)  # Simulate API delay
        return {'id': user_id, 'name': f'User {user_id}'}

    @staticmethod
    async def fetch_posts(user_id: int) -> list[Dict[str, Any]]:
        await asyncio.sleep(1)  # Simulate API delay
        return [{'id': i, 'title': f'Post {i} by User {user_id}'} for i in range(3)]

# Saga-like coroutines
async def fetch_user(action: Dict[str, Any], dispatch: Callable):
    try:
        user = await API.fetch_user(action['payload']['userId'])
        dispatch({'type': ActionType.USER_FETCH_SUCCEEDED, 'payload': user})
    except Exception as e:
        dispatch({'type': ActionType.USER_FETCH_FAILED, 'payload': str(e)})

async def fetch_user_posts(action: Dict[str, Any], dispatch: Callable):
    try:
        posts = await API.fetch_posts(action['payload']['userId'])
        dispatch({'type': ActionType.USER_POSTS_FETCH_SUCCEEDED, 'payload': posts})
    except Exception as e:
        dispatch({'type': ActionType.USER_POSTS_FETCH_FAILED, 'payload': str(e)})

async def fetch_user_with_posts(action: Dict[str, Any], dispatch: Callable):
    await fetch_user(action, dispatch)
    await fetch_user_posts(action, dispatch)

# Watcher coroutine
async def watch_fetch_user(get_action: Callable[[], Coroutine], dispatch: Callable):
    while True:
        action = await get_action()
        if action['type'] == ActionType.USER_FETCH_REQUESTED:
            await fetch_user_with_posts(action, dispatch)

# Root saga
async def root_saga(get_action: Callable[[], Coroutine], dispatch: Callable):
    await asyncio.gather(
        watch_fetch_user(get_action, dispatch),
        # Other watchers can be added here
    )

# Simple store implementation
class Store:
    def __init__(self, reducer: Callable, initial_state: Dict[str, Any]):
        self._state = initial_state
        self._reducer = reducer
        self._listeners = []
        self._action_queue = asyncio.Queue()

    def get_state(self) -> Dict[str, Any]:
        return self._state

    def dispatch(self, action: Dict[str, Any]):
        self._state = self._reducer(self._state, action)
        for listener in self._listeners:
            listener()
        self._action_queue.put_nowait(action)

    def subscribe(self, listener: Callable):
        self._listeners.append(listener)

    async def get_action(self):
        return await self._action_queue.get()

# Example usage
async def main():
    def reducer(state, action):
        # Implement your reducer logic here
        return state

    store = Store(reducer, {'user': None, 'posts': []})

    def listener():
        print("State updated:", store.get_state())

    store.subscribe(listener)

    # Start the saga
    saga_task = asyncio.create_task(root_saga(store.get_action, store.dispatch))

    # Dispatch an action
    store.dispatch({
        'type': ActionType.USER_FETCH_REQUESTED,
        'payload': {'userId': 1}
    })

    # Run for a few seconds
    await asyncio.sleep(3)

    # Cancel the saga
    saga_task.cancel()
    try:
        await saga_task
    except asyncio.CancelledError:
        print("Saga was cancelled")

if __name__ == "__main__":
    await main()

State updated: {'user': None, 'posts': []}
State updated: {'user': None, 'posts': []}
State updated: {'user': None, 'posts': []}
Saga was cancelled


In [6]:
# Redux Thunk

from typing import Dict, Any, Callable
from enum import Enum
import asyncio

# Action Types
class ActionType(Enum):
    FETCH_USER_REQUEST = 'FETCH_USER_REQUEST'
    FETCH_USER_SUCCESS = 'FETCH_USER_SUCCESS'
    FETCH_USER_FAILURE = 'FETCH_USER_FAILURE'

# Action Creators
def fetch_user_request():
    return {'type': ActionType.FETCH_USER_REQUEST}

def fetch_user_success(user):
    return {'type': ActionType.FETCH_USER_SUCCESS, 'payload': user}

def fetch_user_failure(error):
    return {'type': ActionType.FETCH_USER_FAILURE, 'payload': error}

# Thunk Action Creator
def fetch_user(user_id: int):
    # This function is a thunk
    async def thunk(dispatch: Callable, get_state: Callable):
        dispatch(fetch_user_request())
        try:
            # Simulate API call
            await asyncio.sleep(1)
            user = {'id': user_id, 'name': f'User {user_id}'}
            dispatch(fetch_user_success(user))
        except Exception as e:
            dispatch(fetch_user_failure(str(e)))
    return thunk

# Reducer
def user_reducer(state: Dict[str, Any], action: Dict[str, Any]) -> Dict[str, Any]:
    if action['type'] == ActionType.FETCH_USER_REQUEST:
        return {**state, 'loading': True}
    elif action['type'] == ActionType.FETCH_USER_SUCCESS:
        return {**state, 'loading': False, 'data': action['payload']}
    elif action['type'] == ActionType.FETCH_USER_FAILURE:
        return {**state, 'loading': False, 'error': action['payload']}
    return state

# Store with Thunk support
class Store:
    def __init__(self, reducer: Callable, initial_state: Dict[str, Any]):
        self._state = initial_state
        self._reducer = reducer
        self._listeners = []

    def get_state(self) -> Dict[str, Any]:
        return self._state

    async def dispatch(self, action: Dict[str, Any] or Callable):
        if callable(action):
            # If action is a function (thunk), call it with dispatch and get_state
            await action(self.dispatch, self.get_state)
        else:
            # Normal action dispatch
            self._state = self._reducer(self._state, action)
            for listener in self._listeners:
                listener()

    def subscribe(self, listener: Callable):
        self._listeners.append(listener)

# Usage
async def main():
    # Create the store
    initial_state = {'loading': False, 'data': None, 'error': None}
    store = Store(user_reducer, initial_state)

    # Subscribe to state changes
    def listener():
        print("Current state:", store.get_state())

    store.subscribe(listener)

    # Dispatch the thunk action
    await store.dispatch(fetch_user(1))

    # Final state
    print("Final state:", store.get_state())

if __name__ == "__main__":
    await main()

  dispatch(fetch_user_request())


Final state: {'loading': False, 'data': None, 'error': None}


  dispatch(fetch_user_success(user))
