# PySpark

### Spark: Session

In [2]:
# Library imports. The star imports bring StructType, StructField, the
# *Type classes and Window into the notebook namespace.
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as function
from pyspark.sql.window import *

# Create the Spark session, or return the one already active in this kernel
spark = (
    pyspark.sql.SparkSession.builder
    .appName("session")
    .getOrCreate()
)

### Spark: File reading

#### ▪️ Reading text files

In [48]:
# Library imports
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as function
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = pyspark.sql.SparkSession.builder.appName("session").getOrCreate()

# Read the text file, one row per "\r"-separated record. With CRLF line
# endings every record after the first keeps a leading "\n" (visible in the
# `original_data` column of the output).
dataframe_file_txt = spark.read.text("./datasets/sample_dataset_antibiotics.txt", lineSep="\r")

# Show the raw schema: a single string column named `value`
dataframe_file_txt.printSchema()

# Split every record on tabs once; the fields are picked by position below
fields = function.split(function.col("value"), "\t")

# Transform the data:
# 1. Number the records 0, 1, 2, ... in file order. monotonically_increasing_id()
#    is unique and increasing but NOT consecutive across partitions (the
#    partition id is stored in its upper bits), so it cannot serve as a line
#    number by itself, nor be safely cast to integer. row_number() ordered by it
#    gives consecutive numbers; "- 1" keeps the 0-based numbering. The window
#    has no partitionBy, so Spark moves all rows into a single partition —
#    acceptable for a small file like this one.
# 2. Drop line 0 (presumably the column header of the file).
# 3. Extract the tab-separated fields; a missing field becomes NULL.
dataframe_transform = dataframe_file_txt.withColumn(
    "_row_id",
    function.monotonically_increasing_id()
).withColumn(
    "line",
    function.row_number().over(Window.orderBy("_row_id")) - 1
).filter(
    function.col("line") > 0
).select(
     function.col("line").cast("integer")
    ,function.col("value").alias("original_data")
    ,function.regexp_replace(fields[0], "\n", "").alias("agent")
    ,fields[1].alias("abbr")
    ,fields[2].alias("S")
    ,fields[3].alias("I")
    ,fields[4].alias("R")
)

# Target schema: same columns, all declared non-nullable
struct = StructType([
     StructField("line", IntegerType(), nullable=False)
    ,StructField("original_data", StringType(), nullable=False)
    ,StructField("agent", StringType(), nullable=False)
    ,StructField("abbr", StringType(), nullable=False)
    ,StructField("S", StringType(), nullable=False)
    ,StructField("I", StringType(), nullable=False)
    ,StructField("R", StringType(), nullable=False)
])

# Re-create the DataFrame with the structured schema. createDataFrame verifies
# rows against the schema by default, so a record with fewer than five
# tab-separated fields (NULL after the split) makes this step fail.
dataframe_antibiotics = spark.createDataFrame(data=dataframe_transform.rdd, schema=struct)

# Show the data without truncating values
dataframe_antibiotics.show(truncate=False,vertical=False)


root
 |-- value: string (nullable = true)

+----+--------------------------------------+-----------+----+----+-----+-----+
|line|original_data                         |agent      |abbr|S   |I    |R    |
+----+--------------------------------------+-----------+----+----+-----+-----+
|1   |\nAmpicillin\tAMP\t<=8\t16\t>=32      |Ampicillin |AMP |<=8 |16   |>=32 |
|2   |\nCefotetan\tCTT\t<=16\t32\t>=64      |Cefotetan  |CTT |<=16|32   |>=64 |
|3   |\nCefdinir\tCFD\t<=1\t2\t>=4          |Cefdinir   |CFD |<=1 |2    |>=4  |
|4   |\nErtapenem\tERT\t<=2\t4\t>=8         |Ertapenem  |ERT |<=2 |4    |>=8  |
|5   |\nTicarcillin\tTIC\t<=16\t32-64\t>=128|Ticarcillin|TIC |<=16|32-64|>=128|
+----+--------------------------------------+-----------+----+----+-----+-----+



In [None]:
# Library imports
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as function
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = pyspark.sql.SparkSession.builder.appName('session').getOrCreate()

# Explicit schema for the dataset.
# NOTE(review): "pirce" looks like a typo for "price"; it is kept because other
# cells select the column by this name.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

# Read the CSV: header=True skips the header line, and the explicit schema
# supplies the column names and types
dataFrameCSV = spark.read.option('header',True).schema(schema).csv('./datasets/Bikes.csv')

# cache() is persist() with the default storage level (memory, spilling to
# disk when needed), so it is not memory-only. Both calls are lazy: data is
# stored only when an action runs, so count() materializes the cache here.
dataFrameCSV.cache()
dataFrameCSV.count()

# persist() on an already-cached DataFrame does not change its storage level,
# so the cached copy is released before requesting a different level.
dataFrameCSV.unpersist()

# Persist on disk only (materialized by the next action on the DataFrame)
dataFrameCSV.persist(pyspark.StorageLevel.DISK_ONLY)

#### Spark: Criando DataFrame
*- Cria um DataFrame manualmente*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Sample records: (firstname, middlename, lastname, id, gender, salary)
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

# Explicit schema: every column nullable; only salary is numeric
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

# Build the DataFrame from the local list of tuples
df = spark.createDataFrame(data=data, schema=schema)

# Show the schema, then every row without truncating long values
df.printSchema()
df.show(truncate=False)

#### Spark: Criando uma VIEW
*- Cria uma view temporária com base em um DataFrame*

In [None]:
# Library imports
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as function
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = pyspark.sql.SparkSession.builder.appName('session').getOrCreate()

# Build a small test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = spark.read.option('header',True).schema(schema).csv('./datasets/Bikes.csv')
df_test = dataFrameCSV.select('kms_driven','owner').limit(10)

# Register the DataFrame as a temporary view so SQL can reference it by name
df_test.createOrReplaceTempView("df_test")

# Query the view with SQL. Datetime pattern letters are case-sensitive:
# HH = hour of day (0-23), mm = minutes, ss = seconds (lowercase 'hh' would be
# the 12-hour clock hour).
spark.sql("""
    SELECT 
    DATE_FORMAT(NOW(),'dd/MM/yyyy HH:mm:ss') AS dt_atual, 
    kms_driven 
    FROM df_test"""
).show()


#### Spark: Listando Tabelas do Spark
*- Lista todas as tabelas catalogadas no Spark*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

# Load the CSV and keep a 10-row, two-column sample
dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)
df_test = dataFrameCSV.select('kms_driven', 'owner').limit(10)

# Register the sample as a temporary view, so it shows up in the catalog
df_test.createOrReplaceTempView("df_test")

# List every table/view the Spark catalog knows about
tbls = spark.catalog.listTables()
print(tbls)

#### Spark: Adicionando Colunas ao DataFrame
*- Adiciona 1 ou N colunas em um DF*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# Add two new columns, both copies of `name`. withColumn returns a new
# DataFrame; dataFrameCSV itself is left unchanged.
dataFrameCSV_NewColumn = (
    dataFrameCSV
    .withColumn("name_new1", dataFrameCSV["name"])
    .withColumn("name_new2", dataFrameCSV["name"])
)

# Show the original column next to its two copies
(
    dataFrameCSV_NewColumn
    .select("name", "name_new1", "name_new2")
    .show(5)
)

#### Spark: Deletando Dataframe
*- Removendo a variável que referencia o DataFrame (`del`)*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# `del` only removes the Python name `dataFrameCSV`. This DataFrame was never
# cached, so no Spark storage needs releasing; a cached one would also need
# unpersist().
del dataFrameCSV

#### Spark: Select (*)

*- Seleção de todos os valores de todas as colunas*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# SELECT *: show all columns of the first 5 rows
dataFrameCSV.show(5)

#### Spark: Select [column_n, ...]

*- Seleção de todos os valores de colunas específicas*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset.
# NOTE(review): "pirce" looks like a typo for "price" — confirm before renaming.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# SELECT name, pirce: project explicit columns, show the first 5 rows
(
    dataFrameCSV
    .select('name', 'pirce')
    .show(5)
)

#### Spark: Select TOP/LIMIT

*- Seleção de um Limite de Linhas*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# LIMIT 2 over all columns. Without an ORDER BY, which rows come back is not
# guaranteed.
(
    dataFrameCSV
    .limit(2)
    .show()
)

# LIMIT 2 over a single explicit column
(
    dataFrameCSV
    .select('name')
    .limit(2)
    .show()
)

#### Spark: DISTINCT 

*- Seleção de Valores Distintos*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# SELECT DISTINCT kms_driven: unique values of one column, first 5 shown
(
    dataFrameCSV
    .select('kms_driven')
    .distinct()
    .show(5)
)

#### Spark: ORDER BY

*- Ordenando os dados por coluna*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# orderBy with the `ascending` flag. kms_driven is a string column, so the
# order is lexicographic.
(
    dataFrameCSV.select('kms_driven').distinct()
    .orderBy('kms_driven', ascending=True)   # ascending
    .limit(5)
    .show()
)
(
    dataFrameCSV.select('kms_driven').distinct()
    .orderBy('kms_driven', ascending=False)  # descending
    .limit(5)
    .show()
)

# sort() is an alias of orderBy(); here the direction comes from asc()/desc()
(
    dataFrameCSV.select('kms_driven').distinct()
    .sort(function.asc('kms_driven'))        # ascending
    .limit(5)
    .show()
)
(
    dataFrameCSV.select('kms_driven').distinct()
    .sort(function.desc('kms_driven'))       # descending
    .limit(5)
    .show()
)

#### Spark: GROUP BY - COUNT

*- Agrupando Valores*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# GROUP BY kms_driven with COUNT(name). count() on a column skips NULL names.
(
    dataFrameCSV
    .groupBy('kms_driven')
    .agg(function.count('name').alias("Quantidade"))
    .show(5)
)

#### Spark: GROUP BY - COUNT (com Ordenação)

*- Agrupando valores e ordenando pela contagem*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)

# GROUP BY kms_driven, COUNT(name), ordered by the count, largest first
(
    dataFrameCSV
    .groupBy('kms_driven')
    .agg(function.count('name').alias("Quantidade"))
    .sort(function.desc("Quantidade"))
    .show(5)
)

#### Spark: CASE

*- Coluna condicional*

In [None]:
# Library imports
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as function
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = pyspark.sql.SparkSession.builder.appName('session').getOrCreate()

# Build a test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = spark.read.option('header',True).schema(schema).csv('./datasets/Bikes.csv')

# Single condition (CASE WHEN age IS NULL THEN 0 ELSE age END)
dataFrameCSV.withColumn(
    'age_not_null',
    function.when(function.col('age').isNull(), 0).otherwise(function.col('age'))
).show(5)

# Multiple conditions combined with ~ (NOT) and | (OR)
df_test = dataFrameCSV.select('kms_driven','owner').limit(10)

# function.col() binds to the DataFrame being transformed (df_test) rather than
# to a column object taken from a different DataFrame. Notes:
# - the combined condition reduces to kms_driven != 'Ahmedabad';
# - the THEN value 0 is coerced to the string "0", because the ELSE branch is a
#   string column.
df_test.withColumn(
    'kms_driven',
    function.when(
        ~function.col("kms_driven").isin(["Ahmedabad","Delhi"]) | (function.col("kms_driven")=="Delhi"),
        0
    ).otherwise(function.col("kms_driven"))
).show(10)

#### Spark: FILTER

*- Filtrando Dados*

In [None]:
# Library imports
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as function
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = pyspark.sql.SparkSession.builder.appName('session').getOrCreate()

# Build a test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = spark.read.option('header',True).schema(schema).csv('./datasets/Bikes.csv')

# String: LIKE pattern on a single value
dataFrameCSV.select("name").filter(dataFrameCSV.name.like('%Yamaha SZ 150%')).show()

# Number: single value
dataFrameCSV.filter(dataFrameCSV["power"]==3.0).limit(3).show()

# Multiple conditions - AND (&). Each comparison needs its own parentheses,
# because & and | bind tighter than == in Python.
# NOTE(review): `brand` is declared StringType but is compared with 110.0
# (Spark casts implicitly). Together with the city names filtered from
# `kms_driven` below, this suggests the schema may be shifted relative to the
# CSV columns — confirm against the file header.
dataFrameCSV.filter((dataFrameCSV["power"]==3.0) & (dataFrameCSV["brand"]==110.0)).limit(3).show()

# Multiple conditions - OR (|)
dataFrameCSV.filter((dataFrameCSV["power"]==5.0) | (dataFrameCSV["brand"]==110.0)).limit(3).show()

# IN: the value belongs to a list
list_kms_driven = ['Ahmedabad', 'Bangalore']
dataFrameCSV.filter(dataFrameCSV['kms_driven'].isin(list_kms_driven)).limit(3).show()

# NOT IN: ~ negates isin(). Rows whose kms_driven is NULL are excluded too,
# because NOT (NULL IN ...) evaluates to NULL.
list_kms_driven = ['Ahmedabad', 'Bangalore', 'Delhi']
dataFrameCSV.filter(~(dataFrameCSV['kms_driven'].isin(list_kms_driven))).limit(3).show()

# IS NULL
dataFrameCSV.filter(dataFrameCSV['age'].isNull()).limit(3).show()

# IS NOT NULL
dataFrameCSV.filter(dataFrameCSV['age'].isNotNull()).limit(3).show()


#### Spark: ROW_NUMBER

*- Adicionando um índice sequencial por partição, conforme a ordenação dos dados*

In [None]:
# Library imports
import pyspark
import pyspark.sql.functions as function
from pyspark.sql.types import *
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = (
    pyspark.sql.SparkSession.builder
    .appName('session')
    .getOrCreate()
)

# Schema of the test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('./datasets/Bikes.csv')
)
# limit() without an ORDER BY: which 10 rows are kept is not guaranteed
df_test = dataFrameCSV.select('kms_driven', 'owner').limit(10)

# row_number() restarts at 1 in every kms_driven partition. Rows that tie on
# `owner` may be numbered in any order among themselves.

# Ascending index
(
    df_test
    .withColumn(
        "row_number",
        function.row_number().over(
            Window.partitionBy("kms_driven").orderBy(function.asc("owner"))
        ),
    )
    .select("row_number", "kms_driven", "owner")
    .show(1000)
)

# Descending index
(
    df_test
    .withColumn(
        "row_number",
        function.row_number().over(
            Window.partitionBy("kms_driven").orderBy(function.desc("owner"))
        ),
    )
    .select("row_number", "kms_driven", "owner")
    .show(1000)
)

#### Spark: SQL
*- Utilizando comando SQL Explícito para construção de Consultas*

In [None]:
# Library imports
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as function
from pyspark.sql.window import *

# Create (or reuse) the Spark session
spark = pyspark.sql.SparkSession.builder.appName('session').getOrCreate()

# Build a test dataset
schema = StructType([
    StructField("name", StringType(), True),
    StructField("pirce", DoubleType(), True),
    StructField("kms_driven", StringType(), True),
    StructField("owner", DoubleType(), True),
    StructField("age", IntegerType(), True),
    StructField("power", DoubleType(), True),
    StructField("brand", StringType(), True),
])

dataFrameCSV = spark.read.option('header',True).schema(schema).csv('./datasets/Bikes.csv')
df_test = dataFrameCSV.select('kms_driven','owner').limit(10)

# Register the DataFrame as a temporary view so SQL can reference it by name
df_test.createOrReplaceTempView("df_test")

# Query the view with explicit SQL. Datetime pattern letters are
# case-sensitive: HH = hour of day (0-23), mm = minutes, ss = seconds
# (lowercase 'hh' would be the 12-hour clock hour).
spark.sql("""
    SELECT 
    DATE_FORMAT(NOW(),'dd/MM/yyyy HH:mm:ss') AS dt_atual, 
    kms_driven 
    FROM df_test"""
).show()