### 1. Purpose
The purpose of this document is to provide a set of problems to be solved by Vodafone Advanced Data Analytics
candidates as a means of skill assessment.
### 2. Instructions
Solve the problems as directed below.

#### 2.1. Code base
- Python>=3.6
- Include requirements.txt file in each repository

#### 2.2. Submission
Each solution should be uploaded as a GitHub repository that will be deleted after
assessment.

Email the repository links to mo.namazi@tpgtelecom.com.au and cc jiaxi.li@tpgtelecom.com.au and mahati.suvvari@tpgtelecom.com.au.

#### 2.3. Data
The data for the problems can be found under the `data/` folder

#### 2.4. Guidelines
General solutions to these problems may be available on the internet. Feel free to
leverage them, but keep in mind that we are looking for out-of-the-box thinking as well as
neat and scalable code.
Focus on the areas you are skilled in.
You will be asked to explain your code in full detail.
This is as much an opportunity to show your skills as it is a test; we do not have perfect
solutions in mind.

### Dataset Description

We provide an open-source dataset for cardiac arrhythmia classification. The dataset contains 452 patients and 279 attributes.

The dataset is provided in 2 different files:
- `arrhythmia.data`

- `arrhythmia.names`

### Task 1 - Modelling

Build a standalone Python program (.py) that can be executed from the command line (such as Terminal, PowerShell, etc.).

`a.` Explore the basic statistics of the dataset, build a classification model to predict the different classes of arrhythmia, and present the findings along with the model's performance.

---

`Requirement.` **Use PySpark to build the application**

In [None]:
#TODO 

In [None]:
def transform_data(path: str) -> None:
    # TODO: not implemented yet
    raise NotImplementedError

In [None]:
%pip install pyspark

In [None]:
# change threshold values - algorithm
# reduce the amount of data for the normal class
# low-recall samples: keep the not-normal class

In [None]:
# model needs to be flexible (training data)

1. Compare model efficiency (binary classification model).
2. Tune parameters.
3. Improve recall.
4. Clean the data.
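
The notes above mention changing the decision threshold to improve recall. The effect can be sketched in plain Python. The scores and labels below are hypothetical values invented for illustration, not output from any model in this notebook, and `precision_recall` is an illustrative helper rather than part of the solution.

```python
from typing import List, Tuple


def precision_recall(scores: List[float], labels: List[int], threshold: float) -> Tuple[float, float]:
    """Return (precision, recall) when predicting positive for score >= threshold."""
    predictions = [1 if score >= threshold else 0 for score in scores]
    pairs = list(zip(predictions, labels))
    tp = sum(1 for p, y in pairs if p == 1 and y == 1)
    fp = sum(1 for p, y in pairs if p == 1 and y == 0)
    fn = sum(1 for p, y in pairs if p == 0 and y == 1)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Hypothetical predicted probabilities and true labels (1 = positive class).
scores = [0.9, 0.7, 0.45, 0.4, 0.35, 0.2]
labels = [1, 1, 1, 0, 1, 0]

for threshold in (0.5, 0.3):
    precision, recall = precision_recall(scores, labels, threshold)
    print(f"threshold={threshold}: precision={precision:.2f}, recall={recall:.2f}")
```

On this toy data, lowering the threshold from 0.5 to 0.3 raises recall from 0.50 to 1.00 while precision falls from 1.00 to 0.80, which is the trade-off to watch when tuning for recall.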

### Task 2 - Deployment

Build a controlled environment to package the above solution, so that the code and model can be easily executed on different platforms and servers without manually resolving dependencies/libraries.

- Build a container environment/application in which the training and inference workflows can be executed separately from the CLI.
- Make sure the application code is modular and easy for peers to read.

Note that the same code needs to run in our environment without significant changes to the submitted code.

---

`Hint.` To complete this task, include the detailed steps for building the container environment.

`Requirement.` Use `spark-submit` to start/execute the scripts/application.
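
One possible way to keep the training and inference workflows separate on the command line is a single entry point with two subcommands. The sketch below uses only the standard library; the subcommand names, the `--model-dir` option and its default value are assumptions made for illustration and do not describe the contents of the app folder.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with hypothetical 'train' and 'predict' subcommands."""
    parser = argparse.ArgumentParser(description="Arrhythmia classifier (illustrative CLI layout)")
    subparsers = parser.add_subparsers(dest="command")
    subparsers.required = True  # a subcommand must be given

    train = subparsers.add_parser("train", help="fit a model and save it")
    train.add_argument("--data", default="data/arrhythmia.data", help="path to the training data")
    train.add_argument("--model-dir", default="model", help="where to write the fitted model")

    predict = subparsers.add_parser("predict", help="load a saved model and score new data")
    predict.add_argument("--data", required=True, help="path to the data to score")
    predict.add_argument("--model-dir", default="model", help="where the fitted model is stored")
    return parser


def main(argv: Optional[List[str]] = None) -> argparse.Namespace:
    args = build_parser().parse_args(argv)
    # The actual training or inference code would be dispatched here based on args.command.
    print(f"command={args.command}, data={args.data}, model_dir={args.model_dir}")
    return args


# Example invocation with an explicit argument list.
main(["train", "--data", "data/arrhythmia.data"])
```

Because `spark-submit` forwards any arguments placed after the script path to the application, a script laid out this way (saved under a hypothetical name such as `app.py`) could be started with `spark-submit app.py train --data data/arrhythmia.data`.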

In [None]:
#TODO
#see app folder

### Task 3 - Data Analytics

Build interesting insights that you could find from the provided dataset and/or the predictive results.

---

`Hint.` Insights could be presented with or without the results from `Task 1`. Feel free to use any tools and libraries to visualise the insights.

#TODO
1. age, height, sex, weight
2. scatter plot, boxplot, observe data outliers
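
The outlier check that a boxplot performs visually can also be done numerically with the 1.5 × IQR rule. The sketch below uses the first ten height values that appear in the feature vectors printed later in this notebook, plus one extreme value (780) added as a hypothetical data-entry error for illustration. It relies on `statistics.quantiles`, which requires Python 3.8 or later.

```python
import statistics

# First ten heights (cm) from the notebook output, plus one hypothetical extreme value.
heights_cm = [190, 165, 172, 175, 190, 169, 160, 162, 168, 167, 780]

# Quartiles with the "inclusive" method (treats the data as the full population).
q1, _median, q3 = statistics.quantiles(heights_cm, n=4, method="inclusive")
iqr = q3 - q1

# Standard boxplot fences: points beyond 1.5 * IQR from the quartiles are flagged.
lower_fence = q1 - 1.5 * iqr
upper_fence = q3 + 1.5 * iqr
outliers = [h for h in heights_cm if h < lower_fence or h > upper_fence]

print(f"Q1={q1}, Q3={q3}, IQR={iqr}")
print(f"fences=({lower_fence}, {upper_fence}), outliers={outliers}")
```

Only the added value of 780 falls outside the fences here; the same check could be run per attribute (age, height, weight) on the full dataset.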

In [1]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_data = pd.read_csv('data/arrhythmia.data', header=None)

# Share of rows per class label (column 279), largest first
df_data.groupby(df_data[279]).count().sort_values(0, ascending=False)[0]/len(df_data)



279
1     0.542035
10    0.110619
2     0.097345
6     0.055310
16    0.048673
3     0.033186
4     0.033186
5     0.028761
9     0.019912
15    0.011062
14    0.008850
7     0.006637
8     0.004425
Name: 0, dtype: float64

The dataset defines 16 classes, although only 13 of them appear in the data. About 54% of the instances are normal (class 1); the remaining samples are spread across the other classes. Classifying all of these classes in one go would be very challenging because the data is extremely imbalanced. Therefore, I will initially treat this as a binary classification problem: predict whether a patient is normal or has arrhythmia.
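
The imbalance argument can be made concrete with a few lines of arithmetic on the proportions printed above. This is only a sketch: the dictionary is copied by hand from that output, and the row count of 452 comes from the dataset description.

```python
# Class label -> share of rows, copied from the output above.
class_share = {
    1: 0.542035, 10: 0.110619, 2: 0.097345, 6: 0.055310, 16: 0.048673,
    3: 0.033186, 4: 0.033186, 5: 0.028761, 9: 0.019912, 15: 0.011062,
    14: 0.008850, 7: 0.006637, 8: 0.004425,
}
n_rows = 452  # number of patients in the dataset

# Binary view: class 1 (normal) versus everything else.
normal_share = class_share[1]
other_share = sum(share for label, share in class_share.items() if label != 1)

# Multi-class view: how many rows does the rarest class have?
rarest_label = min(class_share, key=class_share.get)
rarest_rows = round(class_share[rarest_label] * n_rows)

print(f"normal: {normal_share:.1%}, not normal: {other_share:.1%}")
print(f"rarest class: {rarest_label} with about {rarest_rows} rows")
```

The binary split is roughly 54% to 46%, which is far more balanced than the multi-class problem, where the rarest class has only about two rows.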

In [11]:
no_of_nan = {}
for i in range(0, 279):
    no_of_nan[i] = len(df_data[df_data[i]== '?'])
sorted(no_of_nan.items(), key=lambda item: item[1], reverse=True)[:6]

[(13, 376), (11, 22), (10, 8), (12, 1), (14, 1), (0, 0)]

In [12]:
376/len(df_data)

0.831858407079646

Missing values occur in columns 10, 11, 12, 13 and 14; column 13 alone has over 83% missing data.
These columns are the vector angles in degrees on the front plane of:
- 10: QRS
- 11: T
- 12: P
- 13: QRST
- 14: J

Since these columns appear to be related, I decided to drop all of them.
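
To put the missing-value counts above in proportion, the sketch below converts them to fractions of the 452 rows. The counts are copied from the output above; the 0.5 cut-off is a hypothetical threshold chosen only to illustrate how "mostly missing" could be defined.

```python
n_rows = 452  # number of rows in the dataset

# Column index -> number of '?' entries, copied from the output above.
missing_counts = {13: 376, 11: 22, 10: 8, 12: 1, 14: 1}

# Fraction of rows with a missing value in each column.
missing_fraction = {col: count / n_rows for col, count in missing_counts.items()}

# Hypothetical cut-off: columns missing in more than half of the rows.
drop_threshold = 0.5
mostly_missing = [col for col, fraction in missing_fraction.items() if fraction > drop_threshold]

for col, fraction in sorted(missing_fraction.items()):
    print(f"column {col}: {fraction:.1%} missing")
print(f"columns above the {drop_threshold:.0%} cut-off: {mostly_missing}")
```

Only column 13 exceeds the cut-off; the other four columns are each missing in fewer than 5% of rows.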

In [13]:
df_data.drop(columns = [10, 11, 12, 13, 14], inplace=True)

Now, let's build the new labels for the binary classification.


In [14]:
# The last column (279) holds the class labels: 1 = normal -> 1, any other class -> 0
new_labels = [1 if label == 1 else 0 for label in df_data[279].values]


In [15]:
#now I want to drop the label column to build the training data.
df_data.drop(columns = 279, inplace = True)

In [16]:
df_data.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,269,270,271,272,273,274,275,276,277,278
0,75,0,190,80,91,193,371,174,121,-16,...,-0.3,0.0,9.0,-0.9,0.0,0.0,0.9,2.9,23.3,49.4
1,56,1,165,64,81,174,401,149,39,25,...,-0.5,0.0,8.5,0.0,0.0,0.0,0.2,2.1,20.4,38.8
2,54,0,172,95,138,163,386,185,102,96,...,0.9,0.0,9.5,-2.4,0.0,0.0,0.3,3.4,12.3,49.0
3,55,0,175,94,100,202,380,179,143,28,...,0.1,0.0,12.2,-2.2,0.0,0.0,0.4,2.6,34.6,61.6
4,75,0,190,80,88,181,360,177,103,-16,...,-0.4,0.0,13.1,-3.6,0.0,0.0,-0.1,3.9,25.4,62.8


In [17]:

from sklearn.preprocessing import normalize
from pyspark.ml.linalg import Vectors


def prepare_data(rawData, normalizer=None):
    """Optionally normalise each row (e.g. normalizer='l2'); otherwise return the data unchanged."""
    if normalizer:
        return normalize(rawData, norm=normalizer)
    return rawData


array = prepare_data(df_data.to_numpy(), normalizer=None)
# Pair each binary label with its feature vector, then build a Spark DataFrame
list_tuples = [(label, Vectors.dense(row)) for label, row in zip(new_labels, array)]
data = spark.createDataFrame(list_tuples, ["label", "features"])
data.show()


+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[75.0,0.0,190.0,8...|
|    0|[56.0,1.0,165.0,6...|
|    0|[54.0,0.0,172.0,9...|
|    1|[55.0,0.0,175.0,9...|
|    0|[75.0,0.0,190.0,8...|
|    0|[13.0,0.0,169.0,5...|
|    1|[40.0,1.0,160.0,5...|
|    1|[49.0,1.0,162.0,5...|
|    1|[44.0,0.0,168.0,5...|
|    0|[50.0,1.0,167.0,6...|
|    0|[62.0,0.0,170.0,7...|
|    1|[45.0,1.0,165.0,8...|
|    0|[54.0,1.0,172.0,5...|
|    0|[30.0,0.0,170.0,7...|
|    1|[44.0,1.0,160.0,8...|
|    1|[47.0,1.0,150.0,4...|
|    0|[47.0,0.0,171.0,5...|
|    1|[46.0,1.0,158.0,5...|
|    1|[73.0,0.0,165.0,6...|
|    1|[57.0,1.0,166.0,7...|
+-----+--------------------+
only showing top 20 rows



                                                                                

I also want to apply feature selection.

In [18]:
from pyspark.ml.feature import UnivariateFeatureSelector

# Continuous features with a categorical label; with the default selection mode
# (numTopFeatures), the threshold keeps the 100 highest-scoring features.
selector = UnivariateFeatureSelector(outputCol="selectedFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(100)
model = selector.fit(data)
df_selected = model.transform(data)
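
According to the Spark documentation, `UnivariateFeatureSelector` scores features with an ANOVA F-test when the features are continuous and the label is categorical. The plain-Python sketch below shows how such an F-value is computed for a single feature. The toy values are invented for illustration and `anova_f_value` is a hypothetical helper, not Spark's implementation.

```python
from typing import Dict, List


def anova_f_value(groups: Dict[int, List[float]]) -> float:
    """One-way ANOVA F-value for one feature, with values grouped by class label."""
    all_values = [value for values in groups.values() for value in values]
    n, k = len(all_values), len(groups)
    grand_mean = sum(all_values) / n
    means = {label: sum(values) / len(values) for label, values in groups.items()}

    # Variation of the class means around the grand mean.
    ss_between = sum(len(values) * (means[label] - grand_mean) ** 2 for label, values in groups.items())
    # Variation of the individual values around their own class mean.
    ss_within = sum((value - means[label]) ** 2 for label, values in groups.items() for value in values)

    return (ss_between / (k - 1)) / (ss_within / (n - k))


# Toy feature values per class label (invented for illustration).
informative = {0: [1.0, 2.0, 3.0], 1: [4.0, 5.0, 6.0]}    # class means differ
uninformative = {0: [1.0, 2.0, 3.0], 1: [1.0, 2.0, 3.0]}  # class means are identical

print(anova_f_value(informative))    # large F: feature separates the classes
print(anova_f_value(uninformative))  # F of zero: feature carries no class signal
```

A feature whose values differ strongly between classes relative to the spread within each class gets a high F-value and is more likely to be kept by the selector.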

                                                                                

In [19]:
df_selected.select('features', 'selectedFeatures').show()

+--------------------+--------------------+
|            features|    selectedFeatures|
+--------------------+--------------------+
|[75.0,0.0,190.0,8...|[0.0,91.0,174.0,4...|
|[56.0,1.0,165.0,6...|[1.0,81.0,149.0,0...|
|[54.0,0.0,172.0,9...|[0.0,138.0,185.0,...|
|[55.0,0.0,175.0,9...|[0.0,100.0,179.0,...|
|[75.0,0.0,190.0,8...|[0.0,88.0,177.0,4...|
|[13.0,0.0,169.0,5...|[0.0,100.0,174.0,...|
|[40.0,1.0,160.0,5...|[1.0,77.0,133.0,0...|
|[49.0,1.0,162.0,5...|[1.0,78.0,157.0,3...|
|[44.0,0.0,168.0,5...|[0.0,84.0,160.0,0...|
|[50.0,1.0,167.0,6...|[1.0,89.0,156.0,4...|
|[62.0,0.0,170.0,7...|[0.0,102.0,156.0,...|
|[45.0,1.0,165.0,8...|[1.0,77.0,150.0,2...|
|[54.0,1.0,172.0,5...|[1.0,78.0,163.0,0...|
|[30.0,0.0,170.0,7...|[0.0,91.0,157.0,0...|
|[44.0,1.0,160.0,8...|[1.0,77.0,163.0,0...|
|[47.0,1.0,150.0,4...|[1.0,75.0,169.0,0...|
|[47.0,0.0,171.0,5...|[0.0,82.0,169.0,0...|
|[46.0,1.0,158.0,5...|[1.0,70.0,122.0,0...|
|[73.0,0.0,165.0,6...|[0.0,91.0,175.0,5...|
|[57.0,1.0,166.0,7...|[1.0,82.0,

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Logistic regression candidate (defined here but not yet added to a pipeline)
lr = LogisticRegression(maxIter=10)

# Shallow decision tree trained on the selected features
dt = DecisionTreeClassifier(featuresCol='selectedFeatures', labelCol='label', maxDepth=3)
pipeline = Pipeline(stages=[dt])

In [5]:
from dependencies.model import Model

In [6]:
Model('LogisticRegression')

ValueError: dictionary update sequence element #5 has length 1; 2 is required