In [28]:
a = [1,2,3]
a1 = a
print(f'{a[1]} == {a1.__getitem__(1)}') # __getitem__ is what's used to get list value by index under the hood
b = a.copy()
b[1] = 4
print(f'{a[1]} != {b[1]}')
c = a[:]
c[1] = 5
print(f'{a[1]} != {c[1]}')

2 == 2
2 != 4
2 != 5


In [None]:
a = 'abcdefg'
b = a
try:
    # strings don't allow assignment by index, use replace or build another string
    b[1] = 'x'
except TypeError as e:
    print(e)

c = a.replace('b', 'x')

print(f'"{a}" != "{c}" - replace returns a new string')

In [30]:
a = (1,2,3,4)
try:
    a.append(5)
except AttributeError as e:
    print(e)

try:
    a.pop()
except AttributeError as e:
    print(e)

# tuples don't have append and other list methods

'tuple' object has no attribute 'append'
'tuple' object has no attribute 'pop'


In [32]:
from collections import OrderedDict
OrderedDict.fromkeys([1,2,2,2,2,2,2,3,3,3,3,3,3,1,1,1]) # another way beside 'set' to get unique list items

OrderedDict([(1, None), (2, None), (3, None)])

In [33]:
a = [1,2,3,4]
b = list([1,2,3,4])
c = [1,2,3,4,5]
d = [1,2,3,5]

assert(a == b)
assert(a < c)
assert(a < d)

In [34]:
a = set([1,2,3,4])
b = set([2,4,1,3])
a == b

True

In [35]:
{True: 'True', 1: '1', 1.0: '1.0'}

{True: '1.0'}

In [36]:
a = {'a': 1, 'b': 2, 'c': 3}
k = a.keys()
print(k)
try:
    print(k[0]) # keys is an iterator and doesn't allow getting keys by index
except TypeError as e:
    print(e)

print(list(k)[0])

dict_keys(['a', 'b', 'c'])
'dict_keys' object is not subscriptable
a


In [37]:
a = [{'a': 10}, {'a': 40}, {'a': 30}, {'a': 20}]
b = a[:]
c = a.sort(key=lambda x: x.get('a'))
print(f'{a} != {c} - "sort" changes the list in place and returns nothing')

c = sorted(b, key=lambda x: x.get('a'))
print(f'{c} != {b} - "sorted" does not change the original list')

[{'a': 10}, {'a': 20}, {'a': 30}, {'a': 40}] != None - "sort" changes the list in place and returns nothing
[{'a': 10}, {'a': 20}, {'a': 30}, {'a': 40}] != [{'a': 10}, {'a': 40}, {'a': 30}, {'a': 20}] - "sorted" does not change the original list


In [38]:
for x in [1, '2', True, (1, 2, 3), {}, list(), set([]), (1, 'a', {'a': 1})]:
    try:
        x.__hash__()
        is_hashable = True
    except Exception as e:
        is_hashable = False
    
    print(f'{type(x)}: {"" if is_hashable else "not "}hashable')

<class 'int'>: hashable
<class 'str'>: hashable
<class 'bool'>: hashable
<class 'tuple'>: hashable
<class 'dict'>: not hashable
<class 'list'>: not hashable
<class 'set'>: not hashable
<class 'tuple'>: not hashable


In [39]:
from timeit import timeit

l = list(range(1_000_000))
d = dict.fromkeys(l)
s = set(l)

def iter_list():
    for i in l:
        pass

def iter_dict():
    for i in d:
        pass

def iter_set():
    for i in s:
        pass

print(timeit(iter_list, number=100))
print(timeit(iter_dict, number=100))
print(timeit(iter_set, number=100))

1.8341103494167328
2.312933199107647
2.0736200399696827


In [1]:
def double(a):
    a += a
    return a

i = 1
s = '1'
m = [1]
print(double(i), i)
print(double(s), s)
print(double(m), m)

# instead of passing by reference and by value in Python variable names are assigned to a value object and passed by assignment
# as a result what is important is the mutability of the value
# if the value is immutable, when the variable is changed a new value object is created and assigned to the old name
# the old value remains immutable and exist as long as there are other names assigned to it, otherwise it gets deleted

2 1
11 1
[1, 1] [1, 1]


In [5]:
def bad_case(s, a=[1,2,3,4]):
    if s:
        a[0] += 1
        return a
    else:
        return a

print(bad_case(True))
print(f'{bad_case(False)} -- default value is affected')

def good_case(s, a=None):
    if a is None:
        a = [1,2,3,4]
    if s:
        a[0] += 1
        return a
    else:
        return a

print(good_case(True))
print(f'{good_case(False)} -- default value is not affected')

[2, 2, 3, 4]
[2, 2, 3, 4] -- default value is affected
[2, 2, 3, 4]
[1, 2, 3, 4] -- default value is not affected


In [41]:
a = [1]
b = a * 2 # here as the values of 'a' being immutable got copied twice and stored into a new variable
b[0] = 2
print(a, b) # therefore the values of 'a' are not affected

[1] [2, 1]


In [42]:
a = [1] * 2
b = [a] * 2 # here the values of 'a' are mutable, and instead of being copied they are stored by reference
b[0][0] = 3
print(a, b) # as a result the values of 'a' are affected

[3, 1] [[3, 1], [3, 1]]


In [19]:
class T():
    def __getattr__(self, x):
        print(f'"{x}" is not found in T')

    def __getattribute__(self, x):
        print(f'"{x}" is requested from T')
        return object.__getattribute__(self, x)

    def __init__(self):
        self.foo = 'bar'

t = T()
t.foo
t.bar

"foo" is requested from T
"bar" is requested from T
"bar" is not found in T


In [22]:
e = 42
try:
    1/0
except Exception as e:
    pass

print(e)

NameError: name 'e' is not defined

In [25]:
a = list('abcdefg')
a[2] = 'x'
"".join(a)

'abxdefg'

In [27]:
import profile, timeit
print(dir(profile))
print(dir(timeit))

['Profile', '_Utils', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'main', 'marshal', 'run', 'runctx', 'sys', 'time']
['Timer', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', '_globals', 'default_number', 'default_repeat', 'default_timer', 'dummy_src_name', 'gc', 'itertools', 'main', 'reindent', 'repeat', 'sys', 'template', 'time', 'timeit']


In [32]:
class O(): pass
class X(O): pass
class Y(O): pass
class A(X, Y): pass
class B(Y, X): pass

try:
    class C(A, B): pass
except TypeError as e:
    print(e, '\n');

# See C3 algorithm for details
#   If C is a class inheriting classes C1, C2, ..., CN, its' linearization is
#   L[C] = C + merge(L[C1], L[C2], ..., L[CN], C1, C2, ..., CN)
#   Merge in C3 works as follows. Take the head of the first item of items ot be merged,
#     if the head is not in the tail (i.e. may be in the head) of any other list to be merged,
#     then add to the lenearization of C, and remove it everywhere in the rest of the lists.
#   Otherwise, if the head in another list's tail, then we skip it and take the next element in the merge,
#     until all elements are added to the linearization or there's no good head left in the merge,
#     and in the latter case the linearization is impossible.

print(B.__mro__)
b = B()
print(type(b).__mro__)

Cannot create a consistent method resolution
order (MRO) for bases X, Y 

(<class '__main__.B'>, <class '__main__.Y'>, <class '__main__.X'>, <class '__main__.O'>, <class 'object'>)
(<class '__main__.B'>, <class '__main__.Y'>, <class '__main__.X'>, <class '__main__.O'>, <class 'object'>)


In [48]:
# Identity test
# 'x is y' is the same as 'id(x) == id(y)', where id is basically a memory address

a = 1000
b = a
c = 500
d = 'Test'
e = 'T'
f = None
g = True
h = []
i = {}

print(f is None)
print(g is True)
print(a is a)
print(a is b)
print(d is d)

print(a is c + 500)
print(d is e + 'est')
print(h is [])
print(i is {})
print(f'But i == {"{}"} is {i == {}}')
print(object() == object())

print(id(i), id({}))

True
True
True
True
True
False
False
False
False
But i == {} is True in Python3
False
140302902603264 140302949766464


In [6]:
from collections import abc

class AMinimalIterable():
    # the __iter__ method returns an iterator over the iterable collection
    # here by returning 'self' we tell clients the class is also an iterator (though in fact it isn't)
    def __iter__(self):
        return self

print(isinstance(AMinimalIterable(), abc.Iterable))

class AnotherIterable():
    def __init__(self):
        self.count = 0

    # defining the __getitem__ method makes the class iterable,
    # however it is not recognized as an abc.Iterable subclass without defining the __iter__ method
    def __getitem__(self, index):
        count = self.count
        if count > 99:
            raise StopIteration
        self.count += 1
        return count

print(isinstance(AnotherIterable(), abc.Iterable))

i = AnotherIterable()
for j in i:
    print(j, end=", ")

print('\n')

class AnIterableIterator():
    count = 0
    # by declaring the __iter__ method here, we make this class iterable
    # but because the method returns 'self', it should also be an iterator, and declare the __next__ method
    def __iter__(self):
        return self

    # the __next__ method makes the class an Iterator
    # the __iter__ method therefore returns 'self'
    def __next__(self):
        count = self.count
        if count > 99:
            raise StopIteration
        self.count += 1
        return count

i = AnIterableIterator()
for j in i:
    print(j, end=", ")

print('\n')

k = AnIterableIterator()
while True:
    try:
        print(next(k), end=", ")
    except StopIteration as e:
        break

print('\n')

from random import randint

def roll_a_dice():
    return randint(1, 6)

# another way to create an iterator is by using iter() function
# the second argument of which is a limiter
unless_6 = iter(roll_a_dice, 6)

for side in unless_6:
    print(side, end=", ")

True
False
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 

0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 

0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,

In [22]:
class FibonacciGenerator:
    def __init__(self):
        self.prev = 0
        self.curr = 1

    def __next__(self):
        res = self.prev
        self.prev, self.curr = self.curr, self.prev + self.curr
        return res

    def __iter__(self):
        return self

for i in FibonacciGenerator():
    print(i, end=', ')
    if i > 100:
        break

print('\n')

def fibonacci():
    prev, curr = 0, 1
    while True:
        yield prev
        prev, curr = curr, prev + curr

for j in fibonacci():
    print(j, end=', ')
    if j > 100:
        break

print('\n')

def my_range(start, end, step):
    x = start
    while x < end:
        yield x
        x += step

print(list(my_range(0, 4, 0.5)), '\n')

r = (i for i in range(0, 8, 1)) # this is also a generator

while True:
    try:
        print(next(r), end=', ')
    except StopIteration:
        print('StopIteration\n')
        break

r2 = [i for i in range(0, 8, 1)] # but this is not!

while True:
    try:
        print(next(r2), end=', ')
    except TypeError as e:
        print(e, '\n')
        break

def chain_iterators(*iterators):
    for iterator in iterators:
        for i in iterator:
            yield i

print(list(chain_iterators([1, 2, 3], {'A', 'B', 'C'}, '...')), '\n')

def chain_from_iterators(*iterators):
    for i in iterators:
        yield from i

print(list(chain_from_iterators([1, 2, 3], {'a', 'b', 'c'}, '!!!')))

0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 

0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 

[0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5] 

0, 1, 2, 3, 4, 5, 6, 7, StopIteration

'list' object is not an iterator 

[1, 2, 3, 'B', 'C', 'A', '.', '.', '.'] 

[1, 2, 3, 'a', 'c', 'b', '!', '!', '!']


In [None]:
# context manager

class TestContext:
    def __init__ (self, foo):
        self.foo = foo

    def __enter__(self):
        print('enter')
        return self.foo

    def __exit__(self, type, value, traceback):
        print('done', type, value, traceback)

with TestContext(42) as foo:
    print(foo)
    # raise Exception('Test')

from asyncio import sleep

class TestAContext:
    def __init__ (self, foo):
        self.foo = foo

    async def __aenter__(self):
        print('enter')
        await sleep(1)
        return self.foo

    async def __aexit__(self, type, value, traceback):
        print('done', type, value, traceback)

async with TestAContext(42) as foo:
    print(foo)

In [14]:
from contextlib import contextmanager

@contextmanager
def test_context(foo):
    print('enter')
    try:
        yield foo
    except Exception as e:
        print(f'Exception: {e}')
    finally:
        print('exit')

with test_context(42) as foo:
    print(foo)
    raise Exception('Test')

from contextlib import asynccontextmanager
from asyncio import sleep

@asynccontextmanager
async def test_async_context(foo):
    print('enter')
    try:
        await sleep(1)
        yield foo
    except Exception as e:
        print(f'Exception: {e}')
    finally:
        print('exit')

async with test_async_context(42) as foo:
    print(foo)
    raise Exception('Test')

enter
42
Exception: Test
exit
enter
42
Exception: Test
exit


In [70]:
class Foo:
    foo = 0
    pass

class Bar(type(Foo())):
    foo = 42
    def __new__(self, *args):
        print('first __new__')
        return super().__new__(Bar)

    def __init__(self, meow=42):
        print('then __init__')
        self.foo = meow

bar = Bar(84)
print(bar.foo)

# Difference between Python3 and Python2 classes
print(type(bar), type(4), '-- classes are now also types and vice versa')
print(type(bar).__mro__, '-- all classes are now subclasses of object() by default')

first __new__
then __init__
84
<class '__main__.Bar'> <class 'int'> -- classes are now also types and vice versa
(<class '__main__.Bar'>, <class '__main__.Foo'>, <class 'object'>) -- all classes are now subclasses of object() by default


In [83]:
try:
    try:
        raise Exception('Foo #1')
    except Exception as e:
        print(e)
        raise
except Exception as e:
    print(f'The same exception "{e}" was raised twice')

try:
    try:
        raise Exception('Foo #2')
    except Exception as e:
        print(e)
        raise Exception('Foo #3') from e
except Exception as e:
    print(f'A new exception is created from the original one, here is how it looks')
    print(e, e.__cause__, e.__context__,
          '"During handling of the above exception, another exception occurred: ...", ',
          '"The above exception was the direct cause of the following exception: ..."')

try:
    try:
        raise Exception('Foo #4')
    except Exception as e:
        print(e)
        raise Exception('Foo #5') from None
except Exception as e:
    print(f'Now a new exception replaces the old one, here is how it looks')
    print(e, e.__cause__, e.__context__)

try:
    try:
        pass
    except Exception as e:
        print('we do not want to get the "oops" exception here')
    else:
        raise Exception('oops')
except Exception as e:
    print('here is where we want to and get it:', e)

Foo #1
The same exception "Foo #1" was raised twice
Foo #2
A new exception is created from the original one, here is how it looks
Foo #3 Foo #2 Foo #2 "During handling of the above exception, another exception occurred: ...",  "The above exception was the direct cause of the following exception: ..."
Foo #4
Now a new exception replaces the old one, here is how it looks
Foo #5 None Foo #4
here is where we want to and get it: oops


In [88]:
from functools import wraps

def increment(f):
    @wraps(f)
    def incremented(x):
        return f(x) + 1
    return incremented

@increment
def add_one(x):
    return x + 1

print(add_one(1), add_one(100))

def increment_x(x):
    def increment(f):
        @wraps(f)
        def incremented(y):
            return f(y) + x
        return incremented
    return increment

@increment_x(10)
def add_one_again(x):
    return x + 1

print(add_one_again(1), add_one_again(1000))

3 102
12 1011


In [90]:
class Foo:
    pass

Bar = type('MyClass', (Foo, ), {'foo': 42, 'bar': lambda self: self.foo + self.foo})

bar = Bar()

print(bar, type(bar).__mro__)
print(bar.foo, bar.bar())

<__main__.MyClass object at 0x7f9ad0d34640> (<class '__main__.MyClass'>, <class '__main__.Foo'>, <class 'object'>)
42 84


In [112]:
# metaclasses

class MetaClass(type):
    # the arguments (after self, which is the currently modified class) are the same as those that we pass to type()
    def __new__(self, name, parents, props):
        modified_props = {}
        for prop in props:
            new_name = prop if prop.startswith('__') else prop.upper()
            modified_props[new_name] = props[prop]

        return super(MetaClass, self).__new__(self, name, parents, modified_props)

class Foo(metaclass=MetaClass):
    # __metaclass__ = MetaClass # this is Python2 syntax, see Python3 syntax a line above

    foo = 24

    def bar(self):
        return self.FOO + self.FOO

foo = Foo()

print(f'foo: {getattr(foo, "foo", None)}')
print(f'bar: {getattr(foo, "bar", None)}')
print(f'FOO: {getattr(foo, "FOO", None)}')
print(f'BAR: {getattr(foo, "BAR", None)()}')

foo: None
bar: None
FOO: 24
BAR: 48


In [14]:
class Singleton(type):
    _instances = {}
    def __call__(self, *args, **kwargs):
        if self not in self._instances:
            self._instances[self] = super(Singleton, self).__call__(*args, **kwargs)
        return self._instances[self]

class Database(metaclass=Singleton):
    prop = 42

d1 = Database()
print(d1.prop)
d1.prop = 43
d2 = Database()
print(d2.prop)

42
43


In [4]:
# coroutine - a computer program component that allows execution to be stopped and resumed

def coroutine_1():
    count = 0
    while True:
        yield count
        count += 1

iterator = coroutine_1()

for i in range(10):
    print(next(iterator), end=', ')

print('\n')

from asyncio import sleep, get_event_loop, run_coroutine_threadsafe

async def coroutine_2():
    for i in range(10):
        await sleep(i * 0.05)
        print(i, end=', ')

loop = get_event_loop()

print(loop) # in JupyterLabs already has a running event loop

await coroutine_2() # in jupyterlabs we are already in a running event loop

# or

run_coroutine_threadsafe(coroutine_2(), loop)

# or

loop.create_task(coroutine_2())

# https://stackoverflow.com/a/51342468/4228476

0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 

<_UnixSelectorEventLoop running=True closed=False debug=False>
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 

<Task pending name='Task-9' coro=<coroutine_2() running at /tmp/ipykernel_1347723/784762034.py:18>>

0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 

In [14]:
from functools import cache, cached_property, lru_cache
from random import randint

@cache
def factorial(x):
    return x * factorial(x - 1) if x > 1 else 1

print(factorial(10))

# @cache is the same as @lru_cache(maxsize=None)
# basically @cache is memoize
# @cache is threadsafe, can be used in multiple threads

class Test:
    def __init__(self, x):
        self.x = x

    @cached_property
    def random(self):
        return self.x + randint(0, 100)

t = Test(42)
print(t.random)
print(t.random)
print(t.random)

del t.random # clears the cache
print(t.random)
print(t.random)
print(t.random)

t.random = 291 # unlike @property allows set
print(t.random)
print(t.random)
print(t.random)

def plural(x, single, plural=None):
    if plural is None:
        plural = f'{single}s'
    return single if x == 1 else plural

@lru_cache(maxsize=10, typed=False) # irregradles of arg types
def hugs_history(date, unit='hug'):
    x = randint(0, 100)
    return f'{x} {plural(x, unit)} happened on {date}'

print(hugs_history('2023-12-12'))
print(hugs_history('2023-12-11'))
print(hugs_history('2023-12-10'))
print(hugs_history('2023-12-12'))
print(hugs_history('2023-12-12'))
print(hugs_history('2023-12-11'))
print(hugs_history('2023-12-11'))

hugs_history.cache_clear()

print(hugs_history('2023-12-12'))

# @lru_cache is also applicable to class methods
class StoryTeller:
    @lru_cache(maxsize=20)
    def hugs_per_date(self, date, unit='Hug'):
        x = randint(0, 100)
        return f'{x} {plural(x, unit)} was accepted on {date}'

s = StoryTeller()
print(s.hugs_per_date('2023-12-12'))
print(s.hugs_per_date('2023-12-11'))
print(s.hugs_per_date('2023-12-10'))
print(s.hugs_per_date('2023-12-12'))
print(s.hugs_per_date('2023-12-12'))
print(s.hugs_per_date('2023-12-11'))
print(s.hugs_per_date('2023-12-11'))

s.hugs_per_date.cache_clear()

print(s.hugs_per_date('2023-12-12'))

# @lru_cache is threadsafe, can be used in multiple threads

3628800
69
69
69
118
118
118
291
291
291
89 hugs happened on 2023-12-12
85 hugs happened on 2023-12-11
95 hugs happened on 2023-12-10
89 hugs happened on 2023-12-12
89 hugs happened on 2023-12-12
85 hugs happened on 2023-12-11
85 hugs happened on 2023-12-11
69 hugs happened on 2023-12-12
1 Hug was accepted on 2023-12-12
88 Hugs was accepted on 2023-12-11
79 Hugs was accepted on 2023-12-10
1 Hug was accepted on 2023-12-12
1 Hug was accepted on 2023-12-12
88 Hugs was accepted on 2023-12-11
88 Hugs was accepted on 2023-12-11
2 Hugs was accepted on 2023-12-12


In [10]:
def test_typings(x: int) -> int:
    if type(x) is not int:
        return 0
    return x + 42

def test_typings2(x: int) -> int:
    return 'foo'

print(test_typings(42))
print(test_typings('42'))
print(test_typings2(42))

# no runtime errors without 3rd party tools

# type Vector = list[float] # new in Python 3.12
Vector = list[float]

# also:

from typing import NewType, TypeAlias

Vector2: TypeAlias = list[float]
Vector3 = NewType('Vector3', list[float])

def v_add(v1: Vector, v2: Vector) -> Vector:
    return [a + b for a, b in zip(v1, v2)]

print(v_add([1,2,3], [4,5,6]))
print(Vector, Vector2, Vector3)

84
0
foo
[5, 7, 9]
list[float] list[float] __main__.Vector3


In [18]:
from collections.abc import Callable, Awaitable
from asyncio import sleep

foo: Callable[[int], str] = lambda x: str(x)

print(foo(42))

bar: Callable[[int, Exception | None], None] = lambda x, e=None: f'My bad: {e}' if e is not None else f'Correct: {x}'

print(bar(42))
print(bar(42, Exception('dammit!')))

async def woof(wait: int=0):
    await sleep(wait)
    return 'woof!'

wooffer: Callable[[int], Awaitable[str]] = woof

print(await wooffer(1))

# also

does_not_care: Callable[..., str] = lambda a, b, c: f'{a}+{b}+{c}'

print(does_not_care('a','b','c'))

42
Correct: 42
My bad: dammit!
woof!
a+b+c


In [22]:
# generic

from collections.abc import Iterable
from typing import TypeVar, Generic

# new in Python 3.12
# def get_first[T](coll: Iterable[T]) -> T:
#     return coll[0]

T = TypeVar('T')
def get_first(coll: Iterable[T]) -> T:
    return coll[0]

print(get_first([1,2,3]))
print(get_first(['a','b','c']))

# class types

class Foo:
    pass

def foo(coll: list[Foo]) -> Foo:
    return coll[0]

print(foo([Foo(), Foo()]))

# generic classes

class Bar(Generic[T]):
    def __init__(self, value: T):
        self.foo = value

b1 = Bar(2)
b2 = Bar('b')

print(b1.foo, b2.foo)

1
a
<__main__.Foo object at 0x7fa08c781cf0>
2 b


In [29]:
from typing import Any, AnyStr, NoReturn #, reveal_type, Never, LiteralString, Self # in Python3.12

print(Any, AnyStr, NoReturn)

# class Foo:
#     def what(self) -> Self:
#         return self

# f = Foo()
# print(reveal_type(f.what))

typing.Any ~AnyStr typing.NoReturn


In [19]:
# descriptors

class ListSize:
    # 'self' is the descriptor itself
    # 'obj' is the object owning the described attribute
    # 'objtype' is the object class
    def __get__(self, obj, objtype=None):
        return len(obj.list)

class CustomList:
    len = ListSize()
    def __init__(self, origin):
        self.list = origin

foo = [1, 2, 3, 4, 5, 6, 7, 8]

my_list = CustomList(foo)
print(my_list.len)

foo.pop()

print(my_list.len)

class AgeLogging:
    def __get__(self, obj, objtype=None):
        age = obj._age
        print(f'Accessing the age property set to {age}')
        return age
    def __set__(self, obj, value):
        print(f'Updating age with the new value {value}')
        obj._age = value

class Person:
    age = AgeLogging()
    def __init__(self, name, age):
        self.name = name
        self.age = age
    def birthday(self):
        self.age += 1

p = Person('Booba', 20)
print(p.age)

p.birthday()
print(p.age)

print(dir(p))
print(p.__dict__)

8
7
Updating age with the new value 20
Accessing the age property set to 20
20
Accessing the age property set to 20
Updating age with the new value 21
Accessing the age property set to 21
21
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_age', 'age', 'birthday', 'name']
{'name': 'Booba', '_age': 21}


In [2]:
from abc import ABC, abstractmethod

class Validator(ABC):
    def __set_name__(self, owner, name):
        self.private_name = '_' + name

    def __get__(self, obj, objtype=None):
        return getattr(obj, self.private_name)

    def __set__(self, obj, value):
        self.validate(value)
        setattr(obj, self.private_name, value)

    @abstractmethod
    def validate(self, value):
        pass

class OneOf(Validator):
    def __init__(self, *options):
        self.options = options
    def validate(self, value):
        if value not in self.options:
            opts = [f'"{o}"' for o in self.options]
            raise Exception(f'Value "{value}" is not allowed, please provide one of {", ".join(opts)}')

class Pet:
    kind = OneOf('cat', 'dog', 'duck', 'cow', 'mouse')
    def __init__(self, kind):
        self.kind = kind

try:
    p = Pet('worm')
except Exception as e:
    print(f'Error: {e}')

p = Pet('cat')
# print(dir(p), p.__dict__, vars(p))

Error: Value "worm" is not allowed, please provide one of "cat", "dog", "duck", "cow", "mouse"


In [None]:
# property

class C:
    def getx(self): return self.__x
    def setx(self, value): self.__x = value
    def delx(self): del self.__x
    x = property(getx, setx, delx, "I'm the 'x' property.")

    @property
    def y(self):
        '''This documents the y'''
        return self.__y
    @y.setter
    def y(self, value):
        self.__y = value
    @y.deleter
    def y(self):
        del self.__y

In [3]:
# __slots__

class A:
    pass

a = A()
a.foo = 1
a.bar = 2
print(a.foo, a.bar)

class B:
    __slots__ = ['foo']

b = B()
b.foo = 1
try:
    b.bar = 2
except AttributeError as e:
    print(f'Error: {e}')

class C(B):
    __slots__ = ('bar', ) # 'foo' is already inherited from B

c = C()
c.foo = 2
c.bar = 3

try:
    c.qux = 2
except AttributeError as e:
    print(f'Error: {e}')

class BaseA:
    __slots__ = ('foo',)

class BaseB:
    __slots__ = ('bar',)

try:
    class ChildC(BaseA, BaseB):
        __slots__ = ()
except TypeError as e:
    print(f'Failed to inherit __slots__ from multiple base classes: {e}')

# workaround:

class AbstractD:
    __slots__ = ()

class BaseD(AbstractD):
    __slots__ = ('d',)

class AbstractE:
    __slots__ = ()

class BaseE(AbstractE):
    __slots__ = ('e',)

# !!! Warning !!!
# ------->>>>>>>>>>> Base classes still cannot be used here! <<<<<<<<<<<<<-------
# The solution is to move all their functionality into Abstract classes,
# And inherit from the Abstract ones, manually rebuilding the desired __slots__ field in the Child class
# Whereas the Base classes simply add the required __slots__ on top of the Abstract functionality
class ChildF(AbstractD, AbstractE): # Mind the inheritance from Abstract classes instead of the Base ones!!!
    __slots__ = ('d', 'e', 'f',) # All the __slots__ need to be added manually here!!!

f = ChildF()
f.d = 1
f.e = 2
f.f = 3

print(f.d, f.e, f.f)
print(dir(f))

1 2
Error: 'B' object has no attribute 'bar'
Error: 'C' object has no attribute 'qux'
Failed to inherit __slots__ from multiple base classes: multiple bases have instance lay-out conflict
1 2 3
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', 'd', 'e', 'f']


In [15]:
a = list(range(10))
print('original length:', len(a))
for i in a:
    if i == 3:
        a.remove(i)
        print('removing:', i, 'the length is now:', len(a))
        # del a[3] # has the same effect
    print(i)

somelist = list(range(10))
for x in somelist:
    somelist.remove(x)

print(somelist)

somelist2 = list(range(10))
for x in somelist2[:]:
    somelist2.remove(x)

print(somelist2)

10
0
1
2
removing: 3 the length is now: 9
3
5
6
7
8
9
[1, 3, 5, 7, 9]
[]


In [31]:
# For I/O intensive code the threading module can be used

from time import sleep
from random import randint
from threading import Thread

def delayed_print(delay, i):
    sleep(delay)
    print(f'Delay {delay}, index: {i}')

tasks = [Thread(target=delayed_print, args=(randint(0, 5), i)) for i in range(10)]

for task in tasks:
    task.start()


# run_coroutine_threadsafe

# for taks in tasks:
#     task.join() # wait until the thread terminates

Delay 0, index: 4
Delay 0, index: 6
Delay 1, index: 9
Delay 2, index: 0
Delay 2, index: 1
Delay 3, index: 5
Delay 4, index: 2
Delay 5, index: 3
Delay 5, index: 8
Delay 5, index: 7


In [14]:
# Python 3.2+

from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import randint

def task(delay, index):
    sleep(delay)
    print(f'Task {index} prints after {delay}s delay')

with ThreadPoolExecutor() as executor:
    running_tasks = [executor.submit(task, randint(1, 10) / 10, i) for i in range(10)]
    for t in running_tasks:
        t.result()

Task 1 prints after 0.3s delay
Task 4 prints after 0.3s delay
Task 6 prints after 0.3s delay
Task 3 prints after 0.3s delay
Task 7 prints after 0.4s delay
Task 8 prints after 0.3s delayTask 9 prints after 0.3s delay

Task 5 prints after 0.7s delay
Task 0 prints after 0.9s delay
Task 2 prints after 0.9s delay


In [11]:
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import randint

def task(delay, index):
    sleep(delay)
    print(f'Task {index} prints after {delay}s delay')

indices = range(10)
delays = [randint(1, 10) / 10 for i in indices]

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(task, delays, indices, timeout=5)

Task 3 prints after 0.4s delay
Task 2 prints after 0.7s delay
Task 1 prints after 0.8s delay
Task 4 prints after 0.8s delay
Task 5 prints after 0.5s delay
Task 0 prints after 1.0s delay
Task 9 prints after 0.1s delay
Task 7 prints after 0.6s delay
Task 8 prints after 0.6s delay
Task 6 prints after 1.0s delay


In [4]:
# synchronization primitives:
# - lock

from threading import Thread, Lock
from time import sleep
from random import randint

class Queue:
    def __init__(self):
        self.queue = []
        self.lock = Lock()

    def push(self, what, delay=0):
        with self.lock: # similar to self.lock.aquire()
            sleep(delay)
            self.queue.append(what)
        # outside of the above 'with' block, similar to self.lock.release()

    def print(self):
        print(', '.join(self.queue))

queue = Queue()
chars = list('abcdefghijk')

tasks = [Thread(target=queue.push, args=(char, randint(0, 1))) for char in chars]

for task in tasks: task.start()

for task in tasks: task.join()

queue.print() # technically the order at which each next thread aquires the lock is not guaranteed

a, b, c, d, e, f, g, h, i, j, k


In [7]:
# synchronization primitives
# - semaphore

from threading import Thread, Semaphore
from time import sleep
from random import randrange

MAX_CONCURRENT_DOWNLOADS = 3
semaphore = Semaphore(MAX_CONCURRENT_DOWNLOADS)

def download(url):
    with semaphore:
        print(f'Start download {url}')
        sleep(0.2)
        print(f'Finished download {url}')

urls = [f'https://website-number-{i}.com' for i in range(1, 11)]
tasks = [Thread(target=download, args=(url,)) for url in urls]

for task in tasks:
    task.start()

Start download https://website-number-1.com
Start download https://website-number-2.com
Start download https://website-number-3.com
Finished download https://website-number-1.com
Start download https://website-number-4.com
Finished download https://website-number-2.com
Start download https://website-number-5.com
Finished download https://website-number-3.com
Start download https://website-number-6.com
Finished download https://website-number-4.comFinished download https://website-number-6.com
Finished download https://website-number-5.com

Start download https://website-number-9.com
Start download https://website-number-8.com
Start download https://website-number-7.com
Finished download https://website-number-9.com
Finished download https://website-number-8.com
Start download https://website-number-10.com
Finished download https://website-number-7.com
Finished download https://website-number-10.com


In [8]:
# synchronization primitives
# - event

from threading import Event, Thread
from time import sleep
from random import randint

def task(event):
    print('Background thread is started and waiting for event')
    event.wait()
    print('Background thread received the event and is resumed')

event = Event()
worker = Thread(target=task, args=(event,))
worker.start()

print('Main thread is blocked')
sleep(randint(1, 10) / 10)
print('Main thread is resumed and sending the event')

if not event.is_set():
    event.set()
else:
    event.clear()
    event.set()

worker.join()
print('Done')

Background thread is started and waiting for event
Main thread is blocked
Main thread is resumed and sending the event
Background thread received the event and is resumed
Done


In [6]:
# synchronization primitives
# - barrier

from threading import Thread, Barrier, current_thread, BrokenBarrierError
from time import sleep
from random import randrange

NUM_WAITS = 4

def func(barrier):
    name = current_thread().name
    print(f'{name} is enqueued for execution after {NUM_WAITS - barrier.n_waiting}')
    try:
        barrier.wait()
        print(f'{name} is executed')
    except BrokenBarrierError as e:
        print(f'{name} is a deadlock, exiting by timeout')


barrier = Barrier(NUM_WAITS, timeout=1)

tasks = [Thread(target=func, args=(barrier,), name=f'Task # {i}') for i in range(6)]

for task in tasks:
    sleep(int(randrange(0, 1) * 10) / 10)
    task.start()

Task # 0 is enqueued for execution after 4
Task # 1 is enqueued for execution after 3
Task # 2 is enqueued for execution after 2
Task # 3 is enqueued for execution after 1
Task # 3 is executed
Task # 1 is executed
Task # 2 is executed
Task # 4 is enqueued for execution after 4
Task # 5 is enqueued for execution after 4
Task # 0 is executed
Task # 4 is a deadlock, exiting by timeout
Task # 5 is a deadlock, exiting by timeout


In [4]:
# synchronization primitives
# - condition
# https://superfastpython.com/thread-condition/

from threading import Thread, Condition
from time import sleep
from random import randint

# from thread
def task_1(condition):
    sleep(randint(1, 10) / 10)
    with condition:
        condition.notify()

cond_1 = Condition()
worker_1 = Thread(target=task_1, args=(cond_1,))
worker_1.start()

print('Waiting for condition')
with cond_1:
    cond_1.wait(timeout=2)
print('Done')

# to thread
def task_2(i, condition):
    print(f'Thread #{i} is waiting for condition')
    condition.acquire()
    condition.wait(timeout=3)
    condition.release()
    print(f'Thread #{i} done')

cond_2 = Condition()
tasks = [Thread(target=task_2, args=(i, cond_2)) for i in range(5)]
for task in tasks:
    task.start()

print(f'{len(tasks)} threads are waiting for condition notification')
sleep(randint(1, 10) / 10)
with cond_2:
    cond_2.notify_all()

print('Done')

# wait for a specific condition
dice = 0
def task_3(i, condition):
    global dice
    sleep(randint(1, 15) / 10)
    d = randint(1, 6)
    with condition:
        print(f'Thread #{i} dice value {d}')
        dice = d
        condition.notify()

cond_3 = Condition()
tasks = [Thread(target=task_3, args=(i, cond_3)) for i in range(6)]
for t in tasks:
    t.start()

MAX_ALIVE = 2
print(f'Waiting for the dice value 6 or the amount of alive threads < {MAX_ALIVE}')
with cond_3:
    cond_3.wait_for(lambda: ((dice == 6) or (len([1 for t in tasks if t.is_alive()]) < MAX_ALIVE)), timeout=3)

print(f'Done: dice {dice}, alive threads: {len([1 for t in tasks if t.is_alive()])}')

Waiting for condition
Done
Thread #0 is waiting for condition
Thread #1 is waiting for condition
Thread #2 is waiting for condition
Thread #3 is waiting for condition
Thread #4 is waiting for condition
5 threads are waiting for condition notification
DoneThread #0 done
Thread #1 done
Thread #3 done
Thread #4 done
Thread #2 done

Waiting for the dice value 6 or the amount of alive threads < 2
Thread #5 dice value 6
Done: dice 6, alive threads: 5
Thread #2 dice value 5
Thread #4 dice value 3
Thread #3 dice value 1
Thread #0 dice value 6
Thread #1 dice value 2


In [10]:
# synchronization primitives
# - timer

from threading import Timer
from time import sleep
from random import randint

def task(message):
    print(f'Background thread says: "{message}"')

print('Creating and starting a timer')
timer = Timer(1, task, args=('Hello world!',))
timer.start()

sleep(randint(0, 9) / 10)
timer.cancel()
print('The timer has been canceled, creating and starting a new one')

timer = Timer(1, task, args=('Hello world!!!',))
timer.start()
timer.join()

print('Done')

Creating and starting a timer
The timer has been canceled, creating and starting a new one
Background thread says: "Hello world!!!"
Done


In [13]:
# a precursor of coroutines
import asyncio

# define a custom coroutine in Python 3.4
@asyncio.coroutine
def custom_coro():
    # suspend and execute another coroutine
    yield from asyncio.sleep(1)

print(next(custom_coro()))

<Future pending>


  def custom_coro():


In [6]:
# async generator
from asyncio import sleep
from random import randrange

async def async_iterator():
    for i in range(10):
        await sleep(int(randrange(0, 2) * 10) / 10)
        yield i

async for i in async_iterator():
    print(f'received {i}')

received 0
received 1
received 2
received 3
received 4
received 5
received 6
received 7
received 8
received 9


In [17]:
# multiprocessing
from multiprocessing import Process
from time import sleep
from random import randint

def task(sleeptime):
    print('Child process is starting')
    sleep(sleeptime)
    print(f'Child process is terminating after {sleeptime} seconds')

proc = Process(target=task, name='A Subprocess', args=(randint(1, 2),))
proc.start()

print(f'Successfully started a subprocess {proc.pid}')
proc.join()

print(f'Child process is terminated with code {proc.exitcode}, parent process is done')

Child process is starting
Successfully started a subprocess 2633726
Child process is terminating after 1 seconds
Child process is terminated with code 0, parent process is done


In [15]:
from multiprocessing import Process, Value
from time import sleep
from random import randint

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        self.data = Value('i', 0) # 'i' is for integer
    def run(self):
        print('Child process is starting')
        sleeptime = randint(1, 2)
        sleep(sleeptime)
        self.data.value = 42
        print(f'Child process is terminating after {sleeptime} seconds')


proc = MyProcess()
proc.start()

print(f'Successfully started a subprocess {proc.pid}')
proc.join()

print(f'Child process is terminated, parent process is done, the shared value is {proc.data.value}')

Child process is starting
Successfully started a subprocess 2633630
Child process is terminating after 1 seconds
Child process is terminated, parent process is done, the shared value is 42


In [18]:
import multiprocessing
print(f'Available process starting methods: {multiprocessing.get_all_start_methods()}')

Available process starting methods: ['fork', 'spawn', 'forkserver']


In [19]:
import multiprocessing
print(f'CPU count is: {multiprocessing.cpu_count()}')

CPU count is: 4


In [24]:
from multiprocessing import Process, Pipe
from time import sleep

def task(pipe):
    print('Subprocess is started')
    data = pipe.recv()
    print(f'Subprocess received data {data}')
    print('Subprocess is done')

pipe_end1, pipe_end2 = Pipe()
proc = Process(target=task, args=(pipe_end1,))
proc.start()

print(f'Parnet successfully stated a subprocess {proc.pid}')
sleep(1)

print('Parent is about to send a message to a child process through a pipe')
pipe_end2.send(42)

proc.join()
print(f'Parent is notified of the child process is terminated with code {proc.exitcode}, and is done')

Subprocess is started
Parnet successfully stated a subprocess 2635201
Subprocess received data 42
Subprocess is done
Parent is about to send a message to a child process through a pipe
Parent is notified of the child process is terminated with code 0, and is done


In [4]:
import sys

x = 4200
print(sys.getrefcount(x))

y = x
print(sys.getrefcount(x))

del y
print(sys.getrefcount(x))

3
4
3


In [7]:
foo = []
print(sys.getrefcount(foo))

foo.append(foo)
print(sys.getrefcount(foo))

del foo

2
3


In [13]:
import gc
gc.get_threshold()
# gc.get_objects(generation=2)

(700, 10, 10)