In [None]:
# Import các thư viện cần thiết
import pandas as pd
import numpy as np
import json
import ast
from sklearn.feature_extraction.text import TfidfVectorizer
import warnings
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict
import logging

# Cấu hình logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8')


In [None]:
def validate_json_data(df: pd.DataFrame, json_columns: List[str]) -> Dict:
    """
    Validate tính toàn vẹn của dữ liệu JSON và trả về báo cáo chi tiết
    
    Args:
        df: DataFrame cần validate
        json_columns: List các cột chứa dữ liệu JSON
        
    Returns:
        Dict chứa thống kê validation cho mỗi cột
    """
    validation_report = {}
    logging.info("=== JSON DATA VALIDATION ===")
    
    for col in json_columns:
        if col in df.columns:
            stats = {
                'valid_json': 0,
                'invalid_json': 0,
                'empty_json': 0,
                'total_rows': len(df)
            }
            
            for idx, value in df[col].items():
                if pd.isna(value) or value == '[]' or value == '':
                    stats['empty_json'] += 1
                else:
                    try:
                        parsed = ast.literal_eval(value)
                        if isinstance(parsed, list):
                            stats['valid_json'] += 1
                        else:
                            stats['invalid_json'] += 1
                    except (ValueError, SyntaxError):
                        stats['invalid_json'] += 1
            
            stats['validity_rate'] = (stats['valid_json'] / len(df) * 100)
            validation_report[col] = stats
            
            logging.info(f"\n{col}:")
            logging.info(f"  Valid JSON: {stats['valid_json']}")
            logging.info(f"  Invalid JSON: {stats['invalid_json']}")
            logging.info(f"  Empty JSON: {stats['empty_json']}")
            logging.info(f"  Validity rate: {stats['validity_rate']:.2f}%")
    
    return validation_report


In [None]:
# Load và xử lý dữ liệu ban đầu
credits = pd.read_csv('../datasets/credits.csv')
meta = pd.read_csv('../datasets/movie_metadata.csv')

logging.info("=== DATASET INFORMATION ===")
logging.info(f"Credits shape: {credits.shape}")
logging.info(f"Metadata shape: {meta.shape}")

# Validate JSON columns trong credits dataset
json_columns = ['cast', 'crew', 'genres']
validation_results = validate_json_data(credits, json_columns)

# Xử lý ngày tháng và lọc dữ liệu 2017
meta['release_date'] = pd.to_datetime(meta['release_date'], errors='coerce')
meta['year'] = meta['release_date'].dt.year

# Hiển thị phân bố theo năm
logging.info("\n=== YEAR DISTRIBUTION ===")
year_dist = meta['year'].value_counts().sort_index()
display(year_dist)

# Lọc phim 2017
new_meta = meta.loc[meta.year == 2017, ['genres', 'id', 'title', 'year']]
new_meta['id'] = new_meta['id'].astype(int)

# Merge dữ liệu
data = pd.merge(new_meta, credits, on='id')
pd.set_option('display.max_colwidth', 75)

logging.info("\n=== MERGED DATA INFORMATION ===")
logging.info(f"Shape after merge: {data.shape}")
display(data.head())


In [None]:
def enhanced_data_quality_report(df: pd.DataFrame, dataset_name: str = "Dataset") -> Dict:
    """
    Tạo báo cáo chất lượng dữ liệu chi tiết với các chỉ số bổ sung
    
    Args:
        df: DataFrame cần phân tích
        dataset_name: Tên của dataset để hiển thị trong báo cáo
    
    Returns:
        Dict chứa các chỉ số chất lượng dữ liệu
    """
    quality_metrics = {}
    
    logging.info(f"\n=== {dataset_name.upper()} QUALITY REPORT ===")
    logging.info(f"Shape: {df.shape}")
    
    # 1. Missing values analysis
    missing_data = pd.DataFrame({
        'Column': df.columns,
        'Missing_Count': df.isnull().sum(),
        'Missing_Percentage': (df.isnull().sum() / len(df)) * 100,
        'Data_Type': df.dtypes
    })
    missing_data = missing_data.sort_values('Missing_Percentage', ascending=False)
    quality_metrics['missing_data'] = missing_data
    
    # 2. Duplicate analysis
    duplicates = df.duplicated().sum()
    duplicate_rate = (duplicates/len(df)*100)
    quality_metrics['duplicates'] = {
        'count': duplicates,
        'percentage': duplicate_rate
    }
    
    # 3. Data type distribution
    dtype_dist = df.dtypes.value_counts()
    quality_metrics['dtype_distribution'] = dtype_dist
    
    # 4. Unique values analysis
    unique_counts = {}
    for col in df.columns:
        if df[col].dtype in ['object', 'string', 'category']:
            unique_counts[col] = df[col].nunique()
    quality_metrics['unique_values'] = unique_counts
    
    # Display results
    logging.info("\n=== MISSING VALUES ANALYSIS ===")
    display(missing_data[missing_data['Missing_Count'] > 0])
    
    logging.info(f"\n=== DUPLICATE ANALYSIS ===")
    logging.info(f"Total duplicates: {duplicates} ({duplicate_rate:.2f}%)")
    
    logging.info("\n=== DATA TYPE DISTRIBUTION ===")
    display(dtype_dist)
    
    logging.info("\n=== UNIQUE VALUES IN CATEGORICAL COLUMNS ===")
    for col, count in unique_counts.items():
        logging.info(f"{col}: {count} unique values")
    
    return quality_metrics

# Chạy quality report cho cả 3 dataset
data_quality = enhanced_data_quality_report(data, "Movies 2017")
credits_quality = enhanced_data_quality_report(credits, "Credits")
meta_quality = enhanced_data_quality_report(meta, "Metadata")


In [None]:
def safe_json_parse(json_str: str, default_value: Dict = None) -> Dict:
    """
    Parse JSON string an toàn với logging và validation
    
    Args:
        json_str: Chuỗi JSON cần parse
        default_value: Giá trị mặc định nếu parse thất bại (default: {})
    
    Returns:
        Dict đã parse hoặc default value
    """
    if default_value is None:
        default_value = {}
        
    try:
        if pd.isna(json_str):
            return default_value
        parsed_data = ast.literal_eval(json_str)
        if not isinstance(parsed_data, list):
            logging.warning(f"JSON data không phải là list: {json_str[:100]}...")
            return default_value
        return parsed_data
    except Exception as e:
        logging.warning(f"Lỗi khi parse JSON: {str(e)[:100]}...")
        return default_value

def process_json_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """
    Xử lý nhiều cột JSON với validation và logging
    
    Args:
        df: DataFrame cần xử lý
        columns: List các cột JSON cần xử lý
    
    Returns:
        DataFrame đã xử lý
    """
    df = df.copy()
    for col in columns:
        if col in df.columns:
            logging.info(f"Processing {col} column...")
            df[col] = df[col].apply(safe_json_parse)
            
            # Validate kết quả
            valid_rows = df[col].apply(lambda x: isinstance(x, list)).sum()
            total_rows = len(df)
            success_rate = (valid_rows / total_rows) * 100
            
            logging.info(f"{col} processing results:")
            logging.info(f"- Valid rows: {valid_rows}/{total_rows}")
            logging.info(f"- Success rate: {success_rate:.2f}%")
            
            # Sample data
            if len(df) > 0:
                logging.info(f"\nSample {col} data:")
                logging.info(df[col].iloc[0])
    
    return df

# Xử lý JSON columns cho cả hai dataset
json_columns = ['cast', 'crew', 'genres']
data = process_json_columns(data, json_columns)
credits = process_json_columns(credits, json_columns)


In [None]:
# Xử lý genres
def make_genresList(x):
    gen = []
    st = " "
    for i in x:
        if i.get('name') == 'Science Fiction':
            scifi = 'Sci-Fi'
            gen.append(scifi)
        else:
            gen.append(i.get('name'))
    if gen == []:
        return np.NaN
    else:
        return (st.join(gen))

# Xử lý actors
def get_actor1(x):
    casts = []
    for i in x:
        casts.append(i.get('name'))
    if casts == []:
        return np.NaN
    else:
        return (casts[0])

def get_actor2(x):
    casts = []
    for i in x:
        casts.append(i.get('name'))
    if casts == [] or len(casts)<=1:
        return np.NaN
    else:
        return (casts[1])

def get_actor3(x):
    casts = []
    for i in x:
        casts.append(i.get('name'))
    if casts == [] or len(casts)<=2:
        return np.NaN
    else:
        return (casts[2])

# Xử lý directors
def get_directors(x):
    dt = []
    st = " "
    for i in x:
        if i.get('job') == 'Director':
            dt.append(i.get('name'))
    if dt == []:
        return np.NaN
    else:
        return (st.join(dt))

# Áp dụng các hàm xử lý
data['genres_list'] = data['genres'].map(lambda x: make_genresList(x))
data['actor_1_name'] = data['cast'].map(lambda x: get_actor1(x))
data['actor_2_name'] = data['cast'].map(lambda x: get_actor2(x))
data['actor_3_name'] = data['cast'].map(lambda x: get_actor3(x))
data['director_name'] = data['crew'].map(lambda x: get_directors(x))

# Tạo DataFrame movie với các cột cần thiết
movie = data.loc[:, ['director_name', 'actor_1_name', 'actor_2_name', 'actor_3_name', 'genres_list', 'title']]

# Kiểm tra missing values
logging.info("\n=== MISSING VALUES CHECK ===")
display(movie.isna().sum())

# Xóa các dòng có missing values
movie = movie.dropna(how='any')
logging.info("\n=== AFTER DROPPING NA ===")
display(movie.isna().sum())

# Đổi tên cột
movie = movie.rename(columns={'genres_list': 'genres', 'title': 'movie_title'})
movie['movie_title'] = movie['movie_title'].str.lower()

# Tạo cột combined features
movie['comb'] = movie['actor_1_name'] + ' ' + movie['actor_2_name'] + ' ' + movie['actor_3_name'] + ' ' + movie['director_name'] + ' ' + movie['genres']

logging.info("\n=== FINAL PROCESSED DATA ===")
display(movie.head())


In [None]:
logging.info("\n=== TF-IDF PROCESSING ===")
tfidf = TfidfVectorizer(stop_words='english')
genres_tfidf = tfidf.fit_transform(movie['genres'])  
genres_df = pd.DataFrame(genres_tfidf.toarray(), columns=tfidf.get_feature_names_out())
movie = pd.concat([movie, genres_df], axis=1)  

print("Sample TF-IDF features:")
display(genres_df.head())

In [None]:
# Load dữ liệu cũ để kết hợp
try:
    old = pd.read_csv('../datasets/data.csv')
    logging.info("\n=== LOADING OLD DATA ===")
    logging.info(f"Old data shape: {old.shape}")
    
    # Tạo cột combined features cho dữ liệu cũ
    old['comb'] = old['actor_1_name'] + ' ' + old['actor_2_name'] + ' ' + old['actor_3_name'] + ' ' + old['director_name'] + ' ' + old['genres']
    
    # Kết hợp dữ liệu cũ và mới
    final_data = pd.concat([old, movie], ignore_index=True)
    logging.info(f"\nCombined data shape: {final_data.shape}")

    final_data = final_data.drop_duplicates(subset="movie_title", keep='last')
    logging.info(f"After removing duplicates: {final_data.shape}")
    
    # Xuất ra file
    output_file = '../datasets/new_data.csv'
    final_data.to_csv(output_file, index=False)
    logging.info(f"\nĐã xuất dữ liệu kết hợp ra file: {output_file}")
    
except FileNotFoundError:
    logging.warning("\nKhông tìm thấy file data.csv cũ, chỉ xuất dữ liệu mới")

    processed_movie_data = movie.drop_duplicates(subset="movie_title", keep='last')
    logging.info(f"After removing duplicates: {processed_movie_data.shape}")
    
    output_file = '../datasets/new_data.csv'
    processed_movie_data.to_csv(output_file, index=False) 


In [None]:
## 6. Export Processed Data

# Lọc và chuẩn bị dữ liệu cuối cùng
final_columns = [
    'movie_title', 'director_name', 
    'actor_1_name', 'actor_2_name', 'actor_3_name',
    'genres'
] + list(genres_df.columns)  # Thêm các cột TF-IDF

# Tạo DataFrame cuối cùng với các cột đã chọn
final_data = data[final_columns].copy()

# Kiểm tra dữ liệu trước khi xuất
logging.info("\n=== FINAL DATA QUALITY CHECK ===")
logging.info(f"Shape: {final_data.shape}")
logging.info(f"Memory usage: {final_data.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
logging.info("\nSample of final data:")
display(final_data.head())

# Xuất ra file CSV
output_file = '../datasets/movies_2017_processed.csv'
final_data.to_csv(output_file, index=False)
logging.info(f"\nĐã xuất dữ liệu đã xử lý ra file: {output_file}")
