# Spark SQL

When we actually go about writing our Spark Application, we are going to need a way to send user commands and data to it. We do that by first creating a __SparkSession__.
When you start Spark in this interactive mode (__spark-shell__), you __implicitly__ create a SparkSession that manages the Spark Application. When you start it through a standalone application, you must create the SparkSession object yourself in your application code.

The SparkSession instance is the way Spark executes user-defined
manipulations across the cluster. There is a one-to-one correspondence between a SparkSession and
a Spark Application

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Word Count") \
.getOrCreate()

In [None]:
# old API, mainly for working on RDD
# import pyspark
# import pyspark.sql.functions as func
# sc = pyspark.SparkContext(appName='people')
# sqlContext = pyspark.sql.SQLContext(sc)

In [None]:
people = spark.read.json('data/people.json')

In [None]:
type(people)

In [None]:
people.printSchema()

In [None]:
people.show(1, False)

## SQL and Dataframes

Structured data is any data that has a schema — that is, a known set of fields for each record. Spark SQL provides a dataset abstraction that simplifies working with structured datasets. Dataset is similar to tables in a relational database. Dataset has a natural schema, and this let's Spark store data in a more efficient manner and can run SQL queries on it using actual SQL commands.

In [None]:
# You have to Register table first, before using it with spark SQL
# Table alias can have any name
people.registerTempTable('people')

In [None]:
# Now we can query "people" table
r = spark.sql('SELECT * FROM people')
# returns new DataFrame
print(type(r))
r.show()

In [None]:
spark.sql('SELECT name, age FROM people').show()

In [None]:
spark.sql('SELECT * FROM people WHERE age > 30').show()

In [None]:
# you can use docstring for longer queries
query = """
    SELECT 
        gender,
        count(*) AS count, 
        avg(age) AS avg_age, 
        avg(children) AS avg_children 
    FROM people
    WHERE gender = 'male'
    GROUP BY gender
"""
grouped_by_gender_sql = spark.sql(query)
grouped_by_gender_sql.show()

In [None]:
grouped_by_gender_sql.explain()

In [None]:
# Be careful, this one won't work
#sqlContext.sql("SELECT name, surname, max(age) as maxAge FROM people WHERE gender = 'male'").show()
# But this works
spark.sql("SELECT first(name) AS name, first(surname) AS surname, max(age) as maxAge FROM people WHERE gender = 'male'").show()

## Using Dataframe methods

Alternatively to SQL queries, Spark Dataframes have methods corresponding to SQL syntax. 
They are more familiar to OOP methods and allow compiler/interpreter to catch many bugs (at compile time). 
SQL is simpler but many bugs can be caught at runtime.

In [None]:
import pyspark.sql.functions as func

In [None]:
people.select('name', 'age').show()

In [None]:
people.where(people.age > 30).show()

In [None]:
grouped_by_gender_df = people \
    .groupBy('gender') \
    .agg(func.avg('age').alias('avg_age'), 
         func.avg('children').alias('avg_children'))
    
grouped_by_gender_df.show()

In [None]:
grouped_by_gender_df.explain()

In [None]:
people.groupBy('gender').agg(func.avg('age').alias('avg_age'), func.max('children').alias('max_children')).show()

In [None]:
people.groupBy('gender').pivot('name').agg(func.avg('age')).show()

In [None]:
people.where(people.gender == 'male') \
    .select(
        func.first('name').alias('name'), 
        func.first('surname').alias('surname'), 
        func.max('age').alias('maxAge')) \
    .show()

In [None]:
from pyspark.sql.functions import desc

In [None]:
# Add renamed column and sort
people \
    .groupBy("gender") \
    .avg("children") \
    .withColumnRenamed("avg(children)", "children_avg") \
    .sort(desc("children_avg")) \
    .limit(5) \
    .show()

In [None]:
import matplotlib
%matplotlib inline

# Można otrzymywać data frame Pandas bezpośrednio z DataFrames; pamiętaj jednak o rozmiarze danych...
p = people.groupBy('gender').agg(func.avg('age').alias('avg_age'), func.max('children').alias('max_children')) \
        .toPandas().set_index('gender')
p.plot(kind='bar', figsize=(14,10))
p