### Importing libs and instanciating SC and SQLSC

In [1]:
# useful libraries
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# pyspark libs
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row, Window
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.mllib.feature import StandardScaler, Normalizer, HashingTF, IDF
from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.stat import Statistics

from pyspark.ml.feature import StringIndexer, VectorIndexer, Imputer

### Data in DataFrame and RDD formats

In [2]:
# Create a Spark session
ss = SparkSession.builder.appName("data_preparation").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 22:11:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/14 22:11:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# forcing the schema of the dataframe
schema = "regio1 STRING, serviceCharge DOUBLE, heatingType STRING, telekomTvOffer STRING, telekomHybridUploadSpeed STRING, newlyConst BOOLEAN, balcony BOOLEAN, picturecount INT, pricetrend DOUBLE, telekomUploadSpeed DOUBLE, totalRent DOUBLE, yearConstructed INT, scoutId INT, noParkSpaces INT, firingTypes STRING, hasKitchen BOOLEAN, geo_bln STRING, cellar BOOLEAN, yearConstructedRange INT, baseRent DOUBLE, houseNumber STRING, livingSpace DOUBLE, geo_krs STRING, condition STRING, interiorQual STRING, petsAllowed STRING, street STRING, streetPlain STRING, lift BOOLEAN, baseRentRange INT, typeOfFlat STRING, geo_plz STRING, noRooms INT, thermalChar DOUBLE, floor INT, numberOfFloors INT, noRoomsRange INT, garden BOOLEAN, livingSpaceRange INT, regio2 STRING, regio3 STRING, description STRING, facilities STRING, heatingCosts DOUBLE, energyEfficiencyClass STRING, lastRefurbish INT, electricityBasePrice DOUBLE, electricityKwhPrice DOUBLE, date STRING"

fields = [StructField(field_name.split()[0], 
                      StringType() if field_name.split()[1] == "STRING" else 
                      IntegerType() if field_name.split()[1] == "INT" else 
                      DoubleType() if field_name.split()[1] == "DOUBLE" else
                      BooleanType() if field_name.split()[1] == "BOOLEAN" else
                      StringType(), 
                      True) 
          for field_name in schema.split(", ")]

custom_schema = StructType(fields)

In [4]:
# Read the CSV file with multiline values (description and facilities)
data = ss.read.schema(custom_schema).options(header = True, inferSchema = True, multiline = True, escape = "\"").csv("immo_data.csv")

# Show the corrected description and facilities fields
data.select("description", "facilities").show(3)

                                                                                

+--------------------+--------------------+
|         description|          facilities|
+--------------------+--------------------+
|Die ebenerdig zu ...|Die Wohnung ist m...|
|Alles neu macht d...|                  NA|
|Der Neubau entste...|* 9 m² Balkon\n* ...|
+--------------------+--------------------+
only showing top 3 rows



In [5]:
# attribute names
attNames = data.columns
print(attNames)

['regio1', 'serviceCharge', 'heatingType', 'telekomTvOffer', 'telekomHybridUploadSpeed', 'newlyConst', 'balcony', 'picturecount', 'pricetrend', 'telekomUploadSpeed', 'totalRent', 'yearConstructed', 'scoutId', 'noParkSpaces', 'firingTypes', 'hasKitchen', 'geo_bln', 'cellar', 'yearConstructedRange', 'baseRent', 'houseNumber', 'livingSpace', 'geo_krs', 'condition', 'interiorQual', 'petsAllowed', 'street', 'streetPlain', 'lift', 'baseRentRange', 'typeOfFlat', 'geo_plz', 'noRooms', 'thermalChar', 'floor', 'numberOfFloors', 'noRoomsRange', 'garden', 'livingSpaceRange', 'regio2', 'regio3', 'description', 'facilities', 'heatingCosts', 'energyEfficiencyClass', 'lastRefurbish', 'electricityBasePrice', 'electricityKwhPrice', 'date']


In [6]:
# rdd of the data without the header
datardd = data.rdd

In [7]:
# shape of the dataframe
print(f"shape of the dataframe: {data.count(), len(attNames)}")

[Stage 1:>                                                          (0 + 1) / 1]

shape of the dataframe: (268850, 49)


                                                                                

In [8]:
# checking atts data types
print(data.dtypes)

[('regio1', 'string'), ('serviceCharge', 'double'), ('heatingType', 'string'), ('telekomTvOffer', 'string'), ('telekomHybridUploadSpeed', 'string'), ('newlyConst', 'boolean'), ('balcony', 'boolean'), ('picturecount', 'int'), ('pricetrend', 'double'), ('telekomUploadSpeed', 'double'), ('totalRent', 'double'), ('yearConstructed', 'int'), ('scoutId', 'int'), ('noParkSpaces', 'int'), ('firingTypes', 'string'), ('hasKitchen', 'boolean'), ('geo_bln', 'string'), ('cellar', 'boolean'), ('yearConstructedRange', 'int'), ('baseRent', 'double'), ('houseNumber', 'string'), ('livingSpace', 'double'), ('geo_krs', 'string'), ('condition', 'string'), ('interiorQual', 'string'), ('petsAllowed', 'string'), ('street', 'string'), ('streetPlain', 'string'), ('lift', 'boolean'), ('baseRentRange', 'int'), ('typeOfFlat', 'string'), ('geo_plz', 'string'), ('noRooms', 'int'), ('thermalChar', 'double'), ('floor', 'int'), ('numberOfFloors', 'int'), ('noRoomsRange', 'int'), ('garden', 'boolean'), ('livingSpaceRange

## Data pre-processing and exploration

In [9]:
# checking for missing values
missing = data.select([F.count(F.when(F.col(c).contains('None')| \
                            F.col(c).contains('NULL') | \
                            F.col(c).contains('NA') | \
                            F.col(c).contains('no_information')| \
                            F.col(c).contains('NO_INFORMATION')| \
                            (F.col(c) == '') | \
                            F.col(c).isNull(), c)).alias(c) for c in attNames])

In [10]:
# Filter columns based on the condition count > 120000
#columns_above_threshold = [c for c in missing.columns if missing.select(c).first()[c] > 120000]

# takes quite some time to run the statement above
columns_above_threshold = ['telekomHybridUploadSpeed', 'noParkSpaces', 'heatingCosts', 'energyEfficiencyClass', 'lastRefurbish', 'electricityBasePrice', 'electricityKwhPrice']
print("Columns with more than 120 000 missing values:")
print(columns_above_threshold)

Columns with more than 120 000 missing values:
['telekomHybridUploadSpeed', 'noParkSpaces', 'heatingCosts', 'energyEfficiencyClass', 'lastRefurbish', 'electricityBasePrice', 'electricityKwhPrice']


In [11]:
# dropping attributes with at least {threshold} missing values
data = data.drop(*columns_above_threshold)
print(f"Size after dropping the attributes: {data.count(), len(data.columns)}")

[Stage 4:>                                                          (0 + 1) / 1]

Size after dropping the attributes: (268850, 42)


                                                                                

In [12]:
def get_top_distinct_values_with_percentage(column_name):
    sql_query = f"""
        SELECT {column_name}, COUNT(*) AS count, 
        (COUNT(*) * 100.0 / (SELECT COUNT(*) FROM data)) AS percentage
        FROM data
        GROUP BY {column_name}
        ORDER BY count DESC
        LIMIT 10
    """
    return ss.sql(sql_query)


In [13]:
# checking the distinct values of the remaining columns
data.createOrReplaceTempView("data")

23/12/14 22:11:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [14]:
# show top 10 distinct values, counts, and percentages for each column
#for col_name in data.columns:
#    top_distinct_values = get_top_distinct_values_with_percentage(col_name)
#    print(f"Top 10 distinct values, count, and percentage in column '{col_name}':")
#    top_distinct_values.show()

In [15]:
#missing.toPandas().loc[0,:] != 0

Analysing both the distinct and missing values information, we have to decide which attributes to keep. We also have to choose wisely the ones in which to impute the missing values and the strategy to follow to do so, depending on the dtype and semantic of the variable in this context.

### Dropping non-informative attributes

In [16]:
# exactly duplicate variables (geobln == regio1, geokrs == regio2)
# and street == streetPlain (street is without german characters)
data_clean = data.drop("geo_bln","geo_krs","street")

# many same values (variance threshold feature selection)
data_clean = data_clean.drop("telekomTvOffer","newlyConst")

# binned features (we prefer to do it on our own) and non-informative features
# we only keep baseRentRange to impute the missing values,
# since it seems relevant and doesn't have any missing values
data_clean = data_clean.drop("pricetrend","telekomUploadSpeed",\
                            "scoutId","yearConstructedRange","noRoomsRange",\
                             "date", "houseNumber", "livingSpaceRange")

In [17]:
print(f"Size after dropping the attributes: {data_clean.count(), len(data_clean.columns)}", end = "\n\n")
print("Columns:")
for att in data_clean.columns:
    print(f"{att}", end = ", ")

[Stage 7:>                                                          (0 + 1) / 1]

Size after dropping the attributes: (268850, 29)

Columns:
regio1, serviceCharge, heatingType, balcony, picturecount, totalRent, yearConstructed, firingTypes, hasKitchen, cellar, baseRent, livingSpace, condition, interiorQual, petsAllowed, streetPlain, lift, baseRentRange, typeOfFlat, geo_plz, noRooms, thermalChar, floor, numberOfFloors, garden, regio2, regio3, description, facilities, 

                                                                                

### "NA" to NULL for missing values

In [18]:
# substitute NA with the NULL in each column to have the same representation
data_clean = data_clean.select(*[F.when((F.col(att) == "NA")|\
                                      (F.col(att) == "NaN"), None)\
                        .otherwise(F.col(att)).alias(att) for att in data_clean.columns])

In [19]:
# creating new View for the cleaned data
data_clean.createOrReplaceTempView("data")

In [20]:
# showing change from "NA" to NULL
dist = get_top_distinct_values_with_percentage("interiorQual")
dist.show()

                                                                                

+-------------+------+-----------------+
| interiorQual| count|       percentage|
+-------------+------+-----------------+
|         NULL|112665|41.90626743537288|
|       normal| 81826|30.43555886181886|
|sophisticated| 64762|24.08852519992561|
|       luxury|  7648| 2.84470894550865|
|       simple|  1949| 0.72493955737400|
+-------------+------+-----------------+



#### Cheking if there are records with many NULL values
In order to remove the least amount of remaining attributes, since the information could be important in the following task, we filter out records that have more missing values than a certain threshold.

In [21]:
# new column with the number of NULLs for each row
sumNulls = "+".join(f"CASE WHEN {col} IS NULL THEN 1 ELSE 0 END" for col in data_clean.columns)

# new df with numOfNulls column
data_sumNulls = data_clean.withColumn("numOfNulls", F.expr(sumNulls))

In [22]:
# filter the records that have at least 7 missing values (not knowing which of them though)
data_thresh = data_sumNulls.filter(data_sumNulls["numOfNulls"] <= 6)

# assigning the filtered dataframe (without the numOfNulls attribute) to a clean variable
df = data_thresh.select([c for c in data_thresh.columns if c != "numOfNulls"])
print(f"Size of the dataframe after filtering: {df.count(), len(df.columns)}")

[Stage 16:>                                                         (0 + 1) / 1]

Size of the dataframe after filtering: (234465, 29)


                                                                                

##### To do:

    yearConstructed -> 21% missing -> remove records
    interiorQual -> 41% missing -> drop attribute or all "normal"
    petsAllowed -> 42% missing -> drop attribute or all "negotiable"

    description -> keep as is 7% NULL
    facilities -> keep as is 20% NULL

In [23]:
# removing records without yearConstructed info
df_filt = df.filter(F.col("yearConstructed").isNotNull())
print(f"size of the resulting df: {df_filt.count(), len(df_filt.columns)}")

[Stage 19:>                                                         (0 + 1) / 1]

size of the resulting df: (196624, 29)


                                                                                

In [24]:
# replace missing values in string columns with the most frequent value
df_imp = df_filt.withColumn("interiorQual", F.when(F.col("interiorQual").isNull(), "normal").otherwise(F.col("interiorQual")))
df_imp = df_imp.withColumn("petsAllowed", F.when(F.col("petsAllowed").isNull(), "negotiable").otherwise(F.col("petsAllowed")))

In [25]:
df_imp.select("petsAllowed").distinct().show()
df_imp.select("interiorQual").distinct().show()

                                                                                

+-----------+
|petsAllowed|
+-----------+
| negotiable|
|         no|
|        yes|
+-----------+



[Stage 25:>                                                         (0 + 1) / 1]

+-------------+
| interiorQual|
+-------------+
|       luxury|
|       normal|
|sophisticated|
|       simple|
+-------------+



                                                                                

### Brief Outlier Detection
Before the imputation of missing values

In [26]:
# numerical columns to check
num_cols = [c[0] for c in df_imp.dtypes if c[1] in ('int','double')] 

# stats for numerical columns, in order to check for anomalous values
for att in num_cols:
    df_imp.describe(f"{att}").show()

                                                                                

+-------+------------------+
|summary|     serviceCharge|
+-------+------------------+
|  count|            194352|
|   mean|153.24412020457711|
| stddev| 353.9170256422129|
|    min|               0.0|
|    max|          146118.0|
+-------+------------------+



                                                                                

+-------+-----------------+
|summary|     picturecount|
+-------+-----------------+
|  count|           196624|
|   mean|9.989858816828058|
| stddev|6.371257360408224|
|    min|                0|
|    max|              112|
+-------+-----------------+



                                                                                

+-------+-----------------+
|summary|        totalRent|
+-------+-----------------+
|  count|           169462|
|   mean|924.5552048246827|
| stddev|38285.70338031341|
|    min|              0.0|
|    max|      1.5751535E7|
+-------+-----------------+



                                                                                

+-------+------------------+
|summary|   yearConstructed|
+-------+------------------+
|  count|            196624|
|   mean|1966.4265450809667|
| stddev|45.507746935593644|
|    min|              1000|
|    max|              2090|
+-------+------------------+



                                                                                

+-------+-----------------+
|summary|         baseRent|
+-------+-----------------+
|  count|           196624|
|   mean|655.1151909736303|
| stddev|581.8008028829324|
|    min|              0.0|
|    max|         120000.0|
+-------+-----------------+



                                                                                

+-------+------------------+
|summary|       livingSpace|
+-------+------------------+
|  count|            196624|
|   mean| 74.28130482545369|
| stddev|159.64683444827784|
|    min|               0.0|
|    max|           66100.0|
+-------+------------------+



                                                                                

+-------+------------------+
|summary|     baseRentRange|
+-------+------------------+
|  count|            196624|
|   mean|3.7983613394092277|
| stddev|2.2326464091453824|
|    min|                 1|
|    max|                 9|
+-------+------------------+



                                                                                

+-------+------------------+
|summary|           noRooms|
+-------+------------------+
|  count|            179339|
|   mean| 2.615147848488059|
| stddev|1.2559439233246728|
|    min|                 1|
|    max|               230|
+-------+------------------+



                                                                                

+-------+------------------+
|summary|       thermalChar|
+-------+------------------+
|  count|            142643|
|   mean|114.19493939415106|
| stddev| 60.80072910101507|
|    min|               0.1|
|    max|            1983.0|
+-------+------------------+



                                                                                

+-------+------------------+
|summary|             floor|
+-------+------------------+
|  count|            168214|
|   mean|2.1422949338342825|
| stddev|3.0363632250797212|
|    min|                -1|
|    max|               650|
+-------+------------------+



[Stage 58:>                                                         (0 + 1) / 1]

+-------+------------------+
|summary|    numberOfFloors|
+-------+------------------+
|  count|            135689|
|   mean|3.6374798251884823|
| stddev| 6.591104601747938|
|    min|                 0|
|    max|               999|
+-------+------------------+



                                                                                

In [27]:
# yearConstructed -> drop when after 2023 or before ...
df_out = df_imp.filter((F.col("yearConstructed") >= 1700) | (F.col("yearConstructed") <= 2023))
#print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

In [28]:
# noRooms -> drop when over 20
df_out = df_out.filter((F.col("noRooms") <= 20) | (F.col("noRooms").isNull()))
#print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

In [29]:
# floor -> drop when over 30
df_out = df_out.filter((F.col("floor") <= 30) | (F.col("floor").isNull()))
#print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

In [30]:
# numberOfFloors -> drop when over 30
df_out = df_out.filter((F.col("numberOfFloors") <= 30) | (F.col("numberOfFloors").isNull()))
#print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

In [31]:
# serviceCharge, totalRent, baseRent, livingSpace, thermalChar -> look something for each
df_out = df_out.filter((F.col("serviceCharge") <= 1000) | (F.col("serviceCharge").isNull()))
#print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

In [32]:
# serviceCharge, totalRent, baseRent, livingSpace, thermalChar -> look something for each
#df_imp.select("totalRent","livingSpace").filter(F.col("totalRent")>15000).orderBy("totalRent", ascending = False).show()
df_out = df_out.filter((F.col("totalRent") <= 15000) | (F.col("totalRent").isNull()))
print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

[Stage 61:>                                                         (0 + 1) / 1]

Size of the df without abnormal values: (196449, 29)


                                                                                

In [33]:
# serviceCharge, totalRent, baseRent, livingSpace, thermalChar -> look something for each
#df_imp.select("baseRent","livingSpace").filter(F.col("baseRent")>20000).orderBy("baseRent", ascending = False).show()
df_out = df_out.filter((F.col("baseRent") <= 20000) | (F.col("baseRent").isNull()))
print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

[Stage 64:>                                                         (0 + 1) / 1]

Size of the df without abnormal values: (196447, 29)


                                                                                

In [34]:
# serviceCharge, totalRent, baseRent, livingSpace, thermalChar -> look something for each
#df_imp.select("livingSpace").filter(F.col("livingSpace") > 1000).orderBy("livingSpace", ascending = False).show()
df_out = df_out.filter((F.col("livingSpace") <= 1000) | (F.col("livingSpace").isNull()))
print(f"Size of the df without abnormal values: {df_out.count(), len(df_out.columns)}")

[Stage 67:>                                                         (0 + 1) / 1]

Size of the df without abnormal values: (196440, 29)


                                                                                

In [35]:
df_imp = df_out.select("*")

### Missing values

In [36]:
contAtt = [c[0] for c in df_imp.dtypes if c[1] == 'double']
catAtt = [c for c in df_imp.columns if c not in contAtt]
print(contAtt)
print(catAtt)

['serviceCharge', 'totalRent', 'baseRent', 'livingSpace', 'thermalChar']
['regio1', 'heatingType', 'balcony', 'picturecount', 'yearConstructed', 'firingTypes', 'hasKitchen', 'cellar', 'condition', 'interiorQual', 'petsAllowed', 'streetPlain', 'lift', 'baseRentRange', 'typeOfFlat', 'geo_plz', 'noRooms', 'floor', 'numberOfFloors', 'garden', 'regio2', 'regio3', 'description', 'facilities']


#### Imputing discrete and categorical attributes (mode, median)

##### To impute:

    heatingType -> general mode
    firingTypes -> general mode
    condition -> general mode
    typeOfFlat -> 13% missing -> general mode ("apartment")
    noRooms -> 9% missing -> general median or grouped by livingSpaceRange
    floor -> 19% missing -> median
    numberOfFloors -> 36% missing -> median


In [37]:
# list of columns to impute
cols_mode = ["heatingType","firingTypes","condition", "typeOfFlat"]

# impute most frequent value
for att in cols_mode:
    mfv = df_imp.groupBy(att).count().orderBy(F.col("count").desc()).first()[att]
    df_imp = df_imp.withColumn(att, F.when(F.col(att).isNull(), mfv).otherwise(F.col(att)))

                                                                                

In [38]:
# creating new View for the imputed data
df_imp.createOrReplaceTempView("data")

In [39]:
# show top 10 distinct values, counts, and percentages for each column
for col_name in cols_mode:
    top_distinct_values = get_top_distinct_values_with_percentage(col_name)
    print(f"Top 10 distinct values, count, and percentage in column '{col_name}':")
    top_distinct_values.show()

Top 10 distinct values, count, and percentage in column 'heatingType':


                                                                                

+--------------------+------+-----------------+
|         heatingType| count|       percentage|
+--------------------+------+-----------------+
|     central_heating|120754|61.47118713093056|
|    district_heating| 21306|10.84605986560782|
|       floor_heating| 15122| 7.69802484219100|
|         gas_heating| 14368| 7.31419262879251|
|self_contained_ce...| 14238| 7.24801466096518|
|         oil_heating|  3635| 1.85043779271024|
|           heat_pump|  2395| 1.21920179189574|
|combined_heat_and...|  1811| 0.92190999796375|
|night_storage_heater|  1005| 0.51160659743433|
| wood_pellet_heating|   814| 0.41437589085726|
+--------------------+------+-----------------+

Top 10 distinct values, count, and percentage in column 'firingTypes':


                                                                                

+-----------------+------+-----------------+
|      firingTypes| count|       percentage|
+-----------------+------+-----------------+
|              gas|108625|55.29678273264101|
| district_heating| 44297|22.54988800651598|
|              oil| 14273| 7.26583180614946|
|natural_gas_light|  8670| 4.41356139279169|
|natural_gas_heavy|  4062| 2.06780696395846|
|      electricity|  3942| 2.00671960904093|
|       geothermal|  2186| 1.11280798208104|
|   pellet_heating|  2166| 1.10262675626145|
|  gas:electricity|  1291| 0.65719812665445|
|    local_heating|   852| 0.43372021991448|
+-----------------+------+-----------------+

Top 10 distinct values, count, and percentage in column 'condition':


                                                                                

+--------------------+-----+-----------------+
|           condition|count|       percentage|
+--------------------+-----+-----------------+
|           well_kept|90920|46.28385257585013|
|     fully_renovated|19933|10.14711871309306|
|         refurbished|19897|10.12879250661780|
|      first_time_use|19204| 9.77601303196905|
|      mint_condition|18516| 9.42577886377520|
|          modernized|13653| 6.95021380574221|
|first_time_use_af...|11424| 5.81551618814905|
|          negotiable| 1852| 0.94278151089391|
|  need_of_renovation| 1040| 0.52942374261861|
| ripe_for_demolition|    1| 0.00050906129098|
+--------------------+-----+-----------------+

Top 10 distinct values, count, and percentage in column 'typeOfFlat':


[Stage 101:>                                                        (0 + 1) / 1]

+-------------------+------+-----------------+
|         typeOfFlat| count|       percentage|
+-------------------+------+-----------------+
|          apartment|123490|62.86397882305030|
|        roof_storey| 24978|12.71533292608430|
|       ground_floor| 23642|12.03522704133578|
|         maisonette|  6849| 3.48656078191814|
|              other|  5965| 3.03655060069232|
|raised_ground_floor|  3875| 1.97261250254531|
|          penthouse|  2960| 1.50682142129912|
|      terraced_flat|  2632| 1.33984931785787|
|      half_basement|  1434| 0.72999389126451|
|               loft|   615| 0.31307269395235|
+-------------------+------+-----------------+



                                                                                

In [40]:
# list of columns to impute
cols_median = ["noRooms","floor","numberOfFloors"]

# impute median
for att in cols_median:
    median = df_imp.approxQuantile(att, [0.5], 0.25)[0]
    df_imp = df_imp.withColumn(att, F.when(F.col(att).isNull(), median).otherwise(F.col(att)))

                                                                                

In [41]:
# creating new View for the imputed data
df_imp.createOrReplaceTempView("data")

In [42]:
# show top 10 distinct values, counts, and percentages for each column
for col_name in cols_median:
    top_distinct_values = get_top_distinct_values_with_percentage(col_name)
    print(f"Top 10 distinct values, count, and percentage in column '{col_name}':")
    top_distinct_values.show()

Top 10 distinct values, count, and percentage in column 'noRooms':


                                                                                

+-------+-----+-----------------+
|noRooms|count|       percentage|
+-------+-----+-----------------+
|    3.0|86688|44.12950519242517|
|    2.0|64751|32.96222765220933|
|    4.0|20646|10.51007941356139|
|    1.0|19090| 9.71798004479739|
|    5.0| 4034| 2.05355324781104|
|    6.0|  891| 0.45357361026268|
|    7.0|  220| 0.11199348401548|
|    8.0|   80| 0.04072490327835|
|    9.0|   20| 0.01018122581959|
|   10.0|    7| 0.00356342903686|
+-------+-----+-----------------+

Top 10 distinct values, count, and percentage in column 'floor':


                                                                                

+-----+-----+-----------------+
|floor|count|       percentage|
+-----+-----+-----------------+
|  2.0|71932|36.61779678273264|
|  1.0|48695|24.78873956424353|
|  3.0|29146|14.83710038688658|
|  0.0|19417| 9.88444308694767|
|  4.0|15502| 7.89146813276318|
|  5.0| 6622| 3.37100386886581|
|  6.0| 2127| 1.08277336591326|
|  7.0|  875| 0.44542862960700|
|  8.0|  496| 0.25249440032580|
|  9.0|  366| 0.18631643249847|
+-----+-----+-----------------+

Top 10 distinct values, count, and percentage in column 'numberOfFloors':


[Stage 122:>                                                        (0 + 1) / 1]

+--------------+------+-----------------+
|numberOfFloors| count|       percentage|
+--------------+------+-----------------+
|           3.0|100828|51.32763184687436|
|           4.0| 31034|15.79820810425575|
|           2.0| 29005|14.76532274485848|
|           5.0| 16603| 8.45194461413154|
|           1.0|  6241| 3.17705151700265|
|           6.0|  5513| 2.80645489716962|
|           7.0|  2212| 1.12604357564651|
|           0.0|  1018| 0.51822439421706|
|           8.0|  1012| 0.51517002647119|
|          11.0|   887| 0.45153736509876|
+--------------+------+-----------------+



                                                                                

#### Imputing continuous attributes (median)

##### To impute:

    serviceCharge -> avg by some att
    totalRent -> avg by..
    streetPlain -> don't know... impute the street doesn't seem right!
    thermalCharge -> defines energy efficiency, 39% NULL -> regression or avg by...

In order to impute reasonable values, we decided to first group by baseRentRange, to achieve better results

In [43]:
# define the columns to impute and the grouping column
cols_mean = contAtt
grCol = "baseRentRange"

# create an imputer instance
imputer = Imputer(inputCols=cols_mean, outputCols=[col+"_imputed" for col in cols_mean]).setStrategy("median")

# window for grouping
win = Window().partitionBy(grCol)

# iterate over distinct values in the grouping column
dist_groups = df_imp.select(grCol).distinct().collect()

imputed_dfs = []

for group_value in dist_groups:
    group_df = df_imp.filter(F.col(grCol) == group_value[0])
    imputer_model = imputer.fit(group_df)
    imputed_df = imputer_model.transform(group_df)
    imputed_dfs.append(imputed_df)

# combine together
df_imp_all = imputed_dfs[0]
for imputed_df in imputed_dfs[1:]:
    df_imp_all = df_imp_all.union(imputed_df)

                                                                                

##### Checking imputations for continuous attributes

In [44]:
# continuous variables
#for att in contAtt:
#    print(f"attribute \"{att}\":")
#    
#    # Values before imputation
#    before = df_imp.select(att).where(df_imp.baseRentRange == 2).take(15)
#    
#    # Values after imputation
#    after = df_imp_all.select(att+"_imputed").where(df_imp_all.baseRentRange == 2).take(15)
#    
#    # Print side-by-side comparison
#    for vb, va in zip(before, after):
#        print(f"before: {vb[0]}, after: {va[0]}")
#    print()

##### Keeping only the imputed attributes and renaming them

In [45]:
print(df_imp_all.columns)
print()
impAtt = [col+"_imputed" for col in contAtt]
print(impAtt)

['regio1', 'serviceCharge', 'heatingType', 'balcony', 'picturecount', 'totalRent', 'yearConstructed', 'firingTypes', 'hasKitchen', 'cellar', 'baseRent', 'livingSpace', 'condition', 'interiorQual', 'petsAllowed', 'streetPlain', 'lift', 'baseRentRange', 'typeOfFlat', 'geo_plz', 'noRooms', 'thermalChar', 'floor', 'numberOfFloors', 'garden', 'regio2', 'regio3', 'description', 'facilities', 'serviceCharge_imputed', 'totalRent_imputed', 'baseRent_imputed', 'livingSpace_imputed', 'thermalChar_imputed']

['serviceCharge_imputed', 'totalRent_imputed', 'baseRent_imputed', 'livingSpace_imputed', 'thermalChar_imputed']


In [46]:
# keeping only imputed variables
dataframe = df_imp_all.drop(*contAtt, "baseRentRange")

# renaming the imputed atts to the originals
for i, att in enumerate(impAtt):
    dataframe = dataframe.withColumnRenamed(att, contAtt[i])

print(f"Size of the final dataframe: {dataframe.count(), len(dataframe.columns)}")
print()
print(dataframe.columns)



Size of the final dataframe: (196440, 28)

['regio1', 'heatingType', 'balcony', 'picturecount', 'yearConstructed', 'firingTypes', 'hasKitchen', 'cellar', 'condition', 'interiorQual', 'petsAllowed', 'streetPlain', 'lift', 'typeOfFlat', 'geo_plz', 'noRooms', 'floor', 'numberOfFloors', 'garden', 'regio2', 'regio3', 'description', 'facilities', 'serviceCharge', 'totalRent', 'baseRent', 'livingSpace', 'thermalChar']


                                                                                

In [47]:
# checking for missing values
missing_df = dataframe.select([F.count(F.when(F.col(c).contains('None')| \
                            F.col(c).contains('NULL') | \
                            F.col(c).contains('NA') | \
                            F.col(c).contains('no_information')| \
                            F.col(c).contains('NO_INFORMATION')| \
                            (F.col(c) == '') | \
                            F.col(c).isNull(), c)).alias(c) for c in dataframe.columns])

In [49]:
#pdf = missing_df.toPandas()
#pdf[(pdf != 0)].sum()

In [50]:
type(dataframe)

pyspark.sql.dataframe.DataFrame

### NOTES

- We still have not treated the free text features, even in terms of missing values.
    - Fare vocabolario con termini positivi e negativi, percentuale positivi/negativi, positivi/totale termini
- streetPlain (the address with german characters) has still missing values, since we can not impute the address of an apartment!!
 Remove streetPlain and discretize zipcode (geoplz)

### Saving the dataframe for future analyses

In [51]:
# saving in csv format in MY jupyter data folder
#dataframe.write.csv("dataframe_eda.csv", header=True, mode="overwrite")

In [52]:
#pandas_df = dataframe.toPandas()

In [53]:
#pandas_df.to_csv("dataframe_eda.csv", index = False)