In [1]:
# Section must be included at the beginning of each new notebook. Remember to change the app name. 
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

In [2]:
# Let's read in the data. If you open the dataset, you'll find that each column has a header. We specify that by stating that header=True.
# To make our lives easier, we can also use 'inferSchema' when importing CSVs. This automatically detects data types.
# If you would like to manually change data types, refer to this article: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
df = spark.read.csv('Absenteeism_at_work.csv',header=True,inferSchema=True)

In [3]:
# The show method allows you visualise DataFrames in a tabular format. 
df.show()

+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+
| ID|Reason_for_absence|Month_of_absence|Day_of_the_week|Seasons|Transportation_expense|Distance_from_Residence_to_Work|Service_time|Age|Work_load_Average/day |Hit_target|Disciplinary_failure|Education|Son|Social_drinker|Social_smoker|Pet|Weight|Height|Body_mass_index|Absenteeism_time_in_hours|
+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+
| 11|                26|               7|              3|      1|                   289|                        

In [4]:
# Print schema allows us to visualise the data structure at a high level. 
df.printSchema()

# We can also use head to print a specific amount of rows, so we can get a better understanding of the data points. 
# Note that we have to specify 'print' depending on the method we're using. Otherwise it may not show up!
print(df.head(1))

root
 |-- ID: integer (nullable = true)
 |-- Reason_for_absence: integer (nullable = true)
 |-- Month_of_absence: integer (nullable = true)
 |-- Day_of_the_week: integer (nullable = true)
 |-- Seasons: integer (nullable = true)
 |-- Transportation_expense: integer (nullable = true)
 |-- Distance_from_Residence_to_Work: integer (nullable = true)
 |-- Service_time: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Work_load_Average/day : double (nullable = true)
 |-- Hit_target: integer (nullable = true)
 |-- Disciplinary_failure: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- Son: integer (nullable = true)
 |-- Social_drinker: integer (nullable = true)
 |-- Social_smoker: integer (nullable = true)
 |-- Pet: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Body_mass_index: integer (nullable = true)
 |-- Absenteeism_time_in_hours: integer (nullable = true)

[Row(ID=11, Reason_for_absence

In [5]:
# We can use the describe method get some general statistics on our data too. 
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+----------------------+-------------------------------+------------------+-----------------+----------------------+-----------------+--------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+-------------------------+
|summary|                ID|Reason_for_absence|  Month_of_absence|   Day_of_the_week|           Seasons|Transportation_expense|Distance_from_Residence_to_Work|      Service_time|              Age|Work_load_Average/day |       Hit_target|Disciplinary_failure|         Education|               Son|     Social_drinker|      Social_smoker|               Pet|            Weight|            Height|   Body_mass_index|Absenteeism_time_in_hours|
+-------+------------------+------------------+------------------+------------------+------------------+------------------

In [6]:
# Let's select the balance column and assign it to a variable. 
df= df.select('ID', 'Reason_for_absence', 'age', 'Son', 'Pet', 'Weight', 'Body_mass_index', 'Absenteeism_time_in_hours')

# We can then use the show method on that variable.
df.show()

+---+------------------+---+---+---+------+---------------+-------------------------+
| ID|Reason_for_absence|age|Son|Pet|Weight|Body_mass_index|Absenteeism_time_in_hours|
+---+------------------+---+---+---+------+---------------+-------------------------+
| 11|                26| 33|  2|  1|    90|             30|                        4|
| 36|                 0| 50|  1|  0|    98|             31|                        0|
|  3|                23| 38|  0|  0|    89|             31|                        2|
|  7|                 7| 39|  2|  0|    68|             24|                        4|
| 11|                23| 33|  2|  1|    90|             30|                        2|
|  3|                23| 38|  0|  0|    89|             31|                        2|
| 10|                22| 28|  1|  4|    80|             27|                        8|
| 20|                23| 36|  4|  0|    65|             23|                        4|
| 14|                19| 34|  2|  0|    95|           

In [7]:
# Section must be included at the beginning of each new notebook. Remember to change the app name.
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import *
spark = SparkSession.builder.appName('logistic_regression_adv').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip3 install numpy --user' into the console.
# If you're getting an error with another package, type 'sudo pip3 install PACKAGENAME --user'. 
# Replace PACKAGENAME with the relevant package (such as pandas, etc).
from pyspark.ml.classification import LogisticRegression

# Import data and print schema - columns is another way to view the data's features.
#df = spark.read.csv('Datasets/bank_dataset.csv', header=True, inferSchema=True)
df.printSchema()
print(df.columns)

root
 |-- ID: integer (nullable = true)
 |-- Reason_for_absence: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- Son: integer (nullable = true)
 |-- Pet: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Body_mass_index: integer (nullable = true)
 |-- Absenteeism_time_in_hours: integer (nullable = true)

['ID', 'Reason_for_absence', 'age', 'Son', 'Pet', 'Weight', 'Body_mass_index', 'Absenteeism_time_in_hours']


In [8]:
# Import pandas.
import pandas as pd

# Take the first five rows of data, and visualise.
pd.DataFrame(df.take(10), columns=df.columns)

Unnamed: 0,ID,Reason_for_absence,age,Son,Pet,Weight,Body_mass_index,Absenteeism_time_in_hours
0,11,26,33,2,1,90,30,4
1,36,0,50,1,0,98,31,0
2,3,23,38,0,0,89,31,2
3,7,7,39,2,0,68,24,4
4,11,23,33,2,1,90,30,2
5,3,23,38,0,0,89,31,2
6,10,22,28,1,4,80,27,8
7,20,23,36,4,0,65,23,4
8,14,19,34,2,0,95,25,40
9,1,22,37,1,1,88,29,8


In [9]:
# To visualise the first five columns, simply add transpose. 
pd.DataFrame(df.take(10), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
ID,11,36,3,7,11,3,10,20,14,1
Reason_for_absence,26,0,23,7,23,23,22,23,19,22
age,33,50,38,39,33,38,28,36,34,37
Son,2,1,0,2,2,0,1,4,2,1
Pet,1,0,0,0,1,0,4,0,0,1
Weight,90,98,89,68,90,89,80,65,95,88
Body_mass_index,30,31,31,24,30,31,27,23,25,29
Absenteeism_time_in_hours,4,0,2,4,2,2,8,4,40,8


In [10]:
# We can use group by and count to find out how many data points we have for each class in our predictor. 
df.groupby('absenteeism_time_in_hours').count().toPandas()

Unnamed: 0,absenteeism_time_in_hours,count
0,1,88
1,16,19
2,3,112
3,40,7
4,120,3
5,48,1
6,5,7
7,64,3
8,112,2
9,4,60


In [11]:
# Using a for loop to find all columns that belong to the integer data type. 
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']

# Selecting the numeric features, generating summary statistics, and converting to a Pandas DataFrame.
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
ID,740,18.017567567567568,11.021247263063657,1,36
Reason_for_absence,740,19.216216216216218,8.43340588279965,0,28
age,740,36.45,6.478772457611868,27,58
Son,740,1.018918918918919,1.0984890195302817,0,4
Pet,740,0.745945945945946,1.3182582913258336,0,8
Weight,740,79.03513513513514,12.883210507177214,56,108
Body_mass_index,740,26.677027027027027,4.285452223167274,19,38
Absenteeism_time_in_hours,740,6.924324324324324,13.330998100978196,0,120


In [12]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)

# First create a string indexer which converts every string into a number, such as male = 0 and female = 1.
# A number will be assigned to every category in the column.
ID_indexer = StringIndexer(inputCol='ID',outputCol='IDIndex')
Reason_indexer = StringIndexer(inputCol='Reason_for_absence',outputCol='ReasonIndex')
age_indexer = StringIndexer(inputCol='age',outputCol='ageIndex')
Son_indexer = StringIndexer(inputCol='Son',outputCol='SonIndex')
Pet_indexer = StringIndexer(inputCol='Pet',outputCol='PetIndex')
Body_indexer = StringIndexer(inputCol='Body_mass_index',outputCol='BodyIndex')
Weight_indexer = StringIndexer(inputCol='Weight',outputCol='WeightIndex')
Absenteeism_indexer = StringIndexer(inputCol='Absenteeism_time_in_hours',outputCol='label')

# Now we can one hot encode these numbers. This converts the various outputs into a single vector.
# Multiple columns are collapsed into one. 
# This makes it easier to process when you have multiple classes.
ID_encoder = OneHotEncoder(inputCol='ID',outputCol='IDVec')
Reason_encoder = OneHotEncoder(inputCol='ReasonIndex',outputCol='ReasonVec')
age_encoder = OneHotEncoder(inputCol='ageIndex',outputCol='ageVec')
Son_encoder = OneHotEncoder(inputCol='SonIndex',outputCol='SonVec')
Pet_encoder = OneHotEncoder(inputCol='PetIndex',outputCol='PetVec')
Weight_encoder = OneHotEncoder(inputCol='WeightIndex',outputCol='WeightVec')
Body_encoder = OneHotEncoder(inputCol='BodyIndex',outputCol='BodyVec')

# And finally, using vector assembler to turn all of these columns into one column (named features).
assembler = VectorAssembler(inputCols=['IDVec','ReasonVec','ageVec','SonVec',
                                       'BodyVec','PetVec', 'WeightVec'], outputCol="personal1")

In [13]:
from pyspark.ml import Pipeline

# Then go through our steps. It's essentially sequential to the above.
pipeline = Pipeline(stages=[ID_indexer, Reason_indexer, age_indexer, Son_indexer,
                            Pet_indexer, Body_indexer,Weight_indexer, ID_encoder, Reason_encoder, age_encoder,
                            Son_encoder, Pet_encoder, Body_encoder, Weight_encoder,assembler])

# Now that we've got a number of steps, let's apply it to the DataFrame.
pipeline_model = pipeline.fit(df)

# Incorporate results into a new DataFrame.
pipe_df = pipeline_model.transform(df)

# Remove all variables other than features and label. 
pipe_df = pipe_df.select('Absenteeism_time_in_hours', 'personal1')

In [14]:
from pyspark.ml.classification import LogisticRegression

# Split our data. Note that the new DataFrame is being used.
train_data, test_data = pipe_df.randomSplit([0.7,0.3])
print("Training Dataset Count: " + str(train_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

# Instantiate the model.
lr_model = LogisticRegression(featuresCol='personal1',labelCol='Absenteeism_time_in_hours')

# Fit the model.
lr_model = lr_model.fit(train_data)

# And evaluate the model using the test data.
results = lr_model.transform(test_data)
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lr_model.coefficientMatrix))
print("Intercept: " + str(lr_model.interceptVector))

Training Dataset Count: 520
Test Dataset Count: 220
Coefficients: 
DenseMatrix([[ 9.59663663e-15, -1.92728588e+00,  1.68939651e+00, ...,
               9.59663663e-15,  7.36376616e+00,  5.86541240e+00],
             [ 9.59663663e-15,  2.36132450e+00,  5.00580548e+00, ...,
               9.59663663e-15, -2.22013035e+00, -2.79045369e+00],
             [ 9.59663663e-15,  2.51088492e+00, -2.86848903e+00, ...,
               9.59663663e-15, -3.21631053e+00, -3.83988109e+00],
             ...,
             [ 9.59663663e-15, -9.00157941e-02, -3.71117482e-02, ...,
               9.59663663e-15, -3.70866735e-02, -3.36164159e-02],
             [ 9.59663663e-15, -9.00157941e-02, -3.71117482e-02, ...,
               9.59663663e-15, -3.70866735e-02, -3.36164159e-02],
             [ 9.59663663e-15, -5.36037040e-01, -1.45646689e-01, ...,
               9.59663663e-15, -1.48293096e-01, -1.69219462e-01]])
Intercept: [2.84731984631591,5.266578094791934,5.968824573342175,5.386247896461349,4.5031578069970