## Ramping Up Sparks

In [ ]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import spark.implicits._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import spark.implicits._


In [ ]:
// Session used by every cell below; getOrCreate() reuses an already-running
// session when one exists instead of starting a second one.
val spark = {
  val namedBuilder = SparkSession.builder().appName("houseprices")
  namedBuilder.getOrCreate()
}

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7abd985


In [ ]:
// Load the training set: the first CSV line supplies the column names and the
// column types are inferred from the data.
// NOTE(review): the dtypes cell below reports LotFrontage and MasVnrArea as
// StringType — presumably because of literal "NA" entries; confirm before
// using those columns numerically.
val df = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("data/train.csv")

df: org.apache.spark.sql.DataFrame = [Id: int, MSSubClass: int ... 79 more fields]


In [ ]:
// Bring the first three rows back to the driver as an Array[Row].
df.take(3)

res71: Array[org.apache.spark.sql.Row] = Array([1,60,RL,65,8450,Pave,NA,Reg,Lvl,AllPub,Inside,Gtl,CollgCr,Norm,Norm,1Fam,2Story,7,5,2003,2003,Gable,CompShg,VinylSd,VinylSd,BrkFace,196,Gd,TA,PConc,Gd,TA,No,GLQ,706,Unf,0,150,856,GasA,Ex,Y,SBrkr,856,854,0,1710,1,0,2,1,3,1,Gd,8,Typ,0,NA,Attchd,2003,RFn,2,548,TA,TA,Y,0,61,0,0,0,0,NA,NA,NA,0,2,2008,WD,Normal,208500], [2,20,RL,80,9600,Pave,NA,Reg,Lvl,AllPub,FR2,Gtl,Veenker,Feedr,Norm,1Fam,1Story,6,8,1976,1976,Gable,CompShg,MetalSd,MetalSd,None,0,TA,TA,CBlock,Gd,TA,Gd,ALQ,978,Unf,0,284,1262,GasA,Ex,Y,SBrkr,1262,0,0,1262,0,1,2,0,3,1,TA,6,Typ,1,TA,Attchd,1976,RFn,2,460,TA,TA,Y,298,0,0,0,0,0,NA,NA,NA,0,5,2007,WD,Normal,181500], [3,60,RL,68,11250,Pave,NA,IR1,Lvl,AllPub,Inside,Gtl,CollgCr,Norm,Norm,1Fam,2Story,7,5,2001,2002,Gable,CompShg,VinylSd,Vin...

In [ ]:
// List every column as a (name, Spark data type) pair.
df.dtypes

res68: Array[(String, String)] = Array((Id,IntegerType), (MSSubClass,IntegerType), (MSZoning,StringType), (LotFrontage,StringType), (LotArea,IntegerType), (Street,StringType), (Alley,StringType), (LotShape,StringType), (LandContour,StringType), (Utilities,StringType), (LotConfig,StringType), (LandSlope,StringType), (Neighborhood,StringType), (Condition1,StringType), (Condition2,StringType), (BldgType,StringType), (HouseStyle,StringType), (OverallQual,IntegerType), (OverallCond,IntegerType), (YearBuilt,IntegerType), (YearRemodAdd,IntegerType), (RoofStyle,StringType), (RoofMatl,StringType), (Exterior1st,StringType), (Exterior2nd,StringType), (MasVnrType,StringType), (MasVnrArea,StringType), (ExterQual,StringType), (ExterCond,StringType), (Foundation,StringType), (BsmtQual,StringType), (Bs...

In [ ]:
// Project two columns and keep only the first three rows.
df.select(col("id"), col("SalePrice")).limit(3)

res62: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, SalePrice: int]


In [ ]:
// Houses that sold for more than 200000: keep id and price, first two rows only.
df.where(df.col("SalePrice").gt(200000)).select("id", "SalePrice").limit(2)

res64: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, SalePrice: int]


In [ ]:
// Widgets/Visualization Tools Section

In [ ]:
// Average sale price for each construction year.
val avgs = df.groupBy(col("YearBuilt")).agg(avg(col("SalePrice")))

avgs: org.apache.spark.sql.DataFrame = [YearBuilt: int, avg(SalePrice): double]


In [ ]:
// Plot the yearly averages as a line, ordered by construction year.
LineChart(avgs.orderBy("YearBuilt"))

res43: notebook.front.widgets.charts.LineChart[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = <LineChart widget>


In [ ]:
// Same yearly averages, ordered by construction year, rendered as a pie chart.
PieChart(avgs.orderBy("YearBuilt"))

res47: notebook.front.widgets.charts.PieChart[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = <PieChart widget>


In [ ]:
// Total sale price per neighborhood.
// NOTE(review): a sum grows with the number of sales, so this ranks
// neighborhoods by total volume rather than by typical house price.
val richestNeighborhood = df.groupBy(col("Neighborhood")).agg(sum(col("SalePrice")))

richestNeighborhood: org.apache.spark.sql.DataFrame = [Neighborhood: string, sum(SalePrice): bigint]


In [ ]:
// Render the per-neighborhood sale price totals as a pie chart.
PieChart(richestNeighborhood)

res51: notebook.front.widgets.charts.PieChart[org.apache.spark.sql.DataFrame] = <PieChart widget>


In [ ]:
// Bathroom counts per house, keyed by Id.
val radarData = df.select(
  "Id",
  "BsmtFullBath",
  "BsmtHalfBath",
  "FullBath",
  "HalfBath"
)
// Draw the first four houses on a radar chart, labelling each series with its Id.
RadarChart(radarData.head(4), labelField = Some("Id"))

radarData: org.apache.spark.sql.DataFrame = [Id: int, BsmtFullBath: int ... 3 more fields]
res58: notebook.front.widgets.charts.RadarChart[Array[org.apache.spark.sql.Row]] = <RadarChart widget>


In [ ]:
// Statistical Functions in Spark Section - Aggregate and Non-Aggregate Functions

import spark.implicits._


In [ ]:
// Columns can be selected by plain string name.
df.select("Id", "SalePrice").limit(2)

res74: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: int, SalePrice: int]


In [ ]:
df.select($"Id", $"SalePrice").limit(3) // $"..." is a custom string interpolator from spark.implicits._ that yields a Column

res78: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: int, SalePrice: int]


In [ ]:
// Column expressions can be renamed and combined arithmetically; the euro
// column is the dollar price scaled by a fixed 0.9 factor.
df.select(
  $"Id",
  $"SalePrice".as("Sale Price in Dollars"),
  ($"SalePrice" * 0.9).as("Sale Price in Euros")
).limit(2)

res84: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: int, Sale Price in Dollars: int ... 1 more field]


In [ ]:
/** Scales a dollar-valued column by the fixed 0.9 factor used in this notebook.
  *
  * @param c column holding amounts in dollars
  * @return a new column expression equal to `c * 0.9`
  */
def dollarsToEuros(c: Column): Column = c.multiply(0.9)
// Same projection as the previous cell, with the conversion factored out into a reusable function.
df.select(
  $"Id",
  $"SalePrice".alias("Sale Price in Dollars"),
  dollarsToEuros($"SalePrice").alias("Sale Price in Euros")
).limit(2)

dollarsToEuros: (c: org.apache.spark.sql.Column)org.apache.spark.sql.Column
res88: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: int, Sale Price in Dollars: int ... 1 more field]


In [ ]:
// Tag each row with a seeded (42) random boolean, keep the rows where it is
// true, then show Id and price for the first three of them.
df.withColumn(
  "Random",
  rand(42).gt(0.5)
).where("Random").select("Id", "SalePrice").limit(3)

In [ ]:
// Summary statistics of SalePrice in a single pass. In Spark, `variance` and
// `stddev` are the sample variants (aliases of var_samp / stddev_samp).
df.select(
  variance("SalePrice").as("Variance"),
  mean("SalePrice").as("Mean"),
  stddev("SalePrice").as("Standard Deviation")
)

res92: org.apache.spark.sql.DataFrame = [Variance: double, Mean: double ... 1 more field]


In [ ]:
// Built-in summary (count, mean, stddev, min, max) for the two listed columns;
// the result is a DataFrame with a `summary` label column and string-typed values.
df.describe("SalePrice", "LotArea")

res94: org.apache.spark.sql.DataFrame = [summary: string, SalePrice: string ... 1 more field]


In [ ]:
// Pearson correlation coefficient between overall quality rating and sale price.
df.select(corr(col("OverallQual"), col("SalePrice")))

res96: org.apache.spark.sql.DataFrame = [corr(OverallQual, SalePrice): double]
