version 1.0.0
#![GMV](http://www.gmv.com/export/system/modules/com.gmv.teresa.site/resources/imagenes/generales/logo.gif) + ![Apache Spark](http://spark.apache.org/images/spark-logo.png)
# Introduction to Spark SQL and DataFrame

## Overview
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

## DataFrames
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

## Starting Point
The entry point into all relational functionality in Spark is the SQLContext class, or one of its descendants. To create a basic SQLContext, all you need is a SparkContext.

In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
# creates a DataFrame based on the content of a JSON file:
df = sqlContext.read.json("../data/Introduction_Spark/people.json")

In [5]:
# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## DataFrame Operations

In [6]:
# Print the schema in a tree format
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
# Select only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [8]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [9]:
# Select people older than 21
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [10]:
# Count people by age
df.groupby("age").count().show()

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+



## Running SQL Queries Programmatically
The *sql* function on a *SQLContext* enables applications to run SQL queries programmatically and returns the result as a *DataFrame*.

In [11]:
sqlContext.registerDataFrameAsTable(df, "table1")

In [12]:
sqlContext.tableNames()

[u'table1']

In [13]:
sqlContext.sql("SELECT * FROM table1 WHERE age is not NULL").show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



## Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into DataFrames:

* The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

* The second method uses a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.

### Inferring the Schema Using Reflection
Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the data types. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by looking at the first row. Because only the first row is inspected, it is important that there is **no missing data in the first row** of the RDD.
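To see why a missing value in the first row is a problem, consider the plain-Python sketch below. It does not use Spark: `infer_types_from_first_row` is a hypothetical helper, introduced only for illustration, that mimics the idea of inferring column types from the first record alone.

```python
# Hypothetical helper (not part of PySpark): infer a type name for each
# column by looking only at the first record, as described above.
def infer_types_from_first_row(records):
    first = records[0]
    return {column: type(value).__name__ for column, value in first.items()}


complete_first = [{"name": "Andy", "age": 30}, {"name": "Michael", "age": None}]
missing_first = [{"name": "Michael", "age": None}, {"name": "Andy", "age": 30}]

# With a complete first record, age is recognised as an integer column.
print(infer_types_from_first_row(complete_first))
# With a missing value in the first record, no usable type can be derived for age.
print(infer_types_from_first_row(missing_first))
```

In the second case the only information available for `age` is "no value", which is why the first row of the RDD should be complete.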

In [14]:
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

In [15]:
# Load a text file and convert each line to a Row.
lines = sc.textFile("../data/Introduction_Spark/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

In [16]:
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

In [17]:
# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

In [18]:
# The result of a SQL query is a DataFrame; use .rdd to apply the normal RDD operations.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
    print(teenName)

Name: Justin


### Programmatically Specifying the Schema
When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a *DataFrame* can be created programmatically in three steps:

1. Create an RDD of tuples or lists from the original RDD.
2. Create the schema, represented by a *StructType*, matching the structure of the tuples or lists in the RDD created in step 1.
3. Apply the schema to the RDD via the *createDataFrame* method provided by *SQLContext*.

In [19]:
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [20]:
# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

In [21]:
# Load a text file and convert each line to a tuple.
lines = sc.textFile("../data/Introduction_Spark/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

In [22]:
# The schema is encoded in a string.
schemaString = "name age"

In [23]:
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

In [24]:
# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

In [25]:
# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

In [26]:
# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

In [27]:
# The result of a SQL query is a DataFrame; use .rdd to apply the normal RDD operations.
names = results.rdd.map(lambda p: "Name: " + p.name)
for name in names.collect():
    print(name)

Name: Michael
Name: Andy
Name: Justin


## Data Types
Spark SQL and DataFrames support the following data types:

* Numeric types
    * *ByteType*: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    * *ShortType*: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    * *IntegerType*: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    * *LongType*: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    * *FloatType*: Represents 4-byte single-precision floating point numbers.
    * *DoubleType*: Represents 8-byte double-precision floating point numbers.
    * *DecimalType*: Represents arbitrary-precision signed decimal numbers. Backed internally by *java.math.BigDecimal*. A *BigDecimal* consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
* String type
    * *StringType*: Represents character string values.
* Binary type
    * *BinaryType*: Represents byte sequence values.
* Boolean type
    * *BooleanType*: Represents boolean values.
* Datetime type
    * *TimestampType*: Represents values comprising the fields year, month, day, hour, minute, and second.
    * *DateType*: Represents values comprising the fields year, month, and day.
* Complex types
    * *ArrayType(elementType, containsNull)*: Represents values comprising a sequence of elements of type elementType. containsNull indicates whether elements in an ArrayType value can be null.
    * *MapType(keyType, valueType, valueContainsNull)*: Represents values comprising a set of key-value pairs. The data type of the keys is described by keyType and the data type of the values is described by valueType. For a MapType value, keys are not allowed to be null. valueContainsNull indicates whether values of a MapType value can be null.
    * *StructType(fields)*: Represents values with the structure described by a sequence of StructFields (fields).
    * *StructField(name, dataType, nullable)*: Represents a field in a StructType. The name of a field is given by name and its data type by dataType. nullable indicates whether values of this field can be null.

All Spark SQL data types are located in the pyspark.sql.types package. You can access them by doing:

In [28]:
from pyspark.sql.types import *
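The integer ranges listed above follow directly from the byte widths. As a quick sanity check in plain Python (no Spark required), the sketch below derives the range of an n-byte signed integer; `signed_range` is a helper name introduced here for illustration only.

```python
# Range of an n-byte signed (two's complement) integer.
def signed_range(num_bytes):
    bits = 8 * num_bytes
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1


# Byte widths as listed in the data type descriptions above.
for type_name, num_bytes in [("ByteType", 1), ("ShortType", 2),
                             ("IntegerType", 4), ("LongType", 8)]:
    low, high = signed_range(num_bytes)
    print("%s: %d to %d" % (type_name, low, high))
```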

## Statistical And Mathematical Functions with DataFrames in Spark
Statistics is an important part of everyday data science. 

In this notebook, we walk through some of the important functions, including:

1. Random data generation
2. Summary and descriptive statistics
3. Sample covariance and correlation
4. Cross tabulation (a.k.a. contingency table)
5. Frequent items
6. Mathematical functions

### Random Data Generation
Random data generation is useful for testing existing algorithms and for implementing randomized algorithms, such as random projection. Spark provides methods under sql.functions for generating columns that contain independent and identically distributed (i.i.d.) values drawn from a distribution, e.g., uniform (rand) and standard normal (randn).

In [29]:
from pyspark.sql.functions import rand, randn

In [35]:
# Create a DataFrame with one int column and 10 rows
df = sqlContext.range(0, 10)
df.toPandas()

|   | id |
|---|----|
| 0 | 0 |
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
| 5 | 5 |
| 6 | 6 |
| 7 | 7 |
| 8 | 8 |
| 9 | 9 |


In [36]:
# Generate two other columns using uniform distribution and normal distribution
df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).toPandas()

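|   | id | uniform | normal |
|---|----|---------|--------|
| 0 | 0 | 0.722498 | -0.187535 |
| 1 | 1 | 0.331202 | -0.869258 |
| 2 | 2 | 0.243849 | -2.37234 |
| 3 | 3 | 0.48751 | -1.245589 |
| 4 | 4 | 0.668454 | -0.603216 |
| 5 | 5 | 0.243781 | -0.575915 |
| 6 | 6 | 0.317257 | 0.502374 |
| 7 | 7 | 0.200922 | 0.447776 |
| 8 | 8 | 0.517364 | 0.600054 |
| 9 | 9 | 0.95283 | -0.732401 |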


### Summary and Descriptive Statistics
The first operation to perform after importing data is to get some sense of what it looks like. For numerical columns, the descriptive summary statistics help a lot in understanding the distribution of your data. The function describe returns a DataFrame containing, for each numerical column, the number of non-null entries (count), the mean, the standard deviation, and the minimum and maximum values.

In [40]:
# A slightly different way to generate the two random columns
df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))

df.describe().toPandas()

|   | summary | id | uniform | normal |
|---|---------|----|---------|--------|
| 0 | count | 10.0 | 10.0 | 10.0 |
| 1 | mean | 4.5 | 0.4685667340029177 | -0.5036050224529982 |
| 2 | stddev | 2.8722813232690143 | 0.2358201303075058 | 0.8648117073061448 |
| 3 | min | 0.0 | 0.2009218252543502 | -2.372340011831022 |
| 4 | max | 9.0 | 0.9528301401955116 | 0.600053806707523 |
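The id column makes it easy to check the summary by hand. The sketch below recomputes the statistics for the values 0 to 9 in plain Python (no Spark required). It assumes the `stddev` shown above is the population standard deviation, because that is the value the output matches; other Spark versions may report the sample standard deviation instead.

```python
import math

values = list(range(10))  # same values as the id column
count = len(values)
mean_value = sum(values) / float(count)
squared_deviations = sum((v - mean_value) ** 2 for v in values)

# Population standard deviation divides by n; sample divides by n - 1.
population_stddev = math.sqrt(squared_deviations / count)
sample_stddev = math.sqrt(squared_deviations / (count - 1))

print(count, mean_value, min(values), max(values))
print(population_stddev)  # agrees with the stddev reported for id above
print(sample_stddev)      # noticeably larger for such a small sample
```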


If you have a DataFrame with a large number of columns, you can also run describe on a subset of the columns:

In [42]:
df.describe('uniform', 'normal').toPandas()

|   | summary | uniform | normal |
|---|---------|---------|--------|
| 0 | count | 10.0 | 10.0 |
| 1 | mean | 0.4685667340029177 | -0.5036050224529982 |
| 2 | stddev | 0.2358201303075058 | 0.8648117073061448 |
| 3 | min | 0.2009218252543502 | -2.372340011831022 |
| 4 | max | 0.9528301401955116 | 0.600053806707523 |


Of course, while describe works well for quick exploratory data analysis, you can also control the list of descriptive statistics and the columns they apply to using the normal select on a DataFrame:

In [43]:
from pyspark.sql.functions import mean, min, max
df.select([mean('uniform'), min('uniform'), max('uniform')]).toPandas()

|   | avg(uniform) | min(uniform) | max(uniform) |
|---|--------------|--------------|--------------|
| 0 | 0.468567 | 0.200922 | 0.95283 |


### Sample Covariance and Correlation

*Covariance* is a measure of how two variables change with respect to each other. A positive number would mean that there is a tendency that as one variable increases, the other increases as well. A negative number would mean that as one variable increases, the other variable has a tendency to decrease. 
The sample covariance of two columns of a DataFrame can be calculated as follows:

In [44]:
df = sqlContext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))

In [45]:
df.stat.cov('rand1', 'rand2')

-0.026508568109948852

In [46]:
df.stat.cov('id', 'id')

9.166666666666666

As you can see from the above, the covariance of the two randomly generated columns is close to zero, while the covariance of the id column with itself is very high.
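The value for the id column can be reproduced by hand. The following plain-Python sketch (no Spark required) computes the sample covariance, which divides by n - 1; `sample_covariance` is a helper name introduced here for illustration.

```python
# Sample covariance: sum of products of deviations, divided by n - 1.
def sample_covariance(xs, ys):
    n = len(xs)
    mean_x = sum(xs) / float(n)
    mean_y = sum(ys) / float(n)
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / (n - 1)


ids = list(range(10))  # same values as the id column
print(sample_covariance(ids, ids))  # agrees with df.stat.cov('id', 'id') above
```

The covariance of a column with itself is simply its sample variance, which is why the result grows with the spread of the values.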

The covariance value of 9.17 might be hard to interpret. Correlation is a normalized measure of covariance that is easier to understand, as it provides quantitative measurements of the statistical dependence between two random variables.

In [47]:
df.stat.corr('rand1', 'rand2')

-0.3424729133749654

In [48]:
df.stat.corr('id', 'id')

1.0

In the above example, id correlates perfectly with itself, while the two randomly generated columns have a low correlation value.
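To make the normalization concrete, here is a minimal plain-Python sketch of the Pearson correlation coefficient: the covariance divided by the product of the two standard deviations. It is an illustration only, and `pearson_correlation` is a helper name introduced here, not a Spark function.

```python
import math


# Pearson correlation: covariance normalized by both standard deviations.
# The 1/(n - 1) factors cancel, so plain sums of deviations are enough.
def pearson_correlation(xs, ys):
    n = len(xs)
    mean_x = sum(xs) / float(n)
    mean_y = sum(ys) / float(n)
    co_deviation = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    deviation_x = sum((x - mean_x) ** 2 for x in xs)
    deviation_y = sum((y - mean_y) ** 2 for y in ys)
    return co_deviation / math.sqrt(deviation_x * deviation_y)


ids = list(range(10))
print(pearson_correlation(ids, ids))                # perfectly correlated
print(pearson_correlation(ids, [-i for i in ids]))  # perfectly anti-correlated
```

Because of the normalization the result always lies between -1 and 1, regardless of the scale of the columns.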

### Cross Tabulation (Contingency Table)
Cross tabulation provides a table of the frequency distribution for a set of variables. It is a powerful tool in statistics that is used to observe the statistical significance (or independence) of variables. In Spark, users can cross-tabulate two columns of a DataFrame in order to obtain the counts of the different pairs that are observed in those columns. Here is an example of how to use crosstab to obtain the contingency table.

In [52]:
# Create a DataFrame with two columns (name, item)
names = ["Alice", "Bob", "Mike"]
items = ["milk", "bread", "butter", "apples", "oranges"]
df = sqlContext.createDataFrame([(names[i % 3], items[i % 5]) for i in range(100)], ["name", "item"])
df.limit(10).toPandas()

|   | name | item |
|---|------|------|
| 0 | Alice | milk |
| 1 | Bob | bread |
| 2 | Mike | butter |
| 3 | Alice | apples |
| 4 | Bob | oranges |
| 5 | Mike | milk |
| 6 | Alice | bread |
| 7 | Bob | butter |
| 8 | Mike | apples |
| 9 | Alice | oranges |


In [54]:
df.stat.crosstab("name", "item").toPandas()

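|   | name_item | apples | oranges | butter | milk | bread |
|---|-----------|--------|---------|--------|------|-------|
| 0 | Bob | 6 | 7 | 7 | 6 | 7 |
| 1 | Mike | 7 | 6 | 7 | 7 | 6 |
| 2 | Alice | 7 | 7 | 6 | 7 | 7 |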


One important thing to keep in mind is that the cardinality of columns we run crosstab on cannot be too big. That is to say, the number of distinct “name” and “item” cannot be too large. Just imagine if “item” contains 1 billion distinct entries: how would you fit that table on your screen?!
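The counts in the contingency table can be cross-checked without Spark. The sketch below rebuilds the same (name, item) pairs in plain Python and counts them with `collections.Counter`; it illustrates what crosstab computes, not how Spark implements it.

```python
from collections import Counter

names = ["Alice", "Bob", "Mike"]
items = ["milk", "bread", "butter", "apples", "oranges"]
rows = [(names[i % 3], items[i % 5]) for i in range(100)]

# One counter entry per distinct (name, item) pair.
pair_counts = Counter(rows)

for name in names:
    print(name, [pair_counts[(name, item)] for item in items])

# The table has one cell per combination of distinct values.
print(len(names) * len(items))
```

The number of cells is the product of the distinct values in the two columns, which is why high-cardinality columns quickly make the table unwieldy.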

### Frequent Items
Figuring out which items are frequent in each column can be very useful for understanding a dataset. In Spark, users can find the frequent items for a set of columns using DataFrames. Spark implements a one-pass algorithm proposed by Karp et al. This is a fast, approximate algorithm that always returns all the frequent items that appear in at least a user-specified minimum proportion of rows. Note that the result might contain false positives, i.e. items that are not frequent.

In [55]:
df = sqlContext.createDataFrame([(1, 2, 3) if i % 2 == 0 else (i, 2 * i, i % 4) for i in range(100)], ["a", "b", "c"])
df.limit(10).toPandas()

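|   | a | b | c |
|---|---|---|---|
| 0 | 1 | 2 | 3 |
| 1 | 1 | 2 | 1 |
| 2 | 1 | 2 | 3 |
| 3 | 3 | 6 | 3 |
| 4 | 1 | 2 | 3 |
| 5 | 5 | 10 | 1 |
| 6 | 1 | 2 | 3 |
| 7 | 7 | 14 | 3 |
| 8 | 1 | 2 | 3 |
| 9 | 9 | 18 | 1 |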


In [56]:
#Find the frequent items that show up 40% of the time for each column:
freq = df.stat.freqItems(["a", "b", "c"], 0.4)
freq.collect()[0]

Row(a_freqItems=[1, 99], b_freqItems=[2, 198], c_freqItems=[1, 3])

As you can see, “1” and “99” are the frequent values for column “a”. 

Note: "a = 99" is a false positive generated by the algorithm
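To build intuition for where such a false positive comes from, here is a minimal plain-Python sketch of a one-pass, counter-based frequent-items scheme in the spirit of the Karp et al. algorithm. It is an illustration under stated assumptions, not Spark's actual implementation: the function name `frequent_items` and the choice of `int(1 / support)` counters are assumptions made for this example.

```python
from collections import Counter


# One pass over the data with a bounded number of counters.
# Every item occurring in at least `support` of the rows is guaranteed to
# survive, but rare items seen near the end of the pass may survive too.
def frequent_items(values, support):
    max_counters = int(1 / support)  # assumption for this sketch
    counters = {}
    for value in values:
        if value in counters:
            counters[value] += 1
        elif len(counters) < max_counters:
            counters[value] = 1
        else:
            # No free counter: decrement all and drop those that reach zero.
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return sorted(counters)


# Same data as column "a" in the DataFrame above.
column_a = [1 if i % 2 == 0 else i for i in range(100)]
print(frequent_items(column_a, 0.4))
# Exact counts show which candidates are truly frequent.
print(Counter(column_a)[1], Counter(column_a)[99])
```

On this data the sketch keeps the value 1, which really is frequent, plus the last odd value it encounters, which occurs only once. A second pass with exact counts, as in the last line, is a simple way to filter out such candidates.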

In [57]:
#You can also find frequent items for column combinations, by creating a composite column using the struct function:
from pyspark.sql.functions import struct
freq = df.withColumn('ab', struct('a', 'b')).stat.freqItems(['ab'], 0.4)
freq.collect()[0]

Row(ab_freqItems=[Row(a=99, b=198), Row(a=1, b=2)])

From the above example, the combinations “a=99 and b=198” and “a=1 and b=2” are reported as frequent in this dataset.

Note that “a=99 and b=198” is a false positive.

### Mathematical Functions
Spark also provides a suite of mathematical functions that users can apply to their columns with ease. The list of supported math functions comes from this [file](https://github.com/apache/spark/blob/efe3bfdf496aa6206ace2697e31dd4c0c3c824fb/python/pyspark/sql/functions.py#L109). Functions that take a single argument, such as **cos, sin, floor, ceil**, take a column as input. For functions that take two arguments, such as **pow, hypot**, either two columns or a combination of a double and a column can be supplied.

In [58]:
from pyspark.sql.functions import *
df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14)

In [59]:
# you can reference a column or supply the column name
df.select(
    'uniform',
     toDegrees('uniform'),
     (pow(cos(df['uniform']), 2) + pow(sin(df.uniform), 2)).alias("cos^2 + sin^2")).toPandas()

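|   | uniform | DEGREES(uniform) | cos^2 + sin^2 |
|---|---------|------------------|---------------|
| 0 | 2.268643 | 129.983674 | 1 |
| 1 | 1.039975 | 59.586157 | 1 |
| 2 | 0.765685 | 43.870503 | 1 |
| 3 | 1.530783 | 87.70738 | 1 |
| 4 | 2.098947 | 120.260794 | 1 |
| 5 | 0.765472 | 43.858337 | 1 |
| 6 | 0.996188 | 57.077365 | 1 |
| 7 | 0.630895 | 36.147594 | 1 |
| 8 | 1.624522 | 93.078262 | 1 |
| 9 | 2.991887 | 171.422477 | 1 |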
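The same checks can be reproduced on individual values with Python's standard `math` module, which is a convenient way to validate a column expression before running it on a cluster. The sketch below uses a few of the rounded `uniform` values from the output above; `check_value` is a helper name introduced here for illustration.

```python
import math


# Return the angle in degrees and the value of cos^2 + sin^2 for one input.
def check_value(radians):
    in_degrees = math.degrees(radians)
    identity = math.cos(radians) ** 2 + math.sin(radians) ** 2
    return in_degrees, identity


# Rounded values taken from the "uniform" column above.
for radians in [2.268643, 1.039975, 0.765685]:
    in_degrees, identity = check_value(radians)
    print(radians, round(in_degrees, 3), round(identity, 12))
```

Because the inputs are rounded to six decimals, the degrees agree with the table only to about three or four decimal places.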
