# Sprint 5 – PySpark Tables: Turning Unix-era insights into PySpark assets

In [3]:
import os, sys, traceback
print("Python exe:", sys.executable)

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] += r";C:\hadoop\bin"

from pyspark.sql import SparkSession

try:
    spark = SparkSession.getActiveSession()
    if spark is None:
        spark = (
            SparkSession.builder
            .master("local[*]")
            .appName("Sprint-Tables")
            .config("spark.driver.host", "127.0.0.1")
            .config("spark.driver.bindAddress", "127.0.0.1")
            .config("spark.sql.shuffle.partitions", "4")
            .config("spark.driver.memory", "4g")
            .getOrCreate()
        )
    print("✅ Spark ready:", spark.version, "| master:", spark.sparkContext.master)
except Exception as e:
    print("❌ Spark start failed")
    traceback.print_exc()


Python exe: C:\Users\Jaafa\anaconda3\envs\pyspark101\python.exe
✅ Spark ready: 3.5.1 | master: local[*]


In [4]:
import os
DATA_PATH = r"C:\Users\Jaafa\OneDrive\Documents\Projects\bravefalcons-HouseTS-housing\data\HouseTS.csv"   # file you moved into the data/ folder

print("Exists?", os.path.exists(DATA_PATH))

df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv(DATA_PATH)
      .limit(10000))

df.printSchema()
df.show(5, truncate=False)
print("Sample rows:", df.count())



Exists? True
root
 |-- date: date (nullable = true)
 |-- median_sale_price: double (nullable = true)
 |-- median_list_price: double (nullable = true)
 |-- median_ppsf: double (nullable = true)
 |-- median_list_ppsf: double (nullable = true)
 |-- homes_sold: double (nullable = true)
 |-- pending_sales: double (nullable = true)
 |-- new_listings: double (nullable = true)
 |-- inventory: double (nullable = true)
 |-- median_dom: double (nullable = true)
 |-- avg_sale_to_list: double (nullable = true)
 |-- sold_above_list: double (nullable = true)
 |-- off_market_in_two_weeks: double (nullable = true)
 |-- city: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- bank: double (nullable = true)
 |-- bus: double (nullable = true)
 |-- hospital: double (nullable = true)
 |-- mall: double (nullable = true)
 |-- park: double (nullable = true)
 |-- restaurant: double (nullable = true)
 |-- school: double (nullable = true)
 |-- station: doubl

### Frequency Table

In [5]:
from pyspark.sql import functions as F
import os

FREQ_COL = "city_full"

frequency_tbl = (
    df.groupBy(FREQ_COL)
      .agg(F.count("*").alias("count"))
      .orderBy(F.desc("count"))
)

frequency_tbl.show(10, truncate=False)




+--------------------------------+-----+
|city_full                       |count|
+--------------------------------+-----+
|Atlanta-Sandy Springs-Alpharetta|10000|
+--------------------------------+-----+



In [6]:
ARTIFACTS = "./artifacts"

freq_csv_dir = os.path.join(ARTIFACTS, f"frequency_{FREQ_COL}_csv")
freq_parquet_dir = os.path.join(ARTIFACTS, f"frequency_{FREQ_COL}_parquet")

(frequency_tbl.coalesce(1)
    .write.mode("overwrite").option("header", True).csv(freq_csv_dir))

(frequency_tbl
    .write.mode("overwrite").parquet(freq_parquet_dir))

print("✅ Wrote:", freq_csv_dir, "and", freq_parquet_dir)


✅ Wrote: ./artifacts\frequency_city_full_csv and ./artifacts\frequency_city_full_parquet


In [7]:
freq_csv_reload = spark.read.option("header", True).csv(freq_csv_dir)
freq_pq_reload  = spark.read.parquet(freq_parquet_dir)

print(
    "Frequency row count (orig / csv / parquet):",
    frequency_tbl.count(),
    freq_csv_reload.count(),
    freq_pq_reload.count()
)


Frequency row count (orig / csv / parquet): 1 1 1


### Top-N Table

In [8]:
TOP_N = 10
GROUP_COL = "city_full"
VALUE_COL = "median_sale_price"

df_top = df.withColumn(VALUE_COL, F.col(VALUE_COL).cast("double"))

topn_overall = (
    df_top.groupBy(GROUP_COL)
          .agg(F.max(VALUE_COL).alias("max_median_sale_price"))
          .orderBy(F.desc("max_median_sale_price"))
          .limit(TOP_N)
)

topn_overall.show(TOP_N, truncate=False)


+--------------------------------+---------------------+
|city_full                       |max_median_sale_price|
+--------------------------------+---------------------+
|Atlanta-Sandy Springs-Alpharetta|875000.0             |
+--------------------------------+---------------------+



In [9]:
topn_csv_dir     = os.path.join(ARTIFACTS, f"top{TOP_N}_{GROUP_COL}_by_{VALUE_COL}_csv")
topn_parquet_dir = os.path.join(ARTIFACTS, f"top{TOP_N}_{GROUP_COL}_by_{VALUE_COL}_parquet")

(topn_overall.coalesce(1)
    .write.mode("overwrite").option("header", True).csv(topn_csv_dir))
(topn_overall
    .write.mode("overwrite").parquet(topn_parquet_dir))

print("✅ Wrote:", topn_csv_dir, "and", topn_parquet_dir)



✅ Wrote: ./artifacts\top10_city_full_by_median_sale_price_csv and ./artifacts\top10_city_full_by_median_sale_price_parquet


In [10]:
topn_overall_re = spark.read.parquet(topn_parquet_dir)
print("TopN count (orig vs reload):", topn_overall.count(), topn_overall_re.count())
topn_overall_re.show(TOP_N, truncate=False)



TopN count (orig vs reload): 1 1
+--------------------------------+---------------------+
|city_full                       |max_median_sale_price|
+--------------------------------+---------------------+
|Atlanta-Sandy Springs-Alpharetta|875000.0             |
+--------------------------------+---------------------+



### Example of Join

In [11]:
city_lookup = spark.createDataFrame(
    [("Atlanta–Sandy Springs–Alpharetta", "GA")],
    ["city_full", "state"]
)

df_joined = df.join(city_lookup, on="city_full", how="left")
df_joined.select("city_full", "state").distinct().show(5, truncate=False)


+--------------------------------+-----+
|city_full                       |state|
+--------------------------------+-----+
|Atlanta-Sandy Springs-Alpharetta|NULL |
+--------------------------------+-----+



### Example of Window

In [12]:
from pyspark.sql import Window


time_candidates = [c for c in ["ts", "date", "sale_date"] if c in df.columns]
if time_candidates:
    tcol = time_candidates[0]
    dfw = df.withColumn("ts_norm", F.to_timestamp(F.col(tcol))).withColumn("day", F.to_date("ts_norm"))

    w = Window.partitionBy("day").orderBy(F.desc("cnt"))
    perday_top = (
        dfw.groupBy("day", "city_full")
           .agg(F.count(F.lit(1)).alias("cnt"))
           .withColumn("rank_per_day", F.row_number().over(w))
           .filter(F.col("rank_per_day") <= 5)
           .orderBy("day", F.desc("cnt"))
    )

    perday_csv_dir = os.path.join(ARTIFACTS, "topn_per_day_city_full_csv")
    perday_pq_dir  = os.path.join(ARTIFACTS, "topn_per_day_city_full_parquet")

    (perday_top.coalesce(1)
        .write.mode("overwrite").option("header", True).csv(perday_csv_dir))
    perday_top.write.mode("overwrite").parquet(perday_pq_dir)

    csv_cnt = spark.read.option("header", True).csv(perday_csv_dir).count()
    pq_cnt  = spark.read.parquet(perday_pq_dir).count()
    print("Top-per-day row count (orig / csv / parquet):",
          perday_top.count(), csv_cnt, pq_cnt)
else:
    print("⚠️ No timestamp column ('ts', 'date', 'sale_date') found — window example skipped.")


Top-per-day row count (orig / csv / parquet): 142 142 142


In [13]:
print("Spark version:", spark.version)
print("Shuffle partitions:", spark.conf.get("spark.sql.shuffle.partitions"))
print("Driver memory:", spark.sparkContext.getConf().get("spark.driver.memory"))


Spark version: 3.5.1
Shuffle partitions: 4
Driver memory: 4g


In [14]:
import shutil
shutil.make_archive("artifacts_backup", "zip", "artifacts")
print("Created artifacts_backup.zip")


Created artifacts_backup.zip
