# <span style="color:red"> PySpark labo met oplossingen </span>

Tijdens dit PySpark labo zal je de PySpark basics leren.  
De dataset waarmee gewerkt zal worden is data over huizen in king county en op het einde zal je de prijs van een huis proberen te voorspellen aan de hand van lineaire regressie.  
De kolommen aanwezig zijn in de dataset en hun beschrijving zijn:  
`price`: prediction target  
`bedrooms`: number of bedrooms/house  
`bathrooms`: number of bathrooms/house  
`sqft_living`: square footage of the home  
`sqft_lot`: square footage of the lot  
`floors`: Total floors(levels) in a house  
`waterfront`: House which has a view to a waterfront  
`view`: Has been viewed  
`condition`: How good the condition overall is  
`grade`: overall grade given to the housing unit, based on King County grading system  
`sqft_above`: square footage of house apart from basement  
`sqft_basement`: square footage of the basement  
`yr_built`: built year  
`yr_renovated`: Year when house was renovated  
`zipcode`: zip  
`sqft_living15`: Living room area in 2015(implies some renovations) This might or might not have affected the lotsize area
`sqft_lot15`: LotSize area in 2015(implies some renovation) 

In [1]:
import findspark
findspark.init('/home/marie/spark')
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

tekstje schrijven over waarom sparksession nodig? 

In [2]:
spark = SparkSession.builder.appName('lrex').getOrCreate()

de data kan ingelezen worden met de functie: `spark.read.csv()`  
*laad de data kc_house_data.csv in en zet de parameters `inferSchema` en `header` op True*

In [38]:
data = spark.read.csv('kc_house_data.csv',inferSchema=True,header=True)

de functie `printSchema()` toont alle kolommen en het type waarden dat hierin aanwezig zijn.  

In [39]:
data.printSchema() 

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- price: decimal(7,0) (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)



*Wat is het type waarden aanwezig in de kolom sqft_basement?*

In [5]:
data = data.drop('date')
data = data.drop('long')
data = data.drop('lat')

In [6]:
data.show()

+----------+-------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------------+----------+
|        id|  price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|sqft_living15|sqft_lot15|
+----------+-------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------------+----------+
|7129300520| 221900|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|    1955|           0|  98178|         1340|      5650|
|6414100192| 538000|       3|     2.25|       2570|    7242|   2.0|         0|   0|        3|    7|      2170|          400|    1951|        1991|  98125|         1690|      7639|
|5631500400| 180000|       2|      1.0|        770|   10000|   1.0|         0|   0|        3|    6| 

In [8]:
data.columns

['id',
 'price',
 'bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'grade',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'yr_renovated',
 'zipcode',
 'sqft_living15',
 'sqft_lot15']

In [9]:
#data.describe()?

# <span style="color:green"> selecteren van kolommen </span>
Je kan een kolom selecteren aan de hand van de methode:  
`df["column_name"]`  
selecteer de kolom condition

In [10]:
data['condition']

Column<condition>

Het verkregen type is een Column, maar met de functie `df.select('column_name')` krijg je een dataframe terug, die veelzijdiger is dan het type Column. Selecteer opnieuw de kolom condition maar nu als dataframe

In [11]:
data.select('condition')

DataFrame[condition: int]

Om de inhoud van de geselecteerde kolom weer te geven moet je de methode `.show` aanroepen.

In [12]:
data.select('condition').show()

+---------+
|condition|
+---------+
|        3|
|        3|
|        3|
|        5|
|        3|
|        3|
|        3|
|        3|
|        3|
|        3|
|        3|
|        4|
|        4|
|        4|
|        3|
|        3|
|        3|
|        4|
|        4|
|        4|
+---------+
only showing top 20 rows



Je kan ook meerdere kolommen selecteren 

`data.head(n)` geeft de eerste n rijen terug als lijst.  
print de eerste 5 rijen


In [13]:
data.head(5)

[Row(id=7129300520, price=Decimal('221900'), bedrooms=3, bathrooms=1.0, sqft_living=1180, sqft_lot=5650, floors=1.0, waterfront=0, view=0, condition=3, grade=7, sqft_above=1180, sqft_basement=0, yr_built=1955, yr_renovated=0, zipcode=98178, sqft_living15=1340, sqft_lot15=5650),
 Row(id=6414100192, price=Decimal('538000'), bedrooms=3, bathrooms=2.25, sqft_living=2570, sqft_lot=7242, floors=2.0, waterfront=0, view=0, condition=3, grade=7, sqft_above=2170, sqft_basement=400, yr_built=1951, yr_renovated=1991, zipcode=98125, sqft_living15=1690, sqft_lot15=7639),
 Row(id=5631500400, price=Decimal('180000'), bedrooms=2, bathrooms=1.0, sqft_living=770, sqft_lot=10000, floors=1.0, waterfront=0, view=0, condition=3, grade=6, sqft_above=770, sqft_basement=0, yr_built=1933, yr_renovated=0, zipcode=98028, sqft_living15=2720, sqft_lot15=8062),
 Row(id=2487200875, price=Decimal('604000'), bedrooms=4, bathrooms=3.0, sqft_living=1960, sqft_lot=5000, floors=1.0, waterfront=0, view=0, condition=5, grade=

je kan met `data.head()` ook een specifieke rij of een bepaalde waarde selecteren met behulp van:  
`data.head(n)[row_number][column_number]`  
*1) selecteer de 4de rij*  
*2) selecteer prijs op de 3de rij*

In [14]:
data.head(5)[3]

Row(id=2487200875, price=Decimal('604000'), bedrooms=4, bathrooms=3.0, sqft_living=1960, sqft_lot=5000, floors=1.0, waterfront=0, view=0, condition=5, grade=7, sqft_above=1050, sqft_basement=910, yr_built=1965, yr_renovated=0, zipcode=98136, sqft_living15=1360, sqft_lot15=5000)

In [15]:
data.head(5)[2][1]

Decimal('180000')

*Is dit een action of transformation?*

# <span style="color:green"> Creating a new column and dropping columns </span>

Je kan een nieuwe kolom aanmaken met de functie: `data.withColumn('new_column_name',data["column"])`  
data["column"] kan ook bewerkt worden bv: `data.withColumn('new_column_name',data["column"]/2)`  
een kolom kan ook verwijdert worden met `data.drop('column')`  
_verander nu alle sqft kolommen naar m2 (*0.0929) en verwijder de sqft kolommen._

In [16]:
def to_m2(new_name,old_column,dataset):
    dataset = dataset.withColumn(new_name,dataset[old_column]*0.0929)
    dataset = dataset.drop(old_column)
    return dataset

In [17]:
data = to_m2('m2_living','sqft_living',data)
data = to_m2('m2_lot','sqft_lot',data)
data = to_m2('m2_living15','sqft_living15',data)
data = to_m2('m2_lot15','sqft_lot15',data)

In [18]:
data.columns

['id',
 'price',
 'bedrooms',
 'bathrooms',
 'floors',
 'waterfront',
 'view',
 'condition',
 'grade',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'yr_renovated',
 'zipcode',
 'm2_living',
 'm2_lot',
 'm2_living15',
 'm2_lot15']

# Filtering and grouping data

## <span style="color:green"> Filtering Data </span>

voor het werken met big data is het belangrijk dat je kan snel je data filteren gebaseerd op bepaalde condities. Aangezien Spark DataFrames bovenop de Spark SQL platform gebouwd zijn kan je het doen met SQL commands maar er bestaan ook DataFrame methodes waar wij ons op zullen focussen tijdens dit labo.

je kunt de dataset filteren met behulp van de methode:  
`df.filter(df["column_name"] < condition)`
je kan ook filteren voor meerdere condities met behulp van `|` of `&`

*filter de dataset voor huizen die gebouwd zijn na het jaar 2000*

In [19]:
data.filter(data["yr_built"] > 2000).show()

+----------+-------+--------+---------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+------------------+------------------+------------------+------------------+
|        id|  price|bedrooms|bathrooms|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|         m2_living|            m2_lot|       m2_living15|          m2_lot15|
+----------+-------+--------+---------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+------------------+------------------+------------------+------------------+
|7237550310|1225000|       4|      4.5|   1.0|         0|   0|        3|   11|      3890|         1530|    2001|           0|  98053|           503.518|          9469.297|           442.204|          9469.297|
|3793500160| 323000|       3|      2.5|   2.0|         0|   0|        3|    7|      1890|            0|    2003|           0|  98038|           175.581|        

met de `.count()` methode kan je het aantal gefilterde huizen opvragen.  
*hoeveel huizen bevat de dataset met minimum 2 slaapkamers en die gelegen zijn aan het waterfront*

In [20]:
data.filter((data['bedrooms'] >1) & (data['waterfront']==1)).count()

158

## <span style="color:green"> GroupBy and Aggregate Functions </span>

De `groupBy('column_name')` functie laat je rijen groeperen gebaseerd op een bepaalde kolom, bijvoorbeeld je kan huizen groeperen volgens bouwjaar.  
Eenmaal je de rijen gegroepeerd hebt, kan je meerdere rijen van data aggregeren tot een output, bijvoorbeeld door het nemen van de som van alle inputrijen of de minimum waarde. 
*gebruik de groupBy functie op de 'condition' kolom en vraag de minimum waarde op met de min() functie.*

In [21]:
#aggregate
data.groupBy('condition').min().show()

+---------+--------+----------+-------------+--------------+-----------+---------------+---------+--------------+----------+---------------+------------------+-------------+-----------------+------------+--------------+-----------+------------------+------------------+
|condition| min(id)|min(price)|min(bedrooms)|min(bathrooms)|min(floors)|min(waterfront)|min(view)|min(condition)|min(grade)|min(sqft_above)|min(sqft_basement)|min(yr_built)|min(yr_renovated)|min(zipcode)|min(m2_living)|min(m2_lot)|  min(m2_living15)|     min(m2_lot15)|
+---------+--------+----------+-------------+--------------+-----------+---------------+---------+--------------+----------+---------------+------------------+-------------+-----------------+------------+--------------+-----------+------------------+------------------+
|        1|40000362|     78000|            0|           0.0|        1.0|              0|        0|             1|         1|            290|                 0|         1900|                0

*groepeer de rijen volgens de `waterfront` kolom en aggregeer volgens het gemiddelde.*

In [22]:
data.groupBy('waterfront').mean().show()

+----------+-------------------+------------+------------------+------------------+------------------+---------------+-------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|waterfront|            avg(id)|  avg(price)|     avg(bedrooms)|    avg(bathrooms)|       avg(floors)|avg(waterfront)|          avg(view)|    avg(condition)|       avg(grade)|   avg(sqft_above)|avg(sqft_basement)|     avg(yr_built)|avg(yr_renovated)|     avg(zipcode)|    avg(m2_living)|       avg(m2_lot)|  avg(m2_living15)|     avg(m2_lot15)|
+----------+-------------------+------------+------------------+------------------+------------------+---------------+-------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+----

Niet alle methoden hebben nood aan een groupby call. In plaats darvan kan je de algemene agg() methode oproepen. Dit kan alle rijen in de dataframe aggregeren in een kolom.  
`df.agg({'column_name':'operation'})` de operation kan mean,min,max,count en sum zijn.

*gebruiken de algemene agg() methode om de gemiddelde prijs te vinden*

In [23]:
data.agg({"price":"mean"}).show()

+-----------+
| avg(price)|
+-----------+
|540088.1418|
+-----------+



*groepeer eerst alle rijen volgens `yr_renovated` en sla dit op als de variabele grouped. 
gebruik vervolgens de algemene agg() methode om de maximale prijs te vinden per jaar*

In [24]:
grouped = data.groupBy('yr_renovated')
grouped.agg({"price":"max"}).show()

+------------+----------+
|yr_renovated|max(price)|
+------------+----------+
|        1959|    397500|
|        1990|   1646000|
|        1975|    685000|
|        1977|   1598890|
|        2003|   2888000|
|        2007|   2750000|
|        1974|    737500|
|        2015|   1485000|
|        1955|    550000|
|        2006|   2160000|
|        1978|    655000|
|        2013|   2500000|
|        1944|    521000|
|        1956|   1160000|
|        1934|    459950|
|        1988|   1900000|
|        1997|   1580000|
|        1994|   2350000|
|        1968|   1245000|
|        2014|   1755000|
+------------+----------+
only showing top 20 rows



# <span style="color:red"> MlLib gedeelte </span>

Om machine learning te kunnen toepassen met MlLib verwacht spark een format dat 2 kolommen bevat met de namen: "label" en "features".  
De label kolom moet een numerische label bevatten, dit kan een numerische waarde zijn voor regressie of classificatie.  
De feature kolom moet een vector van alle features bevatten.  
In deze sectie zal je een dataframe omzetten naar het verwachte format en zal je de huisprijzen proberen te voorspellen met lineaire regressie.

In [25]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

## prepping the data

In [26]:
data_features = data.drop('price')
data_features = data_features.drop('id')
columns = data_features.columns
print(columns)

['bedrooms', 'bathrooms', 'floors', 'waterfront', 'view', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated', 'zipcode', 'm2_living', 'm2_lot', 'm2_living15', 'm2_lot15']


In [27]:
assembler = VectorAssembler(inputCols=columns,outputCol='features')

In [28]:
output = assembler.transform(data)

In [29]:
output.head(1)

[Row(id=7129300520, price=Decimal('221900'), bedrooms=3, bathrooms=1.0, floors=1.0, waterfront=0, view=0, condition=3, grade=7, sqft_above=1180, sqft_basement=0, yr_built=1955, yr_renovated=0, zipcode=98178, m2_living=109.622, m2_lot=524.885, m2_living15=124.48599999999999, m2_lot15=524.885, features=DenseVector([3.0, 1.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1180.0, 0.0, 1955.0, 0.0, 98178.0, 109.622, 524.885, 124.486, 524.885]))]

In [30]:
final_data = output.select('features','price')
final_data.head()

Row(features=DenseVector([3.0, 1.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1180.0, 0.0, 1955.0, 0.0, 98178.0, 109.622, 524.885, 124.486, 524.885]), price=Decimal('221900'))

## splitsen in train en test data

belangrijk: de target moet in dezelfde dataframe blijven

In [31]:
train_data,test_data = final_data.randomSplit([0.8,0.2])

In [32]:
train_data.describe().show()

+-------+-----------------+
|summary|            price|
+-------+-----------------+
|  count|            17265|
|   mean|      540045.7694|
| stddev|366867.8283524557|
|    min|            75000|
|    max|          7700000|
+-------+-----------------+



In [33]:
test_data.describe().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|              4348|
|   mean|       540256.3935|
| stddev|368197.52570288384|
|    min|             85000|
|    max|           6885000|
+-------+------------------+



## runnen algoritme

In [34]:
lr = LinearRegression(labelCol='price',regParam=0.1)
lr_model = lr.fit(train_data)
 #residuals is the difference between the predicted value and the label of the test data

## get the results and evaluate

In [35]:
test_results = lr_model.evaluate(test_data)
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -72098.31926134787|
|-247791.47242938355|
|  285395.8796847649|
|-105203.91558420379|
|   -99887.327973512|
|  234101.1302129086|
|  439468.5608370006|
|  77125.52982911374|
| -303617.4875629926|
| -534219.8708007811|
|-105706.24107830413|
|  145240.6058780672|
|-157702.86670738552|
| 314305.36914058775|
| 35255.593744102865|
|-53868.530140295625|
|-19862.468336434104|
|   231177.204164288|
|-135424.39479584713|
| -65433.86905041989|
+-------------------+
only showing top 20 rows



In [36]:
test_results.rootMeanSquaredError

217511.31634150408

In [37]:
test_results.r2

0.6509385590386749