# Use Spark for Python to load data and run SQL queries
This notebook introduces basic Spark concepts and helps you to start using Spark for Python.

Some familiarity with Python is recommended.

In this notebook, you'll use the publicly available **mtcars** data set from *Motor Trend* magazine to learn some basic Spark operations. You'll learn how to load data, create a Spark DataFrame, aggregate data, run mathematical formulas, and run SQL queries against the data.


## Table of contents
This notebook contains these main sections:

1. [Load a DataFrame](#Load_a_DataFrame)
2. [Create an SQLContext](#Create_an_SQLContext)
3. [Create a Spark DataFrame](#Create_a_Spark_DataFrame)
4. [Aggregate data after grouping by columns](#Aggregate_data_after_grouping_by_columns)
5. [Operate on columns](#Operate_on_columns)
6. [Run SQL queries from the Spark DataFrame](#Run_SQL_queries_from_the_Spark_DataFrame)

<a id='Load_a_DataFrame'></a>
## 1. Load a DataFrame
A DataFrame is a collection of data that is organized into named columns. A Spark DataFrame is distributed across a cluster, whereas a pandas DataFrame lives in local memory. In this section, you create a local pandas DataFrame called **mtcars**, which holds a car name plus observations on the following 11 variables:

`[, 1]	mpg     Miles / (US) gallon`  
`[, 2]	cyl     Number of cylinders`  
`[, 3]	disp	Displacement (cu. in.)`  
`[, 4]	hp      Gross horsepower`  
`[, 5]	drat    Rear axle ratio`  
`[, 6]	wt      Weight (1000 lbs)`  
`[, 7]	qsec    1/4 mile time (seconds)`  
`[, 8]	vs      0 = V-engine, 1 = straight engine`  
`[, 9]	am      Transmission (0 = automatic, 1 = manual)`  
`[,10]	gear    Number of forward gears`  
`[,11]	carb    Number of carburetors`

In [1]:
import pandas as pd
import numpy as np

In [2]:
mtcars_array = np.array([
    ["Car Brand1", 18, 4, 1000, 110, 3.9, 3.1, 16.46, 1, 4, 4, 4],
    ["Car Brand2", 24, 6, 2000, 120, 3.8, 3.5, 16.46, 0, 55, 4, 4],
    ["Car Brand3", 30, 8, 3000, 130, 3.7, 4.0, 16.46, 1, 4, 4, 4],
    ["Car Brand4", 14, 4, 4000, 140, 3.6, 3.2, 16.46, 0, 55, 3, 4],
    ["Car Brand5", 12, 4, 5000, 150, 3.5, 3.3, 16.46, 1, 4, 4, 4],
    ["Car Brand6", 20, 6, 6000, 160, 3.4, 3.6, 16.46, 0, 55, 3, 4],
    ["Car Brand7", 15.5, 4, 700, 170, 3.3, 3.4, 16.46, 0, 1, 4, 4],
    ["Car Brand8", 10, 4, 800, 180, 3.2, 3.3, 16.46, 0, 0, 3, 4],
    ["Car Brand9", 5, 4, 900, 190, 3.1, 3.2, 16.46, 0, 1, 5, 4],
    ["Car Brand10", 7, 4, 2000, 110, 3.0, 3.1, 16.46, 0, 55, 3, 4],
    ["Car Brand11", 22, 6, 3000, 111, 2.9, 3.7, 16.46, 1, 4, 4, 4],
    ["Car Brand12", 33, 8, 4000, 112, 2.8, 4.2, 16.46, 0, 55, 4, 4],
    ["Car Brand13", 46, 8, 5000, 113, 2.7, 4.3, 16.46, 1, 4, 5, 4],
    ["Car Brand14", 8, 4, 6000, 114, 2.5, 3.2, 16.46, 0, 55, 4, 4],
    ["Car Brand15", 60, 8, 7000, 115, 2.4, 4.4, 16.46, 1, 4, 5, 4]
                        ])
mtcars = pd.DataFrame(mtcars_array)
mtcars.columns = ['car', 'mpg', 'cyl', 'disp', 'hp', 'drat', 'wt', 'qsec', 'vs', 'am', 'gear', 'carb'] 

In [4]:
print(mtcars)

            car   mpg cyl  disp   hp drat   wt   qsec vs  am gear carb
0    Car Brand1    18   4  1000  110  3.9  3.1  16.46  1   4    4    4
1    Car Brand2    24   6  2000  120  3.8  3.5  16.46  0  55    4    4
2    Car Brand3    30   8  3000  130  3.7  4.0  16.46  1   4    4    4
3    Car Brand4    14   4  4000  140  3.6  3.2  16.46  0  55    3    4
4    Car Brand5    12   4  5000  150  3.5  3.3  16.46  1   4    4    4
5    Car Brand6    20   6  6000  160  3.4  3.6  16.46  0  55    3    4
6    Car Brand7  15.5   4   700  170  3.3  3.4  16.46  0   1    4    4
7    Car Brand8    10   4   800  180  3.2  3.3  16.46  0   0    3    4
8    Car Brand9     5   4   900  190  3.1  3.2  16.46  0   1    5    4
9   Car Brand10     7   4  2000  110  3.0  3.1  16.46  0  55    3    4
10  Car Brand11    22   6  3000  111  2.9  3.7  16.46  1   4    4    4
11  Car Brand12    33   8  4000  112  2.8  4.2  16.46  0  55    4    4
12  Car Brand13    46   8  5000  113  2.7  4.3  16.46  1   4    5    4
13  Ca

Preview the first 3 rows of the DataFrame by using the `head()` method:

In [5]:
mtcars.head(3)

          car  mpg cyl  disp   hp drat   wt   qsec vs  am gear carb
0  Car Brand1   18   4  1000  110  3.9  3.1  16.46  1   4    4    4
1  Car Brand2   24   6  2000  120  3.8  3.5  16.46  0  55    4    4
2  Car Brand3   30   8  3000  130  3.7  4.0  16.46  1   4    4    4


<a id='Create_an_SQLContext'></a>
## 2. Create an SQLContext
To work with a Spark DataFrame in this notebook, you need an SQLContext object, and to create a basic one, all you need is a SparkContext. A SparkContext represents the connection to a Spark cluster. The following cell creates a SparkContext named `sc` that runs Spark in local mode and uses it to initialize the SQLContext. If your environment already provides a SparkContext named `sc`, reuse it instead of creating a second one:

In [7]:
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("HelloSpark02").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [None]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-2.2.1.tar.gz (188.2MB)
Collecting py4j==0.10.4 (from pyspark)
  Downloading py4j-0.10.4-py2.py3-none-any.whl (186kB)
Building wheels for collected packages: pyspark
  Running setup.py bdist_wheel for pyspark ... done
  Stored in directory: /user-home/999/.cache/pip/wheels/15/13/d2/79b478cd48d20956d136216574cbc38e35b4957d918127c26f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Exception:
Traceback (most recent call last):
  File "/opt/conda/lib/python2.7/site-packages/pip/basecommand.py", line 215, in main
    status = self.run(options, args)
  File "/opt/conda/lib/python2.7/site-packages/pip/commands/install.py", line 342, in run
    prefix=options.prefix_path,
  File "/opt/conda/lib/python2.7/site-packages/pip

<a id='Create_a_Spark_DataFrame'></a>
## 3. Create a Spark DataFrame
Using the SQLContext class object and the loaded local DataFrame, create a Spark DataFrame and print the schema, or structure, of the DataFrame:

In [8]:
sdf = sqlContext.createDataFrame(mtcars) 
sdf.printSchema()

root
 |-- car: string (nullable = true)
 |-- mpg: string (nullable = true)
 |-- cyl: string (nullable = true)
 |-- disp: string (nullable = true)
 |-- hp: string (nullable = true)
 |-- drat: string (nullable = true)
 |-- wt: string (nullable = true)
 |-- qsec: string (nullable = true)
 |-- vs: string (nullable = true)
 |-- am: string (nullable = true)
 |-- gear: string (nullable = true)
 |-- carb: string (nullable = true)
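
Every column in the schema above is a string because `np.array` stores mixed text and numbers in a single string dtype before pandas sees the data. The sketch below illustrates the effect on a hypothetical three-column subset of the rows and shows one possible way to keep numeric dtypes, by passing the rows directly to `pd.DataFrame`. The variable names are illustrative only, and the rest of this notebook keeps the string-typed columns, so its outputs are unchanged.

```python
import numpy as np
import pandas as pd

# Hypothetical three-column subset of the notebook's rows (illustration only).
rows = [
    ["Car Brand1", 18, 3.1],
    ["Car Brand2", 24, 3.5],
    ["Car Brand7", 15.5, 3.4],
]
columns = ["car", "mpg", "wt"]

# Mixed text and numbers in one NumPy array collapse to a single string dtype.
as_strings = pd.DataFrame(np.array(rows), columns=columns)

# Passing the rows straight to pandas lets it infer a dtype per column.
as_numbers = pd.DataFrame(rows, columns=columns)

print(as_strings["mpg"].tolist())  # values are text, not numbers
print(as_numbers.dtypes)           # mpg and wt are inferred as floats
```

A Spark DataFrame created from a pandas DataFrame with numeric dtypes would be expected to report numeric column types in `printSchema()` instead of `string`.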



Display the content of the Spark DataFrame:

In [9]:
sdf.show(32)

+-----------+----+---+----+---+----+---+-----+---+---+----+----+
|        car| mpg|cyl|disp| hp|drat| wt| qsec| vs| am|gear|carb|
+-----------+----+---+----+---+----+---+-----+---+---+----+----+
| Car Brand1|  18|  4|1000|110| 3.9|3.1|16.46|  1|  4|   4|   4|
| Car Brand2|  24|  6|2000|120| 3.8|3.5|16.46|  0| 55|   4|   4|
| Car Brand3|  30|  8|3000|130| 3.7|4.0|16.46|  1|  4|   4|   4|
| Car Brand4|  14|  4|4000|140| 3.6|3.2|16.46|  0| 55|   3|   4|
| Car Brand5|  12|  4|5000|150| 3.5|3.3|16.46|  1|  4|   4|   4|
| Car Brand6|  20|  6|6000|160| 3.4|3.6|16.46|  0| 55|   3|   4|
| Car Brand7|15.5|  4| 700|170| 3.3|3.4|16.46|  0|  1|   4|   4|
| Car Brand8|  10|  4| 800|180| 3.2|3.3|16.46|  0|  0|   3|   4|
| Car Brand9|   5|  4| 900|190| 3.1|3.2|16.46|  0|  1|   5|   4|
|Car Brand10|   7|  4|2000|110| 3.0|3.1|16.46|  0| 55|   3|   4|
|Car Brand11|  22|  6|3000|111| 2.9|3.7|16.46|  1|  4|   4|   4|
|Car Brand12|  33|  8|4000|112| 2.8|4.2|16.46|  0| 55|   4|   4|
|Car Brand13|  46|  8|500

Try different ways of retrieving subsets of the data. For example, get the first 5 values in the **mpg** column:

In [10]:
sdf.select('mpg').show(5)

+---+
|mpg|
+---+
| 18|
| 24|
| 30|
| 14|
| 12|
+---+
only showing top 5 rows



Filter the DataFrame to retain only rows with **mpg** values that are less than 18:

In [8]:
sdf.filter(sdf['mpg'] < 18).show()

+-----------+----+---+----+---+----+---+-----+---+---+----+----+
|        car| mpg|cyl|disp| hp|drat| wt| qsec| vs| am|gear|carb|
+-----------+----+---+----+---+----+---+-----+---+---+----+----+
| Car Brand4|  14|  4|4000|140| 3.6|3.2|16.46|  0| 55|   3|   4|
| Car Brand5|  12|  4|5000|150| 3.5|3.3|16.46|  1|  4|   4|   4|
| Car Brand7|15.5|  4| 700|170| 3.3|3.4|16.46|  0|  1|   4|   4|
| Car Brand8|  10|  4| 800|180| 3.2|3.3|16.46|  0|  0|   3|   4|
| Car Brand9|   5|  4| 900|190| 3.1|3.2|16.46|  0|  1|   5|   4|
|Car Brand10|   7|  4|2000|110| 3.0|3.1|16.46|  0| 55|   3|   4|
|Car Brand14|   8|  4|6000|114| 2.5|3.2|16.46|  0| 55|   4|   4|
+-----------+----+---+----+---+----+---+-----+---+---+----+----+
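
The seven rows in this output indicate that Spark compared the **mpg** values numerically, even though the column is typed as a string. The plain-Python sketch below, which does not use Spark, shows why that matters: comparing the same values as text gives a different answer. The list and variable names are illustrative.

```python
# mpg values exactly as they are stored in the string-typed column.
mpg_as_text = ["18", "24", "30", "14", "12", "20", "15.5", "10",
               "5", "7", "22", "33", "46", "8", "60"]

# Text comparison works character by character, so "5" sorts after "18".
below_18_text = [v for v in mpg_as_text if v < "18"]

# Converting to float first gives the numeric comparison.
below_18_numeric = [v for v in mpg_as_text if float(v) < 18]

print(below_18_text)     # ['14', '12', '15.5', '10']
print(below_18_numeric)  # ['14', '12', '15.5', '10', '5', '7', '8']
```

Only the numeric comparison matches the seven cars in the filtered output, so casting such columns to a numeric type explicitly is the safer habit.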



<a id='Aggregate_data_after_grouping_by_columns'></a>
## 4. Aggregate data after grouping by columns
Spark DataFrames support a number of common functions to aggregate data after grouping. For example, you can compute the average weight of cars as a function of the number of cylinders:

In [9]:
sdf.groupby(['cyl'])\
.agg({"wt": "AVG"})\
.show()

+---+------------------+
|cyl|           avg(wt)|
+---+------------------+
|  8|             4.225|
|  6|               3.6|
|  4|3.2250000000000005|
+---+------------------+
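
As a cross-check, the same averages can be worked out by hand. The sketch below uses plain Python rather than Spark, with the `(cyl, wt)` pairs copied from the 15 rows of the sample data. The variable names are illustrative.

```python
from collections import defaultdict

# (cyl, wt) pairs copied from the 15 rows of the sample data.
cyl_wt = [(4, 3.1), (6, 3.5), (8, 4.0), (4, 3.2), (4, 3.3), (6, 3.6),
          (4, 3.4), (4, 3.3), (4, 3.2), (4, 3.1), (6, 3.7), (8, 4.2),
          (8, 4.3), (4, 3.2), (8, 4.4)]

# Group the weights by cylinder count.
weights_by_cyl = defaultdict(list)
for cyl, wt in cyl_wt:
    weights_by_cyl[cyl].append(wt)

# Average each group; rounding hides floating-point noise in the last digits.
avg_wt = {cyl: sum(ws) / len(ws) for cyl, ws in weights_by_cyl.items()}
for cyl in sorted(avg_wt):
    print(cyl, round(avg_wt[cyl], 3))
```

The trailing digits in Spark's `3.2250000000000005` are ordinary floating-point rounding, not a data problem.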



You can also sort the output from the aggregation to determine the most common cylinder configuration in the DataFrame:

In [10]:
# Keep the sorted DataFrame in car_counts; show() only prints and returns None
car_counts = sdf.groupby(['cyl'])\
.agg({"wt": "count"})\
.sort("count(wt)", ascending=False)
car_counts.show()

+---+---------+
|cyl|count(wt)|
+---+---------+
|  4|        8|
|  8|        4|
|  6|        3|
+---+---------+
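
The count-and-sort step can be mirrored in plain Python with `collections.Counter`, whose `most_common()` method returns the pairs ordered from most to least frequent. This is only a sketch for checking the result without Spark; the list is copied from the **cyl** column of the sample data.

```python
from collections import Counter

# cyl values copied from the 15 rows of the sample data, in row order.
cyl_values = [4, 6, 8, 4, 4, 6, 4, 4, 4, 4, 6, 8, 8, 4, 8]

# most_common() sorts (value, count) pairs by descending count.
cyl_counts = Counter(cyl_values).most_common()
print(cyl_counts)  # [(4, 8), (8, 4), (6, 3)]
```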



<a id='Operate_on_columns'></a>
## 5. Operate on columns
Spark provides a number of functions that you can apply directly to columns for data processing. In the following example, a basic arithmetic expression converts the **wt** column, which is in units of 1000 lbs, to metric tons by using the approximate factor 0.45:

In [11]:
sdf.withColumn('wtTon', sdf['wt'] * 0.45).select('car','wt','wtTon').show(6)

+----------+---+------------------+
|       car| wt|             wtTon|
+----------+---+------------------+
|Car Brand1|3.1|             1.395|
|Car Brand2|3.5|             1.575|
|Car Brand3|4.0|               1.8|
|Car Brand4|3.2|1.4400000000000002|
|Car Brand5|3.3|1.4849999999999999|
|Car Brand6|3.6|              1.62|
+----------+---+------------------+
only showing top 6 rows
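
The factor 0.45 is a rounded value. One pound is defined as exactly 0.45359237 kg, so 1000 lbs is about 0.4536 metric tons. The plain-Python sketch below shows the more precise conversion for comparison; the function name is hypothetical and is not part of Spark.

```python
LB_TO_KG = 0.45359237  # exact by the international definition of the pound

def thousand_lb_to_metric_tons(wt):
    """Convert a weight in units of 1000 lb to metric tons (1000 kg)."""
    kilograms = wt * 1000 * LB_TO_KG
    return kilograms / 1000

for wt in (3.1, 3.5, 4.0):
    print(wt, round(thousand_lb_to_metric_tons(wt), 3))
```

For these weights, the rounded factor used in the notebook gives results roughly 0.8% lower than the exact factor.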



<a id='Run_SQL_queries_from_the_Spark_DataFrame'></a>
## 6. Run SQL queries from the Spark DataFrame
You can register a Spark DataFrame as a temporary view and then run SQL queries over the data. The `sql` function enables an application to run SQL queries programmatically and returns the result as a DataFrame:

In [12]:
# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0
sdf.createOrReplaceTempView("cars")

highgearcars = sqlContext.sql("SELECT car, gear FROM cars WHERE gear >= 5")
highgearcars.show()    

+-----------+----+
|        car|gear|
+-----------+----+
| Car Brand9|   5|
|Car Brand13|   5|
|Car Brand15|   5|
+-----------+----+
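
To experiment with the same query text without a Spark installation, one option is Python's built-in `sqlite3` module. The sketch below runs on SQLite, not Spark, so it illustrates only the SQL itself; the table layout and variable names are assumptions made for this example, with the `(car, gear)` pairs copied from the sample data.

```python
import sqlite3

# (car, gear) pairs copied from the 15 rows of the sample data.
car_gear = [
    ("Car Brand1", 4), ("Car Brand2", 4), ("Car Brand3", 4),
    ("Car Brand4", 3), ("Car Brand5", 4), ("Car Brand6", 3),
    ("Car Brand7", 4), ("Car Brand8", 3), ("Car Brand9", 5),
    ("Car Brand10", 3), ("Car Brand11", 4), ("Car Brand12", 4),
    ("Car Brand13", 5), ("Car Brand14", 4), ("Car Brand15", 5),
]

# Build an in-memory table named cars, then run the notebook's query on it.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cars (car TEXT, gear INTEGER)")
conn.executemany("INSERT INTO cars VALUES (?, ?)", car_gear)
high_gear = conn.execute(
    "SELECT car, gear FROM cars WHERE gear >= 5"
).fetchall()
conn.close()

print(high_gear)
```

The query selects the same three cars as the Spark output above: Car Brand9, Car Brand13, and Car Brand15.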



## That's it!
You successfully completed this notebook! You learned how to load a DataFrame, view and filter the data, aggregate the data, perform operations on the data in specific columns, and run SQL queries against the data. For more information about Spark, see the [Spark Quick Start Guide](http://spark.apache.org/docs/latest/quick-start.html).

<div class="alert alert-block alert-info"> Note: To save resources and get the best performance, use the code below to stop the kernel before exiting your notebook.</div>

In [None]:
%%javascript
Jupyter.notebook.session.delete();


<hr>
Copyright &copy; IBM Corp. 2017. Released as licensed Sample Materials.