In [1]:
# Locate the Spark installation and start (or reuse) a SparkSession.
# SPARK_HOME, when set, overrides the install path so the notebook is not tied
# to a single machine; the fallback is the path this notebook was written against.
import os

import findspark

DEFAULT_SPARK_HOME = "/home/ubuntu/spark-2.3.2-bin-hadoop2.7"
findspark.init(os.environ.get("SPARK_HOME", DEFAULT_SPARK_HOME))

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Model-Logisticregression').getOrCreate()

In [2]:
# Load the two merged datasets produced by the earlier cleaning steps, plus
# "finindexed.csv". Every file is a CSV with a header row, and Spark infers the
# column types.
# NOTE(review): finalindex is not referenced in the cells visible here — confirm it is needed.
csv_options = dict(header=True, inferSchema=True)

final1 = spark.read.csv("Updated Datasets after cleaning/final1.csv", **csv_options)
final2 = spark.read.csv("Updated Datasets after cleaning/final2.csv", **csv_options)
finalindex = spark.read.csv("finindexed.csv", **csv_options)

final1.show()


+-------+----+-----+----------+----------+------+--------+---------+--------+
|Country|TIME|  Sex|GDPsubject| GDP Value|Impexp|    Cost|pte_value|se_value|
+-------+----+-----+----------+----------+------+--------+---------+--------+
|    AUS|2013|WOMEN|       TOT|1102723.05|   IMP|246949.7|    38.14|    7.89|
|    AUS|2013|  MEN|       TOT|1102723.05|   IMP|246949.7|    13.65|   11.97|
|    AUS|2013|  TOT|       TOT|1102723.05|   IMP|246949.7|    24.88|    10.1|
|    AUS|2013|WOMEN|       TOT|1102723.05|   EXP|254201.7|    38.14|    7.89|
|    AUS|2013|  MEN|       TOT|1102723.05|   EXP|254201.7|    13.65|   11.97|
|    AUS|2013|  TOT|       TOT|1102723.05|   EXP|254201.7|    24.88|    10.1|
|    AUS|2014|WOMEN|       TOT|1116293.11|   IMP|238300.4|    38.33|    7.98|
|    AUS|2014|  MEN|       TOT|1116293.11|   IMP|238300.4|    14.03|   12.05|
|    AUS|2014|  TOT|       TOT|1116293.11|   IMP|238300.4|    25.21|   10.18|
|    AUS|2014|WOMEN|       TOT|1116293.11|   EXP|240425.8|    38

In [3]:
# Preview the second merged dataset (first 20 rows, long cell values truncated — Spark's defaults).
final2.show(n=20, truncate=True)

+--------------+-----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------+--------+-----------------------------+------------------------+---------------------+------------------------------------+--------------------------------------------+------------------------------------------+----------------------------------------+--------------------------------------+
|       Country|  Sex|2011_fte|2012_fte|2013_fte|2014_fte|2015_fte|2016_fte| 2011gwg| 2012gwg| 2013gwg| 2014gwg| 2015gwg| 2016gwg|2000_gnif|2005_gnif|2010_gnif|2011_gnif|2012_gnif|2013_gnif|2014_gnif|2015_gnif|2000_gni|2005_gni|2010_gni|2011_gni|2012_gni|2013_gni|2014gni|2015_gni|GII Rank|Gender Inequality Index (GII)|Maternal Mortality Ratio|Adolescent Birth Rate|Percent Representation in Parliament|Population with Seconda

In [4]:
# Dataset 1: turn the categorical columns of final1 into numeric indices.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Each StringIndexer maps the distinct values of one column to numeric indices,
# written to a new "<column>_Index" column that the one-hot encoders read later.
Countryindex = StringIndexer(inputCol="Country", outputCol="Country_Index")
Sexindex = StringIndexer(inputCol="Sex", outputCol="Sex_Index")
Impexpindex = StringIndexer(inputCol="Impexp", outputCol="Impexp_Index")
# TIME holds years (2013, 2014, ... in the preview); indexing treats each year as a category.
TIMEindex = StringIndexer(inputCol="TIME", outputCol="TIME_Index")

# Target column for the classifier.
# NOTE(review): "GDP Value" is a continuous amount, so StringIndexer makes every
# distinct value its own class. In the preview, GDP Value is constant within a
# Country/TIME pair, so the model effectively learns to identify that pair.
# Confirm this is the intended target; a binned or thresholded GDP column may be
# what was meant.
GDP_label = StringIndexer(inputCol='GDP Value', outputCol='label')



In [5]:

# One-hot encode each indexed categorical column into a sparse vector column.
from pyspark.ml.feature import OneHotEncoder

Countryencoder = OneHotEncoder(inputCol="Country_Index", outputCol="Country_vector")
Sexencoder = OneHotEncoder(inputCol="Sex_Index", outputCol="Sex_vector")
Impexpencoder = OneHotEncoder(inputCol="Impexp_Index", outputCol="Impexp_vector")
TIMEencoder = OneHotEncoder(inputCol="TIME_Index", outputCol="TIME_vector")

# Concatenate the encoded vectors into the single "features" column the
# classifier reads. Taking the names from the encoders keeps the two in sync.
# NOTE(review): the numeric columns of final1 (Cost, pte_value, se_value) are not
# part of the feature vector — confirm that is intended.
assembler = VectorAssembler(
    inputCols=[enc.getOutputCol() for enc in (Countryencoder, Sexencoder, Impexpencoder, TIMEencoder)],
    outputCol="features",
)

In [6]:
from pyspark.ml import Pipeline

# Stage order matters. Each categorical StringIndexer must run before the
# OneHotEncoder that reads its "_Index" column, and the assembler must run last.
index_stages = [Countryindex, Sexindex, Impexpindex, TIMEindex, GDP_label]
encode_stages = [Countryencoder, Sexencoder, Impexpencoder, TIMEencoder]
pipeline = Pipeline(stages=index_stages + encode_stages + [assembler])

# Fit the stages on final1, apply them, and keep only the two columns the
# classifier consumes.
pipeline_model = pipeline.fit(final1)
pipe_df = pipeline_model.transform(final1).select('label', 'features')


In [9]:
# Train a logistic-regression classifier on a reproducible train/test split.
from pyspark.ml.classification import LogisticRegression

# A fixed seed keeps the split (and with it the counts, any displayed rows and the
# fitted model) identical on every re-run. Without it, each execution of this cell
# draws a new split, and cells that show train_data/test_data can disagree with
# the data the model was actually trained on.
SPLIT_SEED = 42
train_data, test_data = pipe_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
print("Training Dataset Count: " + str(train_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

# Use separate names for the estimator and the fitted model, so that `lr_model`
# always means the trained model.
lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(train_data)

# Score the held-out rows; `results` is test_data plus the model's prediction columns.
results = lr_model.transform(test_data)

Training Dataset Count: 1376
Test Dataset Count: 532


In [8]:
# Preview the first rows of the training split, then of the test split.
# Depends on train_data/test_data from the train/test split cell.
for split_df in (train_data, test_data):
    split_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(42,[27,35,37,41]...|
|  0.0|(42,[27,35,41],[1...|
|  0.0|(42,[27,36,37,41]...|
|  0.0|(42,[27,37,41],[1...|
|  0.0|(42,[27,41],[1.0,...|
|  1.0|(42,[21,35,40],[1...|
|  1.0|(42,[21,36,37,40]...|
|  1.0|(42,[21,36,40],[1...|
|  1.0|(42,[21,40],[1.0,...|
|  2.0|(42,[8,35,37,41],...|
|  2.0|(42,[8,36,37,41],...|
|  2.0|(42,[8,36,41],[1....|
|  2.0|(42,[8,37,41],[1....|
|  2.0|(42,[8,41],[1.0,1...|
|  3.0|(42,[14,35,37,39]...|
|  3.0|(42,[14,36,39],[1...|
|  3.0|(42,[14,37,39],[1...|
|  3.0|(42,[14,39],[1.0,...|
|  4.0|(42,[16,35,37,38]...|
|  4.0|(42,[16,35,38],[1...|
+-----+--------------------+
only showing top 20 rows

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(42,[27,36,41],[1...|
|  1.0|(42,[21,35,37,40]...|
|  1.0|(42,[21,37,40],[1...|
|  2.0|(42,[8,35,41],[1....|
|  3.0|(42,[14,35,39],[1...|
|  3.0|(42,[14,36,37,39]...|
|  4.0|(42,[16,36