# Простая асинхронная функция, которая отображает текущее время с интервалом 1 секунда

In [15]:
import asyncio
import datetime
from IPython.display import clear_output

async def display_time():
    try:
        while True:
            current_time = datetime.datetime.now().strftime("%H:%M:%S")
            clear_output(wait=True)
            print(f"Текущее время: {current_time}")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("\nЗавершение работы часов.")

async def run_clock():
    task = asyncio.create_task(display_time())
    try:
        await asyncio.sleep(10)  # Запускаем часы на 10 секунд (не бесконечно, тк Jupyter Notebook, если убрать try-except, то будет бесконечно)
    finally:
        task.cancel()
        await task

# Для запуска в Jcupyter Notebook
await run_clock()


Завершение работы часов.


# Улучшенная предыдущая программа

In [10]:
import asyncio
import datetime
from termcolor import colored
from pynput import keyboard
import sys

# Флаг для управления выполнением программы
running = True

def on_press(key):
    global running
    if key == keyboard.Key.esc:
        running = False
        return False  # Остановить прослушивание

# Запуск прослушивателя клавиатуры в отдельном потоке
listener = keyboard.Listener(on_press=on_press)
listener.start()

async def display_time():
    try:
        while running:
            now = datetime.datetime.now()
            date_str = colored(now.strftime("%Y-%m-%d"), "cyan")
            time_str = colored(now.strftime("%H:%M:%S"), "yellow")
            sys.stdout.write(f"\r{date_str} {time_str}")
            sys.stdout.flush()
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("\nЗавершение работы часов.")
    finally:
        print("\nПрограмма завершена.")

async def main():
    await display_time()

# Для запуска в Jupyter Notebook
await main()

[36m2024-10-27[0m [33m13:18:45[0m
Завершение работы часов.

Программа завершена.


# Программа, которая использует asyncio.gather для выполнения двух асинхронных задач параллельно и затем обрабатывает результаты в отдельной функции в цикле.

In [16]:
import asyncio
import random
import string

async def generate_random_numbers(count):
    """Асинхронная функция для генерации случайных чисел"""
    result = []
    for _ in range(count):
        await asyncio.sleep(0.1)  # Имитация асинхронной операции
        result.append(random.randint(1, 100))
    return result

async def generate_random_strings(count):
    """Асинхронная функция для генерации случайных строк"""
    result = []
    for _ in range(count):
        await asyncio.sleep(0.15)  # Имитация асинхронной операции
        random_string = ''.join(random.choices(string.ascii_lowercase, k=5))
        result.append(random_string)
    return result

def process_results(numbers, strings):
    """Функция для обработки результатов"""
    print("Обработка результатов:")
    for num, s in zip(numbers, strings):
        print(f"Число: {num}, Строка: {s}")

async def main():
    # Запускаем обе задачи параллельно
    numbers_task = asyncio.create_task(generate_random_numbers(5))
    strings_task = asyncio.create_task(generate_random_strings(5))
    
    # Ожидаем завершения обеих задач
    numbers, strings = await asyncio.gather(numbers_task, strings_task)
    
    # Обрабатываем результаты
    process_results(numbers, strings)

# Запускаем асинхронную программу
await main()

Обработка результатов:
Число: 23, Строка: lshke
Число: 45, Строка: wddpv
Число: 46, Строка: ypokt
Число: 87, Строка: vrqeb
Число: 86, Строка: jtxwx


# Улучшение предыдущем программы

In [20]:
import asyncio
import aiohttp
import asyncpg
import json
from datetime import datetime

WEB_SERVER_URL = "https://rnacentral.org/api/v1/rna/"
DB_CONNECTION_STRING = "postgres://reader:NWDMCE5xdipIjRrp@hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"

class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, asyncpg.Record):
            return dict(obj)
        return super().default(obj)

async def fetch_web_data(session, rna_id):
    """Асинхронная функция для получения данных с веб-сервера"""
    url = f"{WEB_SERVER_URL}{rna_id}"
    async with session.get(url) as response:
        return await response.json()

async def fetch_db_data(connection, rna_id):
    """Асинхронная функция для получения данных из базы данных"""
    query = "SELECT * FROM rna WHERE upi = $1 LIMIT 1"
    return await connection.fetchrow(query, rna_id)

def process_results(web_data, db_data):
    """Функция для обработки и вывода результатов"""
    print("Данные с веб-сервера:")
    print(json.dumps(web_data, indent=2, cls=CustomJSONEncoder))
    
    print("\nДанные из базы данных:")
    if db_data:
        print(json.dumps(db_data, indent=2, cls=CustomJSONEncoder))
    else:
        print("Данные не найдены")

async def main():
    rna_id = "URS000075D2D3"  # Пример ID РНК
    
    async with aiohttp.ClientSession() as session, \
               asyncpg.create_pool(DB_CONNECTION_STRING) as pool:
        async with pool.acquire() as connection:
            # Запускаем обе задачи параллельно
            web_task = asyncio.create_task(fetch_web_data(session, rna_id))
            db_task = asyncio.create_task(fetch_db_data(connection, rna_id))
            
            # Ожидаем завершения обеих задач
            web_data, db_data = await asyncio.gather(web_task, db_task)
    
    # Обрабатываем результаты
    process_results(web_data, db_data)

# Запускаем асинхронную программу
await main()

Данные с веб-сервера:
{
  "url": "http://rnacentral.org/api/v1/rna/URS000075D2D3",
  "rnacentral_id": "URS000075D2D3",
  "md5": "1687c89295e9b660a8b012b1342dcd3b",
  "sequence": "AGGCGAGGAGAACAGCAGCU",
  "length": 20,
  "xrefs": "http://rnacentral.org/api/v1/rna/URS000075D2D3/xrefs",
  "publications": "http://rnacentral.org/api/v1/rna/URS000075D2D3/publications",
  "is_active": true,
  "description": "miRNA from 1 species",
  "rna_type": "miRNA",
  "count_distinct_organisms": 1,
  "distinct_databases": [
    "miRBase",
    "RefSeq"
  ]
}

Данные из базы данных:
{
  "id": 7721683,
  "upi": "URS000075D2D3",
  "timestamp": "2014-07-25T20:48:57",
  "userstamp": "RNACEN",
  "crc64": "3669D54DDB04B68C",
  "len": 20,
  "seq_short": "AGGCGAGGAGAACAGCAGCT",
  "seq_long": null,
  "md5": "1687c89295e9b660a8b012b1342dcd3b"
}


# Асинхронный веб скрепйпер


In [21]:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

class AsyncWebScraper:
    def __init__(self, urls):
        self.urls = urls
        self.results = []

    async def fetch_url(self, session, url):
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    html = await response.text()
                    return url, html
                else:
                    return url, f"Error: HTTP {response.status}"
        except Exception as e:
            return url, f"Error: {str(e)}"

    def parse_html(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.string if soup.title else "No title found"
        return title

    async def scrape_url(self, session, url):
        url, html = await self.fetch_url(session, url)
        if not html.startswith("Error"):
            title = self.parse_html(html)
            return url, title
        else:
            return url, html

    async def scrape_all(self):
        async with aiohttp.ClientSession() as session:
            tasks = [self.scrape_url(session, url) for url in self.urls]
            self.results = await asyncio.gather(*tasks)

    def print_results(self):
        for url, title in self.results:
            print(f"URL: {url}")
            print(f"Title: {title}")
            print("-" * 50)

async def main():
    urls = [
        "https://www.youtube.com",
        "https://www.herzen.spb.ru/",
        "https://moodle.herzen.spb.ru",
    ]

    start_time = time.time()
    
    scraper = AsyncWebScraper(urls)
    await scraper.scrape_all()
    scraper.print_results()
    
    end_time = time.time()
    print(f"Total time taken: {end_time - start_time:.2f} seconds")

# Запускаем асинхронную программу
await main()

URL: https://www.youtube.com
Title: YouTube
--------------------------------------------------
URL: https://www.herzen.spb.ru/
Title: Российский государственный педагогический университет им. А. И. Герцена
--------------------------------------------------
URL: https://moodle.herzen.spb.ru
Title: Центр дистанционной поддержки обучения РГПУ им. А. И. Герцена
--------------------------------------------------
Total time taken: 0.74 seconds


# Улучшенный предыдущий код

In [24]:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

class AsyncWebScraper:
    def __init__(self, urls):
        self.urls = urls
        self.results = []
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def fetch_url(self, url):
        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    html = await response.text()
                    return url, html
                else:
                    return url, f"Error: HTTP {response.status}"
        except Exception as e:
            return url, f"Error: {str(e)}"

    def parse_html(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.string if soup.title else "No title found"
        return title

    async def scrape_url(self, url):
        url, html = await self.fetch_url(url)
        if not html.startswith("Error"):
            title = self.parse_html(html)
            return url, title
        else:
            return url, html

    async def scrape_all(self):
        tasks = [self.scrape_url(url) for url in self.urls]
        self.results = await asyncio.gather(*tasks)

    def print_results(self):
        for url, title in self.results:
            print(f"URL: {url}")
            print(f"Title: {title}")
            print("-" * 50)

async def main():
    urls = [
        "https://www.youtube.com",
        "https://www.herzen.spb.ru/",
        "https://moodle.herzen.spb.ru",
    ]

    start_time = time.time()
    
    async with AsyncWebScraper(urls) as scraper:
        await scraper.scrape_all()
        scraper.print_results()
    
    end_time = time.time()
    print(f"Total time taken: {end_time - start_time:.2f} seconds")


await main()

URL: https://www.youtube.com
Title: YouTube
--------------------------------------------------
URL: https://www.herzen.spb.ru/
Title: Российский государственный педагогический университет им. А. И. Герцена
--------------------------------------------------
URL: https://moodle.herzen.spb.ru
Title: Центр дистанционной поддержки обучения РГПУ им. А. И. Герцена
--------------------------------------------------
Total time taken: 0.75 seconds


![image.png](attachment:image.png)