In [1]:
import os
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.errors import AnalysisException


from dotenv import load_dotenv

# Set SPARK_VERSION first, then load the deployment settings. The .env file is
# loaded with override=True, so variables it defines replace values already
# present in the process environment.
os.environ.update(SPARK_VERSION="3.3")
load_dotenv(dotenv_path="../.env-deploy", override=True)

True

In [2]:
# Root directory holding the JDBC driver jars and the Spark SQL warehouse.
# Defaults to the original local path; set MLSGPT_DATA_HOME to run the
# notebook on another machine without editing this cell.
data_home = os.getenv("MLSGPT_DATA_HOME", "/Users/kwesi/Desktop/ai/gpts/mlsgpt/data")
jar_files = ["postgresql-42.7.3.jar", "mysql-connector-j-8.0.33.jar"]
# spark.jars takes a single comma-separated string of jar paths.
jar_opts = ",".join(f"{data_home}/jars/{jar}" for jar in jar_files)
warehouse = f"{data_home}/warehouse"

# Build (or reuse) the notebook's Spark session with Hive support, UTC session
# time zone, and the JDBC driver jars on the classpath.
spark: SparkSession = (
    SparkSession.builder
    .appName("MLSGPT")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.sql.warehouse.dir", warehouse)
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.jars", jar_opts)
    .enableHiveSupport()
    .getOrCreate()
)
# Only report errors from Spark for the rest of the notebook.
spark.sparkContext.setLogLevel("ERROR")

24/05/19 03:42:39 WARN Utils: Your hostname, marley.local resolves to a loopback address: 127.0.0.1; using 10.0.0.135 instead (on interface en0)
24/05/19 03:42:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/19 03:42:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
def read_table(url: str, props: dict, table_name: str) -> "DataFrame | None":
    """Load a table over JDBC into a Spark DataFrame.

    Args:
        url: JDBC connection URL.
        props: JDBC connection properties (e.g. user, password, driver).
        table_name: Name of the table to read.

    Returns:
        The table as a DataFrame, or None when Spark raises an
        AnalysisException while reading it. Callers must check for None
        before chaining DataFrame methods on the result.
    """
    try:
        return spark.read.jdbc(url=url, table=table_name, properties=props)
    except AnalysisException as e:
        # Surface the underlying error text; previously the exception was
        # discarded, which hid the actual reason the read failed.
        print(f"Table {table_name} not found: {e}")
        return None
    
# PostgreSQL connection settings, taken from the environment (populated by the
# .env file loaded at the top of the notebook).
pg_url = "jdbc:postgresql://{host}:{port}/{db}".format(
    host=os.getenv("POSTGRES_HOST"),
    port=os.getenv("POSTGRES_PORT"),
    db=os.getenv("POSTGRES_DB"),
)
pg_props = {
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD"),
    "driver": "org.postgresql.Driver"
}
tables = ["rsbr.boards", "rsbr.office", "rsbr.agent", "rsbr.property", "rsbr.property_rooms"]

# Column names to keep: the CSV is read with a header row and each record's
# `column` field is used as a column name.
columns = [c.column for c in spark.read.csv("../data/columns_to_keep.csv", header=True).collect()]

# read_table returns None when the read fails; stop here with a clear message
# instead of failing on `.select` with an AttributeError on None.
property_raw = read_table(pg_url, pg_props, "rsbr.property")
if property_raw is None:
    raise RuntimeError(
        "Could not load rsbr.property; check the POSTGRES_* settings and that the table exists"
    )
df = property_raw.select(columns)

In [4]:
# Listing date parsed once and reused by the derived columns below.
listing_date = F.to_date("ListingContractDate", "yyyy-MM-dd")

# Days between the listing contract date and today.
# NOTE(review): the original wrapped this in when(Sold == 1, ...).otherwise(...)
# with two identical branches, so the result never depended on Sold. Sold
# listings presumably should be measured up to a sale/closing date - TODO
# confirm which column holds it and restore the branch.
dom = F.datediff(F.current_date(), listing_date).alias("DaysOnMarket")

# Human-readable Sold flag: "Yes" when Sold == 1, otherwise "No".
sold = F.when(F.col("Sold") == 1, "Yes").otherwise("No").alias("Sold")

# Price per unit of interior size, rounded to 2 decimals.
pp_sq_ft = F.round(F.col("Price") / F.col("SizeInterior"), 2).alias("PricePerSqft")

# Rank listings by price within each (City, PropertyType) partition, from the
# most expensive (TopRank) and from the cheapest (BottomRank).
win_top = W.Window.partitionBy("City", "PropertyType").orderBy(F.desc("Price"))
win_bottom = W.Window.partitionBy("City", "PropertyType").orderBy(F.asc("Price"))

df_base = (
    df
    # Drop non-positive prices BEFORE ranking. Filtering after the select let
    # rows that were about to be discarded (Price <= 0 or null) occupy the
    # lowest BottomRank positions, skewing the ranks of the remaining rows.
    .filter(F.col("Price") > 0)
    .select(
        "City",
        "CommunityName",
        # Source column name is misspelled; expose it under the correct spelling.
        F.col("AmmenitiesNearby").alias("AmenitiesNearby"),
        "Type",
        "PropertyType",
        "OwnershipType",
        "BedroomsTotal",
        "BathroomTotal",
        # First three characters of the postal code. Spark's substring position
        # is 1-based; position 1 selects the same characters as the previous 0.
        F.substring("PostalCode", 1, 3).alias("FSA"),
        F.quarter(listing_date).alias("Quarter"),
        F.year(listing_date).alias("Year"),
        F.rank().over(win_top).alias("TopRank"),
        F.rank().over(win_bottom).alias("BottomRank"),
        dom,
        sold,
        pp_sq_ft,
        "Price",
        "Lease",
    )
)

In [15]:
# Column combinations to aggregate over; each tuple yields one stats frame.
groups = (
    ("City",),
    ("City", "Type"),
    ("City", "PropertyType"),
    ("City", "BedroomsTotal"),
)

# All price / days-on-market statistics are reported as DECIMAL(15, 2).
_decimal_2dp = T.DecimalType(15, 2)


def _summary_columns(column: str) -> tuple:
    """Return average, median, minimum and maximum aggregates for `column`.

    Each aggregate is cast to DECIMAL(15, 2) and aliased as
    "<Statistic><column>", e.g. "AveragePrice".
    """
    return (
        F.avg(column).cast(_decimal_2dp).alias(f"Average{column}"),
        F.percentile_approx(column, 0.5).cast(_decimal_2dp).alias(f"Median{column}"),
        F.min(column).cast(_decimal_2dp).alias(f"Minimum{column}"),
        F.max(column).cast(_decimal_2dp).alias(f"Maximum{column}"),
    )


agg_cols = (
    F.count("*").cast(T.IntegerType()).alias("InventoryCount"),
    *_summary_columns("Price"),
    *_summary_columns("DaysOnMarket"),
    F.avg("PricePerSqft").cast(_decimal_2dp).alias("AveragePricePerSqft"),
)

# One aggregated DataFrame per grouping, keyed by the concatenated grouping
# column names plus "Stats" (e.g. "CityTypeStats").
df_stats = {
    "".join(group_cols) + "Stats": df_base.groupBy(*group_cols).agg(*agg_cols)
    for group_cols in groups
}

In [16]:
# Preview the first 10 rows of every stats frame, labelled by its key.
for stats_name, stats_frame in df_stats.items():
    print(stats_name)
    stats_frame.show(10)
    print("\n\n")

CityStats
+-------------------+--------------+------------+-----------+------------+------------+-------------------+------------------+-------------------+-------------------+-------------------+
|               City|InventoryCount|AveragePrice|MedianPrice|MinimumPrice|MaximumPrice|AverageDaysOnMarket|MedianDaysOnMarket|MinimumDaysOnMarket|MaximumDaysOnMarket|AveragePricePerSqft|
+-------------------+--------------+------------+-----------+------------+------------+-------------------+------------------+-------------------+-------------------+-------------------+
|          aberfoyle|             1|   494900.00|  494900.00|   494900.00|   494900.00|              53.00|             53.00|              53.00|              53.00|             554.20|
|              acton|            13|  1197152.92| 1249000.00|   825000.00|  1800000.00|              36.69|             33.00|              19.00|              88.00|             560.10|
|addington highlands|            22|   577268.05|  5499

In [18]:
# table_names ={
#     "CityStats": "rsbr.city_stats",
#     "CityTypeStats": "rsbr.city_type_stats",
#     "CityPropertyTypeStats": "rsbr.city_property_type_stats",
#     "CityBedroomsTotalStats": "rsbr.city_bedrooms_stats",
# }

# for key, value in df_stats.items():
#     # value.describe().show(vertical=True) 
#     value.write.jdbc(url=pg_url, table=table_names[key], mode="append", properties=pg_props)

                                                                                

In [29]:
def _distinct_sorted(column: str, drop_nulls: bool = False) -> list:
    """Collect the distinct values of `column` from df_base in ascending order.

    When `drop_nulls` is true, null values are filtered out before
    de-duplication.
    """
    values = df_base.select(column)
    if drop_nulls:
        values = values.filter(F.col(column).isNotNull())
    return [row[column] for row in values.distinct().sort(F.asc(column)).collect()]


cities = _distinct_sorted("City")
# Only Type has its nulls removed; the other attributes are collected as-is.
city_types = _distinct_sorted("Type", drop_nulls=True)
property_types = _distinct_sorted("PropertyType")
# Bedroom counts are stored as strings so every Values list holds strings.
bedrooms = [str(count) for count in _distinct_sorted("BedroomsTotal")]

# One row per attribute, listing every value that attribute takes in df_base.
rows = [
    Row(Attribute=attribute, Values=values)
    for attribute, values in (
        ("City", cities),
        ("Type", city_types),
        ("PropertyType", property_types),
        ("BedroomsTotal", bedrooms),
    )
]
df_info = spark.createDataFrame(rows)


In [30]:
# Print each record as a vertical block; row limit and truncation are the
# method defaults, spelled out here.
df_info.show(n=20, truncate=True, vertical=True)

-RECORD 0-------------------------
 Attribute | City                 
 Values    | [aberfoyle, acton... 
-RECORD 1-------------------------
 Attribute | Type                 
 Values    | [Apartment, Comme... 
-RECORD 2-------------------------
 Attribute | PropertyType         
 Values    | [Multi-family, Si... 
-RECORD 3-------------------------
 Attribute | BedroomsTotal        
 Values    | [0, 1, 2, 3, 4, 5... 



In [31]:
# Persist the attribute/value lookup to PostgreSQL.
# NOTE(review): the save mode is "append", so every execution of this cell adds
# another copy of the rows to rsbr.stats_info - confirm this is intended.
(
    df_info.write
    .mode("append")
    .jdbc(url=pg_url, table="rsbr.stats_info", properties=pg_props)
)

In [8]:
# # Compute inventory count, average and median prices by city, type, ownership type, bedrooms and FSA
# price_stats = (
#     df_base.groupBy("City", "Type", "OwnershipType", "BedroomsTotal", "FSA")
#     .agg(
#         F.count("City").alias("InventoryLevel"),
#         F.avg("Price").alias("AveragePrice"),
#         F.percentile_approx("Price", 0.5).alias("MedianPrice")
#     )
# )
# print(price_stats.count())
# price_stats.show(5)

In [9]:
# # Group by city and quarter and compute average price
# trends = (
#     df_base
#     .groupBy("City", "Quarter")
#     .agg(F.avg("Price").alias("AveragePrice"))
# )
# trends.show(5)

In [10]:
# # Inventory levels by city and property type
# inventory_levels = (
#     df_base
#     .groupBy("City", "PropertyType")
#     .agg(F.count("City").alias("InventoryLevel"))
#     .orderBy("InventoryLevel", ascending=False)
# )
# print(inventory_levels.count())
# inventory_levels.show(5)

In [11]:
# # Rank properties and select those to compare
# cma = (
#     df_base
#     .filter((F.col("TopRank") <= 5) | (F.col("BottomRank") <= 5))
# )
# print(cma.count())
# cma.show(5)

In [12]:
# community_features = (
#     df_base
#     .groupBy("CommunityName").agg(F.collect_set("AmenitiesNearBy").alias("UniqueAmenities"))
# )
# print(community_features.count())
# community_features.show(5)

In [13]:
# price_per_sq_ft = (
#     df_base
#     .groupBy("City")
#     .agg(F.avg("PricePerSqFt").alias("AvgPricePerSqFt"))
#     .orderBy("AvgPricePerSqFt", ascending=False)
# )
# print(price_per_sq_ft.count())
# price_per_sq_ft.show(100)