Khởi tạo thư viện

In [13]:
import pandas as pd
import pyodbc
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta

import sys
sys.path.append(r"G:/My Drive/HỌC TẬP DUE/NNT_năm 4 kì 1/DW & DM/config")
import mdl_db_connection
import mdl_db_query

## Khởi tạo một số thông tin chung

In [14]:
conn_str, engine = mdl_db_connection.connect_db()
# Định nghĩa các tên bảng và cột
table_source = 'ETL_SaleDataset'
fact_table = "fact_table"
table_time = "Dim_Time"
table_product = "Dim_Product"
table_region = "Dim_Region"
table_customer = "Dim_Customer"


### Các câu lệnh truy vấn

#### product

In [15]:
# tạo bảng product
query_create_table_product = f"""
    USE {mdl_db_connection.database}
    DROP TABLE IF EXISTS {table_product};
    CREATE TABLE {table_product} (
        ProductID nvarchar(255),
        Product nvarchar(255),
        Description nvarchar(255),
        Supplier nvarchar(255),
        [Suggested Price] float,
        Price float,
        primary key (ProductID)
    );
"""

#insert dữ lijeuej vào bảng dim_product
query_insert_into_table_product = f"""
    INSERT INTO {table_product} (ProductID, Product, Description, Supplier, [Suggested Price], Price)
    SELECT DISTINCT
        ProductID,
        Product,
        Description,
        Supplier,
        [Suggested Price],
        Price
    FROM {table_source};
"""

#### customer

In [16]:
# tạo bảng product
query_create_table_customer = f"""
    DROP TABLE IF EXISTS {table_customer};
    CREATE TABLE {table_customer} (
        [CustomerId] int,
        [Fname] NVARCHAR(255),
        [Lname] NVARCHAR(255),
        [Balance] MONEY,
        [Address] NVARCHAR(255),
        [E-mail] NVARCHAR(255),
        [City] NVARCHAR(255),
        [Zip] NVARCHAR(255)
        primary key (CustomerId)
    );
"""

#insert dữ lijeuej vào bảng dim_product
query_insert_into_table_customer = f"""
    INSERT INTO {table_customer} (CustomerId, Fname, Lname, Balance, Address, [E-mail], City, Zip)
    SELECT DISTINCT
        CustomerId,
        Fname,
        Lname,
        Balance,
        Address,
        [E-mail],
        City,
        Zip
    FROM {table_source}
    WHERE CustomerId IS NOT NULL;
"""

#### region

In [17]:
#tạo bảng region
query_create_table_region = f"""
    DROP TABLE IF EXISTS {table_region};
    CREATE TABLE {table_region} (
        [RegionID] INT IDENTITY(1,1),
        [Region] NVARCHAR(255),
        primary key (RegionID)
    );
"""

query_insert_into_table_region = f"""
    insert into {table_region} (Region)
    select distinct
        Region
    FROM {table_source}
    where Region is not null
"""


#### fact table

In [18]:
query_create_fact_table = f"""
  DROP TABLE IF EXISTS {fact_table};
  CREATE TABLE {fact_table} (
    [RowID] INT IDENTITY(1,1) PRIMARY KEY,
    [OrderID] NVARCHAR(255),
    [Units] DOUBLE PRECISION,
    [Sales] MONEY,
    [COGS] MONEY,
    [Profit] MONEY,

    [CustomerID] INT,
    [ProductID] NVARCHAR(255),
    
    [RegionID] INT,
    [Region] varchar(255),
    
    [DateID] INT,
    [Date] DATETIME,
  );
"""

query_insert_into_fact_table = f"""
  INSERT INTO {fact_table} (
    [OrderID],
    [Units],
    [Sales],
    [COGS],
    [Profit],

    [CustomerID],
    [ProductID],

    [Region],

    [Date]
)
SELECT distinct
    [OrderID],
    [Units],
    [Sales],
    [COGS],
    [Profit],

    [CustomerID],
    [ProductID],

    [Region],

    [Date]
FROM {table_source}
WHERE [OrderID] IS NOT NULL
"""

lookup_value_region = f"""
    UPDATE {fact_table}
    SET {fact_table}.RegionID = {table_region}.RegionID
    FROM {fact_table}
    inner JOIN {table_region} ON {fact_table}.[Region] = {table_region}.[Region]
    ALTER TABLE [fact_table]
    DROP COLUMN [Region];
"""


lookup_value_date = f"""
    UPDATE {fact_table}
    SET {fact_table}.DateID = {table_time}.[Date_ID]
    FROM {fact_table}
    inner JOIN {table_time} ON {fact_table}.[Date] = {table_time}.[Datetime]

    ALTER TABLE [fact_table]
    DROP COLUMN [Date]

"""

#### time

In [19]:

query_create_table_time = f"""
    DROP TABLE IF EXISTS {table_time};
    CREATE TABLE {table_time} (
        Date_ID INT IDENTITY(1,1) PRIMARY KEY,
        [Datetime] DATETIME,
        [Date] DATE,
        [Year] INT,
        [Quarter] INT,
        [Month] INT,
        [Day] INT,
        [Weekday] INT
    );
"""

# insert dữ liệu vào bảng dim_time

query_insert_into_table_time = f"""
    INSERT INTO {table_time} ([Datetime], [Date], [Year], [Quarter], [Month], [Day], [Weekday])
    SELECT DISTINCT
        [Date] AS [Datetime],
        CONVERT(DATE, [Date], 105) AS [Date],
        YEAR([Date]) AS [Year],
        DATEPART(QUARTER, [Date]) AS [Quarter],
        MONTH([Date]) AS [Month],
        DAY([Date]) AS [Day],
        DATEPART(WEEKDAY, [Date]) AS [Weekday]
    FROM 
        {table_source}
    WHERE [Date] IS NOT NULL;

"""




#### schema query

In [20]:
# Câu truy vấn để xóa ràng buộc khóa ngoại nếu tồn tại
drop_fk_constraints_query = f"""
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS
           WHERE CONSTRAINT_TYPE = 'FOREIGN KEY'
           AND TABLE_NAME = '{fact_table}')
BEGIN
    ALTER TABLE {fact_table}
    DROP CONSTRAINT FK_fact_date;
    
    ALTER TABLE {fact_table}
    DROP CONSTRAINT FK_fact_product;
    
    ALTER TABLE {fact_table}
    DROP CONSTRAINT FK_fact_region;
    
    ALTER TABLE {fact_table}
    DROP CONSTRAINT FK_fact_customer;
END
"""



# Câu truy vấn để thêm ràng buộc khóa ngoại mới
add_fk_constraints_query = f"""
ALTER TABLE {fact_table}
ADD CONSTRAINT FK_fact_date
FOREIGN KEY(DateID) REFERENCES {table_time}(Date_ID);

ALTER TABLE {fact_table}
ADD CONSTRAINT FK_fact_product
FOREIGN KEY(ProductID) REFERENCES {table_product}(ProductID);

ALTER TABLE {fact_table}
ADD CONSTRAINT FK_fact_region
FOREIGN KEY(RegionID) REFERENCES {table_region}(RegionID);

ALTER TABLE {fact_table}
ADD CONSTRAINT FK_fact_customer
FOREIGN KEY(CustomerID) REFERENCES {table_customer}(CustomerId);
"""



### PROCESS DATA

#### drop fk nếu tồn tại

In [21]:
@task(
        name="drop fk if exist"
      , description = "Phải xóa các fk thì mới tạo bảng mới/insert được (cho trường hợp chạy lại code)"
      , task_run_name = "drop fk if exist"
      , cache_key_fn = task_input_hash
      , cache_expiration=timedelta(days=1)
      )
def drop_fk_constraints():
    logger = get_run_logger()
    logger.info("Xóa các fk nếu nó đã tồn tại!")
    # Kết nối đến cơ sở dữ liệu
    conx = pyodbc.connect(conn_str)
    # Thực hiện truy vấn và tạo bảng
    cursor = conx.cursor()
    cursor.execute(drop_fk_constraints_query)
    # Xác nhận thay đổi
    conx.commit()
    # Đóng kết nối pyodbc
    conx.close()


 `@task(name='my_unique_name', ...)`


#### create dim table

In [22]:
@task(
        name="create dim table, fact table"
      , description = "Tạo các bảng dim table từ 1 table gốc trong db (week 2)"
      , task_run_name = "tạo tất cả các bảng dim, fact table"
      # , cache_key_fn = task_input_hash
      # , cache_expiration=timedelta(days=1)
      )
def create_dim_table():
    logger = get_run_logger()

    logger.info("Tạo bảng dim prodcut")
    # tạo bảng dim product
    # execute_query(query_create_table_product,table_product)
    mdl_db_query.create_tbl_query(query_create_table_product,table_product)
    # insert data vào bảng
    mdl_db_query.create_tbl_query(query_insert_into_table_product,table_product)


    logger.info("Tạo bảng dim region")
    #tạo bảng  dim region
    mdl_db_query.create_tbl_query(query_create_table_region, table_region)
    #insert data từ bảng gốc vào
    mdl_db_query.create_tbl_query(query_insert_into_table_region, table_region)
    
    
    
    logger.info("Tạo bảng dim time")

    # tạo bảng dim time
    mdl_db_query.create_tbl_query(query_create_table_time,table_time)
    # insert data vào bảng
    mdl_db_query.create_tbl_query(query_insert_into_table_time,table_time)



    logger.info("Tạo bảng dim customer")
    # tạo bảng dim customer
    mdl_db_query.create_tbl_query(query_create_table_customer,table_customer)
    # insert data vào bảng
    mdl_db_query.create_tbl_query(query_insert_into_table_customer,table_customer)

    logger.info("Tạo bảng fact_table")
    # tạo bảng dim product
    mdl_db_query.create_tbl_query(query_create_fact_table,fact_table)
    logger.info("Insert data vào bảng fact_table")
    # insert data vào bảng
    mdl_db_query.create_tbl_query(query_insert_into_fact_table,fact_table)

    #tạo 2 fk cho fact_table
    logger.info("tạo khóa ngoại REGION cho fact_table")
    mdl_db_query.create_tbl_query(lookup_value_region, fact_table)

    logger.info("tạo khóa ngoại DATE cho fact_table")
    mdl_db_query.create_tbl_query(lookup_value_date, fact_table) 



 `@task(name='my_unique_name', ...)`


#### data modeling

In [23]:
@task(
        name = "data modeling"
      , description = "Nối các bảng lại với nhau (tạo schema trong db)"
      , task_run_name = "create schema"
      # , cache_key_fn = task_input_hash
      # , cache_expiration=timedelta(days=1)
        )
def data_modeling():
    
    logger = get_run_logger()

    logger.info("TẠO FK - SCHEMA")
    # Kết nối đến cơ sở dữ liệu
    conx = pyodbc.connect(conn_str)

    # Thực hiện truy vấn và tạo bảng
    cursor = conx.cursor()

    cursor.execute(drop_fk_constraints_query)
    cursor.execute(add_fk_constraints_query)

    # Xác nhận thay đổi
    conx.commit()
    # Đóng kết nối pyodbc
    conx.close()
    
    logger.info("Hiển thị mối quan hệ của các bảng sau khi tạo FK")

    mdl_db_query.get_foreign_key_relationships()



 `@task(name='my_unique_name', ...)`


### BUILD FLOW

In [24]:
@flow(
        name = "FLOW ETL TO SCHEMA - WEEK 4 - USING PREFECT PYTHON (DATA WEEK 2)"
        , flow_run_name = "ETL to schema week 4"
        , log_prints = True
        , description= "buld flow cho các bước bên trên: tạo dim, fact, moldeling..."
)
def build_flow():
    drop_fk_constraints()
    create_dim_table()
    data_modeling()
build_flow()


 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=LiteralResult(type='literal', artifact_type='result', artifact_description='Result with value `None` persisted to Prefect.', value=None)),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]