<a href="https://colab.research.google.com/github/natalialnb/Automacao_Web/blob/main/Untitled6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# <font color="#cd7f32"> **Mini Case PySpark** </font>
**Jump Start**

### O Que é Particionamento?

Particionamento é a forma como o PySpark divide um grande conjunto de dados em partes menores, chamadas de "partições", para processá-las de forma distribuída. Cada partição é um pedaço de dados que pode ser processado independentemente em paralelo. A ideia é distribuir a carga de trabalho entre diferentes nós de um cluster, para acelerar o processamento e tornar a análise de big data mais eficiente.

### Como o Particionamento Funciona no PySpark

1. **Partições de RDD**: No PySpark, os dados são representados por RDDs (Resilient Distributed Datasets). Um RDD é dividido em várias partições, que são processadas em paralelo. O número de partições pode ser ajustado para otimizar a performance.

2. **DataFrames e Datasets**: Para DataFrames e Datasets, o particionamento é um pouco mais abstrato, mas o princípio é o mesmo. Dados são divididos em partições para serem processados em paralelo.

3. **Paralelismo e Distribuição**: As partições são distribuídas entre os diferentes nós do cluster, permitindo que múltiplas operações sejam realizadas em paralelo. Isso é essencial para escalar o processamento de dados.

### Como Configurar o Particionamento

1. **Número de Partições**: Você pode definir o número de partições ao criar um RDD, usar a função `repartition()` ou `coalesce()` em DataFrames. O `repartition()` aumenta ou diminui o número de partições e pode causar um shuffle (redistribuição dos dados), enquanto o `coalesce()` é mais eficiente para reduzir o número de partições sem shuffle.

   ```python
   # Exemplo de reparticionamento
   df = df.repartition(10)
   ```

2. **Coluna de Particionamento**: Ao usar operações como `groupBy` ou `join`, você pode particionar os dados com base em uma coluna específica. Isso pode melhorar a eficiência do processamento, especialmente se os dados são frequentemente agrupados ou filtrados com base nessa coluna.

   ```python
   # Exemplo de particionamento por coluna
   df = df.repartition('column_name')
   ```

### Impacto na Performance

1. **Redução de Shuffle**: O particionamento adequado pode reduzir a quantidade de shuffle necessário. O shuffle ocorre quando os dados precisam ser redistribuídos entre diferentes partições e pode ser um processo caro em termos de tempo e recursos.

2. **Balanceamento de Carga**: Se as partições forem muito grandes ou muito pequenas, isso pode causar um balanceamento desigual da carga de trabalho entre os nós do cluster. O particionamento adequado ajuda a garantir que cada nó esteja trabalhando com uma quantidade equilibrada de dados.

3. **Melhoria da Localidade dos Dados**: Particionamento pode melhorar a localidade dos dados, ou seja, dados que são frequentemente acessados juntos são colocados na mesma partição. Isso reduz o tempo de acesso e melhora a eficiência.

4. **Performance de Operações**: Operações como `join`, `groupBy`, e `aggregate` podem ser muito mais eficientes quando o particionamento é feito com base nas colunas relevantes. Isso minimiza a quantidade de dados que precisam ser movidos entre partições.

5. **Efeito da Persistência**: Se você estiver persistindo dados em memória (`cache()` ou `persist()`), o particionamento adequado também pode impactar a eficiência do armazenamento e recuperação desses dados.

### Boas Práticas para Particionamento

1. **Ajustar o Número de Partições**: O número ideal de partições depende do tamanho dos dados e dos recursos disponíveis no cluster. Testar e ajustar o número de partições pode melhorar a performance.

2. **Evitar Partições Pequenas**: Partições muito pequenas podem resultar em overhead excessivo. O ideal é ter um número razoável de partições, onde cada uma contém uma quantidade significativa de dados.

3. **Escolher Colunas Adequadas para Particionamento**: Ao particionar com base em uma coluna, escolha uma coluna que distribua os dados de maneira uniforme e que seja frequentemente usada em operações de agrupamento ou junção.

4. **Monitorar e Ajustar**: Utilize ferramentas de monitoramento do Spark para analisar a performance e ajustar o particionamento conforme necessário.

### Conclusão

O particionamento é uma ferramenta poderosa no PySpark para otimizar o processamento de grandes volumes de dados. Um particionamento eficaz pode reduzir o tempo de processamento, melhorar o balanceamento de carga e minimizar o custo de operações de shuffle. Com uma compreensão adequada e a aplicação de boas práticas, você pode maximizar a eficiência das suas análises em big data.


In [7]:
!pip install pyspark



In [8]:
# Importando bibliotecas e os dados

In [9]:
from pyspark.sql import SparkSession
import time


# Inicializando uma Spark Session
spark = SparkSession.builder.appName("TitleBasicsExample").getOrCreate()

# Lendo o arquivo TSV comprimido
df = spark.read.csv("title.basics.tsv.gz", sep='\t', header=True, inferSchema=True)

# Exibindo o esquema do DataFrame
df.printSchema()

# Exibindo as primeiras 5 linhas do DataFrame
df.show(5)

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,

In [10]:
total_count = df.count()

print(f"Número total de registros: {total_count}")

Número total de registros: 8722919


In [11]:
# Marcar o início do tempo
start_time = time.time()

# Realizar o groupBy e contar os registros
df_grouped = df.groupBy("titleType").count()

# Coletar os resultados para visualizar (isto pode demorar dependendo do tamanho dos dados)
result = df_grouped.collect()

# Marcar o fim do tempo
end_time = time.time()

# Exibir o tempo decorrido
elapsed_time = end_time - start_time
print(f"Tempo decorrido: {elapsed_time:.2f} segundos")

# Exibir os resultados do groupBy
df_grouped.show()

Tempo decorrido: 24.66 segundos
+------------+-------+
|   titleType|  count|
+------------+-------+
|    tvSeries| 215743|
|tvMiniSeries|  43668|
|     tvMovie| 123422|
|     tvPilot|      1|
|   tvEpisode|6642733|
|       movie| 578512|
|   tvSpecial|  42668|
|       video| 242366|
|   videoGame|  31505|
|     tvShort|   6931|
|       short| 795370|
+------------+-------+



## Particionando o DataFrame em 10 partições:




In [12]:
df_particionado_num = df.repartition(10)

# Exibindo o número de partições
print("Número de partições:", df_particionado_num.rdd.getNumPartitions())

Número de partições: 10


In [13]:
# Salvando o DataFrame particionado em formato Parquet
df_particionado_num.write.parquet("output/particionado_num")

In [14]:
# Lendo os dados particionados por número
df_verificado_num = spark.read.parquet("output/particionado_num")

# Exibindo o esquema do DataFrame verificado
df_verificado_num.printSchema()

# Exibindo as primeiras 5 linhas do DataFrame verificado
df_verificado_num.show(5)

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)

+----------+---------+---------------+---------------+-------+---------+-------+--------------+-------------+
|    tconst|titleType|   primaryTitle|  originalTitle|isAdult|startYear|endYear|runtimeMinutes|       genres|
+----------+---------+---------------+---------------+-------+---------+-------+--------------+-------------+
|tt18927514|tvEpisode| Episode #1.198| Episode #1.198|      0|       \N|     \N|            \N|        Drama|
|tt12941206|tvEpisode|Episode #1.2790|Episode #1.2790|      0|     1984|     \N|            43|Drama,Romance|
|tt29386795|tvEpisode|  Episode #1.12|  Episode #1.12|      0|     2023| 

In [15]:
def count_in_partition(index, iterator):
    count = sum(1 for _ in iterator)
    yield (index, count)

# Aplicando a função para contar linhas em cada partição
partition_counts = df_particionado_num.rdd.mapPartitionsWithIndex(count_in_partition).collect()

# Exibindo o número de linhas em cada partição
for partition, count in partition_counts:
    print(f"Partição {partition}: {count} linhas")

Partição 0: 872292 linhas
Partição 1: 872292 linhas
Partição 2: 872292 linhas
Partição 3: 872292 linhas
Partição 4: 872292 linhas
Partição 5: 872292 linhas
Partição 6: 872292 linhas
Partição 7: 872292 linhas
Partição 8: 872291 linhas
Partição 9: 872292 linhas


In [22]:
# Marcar o início do tempo
start_time = time.time()

# Realizar o groupBy e contar os registros
df_grouped = df_particionado_num.groupBy("titleType").count()

# Coletar os resultados para visualizar (isto pode demorar dependendo do tamanho dos dados)
result = df_grouped.collect()

# Marcar o fim do tempo
end_time = time.time()

# Exibir o tempo decorrido
elapsed_time = end_time - start_time
print(f"Tempo decorrido: {elapsed_time:.2f} segundos")

# Exibir os resultados do groupBy
df_grouped.show()

Tempo decorrido: 37.71 segundos
+------------+-------+
|   titleType|  count|
+------------+-------+
|    tvSeries| 215743|
|tvMiniSeries|  43668|
|     tvMovie| 123422|
|   tvEpisode|6642733|
|       movie| 578512|
|   tvSpecial|  42668|
|       video| 242366|
|   videoGame|  31505|
|     tvShort|   6931|
|       short| 795370|
|     tvPilot|      1|
+------------+-------+



# Particionamento por coluna

In [17]:
start_time = time.time()
df_filtered = df.filter(df.titleType == 'tvEpisode')
start_time = time.time()

# Realizar o groupBy por startYear e contar os registros
df_grouped = df_filtered.groupBy("startYear").count()

# Coletar os resultados para visualizar (isto pode demorar dependendo do tamanho dos dados)
result = df_grouped.collect()

# Marcar o fim do tempo
end_time = time.time()

# Exibir o tempo decorrido
elapsed_time = end_time - start_time
print(f"Tempo decorrido: {elapsed_time:.2f} segundos")

# Exibir os resultados do groupBy
df_grouped.show()


Tempo decorrido: 23.01 segundos
+---------+------+
|startYear| count|
+---------+------+
|     1953|  6512|
|     1957| 10186|
|     1987| 27817|
|     1956|  8511|
|     2016| 99645|
|     1936|    46|
|     2012|201333|
|     2020|323994|
|     1958| 11140|
|     1943|    25|
|     1972| 19811|
|     1931|    14|
|     2026|    94|
|     1988| 26860|
|     1938|    57|
|     2019|301583|
|     2017|113702|
|     1932|     3|
|     1977| 21853|
|     1971| 21005|
+---------+------+
only showing top 20 rows



In [18]:
# Particionando o DataFrame com base na coluna titleType
df_particionado_col = df.repartition("titleType")

# Salvando o DataFrame particionado em formato Parquet
df_particionado_col.write.partitionBy("titleType").parquet("output/particionado_col")

In [19]:
# Particionando o DataFrame com base na coluna titleType
df_particionado_col = df.repartition("titleType")

# Verificando o número de partições
print("Número de partições:", df_particionado_col.rdd.getNumPartitions())

# Contando o número de registros em cada partição
df_particionado_col.groupBy("titleType").count().show()

Número de partições: 5
+------------+-------+
|   titleType|  count|
+------------+-------+
|    tvSeries| 215743|
|tvMiniSeries|  43668|
|     tvMovie| 123422|
|     tvPilot|      1|
|   tvEpisode|6642733|
|       movie| 578512|
|   tvSpecial|  42668|
|       video| 242366|
|   videoGame|  31505|
|     tvShort|   6931|
|       short| 795370|
+------------+-------+



In [20]:
start_time = time.time()

df_filtered = df_particionado_col.filter(df.titleType == 'tvEpisode')

df_grouped = df_filtered.groupBy("startYear").count()

# Coletar os resultados para visualizar (isto pode demorar dependendo do tamanho dos dados)
result = df_grouped.collect()

# Marcar o fim do tempo
end_time = time.time()

# Exibir o tempo decorrido
elapsed_time = end_time - start_time
print(f"Tempo decorrido: {elapsed_time:.2f} segundos")

# Exibir os resultados do groupBy
df_grouped.show()

Tempo decorrido: 25.66 segundos
+---------+------+
|startYear| count|
+---------+------+
|     1953|  6512|
|     1957| 10186|
|     1987| 27817|
|     1956|  8511|
|     2016| 99645|
|     1936|    46|
|     2012|201333|
|     2020|323994|
|     1958| 11140|
|     1943|    25|
|     1972| 19811|
|     1931|    14|
|     2026|    94|
|     1988| 26860|
|     1938|    57|
|     2019|301583|
|     2017|113702|
|     1932|     3|
|     1977| 21853|
|     1971| 21005|
+---------+------+
only showing top 20 rows

