In [0]:
#CAMADA PRATA
#OBRAS
from pyspark.sql.functions import col

#função para ler o arquivo tsv
def readTSV(filepath):
    df = spark.read.format("tsv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", "\t") \
    .csv(filepath)
    return df

#lendo do arquivo de obras baixado do IMDB
dfObras = readTSV("file:/mnt/imdb/title.basics.tsv")

#renomeando as colunas
de_para_nomes = [ \
    ('tconst','id_obra'), \
    ('titleType','in_tipo_obra'), \
    ('primaryTitle','tx_nome_obra'), \
    ('originalTitle','tx_nome_obra_original'), \
    ('isAdult','in_adulto'), \
    ('startYear','nu_ano_inicial'), \
    ('endYear','nu_ano_final'), \
    ('runtimeMinutes','nu_duracao_min') \
    ]
for antigo, novo in de_para_nomes:
    dfObras = dfObras.withColumnRenamed(antigo, novo)

#Convertendo as colunas de duração, de ano inicial e final para inteiro
dfObras = dfObras.withColumn("nu_ano_inicial", col("nu_ano_inicial").cast("int"))
dfObras = dfObras.withColumn("nu_ano_final", col("nu_ano_final").cast("int"))
dfObras = dfObras.withColumn("nu_duracao_min", col("nu_duracao_min").cast("int"))

#trocando todos os valores N por nulos
dfObras = dfObras.na.replace("\\N", None)

#GENEROS_OBRAS
from pyspark.sql.functions import split, explode, monotonically_increasing_id

#tratamento de campo multivalorado de gêneros de filmestransformando-o em uma tabela de gêneros e uma tabela de ligação entre filmes e gêneros (relacionamento N-N)
dfTodosGeneros = dfObras.withColumn("genres", explode(split(dfObras["genres"], ","))) \
                .select("id_obra","genres")
dfTodosGeneros = dfTodosGeneros.withColumnRenamed('genres', 'tx_nome_genero')
#seleciona apenas gêneros distintas
dfGeneros = dfTodosGeneros.select("tx_nome_genero").distinct() \
                               .withColumn("id_genero", monotonically_increasing_id()+1)
dfGenerosObras = dfGeneros.join(dfTodosGeneros, "tx_nome_genero", "inner") \
                            .select("id_obra", "id_genero")
#salvando o resultado final
dfGeneros.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/generos")
dfGenerosObras.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/generos_obras")

#OBRAS_AVALIACOES
from pyspark.sql.functions import col, bround

#função para ler o arquivo tsv
def readTSV(filepath):
    df = spark.read.format("tsv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", "\t") \
    .csv(filepath)
    return df

#lendo do arquivo baixado do IMDB
dfAvaliacoes = readTSV("file:/mnt/imdb/title.ratings.tsv")

#renomeando as colunas
de_para_nomes = [ \
    ('tconst','id_obra'), \
    ('averageRating','vl_avaliacao_media'), \
    ('numVotes','nu_votos') \
    ]
for antigo, novo in de_para_nomes:
    dfAvaliacoes = dfAvaliacoes.withColumnRenamed(antigo, novo)

#Convertendo as colunas da nota de avaliação e do número de votos para ponto flutuante e inteiro, respectivamente
dfAvaliacoes = dfAvaliacoes.withColumn("vl_avaliacao_media", bround(col("vl_avaliacao_media").cast("double"), 1))
dfAvaliacoes = dfAvaliacoes.withColumn("nu_votos", col("nu_votos").cast("int"))

#trocando todos os valores N por nulos
dfAvaliacoes = dfAvaliacoes.na.replace("\\N", None)

#excluindo coluna multivalorada de gêneros
dfObras = dfObras.drop("genres")

#incorpoando as colunas de número de votos e nota média de avaliação da tabela de avaliações na tabela de obras
dfObras = dfObras.join(dfAvaliacoes, on="id_obra", how="left")

#salvando o resultado final em um arquivo de tabela
dfObras.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/obras")

In [0]:
%sql
--CAMADA PRATA
--criação das tabelas intermediárias
CREATE TABLE IF NOT EXISTS generos USING DELTA LOCATION 'dbfs:/mnt/delta/generos';
CREATE TABLE IF NOT EXISTS generos_obras USING DELTA LOCATION 'dbfs:/mnt/delta/generos_obras';
CREATE TABLE IF NOT EXISTS obras USING DELTA LOCATION 'dbfs:/mnt/delta/obras';

--CAMADA OURO
--criando o banco de dados de dados curados
CREATE DATABASE IF NOT EXISTS curado;

--criação da view da tabela fato de filmes
CREATE OR REPLACE VIEW 
    curado.fato_filmes 
AS
    SELECT 
        id_obra,
        tx_nome_obra, 
        tx_nome_obra_original, 
        nu_ano_inicial,
        nu_duracao_min, 
        vl_avaliacao_media,
        nu_votos 
    FROM
        default.obras 
    WHERE
        in_tipo_obra = 'movie' 
        AND in_adulto = 0;

--criação da view de dimensão de gêneros
CREATE OR REPLACE VIEW 
    curado.dim_generos 
AS
    SELECT 
        id_genero,
        tx_nome_genero
    FROM
        default.generos;

--criação da view de generos_filmes
CREATE OR REPLACE VIEW 
    curado.generos_filmes
AS
    SELECT 
        id_obra,
        id_genero
    FROM
        default.generos_obras gobr
    WHERE
        EXISTS 
            (
                    SELECT 
                        id_obra 
                    FROM 
                        default.obras  obr
                    WHERE
                        in_tipo_obra = 'movie' 
                        AND in_adulto = 0
                        AND obr.id_obra=gobr.id_obra
            );



In [0]:
#CAMADA PRATA
#ARTISTAS
from pyspark.sql.functions import col

#função para ler o arquivo tsv
def readTSV(filepath):
    df = spark.read.format("tsv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", "\t") \
    .csv(filepath)
    return df

#lendo do arquivo baixado do IMDB
dfPessoas = readTSV("file:/mnt/imdb/name.basics.tsv")

#trocando todos os valores N por nulos
dfPessoas = dfPessoas.na.replace("\\N", None)

#renomeando as colunas
de_para_nomes = [ \
    ('nconst','id_pessoa'), \
    ('primaryName','tx_nome'), \
    ('birthYear','nu_ano_nascimento'), \
    ('deathYear','nu_ano_falecimento') \
    ]
for antigo, novo in de_para_nomes:
    dfPessoas = dfPessoas.withColumnRenamed(antigo, novo)

#Convertendo as colunas de anode nascimento e falecimento para inteiro
dfPessoas = dfPessoas.withColumn("nu_ano_nascimento", col("nu_ano_nascimento").cast("int"))
dfPessoas = dfPessoas.withColumn("nu_ano_falecimento", col("nu_ano_falecimento").cast("int"))

#salvando o resultado final em um arquivo de tabela
dfPessoas.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/pessoas")

#tratamento de campo multivalorado de obras em que a pessoa é famosa por tê-la feito
dfArtistasFamososPorObras = dfPessoas.withColumn("knownForTitles", explode(split(dfPessoas["knownForTitles"], ","))) \
                .select("id_pessoa","knownForTitles")
dfArtistasFamososPorObras = dfArtistasFamososPorObras.withColumnRenamed('knownForTitles', 'id_obra')

#salvando o resultado final em um arquivo de tabela
dfArtistasFamososPorObras.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/pessoas_fama_obras")

In [0]:
#CAMADA PRATA
#PERSONAGENS
from pyspark.sql.functions import col,regexp_extract

#função para ler o arquivo tsv
def readTSV(filepath):
    df = spark.read.format("tsv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", "\t") \
    .csv(filepath)
    return df

#lendo do arquivo baixado do IMDB
dfPessoasObras = readTSV("file:/mnt/imdb/title.principals.tsv")

#trocando todos os valores N por nulos
dfPessoasObras = dfPessoasObras.na.replace("\\N", None)

#renomeando as colunas
de_para_nomes = [ \
    ('nconst','id_pessoa'), \
    ('tconst','id_obra'), \
    ('category','tx_categoria'), \
    ('job','tx_funcao'), \
    ('ordering','nu_ordenacao'), \
    ('characters','tx_personagem')\
    ]
for antigo, novo in de_para_nomes:
    dfPessoasObras = dfPessoasObras.withColumnRenamed(antigo, novo)

#Convertendo a coluna de ordenação para inteiro
dfPessoasObras = dfPessoasObras.withColumn("nu_ordenacao", col("nu_ordenacao").cast("int"))

#tratando a coluna de personagens para apenas guardar o primeiro valor
dfPessoasObras = dfPessoasObras.withColumn("tx_personagem", regexp_extract("tx_personagem", r"\[\"(.*?)\"", 1))

#salvando o resultado final em um arquivo de tabela
dfPessoasObras.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/pessoas_obras")

In [0]:
%sql
--CAMADA OURO
--criando o banco de dados de dados curados
CREATE DATABASE IF NOT EXISTS curado;

--criação das tabelas intermediárias
CREATE TABLE IF NOT EXISTS pessoas USING DELTA LOCATION 'dbfs:/mnt/delta/pessoas';
CREATE TABLE IF NOT EXISTS pessoas_obras USING DELTA LOCATION 'dbfs:/mnt/delta/pessoas_obras';
CREATE TABLE IF NOT EXISTS pessoas_fama_obras USING DELTA LOCATION 'dbfs:/mnt/delta/pessoas_fama_obras';
--

CREATE OR REPLACE VIEW curado.dim_personagens_filmes AS
SELECT
  id_obra,
  nu_ordenacao,
  id_pessoa,
  tx_personagem,
  tx_categoria,
  CASE 
    (select count(*) from default.pessoas_fama_obras fam where fam.id_obra = pesobr.id_obra and fam.id_pessoa = pesobr.id_pessoa )
        WHEN 0 then 0
    ELSE
        1
    end as in_famoso_por
FROM
  default.pessoas_obras pesobr
WHERE
  tx_categoria in ('actor','actress')
  AND
    EXISTS 
      (
              SELECT 
                  id_obra 
              FROM 
                  default.obras obr
              WHERE
                  in_tipo_obra = 'movie' 
                  AND in_adulto = 0
                  AND obr.id_obra=pesobr.id_obra
      );
CREATE OR REPLACE VIEW curado.dim_artistas AS
SELECT
  pes.id_pessoa,
  pes.tx_nome,
  pes.nu_ano_nascimento,
  pes.nu_ano_falecimento
FROM
  default.pessoas pes
WHERE
  EXISTS 
      (
              SELECT 
                  id_obra 
              FROM 
                  curado.dim_personagens_filmes  pers
              WHERE
                  pers.id_pessoa = pes.id_pessoa
      );

CREATE OR REPLACE VIEW curado.diretores_filmes AS
SELECT
  id_obra,
  nu_ordenacao,
  id_pessoa
FROM
  default.pessoas_obras pesobr
WHERE
  tx_categoria in ('director')
  AND
    EXISTS 
      (
              SELECT 
                  id_obra 
              FROM 
                  default.obras  obr
              WHERE
                  in_tipo_obra = 'movie' 
                  AND in_adulto = 0
                  AND obr.id_obra=pesobr.id_obra
      );

CREATE OR REPLACE VIEW curado.dim_diretores AS
SELECT
  pes.id_pessoa,
  pes.tx_nome,
  pes.nu_ano_nascimento,
  pes.nu_ano_falecimento
FROM
  default.pessoas pes
WHERE
  EXISTS 
      (
              SELECT 
                  id_obra 
              FROM 
                  curado.diretores_filmes dir
              WHERE
                  dir.id_pessoa = pes.id_pessoa
      );

CREATE OR REPLACE VIEW curado.roteiristas_filmes AS
SELECT
  id_obra,
  nu_ordenacao,
  id_pessoa
FROM
  default.pessoas_obras pesobr
WHERE
  tx_categoria in ('writer')
  AND
    EXISTS 
      (
              SELECT 
                  id_obra 
              FROM 
                  default.obras  obr
              WHERE
                  in_tipo_obra = 'movie' 
                  AND in_adulto = 0
                  AND obr.id_obra=pesobr.id_obra
      );

CREATE OR REPLACE VIEW curado.dim_roteiristas AS
SELECT
  pes.id_pessoa,
  pes.tx_nome,
  pes.nu_ano_nascimento,
  pes.nu_ano_falecimento
FROM
  default.pessoas pes
WHERE
  EXISTS 
      (
              SELECT 
                  id_obra 
              FROM 
                  curado.roteiristas_filmes rot
              WHERE
                  rot.id_pessoa = pes.id_pessoa
      );


In [0]:
#CAMADA PRATA
#NOMES_OBRAS
from pyspark.sql.functions import col

#função para ler o arquivo tsv
def readTSV(filepath):
    df = spark.read.format("tsv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", "\t") \
    .csv(filepath)
    return df

#lendo do arquivo baixado do IMDB
dfNomesObras = readTSV("file:/mnt/imdb/title.akas.tsv")

#renomeando as colunas
de_para_nomes = [ \
    ('titleId','id_obra'), \
    ('ordering','nu_ordenacao'), \
    ('title','tx_nome_obra'), \
    ('region','in_regiao'), \
    ('language','in_idioma'), \
    ('types','tx_tipo'), \
    ('attributes','tx_atributos'), \
    ('isOriginalTitle','in_titulo_original') \
    ]

for antigo, novo in de_para_nomes:
    dfNomesObras = dfNomesObras.withColumnRenamed(antigo, novo)

#trocando todos os valores N por nulos
dfNomesObras = dfNomesObras.na.replace("\\N", None)

#salvando o resultado final em um arquivo de tabela
dfNomesObras.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/nomes_obras")

In [0]:
%sql
CREATE TABLE IF NOT EXISTS nomes_obras USING DELTA LOCATION 'dbfs:/mnt/delta/nomes_obras';

In [0]:
#EPISÓDIOS
from pyspark.sql.functions import col, bround

#função para ler o arquivo tsv
def readTSV(filepath):
    df = spark.read.format("tsv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", "\t") \
    .csv(filepath)
    return df

#lendo do arquivo baixado do IMDB
dfEpisodios = readTSV("file:/mnt/imdb/title.episode.tsv")

#trocando todos os valores N por nulos
dfEpisodios = dfEpisodios.na.replace("\\N", None)

#renomeando as colunas
de_para_nomes = [ \
    ('tconst','id_obra'), \
    ('parentTconst','id_obra_pai'), \
    ('seasonNumber','nu_temporada'), \
    ('episodeNumber','nu_episodio') \
    ]
for antigo, novo in de_para_nomes:
    dfEpisodios = dfEpisodios.withColumnRenamed(antigo, novo)

#Convertendo as colunas número de temporada e número de episódio para inteiro
dfEpisodios = dfEpisodios.withColumn("nu_temporada", col("nu_temporada").cast("int"))
dfEpisodios = dfEpisodios.withColumn("nu_episodio", col("nu_episodio").cast("int"))

#salvando o resultado final em um arquivo de tabela
dfEpisodios.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/episodios")

In [0]:
%sql
CREATE TABLE IF NOT EXISTS episodios USING DELTA LOCATION 'dbfs:/mnt/delta/episodios';