In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, sum as _sum, avg, countDistinct, count, to_date, date_trunc,
    when, lit, round as sround
)

# Obtain the notebook's Spark session (an already-running session is reused).
session_builder = SparkSession.builder.appName("GoldMarts")
spark = session_builder.getOrCreate()

# All timestamp -> date conversions below are evaluated in this session time zone.
spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

In [8]:
# Workspace root, plus the silver (input) and gold (output) parquet directories under it.
BASE = "/home/jovyan/work"
SILVER, GOLD = (f"{BASE}/parquet/{layer}" for layer in ("silver", "gold"))

In [9]:
def _read_silver(table):
    """Return the silver-layer parquet dataset `table` as a Spark DataFrame."""
    return spark.read.parquet(f"{SILVER}/{table}")


users_s  = _read_silver("users")            # user_id, goal, age_band, ...
foods_s  = _read_silver("food_items")       # not needed below but good to have
intake_s = _read_silver("intake_enriched")  # user_id, taken_at, kcal, protein_g, carbs_g, fat_g
water_s  = _read_silver("water_logs")       # cells below use user_id, taken_at, ml

In [10]:
import os, glob

# Sanity check: print the silver path and list what is actually on disk.
BASE = "/home/jovyan/work"
SILVER = BASE + "/parquet/silver"

print("SILVER path:", SILVER)

# Parent of the silver folder, i.e. f"{BASE}/parquet".
parquet_entries = os.listdir(os.path.dirname(SILVER))
print("parquet dir:", parquet_entries)

# Expected entries: users, food_items, intake_enriched, water_logs
silver_entries = os.listdir(SILVER)
print("silver dir :", silver_entries)


SILVER path: /home/jovyan/work/parquet/silver
parquet dir: ['bronze', 'gold', 'silver']
silver dir : ['food_items', 'intake_enriched', 'users', 'water_logs']


In [11]:
import os, glob
# Sanity check: list the silver and bronze layers and confirm bronze food_items exists.
BASE = "/home/jovyan/work"
BRONZE, SILVER = ("/".join((BASE, "parquet", layer)) for layer in ("bronze", "silver"))

# Expected silver entries: users, food_items, intake_enriched, water_logs
silver_tables = os.listdir(SILVER)
print("silver:", silver_tables)

bronze_tables = os.listdir(BRONZE)
print("bronze:", bronze_tables)

has_bronze_food_items = os.path.exists(BRONZE + "/food_items")
print("food_items bronze exists? ", has_bronze_food_items)


silver: ['food_items', 'intake_enriched', 'users', 'water_logs']
bronze: ['food_items', 'intake_logs', 'mood_logs', 'users', 'water_logs']
food_items bronze exists?  True


In [12]:
from pyspark.sql.functions import col, to_date, sum as _sum, when, lit, round as sround, date_trunc, avg, countDistinct

spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

BASE   = "/home/jovyan/work"
GOLD   = f"{BASE}/parquet/gold"

# Daily calorie targets (kcal) per user goal. Any goal not listed here,
# including a missing goal, falls back to DEFAULT_TARGET_KCAL.
TARGET_KCAL_BY_GOAL = {
    "weight_gain": 2200.0,
    "weight_loss": 1500.0,
}
DEFAULT_TARGET_KCAL = 1800.0


def _target_kcal_for(goal_col):
    """Build a Column expression giving the daily kcal target for `goal_col`.

    Args:
        goal_col: Column holding the user's goal string.

    Returns:
        A Column that evaluates to the matching entry of TARGET_KCAL_BY_GOAL,
        or to DEFAULT_TARGET_KCAL when the goal is null or not listed.
    """
    target = lit(DEFAULT_TARGET_KCAL)
    # Goals are distinct keys, so the nesting order does not affect the result.
    for goal, kcal in TARGET_KCAL_BY_GOAL.items():
        target = when(goal_col == goal, lit(kcal)).otherwise(target)
    return target


# 1) Aggregate intake per user-day: one row per (user_id, date) with summed kcal and macros.
intake_day = (
    intake_s
      .withColumn("date", to_date(col("taken_at")))
      .groupBy("user_id", "date")
      .agg(
          _sum("kcal").alias("total_kcal"),
          _sum("protein_g").alias("total_protein_g"),
          _sum("carbs_g").alias("total_carbs_g"),
          _sum("fat_g").alias("total_fat_g"),
      )
)

# 2) Aggregate water per user-day.
water_day = (
    water_s
      .withColumn("date", to_date(col("taken_at")))
      .groupBy("user_id", "date")
      .agg(_sum("ml").alias("water_ml"))
)

# 3) Join goal, compute targets & adherence.
#    Both joins are left joins from intake_day, so only days that have intake rows appear.
daily = (
    intake_day
      .join(users_s.select("user_id", "goal", "age_band"), "user_id", "left")
      .join(water_day, ["user_id", "date"], "left")
      .withColumn("target_kcal", _target_kcal_for(col("goal")))
      .withColumn("adherence_pct", sround((col("total_kcal") / col("target_kcal")) * 100, 2))
      .withColumn("surplus_deficit_kcal", sround(col("total_kcal") - col("target_kcal"), 1))
      # Days with no matching water row come back null from the left join; report them as 0 ml.
      .fillna(0, subset=["water_ml"])
)

daily.write.mode("overwrite").parquet(f"{GOLD}/daily_user_nutrition")
print("‚úÖ GOLD written: daily_user_nutrition")
daily.orderBy("user_id","date").show(10, truncate=False)


‚úÖ GOLD written: daily_user_nutrition
+-------+----------+----------+---------------+-------------+-----------+-----------+--------+--------+-----------+-------------+--------------------+
|user_id|date      |total_kcal|total_protein_g|total_carbs_g|total_fat_g|goal       |age_band|water_ml|target_kcal|adherence_pct|surplus_deficit_kcal|
+-------+----------+----------+---------------+-------------+-----------+-----------+--------+--------+-----------+-------------+--------------------+
|1      |2025-10-11|415.5     |9.15           |83.25        |24.3       |maintenance|20-22   |0       |1800.0     |23.08        |-1384.5             |
|1      |2025-10-12|590.0     |34.4           |80.0         |32.0       |maintenance|20-22   |0       |1800.0     |32.78        |-1210.0             |
|1      |2025-10-21|385.5     |12.0           |52.35        |22.5       |maintenance|20-22   |0       |1800.0     |21.42        |-1414.5             |
|1      |2025-10-25|139.0     |2.4            |52.4    

In [13]:
# Tag every user-day with the start of its week.
# NOTE(review): date_trunc returns a timestamp, so week_start is stored as a
# timestamp ("... 00:00:00") rather than a date - confirm that is intended.
week_start_col = date_trunc("week", col("date"))
weekly_prep = daily.withColumn("week_start", week_start_col)


def _rounded_mean(source, alias, places):
    """Average of column `source`, rounded to `places` decimals and named `alias`."""
    return sround(avg(source), places).alias(alias)


# A day counts as "on target" when adherence is at least 90%.
# NOTE(review): there is no upper bound, so days far above target also count.
on_target_flag = when(col("adherence_pct") >= 90, lit(1)).otherwise(lit(0))

weekly_metrics = [
    _rounded_mean("adherence_pct", "avg_adherence_pct", 2),
    countDistinct("date").alias("days_logged"),
    _sum(on_target_flag).alias("days_on_target"),
    _rounded_mean("total_kcal", "avg_kcal", 1),
    _rounded_mean("water_ml", "avg_water_ml", 1),
    _rounded_mean("total_protein_g", "avg_protein_g", 1),
    _rounded_mean("total_carbs_g", "avg_carbs_g", 1),
    _rounded_mean("total_fat_g", "avg_fat_g", 1),
]

weekly_grouped = (
    weekly_prep
      .groupBy("user_id", "week_start", "goal", "age_band")
      .agg(*weekly_metrics)
)

# Share of logged days in the week that were on target, as a percentage.
consistency_col = sround((col("days_on_target") / col("days_logged")) * 100, 2)
weekly = weekly_grouped.withColumn("consistency_pct", consistency_col)

weekly.write.mode("overwrite").parquet(f"{GOLD}/weekly_user_summary")
print("‚úÖ GOLD written: weekly_user_summary")
weekly.orderBy("user_id","week_start").show(10, truncate=False)


‚úÖ GOLD written: weekly_user_summary
+-------+-------------------+-----------+--------+-----------------+-----------+--------------+--------+------------+-------------+-----------+---------+---------------+
|user_id|week_start         |goal       |age_band|avg_adherence_pct|days_logged|days_on_target|avg_kcal|avg_water_ml|avg_protein_g|avg_carbs_g|avg_fat_g|consistency_pct|
+-------+-------------------+-----------+--------+-----------------+-----------+--------------+--------+------------+-------------+-----------+---------+---------------+
|1      |2025-10-06 00:00:00|maintenance|20-22   |27.93            |2          |0             |502.8   |0.0         |21.8         |81.6       |28.2     |0.0            |
|1      |2025-10-20 00:00:00|maintenance|20-22   |14.57            |2          |0             |262.3   |0.0         |7.2          |52.4       |16.8     |0.0            |
|2      |2025-09-29 00:00:00|maintenance|23-26   |9.5              |1          |0             |171.0   |0.0     

In [14]:
from pyspark.sql.functions import concat_ws

# Columns carried from the daily mart into the alerts mart.
alert_inputs = daily.select(
    "user_id", "date", "goal", "total_kcal", "target_kcal", "adherence_pct", "water_ml"
)

# Branches are evaluated in order, so each row gets at most one alert type:
# LOW_WATER is only reached when neither calorie condition matched.
# Rows matching no branch get a null alert_type and are filtered out below.
adherence = col("adherence_pct")
alert_type_col = (
    when(adherence < 85, lit("UNDER_TARGET"))
    .when(adherence > 115, lit("OVER_TARGET"))
    .when(col("water_ml") < 1500, lit("LOW_WATER"))
)

# Whole-kcal gap to the target, in each direction.
kcal_short = sround(col("target_kcal") - col("total_kcal"), 0)
kcal_over = sround(col("total_kcal") - col("target_kcal"), 0)

alert_kind = col("alert_type")
alert_message_col = (
    when(alert_kind == "UNDER_TARGET",
         concat_ws(" ", lit("Eat ~"), kcal_short, lit("kcal more to reach target.")))
    .when(alert_kind == "OVER_TARGET",
          concat_ws(" ", lit("Reduce ~"), kcal_over, lit("kcal to get back on target.")))
    .otherwise(lit("Drink more water (goal ‚â• 1500 ml)."))
)

alerts = (
    alert_inputs
      .withColumn("alert_type", alert_type_col)
      .where(alert_kind.isNotNull())
      .withColumn("message", alert_message_col)
)

alerts.write.mode("overwrite").parquet(f"{GOLD}/alerts")
print("‚úÖ GOLD written: alerts")
alerts.orderBy("user_id","date").show(10, truncate=False)


‚úÖ GOLD written: alerts
+-------+----------+-----------+----------+-----------+-------------+--------+------------+---------------------------------------+
|user_id|date      |goal       |total_kcal|target_kcal|adherence_pct|water_ml|alert_type  |message                                |
+-------+----------+-----------+----------+-----------+-------------+--------+------------+---------------------------------------+
|1      |2025-10-11|maintenance|415.5     |1800.0     |23.08        |0       |UNDER_TARGET|Eat ~ 1385.0 kcal more to reach target.|
|1      |2025-10-12|maintenance|590.0     |1800.0     |32.78        |0       |UNDER_TARGET|Eat ~ 1210.0 kcal more to reach target.|
|1      |2025-10-21|maintenance|385.5     |1800.0     |21.42        |0       |UNDER_TARGET|Eat ~ 1415.0 kcal more to reach target.|
|1      |2025-10-25|maintenance|139.0     |1800.0     |7.72         |0       |UNDER_TARGET|Eat ~ 1661.0 kcal more to reach target.|
|2      |2025-10-04|maintenance|171.0     |1800.0  

In [15]:
# Read each gold mart back from disk and report its row count and schema.
gold_marts = ("daily_user_nutrition", "weekly_user_summary", "alerts")

for name in gold_marts:
    df = spark.read.parquet(GOLD + "/" + name)
    row_count = df.count()
    print(name, "-> rows:", row_count)
    df.printSchema()


daily_user_nutrition -> rows: 144
root
 |-- user_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- total_kcal: double (nullable = true)
 |-- total_protein_g: double (nullable = true)
 |-- total_carbs_g: double (nullable = true)
 |-- total_fat_g: double (nullable = true)
 |-- goal: string (nullable = true)
 |-- age_band: string (nullable = true)
 |-- water_ml: long (nullable = true)
 |-- target_kcal: double (nullable = true)
 |-- adherence_pct: double (nullable = true)
 |-- surplus_deficit_kcal: double (nullable = true)

weekly_user_summary -> rows: 117
root
 |-- user_id: integer (nullable = true)
 |-- week_start: timestamp (nullable = true)
 |-- goal: string (nullable = true)
 |-- age_band: string (nullable = true)
 |-- avg_adherence_pct: double (nullable = true)
 |-- days_logged: long (nullable = true)
 |-- days_on_target: long (nullable = true)
 |-- avg_kcal: double (nullable = true)
 |-- avg_water_ml: double (nullable = true)
 |-- avg_protein_g: double (nullable =

In [26]:
from pyspark.sql import functions as F
import os

# NOTE(review): GOLD is rebound here to <ROOT>/output/gold, whereas the earlier
# cells wrote the marts under <BASE>/parquet/gold. Every cell run after this one
# sees the new value.
ROOT = "/home/jovyan/work"       # project root
GOLD = ROOT + "/output/gold"     # our GOLD folder

# Create the folder up front at OS level (Spark will also create it as needed).
os.makedirs(GOLD, exist_ok=True)

# Persist the in-memory alerts DataFrame to a stable location.
alerts_path = GOLD + "/alerts"
alerts.write.parquet(alerts_path, mode="overwrite")

print("‚úÖ Wrote alerts to:", alerts_path)

‚úÖ Wrote alerts to: /home/jovyan/work/output/gold/alerts


In [27]:
from pyspark.sql import functions as F

# Reload the alerts mart from the stable GOLD path and make sure 'date' is a proper date.
# NOTE(review): reminders are derived from `alerts`, so only user-days that
# triggered an alert can receive a reminder - confirm this is intended.
alerts = spark.read.parquet(f"{GOLD}/alerts").withColumn("date", F.to_date("date"))

# NOTE(review): the alerts cell flags low water below 1500 ml; this target is 2000 ml.
WATER_TARGET = 2000  # ml per day

# Most recent date present in the alerts mart.
latest_row = alerts.agg(F.max("date").alias("d")).head()
max_dt = latest_row["d"]

# Millilitres still missing to reach the target, never negative.
shortfall_ml = F.greatest(F.lit(0), F.lit(WATER_TARGET) - F.col("water_ml"))

reminder_text = (
    F.when(
        F.col("need_ml") > 0,
        F.concat(F.lit("Drink "), F.col("need_ml").cast("int"), F.lit(" ml water üíß")),
    )
    .otherwise(F.lit("Hydration target met ‚úÖ"))
)

latest_alerts = alerts.where(F.col("date") == F.lit(max_dt))
reminders = (
    latest_alerts
      .withColumn("need_ml", shortfall_ml)
      .withColumn("reminder_msg", reminder_text)
      .select("user_id", "date", "water_ml", "need_ml", "reminder_msg")
)

# coalesce(1) keeps the CSV output to a single part file.
out_dir = f"{GOLD}/daily_water_reminders"
reminders.coalesce(1).write.csv(out_dir, mode="overwrite", header=True)

print("üìÅ Saved daily reminders to:", out_dir)
reminders.orderBy("user_id").show(truncate=False)

üìÅ Saved daily reminders to: /home/jovyan/work/output/gold/daily_water_reminders
+-------+----------+--------+-------+----------------------+
|user_id|date      |water_ml|need_ml|reminder_msg          |
+-------+----------+--------+-------+----------------------+
|10     |2025-10-29|500     |1500   |Drink 1500 ml water üíß|
+-------+----------+--------+-------+----------------------+



In [28]:
import os, glob
# Inspect the reminders output folder and locate the CSV part file.
base = "/home/jovyan/work/output/gold/daily_water_reminders"
print("Folder:", base)

folder_entries = os.listdir(base)
print("Files:", folder_entries)

# Look for CSV files directly in the folder first, then one level below a *.csv directory.
csv_matches = glob.glob(f"{base}/*.csv")
if not csv_matches:
    csv_matches = glob.glob(f"{base}/*.csv/*")
print("Sample CSV:", csv_matches)

Folder: /home/jovyan/work/output/gold/daily_water_reminders
Files: ['.part-00000-fd2b2289-a467-4b43-aea0-a9e4079d3d0b-c000.csv.crc', '._SUCCESS.crc', 'part-00000-fd2b2289-a467-4b43-aea0-a9e4079d3d0b-c000.csv', '_SUCCESS']
Sample CSV: ['/home/jovyan/work/output/gold/daily_water_reminders/part-00000-fd2b2289-a467-4b43-aea0-a9e4079d3d0b-c000.csv']
