### Домашнее задание 1. Создание и нормализация базы данных

#### 1. Данные по клиентам и транзакциям

Посмотрим что из себя представляют данные из csv таблиц.

In [1]:
import pandas as pd

customer_pd = pd.read_csv("raw_data/customer.csv")
transaction_pd = pd.read_csv("raw_data/transaction.csv")

In [2]:
# Покупатели
customer_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000 entries, 0 to 3999
Data columns (total 15 columns):
 #   Column                 Non-Null Count  Dtype 
---  ------                 --------------  ----- 
 0   customer_id            4000 non-null   int64 
 1   first_name             4000 non-null   object
 2   last_name              3875 non-null   object
 3   gender                 4000 non-null   object
 4   DOB                    3913 non-null   object
 5   job_title              3494 non-null   object
 6   job_industry_category  3344 non-null   object
 7   wealth_segment         4000 non-null   object
 8   deceased_indicator     4000 non-null   object
 9   owns_car               4000 non-null   object
 10  address                4000 non-null   object
 11  postcode               4000 non-null   int64 
 12  state                  4000 non-null   object
 13  country                4000 non-null   object
 14  property_valuation     4000 non-null   int64 
dtypes: int64(3), object(1

In [3]:
# Транзакции
transaction_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000 entries, 0 to 19999
Data columns (total 12 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   transaction_id    20000 non-null  int64 
 1   product_id        20000 non-null  int64 
 2   customer_id       20000 non-null  int64 
 3   transaction_date  20000 non-null  object
 4   online_order      19640 non-null  object
 5   order_status      20000 non-null  object
 6   brand             19803 non-null  object
 7   product_line      19803 non-null  object
 8   product_class     19803 non-null  object
 9   product_size      19803 non-null  object
 10  list_price        20000 non-null  object
 11  standard_cost     19803 non-null  object
dtypes: int64(3), object(9)
memory usage: 1.8+ MB


Представленные данные можно интерпретировать как данные некоторого онлайн-сервиса, продающего товары.

Таблица `customer.csv` содержит персональные данные пользователей: 
- личные данные;
- информацию о профессии;
- информацию об адресе;
- и другие разные данные (наличие машины, оценка собственности и т.д.).

Таблица `transaction.csv` содержит информацию о заказах некоторых продуктов, по всей видимости велосипедов:
- какой продукт был куплен или доставлен;
- какому пользователю;
- информация о заказе;
- информацию о продукте (бренд, линейка, класс, размер, цена).

#### 2. Стуруктура БД

[Схема для базы данных](./assets/dbdiagram_io/db_schema_init.dbml) по исходным `.csv` таблицам:

<div style="text-align: center;">
    <img src="./assets/db_schema_init.svg" width="50%">
</div>


#### 3. Нормализация

##### 1-НФ

Таблица `customers` в колонке _address_ содержит сразу несколько типов данных: номер, название улицы и тип. Поэтому в качестве нормализации можно убрать _address_ и добавить его составляющие в новые колонки: _street\_number_, _street\_name_, _street\_type_.

Таблица `transactions` уже в 1-НФ, все поля атомарны, и скалярны.

##### 2-НФ

Для `customers` 2-НФ выполняется, т.к. таблица уже содержит _customer\_id_ primary key, который является уникальным и все остальные колонки от него зависят.

При `transactions` 2-НФ так же выполняется, она содрежит _transaction\_id_, который является ее primary key.

##### 3-НФ

Таблица `customers` содержит транзитивную зависимость, _postcode_ определяет в каком _state_ и _country_ находится покупатель, ведь это уникальный почтовый индекс. Поэтому их можно вынести в отдельную таблицу: `Table postcodes { potcode_id, state, country }`.

Также, заметим, что _job\_title_, _job\_industry\_category_ не содержат транзитивных зависимостей, и имеют достаточно общие данные, к тому же могут содержать пропуски.

В `transactions` так же есть транзитивная зависимость. Поле _product\_id_ однозначно определяет какой товар учавстовал в транзакции и информацию о нем: _brand_, _product\_line_, _product\_class_, _product\_size и цена. 

При этом, заметим, что есть транзакции, где _product\_id_ одинаковый, но информация о товарах разныя:

```csv
1136,66,1456,12/22/2017,False,Approved,Giant Bicycles,Road,low,small,"590,26","525,33"
1618,66,2871,5/29/2017,False,Approved,Solex,Standard,medium,medium,"1163,89","589,27"
```

Это может означать, что уникальный товар кодируется парой (_product\_id_, _brand_) и у каждого бренда, свои id продукта. Тогда данные о продукте стоит вынести в отдельную таблицу _products_, и сделать двойной primary key по паре (_product\_id_, _brand_).

#### Нормализированная схема

[Схема после нормализации](./assets/db_schema_norm.svg). 

Также дополнительно созда табличку _brands_, что бы ссылаться на нее в _products_ и _transactions_. Для перечислимых множеств завел отдельные _Enum_ типы.

<div style="text-align: center;">
    <img src="./assets/db_schema_norm.svg" width="50%">
</div>

#### 4. Создание таблиц

Для подключения к БД будем использовать _sqlalchemy_ библиотеку.

In [4]:
import pandas as pd
from sqlalchemy import create_engine, text, MetaData, inspect


engine = create_engine("postgresql+psycopg2://myuser:mypassword@localhost:5432/mydb")


def run_sql_script_from_file(file_path, params=None):
    with open(file_path) as file:
        sql_script = file.read()
    with engine.begin() as conn:
        conn.execute(text(sql_script), params)


def drop_all_tables():
    meta = MetaData()
    meta.reflect(bind=engine)
    meta.drop_all(bind=engine)

In [5]:
# Ячейка для сброса всех добавленных таблиц для экспериментов
drop_all_tables()

Создадим все таблицы по [нормализированной схеме бд](sql/create_db_schema_norm.sql).

In [6]:
run_sql_script_from_file("sql/create_db_schema_norm.sql")

print("Created tables:")
inspect(engine).get_table_names(schema="public")

Created tables:


['customers', 'transactions', 'products', 'postcodes']

Теперь выведем для каждой созданной таблицы информацию о ее полях: имя, тип, nullability и значение по умолчанию.

In [7]:
def get_public_table_df(name):
    inspector = inspect(engine)
    info = []
    for column in inspector.get_columns(name, schema="public"):
        info.append({
            "name": column["name"],
            "type": column["type"],
            "nullable": column["nullable"],
            "default": column["default"]
        })
    return pd.DataFrame(info)

In [8]:
get_public_table_df("customers")

Unnamed: 0,name,type,nullable,default
0,customer_id,INTEGER,False,
1,postcode_id,INTEGER,False,
2,first_name,VARCHAR,False,
3,last_name,VARCHAR,True,
4,gender,VARCHAR(7),False,
5,DOB,DATE,True,
6,job_title,VARCHAR,True,
7,job_industry_category,VARCHAR,True,
8,wealth_segment,VARCHAR,False,
9,deceased_indicator,BOOLEAN,False,


In [9]:
get_public_table_df("transactions")

Unnamed: 0,name,type,nullable,default
0,transaction_id,INTEGER,False,
1,product_id,INTEGER,True,
2,product_brand,VARCHAR,True,
3,customer_id,INTEGER,False,
4,transaction_date,DATE,False,
5,online_order,BOOLEAN,True,
6,order_status,VARCHAR,False,


In [10]:
get_public_table_df("postcodes")

Unnamed: 0,name,type,nullable,default
0,postcode_id,INTEGER,False,
1,state,VARCHAR,False,
2,country,VARCHAR,False,


In [11]:
get_public_table_df("products")

Unnamed: 0,name,type,nullable,default
0,product_id,INTEGER,False,
1,product_brand,VARCHAR,False,
2,product_line,VARCHAR,False,
3,product_class,VARCHAR,False,
4,product_size,VARCHAR,False,
5,list_price,"NUMERIC(15, 2)",True,
6,standard_cost,"NUMERIC(15, 2)",True,


Скриншот из GUI клиента Beekeeper studio, полностью не поместилось:

<div style="text-align: center;">
    <img src="./assets/screenshot_tables.png" width="25%">
</div>

#### 5. Загрузка данных в таблицы

Далее выполним приведение данных из таблицы к нужным нам типам и загрузим их в базу данных. Для проверки что данные действительно загрузились будем смотреть на количество данных в таблице использовать `table_records_size` функцию.

In [12]:
def table_records_size(table_name: str) -> int:
    with engine.connect() as conn:
        result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
        count = result.scalar()
    return count

Загрузка данных для _customers_ и _postcodes_ таблиц.

sql команды с параметризированными insert командами:

- [insert_customer.sql](./sql/insert_customer.sql)
- [insert_postcode.sql](./sql/insert_postcode.sql)

In [13]:
import datetime


def process_customers(customers: pd.DataFrame):
    customers = customers.where(customers.notnull(), None)
    for record in customers.to_dict(orient="records"):
        postcode_entity = {
            "postcode_id": record["postcode"],
            "state": record["state"],
            "country": record["country"],
        }

        street_number, street_name, street_type = get_street_info_from(
            record["address"]
        )

        customer_entity = {
            "customer_id": record["customer_id"],
            "postcode_id": record["postcode"],
            "first_name": record["first_name"],
            "last_name": record["last_name"],
            "DOB": get_DOB_from(record["DOB"]),
            "gender": get_gender_from(record["gender"]),
            "job_title": record["job_title"],
            "job_industry_category": record["job_industry_category"],
            "wealth_segment": record["wealth_segment"],
            "deceased_indicator": get_deceased_indicator_from(
                record["deceased_indicator"]
            ),
            "owns_car": record["owns_car"],
            "street_number": street_number,
            "street_name": street_name,
            "street_type": street_type,
            "property_valuation": record["property_valuation"],
        }

        run_sql_script_from_file("./sql/insert_postcode.sql", params=postcode_entity)
        run_sql_script_from_file("./sql/insert_customer.sql", params=customer_entity)


def get_DOB_from(raw_DOB: str):
    if raw_DOB == None:
        return None
    return datetime.datetime.strptime(raw_DOB, "%Y-%m-%d").date()


def get_gender_from(raw_gender: str):
    if raw_gender.startswith("F"):
        return "Female"
    if raw_gender.startswith("M"):
        return "Male"
    if raw_gender == "N":
        return None
    if raw_gender == "U" or raw_gender == "Y":
        return "Unknown"
    else:
        raise Exception(f"Can't process raw_gender: {raw_gender}")


def get_deceased_indicator_from(raw_deceased_indicator: str):
    return raw_deceased_indicator != "N"


def get_owns_car_from(raw_owns_car: str):
    return raw_owns_car == "Yes"


def get_street_info_from(raw_address: str):
    parts = raw_address.split()
    return int(parts[0]), " ".join(parts[1:-1]), parts[-1]


process_customers(customer_pd)

print("Records size:")
customers_size = table_records_size("customers")
postcodes_size = table_records_size("postcodes")
print(f"customers_size: {customers_size}")
print(f"postcodes_size: {postcodes_size}")

Records size:
customers_size: 4000
postcodes_size: 874


Загрузка данных для _products_ и _transactions_ таблиц.

insert команды:

- [insert_product.sql](./sql/insert_product.sql)
- [insert_transaction.sql](./sql/insert_transaction.sql)

In [14]:
def process_transactions(transactions):
    transactions = transactions.where(transactions.notnull(), None)
    for record in transactions.to_dict(orient="records"):
        product_entity = {
            "product_id": record["product_id"],
            "product_brand": record["brand"],
            "product_line": record["product_line"],
            "product_class": record["product_class"],
            "product_size": record["product_size"],
            "list_price": get_price_from(record["list_price"]),
            "standard_cost": get_price_from(record["standard_cost"]),
        }

        transaction_entity = {
            "transaction_id": record["transaction_id"],
            "product_id": record["product_id"],
            "product_brand": record["brand"],
            "customer_id": record["customer_id"],
            "transaction_date": get_transaction_date_from(record["transaction_date"]),
            "online_order": record["online_order"],
            "order_status": record["order_status"],
        }

        if (
            product_entity["product_id"] is not None
            and product_entity["product_brand"] is not None
        ):
            run_sql_script_from_file("./sql/insert_product.sql", params=product_entity)

        customer_id = transaction_entity["customer_id"]
        if check_customer_existance(customer_id):
            run_sql_script_from_file(
                "./sql/insert_transaction.sql", 
                params=transaction_entity
            )
        else:
            print(f"No customer in DB with customer_id: {customer_id}, invalid transaction")


def get_transaction_date_from(raw_date: str):
    if raw_date == None:
        return None
    return datetime.datetime.strptime(raw_date, "%m/%d/%Y").date()


def get_price_from(raw_price: str):
    try:
        return float(raw_price.replace(",", "."))
    except:
        return None
    

def check_customer_existance(customer_id):
    with engine.connect() as conn:
        check_sql = text("SELECT 1 FROM customers WHERE customer_id = :customer_id")
        result = conn.execute(check_sql, {"customer_id": customer_id}).first()
        return result is not None


process_transactions(transaction_pd)

print("Records size:")
transactions_size = table_records_size("transactions")
products_size = table_records_size("products")
print(f"transactions_size: {transactions_size}")
print(f"products_size: {products_size}")

No customer in DB with customer_id: 5034, invalid transaction
No customer in DB with customer_id: 5034, invalid transaction
No customer in DB with customer_id: 5034, invalid transaction
Records size:
transactions_size: 19997
products_size: 168


Таким образом, в БД были залиты:
- 4000 - customer;
- 874 - postcode;
- 19997 - transaction;
- 168 - product.

Заметим, что в `transaction.csv` был обнаружен пользователь с `customer_id: 5034`, которого не было в `customer.csv`, поэтому мы не смогли обработать эту транзакцию. Скорее всего она некорректная.

##### Картинки из GUI

<div style="text-align: center;">
    <img src="./assets/screenshot_customers.png" width="45%">
    <img src="./assets/screenshot_postcodes.png" width="45%">
</div>

<div style="text-align: center;">
    <img src="./assets/screenshot_products.png" width="45%">
    <img src="./assets/screenshot_transactions.png" width="45%">
</div>

