<a href="https://colab.research.google.com/github/julmiha25-sys/Python/blob/main/4%D0%9A%D0%B5%D0%B9%D1%811_ETL_csv_%D1%84%D0%B0%D0%B9%D0%BB_%D0%BF%D1%80%D0%BE%D0%B4%D0%B0%D0%B6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [85]:
import os,csv, datetime
class Extraction:
  @staticmethod
  def from_csv(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
      reader=csv.reader(f, delimiter=',')
      next(reader) # Заголовок
      rows=[row for row in reader] # Список строк
    if not rows:
        print("Файл пуст")
        return []
    sales_objects=[] # Создание списка объектов из строк
    for row in rows:
      if len(row)<4:
        print(f"Пропущена строка с недостаточными данными: {row}")
        continue
      try:
        sale_obj=Sale(id=row[0],date=datetime.datetime.strptime(row[1],'%Y-%m-%d').date(),amount=float(row[2]), product=row[3]) # Создание объекта Sale из строки
        sales_objects.append(sale_obj) # Добавление данных об объектах в список объектов
      except (ValueError, IndexError) as e:
        print(f"Ошибка в строке {row}: {e}")
        continue  # Пропускаем проблемные строки
    return sales_objects
class Sale:
  def __init__(self,id,date,amount,product):
    self.__id=id
    if isinstance(date, str):
      self.__date=datetime.datetime.strptime(date, '%Y-%m-%d').date()
    elif isinstance(date, datetime.datetime):
      self.__date=date.date()  # если пришел datetime
    elif isinstance(date, datetime.date):
      self.__date=date  # если уже date
    else:
      self.__date=date
    self.__amount=amount
    self.__product=product
  def get_id(self): # Создание геттера - получает значение атрибута
    return self.__id
  def set_id(self, value):  # Создание сеттера - устанавливает значение атрибута
    self.__id=value
  def get_date(self):
    return self.__date.strftime('%Y-%m-%d') # Преобразование строки в дату
  def set_date(self, value):
    self.__date=value
  def get_amount(self):
    return self.__amount
  def set_amount(self, value):
    self.__amount=value
  def get_product(self):
    return self.__product
  def set_product(self, value):
    self.__product=value
  def __str__(self): # Преобразование объекта в строку
    date_str=self.__date.strftime('%Y-%m-%d')
    return f"Sale(id={self.__id}, date='{date_str}', amount={self.__amount}, product='{self.__product}')"
  def __repr__(self):  # Возвращение строкового представления объекта
    return self.__str__()
class Transformation:
  @staticmethod
  def filter_by_date(sales_list, start_date=None, end_date=None):
    if isinstance(start_date, str):
      start_date=datetime.datetime.strptime(start_date, '%Y-%m-%d')
    if isinstance(end_date, str):
      end_date=datetime.datetime.strptime(end_date, '%Y-%m-%d')
    filtered_sales = []
    for sale in sales_list:
      date_ok=True
      sale_date_str=sale.get_date()  # строка '2023-01-15'
      sale_date=datetime.datetime.strptime(sale_date_str, '%Y-%m-%d')
      if start_date is not None and sale_date < start_date:
        date_ok=False
      if end_date is not None and sale_date > end_date:
        date_ok=False
      if date_ok:
        filtered_sales.append(sale)
    return filtered_sales
  @staticmethod
  def filter_by_amount(sales_list, min_amount=None,max_amount=None):
    filtered_sales=[] # Список продаж с отфильтрованными данными
    for sale in sales_list:
      amount_ok=True
      amount=sale.get_amount()
      if min_amount is not None and amount < min_amount:
        amount_ok = False
      if max_amount is not None and amount > max_amount:
        amount_ok = False
      if amount_ok:
        filtered_sales.append(sale)
    return filtered_sales
class Loading:
  def to_csv(sales_data, file_path):
    with open(file_path, 'w', encoding='utf-8', newline='') as f:
      writer=csv.writer(f)
      writer.writerow(['id', 'date', 'amount', 'product']) # Заголовок
      for sale in sales_data: # Запись строк с данными
        writer.writerow([sale.get_id(),sale.get_date(),sale.get_amount(),sale.get_product()])
class Analysis:
  @staticmethod
  def calculate_total_sales(sales_list):
    total_sales=0
    for sale in sales_list:
      amount=sale.get_amount()
      total_sales+=amount  # Сумма всех покупок
    return round(total_sales,2)
  @staticmethod
  def calculate_average_sales(sales_list):
    return round((total_sales/len(sales_list)),2)  # Средняя сумма покупок


In [103]:
sales_data = Extraction.from_csv('sales_data.csv')
print(sales_data[:5])
filtered_sales=Transformation.filter_by_date(sales_data, '2022-01-01','2022-12-01')
print(filtered_sales[:5])
filtered_sales2 = Transformation.filter_by_amount(filtered_sales1, 100, 1000)
print(filtered_sales[:5])
res=Analysis.calculate_total_sales(sales_data)
print(res)
res2=Analysis.calculate_average_sales(sales_data)
print(res2)
Loading.to_csv(filtered_sales, 'filtered_sales.csv')

[Sale(id=1, date='2023-04-25', amount=35.21, product='Product 6'), Sale(id=2, date='2022-08-31', amount=359.19, product='Product 9'), Sale(id=3, date='2023-01-22', amount=117.53, product='Product 5'), Sale(id=4, date='2022-12-15', amount=366.68, product='Product 4'), Sale(id=5, date='2023-03-06', amount=628.65, product='Product 2')]
[Sale(id=2, date='2022-08-31', amount=359.19, product='Product 9'), Sale(id=6, date='2022-08-04', amount=843.69, product='Product 3'), Sale(id=7, date='2022-10-13', amount=190.38, product='Product 7'), Sale(id=13, date='2022-09-02', amount=590.5, product='Product 3'), Sale(id=14, date='2022-07-12', amount=51.17, product='Product 1')]
[Sale(id=2, date='2022-08-31', amount=359.19, product='Product 9'), Sale(id=6, date='2022-08-04', amount=843.69, product='Product 3'), Sale(id=7, date='2022-10-13', amount=190.38, product='Product 7'), Sale(id=13, date='2022-09-02', amount=590.5, product='Product 3'), Sale(id=14, date='2022-07-12', amount=51.17, product='Produc