In [2]:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Date, Float, DECIMAL, ForeignKey
from sqlalchemy.ext.declarative import declarative_base

# Kết nối đến CSDL Data Warehouse
engine = create_engine("mysql+pymysql://nhanadmin:nhandeptrai191@localhost:3306/datawarehouse")
Base = declarative_base()

# ==============================================
# 1. Định nghĩa các bảng DIMENSION
# ==============================================

class DimTime(Base):
    """Bảng dimension thời gian"""
    __tablename__ = 'dim_time'

    MaThoiGian = Column(Integer, primary_key=True)
    Ngay = Column(Date, nullable=False)
    Thang = Column(Integer)
    Quy = Column(Integer)
    Nam = Column(Integer)
    Tuan = Column(Integer)

class DimStore(Base):
    """Bảng dimension cửa hàng"""
    __tablename__ = 'dim_store'
    MaCuaHang = Column(Integer, primary_key=True)
    SoDienThoai = Column(String(15))
    MaThanhPho = Column(Integer, ForeignKey('dim_geo.MaThanhPho'))

class DimItem(Base):
    """Bảng dimension sản phẩm"""
    __tablename__ = 'dim_item'

    MaMH = Column(Integer, primary_key=True)
    MoTa = Column(String(100))
    KichCo = Column(String(20))
    TrongLuong = Column(DECIMAL(10,2))
    Gia = Column(DECIMAL(15,2))
class DimCustomer(Base):
    """Bảng dimension khách hàng"""
    __tablename__ = 'dim_customer'

    MaKH = Column(Integer, primary_key=True)
    TenKH = Column(String(100))
    LoaiKH = Column(Integer) 
    # DiaChi = Column(String(255))

class DimGeo(Base):
    """Bảng dimension địa lý"""
    __tablename__ = 'dim_geo'

    MaThanhPho = Column(Integer, primary_key=True)
    DiaChiVP = Column(String(255))
    TenThanhPho = Column(String(100))
    Bang = Column(String(50))
    # QuocGia = Column(String(50))

# ==============================================
# 2. Định nghĩa các bảng FACT
# ==============================================

class FactSales(Base):
    """Bảng fact doanh số"""
    __tablename__ = 'fact_sales'

    id = Column(Integer, primary_key=True, autoincrement=True)
    MaThoiGian = Column(Integer, ForeignKey('dim_time.MaThoiGian'))
    MaMH = Column(Integer, ForeignKey('dim_item.MaMH'))
    MaThanhPho = Column(Integer, ForeignKey('dim_geo.MaThanhPho'))
    MaKH = Column(Integer, ForeignKey('dim_customer.MaKH'))
    SoLuongBan = Column(Integer)
    DoanhThu = Column(DECIMAL(15,2))

class FactInventory(Base):
    """Bảng fact tồn kho"""
    __tablename__ = 'fact_inventory'

    id = Column(Integer, primary_key=True, autoincrement=True)
    MaThoiGian = Column(Integer, ForeignKey('dim_time.MaThoiGian'))
    MaCuaHang = Column(Integer, ForeignKey('dim_store.MaCuaHang'))
    MaMH = Column(Integer, ForeignKey('dim_item.MaMH'))
    SoLuongTonKho = Column(Integer)
    ThoiGianNhapHang = Column(Date)

# ==============================================
# 3. Tạo tất cả bảng trong CSDL
# ==============================================

def create_tables():
    try:
        # Xóa toàn bộ bảng cũ (nếu muốn)
        Base.metadata.drop_all(engine)

        # Tạo mới tất cả bảng
        Base.metadata.create_all(engine)
        print("✅ Đã tạo thành công các bảng trong Data Warehouse!")
    except Exception as e:
        print(f"❌ Lỗi khi tạo bảng: {e}")

if __name__ == "__main__":
    create_tables()

  Base = declarative_base()


✅ Đã tạo thành công các bảng trong Data Warehouse!


In [3]:
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from integration_db import Customer, TouristCustomer, PostalCustomer, RepresentativeOffice, Store, Item, Order, StoredItem

# Kết nối đến cả hai database
source_engine = create_engine('mysql+pymysql://nhanadmin:nhandeptrai191@localhost:3306/intergrated_db')
warehouse_engine = create_engine('mysql+pymysql://nhanadmin:nhandeptrai191@localhost:3306/datawarehouse')


SourceSession = sessionmaker(bind=source_engine)
WarehouseSession = sessionmaker(bind=warehouse_engine)

source_session = SourceSession()
warehouse_session = WarehouseSession()

def extract_transform_load():
    try:
        # 1. Đổ dữ liệu vào DimTime từ các bảng có thông tin ngày tháng
        print("Đang xử lý DimTime...")
        # Lấy tất cả các ngày duy nhất từ các bảng nguồn
        dates = set()
        
        # Lấy từ bảng Customer
        customer_dates = source_session.query(Customer.NgayDatHangDauTien).distinct().all()
        dates.update([date[0] for date in customer_dates if date[0] is not None])
        
        # Lấy từ bảng TouristCustomer
        tourist_dates = source_session.query(TouristCustomer.ThoiDiemDuLich).distinct().all()
        dates.update([date[0] for date in tourist_dates if date[0] is not None])
        
        # Lấy từ bảng PostalCustomer
        postal_dates = source_session.query(PostalCustomer.ThoiDiemDangKy).distinct().all()
        dates.update([date[0] for date in postal_dates if date[0] is not None])
        
        # Lấy từ bảng RepresentativeOffice
        office_dates = source_session.query(RepresentativeOffice.ThoiDiemThanhLap).distinct().all()
        dates.update([date[0] for date in office_dates if date[0] is not None])
        
        # Lấy từ bảng Store
        store_dates = source_session.query(Store.ThoiDiemThanhLap).distinct().all()
        dates.update([date[0] for date in store_dates if date[0] is not None])
        
        # Lấy từ bảng Item
        item_dates = source_session.query(Item.ThoiDiemSanXuat).distinct().all()
        dates.update([date[0] for date in item_dates if date[0] is not None])
        
        # Lấy từ bảng Order
        order_dates = source_session.query(Order.NgayDatHang).distinct().all()
        dates.update([date[0] for date in order_dates if date[0] is not None])
        
        # Lấy từ bảng StoredItem
        stored_item_dates = source_session.query(StoredItem.ThoiGianNhapHang).distinct().all()
        dates.update([date[0] for date in stored_item_dates if date[0] is not None])
        
        # Chuyển đổi sang DimTime
        for date in dates:
            if date:
                month = date.month
                quarter = (month - 1) // 3 + 1
                year = date.year
                week = date.isocalendar()[1]
                
                existing = warehouse_session.query(DimTime).filter_by(Ngay=date).first()
                if not existing:
                    dim_time = DimTime(
                        Ngay=date,
                        Thang=month,
                        Quy=quarter,
                        Nam=year,
                        Tuan=week
                    )
                    warehouse_session.add(dim_time)
        
        warehouse_session.commit()
        
        # 2. Đổ dữ liệu vào DimGeo
        print("Đang xử lý DimGeo...")
        offices = source_session.query(RepresentativeOffice).all()
        for office in offices:
            existing = warehouse_session.query(DimGeo).filter_by(MaThanhPho=office.MaThanhPho).first()
            if not existing:
                dim_geo = DimGeo(
                    MaThanhPho=office.MaThanhPho,
                    DiaChiVP=office.DiaChiVP,
                    TenThanhPho=office.TenThanhPho,
                    Bang=office.Bang,
                    # QuocGia="USA"  # Giả định tất cả đều ở Mỹ
                )
                warehouse_session.add(dim_geo)
        
        warehouse_session.commit()
        
        # 3. Đổ dữ liệu vào DimStore
        print("Đang xử lý DimStore...")
        stores = source_session.query(Store).all()
        for store in stores:
            existing = warehouse_session.query(DimStore).filter_by(MaCuaHang=store.MaCuaHang).first()
            if not existing:
                dim_store = DimStore(
                    MaCuaHang=store.MaCuaHang,
                    SoDienThoai=store.SoDienThoai,  # Giả định tên cửa hàng
                    MaThanhPho=store.MaThanhPho
                )
                warehouse_session.add(dim_store)
        
        warehouse_session.commit()
        
        # 4. Đổ dữ liệu vào DimItem
        print("Đang xử lý DimItem...")
        items = source_session.query(Item).all()
        for item in items:
            existing = warehouse_session.query(DimItem).filter_by(MaMH=item.MaMH).first()
            if not existing:
                dim_item = DimItem(
                    MaMH=item.MaMH,
                    MoTa=item.MoTa,  # Giả định mô tả là tên mặt hàng
 # Cần thêm logic phân loại
                    KichCo=item.KichCo,
                    TrongLuong=item.TrongLuong,
                    Gia=item.Gia
                )
                warehouse_session.add(dim_item)
        
        warehouse_session.commit()
        
        # 5. Đổ dữ liệu vào DimCustomer
        print("Đang xử lý DimCustomer...")
        customers = source_session.query(Customer).all()
        for customer in customers:
            existing = warehouse_session.query(DimCustomer).filter_by(MaKH=customer.MaKH).first()
            if not existing:
                # Xác định loại khách hàng và địa chỉ
                customer_type = 0  # Mặc định
                
                if customer.tourist_info and customer.postal_info:
                    customer_type = 3  # Khách hàng không xác định rõ loại

                elif customer.tourist_info:
                    customer_type = 1

                elif customer.postal_info:
                    customer_type = 2  # Khách bưu điện

                
                dim_customer = DimCustomer(
                    MaKH=customer.MaKH,
                    TenKH=customer.TenKH,
                    LoaiKH=customer_type,
                )
                warehouse_session.add(dim_customer)
        
        warehouse_session.commit()
        
        # 6. Đổ dữ liệu vào FactSales
        print("Đang xử lý FactSales...")
        # Lấy thông tin thời gian từ DimTime để mapping
        time_mapping = {dt.Ngay: dt.MaThoiGian for dt in warehouse_session.query(DimTime).all()}
        
        orders = source_session.query(Order).all()
        for order in orders:
            order_date_id = time_mapping.get(order.NgayDatHang)
            if not order_date_id:
                continue
                
            for ordered_item in order.ordered_items:
                fact_sale = FactSales(
                    MaThoiGian=order_date_id,
                    MaMH=ordered_item.MaMH,
                    MaKH=order.MaKH, 
                    MaThanhPho=order.customer.MaThanhPho,  # Giả định địa chỉ là mã thành phố
                    SoLuongBan=ordered_item.SoLuongDat,
                    DoanhThu=ordered_item.GiaDat * ordered_item.SoLuongDat,
                )
                warehouse_session.add(fact_sale)
        
        warehouse_session.commit()
        
        # 7. Đổ dữ liệu vào FactInventory
        print("Đang xử lý FactInventory...")
        stored_items = source_session.query(StoredItem).all()
        for stored_item in stored_items:
            entry_date_id = time_mapping.get(stored_item.ThoiGianNhapHang)
            if not entry_date_id:
                continue
                
            fact_inventory = FactInventory(
                MaThoiGian=entry_date_id,
                MaCuaHang=stored_item.MaCuaHang,
                MaMH=stored_item.MaMH,
                SoLuongTonKho=stored_item.SoLuongTrongKho,
                # ThoiGianNhapHang=stored_item.ThoiGianNhapHang
            )
            warehouse_session.add(fact_inventory)
        
        warehouse_session.commit()
        
        print("ETL hoàn thành thành công!")
        
    except Exception as e:
        warehouse_session.rollback()
        print(f"Lỗi trong quá trình ETL: {str(e)}")
    finally:
        source_session.close()
        warehouse_session.close()

if __name__ == "__main__":
    extract_transform_load()

Đang xử lý DimTime...
Đang xử lý DimGeo...
Đang xử lý DimStore...
Đang xử lý DimItem...
Đang xử lý DimCustomer...
Đang xử lý FactSales...
Đang xử lý FactInventory...
ETL hoàn thành thành công!


In [4]:
import itertools

# Các cấp phân cấp của từng chiều

# Time hierarchy
time_levels = [
    [],
    ["t.Nam"],
    ["t.Nam", "t.Thang"],
    ["t.Nam", "t.Thang", "t.Quy"]
]

# Customer hierarchy
customer_levels = [
    [],
    ["c.LoaiKH"]
]

# Item hierarchy
item_levels = [
    [],
    ["i.MaMH"],
    ["i.MaMH", "i.KichCo"],
    ["i.MaMH", "i.KichCo", "i.TrongLuong"]
]

geo_levels = [
    [],
    ["g.Bang"],
    ["g.Bang", "g.MaThanhPho"]
]
# Danh sách tất cả các cột có thể có trong SELECT
all_columns = [
    ("t.Nam", "Year"),
    ("t.Thang", "Month"),
    ("t.Quy", "Quarter"),
    ("c.LoaiKH", "CustomerType"),
    ("i.MaMH", "ProductCode"),
    ("i.KichCo", "Size"),
    ("i.TrongLuong", "Weight"),
    ("g.Bang", "State"),
    ("g.MaThanhPho", "City"),

]

# FROM và JOIN cố định
from_clause = """
FROM fact_sales f
JOIN dim_time t ON f.MaThoiGian = t.MaThoiGian
JOIN dim_item i ON f.MaMH = i.MaMH
JOIN dim_customer c ON f.MaKH = c.MaKH
join dim_geo g on f.MaThanhPho = g.MaThanhPho
"""

# Phần SUM
select_sum = """
    SUM(f.SoLuongBan) AS TotalQuantity,
    SUM(f.DoanhThu) AS TotalRevenue
"""

# Sinh tổ hợp từ 3 cấp
queries = []

for t_idx, time in enumerate(time_levels):
    for c_idx, customer in enumerate(customer_levels):
        for i_idx, item in enumerate(item_levels):
            for g_idx, geo in enumerate(geo_levels):
                group_by_cols = time + customer + item + geo

                # Tạo phần SELECT
                select_parts = []
                for col, alias in all_columns:
                    if col in group_by_cols:
                        select_parts.append(col)
                    # else:
                    #     select_parts.append(f"NULL AS {alias}")

                select_clause = "SELECT \n    " + ",\n    ".join(select_parts) + ",\n    " + select_sum
                group_by_clause = ""
                if group_by_cols:
                    group_by_clause = "GROUP BY " + ", ".join(group_by_cols)

                full_query = f"{select_clause}\n{from_clause}\n{group_by_clause}".strip()
                if t_idx+c_idx+i_idx+g_idx > 0:
                    with open(f"sql_query/time{t_idx}-{c_idx}-{i_idx}-{g_idx}.sql", "w", encoding="utf-8") as f:
                        f.write(full_query)



                    queries.append(full_query)

# Ghép lại bằng UNION ALL
# final_sql = "\n\nUNION ALL\n\n".join(queries)

# # Ghi ra file
# with open("sql_query/group_by_hierarchy_cube.sql", "w", encoding="utf-8") as f:
#     f.write(final_sql)

print(f"✅ Đã sinh {len(queries)} tổ hợp GROUP BY phân cấp và lưu vào group_by_hierarchy_cube.sql")


✅ Đã sinh 95 tổ hợp GROUP BY phân cấp và lưu vào group_by_hierarchy_cube.sql


In [1]:

import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Date, Float, DECIMAL, ForeignKey
from sqlalchemy.ext.declarative import declarative_base

# Kết nối đến CSDL Data Warehouse
engine = create_engine("mysql+pymysql://nhanadmin:nhandeptrai191@localhost:3306/datawarehouse")
Base = declarative_base()

query = """
SELECT
    t.Nam,
    t.Quy,
    i.MaMH,
    c.MaKH,
    f.SoLuongBan,
    f.DoanhThu
FROM fact_sales f
JOIN dim_time t ON f.MaThoiGian = t.MaThoiGian
JOIN dim_item i ON f.MaMH = i.MaMH
JOIN dim_customer c ON f.MaKH = c.MaKH
"""

df = pd.read_sql(query, engine)

df.head()

  Base = declarative_base()


Unnamed: 0,Nam,Quy,MaMH,MaKH,SoLuongBan,DoanhThu
0,2022,3,14235,5980,2,8254.14
1,2022,3,19855,5980,10,46180.6
2,2022,3,39757,5980,3,8827.92
3,2022,3,77784,5980,6,9023.64
4,2022,3,104522,5980,3,14601.09


In [None]:
! pip install pyspark

In [None]:
from pyspark.sql import SparkSession

# Tạo SparkSession
spark = SparkSession.builder \
    .appName("MySQL Integration") \
    .config("spark.jars", "C:/Users/Admin/Desktop/BTL_LTM-client2/mysql-connector-java-8.0.13.jar") \
    .getOrCreate()

# Thông tin kết nối
url = "jdbc:mysql://localhost:3306/datawarehouse"
properties = {
    "user": "nhanadmin",
    "password": "nhandeptrai191",
    "driver": "com.mysql.cj.jdbc.Driver"
}
