In [None]:
# Treinamento de PySpark para Engenheiros
## 02. Conceitos básicos de PySpark

### Objetivo

* Aprender a criar DataFrames
* Aprender a manipular colunas

### Highlights

* `SparkContext.getOrCreate()` Obtenha ou instancie um SparkContext e registre-o como um objeto singleton.
* `SparkSession()` SparkSession, os aplicativos podem criar DataFrames a partir de um RDD existente, de uma tabela Hive ou de fontes de dados Spark.
* `sc.parallelize` é uma função no SparkContext e é usada para criar um RDD de uma coleção de listas.
* `spark.read.load()` informa ao Spark que o arquivo contém uma linha de cabeçalho.

### Implementação
Para cada JOB PySpark, **always create a context**. Isso basicamente significa que você obtém um (ou mais) contêineres executando um JOB PySpark alocado para você/aplicação.

In [8]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *

#-----------------------------------#

sc = SparkContext.getOrCreate()
spark =  SparkSession(sc)

Verificar se o Spark context foi criado:

In [22]:
spark

#### Criando Data Frames

In [14]:
%%file csvfile01.csv
paulo,chaves,bot,34
silvio,dias,bot,35

Overwriting csvfile01.csv


In [15]:
%%file csvfile02.csv
camila,silva,eudora,31
priscila,alves,eudora,18

Writing csvfile02.csv


In [7]:
## RDD

## RDD

RDD (Resilient Distributed Dataset) é um bloco de construção fundamental do PySpark, que é uma coleção de objetos distribuídos imutáveis ​​e tolerantes a falhas. Significado imutável, uma vez que você cria um RDD, não pode alterá-lo. Cada registro no RDD é dividido em partições lógicas, que podem ser computadas em diferentes nós do cluster

<img src="https://sparkbyexamples.com/wp-content/uploads/2020/08/rdd-creation-1024x635.png"/>


In [10]:
#1. Spark Create DataFrame from RDD

columns  =  ('languages','users')
data = (('java','2000'),('python','30000'))
rdd  = sc.parallelize(data)

df_rdd = rdd.toDF()
df_rdd.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [11]:
# 2. Create DataFrame from Dictionary

data_map = { 'language':['python','java','c#'],
         'user':['30k','20k','10k'],
        'spped': ['2x','4x','8x']}


map_1 = [(k,)+(v,) for k,v in data_map.items()]


df = spark.createDataFrame(map_1,['key','val'])

df.show()

+--------+------------------+
|     key|               val|
+--------+------------------+
|language|[python, java, c#]|
|    user|   [30k, 20k, 10k]|
|   spped|      [2x, 4x, 8x]|
+--------+------------------+



In [13]:
#3. Using createDataFrame() with the Row type

from pyspark.sql.types import StructType,StructField,IntegerType,StringType

data = [(1,'12102021','13102021'),
       (2,'11112021','21112021'),
       (3,'2102021','15102021')]
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", StringType(), True),
    StructField("updated_at", StringType(), True)
])

df2  = spark.createDataFrame(data= data,schema  = schema)
df2.show()

+---+----------+----------+
| id|created_at|updated_at|
+---+----------+----------+
|  1|  12102021|  13102021|
|  2|  11112021|  21112021|
|  3|   2102021|  15102021|
+---+----------+----------+



In [None]:
Verifique o caminho absoluto do CSV:

In [14]:
!pwd

/home/colaborador/notebooks


In [15]:
#4.  Create Spark DataFrame from CSV


filename  ='/home/colaborador/notebooks/Base_2019.csv'

df  = spark.read.csv(filename)
df.show()


+--------+-------------------+--------+-----------+----------+---------+
|     _c0|                _c1|     _c2|        _c3|       _c4|      _c5|
+--------+-------------------+--------+-----------+----------+---------+
|ID_MARCA|              MARCA|ID_LINHA|      LINHA|DATA_VENDA|QTD_VENDA|
|       4|               VULT|       2| PERFUMARIA|01/15/2019|        7|
|       4|               VULT|       2| PERFUMARIA|10/12/2019|        2|
|       3|QUEM DISSE BERENICE|       5|HIDRATANTES|09/01/2019|        3|
|       3|QUEM DISSE BERENICE|       3|  MAQUIAGEM|07/29/2019|       17|
|       1|          BOTICÁRIO|       4|      SOLAR|11/15/2019|       19|
|       2|             EUDORA|       1|    CABELOS|03/08/2019|        9|
|       2|             EUDORA|       4|      SOLAR|02/24/2019|        6|
|       3|QUEM DISSE BERENICE|       1|    CABELOS|09/27/2019|        4|
|       5|      BELEZA NA WEB|       3|  MAQUIAGEM|01/26/2019|       12|
|       4|               VULT|       1|    CABELOS|

In [16]:
'''Other fomat supported 
Xml
json
text
tsv
avro
parquet
HBase
jdbc
hive
'''

'Other fomat supported \nXml\njson\ntext\ntsv\navro\nparquet\n HBase\njdbc\nhive\n'

In [17]:
#B. Função que utiliza a linha de cabeçalho como header


filename  ='/home/colaborador/notebooks/Base_2019.csv'

df  = spark.read.load(filename,format ='csv',header="true")
df.show()

+--------+-------------------+--------+-----------+----------+---------+
|ID_MARCA|              MARCA|ID_LINHA|      LINHA|DATA_VENDA|QTD_VENDA|
+--------+-------------------+--------+-----------+----------+---------+
|       4|               VULT|       2| PERFUMARIA|01/15/2019|        7|
|       4|               VULT|       2| PERFUMARIA|10/12/2019|        2|
|       3|QUEM DISSE BERENICE|       5|HIDRATANTES|09/01/2019|        3|
|       3|QUEM DISSE BERENICE|       3|  MAQUIAGEM|07/29/2019|       17|
|       1|          BOTICÁRIO|       4|      SOLAR|11/15/2019|       19|
|       2|             EUDORA|       1|    CABELOS|03/08/2019|        9|
|       2|             EUDORA|       4|      SOLAR|02/24/2019|        6|
|       3|QUEM DISSE BERENICE|       1|    CABELOS|09/27/2019|        4|
|       5|      BELEZA NA WEB|       3|  MAQUIAGEM|01/26/2019|       12|
|       4|               VULT|       1|    CABELOS|02/24/2019|        9|
|       3|QUEM DISSE BERENICE|       2| PERFUMARIA|

In [20]:
# Renomear coluna

df  = df.withColumnRenamed("DATA_VENDA","DATA DE VENDA")

df.show()

+--------+-------------------+--------+-----------+-------------+---------+
|ID_MARCA|              MARCA|ID_LINHA|      LINHA|DATA DE VENDA|QTD_VENDA|
+--------+-------------------+--------+-----------+-------------+---------+
|       4|               VULT|       2| PERFUMARIA|   01/15/2019|        7|
|       4|               VULT|       2| PERFUMARIA|   10/12/2019|        2|
|       3|QUEM DISSE BERENICE|       5|HIDRATANTES|   09/01/2019|        3|
|       3|QUEM DISSE BERENICE|       3|  MAQUIAGEM|   07/29/2019|       17|
|       1|          BOTICÁRIO|       4|      SOLAR|   11/15/2019|       19|
|       2|             EUDORA|       1|    CABELOS|   03/08/2019|        9|
|       2|             EUDORA|       4|      SOLAR|   02/24/2019|        6|
|       3|QUEM DISSE BERENICE|       1|    CABELOS|   09/27/2019|        4|
|       5|      BELEZA NA WEB|       3|  MAQUIAGEM|   01/26/2019|       12|
|       4|               VULT|       1|    CABELOS|   02/24/2019|        9|
|       3|QU

In [21]:
# Renomear multiplas colunas

df  = df.withColumnRenamed("ID_MARCA","ID DA MARCA") \
        .withColumnRenamed('QTD_VENDA','QTDE DE VENDAS')

df.show()

+-----------+-------------------+--------+-----------+-------------+--------------+
|ID DA MARCA|              MARCA|ID_LINHA|      LINHA|DATA DE VENDA|QTDE DE VENDAS|
+-----------+-------------------+--------+-----------+-------------+--------------+
|          4|               VULT|       2| PERFUMARIA|   01/15/2019|             7|
|          4|               VULT|       2| PERFUMARIA|   10/12/2019|             2|
|          3|QUEM DISSE BERENICE|       5|HIDRATANTES|   09/01/2019|             3|
|          3|QUEM DISSE BERENICE|       3|  MAQUIAGEM|   07/29/2019|            17|
|          1|          BOTICÁRIO|       4|      SOLAR|   11/15/2019|            19|
|          2|             EUDORA|       1|    CABELOS|   03/08/2019|             9|
|          2|             EUDORA|       4|      SOLAR|   02/24/2019|             6|
|          3|QUEM DISSE BERENICE|       1|    CABELOS|   09/27/2019|             4|
|          5|      BELEZA NA WEB|       3|  MAQUIAGEM|   01/26/2019|        

In [25]:
# Copiar uma coluna e renomear

df = df.withColumn('MARCA2',col('MARCA'))
df.show()

+-----------+-------------------+--------+-----------+-------------+--------------+-------------------+
|ID DA MARCA|              MARCA|ID_LINHA|      LINHA|DATA DE VENDA|QTDE DE VENDAS|             MARCA2|
+-----------+-------------------+--------+-----------+-------------+--------------+-------------------+
|          4|               VULT|       2| PERFUMARIA|   01/15/2019|             7|               VULT|
|          4|               VULT|       2| PERFUMARIA|   10/12/2019|             2|               VULT|
|          3|QUEM DISSE BERENICE|       5|HIDRATANTES|   09/01/2019|             3|QUEM DISSE BERENICE|
|          3|QUEM DISSE BERENICE|       3|  MAQUIAGEM|   07/29/2019|            17|QUEM DISSE BERENICE|
|          1|          BOTICÁRIO|       4|      SOLAR|   11/15/2019|            19|          BOTICÁRIO|
|          2|             EUDORA|       1|    CABELOS|   03/08/2019|             9|             EUDORA|
|          2|             EUDORA|       4|      SOLAR|   02/24/2

In [26]:
# Renomar coluna usando o alias

df.select(df["MARCA2"].alias("NOME_MARCAS"),"LINHA").show()

+-------------------+-----------+
|        NOME_MARCAS|      LINHA|
+-------------------+-----------+
|               VULT| PERFUMARIA|
|               VULT| PERFUMARIA|
|QUEM DISSE BERENICE|HIDRATANTES|
|QUEM DISSE BERENICE|  MAQUIAGEM|
|          BOTICÁRIO|      SOLAR|
|             EUDORA|    CABELOS|
|             EUDORA|      SOLAR|
|QUEM DISSE BERENICE|    CABELOS|
|      BELEZA NA WEB|  MAQUIAGEM|
|               VULT|    CABELOS|
|QUEM DISSE BERENICE| PERFUMARIA|
|QUEM DISSE BERENICE|      SOLAR|
|             EUDORA|HIDRATANTES|
|               VULT|HIDRATANTES|
|          BOTICÁRIO|    CABELOS|
|             EUDORA|      SOLAR|
|      BELEZA NA WEB|HIDRATANTES|
|             EUDORA|HIDRATANTES|
|             EUDORA|  MAQUIAGEM|
|          BOTICÁRIO| PERFUMARIA|
+-------------------+-----------+
only showing top 20 rows

