<a href="https://colab.research.google.com/github/npto04/PySpark/blob/main/pyspark_Netflix.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Baixe o CSV, sobre a Netflix, e utilizando os conhecimentos em limpeza e tratamento de dados e os comandos da ferramenta Pyspark, faça os tratamentos necessários para que os Datasets fiquem prontos para análise.

# Instalações & Importações

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 47.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=91697ac822ffd242c4c77790441d34d94f18ad5b96e96612ebae3bd22293ad4d
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as F
from pyspark.sql.types import *
from google.cloud import storage

# Criando sessão do spark, assim como configurando conector do Google Cloud Storage

## Caminho do Google Cloud Storage conector

Deve ser baixado para rodar e atualizar caminho:
[Download](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage)

In [None]:
gcs_conector = '/content/drive/MyDrive/GCS_connector/gcs-connector-hadoop3-latest.jar'

 ## [Configurando o conector](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/INSTALL.md#configuring-the-connector)

Antes de começar é preciso ter um JSON arquivo de chave para o conector poder se autenticar com Google Cloud Storage. [Clique aqui](https://cloud.google.com/storage/docs/authentication#service_accounts) para entender ou [aqui](https://cloud.google.com/iam/docs/creating-managing-service-accounts#creating) para como obter esse JSON.

Após eu ter adquirido o JSON, eu pude configurar o framework (PySpark no caso) para usar o GCS connector quando acessando dados no Google Cloud Storage.

### Para uso dos professores/avaliadores. Apagar quando avaliação é terminada

Download das credenciais JSON

In [None]:
#código retirado da documentação do Google
# Import PyDrive and associated libraries.
# This only needs to be done once per notebook.
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# This only needs to be done once per notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Download a file based on its file ID.
#
# A file ID looks like: laggVyWshwcyP6kEI-y_W3P8D26sz
file_id = '1-S77SFs6zVrscrJOtB24z0_IZyOecu1u'
# Initialize GoogleDriveFile instance with file id.
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('JSON_keys.json') # Download file

### Caminho do arquivo de chave JSON

In [None]:
json_path = '/content/drive/MyDrive/credentials/engdados-soulcode-ae874e15f2ec.json'


#para uso dos professores ou avaliadores
#json_path = '/content/JSON_keys.json'


## Sessão spark


* The AbstractFileSystem for 'gs:' URIs

  * spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS

* Optional. Google Cloud Project ID with access to GCS buckets. Required only for list buckets and create bucket operations.

  * spark.hadoop.fs.gs.project.id=

* Whether to use a service account for GCS authorization. Setting this property to `false` will disable use of service accounts for authentication.

  * spark.hadoop.google.cloud.auth.service.account.enable=true

* The JSON keyfile of the service account used for GCS access when google.cloud.auth.service.account.enable is `true`.

  * spark.hadoop.google.cloud.auth.service.account.json.keyfile=/path/to/keyfile

In [None]:
spark = (SparkSession.builder
        .master("local")
        .appName("netflix")
        .config('spark.jars', gcs_conector)
        .config('spark.hadoop.fs.gs.impl','com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
        .config('spark.hadoop.fs.gs.auth.service.account.enable', 'true')
        .config("google.cloud.auth.service.account.json.keyfile",json_path)
        .config('spark.hadoop.fs.gs.project.id','engdados-soulcode')
        .getOrCreate())

In [None]:
spark

# Ler dados do GCS

## Caminho do Dataset

In [None]:
file_path = 'gs://datasets-brutos/netflix_daily_top_10.csv'

## Montagem do Schema

In [None]:
esquema = (
    StructType([
        StructField("Data_rank",DateType(), True),
        StructField("Rank", IntegerType(), True),
        StructField("Semanas_no_rank", IntegerType(), True),
        StructField("Rank_ultima_semana", IntegerType(), True),
        StructField("Titulo", StringType(), True),
        StructField("Tipo", StringType(), True),
        StructField("Exclusividade", StringType(), True),
        StructField("Data_lancamento_Netflix", StringType(), True),
        StructField("Dias_no_top10", IntegerType(), True),
        StructField("Pontos_visualizacoes", IntegerType(), True)
    ])
)

## Ler com csv dataset com pyspark usando schema 

In [None]:
df = (
    spark
       .read
       .format("csv")
       .option("header", "true")
       .option("delimiter", ",")
       .load(file_path,schema=esquema)
)

In [None]:
df.printSchema()

root
 |-- Data_entrada_rank: date (nullable = true)
 |-- Rank: integer (nullable = true)
 |-- Semanas_no_rank: integer (nullable = false)
 |-- Rank_ultima_semana: integer (nullable = true)
 |-- Titulo: string (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- Exclusividade: string (nullable = false)
 |-- Data_lancamento_Netflix: string (nullable = true)
 |-- Dias_no_top10: integer (nullable = true)
 |-- Pontos_visualizacoes: integer (nullable = true)



Criando backup

In [None]:
df_bk = df

# Pré-análise dos dados

In [None]:
df.summary().show()

+-------+------------------+-----------------+------------------+------+----------------+-------------+-----------------------+------------------+--------------------+
|summary|              Rank|  Semanas_no_rank|Rank_ultima_semana|Titulo|            Tipo|Exclusividade|Data_lancamento_Netflix|     Dias_no_top10|Pontos_visualizacoes|
+-------+------------------+-----------------+------------------+------+----------------+-------------+-----------------------+------------------+--------------------+
|  count|              7100|             7100|              3132|  7100|            7100|         7100|                   7100|              7100|                7100|
|   mean|               5.5|4.439295774647888|3.9383780332056193|  null|            null|         null|                   null|24.123661971830987|  122.79014084507043|
| stddev|2.8724836179710516|3.041267475459205|2.6253990562461285|  null|            null|         null|                   null| 58.47378925134757|  213.86164216

Achar onde e quantos valores `null` existem



In [None]:
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------------+----+---------------+------------------+------+----+-------------+-----------------------+-------------+--------------------+
|Data_entrada_rank|Rank|Semanas_no_rank|Rank_ultima_semana|Titulo|Tipo|Exclusividade|Data_lancamento_Netflix|Dias_no_top10|Pontos_visualizacoes|
+-----------------+----+---------------+------------------+------+----+-------------+-----------------------+-------------+--------------------+
|                0|   0|            859|              3968|     0|   0|         2501|                      0|            0|                   0|
+-----------------+----+---------------+------------------+------+----+-------------+-----------------------+-------------+--------------------+



Achar os `NaN` nas colunas sem date-type

In [None]:
(df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df.columns[1:]])
.show())

+----+---------------+------------------+------+----+-------------+-----------------------+-------------+--------------------+
|Rank|Semanas_no_rank|Rank_ultima_semana|Titulo|Tipo|Exclusividade|Data_lancamento_Netflix|Dias_no_top10|Pontos_visualizacoes|
+----+---------------+------------------+------+----+-------------+-----------------------+-------------+--------------------+
|   0|              0|                 0|     0|   0|            0|                      0|            0|                   0|
+----+---------------+------------------+------+----+-------------+-----------------------+-------------+--------------------+



Achar valores distintos em exclusividade

In [None]:
distinct_exc = [x.Exclusividade for x in df.select('Exclusividade').distinct().collect()]
print(distinct_exc)

[None, 'Yes']


Achar valores distintos em Tipos

In [None]:
distinct_tipo = [x.Tipo for x in df.select('Tipo').distinct().collect()]
print(distinct_tipo)

['Concert/Perf…', 'Stand-Up Comedy', 'TV Show', 'Movie']


# Tratamento de dados

## Renomear dados para português, exceto títulos

In [None]:
df = df.na.replace('Yes', 'Sim','Exclusividade')
df.show()

+-----------------+----+---------------+------------------+--------------------+-------+-------------+-----------------------+-------------+--------------------+
|Data_entrada_rank|Rank|Semanas_no_rank|Rank_ultima_semana|              Titulo|   Tipo|Exclusividade|Data_lancamento_Netflix|Dias_no_top10|Pontos_visualizacoes|
+-----------------+----+---------------+------------------+--------------------+-------+-------------+-----------------------+-------------+--------------------+
|       2020-04-01|   1|              1|                 1|Tiger King: Murde...|TV Show|          Sim|           Mar 20, 2020|            9|                  90|
|       2020-04-01|   2|              2|              null|               Ozark|TV Show|          Sim|           Jul 21, 2017|            5|                  45|
|       2020-04-01|   3|              3|                 2|        All American|TV Show|         null|           Mar 28, 2019|            9|                  76|
|       2020-04-01|   4|    

In [None]:
df = df.replace(distinct_tipo,['Concertos/Shows', 'Comedia Stand-Up', 'Series de TV', 'Filmes'],'Tipo')
df.show()

+-----------------+----+---------------+------------------+--------------------+------------+-------------+-----------------------+-------------+--------------------+
|Data_entrada_rank|Rank|Semanas_no_rank|Rank_ultima_semana|              Titulo|        Tipo|Exclusividade|Data_lancamento_Netflix|Dias_no_top10|Pontos_visualizacoes|
+-----------------+----+---------------+------------------+--------------------+------------+-------------+-----------------------+-------------+--------------------+
|       2020-04-01|   1|              1|                 1|Tiger King: Murde...|Series de TV|          Sim|           Mar 20, 2020|            9|                  90|
|       2020-04-01|   2|              2|              null|               Ozark|Series de TV|          Sim|           Jul 21, 2017|            5|                  45|
|       2020-04-01|   3|              3|                 2|        All American|Series de TV|         null|           Mar 28, 2019|            9|                  76

## Tratando null
Substituir `Null` para valores padrões da classe.

Se não tem exclusividade, então `Não` é o valor.
Se não há semanas no rank, então sua primeira semana no rank, logo, `0`. Já rank da última semana, não há valores padrões, pois os casos são quando não estavam presente ou estreia

In [None]:
df = df.na.fill(value={'Semanas_no_rank':0,'Exclusividade':"Não"})
df.show()

+-----------------+----+---------------+------------------+--------------------+------------+-------------+-----------------------+-------------+--------------------+
|Data_entrada_rank|Rank|Semanas_no_rank|Rank_ultima_semana|              Titulo|        Tipo|Exclusividade|Data_lancamento_Netflix|Dias_no_top10|Pontos_visualizacoes|
+-----------------+----+---------------+------------------+--------------------+------------+-------------+-----------------------+-------------+--------------------+
|       2020-04-01|   1|              1|                 1|Tiger King: Murde...|Series de TV|          Sim|           Mar 20, 2020|            9|                  90|
|       2020-04-01|   2|              2|              null|               Ozark|Series de TV|          Sim|           Jul 21, 2017|            5|                  45|
|       2020-04-01|   3|              3|                 2|        All American|Series de TV|          Não|           Mar 28, 2019|            9|                  76

Checar os `null` tratados

In [None]:
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in ['Exclusividade','Semanas_no_rank']]).show()

+-------------+---------------+
|Exclusividade|Semanas_no_rank|
+-------------+---------------+
|            0|              0|
+-------------+---------------+



## Tratar coluna `Data_lancamento_Netflix` transformar em date-type

### Verificar tamanhos que as strings possuem para extrair dia, mes e ano

In [None]:
list_data_lancamento_Netflix = [len(x.Data_lancamento_Netflix) for x in df.select('Data_lancamento_Netflix').collect()]
myset = set(list_data_lancamento_Netflix)
print(myset)

{11, 12}


### Separando valor mes na string em colunas auxiliares

In [None]:
df = (df.withColumn("mes", F.date_format(F.to_date(df.Data_lancamento_Netflix.substr(1,3), "MMM"), "MM"))
)

### Separando valor ano na string em colunas 

In [None]:
df = (df.withColumn('ano', 
               F.when(F.length('Data_lancamento_Netflix') == 11,
                      df.Data_lancamento_Netflix.substr(8,4))
               .otherwise(df.Data_lancamento_Netflix.substr(9,4)))
)

### Separarando o valor dia na string

In [None]:
df = (df.withColumn('dia', 
               F.when(F.length('Data_lancamento_Netflix') == 11,
                      df.Data_lancamento_Netflix.substr(5,1))
               .otherwise(df.Data_lancamento_Netflix.substr(5,2))))

### Concatenar ano, mes e dia em uma coluna e transforma em DateType

In [None]:
df = df.withColumn("Data_lancamento_Netflix",F.concat_ws("-",F.col("ano"),F.col("mes"),F.col("dia")).cast("date"))

Dropando colunas auxiliares

In [None]:
cols = ['dia', 'mes', 'ano']
df = df.drop(*cols)

In [None]:
df.show(5)
df.printSchema()
df.summary().show()

+-----------------+----+---------------+------------------+--------------------+------------+-------------+-----------------------+-------------+--------------------+
|Data_entrada_rank|Rank|Semanas_no_rank|Rank_ultima_semana|              Titulo|        Tipo|Exclusividade|Data_lancamento_Netflix|Dias_no_top10|Pontos_visualizacoes|
+-----------------+----+---------------+------------------+--------------------+------------+-------------+-----------------------+-------------+--------------------+
|       2020-04-01|   1|              1|                 1|Tiger King: Murde...|Series de TV|          Sim|             2020-03-20|            9|                  90|
|       2020-04-01|   2|              2|              null|               Ozark|Series de TV|          Sim|             2017-07-21|            5|                  45|
|       2020-04-01|   3|              3|                 2|        All American|Series de TV|          Não|             2019-03-28|            9|                  76

# Salvar csv no GCS para data visualization e analytics

In [None]:
path_dest = 'gs://datasets-brutos/datasets-tratados/netflix_daily_top_10.csv'


In [None]:
df.write.option("header",'true').format('csv').mode('overwrite').save(path_dest)