In [4]:
# We begin by importing the necessary libraries
import pandas as pd
from sklearn.ensemble import RandomForestClassifier  as RF
from sklearn.model_selection import KFold, train_test_split
import datetime

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler,OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m25.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=a389c44801a25f8f2f9fbdb08c4e7c604f003823470292cf891101f43373c3f5
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [5]:
# Additional imports: Spark session, binary evaluator, feature assembler and logistic regression
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [6]:
# Start (or reuse, if one already exists) a Spark session that runs locally
# under the application name "SimpleApp".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SimpleApp")
    .getOrCreate()
)

In [7]:
# The dataset is stored on Google Drive, so we import the Colab helper used to mount it
from google.colab import drive

In [9]:
# Mount Google Drive so that its files are reachable under /content/drive
drive.mount('/content/drive')

Mounted at /content/drive


In [10]:
# Path is a standard-library class; import it from pathlib directly instead of
# going through IPython.utils.text, which is not where it is defined.
from pathlib import Path

# Folder of the mounted Drive, expressed relative to the current working
# directory (presumably /content on Colab, matching the mount point — TODO confirm).
DATA_DIR = Path.cwd().joinpath("drive")

In [11]:
# Location of the household CSV file inside the mounted Drive folder
DATA_ML = DATA_DIR / "MyDrive" / "hh_data_ml.csv"

In [12]:
# Load the dataset into a Spark DataFrame: the file has a header row, uses "|"
# as separator, and column types are inferred from the data.
# The path comes from DATA_ML so the file location is defined in one place only
# (this assumes the working directory is /content, as on a default Colab runtime).
sdf = spark.read.csv(str(DATA_ML), header=True, sep="|", inferSchema=True)

In [13]:
# Print the column names and inferred types to understand the dataset's structure
sdf.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- prov_id: integer (nullable = true)
 |-- reg_id: integer (nullable = true)
 |-- dist_id: integer (nullable = true)
 |-- adm4: integer (nullable = true)
 |-- urban_rural: integer (nullable = true)
 |-- hh_id: decimal(26,0) (nullable = true)
 |-- P03: integer (nullable = true)
 |-- P05: integer (nullable = true)
 |-- P07M: integer (nullable = true)
 |-- P07A: integer (nullable = true)
 |-- P08: string (nullable = true)
 |-- lan_spoken_local: string (nullable = true)
 |-- lan_spoken_fr: string (nullable = true)
 |-- lan_spoken_en: string (nullable = true)
 |-- lan_spoken_other: string (nullable = true)
 |-- lan_read_write_local: string (nullable = true)
 |-- lan_read_write_fr: string (nullable = true)
 |-- lan_read_write_en: string (nullable = true)
 |-- lan_read_write_other: string (nullable = true)
 |-- P21: string (nullable = true)
 |-- P22N: string (nullable = true)
 |-- P23: string (nullable = true)
 |-- P25: string (nullable = true)
 |-- 

In [14]:
# Count, for every column, the cells that look missing: the text 'None' or
# 'NULL', a single blank, a real null, or NaN.
from pyspark.sql.functions import col,isnan,when,count


def _missing_count(name):
    """Return an aggregate expression counting the missing-looking cells of column `name`."""
    looks_missing = (
        col(name).contains('None')
        | col(name).contains('NULL')
        | (col(name) == ' ')
        | col(name).isNull()
        | isnan(name)
    )
    # when(...) yields a non-null value only for flagged rows, so count() tallies them
    return count(when(looks_missing, name)).alias(name)


df2 = sdf.select([_missing_count(name) for name in sdf.columns])
df2.show()

+---+-------+------+-------+----+-----------+-----+---+---+----+----+---+----------------+-------------+-------------+----------------+--------------------+-----------------+-----------------+--------------------+-------+-------+-------+--------+-------+--------+----------+
|_c0|prov_id|reg_id|dist_id|adm4|urban_rural|hh_id|P03|P05|P07M|P07A|P08|lan_spoken_local|lan_spoken_fr|lan_spoken_en|lan_spoken_other|lan_read_write_local|lan_read_write_fr|lan_read_write_en|lan_read_write_other|    P21|   P22N|    P23|     P25|    P28|     P29|change_log|
+---+-------+------+-------+----+-----------+-----+---+---+----+----+---+----------------+-------------+-------------+----------------+--------------------+-----------------+-----------------+--------------------+-------+-------+-------+--------+-------+--------+----------+
|  0|      0|     0|      0|   0|          0|    0|  0|  0|   0|   0|881|         2165987|      2166013|      2166021|         2166030|             2165457|          2165466| 

The result of the code above shows the number of missing values per column. We drop the columns whose number of missing values is greater than that of the target. These columns are "P25", "P28", and "P29". The number of missing observations for the target is 2165345, while the numbers of missing observations for P25, P28, and P29 are 3705803, 14444406 and 15332430 respectively. We furthermore decided to also remove "_c0" and "change_log". In the next step, we replace the remaining missing observations by null and then get rid of the rows that contain them.


In [15]:
# Remove the columns we decided not to keep. This does not rule out that
# missing values remain in the other columns.
columns_to_drop = ['P25', 'P28', 'P29', 'change_log', '_c0']
sdf = sdf.drop(*columns_to_drop)

Now we look at the rows that contain missing values and delete them.

In [18]:
# Turn cells holding a single blank into real nulls so that they are treated
# as missing values; every other cell is kept as is.
from pyspark.sql.functions import col,when

blank_to_null = [
    when(col(name) == " ", None).otherwise(col(name)).alias(name)
    for name in sdf.columns
]
sdf_null = sdf.select(blank_to_null)

In [43]:
# Preview the data after blanks were replaced by nulls
sdf_null.show()

+-------+------+-------+-----+-----------+--------------------+---+---+----+----+---+----------------+-------------+-------------+----------------+--------------------+-----------------+-----------------+--------------------+----+----+----+
|prov_id|reg_id|dist_id| adm4|urban_rural|               hh_id|P03|P05|P07M|P07A|P08|lan_spoken_local|lan_spoken_fr|lan_spoken_en|lan_spoken_other|lan_read_write_local|lan_read_write_fr|lan_read_write_en|lan_read_write_other| P21|P22N| P23|
+-------+------+-------+-----+-----------+--------------------+---+---+----+----+---+----------------+-------------+-------------+----------------+--------------------+-----------------+-----------------+--------------------+----+----+----+
|      1|    11|    111|11101|          1|11101101010011066...|  0|  1|  10|1954| 63|               1|            1|            2|               2|                   1|                1|                2|                   2|   1|   3|   6|
|      1|    11|    111|11101|      

In [45]:
# Discard every row that has a null in at least one column, then check the schema
sdf_null_out = sdf_null.na.drop(how='any')
sdf_null_out.printSchema()

root
 |-- prov_id: integer (nullable = true)
 |-- reg_id: integer (nullable = true)
 |-- dist_id: integer (nullable = true)
 |-- adm4: integer (nullable = true)
 |-- urban_rural: integer (nullable = true)
 |-- hh_id: decimal(26,0) (nullable = true)
 |-- P03: integer (nullable = true)
 |-- P05: integer (nullable = true)
 |-- P07M: integer (nullable = true)
 |-- P07A: integer (nullable = true)
 |-- P08: string (nullable = true)
 |-- lan_spoken_local: string (nullable = true)
 |-- lan_spoken_fr: string (nullable = true)
 |-- lan_spoken_en: string (nullable = true)
 |-- lan_spoken_other: string (nullable = true)
 |-- lan_read_write_local: string (nullable = true)
 |-- lan_read_write_fr: string (nullable = true)
 |-- lan_read_write_en: string (nullable = true)
 |-- lan_read_write_other: string (nullable = true)
 |-- P21: string (nullable = true)
 |-- P22N: string (nullable = true)
 |-- P23: string (nullable = true)



In [46]:
# Re-run the per-column missing-value count on the cleaned data; every count
# should now be zero.
remaining_missing = [
    count(
        when(
            col(name).contains('None')
            | col(name).contains('NULL')
            | (col(name) == ' ')
            | col(name).isNull()
            | isnan(name),
            name,
        )
    ).alias(name)
    for name in sdf_null_out.columns
]
sdf1 = sdf_null_out.select(remaining_missing)

In [47]:
# Display the remaining missing-value count of each column
sdf1.show()

+-------+------+-------+----+-----------+-----+---+---+----+----+---+----------------+-------------+-------------+----------------+--------------------+-----------------+-----------------+--------------------+---+----+---+
|prov_id|reg_id|dist_id|adm4|urban_rural|hh_id|P03|P05|P07M|P07A|P08|lan_spoken_local|lan_spoken_fr|lan_spoken_en|lan_spoken_other|lan_read_write_local|lan_read_write_fr|lan_read_write_en|lan_read_write_other|P21|P22N|P23|
+-------+------+-------+----+-----------+-----+---+---+----+----+---+----------------+-------------+-------------+----------------+--------------------+-----------------+-----------------+--------------------+---+----+---+
|      0|     0|      0|   0|          0|    0|  0|  0|   0|   0|  0|               0|            0|            0|               0|                   0|                0|                0|                   0|  0|   0|  0|
+-------+------+-------+----+-----------+-----+---+---+----+----+---+----------------+-------------+--------

In [48]:
# Second name for the cleaned DataFrame (plain assignment, no copy is made)
sdf2 = sdf_null_out

Now that we have cleaned the missing values, we can proceed to building the model. We are using logistic regression. If we have enough time, we may train other algorithms.

In [49]:
# After dropping the columns and the incomplete rows, we inspect the structure of the dataset again
sdf2.printSchema()

root
 |-- prov_id: integer (nullable = true)
 |-- reg_id: integer (nullable = true)
 |-- dist_id: integer (nullable = true)
 |-- adm4: integer (nullable = true)
 |-- urban_rural: integer (nullable = true)
 |-- hh_id: decimal(26,0) (nullable = true)
 |-- P03: integer (nullable = true)
 |-- P05: integer (nullable = true)
 |-- P07M: integer (nullable = true)
 |-- P07A: integer (nullable = true)
 |-- P08: string (nullable = true)
 |-- lan_spoken_local: string (nullable = true)
 |-- lan_spoken_fr: string (nullable = true)
 |-- lan_spoken_en: string (nullable = true)
 |-- lan_spoken_other: string (nullable = true)
 |-- lan_read_write_local: string (nullable = true)
 |-- lan_read_write_fr: string (nullable = true)
 |-- lan_read_write_en: string (nullable = true)
 |-- lan_read_write_other: string (nullable = true)
 |-- P21: string (nullable = true)
 |-- P22N: string (nullable = true)
 |-- P23: string (nullable = true)



In [50]:
# Summary statistics of the integer columns of the cleaned dataset, collected
# into pandas and transposed so that each row describes one variable.
num_feat = [name for name, dtype in sdf2.dtypes if dtype == 'int']
sdf2.select(num_feat).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
prov_id,17175742,2.700236065492833,1.6630628095987892,1,6
reg_id,17175742,28.97242669341447,16.6492342262855,11,62
dist_id,17175742,292.98405186803575,166.3294675639007,111,624
adm4,17175742,29306.41254543763,16632.953163784536,11101,62418
urban_rural,17175742,1.7623886059769645,0.42561981861802883,1,2
P03,17175742,1.8491634306104505,2.015562723584839,0,9
P05,17175742,1.502947238029076,0.49999132826770804,1,2
P07M,17175742,25.081489288788806,37.446624338084014,1,99
P07A,17175742,2584.5630223136795,2096.1860310969887,1890,9999


In [51]:
# Gather the columns the model is built from into a new DataFrame, remember
# their names in `cols`, and print the schema to confirm it matches the
# previous one.
model_columns = [
    'prov_id', 'reg_id', 'dist_id', 'adm4', 'urban_rural',
    'hh_id', 'P03', 'P05', 'P07M', 'P07A', 'P08',
    'lan_spoken_local', 'lan_spoken_fr', 'lan_spoken_en', 'lan_spoken_other',
    'lan_read_write_local', 'lan_read_write_fr', 'lan_read_write_en',
    'lan_read_write_other',
    'P21', 'P22N', 'P23',
]
df = sdf2.select(*model_columns)
cols = df.columns
df.printSchema()

root
 |-- prov_id: integer (nullable = true)
 |-- reg_id: integer (nullable = true)
 |-- dist_id: integer (nullable = true)
 |-- adm4: integer (nullable = true)
 |-- urban_rural: integer (nullable = true)
 |-- hh_id: decimal(26,0) (nullable = true)
 |-- P03: integer (nullable = true)
 |-- P05: integer (nullable = true)
 |-- P07M: integer (nullable = true)
 |-- P07A: integer (nullable = true)
 |-- P08: string (nullable = true)
 |-- lan_spoken_local: string (nullable = true)
 |-- lan_spoken_fr: string (nullable = true)
 |-- lan_spoken_en: string (nullable = true)
 |-- lan_spoken_other: string (nullable = true)
 |-- lan_read_write_local: string (nullable = true)
 |-- lan_read_write_fr: string (nullable = true)
 |-- lan_read_write_en: string (nullable = true)
 |-- lan_read_write_other: string (nullable = true)
 |-- P21: string (nullable = true)
 |-- P22N: string (nullable = true)
 |-- P23: string (nullable = true)



In [52]:
# The ML model cannot learn from strings, so each string column is first mapped
# to label indices (StringIndexer) and those indices are then one-hot encoded
# (OneHotEncoder). VectorAssembler finally concatenates the encoded vectors and
# the numeric columns into a single "features" vector. The target 'P22N' is
# indexed into the 'label' column.
categoricalColumns = [
    'P08',
    'lan_spoken_local', 'lan_spoken_fr', 'lan_spoken_en', 'lan_spoken_other',
    'lan_read_write_local', 'lan_read_write_fr', 'lan_read_write_en', 'lan_read_write_other',
    'P21', 'P23',
]
# NOTE(review): hh_id looks like a household identifier rather than a predictor —
# confirm that it really belongs in the feature vector.
numericCols = ['prov_id','reg_id','dist_id','adm4','urban_rural','hh_id','P03','P05','P07M','P07A']

stages = []
for categoricalCol in categoricalColumns:
    indexedCol = categoricalCol + 'Index'
    encodedCol = categoricalCol + "classVec"
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    encoder = OneHotEncoder(inputCols=[indexedCol], outputCols=[encodedCol])
    stages.extend([stringIndexer, encoder])

# Index the target column
label_stringIdx = StringIndexer(inputCol='P22N', outputCol='label')
stages.append(label_stringIdx)

# Encoded categorical vectors first, raw numeric columns after
assemblerInputs = [name + "classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [53]:
# A pipeline is a specified sequence of stages, and each stage is either a transformer or an estimator.
# The Pipeline and PipelineModel help to ensure that training and test data go through identical feature processing steps.
pipeline1 = Pipeline(stages = stages)

In [54]:
# Fit the pipeline's stages on df, producing a reusable fitted model.
# NOTE(review): the fit happens before the train/test split, so the indexers
# also see rows that later end up in the test set — confirm this is intended.
pipelineModel = pipeline1.fit(df)

In [35]:
# Apply the fitted pipeline, which adds the index, one-hot, 'label' and
# 'features' columns. Selecting the original columns (`cols`) first keeps this
# cell re-runnable: without it, a second run fails because the pipeline's
# output columns already exist in df.
df = pipelineModel.transform(df.select(cols))

In [36]:
# Columns to keep: the target and the assembled features, then the original columns
selectedCols = ['label', 'features', *cols]

In [37]:
# Keep only the selected columns, dropping the intermediate index and one-hot columns
df = df.select(selectedCols)

In [38]:
# Check that 'label' and 'features' are now part of the schema
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- prov_id: integer (nullable = true)
 |-- reg_id: integer (nullable = true)
 |-- dist_id: integer (nullable = true)
 |-- adm4: integer (nullable = true)
 |-- urban_rural: integer (nullable = true)
 |-- hh_id: decimal(26,0) (nullable = true)
 |-- P03: integer (nullable = true)
 |-- P05: integer (nullable = true)
 |-- P07M: integer (nullable = true)
 |-- P07A: integer (nullable = true)
 |-- P08: string (nullable = true)
 |-- lan_spoken_local: string (nullable = true)
 |-- lan_spoken_fr: string (nullable = true)
 |-- lan_spoken_en: string (nullable = true)
 |-- lan_spoken_other: string (nullable = true)
 |-- lan_read_write_local: string (nullable = true)
 |-- lan_read_write_fr: string (nullable = true)
 |-- lan_read_write_en: string (nullable = true)
 |-- lan_read_write_other: string (nullable = true)
 |-- P21: string (nullable = true)
 |-- P22N: string (nullable = true)
 |-- P23: string (nullable = true

In [39]:
# We divide the dataset into training and test sets with ratios 0.7 and 0.3 respectively;
# the seed is fixed so that the split is reproducible.
train, test = df.randomSplit([0.7, 0.3], seed = 2018)

In [40]:
# Instantiate the logistic regression, telling it which column holds the
# features and which one holds the response; training is capped at 10 iterations.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)

In [None]:
# We fit the logistic regression on the training set
lrModel = lr.fit(train)

In [37]:
# Coefficient matrix of the fitted model
beta = lrModel.coefficientMatrix

In [46]:
# Summary object attached to the fitted model (metrics computed during training)
trainingSummary = lrModel.summary

In [49]:
# Now that we have trained the model, we want to evaluate its performance with some metrics.
# NOTE: this accuracy comes from the training summary, i.e. it is measured on
# the training data, not on the held-out test set.
trainingSummary.accuracy

0.8392590863202651

In [None]:
# Let's make predictions on the test set
predictions = lrModel.transform(test)