### Connect to Hive with Hive Context

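A Spark context is already initialized when PySpark starts up in the Jupyter Notebook. To connect to Hive, we also need to create a **Hive Context**. (In Spark 2.0 and later, `HiveContext` is deprecated in favor of a `SparkSession` built with `enableHiveSupport()`, but it still works here.)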

In [1]:
sc

Waiting for a Spark session to start...

<SparkContext master=yarn appName=Apache Toree>

In [2]:
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)

In [3]:
hive_context.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|  hr_data|      false|
+--------+---------+-----------+



### Pull data into Spark Memory

With the `.sql` method on the Hive context, we can pull data from Hive with ordinary SQL queries. I am just going to pull in the full data set, but the same syntax lets you run any SQL query against the databases and tables stored in Hive.

In [4]:
hr_data = hive_context.sql("SELECT * FROM hr_data")

In [5]:
hr_data.show(5)

+------------+---------+-----------+-----+----------------+-------------+----+---------+----------+------+
|satisfaction|last_eval|num_project|hours|years_at_company|work_accident|quit|promotion|department|salary|
+------------+---------+-----------+-----+----------------+-------------+----+---------+----------+------+
|        null|     null|       null| null|            null|         null|null|     null|     sales|salary|
|        0.38|     0.53|          2|  157|               3|            0|   1|        0|     sales|   low|
|         0.8|     0.86|          5|  262|               6|            0|   1|        0|     sales|medium|
|        0.11|     0.88|          7|  272|               4|            0|   1|        0|     sales|medium|
|        0.72|     0.87|          5|  223|               5|            0|   1|        0|     sales|   low|
+------------+---------+-----------+-----+----------------+-------------+----+---------+----------+------+
only showing top 5 rows


### Analyze Data with Spark DataFrame

After we pull the data with the SQL command above, it is stored in Spark's memory as a DataFrame. There are two main ways to manipulate data with Spark: you can run SQL queries on Hive tables (as seen above), or you can chain DataFrame methods together in a style similar to Python's `pandas` library. I will run the same data manipulations that were done in Hive to illustrate this functionality.

In [6]:
hr_data.printSchema()

root
 |-- satisfaction: float (nullable = true)
 |-- last_eval: float (nullable = true)
 |-- num_project: integer (nullable = true)
 |-- hours: integer (nullable = true)
 |-- years_at_company: integer (nullable = true)
 |-- work_accident: integer (nullable = true)
 |-- quit: integer (nullable = true)
 |-- promotion: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: string (nullable = true)



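The first thing we need to do is drop the rows that contain null values. Here that is the first row shown earlier, which looks like the source file's header line loaded as data.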

In [7]:
hr_data = hr_data.dropna()

In [8]:
hr_data.groupby('department') \
    .agg({'satisfaction':'mean', 'quit':'count'}) \
    .orderBy('count(quit)',ascending=False) \
    .show()

+-----------+-----------+------------------+
| department|count(quit)| avg(satisfaction)|
+-----------+-----------+------------------+
|      sales|       4140|0.6144468597617846|
|  technical|       2720|0.6078970591263736|
|    support|       2229|0.6182996854854571|
|         IT|       1227|0.6181418096985192|
|product_mng|        902|0.6196341452636899|
|  marketing|        858|0.6186013972356325|
|      RandD|        787|0.6198221083631055|
| accounting|        767|0.5821512387057638|
|         hr|        739|0.5988092017794817|
| management|        630|0.6213492063657632|
+-----------+-----------+------------------+



In [9]:
hr_data.groupby(['salary','quit']) \
    .mean('satisfaction') \
    .orderBy('avg(satisfaction)',ascending=False) \
    .show()

+------+----+------------------+
|salary|quit| avg(satisfaction)|
+------+----+------------------+
|medium|   0|0.6688750243022512|
|   low|   0| 0.668102643885657|
|  high|   0|0.6518787871514048|
|   low|   1| 0.441247697521694|
|medium|   1|0.4385497335997964|
|  high|   1|0.4345121940643322|
+------+----+------------------+



### Machine Learning with Spark ML

In [10]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import (StringIndexer, OneHotEncoder, VectorAssembler)

In [11]:
hr_data.show(5)

+------------+---------+-----------+-----+----------------+-------------+----+---------+----------+------+
|satisfaction|last_eval|num_project|hours|years_at_company|work_accident|quit|promotion|department|salary|
+------------+---------+-----------+-----+----------------+-------------+----+---------+----------+------+
|        0.38|     0.53|          2|  157|               3|            0|   1|        0|     sales|   low|
|         0.8|     0.86|          5|  262|               6|            0|   1|        0|     sales|medium|
|        0.11|     0.88|          7|  272|               4|            0|   1|        0|     sales|medium|
|        0.72|     0.87|          5|  223|               5|            0|   1|        0|     sales|   low|
|        0.37|     0.52|          2|  159|               3|            0|   1|        0|     sales|   low|
+------------+---------+-----------+-----+----------------+-------------+----+---------+----------+------+
only showing top 5 rows



**Turn string variables into numeric indices (0, 1, 2, ...)**

In [12]:
dept_indexer = StringIndexer(inputCol = 'department', outputCol = 'dept_indexer')
salary_indexer = StringIndexer(inputCol = 'salary', outputCol = 'salary_indexer')
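To make the indexing step concrete, here is a minimal plain-Python sketch of what a string indexer does by default: the most frequent label gets index 0, the next gets 1, and so on. The helper `fit_string_index` and the sample list are hypothetical, and the alphabetical tie-breaking is my own choice for determinism rather than something the original relies on.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (an assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical sample of the salary column
salaries = ['low', 'medium', 'low', 'high', 'medium', 'low']
salary_index = fit_string_index(salaries)

# Replace every label with its index, as the indexer's output column would
indexed = [salary_index[s] for s in salaries]
```

Note that the indices carry no ordering of their own: `high` ends up with the largest index only because it is the rarest label in this sample.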

**Turn factors into binary (1,0) variables**

In [13]:
dept_encoder = OneHotEncoder(inputCol = 'dept_indexer', outputCol = 'dept_encoder')
salary_encoder = OneHotEncoder(inputCol = 'salary_indexer', outputCol = 'salary_encoder')
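As a rough illustration of the encoding step, the sketch below turns a category index into a 0/1 vector in plain Python. It mirrors the default behaviour of Spark's `OneHotEncoder`, which drops the last category so that it is represented by an all-zero vector; the helper `one_hot` is hypothetical and returns a dense list, whereas Spark produces a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a 0/1 list for a category index, optionally omitting the last category."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    # With drop_last, the last category falls outside the vector and stays all zeros
    if index < size:
        vector[int(index)] = 1.0
    return vector

first = one_hot(0.0, 3)                   # most frequent category
last = one_hot(2.0, 3)                    # dropped category: all zeros
full = one_hot(2.0, 3, drop_last=False)   # keep every category
```

Dropping one category avoids perfectly collinear columns, which matters for a linear model such as logistic regression.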

**Assemble all independent variables into one data object**

In [14]:
assembler = VectorAssembler(inputCols = ['dept_encoder','salary_encoder','satisfaction',
                                        'last_eval','num_project','hours','years_at_company',
                                        'work_accident','promotion'],
                            outputCol = 'features') 
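Conceptually, the assembler concatenates the listed columns, in order, into a single numeric vector per row. The plain-Python sketch below shows that idea on one hypothetical row; `assemble` is an illustrative helper, not part of Spark, and the row values are made up apart from reusing the column names.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector columns of one row into a flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector columns (e.g. one-hot output) are flattened in place
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Hypothetical row after indexing and encoding
row = {'salary_encoder': [1.0, 0.0], 'satisfaction': 0.38, 'num_project': 2, 'hours': 157}
features = assemble(row, ['salary_encoder', 'satisfaction', 'num_project', 'hours'])
```

The order of `input_cols` fixes the position of each feature, which is what lets model coefficients be matched back to columns later.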

**Initialize Logistic Regression**

In [15]:
# LogisticRegression was already imported in cell 10; 'quit' is the label to predict
log_reg = LogisticRegression(labelCol = 'quit')

**Assemble Pipeline**

In [16]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[dept_indexer,salary_indexer,dept_encoder,salary_encoder, assembler, log_reg])

**Train/test split**

In [17]:
train_hr ,test_hr = hr_data.randomSplit([0.7,0.3])
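A random split of this kind assigns each row to a split by an independent random draw, so the 70/30 proportions are approximate rather than exact, and the result changes between runs unless a seed is fixed (`randomSplit` accepts an optional `seed` argument for that). The sketch below shows the idea in plain Python; `random_split` is a hypothetical helper and not how Spark implements it internally.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by a random draw against normalized weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds of each split on the interval [0, 1)
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any draw left over by rounding
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.7, 0.3], seed=42)
```

Every row lands in exactly one split, and rerunning with the same seed reproduces the same assignment.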

**Initialize Model Evaluator**

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
model_eval = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction',
                                           labelCol = 'quit')

**Fit the model**

In [19]:
fitted_model = pipeline.fit(train_hr)

**Use fitted model to get predictions**

In [20]:
predictions = fitted_model.transform(test_hr)

In [21]:
predictions['quit','rawPrediction','probability','prediction'].limit(5).toPandas()

   quit                      rawPrediction                       probability  \
0     1  [-0.799246083323, 0.799246083323]  [0.310186812054, 0.689813187946]   
1     1  [-0.686849340204, 0.686849340204]  [0.334734319313, 0.665265680687]   
2     1    [-1.34333017428, 1.34333017428]  [0.206962946034, 0.793037053966]   
3     1    [-0.70476239021, 0.70476239021]  [0.330757191667, 0.669242808333]   
4     1    [-0.98143652301, 0.98143652301]  [0.272606839105, 0.727393160895]   

   prediction  
0         1.0  
1         1.0  
2         1.0  
3         1.0  
4         1.0  
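The three output columns are related: for binary logistic regression, `rawPrediction` holds the margin with both signs, `probability` is the logistic (sigmoid) function of those margins, and `prediction` is 1.0 when the probability of class 1 exceeds the default threshold of 0.5. A small sketch using the first row of the output above:

```python
import math

def sigmoid(margin):
    """Logistic function mapping a real-valued margin to a probability."""
    return 1.0 / (1.0 + math.exp(-margin))

# rawPrediction of the first row shown above
raw_prediction = [-0.799246083323, 0.799246083323]

# One probability per class; the two values sum to 1
probability = [sigmoid(raw_prediction[0]), sigmoid(raw_prediction[1])]

# Default decision rule: predict class 1 when its probability is above 0.5
prediction = 1.0 if probability[1] > 0.5 else 0.0
```

The computed class-1 probability agrees with the `probability` column for that row (about 0.6898).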

**Find accuracy**

The only two metrics that `BinaryClassificationEvaluator` supports are "areaUnderROC" and "areaUnderPR"; the default is "areaUnderROC". Below, in SparkR, I calculate accuracy, so this isn't an apples-to-apples comparison.

In [22]:
# Area under the ROC curve (the evaluator's default metric), not accuracy
not_accuracy = model_eval.evaluate(predictions)
print(not_accuracy)

0.829864990689
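To see why area under the ROC curve and accuracy are different quantities, here is a small plain-Python sketch on made-up labels and scores. It uses the pairwise-ranking interpretation of AUROC (the fraction of positive/negative pairs in which the positive row gets the higher score), which is not how Spark computes the metric internally; both helper functions and the data are hypothetical.

```python
def area_under_roc(labels, scores):
    """AUROC as the share of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half
    return wins / (len(positives) * len(negatives))

def accuracy(labels, scores, threshold=0.5):
    """Share of rows whose thresholded score matches the label."""
    correct = sum(1 for y, s in zip(labels, scores) if (s > threshold) == (y == 1))
    return correct / float(len(labels))

# Made-up example: two positives, three negatives
labels = [1, 1, 0, 0, 0]
scores = [0.9, 0.4, 0.45, 0.2, 0.1]

auroc = area_under_roc(labels, scores)   # 5 of 6 pairs ranked correctly
acc = accuracy(labels, scores)           # 4 of 5 rows classified correctly
```

AUROC depends only on how the scores rank the rows, while accuracy depends on where the threshold falls, so the two numbers generally differ.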


This is the apples-to-apples comparison: accuracy, the share of test rows where the prediction matches the actual `quit` value.

In [23]:
# Total number of rows
float(predictions.count())

4474.0

In [24]:
# Number of rows where the prediction is correct
predictions.filter(((predictions.quit==1) & (predictions.prediction==1)) | \
                   ((predictions.quit==0) & (predictions.prediction==0))).count()

3533

In [25]:
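# Accuracy = correct predictions / total rows; float() avoids integer division in Python 2
correct = predictions.filter(((predictions.quit==1) & (predictions.prediction==1)) |
                             ((predictions.quit==0) & (predictions.prediction==0))).count()
accuracy = correct / float(predictions.count())
print(accuracy)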

0.789673670094
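The arithmetic behind that figure can be checked directly from the two counts reported above. The sketch also shows why the total is wrapped in `float()`: the notebook runs Python 2, where `/` between two integers performs floor division (written `//` here so the example behaves the same in Python 3).

```python
correct = 3533   # rows where the prediction matches quit (count from cell 24)
total = 4474     # rows in the test set (count from cell 23)

# True division gives the accuracy
accuracy = correct / float(total)

# Floor division, which is what Python 2's `/` does for two ints
integer_quotient = correct // total
```

Without the `float()` conversion, the Python 2 result would be 0 instead of roughly 0.79.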
