In [26]:
import numpy as np 
import matplotlib
import matplotlib.pyplot as plt 
%matplotlib inline
import pandas as pd 
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import (LinearRegression, RandomForestRegressor)
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)
spark = SparkSession.builder.appName('Iteration_8').getOrCreate()


In [27]:
data = spark.read.csv("nzhealthsurvey2016.csv",inferSchema=True,header=True)

In [28]:
#data exploration
data.show()

+----------+-----------------+----+--------------+-----+------------+-------------+----+-----------+------------+------+-------------+--------------+----------------+-----------------------+------------------------+
|population|short.description|year|         group|total|total.low.CI|total.high.CI|male|male.low.CI|male.high.CI|female|female.low.CI|female.high.CI|estimated.number|estimated.number.low.CI|estimated.number.high.CI|
+----------+-----------------+----+--------------+-----+------------+-------------+----+-----------+------------+------+-------------+--------------+----------------+-----------------------+------------------------+
|    adults|Physically active|2011|         Total| 54.4|        51.8|         56.9|57.3|       54.6|        59.9|  51.7|         48.7|          54.7|         1899000|                1810000|                 1986000|
|    adults|Physically active|2011|         15-17| 58.1|        52.2|         63.7|61.1|       52.5|          69|  54.9|         45.8|  

In [29]:
data.printSchema()

root
 |-- population: string (nullable = true)
 |-- short.description: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- group: string (nullable = true)
 |-- total: double (nullable = true)
 |-- total.low.CI: double (nullable = true)
 |-- total.high.CI: double (nullable = true)
 |-- male: string (nullable = true)
 |-- male.low.CI: string (nullable = true)
 |-- male.high.CI: string (nullable = true)
 |-- female: string (nullable = true)
 |-- female.low.CI: string (nullable = true)
 |-- female.high.CI: string (nullable = true)
 |-- estimated.number: string (nullable = true)
 |-- estimated.number.low.CI: string (nullable = true)
 |-- estimated.number.high.CI: string (nullable = true)



In [30]:
data.head()

Row(population='adults', short.description='Physically active', year=2011, group='Total', total=54.4, total.low.CI=51.8, total.high.CI=56.9, male='57.3', male.low.CI='54.6', male.high.CI='59.9', female='51.7', female.low.CI='48.7', female.high.CI='54.7', estimated.number='1899000', estimated.number.low.CI='1810000', estimated.number.high.CI='1986000')

In [31]:
#trims for just anxiety
#anxiety =  data[522:624]
anxiety = spark.createDataFrame(data.head(619), data.schema)
tanxiety = spark.createDataFrame(anxiety.head(517), anxiety.schema)
anxiety = anxiety.subtract(tanxiety)
anxiety.show()

+----------+-----------------+----+----------+-----+------------+-------------+----+-----------+------------+------+-------------+--------------+----------------+-----------------------+------------------------+
|population|short.description|year|     group|total|total.low.CI|total.high.CI|male|male.low.CI|male.high.CI|female|female.low.CI|female.high.CI|estimated.number|estimated.number.low.CI|estimated.number.high.CI|
+----------+-----------------+----+----------+-----+------------+-------------+----+-----------+------------+------+-------------+--------------+----------------+-----------------------+------------------------+
|    adults| Anxiety disorder|2013|     35-44|  8.9|         7.4|         10.7| 7.5|        5.5|          10|  10.2|          8.2|          12.6|           52000|                  43000|                   63000|
|    adults| Anxiety disorder|2013|     Maori|  7.7|         6.3|          9.2| 5.1|        3.9|         6.8|   9.9|          7.8|          12.6|       

In [38]:
anxiety = anxiety[['year', 'group','total', 'male', 'female']]
anxiety = anxiety.filter(anxiety["group"].rlike("[0-9]{2}-[0-9]{2}"))

In [39]:
assembler = VectorAssembler(
  inputCols=['year',
            'male',
            'female',
            'total'],
              outputCol="features")

anxiety = anxiety.withColumn("male", anxiety["male"].cast('float'))
anxiety = anxiety.withColumn("female", anxiety["female"].cast('float'))

output = assembler.transform(anxiety)

final_data = output.select("features",'total')


train_data,test_data = final_data.randomSplit([0.7,0.3])
# We can also use the multinomial family for binary classification

lr = LinearRegression(labelCol='total')
lrModel = lr.fit(train_data)
print("Coefficients: {} \n Intercept: {}".format(lrModel.coefficients,lrModel.intercept))
test_results = lrModel.evaluate(test_data)
test_results.residuals.show()

# Let's get some evaluation metrics (as discussed in the previous linear regression notebook).
print("RSME: {}".format(test_results.rootMeanSquaredError))
print("R2: {}".format(test_results.r2))
unlabeled_data = test_data.select('features')
unlabeled_data.show()
predictions = lrModel.transform(unlabeled_data)
predictions.show()
predictions.head(1)

Coefficients: [2.4644103775102257e-16,-1.640951304877098e-13,-1.5321926219568728e-13,1.0000000000003102] 
 Intercept: -4.814965468705074e-13
+--------------------+
|           residuals|
+--------------------+
|2.042810365310288...|
|2.486899575160350...|
|1.953992523340275...|
|-6.21724893790087...|
|3.552713678800501...|
|-3.37507799486047...|
|-1.42108547152020...|
|1.332267629550187...|
|4.440892098500626...|
|3.552713678800501...|
+--------------------+

RSME: 1.7377455352225866e-14
R2: 1.0
+--------------------+
|            features|
+--------------------+
|[2012.0,3.5999999...|
|[2013.0,4.3000001...|
|[2013.0,5.9000000...|
|[2011.0,6.3000001...|
|[2014.0,7.5999999...|
|[2015.0,5.5,11.0,...|
|[2011.0,3.2999999...|
|[2013.0,5.0999999...|
|[2011.0,5.4000000...|
|[2013.0,8.3000001...|
+--------------------+

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[2012.0,3.5999999...|  5.19999999999998|
|[2013.0

[Row(features=DenseVector([2012.0, 3.6, 6.9, 5.2]), prediction=5.19999999999998)]