# Databricks Intro
- Criar uma conta em https://community.cloud.databricks.com/
- Criar um cluster (manter a versão do runtime)
- Em Home > User, criar um notebook chamado PysParkAula_pt1

In [None]:
# PySpark: obtain (or reuse) a SparkSession with Hive support enabled
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()
# Two ways of reaching the Spark context from the Spark session.
# NOTE(review): `_sc` is an underscore-prefixed (non-public) attribute; the
# `sparkContext` property on the next line looks like the supported accessor — confirm.
spark_context = spark_session._sc
spark_context = spark_session.sparkContext

# Streaming word count: read text lines from a local socket and print the
# word counts of each batch.
from pyspark.streaming import StreamingContext
# Use the context taken from the session created in this cell instead of the
# implicit `sc` global, so the cell does not depend on a notebook-provided name.
ssc = StreamingContext(spark_context, 1)
lines = ssc.socketTextStream('localhost', 9999)
# Split each line into words, pair every word with 1, then sum the pairs per word.
counts = (
    lines.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)
counts.pprint()
ssc.start()
# Wait here until the streaming computation terminates.
ssc.awaitTermination()

## Dataframe

In [None]:
# Reading a DataFrame
## Option 1: generic reader with an explicit format
df1 = spark.read.format("csv").options(header="true").load(path_dataset1)
## Option 2: the csv() shortcut, first without and then with reader options
df1 = spark.read.csv(path_dataset1)
df1 = spark.read.options(header="true", inferSchema="true").csv(path_dataset1)
## Displaying the DataFrame
df1.show()

In [None]:
## Other ways of reading files with PySpark
# NOTE(review): presumably a placeholder path — replace with a real location.
path = "/../../arquivoXPTO"
# Creating a DataFrame from a JSON file
dataframe = spark.read.json(path)
# Creating a DataFrame from an ORC file
dataframe = spark.read.orc(path)
# Creating a DataFrame from a Parquet file
dataframe = spark.read.parquet(path)

In [None]:
# Reading an RDD from a text file
rdd = sc.textFile(path_rdd)
# rdd.show() would be wrong: show() is not available on an RDD, only on a DataFrame
rdd.collect()

In [None]:
# Creating a temporary view from df1 (the view name is kept in one variable
# so that every statement below refers to the same view).
nome_tabela_temporiaria = "tempTableDataFrame1"
df1.createOrReplaceTempView(nome_tabela_temporiaria)
# Reading the temporary view, option 1: through the DataFrame reader
spark.read.table(nome_tabela_temporiaria).show()
# Reading the temporary view, option 2: through a SQL query
spark.sql(f"SELECT * FROM {nome_tabela_temporiaria}").show()

In [None]:
# Databricks visualization: pass the query result to the notebook's display()
display(spark.sql("SELECT * FROM tempTableDataFrame1"))

In [None]:
# Scala equivalent of the import below:
#import org.apache.spark.sql.functions._
# Python
from pyspark.sql.functions import col, column
# Selecting columns with the col / column functions
df1.select(col("country"), col("date"), column("iso_code")).show()
# Selecting columns with selectExpr
df1.selectExpr("country", "date", "iso_code").show()

# Scala equivalent of the import below:
# org.apache.spark.sql.types._
# Building a schema by hand in PySpark
from pyspark.sql.types import *
dataframe_ficticio = StructType([
    StructField("col_String_1", StringType()),
    StructField("col_Integer_2", IntegerType()),
    StructField("col_Decimal_3", DecimalType()),
])
# Evaluate the schema as the last expression of the cell so the notebook
# shows it (the original line was a truncated, undefined name).
dataframe_ficticio

In [None]:
# Helper that builds a schema (fields / columns / column names)
'''
# Scala
org.apache.spark.sql.types._
def getSchema(fields : Array[StructField]) : StructType = {
new StructType(fields)
}
'''
# PySpark
def getSchema(fields):
    """Wrap the given StructField collection in a StructType and return it."""
    schema_type = StructType(fields)
    return schema_type
# Three string columns, built from their names.
schema = getSchema([
    StructField(nome_coluna, StringType())
    for nome_coluna in ("coluna1", "coluna2", "coluna3")
])

# show() limited to 2 rows
df1.show(2)
# take() limited to 2 rows
df1.take(2)

In [None]:
# Writing a new CSV (tab-separated, overwriting any existing output)
path_destino = "/FileStore/tables/CSV/"
nome_arquivo = "arquivo.csv"
path_geral = f"{path_destino}{nome_arquivo}"
df1.write.mode("overwrite").csv(path_geral, sep="\t")

In [None]:
# Writing a new JSON output (overwriting any existing output)
path_destino = "/FileStore/tables/JSON/"
nome_arquivo = "arquivo.json"
path_geral = f"{path_destino}{nome_arquivo}"
df1.write.mode("overwrite").json(path_geral)

In [None]:
# Writing a new Parquet output (overwriting any existing output)
path_destino = "/FileStore/tables/PARQUET/"
nome_arquivo = "arquivo.parquet"
path_geral = f"{path_destino}{nome_arquivo}"
df1.write.mode("overwrite").parquet(path_geral)

In [None]:
# Writing a new ORC output (overwriting any existing output)
path_destino = "/FileStore/tables/ORC/"
nome_arquivo = "arquivo.orc"
path_geral = f"{path_destino}{nome_arquivo}"
df1.write.mode("overwrite").orc(path_geral)

In [None]:
# Other kinds of SELECT
# Different ways of selecting the same column
# NOTE(review): the wildcard import pulls every public name of
# pyspark.sql.functions into scope and may shadow Python builtins — confirm.
from pyspark.sql.functions import *
# By column name, double-quoted string
df1.select("country").show(5)
# By column name, single-quoted string
df1.select('country').show(5)
# Through col()
df1.select(col("country")).show(5)
# Through column()
df1.select(column("country")).show(5)
# Through expr()
df1.select(expr("country")).show(5)

In [None]:
# Add a new column holding a constant value
df2 = df1.withColumn("nova_coluna", lit(1))
# Add a column computed from an expression
teste = expr("total_vaccinations < 40")
(
    df1.select("country", "total_vaccinations")
    .withColumn("teste", teste)
    .show(5)
)
# Rename a column: SQL-style alias, Column.alias, and withColumnRenamed
df1.select(
    expr("total_vaccinations as total_de_vacinados")
).show(5)
df1.select(
    col("country").alias("pais")
).show(5)
(
    df1.select("country")
    .withColumnRenamed("country", "pais")
    .show(5)
)
# Drop a column and list the remaining column names
df3 = df1.drop("country")
df3.columns

In [None]:
# Filtering and ordering data
# where() is an alias for filter().
# Rows with total_vaccinations above 55, ordered by that column; show 2 of them
(
    df1.filter(df1.total_vaccinations > 55)
    .orderBy(df1.total_vaccinations)
    .show(2)
)
# Keep only the rows whose country equals Argentina
(
    df1.select(df1.total_vaccinations, df1.country)
    .filter(df1.country == "Argentina")
    .show(5)
)
# Keep only the rows whose country differs from Argentina
(
    df1.select(df1.total_vaccinations, df1.country)
    .where(df1.country != "Argentina")
    .show(5)
)

In [None]:
# Filtering and ordering data
# Show the distinct values of a column
df1.select("country").distinct().show()
# Declaring several filter conditions as separate expressions
filtro_vacinas = df1.total_vaccinations < 100
filtro_pais = df1.country.contains("Argentina")
# Restrict to the listed vaccines, then apply the vaccination-count filter
(
    df1.select(df1.total_vaccinations, df1.country, df1.vaccines)
    .where(df1.vaccines.isin("Sputnik V", "Sinovac"))
    .filter(filtro_vacinas)
    .show(5)
)
# Same query, additionally exposing the country condition as a column
(
    df1.select(df1.total_vaccinations, df1.country, df1.vaccines)
    .where(df1.vaccines.isin("Sputnik V", "Sinovac"))
    .filter(filtro_vacinas)
    .withColumn("filtro_pais", filtro_pais)
    .show(5)
)

In [None]:
"""#######################################################################################################################
Convertendo dados
#######################################################################################################################"""
# Cast "country" to string and store the result in a column named "PAIS".
# withColumn already names the resulting column, so the extra alias was redundant.
df5 = df1.withColumn("PAIS", col("country").cast("string"))
df5.select(df5.PAIS).show(2)
"""#######################################################################################################################
Trabalhando com funções
#######################################################################################################################"""
# Using functions
# Apply upper() to the country column and show 3 rows
df1.select(upper(df1.country)).show(3)
df1.select(lower(df1.country)).show(4