In [None]:
!pip install pandas pyarrow matplotlib seaborn kaggle



In [None]:
import ast

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Load the two raw source CSVs into DataFrames.
# low_memory=False asks pandas to parse the movie metadata file in one pass
# instead of in chunks, so each column's dtype is inferred from all its values.
movies_df = pd.read_csv('/content/movies_metadata.csv', low_memory=False)
ratings_df = pd.read_csv('/content/ratings_small.csv')

# Create 'tables' (DataFrames) for our data warehouse
def create_movie_dimension(df):
    """Build the movie dimension table from the raw movie metadata.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw movie metadata. Must contain the columns ``id``, ``title``,
        ``release_date``, ``budget``, ``revenue`` and ``runtime``.

    Returns
    -------
    pandas.DataFrame
        A new frame holding only those six columns, with ``release_date``
        parsed to datetimes. Values that cannot be parsed become ``NaT``
        (``errors='coerce'``). ``df`` itself is left untouched.
    """
    columns = ['id', 'title', 'release_date', 'budget', 'revenue', 'runtime']
    # Take an explicit copy: assigning into a bare column selection is what
    # makes pandas emit SettingWithCopyWarning here and in later cells that
    # add columns to the returned frame.
    movie_dim = df[columns].copy()
    movie_dim['release_date'] = pd.to_datetime(movie_dim['release_date'], errors='coerce')
    return movie_dim

def create_genre_dimension(df):
    """Build the genre table from the stringified ``genres`` column.

    Each ``genres`` cell is parsed as the text of a Python literal (the code
    expects a list of dicts carrying ``id`` and ``name`` keys), the lists are
    exploded to one element per row, and the dicts are flattened to columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw movie metadata containing a ``genres`` column of strings.

    Returns
    -------
    pandas.DataFrame
        Columns ``id`` and ``name``, one row per exploded list element. No
        de-duplication is performed, so a genre appears once for every movie
        that lists it.

    Raises
    ------
    ValueError, SyntaxError
        If a ``genres`` cell is not a valid Python literal.
    """
    # The cells come from an external CSV. ast.literal_eval only accepts
    # Python literals, so unlike eval() a crafted cell cannot execute code.
    genres = df['genres'].apply(ast.literal_eval).explode()
    return pd.json_normalize(genres)[['id', 'name']]

def create_fact_table(movies_df, ratings_df):
    """Join ratings to movie titles to form the ratings fact table.

    Parameters
    ----------
    movies_df : pandas.DataFrame
        Movie metadata with ``id`` and ``title`` columns. ``id`` may hold
        non-numeric text; such values are coerced to NaN and therefore never
        match a rating.
    ratings_df : pandas.DataFrame
        Ratings with a ``movieId`` column.

    Returns
    -------
    pandas.DataFrame
        Inner join of ``ratings_df`` with the ``id``/``title`` lookup on
        ``movieId == id``: every ratings column plus ``id`` and ``title``.
    """
    # Coerce ids on a private two-column copy rather than writing the
    # converted column back into the caller's movies_df.
    movie_lookup = movies_df[['id', 'title']].assign(
        id=lambda frame: pd.to_numeric(frame['id'], errors='coerce')
    )
    return pd.merge(ratings_df, movie_lookup, left_on='movieId', right_on='id', how='inner')

# Build the three warehouse tables from the frames loaded above.
movie_dim = create_movie_dimension(movies_df)
genre_dim = create_genre_dimension(movies_df)
fact_table = create_fact_table(movies_df, ratings_df)

print("Data Warehouse tables created.")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  movie_dim['release_date'] = pd.to_datetime(movie_dim['release_date'], errors='coerce')


Data Warehouse tables created.


In [None]:
import datetime

def extract(movies_path='/content/movies_metadata.csv',
            ratings_path='/content/ratings_small.csv'):
    """Read the raw movie metadata and ratings CSV files.

    Parameters
    ----------
    movies_path : str or path-like, optional
        Location of the movie metadata CSV. Defaults to the path this
        notebook has always used.
    ratings_path : str or path-like, optional
        Location of the ratings CSV. Defaults to the path this notebook has
        always used.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(movies_df, ratings_df)`` exactly as parsed by ``pd.read_csv``.
    """
    # low_memory=False: parse the metadata file in one pass so column dtypes
    # are inferred from the whole column.
    movies_df = pd.read_csv(movies_path, low_memory=False)
    ratings_df = pd.read_csv(ratings_path)
    return movies_df, ratings_df

def transform(movies_df, ratings_df):
    """Derive the warehouse tables from the raw frames.

    Returns a ``(movie_dim, genre_dim, fact_table)`` tuple produced by the
    three table builders.
    """
    # Tuple elements are evaluated left to right, so the builders run in the
    # order movie dimension, genre dimension, fact table.
    return (
        create_movie_dimension(movies_df),
        create_genre_dimension(movies_df),
        create_fact_table(movies_df, ratings_df),
    )

def load(movie_dim, genre_dim, fact_table):
    """Write each warehouse table to its parquet file.

    The file names are relative, so the files land in the current working
    directory. Returns ``None``.
    """
    targets = (
        (movie_dim, 'movie_dimension.parquet'),
        (genre_dim, 'genre_dimension.parquet'),
        (fact_table, 'ratings_fact.parquet'),
    )
    for table, filename in targets:
        table.to_parquet(filename)

def etl_pipeline():
    """Run extract, transform and load in sequence, printing a progress
    line before each stage and a completion line at the end."""
    print("Extracting data...")
    raw_frames = extract()

    print("Transforming data...")
    warehouse_tables = transform(*raw_frames)

    print("Loading data...")
    load(*warehouse_tables)

    print("ETL pipeline completed.")

# Run the ETL pipeline once: read the CSVs, build the tables, and write the
# parquet files.
etl_pipeline()

Extracting data...
Transforming data...


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  movie_dim['release_date'] = pd.to_datetime(movie_dim['release_date'], errors='coerce')


Loading data...
ETL pipeline completed.


In [None]:
def partition_by_year(df, date_column):
    """Write one parquet file per calendar year found in ``date_column``.

    Rows whose date is missing are skipped. Each file is named
    ``movies_<year>.parquet`` (a relative path, so it is created in the
    current working directory) and holds every column of ``df`` plus a
    derived ``year`` column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to partition. It is not modified.
    date_column : str
        Name of a datetime-typed column of ``df`` (the ``.dt`` accessor is
        used on it).

    Returns
    -------
    None
    """
    # Derive the year on a new frame. Assigning df['year'] would add a column
    # to the caller's DataFrame and, on a sliced frame, raise
    # SettingWithCopyWarning.
    with_year = df.assign(year=df[date_column].dt.year)
    for year, group in with_year.dropna(subset=['year']).groupby('year'):
        # year is a float when any date is missing, hence the int() cast.
        pq.write_table(pa.Table.from_pandas(group), f'movies_{int(year)}.parquet')

def create_index(df, column):
    """Return a frame indexed by ``column`` with its rows ordered by that index."""
    indexed = df.set_index(column)
    return indexed.sort_index()

# Usage
# Write the movies_<year>.parquet files, grouping on each movie's release year.
partition_by_year(movie_dim, 'release_date')
# Build a separate frame indexed and sorted by movie id.
movie_dim_indexed = create_index(movie_dim, 'id')

print("Optimization complete.")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['year'] = df[date_column].dt.year


Optimization complete.


In [None]:
import hashlib

def data_lineage(df, source):
    """Tag every row with its origin and the time it was stamped.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to annotate. It is not modified.
    source : object
        Value stored in the ``DataSource`` column of every row (the notebook
        passes the source file path).

    Returns
    -------
    pandas.DataFrame
        A new frame with the columns of ``df`` plus ``DataSource`` and
        ``LoadTimestamp``. All rows share one timestamp, taken once from
        ``datetime.datetime.now()``.
    """
    # assign() builds a new frame, so the caller's DataFrame is never written
    # to and a sliced input no longer raises SettingWithCopyWarning.
    return df.assign(DataSource=source, LoadTimestamp=datetime.datetime.now())

def audit_log(operation, user, log_path='audit_log.txt'):
    """Append one timestamped line describing an operation to the audit log.

    Parameters
    ----------
    operation : str
        Description of what was done.
    user : str
        Name of whoever performed it.
    log_path : str or path-like, optional
        File to append to. Defaults to ``audit_log.txt`` in the current
        working directory. The file is created if it does not exist.

    Returns
    -------
    None
    """
    # Explicit UTF-8 so non-ASCII operation or user names are written the
    # same way on every platform instead of depending on the locale default.
    with open(log_path, 'a', encoding='utf-8') as f:
        f.write(f"{datetime.datetime.now()} - {operation} performed by {user}\n")

def access_control(user, allowed_users):
    """Check that ``user`` is one of ``allowed_users``.

    Returns ``True`` when the user is listed; raises ``PermissionError``
    otherwise, so the function never returns ``False``.
    """
    if user not in allowed_users:
        raise PermissionError("User not authorized")
    return True

# Usage
# Stamp the movie dimension with its source file path and a load timestamp.
movie_dim = data_lineage(movie_dim, "/content/movies_metadata.csv")
# Append a line for this run to the audit log file.
audit_log("ETL Process", "DataEngineer1")
try:
    # access_control returns True for a listed user and raises
    # PermissionError for anyone else; the except branch prints that message.
    if access_control("DataEngineer1", ["DataEngineer1", "DataAnalyst1"]):
        print("Access granted")
except PermissionError as e:
    print(e)

Access granted


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['DataSource'] = source
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['LoadTimestamp'] = datetime.datetime.now()


In [None]:
from airflow import DAG
# NOTE(review): `airflow.operators.python_operator` is the legacy module path;
# Airflow 2 exposes PythonOperator from `airflow.operators.python`. Confirm
# against the Airflow version this is deployed on.
from airflow.operators.python_operator import PythonOperator
# Keep `datetime` bound to the module. data_lineage() and audit_log() call
# datetime.datetime.now(), which fails on re-run if this cell rebinds the
# name to the datetime class.
import datetime
from datetime import timedelta

# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Daily DAG wrapping the notebook's extract / transform / load functions.
dag = DAG(
    'movie_etl',
    default_args=default_args,
    description='ETL process for movie data warehouse',
    schedule_interval=timedelta(days=1),
)

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

# NOTE(review): transform(movies_df, ratings_df) and
# load(movie_dim, genre_dim, fact_table) declare required positional
# parameters, but no op_args/op_kwargs are supplied here and nothing hands
# extract's return value to the next task. Confirm how the inputs are meant
# to reach these callables before scheduling this DAG.
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

# Run order: extract, then transform, then load.
extract_task >> transform_task >> load_task


ModuleNotFoundError: No module named 'airflow'