In [None]:

import duckdb
import pandas as pd
import os
from datetime import timedelta

# DuckDB connection
con = duckdb.connect()


con.execute("SET temp_directory='temp_duckdb';")      
con.execute("SET memory_limit='8GB';")                
con.execute("SET threads=4;")                        
con.execute("PRAGMA disable_object_cache;")           
con.execute("PRAGMA max_temp_directory_size='200GB';")

# Paths
MESSAGETRACK_PARQUET = "Data/messagetrack.parquet"
MESSAGELOG_PARQUET   = "Data/messagelog2.parquet"

print("Config ready.")


Config ready.


In [None]:

for path in (MESSAGETRACK_PARQUET, MESSAGELOG_PARQUET):
    exists = os.path.exists(path)
    size_gb = os.path.getsize(path)/1e9 if exists else 0
    print(f"{path}: exists={exists}, size‚âà{size_gb:.2f} GB")

assert os.path.exists(MESSAGETRACK_PARQUET), "Missing messagetrack.parquet"
assert os.path.exists(MESSAGELOG_PARQUET), "Missing messagelog2.parquet"


Data/messagetrack.parquet: exists=True, size‚âà2.81 GB
Data/messagelog2.parquet: exists=True, size‚âà21.64 GB


In [None]:


print("üîç Exploring data structure (memory-safe)...")

# Sample rows from both files
mt_sample = con.execute(f"SELECT * FROM '{MESSAGETRACK_PARQUET}' LIMIT 5").fetchdf()
ml_sample = con.execute(f"SELECT * FROM '{MESSAGELOG_PARQUET}' LIMIT 5").fetchdf()

print("üìã MessageTrack sample:")
print(mt_sample.head())
print("\nüìã MessageLog sample:")
print(ml_sample.head())


print("\nüìä Basic Statistics (lightweight):")

mt_basic = con.execute(f"""
  SELECT 
    COUNT(*) AS total_rows,
    MIN(msgcreateddt) AS first_date,
    MAX(msgcreateddt) AS last_date
  FROM '{MESSAGETRACK_PARQUET}'
""").fetchdf()


ml_basic = con.execute(f"""
  SELECT 
    COUNT(*) AS total_rows,
    MIN(auditdatetime) AS first_date,
    MAX(auditdatetime) AS last_date
  FROM '{MESSAGELOG_PARQUET}'
""").fetchdf()

print("MessageTrack:")
print(f"  ‚Ä¢ Total rows: {mt_basic['total_rows'].iloc[0]:,}")
print(f"  ‚Ä¢ Date range: {mt_basic['first_date'].iloc[0]} ‚Üí {mt_basic['last_date'].iloc[0]}")

print("\nMessageLog:")
print(f"  ‚Ä¢ Total rows: {ml_basic['total_rows'].iloc[0]:,}")
print(f"  ‚Ä¢ Date range: {ml_basic['first_date'].iloc[0]} ‚Üí {ml_basic['last_date'].iloc[0]}")

# Calculate years of data
if not mt_basic.empty and not ml_basic.empty:
    mt_start = pd.to_datetime(mt_basic['first_date'].iloc[0])
    mt_end = pd.to_datetime(mt_basic['last_date'].iloc[0])
    years_span = mt_end.year - mt_start.year + 1
    print(f"\n‚è±Ô∏è Data span: {years_span} years ({mt_start.year}-{mt_end.year})")

print("‚úÖ Basic exploration complete (detailed stats will come in later cells)")


üîç Exploring data structure (memory-safe)...
üìã MessageTrack sample:
   messagetrack_id                                 msgid      msgqueue  \
0         88824443  79e75b2a-df14-4739-8850-f09840f346dc    system_out   
1         88833346  effeb0fc-9c24-41f7-ab50-22d30a0a9db1    system_out   
2         88842521  c7789ce6-97a0-4481-8a6c-7097be9c0ada  schedule_out   
3         88840542  c7aab416-9938-41c8-b151-395bd96a4a3c     admin_out   
4         88827465  4451dc56-4467-410e-a4a3-3571e09b987c  minute_xlang   

    msgtype msgpriority msgstatus msgdepartment  \
0  Schedule      Normal         C                 
1  Schedule      Normal         C                 
2  Schedule      Normal         C                 
3     ADMIN      Normal         C           MCD   
4  Schedule      Normal         C                 

                            msgparentid            msgcreateddt  \
0                                  None 2025-03-18 19:45:00.050   
1                                  None 2

In [None]:


current_state_summary = con.execute(f"""
  SELECT
    DATE_TRUNC('month', msgcreateddt) AS creation_month,
    EXTRACT(YEAR FROM msgcreateddt) AS year,
    msgtype,
    msgstatus AS current_status,
    msgqueue  AS current_queue,
    msgdepartment,
    msgpriority,
    COUNT(*)                  AS active_workflows,
    COUNT(DISTINCT msguserid) AS users_involved,
    AVG(EXTRACT(EPOCH FROM (COALESCE(msgmodifieddt, msgcreateddt) - msgcreateddt))/86400.0) AS avg_age_days,
    MIN(msgcreateddt) AS oldest_workflow,
    MAX(COALESCE(msgmodifieddt, msgcreateddt)) AS latest_activity,
    COUNT(CASE WHEN EXTRACT(EPOCH FROM (COALESCE(msgmodifieddt, msgcreateddt) - msgcreateddt))/86400.0 <= 1  THEN 1 END) AS same_day_workflows,
    COUNT(CASE WHEN EXTRACT(EPOCH FROM (COALESCE(msgmodifieddt, msgcreateddt) - msgcreateddt))/86400.0 >  30 THEN 1 END) AS long_running_workflows
  FROM '{MESSAGETRACK_PARQUET}'
  WHERE msgcreateddt IS NOT NULL
  GROUP BY 1,2,3,4,5,6,7
  ORDER BY creation_month DESC
""").fetchdf()

# --- Fix datatypes & clean categories before export ---
# Convert datetime columns to proper ISO strings
for dt_col in ['creation_month', 'oldest_workflow', 'latest_activity']:
    if dt_col in current_state_summary.columns:
        current_state_summary[dt_col] = pd.to_datetime(
            current_state_summary[dt_col], errors='coerce'
        ).dt.strftime('%Y-%m-%d %H:%M:%S')

# Standardize categorical columns (fill nulls + strip spaces)
for cat_col in ['msgtype', 'current_status', 'current_queue', 'msgdepartment', 'msgpriority']:
    if cat_col in current_state_summary.columns:
        current_state_summary[cat_col] = (
            current_state_summary[cat_col]
            .fillna('Unknown')
            .astype(str)
            .str.strip()
        )

# Re-export cleaned version
current_state_summary.to_csv('pbi_current_state.csv', index=False)

# current_state_summary.to_csv('pbi_current_state.csv', index=False)
print(f"Saved pbi_current_state.csv with {len(current_state_summary)} rows")
current_state_summary.head()


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Saved pbi_current_state.csv with 35780 rows


Unnamed: 0,creation_month,year,msgtype,current_status,current_queue,msgdepartment,msgpriority,active_workflows,users_involved,avg_age_days,oldest_workflow,latest_activity,same_day_workflows,long_running_workflows
0,2025-05-01 00:00:00,2025,UNDERWRITING,C,hms_uw_out,CBD,Normal,171,8,4.220257,2025-05-01 05:57:17,2025-05-28 16:15:57,69,0
1,2025-05-01 00:00:00,2025,EARLY CLAIM REVIEW NOTE,C,claim_out,PCD,Normal,221,17,3.812171,2025-05-01 08:46:00,2025-05-28 18:44:08,19,0
2,2025-05-01 00:00:00,2025,POST COMMERCIAL DECISION,C,quality_care_out,I-Cash,Highest,1,1,3.083878,2025-05-16 14:12:12,2025-05-19 16:12:59,0,0
3,2025-05-01 00:00:00,2025,TREATMENT,D,treatment_out,CBD,Normal,2,2,3.843739,2025-05-13 12:23:16,2025-05-23 11:01:53,1,0
4,2025-05-01 00:00:00,2025,TREATMENT,D,treatment_in,PCD,Normal,20,7,7.817895,2025-05-01 09:40:47,2025-05-28 17:11:11,6,0


In [None]:


print("üîç Detecting MessageLog date range for chunked processing...")

date_range = con.execute(f"""
  SELECT 
    MIN(auditdatetime) AS start_date,
    MAX(auditdatetime) AS end_date,
    COUNT(*) AS total_records
  FROM '{MESSAGELOG_PARQUET}'
  WHERE auditdatetime IS NOT NULL
""").fetchdf()

ml_start = pd.to_datetime(date_range['start_date'].iloc[0])
ml_end   = pd.to_datetime(date_range['end_date'].iloc[0])
total    = int(date_range['total_records'].iloc[0])

print(f"üìÖ Audit log range: {ml_start.date()} ‚Üí {ml_end.date()}")
print(f"üìä Total audit records: {total:,}")
print(f"‚è±Ô∏è Data span: {(ml_end - ml_start).days} days")
print(f"üîÑ Will process in 90-day chunks to avoid memory issues")


üîç Detecting MessageLog date range for chunked processing...
üìÖ Audit log range: 2019-05-01 ‚Üí 2025-05-28
üìä Total audit records: 270,688,936
‚è±Ô∏è Data span: 2219 days
üîÑ Will process in 90-day chunks to avoid memory issues


In [None]:


print("üîÑ Processing MessageLog in chunks to build audit summary...")

log_summary_all = pd.DataFrame()
current_date = ml_start
chunk_days = 90  # 3-month chunks
chunk_idx = 1

while current_date < ml_end:
    next_date = min(current_date + timedelta(days=chunk_days), ml_end)
    print(f"[Chunk {chunk_idx}] {current_date.date()} ‚Üí {next_date.date()}")

    try:
        df = con.execute(f"""
            SELECT
              msgid,
              COUNT(uniqueid) AS state_changes,
              MIN(auditdatetime) AS first_audit,
              MAX(auditdatetime) AS last_audit,
              COUNT(DISTINCT audituserid) AS users_touched,
              COUNT(DISTINCT auditqueue) AS queues_visited,
              COUNT(DISTINCT auditdepartment) AS departments_involved
            FROM '{MESSAGELOG_PARQUET}'
            WHERE auditdatetime >= '{current_date.strftime('%Y-%m-%d')}'
              AND auditdatetime < '{next_date.strftime('%Y-%m-%d')}'
            GROUP BY msgid
        """).fetchdf()

        if not df.empty:
            log_summary_all = pd.concat([log_summary_all, df], ignore_index=True)
            print(f"  ‚úì {len(df):,} workflows summarized")
        else:
            print(f"  ‚ö†Ô∏è No data in this chunk")

    except Exception as e:
        print(f"  ‚ùå Error in chunk {chunk_idx}: {e}")
        
    current_date = next_date
    chunk_idx += 1
    
    # Progress save every 8 chunks (about 2 years)
    if chunk_idx % 8 == 0:
        print(f"üíæ Intermediate progress: {len(log_summary_all):,} workflows so far")

print(f"\nüîÑ Consolidating {len(log_summary_all):,} workflow summaries...")

# Consolidate overlapping workflows (same msgid across multiple chunks)
log_summary_final = log_summary_all.groupby('msgid', as_index=False).agg({
    'state_changes': 'sum',
    'first_audit': 'min',
    'last_audit': 'max',
    'users_touched': 'max',
    'queues_visited': 'max',
    'departments_involved': 'max'
})

# Calculate audit span in days
log_summary_final['audit_span_days'] = (
    pd.to_datetime(log_summary_final['last_audit']) - 
    pd.to_datetime(log_summary_final['first_audit'])
).dt.total_seconds() / 86400.0

# Save intermediate results
log_summary_final.to_parquet("complete_log_summary.parquet", index=False)
print(f"‚úÖ Complete audit summary saved: {len(log_summary_final):,} unique workflows")
print(f"üìÅ Saved: complete_log_summary.parquet")

# Show preview
print(f"\nüìã Sample audit summary:")
print(log_summary_final.head())


üîÑ Processing MessageLog in chunks to build audit summary...
[Chunk 1] 2019-05-01 ‚Üí 2019-07-30


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  ‚úì 341,303 workflows summarized
[Chunk 2] 2019-07-30 ‚Üí 2019-10-28


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  ‚úì 357,627 workflows summarized
[Chunk 3] 2019-10-28 ‚Üí 2020-01-26


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

  ‚úì 376,999 workflows summarized
[Chunk 4] 2020-01-26 ‚Üí 2020-04-25


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [None]:
from datetime import datetime

# Register complete_log_summary parquet as a DuckDB view
con.execute("""
  CREATE OR REPLACE VIEW complete_log_summary AS
  SELECT * FROM read_parquet('complete_log_summary.parquet')
""")

# Dynamically generate quarterly date ranges based on your data range
def generate_quarters(start_date: str, end_date: str):
    quarters = []
    current_start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    
    while current_start < end:
        month = current_start.month
        if month <= 3:
            next_start = datetime(current_start.year, 4, 1)
        elif month <= 6:
            next_start = datetime(current_start.year, 7, 1)
        elif month <= 9:
            next_start = datetime(current_start.year, 10, 1)
        else:
            next_start = datetime(current_start.year + 1, 1, 1)

        if next_start > end:
            next_start = end

        quarters.append((current_start.strftime("%Y-%m-%d"), next_start.strftime("%Y-%m-%d")))
        current_start = next_start

    return quarters


start_date = "2019-01-01" 
end_date = "2025-05-28"    

quarters = generate_quarters(start_date, end_date)
print("Generated quarters for export:")
for s, e in quarters:
    print(s, "to", e)


for start_date, end_date in quarters:
    print(f"Exporting lifecycle data for {start_date} to {end_date}...")
    output_parquet = f"pbi_workflow_lifecycle_{start_date}_to_{end_date}.parquet"
    con.execute(f"""
      COPY (
        SELECT
          mt.msgid,
          mt.msgtype,
          mt.msgstatus AS current_status,
          mt.msgqueue AS current_queue,
          mt.msgdepartment AS current_department,
          mt.msgpriority,
          mt.msgcreateddt AS workflow_start,
          mt.msgmodifieddt AS workflow_modified,
          DATE_TRUNC('month', mt.msgcreateddt) AS start_month,
          EXTRACT(YEAR FROM mt.msgcreateddt) AS start_year,
          EXTRACT(MONTH FROM mt.msgcreateddt) AS start_month_num,
          EXTRACT(QUARTER FROM mt.msgcreateddt) AS start_quarter,
          COALESCE(ls.state_changes, 0) AS state_changes,
          ls.first_audit,
          ls.last_audit,
          COALESCE(ls.users_touched, 0) AS users_touched,
          COALESCE(ls.queues_visited, 1) AS queues_visited,
          COALESCE(ls.departments_involved, 1) AS departments_involved,
          COALESCE(EXTRACT(EPOCH FROM (ls.last_audit - ls.first_audit))/86400.0, 0) AS audit_span_days,
          EXTRACT(EPOCH FROM (COALESCE(mt.msgmodifieddt, mt.msgcreateddt) - mt.msgcreateddt))/86400.0 AS processing_days,
          CASE WHEN COALESCE(ls.state_changes, 0) = 0 THEN 'No Audit Trail'
               WHEN ls.state_changes <= 3 THEN 'Simple'
               WHEN ls.state_changes <= 10 THEN 'Moderate'
               ELSE 'Complex'
          END AS complexity_category,
          CASE WHEN EXTRACT(EPOCH FROM (COALESCE(mt.msgmodifieddt, mt.msgcreateddt) - mt.msgcreateddt))/86400.0 <= 1 THEN 'Same Day'
               WHEN EXTRACT(EPOCH FROM (COALESCE(mt.msgmodifieddt, mt.msgcreateddt) - mt.msgcreateddt))/86400.0 <= 7 THEN 'Within Week'
               WHEN EXTRACT(EPOCH FROM (COALESCE(mt.msgmodifieddt, mt.msgcreateddt) - mt.msgcreateddt))/86400.0 <= 30 THEN 'Within Month'
               ELSE 'Long Running'
          END AS performance_category
        FROM '{MESSAGETRACK_PARQUET}' mt
        LEFT JOIN complete_log_summary ls ON mt.msgid = ls.msgid
        WHERE mt.msgcreateddt >= DATE '{start_date}'
          AND mt.msgcreateddt < DATE '{end_date}'
          AND mt.msgcreateddt IS NOT NULL
      )
      TO '{output_parquet}' (FORMAT PARQUET);
    """)
    print(f"‚úÖ Exported chunk to {output_parquet}")


Generated quarters for export:
2019-01-01 to 2019-04-01
2019-04-01 to 2019-07-01
2019-07-01 to 2019-10-01
2019-10-01 to 2020-01-01
2020-01-01 to 2020-04-01
2020-04-01 to 2020-07-01
2020-07-01 to 2020-10-01
2020-10-01 to 2021-01-01
2021-01-01 to 2021-04-01
2021-04-01 to 2021-07-01
2021-07-01 to 2021-10-01
2021-10-01 to 2022-01-01
2022-01-01 to 2022-04-01
2022-04-01 to 2022-07-01
2022-07-01 to 2022-10-01
2022-10-01 to 2023-01-01
2023-01-01 to 2023-04-01
2023-04-01 to 2023-07-01
2023-07-01 to 2023-10-01
2023-10-01 to 2024-01-01
2024-01-01 to 2024-04-01
2024-04-01 to 2024-07-01
2024-07-01 to 2024-10-01
2024-10-01 to 2025-01-01
2025-01-01 to 2025-04-01
2025-04-01 to 2025-05-28
Exporting lifecycle data for 2019-01-01 to 2019-04-01...
‚úÖ Exported chunk to pbi_workflow_lifecycle_2019-01-01_to_2019-04-01.parquet
Exporting lifecycle data for 2019-04-01 to 2019-07-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2019-04-01_to_2019-07-01.parquet
Exporting lifecycle data for 2019-07-01 to 2019-10-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2019-07-01_to_2019-10-01.parquet
Exporting lifecycle data for 2019-10-01 to 2020-01-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2019-10-01_to_2020-01-01.parquet
Exporting lifecycle data for 2020-01-01 to 2020-04-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2020-01-01_to_2020-04-01.parquet
Exporting lifecycle data for 2020-04-01 to 2020-07-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2020-04-01_to_2020-07-01.parquet
Exporting lifecycle data for 2020-07-01 to 2020-10-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2020-07-01_to_2020-10-01.parquet
Exporting lifecycle data for 2020-10-01 to 2021-01-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2020-10-01_to_2021-01-01.parquet
Exporting lifecycle data for 2021-01-01 to 2021-04-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2021-01-01_to_2021-04-01.parquet
Exporting lifecycle data for 2021-04-01 to 2021-07-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2021-04-01_to_2021-07-01.parquet
Exporting lifecycle data for 2021-07-01 to 2021-10-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2021-07-01_to_2021-10-01.parquet
Exporting lifecycle data for 2021-10-01 to 2022-01-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2021-10-01_to_2022-01-01.parquet
Exporting lifecycle data for 2022-01-01 to 2022-04-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2022-01-01_to_2022-04-01.parquet
Exporting lifecycle data for 2022-04-01 to 2022-07-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2022-04-01_to_2022-07-01.parquet
Exporting lifecycle data for 2022-07-01 to 2022-10-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2022-07-01_to_2022-10-01.parquet
Exporting lifecycle data for 2022-10-01 to 2023-01-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2022-10-01_to_2023-01-01.parquet
Exporting lifecycle data for 2023-01-01 to 2023-04-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2023-01-01_to_2023-04-01.parquet
Exporting lifecycle data for 2023-04-01 to 2023-07-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2023-04-01_to_2023-07-01.parquet
Exporting lifecycle data for 2023-07-01 to 2023-10-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2023-07-01_to_2023-10-01.parquet
Exporting lifecycle data for 2023-10-01 to 2024-01-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2023-10-01_to_2024-01-01.parquet
Exporting lifecycle data for 2024-01-01 to 2024-04-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2024-01-01_to_2024-04-01.parquet
Exporting lifecycle data for 2024-04-01 to 2024-07-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2024-04-01_to_2024-07-01.parquet
Exporting lifecycle data for 2024-07-01 to 2024-10-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2024-07-01_to_2024-10-01.parquet
Exporting lifecycle data for 2024-10-01 to 2025-01-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2024-10-01_to_2025-01-01.parquet
Exporting lifecycle data for 2025-01-01 to 2025-04-01...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2025-01-01_to_2025-04-01.parquet
Exporting lifecycle data for 2025-04-01 to 2025-05-28...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Exported chunk to pbi_workflow_lifecycle_2025-04-01_to_2025-05-28.parquet


In [None]:
import duckdb


con = duckdb.connect('workflow_lifecycle.db')
parquet_path = r"C:/Users/Nimish Mhatre/Desktop/shree/cardifuni_share/chunkdata/pbi_workflow_lifecycle_*.parquet"
con.execute(f"""
    CREATE OR REPLACE VIEW workflow_lifecycle AS
    SELECT * FROM parquet_scan('{parquet_path}')
""")
df = con.execute("SELECT * FROM workflow_lifecycle LIMIT 5").fetchdf()
print(df.head())


                                  msgid  msgtype current_status current_queue  \
0  0d6c1c98-35ea-47f7-9d26-d0a678311d4e    CLAIM              C     claim_out   
1  0d6c8433-3b39-4151-998a-991dfc76b616  INVOICE              D     claim_out   
2  0d6cd64f-8ed6-4f44-9bbf-c0c04e9bd4bd  INVOICE              C     claim_out   
3  0d6d4470-e5ae-4a72-a0db-6b1b1b9860c3  INVOICE              C     claim_out   
4  0d6d6254-2a96-4d88-b7bf-b0f8bb6c0a96  INVOICE              C     claim_out   

  current_department msgpriority          workflow_start  \
0                CBD      Normal 2019-06-04 15:00:53.513   
1                         Normal 2019-05-07 23:43:41.003   
2                         Normal 2019-06-12 01:03:19.860   
3                         Normal 2019-05-07 09:43:16.320   
4               None      Normal 2019-05-07 15:03:28.513   

        workflow_modified start_month  start_year  ...  state_changes  \
0 2019-06-07 11:42:25.643  2019-06-01        2019  ...             12   
1 2019

In [3]:
con = duckdb.connect(r"C:/Users/Nimish Mhatre/Desktop/shree/cardifuni_share/workflow_lifecycle.db")
parquet_path = r"C:/Users/Nimish Mhatre/Desktop/shree/cardifuni_share/chunkdata/pbi_workflow_lifecycle_*.parquet"
con.execute(f"""
    CREATE OR REPLACE VIEW workflow_lifecycle AS
    SELECT * FROM parquet_scan('{parquet_path}')
""")


<_duckdb.DuckDBPyConnection at 0x1fa0c2691f0>

In [None]:
import pandas as pd

quarters = generate_quarters("2019-01-01", "2025-05-28")  

output_csv = "pbi_workflow_lifecycle_full.csv"
first_write = True

for start_date, end_date in quarters:
    parquet_file = f"pbi_workflow_lifecycle_{start_date}_to_{end_date}.parquet"
    print(f"Processing {parquet_file}...")
    df = pd.read_parquet(parquet_file)
    if not df.empty:
        if first_write:
            df.to_csv(output_csv, index=False, mode='w')
            first_write = False
        else:
            df.to_csv(output_csv, index=False, header=False, mode='a')
        print(f"Appended {len(df)} rows to {output_csv}")
    else:
        print("No records in this chunk")

print(f"\n‚úÖ Combined CSV written to {output_csv}")


In [None]:
import duckdb
import pandas as pd

# Connect to DuckDB
con = duckdb.connect()
MESSAGELOG_PARQUET = "Data/messagelog2.parquet"
query = f"""
SELECT
  msgid,
  auditdatetime,
  audituserid,
  auditqueue,
  auditdepartment,
  auditaction AS new_status,
  LAG(auditaction) OVER (PARTITION BY msgid ORDER BY auditdatetime) AS old_status
FROM '{MESSAGELOG_PARQUET}'
ORDER BY msgid, auditdatetime
"""
df_status_transitions = con.execute(query).fetchdf()
df_status_transitions['old_status'] = df_status_transitions['old_status'].fillna('Start')
output_csv_path = "workflow_status_transitions.csv"
df_status_transitions.to_csv(output_csv_path, index=False)

print(f"Exported workflow status transition timeline to {output_csv_path}")
print(df_status_transitions.head())


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

OutOfMemoryException: Out of Memory Error: Allocation failure