In [23]:
#Build Sparksession and load teh data with CSV format

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType

schema = StructType([
    StructField("medianHouseValue", FloatType()),
    StructField("medianIncome", FloatType()),
    StructField("housingMedianAge", FloatType()),
    StructField("totalRooms",FloatType()),
    StructField("totalBedRooms",FloatType()),
    StructField("population",FloatType()),
    StructField("households",FloatType()),
    StructField("latitude",FloatType()),
    StructField("Longitude",FloatType())
])

spark=SparkSession.builder.appName("Python Spark SQL ML demo").getOrCreate()

df=spark.read.format("csv").schema(schema).option("header", "true").load("/user/spark/cadata.csv")


In [10]:
# Small Data Exploration data analysis

df.printSchema()

df.createOrReplaceTempView("dfv")
spark.sql("select * from dfv").show(10)

spark.sql("select housingMedianAge, count(*) from dfv group by housingMedianAge order by housingMedianAge desc").show()

root
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- totalRooms: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)

+----------------+------------+----------------+----------+-------------+----------+----------+--------+---------+
|medianHouseValue|medianIncome|housingMedianAge|totalRooms|totalBedRooms|population|households|latitude|Longitude|
+----------------+------------+----------------+----------+-------------+----------+----------+--------+---------+
|        452600.0|      8.3252|            41.0|     880.0|        129.0|     322.0|     126.0|   37.88|  -122.23|
|        358500.0|      8.3014|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|   37.86|  -122.22|
|        352100.0|      7.2574|     

In [24]:
#get summary of the statistics

df.describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-------------------+
|summary|  medianHouseValue|      medianIncome|  housingMedianAge|        totalRooms|    totalBedRooms|        population|       households|         latitude|          Longitude|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-------------------+
|  count|             20640|             20640|             20640|             20640|            20640|             20640|            20640|            20640|              20640|
|   mean|206855.81690891474|3.8706710030346416|28.639486434108527|2635.7630813953488|537.8980135658915|1425.4767441860465|499.5396802325581|35.63186143109965|-119.56970444871473|
| stddev|115395.61587441359|1.8998217183639696| 12.58555761211163|2181.6152515827944| 421.247905943133|  

In [25]:
# Data Preprocessing( for demo purpose allzero values have been excluded from the data set)

#preprocessing the target value ,in this data set dependent variable is medianHouseValue, express hous values in units of 100,000

from pyspark.sql.functions import *

df=df.withColumn("medianHouseValue",col("medianHouseValue")/100000)

df.select("medianHouseValue").take(2)

[Row(medianHouseValue=4.526), Row(medianHouseValue=3.585)]

In [26]:
# Feature Enginering (decide the feaures )

#Add some new features
df=df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")).\
withColumn("populationPerHousehold",col("population")/col("households")).\
withColumn("bedroomsPerRoom",col("totalBedRooms")/col("totalRooms"))
df.first()

#Re-order selected columns

df.createOrReplaceTempView("dfv")
df=spark.sql("select medianHouseValue,totalBedRooms,population,households,medianIncome,roomsPerHousehold,populationPerHousehold,bedroomsPerRoom from dfv")
df.show(2)

+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome|roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252|6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014|6.238137082601054|     2.109841827768014|0.15579659106916466|
+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
only showing top 2 rows



In [28]:

#Separating the features from the target variable

from pyspark.ml.linalg import DenseVector

indata=df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df=spark.createDataFrame(indata,["label","features"])
df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]))]

In [30]:
#scale the data by using spark MLlib

from pyspark.ml.feature import StandardScaler

# Initialize the standardscaler

standardScaler = StandardScaler(inputCol="features",outputCol="features_scaled")

#fit the DataFrame to the scaler

scaler=standardScaler.fit(df)

#Transform data in df with the scaler

scaleddf = scaler.transform(df)

scaleddf.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [57]:
# Building A ML model by using spark MLlib liner regression model

train_data,test_data=scaleddf.randomSplit([.8,.2],seed=1234)

from pyspark.ml.regression import LinearRegression

lr=LinearRegression(labelCol="label",maxIter=10,regParam=0.3,elasticNetParam=0.8)

lm=lr.fit(train_data)

predicted=lm.transform(test_data)

predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels =predicted.select("label").rdd.map(lambda x: x[0])

predictionandlabel =predictions.zip(labels).collect()

predictionandlabel[:5]


# evaluate model

lm.coefficients


DenseVector([0.0, 0.0, 0.0, 0.2796, 0.0, 0.0, 0.0])

In [58]:
lm.intercept

0.9841344205626824

In [60]:
lm.summary.rootMeanSquaredError

0.8765335684459216

In [61]:
lm.summary.r2

0.42282227755911483

In [None]:
spark.stop()