In [None]:
import sys
print(sys.version)

In [None]:
# посмотрим информацию об операционной системе, на которой мы работаем.
import platform
platform.uname()

## Работа с файлами

In [None]:
f = open("/tmp/example.txt", "w")
f.write("Технопарк\n")
f.close()

In [None]:
# открываем дескриптор файла для чтения
f = open("/tmp/example.txt", "r")

# читаем содержимое полностью.
data = f.read()

# обязательно закрываем!
f.close()

print(data)

In [None]:
# используя context-manager
with open("/tmp/example.txt", "a") as f:
    f.write("МГТУ\n")

In [None]:
with open("/tmp/example.txt", "r") as f:
    print(f.readlines())

In [None]:
# читаем файл по строке, не загружая его полность в память
with open("/tmp/example.txt", "r") as f:
    for line in f:
        print(repr(line))

In [None]:
import hashlib

def hash_file(filename):
    h = hashlib.sha1()

    # открываем файл в бинарном виде.
    with open(filename,'rb') as file:
        chunk = 0
        while chunk != b'':
            # читаем кусочками по 1024 байта
            chunk = file.read(1024)
            h.update(chunk)

    # hex-представление полученной суммы.
    return h.hexdigest()

print(hash_file("/tmp/example.txt"))
print(hash_file("/tmp/example.txt"))

with open("/tmp/example.txt", "a") as f:
    f.write("1")

print(hash_file("/tmp/example.txt"))

## Работа с директориями

In [None]:
import os

os.mkdir("/tmp/park-python")

In [None]:
try:
    os.rmdir("/tmp/park-python")
except IOError as err:
    print(err)

In [None]:
path = "/tmp/park-python/lectures/04"
if not os.path.exists(path):
    os.makedirs(path)

In [None]:
import pprint
pprint.pprint(list(os.walk(os.curdir)))

In [None]:
os.rmdir("/tmp/park-python")

In [None]:
import shutil
shutil.rmtree("/tmp/park-python")

## stdin, stdout, stderr

In [None]:
import sys
print(sys.stdin)
print(sys.stdout)
print(sys.stderr)

In [None]:
print(sys.stdin.fileno())

In [None]:
print(sys.stdout.fileno())

Так как дескрипторы stdout и stderr переопределены в Jupyter notebook. Давайте посмотрим куда они ведут:

In [None]:
sys.stdout.write("where am I")

А ведут они как раз в этот ноутбук:)

# Отладка

In [None]:
def find_sum(a, b):
    import pdb
    pdb.set_trace()    
    return a + b

find_sum(1, 4)

Гораздо удобнее использовать `ipdb`, иногда достаточно просто ф-ии `print()`

Полезные библиотеки для отладки (debugging) - https://github.com/vinta/awesome-python#debugging-tools

# Тестирование

**Зачем?**

In [None]:
def get_max_length_word(sentence):
    longest_word = None
    words = sentence.split()
    for word in words:
        if not longest_word or len(word) > len(longest_word):
            longest_word = word
    return longest_word

Что может пойти не так? Да все что угодно:

* SyntaxError
* Ошибка в логике
* Обратная несовместимость новой версии используемой библиотеки
* ...

Через полгода после запуска приложения без тестов изменения в код большого приложения вносить очень страшно!

Некоторые даже используют TDD - Test-Driven Development.

## unittest

In [None]:
import unittest

class LongestWordTestCase(unittest.TestCase):
    
    def test_sentences(self):
        sentences = [
            ["Beautiful is better than ugly.", "Beautiful"],
            ["Complex is better than complicated.", "complicated"]
        ]
        for sentence, correct_word in sentences:
            self.assertEqual(get_max_length_word(sentence), correct_word)

# Обычно в реальных проектах использует механизм автоматического нахождения тестов (discover).
suite = unittest.defaultTestLoader.loadTestsFromTestCase(LongestWordTestCase)
unittest.TextTestRunner().run(suite)

<table border="1" class="docutils" align="left">
<colgroup>
<col width="48%">
<col width="34%">
<col width="18%">
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Method</th>
<th class="head">Checks that</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>assertEqual(a, b)</td>
<td><code class="docutils literal"><span class="pre">a</span> <span class="pre">==</span> <span class="pre">b</span></code></td>
</tr>
<tr class="row-odd"><td>assertNotEqual(a, b)</td>
<td><code class="docutils literal"><span class="pre">a</span> <span class="pre">!=</span> <span class="pre">b</span></code></td>
</tr>
<tr class="row-even"><td>assertTrue(x)</td>
<td><code class="docutils literal"><span class="pre">bool(x)</span> <span class="pre">is</span> <span class="pre">True</span></code></td>
</tr>
<tr class="row-odd"><td>assertFalse(x)</td>
<td><code class="docutils literal"><span class="pre">bool(x)</span> <span class="pre">is</span> <span class="pre">False</span></code></td>
</tr>
<tr class="row-even"><td>assertIsNone(x)</td>
<td><code class="docutils literal"><span class="pre">x</span> <span class="pre">is</span> <span class="pre">None</span></code></td>
</tr>
<tr class="row-odd"><td>assertIsNotNone(x)</td>
<td><code class="docutils literal"><span class="pre">x</span> <span class="pre">is</span> <span class="pre">not</span> <span class="pre">None</span></code></td>
</tr>
<tr class="row-even"><td>assertIsInstance(a, b)</td>
<td><code class="docutils literal"><span class="pre">isinstance(a,</span> <span class="pre">b)</span></code></td>
</tr>
<tr class="row-even"><td colspan="2">И другие... https://docs.python.org/3.5/library/unittest.html</td>
</tr>
</tbody>
</table>

Протестируем class. Все аналогично:

In [None]:
class BoomException(Exception):
    pass


class Item:

    def __init__(self, name, reacts_with=None):
        self.name = name
        self.reacts_with = reacts_with or []

    def __repr__(self):
        return self.name


class Alchemy:

    def __init__(self):
        self.items = []

    def add(self, item):
        for existing_item in self.items:
            if item.name not in existing_item.reacts_with:
                continue
            self.items = []
            raise BoomException("{0} + {1}".format(existing_item.name, item.name))
        self.items.append(item)

    def remove(self, item):
        self.items.remove(item)


# 2Na + 2H2O = 2NaOH + H2 (Не повторять дома!!! Пишут, что щелочь чрезвычайно опасна!)
alchemy = Alchemy()
alchemy.add(Item("Ca", reacts_with=[]))
alchemy.add(Item("H2O", reacts_with=["Na"]))

try:
    alchemy.add(Item("Na", reacts_with=["H2O"]))
except BoomException:
    print("We are alive! But all items lost!")

In [None]:
import unittest

class AlchemyTest(unittest.TestCase):

    def setUp(self):
        self.alchemy = Alchemy()
    
    def test_add(self):
        self.alchemy.add(Item("C"))
        self.alchemy.add(Item("F"))
        self.assertEqual(len(self.alchemy.items), 2)

    def test_remove(self):
        item = Item("C")
        self.alchemy.add(item)
        self.assertEqual(len(self.alchemy.items), 1)
        self.alchemy.remove(item)
        self.assertEqual(len(self.alchemy.items), 0)

    def test_boom(self):
        item1 = Item("Na", reacts_with=["H2O"])
        item2 = Item("H2O", reacts_with=["Na"])
        self.alchemy.add(item1)
        self.assertRaises(BoomException, self.alchemy.add, item2)
        self.assertEqual(len(self.alchemy.items), 0)


# Обычно в реальных проектах использует механизм автоматического нахождения тестов (discover).
suite = unittest.defaultTestLoader.loadTestsFromTestCase(AlchemyTest)
unittest.TextTestRunner().run(suite)

## unittest.mock

Как тестировать код, выполняющий внешние вызовы: чтение файла, запрос содержимого URL?

In [None]:
import requests

def get_location_city(ip):
    data = requests.get("https://freegeoip.net/json/{ip}".format(ip=ip)).json()
    return data["city"]

def get_ip():
    data = requests.get("https://httpbin.org/ip").json()
    return data["origin"]

get_location_city(get_ip())

Для начала посмотрим что такое monkey patching.

In [None]:
import math

def fake_sqrt(num):
    return 42

original_sqrt = math.sqrt
math.sqrt = fake_sqrt

# вызываем ф-ю, которую мы запатчили.
print(math.sqrt(16))

math.sqrt = original_sqrt

In [None]:
math.sqrt(16)

In [None]:
import unittest
from unittest.mock import patch, Mock


class FakeIPResponse:

    def json(self):
        return {"origin": "127.0.0.1"}


class LongestWordTestCase(unittest.TestCase):

    @patch('requests.get', Mock(return_value=FakeIPResponse()))
    def test_get_ip(self):
        self.assertEqual(get_ip(), "127.0.0.1")

suite = unittest.defaultTestLoader.loadTestsFromTestCase(LongestWordTestCase)
unittest.TextTestRunner().run(suite)


Библиотека `coverage` позволяет оценить степень покрытия кода тестами.

Помимо unit-тестирования существует масса других типов тестов:
* Интеграционные
* Функциональные
* Тестирование производительности (бенчмарки)
* ...

Полезные библиотеки для тестирования - https://github.com/vinta/awesome-python#testing

# Многопоточность

Что такое процесс?

UNIX является многозадачной операционной системой. Это означает, что одновременно может быть запущена более чем одна программа. Каждая программа, работающая в некоторый момент времени, называется процессом.

http://www.kharchuk.ru/%D0%A1%D1%82%D0%B0%D1%82%D1%8C%D0%B8/15-unix-foundations/80-unix-processes

In [None]:
STEPS = 10000000

# Простая программа, складывающая числа.
def worker(steps):
    count = 0
    for i in range(steps):
        count += 1
    return count

print(worker(STEPS))

%timeit -n1 worker(STEPS)

print("Напомните преподавателю показать top")

<div style="float:left;margin:0 10px 10px 0" markdown="1">![title](img/g3.png)</div>

Что такое поток?

Многопоточность является естественным продолжением многозадачности. Каждый из процессов может выполнятся в несколько потоков. Программа выше исполнялась в одном процессе в главном потоке.

<div style="float:left;margin:0 10px 10px 0" markdown="1">![title](img/threads.jpg)</div>

http://www.cs.miami.edu/home/visser/Courses/CSC322-09S/Content/UNIXProgramming/UNIXThreads.shtml

Логичный шаг предположить, что 2 потока выполнят программу выше быстрее. Проверим?

In [None]:
import threading
import queue

result_queue = queue.Queue()

STEPS = 10000000
NUM_THREADS = 2

def worker(steps):
    count = 0
    for i in range(steps):
        count += 1
    result_queue.put(count)


def get_count_threaded():    
    count = 0
    threads = []

    for i in range(NUM_THREADS):
        t = threading.Thread(target=worker, args=(STEPS//NUM_THREADS,))
        threads.append(t)
        t.start()

    for i in range(NUM_THREADS):
        count += result_queue.get()

    return count

print(get_count_threaded())
%timeit -n1 get_count_threaded()

<div style="float:left;margin:0 10px 10px 0" markdown="1">![title](img/g4.png)</div>

GIL

https://jeffknupp.com/blog/2012/03/31/pythons-hardest-problem/

Ок. Неужели выхода нет? Есть - multiprocessing

## Мультипроцессинг

In [None]:
import multiprocessing

NUM_PROCESSES = 2
STEPS = 10000000

result_queue = multiprocessing.Queue()

def worker(steps):
    count = 0
    for i in range(steps):
        count += 1

    result_queue.put(count)


def get_count_in_processes():    
    count = 0
    processes = []
    for i in range(NUM_PROCESSES):
        p = multiprocessing.Process(target=worker, args=(STEPS//NUM_PROCESSES,))
        processes.append(t)
        p.start()

    for i in range(NUM_THREADS):
        count += result_queue.get()

    return count

print(get_count_in_processes())
%timeit -n1 get_count_in_processes()

**Зачем тогда нужны потоки?**

Все потому что не все задачи CPU-bound. Есть IO-bound задачи, которые прекрасно параллелятся на несколько CPU. Кто приведет пример?

**TODO: примеры**

Чтобы не быть голословными, поднимем HTTP-сервер на порту 8000. По адресу http://localhost:8000 будет отдаваться небольшой кусочек текста. Наша задача - скачивать контент по этому адресу.

In [None]:
import requests

STEPS = 100

def download():
    requests.get("http://127.0.0.1:8000").text

# Простая программа, загружающая контент URL-странички. Типичная IO-bound задача.
def worker(steps):
    for i in range(steps):
        download()

%timeit -n1 worker(STEPS)

<div style="float:left;margin:0 10px 10px 0" markdown="1">![title](img/g1.png)</div>

In [None]:
import threading

NUM_THREADS = 64

def worker(steps):
    count = 0
    for i in range(steps):
        download()

def run_worker_threaded():    
    threads = []

    for i in range(NUM_THREADS):
        t = threading.Thread(target=worker, args=(STEPS//NUM_THREADS,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

%timeit -n1 run_worker_threaded()

<div style="float:left;margin:0 10px 10px 0" markdown="1">![title](img/g2.png)</div>

Ради интереса попробуем мультипроцессинг для этой задачи:

In [None]:
import multiprocessing

NUM_PROCESSES = 16

def worker(steps):
    count = 0
    for i in range(steps):
        download()

def run_worker_in_processes():    
    processes = []

    for i in range(NUM_PROCESSES):
        p = multiprocessing.Process(target=worker, args=(STEPS//NUM_PROCESSES,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

%timeit -n1 run_worker_in_processes()

Как мы видим - треды позволили получить лучший результат (Macbook Air 2011 - 64 треда).

Чтобы упростить работу с тредами в Python есть модуль concurrent.futures: 
он предоставляет доступ к 2-м высокоуровневым объектам: ThreadPoolExecutor и ProcessPoolExecutor

In [None]:
import concurrent.futures
import requests

STEPS = 100

def download():
    return requests.get("http://127.0.0.1:8000").text

def run_in_executor():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=64)

    future_to_url = {executor.submit(download): i for i in range(STEPS)}
    for future in concurrent.futures.as_completed(future_to_url):
        i = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%d generated an exception: %s' % (i, exc))
        else:
            print('%d page is %d bytes' % (i, len(data)))

    executor.shutdown()

run_in_executor()
#%timeit -n1 run_in_executor()

Аналогично можно использовать ProcessPoolExecutor, чтобы вынести работу в пул процессов. 

## Сложность многопоточных приложений

In [None]:
counter = 0

def worker(num):
    global counter
    for i in range(num):
        counter += 1

worker(1000000)

print(counter)

In [None]:
import threading

counter = 0

def worker(num):
    global counter
    for i in range(num):
        counter += 1

threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(100000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(counter)

In [None]:
import threading

counter = 0
lock = threading.Lock()

def worker(num):
    global counter
    for i in range(num):
        lock.acquire()
        counter += 1
        lock.release()

threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(100000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(counter)

In [None]:
# deadlock example
import threading

counter = 0
lock = threading.Lock()

def print_counter():
    lock.acquire()
    print(counter)
    lock.release()

def worker():
    global counter
    lock.acquire()
    print_counter()
    counter += 1
    lock.release()
    
worker()

**Вернемся к примеру с загрузкой URL: можем ли мы сделать еще лучше?**

* загружать странички еще быстрее
* потреблять меньше памяти, не расходуя ее на создание потоков
* не задумываться о синхронизации потоков

# Асинхронное программирование

Мотивация - IO-операции очень медленные, нужно заставить программу выполнять полезную работу во время ожидания ввода-вывода.

Сравнение Latency некоторых операций (https://gist.github.com/jboner/2841832)
<table align="left">
    <tr>
        <td>L1 CPU cache reference</td>
        <td>0.5 ns</td>
        <td></td>
    </tr>
    <tr>
        <td>Main memory reference</td>
        <td>100 ns</td>
        <td>200x L1 cache</td>
    </tr>
    <tr>
        <td>Read 1 MB sequentially from memory</td>
        <td>250,000 ns (250 us)</td>
        <td></td>
    </tr>
    <tr>
        <td>Round trip within same datacenter</td>
        <td>500,000 ns (500 us)</td>
        <td></td>
    </tr>
    <tr>
        <td>Read 1 MB sequentially from SSD*</td>
        <td>1,000,000 ns (1,000 us, 1ms)</td>
        <td></td>
    </tr>
    <tr>
        <td>Read 1 MB sequentially from disk</td>
        <td>20,000,000 ns (20,000 us, 20 ms)</td>
        <td>80x memory, 20X SSD</td>
    </tr>
    <tr>
        <td>Send packet CA->Netherlands->CA</td>
        <td>150,000,000 ns (150,000 us, 150 ms)</td>
        <td></td>
    </tr>    
</table>

Вернемся к упрощенному варианту нашей программы, загружавшей URL в одном потоке синхронно.

In [None]:
import time

def request(i):
    print("Sending request %d" % i)
    time.sleep(1)
    print("Got response from request %d" % i)
    print()

for i in range(5):
    request(i)

На запрос тратится 1 секунда, и мы ждем 5 секунд на 5 запросов - а ведь могли бы отправить их друг за другом и через секунду получить результаты для всех и обработать.

Подход с callback-ами:

In [None]:
import time

def request(i):
    print("Sending request %d" % i)

    def on_data(data):
        print("Got response from request %d" % i)

    return on_data

callbacks = []
        
for i in range(5):
    cb = request(i)
    callbacks.append(cb)

time.sleep(1)

for cb in callbacks:
    cb("data")

## Генераторы

Это функция, которая генерирует последовательность значений используя ключевое слово **yield**

Самый простой пример:

In [None]:
def simple_gen():
    yield 1
    yield 2

gen = simple_gen()
print(next(gen))
print(next(gen))
print(next(gen))

In [None]:
gen = simple_gen()
for i in gen:
    print(i)

Первый плюс: **получить значения, не загружая все элементы в память**. Яркий пример - range.

Чуть посложнее (с состоянием):

In [None]:
def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

gen = fib()
for i in range(6):
    print(next(gen))

Второй плюс: **получить значения сразу после того как они были вычислены**

Корутины на основе генераторов:

In [None]:
def coro():
    next_value = yield "Hello"
    yield next_value

c = coro()
print(next(c))
print(c.send("World"))

Можно работать с бесконечным потоком данных. Можно обмениваться результатами между отдельными генераторами по мере готовности - то есть иметь дело с несколькими параллельными задачами. При этом не обязательно эти задачи зависят друг от друга.

Для более глубокого понимания и изучения других особенностей - http://www.dabeaz.com/finalgenerator/

In [None]:
import time

def request(i):
    print("Sending request %d" % i)
    data = yield
    print("Got response from request %d" % i)

generators = []

for i in range(5):
    gen = request(i)
    generators.append(gen)
    next(gen)

time.sleep(1)

for gen in generators:
    try:
        gen.send("data")
    except StopIteration:
        pass


**В контексте лекции важно понять, что выполнение функции-генератора в Python можно приостановить, дождаться нужных данных, а затем продолжить выполнение с места прерывания. При этом сохраняется локальный контекст выполнения и пока мы ждем данных интерпретатор может заниматься другой полезной работой**

## Asyncio

Асинхронное программирование с использованием библиотеки asyncio строится вокруг понятия Event Loop - "цикл событий". Event loop является основным координатором в асинхронных программах на Python. Он отвечает за:

* шедулинг корутин и коллбеков
* регистрацию отложенных вызовов
* регистрацию таймеров

Это позволяет писать программы так, что в момент блокирующих IO операций контекст выполнения будет переключаться на другие задачи, ждущие выполнения.

In [None]:
import asyncio

loop = asyncio.get_event_loop()
loop.run_forever()

In [None]:
import asyncio

def cb():
    print("callback called")
    loop.stop()

loop = asyncio.get_event_loop()
loop.call_later(delay=3, callback=cb)
print("start event loop")
loop.run_forever()

В Python 3.4 вызов результата корутины выполнялся с помощью конструкции **yield from**, а функции-корутины помечались декоратором **@asyncio.coroutine**

In [12]:
import asyncio

@asyncio.coroutine
def return_after_delay():
    yield from asyncio.sleep(3)
    print("return called")

loop = asyncio.get_event_loop()
print("start event loop")
loop.run_until_complete(return_after_delay())

start event loop
return called


В версии 3.5 появились специальные ключевые слова, позволяющие программировать в асинхронном стиле: **async** и **await**

**async** - ключевое слово, позволяющее обозначить функцию как асинхронную (корутина, coroutine). Такая функция может прервать свое выполнение в определенной точке (на блокирующей операции), а затем, дождавшись результата этой операции, продолжить свое выполнение.

**await** позволяет запустить такую функцию и дождаться результата.

In [14]:
import asyncio

async def return_after_delay():
    await asyncio.sleep(3)
    print("return called")

loop = asyncio.get_event_loop()
print("start event loop")
loop.run_until_complete(return_after_delay())

start event loop
return called


**Чтобы программа стала работать асинхронно нужно использовать примитивы, которые есть в библиотеке asyncio:**

In [None]:
import asyncio

async def get_data():
    await asyncio.sleep(1)

async def request(i):
    print("Sending request %d" % i)
    data = await get_data()
    print("Got response from request %d" % i)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*[request(i) for i in range(5)]))

Исключения при работе с корутинами работают точно так же как и в синхронном коде:

In [15]:
import asyncio

async def get_data():
    await asyncio.sleep(1)
    raise ValueError

async def request(i):
    print("Sending request %d" % i)
    try:
        data = await get_data()
    except ValueError:
        print("Error in request %d" % i)
    else:
        print("Got response from request %d" % i)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*[request(i) for i in range(5)]))

Sending request 3
Sending request 2
Sending request 0
Sending request 4
Sending request 1
Error in request 3
Error in request 2
Error in request 0
Error in request 4
Error in request 1


[None, None, None, None, None]

Примеры других реализаций Event Loop'ов:

* Tornado IOLoop (https://github.com/tornadoweb/tornado)
* Twisted reactor (https://twistedmatrix.com/trac/)
* pyuv (https://github.com/saghul/pyuv)
* PyGame (http://pygame.org/hifi.html)
* ...

Мы еще вернемся к asyncio в лекции про интернет и клиент-серверные приложения. В том числе подробно посмотрим на сетевые операции - неблокирующее чтение и запись в сокеты.

Список библиотек, написанных поверх asyncio - https://github.com/python/asyncio/wiki/ThirdParty

А пока **вернемся к нашему примеру** и перепишем его, используя асинхронный подход и asyncio в частности.

In [22]:
import aiohttp
import asyncio

STEPS = 100

async def download():
    async with aiohttp.get("http://127.0.0.1:8000", loop=loop) as response:
        return await response.text()

async def worker(steps, loop):
    await asyncio.gather(*[download() for x in range(steps)])

loop = asyncio.get_event_loop()
%timeit -n1 loop.run_until_complete(worker(STEPS, loop))

1 loop, best of 3: 254 ms per loop


![title](img/g5.png)

# Завершающий пример (asyncio + multiprocessing)

In [None]:
# asyncio + multiprocessing
import aiohttp
import asyncio
import multiprocessing

NUM_PROCESSES = 2

STEPS = 100

async def download(loop):
    async with aiohttp.get("http://127.0.0.1:8000", loop=loop) as response:
        return await response.text()

async def worker(steps, loop):
    await asyncio.gather(*[download(loop) for x in range(steps)], loop=loop)

def run(steps):
    loop = asyncio.new_event_loop()
    loop.run_until_complete(worker(steps, loop))

def run_in_processes():    
    processes = []

    for i in range(NUM_PROCESSES):
        p = multiprocessing.Process(target=run, args=(STEPS//NUM_PROCESSES,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

%timeit -n1 run_in_processes()

# Subprocess. Если останется время...

In [14]:
import subprocess
import os

result = subprocess.run(["ls", "-l", os.getcwd()], stdout=subprocess.PIPE)
print(result.stdout)

b'total 20104\n-rw-r--r--  1 fz  staff        0 Sep 24 12:53 README.md\n-rw-r--r--  1 fz  staff      602 Oct 16 23:31 debugging.py\ndrwxr-xr-x  8 fz  staff      272 Oct 16 10:46 img\n-rw-r--r--  1 fz  staff    51100 Oct 17 09:50 notebook.ipynb\n-rwxr-xr-x  1 fz  staff  9243740 Oct 15 15:00 server\n-rw-r--r--  1 fz  staff     1161 Oct 15 15:00 server.go\n-rw-------@ 1 fz  staff   983707 Oct 16 21:08 slides.pdf\n'


In [15]:
# используя shell
result = subprocess.run(
    "ls -l " + os.getcwd() + "|grep debug",
    stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
print(result.stdout)

b'-rw-r--r--  1 fz  staff      602 Oct 16 23:31 debugging.py\n'
