Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". You can run all the tests with the validate button. If the validate command takes too long, you can also confirm that you pass all the tests if you can run through the whole notebook without getting validation errors.

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## MLlib exercises

In this notebook you will implement several small methods that are used to predict whether a customer is going to be a `paid_customer`. We use logistic regression (https://en.wikipedia.org/wiki/Logistic_regression) to solve the classification problem.

We will use a sample of data from http://cs.hut.fi/u/arasalo1/resources/osge_pool-1-thread-1.data.zip.

Your task is to create a machine learning pipeline that transforms the data so that MLlib's logistic regression can predict whether a customer is going to pay for the service. The first method, `convert`, reads the file into a DataFrame so that it is easier to process. `indexer` transforms the categorical features. `featureAssembler` combines the features into a single feature vector, which most MLlib machine learning algorithms require. `scaler` scales the variables. `createModel` creates and trains the logistic regression model on training data that has already been transformed with the previous methods. Finally, `predict` uses the trained model to predict whether a user is a paying customer.

Note: try to avoid additional imports, as they can cause problems with the server tests.

### Data schema

| column_header | type |description |
| :------------- | :--- | :----------- |
| cid | uuid | customer id |
| cname | string | name of the user |
| email | string | email address of the user |
| gender | string | customer's gender |
| age | int | age of the customer |
| address | string | address the user provided during registration; only US-based addresses are stored, other countries get 'N/A' |
| country | string | country the customer belongs to |
| register_date | long | date on which user registered with us in milliseconds |
| friend_count | int | number of friends a user has |
| lifetime | int | number of days a user has been active since registration date |
| citygame_played | int | number of times citygame has been played by user |
| pictionarygame_played | int | number of times pictionary game has been played by user |
| scramblegame_played | int | number of times scramble game has been played by user |
| snipergame_played | int | number of times sniper game has been played by user |
| revenue | int | revenue generated by the user |
| paid_subscriber | string | whether the customer is paid customer or not, represented by `yes` or `no` |


Use the Binomial Logistic Regression algorithm from MLlib, Spark's machine learning library.  
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression
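
As a rough illustration of what a binomial logistic regression model computes at prediction time, here is a minimal plain-Python sketch. It is not Spark's implementation: the weights, bias, and feature values are made up, and the 0.5 threshold is an assumption made for the example.

```python
import math

def sigmoid(z):
    """Logistic function: maps any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_label(features, weights, bias, threshold=0.5):
    """Return 1.0 if the estimated probability exceeds the threshold, else 0.0."""
    z = sum(w * x for w, x in zip(weights, features)) + bias
    return 1.0 if sigmoid(z) > threshold else 0.0

# Hypothetical weights and feature values, chosen only for illustration.
weights = [0.8, -0.3]
bias = -0.1

print(predict_label([2.0, 1.0], weights, bias))   # z is positive -> 1.0
print(predict_label([-1.0, 2.0], weights, bias))  # z is negative -> 0.0
```

Training consists of finding the weights and bias that make these probabilities agree with the known labels as closely as possible.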

Use these features for training your model:
* gender 
* age
* country
* friend_count
* lifetime
* citygame_played
* pictionarygame_played
* scramblegame_played
* snipergame_played
* paid_subscriber (this is the label to predict)

The data contains categorical features, so you need to convert them to numeric values.  
https://spark.apache.org/docs/latest/ml-features.html

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.sql.types import *

spark = SparkSession.builder\
    .master("local")\
    .appName("gaming")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()

sampleDataPath = "testData.data"

In [None]:
#Generate random sample
import random

randomData = "randomsample.data"

with open(sampleDataPath) as sampleFile:
    lines = random.sample(sampleFile.readlines(), 4000)

# Write the sampled lines; the with-block closes the file automatically
with open(randomData, "w") as outF:
    outF.writelines(lines)


## Convert
`convert` creates a DataFrame, removes unnecessary columns, and converts the rest to the right format.   
Data schema:
* gender: Double (1 if male else 0)
* age: Double
* country: String
* friend_count: Double
* lifetime: Double
* game1: Double (citygame_played)
* game2: Double (pictionarygame_played)
* game3: Double (scramblegame_played)
* game4: Double (snipergame_played)
* paid_customer: Double (1 if yes else 0)  

The function already creates a SQL table called "gaming"; your job is to remove unnecessary columns and convert the rest to the right format. Hint: use a SQL `SELECT` query with `CAST`. You will also need `IF` to parse some of the variables, e.g. `IF(gender='male',1,0)`.

param `path`: path to file  
`return`: converted DataFrame
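
The conversion rules above can be sketched in plain Python, without Spark, to make the column selection and the `IF`-style mapping concrete. The two input rows are made up, and `convert_row` is a hypothetical helper used only for this illustration; the column positions follow the order of the data schema table.

```python
import csv
import io

# Two made-up rows in the 16-column layout of the data schema.
raw = (
    "id-1,Ann,ann@example.com,female,31,N/A,UK,1400000000000,5,12,1,0,3,2,10,yes\n"
    "id-2,Bob,bob@example.com,male,20,N/A,FI,1400000000001,2,4,0,0,0,4,0,no\n"
)

def convert_row(fields):
    """Keep only the model columns and convert them to the target types."""
    return {
        "gender": 1.0 if fields[3] == "male" else 0.0,   # IF(gender='male', 1, 0)
        "age": float(fields[4]),                          # CAST(age AS double)
        "country": fields[6],
        "friend_count": float(fields[8]),
        "lifetime": float(fields[9]),
        "game1": float(fields[10]),
        "game2": float(fields[11]),
        "game3": float(fields[12]),
        "game4": float(fields[13]),
        "paid_customer": 1.0 if fields[15] == "yes" else 0.0,
    }

rows = [convert_row(fields) for fields in csv.reader(io.StringIO(raw))]
for row in rows:
    print(row)
```

The id, name, email, address, registration date, and revenue columns are dropped because they are not in the list of training features.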

In [None]:

def convert(path):
    originalCols = StructType([\
    StructField("session_id", StringType(),False),\
    StructField("cname", StringType(),False),\
    StructField("email",StringType(),False),\
    StructField("gender",StringType(),False),\
    StructField("age",DoubleType(),False),\
    StructField("address",StringType(),False),\
    StructField("country",StringType(),True),\
    StructField("register_date",StringType(),False),\
    StructField("friend_count",DoubleType(),False),\
    StructField("lifetime",DoubleType(),False),\
    StructField("game1",DoubleType(),False),\
    StructField("game2",DoubleType(),False),\
    StructField("game3",DoubleType(),False),\
    StructField("game4",DoubleType(),False),\
    StructField("revenue",DoubleType(),False),\
    StructField("paid_customer",StringType(),False)])
    data = spark.read.option("header","false").schema(originalCols).csv(path)
    data.createOrReplaceTempView("gaming")
    # Select only needed columns in the required order and cast/convert types
    converted = data.selectExpr(
        "IF(gender='male', 1.0, 0.0) as gender",
        "CAST(age as double) as age",
        "CAST(country as string) as country",
        "CAST(friend_count as double) as friend_count",
        "CAST(lifetime as double) as lifetime",
        "CAST(game1 as double) as game1",
        "CAST(game2 as double) as game2",
        "CAST(game3 as double) as game3",
        "CAST(game4 as double) as game4",
        "CAST(IF(paid_customer='yes', 1.0, 0.0) as double) as paid_customer"
    )
    return converted

In [None]:
data = convert(sampleDataPath)
data.cache()
data.show()

In [None]:
'''convert tests'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),True),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),True),\
StructField("lifetime",DoubleType(),True),\
StructField("game1",DoubleType(),True),\
StructField("game2",DoubleType(),True),\
StructField("game3",DoubleType(),True),\
StructField("game4",DoubleType(),True),\
StructField("paid_customer",DoubleType(),False)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert data.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, data.dtypes)

test1 = str(data.sample(False, 0.01, seed=12345).limit(1).first())
correct1 = "Row(gender=1.0, age=20.0, country='UK', friend_count=2.0, lifetime=4.0, game1=0.0, game2=0.0, game3=0.0, game4=4.0, paid_customer=0.0)"
assert test1 == correct1, "the row was expected to be %s but it was %s" % (correct1, test1)


## Indexer
`indexer` converts categorical features into doubles.  
https://spark.apache.org/docs/latest/ml-features.html#stringindexer  
`country` is the only categorical feature.  
After these modifications, the schema should be:
  * gender: Double (1 if male else 0)
  * age: Double
  * country: String
  * friend_count: Double
  * lifetime: Double
  * game1: Double (citygame_played)
  * game2: Double (pictionarygame_played)
  * game3: Double (scramblegame_played)
  * game4: Double (snipergame_played)
  * paid_customer: Double (1 if yes else 0)
  * country_index: Double
  
param `df`: DataFrame  
`return`: transformed DataFrame. The returned DataFrame should have a new column called "country_index".
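
To show what string indexing does, the following plain-Python sketch assigns indices by descending label frequency, which is `StringIndexer`'s default ordering. The country values are made up, `fit_string_index` is a hypothetical helper, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to a double, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically in this sketch.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

countries = ["USA", "UK", "USA", "FI", "USA", "UK"]  # made-up sample
mapping = fit_string_index(countries)

print(mapping)                          # {'USA': 0.0, 'UK': 1.0, 'FI': 2.0}
print([mapping[c] for c in countries])  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

The mapping is learned from the data (the fit step) and then applied to every row (the transform step), which is why the Spark version needs both `fit` and `transform`.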

In [None]:
def indexer(df):
    indexer = StringIndexer(inputCol='country', outputCol='country_index')
    model = indexer.fit(df)
    return model.transform(df)

In [None]:
indexed = indexer(data)
indexed.show()

In [None]:
'''indexer tests'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert indexed.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, indexed.dtypes)

test2 = str(indexed.sample(False, 0.01, seed=12345).limit(1).first())
correct2 = "Row(gender=1.0, age=20.0, country='UK', friend_count=2.0, lifetime=4.0, game1=0.0, game2=0.0, game3=0.0, game4=4.0, paid_customer=0.0, country_index=1.0)"
assert test2 == correct2, "the row was expected to be %s but it was %s" % (correct2, test2)


## Feature Assembler
`featureAssembler` combines features into one vector. Most MLlib algorithms require this step.  
https://spark.apache.org/docs/latest/ml-features.html#vectorassembler  
In this task your vector assembler should combine the following columns in the order listed:  
```["gender", "age","friend_count","lifetime","game1","game2","game3","game4","country_index"]```.

param `df`: DataFrame that has been transformed using indexer  
`return`: transformed DataFrame. The returned DataFrame should have a new column called "features".
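
Conceptually, assembling is just picking the listed columns from each row, in order, and packing them into one vector. A minimal plain-Python sketch (`assemble` is a hypothetical helper; the row values are the ones from the expected row in the indexer test):

```python
FEATURE_ORDER = ["gender", "age", "friend_count", "lifetime",
                 "game1", "game2", "game3", "game4", "country_index"]

def assemble(row, input_cols=FEATURE_ORDER):
    """Return the row's feature values as a list, in the order of input_cols."""
    return [row[name] for name in input_cols]

row = {
    "gender": 1.0, "age": 20.0, "country": "UK", "friend_count": 2.0,
    "lifetime": 4.0, "game1": 0.0, "game2": 0.0, "game3": 0.0,
    "game4": 4.0, "paid_customer": 0.0, "country_index": 1.0,
}

print(assemble(row))  # [1.0, 20.0, 2.0, 4.0, 0.0, 0.0, 0.0, 4.0, 1.0]
```

Note that the string column `country` and the label `paid_customer` are left out of the vector; only `country_index` represents the country.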

In [None]:
def featureAssembler(df):
    assembler = VectorAssembler(inputCols=[
        'gender', 'age', 'friend_count', 'lifetime', 'game1', 'game2', 'game3', 'game4', 'country_index'
    ], outputCol='features')
    return assembler.transform(df)

In [None]:
assembled = featureAssembler(indexed)
assembled.show()

In [None]:
'''assembler schema test'''
from pyspark.ml.linalg import *
from pyspark.ml.linalg import VectorUDT

correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False),\
StructField("features", VectorUDT(),True)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,(Vectors.dense([1.0, 2.0])))]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert assembled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, assembled.dtypes)

test3 = str(assembled.sample(False, 0.01, seed=12345).limit(1).first())
correct3 = "Row(gender=1.0, age=20.0, country='UK', friend_count=2.0, lifetime=4.0, game1=0.0, game2=0.0, game3=0.0, game4=4.0, paid_customer=0.0, country_index=1.0, features=DenseVector([1.0, 20.0, 2.0, 4.0, 0.0, 0.0, 0.0, 4.0, 1.0]))"
assert test3 == correct3, "the row was expected to be %s but it was %s" % (correct3, test3)


## Scaler
`scaler` standardizes the features (zero mean, unit standard deviation) to improve performance.  
https://spark.apache.org/docs/latest/ml-features.html#standardscaler  
For this task, remember to set the `withStd` and `withMean` parameters to `True`.

param `df`: DataFrame that has been transformed using featureAssembler  
param `outputColName`: name of the scaled feature vector (output column name)  
`return`: transformed DataFrame. The returned DataFrame should have a new column named after the passed `outputColName` parameter.
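
For a single feature, standardizing with both the mean and the standard deviation enabled amounts to subtracting the column mean and dividing by the column's sample standard deviation. A plain-Python sketch with made-up values (`standardize` is a hypothetical helper, not part of Spark):

```python
import statistics

def standardize(column):
    """Center a column on its mean and divide by its sample standard deviation."""
    mean = statistics.mean(column)
    std = statistics.stdev(column)  # sample (n - 1) standard deviation
    return [(value - mean) / std for value in column]

ages = [20.0, 30.0, 40.0]  # made-up values: mean 30, sample std 10
print(standardize(ages))   # [-1.0, 0.0, 1.0]
```

In the Spark version the same operation is applied to every component of the feature vector, using statistics computed during `fit`.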

In [None]:
def scaler(df, outputColName):
    sc = StandardScaler(inputCol='features', outputCol=outputColName, withStd=True, withMean=True)
    model = sc.fit(df)
    return model.transform(df)

In [None]:
scaled = scaler(assembled, "scaledFeatures")
scaled.show()

In [None]:
'''scaler schema test'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False),\
StructField("features", VectorUDT(),True),\
StructField("scaledFeatures", VectorUDT(),True)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,(Vectors.dense([1.0, 2.0])),(Vectors.dense([2.0, 0.0])))]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert scaled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, scaled.dtypes)

test4 = str(scaled.sample(False, 0.01, seed=12345).limit(1).first())
correct4 = "Row(gender=1.0, age=20.0, country='UK', friend_count=2.0, lifetime=4.0, game1=0.0, game2=0.0, game3=0.0, game4=4.0, paid_customer=0.0, country_index=1.0, features=DenseVector([1.0, 20.0, 2.0, 4.0, 0.0, 0.0, 0.0, 4.0, 1.0]), scaledFeatures=DenseVector([0.9008, -0.6236, -0.5183, -0.6848, -0.5844, -0.6369, -0.7638, -0.3154, -0.1343]))"
assert test4 == correct4, "the row was expected to be %s but it was %s" % (correct4, test4)


## Create Model
`createModel` creates a Logistic Regression model. When training, 5 iterations should be enough.  
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression

param `training` transformed dataframe  
param `featuresCol` name of the features column  
param `labelCol` name of the label col (paid_customer)  
param `predCol` name of the prediction col  
`return` trained Logistic Regression model
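
To give an intuition for what "training for 5 iterations" means, the sketch below fits a one-feature logistic regression with five steps of plain batch gradient descent. This is an illustration only: Spark uses its own optimizer, and the data, learning rate, and helper names here are assumptions made for the example.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def mean_log_loss(weights, bias, xs, ys):
    """Average negative log-likelihood of the labels under the model."""
    total = 0.0
    for x, y in zip(xs, ys):
        p = sigmoid(sum(w * v for w, v in zip(weights, x)) + bias)
        total -= y * math.log(p) + (1.0 - y) * math.log(1.0 - p)
    return total / len(xs)

def train(xs, ys, max_iter=5, learning_rate=0.5):
    """Fit weights and bias with max_iter steps of batch gradient descent."""
    weights = [0.0] * len(xs[0])
    bias = 0.0
    n = len(xs)
    for _ in range(max_iter):
        grad_w = [0.0] * len(weights)
        grad_b = 0.0
        for x, y in zip(xs, ys):
            error = sigmoid(sum(w * v for w, v in zip(weights, x)) + bias) - y
            for j, v in enumerate(x):
                grad_w[j] += error * v
            grad_b += error
        weights = [w - learning_rate * g / n for w, g in zip(weights, grad_w)]
        bias -= learning_rate * grad_b / n
    return weights, bias

# Made-up one-feature data: negative values have label 0, positive values label 1.
xs = [[-2.0], [-1.0], [1.0], [2.0]]
ys = [0.0, 0.0, 1.0, 1.0]
weights, bias = train(xs, ys)

print(mean_log_loss([0.0], 0.0, xs, ys))      # loss before training
print(mean_log_loss(weights, bias, xs, ys))   # lower loss after five steps
```

Each iteration moves the weights in the direction that reduces the loss, so a handful of iterations is often enough on well-scaled features.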

In [None]:
def createModel(training, featuresCol, labelCol, predCol):
    lr = LogisticRegression(maxIter=5, featuresCol=featuresCol, labelCol=labelCol, predictionCol=predCol)
    model = lr.fit(training)
    return model

In [None]:
# Split the dataset into training (70%) and prediction (30%) sets
splitted = scaled.randomSplit([0.7,0.3])

model = createModel(splitted[0],"scaledFeatures","paid_customer","prediction")

## Predict
Given a transformed and normalized dataset, `predict` predicts whether the customer is going to subscribe to the service.

85% correct will give you 3 points (all tests pass).  
70% correct will give you 2 points.  
50% correct will give you 1 point.

param `model` trained logistic regression model  
param `dataToPredict` normalized dataframe for prediction  
`return` DataFrame with predicted scores (1.0 == yes, 0.0 == no)  
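
The grading rule above can be written out as a small plain-Python sketch. The prediction and label lists are made up, and `accuracy_percent` and `points_for` are hypothetical helpers used only to illustrate how accuracy maps to points.

```python
def accuracy_percent(predicted, actual):
    """Percentage of rows where the prediction equals the true label."""
    matches = sum(1 for p, a in zip(predicted, actual) if p == a)
    return 100 * matches / len(actual)

def points_for(accuracy):
    """Map an accuracy percentage to points using the thresholds above."""
    if accuracy >= 85.0:
        return 3
    if accuracy >= 70.0:
        return 2
    if accuracy >= 50.0:
        return 1
    return 0

predicted = [1.0, 0.0, 0.0, 1.0, 0.0]  # made-up predictions
actual = [1.0, 0.0, 1.0, 1.0, 0.0]     # made-up labels
acc = accuracy_percent(predicted, actual)  # 4 of 5 rows match

print(acc, points_for(acc))  # 80.0 2
```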

In [None]:
def predict(model, dataToPredict):
    return model.transform(dataToPredict)

In [None]:
predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
print((correct / total) * 100, "% predicted correctly")
predictions.show()

In [None]:
'''prediction correctness test'''
data = convert(randomData)
data.cache()
indexed = indexer(data)
assembled = featureAssembler(indexed)
scaled = scaler(assembled, "scaledFeatures")
splitted = scaled.randomSplit([0.7,0.3])
model = createModel(splitted[0],"scaledFeatures","paid_customer","prediction")
predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
answer = (correct / total) * 100
print(answer, "% predicted correctly")
assert answer >= 50.0, "less than 50% predicted correctly, you get 0 points"

In [None]:
assert answer >= 70.0, "less than 70% predicted correctly, you get 1 point"

In [None]:
assert answer >= 85.0, "less than 85% predicted correctly, you get 2 points"

In [None]:
spark.stop()