# **PYSPARK INTRODUCTION**

In [6]:
## Librerías
import numpy as np
import pandas as pd 
import pyspark

In [19]:
df_pandas = pd.read_csv('data/housing.csv')

df_pandas.head()

Unnamed: 0,Avg. Area Income,Avg. Area House Age,Avg. Area Number of Rooms,Avg. Area Number of Bedrooms,Area Population,Price,Address
0,79545.45857,5.682861,7.009188,4.09,23086.8005,1059034.0,"208 Michael Ferry Apt. 674\nLaurabury, NE 3701..."
1,79248.64245,6.0029,6.730821,3.09,40173.07217,1505891.0,"188 Johnson Views Suite 079\nLake Kathleen, CA..."
2,61287.06718,5.86589,8.512727,5.13,36882.1594,1058988.0,"9127 Elizabeth Stravenue\nDanieltown, WI 06482..."
3,63345.24005,7.188236,5.586729,3.26,34310.24283,1260617.0,USS Barnett\nFPO AP 44820
4,59982.19723,5.040555,7.839388,4.23,26354.10947,630943.5,USNS Raymond\nFPO AE 09386


In [None]:
## Crear sesión de Spark 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Practise").getOrCreate()

## Crear un DataFrame
df = spark.read.csv('data/housing.csv', header=True, sep=',')
df.show(5)

+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+--------------------+
|Avg. Area Income|Avg. Area House Age|Avg. Area Number of Rooms|Avg. Area Number of Bedrooms|Area Population|      Price|             Address|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+--------------------+
|     79545.45857|        5.682861322|              7.009188143|                        4.09|     23086.8005|1059033.558|208 Michael Ferry...|
|       Laurabury|     NE 37010-5101"|                     NULL|                        NULL|           NULL|       NULL|                NULL|
|     79248.64245|        6.002899808|              6.730821019|                        3.09|    40173.07217|1505890.915|188 Johnson Views...|
|   Lake Kathleen|          CA 48958"|                     NULL|                        NULL|           NULL|       NULL|                NULL|

In [87]:
## the second row is just the continuation of the previous row
df = spark.read.format("csv").load("data/housing.csv", header=True, sep=',',inferSchema=True, multiLine=True)
df.show(5)

+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+--------------------+
|Avg. Area Income|Avg. Area House Age|Avg. Area Number of Rooms|Avg. Area Number of Bedrooms|Area Population|      Price|             Address|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+--------------------+
|     79545.45857|        5.682861322|              7.009188143|                        4.09|     23086.8005|1059033.558|208 Michael Ferry...|
|     79248.64245|        6.002899808|              6.730821019|                        3.09|    40173.07217|1505890.915|188 Johnson Views...|
|     61287.06718|         5.86588984|               8.51272743|                        5.13|     36882.1594|1058987.988|9127 Elizabeth St...|
|     63345.24005|        7.188236095|              5.586728665|                        3.26|    34310.24283|1260616.807|USS Barnett\nFPO ...|

In [88]:
## rename Avg. Area Income to Avg_Area_Income, Avg. Area House Age to Avg_Area_House_Age, Avg. Area Number of Rooms to Avg_Area_Number_of_Rooms, Avg. Area Number of Bedrooms to Avg_Area_Number_of_Bedrooms, Area Population to Area_Population
df = df.withColumnRenamed("Avg. Area Income", "Avg_Area_Income") \
       .withColumnRenamed("Avg. Area House Age", "Avg_Area_House_Age") \
       .withColumnRenamed("Avg. Area Number of Rooms", "Avg_Area_Number_of_Rooms") \
       .withColumnRenamed("Avg. Area Number of Bedrooms", "Avg_Area_Number_of_Bedrooms") \
       .withColumnRenamed("Area Population", "Area_Population")
df.show(5)

+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|Avg_Area_Income|Avg_Area_House_Age|Avg_Area_Number_of_Rooms|Avg_Area_Number_of_Bedrooms|Area_Population|      Price|             Address|
+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|    79545.45857|       5.682861322|             7.009188143|                       4.09|     23086.8005|1059033.558|208 Michael Ferry...|
|    79248.64245|       6.002899808|             6.730821019|                       3.09|    40173.07217|1505890.915|188 Johnson Views...|
|    61287.06718|        5.86588984|              8.51272743|                       5.13|     36882.1594|1058987.988|9127 Elizabeth St...|
|    63345.24005|       7.188236095|             5.586728665|                       3.26|    34310.24283|1260616.807|USS Barnett\nFPO ...|
|    59982.19723|       5.0

In [89]:
## equivalent to df.info in pandas 
df.printSchema()

root
 |-- Avg_Area_Income: double (nullable = true)
 |-- Avg_Area_House_Age: double (nullable = true)
 |-- Avg_Area_Number_of_Rooms: double (nullable = true)
 |-- Avg_Area_Number_of_Bedrooms: double (nullable = true)
 |-- Area_Population: double (nullable = true)
 |-- Price: double (nullable = true)
 |-- Address: string (nullable = true)



In [90]:
## select items from the dataframe just one column (Avg. Area Income)
df.select('Avg_Area_Income').show(5)

+---------------+
|Avg_Area_Income|
+---------------+
|    79545.45857|
|    79248.64245|
|    61287.06718|
|    63345.24005|
|    59982.19723|
+---------------+
only showing top 5 rows



In [91]:
df.select(['Avg_Area_Income','Avg_Area_Number_of_Rooms']).show(5)

+---------------+------------------------+
|Avg_Area_Income|Avg_Area_Number_of_Rooms|
+---------------+------------------------+
|    79545.45857|             7.009188143|
|    79248.64245|             6.730821019|
|    61287.06718|              8.51272743|
|    63345.24005|             5.586728665|
|    59982.19723|             7.839387785|
+---------------+------------------------+
only showing top 5 rows



In [92]:
df.dtypes

[('Avg_Area_Income', 'double'),
 ('Avg_Area_House_Age', 'double'),
 ('Avg_Area_Number_of_Rooms', 'double'),
 ('Avg_Area_Number_of_Bedrooms', 'double'),
 ('Area_Population', 'double'),
 ('Price', 'double'),
 ('Address', 'string')]

In [93]:
df.describe().show()

+-------+------------------+------------------+------------------------+---------------------------+-----------------+------------------+--------------------+
|summary|   Avg_Area_Income|Avg_Area_House_Age|Avg_Area_Number_of_Rooms|Avg_Area_Number_of_Bedrooms|  Area_Population|             Price|             Address|
+-------+------------------+------------------+------------------------+---------------------------+-----------------+------------------+--------------------+
|  count|              5000|              5000|                    5000|                       5000|             5000|              5000|                5000|
|   mean| 68583.10898397019| 5.977222035287008|       6.987791850909204|         3.9813299999999967|36163.51603854035|1232072.6541452995|                NULL|
| stddev|10657.991213888685|0.9914561798324225|      1.0058332312754115|         1.2341372654846832|9925.650113546026| 353117.6265836953|                NULL|
|    min|       17796.63119|       2.644304186

In [94]:
## Addding columns to the dataframe
# Add a new column called 'Avg_Area_Income_1000' which is the 'Avg_Area_Income' divided by 1000
df = df.withColumn('Avg_Area_Income_1000', df['Avg_Area_Income']/1000)
df.show(5)

+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+--------------------+
|Avg_Area_Income|Avg_Area_House_Age|Avg_Area_Number_of_Rooms|Avg_Area_Number_of_Bedrooms|Area_Population|      Price|             Address|Avg_Area_Income_1000|
+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+--------------------+
|    79545.45857|       5.682861322|             7.009188143|                       4.09|     23086.8005|1059033.558|208 Michael Ferry...|   79.54545857000001|
|    79248.64245|       6.002899808|             6.730821019|                       3.09|    40173.07217|1505890.915|188 Johnson Views...|         79.24864245|
|    61287.06718|        5.86588984|              8.51272743|                       5.13|     36882.1594|1058987.988|9127 Elizabeth St...|         61.28706718|
|    63345.24005|       7.188236095|    

In [96]:
## delete a column from the dataframe
df = df.drop('Avg_Area_Income_1000')
df.show(5)

+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|Avg_Area_Income|Avg_Area_House_Age|Avg_Area_Number_of_Rooms|Avg_Area_Number_of_Bedrooms|Area_Population|      Price|             Address|
+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|    79545.45857|       5.682861322|             7.009188143|                       4.09|     23086.8005|1059033.558|208 Michael Ferry...|
|    79248.64245|       6.002899808|             6.730821019|                       3.09|    40173.07217|1505890.915|188 Johnson Views...|
|    61287.06718|        5.86588984|              8.51272743|                       5.13|     36882.1594|1058987.988|9127 Elizabeth St...|
|    63345.24005|       7.188236095|             5.586728665|                       3.26|    34310.24283|1260616.807|USS Barnett\nFPO ...|
|    59982.19723|       5.0

In [104]:
## generate random nan values in the dataframe
import random
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

## read again the data
df = spark.read.format("csv").load("data/housing.csv", header=True, sep=',',inferSchema=True, multiLine=True)
df = df.withColumnRenamed("Avg. Area Income", "Avg_Area_Income") \
       .withColumnRenamed("Avg. Area House Age", "Avg_Area_House_Age") \
       .withColumnRenamed("Avg. Area Number of Rooms", "Avg_Area_Number_of_Rooms") \
       .withColumnRenamed("Avg. Area Number of Bedrooms", "Avg_Area_Number_of_Bedrooms") \
       .withColumnRenamed("Area Population", "Area_Population")

def random_nan(value):
    if random.random() < 0.1:
        return None
    return value

random_nan_udf = udf(random_nan, FloatType())

df = df.withColumn('Avg_Area_Income', random_nan_udf(df['Avg_Area_Income']))

df.show(5)

+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|Avg_Area_Income|Avg_Area_House_Age|Avg_Area_Number_of_Rooms|Avg_Area_Number_of_Bedrooms|Area_Population|      Price|             Address|
+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|       79545.46|       5.682861322|             7.009188143|                       4.09|     23086.8005|1059033.558|208 Michael Ferry...|
|       79248.64|       6.002899808|             6.730821019|                       3.09|    40173.07217|1505890.915|188 Johnson Views...|
|      61287.066|        5.86588984|              8.51272743|                       5.13|     36882.1594|1058987.988|9127 Elizabeth St...|
|       63345.24|       7.188236095|             5.586728665|                       3.26|    34310.24283|1260616.807|USS Barnett\nFPO ...|
|      59982.195|       5.0

In [105]:
## show rows with nan values
df.filter(df['Avg_Area_Income'].isNull()).show()

+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|Avg_Area_Income|Avg_Area_House_Age|Avg_Area_Number_of_Rooms|Avg_Area_Number_of_Bedrooms|Area_Population|      Price|             Address|
+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|       59927.66|        5.36212557|             6.393120981|                        2.3|      29387.396|798869.5328|USS Gilbert\nFPO ...|
|           NULL|         6.1243421|             6.290820167|                       4.42|    39355.62573|1170720.894|4343 Joshua Lake ...|
|      78699.516|       5.652783637|             6.756453856|                       3.01|    22836.60757|1081150.125|7585 Lynn Loop\nE...|
|       59539.95|       6.018590243|             7.007676166|                       5.43|    58600.82715|1411730.477|122 Russo Neck\nS...|
|      48904.984|       4.8

In [106]:
## Handling missing values
df.count()

5000

In [107]:
df = df.na.drop()
df.count()

4490

In [108]:
## Filling missing values

# read again the data
df = spark.read.format("csv").load("data/housing.csv", header=True, sep=',',inferSchema=True, multiLine=True)
df = df = df.withColumnRenamed("Avg. Area Income", "Avg_Area_Income") \
       .withColumnRenamed("Avg. Area House Age", "Avg_Area_House_Age") \
       .withColumnRenamed("Avg. Area Number of Rooms", "Avg_Area_Number_of_Rooms") \
       .withColumnRenamed("Avg. Area Number of Bedrooms", "Avg_Area_Number_of_Bedrooms") \
       .withColumnRenamed("Area Population", "Area_Population")
       
       
def random_nan(value):
    if random.random() < 0.1:
        return None
    return value

random_nan_udf = udf(random_nan, FloatType())

df = df.withColumn('Avg_Area_Income', random_nan_udf(df['Avg_Area_Income']))

df.show(5)

+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|Avg_Area_Income|Avg_Area_House_Age|Avg_Area_Number_of_Rooms|Avg_Area_Number_of_Bedrooms|Area_Population|      Price|             Address|
+---------------+------------------+------------------------+---------------------------+---------------+-----------+--------------------+
|       79545.46|       5.682861322|             7.009188143|                       4.09|     23086.8005|1059033.558|208 Michael Ferry...|
|       79248.64|       6.002899808|             6.730821019|                       3.09|    40173.07217|1505890.915|188 Johnson Views...|
|      61287.066|        5.86588984|              8.51272743|                       5.13|     36882.1594|1058987.988|9127 Elizabeth St...|
|       63345.24|       7.188236095|             5.586728665|                       3.26|    34310.24283|1260616.807|USS Barnett\nFPO ...|
|      59982.195|       5.0