# Machine learning - Student notebook
   
   - Dataset, RDD APIs
   - A dataframe is nothing but a table with columns, rows and headers
   - In this notebook, we will work through a dataset to demonstrate Spark's SQL-like abilities. We will also look at its machine learning capabilities in the context of this dataset.
    
### Dataset
- The chosen dataset is the Breast Cancer Wisconsin dataset. Its features are computed from a digitized image of a fine needle aspirate (FNA) of a breast mass. For more information about this dataset, please visit https://www.kaggle.com/uciml/breast-cancer-wisconsin-data
- Each row contains information about a single breast mass along with a diagnosis of malignant/benign for this mass
    
### Problem statement
- We need to learn patterns that distinguish malignant masses from benign ones, so that any future sample can be placed in one of these two buckets.
- Thus, this is a classification problem: given a set of possible outcomes, we need to pick one with confidence.
    
### Solution
   We will go through the following steps:
   - Read data (Spark SQL)
   - Feature Engineering (Accumulators, Broadcasters, ml/mllib APIs)
   - Data Visualizations (PixieDust, Seaborn)
   - Modeling (Spark ML)
   - Evaluation and prediction (Spark ML)
   - Deployment (Watson ML repository)

In [1]:
from IPython.display import display
from IPython.core.display import HTML 

In [2]:
!pip install seaborn



#### Machine Learning process ###

<center><img width="600px" height="600px" src="https://raw.githubusercontent.com/martinkearn/Content/master/Blogs/Images/MLProcess.PNG"></center>

### Get dataset
!wget https://raw.githubusercontent.com/joshishwetha/dsx-spark/master/data.csv

### Read dataset

#### copy and paste the following code
    data = spark.read.csv('data.csv',inferSchema='true',header='true')
    data = data.drop('_c32')

## Data Exploration ###

<center><img width="450px" height="450px" margin="auto" src="https://img.clipartfest.com/f92b25f421c985eed2ccb12cdb4cbf54_vector-clip-art-cartoon-safari-kids-cartoon-clipart_800-557.jpeg"></center>

#### Basic operations on dataframe 
- To view contents: df.show(), df.take(n)
- To transform columns: df.withColumn("column_name", column_expression), where column_expression is a Column such as df["column_name"].cast(FloatType())
- To rename columns: df.withColumnRenamed("old_name","new_name")

#### Print first 3 rows of the input dataset ###

    data.limit(3).toPandas()  # fetch only the first 3 rows instead of collecting the whole dataset

### Check datatypes
data.dtypes

#### Make sure the data types are correct ####

    from pyspark.sql.types import FloatType, StringType, IntegerType, DoubleType, ArrayType

    for col in data.columns:
        if col not in ['id','diagnosis']:
            data = data.withColumn(col,data[col].cast(FloatType()))

#### Replace null values with 0
    data = data.na.fill(0)

### SQL

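#### Register dataframe as a temporary view to query from (write sql on dataframes)
    # registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
    data.createOrReplaceTempView('cancer_data')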

#### Querying data
- NOTE that the returned object is another Dataframe
- One nice feature of the notebooks and python is that we can show it in a table via Pandas
- Remember to perform an action to get your results (sql queries are also transformations :))

  
    query = """
    select
        diagnosis ,
        count(1) as diagnosis_count
    from cancer_data
    group by diagnosis 
    """
    spark.sql(query).toPandas()


## Feature Engineering
   - Dataset = Features + target
   - Transforming features (categorical to numeric, continuous to bins, scaling, normalization, etc.)
   - Selecting a subset of columns for wide datasets
   - Exploding columns to make additional (synthetic) features for small datasets
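
As a plain-Python sketch of two of the transformations listed above (min-max scaling and binning a continuous value): the sample radii, the bin edges and the helper names `min_max_scale` and `to_bin` are made up for illustration. In the notebook these steps would normally be done with Spark's own feature transformers.

```python
def min_max_scale(values):
    """Rescale a list of numbers linearly into the range [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:                      # constant column: nothing to scale
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def to_bin(value, edges):
    """Return the index of the first bin whose upper edge exceeds `value`."""
    for i, edge in enumerate(edges):
        if value < edge:
            return i
    return len(edges)                 # value is at or above the last edge


radii = [10.0, 12.5, 15.0, 20.0]      # hypothetical sample values
scaled = min_max_scale(radii)
bins = [to_bin(r, edges=[12.0, 16.0]) for r in radii]  # hypothetical bin edges
print(scaled)
print(bins)
```

Scaling keeps the ordering of the values and only changes their range, while binning deliberately throws away detail within each bin.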

### Feature set is numeric

##### Step 1: Make sure your target is numeric
- "target" is categorical (malignant/benign)
- Our encoding: 1 = malignant (M), 0 = benign (B), using a SQL-like "when" expression

### Review your data
#### Sample your data to view what it looks like before we make changes to the target column

    data.select('diagnosis','radius_se').show(3)

### Make label numeric

##### copy and paste to run code
    import  pyspark.sql.functions as F

    data = data.withColumn('diagnosis',F.when(data.diagnosis=='M',1).otherwise(0))
    data.toPandas().head(3)

#### Rename column from "diagnosis" to "label"

    data = data.withColumnRenamed('diagnosis','label')

***********************************************************************************

###   Detour  
- Feature engineering is a major task and often involves complicated scripts

What happens when
   - I have a custom script based on other datasets or static variables?
   - I want to port an existing script over to Spark?

### Accumulators, Broadcasters
- Variables (data and metadata) are among the most basic things in a script.
- Data vars = DataFrames/RDDs in spark
- Metadata vars = Accumulators/broadcasters 
   
- Metadata vars are not straightforward in the distributed world


##### Accumulators
   - Global variables that tasks can only add to; the driver reads the result
   
##### Broadcasters
   - Read-only global variables (broadcast variables) that every executor can read from
************************************************************************************************

#### Let's run a simple accumulator
- Problem: Count the number of benign and malignant cases

In [1]:
def count_labels(row):
    global benign, malignant
    if row.label==1:
        malignant.add(1)
    else:
        benign.add(1)

#### copy and run this code
    benign = sc.accumulator(0)
    malignant = sc.accumulator(0)

    data.rdd.foreach(count_labels)

    benign.value, malignant.value

#### Broadcast the data variable so that each executor has its own copy of it
   - This way, it need not be shipped to the executor with every call
   - Saves network bandwidth
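
A minimal sketch of a broadcast variable in use, assuming the notebook's SparkContext `sc` and the `data` DataFrame with its numeric `label` column. The lookup table `LABEL_NAMES` and the helpers `describe` and `count_by_name` are hypothetical names introduced only for this example.

```python
# Hypothetical lookup table: maps the numeric label back to a readable name.
LABEL_NAMES = {1: "malignant", 0: "benign"}


def describe(label, names):
    """Plain-Python lookup that runs inside each Spark task."""
    return names.get(int(label), "unknown")


def count_by_name(sc, rdd):
    """Count rows per readable label name using a broadcast lookup table.

    `sc` is the SparkContext and `rdd` is an RDD of rows that have a
    `label` field (for example `data.rdd`).
    """
    names_bc = sc.broadcast(LABEL_NAMES)   # sent to each executor once
    return (rdd.map(lambda row: (describe(row.label, names_bc.value), 1))
               .reduceByKey(lambda a, b: a + b)
               .collectAsMap())

# In the notebook: count_by_name(sc, data.rdd)
```

Tasks read the table through `names_bc.value`. For a dictionary this small the saving is negligible; the pattern pays off when the shared object is large.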

### Collect statistics for your dataset ###
- Collect statistics for mean/std
- store it in a variable called "df"


        df = data.describe()
        df.toPandas()

#### Calculate correlations of each columns w.r.t the target
- TODO: change this to run in Spark (the cell below computes the correlations in pandas)



#### Select a subset of columns
- Remove columns conveying very little information
   - Zero variance columns
   - Low correlation columns
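
To make the two criteria above concrete, here is a small plain-Python sketch on toy data (not the breast cancer dataset). The helper names `pearson` and `select_columns` and the 0.3 threshold are arbitrary choices for illustration.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation of two equal-length lists; None if either is constant."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return None                   # zero variance: correlation is undefined
    return cov / sqrt(var_x * var_y)


def select_columns(columns, target, min_abs_corr):
    """Keep columns whose |correlation| with the target reaches the threshold."""
    kept = []
    for name, values in columns.items():
        r = pearson(values, target)
        if r is not None and abs(r) >= min_abs_corr:
            kept.append(name)
    return kept


# Hypothetical toy data
label = [0, 0, 1, 1]
columns = {
    "size":     [1.0, 2.0, 3.0, 4.0],   # rises with the label
    "constant": [5.0, 5.0, 5.0, 5.0],   # zero variance
    "noise":    [1.0, 2.0, 2.0, 1.0],   # unrelated to the label
}
print(select_columns(columns, label, min_abs_corr=0.3))
```

The comparison uses the absolute value of the correlation, because a strongly negative correlation is just as informative as a strongly positive one.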

In [3]:
df = data.toPandas()
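# rank columns by the absolute value of their correlation with the label,
# so that strongly negative correlations are not mistaken for weak ones
col_corr = sorted(df.corr()['label'].to_dict().items(),key=lambda x:abs(x[1]),reverse=True)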
for col in col_corr[-5:]:
    if col[0]!='id':
        data = data.drop(col[0])

- Remove columns with low correlation; such columns contribute very little information
- Caution: Be wary of multicollinearity (features that are strongly correlated with each other) and of "leaky" columns (features that give away the target)

## Let's do some graphs!

###  Here, we look at the columns and their correlations with the target variable.
- We use PixieDust (charting library) for quick charts
- It also takes Spark DataFrames as inputs, whereas most of the other charting libraries take pandas dataframes as inputs


##### copy and run code from here
    import pixiedust
    import pandas as pd
    
    df = pd.DataFrame(col_corr)
    df = df.dropna()
    df.columns = ['name','correlation']
    viz_spark_df = sqlContext.createDataFrame(df)

   
    display(viz_spark_df)

### Visualize correlations ####
 - Useful for datasets with small number of columns
 - A heatmap is useful to visualize how variables are related to each other (and not just the target)
 - Very useful in NLP applications to visualize similarity between documents (after using TF-IDF)
 

In [2]:
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.mllib.stat import Statistics

plt.style.use('ggplot')
# take the column order from the DataFrame so that values and tick labels are guaranteed to line up
values = data.rdd.map(lambda row:list(row))
corr_values = Statistics.corr(values)
names = data.columns

plt.figure(figsize=(10,10))
sns.set_style("darkgrid")
sns.heatmap(corr_values,xticklabels=names,yticklabels=names,square=True,vmin=0, vmax=1,
                cmap="YlGnBu")

### Visualize existing classes

### PCA  - Dimensionality Reduction 
 - Very handy for visualization!

In [26]:
###### PCA Visualize data ######
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import UserDefinedFunction
from pyspark.ml.feature import VectorAssembler 

feature_cols = list(filter(lambda x:x not in ['id','label','dummy'],data.columns))
assembler = VectorAssembler(inputCols=feature_cols,outputCol='features')
df = assembler.transform(data)

pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

pca_result = model.transform(df)
result = pca_result.select("pcaFeatures")


### UserDefined Functions

In [9]:
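import re

def g(point,co_ord):
    # point is the string form of the PCA vector, e.g. "[ 1.23  4.56]";
    # strip the brackets and padding first so the split yields exactly two numbers
    parts = re.split(r'\s+',point.strip('[] '))
    if co_ord=='x':
        value = parts[0]
    else:
        value = parts[1]
    return value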

get_x_coord = UserDefinedFunction(lambda point:g(point,'x'), StringType())
get_y_coord = UserDefinedFunction(lambda point:g(point,'y'), StringType())
convert_to_string = UserDefinedFunction(lambda point:str(point.toArray()),StringType())

pca_result = pca_result.withColumn('pcaFeatures_string',convert_to_string(pca_result['pcaFeatures']))
pca_result = pca_result.withColumn('x_coord',get_x_coord(pca_result['pcaFeatures_string']))\
                       .withColumn('y_coord',get_y_coord(pca_result['pcaFeatures_string']))

pca_result = pca_result.withColumn('x_coord',pca_result.x_coord.cast(FloatType()))\
                       .withColumn('y_coord',pca_result.y_coord.cast(FloatType()))
    
viz_df = pca_result.select('x_coord','y_coord','label').toPandas()
# seaborn 0.9+ renamed lmplot's `size` argument to `height`
a = sns.lmplot(x='x_coord',y='y_coord',hue='label',data=viz_df,
           fit_reg=False,palette={1:'red',0:'green'},height=6,aspect=1)

## We are all set to do some machine learning!

<center><img width="350px" height="350px" margin="auto" src="http://www.clipartkid.com/images/127/cartoon-explorer-characters-vectors-6cbSmI-clipart.jpg"></center>

## Module layout
##### Spark's machine learning library is divided into two packages:
- spark.mllib contains the original API built on top of RDDs.
- spark.ml provides a higher-level API built on top of DataFrames for constructing ML pipelines.
- Using spark.ml is recommended because the DataFrame-based API is more versatile and flexible. spark.mllib is still supported, but the RDD-based API has been in maintenance mode since Spark 2.0, so new features are expected in spark.ml.

    
    
#### A common recipe
- Make sure your data is numeric 
- Collect feature columns
- Define transformations (Vector Indexer, Encoder)
- Assemble columns into a single Vector column (VectorAssembler)
- Split data into test and train
- Define pipelines and estimators

### Prepare the dataset for machine learning
- Split data between train and test (70/30 split)
- Apply logistic regression and decision tree classifiers

#### Convert 'label' to DoubleType()


In [35]:
data = data.withColumn('label',data['label'].cast(DoubleType()))

#### Split dataset into train and test sets (70/30 split)

In [36]:
train, test = data.randomSplit([0.7,0.3])

#### Count number of records in test and train sets

In [37]:
train.count(), test.count()

(399, 170)
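
The sizes returned by randomSplit are approximate and, without a seed, vary between runs. The sketch below is a simplified plain-Python stand-in (not Spark's implementation) that assigns each row to one side independently at random. `random_split` and the seed value are our own illustrative choices.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row independently to train or test, so the split
    sizes only approximate train_fraction and 1 - train_fraction."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(569))               # 399 + 170 rows, as counted above
train_rows, test_rows = random_split(rows, train_fraction=0.7, seed=42)
print(len(train_rows), len(test_rows))
```

In the notebook itself, passing a seed, for example `data.randomSplit([0.7,0.3], seed=42)`, should make the split repeatable across runs.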

### Machine learning Pipelines
- Easy to use API
- Define stages to transform your dataset
- Select any estimator (model) for prediction 
- Each pipeline fits a single estimator (model) to your data


<center><img src="https://image.slidesharecdn.com/mlwithapachespark-2-160818115257/95/introduction-to-ml-with-apache-spark-mllib-47-638.jpg?cb=1490306278" width="500px" height="500px"></center>

In [38]:
from pyspark.ml.pipeline import Pipeline

#### Define Assembler to combine features into a single vector (feature vector! :) )

In [39]:
from pyspark.ml.linalg import Vectors #linear algebra package, has matrices, arrays, Vectors (dense and sparse)
from pyspark.ml.feature import VectorAssembler 

feature_cols = list(filter(lambda x:x not in ['id','label'],data.columns))
assembler = VectorAssembler(inputCols=feature_cols,outputCol='features')

## Classification Pipelines 
- Logistic Regression
- Tree based classifier

- Classification models work towards making a Yes/No decision
- Metrics to evaluate the model: Precision/Recall, AUC
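
As a quick worked example of the two definitions, using made-up counts for the malignant class: the function name `precision_recall_from_counts` is ours, and returning 0.0 when a denominator is empty is just one possible convention.

```python
def precision_recall_from_counts(tp, fp, fn):
    """Precision = TP / (TP + FP); recall = TP / (TP + FN)."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Hypothetical counts for the malignant class (label 1)
precision, recall = precision_recall_from_counts(tp=50, fp=5, fn=10)
print(round(precision, 3), round(recall, 3))
```

Precision asks "of the cases we flagged as malignant, how many really were?", while recall asks "of the truly malignant cases, how many did we catch?".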


<center><img width="500px" height="500px" src="http://opexanalytics.com/cnt/uploads/2016/01/Red-Fish-High-Recall.jpg"></center>

In [40]:
#### Classification metrics ####
def calc_metrics(results):
    metrics = {}
    metrics['tp_0'] = results.filter((results.label==0)&(results.prediction==0)).count()
    metrics['fn_0'] = results.filter((results.label==0)&(results.prediction==1)).count()
    metrics['tn_0'] = results.filter((results.label==1)&(results.prediction==1)).count()
    metrics['fp_0'] = results.filter((results.label==1)&(results.prediction==0)).count()
    
    metrics['tp_1'] = results.filter((results.label==1)&(results.prediction==1)).count()
    metrics['fn_1'] = results.filter((results.label==1)&(results.prediction==0)).count()
    metrics['tn_1'] = results.filter((results.label==0)&(results.prediction==0)).count()
    metrics['fp_1'] = results.filter((results.label==0)&(results.prediction==1)).count()
    
    return metrics
        
### calc precision & recall ###
def precision_recall(results):
    items = calc_metrics(results)
    pre_0 = items['tp_0']/float((items['tp_0']+items['fp_0']))
    pre_1 = items['tp_1']/float((items['tp_1']+items['fp_1']))
    
    recall_0 = items['tp_0']/float((items['tp_0']+items['fn_0']))
    recall_1 = items['tp_1']/float((items['tp_1']+items['fn_1']))
    
    return {'pre_0':pre_0, 'recall_0':recall_0,'pre_1':pre_1,'recall_1':recall_1}

##### Logistic regression #####

In [4]:
from pyspark.ml.classification import LogisticRegression

#define estimator and fit data
estimator = LogisticRegression()
pipeline = Pipeline(stages=[assembler,estimator])
lr_model = pipeline.fit(train)

#get results
results = lr_model.transform(test)
precision_recall(results)
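
Precision and recall are computed at one fixed decision threshold. AUC, the other metric named for classification models, instead measures how well the scores rank positives above negatives. Below is a small plain-Python sketch of that idea on made-up scores; the `auc` helper is illustrative and is not how Spark computes it.

```python
def auc(labels, scores):
    """Fraction of (positive, negative) pairs in which the positive example
    has the higher score; ties count as half a win."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [1, 1, 0, 0]
scores = [0.9, 0.4, 0.5, 0.1]   # hypothetical predicted probabilities of malignancy
print(auc(labels, scores))      # 3 of the 4 pairs are ranked correctly -> 0.75
```

In Spark, `BinaryClassificationEvaluator` from `pyspark.ml.evaluation` can report the area under the ROC curve for a results DataFrame, which is the more practical route for real data.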

In [5]:
### Visualize results from logistic regression ###
pca_subset = pca_result.select('id','features','x_coord','y_coord')
viz_df = pca_subset.join(results,on=['id','features']).select('x_coord','y_coord','prediction','label')
viz_df = viz_df.dropna(how='any',subset=['x_coord','y_coord']).toPandas()

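# seaborn 0.9+ renamed lmplot's `size` argument to `height`
a = sns.lmplot(x='x_coord',y='y_coord',hue='prediction',col='label',data=viz_df,
           fit_reg=False,palette={1:'red',0:'green'},height=6,aspect=1)
# col_template must be a single format string; {col_name} is filled with the label value (0 = benign, 1 = malignant)
a.set_titles(col_template='Logistic regression results when label == {col_name}')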

#### Tree classifier ####
- Can you do the same using a decision tree classifier?

In [6]:
from pyspark.ml.classification import DecisionTreeClassifier
# define estimator and fit data
estimator = DecisionTreeClassifier()
pipeline = Pipeline(stages=[assembler,estimator])
tree_model = pipeline.fit(train)

# get results
results = tree_model.transform(test)
precision_recall(results)

### Visualize the results of tree classifier

In [7]:
### Visualize the results of tree classifier ###
pca_subset = pca_result.select('id','features','x_coord','y_coord')
viz_df = pca_subset.join(results,on=['id','features']).select('x_coord','y_coord','prediction','label')
viz_df = viz_df.dropna(how='any',subset=['x_coord','y_coord']).toPandas()

# seaborn 0.9+ renamed lmplot's `size` argument to `height`
a = sns.lmplot(x='x_coord',y='y_coord',hue='prediction',col='label',data=viz_df,
           fit_reg=False,palette={1:'red',0:'green'},height=6,aspect=1)

### Clustering Pipeline
- K-means clustering
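
Before running it in Spark, here is a plain-Python sketch of the basic k-means loop (Lloyd's algorithm) on four made-up 2-D points. The function name `kmeans_2d`, the starting centers and the iteration count are illustrative assumptions. Spark's KMeans additionally uses a smarter initialization, so this is only the core idea.

```python
def kmeans_2d(points, centers, iterations=10):
    """Lloyd's algorithm on 2-D points: assign each point to its nearest
    center, then move each center to the mean of its assigned points."""
    centers = list(centers)
    assignment = []
    for _ in range(iterations):
        assignment = []
        for x, y in points:
            dists = [(x - cx) ** 2 + (y - cy) ** 2 for cx, cy in centers]
            assignment.append(dists.index(min(dists)))
        for k in range(len(centers)):
            members = [p for p, a in zip(points, assignment) if a == k]
            if members:               # keep the old center if a cluster is empty
                centers[k] = (sum(p[0] for p in members) / len(members),
                              sum(p[1] for p in members) / len(members))
    return centers, assignment


points = [(1.0, 1.0), (1.0, 2.0), (8.0, 8.0), (9.0, 8.0)]   # hypothetical data
centers, assignment = kmeans_2d(points, centers=[(0.0, 0.0), (10.0, 10.0)])
print(centers, assignment)
```

Note that cluster ids are arbitrary: cluster 0 need not correspond to label 0, so a plot of cluster against label may show the colors swapped.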

In [8]:
######## Kmeans clustering, join to the PCA set #########
from numpy import array
from math import sqrt
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2,featuresCol='features')
model = kmeans.fit(pca_result)
centers = model.clusterCenters()
kmeans_result = model.transform(pca_result).select('id','features','prediction')

viz_df = pca_result.join(kmeans_result,on=['id','features'],how='inner')\
                   .select('id','features','x_coord','y_coord','prediction','label')

viz_df = viz_df.dropna(how='any',subset=['x_coord','y_coord']).toPandas()

# seaborn 0.9+ renamed lmplot's `size` argument to `height`
sns.lmplot(x='x_coord',y='y_coord',hue='prediction',col='label',data=viz_df,
           fit_reg=False,palette={1:'red',0:'green'},height=6,aspect=1)

### Deploy to Cloud by using Watson Machine Learning Repo
   Please follow the demonstration! :D
