## Entendendo as RDDs

In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [None]:
# Criação de uma RDD simples
rdd1 = sc.parallelize([1, 2, 3])
rdd1.collect()

In [None]:
# Utilizando o NumPy
import numpy as np
a = np.array(range(100))
a

In [None]:
rdd2 = sc.parallelize(np.array(range(100)))
rdd2.take(10)

In [None]:
# Verificando a paralelização
# Observe que a paralelização da RDD não ocorreu. Ela não foi dividida
rdd3 = sc.parallelize(np.array(range(100)))
print(rdd3.glom().collect())

## O particionamento das RDDs é feito por padrão pelo tamanho do cluster, mas também podemos especificar explicamento, de preferência com o número de cores do PC.

In [None]:
# Paralelizando explicitamento
rdd4 = sc.parallelize(np.array(range(100)), 10)

In [None]:
# Verificando o paralelização
print(rdd4.glom().collect())

In [None]:
print(rdd4.getNumPartitions())

In [None]:
# Lendo O Primo Basilio - Eça de Queiroz
rddBasilio = sc.textFile('Basilio.txt')

In [None]:
print(rddBasilio.getNumPartitions())

In [None]:
rddBasilioPart = sc.textFile('Basilio.txt', 100)

In [None]:
print(rddBasilioPart.getNumPartitions())

In [None]:
print(rddBasilioPart.glom().collect())

## Leitura da Bíblia

In [None]:
# Verifique se o arquivo está em ANSI
rddANSI = sc.textFile('Biblia-ANSI.txt')
print(rddANSI.take(1000))

In [None]:
# Altere o arquivo para UTF-8
rddBiblia = sc.textFile('Biblia-UTF8.txt')

print(rddBiblia.take(1000))

## Transformações

1 - São operações em um RDD que devolvem um novo RDD

2 - Normalmente executam uma função anônima (lambda) sobre cada um dos elementos do RDD

3 - Operam sobre lazy

## Utilizando o Intersect

In [None]:
dados1 = sc.parallelize(['A', 'B', 'C', 'D', 'E'])
dados2 = sc.parallelize(['A', 'E', 'I', 'O', 'U'])

result = dados1.intersection(dados2)
result.take(10)

In [None]:
jose = rddBiblia.filter(lambda linha: 'José' in linha)

In [None]:
maria = rddBiblia.filter(lambda linha: 'Maria' in linha)

In [None]:
biblia = jose.intersection(maria)

## Ações

1 - Devolvem um resultado

2 - Faz todas as transformações anteriores serem executadas

## Utilizando o takeSample para gerar amostras

In [None]:
# True - Possibilita elementos REPETIDOS
# Numero - Indica o tamanho da amostra

# Criando um RDD com range de números
rdd5 = sc.parallelize(np.array(range(10)))

# Contando o número de elementos
contador = rdd5.count()

# Imprimindo o número de elementos
print('Número de elementos do range{0}'.format(contador))

# utilizando um for para imprimir cada elemento
for l in rdd5.takeSample(True, 8):
    print(l)

In [None]:
type(rdd5)

In [None]:
lines = biblia.count()

print('Número de linhas: {0}'.format(lines))

for l in biblia.takeSample(False, 5):
    print(l)

In [None]:
for l in biblia.takeSample(False, 8):
    print(l)

In [None]:
for l in biblia.takeSample(True, 10):
    print(l)

## Compreendendo todo o código

In [None]:
# Aqui, criamos o RDD lendo do arquivo
rddBiblia = sc.textFile('Biblia-UTF8.txt')

# Criamos duas variáveis, utilizando filter e lambda, trazendo apenas as linhas com determinada palavra
jose = rddBiblia.filter(lambda linha: 'José' in linha)
maria = rddBiblia.filter(lambda linha: 'Maria' in linha)

# Criamos uma variável int com o número de linhas da RD resultado da interseção
lines = biblia.count()

# Imprimimos o número de linhas
print('Número de linhas com José e Maria {0}'.format(lines))

# Iteramos a RDD com intersection utilizando takeSample, que retorna um objeto iterável
for l in biblia.takeSample(False, 5):
    print(l)

## Podemos utilizar variáveis

In [None]:
rddBiblia = sc.textFile('Biblia-UTF8.txt')

A = 'Jesus'
B = 'Cristo'

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print('Número de linhas: ' + str(lines))

for l in inter.takeSample(False, 10):
    print(l)

## E se quisermos todas as linhas ?

In [None]:
A = 'Jesus'
B = 'Cristo'

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print('Número de linhas: ' + str(lines))

inter.takeSample(False, lines)

## podemos utilizar o collect()

In [None]:
# Podemos iterar ?
A = 'Jesus'
B = 'Cristo'

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print('Número de linhas: ' + str(lines))
print('Tipo da RDD inter: ' + str(type(inter)))

for l in inter.collect():
    print(l)

In [None]:
A = 'Jesus'
B = 'Cristo'

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print('Número de linhas: ' + str(lines))

inter.collect()