# IMDb external data

The goal of this notebook is to serve as an ETL for the external data from [IMDb](https://developer.imdb.com/non-commercial-datasets/).

The final output is a `csv` file that is saved to S3 and later exported to the virtual machine that runs the rest of the application locally.

We will follow these steps:

1. Configure the S3 buckets for the input data, the Athena cache and the output data
2. Download the IMDb data and save it to the S3 landing zone
3. Configure Glue to map the required data
4. Take a brief look at the data using pyathena
5. Use pyathena to run the required queries and generate the final output

Each stage is presented as a step-by-step procedure.

Whenever a step has to be executed outside the notebook, we point it out. Otherwise, we use `boto3` for everything we can.

P.S.: This notebook was tested and run on an [ml.t3.xlarge](https://docs.aws.amazon.com/sagemaker/latest/dg/notebooks-available-instance-types.html) instance.

In [1]:
import boto3
import sagemaker
import time
import botocore
from botocore.exceptions import ClientError

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


## 1. Configure the S3 buckets

P.S.: Always use the N. Virginia region (`us-east-1`), and the IAM role must be `LabRole`.

1. Define a suffix, `random_number`, to be used throughout the notebook (we use the `account_id`, which keeps the bucket names unique, as S3 requires)
2. Create an S3 bucket named `alv-cache-athena-{random_number}`
3. Create an S3 bucket named `alv-dl-{random_number}`
    * Create the subfolder `imdb/landing-zone`
    * Create the subfolder `imdb/brute-zone`
        * Create the subfolders:
            * `name-basics`
            * `title-akas`
            * `title-basics`
            * `title-crew`
            * `title-episode`
            * `title-principals`
            * `title-ratings`
    * Create the subfolder `imdb/out-zone`
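The layout above, written out as the list of S3 keys it corresponds to. S3 has no real folders: each "folder" is a zero-byte object whose key ends in `/`. A minimal sketch; `build_folder_keys` and `example_tables` are hypothetical names used only for illustration.

```python
def build_folder_keys(tables: list[str]) -> list[str]:
    """Return the S3 "folder" keys (with trailing slash) for the imdb layout."""
    keys = ["imdb/", "imdb/landing-zone/", "imdb/brute-zone/"]
    # One sub-folder per IMDb table under the brute-zone
    keys += [f"imdb/brute-zone/{table}/" for table in tables]
    keys.append("imdb/out-zone/")
    return keys


example_tables = ["name-basics", "title-akas", "title-basics", "title-crew",
                  "title-episode", "title-principals", "title-ratings"]
for key in build_folder_keys(example_tables):
    print(key)
```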

In [7]:
# Set default region
region = "us-east-1"
print(f"Current AWS Region: {region}")

# Get account Id to be used as random number
account_id = boto3.client('sts').get_caller_identity()['Account']
print(f"Current AWS Account ID: {account_id}")

# Define S3 client
s3_client = boto3.client('s3', region_name=region)

Current AWS Region: us-east-1
Current AWS Account ID: 667214058531


In [None]:
# Helper: create an S3 bucket if it does not already exist
def create_bucket(bucket_name: str, s3_client):
    """
    Creates an S3 bucket if it does not already exist.
    Handles region-specific creation.
    """
    try:
        # Get the region from the client
        region = s3_client.meta.region_name

        if region == 'us-east-1':
            # 'us-east-1' is the default and doesn't need a LocationConstraint
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            # All other regions require a LocationConstraint
            location_config = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location_config)

        print(f"✅ Successfully created bucket: {bucket_name}")

    except botocore.exceptions.ClientError as e:
        # Check for specific "already exists" errors
        error_code = e.response['Error']['Code']
        if error_code == 'BucketAlreadyOwnedByYou' or error_code == 'BucketAlreadyExists':
            print(f"⚠️ Bucket '{bucket_name}' already exists. Skipping creation.")
        else:
            # Raise other errors (e.g., Access Denied)
            print(f"❌ Error creating bucket: {e}")
            raise e
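The region branch in `create_bucket` can be isolated into a pure function, which makes it testable without AWS credentials. `us-east-1` rejects an explicit `LocationConstraint`, while every other region requires one. A sketch; `create_bucket_kwargs` is a hypothetical helper.

```python
def create_bucket_kwargs(bucket_name: str, region: str) -> dict:
    """Build the keyword arguments for s3_client.create_bucket."""
    kwargs = {"Bucket": bucket_name}
    # Only regions other than us-east-1 take a LocationConstraint
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs
```

Usage would be `s3_client.create_bucket(**create_bucket_kwargs(name, s3_client.meta.region_name))`, with the same `ClientError` handling as above.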

In [None]:
def create_folder(bucket_name: str, folder_name: str, s3_client):
    """
    Creates an S3 "folder" (a zero-byte object with a trailing slash)
    if it does not already exist.
    """
    # Ensure folder name has a trailing slash
    folder_key = folder_name.strip('/') + "/"

    try:
        # Check if the "folder" (object) already exists
        s3_client.head_object(Bucket=bucket_name, Key=folder_key)
        print(f"⚠️ Folder '{folder_name}' already exists in bucket '{bucket_name}'. Skipping creation.")

    except botocore.exceptions.ClientError as e:
        # If a client error is thrown, check if it was a 404 error.
        # If it was a 404 error, the "folder" does not exist.
        error_code = e.response['Error']['Code']
        if error_code == '404':
            try:
                # "Folder" not found, so create it by putting an empty object
                s3_client.put_object(Bucket=bucket_name, Key=folder_key, Body='')
                print(f"✅ Successfully created '{folder_name}' in bucket '{bucket_name}'")
            except Exception as create_e:
                print(f"❌ Error creating folder: {create_e}")
                raise create_e
        else:
            # Some other error (e.g., 403 Forbidden)
            print(f"❌ Error checking folder: {e}")
            raise e

In [None]:
bucket_name_cache_athena = f"alv-cache-athena-{account_id}"
bucket_name_dl = f"alv-dl-{account_id}"
folder_imdb = "imdb"
folder_landing_zone = "imdb/landing-zone"
folder_brute_zone = "imdb/brute-zone"
folders_tables = [
    "name-basics",
    "title-akas",
    "title-basics",
    "title-crew",
    "title-episode",
    "title-principals",
    "title-ratings",
]
folder_out_zone = "imdb/out-zone"

# Create buckets
create_bucket(bucket_name_cache_athena, s3_client)
create_bucket(bucket_name_dl, s3_client)

# Create folders
create_folder(bucket_name_dl, folder_imdb, s3_client)
create_folder(bucket_name_dl, folder_landing_zone, s3_client)
create_folder(bucket_name_dl, folder_brute_zone, s3_client)

for table in folders_tables:
    create_folder(bucket_name_dl, f"{folder_brute_zone}/{table}", s3_client)

create_folder(bucket_name_dl, folder_out_zone, s3_client)

## 2. Download the IMDb data

1. Download the IMDb dump
2. Move it to the landing zone in S3
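The download cell below relies on the `curl` and `gunzip` shell commands. A pure-Python sketch of the same two steps using only the standard library; `download` and `gunzip` are hypothetical helper names. Both stream in chunks, so the large files are never loaded fully into memory.

```python
import gzip
import shutil
import urllib.request


def download(url: str, dest: str) -> None:
    """Stream `url` to the local file `dest` (like `curl url -o dest`)."""
    with urllib.request.urlopen(url) as response, open(dest, "wb") as out:
        shutil.copyfileobj(response, out)


def gunzip(src: str, dest: str) -> None:
    """Decompress the .gz file `src` into `dest` (like `gunzip -c src > dest`)."""
    with gzip.open(src, "rb") as f_in, open(dest, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
```

For example, `download("https://datasets.imdbws.com/title.ratings.tsv.gz", "/tmp/title-ratings.tsv.gz")` followed by `gunzip("/tmp/title-ratings.tsv.gz", "/tmp/title-ratings.tsv")`. One difference from plain `curl`: `urlopen` raises `HTTPError` on a 4xx/5xx response, whereas `curl` without `--fail` writes the error page to the file and exits successfully.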

In [9]:
imdb_zip_links = {
    "name-basics": "https://datasets.imdbws.com/name.basics.tsv.gz",
    "title-akas": "https://datasets.imdbws.com/title.akas.tsv.gz",
    "title-basics": "https://datasets.imdbws.com/title.basics.tsv.gz",
    "title-crew": "https://datasets.imdbws.com/title.crew.tsv.gz",
    "title-episode": "https://datasets.imdbws.com/title.episode.tsv.gz",
    "title-principals": "https://datasets.imdbws.com/title.principals.tsv.gz",
    "title-ratings": "https://datasets.imdbws.com/title.ratings.tsv.gz",
}

for table, url in imdb_zip_links.items():
    zipped_file = f"/tmp/{table}.tsv.gz"
    unzipped_file = f"/tmp/{table}.tsv"
    print(f"Table: {table}")

    print("\tDownloading...")
    !curl {url} -o {zipped_file}

    print("\tUnzipping...")
    !gunzip -c {zipped_file} > {unzipped_file}

    print("\tMoving...")
    # Upload the uncompressed TSV to the landing zone
    s3_client.upload_file(unzipped_file, bucket_name_dl, f'{folder_landing_zone}/{table}.tsv')

Table: name-basics
	Downloading...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  280M  100  280M    0     0   281M      0 --:--:-- --:--:-- --:--:--  281M
	Unzipping...
	Moving...
Table: title-akas
	Downloading...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  446M  100  446M    0     0   256M      0  0:00:01  0:00:01 --:--:--  256M
	Unzipping...
	Moving...
Table: title-basics
	Downloading...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  203M  100  203M    0     0   262M      0 --:--:-- --:--:-- --:--:--  262M
	Unzipping...
	Moving...
Table: title-crew
	Downloading...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Cur

## 3. Configure Glue to map the required data

1. Create the Glue database `imdb`
2. Create the Glue tables with the correct schema and the Avro format:
    * `name-basics`
    * `title-akas`
    * `title-basics`
    * `title-crew`
    * `title-episode`
    * `title-principals`
    * `title-ratings`
3. Create an ETL job with:
    * Source: the file in the `landing-zone`
    * Target: the Glue table
    * Run the job with: Update schema and add new partitions
4. Create a crawler to read the table from the `brute-zone`
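The schemas in the next cell type several columns as `bigint` or `double`, while the raw IMDb TSV files write missing values as the two characters `\N` (visible in the sample rows in section 4). A pure-Python sketch of that casting rule for one row; `parse_imdb_row` and `ratings_schema` are hypothetical names, and the mapping only covers the three Glue types used in this notebook.

```python
def parse_imdb_row(fields: list[str], schema: list[dict]) -> dict:
    """Cast one TSV row according to a Glue-style schema.

    IMDb writes missing values as \\N; they become None here.
    """
    # Only the Glue types used in this notebook (assumption)
    casts = {"string": str, "bigint": int, "double": float}
    row = {}
    for value, column in zip(fields, schema):
        if value == "\\N":
            row[column["Name"]] = None
        else:
            row[column["Name"]] = casts[column["Type"]](value)
    return row


ratings_schema = [
    {"Name": "tconst", "Type": "string"},
    {"Name": "averageRating", "Type": "double"},
    {"Name": "numVotes", "Type": "bigint"},
]
print(parse_imdb_row("tt0000001\t5.7\t2184".split("\t"), ratings_schema))
# → {'tconst': 'tt0000001', 'averageRating': 5.7, 'numVotes': 2184}
```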

In [3]:
# Define schema
glue_schemas = {
  "title-akas": [
    {"Name": "titleId", "Type": "string"},
    {"Name": "ordering", "Type": "bigint"},
    {"Name": "title", "Type": "string"},
    {"Name": "region", "Type": "string"},
    {"Name": "language", "Type": "string"},
    {"Name": "types", "Type": "string"},
    {"Name": "attributes", "Type": "string"},
    {"Name": "isOriginalTitle", "Type": "bigint"}
  ],
  "title-basics": [
    {"Name": "tconst", "Type": "string"},
    {"Name": "titleType", "Type": "string"},
    {"Name": "primaryTitle", "Type": "string"},
    {"Name": "originalTitle", "Type": "string"},
    {"Name": "isAdult", "Type": "bigint"},
    {"Name": "startYear", "Type": "bigint"},
    {"Name": "endYear", "Type": "bigint"},
    {"Name": "runtimeMinutes", "Type": "bigint"},
    {"Name": "genres", "Type": "string"}
  ],
  "title-crew": [
    {"Name": "tconst", "Type": "string"},
    {"Name": "directors", "Type": "string"},
    {"Name": "writers", "Type": "string"}
  ],
  "title-episode": [
    {"Name": "tconst", "Type": "string"},
    {"Name": "parentTconst", "Type": "string"},
    {"Name": "seasonNumber", "Type": "bigint"},
    {"Name": "episodeNumber", "Type": "bigint"}
  ],
  "title-principals": [
    {"Name": "tconst", "Type": "string"},
    {"Name": "ordering", "Type": "bigint"},
    {"Name": "nconst", "Type": "string"},
    {"Name": "category", "Type": "string"},
    {"Name": "job", "Type": "string"},
    {"Name": "characters", "Type": "string"}
  ],
  "title-ratings": [
    {"Name": "tconst", "Type": "string"},
    {"Name": "averageRating", "Type": "double"},
    {"Name": "numVotes", "Type": "bigint"}
  ],
  "name-basics": [
    {"Name": "nconst", "Type": "string"},
    {"Name": "primaryName", "Type": "string"},
    {"Name": "birthYear", "Type": "bigint"},
    {"Name": "deathYear", "Type": "bigint"},
    {"Name": "primaryProfession", "Type": "string"},
    {"Name": "knownForTitles", "Type": "string"}
  ]
}

In [None]:
# Helpers to create the Glue database and tables
def create_glue_database(database_name: str, glue_client):
    try:
        glue_client.create_database(
            DatabaseInput={'Name': database_name}
        )
        print(f"✅ Database '{database_name}' created successfully.")
    except glue_client.exceptions.AlreadyExistsException:
        print(f"❌ Database '{database_name}' already exists.")
    except Exception as e:
        print(f"❌ Error creating database: {e}")

In [None]:
def create_glue_table(
    database_name: str,
    table_name: str,
    table_schema: list,
    glue_client
):
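    """
    Creates the empty Avro table `forma-vazia-{table_name}` in the Glue catalog,
    pointing to the table's folder in the brute-zone. Uses the notebook globals
    `bucket_name_dl` and `folder_brute_zone`.
    """
    table_input = {
        'Name': f'forma-vazia-{table_name}',
        'Description': 'Avro table created with boto3 and filled by the Glue ETL job.',
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {
            # Avro files have no header row, so no skip.header.line.count
            'classification': 'avro',
        },
        'StorageDescriptor': {
            'Columns': table_schema,
            'Location': f's3://{bucket_name_dl}/{folder_brute_zone}/{table_name}/',
        },
    }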
    try:
        glue_client.create_table(
            DatabaseName=database_name,
            TableInput=table_input
        )
        print(f"✅ Table '{table_name}' created successfully in database '{database_name}'.")
    except glue_client.exceptions.AlreadyExistsException:
        print(f"❌ Table '{table_name}' already exists.")
    except Exception as e:
        print(f"❌ Error creating table: {e}")

In [4]:
glue_database_name = "imdb"
glue_client = boto3.client('glue', region_name=region)

create_glue_database(glue_database_name, glue_client)

for table, schema in glue_schemas.items():
    create_glue_table(glue_database_name, table, schema, glue_client)

❌ Database 'imdb' already exists.
❌ Table 'title-akas' already exists.
❌ Table 'title-basics' already exists.
❌ Table 'title-crew' already exists.
❌ Table 'title-episode' already exists.
❌ Table 'title-principals' already exists.
❌ Table 'title-ratings' already exists.
❌ Table 'name-basics' already exists.


The ETL jobs and crawlers can also be created manually in the Glue console, as described below; the cells that follow automate the same steps with `boto3`.

Tables:
* name-basics
* title-akas
* title-basics
* title-crew
* title-episode
* title-principals
* title-ratings

Follow these steps for each table:

1. Open Glue
2. Go to Visual ETL and create a new job with:
    * Name: `cria-forma-vazia-{table}`
    * IAM Role: LabRole
    * Source: Amazon S3
        * S3 URL: `s3://alv-dl-{account_id}/imdb/landing-zone/{table}.tsv`
        * Data Format: CSV
        * Delimiter: Tab
    * Target: AWS Glue Data Catalog
        * Database: imdb
        * Table: `forma-vazia-{table}`
        * Update schema and add new partitions
3. Run the ETL job
4. Go to Crawlers and create a new one with:
    * Name: `ler-{table}`
    * Source: `s3://alv-dl-{account_id}/imdb/brute-zone/{table}/`
    * IAM Role: LabRole
    * Database: imdb
5. Run the crawler
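The manual steps follow a fixed naming pattern per table, and the automated cells below reuse exactly the same names. A sketch that collects the pattern in one place; `glue_resource_names` is a hypothetical helper, not part of the notebook's code.

```python
def glue_resource_names(table: str, account_id: str) -> dict:
    """Names and S3 paths used for one IMDb table in the Glue steps above."""
    bucket = f"alv-dl-{account_id}"
    return {
        "job_name": f"cria-forma-vazia-{table}",                    # Visual ETL job
        "source_url": f"s3://{bucket}/imdb/landing-zone/{table}.tsv",
        "target_table": f"forma-vazia-{table}",                     # Glue catalog table
        "crawler_name": f"ler-{table}",
        "crawler_source": f"s3://{bucket}/imdb/brute-zone/{table}/",
    }


for field, value in glue_resource_names("title-ratings", "123456789012").items():
    print(f"{field}: {value}")
```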

In [8]:
PYSPARK_ETL_SCRIPT = """
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Read the arguments passed to the job
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'S3_SOURCE_PATH',
    'TARGET_DATABASE',
    'TARGET_TABLE'
])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print(f"Starting ETL job, reading from: {args['S3_SOURCE_PATH']}")

# Source: read the TSV file (CSV with a tab separator) from the landing-zone.
# The IMDb files have a header row, hence withHeader=True.
dynamic_frame_source = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [args['S3_SOURCE_PATH']]},
    format="csv",
    format_options={"separator": "\\t", "withHeader": True}
)

print(f"Writing to target: {args['TARGET_DATABASE']}.{args['TARGET_TABLE']}")

# Target: write to the Glue Data Catalog table (forma-vazia-*), defined as Avro.
# Glue converts the DynamicFrame read from CSV into the table's format.
# enableUpdateCatalog + UPDATE_IN_DATABASE is the code counterpart of the
# console option "Update schema and add new partitions".
glueContext.write_dynamic_frame.from_catalog(
    frame=dynamic_frame_source,
    database=args['TARGET_DATABASE'],
    table_name=args['TARGET_TABLE'],
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE"
    }
)

job.commit()
print("ETL job finished successfully.")
"""

In [9]:
def create_glue_etl(
    glue_client,
    s3_client,
    table_name: str,
    iam_role: str,
    bucket_name_dl: str,
    folder_scripts: str
) -> str | None:
    """
    Automates step 2: creates the Glue ETL job.

    This involves:
    1. Uploading the generated PySpark script to S3.
    2. Creating a Glue job that points to that script.

    Returns the job name, or None if the job already exists.
    """
    job_name = f"cria-forma-vazia-{table_name}"
    script_key = f"{folder_scripts.strip('/')}/{job_name}_script.py"
    script_s3_path = f"s3://{bucket_name_dl}/{script_key}"

    try:
        # 1. Upload the PySpark script to S3
        print(f"Fazendo upload do script ETL para: {script_s3_path}")
        s3_client.put_object(
            Bucket=bucket_name_dl,
            Key=script_key,
            Body=PYSPARK_ETL_SCRIPT
        )
    except ClientError as e:
        print(f"❌ Erro ao fazer upload do script para S3: {e}")
        raise

    try:
        # 2. Create the Glue job
        print(f"Criando Glue Job: {job_name}")
        glue_client.create_job(
            Name=job_name,
            Role=iam_role,
            Command={
                'Name': 'glueetl',
                'ScriptLocation': script_s3_path,
                'PythonVersion': '3'
            },
            GlueVersion='3.0',  # Using 3.0 or 4.0 is good practice
            Description=f'Job ETL para processar {table_name} do TSV para Avro',
            DefaultArguments={
                '--job-language': 'python'
            }
        )
        print(f"✅ Job '{job_name}' criado com sucesso.")
        return job_name

    except glue_client.exceptions.AlreadyExistsException:
        print(f"⚠️ Job '{job_name}' já existe. Pulando criação.")
        return None

    except ClientError as e:
        print(f"❌ Erro ao criar o Job: {e}")
        raise

In [10]:
def run_glue_etl(
    glue_client,
    table_name: str,
    bucket_name_dl: str,
    folder_landing_zone: str,
    glue_database_name: str
):
    """
    Automates step 3: runs the ETL job that was created.
    """
    job_name = f"cria-forma-vazia-{table_name}"
    s3_source_path = f"s3://{bucket_name_dl}/{folder_landing_zone.strip('/')}/{table_name}.tsv"
    target_table_name = f"forma-vazia-{table_name}"

    try:
        response = glue_client.start_job_run(
            JobName=job_name,
            Arguments={
                '--S3_SOURCE_PATH': s3_source_path,
                '--TARGET_DATABASE': glue_database_name,
                '--TARGET_TABLE': target_table_name
            }
        )
        job_run_id = response['JobRunId']
        print(f"✅ Job '{job_name}' iniciado com sucesso. Run ID: {job_run_id}")
        return job_run_id

    except ClientError as e:
        print(f"❌ Erro ao iniciar o Job: {e}")
        raise

In [11]:
def create_crawler(
    glue_client,
    table_name: str,
    iam_role: str,
    glue_database_name: str,
    bucket_name_dl: str,
    folder_brute_zone: str
):
    """
    Automates step 4: creates the Glue crawler.
    """
    crawler_name = f"ler-{table_name}"
    s3_target_path = f"s3://{bucket_name_dl}/{folder_brute_zone.strip('/')}/{table_name}/"

    try:
        glue_client.create_crawler(
            Name=crawler_name,
            Role=iam_role,
            DatabaseName=glue_database_name,
            Description=f'Crawler para ler dados da brute-zone de {table_name}',
            Targets={
                'S3Targets': [
                    {
                        'Path': s3_target_path
                    }
                ]
            },
            # Policy to update the 'forma-vazia-{table}' table,
            # which already points to this S3 location.
            SchemaChangePolicy={
                'UpdateBehavior': 'UPDATE_IN_DATABASE',
                'DeleteBehavior': 'LOG'  # Do not delete tables, only log
            }
        )
        print(f"✅ Crawler '{crawler_name}' criado com sucesso.")
        return crawler_name

    except glue_client.exceptions.AlreadyExistsException:
        print(f"⚠️ Crawler '{crawler_name}' já existe. Pulando criação.")
        return None

    except ClientError as e:
        print(f"❌ Erro ao criar o Crawler: {e}")
        raise

In [12]:
def run_crawler(glue_client, table_name: str):
    """
    Automates step 5: runs the crawler.
    """
    crawler_name = f"ler-{table_name}"

    try:
        # Check whether the crawler is already running
        crawler_state = glue_client.get_crawler(Name=crawler_name)['Crawler']['State']
        if crawler_state == 'RUNNING':
            print(f"⚠️ Crawler '{crawler_name}' já está em execução.")
            return

        glue_client.start_crawler(Name=crawler_name)
        print(f"✅ Crawler '{crawler_name}' iniciado com sucesso.")

    except glue_client.exceptions.CrawlerRunningException:
        print(f"⚠️ Crawler '{crawler_name}' já está em execução.")
    except ClientError as e:
        print(f"❌ Erro ao iniciar o Crawler: {e}")
        raise

In [13]:
def wait_for_crawler_to_finish(glue_client, crawler_name: str, delay_seconds: int = 10):
    """
    Polls the status of a crawler until it finishes.
    Boto3 does not provide a waiter for crawler completion.

    A crawler goes back to READY whether its run succeeded or not, so the
    outcome is read from LastCrawl['Status'] (SUCCEEDED, CANCELLED or FAILED).
    """
    print(f"Aguardando o Crawler '{crawler_name}' terminar...")
    while True:
        try:
            response = glue_client.get_crawler(Name=crawler_name)
            state = response['Crawler']['State']

            if state in ('RUNNING', 'STOPPING'):
                # Still running, keep waiting
                print(f"\tCrawler ainda está no estado: {state}...")
                time.sleep(delay_seconds)
                continue

            # READY: the crawler is idle again; check how the last run ended
            last_crawl = response['Crawler'].get('LastCrawl', {})
            status = last_crawl.get('Status')
            if state == 'READY' and status == 'SUCCEEDED':
                print("\t✅ Crawler concluiu com sucesso.")
                break

            # The crawler failed or was cancelled
            error_message = last_crawl.get('ErrorMessage', 'Estado de falha desconhecido.')
            print(f"\t❌ Crawler falhou ou parou. \n\t\tEstado: {state} ({status}). \n\t\tErro: {error_message}")
            raise Exception(f"Crawler {crawler_name} falhou: {error_message}")

        except ClientError as e:
            print(f"❌ Erro ao verificar o status do crawler: {e}")
            raise
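The waiting loops in this notebook poll with no upper bound, so a run that never reaches a terminal state blocks the notebook forever. A generic sketch of the same polling pattern with an optional timeout; `poll_until` is a hypothetical helper, not part of boto3.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    delay_seconds: float = 10,
    timeout_seconds: Optional[float] = None,
) -> T:
    """Call fetch() every delay_seconds until is_done(result) is True.

    Raises TimeoutError if timeout_seconds elapses first.
    """
    start = time.monotonic()
    while True:
        result = fetch()
        if is_done(result):
            return result
        elapsed = time.monotonic() - start
        if timeout_seconds is not None and elapsed >= timeout_seconds:
            raise TimeoutError(f"condition not met after {elapsed:.0f}s")
        time.sleep(delay_seconds)
```

For a crawler, `fetch` could be `lambda: glue_client.get_crawler(Name=name)['Crawler']` and `is_done` could be `lambda c: c['State'] == 'READY'`; the caller would still need to inspect `LastCrawl['Status']`, because `READY` alone does not mean the crawl succeeded.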

In [14]:
def wait_for_etl_run_to_finish(
    glue_client,
    job_name: str,
    run_id: str,
    delay_seconds: int = 10
):
    """
    Polls the status of a Glue job run until it finishes.
    """
    print(f"Aguardando o Job Run '{job_name}' (Run ID: {run_id}) terminar...")

    while True:
        try:
            response = glue_client.get_job_run(JobName=job_name, RunId=run_id)
            state = response['JobRun']['JobRunState']

            if state == 'SUCCEEDED':
                # The job finished successfully
                print("\t✅ Job Run concluiu com sucesso.")
                break

            elif state in ('RUNNING', 'STARTING', 'STOPPING', 'WAITING'):
                # Still running, keep waiting
                print(f"\tJob Run ainda está no estado: {state}....")
                time.sleep(delay_seconds)

            elif state in ('FAILED', 'STOPPED', 'TIMEOUT', 'ERROR', 'EXPIRED'):
                # The job failed, was stopped, timed out, errored or expired
                error_message = response['JobRun'].get('ErrorMessage', 'Estado de falha desconhecido.')
                print(f"\t❌ Job Run '{job_name}' (Run ID: {run_id}) falhou ou parou. \n\t\tEstado: {state}. \n\t\tErro: {error_message}")
                raise Exception(f"Job Run {job_name} (Run ID: {run_id}) falhou: {error_message}")

            else:
                # An unexpected state
                print(f"\t❌ Job Run '{job_name}' (Run ID: {run_id}) está em estado desconhecido: {state}")
                raise Exception(f"Estado desconhecido para Job Run {job_name} (Run ID: {run_id}): {state}")

        except ClientError as e:
            # Handle API errors such as "ThrottlingException"
            error_code = e.response.get('Error', {}).get('Code')
            if error_code == 'ThrottlingException':
                print(f"Throttling detectado. Aguardando {delay_seconds}s antes de tentar novamente...")
                time.sleep(delay_seconds)
            else:
                print(f"❌ Erro ao verificar o status do Job Run: {e}")
                raise

In [17]:
IAM_ROLE = "LabRole"
GLUE_DATABASE_NAME = "imdb"
FOLDER_SCRIPTS = "imdb/scripts"

tables_to_process = [
    "name-basics",
    "title-akas",
    "title-basics",
    "title-crew",
    "title-episode",
    "title-principals",
    "title-ratings",
]

In [18]:
# Create and start the ETL jobs (tables whose job already exists are skipped)
job_runs_names_and_ids = []
for table in tables_to_process:
    job_name = create_glue_etl(
        glue_client,
        s3_client,
        table,
        IAM_ROLE,
        bucket_name_dl,
        FOLDER_SCRIPTS
    )

    if job_name is None:
        continue

    job_run_id = run_glue_etl(
        glue_client,
        table,
        bucket_name_dl,
        folder_landing_zone,
        GLUE_DATABASE_NAME
    )
    job_runs_names_and_ids.append((job_name, job_run_id))

Fazendo upload do script ETL para: s3://alv-dl-667214058531/imdb/scripts/cria-forma-vazia-title-basics_script.py
Criando Glue Job: cria-forma-vazia-title-basics
✅ Job 'cria-forma-vazia-title-basics' criado com sucesso.
✅ Job 'cria-forma-vazia-title-basics' iniciado com sucesso. Run ID: jr_0d76db0e505a4b5c9c34f095be1173601848526edb80f48c7ad4555db7661df1
Fazendo upload do script ETL para: s3://alv-dl-667214058531/imdb/scripts/cria-forma-vazia-title-crew_script.py
Criando Glue Job: cria-forma-vazia-title-crew
✅ Job 'cria-forma-vazia-title-crew' criado com sucesso.
✅ Job 'cria-forma-vazia-title-crew' iniciado com sucesso. Run ID: jr_0e632e4b9d6dd31fd544745112e754f58bc464b698ea964d87c8d65d3e05bc30
Fazendo upload do script ETL para: s3://alv-dl-667214058531/imdb/scripts/cria-forma-vazia-title-episode_script.py
Criando Glue Job: cria-forma-vazia-title-episode
✅ Job 'cria-forma-vazia-title-episode' criado com sucesso.
✅ Job 'cria-forma-vazia-title-episode' iniciado com sucesso. Run ID: jr_f121

In [19]:
# Wait until all ETL job runs finish
for job_name, job_run_id in job_runs_names_and_ids:
    wait_for_etl_run_to_finish(glue_client, job_name, job_run_id)

Aguardando o Job Run 'cria-forma-vazia-title-basics' (Run ID: jr_0d76db0e505a4b5c9c34f095be1173601848526edb80f48c7ad4555db7661df1) terminar...
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	Job Run ainda está no estado: RUNNING....
	✅ Job Run concluiu com sucesso.
Aguardando o Job Run 'cria-forma-vazia-title-crew' (Run ID: jr_0e632e4b9d6dd31fd544745112e754f58bc464b698ea964d87c8d65d3e05bc30) terminar...
	✅ Job Run concluiu com sucesso.
Aguardando o Job Run 'cria-forma-vazia-title-episode' (Run ID: jr_f121cf39b59d4ac8a9b6e6a39639b89e9533d00192bb886f0c05a69ee42528f5) te

In [20]:
# Crawlers
crawlers_names = []
for table in tables_to_process:
    # Create crawler
    crawler_name = create_crawler(
        glue_client,
        table,
        IAM_ROLE,
        GLUE_DATABASE_NAME,
        bucket_name_dl,
        folder_brute_zone
    )

    if crawler_name is None:
        continue

    # Run crawler
    run_crawler(glue_client, table)

    crawlers_names.append(crawler_name)

✅ Crawler 'ler-title-basics' criado com sucesso.
✅ Crawler 'ler-title-basics' iniciado com sucesso.
✅ Crawler 'ler-title-crew' criado com sucesso.
✅ Crawler 'ler-title-crew' iniciado com sucesso.
✅ Crawler 'ler-title-episode' criado com sucesso.
✅ Crawler 'ler-title-episode' iniciado com sucesso.
✅ Crawler 'ler-title-principals' criado com sucesso.
✅ Crawler 'ler-title-principals' iniciado com sucesso.
✅ Crawler 'ler-title-ratings' criado com sucesso.
✅ Crawler 'ler-title-ratings' iniciado com sucesso.


In [21]:
for crawler_name in crawlers_names:
    wait_for_crawler_to_finish(glue_client, crawler_name)

Aguardando o Crawler 'ler-title-basics' terminar...
	Crawler ainda está no estado: RUNNING...
	Crawler ainda está no estado: RUNNING...
	Crawler ainda está no estado: RUNNING...
	✅ Crawler concluiu com sucesso.
Aguardando o Crawler 'ler-title-crew' terminar...
	✅ Crawler concluiu com sucesso.
Aguardando o Crawler 'ler-title-episode' terminar...
	Crawler ainda está no estado: RUNNING...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	✅ Crawler concluiu com sucesso.
Aguardando o Crawler 'ler-title-principals' terminar...
	✅ Crawler concluiu com sucesso.
Aguardando o Crawler 'ler-title-ratings' terminar...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	Crawler ainda está no estado: STOPPING...
	✅ Cra

## 4. Brief look at the data using pyathena

1. Understand the data volume
2. Understand which tables need to be joined
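To understand the volume, the cells below run one `COUNT(*)` query per table. A sketch of an alternative that gathers every count in a single Athena query with `UNION ALL`, saving round trips. `build_count_query` is a hypothetical helper; it assumes the underscore table names created by the crawlers (`name_basics`, ...), as used in the queries below, and the same double-quoted identifiers as `get_table_name`.

```python
def build_count_query(database: str, tables: list[str]) -> str:
    """Build one Athena query returning the row count of every table."""
    parts = [
        f"SELECT '{table}' AS table_name, COUNT(*) AS n_rows "
        f'FROM "{database}"."{table}"'
        for table in tables
    ]
    return "\nUNION ALL\n".join(parts)


print(build_count_query("imdb", ["title_ratings", "title_crew"]))
```

With the engine defined below, usage would be `pd.read_sql(build_count_query(glue_database_name, tables), engine)`.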

In [23]:
import sys
!{sys.executable} -m pip install "pyathena[sqlalchemy]"

Collecting pyathena[sqlalchemy]
  Downloading pyathena-3.21.0-py3-none-any.whl.metadata (6.3 kB)
Collecting sqlalchemy>=1.0.0 (from pyathena[sqlalchemy])
  Downloading sqlalchemy-2.0.44-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.5 kB)
Collecting greenlet>=1 (from sqlalchemy>=1.0.0->pyathena[sqlalchemy])
  Downloading greenlet-3.2.4-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (4.1 kB)
Downloading pyathena-3.21.0-py3-none-any.whl (112 kB)
Downloading sqlalchemy-2.0.44-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.2/3.2 MB 10.6 MB/s  0:00:00 eta 0:00:01
Downloading greenlet-3.2.4-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (584 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 584.4/584.4 kB 3.0 MB/s  0:00:00
Installing collected packages: greenlet, sqlalch

In [44]:
from pyathena import connect
import pandas as pd
from sqlalchemy import create_engine
import unicodedata
import re
from IPython.display import display

In [36]:
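from urllib.parse import quote_plus

s3_staging_dir = f"s3://{bucket_name_cache_athena}/"

def get_table_name(table):
    # Fully qualified, double-quoted Athena identifier: "database"."table"
    return f'"{glue_database_name}"."{table}"'

# The staging dir is URL-encoded because it is passed as a query-string parameter
engine = create_engine(
    f"awsathena+rest://@athena.{region}.amazonaws.com:443/{glue_database_name}"
    f"?s3_staging_dir={quote_plus(s3_staging_dir)}"
)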

In [54]:
tables = [
    "name_basics",
    "title_akas",
    "title_basics",
    "title_crew",
    "title_episode",
    "title_principals",
    "title_ratings",
]

for table in tables:
    df_table = pd.read_sql(f"SELECT * FROM {get_table_name(table)} LIMIT 5;", engine)
    size = pd.read_sql(f"SELECT COUNT(*) FROM {get_table_name(table)};", engine).iloc[0].iloc[0]
    print(f"Table: {table}")
    print(f"\tTamanho: {float(size):,.0f}")
    display(df_table)

Table: name_basics
	Tamanho: 14,863,018


| | nconst | primaryname | birthyear | deathyear | primaryprofession | knownfortitles |
|---|---|---|---|---|---|---|
| 0 | nm0000001 | Fred Astaire | 1899 | 1987 | actor,miscellaneous,producer | tt0072308,tt0050419,tt0027125,tt0025164 |
| 1 | nm0000002 | Lauren Bacall | 1924 | 2014 | actress,miscellaneous,soundtrack | tt0037382,tt0075213,tt0038355,tt0117057 |
| 2 | nm0000003 | Brigitte Bardot | 1934 | \N | actress,music_department,producer | tt0057345,tt0049189,tt0056404,tt0054452 |
| 3 | nm0000004 | John Belushi | 1949 | 1982 | actor,writer,music_department | tt0072562,tt0077975,tt0080455,tt0078723 |
| 4 | nm0000005 | Ingmar Bergman | 1918 | 2007 | writer,director,actor | tt0050986,tt0069467,tt0083922,tt0050976 |


Table: title_akas
	Tamanho: 53,779,343


| | titleid | ordering | title | region | language | types | attributes | isoriginaltitle |
|---|---|---|---|---|---|---|---|---|
| 0 | tt0145015 | 1 | Júlia Pastrana | \N | \N | original | \N | 1 |
| 1 | tt0145015 | 2 | Júlia Pastrana | BR | \N | imdbDisplay | \N | 0 |
| 2 | tt0145016 | 1 | O kabouris | \N | \N | original | \N | 1 |
| 3 | tt0145016 | 2 | The Hunchback | GB | \N | imdbDisplay | \N | 0 |
| 4 | tt0145016 | 3 | The Hunchback | US | \N | imdbDisplay | \N | 0 |


Table: title_basics
	Tamanho: 24,095,050


| | tconst | titletype | primarytitle | originaltitle | isadult | startyear | endyear | runtimeminutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt11125930 | tvEpisode | Chance the Rapper | Chance the Rapper | 0 | 2019 | \N | 90 | Comedy,Music |
| 1 | tt11125932 | tvEpisode | Kristen Stewart/Coldplay | Kristen Stewart/Coldplay | 0 | 2019 | \N | 90 | Comedy,Music |
| 2 | tt11125936 | tvEpisode | Jay-Z | Jay-Z | 0 | 2000 | \N | \N | Biography,Documentary |
| 3 | tt11125938 | tvEpisode | EP402 - Whilma's Filipino Restaurant | EP402 - Whilma's Filipino Restaurant | 0 | 2019 | \N | \N | Documentary |
| 4 | tt1112594 | tvEpisode | Episode #3.11 | Episode #3.11 | 0 | 2007 | \N | \N | \N |


Table: title_crew
	Tamanho: 12,052,394


|   | tconst | directors | writers |
|---|---|---|---|
| 0 | tt38069463 | nm11479487 | nm11498549,nm14915707 |
| 1 | tt38069464 | nm11479487 | nm11498549,nm14915707 |
| 2 | tt38069465 | nm11479487 | nm11498549,nm14915707 |
| 3 | tt38069466 | nm11479487 | nm11498549,nm14915707 |
| 4 | tt38069467 | \N | \N |


Table: title_episode
	Tamanho: 9,288,054


|   | tconst | parenttconst | seasonnumber | episodenumber |
|---|---|---|---|---|
| 0 | tt33253302 | tt13777138 | 4 | 123 |
| 1 | tt33253304 | tt0795129 | 196 | 9 |
| 2 | tt33253305 | tt27119289 | \N | \N |
| 3 | tt33253306 | tt0184090 | \N | \N |
| 4 | tt33253307 | tt13198896 | \N | \N |


Table: title_principals
	Tamanho: 95,751,042


|   | tconst | ordering | nconst | category | job | characters |
|---|---|---|---|---|---|---|
| 0 | tt10099562 | 12 | nm0438471 | writer | developed by | \N |
| 1 | tt10099562 | 13 | nm3813965 | writer | \N | \N |
| 2 | tt10099562 | 14 | nm1941818 | writer | \N | \N |
| 3 | tt10099562 | 15 | nm8827833 | writer | \N | \N |
| 4 | tt10099562 | 16 | nm0438506 | producer | producer | \N |


Table: title_ratings
	Tamanho: 1,635,210


|   | tconst | averagerating | numvotes |
|---|---|---|---|
| 0 | tt0000001 | 5.7 | 2184 |
| 1 | tt0000002 | 5.5 | 305 |
| 2 | tt0000003 | 6.4 | 2264 |
| 3 | tt0000004 | 5.2 | 196 |
| 4 | tt0000005 | 6.2 | 3007 |
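
The previews show the marker `\N` in many columns; IMDb's dataset documentation uses `\N` to denote a missing or null field. For string columns such as `genres`, where a numeric conversion does not apply, a minimal sketch of turning those markers into real missing values could look like this. It assumes the data is already in a pandas DataFrame; the helper name `mask_imdb_nulls` is hypothetical, and the sample rows are copied from the `title_basics` preview above.

```python
import pandas as pd

IMDB_NULL = "\\N"  # backslash followed by N, IMDb's marker for a missing field


def mask_imdb_nulls(df: pd.DataFrame) -> pd.DataFrame:
    """Return a new DataFrame where cells exactly equal to the IMDb null marker become NaN."""
    # df.eq compares every cell to the marker; mask() replaces the True positions with NaN
    return df.mask(df.eq(IMDB_NULL))


# Rows copied from the title_basics preview above (values kept as strings)
sample = pd.DataFrame(
    {
        "tconst": ["tt11125936", "tt1112594"],
        "endyear": ["\\N", "\\N"],
        "runtimeminutes": ["\\N", "\\N"],
        "genres": ["Biography,Documentary", "\\N"],
    }
)

clean = mask_imdb_nulls(sample)
print(clean.isna().sum())
```

Only exact matches are replaced, so a value that merely contains a backslash is left untouched.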


## 5. Use pyathena to run the required queries and generate the final output

1. Query the movies (`titletype = 'movie'`) joined with their ratings
2. Define the filtering and sorting criteria
3. Save the output to the `out-zone` (the cell below writes it as Parquet rather than CSV)

In [55]:
movie_data = pd.read_sql(
    f"""
        SELECT *
        FROM {get_table_name('title_basics')} AS tb
        INNER JOIN {get_table_name('title_ratings')} AS tr
            ON tb.tconst = tr.tconst
        WHERE tb.titletype = 'movie';
    """,
    engine,
)
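
Because `SELECT *` over this join returns both `tb.tconst` and `tr.tconst`, `movie_data` ends up with two columns labelled `tconst`. A sketch of the same query with an explicit column list, which selects the join key only once; `build_movie_query` is a hypothetical helper, and the table names passed to it stand in for whatever `get_table_name` returns.

```python
def build_movie_query(basics_table: str, ratings_table: str) -> str:
    """Build the movies + ratings join, selecting each column exactly once."""
    basics_cols = [
        "tconst", "titletype", "primarytitle", "originaltitle", "isadult",
        "startyear", "endyear", "runtimeminutes", "genres",
    ]
    ratings_cols = ["averagerating", "numvotes"]
    # Prefix each column with its table alias; the join key comes only from title_basics
    select_list = ", ".join(
        [f"tb.{c}" for c in basics_cols] + [f"tr.{c}" for c in ratings_cols]
    )
    return (
        f"SELECT {select_list} "
        f"FROM {basics_table} AS tb "
        f"INNER JOIN {ratings_table} AS tr ON tb.tconst = tr.tconst "
        "WHERE tb.titletype = 'movie'"
    )


# Placeholder table names, for illustration only
print(build_movie_query("db.title_basics", "db.title_ratings"))
```

The resulting string could be passed to `pd.read_sql(query, engine)` exactly like the query in the cell above.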

In [67]:
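# Expected type of each column; only the float columns are converted below,
# the string columns are kept as returned by Athena
dtypes = {
    'tconst': str,
    'titletype': str,
    'primarytitle': str,
    'originaltitle': str,
    'isadult': float,
    'startyear': float,
    'endyear': float,
    'runtimeminutes': float,
    'genres': str,  # comma-separated list of genres, not a number
    'averagerating': float,
    'numvotes': float,
}

# Coerce numeric columns: IMDb's "\N" marker (or any unparsable value) becomes NaN
for col, dtype in dtypes.items():
    if dtype is float:
        movie_data[col] = pd.to_numeric(movie_data[col], errors='coerce')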
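
The numeric columns presumably come back from Athena as strings, since they contain `\N` for missing values, which is why the cell coerces them. A small illustration of what `pd.to_numeric(..., errors='coerce')` does with that marker, using `runtimeminutes` values copied from the `title_basics` preview:

```python
import pandas as pd

# Raw values as they appear in the IMDb column: strings, with "\N" for missing
runtime_raw = pd.Series(["90", "90", "\\N", "\\N"], name="runtimeminutes")

# errors="coerce" turns anything that cannot be parsed (here "\N") into NaN
runtime = pd.to_numeric(runtime_raw, errors="coerce")

print(runtime.tolist())
print(runtime.count())  # → 2 (count() skips NaN, like the "count" row of describe())
```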

In [68]:
movie_data[['averagerating', 'numvotes']].describe()

|   | averagerating | numvotes |
|---|---|---|
| count | 676916.0 | 676916.0 |
| mean | 6.137554 | 3646.642 |
| std | 1.3801 | 37194.13 |
| min | 1.0 | 5.0 |
| 25% | 5.3 | 20.0 |
| 50% | 6.2 | 62.0 |
| 75% | 7.1 | 318.0 |
| max | 10.0 | 3117805.0 |
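
The cutoffs used in the next cell appear to be read off this summary: 6.2 is the median (`50%`) of `averagerating`, and 500 votes lies above the 75th percentile of `numvotes` (318). A sketch of deriving the rating cutoff from the data instead of hard-coding it; using the median is an assumption that mirrors the value chosen below, and the sample rows are copied from the `title_ratings` preview.

```python
import pandas as pd

# Rows copied from the title_ratings preview above
ratings = pd.DataFrame(
    {
        "tconst": ["tt0000001", "tt0000002", "tt0000003", "tt0000004", "tt0000005"],
        "averagerating": [5.7, 5.5, 6.4, 5.2, 6.2],
        "numvotes": [2184, 305, 2264, 196, 3007],
    }
)

# Rating cutoff taken from the data: the median, i.e. the "50%" row of describe()
rating_cutoff = ratings["averagerating"].quantile(0.5)
MIN_VOTES = 500  # fixed minimum number of votes, as in the notebook's filter

selected = ratings[
    (ratings["averagerating"] >= rating_cutoff) & (ratings["numvotes"] >= MIN_VOTES)
]
print(rating_cutoff, selected["tconst"].tolist())
```

On the full `movie_data` table the same expression would yield the 6.2 shown in the summary.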


In [81]:
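# Keep movies rated at or above the median rating (6.2 in describe() above)
# that have at least 500 votes
df = movie_data[
    (movie_data["averagerating"] >= 6.2)
    & (movie_data["numvotes"] >= 500)
][["tconst", "primarytitle", "startyear", "runtimeminutes", "genres", "averagerating", "numvotes"]]

# SELECT * returned "tconst" twice (one per joined table); keep the first occurrence
df = df.loc[:, ~df.columns.duplicated()].copy()

# Defensive: one row per title
df = df.drop_duplicates('tconst')

# Keep only the first listed genre
df['genres'] = df['genres'].str.split(',', expand=True)[0]

# Highest rating first, ties broken by number of votes
df = df.sort_values(['averagerating', 'numvotes'], ascending=False).reset_index(drop=True)

df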

|   | tconst | primarytitle | startyear | runtimeminutes | genres | averagerating | numvotes |
|---|---|---|---|---|---|---|---|
| 0 | tt37561269 | Raju Gaani Savaal | 2025.0 | 113.0 | Action | 9.6 | 1078 |
| 1 | tt12119248 | Manmauji | 2024.0 | 138.0 | Drama | 9.6 | 757 |
| 2 | tt34000241 | Mannu Kya Karegga | 2025.0 | 141.0 | Drama | 9.5 | 3103 |
| 3 | tt36460794 | Kousalya Tanaya Ragava | 2025.0 | 147.0 | \N | 9.5 | 1212 |
| 4 | tt33505969 | Irudhi Muyarchi | 2025.0 | 121.0 | \N | 9.5 | 1042 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 37371 | tt0023878 | Central Airport | 1933.0 | 72.0 | Drama | 6.2 | 501 |
| 37372 | tt0076754 | The Swindle | 1977.0 | 99.0 | Action | 6.2 | 501 |
| 37373 | tt6504868 | Sadie | 2018.0 | 96.0 | Drama | 6.2 | 500 |
| 37374 | tt4256516 | America, Here We Come | 2014.0 | 90.0 | Comedy | 6.2 | 500 |
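
A toy reproduction of the cleanup steps in the cell above: dropping a repeated `tconst` column label, keeping the first genre, and sorting by rating and then votes. Titles and genre strings come from the `title_basics` preview; the rating and vote values are made-up placeholders for illustration. `.str[0]` is used here in place of `expand=True` followed by `[0]`; both pick the first element of the split.

```python
import pandas as pd

# Toy SELECT * result: "tconst" appears twice, as it does after the join.
# Ratings and votes below are placeholders, not real IMDb values.
raw = pd.DataFrame(
    [
        ["tt11125930", "Chance the Rapper", "Comedy,Music", 7.0, 800, "tt11125930"],
        ["tt11125936", "Jay-Z", "Biography,Documentary", 7.0, 1200, "tt11125936"],
        ["tt1112594", "Episode #3.11", "\\N", 8.0, 600, "tt1112594"],
    ],
    columns=["tconst", "primarytitle", "genres", "averagerating", "numvotes", "tconst"],
)

# Selecting a duplicated label returns every column carrying it
print(raw[["tconst"]].shape[1])  # → 2

# Keep only the first occurrence of each column label
df = raw.loc[:, ~raw.columns.duplicated()].copy()

# First listed genre only; "\N" (no genre) stays "\N"
df["genres"] = df["genres"].str.split(",").str[0]

# Highest rating first, ties broken by number of votes
df = df.sort_values(["averagerating", "numvotes"], ascending=False).reset_index(drop=True)
print(df)
```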


In [83]:
# Save the output to the out-zone in S3
s3 = boto3.resource('s3')
file_path = '/tmp/output.parquet'
df.to_parquet(file_path, index=False)
bucket_folder_output = f'{folder_out_zone}/output.parquet'
s3.meta.client.upload_file(file_path, bucket_name_dl, bucket_folder_output)

# Generate a presigned URL to download the file from outside AWS
url = s3_client.generate_presigned_url(
    'get_object',
    Params={'Bucket': bucket_name_dl, 'Key': bucket_folder_output},
    ExpiresIn=60 * 60 * 24 * 15  # 15 days, in seconds
)

print("Download link:", url)

Download link: https://alv-dl-667214058531.s3.amazonaws.com/imdb/out-zone/output.parquet?AWSAccessKeyId=ASIAZWWIOSQRZZPMFRCO&Signature=UywApicjdPGkACJAEHtkDceqDN8%3D&x-amz-security-token=IQoJb3JpZ2luX2VjEGIaCXVzLWVhc3QtMSJGMEQCIDNj4ospzfTVkAfKNENi8kgKHPMLF2GVN4aDljTcf6dfAiBDttRwxqWSs6jD%2BFQZDCKhJvoaZTHsOsNJdT42JQUT%2BCq2AggrEAAaDDY2NzIxNDA1ODUzMSIM6I3F0r7wCij30LM4KpMCe6Fd%2FxqT99HFtJZswIAO5AGuy83jL%2F9Vyp6zsq8PXJQg2BUMoIJF52W6mvPqIucF5mUfdNw1sUC%2B74Nh2iKm%2BiLT7uNaiJYeMg2k6XhyzLuj4ptUPMkH%2F13wCbAIvNVo1RrQUhxkMy9%2BJbaHe8yNnWGpaSQOapcmHMtAVlCVcbn265WHXNHP6ZX0op3R1SK46fh8diB%2FDlJFcOXwZhOvZiphvYRjID23JdXpbMEST613IThiCFVnnCjpAogTeMv1fXqbHMuiyM4MSghnOEe0FEVsJt%2Fa2m7FtotRWWfqhk5jJcabp%2FcD%2B3UyMLSFY80OsCuyWUAnIY48uQQ1bJ4ga%2Fn44pw00DKSeSmA2GyZR6M4cyMwvMTPyAY6kgGYgJiSV2iMzRSN%2Bkvu1reoM5QYnxOnQ%2Fne2vBMuGOoBVdgbez8gKz0Hi7LvvA3t4nG2txd1sb2ypa3HQXsvtTOpN5xtbI5mzrT5log5VQzGf%2Bioi5Bszo7o8zZw6Q9a1KABE%2BthP5vZ8LYPO964dcw%2FGuaxgt6CCYWqkplgVeq5xffuonn4hVPovJfwQPe%2FayC5w%3D%3D&Expires=176420
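
The `ExpiresIn` value in the upload cell is 15 days in seconds. Two AWS constraints limit how long such a link really works. A URL signed with Signature Version 4 can be valid for at most 604,800 seconds (7 days). A URL signed with temporary credentials, such as the session credentials behind the LabRole (note the `x-amz-security-token` in the link), stops working when those credentials expire, whatever `ExpiresIn` says. A small hypothetical helper for computing and capping the value before passing it to `generate_presigned_url`:

```python
from datetime import timedelta

# Maximum expiration S3 accepts for Signature Version 4 presigned URLs: 7 days
SIGV4_MAX_EXPIRES = int(timedelta(days=7).total_seconds())  # 604800


def presigned_expires_in(duration: timedelta, cap: int = SIGV4_MAX_EXPIRES) -> int:
    """Convert a duration into an ExpiresIn value in seconds, capped at `cap`."""
    seconds = int(duration.total_seconds())
    if seconds <= 0:
        raise ValueError("duration must be positive")
    return min(seconds, cap)


print(presigned_expires_in(timedelta(hours=1)))  # 3600
print(presigned_expires_in(timedelta(days=15)))  # 604800 (capped at 7 days)
```

Usage would be `ExpiresIn=presigned_expires_in(timedelta(hours=1))` in the `generate_presigned_url` call.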