<a href="https://colab.research.google.com/github/MarianaDuartee/Pyspark/blob/main/Pyspark3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 62.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=ce2aaf46bcd614e7033c75c50fba3fce6a2644b8414180eeb930a04fe3853d41
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
spark = (SparkSession.builder
         .master("local")
         .appName("aprendendo-dataframes")
         .config("spark.ui.port","4050")
         .getOrCreate())

In [None]:
df = (spark
      .read
      .format("csv")
      .option("header","true")
      .option("inferschema","true")
      .option("delimiter",";")
      .load("/content/drive/MyDrive/DataSets/arquivo_geral.csv")
      )

df.show()

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).



---

#Outras Funções - Pyspark


---



#WHEN - OTHERWISE

* Condições para criar ou não novas colunas

* Parecido com IF | ELSE, onde IF = WHEN e ELSE = OTHERWISE

* Exemplo: Criar uma nova coluna com as seguintes condições:

> Se o numero de casos novos for maior que zero, acrescentar o texto "Tem casos Novos"

> Se o numero de casos novos não for maior que zero, acrescentar o texto "Sem casos Novos"

1. O código abaixo cria um novo dataframe (df2) com o dataframe original (df)
2. Além das colunas presentes no df original, cria uma nova coluna chamada status (withColumn("status") ...
3. Os valores passados para essa nova coluna possuem condições especificas (**F.when**(condição , valor passado caso a condição seja True).**otherwise**(valor passado caso a condição seja False)

In [None]:
df2 = (df.withColumn("status",F.when(F.col("casosNovos")>0,"Tem Casos Novos")
       .otherwise("Sem casos novos")))

df2.show(100)

#Concat

* F.concat (coluna1,F.lit('texto a ser concatenado')

* Exemplo: Concatenando a quantidade de casos novos com o status

In [None]:
df3 = (df.withColumn("status", 
                     F.when(F.col('casosNovos') > 0, F.concat(df.casosNovos,F.lit(" casos novos")))
                     .otherwise("Não tem casos novos.")
                     ))

df3.select(F.col('status'),F.col('casosNovos'),F.col('data')).filter(F.col('status') != 'Não tem casos novos.').show()

#Substring 
* Exige 3 parametros

* Primeiro : A propria string que esta percorrendo

* Segundo : Posição inicial que quer pegar da string

* Terceiro: Tamanho da substring que quer pegar

* F.substring('valor da string', 'Posição inicial', 'Tamanho total')

* Exemplo: Criar colunas com Dia/Mês/Ano para ficar mais organizado no Dataframe

> Isso é feito com a utilização da withColumn que cria uma nova coluna junto com a substring 

> A substring mostra uma parte da string de acordo com os parâmetros escolhidos 

In [None]:
df3 = (df.withColumn("Dia", F.substring(df.data,9,2))
         .withColumn("Mês",F.substring(df.data,6,2))
         .withColumn("Ano",F.substring(df.data,1,4)))

df3.select(F.col('data'),F.col('Dia'),F.col("Mês"),F.col("Ano")).show()

#StructType

* Definir a Estrutura do DataFrame

* Definir quais as colunas e tipos de dados estão vindo do DataFrame

In [23]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [24]:
dados = [
         ("Mariana","Duarte","Moreira","12345","F",3500),
         ("João","da","Silva","87663","M",2500),
         ("Priscila","dos","Santos","12879","F",1500),
         ("Carla","Paixão","dos Anjos","97856","F",2700),
         ("Edson","de","Almeida","44023","F",2200)
]

schema = (
    StructType([
                StructField("primeiro_nome",StringType(),True),
                StructField("nome_do_meio",StringType(),True),
                StructField("ultimo_nome",StringType(),True),
                StructField("codigo",StringType(),True),
                StructField("genero",StringType(),True),
                StructField("salario",IntegerType(),True)
    ])
)

df = spark.createDataFrame(data=dados,schema=schema)

df.show()

+-------------+------------+-----------+------+------+-------+
|primeiro_nome|nome_do_meio|ultimo_nome|codigo|genero|salario|
+-------------+------------+-----------+------+------+-------+
|      Mariana|      Duarte|    Moreira| 12345|     F|   3500|
|         João|          da|      Silva| 87663|     M|   2500|
|     Priscila|         dos|     Santos| 12879|     F|   1500|
|        Carla|      Paixão|  dos Anjos| 97856|     F|   2700|
|        Edson|          de|    Almeida| 44023|     F|   2200|
+-------------+------------+-----------+------+------+-------+



In [25]:
df.printSchema()

root
 |-- primeiro_nome: string (nullable = true)
 |-- nome_do_meio: string (nullable = true)
 |-- ultimo_nome: string (nullable = true)
 |-- codigo: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- salario: integer (nullable = true)



In [26]:
dados = [
         (("João", "da", "Silva"), "12345", "M", 1000),
         (("Priscila", "dos", "Santos"), "87663", "F", 2200),
         (("Carla", "Pereira", "Costa"), "12697", "F", 3800),
         (("Edson", "Paixão", "dos Anjos"), "44023", "M", 1500),
         (("Stella", "de", "Almeida"), "110001", "F", 1560)
]

schema = StructType([
        StructField("nome", StructType([
             StructField("primeiro_nome", StringType(), True),
             StructField("nome_do_meio", StringType(), True),
             StructField("ultimo_nome", StringType(), True) 
        ])),
        StructField("codigo", StringType(), True),
        StructField("genero", StringType(), True),
        StructField("salario", IntegerType(), True)
])

df1 = spark.createDataFrame(data=dados, schema=schema)
df1.printSchema()

root
 |-- nome: struct (nullable = true)
 |    |-- primeiro_nome: string (nullable = true)
 |    |-- nome_do_meio: string (nullable = true)
 |    |-- ultimo_nome: string (nullable = true)
 |-- codigo: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- salario: integer (nullable = true)



In [27]:
df1.show()

+--------------------+------+------+-------+
|                nome|codigo|genero|salario|
+--------------------+------+------+-------+
|   {João, da, Silva}| 12345|     M|   1000|
|{Priscila, dos, S...| 87663|     F|   2200|
|{Carla, Pereira, ...| 12697|     F|   3800|
|{Edson, Paixão, d...| 44023|     M|   1500|
|{Stella, de, Alme...|110001|     F|   1560|
+--------------------+------+------+-------+

