In [None]:
# Notebook setup: locate the Spark installation, start a session, and import the
# helpers used throughout the notebook.
import os
from functools import reduce

import findspark

# Prefer SPARK_HOME from the environment so the notebook is portable; fall back to
# the install location this notebook was originally written against.
SPARK_HOME = os.environ.get("SPARK_HOME", "/home/ubuntu/spark-2.1.1-bin-hadoop2.7")
findspark.init(SPARK_HOME)

# pyspark is importable only after findspark.init() has put it on sys.path.
import pyspark
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('BDAS-Linda').getOrCreate()


In [None]:
# import Maori language in education datasets (one CSV per period)
import warnings

filename1 = "./Maori-Language-Learning-Student-Numbers-by-Ethnicity-2004-2008.csv"
filename2 = "./Maori-Language-Learning-Student-Numbers-by-Ethnicity-2009-2013.csv"
filename3 = "./Maori-Language-Learning-Student-Numbers-by-Ethnicity-2014-2018.csv"
filename4 = "./Maori-Language-Learning-Student-Numbers-by-Ethnicity-2019.csv"
MLLSN1, MLLSN2, MLLSN3, MLLSN4 = [
    spark.read.csv(name, inferSchema=True, header="true")
    for name in (filename1, filename2, filename3, filename4)
]

def unionAll(*dfs):
    """Stack DataFrames row-wise (positional union, like SQL UNION ALL).

    Spark 2.1's ``union`` matches columns by position, not by name, and the result
    keeps the first frame's column names. A warning is issued when the inputs'
    column names differ, because that usually means a file lists its columns in a
    different order and values would be silently misaligned.

    Args:
        *dfs: DataFrames to stack, in order.

    Returns:
        A single DataFrame containing the rows of every input.

    Raises:
        ValueError: if called with no DataFrames.
    """
    if not dfs:
        raise ValueError("unionAll() requires at least one DataFrame")
    expected = dfs[0].columns
    for position, frame in enumerate(dfs[1:], start=1):
        if frame.columns != expected:
            warnings.warn("DataFrame {} has columns {}; expected {} (union is positional)"
                          .format(position, frame.columns, expected))
    # DataFrame.unionAll is deprecated in Spark 2.x in favour of the equivalent union.
    return reduce(DataFrame.union, dfs)

MLLSN=unionAll(MLLSN1,MLLSN2,MLLSN3,MLLSN4)
MLLSN.columns

In [None]:
# import population data
filename = "./Subnational-ethnic-population-projections-2013base.csv"
POP=spark.read.csv(filename, inferSchema=True, header="true")
POP.columns

In [None]:
MLLSN.count()

In [None]:
MLLSN.printSchema()

In [None]:
POP.count()

In [None]:
POP.printSchema()

In [None]:
MLLSN.show()

In [None]:
#check unique values in the key attributes
MLLSN.select("Student: Ethnicity").distinct().show()
MLLSN.select("Student: Year level (Grouped)").distinct().show()

In [None]:
MLLSN.select("Student: Maori Language Immersion Level").distinct().show()
MLLSN.select("Student: Maori Language in Education level").distinct().show()

In [None]:
MLLSN.select('School: Type').distinct().show()
MLLSN.select('School: Sector').distinct().show()

In [None]:
POP.show()

In [None]:
POP.select('Year at 30 June').distinct().show()

In [None]:
from pyspark.sql.functions import *

MLLSN.select([count(when(isnull(c),c)).alias(c) for c in MLLSN.columns]).show()

In [None]:
MLLSN.filter(MLLSN['Students Number']<=0).show()

In [None]:
POP.select([count(when(isnull(c),c)).alias(c) for c in POP.columns]).show()

In [None]:
# exclude irrelevant attributes in Māori language data 
cols=MLLSN.columns
cols.remove('School: Name')
cols.remove('School: Gender')
cols.remove('School: Affiliation Type')
cols.remove('School: Affiliation')
cols.remove('School: Definition')
cols.remove('Region: Census area unit')

In [None]:
# exclude related attributes in Māori language data
cols.remove('Student: Year level')
cols.remove('School: Type')
cols.remove('Region: Regional Council')
cols.remove('Region: Education Region')
cols.remove('Region: Territorial Authority')
cols.remove('Region: Ward')
cols.remove('Region: TA with Auckland wards')
cols.remove('Region: MOE Local office')
MLLSN2=MLLSN[cols]

In [None]:
MLLSN2.dtypes

In [None]:
# exclude irrelevant attributes in population data
cols2=POP.columns
cols2.remove('Euro-all-ages')
cols2.remove('Euro-5-9')
cols2.remove('Euro-10-14')
cols2.remove('Euro-15-19')
cols2.remove('Asian-all-ages')
cols2.remove('Asian-5-9')
cols2.remove('Asian-10-14')
cols2.remove('Asian-15-19')
cols2.remove('Pacific-all-ages')
cols2.remove('Pacific-5-9')
cols2.remove('Pacific-10-14')
cols2.remove('Pacific-15-19')
POP2=POP[cols2]

In [None]:
POP2.dtypes

In [None]:
MLLSN2=MLLSN2.filter(MLLSN2["Students Number"]>0)

In [None]:
MLLSN2.count()

In [None]:
MLLSN2.filter(MLLSN2["Students Number"]<=0).show()

In [None]:
# aggregation by Student: Year level (Grouped) in the School level
GMLLSN2 = MLLSN2.groupby('School: ID','Year: As at 1 July','Student: Year level (Grouped)','Student: Ethnicity', 
                         'Student: Maori Language Immersion Level','Student: Maori Language in Education level',
                         'School: Highest Maori Language Immersion Level','School: Kura Type','School: Authority',
                         'School: Maori Language Descriptor','School: Medium','School: Decile','School: Sector',
                         'Region: TA with Auckland local boards').sum('Students Number')

In [None]:
GMLLSN2.count()

In [None]:
GMLLSN2.head(1)

In [None]:
# combine age groups
POP2 = POP2.withColumn('Total-school-age',POP2['Total-5-9']+POP2['Total-10-14']+POP2['Total-15-19'])
POP2 = POP2.withColumn('Maori-school-age',POP2['Maori-5-9']+POP2['Maori-10-14']+POP2['Maori-15-19'])
            

In [None]:
# create non Māori population attributes
POP2 = POP2.withColumn('non-Maori-all-ages',POP2['Total-all-ages'] - POP2['Maori-all-ages'])
POP2 = POP2.withColumn('non-Maori-school-age',POP2['Total-school-age'] - POP2['Maori-school-age'])

In [None]:
POP2.head(1)

In [None]:
POP2.dtypes

In [None]:
# remove irrelevant attributes in population
cols3=POP2.columns
cols3.remove('Total-5-9')
cols3.remove('Total-10-14')
cols3.remove('Total-15-19')
cols3.remove('Maori-5-9')
cols3.remove('Maori-10-14')
cols3.remove('Maori-15-19')
cols3.remove('Total-all-ages')
cols3.remove('Maori-all-ages')
cols3.remove('non-Maori-all-ages')
POP3=POP2[cols3]
POP3.dtypes

In [None]:
# Ensure values of key attributes to be joined are consistent
POP3.select('TA with Auckland Local Board').distinct().show(20,False)

In [None]:
GMLLSN2.select('Region: TA with Auckland local boards').distinct().show(20,False)

In [None]:
POP4 = POP3.withColumn('TA with Auckland Local Board', regexp_replace('TA with Auckland Local Board',' local board area',''))

In [None]:
POP4.select('TA with Auckland Local Board').distinct().show(20,False)

In [None]:
GMLLSN3 = GMLLSN2.withColumn('Region: TA with Auckland local boards', regexp_replace('Region: TA with Auckland local boards',
                                                                                     'Auckland- ',''))

In [None]:
GMLLSN3.select('Region: TA with Auckland local boards').distinct().show(20,False)

In [None]:
POP5=POP4.filter(POP4['Year at 30 June']=='2018').drop('Year at 30 June')

In [None]:
POP5.show()

In [None]:
POP5.count()

In [None]:
# merge two data sets
df = GMLLSN3.join(POP5,lower(GMLLSN3['Region: TA with Auckland local boards'])==
                 lower(POP5['TA with Auckland Local Board']),how="left")

In [None]:
df.head(1)

In [None]:
df=df.drop('TA with Auckland Local Board')

In [None]:
df.count()

In [None]:
df.dtypes

In [None]:
df.na.drop().count()

In [None]:
df2=df.filter(isnull(df['Total-school-age']))

In [None]:
df2.select('Region: TA with Auckland local boards').distinct().show(20,False)

In [None]:
df=df.na.drop()

In [None]:
df.count()

In [None]:
df2=df.filter(df['Year: As at 1 July']=='2019')

In [None]:
df2.head(1)

In [None]:
# aggregate data by regions
gdf = df2.groupby('Region: TA with Auckland local boards', 'Student: Year level (Grouped)', 
                  'Student: Ethnicity', 'Student: Maori Language Immersion Level', 
                  'Student: Maori Language in Education level',
                  'School: Highest Maori Language Immersion Level','School: Kura Type',
                  'School: Authority','School: Maori Language Descriptor',
                  'School: Medium','School: Decile','School: Sector','Total-school-age','Maori-school-age',
                 'non-Maori-school-age').sum('sum(Students Number)')

In [None]:
gdf.count()

In [None]:
gdf.dtypes

In [None]:
# rename column names
gdf = gdf.withColumnRenamed("Student: Year level (Grouped)", "Student: Year level grouped")
gdf = gdf.withColumnRenamed("sum(sum(Students Number))", "Students Number sumed")

In [None]:
# replace attribute values from string to number
gdf.select('Student: Year level grouped').distinct().show(20,False)

In [None]:
gdf2=gdf.withColumn("Student: Year level grouped", regexp_replace(
   "Student: Year level grouped","Primary \(Year 1-8\)","1")).withColumn(
    "Student: Year level grouped", regexp_replace(
    "Student: Year level grouped","Secondary \(Year 9\+\)","2"))
gdf2=gdf2.withColumn('Student: Year level grouped', gdf2['Student: Year level grouped']
                     .cast(IntegerType()))
gdf2.select('Student: Year level grouped').distinct().show(20,False)

In [None]:
gdf2.select('Student: Ethnicity').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn("Student: Ethnicity", regexp_replace(
    "Student: Ethnicity","Non Maori","2")).withColumn("Student: Ethnicity", regexp_replace(
    "Student: Ethnicity","Maori","1"))
gdf2=gdf2.withColumn('Student: Ethnicity', gdf2['Student: Ethnicity']
                     .cast(IntegerType()))
gdf2.select('Student: Ethnicity').distinct().show(20,False)

In [None]:
gdf2.select('Student: Maori Language Immersion Level').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','Level 1\: 81\-100\%','1')).withColumn(
    'Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','Level 2\: 51\-80\%','2')).withColumn(
    'Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','Level 3\: 31\-50\%','3')).withColumn(
    'Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','Level 4\(a\)\: up to 30\%','4')).withColumn(
    'Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','Level 4\(b\)\: At least 3 Hours','5')).withColumn(
    'Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','Level 5\: Less than 3 Hours','6')).withColumn(
    'Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','Level 6\: Taha Maori','7')).withColumn(
    'Student: Maori Language Immersion Level', regexp_replace(
    'Student: Maori Language Immersion Level','No Maori language learning / Not Applicable','8'))
gdf2=gdf2.withColumn('Student: Maori Language Immersion Level', gdf2['Student: Maori Language Immersion Level']
                     .cast(IntegerType()))
gdf2.select('Student: Maori Language Immersion Level').distinct().show(20,False)

In [None]:
gdf2.select('Student: Maori Language in Education level').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('Student: Maori Language in Education level', regexp_replace(
    'Student: Maori Language in Education level','Maori medium','1')).withColumn(
    'Student: Maori Language in Education level', regexp_replace(
    'Student: Maori Language in Education level', 'Maori Language in English medium','2')).withColumn(
    'Student: Maori Language in Education level', regexp_replace(
    'Student: Maori Language in Education level', 'No Maori language learning / Not Applicable','3'))
gdf2=gdf2.withColumn('Student: Maori Language in Education level', gdf2['Student: Maori Language in Education level']
                     .cast(IntegerType()))
gdf2.select('Student: Maori Language in Education level').distinct().show(20,False)

In [None]:
gdf2.select('School: Highest Maori Language Immersion Level').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','Level 1\: 81\-100\%','1')).withColumn(
    'School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','Level 2\: 51\-80\%','2')).withColumn(
    'School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','Level 3\: 31\-50\%','3')).withColumn(
    'School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','Level 4\(a\)\: up to 30\%','4')).withColumn(
    'School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','Level 4\(b\)\: At least 3 Hours','5')).withColumn(
    'School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','Level 5\: Less than 3 Hours','6')).withColumn(
    'School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','Level 6\: Taha Maori','7')).withColumn(
    'School: Highest Maori Language Immersion Level', regexp_replace(
    'School: Highest Maori Language Immersion Level','No Maori language learning','8'))
gdf2=gdf2.withColumn('School: Highest Maori Language Immersion Level', gdf2['School: Highest Maori Language Immersion Level']
                     .cast(IntegerType()))
gdf2.select('School: Highest Maori Language Immersion Level').distinct().show(20,False)

In [None]:
gdf2.select('School: Kura Type').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('School: Kura Type', regexp_replace(
    'School: Kura Type','Kura Kaupapa Maori \(Section 155\)','1')).withColumn(
    'School: Kura Type', regexp_replace(
    'School: Kura Type', 'Designated Character \(Section 156\)','2')).withColumn(
    'School: Kura Type', regexp_replace(
    'School: Kura Type', 'Not a Kura School','3'))
gdf2=gdf2.withColumn('School: Kura Type', gdf2['School: Kura Type']
                     .cast(IntegerType()))
gdf2.select('School: Kura Type').distinct().show(20,False)

In [None]:
gdf2.select('School: Authority').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('School: Authority', regexp_replace(
    'School: Authority','State\: Not integrated','1')).withColumn(
    'School: Authority', regexp_replace(
    'School: Authority', 'State\: Integrated','2')).withColumn(
    'School: Authority', regexp_replace(
    'School: Authority', 'Private\: Prov\.Reg\.','3')).withColumn(
    'School: Authority', regexp_replace(
    'School: Authority', 'Private\: Fully Reg\.','4')).withColumn(
    'School: Authority', regexp_replace(
    'School: Authority', 'Other \: Vote Ed\.','5'))
gdf2=gdf2.withColumn('School: Authority', gdf2['School: Authority']
                     .cast(IntegerType()))
gdf2.select('School: Authority').distinct().show(20,False)

In [None]:
gdf2.select('School: Maori Language Descriptor').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('School: Maori Language Descriptor', regexp_replace(
    'School: Maori Language Descriptor','Maori medium school','1')).withColumn(
    'School: Maori Language Descriptor', regexp_replace(
    'School: Maori Language Descriptor', 'School with some students in Maori medium education','2')).withColumn(
    'School: Maori Language Descriptor', regexp_replace(
    'School: Maori Language Descriptor', 'Mixed Maori language in education school','3')).withColumn(
    'School: Maori Language Descriptor', regexp_replace(
    'School: Maori Language Descriptor', 'School with some students in mixed Maori language in education','4')).withColumn(
    'School: Maori Language Descriptor', regexp_replace(
    'School: Maori Language Descriptor', 'Maori language in English medium school','5')).withColumn(
    'School: Maori Language Descriptor', regexp_replace(
    'School: Maori Language Descriptor', 'School with some students in Maori language in English medium','6')).withColumn(
    'School: Maori Language Descriptor', regexp_replace(
    'School: Maori Language Descriptor', 'No Maori language in education School','7'))
gdf2=gdf2.withColumn('School: Maori Language Descriptor', gdf2['School: Maori Language Descriptor']
                     .cast(IntegerType()))
gdf2.select('School: Maori Language Descriptor').distinct().show(20,False)

In [None]:
gdf2.select('School: Medium').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('School: Medium', regexp_replace(
    'School: Medium','English and Maori medium','2')).withColumn('School: Medium', regexp_replace(
    'School: Medium','Maori medium','1')).withColumn('School: Medium', regexp_replace('School: Medium', 'English medium','3'))
gdf2=gdf2.withColumn('School: Medium', gdf2['School: Medium'].cast(IntegerType()))
gdf2.select('School: Medium').distinct().show(20,False)

In [None]:
gdf2.select('School: Decile').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('School: Decile', regexp_replace(
    'School: Decile','Decile ','')).withColumn('School: Decile', regexp_replace(
    'School: Decile','Not Applicable','0'))
gdf2=gdf2.withColumn('School: Decile', gdf2['School: Decile'].cast(IntegerType()))
gdf2.select('School: Decile').distinct().show(20,False)

In [None]:
gdf2.select('School: Sector').distinct().show(20,False)

In [None]:
gdf2=gdf2.withColumn('School: Sector', regexp_replace(
    'School: Sector','Primary','1')).withColumn('School: Sector', regexp_replace(
    'School: Sector','Secondary','2')).withColumn('School: Sector', regexp_replace(
    'School: Sector','Composite','3')).withColumn('School: Sector', regexp_replace(
    'School: Sector','Special','4'))
gdf2=gdf2.withColumn('School: Sector', gdf2['School: Sector'].cast(IntegerType()))
gdf2.select('School: Sector').distinct().show(20,False)

In [None]:
gdf2.dtypes

In [None]:
my_col = gdf2.columns

In [None]:
from pyspark.mllib.stat import Statistics

In [None]:
gdf2.corr('Student: Year level grouped','School: Sector')

In [None]:
my_col.remove('School: Sector')

In [None]:
gdf2.corr('Student: Maori Language in Education level','Student: Maori Language Immersion Level')

In [None]:
gdf2.corr('Student: Maori Language in Education level','School: Highest Maori Language Immersion Level')

In [None]:
gdf2.corr('Student: Maori Language in Education level','School: Maori Language Descriptor')

In [None]:
gdf2.corr('Student: Maori Language in Education level','School: Medium')

In [None]:
my_col.remove('Student: Maori Language Immersion Level')
my_col.remove('School: Highest Maori Language Immersion Level')
my_col.remove('School: Maori Language Descriptor')
my_col.remove('School: Medium')
gdf3 = gdf2[my_col]
gdf3.dtypes

In [None]:
gdf2.corr('Students Number sumed', 'Student: Year level grouped')

In [None]:
gdf2.corr('Students Number sumed', 'School: Kura Type')

In [None]:
gdf2.corr('Students Number sumed', 'School: Authority')

In [None]:
my_col.remove('Student: Year level grouped')
my_col.remove('School: Kura Type')
my_col.remove('School: Authority')
gdf3 = gdf2[my_col]
gdf3.dtypes

In [None]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorAssembler, StringIndexer)

In [None]:
# create a string indexer for TA column
TA_indexer = StringIndexer(inputCol='Region: TA with Auckland local boards',outputCol='TAIndex')
gdf3_indexed = TA_indexer.fit(gdf3).transform(gdf3)
gdf3_indexed.dtypes

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
gdf3_indexed.select('Students Number sumed').describe().show()

In [None]:
studentArr=np.array(gdf3_indexed.select('Students Number sumed').collect())
plt.hist(studentArr, bins=50)
plt.show()

In [None]:
from pyspark.sql import functions as F
gdf4=gdf3_indexed.withColumn("logvalue", F.log('Students Number sumed'))

In [None]:
studentArr=np.array(gdf4.select('logvalue').collect())
plt.hist(studentArr, bins=50)
plt.show()

In [None]:
gdf4 = gdf4.filter(gdf4['logvalue']>1).filter(gdf4['Students Number sumed']<2000)
gdf4.select('Students Number sumed').describe().show()

In [None]:
studentArr=np.array(gdf4.select('logvalue').collect())
plt.hist(studentArr, bins=50)
plt.show()

In [None]:
gdf4.dtypes

In [None]:
# assemble all columns as one vector in the features column. 
assembler = VectorAssembler(inputCols=[
 'Student: Ethnicity',
 'Student: Maori Language in Education level',
 'School: Decile',
 'Total-school-age',
 'Maori-school-age', 
 'non-Maori-school-age',
 'TAIndex'],outputCol='features')

In [None]:
output = assembler.transform(gdf4)
final_data = output.select("features","Students Number sumed")
final_data.show()

In [None]:
# randomised 80/20 split
train_data, test_data = final_data.randomSplit([0.8,0.2])
train_data.describe().show()
test_data.describe().show()


In [None]:
# decision tree regression model
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# exclude TAIndex in the features column. 
assembler = VectorAssembler(inputCols=[
 'Student: Maori Language in Education level',
 'School: Decile',
 'Student: Ethnicity', 
 'Total-school-age',
 'Maori-school-age',
 'non-Maori-school-age'],outputCol='features')
output = assembler.transform(gdf4)
final_data = output.select("features","Students Number sumed")
train_data, test_data = final_data.randomSplit([0.8,0.2])

In [None]:
# train a DecisionTree model.
dt = DecisionTreeRegressor(labelCol="Students Number sumed",featuresCol="features")
dt_model = dt.fit(train_data)

In [None]:
print(dt_model.toDebugString)

In [None]:
dt_prediction = dt_model.transform(test_data)
dt_prediction.show(10)

In [None]:
# compute test error
evaluator = RegressionEvaluator(
    labelCol="Students Number sumed", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(dt_prediction)
rmse

In [None]:
# linear regression model
from pyspark.ml.regression import LinearRegression

In [None]:
lr = LinearRegression(labelCol='Students Number sumed')

# Fit the model to the data.
lrModel = lr.fit(train_data)

# Print the coefficients and intercept for linear regression.
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

In [None]:
# evaluate the model against the test data.
test_results = lrModel.evaluate(test_data)

In [None]:
# residuals show the difference between the predicted value and the test data.
test_results.residuals.show(5)

print("RSME: {}".format(test_results.rootMeanSquaredError))