# Window Partitioning

## Prerequisites

Install Spark and Java in the VM.

In [None]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark 3.5.0
!wget -q https://apache.osuosl.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

In [None]:
ls -l # check the .tgz is there

total 1173056
drwx------  5 root root      4096 Jan 22 11:23 drive/
drwxr-xr-x  1 root root      4096 Jan 18 14:21 sample_data/
drwxr-xr-x 13 1000 1000      4096 Sep  9 02:08 spark-3.5.0-bin-hadoop3/
-rw-r--r--  1 root root 400395283 Sep  9 02:10 spark-3.5.0-bin-hadoop3.tgz
-rw-r--r--  1 root root 400395283 Sep  9 02:10 spark-3.5.0-bin-hadoop3.tgz.1
-rw-r--r--  1 root root 400395283 Sep  9 02:10 spark-3.5.0-bin-hadoop3.tgz.2
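
The `.tgz.1` and `.tgz.2` entries in the listing are what `wget` leaves behind when the download cell is re-run: each run saves another numbered copy. One way to avoid that is to check for the archive before fetching it. The sketch below is an illustration only; `download_once` and its `fetch` parameter are hypothetical names, and the default fetcher is the standard library's `urllib.request.urlretrieve`.

```python
import os
from urllib.request import urlretrieve


def download_once(url, dest, fetch=urlretrieve):
    """Fetch url into dest unless dest already exists.

    Returns True if a download happened, False if it was skipped.
    `fetch` is injectable so the logic can be tried without network access.
    """
    if os.path.exists(dest):
        return False
    fetch(url, dest)
    return True
```

Calling `download_once(url, "spark-3.5.0-bin-hadoop3.tgz")` with the URL from the install cell would then download the archive on the first run only.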


In [None]:
# extract the archive
!tar xf spark-3.5.0-bin-hadoop3.tgz

In [None]:
!pip install -q findspark

Defining the environment

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"
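
A wrong `JAVA_HOME` or `SPARK_HOME` tends to surface later as an opaque failure when the session starts, so it can help to check the paths first. The helper below is a hypothetical sketch (`missing_dirs` is not part of any library); it only reports which of the named environment variables do not point at an existing directory.

```python
import os


def missing_dirs(names, environ=os.environ):
    """Return the variables in `names` that are unset or not a directory."""
    return [n for n in names if not os.path.isdir(environ.get(n, ""))]


# An empty list means both paths exist on this machine.
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```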

Start Spark Session

---

In [None]:
import findspark
findspark.init("spark-3.5.0-bin-hadoop3")  # SPARK_HOME

from pyspark.sql import SparkSession

# create the session
spark = SparkSession \
        .builder \
        .appName("Window Partitioning") \
        .master("local[*]") \
        .getOrCreate()

spark.version

'3.5.0'

In [None]:
spark

In [None]:
# For Pandas conversion optimization
# (in Spark 3.x the older key "spark.sql.execution.arrow.enabled" is deprecated)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
# Import sql functions
from pyspark.sql.functions import *

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Load the CSV into the variable realStateDF, display it, and inspect the schema.

In [None]:
realStateDF= spark.read.option("header", "true").option("delimiter", ";").csv("/content/drive/MyDrive/Real Estate Dataset.csv")
realStateDF.show(3)
realStateDF.printSchema()

+--------+------+-----+-----------+-----------------+------+---------+--------+-----+--------------------+----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+
|name_nsi| price|index|environment|quality_of_living|safety|transport|services|relax|           condition|area|energy_costs|provision|certificate|construction_type|orientation|year_built|last_reconstruction|total_floors|floor|lift|balkonies|loggia|cellar|            type|rooms|  district|
+--------+------+-----+-----------+-----------------+------+---------+--------+-----+--------------------+----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+
|Semerovo| 42000|   NA|         NA|               NA|    NA|       NA|      NA|   NA|  Original condition|  58|          NA|      

Show the mean and standard deviation of the housing prices in the sample:

In [None]:
realStateDF.select(avg(realStateDF.price)).show()
realStateDF.select(stddev(realStateDF.price)).show()


+------------------+
|        avg(price)|
+------------------+
|165205.66818152307|
+------------------+

+-----------------+
|    stddev(price)|
+-----------------+
|162973.6312769864|
+-----------------+
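
Two details are worth keeping in mind when reading these numbers. Spark's `stddev` is an alias for `stddev_samp`, the sample standard deviation (divisor n - 1), and aggregates skip nulls. Because `price` is read as a string, values that cannot be parsed as numbers (such as `NA`) become null under Spark's default, non-ANSI settings and are left out too. The plain-Python sketch below mirrors that behaviour on made-up values; the list `raw_prices` and the helper `to_number` are illustrative and not taken from the dataset.

```python
import statistics


def to_number(text):
    """Parse a numeric string; return None for anything else (e.g. 'NA')."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


raw_prices = ["100", "200", "NA", "300"]  # made-up values
prices = [p for p in map(to_number, raw_prices) if p is not None]

mean_price = statistics.mean(prices)   # unparsable entries are ignored, as in avg()
sample_std = statistics.stdev(prices)  # divisor n - 1, as in stddev_samp
print(mean_price, sample_std)
```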



Filter the dataset down to the properties that are in 'Original condition':

In [None]:
realStatefiltradoDF=realStateDF.filter(col("condition")=="Original condition")
realStatefiltradoDF.show(3)

+--------+-----+-----+-----------+-----------------+------+---------+--------+-----+------------------+-----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+---------+
|name_nsi|price|index|environment|quality_of_living|safety|transport|services|relax|         condition| area|energy_costs|provision|certificate|construction_type|orientation|year_built|last_reconstruction|total_floors|floor|lift|balkonies|loggia|cellar|            type|rooms|  district|fila_enum|
+--------+-----+-----+-----------+-----------------+------+---------+--------+-----+------------------+-----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+---------+
|Semerovo|42000|   NA|         NA|               NA|    NA|       NA|      NA|   NA|Original condition|   

Number of properties by condition:



In [None]:
realStateareasDF = realStateDF.groupBy(realStateDF.condition).count().orderBy("count")
realStateareasDF.show()

+--------------------+-----+
|           condition|count|
+--------------------+-----+
| Development project|   71|
|                  NA|  327|
|  Under construction|  441|
|  Original condition| 2148|
|Partial reconstru...| 3953|
|        New building| 4074|
|Complete reconstr...| 4389|
+--------------------+-----+
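
For readers new to Spark, `groupBy(...).count().orderBy("count")` produces a frequency table sorted from the least to the most common value. A plain-Python equivalent, on made-up values rather than the real column, might look like this:

```python
from collections import Counter

conditions = ["New building", "Original condition", "New building", "NA"]  # made-up

# Count occurrences per value, then sort ascending by count.
counts = sorted(Counter(conditions).items(), key=lambda item: item[1])
for condition, count in counts:
    print(condition, count)
```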



Number of properties by number of rooms:

In [None]:
realStateroomsDF = realStateDF.groupBy(realStateDF.rooms).count().orderBy("count")
realStateroomsDF.show()

+-----+-----+
|rooms|count|
+-----+-----+
|    5|  147|
|    4| 1529|
|    1| 2247|
|    2| 5309|
|    3| 6171|
+-----+-----+



Create the fila_enum column so that a second dataset can be derived from the original and the two can be combined with a left join.

In [None]:
realStateDF.printSchema()

realStateDF = realStateDF.withColumn("fila_enum", monotonically_increasing_id())
realStateDF.show(3)

root
 |-- name_nsi: string (nullable = true)
 |-- price: string (nullable = true)
 |-- index: string (nullable = true)
 |-- environment: string (nullable = true)
 |-- quality_of_living: string (nullable = true)
 |-- safety: string (nullable = true)
 |-- transport: string (nullable = true)
 |-- services: string (nullable = true)
 |-- relax: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- area: string (nullable = true)
 |-- energy_costs: string (nullable = true)
 |-- provision: string (nullable = true)
 |-- certificate: string (nullable = true)
 |-- construction_type: string (nullable = true)
 |-- orientation: string (nullable = true)
 |-- year_built: string (nullable = true)
 |-- last_reconstruction: string (nullable = true)
 |-- total_floors: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- lift: string (nullable = true)
 |-- balkonies: string (nullable = true)
 |-- loggia: string (nullable = true)
 |-- cellar: string (nullable = true)
 |-- type
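
`monotonically_increasing_id()` guarantees IDs that are unique and increasing, but not consecutive. According to the Spark documentation, the current implementation puts the partition index in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below reproduces that layout in plain Python; `mono_id` is a hypothetical helper, not a Spark function.

```python
def mono_id(partition_index, row_in_partition):
    """Compose an ID the way Spark documents it: partition in the upper
    31 bits, record number within the partition in the lower 33 bits."""
    return (partition_index << 33) | row_in_partition


# First rows of partitions 0 and 1: the IDs jump by 2**33 between partitions.
print(mono_id(0, 0), mono_id(0, 1), mono_id(1, 0))
```

Because the ID depends on how rows are laid out across partitions, using it as a join key only lines rows up when both sides are partitioned and ordered identically.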

Create a second, fictitious dataset from the first one so that there is something to join against. Both datasets carry the fila_enum column so the join can match rows on it. The only change is that the 'rooms' and 'price' columns are multiplied by two.

In [None]:

# Derive the doubled columns from the original DataFrame
realStateCopiaDF = realStateDF.withColumn("segundoPrecio", expr("price * 2")).withColumn("segundosRooms", expr("rooms * 2"))
realStateCopiaDF.show(3)
# Drop every original column so that only fila_enum and the two new columns remain
columnasAeliminar = ['name_nsi','price','index','environment','quality_of_living','safety','transport','services','relax','condition','area','energy_costs','provision','certificate','construction_type','orientation','year_built','last_reconstruction','total_floors','floor','lift','balkonies','loggia','cellar','type','rooms','district']
realStateCopiaDF = realStateCopiaDF.drop(*columnasAeliminar)
# fila_enum is already inherited from realStateDF; this call regenerates and overwrites it
realStateCopiaDF = realStateCopiaDF.withColumn("fila_enum", monotonically_increasing_id())

realStateCopiaDF.show()


+--------+------+-----+-----------+-----------------+------+---------+--------+-----+--------------------+----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+---------+-------------+-------------+
|name_nsi| price|index|environment|quality_of_living|safety|transport|services|relax|           condition|area|energy_costs|provision|certificate|construction_type|orientation|year_built|last_reconstruction|total_floors|floor|lift|balkonies|loggia|cellar|            type|rooms|  district|fila_enum|segundoPrecio|segundosRooms|
+--------+------+-----+-----------+-----------------+------+---------+--------+-----+--------------------+----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+---------+-------------+-------------+
|Semerovo| 42000

Join both datasets, the original and the derived one, on the fila_enum column.

In [None]:
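# joinCondition = realStateDF.fila_enum == realStateCopiaDF.fila_enum
# show() returns None, so keep the DataFrame in the variable and display it separately
nuevorealStateDF = realStateDF.join(realStateCopiaDF, 'fila_enum', "left_outer").orderBy("fila_enum")
nuevorealStateDF.show(3)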

+---------+--------+------+-----+-----------+-----------------+------+---------+--------+-----+--------------------+----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+-------------+-------------+
|fila_enum|name_nsi| price|index|environment|quality_of_living|safety|transport|services|relax|           condition|area|energy_costs|provision|certificate|construction_type|orientation|year_built|last_reconstruction|total_floors|floor|lift|balkonies|loggia|cellar|            type|rooms|  district|segundoPrecio|segundosRooms|
+---------+--------+------+-----+-----------+-----------------+------+---------+--------+-----+--------------------+----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+----------+-------------+-------------+
|        0|Semer
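
A left outer join keeps every row of the left DataFrame and fills the right-hand columns with nulls where no key matches. The plain-Python sketch below applies the same rule to two tiny made-up tables keyed by `fila_enum`; `left_outer_join` is an illustrative helper and assumes the keys on the right side are unique.

```python
def left_outer_join(left, right, key, right_cols):
    """Join lists of dicts on `key`, keeping every left row."""
    index = {row[key]: row for row in right}  # assumes unique keys on the right
    joined = []
    for row in left:
        match = index.get(row[key], {})
        extra = {col: match.get(col) for col in right_cols}  # None when unmatched
        joined.append({**row, **extra})
    return joined


left = [{"fila_enum": 0, "price": 42000}, {"fila_enum": 1, "price": 50000}]
right = [{"fila_enum": 0, "segundoPrecio": 84000}]

for row in left_outer_join(left, right, "fila_enum", ["segundoPrecio"]):
    print(row)
```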

Import the PySpark functions module as F, along with the Window class.

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


Using a window function, create three columns that show, for each row of the DataFrame, the average price for that property's condition, the average number of rooms for that condition, and the total number of properties with that condition.

In [None]:

realStateareasDF = realStateDF.groupBy("condition").count().orderBy("count")
realStateareasDF.show()

# No orderBy: an ordered window defaults to a running frame, giving cumulative values rather than one value per condition
win = Window.partitionBy("condition")

windowDF = realStateDF.withColumn('precio_medio', F.avg("price").over(win)).withColumn('Media_de_habitaciones', F.avg("rooms").over(win)).withColumn('Total_Inbuebles_de_este_tipo', F.count("condition").over(win))
windowDF.show()


+--------------------+-----+
|           condition|count|
+--------------------+-----+
| Development project|   71|
|                  NA|  327|
|  Under construction|  441|
|  Original condition| 2148|
|Partial reconstru...| 3953|
|        New building| 4074|
|Complete reconstr...| 4389|
+--------------------+-----+

+--------------------+------+-----+-----------+-----------------+------+---------+--------+-----+--------------------+-----+------------+---------+-----------+-----------------+-----------+----------+-------------------+------------+-----+----+---------+------+------+----------------+-----+--------------------+---------+------------------+---------------------+----------------------------+
|            name_nsi| price|index|environment|quality_of_living|safety|transport|services|relax|           condition| area|energy_costs|provision|certificate|construction_type|orientation|year_built|last_reconstruction|total_floors|floor|lift|balkonies|loggia|cellar|            type|ro
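
Whether the window specification has an `orderBy` changes what the aggregates mean. Without it, the frame is the whole partition, so every row gets the partition-wide value. With it, Spark's default frame runs from the start of the partition up to the current row (including any rows tied with it), which turns `avg` into a running average. The plain-Python sketch below contrasts the two on made-up rows with distinct prices; `partition_avg` and `running_avg` are hypothetical helpers.

```python
from collections import defaultdict

rows = [  # made-up (condition, price) pairs with distinct prices
    ("New building", 100.0),
    ("New building", 300.0),
    ("Original condition", 50.0),
]


def partition_avg(rows):
    """Average per partition, repeated on every row (no orderBy)."""
    groups = defaultdict(list)
    for condition, price in rows:
        groups[condition].append(price)
    return [(c, p, sum(groups[c]) / len(groups[c])) for c, p in rows]


def running_avg(rows):
    """Average of the rows seen so far in each partition, ordered by price."""
    seen = defaultdict(list)
    out = []
    for condition, price in sorted(rows):
        seen[condition].append(price)
        out.append((condition, price, sum(seen[condition]) / len(seen[condition])))
    return out


print(partition_avg(rows))
print(running_avg(rows))
```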