-
Notifications
You must be signed in to change notification settings - Fork 12.5k
Open
Description
Улучшенная версия ii GRUT с расширенными возможностями
Ниже — доработанный вариант утилиты с новыми функциями и улучшенной архитектурой.
Обновлённый исходный код (Python 3.10+)
#!/usr/bin/env python3
"""
ii GRUT — расширенная утилита для анализа табличных данных.
Основные функции:
- загрузка/сохранение CSV, JSON, Excel;
- расширенная статистика (включая min/max/sum);
- фильтрация с операторами сравнения;
- группировка и агрегация;
- логирование операций.
"""
import argparse
import csv
import json
import sys
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
from statistics import mean, median, stdev
import pandas as pd
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)s | %(message)s',
handlers=[
logging.FileHandler('ii_grut.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class GRUTProcessor:
"""Основной класс для обработки данных с поддержкой разных форматов."""
def __init__(self):
self.data: pd.DataFrame = pd.DataFrame()
self.source_format: str = ""
def load_data(self, filepath: str) -> bool:
"""Загрузить данные из файла (CSV, JSON, Excel)."""
path = Path(filepath)
try:
if path.suffix.lower() == '.csv':
self.data = pd.read_csv(path, encoding='utf-8')
self.source_format = 'csv'
elif path.suffix.lower() in ['.json', '.jsonl']:
self.data = pd.read_json(path)
self.source_format = 'json'
elif path.suffix.lower() in ['.xls', '.xlsx', '.xlsm']:
self.data = pd.read_excel(path)
self.source_format = 'excel'
else:
logger.error(f"Неподдерживаемый формат файла: {path.suffix}")
return False
logger.info(f"Загружено {len(self.data)} строк из {filepath}")
return True
except Exception as e:
logger.error(f"Ошибка загрузки файла {filepath}: {e}")
return False
def describe_extended(self) -> Dict[str, Dict[str, float]]:
"""Расширенная статистика для числовых колонок."""
stats = {}
for col in self.data.columns:
if pd.api.types.is_numeric_dtype(self.data[col]):
numeric_data = self.data[col].dropna()
if len(numeric_data) > 0:
stats[col] = {
'count': len(numeric_data),
'mean': mean(numeric_data),
'median': median(numeric_data),
'stdev': stdev(numeric_data) if len(numeric_data) > 1 else 0.0,
'min': min(numeric_data),
'max': max(numeric_data),
'sum': sum(numeric_data)
}
return stats
def filter_data(self, column: str, operator: str, value: str) -> pd.DataFrame:
"""Отфильтровать данные с разными операторами."""
try:
val = float(value) if value.replace('.', '').isdigit() else value
if operator == '==':
return self.data[self.data[column] == val]
elif operator == '>':
return self.data[self.data[column] > val]
elif operator == '<':
return self.data[self.data[column] < val]
elif operator == '>=':
return self.data[self.data[column] >= val]
elif operator == '<=':
return self.data[self.data[column] <= val]
elif operator == '!=':
return self.data[self.data[column] != val]
else:
logger.warning(f"Неизвестный оператор: {operator}")
return self.data
except KeyError:
logger.error(f"Колонка {column} не найдена")
return self.data
except Exception as e:
logger.error(f"Ошибка фильтрации: {e}")
return self.data
def group_and_aggregate(self, group_col: str, agg_col: str,
agg_func: str) -> pd.DataFrame:
"""Группировка и агрегация данных."""
try:
agg_map = {'sum': 'sum', 'mean': 'mean', 'count': 'count',
'min': 'min', 'max': 'max'}
if agg_func not in agg_map:
logger.error(f"Функция агрегации не поддерживается: {agg_func}")
return pd.DataFrame()
result = self.data.groupby(group_col)[agg_col].agg(agg_map[agg_func]).reset_index()
logger.info(f"Сгруппировано по {group_col}, агрегация {agg_func} для {agg_col}")
return result
except Exception as e:
logger.error(f"Ошибка группировки: {e}")
return pd.DataFrame()
def save_data(self, filepath: str, data: Optional[pd.DataFrame] = None):
"""Сохранить данные в файл."""
df = data if data is not None else self.data
path = Path(filepath)
try:
if path.suffix.lower() == '.csv':
df.to_csv(path, index=False, encoding='utf-8')
elif path.suffix.lower() == '.json':
df.to_json(path, orient='records', force_ascii=False)
elif path.suffix.lower() in ['.xls', '.xlsx']:
df.to_excel(path, index=False)
else:
logger.error(f"Неподдерживаемый формат для сохранения: {path.suffix}")
return
logger.info(f"Данные сохранены в {filepath}")
except Exception as e:
logger.error(f"Ошибка сохранения файла {filepath}: {e}")
def main():
"""Точка входа в приложение."""
parser = argparse.ArgumentParser(description="ii GRUT — расширенный анализ данных")
parser.add_argument("input", help="Путь к входному файлу (CSV/JSON/Excel)")
parser.add_argument("-o", "--output", help="Путь к выходному файлу")
# Фильтрация
parser.add_argument("--filter", nargs=3, metavar=('COLUMN', 'OPERATOR', 'VALUE'),
help="Фильтрация: колонка оператор значение (например, Возраст > 18)")
# Группировка
parser.add_argument("--groupby", nargs=2, metavar=('GROUP_COL', 'AGG_COL'),
help="Группировка: колонка_группы колонка_агрегации")
parser.add_argument("--agg", choices=['sum', 'mean', 'count', 'min', 'max'],
help="Функция агрегации")
# Статистика
parser.add_argument("--stats", action="store_true", help="Вывести расширенную статистику")
parser.add_argument("--basic-stats", action="store_true",
help="Только базовая статистика (mean/median/stdev)")
args = parser.parse_args()
processor = GRUTProcessor()
# Загрузка данных
if not processor.load_data(args.input):
sys.exit(1)
# Обработка фильтрации
filtered_data = processor.data
if args.filter:
col, op, val = args.filter
filtered_data = processor.filter_data(col, op, val)
logger.info(f"Отфильтровано: {len(filtered_data)} строк")
# Группировка и агрегация
result_data = filtered_data
if args.groupby and args.agg:
group_col, agg_col = args.groupby
result_data = processor.group_and_aggregate(group_col, agg_col, args.agg)
# Вывод статистики
if args.Metadata
Metadata
Assignees
Labels
No labels