**Nesse notebook é feito o processo de Extração, Transformação e Carregamento dos dados da Amazon e da Netflix**

**Inicializando a sessão do Spark**

In [None]:
# Instalar Java OpenJDK 11
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Definir variáveis de ambiente para Java
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'

# Baixar e instalar Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xf spark-3.5.2-bin-hadoop3.tgz
!ls -l spark-3.5.2-bin-hadoop3

# Definir variáveis de ambiente para Spark
os.environ['SPARK_HOME'] = '/content/spark-3.5.2-bin-hadoop3'
os.environ['PATH'] += ':/content/spark-3.5.2-bin-hadoop3/bin'

!pip install pyspark

#Upload do arquivo json da chave API.
from google.colab import files
uploaded = files.upload()
!pip install kaggle

import zipfile

# Criando o diretório .kaggle
os.makedirs('/content/.kaggle', exist_ok=True)
# Movendo o arquivo kaggle.json para o diretório correto
os.rename('kaggle.json', '/content/.kaggle/kaggle.json')

In [3]:
#Importação e inicialização da sessão do spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window, Row

spark = SparkSession.builder \
    .appName('Movies_ETL') \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

sc = spark.sparkContext

**NETFLIX**

**Extraindo os dados do banco de dados**

In [None]:
#Baixar os arquivos diretamente do Kaggle
!kaggle datasets download -d netflix-inc/netflix-prize-data

# Caminho para o arquivo zip
zip_file_path = '/content/netflix-prize-data.zip'

# Descompacta o arquivo (Nesse momento pode levar um pouco mais de tempo)
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall('netflix_data')

# Caminho para o arquivo CSV
csv_file_path1 = '/content/netflix_data/combined_data_1.txt'
csv_file_path2 = '/content/netflix_data/combined_data_2.txt'
csv_file_path3 = '/content/netflix_data/combined_data_3.txt'
csv_file_path4 = '/content/netflix_data/combined_data_4.txt'
csv_file_path5 = '/content/netflix_data/movie_titles.csv'

#Dados combinados
rdd1 = sc.textFile(csv_file_path1)
rdd2 = sc.textFile(csv_file_path2)
rdd3 = sc.textFile(csv_file_path3)
rdd4 = sc.textFile(csv_file_path4)

#Dados dos títulos
titles_schema = StructType([
                StructField("movieID", IntegerType(), True),
                StructField("year_release", IntegerType(), True),
                StructField("title", StringType(), True)])
#Leitura do arquivo de títulos
df_movies_titles = spark.read.csv(csv_file_path5,titles_schema, header = True)

**Transformando o dataframe da NETFLIX**

In [5]:
#Função de transformação dos dados combinados de entrada para dataframe
def transfom2DF_combined (rdd):
    def parse_line(line):
        if ':' in line:
            parts = line.split(":")
            # Cria a tupla (movieID, customer_id, rating, year)
            return [(int(parts[0]), -1, -1, -1)]
        else:
            parts = line.split(",")
            # Cria a tupla (movieID, customer_id, rating, year)
            return [(None, int(parts[0]), int(parts[1]), parts[2])]

    rdd_structured = rdd.flatMap(parse_line)

    schema = StructType([
             StructField("movieID", IntegerType(), True),
             StructField("customer_id", IntegerType(), True),
             StructField("rating", IntegerType(), True),
             StructField("review_date", StringType(), False)])

    df = spark.createDataFrame(rdd_structured, schema)

    #Enumerando as linhas para auxiliar no processo de completar com os moviesID
    df = df.withColumn("index", monotonically_increasing_id())

    #Criando uma janela de indices e completando com o último valor não nulo visto
    window = Window.orderBy("index").rowsBetween(-sys.maxsize, 0)
    fill_with = last(df['movieID'], True).over(window)
    df = df.withColumn('movieID', fill_with)

    #Eliminando Coluna auxiliar
    df = df.drop('index')

    #Eliminando linhas auxiliares que contem números negativos.
    df = df.filter(df['customer_id']>0)

    return df

In [6]:
#Transformando os dados combinados de entrada em DataFrame
df1_curated = transfom2DF_combined(rdd1)
df2_curated = transfom2DF_combined(rdd2)
df3_curated = transfom2DF_combined(rdd3)
df4_curated = transfom2DF_combined(rdd4)

#Unindo todos os DF em um unico DataFrame
temp = df1_curated.union(df2_curated)
temp = temp.union(df3_curated)
dfcomb_curated = temp.union(df4_curated)

##Comando a critério do usuário##
#dfcomb_curated.show()

In [7]:
#Unindo os dois dataframes para ter um só com os títulos dos filmes
df_netflix = dfcomb_curated.join(df_movies_titles, "movieID", how = "left")

#Eliminando a coluna de movie_id
df_netflix = df_netflix.drop("movieID")

#Adicionando uma coluna "company" para identificar os dados como sendo da Netflix
df_netflix = df_netflix.withColumn("company",lit("netflix"))

#Eliminando possíveis valores nulos
df_netflix = df_netflix.dropna(how="any")

##Comando a critério do usuário##
#df_netflix.show()

In [None]:
##Comando a critério do usuário##
#Extraindo o dataframe e e carregando numa camada intermediaria do datalake
#df_netflix.write.mode('overwrite').parquet("/content/df_netflix_curated")

**AMAZON**

**Extraindo os dados do banco de dados**

In [None]:
#Baixar os arquivos diretamente do Kaggle
!kaggle datasets download -d cynthiarempel/amazon-us-customer-reviews-dataset

# Caminho para o arquivo zip
zip_file_path = '/content/amazon-us-customer-reviews-dataset.zip'

# Descompacta o arquivo(Nesse momento pode levar um pouco mais de tempo, normal mais de 20 min)
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall('amazon_data')

# Caminho para o arquivo CSV
csv_file_path1 = '/content/amazon_data/amazon_reviews_us_Digital_Video_Download_v1_00.tsv'
csv_file_path2 = '/content/amazon_data/amazon_reviews_us_Video_DVD_v1_00.tsv'
csv_file_path3 = '/content/amazon_data/amazon_reviews_us_Video_v1_00.tsv'

#Extraindo os dados dos arquivos Tsv
df1_raw = spark.read.csv(csv_file_path1, sep= '\t', header = True)
df2_raw = spark.read.csv(csv_file_path2, sep=  '\t', header = True)
df3_raw = spark.read.csv(csv_file_path3,sep= '\t', header = True)

**Transformando o dataframe da AMAZON**

In [9]:
#Unindo so dataframes e um só
df = df1_raw.union(df2_raw)
df_amazon = df.union(df3_raw)

##Comando a critério do usuário##
#df_amazon.show()

In [10]:
#Filtrando para dados somente com lingua inglesa
df_amazon = df_amazon.where(df_amazon.marketplace == "US")
#Eliminar as colunas que não serão necessárias
df_amazon = df_amazon.drop("marketplace","review_id","product_id","product_parent", "product_category","helpful_votes", "total_votes", "vine", \
                           "verified_purchase", "review_headline", "review_body")

In [11]:
#Eliminando possíveis valores nulos
df_amazon = df_amazon.dropna(how="any")

In [12]:
#Agora que as colunas estão filtradas, vamos transformar os formatos das entradas das colunas para manipula-las
df_amazon = df_amazon.withColumnRenamed("star_rating","rating")
df_amazon = df_amazon.withColumnRenamed("product_title","title")
df_amazon = df_amazon.withColumn("customer_id", col("customer_id").cast('integer').alias("customer_id"))
df_amazon = df_amazon.withColumn("rating", col("rating").cast('integer').alias("rating"))
df_amazon = df_amazon.withColumn("review_date",to_date(col("review_date"),"yyyy-MM-dd").alias("review_date"))

In [13]:
#Adicionando uma coluna "company" para identificar os dados como sendo da Amazon
df_amazon = df_amazon.withColumn("company",lit("amazon"))

In [None]:
##Comando a critério do usuário##
#Extraindo o dataframe e salvando na camada intermediária do datalake
#df_amazon.write.mode('overwrite').parquet("/content/df_amazon_curated")

**Unindo os dois dataframes para transforma-los**

In [14]:
#Antes de unir os dois dataframes é necessário que as colunas sejam as mesmas e na mesma ordem
#Ordenando as colunas e preenchendo com valor sem sentido a coluna do ano de lançamento da Amazon
df_netflix = df_netflix.select("customer_id","title","rating","review_date","year_release","company")
df_amazon = df_amazon.withColumn("year_release", lit(-1))
df_amazon = df_amazon.select("customer_id","title","rating","review_date","year_release","company")

#Unindo os dois dataframes
df_movies = df_amazon.union(df_netflix)

##Comando a critério do usuário##
#df_movies.write.mode('overwrite').parquet("/content/df_movies")
#df_movies.show()

In [None]:
#Objetivo das seguintes transformações é padronizar os dados de ambas os dataframes para que os dados de entrada possam ser comparados para responder as perguntas de negócio

#Particionando para melhorar a performance do processo
df_movies = df_movies.repartition(400)

#Transformando todas as letras para minusculo e criando uma coluna de título padronizado
df_movies = df_movies.withColumn("cleanTitle", lower(col("title")))

#A partir desse ponto utilizaremos expressões regulares da biblioteca regex para padronização dos títulos
#Iniciando pela identificação e extração da informação de Season/Part/Volume do filme
#Extrair a informação de Season/Part/Volume do filme

#Determinando o padrão dos números que serão encontrados na string. Os dados da Amazon são correspondentes
#a 20 anos de reviews dos filmes, então considerei que o valor máximo que um filme poderia atingir é 20
#[1-9]([0-9])+ seleciona qualquer padrão númerico com 1 algarismo ou dois. ? indica que essa expressão numérica pode ou não ocorrer.
#Isso seguido das possíveis strings indicando o número por extenso em inglês.
number = ('([1-9]([0-9]+)?|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|'
	          'thirteen|fourteen|fifteen|sixteen|seventeen|eighteen|nineteen|twenty|'
	          'first|second|third|fourth|fifth|sixth|seventh|eighth|nineth|tenth|'
	          'eleventh|twelfth|thirteenth|fourteenth|fifteenth|sixteenth|seventeenth|'
	          'eighteenth|nineteenth|twentieth)')

#Padrão de indicação de season, part, series ou volume
#String season ou part ou series ou vol seguida de qualquer padrão alfanumérico
words = '(season|part|series|vol(\w+)?)'

#Situações que existem, a string seguida do número ou o número seguido da string
#(?<= ...) Lookbehind positivo: corresponde a uma posição após um padrão específico
after = '(?<=' + words + ' +)' + number
#(?= ...) Lookahead positivo: corresponde a uma posição antes de um padrão específico
before = number + '(?= +' + words + ')'

#Utilizando a função de extração do regex para extrair o padrão com número após e pegando a correspondência completa (index 0)
number_after = regexp_extract(df_movies["cleanTitle"], after, 0)
condition = number_after != ''
number_before = regexp_extract(df_movies["cleanTitle"], before, 0)

#Criando uma coluna para guardar o valor da season encontrado.
df_movies = df_movies.withColumn("part", when(condition, number_after).otherwise(number_before))

#Depois da separação da informação da Season, agora retirando essa informação do título padronizado
remove_num_after = regexp_replace(df_movies["cleanTitle"], words + ' ?' + after, '')
remove_num_before = regexp_replace(df_movies["cleanTitle"], before + ' ' + words , '')
df_movies = df_movies.withColumn("cleanTitle", when(condition, remove_num_after).otherwise(remove_num_before))

#Removendo os textos que iniciam com parentesis ou cochetes para posteriormente adicionar de volta a informação da season.
# Ex. movie title [2024]
no_parenthesis= regexp_extract (df_movies["cleanTitle"], r'(^[^(\(|\[)]+)', 0)
df_movies = df_movies.withColumn("cleanTitle", no_parenthesis)

#Padronizando as entradas que indicam a Season, pois podem existir algumas por extenso e outra numéricos.
df_movies = df_movies.withColumn('part', when((col('part') == 'one') | (col('part') =='first'), '1') \
	                                      .when((col('part') == 'two') | (col('part') == 'second'), '2') \
	                                      .when((col('part') == 'three') | (col('part') == 'third'), '3') \
	                                      .when((col('part') == 'four') | (col('part') == 'fourth'), '4') \
	                                      .when((col('part') == 'five') | (col('part') == 'fifth'), '5') \
	                                      .when((col('part') == 'six') | (col('part') == 'sixth'), '6') \
	                                      .when((col('part') == 'seven') | (col('part') == 'seventh'), '7') \
	                                      .when((col('part') == 'eight') | (col('part') == 'eighth'), '8') \
	                                      .when((col('part') == 'nine') | (col('part') == 'nineth'), '9') \
	                                      .when((col('part') == 'ten') | (col('part') == 'tenth'), '10') \
	                                      .when((col('part') == 'eleven') | (col('part') == 'eleventh'), '11') \
	                                      .when((col('part') == 'twelve') | (col('part') == 'twelfth'), '12') \
	                                      .when((col('part') == 'thirteen') | (col('part') == 'thirteenth'), '13') \
	                                      .when((col('part') == 'fourteen') | (col('part') == 'fourteenth'), '14') \
	                                      .when((col('part') == 'fifteen') | (col('part') == 'fifteenth'), '15') \
	                                      .when((col('part') == 'sixteen') | (col('part') == 'sixteenth'), '16') \
	                                      .when((col('part') == 'seventeen') | (col('part') == 'seventeenth'), '17') \
	                                      .when((col('part') == 'eighteen') | (col('part') == 'eighteenth'), '18') \
	                                      .when((col('part') == 'nineteen') | (col('part') == 'nineteenth'), '19') \
	                                      .when((col('part') == 'twenty') | (col('part') == 'twentieth'), '20') \
	                                      .otherwise(col('part')))

#Adicionando novamente a informação de season/part/vol a string principal agora no formato padrão numérico
concat = concat(col('cleanTitle'), lit(' '), col('part'))
df_movies = df_movies.withColumn('cleanTitle', concat)

#Removendo todos os caracteres que não são espaços em branco, alfanuméricos ou sublinhados, mantendo os sublinhados puros.
no_punctuation = regexp_replace(df_movies["cleanTitle"], r'([^\s\w_]|_)+', '')
df_movies = df_movies.withColumn("cleanTitle", no_punctuation)

#Substituindo possíveis espaços duplos por espaço simples
double_spaces = regexp_replace(df_movies["cleanTitle"], r'  +', ' ')
df_movies = df_movies.withColumn("cleanTitle", double_spaces)

#Removendo espaços em branco no inicio e no final do título que podem diferenciar uma string da outras
df_movies = df_movies.withColumn("cleanTitle", trim(col("cleanTitle")))

#Reorganizando as colunas do DataFrame agora com o titulo no formato padrao
df_movies = df_movies.select(col("customer_id"),col("cleanTitle").alias("title"),col("rating"),col("review_date"),col("year_release"),col("company"))


##Comando a critério do usuário##
df_movies.show()


### Carregando o data frame final na camada refinada do Datalake

In [16]:
#Carregando o DataFrame final na camada refined do datalake já no formato escolhido
df_movies.write.mode('overwrite').parquet("/content/df_movies_refined")
#A partir daqui os arquivos estão prontos para serem colocados no Data WareHouse

In [None]:
#Transformando os 400 arquivos .parquet em um arquivo unico .zip
!zip -r /content/df_movies.zip /content/df_movies_refined