In [2]:
"""
题目1：同余分组
implement `group_by`.
P.S: the answer might not be unique and feel free to hack it if possible!
"""
from collections import defaultdict


def for_each(collection):
    """Return a callable that applies a function to every element of ``collection``.

    The returned callable takes a one-argument function ``f``, invokes it once
    per element in iteration order, ignores whatever ``f`` returns, and itself
    returns ``None``.
    """

    def apply(f):
        # A fresh iterator is requested on every call; a one-shot iterator
        # passed as ``collection`` is therefore exhausted by the first call.
        elements = iter(collection)
        for element in elements:
            f(element)

    return apply


def group_by(f):
    """Return a callable that buckets a collection by the key ``f(element)``.

    The returned callable takes a collection and returns a ``defaultdict(list)``
    mapping each distinct key to the list of elements that produced it, in the
    order the elements were visited.
    """

    def apply(collection):
        groups = defaultdict(list)

        # Exercise slot: the candidate supplies the function handed to
        # `for_each`; what follows is the reference solution.
        def collect(element):
            groups[f(element)].append(element)

        for_each(collection)(collect)
        return groups

    return apply


# Grouping by a boolean predicate: keys are the predicate's results.
# A defaultdict compares equal to a plain dict with the same items.
assert group_by(lambda x: x % 3 == 0)([1, 2, 3, 4]) == {
    True: [3],
    False: [1, 2, 4]
}

# Grouping by exact type: `type(True)` is `bool`, so the booleans get their
# own bucket instead of being merged into `int`.
assert group_by(type)(['1', True, False, 123, 567, 1.0, 2.0]) == {
    str: ['1'],
    bool: [True, False],
    int: [123, 567],
    float: [1.0, 2.0]
}


In [7]:
"""
题目二: 文本相关性
1. implement function `argmax` in pure python to match assertions.
2. implement an algorithm(`compare`) to softly measure the distance between two words.
    
FYI, you can map these words into vectors of floating-point numbers and compare them in R^n space.
Or you can simply compare them using sub-patterns.
No matter how you choose the theories and implementations, you should pass the following assertions.

"""

# Some code here for you to implement helper objects.


def argmax(lst) -> int:
    """Return the index of the largest element of ``lst``.

    ``lst`` may be any iterable of mutually comparable values (a lazy ``map``
    object works too). When the maximum occurs more than once, the index of
    its first occurrence is returned. An empty input raises ``ValueError``.

    ** Exercise: implement ``argmax``; the code below is the reference solution.
    """
    # Pair every value with its position and let `max` pick the pair whose
    # value is largest; `max` keeps the first winner on ties.
    best_index, _ = max(enumerate(lst), key=lambda pair: pair[1])
    return best_index


def compare(left, right):
    """Softly measure how similar two strings are, as a score between 0 and 1.

    ** Exercise: implement ``compare``; the code below is the reference solution.

    The notebook has only a few test cases, so a plain substring check might
    pass as well. This algorithm, however, can be tried directly against a
    real file system to see how well it discriminates. Writing it also shows
    the candidate's way of thinking and background:
    - encoding text with NLP-style techniques
    - constructing a similarity measure

    Algorithm: each string is turned into a bag (``Counter``) of character
    1-, 2- and 3-grams. For each direction, every gram of one bag contributes
    its length as weight; a gram present in both bags scores
    ``weight * min(count) / max(count)``. The directional score is the scored
    weight divided by the total weight, and the result is the average of both
    directions weighted by the number of distinct grams in each bag.

    :param left: first string.
    :param right: second string.
    :return: the similarity score; ``0`` when neither string yields any gram
        (e.g. both are empty or a single character long).
    """
    from collections import Counter

    def to_gram(word: str, gram_num=3):
        """Yield the character k-grams of ``word`` for k = 1..gram_num."""
        n = len(word)
        for k in range(1, gram_num + 1):
            # NOTE(review): `range(n - k)` stops one start position early, so
            # the final k-gram of each size is never produced. Left unchanged
            # on purpose: extending the range would alter every score and the
            # notebook's assertions were written against this behaviour.
            for i in range(n - k):
                yield word[i:i + k]

    def to_dict_vec(word: str):
        """Encode ``word`` as a gram -> occurrence-count mapping."""
        return Counter(to_gram(word, 3))

    def corr(a: Counter, b: Counter):
        """Combine the two directional scores into one symmetric score."""
        na = len(a)
        nb = len(b)

        if na + nb == 0:
            # Neither side produced a gram; without this guard the final
            # division would raise ZeroDivisionError.
            return 0

        def get_score(l, r):
            """Share of ``l``'s gram weight that is matched by ``r``."""
            weights = 0
            score = 0

            for lk, lv in l.items():
                rv = r.get(lk)

                # Longer grams are stronger evidence, so weight by length.
                weight = len(lk)
                weights += weight

                if rv is None:
                    continue

                # Ratio of the smaller to the larger occurrence count.
                score += weight * min(lv, rv) / max(lv, rv)

            # Value comparison: `is 0` depended on CPython's small-int cache.
            if weights == 0:
                return 0

            return score / weights

        return (get_score(a, b) * na + get_score(b, a) * nb) / (na + nb)

    return corr(to_dict_vec(left), to_dict_vec(right))


assert argmax([1, 2, 3]) == 2
assert argmax([10, 2, 3, 8]) == 0

# Candidate paths that each query below is scored against; the trailing
# number is the index `argmax` is expected to report.
examples = [
    r"C:\User\new\trinity\GraphEngine",  # 0
    r"C:\User\trinity\GraphEngine",  # 1
    r"C:\User\GraphEngine",  # 2
    r"C:\User\previous\GraphEngine",  # 3
    r"C:\User\trioity\GraphEngine",  # 4
    r"C:\User\triaity\GraphEngine",  # 5
    r"C:\User\trinity\GraphEigen",  # 6
    r"C:\resU\ytinirt\EngineGraph",  # 7
    r"\c\User\trinity",  # 8
    r"C:\User\previous\Patchouli",  # 9
]

# Indices are compared by value (`==`). Identity tests against int literals
# only work by accident of CPython's small-integer cache and emit a
# SyntaxWarning on Python 3.8+.
assert argmax(map(lambda inp: compare('new ' 'Graph', inp), examples)) == 0

assert argmax(map(lambda inp: compare('trinity '
                                      'Graph', inp), examples)) in (1, 6)

assert argmax(map(lambda inp: compare('pre' 'graph', inp), examples)) == 3


In [5]:
"""
This is a story about asynchronous!
临时学习能力考察:)
"""

from utils import Section
# The abstract base classes live in `collections.abc`; the aliases that used to
# be importable from `collections` were removed in Python 3.10.
from collections.abc import Generator

with Section("""
    Do you know Python generator?
"""):

    def generator(x, inc):
        """Yield ``x``, ``x + inc`` and ``x + inc + inc``, then return ``"Done"``.

        The return value is not yielded; it travels on the ``StopIteration``
        raised once the generator is exhausted (its ``value`` attribute).
        """
        current = x
        yield current
        # Two further steps, each one `inc` beyond the previous value.
        for _ in range(2):
            current = current + inc
            yield current
        return "Done"

    # here're some behaviours of a Python generator
    
    # Calling a generator function runs none of its body; it only creates the
    # generator object.
    gen = generator(1, inc=2)

    lst = []
    try:
        # Each `next` resumes the body up to the following `yield`.
        while True:
            lst.append(next(gen))
    except StopIteration as e:
        # Exhaustion is signalled by StopIteration, which carries the value of
        # the generator's `return` statement.
        ret = e.value

    assert lst == [1, 3, 5]
    assert ret == "Done"

    
    

with Section("""

    Now You have got what's a generator in Python, so could you solve
    the following problems?
    
    
    - If I have 3 generators and I want to drive them
      forward synchronously.
    
    e.g
        gen1: 1, 2, 3
        gen2: "1", "11", "111"
        sync(gen1, gen2): 1, "1", 2, "11", 3, "111"

"""):

    def sync(*gens) -> Generator:
        """Interleave several iterators, advancing each by one step per round.

        ** Exercise: implement this function; the code below is the reference
        solution.

        In every round each still-active input is advanced once, in the order
        the inputs were passed, and the value it produces is yielded. An input
        that is exhausted is dropped from later rounds; iteration ends when all
        inputs are exhausted. The inputs' return values are discarded.

        :param gens: generators, iterators or plain iterables to interleave.
        :return: a generator over the interleaved values.
        """
        # `iter` leaves generators/iterators untouched and lets plain
        # iterables (lists, strings, ...) take part as well.
        active = [iter(gen) for gen in gens]

        # An ordered list (rather than a set of ids) makes the round-robin
        # order explicit instead of relying on set iteration order.
        while active:
            still_active = []
            for iterator in active:
                try:
                    item = next(iterator)
                except StopIteration:
                    continue
                still_active.append(iterator)
                yield item
            active = still_active

    # gen1 produces 1, 2, 3 and gen2 produces "1", "11", "111".
    gen1 = generator(1, inc=1)
    gen2 = generator("1", inc="1")

    # Nothing runs yet: `sync` is itself a generator and is driven by `next`.
    gen3 = sync(gen1, gen2)


    
with Section("""Test your implementation!"""):
    # The values must alternate between gen1 and gen2, one step each.
    assert next(gen3) == 1
    assert next(gen3) == '1'
    assert next(gen3) == 2
    assert next(gen3) == '11'


    
    
    
with Section("""
    Now you have got how to synchronize the generators, 
    let's learn what's `async` exactly!
            """):
    import time
    import random

    class Query:
        """Simulated data query that completes after a random number of seconds."""

        def __init__(self):
            # Whole number of seconds in [0, 5] that the query pretends to take.
            self.time_cost = random.randint(0, 5)
            # Wall-clock timestamp at which the query (re)started.
            # NOTE(review): `time.time()` can jump when the system clock is
            # adjusted; `time.monotonic()` would be immune -- confirm nothing
            # else reads `start` as an epoch timestamp before switching.
            self.start = time.time()

        # The candidate needs to know basic `property` usage to follow this;
        # properties are an important way of controlling attribute access in
        # Python.
        @property
        def is_finished(self):
            """True once strictly more than ``time_cost`` seconds have elapsed."""
            elapsed = time.time() - self.start
            return elapsed > self.time_cost

        @property
        def result(self):
            """The query's payload; always the same constant string."""
            return "data fetched"

        def reset(self):
            """Restart the clock; ``time_cost`` is kept as it is."""
            self.start = time.time()

    # Assume that several data queries have been issued; each one is modelled
    # by a `Query` instance.
    q1 = Query()
    q2 = Query()
    q3 = Query()

    # The queries may differ in how long they take.
    # How can we make sure we obtain all 3 query results as fast as possible?

    # Requirement:
    # Implement the function `feed_back` to monitor the 3 queries and return the 3 results in the least possible time.


    def feed_back():
        """Poll ``q1``, ``q2`` and ``q3`` until all have finished; return their results.

        ** Exercise: implement this function; the code below is the reference
        solution.

        The unfinished queries are polled in a tight loop, so the call blocks
        until the slowest query is done. The returned list holds the results in
        the order ``[q1, q2, q3]``.
        """
        queries = (q1, q2, q3)
        collected = [None] * 3
        pending = {0, 1, 2}

        while pending:
            # Iterate over a snapshot because finished ids are removed from
            # `pending` inside the loop.
            for idx in tuple(pending):
                query = queries[idx]
                if not query.is_finished:
                    continue
                collected[idx] = query.result
                pending.remove(idx)

        return collected

    # Blocks until all three queries report that they are finished.
    assert feed_back() == ['data fetched'] * 3

    # Okay, you've solved the problem above, that's cool!
    #
    # However, what if you have to pay attention to other tasks while you're monitoring the 3 queries?
    class OtherTask(Query):
        """Unrelated work that must progress while the queries are monitored.

        Behaves exactly like ``Query``; the initializer is inherited, so no
        override that merely delegates to the parent is needed.
        """

    # Restart the clocks of the three queries so they count as pending again.
    q1.reset()
    q2.reset()
    q3.reset()
    other_task = OtherTask()

    # A real-world problem is presented here:
    #
    #       Sometimes each query might take a very long time, for example 10 minutes,
    #       while the other tasks would take you a while as well.
    #
    #       Assume that checking whether a query is finished has no overhead.
    #       Clearly, if you block while checking the queries, you might
    #       waste 10 minutes producing nothing.
    #
    # That's the reason why we need `async`.

    # The candidate must understand the following four consecutive functions to be able to solve the next problem.
    # Together, the four functions review everything covered in this problem.

    def async_feed_back():
        """Cooperative variant of ``feed_back``, written as a generator.

        Each time an unfinished query is encountered the generator yields
        (with no value), handing control back to whoever drives it. Once
        ``q1``, ``q2`` and ``q3`` have all finished, the list of their results
        (in that order) becomes the generator's return value.
        """
        queries = (q1, q2, q3)
        collected = [None] * 3
        pending = {0, 1, 2}

        while pending:
            # Snapshot, because finished ids are removed during the loop.
            for idx in tuple(pending):
                query = queries[idx]
                if not query.is_finished:
                    # Not ready yet: let the driver run something else.
                    yield
                    continue
                collected[idx] = query.result
                pending.remove(idx)

        return collected

    def async_other_tasks():
        """Wait cooperatively for ``other_task`` to finish.

        Yields (with no value) every time ``other_task`` is found unfinished,
        and returns ``other_task.result`` as the generator's return value once
        it has finished.
        """
        while True:
            if other_task.is_finished:
                return other_task.result
            # Still busy: hand control back to the driver and check again later.
            yield

    def sync_return(*gens):
        """Drive several generators round-robin and gather their return values.

        In every round each still-running input is advanced once, in the order
        the inputs were passed, and whatever it yields is re-yielded. When an
        input finishes, the value carried by its ``StopIteration`` is stored at
        that input's position. After all inputs have finished, the list of
        stored values becomes this generator's own return value.

        :param gens: generators (plain iterables are accepted too; their
            return value is recorded as ``None``).
        :return: a generator whose return value is the list of the inputs'
            return values, in argument order.
        """
        # `iter` is a no-op for generators and lets plain iterables take part.
        iterators = [iter(gen) for gen in gens]
        results = [None] * len(iterators)

        # An ordered list keeps the round-robin order explicit instead of
        # relying on the iteration order of a set of ids.
        pending = list(range(len(iterators)))

        while pending:
            still_running = []
            for gen_id in pending:
                try:
                    item = next(iterators[gen_id])
                except StopIteration as stop:
                    results[gen_id] = stop.value
                    continue
                still_running.append(gen_id)
                yield item
            pending = still_running

        return results

    def get_async_result(gen):
        """Run ``gen`` to completion and return its return value.

        Everything ``gen`` yields along the way is discarded; only the value
        attached to the final ``StopIteration`` is handed back.
        """
        while True:
            try:
                next(gen)
            except StopIteration as stop:
                return stop.value

    # Interleave the query monitor and the other task into a single generator.
    gathered_tasks = sync_return(async_feed_back(), async_other_tasks())
    
    # Drive it to completion; its return value is the list of both return
    # values, in the order the generators were passed to `sync_return`.
    query_results, other_task_result = get_async_result(gathered_tasks)
    
    print(query_results, other_task_result)

# Now let's do something that solves a meaningful task in the real world!
# Connecting this to practice:
# asynchronously crawl recent tweets from Twitter.


with Section(""""Get people's recent tweets from twitter!"""):
    from utils import find_recent_tweets
    
    # FYI:
    #
    # `find_recent_tweets` is an API that lets you asynchronously load a user's recent tweets from
    # `https://twitter.com/<username>`.

    # signature =>
    #     find_recent_tweets: (username: string) -> generator,
    #                        where the generator's return value is the user's recent tweets.
    

    def my_async_query_from_twitter(users_to_ask=('bing', 'microsoft', 'azure')):
        """Fetch the recent tweets of several users concurrently.

        ** Exercise: implement this function; the code below is the reference
        solution.

        One ``find_recent_tweets`` generator is created per user and all of
        them are advanced round-robin until each has finished. The value a
        generator returns on completion is taken as that user's result and is
        also printed as soon as it is available.

        :param users_to_ask: iterable of user names; may be empty.
        :return: a list with one entry per user, in the order given, holding
            the value returned by that user's ``find_recent_tweets`` generator
            (per the notebook's description: the user's recent tweets).
        """
        # Materialise once so any iterable works and names can be looked up by
        # position later on.
        users = tuple(users_to_ask)

        # Start every request up front so they can all make progress together.
        event_loop = [find_recent_tweets(user) for user in users]
        results = [None] * len(event_loop)

        # Ordered list of unfinished positions; with no users the loop is
        # skipped and an empty list is returned.
        pending = list(range(len(event_loop)))

        while pending:
            still_running = []

            for idx in pending:
                try:
                    next(event_loop[idx])
                except StopIteration as stop:
                    # The generator's return value is the fetched data.
                    result = results[idx] = stop.value
                    print(f'got tweets from {users[idx]}: {result}')
                else:
                    still_running.append(idx)

            pending = still_running

        return results
    
    # Run the query with the default user names and print every result.
    for each in my_async_query_from_twitter():
        print(each)


['data fetched', 'data fetched', 'data fetched'] data fetched




 BeautifulSoup(YOUR_MARKUP})

to this:

 BeautifulSoup(YOUR_MARKUP, "lxml")

  markup_type=markup_type))


got tweets from bing: 
Day trip or backpacking trek—what are you gearing up for this summer? http://msft.social/E5T9jc pic.twitter.com/ZsGraV1LfY

got tweets from microsoft: 
#SurfaceGo - the most portable and versatile Surface ever is now available to purchase at just $399. It’s the ultimate device for saving you time and keeping you productive as you move through your day.https://blogs.windows.com/devices/2018/08/02/surface-go-available-now-starting-at-399/#5erGxtVvHZBUoRtA.97 …

got tweets from azure: 
Download our #dotNET #Microservices guide to explore architectural design & implementation approaches with containers: http://msft.social/sZalEF  [PDF]pic.twitter.com/BZMHsfLsFj


Day trip or backpacking trek—what are you gearing up for this summer? http://msft.social/E5T9jc pic.twitter.com/ZsGraV1LfY


#SurfaceGo - the most portable and versatile Surface ever is now available to purchase at just $399. It’s the ultimate device for saving you time and keeping you productive as you move