In [1]:
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession \
    .builder \
    .master("local[*]")\
    .appName("Processamento Camada Refined") \
    .getOrCreate()

output_path = 'assets/data/Layers/Refined/'

df_tmdb = spark.read.parquet('assets/data/Layers/TRT/TMDB')
df_imdb = spark.read.parquet('assets/data/Layers/TRT/Movies')

In [2]:
df_tmdb.printSchema()
df_tmdb.show()
df_imdb.printSchema()
df_imdb.show()

root
 |-- budget: long (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: long (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)
 |-- extraction_date: date (nullable = true)

+---------+------+---------+----------+------------+---------+-------+--------------------+------------+----------+---------------+
|   budget|    id|  imdb_id|popularity|release_date|  revenue|runtime|               title|vote_average|vote_count|extraction_date|
+---------+------+---------+----------+------------+---------+-------+--------------------+------------+----------+---------------+
|  2020000| 30145|tt0151572|      6.83|  1998-12-18|  4374994|    104|The Miracle of P....|         6.7|        98|     2024-03-27|
| 12000000| 31909|tt0091276|    14.

In [3]:
df_imdb = df_imdb.withColumn("id_genre", 
                           F.when(df_imdb["genero"].like("%Fantasy%") & ~df_imdb["genero"].like("%Sci-Fi%"), 1)
                           .when(df_imdb["genero"].like("%Sci-Fi%") & ~df_imdb["genero"].like("%Fantasy%"), 2)
                           .when(df_imdb["genero"].like("%Fantasy%") & df_imdb["genero"].like("%Sci-Fi%"), 3)
                           .otherwise(None))
df_imdb.show()

+---------+--------------------+--------------------+-------------+------------+--------------------+---------+-----------+--------------------+------------------+--------+
|       id|     tituloPrincipal|      tituloOriginal|anoLancamento|tempoMinutos|              genero|notaMedia|numeroVotos|          personagem|       nomeArtista|id_genre|
+---------+--------------------+--------------------+-------------+------------+--------------------+---------+-----------+--------------------+------------------+--------+
|tt0000679|The Fairylogue an...|The Fairylogue an...|         1908|         120|   Adventure,Fantasy|      5.2|         66|Tik-Tok - the Mac...| Wallace Illington|       1|
|tt0003015|Die Insel der Sel...|Die Insel der Sel...|         1913|          49|      Comedy,Fantasy|      4.7|         63|Gutmuetige VaterM...|Wilhelm Diegelmann|       1|
|tt0003772|          Cinderella|          Cinderella|         1914|          52|       Drama,Fantasy|      6.1|       1037|          Ci

In [4]:
df_tmdb = df_tmdb.withColumnRenamed('id', 'tmdb_id')
df_geral = df_imdb.join(df_tmdb, df_imdb.id == df_tmdb.imdb_id, 'inner')
df_geral.createOrReplaceTempView('db_geral')
df_tmdb.printSchema()
df_imdb.printSchema()
df_geral.printSchema()
df_geral.describe().show(truncate=False)

root
 |-- budget: long (nullable = true)
 |-- tmdb_id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: long (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)
 |-- extraction_date: date (nullable = true)

root
 |-- id: string (nullable = true)
 |-- tituloPrincipal: string (nullable = true)
 |-- tituloOriginal: string (nullable = true)
 |-- anoLancamento: integer (nullable = true)
 |-- tempoMinutos: integer (nullable = true)
 |-- genero: string (nullable = true)
 |-- notaMedia: double (nullable = true)
 |-- numeroVotos: integer (nullable = true)
 |-- personagem: string (nullable = true)
 |-- nomeArtista: string (nullable = true)
 |-- id_genre: integer (nullable = true)

root
 |-- id: string (nullable = true)
 |-- tituloPrincipal: string (nullable = true)

In [5]:
spark.sql('SELECT DISTINCT id FROM db_geral').count() # 1316

1316

In [6]:
# Criando tabela de artistas
df_actors = df_imdb.groupBy('nomeArtista').count()
df_actors = df_actors.withColumnRenamed('nomeArtista', 'name') \
    .withColumnRenamed('count', 'num_movies')

df_actors.orderBy(F.col('count').desc()).show()
df_actors.describe().show()

+--------------------+----------+
|                name|num_movies|
+--------------------+----------+
|     Lance Henriksen|        28|
|      John Carradine|        28|
|       Germán Valdés|        22|
|     Christopher Lee|        21|
|       Boris Karloff|        20|
|        Tom Sizemore|        19|
|        Nicolas Cage|        19|
|        Eric Roberts|        19|
|       Peter Cushing|        19|
|        David Warner|        18|
|      Lon Chaney Jr.|        17|
|Arnold Schwarzene...|        17|
|      Tadanobu Asano|        16|
|   Samuel L. Jackson|        16|
| Christopher Lambert|        16|
|    Malcolm McDowell|        16|
|    Michael Ironside|        15|
| Tony Chiu-Wai Leung|        15|
|         Johnny Depp|        15|
|        Keanu Reeves|        14|
+--------------------+----------+
only showing top 20 rows

+-------+--------------------+------------------+
|summary|                name|        num_movies|
+-------+--------------------+------------------+
|  count

In [7]:
# Add coluna de id
df_actors = df_actors.select(
    F.row_number().over(Window.orderBy(F.lit(1))).alias('id'),
    *df_actors.columns)

df_actors.describe().show()
df_actors.show()

# df_actors.write.parquet(output_path + 'dim_actor')

+-------+-----------------+--------------------+------------------+
|summary|               id|                name|        num_movies|
+-------+-----------------+--------------------+------------------+
|  count|            34664|               34664|             34664|
|   mean|          17332.5|                NULL|1.3511135471959381|
| stddev|10006.77920212093|                NULL| 1.060193900867523|
|    min|                1|'Baby' Carmen De Rue|                 1|
|    max|            34664|          Ünsal Emre|                28|
+-------+-----------------+--------------------+------------------+

+---+------------------+----------+
| id|              name|num_movies|
+---+------------------+----------+
|  1|      Edith Shayne|         1|
|  2| Sandra Milovanoff|         1|
|  3|   Donald Calthrop|         1|
|  4| Arturo de Córdova|         1|
|  5|        Lex Barker|         4|
|  6|  Bradford Dillman|         6|
|  7|        Saul Kahan|         1|
|  8| Geraldine Chaplin|   

In [8]:
# Criando tabela de datas
df_dates = df_tmdb.select(df_tmdb.release_date.alias('date')) \
    .distinct().orderBy('release_date')
df_dates.show()

+----------+
|      date|
+----------+
|1916-12-24|
|1924-03-18|
|1927-02-06|
|1931-02-12|
|1931-11-21|
|1931-12-24|
|1933-03-15|
|1935-04-20|
|1939-08-15|
|1940-11-13|
|1940-12-28|
|1942-12-05|
|1943-04-09|
|1943-12-24|
|1945-08-13|
|1946-12-20|
|1948-06-15|
|1949-07-27|
|1950-02-22|
|1950-06-27|
+----------+
only showing top 20 rows



In [9]:
df_dates = df_dates.withColumn('year', F.year('date')) \
    .withColumn('month', F.month('date')) \
    .withColumn('day', F.dayofmonth('date')) \
    .withColumn('quarter',
                F.when(F.month('date').isin([1, 2, 3]), 'Q1')
                .when(F.month('date').isin([4, 5, 6]), 'Q2')
                .when(F.month('date').isin([7, 8, 9]), 'Q3')
                .otherwise('Q4'))

# Add coluna de id
df_dates = df_dates.select(
    F.row_number().over(Window.orderBy(F.lit(1))).alias('id'),
    *df_dates.columns)


df_dates.printSchema()
df_dates.describe().show()
df_dates.show()


# df_dates.write.parquet(output_path + 'dim_date')

root
 |-- id: integer (nullable = false)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- quarter: string (nullable = false)

+-------+------------------+------------------+------------------+-----------------+-------+
|summary|                id|              year|             month|              day|quarter|
+-------+------------------+------------------+------------------+-----------------+-------+
|  count|              1232|              1232|              1232|             1232|   1232|
|   mean|             616.5|2000.6103896103896| 6.925324675324675|15.53814935064935|   NULL|
| stddev|355.79207411070865|16.546686701281544|3.3437174949039803|8.619033543302185|   NULL|
|    min|                 1|              1916|                 1|                1|     Q1|
|    max|              1232|              2022|                12|               31|     Q4|
+-------+------------------+-

In [10]:
# Criando tabela de filmes
df_movies = spark.sql("""
    SELECT DISTINCT 
        id,
        tituloPrincipal AS main_title,
        tituloOriginal AS original_title,
        release_date,
        genero AS genres
    FROM db_geral
""")

df_movies.printSchema()
df_movies.describe().show()
df_movies.show()

# df_movies.write.parquet(output_path + 'dim_movie')

root
 |-- id: string (nullable = true)
 |-- main_title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- genres: string (nullable = true)

+-------+---------+--------------------+--------------------+--------------------+
|summary|       id|          main_title|      original_title|              genres|
+-------+---------+--------------------+--------------------+--------------------+
|  count|     1316|                1316|                1316|                1316|
|   mean|     NULL|              1367.0|              1367.0|                NULL|
| stddev|     NULL|   956.0285909253273|   956.0285909253273|                NULL|
|    min|tt0006333|(T)Raumschiff Sur...|(T)Raumschiff Sur...|Action,Adventure,...|
|    max|tt9844322|            Æon Flux|            Æon Flux|     Sci-Fi,Thriller|
+-------+---------+--------------------+--------------------+--------------------+

+---------+--------------------+------------

In [11]:
df_movies.filter(F.col('original_title').like('%Star Wars%')).show()

+---------+--------------------+--------------------+------------+--------------------+
|       id|          main_title|      original_title|release_date|              genres|
+---------+--------------------+--------------------+------------+--------------------+
|tt0120915|Star Wars: Episod...|Star Wars: Episod...|  1999-05-19|Action,Adventure,...|
|tt0086190|Star Wars: Episod...|Star Wars: Episod...|  1983-05-25|Action,Adventure,...|
|tt0076759|Star Wars: Episod...|           Star Wars|  1977-05-25|Action,Adventure,...|
|tt2527338|Star Wars: Episod...|Star Wars: Episod...|  2019-12-18|Action,Adventure,...|
|tt0121766|Star Wars: Episod...|Star Wars: Episod...|  2005-05-17|Action,Adventure,...|
|tt0121765|Star Wars: Episod...|Star Wars: Episod...|  2002-05-15|Action,Adventure,...|
|tt3778644|Solo: A Star Wars...|Solo: A Star Wars...|  2018-05-15|Action,Adventure,...|
|tt2488496|Star Wars: Episod...|Star Wars: Episod...|  2015-12-15|Action,Adventure,...|
|tt2527336|Star Wars: Episod...|

In [12]:
# Criando tabela fato
df_fato = spark.sql("""
    SELECT DISTINCT 
        id AS id_movie,
        nomeArtista,
        release_date,
        id_genre,
        runtime,
        revenue,
        budget,
        personagem AS actor_role,
        popularity,
        notaMedia AS imdb_rating,
        vote_average AS tmdb_rating,
        numeroVotos AS imdb_vote_count,
        vote_count AS tmdb_vote_count
    FROM db_geral
""")

df_fato.printSchema()
df_fato.describe().show()
df_fato.show()

root
 |-- id_movie: string (nullable = true)
 |-- nomeArtista: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- id_genre: integer (nullable = true)
 |-- runtime: long (nullable = true)
 |-- revenue: long (nullable = true)
 |-- budget: long (nullable = true)
 |-- actor_role: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- imdb_rating: double (nullable = true)
 |-- tmdb_rating: double (nullable = true)
 |-- imdb_vote_count: integer (nullable = true)
 |-- tmdb_vote_count: long (nullable = true)

+-------+---------+-------------------+------------------+------------------+--------------------+--------------------+------------+------------------+------------------+-----------------+------------------+-----------------+
|summary| id_movie|        nomeArtista|          id_genre|           runtime|             revenue|              budget|  actor_role|        popularity|       imdb_rating|      tmdb_rating|   imdb_vote_count|  tmdb_vote_count|
+--

In [13]:
# Add chaves estrangeiras
df_fato = df_fato.join(df_dates,
                       df_fato.release_date == df_dates.date, 'inner')
df_fato = df_fato.withColumnRenamed('id', 'id_date')
df_fato = df_fato.join(df_actors,
                       df_fato.nomeArtista == df_actors.name, 'inner')
df_fato = df_fato.withColumnRenamed('id', 'id_actor')

df_fato.show()
print(df_fato.count())

+---------+--------------------+------------+--------+-------+---------+---------+--------------------+----------+-----------+-----------+---------------+---------------+-------+----------+----+-----+---+-------+--------+--------------------+----------+
| id_movie|         nomeArtista|release_date|id_genre|runtime|  revenue|   budget|          actor_role|popularity|imdb_rating|tmdb_rating|imdb_vote_count|tmdb_vote_count|id_date|      date|year|month|day|quarter|id_actor|                name|num_movies|
+---------+--------------------+------------+--------+-------+---------+---------+--------------------+----------+-----------+-----------+---------------+---------------+-------+----------+----+-----+---+-------+--------+--------------------+----------+
|tt0022835|       Fredric March|  1931-12-24|       2|     98|  1300000|   535000|Dr. Henry JekyllM...|    12.799|        7.6|        7.2|          14238|            255|      6|1931-12-24|1931|   12| 24|     Q4|    6107|       Fredric Ma

In [14]:
# Eliminando as colunas indesejadas sem alterar os dados de IDs estrangeiros
colunas_indesejadas = ['release_date', 'date', 'year', 'month', 'day',
                       'quarter', 'nomeArtista', 'name', 'num_movies']
df_pandas = df_fato.toPandas()  # DataFrame Pandas é imutável
df_pandas = df_pandas.drop(colunas_indesejadas, axis=1)
df_fato = spark.createDataFrame(df_pandas)

df_pandas.head(5)

Unnamed: 0,id_movie,id_genre,runtime,revenue,budget,actor_role,popularity,imdb_rating,tmdb_rating,imdb_vote_count,tmdb_vote_count,id_date,id_actor
0,tt0022835,2,98,1300000,535000,Dr. Henry JekyllMr. Hyde,12.799,7.6,7.2,14238,255,6,6107
1,tt0035959,1,120,5363000,2627000,Pete Sandidge,7.648,6.9,6.5,2655,48,14,1421
2,tt0119535,1,103,4400000,12000000,Jackson,16.579,6.3,6.1,35927,421,425,5810
3,tt0119707,1,95,51376861,30000000,Kitana,26.328,3.6,4.5,55287,1126,427,3754
4,tt0293429,1,110,84426031,20000000,Cole Young,48.195,6.1,7.1,173595,5589,1192,8091


In [15]:
# Reordenando as colunas
df_fato = df_fato.select(
    *df_fato.columns[:2], *df_fato.columns[-2:], *df_fato.columns[2:-2])

df_fato.printSchema()
df_fato.describe().show()
df_fato.orderBy('id_movie').show()

# df_fato.write.partitionBy('id_movie').parquet(output_path + 'fact_movie_actor')

root
 |-- id_movie: string (nullable = true)
 |-- id_genre: long (nullable = true)
 |-- id_date: long (nullable = true)
 |-- id_actor: long (nullable = true)
 |-- runtime: long (nullable = true)
 |-- revenue: long (nullable = true)
 |-- budget: long (nullable = true)
 |-- actor_role: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- imdb_rating: double (nullable = true)
 |-- tmdb_rating: double (nullable = true)
 |-- imdb_vote_count: long (nullable = true)
 |-- tmdb_vote_count: long (nullable = true)

+-------+---------+------------------+-----------------+------------------+------------------+--------------------+--------------------+------------+------------------+------------------+------------------+------------------+------------------+
|summary| id_movie|          id_genre|          id_date|          id_actor|           runtime|             revenue|              budget|  actor_role|        popularity|       imdb_rating|       tmdb_rating|   imdb_vote_count| 

In [16]:
# Criando tabela de gêneros
df_genres = df_fato.select('id_movie', 'id_genre').distinct().groupBy('id_genre').count()
df_genres.show()

+--------+-----+
|id_genre|count|
+--------+-----+
|       1|  666|
|       3|   14|
|       2|  636|
+--------+-----+



In [17]:
df_genres = df_genres.withColumnRenamed('id_genre', 'id')
df_genres = df_genres.withColumnRenamed('count', 'num_movies')
df_genres = df_genres.withColumn('genre', 
                           F.when(F.col('id') == 1, 'Fantasy')
                           .when(F.col('id') == 2, 'Sci-Fi')
                           .when(F.col('id') == 3, 'Fantasy/Sci-Fi'))

df_genres = df_genres.select('id', 'genre', 'num_movies').orderBy('id')

df_genres.show()

# df_genres.write.parquet(output_path + 'dim_genre')

+---+--------------+----------+
| id|         genre|num_movies|
+---+--------------+----------+
|  1|       Fantasy|       666|
|  2|        Sci-Fi|       636|
|  3|Fantasy/Sci-Fi|        14|
+---+--------------+----------+

