-------------------------------------------------------------
Libs e Preparação do ambiente Spark
-------------------------------------------------------------



In [33]:
# Java jdk utilitários 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [34]:
#Download do Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [35]:
# Importando a biblioteca os
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [36]:
# Findspark
!pip install -q findspark

In [37]:
import findspark

# Iniciando o findspark
findspark.init()

In [38]:
# iniciar uma seção Spark
from pyspark.sql import SparkSession

# iniciando o spark context
spark = SparkSession.builder.master('local[*]').getOrCreate()
spark

# Usando DataFrame (DF)

In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
from datetime import date,datetime
from pyspark.sql import Row

In [40]:
#-----------------
#Abrindo um CSV
#pyspark.rdd.RDD
#-----------------

#file= spark.sparkContext.textFile('/content/food_coded.csv')

#type(file) #pyspark.rdd.RDD


In [41]:
#-----------------
#Abrindo um CSV
#pyspark.sql.dataframe.DataFrame
#-----------------

#file=sc.read.csv('/content/food_coded.csv',header=True,sep=',',nanValue=None)


#type(file) #pyspark.sql.dataframe.DataFrame
#file.show()
#Classes de file

#file.printSchema()

In [42]:
#-----------------
#Criando um CSV do zero (em diferentes nós)
#pyspark.sql.dataframe.DataFrame
#-----------------

datf=spark.createDataFrame([
    Row(Cidade='Salvador', Populacao=6., Time='Bahia', Dat_cadastro=date(2021, 10, 10)),
    Row(Cidade='Sao Paulo',Populacao=45., Time='Sao Paulo', Dat_cadastro=date(2021, 5, 5)),
    Row(Cidade='Recife', Populacao=3., Time='Sport',Dat_cadastro=date(2021, 7, 2)),
    Row(Cidade='Maceio', Populacao=2., Time='CSA', Dat_cadastro=date(2021, 8, 1)),
])
#datf


datf.show()

+---------+---------+---------+------------+
|   Cidade|Populacao|     Time|Dat_cadastro|
+---------+---------+---------+------------+
| Salvador|      6.0|    Bahia|  2021-10-10|
|Sao Paulo|     45.0|Sao Paulo|  2021-05-05|
|   Recife|      3.0|    Sport|  2021-07-02|
|   Maceio|      2.0|      CSA|  2021-08-01|
+---------+---------+---------+------------+



In [43]:
# Exibindo tres registros DF na vertical
datf.show(3, vertical=True)
#ou 
#datf.collect()

-RECORD 0------------------
 Cidade       | Salvador   
 Populacao    | 6.0        
 Time         | Bahia      
 Dat_cadastro | 2021-10-10 
-RECORD 1------------------
 Cidade       | Sao Paulo  
 Populacao    | 45.0       
 Time         | Sao Paulo  
 Dat_cadastro | 2021-05-05 
-RECORD 2------------------
 Cidade       | Recife     
 Populacao    | 3.0        
 Time         | Sport      
 Dat_cadastro | 2021-07-02 
only showing top 3 rows



In [44]:
# Analise rápida  dos dados
datf.select("Cidade", "Populacao").describe().show()

+-------+---------+-----------------+
|summary|   Cidade|        Populacao|
+-------+---------+-----------------+
|  count|        4|                4|
|   mean|     null|             14.0|
| stddev|     null|20.73644135332772|
|    min|   Maceio|              2.0|
|    max|Sao Paulo|             45.0|
+-------+---------+-----------------+



In [45]:
# Exibindo o Schema do Dataframe
datf.printSchema()

root
 |-- Cidade: string (nullable = true)
 |-- Populacao: double (nullable = true)
 |-- Time: string (nullable = true)
 |-- Dat_cadastro: date (nullable = true)



In [46]:
# Carregando o modulo de funções, expecificamente a função Upper
from pyspark.sql.functions import upper 


datf.withColumn('Cidade_U', upper(datf.Cidade)).show() # cidades ficam em caixa alta em  Cidade_U

+---------+---------+---------+------------+---------+
|   Cidade|Populacao|     Time|Dat_cadastro| Cidade_U|
+---------+---------+---------+------------+---------+
| Salvador|      6.0|    Bahia|  2021-10-10| SALVADOR|
|Sao Paulo|     45.0|Sao Paulo|  2021-05-05|SAO PAULO|
|   Recife|      3.0|    Sport|  2021-07-02|   RECIFE|
|   Maceio|      2.0|      CSA|  2021-08-01|   MACEIO|
+---------+---------+---------+------------+---------+



In [47]:
#Select data

datf.filter(datf.Cidade == "Salvador").show()

+--------+---------+-----+------------+
|  Cidade|Populacao| Time|Dat_cadastro|
+--------+---------+-----+------------+
|Salvador|      6.0|Bahia|  2021-10-10|
+--------+---------+-----+------------+



In [48]:
# Criando uma tabela temporária em memória com os dados e utilizando consulta SQL


datf.createOrReplaceTempView("Dados")
spark.sql("select count(*) from Dados").show()

+--------+
|count(1)|
+--------+
|       4|
+--------+



## Converter RDD --> DF

In [49]:
# carregando os dados sobre Capital de paises
pais = [("Brasil",10000),("Argentina",20000),("Australia",35000),("Italia",40000),("Egito",65000),("Mexico",80000)]
rddpais= spark.sparkContext.parallelize(pais)


# convertendo em DF
dataframerdd= rddpais.toDF()
#type(dataframerdd) #pyspark.sql.dataframe.DataFrame


dataframerdd.show()

+---------+-----+
|       _1|   _2|
+---------+-----+
|   Brasil|10000|
|Argentina|20000|
|Australia|35000|
|   Italia|40000|
|    Egito|65000|
|   Mexico|80000|
+---------+-----+



In [50]:
# Criando o schema das colunas dos campos do DF
Colunas = ["Pais","Total_capital(Bilhoes)"]
dataframerdd2= rddpais.toDF(Colunas)
dataframerdd2.printSchema()
dataframerdd2.show(truncate=False)

root
 |-- Pais: string (nullable = true)
 |-- Total_capital(Bilhoes): long (nullable = true)

+---------+----------------------+
|Pais     |Total_capital(Bilhoes)|
+---------+----------------------+
|Brasil   |10000                 |
|Argentina|20000                 |
|Australia|35000                 |
|Italia   |40000                 |
|Egito    |65000                 |
|Mexico   |80000                 |
+---------+----------------------+



##Tratamento dos dados em DF

In [56]:
df = spark.read.option("header", "true").csv("food_coded.csv")

df.show(3)

+-----+------+---------+----------------+------------+--------------+------+--------------------+--------------------+---------------------------+----+----------------------------+-------+--------------------+------------------+-----+--------------------+--------------------+---------------------+----------+----------+-----------+--------+----------------+-----------------+--------------+-----------------+--------+--------------------+-----+---------+-----------+----------+---------------+--------------------+--------------------+----------------+------+-----------+------------+--------------+--------------+--------------------+----------------+-----------------+-----------------+-------------+------------+------------+------------+----------------------+----+------+---------+-----------------+---------------+-----------+-----------+--------+---------------+--------------------+
|  GPA|Gender|breakfast|calories_chicken|calories_day|calories_scone|coffee|        comfort_food|comfort_foo

In [57]:
df.printSchema()

root
 |-- GPA: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- breakfast: string (nullable = true)
 |-- calories_chicken: string (nullable = true)
 |-- calories_day: string (nullable = true)
 |-- calories_scone: string (nullable = true)
 |-- coffee: string (nullable = true)
 |-- comfort_food: string (nullable = true)
 |-- comfort_food_reasons: string (nullable = true)
 |-- comfort_food_reasons_coded9: string (nullable = true)
 |-- cook: string (nullable = true)
 |-- comfort_food_reasons_coded11: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- diet_current: string (nullable = true)
 |-- diet_current_coded: string (nullable = true)
 |-- drink: string (nullable = true)
 |-- eating_changes: string (nullable = true)
 |-- eating_changes_coded: string (nullable = true)
 |-- eating_changes_coded1: string (nullable = true)
 |-- eating_out: string (nullable = true)
 |-- employment: string (nullable = true)
 |-- ethnic_food: string (nullable = true)
 |-- e

In [58]:
# Selecionando alguns campos
df.select('mother_profession','type_sports', 'Gender', 'weight','calories_day').show(30)

+--------------------+--------------------+------+--------------------+------------+
|   mother_profession|         type_sports|Gender|              weight|calories_day|
+--------------------+--------------------+------+--------------------+------------+
|          unemployed|          car racing|     2|                 187|         nan|
|           Nurse RN |         Basketball |     1|                 155|           3|
|       owns business|                none|     1|I'm not answering...|           4|
|                null|                null|     1|                null|           3|
|                null|                null|  null|                null|        null|
|                null|                null|     4|                null|           1|
|Substance Abuse C...|            Softball|     1|                 190|           2|
|        Hair Braider|               None.|     1|                 190|           3|
|          Journalist|              soccer|     2|               

In [67]:
# filtros no PySpark
from pyspark.sql.functions import *


# Filtrando os dados não nulos 
resultdf1 = df.filter(df.weight.isNotNull()).select('mother_profession','type_sports', 'Gender', 'weight','calories_day')

resultdf1.show()

+--------------------+--------------------+------+--------------------+------------+
|   mother_profession|         type_sports|Gender|              weight|calories_day|
+--------------------+--------------------+------+--------------------+------------+
|          unemployed|          car racing|     2|                 187|         nan|
|           Nurse RN |         Basketball |     1|                 155|           3|
|       owns business|                none|     1|I'm not answering...|           4|
|Substance Abuse C...|            Softball|     1|                 190|           2|
|        Hair Braider|               None.|     1|                 190|           3|
|          Journalist|              soccer|     2|                 180|           3|
|                cook|                none|     1|                 137|           3|
|Elementary School...|                none|     1|                 180|         nan|
|  Pharmaceutical rep|        field hockey|     1|               

In [73]:
#Selecionando apenas uma coluna
df.select('weight').show()

+--------------------+
|              weight|
+--------------------+
|                 187|
|                 155|
|I'm not answering...|
|                null|
|                null|
|                null|
|                 190|
|                 190|
|                 180|
|                 137|
|                 180|
|                 125|
|                 116|
|                 110|
|                 264|
|                 123|
|                 185|
|                 180|
|                 145|
|                 170|
+--------------------+
only showing top 20 rows



In [61]:
# Retirando por meio de REGEX todos os carcteres de weight

resultdf2 = resultdf1.filter(col("weight").rlike("^[0-9]*$"))
resultdf2.show()

+--------------------+--------------------+------+------+------------+
|   mother_profession|         type_sports|Gender|weight|calories_day|
+--------------------+--------------------+------+------+------------+
|          unemployed|          car racing|     2|   187|         nan|
|           Nurse RN |         Basketball |     1|   155|           3|
|Substance Abuse C...|            Softball|     1|   190|           2|
|        Hair Braider|               None.|     1|   190|           3|
|          Journalist|              soccer|     2|   180|           3|
|                cook|                none|     1|   137|           3|
|Elementary School...|                none|     1|   180|         nan|
|  Pharmaceutical rep|        field hockey|     1|   125|           3|
|     Chidos Cleaners|              soccer|     1|   116|           3|
|     Court Reporter |             Running|     1|   110|           4|
|Child care provider |Soccer and basket...|     2|   264|           3|
|     

In [74]:
# GROUPBY
resultdf2.groupBy("Gender", "weight").count().orderBy("weight").show()

+------+------+-----+
|Gender|weight|count|
+------+------+-----+
|     1|     1|    1|
|     1|   100|    1|
|     1|   105|    1|
|     1|   110|    1|
|     1|   112|    1|
|     1|   113|    1|
|     1|   115|    1|
|     1|   116|    1|
|     1|   120|    3|
|     1|   123|    1|
|     1|   125|    4|
|     1|   127|    1|
|     1|   128|    2|
|     1|   129|    2|
|     1|   130|    3|
|     1|   135|    7|
|     2|   135|    1|
|     1|   137|    1|
|     2|   140|    2|
|     1|   140|    6|
+------+------+-----+
only showing top 20 rows



In [76]:
#Seleciona weight >= 150, indicando sobrepeso, separando por tipo de esporte

resultdf3 = resultdf2.select("type_sports", when(resultdf2.weight >= 150, "Sobrepeso"))

resultdf3.show()


+--------------------+--------------------------------------------+
|         type_sports|CASE WHEN (weight >= 150) THEN Sobrepeso END|
+--------------------+--------------------------------------------+
|          car racing|                                   Sobrepeso|
|         Basketball |                                   Sobrepeso|
|            Softball|                                   Sobrepeso|
|               None.|                                   Sobrepeso|
|              soccer|                                   Sobrepeso|
|                none|                                        null|
|                none|                                   Sobrepeso|
|        field hockey|                                        null|
|              soccer|                                        null|
|             Running|                                        null|
|Soccer and basket...|                                   Sobrepeso|
|intramural volley...|                          

In [78]:
#Fill nulos

resultdf4 = resultdf3.filter(resultdf3[1].isNotNull())

resultdf4.show()

+--------------------+--------------------------------------------+
|         type_sports|CASE WHEN (weight >= 150) THEN Sobrepeso END|
+--------------------+--------------------------------------------+
|          car racing|                                   Sobrepeso|
|         Basketball |                                   Sobrepeso|
|            Softball|                                   Sobrepeso|
|               None.|                                   Sobrepeso|
|              soccer|                                   Sobrepeso|
|                none|                                   Sobrepeso|
|Soccer and basket...|                                   Sobrepeso|
|              Hockey|                                   Sobrepeso|
|              Hockey|                                   Sobrepeso|
|              hockey|                                   Sobrepeso|
|          basketball|                                   Sobrepeso|
|              Soccer|                          

In [79]:
# Filtra none, None., nan do primeiro campo ( type_sports)
resultdf4.filter((resultdf3[0] != 'none') & (resultdf3[0] != 'None.') & (resultdf3[0] != 'nan')).show()

+--------------------+--------------------------------------------+
|         type_sports|CASE WHEN (weight >= 150) THEN Sobrepeso END|
+--------------------+--------------------------------------------+
|          car racing|                                   Sobrepeso|
|         Basketball |                                   Sobrepeso|
|            Softball|                                   Sobrepeso|
|              soccer|                                   Sobrepeso|
|Soccer and basket...|                                   Sobrepeso|
|              Hockey|                                   Sobrepeso|
|              Hockey|                                   Sobrepeso|
|              hockey|                                   Sobrepeso|
|          basketball|                                   Sobrepeso|
|              Soccer|                                   Sobrepeso|
|              Tennis|                                   Sobrepeso|
|   tennis soccer gym|                          