In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("inconsistences_report").getOrCreate()
CURRENT_DAY= '2024-12-03'

In [2]:
import pyspark.sql.functions as spark_f

def flatten_column(df,column):
    sub_columns = df.select(f"{column}.*").columns
    for sub_column in sub_columns:
        df = df.withColumn(f"{column}_{sub_column}", spark_f.col(f"{column}.{sub_column}"))
    return df.drop(column)

def histogram(df, column,title):
    import plotly.express as px
    fig = px.histogram(
        df,
        x=column,
        title=title,
    )
    fig.show()

def pie_chart(df, column, title, category_orders=None):
    import plotly.express as px
    df = df.groupby(column).count().rename(columns={df.columns[0]: "count"}).reset_index().sort_values(column)
    # print(df.head())
    fig = px.pie(
        df,
        names=column,
        values="count",
        category_orders = category_orders,
        title=title,
    )
    # fig.update_xaxes(categoryorder="category ascending")
    fig.show()

In [3]:
raw_allowance_backend_table = spark.read.csv("../data/allowance_backend_table.csv", header=True)
raw_allowance_events = spark.read.json("../data/allowance_events.json", multiLine = True)
raw_payments_schedule_backend_table = spark.read.csv("../data/payments_schedule_backend_table.csv", header=True)

### cleaned allowance events

In [4]:
allowance_events = flatten_column(raw_allowance_events, "allowance")
allowance_events = flatten_column(allowance_events, "allowance_scheduled")
allowance_events = flatten_column(allowance_events, "event")
allowance_events = flatten_column(allowance_events, "user")

print(allowance_events.dtypes)
cleaned_events = allowance_events.withColumn(
    "event_timestamp", allowance_events.event_timestamp.cast("timestamp")
).alias("cleaned_events")

cleaned_events = cleaned_events.dropDuplicates()
cleaned_events.show()


# payments_schedule_backend_table.show()

[('allowance_amount', 'bigint'), ('allowance_scheduled_day', 'string'), ('allowance_scheduled_frequency', 'string'), ('event_name', 'string'), ('event_timestamp', 'string'), ('user_id', 'string')]
+----------------+-----------------------+-----------------------------+-----------------+-------------------+--------------------+
|allowance_amount|allowance_scheduled_day|allowance_scheduled_frequency|       event_name|    event_timestamp|             user_id|
+----------------+-----------------------+-----------------------------+-----------------+-------------------+--------------------+
|              20|               saturday|                       weekly|allowance.created|2024-11-28 07:10:16|38f8d838-ea08-4fd...|
|               5|          fifteenth_day|                      monthly|allowance.created|2024-09-05 13:30:38|9748cdad-69a4-400...|
|               5|               saturday|                       weekly|allowance.created|2024-10-21 17:27:50|8252c070-2698-49d...|
|          

### cleaned allowance bacckend table

In [5]:
allowance_backend_table = raw_allowance_backend_table.withColumnRenamed(
    "uuid", "user_id"
)
# to see all possible values of column status
allowance_backend_table.select("status").dropDuplicates().show()

# get only the enabled rows
allowance_backend_table = allowance_backend_table.filter(
    allowance_backend_table.status == "enabled"
)
allowance_backend_table = allowance_backend_table.withColumn(
    "corrected_updated_at",
    spark_f.when(
        spark_f.rlike(
            allowance_backend_table.updated_at,
            spark_f.lit(r"\d+-\d+-\d+T\d+:\d+:\d+.\d+Z"),
        ),
        spark_f.to_timestamp(allowance_backend_table.updated_at),
    )
    .otherwise(spark_f.from_unixtime(allowance_backend_table.updated_at))
    .cast("timestamp"),
)
allowance_backend_table = allowance_backend_table.withColumn(
    "creation_date",
    spark_f.from_unixtime(allowance_backend_table.creation_date).cast("timestamp"),
)


cleaned_backend_table = allowance_backend_table.drop("updated_at").withColumnRenamed(
    "corrected_updated_at", "updated_at"
).alias("cleaned_backend_table")
print(cleaned_backend_table.dtypes)
cleaned_backend_table.show(truncate=False)

+--------+
|  status|
+--------+
| enabled|
|disabled|
+--------+

[('user_id', 'string'), ('creation_date', 'timestamp'), ('frequency', 'string'), ('day', 'string'), ('next_payment_day', 'string'), ('status', 'string'), ('updated_at', 'timestamp')]
+------------------------------------+-------------------+---------+-------------+----------------+-------+--------------------------+
|user_id                             |creation_date      |frequency|day          |next_payment_day|status |updated_at                |
+------------------------------------+-------------------+---------+-------------+----------------+-------+--------------------------+
|30f4e25e-3e37-462e-8c3c-42f24f54350f|2024-08-28 06:51:49|monthly  |fifteenth_day|15              |enabled|2024-10-15 05:00:41.445627|
|6da398ad-079d-49b9-8668-6d7ce4d22683|2024-08-26 05:10:29|monthly  |fifteenth_day|15              |enabled|2024-08-26 05:10:29       |
|2d30fe2d-6c32-4b8a-a19b-906184f64f62|2024-11-11 04:12:39|monthly  |fifteen

In [6]:
# We have events of creation and edition, so we need to get the most recent event for each user to get
# a snapshot of what the table allowance_backend_table should be
from pyspark.sql.window import Window

# user_window = Window.partitionBy("user_id").orderBy(spark_f.desc("event_timestamp"))

# cleaned_events.show()
last_event_by_user = (
    cleaned_events.groupBy("user_id").agg(
        spark_f.max("event_timestamp").alias("event_timestamp")
    )
).alias("last_event_by_user")

truth_backend_table = last_event_by_user.join(
    cleaned_events, on=["user_id", "event_timestamp"]
).selectExpr("cleaned_events.*","event_name as last_event_name")

###QUALITY TEST to see if we can have more than one created event per user
# If we have some user with more than one created event, the allowance_backend_table could have more than one row
# per user and this code to generate the truth table would not have the real truth (only the most 
# recent created or updated allowance)
more_than_one_allowance = (
    cleaned_events.groupBy("user_id", "event_name")
    .count()
    .filter("count > 1 and event_name = 'allowance.created'")
    .count()
)
assert (
    more_than_one_allowance == 0
), f"we have {more_than_one_allowance} users with more than one allowance.created event"


###QUALITY TEST to see if we have more than one event per user
duplicated_users = (
    truth_backend_table.groupBy("user_id").count().filter("count > 1").count()
)
assert (
    duplicated_users == 0
), f"we have {duplicated_users} users with more than one line in truth_backend_table"

# cleaned_events.filter("user_id = 'ea7a6ea8-ff78-4a5b-848b-9f532a7a653c'").show(truncate=False)
# cleaned_events.filter("user_id = 'e7fc3804-fb1e-416e-8788-3dd0256a7d54'").show(truncate=False)
truth_backend_table = truth_backend_table.withColumnRenamed("allowance_scheduled_frequency", "frequency")
truth_backend_table = truth_backend_table.withColumnRenamed("allowance_scheduled_day", "day")
truth_backend_table = truth_backend_table.withColumnRenamed("allowance_amount", "amount")
truth_backend_table = truth_backend_table.alias("truth_backend_table")

truth_backend_table.show()









+--------------------+-------------------+------+-------------+---------+-----------------+-----------------+
|             user_id|    event_timestamp|amount|          day|frequency|       event_name|  last_event_name|
+--------------------+-------------------+------+-------------+---------+-----------------+-----------------+
|38f8d838-ea08-4fd...|2024-11-28 07:10:16|    20|     saturday|   weekly|allowance.created|allowance.created|
|113d8189-3ce0-47b...|2024-09-21 17:02:11|    20|    first_day|  monthly| allowance.edited| allowance.edited|
|8c212fdf-895b-4d3...|2024-08-10 07:21:42|    15|       sunday|   weekly| allowance.edited| allowance.edited|
|138a678d-b0eb-46a...|2024-09-07 08:14:12|     4|       monday| biweekly|allowance.created|allowance.created|
|03bc2115-7d48-4be...|2024-08-08 08:12:00|    10|       friday|   weekly| allowance.edited| allowance.edited|
|e48bb548-a87a-406...|2024-11-10 07:28:22|     5|    first_day|  monthly| allowance.edited| allowance.edited|
|9af3c140-

# Analysing  discrepancies between the events and the backend table 

In [7]:
### Aconding to the events tables it should not be possible for a user to have more than one allowance
more_than_one_allowances = cleaned_backend_table.groupBy('user_id').count().filter('count > 1').count()
assert more_than_one_allowances == 0 

comparisson_backend_table = cleaned_backend_table.join(truth_backend_table, on="user_id", how="left")

### This lines of the allowance_backend_table should not exist because there are no events to support them
# on the events table
comparisson_backend_table.filter("truth_backend_table.user_id is null").select('cleaned_backend_table.*').show()

+--------------------+-------------------+---------+---------+----------------+-------+--------------------+
|             user_id|      creation_date|frequency|      day|next_payment_day| status|          updated_at|
+--------------------+-------------------+---------+---------+----------------+-------+--------------------+
|bb950bcb-0760-417...|2024-09-20 23:53:12|   weekly|   friday|              27|enabled| 2024-09-20 23:53:12|
|59a14e2a-27a1-4cb...|2024-11-23 18:50:05|  monthly|first_day|               1|enabled|2024-12-01 05:00:...|
|a616b4c6-482d-4e5...|2024-11-23 18:49:36|  monthly|first_day|               1|enabled|2024-12-01 05:01:...|
|a6ca6993-3b73-4a8...|2024-11-23 18:49:28|  monthly|first_day|               1|enabled|2024-12-01 05:01:...|
|cd2d5904-7ca0-4d0...|2024-11-23 18:49:47|  monthly|first_day|               1|enabled| 2024-11-23 18:49:47|
+--------------------+-------------------+---------+---------+----------------+-------+--------------------+



In [8]:
print("Users with discrepancies in the day column:")
comparisson_backend_table.filter("truth_backend_table.day != cleaned_backend_table.day").selectExpr("user_id","cleaned_backend_table.day","truth_backend_table.day as truth_day").show(truncate =False)

print("Users with discrepancies in the frequency column:")
comparisson_backend_table.filter("truth_backend_table.frequency != cleaned_backend_table.frequency").selectExpr("user_id","cleaned_backend_table.frequency", "truth_backend_table.frequency as truth_frequency").show(truncate =False)



Users with discrepancies in the day column:
+------------------------------------+---------+---------+
|user_id                             |day      |truth_day|
+------------------------------------+---------+---------+
|308b2b9c-d49d-4b2d-947c-5b2370da090f|first_day|sunday   |
+------------------------------------+---------+---------+

Users with discrepancies in the frequency column:
+------------------------------------+---------+---------------+
|user_id                             |frequency|truth_frequency|
+------------------------------------+---------+---------------+
|308b2b9c-d49d-4b2d-947c-5b2370da090f|monthly  |weekly         |
+------------------------------------+---------+---------------+



In [9]:
def build_backend_truth_table(frequency, truth_function):
    return (
        comparisson_backend_table.filter(f"truth_backend_table.frequency = '{frequency}'")
        .withColumn(
            "truth_next_payment_day",
            truth_function(
                "truth_backend_table.day", "truth_backend_table.event_timestamp"
            ),
        )
        .selectExpr(
            "user_id",
            "cleaned_backend_table.frequency",
            "cleaned_backend_table.day",
            "cleaned_backend_table.creation_date",
            "cleaned_backend_table.updated_at",
            "cleaned_backend_table.next_payment_day",
            "last_event_name",
            "event_timestamp as last_event_timestamp",
            "truth_next_payment_day",
        )
        .withColumn(
            "correct",
            spark_f.when(
                spark_f.col("truth_next_payment_day") == spark_f.col("next_payment_day"),
                spark_f.lit("correct"),
            ).otherwise(spark_f.lit("wrong")),
        )
        .withColumn(
            "diference_next_payment_day",
            spark_f.col("next_payment_day") - spark_f.col("truth_next_payment_day"),
        )
    )

#### Daily allowancces

In [10]:
## daily discrepancies


def next_payment_day_daily_frequency(day, event_timestamp):
    return spark_f.day(spark_f.date_add(spark_f.lit(CURRENT_DAY), 1))


backend_table_daily_truth = build_backend_truth_table("daily", next_payment_day_daily_frequency)

pd_comparisson_backend = backend_table_daily_truth.toPandas()
histogram(pd_comparisson_backend, "correct", "Correctness of the next_payment_day for daily frequency")
pie_chart(pd_comparisson_backend, "correct", "proportion", category_orders={"correct": ["correct", "wrong"]})

#### monthly

In [11]:
comparisson_backend_table.filter("truth_backend_table.frequency = 'monthly'").select("cleaned_backend_table.day").drop_duplicates().show()

+-------------+
|          day|
+-------------+
|fifteenth_day|
|    first_day|
+-------------+



In [12]:
# comparisson_backend_table.show()


def next_payment_day_montlhy_frequency(day_column, event_timestamp):
    # current_day = spark_f.day(spark_f.to_date(spark_f.lit(CURRENT_DAY)))

    day_column = (
        spark_f.when(spark_f.col(day_column) == "fifteenth_day", spark_f.lit(15))
        .when(spark_f.col(day_column) == "first_day", spark_f.lit(1))
        .otherwise(spark_f.lit(None))
    )

    return day_column


backend_table_monthly_truth = build_backend_truth_table(
    "monthly", next_payment_day_montlhy_frequency
)

comparisson_backend_table.filter("truth_backend_table.frequency = 'monthly'").show()
pd_comparisson_backend = backend_table_monthly_truth.toPandas()

histogram(
    pd_comparisson_backend,
    "correct",
    "Correctness of the next_payment_day for monthly frequency",
)

pie_chart(
    pd_comparisson_backend,
    "correct",
    "proportion",
    category_orders={"correct": ["correct", "wrong"]},
)

+--------------------+-------------------+---------+-------------+----------------+-------+--------------------+-------------------+------+-------------+---------+-----------------+-----------------+
|             user_id|      creation_date|frequency|          day|next_payment_day| status|          updated_at|    event_timestamp|amount|          day|frequency|       event_name|  last_event_name|
+--------------------+-------------------+---------+-------------+----------------+-------+--------------------+-------------------+------+-------------+---------+-----------------+-----------------+
|0e4ede31-e71b-4c8...|2024-11-13 23:15:30|  monthly|    first_day|               1|enabled|2024-12-01 05:00:...|2024-11-13 18:15:30|    40|    first_day|  monthly|allowance.created|allowance.created|
|e7f3a4c3-443d-476...|2024-11-17 13:49:44|  monthly|    first_day|               1|enabled|2024-12-01 05:01:...|2024-11-17 08:49:44|     8|    first_day|  monthly|allowance.created|allowance.created|


### weekly

In [13]:
comparisson_backend_table.filter("truth_backend_table.frequency = 'weekly'").select("truth_backend_table.day").dropDuplicates().show()

+---------+
|      day|
+---------+
|   sunday|
| thursday|
|   monday|
|wednesday|
|   friday|
| saturday|
|  tuesday|
+---------+



In [14]:
# comparisson_backend_table.show()
def int_day_of_week(column):
    DAYS_OF_WEEK = [
        "sunday",
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
    ]

    day_of_week = spark_f.when(
        spark_f.col(column).isin(DAYS_OF_WEEK),
        spark_f.expr(f"array_position(array({','.join([ f"'{day}'" for day in DAYS_OF_WEEK])}), {column})")
    ).otherwise(spark_f.lit(None)).cast("int")
    return day_of_week

def next_day_of_week(timestamp,day_of_week):
    
    dif = day_of_week - spark_f.dayofweek(timestamp)
    next_day = spark_f.when(dif >= 0, spark_f.date_add(timestamp,dif)).otherwise(spark_f.date_add(timestamp ,dif+7))
    return next_day

def next_payment_day_weekly_frequency(day_column, event_timestamp, current_day = None):
    if current_day is None:
        current_day = spark_f.to_date(spark_f.lit(CURRENT_DAY))
    else:
        current_day = spark_f.to_date(current_day)
    day_of_week = int_day_of_week(day_column)
    
    next_day = next_day_of_week(current_day,day_of_week)
    return spark_f.day(next_day)


backend_table_weekly_truth = build_backend_truth_table("weekly", next_payment_day_weekly_frequency)
# comparisson_backend_table.filter("truth_backend_table.frequency = 'weekly'").show()
pd_comparisson_backend = backend_table_weekly_truth.toPandas()

histogram(
    pd_comparisson_backend,
    "correct",
    "Correctness of the next_payment_day for monthly frequency",
)

pie_chart(pd_comparisson_backend, "correct", "proportion",category_orders={"correct": ["correct", "wrong"]})

### biweekly

In [15]:
# comparisson_backend_table.show()



def next_payment_day_biweekly_frequency(day_column, last_event_date_column):
    current_day = spark_f.to_date(spark_f.lit(CURRENT_DAY))
    day_of_week = int_day_of_week(day_column)
    current_day_of_week = spark_f.dayofweek(current_day)

    
    ## I'm considering that the first week of the payment is the first occurence of the day
    #  after or during the last edition day of the allowance
    first_payment_day = next_day_of_week(last_event_date_column, day_of_week)

    dif_days = spark_f.date_diff(current_day, first_payment_day)  
    
    next_biweek_day = spark_f.date_add(current_day,14 - (dif_days % (7*2)))


    next_day = spark_f.when(
        first_payment_day > current_day, first_payment_day
    ).otherwise(next_biweek_day)

    return spark_f.day(next_day)


backend_table_biweekly_truth = build_backend_truth_table("biweekly", next_payment_day_biweekly_frequency)
# comparisson_backend_table.filter("truth_backend_table.frequency = 'weekly'").show()
pd_comparisson_backend = backend_table_biweekly_truth.toPandas()

histogram(
    pd_comparisson_backend,
    "correct",
    "Correctness of the next_payment_day for monthly frequency",
)

pie_chart(pd_comparisson_backend, "correct", "proportion",category_orders={"correct": ["correct", "wrong"]})

From all possible frequencys, only the `monthly` frequency has no errors. I think this is because the monthly frequency is the only one that don't need updates to change the value of `next_payment_day` column. Because it will always be the same unless the user do some editing on the day of the schedule. **All frequencies that requires some kind of update on the `next_payment_day` has errors.**

Considering that the biggest possible period for an allowance is monthly, is expected that no allowance has an updated_at older than 30 days past. There are some that are older than 30 days as we can see on the next graphic. I think it's possible for one to have an updated_at older than 30 days when the user enables some allowance tha was previosly disabled. But i don't have information about the time when the allowances where enabled or disabled to confirm that.

In [16]:
all_backend_table_truth = (
    backend_table_daily_truth.unionByName(backend_table_monthly_truth)
    .unionByName(backend_table_biweekly_truth)
    .unionByName(backend_table_weekly_truth)
)
all_backend_table_truth= all_backend_table_truth.withColumn("how_old_was_last_update",spark_f.date_diff(spark_f.to_timestamp(spark_f.lit(CURRENT_DAY)),spark_f.col("updated_at")))

histogram(
    all_backend_table_truth.toPandas(),
    "how_old_was_last_update",
    "Time in days from last update",
)

As we can see on the next graphs, the allowances that were edited has a similar distribution of errors than the allowances that were never edited. So, it seems that the problem its not related with the edit action of an allowance.   

In [17]:
to_graph=all_backend_table_truth.filter("last_event_name like '%created'")
histogram(to_graph.toPandas(), "correct", "correctness")

to_graph=all_backend_table_truth.filter("last_event_name like '%edited'")
histogram(to_graph.toPandas(), "correct", "correctness")

In the daily frequency, 86.4% of the allowances has the `next_payment_day` equal to `updated_at + 1 day`. Considering that and the fact that almost every daily allowance has a wrong value of `next_payment_day` getting the current day as `2024-11-03`, this may be because the backend job that updates the next_payment_day column for the daily frequency is not running every day or is breaking somehow. We can see this on the next graph.

In [24]:
# all_backend_table_truth.filter("correct = 'correct' and day = 'daily'").show()
test_next_update_daily = all_backend_table_truth.withColumn(
    "update_next_day", spark_f.day(spark_f.date_add(spark_f.col("updated_at"), 1))
).filter(
    "frequency = 'daily'"
)  # .show()
test_next_update_daily = test_next_update_daily.withColumn(
    "update_correct",
    spark_f.when(
        spark_f.col("update_next_day") == spark_f.col("next_payment_day"),
        spark_f.lit("correct"),
    ).otherwise(spark_f.lit("wrong")),
)

# test_next_update.show()

pie_chart(
    test_next_update_daily.toPandas(),
    "update_correct",
    "proportion",
    category_orders={"update_correct": ["correct", "wrong"]},
)
# all_backend_table_truth.filter("date(updated_at) = '2024-11-03'").show()

The majority of the cases that are wrong considering the next day of the updated_at (`update_next_day`) are wrong by a diference of 1 as we can see in the next table,  this could indicate that there has been some double updating if the update process only increments the `next_payment_day` column without regards to the current day.

In [None]:
test_next_update_errors = test_next_update_daily.filter(
    "update_correct = 'wrong'"
).withColumn(
    "next_day_to_update_next_day_dif",
    test_next_update_daily.next_payment_day - test_next_update_daily.update_next_day,
).select(
    "user_id",
    "frequency",
    "updated_at",
    "update_next_day",
    "next_payment_day",
    "next_day_to_update_next_day_dif",
)
test_next_update_errors.show()

+--------------------+---------+--------------------+---------------+----------------+-------------------------------+
|             user_id|frequency|          updated_at|update_next_day|next_payment_day|next_day_to_update_next_day_dif|
+--------------------+---------+--------------------+---------------+----------------+-------------------------------+
|ddcfef7f-f431-4d2...|    daily|2024-09-11 10:51:...|             12|               4|                           -8.0|
|9be9e091-4f3a-4a6...|    daily|2024-10-15 22:26:...|             16|              17|                            1.0|
|e792e7d1-67ae-444...|    daily| 2024-11-25 21:18:00|             26|              27|                            1.0|
|7f1f0ab3-4eb3-418...|    daily|2024-10-06 21:44:...|              7|               8|                            1.0|
|337875a6-6c2b-49a...|    daily| 2024-11-25 21:18:13|             26|              27|                            1.0|
|2cb3cd03-75f8-429...|    daily| 2024-11-14 22:5

In [32]:
all_backend_table_truth.withColumn("update_next_day",next_payment_day_weekly_frequency("updated_at",None) ).show()


# test_next_update.show()

# pie_chart(
#     test_next_update_daily.toPandas(),
#     "update_correct",
#     "proportion",
#     category_orders={"update_correct": ["correct", "wrong"]},
# )

AnalysisException: [DATATYPE_MISMATCH.ARRAY_FUNCTION_DIFF_TYPES] Cannot resolve "array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), updated_at)" due to data type mismatch: Input to `array_position` should have been "ARRAY" followed by a value with same element type, but it's ["ARRAY<STRING>", "TIMESTAMP"].; line 1 pos 0;
'Project [user_id#183, frequency#19, day#20, creation_date#208, updated_at#224, next_payment_day#22, last_event_name#289, last_event_timestamp#574, truth_next_payment_day#559, correct#584, diference_next_payment_day#595, how_old_was_last_update#1033, 'day(CASE WHEN ((cast(CASE WHEN cast(updated_at#224 as string) IN (cast(sunday as string),cast(monday as string),cast(tuesday as string),cast(wednesday as string),cast(thursday as string),cast(friday as string),cast(saturday as string)) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), updated_at#224) ELSE null END as int) - dayofweek(to_date(2024-12-03, None, Some(America/Sao_Paulo), false))) >= 0) THEN date_add(to_date(2024-12-03, None, Some(America/Sao_Paulo), false), (cast(CASE WHEN cast(updated_at#224 as string) IN (cast(sunday as string),cast(monday as string),cast(tuesday as string),cast(wednesday as string),cast(thursday as string),cast(friday as string),cast(saturday as string)) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), updated_at#224) ELSE null END as int) - dayofweek(to_date(2024-12-03, None, Some(America/Sao_Paulo), false)))) ELSE date_add(to_date(2024-12-03, None, Some(America/Sao_Paulo), false), ((cast(CASE WHEN cast(updated_at#224 as string) IN (cast(sunday as string),cast(monday as string),cast(tuesday as string),cast(wednesday as string),cast(thursday as string),cast(friday as string),cast(saturday as string)) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), updated_at#224) ELSE null END as int) - dayofweek(to_date(2024-12-03, None, Some(America/Sao_Paulo), false))) + 7)) END) AS update_next_day#2127]
+- Project [user_id#183, frequency#19, day#20, creation_date#208, updated_at#224, next_payment_day#22, last_event_name#289, last_event_timestamp#574, truth_next_payment_day#559, correct#584, diference_next_payment_day#595, date_diff(cast(to_timestamp(2024-12-03, None, TimestampType, Some(America/Sao_Paulo), false) as date), cast(updated_at#224 as date)) AS how_old_was_last_update#1033]
   +- Union false, false
      :- Project [user_id#183, frequency#19, day#20, creation_date#208, updated_at#224, next_payment_day#22, last_event_name#289, last_event_timestamp#574, truth_next_payment_day#559, correct#584, (cast(next_payment_day#22 as double) - cast(truth_next_payment_day#559 as double)) AS diference_next_payment_day#595]
      :  +- Project [user_id#183, frequency#19, day#20, creation_date#208, updated_at#224, next_payment_day#22, last_event_name#289, last_event_timestamp#574, truth_next_payment_day#559, CASE WHEN (truth_next_payment_day#559 = cast(next_payment_day#22 as int)) THEN correct ELSE wrong END AS correct#584]
      :     +- Project [user_id#183, frequency#19, day#20, creation_date#208, updated_at#224, next_payment_day#22, last_event_name#289, event_timestamp#148 AS last_event_timestamp#574, truth_next_payment_day#559]
      :        +- Project [user_id#183, creation_date#208, frequency#19, day#20, next_payment_day#22, status#23, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289, day(date_add(cast(2024-12-03 as date), 1)) AS truth_next_payment_day#559]
      :           +- Filter (frequency#352 = daily)
      :              +- Project [user_id#183, creation_date#208, frequency#19, day#20, next_payment_day#22, status#23, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
      :                 +- Join LeftOuter, (user_id#183 = user_id#281)
      :                    :- SubqueryAlias cleaned_backend_table
      :                    :  +- Project [user_id#183, creation_date#208, frequency#19, day#20, next_payment_day#22, status#23, corrected_updated_at#199 AS updated_at#224]
      :                    :     +- Project [user_id#183, creation_date#208, frequency#19, day#20, next_payment_day#22, status#23, corrected_updated_at#199]
      :                    :        +- Project [user_id#183, cast(from_unixtime(cast(creation_date#18 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) as timestamp) AS creation_date#208, frequency#19, day#20, updated_at#21, next_payment_day#22, status#23, corrected_updated_at#199]
      :                    :           +- Project [user_id#183, creation_date#18, frequency#19, day#20, updated_at#21, next_payment_day#22, status#23, cast(CASE WHEN RLIKE(updated_at#21, \d+-\d+-\d+T\d+:\d+:\d+.\d+Z) THEN cast(to_timestamp(updated_at#21, None, TimestampType, Some(America/Sao_Paulo), false) as string) ELSE from_unixtime(cast(updated_at#21 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) END as timestamp) AS corrected_updated_at#199]
      :                    :              +- Filter (status#23 = enabled)
      :                    :                 +- Project [uuid#17 AS user_id#183, creation_date#18, frequency#19, day#20, updated_at#21, next_payment_day#22, status#23]
      :                    :                    +- Relation [uuid#17,creation_date#18,frequency#19,day#20,updated_at#21,next_payment_day#22,status#23] csv
      :                    +- SubqueryAlias truth_backend_table
      :                       +- Project [user_id#281, event_timestamp#148, allowance_amount#63L AS amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
      :                          +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84 AS day#360, frequency#352, event_name#108, last_event_name#289]
      :                             +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91 AS frequency#352, event_name#108, last_event_name#289]
      :                                +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_name#108 AS last_event_name#289]
      :                                   +- Project [user_id#133, event_timestamp#275, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, user_id#281, event_timestamp#148]
      :                                      +- Join Inner, ((user_id#133 = user_id#281) AND (event_timestamp#275 = event_timestamp#148))
      :                                         :- SubqueryAlias last_event_by_user
      :                                         :  +- Aggregate [user_id#133], [user_id#133, max(event_timestamp#148) AS event_timestamp#275]
      :                                         :     +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#133, allowance_scheduled_frequency#91]
      :                                         :        +- SubqueryAlias cleaned_events
      :                                         :           +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#133]
      :                                         :              +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#133]
      :                                         :                 +- Project [user#33, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#33.id AS user_id#133]
      :                                         :                    +- Project [user#33, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
      :                                         :                       +- Project [event#32, user#33, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#32.timestamp AS event_timestamp#116]
      :                                         :                          +- Project [event#32, user#33, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#32.name AS event_name#108]
      :                                         :                             +- Project [event#32, user#33, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
      :                                         :                                +- Project [event#32, user#33, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
      :                                         :                                   +- Project [event#32, user#33, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
      :                                         :                                      +- Project [event#32, user#33, allowance_amount#63L, allowance_scheduled#69]
      :                                         :                                         +- Project [allowance#31, event#32, user#33, allowance_amount#63L, allowance#31.scheduled AS allowance_scheduled#69]
      :                                         :                                            +- Project [allowance#31, event#32, user#33, allowance#31.amount AS allowance_amount#63L]
      :                                         :                                               +- Relation [allowance#31,event#32,user#33] json
      :                                         +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#281, allowance_scheduled_frequency#91]
      :                                            +- SubqueryAlias cleaned_events
      :                                               +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#281]
      :                                                  +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#281]
      :                                                     +- Project [user#280, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#280.id AS user_id#281]
      :                                                        +- Project [user#280, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
      :                                                           +- Project [event#279, user#280, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#279.timestamp AS event_timestamp#116]
      :                                                              +- Project [event#279, user#280, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#279.name AS event_name#108]
      :                                                                 +- Project [event#279, user#280, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
      :                                                                    +- Project [event#279, user#280, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
      :                                                                       +- Project [event#279, user#280, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
      :                                                                          +- Project [event#279, user#280, allowance_amount#63L, allowance_scheduled#69]
      :                                                                             +- Project [allowance#278, event#279, user#280, allowance_amount#63L, allowance#278.scheduled AS allowance_scheduled#69]
      :                                                                                +- Project [allowance#278, event#279, user#280, allowance#278.amount AS allowance_amount#63L]
      :                                                                                   +- Relation [allowance#278,event#279,user#280] json
      :- Project [user_id#183 AS user_id#935, frequency#921 AS frequency#936, day#922 AS day#937, creation_date#208 AS creation_date#938, updated_at#224 AS updated_at#939, next_payment_day#924 AS next_payment_day#940, last_event_name#289 AS last_event_name#941, last_event_timestamp#656 AS last_event_timestamp#942, truth_next_payment_day#641 AS truth_next_payment_day#943, correct#666 AS correct#944, diference_next_payment_day#677 AS diference_next_payment_day#945]
      :  +- Project [user_id#183, frequency#921, day#922, creation_date#208, updated_at#224, next_payment_day#924, last_event_name#289, last_event_timestamp#656, truth_next_payment_day#641, correct#666, diference_next_payment_day#677]
      :     +- Project [user_id#183, frequency#921, day#922, creation_date#208, updated_at#224, next_payment_day#924, last_event_name#289, last_event_timestamp#656, truth_next_payment_day#641, correct#666, (cast(next_payment_day#924 as double) - cast(truth_next_payment_day#641 as double)) AS diference_next_payment_day#677]
      :        +- Project [user_id#183, frequency#921, day#922, creation_date#208, updated_at#224, next_payment_day#924, last_event_name#289, last_event_timestamp#656, truth_next_payment_day#641, CASE WHEN (truth_next_payment_day#641 = cast(next_payment_day#924 as int)) THEN correct ELSE wrong END AS correct#666]
      :           +- Project [user_id#183, frequency#921, day#922, creation_date#208, updated_at#224, next_payment_day#924, last_event_name#289, event_timestamp#148 AS last_event_timestamp#656, truth_next_payment_day#641]
      :              +- Project [user_id#183, creation_date#208, frequency#921, day#922, next_payment_day#924, status#925, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289, CASE WHEN (day#360 = fifteenth_day) THEN 15 WHEN (day#360 = first_day) THEN 1 ELSE cast(null as int) END AS truth_next_payment_day#641]
      :                 +- Filter (frequency#352 = monthly)
      :                    +- Project [user_id#183, creation_date#208, frequency#921, day#922, next_payment_day#924, status#925, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
      :                       +- Join LeftOuter, (user_id#183 = user_id#281)
      :                          :- SubqueryAlias cleaned_backend_table
      :                          :  +- Project [user_id#183, creation_date#208, frequency#921, day#922, next_payment_day#924, status#925, corrected_updated_at#199 AS updated_at#224]
      :                          :     +- Project [user_id#183, creation_date#208, frequency#921, day#922, next_payment_day#924, status#925, corrected_updated_at#199]
      :                          :        +- Project [user_id#183, cast(from_unixtime(cast(creation_date#920 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) as timestamp) AS creation_date#208, frequency#921, day#922, updated_at#923, next_payment_day#924, status#925, corrected_updated_at#199]
      :                          :           +- Project [user_id#183, creation_date#920, frequency#921, day#922, updated_at#923, next_payment_day#924, status#925, cast(CASE WHEN RLIKE(updated_at#923, \d+-\d+-\d+T\d+:\d+:\d+.\d+Z) THEN cast(to_timestamp(updated_at#923, None, TimestampType, Some(America/Sao_Paulo), false) as string) ELSE from_unixtime(cast(updated_at#923 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) END as timestamp) AS corrected_updated_at#199]
      :                          :              +- Filter (status#925 = enabled)
      :                          :                 +- Project [uuid#919 AS user_id#183, creation_date#920, frequency#921, day#922, updated_at#923, next_payment_day#924, status#925]
      :                          :                    +- Relation [uuid#919,creation_date#920,frequency#921,day#922,updated_at#923,next_payment_day#924,status#925] csv
      :                          +- SubqueryAlias truth_backend_table
      :                             +- Project [user_id#281, event_timestamp#148, allowance_amount#63L AS amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
      :                                +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84 AS day#360, frequency#352, event_name#108, last_event_name#289]
      :                                   +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91 AS frequency#352, event_name#108, last_event_name#289]
      :                                      +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_name#108 AS last_event_name#289]
      :                                         +- Project [user_id#133, event_timestamp#275, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, user_id#281, event_timestamp#148]
      :                                            +- Join Inner, ((user_id#133 = user_id#281) AND (event_timestamp#275 = event_timestamp#148))
      :                                               :- SubqueryAlias last_event_by_user
      :                                               :  +- Aggregate [user_id#133], [user_id#133, max(event_timestamp#148) AS event_timestamp#275]
      :                                               :     +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#133, allowance_scheduled_frequency#91]
      :                                               :        +- SubqueryAlias cleaned_events
      :                                               :           +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#133]
      :                                               :              +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#133]
      :                                               :                 +- Project [user#928, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#928.id AS user_id#133]
      :                                               :                    +- Project [user#928, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
      :                                               :                       +- Project [event#927, user#928, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#927.timestamp AS event_timestamp#116]
      :                                               :                          +- Project [event#927, user#928, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#927.name AS event_name#108]
      :                                               :                             +- Project [event#927, user#928, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
      :                                               :                                +- Project [event#927, user#928, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
      :                                               :                                   +- Project [event#927, user#928, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
      :                                               :                                      +- Project [event#927, user#928, allowance_amount#63L, allowance_scheduled#69]
      :                                               :                                         +- Project [allowance#926, event#927, user#928, allowance_amount#63L, allowance#926.scheduled AS allowance_scheduled#69]
      :                                               :                                            +- Project [allowance#926, event#927, user#928, allowance#926.amount AS allowance_amount#63L]
      :                                               :                                               +- Relation [allowance#926,event#927,user#928] json
      :                                               +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#281, allowance_scheduled_frequency#91]
      :                                                  +- SubqueryAlias cleaned_events
      :                                                     +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#281]
      :                                                        +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#281]
      :                                                           +- Project [user#931, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#931.id AS user_id#281]
      :                                                              +- Project [user#931, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
      :                                                                 +- Project [event#930, user#931, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#930.timestamp AS event_timestamp#116]
      :                                                                    +- Project [event#930, user#931, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#930.name AS event_name#108]
      :                                                                       +- Project [event#930, user#931, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
      :                                                                          +- Project [event#930, user#931, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
      :                                                                             +- Project [event#930, user#931, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
      :                                                                                +- Project [event#930, user#931, allowance_amount#63L, allowance_scheduled#69]
      :                                                                                   +- Project [allowance#929, event#930, user#931, allowance_amount#63L, allowance#929.scheduled AS allowance_scheduled#69]
      :                                                                                      +- Project [allowance#929, event#930, user#931, allowance#929.amount AS allowance_amount#63L]
      :                                                                                         +- Relation [allowance#929,event#930,user#931] json
      :- Project [user_id#183 AS user_id#973, frequency#959 AS frequency#974, day#960 AS day#975, creation_date#208 AS creation_date#976, updated_at#224 AS updated_at#977, next_payment_day#962 AS next_payment_day#978, last_event_name#289 AS last_event_name#979, last_event_timestamp#870 AS last_event_timestamp#980, truth_next_payment_day#855 AS truth_next_payment_day#981, correct#880 AS correct#982, diference_next_payment_day#891 AS diference_next_payment_day#983]
      :  +- Project [user_id#183, frequency#959, day#960, creation_date#208, updated_at#224, next_payment_day#962, last_event_name#289, last_event_timestamp#870, truth_next_payment_day#855, correct#880, diference_next_payment_day#891]
      :     +- Project [user_id#183, frequency#959, day#960, creation_date#208, updated_at#224, next_payment_day#962, last_event_name#289, last_event_timestamp#870, truth_next_payment_day#855, correct#880, (cast(next_payment_day#962 as double) - cast(truth_next_payment_day#855 as double)) AS diference_next_payment_day#891]
      :        +- Project [user_id#183, frequency#959, day#960, creation_date#208, updated_at#224, next_payment_day#962, last_event_name#289, last_event_timestamp#870, truth_next_payment_day#855, CASE WHEN (truth_next_payment_day#855 = cast(next_payment_day#962 as int)) THEN correct ELSE wrong END AS correct#880]
      :           +- Project [user_id#183, frequency#959, day#960, creation_date#208, updated_at#224, next_payment_day#962, last_event_name#289, event_timestamp#148 AS last_event_timestamp#870, truth_next_payment_day#855]
      :              +- Project [user_id#183, creation_date#208, frequency#959, day#960, next_payment_day#962, status#963, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289, day(CASE WHEN (CASE WHEN ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date))) >= 0) THEN date_add(cast(event_timestamp#148 as date), (cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date)))) ELSE date_add(cast(event_timestamp#148 as date), ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date))) + 7)) END > to_date(2024-12-03, None, Some(America/Sao_Paulo), false)) THEN CASE WHEN ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date))) >= 0) THEN date_add(cast(event_timestamp#148 as date), (cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date)))) ELSE date_add(cast(event_timestamp#148 as date), ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date))) + 7)) END ELSE date_add(to_date(2024-12-03, None, Some(America/Sao_Paulo), false), (14 - (date_diff(to_date(2024-12-03, None, Some(America/Sao_Paulo), false), CASE WHEN ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date))) >= 0) THEN date_add(cast(event_timestamp#148 as date), (cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date)))) ELSE date_add(cast(event_timestamp#148 as date), ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(cast(event_timestamp#148 as date))) + 7)) END) % 14))) END) AS truth_next_payment_day#855]
      :                 +- Filter (frequency#352 = biweekly)
      :                    +- Project [user_id#183, creation_date#208, frequency#959, day#960, next_payment_day#962, status#963, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
      :                       +- Join LeftOuter, (user_id#183 = user_id#281)
      :                          :- SubqueryAlias cleaned_backend_table
      :                          :  +- Project [user_id#183, creation_date#208, frequency#959, day#960, next_payment_day#962, status#963, corrected_updated_at#199 AS updated_at#224]
      :                          :     +- Project [user_id#183, creation_date#208, frequency#959, day#960, next_payment_day#962, status#963, corrected_updated_at#199]
      :                          :        +- Project [user_id#183, cast(from_unixtime(cast(creation_date#958 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) as timestamp) AS creation_date#208, frequency#959, day#960, updated_at#961, next_payment_day#962, status#963, corrected_updated_at#199]
      :                          :           +- Project [user_id#183, creation_date#958, frequency#959, day#960, updated_at#961, next_payment_day#962, status#963, cast(CASE WHEN RLIKE(updated_at#961, \d+-\d+-\d+T\d+:\d+:\d+.\d+Z) THEN cast(to_timestamp(updated_at#961, None, TimestampType, Some(America/Sao_Paulo), false) as string) ELSE from_unixtime(cast(updated_at#961 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) END as timestamp) AS corrected_updated_at#199]
      :                          :              +- Filter (status#963 = enabled)
      :                          :                 +- Project [uuid#957 AS user_id#183, creation_date#958, frequency#959, day#960, updated_at#961, next_payment_day#962, status#963]
      :                          :                    +- Relation [uuid#957,creation_date#958,frequency#959,day#960,updated_at#961,next_payment_day#962,status#963] csv
      :                          +- SubqueryAlias truth_backend_table
      :                             +- Project [user_id#281, event_timestamp#148, allowance_amount#63L AS amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
      :                                +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84 AS day#360, frequency#352, event_name#108, last_event_name#289]
      :                                   +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91 AS frequency#352, event_name#108, last_event_name#289]
      :                                      +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_name#108 AS last_event_name#289]
      :                                         +- Project [user_id#133, event_timestamp#275, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, user_id#281, event_timestamp#148]
      :                                            +- Join Inner, ((user_id#133 = user_id#281) AND (event_timestamp#275 = event_timestamp#148))
      :                                               :- SubqueryAlias last_event_by_user
      :                                               :  +- Aggregate [user_id#133], [user_id#133, max(event_timestamp#148) AS event_timestamp#275]
      :                                               :     +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#133, allowance_scheduled_frequency#91]
      :                                               :        +- SubqueryAlias cleaned_events
      :                                               :           +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#133]
      :                                               :              +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#133]
      :                                               :                 +- Project [user#966, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#966.id AS user_id#133]
      :                                               :                    +- Project [user#966, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
      :                                               :                       +- Project [event#965, user#966, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#965.timestamp AS event_timestamp#116]
      :                                               :                          +- Project [event#965, user#966, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#965.name AS event_name#108]
      :                                               :                             +- Project [event#965, user#966, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
      :                                               :                                +- Project [event#965, user#966, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
      :                                               :                                   +- Project [event#965, user#966, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
      :                                               :                                      +- Project [event#965, user#966, allowance_amount#63L, allowance_scheduled#69]
      :                                               :                                         +- Project [allowance#964, event#965, user#966, allowance_amount#63L, allowance#964.scheduled AS allowance_scheduled#69]
      :                                               :                                            +- Project [allowance#964, event#965, user#966, allowance#964.amount AS allowance_amount#63L]
      :                                               :                                               +- Relation [allowance#964,event#965,user#966] json
      :                                               +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#281, allowance_scheduled_frequency#91]
      :                                                  +- SubqueryAlias cleaned_events
      :                                                     +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#281]
      :                                                        +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#281]
      :                                                           +- Project [user#969, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#969.id AS user_id#281]
      :                                                              +- Project [user#969, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
      :                                                                 +- Project [event#968, user#969, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#968.timestamp AS event_timestamp#116]
      :                                                                    +- Project [event#968, user#969, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#968.name AS event_name#108]
      :                                                                       +- Project [event#968, user#969, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
      :                                                                          +- Project [event#968, user#969, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
      :                                                                             +- Project [event#968, user#969, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
      :                                                                                +- Project [event#968, user#969, allowance_amount#63L, allowance_scheduled#69]
      :                                                                                   +- Project [allowance#967, event#968, user#969, allowance_amount#63L, allowance#967.scheduled AS allowance_scheduled#69]
      :                                                                                      +- Project [allowance#967, event#968, user#969, allowance#967.amount AS allowance_amount#63L]
      :                                                                                         +- Relation [allowance#967,event#968,user#969] json
      +- Project [user_id#183 AS user_id#1011, frequency#997 AS frequency#1012, day#998 AS day#1013, creation_date#208 AS creation_date#1014, updated_at#224 AS updated_at#1015, next_payment_day#1000 AS next_payment_day#1016, last_event_name#289 AS last_event_name#1017, last_event_timestamp#806 AS last_event_timestamp#1018, truth_next_payment_day#791 AS truth_next_payment_day#1019, correct#816 AS correct#1020, diference_next_payment_day#827 AS diference_next_payment_day#1021]
         +- Project [user_id#183, frequency#997, day#998, creation_date#208, updated_at#224, next_payment_day#1000, last_event_name#289, last_event_timestamp#806, truth_next_payment_day#791, correct#816, diference_next_payment_day#827]
            +- Project [user_id#183, frequency#997, day#998, creation_date#208, updated_at#224, next_payment_day#1000, last_event_name#289, last_event_timestamp#806, truth_next_payment_day#791, correct#816, (cast(next_payment_day#1000 as double) - cast(truth_next_payment_day#791 as double)) AS diference_next_payment_day#827]
               +- Project [user_id#183, frequency#997, day#998, creation_date#208, updated_at#224, next_payment_day#1000, last_event_name#289, last_event_timestamp#806, truth_next_payment_day#791, CASE WHEN (truth_next_payment_day#791 = cast(next_payment_day#1000 as int)) THEN correct ELSE wrong END AS correct#816]
                  +- Project [user_id#183, frequency#997, day#998, creation_date#208, updated_at#224, next_payment_day#1000, last_event_name#289, event_timestamp#148 AS last_event_timestamp#806, truth_next_payment_day#791]
                     +- Project [user_id#183, creation_date#208, frequency#997, day#998, next_payment_day#1000, status#1001, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289, day(CASE WHEN ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(to_date(2024-12-03, None, Some(America/Sao_Paulo), false))) >= 0) THEN date_add(to_date(2024-12-03, None, Some(America/Sao_Paulo), false), (cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(to_date(2024-12-03, None, Some(America/Sao_Paulo), false)))) ELSE date_add(to_date(2024-12-03, None, Some(America/Sao_Paulo), false), ((cast(CASE WHEN day#360 IN (sunday,monday,tuesday,wednesday,thursday,friday,saturday) THEN array_position(array(sunday, monday, tuesday, wednesday, thursday, friday, saturday), day#360) ELSE cast(null as bigint) END as int) - dayofweek(to_date(2024-12-03, None, Some(America/Sao_Paulo), false))) + 7)) END) AS truth_next_payment_day#791]
                        +- Filter (frequency#352 = weekly)
                           +- Project [user_id#183, creation_date#208, frequency#997, day#998, next_payment_day#1000, status#1001, updated_at#224, event_timestamp#148, amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
                              +- Join LeftOuter, (user_id#183 = user_id#281)
                                 :- SubqueryAlias cleaned_backend_table
                                 :  +- Project [user_id#183, creation_date#208, frequency#997, day#998, next_payment_day#1000, status#1001, corrected_updated_at#199 AS updated_at#224]
                                 :     +- Project [user_id#183, creation_date#208, frequency#997, day#998, next_payment_day#1000, status#1001, corrected_updated_at#199]
                                 :        +- Project [user_id#183, cast(from_unixtime(cast(creation_date#996 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) as timestamp) AS creation_date#208, frequency#997, day#998, updated_at#999, next_payment_day#1000, status#1001, corrected_updated_at#199]
                                 :           +- Project [user_id#183, creation_date#996, frequency#997, day#998, updated_at#999, next_payment_day#1000, status#1001, cast(CASE WHEN RLIKE(updated_at#999, \d+-\d+-\d+T\d+:\d+:\d+.\d+Z) THEN cast(to_timestamp(updated_at#999, None, TimestampType, Some(America/Sao_Paulo), false) as string) ELSE from_unixtime(cast(updated_at#999 as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) END as timestamp) AS corrected_updated_at#199]
                                 :              +- Filter (status#1001 = enabled)
                                 :                 +- Project [uuid#995 AS user_id#183, creation_date#996, frequency#997, day#998, updated_at#999, next_payment_day#1000, status#1001]
                                 :                    +- Relation [uuid#995,creation_date#996,frequency#997,day#998,updated_at#999,next_payment_day#1000,status#1001] csv
                                 +- SubqueryAlias truth_backend_table
                                    +- Project [user_id#281, event_timestamp#148, allowance_amount#63L AS amount#368L, day#360, frequency#352, event_name#108, last_event_name#289]
                                       +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84 AS day#360, frequency#352, event_name#108, last_event_name#289]
                                          +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91 AS frequency#352, event_name#108, last_event_name#289]
                                             +- Project [user_id#281, event_timestamp#148, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_name#108 AS last_event_name#289]
                                                +- Project [user_id#133, event_timestamp#275, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, user_id#281, event_timestamp#148]
                                                   +- Join Inner, ((user_id#133 = user_id#281) AND (event_timestamp#275 = event_timestamp#148))
                                                      :- SubqueryAlias last_event_by_user
                                                      :  +- Aggregate [user_id#133], [user_id#133, max(event_timestamp#148) AS event_timestamp#275]
                                                      :     +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#133, allowance_scheduled_frequency#91]
                                                      :        +- SubqueryAlias cleaned_events
                                                      :           +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#133]
                                                      :              +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#133]
                                                      :                 +- Project [user#1004, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#1004.id AS user_id#133]
                                                      :                    +- Project [user#1004, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
                                                      :                       +- Project [event#1003, user#1004, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#1003.timestamp AS event_timestamp#116]
                                                      :                          +- Project [event#1003, user#1004, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#1003.name AS event_name#108]
                                                      :                             +- Project [event#1003, user#1004, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
                                                      :                                +- Project [event#1003, user#1004, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
                                                      :                                   +- Project [event#1003, user#1004, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
                                                      :                                      +- Project [event#1003, user#1004, allowance_amount#63L, allowance_scheduled#69]
                                                      :                                         +- Project [allowance#1002, event#1003, user#1004, allowance_amount#63L, allowance#1002.scheduled AS allowance_scheduled#69]
                                                      :                                            +- Project [allowance#1002, event#1003, user#1004, allowance#1002.amount AS allowance_amount#63L]
                                                      :                                               +- Relation [allowance#1002,event#1003,user#1004] json
                                                      +- Deduplicate [allowance_amount#63L, allowance_scheduled_day#84, event_name#108, event_timestamp#148, user_id#281, allowance_scheduled_frequency#91]
                                                         +- SubqueryAlias cleaned_events
                                                            +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, cast(event_timestamp#116 as timestamp) AS event_timestamp#148, user_id#281]
                                                               +- Project [allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user_id#281]
                                                                  +- Project [user#1007, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116, user#1007.id AS user_id#281]
                                                                     +- Project [user#1007, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event_timestamp#116]
                                                                        +- Project [event#1006, user#1007, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event_name#108, event#1006.timestamp AS event_timestamp#116]
                                                                           +- Project [event#1006, user#1007, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91, event#1006.name AS event_name#108]
                                                                              +- Project [event#1006, user#1007, allowance_amount#63L, allowance_scheduled_day#84, allowance_scheduled_frequency#91]
                                                                                 +- Project [event#1006, user#1007, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled_day#84, allowance_scheduled#69.frequency AS allowance_scheduled_frequency#91]
                                                                                    +- Project [event#1006, user#1007, allowance_amount#63L, allowance_scheduled#69, allowance_scheduled#69.day AS allowance_scheduled_day#84]
                                                                                       +- Project [event#1006, user#1007, allowance_amount#63L, allowance_scheduled#69]
                                                                                          +- Project [allowance#1005, event#1006, user#1007, allowance_amount#63L, allowance#1005.scheduled AS allowance_scheduled#69]
                                                                                             +- Project [allowance#1005, event#1006, user#1007, allowance#1005.amount AS allowance_amount#63L]
                                                                                                +- Relation [allowance#1005,event#1006,user#1007] json


In [None]:
cleaned_events.show()

+----------------+-----------------------+-----------------------------+-----------------+-------------------+--------------------+
|allowance_amount|allowance_scheduled_day|allowance_scheduled_frequency|       event_name|    event_timestamp|             user_id|
+----------------+-----------------------+-----------------------------+-----------------+-------------------+--------------------+
|              20|               saturday|                       weekly|allowance.created|2024-11-28 07:10:16|38f8d838-ea08-4fd...|
|               5|          fifteenth_day|                      monthly|allowance.created|2024-09-05 13:30:38|9748cdad-69a4-400...|
|               5|               saturday|                       weekly|allowance.created|2024-10-21 17:27:50|8252c070-2698-49d...|
|               3|                 monday|                     biweekly| allowance.edited|2024-11-06 12:53:31|3c700c10-3d42-49a...|
|              10|                 friday|                     biweekly|allo

In [189]:
# raw_payments_schedule_backend_table.filter("payment_date=3").join(
#     cleaned_backend_table, on="user_id", how="left"
# ).show()

te =all_backend_table_truth.withColumn("dif_creation_update",
    spark_f.date_diff(spark_f.col("updated_at"), spark_f.col("creation_date"))
)

histogram(
    te,
    "how_old_was_last_update",
    "Diference in days between creation and update",
)



In [190]:
tt=te.filter("how_old_was_last_update <= 4")
histogram(
    tt,
    "updated_at",
    "ate",
)