# Transform Real Estate Table Units
## Description of the Data:
### UniqueID: A unique identifier for each property (integer values read as strings, no missing data).
### PropertyAddress: The address of the property (string values, 29 missing entries).
### LandUse: The type of land use for the property (string values, no missing data).
### SalePrice: The sale price of the property (string values, 3 missing entries; needs cleaning before numeric use because it is stored as a string).
### SaleDate: The date of the sale (string values in mixed formats, 3 missing entries).
### LandValue: The value of the land (numeric values read as strings, missing for 30,465 entries).
### BuildingValue: The value of the building on the property (numeric values read as strings, missing for 30,468 entries).
### TotalValue: The total value of the property (numeric values read as strings, missing for 30,468 entries).
### Key Statistics:
### LandValue: Mean value is approximately 69,070, with values ranging from 100 to 2.77 million.
### BuildingValue: Mean value is approximately 160,802, with values ranging from 0 to 12.97 million.
### TotalValue: Mean value is approximately 232,392, with values ranging from 100 to 13.94 million.

In [38]:
%%pyspark
df = spark.read.load(
    'abfss://files@datalake6ojqgxp.dfs.core.windows.net/Units-csv.csv',
    format='csv',
    header=True,  # the first row of the file holds the column names
)
display(df.limit(10))


## units.info() Equivalent in PySpark

In [39]:
# Display the schema of the DataFrame
df.printSchema()


root
 |-- UniqueID : string (nullable = true)
 |-- PropertyAddress: string (nullable = true)
 |-- LandUse: string (nullable = true)
 |-- SalePrice: string (nullable = true)
 |-- SaleDate: string (nullable = true)
 |-- LandValue: string (nullable = true)
 |-- BuildingValue: string (nullable = true)
 |-- TotalValue: string (nullable = true)



In [40]:
from pyspark.sql.functions import col, sum as sum_

# Count the number of missing values in each column
df.select([sum_(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()


+---------+---------------+-------+---------+--------+---------+-------------+----------+
|UniqueID |PropertyAddress|LandUse|SalePrice|SaleDate|LandValue|BuildingValue|TotalValue|
+---------+---------------+-------+---------+--------+---------+-------------+----------+
|        0|             29|      0|        3|       3|    30465|        30468|     30468|
+---------+---------------+-------+---------+--------+---------+-------------+----------+



In [41]:
from pyspark.sql.functions import col

# Count the number of missing and non-missing values in 'PropertyAddress'
df.select(col("PropertyAddress").isNull().alias("isNull")).groupBy("isNull").count().show()


+------+-----+
|isNull|count|
+------+-----+
|  true|   29|
| false|56451|
+------+-----+



In [42]:
# Count missing and non-missing values for each column of interest
for column_name in ["LandUse", "BuildingValue", "LandValue", "TotalValue"]:
    df.select(col(column_name).isNull().alias("isNull")).groupBy("isNull").count().show()


+------+-----+
|isNull|count|
+------+-----+
| false|56480|
+------+-----+

+------+-----+
|isNull|count|
+------+-----+
|  true|30468|
| false|26012|
+------+-----+

+------+-----+
|isNull|count|
+------+-----+
|  true|30465|
| false|26015|
+------+-----+

+------+-----+
|isNull|count|
+------+-----+
|  true|30468|
| false|26012|
+------+-----+
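
To put these counts in proportion, the minimal sketch below converts them to percentages in plain Python. The null counts and the row total of 56,480 are copied from the outputs above; the helper name `missing_share` is hypothetical.

```python
# Null counts and row total copied from the outputs above.
TOTAL_ROWS = 56480
null_counts = {
    "PropertyAddress": 29,
    "LandValue": 30465,
    "BuildingValue": 30468,
    "TotalValue": 30468,
}

def missing_share(nulls, total):
    """Return the percentage of missing values, rounded to one decimal."""
    return round(100 * nulls / total, 1)

shares = {name: missing_share(n, TOTAL_ROWS) for name, n in null_counts.items()}
for name, pct in shares.items():
    print(f"{name}: {pct}% missing")
```

Roughly 54% of each value column is missing, so any fill strategy affects more than half of the rows.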



In [43]:
# List all column names
print(df.columns)


['UniqueID ', 'PropertyAddress', 'LandUse', 'SalePrice', 'SaleDate', 'LandValue', 'BuildingValue', 'TotalValue']
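
The list above shows that the first column name is `'UniqueID '`, with a trailing space, so a lookup by `UniqueID` without the space would not match. Below is a minimal sketch of normalizing the names, shown on a plain Python list. In PySpark the cleaned list could then be applied with `df.toDF(*clean_columns)`; that is a suggestion, not a step this notebook takes.

```python
# Column names copied from the print(df.columns) output above.
columns = ['UniqueID ', 'PropertyAddress', 'LandUse', 'SalePrice',
           'SaleDate', 'LandValue', 'BuildingValue', 'TotalValue']

# Strip leading and trailing whitespace from every name.
clean_columns = [name.strip() for name in columns]

# Report which names actually changed.
changed = [(old, new) for old, new in zip(columns, clean_columns) if old != new]
print(changed)
```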


# Find Problems in This Data
Pros of the Dataset:

1) Large Dataset: With over 56,000 entries, the dataset provides a substantial amount of information, which is useful for statistical analysis and machine learning models.

2) Property Data: The dataset includes key variables like LandValue, BuildingValue, and TotalValue, which are crucial for real estate or property value analysis.

3) Structured Data: The data has a clear structure, with identifiable columns that can support real estate trend analysis, property pricing models, or evaluation of investment potential.

4) Land Use Information: The LandUse column offers insight into how the properties are being used, which is useful for urban planning, zoning assessments, or land-use predictions.

Challenges and Solutions:
1) Missing Values:

  Issue: About 54% of the values in the LandValue, BuildingValue, and TotalValue columns are missing (30,465 to 30,468 of 56,480 rows).

  Solution:

  1- Imputation: Use statistical methods such as the mean, median, or regression to estimate missing values.

  2- Domain-Specific Rules: Estimate missing values from the available LandUse or location (e.g., similar properties in the same area).

  3- Remove Rows: If imputation is not feasible, remove the rows with missing data, but only when they make up a small percentage of the dataset, which is not the case for these three columns.
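
The imputation and domain-specific options above can be sketched in plain Python before committing to a Spark implementation. The example below fills a missing `LandValue` with the median of the rows that share the same `LandUse`, and falls back to the overall median when a group has no known values. The sample rows and the function name `impute_by_group` are made up for illustration and are not taken from the dataset.

```python
from statistics import median

# Hypothetical sample rows; None marks a missing LandValue.
rows = [
    {"LandUse": "SINGLE FAMILY", "LandValue": 50000},
    {"LandUse": "SINGLE FAMILY", "LandValue": 70000},
    {"LandUse": "SINGLE FAMILY", "LandValue": None},
    {"LandUse": "DUPLEX", "LandValue": 30000},
    {"LandUse": "DUPLEX", "LandValue": None},
    {"LandUse": "VACANT LAND", "LandValue": None},
]

def impute_by_group(rows, group_key, value_key):
    """Fill missing values with the group median, else the overall median."""
    known = [r[value_key] for r in rows if r[value_key] is not None]
    overall = median(known)
    # Collect the known values of each group.
    groups = {}
    for r in rows:
        if r[value_key] is not None:
            groups.setdefault(r[group_key], []).append(r[value_key])
    filled = []
    for r in rows:
        value = r[value_key]
        if value is None:
            group_values = groups.get(r[group_key])
            value = median(group_values) if group_values else overall
        filled.append({**r, value_key: value})
    return filled

filled_rows = impute_by_group(rows, "LandUse", "LandValue")
for r in filled_rows:
    print(r)
```

The notebook itself uses the simpler global mean in Solution 1; a group-wise fill like this is one alternative when the value depends strongly on the land use.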
 
2) Non-Numeric Data in SalePrice:

  Issue: The SalePrice column is stored as a string, and some values contain currency symbols and commas (e.g., "$1,124,900"), so numeric operations cannot be applied directly.

  Solution: Data Cleaning: Remove the currency symbols and commas, then convert SalePrice to a numeric type.
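
Here is a minimal sketch of this cleaning rule in plain Python. It assumes the only non-numeric characters are `$`, `,` and surrounding whitespace, as in the `$1,124,900 ` value that appears in the `describe()` output of this notebook. The function name `parse_price` and the `"n/a"` sample are hypothetical.

```python
import re

def parse_price(raw):
    """Convert a price string such as '$1,124,900 ' to a float, or None."""
    if raw is None:
        return None
    # Drop dollar signs, thousands separators, and any whitespace.
    cleaned = re.sub(r"[$,\s]", "", raw)
    try:
        return float(cleaned)
    except ValueError:
        return None  # leave unparseable values as missing

samples = ["240000", "$1,124,900 ", None, "n/a"]
parsed = [parse_price(s) for s in samples]
print(parsed)
```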


3) Mixed Date Formats in SaleDate:

  Issue: The SaleDate column is stored as a string and mixes formats (e.g., "April 9, 2013" and "10-Jun-14"), so it cannot be sorted or filtered as a date.

  Solution: Review the column content, then parse each format and convert SaleDate to a single date format (YYYY-MM-DD).

4) Uneven Distribution of LandValue and BuildingValue:

  Issue: Both columns are strongly skewed: the means (about 69,000 and 161,000) are far below the maximum values (2.77 million and 12.97 million).

5) Potential Inconsistent Addresses:

  Issue: Some entries in PropertyAddress are missing, and others may be inconsistent (e.g., misspellings, formatting issues).

  Solution:

  1- Domain-Specific Rules: Estimate missing values from the available LandUse or location (e.g., similar properties in the same area).

  2- Remove Rows: If imputation is not feasible, remove the rows with missing data; the 29 rows with a missing address are a small percentage of the dataset.
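
One formatting inconsistency is visible in the sample rows printed in this notebook: some addresses have two spaces after the house number (`1808  FOX CHASE DR, ...`) while others have the double space elsewhere (`1864 FOX CHASE  DR, ...`). The sketch below is one possible normalizer in plain Python, not the method used in this notebook. The function name `normalize_address` is hypothetical.

```python
import re

def normalize_address(address):
    """Collapse runs of whitespace, trim the ends, and upper-case the text."""
    if address is None:
        return None
    return re.sub(r"\s+", " ", address).strip().upper()

# The two addresses are copied from the df_spark output in Solution 4.
raw_addresses = [
    "1808  FOX CHASE DR, GOODLETTSVILLE",
    "1864 FOX CHASE  DR, GOODLETTSVILLE",
    None,
]
print([normalize_address(a) for a in raw_addresses])
```

Normalizing before grouping or joining on the address avoids treating the same street as two different values.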
   

In [44]:
building_value_mean = df.agg({"BuildingValue": "mean"}).collect()[0][0]
building_value_mean

160801.98658311547

In [45]:
land_value_mean = df.agg({"LandValue": "mean"}).collect()[0][0] 
land_value_mean

69070.49110128771

In [46]:
total_value_mean = df.agg({"TotalValue": "mean"}).collect()[0][0]
total_value_mean

232392.33088574503

## Solution 1: Fill Null Values of BuildingValue, LandValue, and TotalValue with Their Mean


In [47]:


# Round the means to 2 decimal places
building_value_mean = round(building_value_mean, 2)
land_value_mean = round(land_value_mean, 2)
total_value_mean = round(total_value_mean, 2)

# Replace nulls in each column with that column's rounded mean
df = df.fillna({
    "BuildingValue": building_value_mean,
    "LandValue": land_value_mean,
    "TotalValue": total_value_mean,
})

display(df.limit(10))

In [48]:
# Display descriptive statistics for the SalePrice column
df.select('SalePrice').describe().show()

+-------+------------------+
|summary|         SalePrice|
+-------+------------------+
|  count|             56477|
|   mean|327231.51446028514|
| stddev| 929922.0770860187|
|    min|       $1,124,900 |
|    max|            999999|
+-------+------------------+
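
The `min` and `max` above look wrong because `SalePrice` is still a string column, so the values are compared character by character rather than numerically. The short sketch below reproduces the effect in plain Python. Only `$1,124,900 ` and `999999` come from the output above; the other sample strings are illustrative.

```python
# String comparison is lexicographic: '$' sorts before any digit,
# and a string starting with '9' sorts after one starting with '1',
# regardless of length.
prices = ["240000", "$1,124,900 ", "999999", "1500000"]

print(min(prices))  # the '$'-prefixed string, not the smallest amount
print(max(prices))  # '999999', although 1500000 is the larger amount

# Comparing after numeric conversion gives the expected order.
as_numbers = [float(p.replace("$", "").replace(",", "")) for p in prices]
print(min(as_numbers), max(as_numbers))
```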



In [49]:
# Check BuildingValue after the mean fill from Solution 1
df.select('BuildingValue').describe().show()

+-------+------------------+
|summary|     BuildingValue|
+-------+------------------+
|  count|             56480|
|   mean|160801.98842626665|
| stddev| 140345.0086866344|
|    min|                 0|
|    max|            999400|
+-------+------------------+
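
The mean of `BuildingValue` is essentially unchanged by the fill (about 160,802 before and after). That is expected: filling missing entries with the mean keeps the mean but adds many identical values, so the spread shrinks. The small sketch below shows this effect with made-up numbers, not values from the dataset.

```python
from statistics import mean, pstdev

# Hypothetical values; None marks a missing entry.
values = [100000, 150000, None, 250000, None, 300000]

known = [v for v in values if v is not None]
fill_value = mean(known)

# Replace every missing entry with the mean of the known entries.
filled = [fill_value if v is None else v for v in values]

print(mean(known), mean(filled))      # the mean is unchanged
print(pstdev(known), pstdev(filled))  # the spread gets smaller
```

This is worth keeping in mind when more than half of a column is filled, as it is here: statistics that depend on the variance will be understated.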



In [50]:
df.select('TotalValue').describe().show()

+-------+------------------+
|summary|        TotalValue|
+-------+------------------+
|  count|             56480|
|   mean|232392.33040790205|
| stddev| 190747.0310694506|
|    min|               100|
|    max|            999600|
+-------+------------------+



In [51]:

# Show the first few rows of the dataframe
df.show()


+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|UniqueID |     PropertyAddress|          LandUse|SalePrice|      SaleDate|LandValue|BuildingValue|TotalValue|
+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|     2045|1808  FOX CHASE D...|    SINGLE FAMILY|   240000| April 9, 2013|    50000|       168200|    235700|
|    16918|1832  FOX CHASE D...|    SINGLE FAMILY|   366000|     10-Jun-14|    50000|       264100|    319000|
|    54582|1864 FOX CHASE  D...|    SINGLE FAMILY|   435000|     26-Sep-16|    50000|       216200|    298000|
|    43070|1853  FOX CHASE D...|    SINGLE FAMILY|   255000|     29-Jan-16|    50000|       147300|    197300|
|    22714|1829  FOX CHASE D...|    SINGLE FAMILY|   278000|     10-Oct-14|    50000|       152300|    202300|
|    18367|1821  FOX CHASE D...|    SINGLE FAMILY|   267000|     16-Jul-14|    50000|       190400|    259800|
|

In [52]:
# Count the number of null values in each column
from pyspark.sql.functions import col, sum as sum_

df.select([sum_(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

# Display the schema of the dataframe (info equivalent in PySpark)
df.printSchema()

+---------+---------------+-------+---------+--------+---------+-------------+----------+
|UniqueID |PropertyAddress|LandUse|SalePrice|SaleDate|LandValue|BuildingValue|TotalValue|
+---------+---------------+-------+---------+--------+---------+-------------+----------+
|        0|             29|      0|        3|       3|        0|            0|         0|
+---------+---------------+-------+---------+--------+---------+-------------+----------+

root
 |-- UniqueID : string (nullable = true)
 |-- PropertyAddress: string (nullable = true)
 |-- LandUse: string (nullable = true)
 |-- SalePrice: string (nullable = true)
 |-- SaleDate: string (nullable = true)
 |-- LandValue: string (nullable = false)
 |-- BuildingValue: string (nullable = false)
 |-- TotalValue: string (nullable = false)



## Solution 2: Non-Numeric Data in SalePrice


In [53]:
from pyspark.sql.functions import regexp_replace, col

# Remove '$' and ',' from SalePrice and cast to float
df = df.withColumn("SalePrice", regexp_replace("SalePrice", "\\$", ""))
df = df.withColumn("SalePrice", regexp_replace("SalePrice", ",", "").cast("float"))

# Sum of SalePrice
sale_price_sum = df.agg({"SalePrice": "sum"}).collect()[0][0]
print(f"Total SalePrice: {sale_price_sum}")

Total SalePrice: 18479892264.0


In [54]:
df.show()

+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|UniqueID |     PropertyAddress|          LandUse|SalePrice|      SaleDate|LandValue|BuildingValue|TotalValue|
+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|     2045|1808  FOX CHASE D...|    SINGLE FAMILY| 240000.0| April 9, 2013|    50000|       168200|    235700|
|    16918|1832  FOX CHASE D...|    SINGLE FAMILY| 366000.0|     10-Jun-14|    50000|       264100|    319000|
|    54582|1864 FOX CHASE  D...|    SINGLE FAMILY| 435000.0|     26-Sep-16|    50000|       216200|    298000|
|    43070|1853  FOX CHASE D...|    SINGLE FAMILY| 255000.0|     29-Jan-16|    50000|       147300|    197300|
|    22714|1829  FOX CHASE D...|    SINGLE FAMILY| 278000.0|     10-Oct-14|    50000|       152300|    202300|
|    18367|1821  FOX CHASE D...|    SINGLE FAMILY| 267000.0|     16-Jul-14|    50000|       190400|    259800|
|

## Solution 3: Missing Address Handling

In [55]:
from pyspark.sql.functions import col

# Count missing values in 'PropertyAddress'
df.select(col("PropertyAddress").isNull().alias("MissingAddress")).groupBy("MissingAddress").count().show()

# Drop rows with any missing values
df = df.dropna()

# Show DataFrame after dropping missing values
df.show()


+--------------+-----+
|MissingAddress|count|
+--------------+-----+
|          true|   29|
|         false|56451|
+--------------+-----+

+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|UniqueID |     PropertyAddress|          LandUse|SalePrice|      SaleDate|LandValue|BuildingValue|TotalValue|
+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|     2045|1808  FOX CHASE D...|    SINGLE FAMILY| 240000.0| April 9, 2013|    50000|       168200|    235700|
|    16918|1832  FOX CHASE D...|    SINGLE FAMILY| 366000.0|     10-Jun-14|    50000|       264100|    319000|
|    54582|1864 FOX CHASE  D...|    SINGLE FAMILY| 435000.0|     26-Sep-16|    50000|       216200|    298000|
|    43070|1853  FOX CHASE D...|    SINGLE FAMILY| 255000.0|     29-Jan-16|    50000|       147300|    197300|
|    22714|1829  FOX CHASE D...|    SINGLE FAMILY| 278000.0|     10-Oct-14|    50000

In [56]:
df.show()

+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|UniqueID |     PropertyAddress|          LandUse|SalePrice|      SaleDate|LandValue|BuildingValue|TotalValue|
+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|     2045|1808  FOX CHASE D...|    SINGLE FAMILY| 240000.0| April 9, 2013|    50000|       168200|    235700|
|    16918|1832  FOX CHASE D...|    SINGLE FAMILY| 366000.0|     10-Jun-14|    50000|       264100|    319000|
|    54582|1864 FOX CHASE  D...|    SINGLE FAMILY| 435000.0|     26-Sep-16|    50000|       216200|    298000|
|    43070|1853  FOX CHASE D...|    SINGLE FAMILY| 255000.0|     29-Jan-16|    50000|       147300|    197300|
|    22714|1829  FOX CHASE D...|    SINGLE FAMILY| 278000.0|     10-Oct-14|    50000|       152300|    202300|
|    18367|1821  FOX CHASE D...|    SINGLE FAMILY| 267000.0|     16-Jul-14|    50000|       190400|    259800|
|

## Solution 4: Edit Format of Date in SaleDate


In [57]:
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# The two formats seen in SaleDate, e.g. '10-Jun-14' and 'April 9, 2013'
PYTHON_DATE_FORMATS = ('%d-%b-%y', '%B %d, %Y')

# Define a UDF as a fallback for values the built-in parser does not handle
def convert_to_date(date_str):
    if date_str is None:
        return None  # strptime raises TypeError on None, so handle it explicitly
    for fmt in PYTHON_DATE_FORMATS:
        try:
            return datetime.strptime(date_str, fmt).strftime('%Y-%m-%d')
        except ValueError:
            continue  # try the next format
    return None  # Return None if all parsing fails

# Register the UDF in Spark
convert_to_date_udf = F.udf(convert_to_date, StringType())

# Try each parser in order and keep the first non-null result in a new column
df_spark = df.withColumn(
    "SaleDateFormatted",
    F.coalesce(
        F.to_date(F.col("SaleDate"), 'd-MMM-yy'),
        F.to_date(F.col("SaleDate"), 'MMMM d, yyyy'),
        convert_to_date_udf(F.col("SaleDate")),
    ),
)

# Show the result
df_spark.show(truncate=False)


+---------+-------------------------------------+-----------------+---------+--------------+---------+-------------+----------+-----------------+
|UniqueID |PropertyAddress                      |LandUse          |SalePrice|SaleDate      |LandValue|BuildingValue|TotalValue|SaleDateFormatted|
+---------+-------------------------------------+-----------------+---------+--------------+---------+-------------+----------+-----------------+
|2045     |1808  FOX CHASE DR, GOODLETTSVILLE   |SINGLE FAMILY    |240000.0 |April 9, 2013 |50000    |168200       |235700    |2013-04-09       |
|16918    |1832  FOX CHASE DR, GOODLETTSVILLE   |SINGLE FAMILY    |366000.0 |10-Jun-14     |50000    |264100       |319000    |2014-06-10       |
|54582    |1864 FOX CHASE  DR, GOODLETTSVILLE   |SINGLE FAMILY    |435000.0 |26-Sep-16     |50000    |216200       |298000    |2016-09-26       |
|43070    |1853  FOX CHASE DR, GOODLETTSVILLE   |SINGLE FAMILY    |255000.0 |29-Jan-16     |50000    |147300       |197300  
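
One detail of the `'%d-%b-%y'` format used in the UDF above: Python's `%y` follows the POSIX convention, mapping two-digit years 00 to 68 onto 2000 to 2068 and 69 to 99 onto 1969 to 1999. The sketch below shows how a fallback parser behaves for the two formats in this column and for an older date. The function name `to_iso_date` and the `30-Mar-85` sample are hypothetical, and month names are assumed to be parsed under an English locale.

```python
from datetime import datetime

# Formats matching '10-Jun-14' and 'April 9, 2013'.
FORMATS = ("%d-%b-%y", "%B %d, %Y")

def to_iso_date(text):
    """Try each known format in turn; return 'YYYY-MM-DD' or None."""
    if text is None:
        return None
    for fmt in FORMATS:
        try:
            return datetime.strptime(text.strip(), fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue  # try the next format
    return None

for sample in ["10-Jun-14", "April 9, 2013", "30-Mar-85", "not a date", None]:
    print(repr(sample), "->", to_iso_date(sample))
```

If the data contained sales from before 1969 written with two-digit years, this pivot would place them in the wrong century, so it is worth checking the range of parsed dates.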

In [58]:
df.show()

+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|UniqueID |     PropertyAddress|          LandUse|SalePrice|      SaleDate|LandValue|BuildingValue|TotalValue|
+---------+--------------------+-----------------+---------+--------------+---------+-------------+----------+
|     2045|1808  FOX CHASE D...|    SINGLE FAMILY| 240000.0| April 9, 2013|    50000|       168200|    235700|
|    16918|1832  FOX CHASE D...|    SINGLE FAMILY| 366000.0|     10-Jun-14|    50000|       264100|    319000|
|    54582|1864 FOX CHASE  D...|    SINGLE FAMILY| 435000.0|     26-Sep-16|    50000|       216200|    298000|
|    43070|1853  FOX CHASE D...|    SINGLE FAMILY| 255000.0|     29-Jan-16|    50000|       147300|    197300|
|    22714|1829  FOX CHASE D...|    SINGLE FAMILY| 278000.0|     10-Oct-14|    50000|       152300|    202300|
|    18367|1821  FOX CHASE D...|    SINGLE FAMILY| 267000.0|     16-Jul-14|    50000|       190400|    259800|
|

# Save Data on the Data Lake

In [59]:
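# Write the cleaned DataFrame as CSV. Note that this saves df, which does not
# contain the SaleDateFormatted column (that column exists only in df_spark).
df.write.csv('realestate/units_clean.csv', header=True, mode='overwrite')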
