# Machine Learning and Big Data Analysis course
## Topic: Advanced Big Data analysis techniques
### Part 2. Apache Spark for data processing and ML

### 1. Libraries

In [None]:
import os
import sys
import numpy as np
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import udf
from pyspark.sql import types 
from pyspark.sql.types import *

# Never truncate columns when pandas renders a wide DataFrame (e.g. toPandas() output).
pd.options.display.max_columns = None

### 2. Data processing with Spark

#### 2.1. Initialize Spark

In [None]:
# Show the JupyterHub prefix this kernel runs under. Use .get() so the notebook
# still runs (printing a placeholder) outside JupyterHub instead of raising KeyError.
print('user:', os.environ.get('JUPYTERHUB_SERVICE_PREFIX', '<not running under JupyterHub>'))

def uiWebUrl(self):
    """Return the Spark UI address rewritten to go through the JupyterHub proxy.

    Intended to be installed as the ``SparkContext.uiWebUrl`` property, so the
    UI link resolves to ``<JUPYTERHUB_SERVICE_PREFIX>proxy/<port>/jobs/``.

    Falls back to the driver's raw UI URL when ``JUPYTERHUB_SERVICE_PREFIX`` is
    not set (not running under JupyterHub) or when the URL carries no port.
    Without the fallback the first case would raise KeyError and the second
    would yield a broken ``proxy/None/`` path.
    """
    from urllib.parse import urlparse

    # The JVM-side uiWebUrl() returns a wrapper; .get() unwraps the URL string.
    web_url = self._jsc.sc().uiWebUrl().get()
    prefix = os.environ.get('JUPYTERHUB_SERVICE_PREFIX')
    port = urlparse(web_url).port
    if prefix is None or port is None:
        return web_url
    # Normalise to exactly one slash between the prefix and the proxy path.
    return '{}/proxy/{}/jobs/'.format(prefix.rstrip('/'), port)

# Route the Spark UI link through the JupyterHub proxy (see uiWebUrl above).
SparkContext.uiWebUrl = property(uiWebUrl)

conf = SparkConf()
conf.set('spark.master', 'local[5]')           # max 5 cores available
conf.set('spark.driver.memory', '12G')         # max 16 GB available
conf.set('spark.driver.maxResultSize', '1G')   # cap on results returned to the driver (e.g. collect/toPandas)
# You may experiment with these settings,
# but they make no difference for a
# standalone (local) Spark installation
#conf.set('spark.driver.memory', '512m')
#conf.set('spark.executor.memory', '2G')

# getOrCreate() reuses an already running session, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once" as a direct
# SparkContext(conf=...) call would.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
spark

In [None]:
# Display the SparkContext created in the previous cell.
sc

#### 2.2. Load data

We will use the training dataset from the Microsoft Malware Prediction [competition on Kaggle](https://www.kaggle.com/competitions/microsoft-malware-prediction/).

In [None]:
# Relative path (resolved against the kernel's working directory) to the Kaggle training CSV.
file_path = '../__DATA/malware_prediction.csv'

In [None]:
# Lazily read the CSV; its first line provides the column names.
# No inferSchema option is set, so columns arrive as strings (they are cast explicitly later).
sdf = spark.read.option('header', True).csv(file_path)

In [None]:
# Print the column names and the types Spark assigned to them.
sdf.printSchema()

In [None]:
# can look ugly for many columns
# Print the first rows as plain text; with many columns the output wraps and
# becomes hard to read.
sdf.show()

In [None]:
# Bring only 5 rows to the driver and render them as a pandas DataFrame.
sdf.limit(5).toPandas()

#### 2.3. Load many files at once

In [None]:
# A glob pattern loads every matching CSV into one DataFrame.
# NOTE(review): this picks up every *.csv in ../__DATA; it assumes they all share the same columns.
file_path = '../__DATA/*.csv'
sdf = spark.read.option('header', True).csv(file_path)

In [None]:
# Preview 3 rows of the combined DataFrame as pandas.
sdf.limit(3).toPandas()

#### 2.4. Basic operations

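Note that Spark evaluates transformations lazily: the cells above only needed the header and a few rows, so `count()` below is the first operation that has to scan the whole dataset.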

In [None]:
%%time
# count() has to scan every input file, which is why it is timed here.
sdf.count()

##### Select columns

In [None]:
# List of all column names.
sdf.columns

In [None]:
# Select a single column by its name.
sdf.select('Census_GenuineStateName').show()

In [None]:
# Select several columns at once by passing several names.
sdf.select('Wdft_IsGamer', 'Wdft_RegionIdentifier').show()

In [None]:
# Columns can also be referenced as DataFrame attributes (sdf.<column_name>).
sdf.select(sdf.Wdft_IsGamer).show()

##### Complex pipelines

In [None]:
# Ten largest displays. The size column is read as a string (no schema was
# inferred), so sorting it directly is lexicographic ('9.9' > '17.3').
# Cast to float for a numeric sort and keep missing sizes at the bottom.
size_col = 'Census_InternalPrimaryDiagonalDisplaySizeInInches'
sdf \
    .orderBy(F.col(size_col).cast('float').desc_nulls_last()) \
    .select('MachineIdentifier', 'Census_MDC2FormFactor', size_col) \
    .limit(10) \
    .toPandas()

In [None]:
# Ten smallest displays. The size column is read as a string (no schema was
# inferred), so cast it to float for a numeric sort. Ascending order puts
# nulls first by default, which would fill the result with missing sizes,
# so push them to the end explicitly.
size_col = 'Census_InternalPrimaryDiagonalDisplaySizeInInches'
sdf \
    .orderBy(F.col(size_col).cast('float').asc_nulls_last()) \
    .select('MachineIdentifier', 'Census_MDC2FormFactor', size_col) \
    .limit(10) \
    .toPandas()

In [None]:
# Distinct display sizes among the first 1000 rows only: limit() is applied
# before de-duplication, so this is a quick sample, not every possible value.
first_rows = sdf.limit(1000)
first_rows.select('Census_InternalPrimaryDiagonalDisplaySizeInInches').dropDuplicates().show()

In [None]:
# Ten largest notebook displays. The size column is read as a string (no
# schema was inferred), so cast it to float for a numeric sort; rows with a
# missing size go last.
size_col = 'Census_InternalPrimaryDiagonalDisplaySizeInInches'
sdf \
    .filter(F.col('Census_MDC2FormFactor') == 'Notebook') \
    .orderBy(F.col(size_col).cast('float').desc_nulls_last()) \
    .select('MachineIdentifier', 'Census_MDC2FormFactor', size_col) \
    .limit(10) \
    .toPandas()

#### 2.5. Spark SQL

[Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) is a Spark module for structured data processing. Unlike the basic RDD API, it gives Spark information about the structure of both the data and the computation being performed, and internally Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API.

In [None]:
# Register the DataFrame as a temporary view named 'malware' so spark.sql can query it.
sdf.createOrReplaceTempView('malware')

In [None]:
# Plain SQL against the registered view; the result is again a Spark DataFrame.
query = 'SELECT * FROM malware LIMIT 10'
sdf_temp = spark.sql(query)
sdf_temp.toPandas()

In [None]:
# Same kind of filter as the DataFrame pipeline above, expressed in SQL:
# form factor and display size of 10 notebook machines (no ordering applied).
query = '''
SELECT Census_MDC2FormFactor, Census_InternalPrimaryDiagonalDisplaySizeInInches 
FROM malware 
WHERE Census_MDC2FormFactor='Notebook'
LIMIT 10
'''
sdf_temp = spark.sql(query)
sdf_temp.toPandas()

### 3. Machine Learning with Spark

#### 3.1. Exploratory Data Analysis (EDA)

In [None]:
# Re-check the schema before EDA: columns read from CSV without a schema are strings.
sdf.printSchema()

In [None]:
# For each Spark SQL type class, print the short name Spark uses for it
# (the spelling used in printSchema output and in cast strings).
type_names = (
    'BinaryType', 'BooleanType', 'ByteType', 'DateType',
    'DecimalType', 'DoubleType', 'FloatType', 'IntegerType',
    'LongType', 'ShortType', 'StringType', 'TimestampType',
)
for type_name in type_names:
    type_instance = getattr(types, type_name)()
    print(f'{type_name}: {type_instance.simpleString()}')

In [None]:
# Summary statistics for every column, computed on the first 1000 rows only to keep it fast.
sdf.limit(1000).describe().toPandas()

In [None]:
# 0/1 flag columns (the last one, HasDetections, is the target) that are
# converted from string to integer.
dummy_cols = [
    'IsSxsPassiveMode',
    'HasTpm',
    'IsProtected',
    'Firewall',
    'Census_HasOpticalDiskDrive',
    'Census_IsPortableOperatingSystem',
    'Census_IsSecureBootEnabled',
    'Census_IsVirtualDevice',
    'Census_IsTouchEnabled',
    'Census_IsPenCapable',
    'Census_IsAlwaysOnAlwaysConnectedCapable',
    'Wdft_IsGamer',
    'HasDetections'
]
for flag_col in dummy_cols:
    # withColumn is lazy: 'done' means the cast was added to the plan, not executed.
    sdf = sdf.withColumn(flag_col, F.col(flag_col).cast('int'))
    print(flag_col, 'done')

In [None]:
# Integer-valued columns, cast from string to 64-bit integers.
cols_to_bigint = [
    'Census_ProcessorCoreCount',
    'Census_PrimaryDiskTotalCapacity',
    'Census_SystemVolumeTotalCapacity',
    'Census_TotalPhysicalRAM',
    'Census_InternalPrimaryDisplayResolutionHorizontal',
    'Census_InternalPrimaryDisplayResolutionVertical',
    'Census_InternalBatteryNumberOfCharges'
]
for long_col in cols_to_bigint:
    # Lazy: the cast is only recorded in the plan here.
    sdf = sdf.withColumn(long_col, F.col(long_col).cast('bigint'))
    print(long_col, 'done')

# Fractional columns, cast from string to float.
cols_to_float = [
    'Census_InternalPrimaryDiagonalDisplaySizeInInches'
]
for float_col in cols_to_float:
    sdf = sdf.withColumn(float_col, F.col(float_col).cast('float'))
    print(float_col, 'done')

In [None]:
import matplotlib.pyplot as plt

In [None]:
ram_col = 'Census_TotalPhysicalRAM'
plt.figure(figsize=(16, 4))
# Only the first 10000 rows are collected to the driver; missing values are
# dropped in Spark before collecting.
ram_rows = sdf.limit(10000).select(ram_col).na.drop().collect()
plt.hist([r[ram_col] for r in ram_rows], bins=50)
plt.xticks(rotation=90)
plt.show()

In [None]:
# Row counts per target value, with one column per Wdft_IsGamer value.
gamer_counts = sdf.groupBy('HasDetections').pivot('Wdft_IsGamer').count()
gamer_counts.show()

In [None]:
# Mean RAM per target value, split by Wdft_IsGamer (requires the numeric cast above).
ram_by_gamer = sdf.groupBy('HasDetections').pivot('Wdft_IsGamer').mean('Census_TotalPhysicalRAM')
ram_by_gamer.show()

In [None]:
# Mean display diagonal per target value, split by Wdft_IsGamer.
display_by_gamer = sdf.groupBy('HasDetections').pivot('Wdft_IsGamer').mean('Census_InternalPrimaryDiagonalDisplaySizeInInches')
display_by_gamer.show()

In [None]:
# Row counts per target value, with one column per ProductName.
product_counts = (
    sdf.groupBy('HasDetections')
    .pivot('ProductName')
    .count()
)
product_counts.show()

In [None]:
# NOTE(review): identical to the previous cell; presumably a different pivot
# column was intended here — confirm with the notebook author.
product_counts_again = (
    sdf.groupBy('HasDetections')
    .pivot('ProductName')
    .count()
)
product_counts_again.show()

#### 3.2. Building and training the model

Here is the [complete MLlib guide](https://spark.apache.org/docs/latest/ml-guide.html).

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# We use only ~10% of the total data for the training demo;
# you may try to use more data for your experiments.
# A fixed seed makes the sample reproducible between runs (consistent with
# seed=42 used for randomSplit later).
sdf_train = sdf.sample(fraction=.1, seed=42)
print('training dataset count:', sdf_train.count())

In [None]:
# Map each ProductName string to a numeric category index.
cat_col = 'ProductName'
stringIndexer = StringIndexer(inputCol=cat_col, outputCol=f'{cat_col}_index')
indexer_model = stringIndexer.fit(sdf_train)
sdf_train = indexer_model.transform(sdf_train)
sdf_train.limit(10).toPandas()

In [None]:
# One-hot encode the category index produced by the StringIndexer into a vector column.
encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(), outputCol=f'{cat_col}_class_vec')
encoder_model = encoder.fit(sdf_train)
sdf_train = encoder_model.transform(sdf_train)
sdf_train.limit(10).toPandas()

In [None]:
# Impute missing data, choosing a strategy for each group of columns:
# median for the bigint columns, mean for the float columns and the most
# frequent value for the 0/1 flag columns.
label_col = 'HasDetections'
# The target must never be imputed: filling a missing label with the most
# frequent class would invent training labels. Rows without a label are
# dropped instead (a no-op when the column has no nulls).
sdf_train = sdf_train.dropna(subset=[label_col])
flag_cols = [c for c in dummy_cols if c != label_col]

# NOTE(review): the imputers are fit on all of sdf_train, which is split into
# train/test later, so test rows influence the fill values. For a leak-free
# evaluation consider moving these stages into the cross-validated Pipeline.
for cols, strategy in [
    (cols_to_bigint, 'median'),
    (cols_to_float, 'mean'),
    (flag_cols, 'mode'),
]:
    # inputCols == outputCols: the imputed values overwrite the original columns.
    imputer = Imputer(inputCols=cols, outputCols=cols, strategy=strategy)
    sdf_train = imputer.fit(sdf_train).transform(sdf_train)

In [None]:
# Every column handed to the model: flags (including the HasDetections label),
# numeric columns and the one-hot encoded ProductName vector.
cols_to_model = [*dummy_cols, *cols_to_bigint, *cols_to_float, f'{cat_col}_class_vec']
print('colums to model: ', cols_to_model)

[VectorAssembler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) is a feature transformer that merges multiple columns into a single vector column, the input format the Spark ML estimators below train on:

In [None]:
# Keep only the modelling columns, then pack every column except the label
# into a single 'features' vector; the label column is renamed to 'label'.
features = sdf_train.select(cols_to_model)
feature_inputs = [name for name in cols_to_model if name != 'HasDetections']
vecAssembler = VectorAssembler(inputCols=feature_inputs, outputCol='features')
features_vec = vecAssembler \
    .transform(features) \
    .withColumnRenamed('HasDetections', 'label')
features_vec.limit(5).toPandas()

In [None]:
# The assembled feature vector plus the label
# is all we need to start training with Spark.
features_data = features_vec.select(['label', 'features'])
features_data.limit(5).toPandas()

In [None]:
# Your HOME ASSIGNMENT is to implement
# `RandomForestClassifier` with the grid search
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.RandomForestClassifier.html
# and to improve final score ROC-AUC

lr = LogisticRegression(maxIter=20)
pipeline = Pipeline(stages=[lr])
# 5 regParam x 2 fitIntercept x 3 elasticNetParam = 30 parameter combinations,
# each trained numFolds=4 times by the cross-validator below.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [100., 10., 1., .1, .01]) \
    .addGrid(lr.fitIntercept, [False, True]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()
# An explicit seed makes the fold assignment (and therefore the selected
# model) reproducible, consistent with seed=42 used for randomSplit.
cross_val = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(metricName='areaUnderROC'),
    numFolds=4,
    seed=42,
)

In [None]:
%%time
# Hold out ~20% of the sampled rows for the final evaluation; the grid search
# with cross-validation runs only on the remaining ~80%.
feat_train, feat_test = features_data.randomSplit([.8, .2], seed=42)
model = cross_val.fit(feat_train)

#### 3.3. Evaluation

In [None]:
predictions = model.transform(feat_test)
# BinaryClassificationMetrics expects (score, label) pairs and sweeps the
# score to build the ROC/PR curves. Passing the hard 0/1 'prediction' column
# collapses each curve to a single threshold, so use the predicted
# probability of the positive class (index 1 of 'probability') as the score.
predictionLabels = predictions.select('probability', 'label')
metrics = BinaryClassificationMetrics(
    predictionLabels.rdd.map(
        lambda row: (float(row['probability'][1]), float(row['label']))
    )
)
print('ROC AUC: ', metrics.areaUnderROC)
print('Area under PR-curve: ', metrics.areaUnderPR)