<a href="https://colab.research.google.com/github/nararodriguess/transformacoes_acoes_rdd/blob/main/transformacoesRDDSpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Principais transformações que podem ser executadas sobre RDDs do Spark ✅


*     A operação de Transformação do Spark produz um ou mais novos RDDs.


In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 56.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=311d38d4137a6ceadadf9654f9c2e274a90308e5df45cdc888c43be584a59d6c
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from operator import add

**map()**: Cria um novo RDD passando cada elemento do RDD de origem pela função func. Cada elemento do RDD de origem é mapeado em um único item no novo RDD:

In [5]:
spark = (SparkSession.builder.appName("Map").getOrCreate()) #Cria sessão

In [7]:
fileWords = "palavras2.txt" # atribui à variável fileWords o arquivo .txt contendo um texto
data = spark.read.text(fileWords).rdd.map(lambda r: r[0]) # transforma.txt em RDD usando o map()

**flatMap()**: Similar ao map(), mapeia os elementos e retorna um novo RDD aplicando primeiro uma função a todos os elementos desse RDD e, em seguida, nivelando os resultados:

In [8]:
fileWords = data.flatMap(lambda line : line.split()) #Atribui à variável fileFlat o arquivo data, mapeia o arquivo por meio do flatMap()
output = fileWords.collect()
for i in output:
  print(i)

Unify
the
processing
of
your
data
in
batches
and
real-time
streaming,
using
your
preferred
language:
Python,
SQL,
Scala,
Java
or
R.
Execute
fast,
distributed
ANSI
SQL
queries
for
dashboarding
and
ad-hoc
reporting.
Runs
faster
than
most
data
warehouses.
Perform
Exploratory
Data
Analysis
(EDA)
on
petabyte-scale
data
without
having
to
resort
to
downsampling.
Train
machine
learning
algorithms
on
a
laptop
and
use
the
same
code
to
scale
to
fault-tolerant
clusters
of
thousands
of
machines.


**filter()**: Cria um novo RDD contendo apenas itens que foram retornados como verdadeiro, conforme a função:

In [9]:
fileWords = data.flatMap(lambda line : line.split()).filter(lambda word: word.startswith("a")) # filter() filtra o RDD para palavras que começam com 'a'
output = fileWords.collect()
for i in output:
  print(i)

and
and
ad-hoc
algorithms
a
and


**reduceByKey()**: Redução pela chave, agrega todas as tuplas:

In [10]:
list = ["maria", "jose", "jose", "joao", "joao"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = (rdd.map(lambda w: (w,1)).reduceByKey(add)).collect()
for i in rdd2:
  print(i)

('joao', 2)
('maria', 1)
('jose', 2)


**sortByKey()**: ordena as tuplas de acordo com a chave:

In [11]:
list = ["um", "dois", "cinco", "um", "dois"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = (rdd.map(lambda w: (w,1)).reduceByKey(add).sortByKey("asc")).collect()
for i in rdd2:
  print(i)

('cinco', 1)
('dois', 2)
('um', 2)


**union(rdd)**: Cria um RDD que contem todos os elementos dos RDDs. Os objetos dos RDDs devem ser do mesmo tipo:

In [12]:
list = ["um", "dois", "cinco", "um", "dois"]
list2 = ["tres", "tres", "quantro", "zero", "zero"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)
rddUnion = rdd.union(rdd2).collect()
for i in rddUnion:
  print(i)

um
dois
cinco
um
dois
tres
tres
quantro
zero
zero


**intersection(rdd)**: Cria um RDD com elementos comuns entre os RDDs:

In [13]:
list = ["um", "dois", "cinco", "um", "dois"]
list2 = ["um", "cinco", "quantro", "cinco", "zero"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)
rddInter = rdd.intersection(rdd2).collect()
for i in rddInter:
  print(i)

um
cinco


**distinct(rdd)**: Cria um RDD com itens distintos:

In [14]:
list = ["um", "dois", "cinco", "um", "dois"]

rdd = spark.sparkContext.parallelize(list).distinct().collect()
for i in rdd:
  print(i)

um
dois
cinco


**join)rdd)**: Junção de tuplas com mesma chave:

In [15]:
nome = [("Joao", 25), ("Lucas", 18)]
cidade = [("Joao", "SP"), ("Lucas", "RJ")]

rddNome = spark.sparkContext.parallelize(nome)
rddCidade = spark.sparkContext.parallelize(cidade)

rddJoin = rddNome.join(rddCidade).collect()
print(rddJoin)

[('Joao', (25, 'SP')), ('Lucas', (18, 'RJ'))]


# Principais ações que podem ser executadas sobre RDDs do Spark ✅


*   Ações são operações que produzem valores que não são RDDs
*   Os valores produzidos são copiados para o *driver* ou para o sistema de armazenamento

**collect()**: Retorna o conteúdo do RDD para o driver:

In [16]:
rddJoin = rddNome.join(rddCidade).collect()
print(rddJoin)

[('Joao', (25, 'SP')), ('Lucas', (18, 'RJ'))]


**count()**: Conta o número de itens do RDD:

In [17]:
nome = [("Joao", 25), ("Lucas", 18)]
cidade = [("Joao", "SP"), ("Lucas", "RJ")]

rddNome = spark.sparkContext.parallelize(nome)
rddCidade = spark.sparkContext.parallelize(cidade)

rddJoin = rddNome.join(rddCidade)
print(rddJoin.count())

2


**take(in)**: Retorna N elementos aleatórios do RDD:

In [18]:
list = ["dois", "dois", "cinco", "um", "dois"]
list2 = ["um", "cinco", "quantro", "cinco", "zero"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)
rddUni = rdd.union(rdd2)
rddUni.take(3)

['dois', 'dois', 'cinco']

**top(k)**: Retorna k elementos considerando a ordem decrescente:

In [19]:
list = ["dois", "dois", "cinco", "um", "dois"]
list2 = ["um", "cinco", "quantro", "cinco", "zero"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)
rddUni = rdd.union(rdd2)
rddUni.top(7)

['zero', 'um', 'um', 'quantro', 'dois', 'dois', 'dois']

**countByValue()**: Conta o número de ocorrências para cada valor do RDD:

In [20]:
list = ["dois", "dois", "cinco", "um", "dois"]
list2 = ["um", "cinco", "quantro", "cinco", "zero"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)
rddUni = rdd.union(rdd2)
rddUni.countByValue()

defaultdict(int, {'cinco': 3, 'dois': 3, 'quantro': 1, 'um': 2, 'zero': 1})

**reduce()**: Executa uma operação para todos os itens do RDD:

In [21]:
list = ["dois", "dois", "cinco", "um", "dois"]
list2 = ["um", "cinco", "quantro", "cinco", "zero"]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)
rddUni = rdd.union(rdd2)
rddUni.reduce(lambda a,b: a + ' ' + b)

'dois dois cinco um dois um cinco quantro cinco zero'