<a href="https://colab.research.google.com/github/lucaskrlima/bootcamp_soulcode_da/blob/main/PySpark_primeiros_passos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Conhecimento só tem função de verdade quando é compartilhado.

Marianna Moreno

## Instalando os requisitos necessários

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

## Importando as ferramentas e criando o ambiente virtual para o spark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

## Criando a sessão clusterizada do PySpark

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql.functions import regexp_replace
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Para deixar a visualição das tabelas mais amigável
spark

## Importando o pandas para utilizar ferramentas do pandas e cer como podemos fazer o equivalente no PySpark

In [None]:
import pandas as pd

In [None]:
# Criando um dataframe a partir de listas Python
nome = ['Douglas','Daniela','Pedro','Maria','Eduardo','Ester']
idade = [45,7,65,64,37,37]
altura = [1.85,1.23,1.75,1.67,1.82,1.73]
peso = [70,22,87,64,96,68]
df_pd = pd.DataFrame(zip(nome,idade,altura,peso),columns=['nome','idade','altura','peso'])
display(df_pd)
df_pd.info()

Unnamed: 0,nome,idade,altura,peso
0,Douglas,45,1.85,70
1,Daniela,7,1.23,22
2,Pedro,65,1.75,87
3,Maria,64,1.67,64
4,Eduardo,37,1.82,96
5,Ester,37,1.73,68


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   nome    6 non-null      object 
 1   idade   6 non-null      int64  
 2   altura  6 non-null      float64
 3   peso    6 non-null      int64  
dtypes: float64(1), int64(2), object(1)
memory usage: 320.0+ bytes


In [None]:
'''
Criação do schema que nada mais é do que o esqueleto da tabela
'''
from pyspark.sql.types import *
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType # importando estrutura da tabela (structType)
schema = StructType([ \
    StructField('nome',StringType(),True), \
    StructField('idade',IntegerType(),True), \
    StructField('altura',DoubleType(),True), \
    StructField('peso', DoubleType(), True), \
  ])

'''
Criação do objeto (nível Python) que contém as tuplas com os regitros que farão
parte da tabela
'''
dados = [('Douglas',45,1.85,70.),
    ('Daniela',7,1.23,22.),
    ('Pedro',65,1.75,87.),
    ('Maria',64,1.67,64.),
    ('Eduardo',37,1.82,96.),
    ('Ester',37,1.73,68.)
  ]
'''
Criação do dataframe py spark juntando a estutura da tabela com o objeto
que contém os registros
'''
df_py = spark.createDataFrame(data=dados,schema=schema)
'''
Imprimir o schema do df_py
'''
df_py.printSchema()

'''
Mostrar o df_py
'''
df_py.show()

root
 |-- nome: string (nullable = true)
 |-- idade: integer (nullable = true)
 |-- altura: double (nullable = true)
 |-- peso: double (nullable = true)

+-------+-----+------+----+
|   nome|idade|altura|peso|
+-------+-----+------+----+
|Douglas|   45|  1.85|70.0|
|Daniela|    7|  1.23|22.0|
|  Pedro|   65|  1.75|87.0|
|  Maria|   64|  1.67|64.0|
|Eduardo|   37|  1.82|96.0|
|  Ester|   37|  1.73|68.0|
+-------+-----+------+----+



In [None]:
'''
Criando um dataframe PySpark a partir de um dataframe pandas
'''
df_py2 = spark.createDataFrame(df_pd)

'''
Imprimir o schema do df_py
'''
df_py2.printSchema()

'''
Mostrar o df_py
'''
df_py2.show()

  for column, series in pdf.iteritems():


root
 |-- nome: string (nullable = true)
 |-- idade: long (nullable = true)
 |-- altura: double (nullable = true)
 |-- peso: long (nullable = true)

+-------+-----+------+----+
|   nome|idade|altura|peso|
+-------+-----+------+----+
|Douglas|   45|  1.85|  70|
|Daniela|    7|  1.23|  22|
|  Pedro|   65|  1.75|  87|
|  Maria|   64|  1.67|  64|
|Eduardo|   37|  1.82|  96|
|  Ester|   37|  1.73|  68|
+-------+-----+------+----+



In [None]:
# Slicing no pandas
display(df_pd[0:1])

Unnamed: 0,nome,idade,altura,peso
0,Douglas,45,1.85,70


In [None]:
# Slicing no Pypark
df_py.filter(df_py.nome == 'Douglas')

nome,idade,altura,peso
Douglas,45,1.85,70.0


In [None]:
# Slicing no Pypark (alternativa estilo sql)
df_py.where(df_py.nome == 'Douglas')

nome,idade,altura,peso
Douglas,45,1.85,70.0


In [None]:
# Unir dois dataframes no pandas
nome2 = ['Douglas','Daniela']
idade2 = [45,7]
altura2 = [1.85,1.23]
peso2 = [70,22]
df_pd2 = pd.DataFrame(zip(nome2,idade2,altura2,peso2),columns=['nome','idade','altura','peso'])
display(df_pd2)

nome3 = ['Pedro','Maria']
idade3 = [69,68]
altura3 = [1.75,1.67]
peso3 = [96,65]
df_pd3 = pd.DataFrame(zip(nome3,idade3,altura3,peso3),columns=['nome','idade','altura','peso'])
display(df_pd3)
casa = pd.concat([df_pd2, df_pd3], ignore_index=True)
display(casa)

Unnamed: 0,nome,idade,altura,peso
0,Douglas,45,1.85,70
1,Daniela,7,1.23,22


Unnamed: 0,nome,idade,altura,peso
0,Pedro,69,1.75,96
1,Maria,68,1.67,65


Unnamed: 0,nome,idade,altura,peso
0,Douglas,45,1.85,70
1,Daniela,7,1.23,22
2,Pedro,69,1.75,96
3,Maria,68,1.67,65


In [None]:
# Unir dois dataframes no PySpark
douglas = df_py.filter(df_py.nome == 'Douglas')
daniela = df_py.filter(df_py.nome == 'Daniela')
filtro = douglas.union(daniela)
filtro.show()
n1 = df_py.filter(df_py.nome == 'Pedro')
n2 = df_py.filter(df_py.nome == 'Maria')
n3 = n1.union(n2)
n3.show()
df = filtro.union(n3)
df.show()

+-------+-----+------+----+
|   nome|idade|altura|peso|
+-------+-----+------+----+
|Douglas|   45|  1.85|70.0|
|Daniela|    7|  1.23|22.0|
+-------+-----+------+----+

+-----+-----+------+----+
| nome|idade|altura|peso|
+-----+-----+------+----+
|Pedro|   65|  1.75|87.0|
|Maria|   64|  1.67|64.0|
+-----+-----+------+----+

+-------+-----+------+----+
|   nome|idade|altura|peso|
+-------+-----+------+----+
|Douglas|   45|  1.85|70.0|
|Daniela|    7|  1.23|22.0|
|  Pedro|   65|  1.75|87.0|
|  Maria|   64|  1.67|64.0|
+-------+-----+------+----+



In [None]:
df_py3 = spark.createDataFrame(df_pd2)
df_py4 = spark.createDataFrame(df_pd3)
df_py5 = df_py3.union(df_py4)
df_py5.show()

  for column, series in pdf.iteritems():


+-------+-----+------+----+
|   nome|idade|altura|peso|
+-------+-----+------+----+
|Douglas|   45|  1.85|  70|
|Daniela|    7|  1.23|  22|
|  Pedro|   69|  1.75|  96|
|  Maria|   68|  1.67|  65|
+-------+-----+------+----+

