In [2]:
from utils.spark_utils import get_spark_session, get_config

# Shared SparkSession plus the project configuration; config supplies the
# "silver_path" / "gold_path" prefixes used to build every table location below.
spark, config = get_spark_session(), get_config()

In [3]:
from pyspark.sql.functions import col, from_unixtime, sum, max, avg, lit, stddev, when, count
from datetime import timedelta

# Gold-layer table sub-directories. Each is appended directly to config["gold_path"]
# when writing (f"../{config['gold_path']}{NAME}"), so each keeps its trailing slash.
MILK_PRODUCTION_TABLE_NAME = "milk_production/"  # daily milk totals per cow
COWS_WEIGHT = "cows_weight/"  # current and 30-day average weight per cow
COWS_HEALTH_STATUS = "cows_health/"  # per-cow milk/weight anomaly flags

In [4]:
# Load the three silver-layer Delta tables, each stored at "../<silver_path><table>/".
silver_root = "../" + config["silver_path"]
cows, measurements, sensors = (
    spark.read.format("delta").load(silver_root + table + "/")
    for table in ("cows", "measurements", "sensors")
)



# Milk production

Calculates daily milk production per cow from sensor data and measurements. Only sensors with unit = `L` are used.

In [5]:
# Milk sensors are the ones whose unit is litres.
sensors_milk_production_df = sensors.filter("unit = 'L'")

# Individual readings from those sensors, reduced to (calendar date, cow, value).
# NOTE(review): from_unixtime formats using the Spark session time zone, so day
# boundaries presumably follow that zone; confirm that is the intended one.
milk_readings_df = measurements.alias("l").join(
    sensors_milk_production_df.alias("r"),
    col("l.sensor_id") == col("r.id"),
).select(
    from_unixtime(col("timestamp")).cast("date").alias("date"),
    "cow_id",
    "value",
)

# Daily total per cow: one row per (date, cow_id).
milk_production_df = milk_readings_df.groupBy("date", "cow_id").agg(
    sum("value").alias("total_milk_production")
)


In [68]:
# Persist daily milk totals to the gold layer, replacing previous data and schema.
milk_production_output_path = f"../{config['gold_path']}{MILK_PRODUCTION_TABLE_NAME}"
(
    milk_production_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(milk_production_output_path)
)

# Cows weight

Calculates cow weight from sensor data and measurements. Only sensors with unit = `kg` are used. Weights are averaged over the last 30 days, relative to the latest measurement date. The average weight on that latest day is used as the cow's current weight.

In [7]:


# Weight sensors are the ones whose unit is kilograms.
sensors_cows_weight_df = sensors.filter("unit = 'kg'")

# One row per weight reading: (calendar date, cow, value). The inner join with
# `cows` keeps only readings for cows present in the cows table.
cows_weight_df = (
    measurements.alias("l")
    .join(sensors_cows_weight_df.alias("r"), [col("l.sensor_id") == col("r.id")])
    .join(cows.alias("z"), [col("l.cow_id") == col("z.id")])
    .select(
        from_unixtime(col("timestamp")).cast("date").alias("date"),
        "cow_id",
        "value",
    )
)

# The latest reading date anchors both the 30-day window and the "current" weight.
# A global aggregate always returns exactly one row; its value is None when there
# are no readings at all, which would otherwise fail below with an opaque TypeError.
max_date = cows_weight_df.agg(max(col("date")).alias("max_date")).first()["max_date"]
if max_date is None:
    raise ValueError(
        "No weight measurements found (no readings from sensors with unit = 'kg'); "
        "cannot compute cows weight."
    )
date_30_days_ago = max_date - timedelta(days=30)

# Readings in the 30 calendar days ending on (and including) max_date.
last_30_days_df = cows_weight_df.filter((col("date") <= max_date) & (col("date") > date_30_days_ago))

# Average weight per cow over that window.
average_weight_last_30_days_df = last_30_days_df.groupBy("cow_id").agg(avg("value").alias("average_weight_last_30_days"))

# Current weight = average of the cow's readings on max_date. Cows with no reading
# on that day do not appear in the result.
current_weight_df = cows_weight_df.filter(col("date") == max_date).groupBy("cow_id").agg(avg("value").alias("current_weight"))

# One row per cow with both metrics, stamped with the reference date.
cows_weight_result_df = current_weight_df.join(average_weight_last_30_days_df, ["cow_id"], "left").withColumn("date", lit(max_date))

In [70]:
# Persist current / 30-day-average weights to the gold layer, replacing previous data and schema.
cows_weight_output_path = f"../{config['gold_path']}{COWS_WEIGHT}"
(
    cows_weight_result_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(cows_weight_output_path)
)

# Cows health status

Computes cow health status. Anomalies are detected using the standard deviation from each cow's own mean: a value that falls more than two standard deviations below the mean is flagged as an anomaly.

In [10]:
# A value is anomalous when it falls more than `threshold_multiplier` sample standard
# deviations below the cow's own mean. When a cow has a single observation, the stddev
# is undefined, the comparison is not true, and the flag stays 0.
threshold_multiplier = 2

# --- Milk -------------------------------------------------------------------
# Per-cow baseline over all daily totals.
milk_baseline_df = milk_production_df.groupBy("cow_id").agg(
    avg("total_milk_production").alias("avg_daily_milk_production"),
    stddev("total_milk_production").alias("stddev_daily_milk_production")
)
milk_with_baseline_df = milk_production_df.join(milk_baseline_df, ["cow_id"])

# Row-level flag: one row per (date, cow_id).
milk_flags_df = milk_with_baseline_df.withColumn(
    "milk_anomaly",
    when(
        col("total_milk_production") < col("avg_daily_milk_production") - threshold_multiplier * col("stddev_daily_milk_production"),
        1
    ).otherwise(0)
)

# A cow is flagged if any of its days is anomalous.
# NOTE(review): this spans the cow's whole history, not only the latest day as the
# weight check does; confirm that this is intended for a "current" health status.
milk_anomalies_df = milk_flags_df.groupBy("cow_id").agg(
    max(col("milk_anomaly")).alias("milk_anomaly")
)

# --- Weight -----------------------------------------------------------------
# cows_weight_result_df has exactly one row per cow (the latest day). A baseline
# taken from it compares each cow's weight to itself, and the stddev of a single
# value is undefined, so the check could never fire. Build the baseline from the
# per-day average weights in the 30-day window instead.
daily_weight_df = last_30_days_df.groupBy("cow_id", "date").agg(
    avg("value").alias("daily_weight")
)
weight_baseline_df = daily_weight_df.groupBy("cow_id").agg(
    avg("daily_weight").alias("avg_daily_weight"),
    stddev("daily_weight").alias("stddev_daily_weight")
)
weight_with_baseline_df = cows_weight_result_df.join(weight_baseline_df, ["cow_id"])

# Compare the current weight with that 30-day daily baseline.
weight_flags_df = weight_with_baseline_df.withColumn(
    "weight_anomaly",
    when(
        col("current_weight") < col("avg_daily_weight") - threshold_multiplier * col("stddev_daily_weight"),
        1
    ).otherwise(0)
)
weight_anomalies_df = weight_flags_df.groupBy("cow_id").agg(
    max(col("weight_anomaly")).alias("weight_anomaly")
)

# --- Combine ----------------------------------------------------------------
# Outer join keeps cows seen in either check; the other flag is null for them.
combined_anomalies_df = milk_anomalies_df.select("cow_id", "milk_anomaly").join(
    weight_anomalies_df.select("cow_id", "weight_anomaly"), ["cow_id"], "outer"
)


In [93]:
# Persist per-cow anomaly flags to the gold layer, replacing previous data and schema.
# Because the flags come from an outer join, a flag is null when the cow is missing from that check's input.
cows_health_output_path = f"../{config['gold_path']}{COWS_HEALTH_STATUS}"
(
    combined_anomalies_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(cows_health_output_path)
)