In [None]:
# importando bibliotecas necessárias 
import os
import boto3
from google.colab import drive
from pyspark.sql import functions as f
from pyspark.sql.types import
from pyspark.sql import SparkSession
import findspark

# instalação das dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# configurando variáveis de ambiente
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# iniciando spark
findspark.init()

In [None]:
# criando sessão
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Case Confitech") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [None]:
# montando drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# lendo e vizualizando arquivo 
path = '/content/drive/MyDrive/case confitech/files'

netflix_parquet = spark.read.parquet(path)

netflix_parquet.show(5, False)

In [None]:
# investigando schema
netflix_parquet.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: string (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: string (nullable = true)



# Transformando os campos "Premiere" e "dt_inclusão" de string para datetime





In [None]:
netflix_parquet2 = netflix_parquet\
    .withColumn(
        "dt_inclusao",
        f.to_date(netflix_parquet.dt_inclusao)
    )\
    .withColumn(
        "Premiere",
        f.date_format(f.current_timestamp(), "yyyy-MM-dd")
    )\

netflix_parquet2.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: string (nullable = false)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: date (nullable = true)



In [None]:
# novos valores para premiere e dt_inclusao
netflix_parquet2.show(5, False)

+-----------------------+----------------+----------------+----------+----------------------+-------------+--------------+----------+---------+---------+-------+------+-----+--------+-----------+
|Title                  |Genre           |GenreLabels     |Premiere  |Seasons               |SeasonsParsed|EpisodesParsed|Length    |MinLength|MaxLength|Status |Active|Table|Language|dt_inclusao|
+-----------------------+----------------+----------------+----------+----------------------+-------------+--------------+----------+---------+---------+-------+------+-----+--------+-----------+
|House of Cards         |Political drama |political,drama |2023-04-27|6 seasons, 73 episodes|6            |73            |42–59 min.|42       |59       |Ended  |0     |Drama|English |2021-03-16 |
|Hemlock Grove          |Horror/thriller |horror,thriller |2023-04-27|3 seasons, 33 episodes|3            |33            |45–58 min.|45       |58       |Ended  |0     |Drama|English |2021-03-16 |
|Orange Is the New B

# Ordenando os dados por ativos e gênero de forma decrescente

### Legenda
- 0 = inativo
-1 = ativo

*Todos com número 1 devem aparecer primeiro.*

In [None]:
# ordenação e visualização
netflix_parquet2\
    .select('Genre', "Active")\
    .orderBy(['Active', 'Genre'], ascending=[False, False])\
    .show(10, False)

+------------------------+------+
|Genre                   |Active|
+------------------------+------+
|chrildrens musical short|1     |
|chrildrens musical short|1     |
|chrildrens musical short|1     |
|childrens-animation     |1     |
|childrens-animation     |1     |
|childrens-animation     |1     |
|childrens-animation     |1     |
|childrens-animation     |1     |
|childrens-animation     |1     |
|childrens-animation     |1     |
+------------------------+------+
only showing top 10 rows



# Removendo linhas duplicadas e trocando o resultado das linhas que tiverem a coluna "Seasons"
- "TBA" --> "a ser anunciado". 



In [None]:
# removendo linhas duplicadas
netflix_parquet_no_duplicate = netflix_parquet2.dropDuplicates()

netflix_parquet_no_duplicate.show(5)

+--------------------+-------------------+-------------------+----------+--------------------+-------------+--------------+----------+---------+---------+-------+------+---------------+--------+-----------+
|               Title|              Genre|        GenreLabels|  Premiere|             Seasons|SeasonsParsed|EpisodesParsed|    Length|MinLength|MaxLength| Status|Active|          Table|Language|dt_inclusao|
+--------------------+-------------------+-------------------+----------+--------------------+-------------+--------------+----------+---------+---------+-------+------+---------------+--------+-----------+
|          Mindhunter|        Crime drama|        crime,drama|2023-04-27|1 season, 10 epis...|            1|            10|34–60 min.|       34|       60|Renewed|     1|          Drama| English| 2021-03-16|
|              Rotten|        Documentary|        documentary|2023-04-27|1 season, 6 episodes|            1|             6|48–58 min.|       48|       58|  Ended|     0|   

In [None]:
# alterando valor de 'TBAq para 'a ser anunciado'
netflix_parquet_clean = netflix_parquet_no_duplicate.withColumn(
    'Seasons', f.regexp_replace('Seasons', 'TBA', 'a ser anunciado'))

In [None]:
# dataframe tratado
netflix_parquet_clean\
    .where("Seasons == 'a ser anunciado'")\
    .show(5, False)

+----------------------------------+---------------+---------------+----------+---------------+-------------+--------------+-------+---------+---------+-------+------+---------------+--------+-----------+
|Title                             |Genre          |GenreLabels    |Premiere  |Seasons        |SeasonsParsed|EpisodesParsed|Length |MinLength|MaxLength|Status |Active|Table          |Language|dt_inclusao|
+----------------------------------+---------------+---------------+----------+---------------+-------------+--------------+-------+---------+---------+-------+------+---------------+--------+-----------+
|Knights of the Zodiac: Saint Seiya|Action         |action         |2023-04-27|a ser anunciado|0            |0             |TBA    |0        |0        |Pending|1     |Anime          |English |2021-03-16 |
|7 Seeds                           |Science fiction|science-fiction|2023-04-27|a ser anunciado|0            |0             |TBA    |0        |0        |Pending|1     |Anime        

# Criando uma nova coluna chamada "Data de Alteração" e passando um de valor timestamp

In [None]:
# criando nova coluna
netflix_parquet_clean = netflix_parquet_clean.withColumn(
    "Data de Alteracao", f.current_timestamp())

In [None]:
netflix_parquet_clean.show(5, False)

+-------------------------------------+-------------------+-------------------+----------+----------------------+-------------+--------------+----------+---------+---------+-------+------+---------------+--------+-----------+----------------------+
|Title                                |Genre              |GenreLabels        |Premiere  |Seasons               |SeasonsParsed|EpisodesParsed|Length    |MinLength|MaxLength|Status |Active|Table          |Language|dt_inclusao|Data de Alteracao     |
+-------------------------------------+-------------------+-------------------+----------+----------------------+-------------+--------------+----------+---------+---------+-------+------+---------------+--------+-----------+----------------------+
|Mindhunter                           |Crime drama        |crime,drama        |2023-04-27|1 season, 10 episodes |1            |10            |34–60 min.|34       |60       |Renewed|1     |Drama          |English |2021-03-16 |2023-04-27 17:23:07.06|
|Rot

# Trocando os nomes das colunas de inglês para português, exemplo: "Title" para "Título" (com acentuação). 


In [None]:
# novos nomes
netflix_colnames = ['Título', 'Gênero', 'Categoria de Gênero', 'Pré estreia',
                    'Temporadas', 'Quantidade de temporadas', 'quantidade de episodios',
                    'Duração', 'Duração mínima', 'Duração máxima', 'Status', 'Ativa',
                    'Catálogo', 'Idioma', 'dt_inclusao', 'Data de Alteracao']

In [None]:
# alterando nomes
netflix_parquet_clean = netflix_parquet_clean.toDF(*netflix_colnames)

netflix_parquet_clean.columns

['Título',
 'Gênero',
 'Categoria de Gênero',
 'Pré estreia',
 'Temporadas',
 'Quantidade de temporadas',
 'quantidade de episodios',
 'Duração',
 'Duração mínima',
 'Duração máxima',
 'Status',
 'Ativa',
 'Catálogo',
 'Idioma',
 'dt_inclusao',
 'Data de Alteracao']

# Testando e verificando se existe algum erro de processamento do spark e identificando onde pode ter ocorrido o erro.



No caso iremos criar uma URL publico no [ngrok](https://ngrok.com/), pois no amibente do colab não é possível acessar a Spark UI.

In [None]:
# dependências necessarias
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
# configurações necessárias
get_ipython().system_raw('./ngrok authtoken 2OsKJRDOoowEb0vBvOnPShc9vhL_e7j6a74UYY9PKcnfSvvA')
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
# criando url publica
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://9cba-35-243-238-87.ngrok-free.app","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://9cba-35-243-238-87.ngrok-free.app","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


Acessando Spark UI para analisar se há algum erro pela link gerado no campo "public_url", **contudo foi observado que não houve nenhum job com erro.** 

# Criar apenas 1 .csv com as seguintes colunas que foram nomeadas anteriormente:
 

1. Title
2. Genre
3. Seasons
4. Premiere
5. Language
6. Active
7. Status
8. dt_inclusao
9. Data de Alteração

as colunas devem estar em português com header e separadas por ";". 


In [None]:
# criando csv único
netflix_parquet_clean.coalesce(1).write.csv(
    path='/content/drive/MyDrive/case confitech/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

# Inserir esse .csv dentro de um bucket do AWS s3

o upload foi feito por um arquivo 'upload_csv.py' que se encontra nesse repositório, para realizar o upload é necessário algumas configurações para tal leia o README.

In [None]:
# exemplo de upload

# instanciando o client
s3 = boto3.client('s3')

# passando o nome do bucket
S3_BUCKET_NAME = 'caseconfitech'

#
with open('upload_csv\output.csv', 'rb', ) as f:
    s3.upload_fileobj(f, S3_BUCKET_NAME, S3_BUCKET_NAME)