#### Distributed Throttle

Problem:
- Assume there is a pool of task workers running a large set of tasks concurrently.
- Some of the tasks require work that has a "natural limit" spanning all workers. For example, some tasks may use a shared key to hit an external API that enforces a rate limit.

Task:
- Create a Python decorator that can wrap a function and prevent multiple worker processes from running that function more than once per x seconds (across all workers)

Assumptions:
- Throttled (limited) calls can return None
- Redis is available
- No specific task queue system is required. You can simulate a task queue by running multiple worker processes that each call the throttled functions at random intervals.
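
Since Redis is available, the cross-process requirement can be sketched with Redis's atomic `SET key value NX EX seconds`: only the first caller in each window creates the key, and the key expires by itself after x seconds. This is a hedged sketch, not the solution implemented below (which coordinates threads in one process with a `BoundedSemaphore`). `redis_throttle`, `call_api` and the key name are hypothetical; the client is assumed to be a redis-py `Redis` instance, whose `set(name, value, nx=True, ex=seconds)` returns a truthy value only when the key was created. To keep the example runnable without a server, `InMemoryRedis` is a hypothetical single-process stand-in that mimics only that one call.

```python
import threading
import time
from functools import wraps


class InMemoryRedis:
    """Hypothetical single-process stand-in for redis.Redis.

    Mimics only set(name, value, ex=..., nx=True); the value itself is not stored.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._deadlines = {}  # key -> time.monotonic() deadline

    def set(self, name, value, ex=None, nx=False):
        with self._lock:
            now = time.monotonic()
            deadline = self._deadlines.get(name)
            if nx and deadline is not None and deadline > now:
                return None  # like redis-py: SET ... NX on a live key returns None
            self._deadlines[name] = now + ex if ex is not None else float("inf")
            return True


def redis_throttle(client, key, interval):
    """Hypothetical decorator: run func at most once per `interval` seconds per key.

    `interval` is whole seconds, as required by Redis's EX option.
    """
    def decorator(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            # SET key NX EX interval is atomic on the server, so exactly one
            # caller (thread or process) per window creates the key
            if not client.set(key, "1", nx=True, ex=interval):
                return None  # throttled: the key is still alive
            return func(*args, **kwargs)
        return wrapped
    return decorator


client = InMemoryRedis()  # with a real server: import redis; client = redis.Redis()


@redis_throttle(client, "throttle:shared-api-key", interval=1)
def call_api(n):
    return n * 2


first = call_api(1)   # 2: first call in the window runs
second = call_api(2)  # None: throttled
time.sleep(1.1)
third = call_api(3)   # 6: the key has expired
```

Because the key expires on its own, no worker has to sleep or release anything, and a crashed worker cannot leave the key locked forever.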


---

#### Solution Design and Approach

**Main Goal**:
Based on my understanding of the task, I needed to limit execution of a function across concurrent workers (simulated here with threads) to once per time period.

**Design Background**:
- The decorator should acquire a lock in a non-blocking manner and wait out a timeout before releasing it. The timeout mimics the API reset interval. The non-blocking approach is important to avoid holding resources that could be used for other tasks.
- Use a BoundedSemaphore, because its release() raises ValueError when a call would increment the counter beyond its initial (maximum) value. Catching that exception allows None to be returned for throttled calls.
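
The two `BoundedSemaphore` behaviours this design relies on can be checked in isolation: a non-blocking `acquire(blocking=False)` returns `False` instead of waiting when the counter is zero, and a `release()` that would push the counter above its initial value raises `ValueError`. A minimal sketch using only the standard library:

```python
from threading import BoundedSemaphore

sem = BoundedSemaphore(1)  # counter starts at its maximum, 1

got_first = sem.acquire(blocking=False)   # True: counter 1 -> 0
got_second = sem.acquire(blocking=False)  # False: counter is 0, returns immediately

sem.release()  # counter 0 -> 1, back at the maximum

try:
    sem.release()  # would make the counter 2 > 1
    over_release_raised = False
except ValueError:
    over_release_raised = True

print(got_first, got_second, over_release_raised)  # True False True
```

In the throttler below, the return value of `acquire(False)` is not checked, so a thread that failed to acquire is only detected when its later `release()` raises.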

**Detailed Approach**:
1. main() - starts a pool of threads at random intervals to simulate a queue
2. retrieve_data() - the function to be throttled; mimics an API response with a timer (time.sleep)
3. @throttler() - decorator factory that takes a timeout interval (mimicking the API reset interval) and wraps the function to limit across threads

**Results/Logs Overview**:
- "Started API Request" - logs that the API **request** is triggered inside the throttled function (retrieve_data)
- "Shared key is not freed..." - logged by threads whose release() raised ValueError because the BoundedSemaphore was already at its maximum value; these calls return None
- "Finished API Request" - logs that the API **response** was received in retrieve_data()
- The API reset (timeout) was set to 3 seconds
- The log below shows, for example, that:
  - Thread-8 started the throttled function execution
  - Thread-9 arrived while Thread-8 held the semaphore (Thread-8 acquired first), so its release() exceeded the semaphore limit and it was kicked out, returning None
  - Thread-10 started the throttled function execution after Thread-8 had waited out the timeout period and released the semaphore
  - Thread-8 meanwhile got the response back from a slow API...
  ...

```
[2018-12-21 14:25:42] [INFO] (MainThread) API Expiration time: 3 second(s)
[2018-12-21 14:25:48] [INFO] (Thread-8 ) Started API request
[2018-12-21 14:25:49] [ERROR] (Thread-9 ) Shared key is not freed...terminating thread and returning None on exit
[2018-12-21 14:25:52] [INFO] (Thread-10) Started API request
[2018-12-21 14:25:54] [INFO] (Thread-8 ) Finished API Request
[2018-12-21 14:25:55] [INFO] (Thread-11) Started API request
[2018-12-21 14:25:57] [ERROR] (Thread-12) Shared key is not freed...terminating thread and returning None on exit
[2018-12-21 14:25:57] [INFO] (Thread-10) Finished API Request
[2018-12-21 14:26:01] [INFO] (Thread-11) Finished API Request
```
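
One way to check the throttling property in a log like the one above is to parse the timestamps of the "Started API request" lines and confirm that consecutive starts are at least the 3-second timeout apart. A small sketch over the lines copied from that log (the `start_gaps` helper is hypothetical); note the log has 1-second resolution, so this is only a coarse check:

```python
from datetime import datetime

LOG = """\
[2018-12-21 14:25:42] [INFO] (MainThread) API Expiration time: 3 second(s)
[2018-12-21 14:25:48] [INFO] (Thread-8 ) Started API request
[2018-12-21 14:25:49] [ERROR] (Thread-9 ) Shared key is not freed...terminating thread and returning None on exit
[2018-12-21 14:25:52] [INFO] (Thread-10) Started API request
[2018-12-21 14:25:54] [INFO] (Thread-8 ) Finished API Request
[2018-12-21 14:25:55] [INFO] (Thread-11) Started API request
[2018-12-21 14:25:57] [ERROR] (Thread-12) Shared key is not freed...terminating thread and returning None on exit
[2018-12-21 14:25:57] [INFO] (Thread-10) Finished API Request
[2018-12-21 14:26:01] [INFO] (Thread-11) Finished API Request
"""


def start_gaps(log_text, marker="Started API request"):
    """Return the seconds between consecutive log lines containing `marker`."""
    starts = [
        # characters 1..19 hold "YYYY-MM-DD HH:MM:SS" inside the leading brackets
        datetime.strptime(line[1:20], "%Y-%m-%d %H:%M:%S")
        for line in log_text.splitlines()
        if marker in line
    ]
    return [(b - a).total_seconds() for a, b in zip(starts, starts[1:])]


gaps = start_gaps(LOG)
print(gaps)                       # [4.0, 3.0]
print(all(g >= 3 for g in gaps))  # True
```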

```python
"""
throttler.py - YC - 12/21/2018

Decorator for limiting function execution across multiple threads over a period of time.
"""
from threading import BoundedSemaphore
import time
import logging
from functools import wraps

logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] [%(levelname)s] (%(threadName)-9s) %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S')

# Shared by every function decorated with @throttler in this process
thread_counter = BoundedSemaphore(1)

def throttler(timeout=3):
    """
    Decorator to limit func execution by thread(s) per time interval

    @param timeout: time interval (seconds) at which a thread can run this function
    """
    logging.info("API Expiration time: {} second(s)".format(timeout))

    def decorator(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            # Non-blocking acquire: the return value is ignored, so every thread
            # passes this point; threads that failed to acquire are expected to fail at release()
            thread_counter.acquire(False)
            # Don't release for `timeout` seconds (mimics the API reset interval)
            time.sleep(timeout)
            try:
                # Release after `timeout` seconds
                thread_counter.release()
            except ValueError:
                # release() would push the counter past its maximum: the shared key
                # is held by another thread, so drop this call
                logging.error("Shared key is not freed...terminating thread and returning None on exit")
                return None
            # Only release() errors are caught above; exceptions raised by func
            # propagate instead of being logged as throttling
            return func(*args, **kwargs)
        return wrapped
    return decorator
```
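
A variant of the same idea checks the result of the non-blocking acquire instead of waiting for `release()` to fail: the first caller takes the semaphore and runs immediately, a `threading.Timer` releases it after `timeout` seconds, and callers that find it taken return `None` right away without sleeping. This is a hedged sketch, still limited to threads within one process like the original; `timer_throttler` and `fetch` are hypothetical names.

```python
import threading
import time
from functools import wraps


def timer_throttler(timeout=3):
    """Hypothetical variant: at most one call per `timeout` seconds within this process."""
    def decorator(func):
        sem = threading.BoundedSemaphore(1)  # one slot per decorated function

        @wraps(func)
        def wrapped(*args, **kwargs):
            # Non-blocking: give up immediately if another call holds the slot
            if not sem.acquire(blocking=False):
                return None
            # Free the slot after `timeout` seconds, independent of how long func runs
            releaser = threading.Timer(timeout, sem.release)
            releaser.daemon = True
            releaser.start()
            return func(*args, **kwargs)

        return wrapped

    return decorator


@timer_throttler(timeout=0.5)
def fetch(n):
    return n + 1


results = [fetch(1), fetch(2)]  # the second call falls inside the window
time.sleep(1.0)
results.append(fetch(3))        # the window has passed
print(results)  # [2, None, 4]
```

Semaphores have no owner, so releasing from the timer thread is allowed; the window starts when a call is admitted rather than after a sleep in every caller.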

```python
"""
dthrottle_impl.py - YC - 12/21/2018

Simulate a task queue by running multiple workers that each call the throttled function at random intervals
"""

import random
import time
from threading import Thread
import logging

# throttler() is defined in the cell above (throttler.py); run as a standalone
# script, this file would need: from throttler import throttler

logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] [%(levelname)s] (%(threadName)-9s) %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S')


@throttler(timeout=3)
def retrieve_data(t):
    """
    Retrieves data from an API call.
    Important assumptions:
        - APIs may take longer than the reset time for the shared key (aka timeout)
        - t > timeout

    @param t: int to simulate an API response time (seconds)
    """
    logging.info("Started API request")
    time.sleep(t)
    logging.info("Finished API Request")


def main():
    """
    Run a pool of threads, separated by random intervals.
    """
    threads = []
    nthreads = 5

    for _ in range(nthreads):
        # Fake a queue: start each worker 1-3 seconds after the previous one
        time.sleep(random.randint(1, 3))
        threads.append(Thread(target=retrieve_data, args=(random.randint(5, 7),)))
        threads[-1].start()

    for thread in threads:
        thread.join()

    print('Done')

main()
```

```
[2018-12-21 06:22:41] [INFO] (MainThread) API Expiration time: 3 second(s)
[2018-12-21 06:22:45] [INFO] (Thread-9 ) Started API request
[2018-12-21 06:22:48] [INFO] (Thread-10) Started API request
[2018-12-21 06:22:51] [INFO] (Thread-11) Started API request
[2018-12-21 06:22:52] [INFO] (Thread-9 ) Finished API Request
[2018-12-21 06:22:52] [ERROR] (Thread-12) Shared key is not freed...terminating thread and returning None on exit
[2018-12-21 06:22:54] [ERROR] (Thread-13) Shared key is not freed...terminating thread and returning None on exit
[2018-12-21 06:22:54] [INFO] (Thread-10) Finished API Request
[2018-12-21 06:22:58] [INFO] (Thread-11) Finished API Request

Done
```
