## import modules

In [7]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import pandas as pd
import numpy as np

In [8]:
sc = SparkContext()
sqlContext = SQLContext(sc)

## read csv with library

In [11]:
df = sqlContext.read.format('com.databricks.spark.csv')\
                    .options(header = 'true', inferSchema = 'true')\
                    .load('songhunhwa.github.com\\tutorial\\tutorial_01\\doc_use_log.csv')\
                    .cache()

In [12]:
df.printSchema()

root
 |-- actiontype: string (nullable = true)
 |-- ismydoc: boolean (nullable = true)
 |-- ext: string (nullable = true)
 |-- sessionid: string (nullable = true)
 |-- documentposition: string (nullable = true)
 |-- datetime: string (nullable = true)



In [13]:
df.count()

301861

## convert df to tmp table ( if in database)

In [14]:
df.registerTempTable("df_tmp")

## extract data from table with sql

In [15]:
df1 = sqlContext.sql("select ismydoc, actiontype, sessionid, datetime from df_tmp where ismydoc = true")

## other sql examples

In [19]:
sqlContext.sql("select datetime, count(1) from df_tmp group by datetime  order by datetime").show()

+---------+--------+
| datetime|count(1)|
+---------+--------+
| 2016.7.1|   11836|
|2016.7.10|    6750|
|2016.7.11|   10075|
|2016.7.12|   12340|
|2016.7.13|   12159|
|2016.7.14|   11721|
|2016.7.15|   11057|
|2016.7.16|    8513|
|2016.7.17|    6906|
|2016.7.18|    9987|
|2016.7.19|   11872|
| 2016.7.2|    8884|
|2016.7.20|   11912|
|2016.7.21|   11599|
|2016.7.22|   11250|
|2016.7.23|    8390|
|2016.7.24|    6813|
|2016.7.25|   10081|
|2016.7.26|   12034|
|2016.7.27|   11593|
+---------+--------+
only showing top 20 rows



In [20]:
sqlContext.sql("select count(distinct sessionid) as session_cnt from df_tmp where documentposition = 'MYPOLARISDRIVE' group by ext having count(distinct sessionid) ").show()

+-----------+
|session_cnt|
+-----------+
|       5383|
|       2172|
|       1155|
|          1|
|       2260|
|       1502|
|          3|
|       1669|
|          1|
|       5860|
|       6184|
|       1304|
|        390|
|         47|
+-----------+



In [21]:
print(df.count())
print(df1.count())

301861
118732


## Lazy Execution

In [22]:
df2 = sqlContext.sql("select * from df_tmp")

In [31]:
df2_pdf = df2.select("sessionid", "ext").filter(" ext == 'PDF' or ext == 'DOC'").dropDuplicates().cache()
df2.distinct().count()

301833

In [32]:
df2.count()

301861

In [34]:
df2_min_date = df2.groupby("sessionid").agg(min("datetime").alias("min_date"))
df2_min_date.show()

+--------------------+---------+
|           sessionid| min_date|
+--------------------+---------+
|000ad8bfdff1ac4ab...|2016.7.17|
|008e1cb0462257ec9...| 2016.7.3|
|013f6c2b225e3367d...|2016.7.13|
|01ef158e70ef4ce7b...| 2016.7.9|
|02bb1592965d0dd8c...|2016.7.20|
|03288dde777589053...| 2016.7.6|
|0376d4749a8d1d710...|2016.7.24|
|0378158b8aee19123...| 2016.7.1|
|045c6e82a181152ae...|2016.7.27|
|04917f8f10bf22dab...|2016.7.21|
|04e0324d6a64c743d...|2016.7.19|
|050a25d3b0bf9081e...|2016.7.20|
|057cd5d22a7ebcf2b...|2016.7.28|
|05b3df1764b2f8435...|2016.7.13|
|06554b488500b58ea...|2016.7.15|
|065f2825395cd914f...|2016.7.21|
|07bdb289d126fa929...| 2016.7.6|
|082e52a48bc92a8fa...|2016.7.29|
|08f38ff50c2401683...|2016.7.28|
|09a604c8fe3d4e3c9...|2016.7.25|
+--------------------+---------+
only showing top 20 rows



In [35]:
df2_join = df2_pdf.join(df2_min_date, "sessionid", "left")
df2_join.show()

+--------------------+---+---------+
|           sessionid|ext| min_date|
+--------------------+---+---------+
|008e1cb0462257ec9...|PDF| 2016.7.3|
|013f6c2b225e3367d...|PDF|2016.7.13|
|02bb1592965d0dd8c...|DOC|2016.7.20|
|0376d4749a8d1d710...|DOC|2016.7.24|
|045c6e82a181152ae...|PDF|2016.7.27|
|050a25d3b0bf9081e...|PDF|2016.7.20|
|07bdb289d126fa929...|PDF| 2016.7.6|
|09a604c8fe3d4e3c9...|PDF|2016.7.25|
|0ceec7faa5315de34...|PDF| 2016.7.1|
|0db9b1d879a02aa5f...|PDF|2016.7.26|
|1096ac872ebf5b378...|PDF| 2016.7.5|
|11818d845e6a3a47a...|PDF|2016.7.28|
|1327ad759f8b306a7...|PDF|2016.7.15|
|1423841b1a4dba316...|PDF|2016.7.27|
|14e0f6e63b439b494...|PDF|2016.7.12|
|176b49ac47de89ed0...|PDF|2016.7.30|
|1795fea8fd01ec520...|PDF| 2016.7.9|
|182b43429e15f51aa...|PDF|2016.7.13|
|188aa9d6215c41e4a...|PDF| 2016.7.5|
|1aff8c39db429f402...|PDF|2016.7.18|
+--------------------+---+---------+
only showing top 20 rows



In [36]:
df2_join1 = df2_join.groupby("min_date", "ext").agg(count("sessionid").alias("cnt"))

In [37]:
df2_join1.describe().show()

+-------+--------+----+-----------------+
|summary|min_date| ext|              cnt|
+-------+--------+----+-----------------+
|  count|      60|  60|               60|
|   mean|    null|null|809.6333333333333|
| stddev|    null|null|473.5906108303528|
|    min|2016.7.1| DOC|              231|
|    max|2016.7.9| PDF|             1503|
+-------+--------+----+-----------------+



## Pandas

In [38]:
df2_pd = df2.toPandas()
df2_pd.groupby("ext")['sessionid'].count().sort_values(ascending=False)
df2_pd['ext'].value_counts()

PDF      82004
DOCX     58303
XLSX     52118
HWP      26244
DOC      24586
XLS      24489
PPTX     15385
TXT       9814
PPT       5382
PPSX      2374
ODT        820
PPS        324
JPG          9
SHEET        5
WORD         2
PNG          2
Name: ext, dtype: int64

In [40]:
df2_pd.describe()

Unnamed: 0,actiontype,ismydoc,ext,sessionid,documentposition,datetime
count,301861,301861,301861,301861,301861,301861
unique,8,2,16,114994,7,30
top,OPEN,False,PDF,6d4da731443c11cdd5bb986c58381172,OTHERAPP,2016.7.12
freq,151802,183129,82004,31,213779,12340


### more reference: http://spark.apache.org/docs/latest/api/python/getting_started/index.html

## import modules

In [42]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, stddev_samp

## read datafiles

In [44]:
df = sqlContext.read.format('com.databricks.spark.csv')\
                    .options(header = 'true', inferSchema = 'true')\
                    .load('MachineLearning_Pyspark\\data\\Default.csv')\
                    .drop("_c0")\
                    .cache()

## transform categorical values to int

In [45]:
strIdx = StringIndexer(inputCol = "student", outputCol = "studentIdx")

## one-hot encoding

In [47]:
encode = OneHotEncoder(inputCol = "studentIdx", outputCol = "StudentclassVec")

## transform categorical values to int

In [48]:
label_StrIdx = StringIndexer(inputCol = "default", outputCol = "label")

## set stages for pipeline

In [72]:
stages = [strIdx, encode, label_StrIdx]

## columns

In [57]:
numCols = ['income', 'balance']
for c in numCols:
    df = df.withColumn(c + "Scaled", col(c)/df.agg(stddev_samp(c)).first()[0])

In [64]:
df.columns

['default', 'student', 'balance', 'income', 'incomeScaled', 'balanceScaled']

## set inputs and append it to the stage

In [73]:
inputs = ["StudentclassVec", "incomeScaled", "balanceScaled"]
assembler = VectorAssembler(inputCols = inputs, outputCol = "features")
stages += [assembler]

## create pipeline


In [75]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

In [76]:
originalCols = df.columns
selectedScaledCols = ["label", "features"] + originalCols

In [77]:
dataset = dataset.select(selectedScaledCols)

## cross validation and fit models


In [78]:
(train, test) = dataset.randomSplit([0.7, 0.3], seed = 14)

In [80]:
train.groupby("label").count().show()
test.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 6693|
|  1.0|  214|
+-----+-----+

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 2974|
|  1.0|  119|
+-----+-----+



In [81]:
lr = LogisticRegression(labelCol = "label", featuresCol = "features", maxIter = 10)

In [82]:
lrModel = lr.fit(train)

In [84]:
predictions = lrModel.transform(test)

In [85]:
predictions.show()

+-----+--------------------+-------+-------+-----------+-----------+-------------------+-------------------+--------------------+--------------------+----------+
|label|            features|default|student|    balance|     income|       incomeScaled|      balanceScaled|       rawPrediction|         probability|prediction|
+-----+--------------------+-------+-------+-----------+-----------+-------------------+-------------------+--------------------+--------------------+----------+
|  0.0|[0.0,0.1905428126...|     No|    Yes|229.9887235|2541.200814| 0.1905428126813269| 0.4754633007699693|[9.22145192035202...|[0.99990111478356...|       0.0|
|  0.0|[0.0,0.2026734184...|     No|    Yes|1019.647755|2702.982331|0.20267341846391734|  2.107951554481274|[5.21375071756283...|[0.99458821986213...|       0.0|
|  0.0|[0.0,0.2235405353...|     No|    Yes|696.5835269|2981.279548| 0.2235405353782619| 1.4400701822316115|[6.86160209326446...|[0.99895386073337...|       0.0|
|  0.0|[0.0,0.3281793975...|

## evaluation

In [86]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [87]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")

In [89]:
print(evaluator.getMetricName(), "The AUC of the Model is {}".format(evaluator.evaluate(predictions)))
print("The AUC under PR curve is {}".format(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})))

areaUnderROC The AUC of the Model is 0.9563499912406146
The AUC under PR curve is 0.5854049032047725


In [91]:
evaluator.getMetricName()

'areaUnderROC'

In [92]:
print('Model Intercept: ', lrModel.interceptVector)
print('Model coefficientMatrix: ', lrModel.coefficientMatrix)

Model Intercept:  [-10.329140276751733]
Model coefficientMatrix:  DenseMatrix([[ 1.70374694, -0.31847167,  2.45733128]])



## grid search for parametor tuning


In [96]:
paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.01, 0.05, 0.1, 0.5, 2.0])
            .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5, 1.0])
            .addGrid(lr.maxIter, [1, 5, 10, 20])
            .build())

In [98]:
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [99]:
cv = CrossValidator(estimator = lr, estimatorParamMaps = paramGrid, evaluator = evaluator, numFolds = 5)

In [100]:
cvModel = cv.fit(train)
predictions = cvModel.transform(test)

In [101]:
evaluator.evaluate(predictions)

0.9584776748628219

In [102]:
print('Model Intercept: ', cvModel.bestModel.intercept)
print('Model coefficientMatrix: ', cvModel.bestModel.coefficientMatrix)

Model Intercept:  -7.423921954722778
Model coefficientMatrix:  DenseMatrix([[0.08886351, 0.10043084, 1.53323191]])

