In [15]:
import pandas as pd
import nest_asyncio
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import select, text
from sqlalchemy.orm import sessionmaker

# Применение nest_asyncio для работы с уже запущенным event loop
nest_asyncio.apply()

DATABASE_URL = "postgresql+asyncpg://amogus:18102005@localhost:5432/postgres"

# Создам асинхронный движок
engine = create_async_engine(DATABASE_URL, echo=True)

# Сессия
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False
)


async def load_data(query):
    """Асинхронная функция для выполнения SQL-запроса и загрузки данных в DataFrame"""
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(query)
            # Конвертируем результат в DataFrame
            df = pd.DataFrame(result.fetchall(), columns=result.keys())
            return df


# Главная функция для загрузки данных из таблиц
async def main():
    query = select("*").select_from(text('users'))

    # Загрузим данных, пока только users для теста
    users_df = await load_data(query)
    print(users_df.head())


# Запуск
await main()

2024-11-13 20:54:12,084 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2024-11-13 20:54:12,086 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-11-13 20:54:12,090 INFO sqlalchemy.engine.Engine select current_schema()
2024-11-13 20:54:12,091 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-11-13 20:54:12,094 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2024-11-13 20:54:12,096 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-11-13 20:54:12,099 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-11-13 20:54:12,101 INFO sqlalchemy.engine.Engine SELECT * 
FROM users
2024-11-13 20:54:12,102 INFO sqlalchemy.engine.Engine [generated in 0.00119s] ()
2024-11-13 20:54:12,164 INFO sqlalchemy.engine.Engine COMMIT
      id      last_name  user_role_id gender  coins  city_id  \
0  10001        Markova             5   None    880    784.0   
1  10002      Коваленко             5   None    160    508.0   
2  10003  Драгомирецкий             5   None    200      NaN   
3  10004

In [3]:
import pandas as pd
import nest_asyncio
import asyncio
import logging
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import select, text
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timedelta

# Отключение уведомлений
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)

# Применение nest_asyncio для работы в юпитере
nest_asyncio.apply()

DATABASE_URL = "postgresql+asyncpg://amogus:18102005@localhost:5432/postgres"

# Асинхронный движок и сессия
engine = create_async_engine(DATABASE_URL, echo=False)
async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

# Задание параметров даты старта
start_date = datetime.strptime('2024-11-13', '%Y-%m-%d').date()

async def load_data(query):
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(query)
            df = pd.DataFrame(result.fetchall(), columns=result.keys())
            return df

async def preprocess_data(df, table_name):
    print(f"\nПредобработка таблицы {table_name}")
    
    # Проверка на дубликаты
    duplicates_count = df.duplicated().sum()
    print(f"Количество дубликатов: {duplicates_count}")
    if duplicates_count > 0:
        df = df.drop_duplicates()

    # Проверка на пропуски
    missing_values = df.isnull().sum()
    print(f"Пропуски:\n{missing_values[missing_values > 0]}")

    # Преобразование типов данных при необходимости
    if 'created_at' in df.columns:
        df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce')
    if 'updated_at' in df.columns:
        df['updated_at'] = pd.to_datetime(df['updated_at'], errors='coerce')

    # Обработка аномалий
    for column in df.select_dtypes(include=['float64', 'int64']).columns:
        # Аномалия: отрицательные значения зануляем
        df[column] = df[column].apply(lambda x: x if x >= 0 else 0)
        # Диапазон значений
        print(f"{column}: min = {df[column].min()}, max = {df[column].max()}")

    return df

def add_wave_column(df, start_date):
    if 'created_at' in df.columns:
        def calculate_wave(created_at, start_date):
            if pd.isnull(created_at):
                return None
            # Приведение created_at к дате
            created_at_date = created_at.date()
            days_difference = (created_at_date - start_date).days

            if days_difference <= 0:
                return 0
            elif days_difference <= 7:
                return 1
            elif days_difference <= 14:
                return 2
            elif days_difference <= 21:
                return 3
            elif days_difference <= 28:
                return 4
            else:
                return 5

        df['wave'] = df['created_at'].apply(lambda x: calculate_wave(x, start_date))
        print(df[['created_at', 'wave']].head())

    return df

async def main():
    table_queries = ['users', 'user_roles', 'subjects', 'lessons', 
                     'homework_lessons', 'homework_done', 'homework', 
                     'courses', 'course_types', 'cities']
    
    for table in table_queries:
        query = select("*").select_from(text(table))
        df = await load_data(query)
        
        # Предобработка данных
        df = await preprocess_data(df, table)
        
        # Волны
        if table == 'users':
            df = add_wave_column(df, start_date)

# Запуск
await main()



Предобработка таблицы users
Количество дубликатов: 0
Пропуски:
gender     4906
city_id     972
dtype: int64
id: min = 10001, max = 33999
user_role_id: min = 5, max = 12
coins: min = 0, max = 24930
city_id: min = 0.0, max = 1225.0
           created_at  wave
0 2024-07-25 17:51:42     0
1 2024-07-25 17:55:38     0
2 2024-07-25 18:30:01     0
3 2024-07-25 18:31:31     0
4 2024-07-25 18:41:44     0

Предобработка таблицы user_roles
Количество дубликатов: 0
Пропуски:
Series([], dtype: int64)
id: min = 1, max = 13

Предобработка таблицы subjects
Количество дубликатов: 0
Пропуски:
Series([], dtype: int64)
id: min = 1, max = 15

Предобработка таблицы lessons
Количество дубликатов: 0
Пропуски:
Series([], dtype: int64)
id: min = 6, max = 1274
course_id: min = 16, max = 104

Предобработка таблицы homework_lessons
Количество дубликатов: 0
Пропуски:
Series([], dtype: int64)
id: min = 5, max = 1012
homework_id: min = 40, max = 843
lesson_id: min = 6, max = 1100

Предобработка таблицы homework_done
