In [82]:
import duckdb
import os
import json
from datetime import datetime
from pathlib import Path

In [3]:
OUTPUT_DIR = Path(r"C:\Users\ahmed\Downloads\TheSportsDB\airflow\dags\data\kafka_invalid") 

date_str = datetime.now().strftime("%Y-%m-%d")

parquet_file = OUTPUT_DIR / f"{date_str}.parquet"

con = duckdb.connect(database=':memory:')

parquet_file_unix = parquet_file.as_posix()

print("Checking file path:", parquet_file_unix)
print("File exists?", parquet_file.exists())

if parquet_file.exists():
    df_preview = con.execute(f"SELECT * FROM '{parquet_file_unix}' LIMIT 10").fetchdf()
    print(df_preview)
else:
    print("File not found! Please check the path and today_parquet_file on it.")


Checking file path: C:/Users/ahmed/Downloads/TheSportsDB/airflow/dags/data/kafka_invalid/2025-08-11.parquet
File exists? True
                   topic                  event_time  \
0  rejected.soccer.event  2025-08-11T16:51:35.238307   
1  rejected.soccer.event  2025-08-11T16:51:35.238405   
2  rejected.soccer.event  2025-08-11T16:51:35.238427   
3  rejected.soccer.event  2025-08-11T16:51:35.238437   
4  rejected.soccer.event  2025-08-11T16:51:35.238451   
5  rejected.soccer.event  2025-08-11T16:51:35.238463   
6  rejected.soccer.event  2025-08-11T16:51:35.238475   
7  rejected.soccer.event  2025-08-11T16:51:35.238487   
8  rejected.soccer.event  2025-08-11T16:51:35.238494   
9  rejected.soccer.event  2025-08-11T16:51:35.238505   

                                             message  
0  {"idEvent":"2267100","idLeague":"4328","idHome...  
1  {"idEvent":"2267210","idLeague":"4328","idHome...  
2  {"idEvent":"2267231","idLeague":"4328","idHome...  
3  {"idEvent":"2267162","idLeague":"4

In [83]:
# 1) Count of invalid topics (زي ما هو لكن وسعت العمود ليشمل أكتر من كلمة مفتاحية)
query_1 = f"""
SELECT topic, COUNT(*) AS message_count
FROM read_parquet('{parquet_file_unix}')
GROUP BY topic
ORDER BY message_count DESC
"""

# 2) Number of messages by hour (time grouping)
query_2 = f"""
SELECT
  STRFTIME(CAST(event_time AS TIMESTAMP), '%Y-%m-%d %H:00:00') AS hour,
  COUNT(*) AS messages_count
FROM read_parquet('{parquet_file_unix}')
GROUP BY hour
ORDER BY hour
"""

# 3) Analyze keywords in the message column (زودت keywords)
query_3 = f"""
SELECT
  topic,
  COUNT(*) AS total,
  SUM(CASE WHEN LOWER(message) LIKE '%error%' THEN 1 ELSE 0 END) AS error_count,
  SUM(CASE WHEN LOWER(message) LIKE '%invalid%' THEN 1 ELSE 0 END) AS invalid_count,
  SUM(CASE WHEN LOWER(message) LIKE '%missing%' THEN 1 ELSE 0 END) AS missing_count,
  SUM(CASE WHEN LOWER(message) LIKE '%timeout%' THEN 1 ELSE 0 END) AS timeout_count,
  SUM(CASE WHEN LOWER(message) LIKE '%failed%' THEN 1 ELSE 0 END) AS failed_count
FROM read_parquet('{parquet_file_unix}')
GROUP BY topic
ORDER BY total DESC
"""

# 4) Daily analysis of invalid messages by topic with avg message length
query_4 = f"""
SELECT
  topic,
  DATE(event_time) AS event_date,
  COUNT(*) AS invalid_count,
  AVG(LENGTH(message)) AS avg_message_length
FROM read_parquet('{parquet_file_unix}')
WHERE LOWER(message) LIKE '%invalid%'
GROUP BY topic, event_date
ORDER BY event_date, invalid_count DESC
"""

# 5) Percentage of invalid messages by topic
query_5 = f"""
WITH total_msgs AS (
  SELECT topic, COUNT(*) AS total_count
  FROM read_parquet('{parquet_file_unix}')
  GROUP BY topic
),
invalid_msgs AS (
  SELECT topic, COUNT(*) AS invalid_count
  FROM read_parquet('{parquet_file_unix}')
  WHERE LOWER(message) LIKE '%invalid%'
  GROUP BY topic
)
SELECT
  t.topic,
  t.total_count,
  COALESCE(i.invalid_count, 0) AS invalid_count,
  ROUND(COALESCE(i.invalid_count, 0) * 100.0 / t.total_count, 2) AS invalid_percentage
FROM total_msgs t
LEFT JOIN invalid_msgs i ON t.topic = i.topic
ORDER BY invalid_percentage DESC
"""

# 6) Track daily change in percentage of invalid messages (Trend)
query_6 = f"""
WITH daily_stats AS (
  SELECT
    DATE(event_time) AS event_date,
    COUNT(*) AS total_count,
    SUM(CASE WHEN LOWER(message) LIKE '%invalid%' THEN 1 ELSE 0 END) AS invalid_count
  FROM read_parquet('{parquet_file_unix}')
  GROUP BY event_date
)
SELECT
  event_date,
  total_count,
  invalid_count,
  ROUND(invalid_count * 100.0 / total_count, 2) AS invalid_percentage
FROM daily_stats
ORDER BY event_date
"""

# 7) Top 10 frequent messages by topic (غيرت الفلتر عشان يجيب أكتر رسائل شائعة مش بس soccer)
query_7 = f"""
SELECT topic, message, COUNT(*) AS occurrences
FROM read_parquet('{parquet_file_unix}')
GROUP BY topic, message
ORDER BY occurrences DESC
LIMIT 10
"""

# 8) Future events (events بعد اليوم)
query_8 = f"""
SELECT *
FROM read_parquet('{parquet_file_unix}')
WHERE CAST(event_time AS DATE) > CURRENT_DATE
"""

# 9) Error reason distribution over time (للـ stacked bar chart في Grafana)
query_9 = f"""
SELECT
  topic,
  DATE_TRUNC('hour', CAST(event_time AS TIMESTAMP)) AS hour,
  CASE
    WHEN LOWER(message) LIKE '%invalid%' THEN 'Invalid'
    WHEN LOWER(message) LIKE '%error%' THEN 'Error'
    WHEN LOWER(message) LIKE '%missing%' THEN 'Missing'
    WHEN LOWER(message) LIKE '%timeout%' THEN 'Timeout'
    WHEN LOWER(message) LIKE '%failed%' THEN 'Failed'
    ELSE 'Other'
  END AS error_category,
  COUNT(*) AS count
FROM read_parquet('{parquet_file_unix}')
GROUP BY topic, hour, error_category
ORDER BY hour, topic
"""

# 10) Moving average of invalid percentage per topic (trend smoothing)
query_10 = f"""
WITH hourly_stats AS (
  SELECT
    topic,
    DATE_TRUNC('hour', CAST(event_time AS TIMESTAMP)) AS hour,
    COUNT(*) AS total_count,
    SUM(CASE WHEN LOWER(message) LIKE '%invalid%' THEN 1 ELSE 0 END) AS invalid_count
  FROM read_parquet('{parquet_file_unix}')
  GROUP BY topic, hour
)
SELECT
  topic,
  hour,
  ROUND(100.0 * invalid_count / total_count, 2) AS invalid_percentage,
  AVG(ROUND(100.0 * invalid_count / total_count, 2)) OVER (
    PARTITION BY topic
    ORDER BY hour
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS moving_avg_6_hours
FROM hourly_stats
ORDER BY topic, hour
"""

# 11) Null ratio per row (percentage of nulls in each row)
query_11 = f"""
SELECT 
    topic,
    COUNT(*) AS total,
    SUM(CASE WHEN message IS NULL THEN 1 ELSE 0 END) AS null_count,
    ROUND(SUM(CASE WHEN message IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS null_percentage
FROM read_parquet('{parquet_file_unix}')
GROUP BY topic
ORDER BY null_percentage DESC;

"""

# 12) This query calculates the percentage of null values in each row for the specified columns.
query_12 = f"""
SELECT 
    *,
    ROUND(
        (
            (CASE WHEN topic IS NULL THEN 1 ELSE 0 END) +
            (CASE WHEN event_time IS NULL THEN 1 ELSE 0 END) +
            (CASE WHEN message IS NULL THEN 1 ELSE 0 END)
        ) * 100.0 / 3, 
        2
    ) AS null_percentage_per_row
FROM read_parquet('{parquet_file_unix}');
"""

In [84]:
df_topic_counts              = con.execute(query_1).fetchdf()
df_hourly_counts             = con.execute(query_2).fetchdf()
df_keyword_analysis          = con.execute(query_3).fetchdf()
df_daily_invalid             = con.execute(query_4).fetchdf()
df_invalid_percentage_per_topic = con.execute(query_5).fetchdf()
df_daily_invalid_trend       = con.execute(query_6).fetchdf()
df_top_invalid_messages      = con.execute(query_7).fetchdf()
df_future_events             = con.execute(query_8).fetchdf()
df_rejected_reasons          = con.execute(query_9).fetchdf()
df_moving_avg_invalid        = con.execute(query_10).fetchdf()
df_percentage_nulls_each_row = con.execute(query_11).fetchdf()
df_specified_col_null_per    = con.execute(query_12).fetchdf()

In [86]:
print("1) Messages count by topic:")
print(df_topic_counts)

1) Messages count by topic:
                              topic  message_count
0             rejected.soccer.event          16321
1             rejected.soccer.venue             86
2  rejected.soccer.event.highlights             10


In [87]:
print("\n2) Messages count by hour:")
print(df_hourly_counts)


2) Messages count by hour:
                  hour  messages_count
0  2025-08-11 16:00:00           16417


In [88]:
print("\n3) Keyword analysis by topic:")
print(df_keyword_analysis)


3) Keyword analysis by topic:
                              topic  total  error_count  invalid_count  \
0             rejected.soccer.event  16321          1.0            0.0   
1             rejected.soccer.venue     86          0.0            0.0   
2  rejected.soccer.event.highlights     10          0.0            0.0   

   missing_count  timeout_count  failed_count  
0            0.0            0.0           0.0  
1            0.0            0.0           0.0  
2            0.0            0.0           0.0  


In [89]:
print("\n4) Daily invalid messages with average message length:")
print(df_daily_invalid)


4) Daily invalid messages with average message length:
Empty DataFrame
Columns: [topic, event_date, invalid_count, avg_message_length]
Index: []


In [90]:
print("\n5) Percentage of invalid messages by topic:")
print(df_invalid_percentage_per_topic)


5) Percentage of invalid messages by topic:
                              topic  total_count  invalid_count  \
0  rejected.soccer.event.highlights           10              0   
1             rejected.soccer.venue           86              0   
2             rejected.soccer.event        16321              0   

   invalid_percentage  
0                 0.0  
1                 0.0  
2                 0.0  


In [91]:
print("\n6) Daily invalid percentage trend:")
print(df_daily_invalid_trend)


6) Daily invalid percentage trend:
  event_date  total_count  invalid_count  invalid_percentage
0 2025-08-11        16417            0.0                 0.0


In [92]:
print("\n7) Top 10 frequent messages by topic:")
print(df_top_invalid_messages)


7) Top 10 frequent messages by topic:
                   topic                                            message  \
0  rejected.soccer.venue  {"idVenue":"21398","intFormedYear":"0","strVen...   
1  rejected.soccer.venue  {"idVenue":"24006","intFormedYear":"0","strVen...   
2  rejected.soccer.venue  {"idVenue":"24006","intFormedYear":"0","strVen...   
3  rejected.soccer.venue  {"idVenue":"16479","intFormedYear":"0","strVen...   
4  rejected.soccer.venue  {"idVenue":"16479","intFormedYear":"0","strVen...   
5  rejected.soccer.venue  {"idVenue":"17728","intFormedYear":"0","strVen...   
6  rejected.soccer.venue  {"idVenue":"17728","intFormedYear":"0","strVen...   
7  rejected.soccer.venue  {"idVenue":"21398","intFormedYear":"0","strVen...   
8  rejected.soccer.venue  {"idVenue":"17728","intFormedYear":"0","strVen...   
9  rejected.soccer.venue  {"idVenue":"17728","intFormedYear":"0","strVen...   

   occurrences  
0            2  
1            2  
2            2  
3            2  
4     

In [93]:
print("\n8) Future events after today:")
print(df_future_events)


8) Future events after today:
Empty DataFrame
Columns: [topic, event_time, message]
Index: []


In [94]:
print("\n9) Error reason distribution over time:")
print(df_rejected_reasons)


9) Error reason distribution over time:
                              topic                hour error_category  count
0             rejected.soccer.event 2025-08-11 16:00:00          Other  16320
1             rejected.soccer.event 2025-08-11 16:00:00          Error      1
2  rejected.soccer.event.highlights 2025-08-11 16:00:00          Other     10
3             rejected.soccer.venue 2025-08-11 16:00:00          Other     86


In [95]:
print("\n10) Moving average of invalid percentage per topic:")
print(df_moving_avg_invalid)



10) Moving average of invalid percentage per topic:
                              topic                hour  invalid_percentage  \
0             rejected.soccer.event 2025-08-11 16:00:00                 0.0   
1  rejected.soccer.event.highlights 2025-08-11 16:00:00                 0.0   
2             rejected.soccer.venue 2025-08-11 16:00:00                 0.0   

   moving_avg_6_hours  
0                 0.0  
1                 0.0  
2                 0.0  


In [97]:
print("\n11) Null ratio per row (percentage of nulls in each row:")
print(df_percentage_nulls_each_row)


11) Null ratio per row (percentage of nulls in each row:
                              topic  total  null_count  null_percentage
0  rejected.soccer.event.highlights     10         0.0              0.0
1             rejected.soccer.venue     86         0.0              0.0
2             rejected.soccer.event  16321         0.0              0.0


In [98]:
print("\n12)  This query calculates the percentage of null values in each row for the specified columns:")
print(df_specified_col_null_per)


12)  This query calculates the percentage of null values in each row for the specified columns:
                       topic                  event_time  \
0      rejected.soccer.event  2025-08-11T16:51:35.238307   
1      rejected.soccer.event  2025-08-11T16:51:35.238405   
2      rejected.soccer.event  2025-08-11T16:51:35.238427   
3      rejected.soccer.event  2025-08-11T16:51:35.238437   
4      rejected.soccer.event  2025-08-11T16:51:35.238451   
...                      ...                         ...   
16412  rejected.soccer.venue  2025-08-11T16:53:04.007535   
16413  rejected.soccer.venue  2025-08-11T16:53:04.007549   
16414  rejected.soccer.venue  2025-08-11T16:53:04.007564   
16415  rejected.soccer.venue  2025-08-11T16:53:04.007577   
16416  rejected.soccer.venue  2025-08-11T16:53:04.007661   

                                                 message  \
0      {"idEvent":"2267100","idLeague":"4328","idHome...   
1      {"idEvent":"2267210","idLeague":"4328","idHome...   
2 

In [96]:
# This function converts nested JSON structures into a flat dictionary and calculates null percentages for every key and value in msg  
def flatten_json(y):
    """change to flatten JSON form nested"""
    out = {}
    def flatten(x, name=''):
        if isinstance(x, dict):
            for a in x:
                flatten(x[a], f"{name}{a}.")
        elif isinstance(x, list):
            for i, a in enumerate(x):
                flatten(a, f"{name}{i}.")
        else:
            out[name[:-1]] = x
    flatten(y)
    return out

results = []

for idx, row in df.iterrows():
    try:
        msg_dict = json.loads(row['message'])
    except Exception:
        continue
    
    flat_msg = flatten_json(msg_dict)
    total_keys = len(flat_msg)
    null_keys = sum(1 for v in flat_msg.values() if v is None or v == "" or str(v).lower() == "null")
    null_percentage = round((null_keys / total_keys) * 100, 2) if total_keys else 0

    results.append({
        "topic": row['topic'],
        "event_time": row['event_time'],
        "total_keys": total_keys,
        "null_keys": null_keys,
        "null_percentage": null_percentage
    })

import pandas as pd
results_df = pd.DataFrame(results)
print(results_df.sort_values("null_percentage", ascending=False))

                       topic                  event_time  total_keys  \
1737   rejected.soccer.event  2025-08-11T16:51:36.103012          33   
741    rejected.soccer.event  2025-08-11T16:51:35.510355          33   
1558   rejected.soccer.event  2025-08-11T16:51:36.100157          33   
2834   rejected.soccer.event  2025-08-11T16:51:36.597986          33   
2485   rejected.soccer.event  2025-08-11T16:51:36.398428          33   
...                      ...                         ...         ...   
16289  rejected.soccer.event  2025-08-11T16:52:39.145877          22   
16290  rejected.soccer.event  2025-08-11T16:52:39.145889          22   
16291  rejected.soccer.event  2025-08-11T16:52:39.145902          22   
16292  rejected.soccer.event  2025-08-11T16:52:39.145994          22   
16293  rejected.soccer.event  2025-08-11T16:52:39.146017          22   

       null_keys  null_percentage  
1737          11            33.33  
741           11            33.33  
1558          11           