In [6]:
import pandas as pd

# Load the two cleaned tables from their Parquet files: the track credits and
# the listening stats (tps). The reads are independent of each other.
track_credit_df = pd.read_parquet(path="../data/track_credit_cleaned.parquet")
tps_df = pd.read_parquet(path="../data/tps_df_cleaned.parquet")

In [4]:
import os
from getpass import getpass

from sqlalchemy import Column, Integer, String, Float, DateTime, create_engine, ForeignKey
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker

# MySQL connection settings.
# Nothing secret is stored in the notebook: each value can be overridden through
# an environment variable, and the password is read from MYSQL_PASSWORD or, when
# that is unset/empty, asked for interactively (getpass does not echo the input).
user = os.environ.get("MYSQL_USER", "root")
password = os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: ")
host = os.environ.get("MYSQL_HOST", "localhost")
database = os.environ.get("MYSQL_DATABASE", "diijam_parquet")

# Create the engine (mysql-connector driver).
engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@{host}/{database}")

# Declarative base class that the ORM models below inherit from.
Base = declarative_base()

# track_credit table
class TrackCredit(Base):
    """ORM model mapped to the ``track_credit`` table.

    Columns: an auto-incrementing integer primary key ``id``, an indexed
    ``track_id``, ``artist_id``, ``role_id``, the ``created_at`` /
    ``updated_at`` timestamps and a float ``position``. All columns are
    declared NOT NULL.
    """
    __tablename__ = 'track_credit'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Indexed: TPS.track_id declares a foreign key that targets this column.
    track_id = Column(Integer, nullable=False, index=True)
    artist_id = Column(Integer, nullable=False)
    role_id = Column(Integer, nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)
    position = Column(Float, nullable=False)

    # Relationship with the TPS model (paired with TPS.track_credit)
    tps = relationship("TPS", back_populates="track_credit")

# tps table
class TPS(Base):
    """ORM model mapped to the ``tps`` table (listen counts per track, user and date).

    The primary key is the composite ``(track_id, user_id, date)``. ``track_id``
    on its own cannot identify a row: the table holds many rows for the same
    track (several users, and several dates per user), so using it as the sole
    key would make the ORM treat distinct rows as the same entity.
    NOTE(review): assumes there is at most one row per (track_id, user_id, date)
    -- confirm against the data.
    """
    __tablename__ = 'tps'

    # Composite primary key columns.
    track_id = Column(Integer, ForeignKey('track_credit.track_id'), primary_key=True, nullable=False)
    user_id = Column(Integer, primary_key=True, nullable=False)
    date = Column(DateTime, primary_key=True, nullable=False)

    listen_count = Column(Integer, nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)

    # Reverse side of the relationship declared on TrackCredit.tps
    track_credit = relationship("TrackCredit", back_populates="tps")

# Create all tables registered on Base.metadata (i.e. the ORM models that
# inherit from Base) using the engine.
Base.metadata.create_all(engine)

In [None]:
# Create the engine
engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@{host}/{database}")

try:
    # Write the track_credit frame to MySQL
    track_credit_df.to_sql(name="track_credit", con=engine, if_exists="replace", index=False)
    print("Dữ liệu đã được lưu vào MySQL.")
except SQLAlchemyError as e:
    # An Engine has no rollback() method; calling it here raised AttributeError
    # and hid the real database error. Report the error and re-raise it so the
    # notebook does not continue with a table that was not written.
    print(f"Lỗi khi ghi dữ liệu vào MySQL: {e}")
    raise
finally:
    # Release the pooled connections exactly once, on success and on failure.
    engine.dispose()

In [7]:
# Create the engine
engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@{host}/{database}")
try:
    # Convert the object (string) columns to datetime.
    # The values are written day-first (e.g. "22/05/2020", "25/05/2020 15:34:58"),
    # so dayfirst=True is required: without it an ambiguous value such as
    # "04/09/2020" is read month-first, and with errors="coerce" any value that
    # then fails to parse silently becomes NaT.
    for frame, columns in (
        (track_credit_df, ("created_at", "updated_at")),
        (tps_df, ("date", "created_at", "updated_at")),
    ):
        for column in columns:
            frame[column] = pd.to_datetime(frame[column], errors="coerce", dayfirst=True)

    # Write the data to MySQL.
    # NOTE: if_exists="replace" drops the existing table and recreates it from
    # the DataFrame, so keys/indexes declared on the ORM models are not kept.
    track_credit_df.to_sql(name="track_credit", con=engine, if_exists="replace", index=False)
    # Write tps in small batches
    tps_df.to_sql(
        name="tps",
        con=engine,
        if_exists="replace",
        index=False,
        chunksize=10000  # 10,000 rows per batch
    )
    print("Dữ liệu đã được lưu vào MySQL.")
except SQLAlchemyError as e:
    # An Engine has no rollback() method; calling it here raised AttributeError
    # and hid the real database error. Report the error and re-raise it so the
    # following cells do not query partially written tables.
    print(f"Lỗi khi ghi dữ liệu vào MySQL: {e}")
    raise
finally:
    # Release the pooled connections exactly once, on success and on failure.
    engine.dispose()

  track_credit_df["created_at"] = pd.to_datetime(track_credit_df["created_at"], errors="coerce")


Dữ liệu đã được lưu vào MySQL.


In [8]:
# Create the engine
engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@{host}/{database}")

# Create a session to work with
Session = sessionmaker(bind=engine)
session = Session()

try:
    # JOIN track_credit and tps on track_id, selecting track_id and artist_id
    # from track_credit and user_id and listen_count from tps; limited to
    # 100 result rows.
    join_query = (
        session.query(
            TrackCredit.track_id,
            TrackCredit.artist_id,
            TPS.user_id,
            TPS.listen_count,
        )
        .join(TPS, TrackCredit.track_id == TPS.track_id)
        .limit(100)
    )

    for row in join_query:
        print(f"Track ID: {row.track_id}, Artist ID: {row.artist_id}, User ID: {row.user_id}, Listen Count: {row.listen_count}")
finally:
    # Close the session even when the query or the iteration fails, so the
    # connection is always returned to the pool.
    session.close()

Track ID: 4, Artist ID: 2, User ID: 3, Listen Count: 1
Track ID: 4, Artist ID: 1, User ID: 3, Listen Count: 1
Track ID: 4, Artist ID: 2, User ID: 3, Listen Count: 3
Track ID: 4, Artist ID: 1, User ID: 3, Listen Count: 3
Track ID: 4, Artist ID: 2, User ID: 3, Listen Count: 8
Track ID: 4, Artist ID: 1, User ID: 3, Listen Count: 8
Track ID: 4, Artist ID: 2, User ID: 3, Listen Count: 3
Track ID: 4, Artist ID: 1, User ID: 3, Listen Count: 3
Track ID: 4, Artist ID: 2, User ID: 3, Listen Count: 1
Track ID: 4, Artist ID: 1, User ID: 3, Listen Count: 1
Track ID: 4, Artist ID: 2, User ID: 4, Listen Count: 1
Track ID: 4, Artist ID: 1, User ID: 4, Listen Count: 1
Track ID: 4, Artist ID: 2, User ID: 4, Listen Count: 2
Track ID: 4, Artist ID: 1, User ID: 4, Listen Count: 2
Track ID: 4, Artist ID: 2, User ID: 9, Listen Count: 6
Track ID: 4, Artist ID: 1, User ID: 9, Listen Count: 6
Track ID: 4, Artist ID: 2, User ID: 14, Listen Count: 1
Track ID: 4, Artist ID: 1, User ID: 14, Listen Count: 1
Track ID

In [27]:
# Join track_credit and tps on track_id with no row limit and load the whole
# result into a DataFrame (about 23.5 million rows, so this is slow and
# memory-hungry).
# A fresh session is opened here instead of reusing one that an earlier cell
# already closed, and it is closed again once the rows have been read.
session = Session()
try:
    new_query = (
        session.query(
            TrackCredit.track_id,
            TrackCredit.artist_id,
            TPS.user_id,
            TPS.listen_count,
        )
        .join(TPS, TrackCredit.track_id == TPS.track_id)
    )
    query_df = pd.DataFrame(new_query)
finally:
    session.close()
query_df

Unnamed: 0,track_id,artist_id,user_id,listen_count
0,4,2,3,1
1,4,1,3,1
2,4,2,3,3
3,4,1,3,3
4,4,2,3,8
...,...,...,...,...
23471952,22567,3049,1979187,1
23471953,22567,3049,1984523,1
23471954,22567,3049,1989177,1
23471955,22567,3049,1989783,1


In [31]:
# Inner-join the listening stats (left) with the track credits (right) on
# track_id, entirely in pandas. Column names present in both frames get the
# default _x (left) / _y (right) suffixes.
merged_df = pd.merge(
    left=tps_df,
    right=track_credit_df,
    how="inner",
    on="track_id",
)
merged_df

Unnamed: 0,track_id,user_id,date,listen_count,created_at_x,updated_at_x,id,artist_id,role_id,created_at_y,updated_at_y,position
0,4,3,22/05/2020,1,23/05/2020 02:40:42,27/05/2020 01:53:43,455,1,2,25/05/2020 15:34:58,04/09/2020 01:25:13,0.0
1,4,3,22/05/2020,1,23/05/2020 02:40:42,27/05/2020 01:53:43,456,2,3,25/05/2020 15:34:58,04/09/2020 01:25:13,1.0
2,4,3,12/07/2021,3,02/09/2021 03:10:57,02/09/2021 03:10:57,455,1,2,25/05/2020 15:34:58,04/09/2020 01:25:13,0.0
3,4,3,12/07/2021,3,02/09/2021 03:10:57,02/09/2021 03:10:57,456,2,3,25/05/2020 15:34:58,04/09/2020 01:25:13,1.0
4,4,3,06/08/2021,8,03/09/2021 01:58:14,03/09/2021 01:58:14,455,1,2,25/05/2020 15:34:58,04/09/2020 01:25:13,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...
23471952,22567,1979187,13/11/2024,1,14/11/2024 07:16:21,14/11/2024 07:16:21,63404,3049,2,12/11/2024 14:28:02,12/11/2024 14:28:02,0.0
23471953,22567,1984523,13/11/2024,1,14/11/2024 07:16:21,14/11/2024 07:16:21,63404,3049,2,12/11/2024 14:28:02,12/11/2024 14:28:02,0.0
23471954,22567,1989177,13/11/2024,1,14/11/2024 07:16:21,14/11/2024 07:16:21,63404,3049,2,12/11/2024 14:28:02,12/11/2024 14:28:02,0.0
23471955,22567,1989783,13/11/2024,1,14/11/2024 07:16:21,14/11/2024 07:16:21,63404,3049,2,12/11/2024 14:28:02,12/11/2024 14:28:02,0.0
