In [4]:
import ast
import os
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from sqlalchemy import create_engine


# Connection settings. Each value can be overridden with an environment
# variable so real credentials never have to be written into the notebook;
# the fallbacks are the placeholder values for a local database.
db_user = os.environ.get('POSTGRES_USER', 'postgres')
db_password = os.environ.get('POSTGRES_PASSWORD', 'changethis')
db_host = os.environ.get('POSTGRES_HOST', 'localhost')
db_port = os.environ.get('POSTGRES_PORT', '5432')
db_name = os.environ.get('POSTGRES_DB', 'app')

# Create the SQLAlchemy engine. User and password are percent-encoded so that
# characters such as '@', ':' or '/' cannot corrupt the connection URL.
engine = create_engine(
    f'postgresql+psycopg2://{quote_plus(db_user)}:{quote_plus(db_password)}'
    f'@{db_host}:{db_port}/{db_name}'
)


In [12]:

# Load the raw movie metadata and drop three rows by index label
# (presumably malformed records — TODO confirm against the CSV).
df = pd.read_csv('./movies_metadata.csv', low_memory=False).drop([19730, 29503, 35587])

# Columns carried over into the staging table.
columns_keep = [
    'id', 'title', 'original_title', 'belongs_to_collection',
    'original_language', 'release_date', 'status', 'overview', 'tagline',
    'adult', 'popularity', 'homepage', 'poster_path', 'runtime', 'budget',
    'revenue', 'vote_average', 'vote_count',
]

# Project onto those columns, turn NaN into None, and keep only the first row
# seen for each id.
df = (
    df[columns_keep]
    .replace({np.nan: None})
    .drop_duplicates(subset=['id'], keep='first')
)


def _collection_name(raw):
    """Return the 'name' entry of a stringified collection dict, or ''.

    Anything that is not a non-empty string, or that does not parse to a dict
    containing a 'name' key, yields the empty string.
    """
    if not isinstance(raw, str) or raw == '':
        return ''
    parsed = ast.literal_eval(raw)
    if isinstance(parsed, dict) and 'name' in parsed:
        return parsed['name']
    return ''


# Type coercion: unparseable dates/numbers become missing values; the integer
# columns additionally default missing values to 0 and use nullable Int64.
df = df.assign(
    release_date=pd.to_datetime(df['release_date'], errors='coerce').dt.date,
    belongs_to_collection=df['belongs_to_collection'].fillna('').apply(_collection_name),
    **{
        name: pd.to_numeric(df[name], errors='coerce')
        for name in ('popularity', 'vote_average')
    },
    **{
        name: pd.to_numeric(df[name], errors='coerce').fillna(0).astype('Int64')
        for name in ('runtime', 'budget', 'revenue', 'vote_count')
    },
)

# Append the cleaned rows to the staging table inside a single transaction.
with engine.begin() as connection:
    df.to_sql(
        'stg_movie_metadata',
        con=connection,
        if_exists='append',
        index=False,
        method='multi',
    )

print("✅ Insert thành công!")


✅ Insert thành công!


In [23]:

# Reload the raw metadata and drop three rows by index label
# (presumably malformed records — TODO confirm against the CSV).
md = pd.read_csv('./movies_metadata.csv', low_memory=False)
md = md.drop([19730, 29503, 35587])

# Keep only the id and the stringified list of genre dicts, parsed into real
# Python lists.
# NOTE(review): duplicate ids are not dropped here, unlike the
# stg_movie_metadata load — confirm that is intended.
genres = (
    md[['id', 'genres']]
    .rename(columns={'id': 'movie_id'})
    .assign(genres=lambda frame: frame['genres'].apply(ast.literal_eval))
)

# One output row per (movie, genre) pair. Iterating the two columns directly
# avoids building a Series for every row as iterrows() does.
rows = [
    {'movie_id': movie_id, 'genre': genre['name']}
    for movie_id, genre_list in zip(genres['movie_id'], genres['genres'])
    for genre in genre_list
]

# Passing the column names keeps the frame well-formed even when no genre
# rows were produced.
genres = pd.DataFrame(rows, columns=['movie_id', 'genre'])
genres["key_id"] = genres.index + 1  # 1-based surrogate key

with engine.begin() as connection:
    genres[["key_id", "movie_id", "genre"]].to_sql(
        'stg_genre', con=connection, if_exists='append', index=False, method='multi'
    )

# The message names the table actually written (stg_genre).
print("✅ Insert bảng stg_genre thành công!")

✅ Insert bảng movie_genres thành công!


In [32]:
# Movie metadata with integer ids; three rows are dropped by index label first
# (presumably malformed records — TODO confirm against the CSV).
md = (
    pd.read_csv('./movies_metadata.csv', low_memory=False)
    .drop([19730, 29503, 35587])
    .astype({'id': 'int'})
)

# Attach the keyword lists to the metadata by movie id (inner join).
keywords = pd.read_csv('./keywords.csv').astype({'id': 'int'})
md = md.merge(keywords, on='id')


def _keyword_names(parsed):
    """Return the 'name' of every entry when given a list, otherwise []."""
    if not isinstance(parsed, list):
        return []
    return [entry['name'] for entry in parsed]


# Parse the stringified lists first, then reduce each one to its names.
md['keywords'] = md['keywords'].apply(ast.literal_eval).apply(_keyword_names)

# One output row per (movie, keyword) pair.
rows = [
    {'movie_id': movie_id, 'keyword': keyword}
    for movie_id, names in zip(md['id'], md['keywords'])
    for keyword in names
]

keywords = pd.DataFrame(rows)
keywords["key_id"] = keywords.index + 1  # 1-based surrogate key

with engine.begin() as connection:
    keywords[["key_id", "movie_id", "keyword"]].to_sql(
        'stg_keyword', con=connection, if_exists='append', index=False, method='multi'
    )

print("✅ Insert bảng stg_keyword thành công!")

✅ Insert bảng stg_keyword thành công!


In [8]:
# Movie metadata with integer ids; three rows are dropped by index label first
# (presumably malformed records — TODO confirm against the CSV).
md = (
    pd.read_csv('./movies_metadata.csv', low_memory=False)
    .drop([19730, 29503, 35587])
    .astype({'id': 'int'})
)

# Credits with integer ids, inner-joined onto the metadata by movie id.
credits = pd.read_csv('./credits.csv').astype({'id': 'int'})
md = md.merge(credits, on='id')

def get_director(x):
    """Return the name of the first crew entry whose job is 'Director'.

    Args:
        x: iterable of mappings, each providing 'job' and 'name' keys.

    Returns:
        The 'name' of the first entry with job == 'Director', or np.nan when
        no entry matches.
    """
    director_names = (member['name'] for member in x if member['job'] == 'Director')
    return next(director_names, np.nan)

# Parse the stringified cast list and keep the names of its first three
# entries (slicing already copes with lists shorter than three).
md['cast'] = md['cast'].apply(ast.literal_eval)
md['cast'] = md['cast'].apply(
    lambda members: [m['name'] for m in members][:3] if isinstance(members, list) else []
)

# Parse the crew list and pull out the director; '' marks "no director found".
md['crew'] = md['crew'].apply(ast.literal_eval)
md['director'] = md['crew'].apply(get_director).fillna('')

md['movie_id'] = md['id']

# One output row per cast member, plus one row for the director when known.
rows = []
for movie_id, cast_names, director in zip(md['id'], md['cast'], md['director']):
    rows.extend(
        {'movie_id': movie_id, 'name': name, 'role': 'cast'} for name in cast_names
    )
    # Skip movies without a director instead of inserting a blank-name row.
    if director:
        rows.append({'movie_id': movie_id, 'name': director, 'role': 'director'})

# Passing the column names keeps the frame well-formed even when no rows
# were produced.
produce = pd.DataFrame(rows, columns=['movie_id', 'name', 'role'])
produce["key_id"] = produce.index + 1  # 1-based surrogate key

with engine.begin() as connection:
    produce[["key_id", "movie_id", "name", "role"]].to_sql(
        'stg_cast', con=connection, if_exists='append', index=False, method='multi'
    )

print("✅ Insert bảng stg_cast thành công!")

✅ Insert bảng stg_cast thành công!


In [2]:
# Load the ratings, switch to snake_case column names, convert the timestamps
# (interpreted as seconds) to datetimes and add a key equal to the index + 1.
rating = (
    pd.read_csv('./ratings.csv')
    .rename(columns={"movieId": "movie_id", "userId": "user_id"})
    .assign(
        timestamp=lambda frame: pd.to_datetime(frame['timestamp'], unit='s'),
        key_id=lambda frame: frame.index + 1,
    )
)
rating.head()

Unnamed: 0,user_id,movie_id,rating,timestamp,key_id
0,1,110,1.0,2015-03-09 22:52:09,1
1,1,147,4.5,2015-03-09 23:07:15,2
2,1,858,5.0,2015-03-09 22:52:03,3
3,1,1221,5.0,2015-03-09 22:52:26,4
4,1,1246,5.0,2015-03-09 22:52:36,5


In [6]:
# Load the id-mapping file and rename its columns to snake_case.
links = pd.read_csv('./links.csv').rename(
    columns={"movieId": "movie_id", "imdbId": "imdb_id", "tmdbId": "tmdb_id"}
)

# Append the rows to the staging table inside a single transaction.
with engine.begin() as connection:
    links.to_sql(
        'stg_links',
        con=connection,
        if_exists='append',
        index=False,
        method='multi',
    )

print("✅ Insert stg_links thành công!")


✅ Insert stg_links thành công!
