# Spark SQL, DataFrames and Datasets Guide

## Getting Started

## SparkSession

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()

## Creating DataFrames

In [5]:
df = spark.read.json("people.json")

In [6]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
#  Printa o schema do dataframe em formato de árvore
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [8]:
#  Seleciona a coluna name
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [9]:
#  Seleciona as colunas name e age, incrementando age em 1
df.select(df["name"], df["age"] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [10]:
#  Seleciona pessoas mais velhas que 21 anos
df.filter(df["age"] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [11]:
#  Conta a quantidade de pessoas por idade
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



## Running SQL Queries Programmatically

In [12]:
df.createOrReplaceTempView("people")

In [13]:
sqlDF = spark.sql("SELECT * FROM people")

In [14]:
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## Global Temporary View

O escopo de uma view temporária é a sessão em que ela foi criada, se a sessão for encerrada a view também desaparece; caso seja necessário que a view fique disponível através de várias sessões o recomendado é criar uma view temporária global, que fica armazenada na base `global_temp`, portanto, para acessarmos uma view temporária global devemos utilizar a seguinte síntaxe `SELECT * FROM global_temp.view1`

In [15]:
df.createGlobalTempView("people")

In [16]:
spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [17]:
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## Interoperating with RDDs

### Inferring the Schema Using Reflection

In [18]:
from pyspark.sql import Row

In [19]:
sc = spark.sparkContext

In [20]:
lines = sc.textFile("people.txt")

In [21]:
lines.collect()

['Michael, 29', 'Andy, 30', 'Justin, 19']

In [22]:
parts = lines.map(lambda l: l.split(","))

In [23]:
parts.collect()

[['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]

In [24]:
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

In [25]:
people.collect()

[Row(age=29, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [26]:
#  Inferindo o schema do DataFrame a partir do RDD de rows e criando o DataFrame
schemaPeople = spark.createDataFrame(people)

  return f(*args, **kwds)
  return f(*args, **kwds)


In [27]:
schemaPeople.createOrReplaceTempView("people")

In [28]:
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

In [29]:
teenagers.show()

+------+
|  name|
+------+
|Justin|
+------+



In [30]:
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()

In [31]:
teenNames

['Name: Justin']

In [32]:
for name in teenNames:
    print(name)

Name: Justin


## Programmatically Specifying the Schema

In [33]:
from pyspark.sql.types import *

In [34]:
lines = sc.textFile("people.txt")

In [35]:
parts = lines.map(lambda l: l.split(", "))

In [36]:
parts.collect()

[['Michael', '29'], ['Andy', '30'], ['Justin', '19']]

In [37]:
people = parts.map(lambda p: (p[0].strip(), p[1]))

In [38]:
people.collect()

[('Michael', '29'), ('Andy', '30'), ('Justin', '19')]

In [39]:
schemaString = "name age"

In [40]:
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]

In [41]:
fields

[StructField(name,StringType,true), StructField(age,StringType,true)]

In [42]:
schema = StructType(fields)

In [43]:
schema

StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

In [44]:
schemaPeople = spark.createDataFrame(people, schema)

In [45]:
schemaPeople.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [46]:
schemaPeople.createOrReplaceTempView("people")

In [47]:
results = spark.sql("SELECT name FROM people")

In [48]:
results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



## Aggregations

In [49]:
schemaPeople.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [51]:
schemaPeople.count()

3

In [55]:
from pyspark.sql.functions import *

In [67]:
schemaPeople.select(schemaPeople["name"]).show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [71]:
schemaPeople.select(schemaPeople["name"]).agg(countDistinct(schemaPeople["name"])).show()

+--------------------+
|count(DISTINCT name)|
+--------------------+
|                   3|
+--------------------+



In [72]:
schemaPeople.select(schemaPeople["age"]).agg(sum(schemaPeople["age"])).show()

+--------+
|sum(age)|
+--------+
|    78.0|
+--------+



In [73]:
schemaPeople.select(schemaPeople["age"]).agg(avg(schemaPeople["age"])).show()

+--------+
|avg(age)|
+--------+
|    26.0|
+--------+



In [74]:
schemaPeople.select(schemaPeople["age"]).agg(max(schemaPeople["age"])).show()

+--------+
|max(age)|
+--------+
|      30|
+--------+



In [75]:
schemaPeople.select(schemaPeople["age"]).agg(min(schemaPeople["age"])).show()

+--------+
|min(age)|
+--------+
|      19|
+--------+



## Data Sources

### Generic Load/Save Functions

In [92]:
df = spark.read.load("users.parquet")

In [93]:
df.select("name", "favorite_color").show()

+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+



In [94]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [99]:
#  O parâmetro mode por receber um dos três valores error ou errorifexists, append, overwrite, ignore
df.select("name", "favorite_color").write.save("namesAndAges.parquet", format="parquet", mode="overwrite")

In [81]:
names_ages_df = spark.read.load("namesAndAges.parquet")

In [82]:
names_ages_df.show()

+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+



In [85]:
df = spark.read.load("people.csv", format="csv", sep=";", inferSchema="true", header="true")

In [86]:
df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



### Run SQL on files directly

In [100]:
df = spark.sql("SELECT * FROM parquet.`namesAndAges.parquet`")

In [101]:
df.show()

+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+



In [102]:
df = spark.sql("SELECT * FROM json.`people.json`")

In [103]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### JSON Datasets

In [104]:
sc = spark.sparkContext

In [105]:
peopleDF = spark.read.json("people.json")

In [106]:
peopleDF.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [107]:
peopleDF.createOrReplaceTempView("people")

In [108]:
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")

In [109]:
teenagerNamesDF.show()

+------+
|  name|
+------+
|Justin|
+------+



In [110]:
jsonStrings = ['{"name": "Yin", "address" : {"city" : "Columbus", "state" : "Ohio"}}']

In [111]:
otherPeopleRDD = sc.parallelize(jsonStrings)

In [112]:
otherPeople = spark.read.json(otherPeopleRDD)

In [113]:
otherPeople.show()

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+

