## Teste técnico Pyspark - Confitec
### Matheus Damasceno

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, concat, col
from translate import Translator
from pyspark.sql.types import *
import pandas as pd
from tqdm import tqdm

In [None]:
spark = SparkSession \
        .builder \
        .appName("lead") \
        .enableHiveSupport() \
        .config("spark.executor.heartbeatInterval","8000") \
        .getOrCreate()

In [None]:
originals_netflix = spark.read.parquet("./OriginaisNetflix.parquet")

### 1. Transformar   os   campos   "Premiere"   e   "dt_inclusao"   de   string   para   datetime.  

In [None]:
originals_netflix.createOrReplaceTempView('originals_netflix')
originals_netflix_modified = spark.sql("""
    SELECT
    Title,
    Genre,
    GenreLabels,
    Seasons,
    SeasonsParsed,
    EpisodesParsed,
    Length,
    MinLength,
    MaxLength,
    Status,
    Active,
    Table,
    Language,
    to_date(Premiere, 'd-MMM-y') AS Premiere,
    CAST(dt_inclusao AS timestamp) AS dt_inclusao
    FROM originals_netflix
""")
originals_netflix_modified.printSchema()
originals_netflix_modified.createOrReplaceTempView('originals_netflix_modified')

### 2. Ordenar   os   dados   por   ativos   e   gênero   de   forma   decrescente,   0   =   inativo   e   1   =   ativo,   todos   com   número   1   devem   aparecer   primeiro.   
### 3. Remover linhas duplicadas e trocar o resultado das linhas que tiverem a coluna "Seasons" de "TBA" para "a ser anunciado".
### 4. Criar uma coluna nova chamada "Data de Alteração" e dentro dela um timestamp.
### 5. Trocar os nomes das colunas de inglês para português, exemplo: "Title" para "Título" (com acentuação).

In [None]:
@udf(returnType=StringType()) 
def translation(string):
    translator = Translator(to_lang = "pt-BR") 
    translated = translator.translate(string)
    return translated

originals_netflix_modified = spark.sql("""
    SELECT DISTINCT
        Title,
        Genre,
        GenreLabels,
        CASE
            WHEN Seasons LIKE '%TBA%' THEN 'a   ser   anunciado'
            ELSE Seasons
        END AS Seasons,
        SeasonsParsed,
        EpisodesParsed,
        Length,
        MinLength,
        MaxLength,
        Status,
        Active,
        Table,
        Language,
        Premiere,
        dt_inclusao,
        current_date AS `Data de Alteração`
    FROM originals_netflix_modified
    ORDER BY Active DESC
""")
originals_netflix_modified.printSchema()
originals_netflix_modified.createOrReplaceTempView('originals_netflix_modified')
originals_netflix_modified = originals_netflix_modified.withColumn("Título", translation("Title"))
originals_netflix_modified.show(10)

### 6. Testar e verificar se existe algum erro de processamento do spark e identificar onde pode ter ocorrido o erro.


In [None]:
originals_netflix.select('Premiere').collect()

In [None]:
originals_netflix_modified.select('Premiere').collect()

In [None]:
originals_netflix.select('dt_inclusao').collect()

In [None]:
originals_netflix_modified.select('dt_inclusao').collect()

In [None]:
originals_netflix_modified.select('Data de Alteração').collect()

In [None]:
originals_netflix_modified.select('Título').collect()

In [None]:
originals_netflix_modified = spark.sql("""
    SELECT DISTINCT
    Title,
    Genre,
    CASE
        WHEN Seasons LIKE '%TBA%' THEN 'a   ser   anunciado'
        ELSE Seasons
    END AS Seasons,
    Premiere,
    Language,
    Active,
    Status,
    dt_inclusao,
    current_date AS `Data de Alteração`
    FROM originals_netflix_modified
""")

In [None]:
#Escrevendo diretamente de um ambiente aws
s3_path = """s3://'{path}'"""
originals_netflix_modified.write.format("com.databricks.spark.csv").option("header", "true").save(s3_path)