### Kübelfüllung Analyse
Hier haben wir die Analyse wie schnell sich jeder einzelne Kübel fühlt. Die Analyse basiert noch auf der alten Datenbankarchitektur und müsse somit angepasst werden.

In [1]:
from pyspark.sql import SparkSession
import uuid
import random
import datetime
import numpy as np

spark = SparkSession.builder \
    .appName("SmartLitter") \
    .getOrCreate()

jdbc_url = "jdbc:postgresql://db:5432/litter_db"
connection_properties = {
    "user": "root",
    "password": "pwd123",
    "driver": "org.postgresql.Driver",
    "stringtype": "unspecified"
}

In [2]:
litter_bin_emptying_df = spark.read.jdbc(
    url=jdbc_url,
    table="litter_bin_emptying",
    properties=connection_properties
)

litter_bin_emptying_df.head(3)

[Row(emptying_uuid='15bb74a3-62be-44f6-a55a-ab1e0a3b3356', fk_litter_bin_uuid='77d3ae5f-359a-4756-b42a-fd5f94127ec3', liter=16, point_in_time=datetime.datetime(2024, 6, 9, 5, 16), bin_full=True),
 Row(emptying_uuid='0b5381f7-01d3-4e35-bd9f-ddd1936da58c', fk_litter_bin_uuid='77d3ae5f-359a-4756-b42a-fd5f94127ec3', liter=22, point_in_time=datetime.datetime(2024, 6, 10, 9, 57), bin_full=True),
 Row(emptying_uuid='6dfb870c-139f-4653-95ce-e6306d726a30', fk_litter_bin_uuid='77d3ae5f-359a-4756-b42a-fd5f94127ec3', liter=27, point_in_time=datetime.datetime(2024, 6, 11, 6, 3), bin_full=True)]

In [5]:
from pyspark.sql.functions import col

# Replace 'your_uuid' with the specific UUID you're interested in
your_uuid = "77d3ae5f-359a-4756-b42a-fd5f94127ec3"

# Filter by the specified UUID, order by 'point_in_time' descending, and limit to the last 20 entries
df_filtered = (
    litter_bin_emptying_df.filter(col("fk_litter_bin_uuid") == your_uuid)
      .orderBy(col("point_in_time").desc())
      .limit(20)
)

# Show the results
df_filtered.show()


+--------------------+--------------------+-----+-------------------+--------+
|       emptying_uuid|  fk_litter_bin_uuid|liter|      point_in_time|bin_full|
+--------------------+--------------------+-----+-------------------+--------+
|d78df067-4d2f-4af...|77d3ae5f-359a-475...|   35|2024-11-12 07:06:00|    true|
|b5abf799-17b2-491...|77d3ae5f-359a-475...|   18|2024-11-11 07:57:00|    true|
|8d8cb44c-064f-477...|77d3ae5f-359a-475...|   15|2024-11-10 06:07:00|    true|
|02f12522-f18c-4a0...|77d3ae5f-359a-475...|   30|2024-11-09 08:17:00|    true|
|b6b3c7e9-47fe-421...|77d3ae5f-359a-475...|   25|2024-11-08 04:54:00|    true|
|ebd32cd4-af5c-4d9...|77d3ae5f-359a-475...|   29|2024-11-07 10:44:00|    true|
|0a951353-89ac-452...|77d3ae5f-359a-475...|   20|2024-11-06 05:20:00|   false|
|4c15fde6-b932-45f...|77d3ae5f-359a-475...|   37|2024-11-05 08:50:00|    true|
|6258101f-dc1f-411...|77d3ae5f-359a-475...|   40|2024-11-04 07:35:00|   false|
|aec4be16-ab15-451...|77d3ae5f-359a-475...|   35|202

In [7]:
# Behalte zufällig 50% der Datensätze und verwerfe den Rest
df_reduced = litter_bin_emptying_df.sample(fraction=0.5, seed=42)  # seed for reproducibility

# Zeige das Ergebnis
df_reduced.show()

# Replace 'your_uuid' with the specific UUID you're interested in
your_uuid = "77d3ae5f-359a-4756-b42a-fd5f94127ec3"

# Filter by the specified UUID, order by 'point_in_time' descending, and limit to the last 20 entries
df_filtered = (
    df_reduced.filter(col("fk_litter_bin_uuid") == your_uuid)
      .orderBy(col("point_in_time").desc())
      .limit(20)
)

# Show the results
df_filtered.show()


+--------------------+--------------------+-----+-------------------+--------+
|       emptying_uuid|  fk_litter_bin_uuid|liter|      point_in_time|bin_full|
+--------------------+--------------------+-----+-------------------+--------+
|5289b3fb-255c-4ba...|77d3ae5f-359a-475...|   31|2024-06-12 04:06:00|    true|
|49bca41a-346a-46e...|77d3ae5f-359a-475...|   21|2024-06-16 09:53:00|    true|
|e56ed3d0-aba9-4b3...|77d3ae5f-359a-475...|   21|2024-06-19 09:01:00|    true|
|e9766c52-5aa2-49b...|77d3ae5f-359a-475...|   24|2024-06-21 10:24:00|    true|
|0b5c38dc-7b7a-40a...|77d3ae5f-359a-475...|   39|2024-06-25 08:36:00|   false|
|6cb7677c-2c62-431...|77d3ae5f-359a-475...|   26|2024-06-27 08:16:00|   false|
|852cc956-b998-416...|77d3ae5f-359a-475...|   23|2024-06-29 06:29:00|   false|
|12c56dd1-3ba6-497...|77d3ae5f-359a-475...|   27|2024-07-04 04:04:00|    true|
|2ac39296-84d8-427...|77d3ae5f-359a-475...|   36|2024-07-05 08:13:00|    true|
|ed42b4b4-cc99-4f4...|77d3ae5f-359a-475...|   38|202

In [9]:
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, when, datediff

# Define the window partitioned by fk_litter_bin_uuid and ordered by point_in_time
window_spec = Window.partitionBy("fk_litter_bin_uuid").orderBy("point_in_time")

# Identify when bin_full changes from False to True
df_with_lag = df_reduced.withColumn("previous_bin_full", lag("bin_full").over(window_spec))

# Filter rows where bin_full changed from False to True
df_false_to_true = df_with_lag.filter((col("previous_bin_full") == False) & (col("bin_full") == True))

# Get the time of the last 'False' value before the change to 'True'
df_false_with_lead_time = df_with_lag.withColumn("next_true_time", lead("point_in_time").over(window_spec))

# Filter for only rows where bin_full is False and we have a next True time
df_false_time = df_false_with_lead_time.filter((col("bin_full") == False) & col("next_true_time").isNotNull())

# Calculate the days difference between the False and the next True
df_days_until_full = df_false_time.withColumn(
    "days_until_full",
    datediff(col("next_true_time"), col("point_in_time"))
)

# Show the result
df_days_until_full.select("fk_litter_bin_uuid", "point_in_time", "days_until_full").show()


+--------------------+--------------------+-----+-------------------+--------+-----------------+-------------------+---------------+
|       emptying_uuid|  fk_litter_bin_uuid|liter|      point_in_time|bin_full|previous_bin_full|     next_true_time|days_until_full|
+--------------------+--------------------+-----+-------------------+--------+-----------------+-------------------+---------------+
|ad20d8d1-c5cc-4b3...|05df253e-6af8-438...|   35|2024-01-26 07:13:00|   false|             true|2024-01-27 09:18:00|              1|
|e8dbe7d2-5702-4b9...|05df253e-6af8-438...|   31|2024-02-03 06:10:00|   false|             true|2024-02-04 11:51:00|              1|
|e54a9b8e-ccc4-4c1...|05df253e-6af8-438...|   35|2024-02-05 07:45:00|   false|             true|2024-02-06 11:45:00|              1|
|86e82bc2-4083-43b...|05df253e-6af8-438...|   30|2024-02-13 09:40:00|   false|             true|2024-02-15 11:56:00|              2|
|d4c3b694-68d2-406...|05df253e-6af8-438...|   23|2024-02-15 11:56:00|

In [12]:
df_days_until_full.select("fk_litter_bin_uuid", "days_until_full").head(10)

[Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=1),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=1),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=1),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=2),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=1),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=2),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=3),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=3),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=1),
 Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', days_until_full=2)]

In [14]:
from pyspark.sql import functions as F

# Group by each fk_litter_bin_uuid and calculate the average of days_until_full
df_avg_days_until_full = df_days_until_full.groupBy("fk_litter_bin_uuid").agg(
    F.avg("days_until_full").alias("avg_days_until_full")
)

# Show the result
df_avg_days_until_full.head(10)


[Row(fk_litter_bin_uuid='01c83801-b6b1-4617-a80c-7ac31d159934', avg_days_until_full=1.7872340425531914),
 Row(fk_litter_bin_uuid='02e3367d-58d4-449a-acde-e72a115b42a0', avg_days_until_full=1.8529411764705883),
 Row(fk_litter_bin_uuid='03554f3f-08d8-45b1-8ff7-faa4e6f616fd', avg_days_until_full=2.0625),
 Row(fk_litter_bin_uuid='03b81394-2510-443c-bd41-92a5c831f668', avg_days_until_full=2.0),
 Row(fk_litter_bin_uuid='04d5b3a6-5e41-4600-bbcc-96a02db59fed', avg_days_until_full=1.8387096774193548),
 Row(fk_litter_bin_uuid='04f781a6-6fa3-4117-93d2-3d4bb51bcdba', avg_days_until_full=2.4444444444444446),
 Row(fk_litter_bin_uuid='05df253e-6af8-4389-84b4-377f769e6e28', avg_days_until_full=2.0555555555555554),
 Row(fk_litter_bin_uuid='069b61bb-f0c9-452d-8490-3fc09562cf16', avg_days_until_full=1.7317073170731707),
 Row(fk_litter_bin_uuid='06d073b3-7c35-4b4e-a288-0d85cc21c437', avg_days_until_full=1.9210526315789473),
 Row(fk_litter_bin_uuid='06ec6ff5-572c-487e-b4e0-741b37d2be3e', avg_days_until_ful