In [None]:
import pandas as pd
import json
import re
import emoji 
from tqdm.auto import tqdm
from dotenv import load_dotenv

# Load environment variables via python-dotenv (by default from a `.env` file, if one is found).
load_dotenv()
# Register tqdm's pandas integration so that `Series.progress_apply` is available below.
tqdm.pandas()

In [None]:
# Location of the exported chat JSON, relative to the notebook's working directory.
# NOTE(review): looks like a Telegram chat export — confirm.
path = "../../data/2025-05-21-travelask/result.json"
# The next cell reads `df.messages`, so the file is expected to yield a `messages` column.
df = pd.read_json(path)

In [None]:
# Turn each entry of the `messages` column into one row, flattening nested keys into columns.
df_all_messages = pd.json_normalize(df.messages)

In [None]:
import pandas as pd
import json
import re
import emoji # Make sure to install: pip install emoji
from tqdm.auto import tqdm # Using tqdm.auto for flexible environment (notebook, console)

# Initialize tqdm for pandas
tqdm.pandas()

# --- Text Parsing and Analysis Helpers ---

def _flatten_message_elements(data_list):
    elements = []
    if not data_list: # Handle empty list input
        return elements
    # Check if the first level is a list of lists (common in some message structures)
    if all(isinstance(item, list) for item in data_list):
        for sublist in data_list:
            elements.extend(sublist)
    else:
        elements.extend(data_list)
    return elements

def parse_message_text_robust(text_column_entry):
    """Normalise one value of the ``text`` column into a list of elements.

    Accepted inputs and their results:

    * ``list``  -> flattened via ``_flatten_message_elements``
    * ``dict``  -> ``[dict]``
    * missing (``None`` / NaN) -> ``[]``
    * ``str`` that looks like a JSON array/object and parses -> the parsed
      value, normalised as above
    * any other ``str`` (including JSON-looking text that fails to parse)
      -> ``[original string]``
    * any other scalar -> ``[str(value)]``
    """
    entry = text_column_entry

    # The list/dict test must come first: pd.isna() on a container does not
    # return a single boolean.
    if isinstance(entry, (list, dict)):
        content = entry
    elif pd.isna(entry):
        return []
    elif not isinstance(entry, str):
        return [str(entry)]
    else:
        candidate = entry.strip()
        bracketed = candidate.startswith('[') and candidate.endswith(']')
        braced = candidate.startswith('{') and candidate.endswith('}')
        if not (bracketed or braced):
            return [entry]
        try:
            content = json.loads(candidate)
        except json.JSONDecodeError:
            # Only looked like JSON; keep the raw text.
            return [entry]

    if isinstance(content, dict):
        return [content]
    if isinstance(content, list):
        return _flatten_message_elements(content)
    # Scalar result from JSON parsing (kept for completeness).
    return [] if content is None else [str(content)]


def analyze_text_content(elements):
    """Collect the plain text and entity counts of a parsed message.

    Args:
        elements: Iterable of message elements as produced by
            ``parse_message_text_robust`` — plain strings and/or dicts that
            may carry ``'type'`` and ``'text'`` keys. Items of any other type
            are ignored.

    Returns:
        dict with the keys
        ``full_text_original_case`` (text parts joined with a single space and
        stripped), ``full_text_lower``, ``link_count``, ``mention_count``,
        ``text_link_count``, ``interactive_elements_count`` (sum of the three
        previous counters) and ``other_format_count`` (bold/italic/etc.).
    """
    formatting_types = ('bold', 'italic', 'underline', 'strikethrough', 'code', 'pre')

    plain_text_parts = []
    link_count = 0
    mention_count = 0
    text_link_count = 0
    other_format_count = 0  # not used by the current filters, reported for completeness

    for item in elements:
        if isinstance(item, str):
            plain_text_parts.append(item)
            continue
        if not isinstance(item, dict):
            continue

        # 'type' may be missing, None or a non-string in malformed data; treat
        # all of those as "no type" instead of raising on .lower().
        raw_type = item.get('type')
        item_type = raw_type.lower() if isinstance(raw_type, str) else ''

        # Same tolerance for 'text': None becomes "", other non-strings are stringified.
        text_in_dict = item.get('text')
        if text_in_dict is None:
            text_in_dict = ""
        elif not isinstance(text_in_dict, str):
            text_in_dict = str(text_in_dict)

        if item_type == 'mention':
            mention_count += 1
        elif item_type == 'text_link':
            text_link_count += 1
        elif item_type == 'link':
            link_count += 1
        elif item_type in formatting_types:
            other_format_count += 1

        # Text of every dict element counts towards the plain text, whatever its type.
        if text_in_dict:
            plain_text_parts.append(text_in_dict)

    full_text = " ".join(plain_text_parts).strip()
    interactive_elements_count = link_count + mention_count + text_link_count

    return {
        'full_text_original_case': full_text,
        'full_text_lower': full_text.lower(),
        'link_count': link_count,
        'mention_count': mention_count,
        'text_link_count': text_link_count,
        'interactive_elements_count': interactive_elements_count,
        'other_format_count': other_format_count
    }

# --- Craigslist-like Message Detection Logic ---

# Lower-case substrings that mark a message as a potential classified ad
# (buy / sell / rent / exchange / job offers). Matched with `in` against the
# lower-cased message text by is_craigslist_like_message.
CRAIGSLIST_KEYWORDS = [
    "куплю", "купить", "покупка",
    "продам", "продать", "продаю", "продается",
    "сдам", "сдаю", "сдается", # rent out
    "сниму", "возьму в аренду", "арендую", # rent
    "обменяю", "обмен",
    "ищу", # can be for job, item, service
    "предлагаю", "оказываю услуги", "услуги",
    "требуется", "вакансия", "ищем сотрудника", "работа", "подработка", "фриланс",
    "резюме", "ищу работу", "портфолио"
]

# An amount of at least two digits (optional decimal part) followed by a
# currency token. Group 1 is the amount, group 2 the currency.
# The currency must not be followed by a word character. A plain trailing `\b`
# cannot be used here: after a symbol such as "₽", "$" or "€" it only matches
# when a word character FOLLOWS, so "1500₽" or "100 $" would be missed.
PRICE_REGEX = (
    r"\b(\d{2,}(?:[.,]\d+)?)\s*"
    r"(руб|usd|eur|gel|лари|тенге|kzt|btc|eth|usdt|₽|\$|€|byn|uah|amd|azn|uzs|kgs|tjs|tmr)(?!\w)"
)
# Words that signal a price/payment discussion even without an explicit amount.
PRICE_KEYWORDS = ["цена", "стоимость", "прайс", "оплата", "бюджет", "зарплата", "зп"]

def is_craigslist_like_message(text_column_entry):
    """Heuristically decide whether a message is a classified-ad style post.

    A message qualifies when its lower-cased text contains one of
    ``CRAIGSLIST_KEYWORDS`` and, in addition, either

    * it carries at least one link / mention / text link, or
    * it has a price signal (a ``PRICE_KEYWORDS`` word or a ``PRICE_REGEX``
      match), or
    * it contains one of the contact phrases below, or
    * none of the above, but the text is longer than 70 characters and
      contains one of the "strong" ad keywords.

    Empty / whitespace-only messages never qualify.
    """
    elements = parse_message_text_robust(text_column_entry)
    if not elements:
        return False
    if len(elements) == 1:
        only_element = elements[0]
        if isinstance(only_element, str) and not only_element.strip():
            return False

    stats = analyze_text_content(elements)
    lowered = stats['full_text_lower']
    interactive = stats['interactive_elements_count']

    # Nothing to inspect: no text and no links/mentions.
    if interactive == 0 and not lowered.strip():
        return False

    if not any(keyword in lowered for keyword in CRAIGSLIST_KEYWORDS):
        return False

    if interactive >= 1:
        return True

    if any(kw in lowered for kw in PRICE_KEYWORDS) or re.search(PRICE_REGEX, lowered, re.IGNORECASE):
        return True

    contact_phrases = ["писать в лс", "пишите в личку", "звоните", "обращайтесь", "подробности по телефону", "мой контакт", "связь со мной"]
    if any(phrase in lowered for phrase in contact_phrases):
        return True

    # No interactive element, price or contact phrase at this point: a long
    # message with a strong ad keyword is still treated as an ad.
    strong_craigslist_kws = ["продам", "сдам", "куплю", "вакансия", "требуется", "продается", "сдается"]
    is_long = len(stats['full_text_original_case']) > 70
    return is_long and any(kw in lowered for kw in strong_craigslist_kws)

# --- Generic/Emoji Message Helpers ---

# Short, content-free replies (greetings, thanks, acknowledgements, laughter,
# farewells, yes/no, "+"/"-" reactions, praise, bumps, apologies, fillers).
# All entries are lower-case; is_generic_or_emoji_message compares the
# lower-cased, normalised message text against them as whole strings.
GENERIC_PHRASES_LIST = [
    "всем привет", "привет", "здрасьте", "здравствуйте", "салют", "ку",
    "доброе утро", "добрый день", "добрый вечер", "доброй ночи", "ду", "дд", "дв", "дн",
    "спс", "спасибо", "спсб", "благодарю", "мерси", "благодарочка", "пасиб", "thx", "ty",
    "пожалуйста", "пж", "плз", "что", "чтобы что","не за что", "нзч", "welcome", "yw",
    "ок", "окей", "оке", "ладно", "хорошо", "хорош", "гуд", "согласен", "согласна", "договорились", "добро", "конечно", "yes", "да", "да ну",
    "понял", "поняла", "понятно", "ясно", "принял", "приняла", "принято", "яснопонятно", "got it",
    "лол", "кек", "рофл", "ахах", "ахахах", "хаха", "хах", "хех", "гг", "ыы", "ору", "ор", "жиза", "lol", "kek", "rofl",
    "всего доброго", "всего хорошего", "удачи", "добра", "бб", "bb",
    "до свидания", "пока", "бай", "чао", "до встречи",
    "нет", "ага", "угу", "неа", "ноуп", "no", "nope", "ну", "ну да",
    "+", "++", "-", "--", "+++", "---",
    "супер", "класс", "отлично", "круто", "круть", "кайф", "кайфы", "кул", "огонь", "топ", "бомба", "пушка", "найс", "nice", "cool", "perfecto",
    "f", "rip", 
    "ап", "up", "bump",
    "сорян", "сори", "сорри", "извини", "извините", "извиняюсь", "прости", "простите", "sorry", "sry",
    "хз", "хзхз", "не знаю", "без понятия", "idk",
    "мда", "мде", "эх", "ого", "ухты", "вау", "wow",
    "минутку", "сек", "сейчас", "погодь", "подожди"
]
# Set form of the list above, used for membership tests.
NORMALIZED_GENERIC_PHRASES = set(GENERIC_PHRASES_LIST)


def get_plain_text_from_message(text_column_entry):
    """Return the visible text of a message as one stripped string.

    The entry is parsed with ``parse_message_text_robust``; plain strings are
    taken as-is, dict elements contribute their ``'text'`` value (stringified
    when it is not already a string, skipped when it is ``None``). The pieces
    are joined with single spaces.
    """
    def _piece(element):
        # Returns the text contributed by one element, or None for "nothing".
        if isinstance(element, str):
            return element
        if isinstance(element, dict) and 'text' in element:
            value = element['text']
            if value is not None:
                return value if isinstance(value, str) else str(value)
        return None

    candidates = (_piece(element) for element in parse_message_text_robust(text_column_entry))
    return " ".join(piece for piece in candidates if piece is not None).strip()


# Last-resort character class for emoji detection, used by is_emoji_only when
# the `emoji` package checks did not already decide. Compiled once at import.
# The ranges are kept deliberately narrow: a single span from U+24C2 up to
# U+1F251 would also cover CJK, Kana, Hangul and many other scripts and make
# ordinary text look "emoji-only".
_EMOJI_FALLBACK_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # Emoticons
    "\U0001F300-\U0001F5FF"  # Symbols & Pictographs
    "\U0001F680-\U0001F6FF"  # Transport & Map Symbols
    "\U0001F1E0-\U0001F1FF"  # Regional indicators (flags)
    "\U00002702-\U000027B0"  # Dingbats
    "\U000024C2"             # Circled M
    "\U0001F000-\U0001F0FF"  # Mahjong tiles, dominoes, playing cards
    "\U0001F100-\U0001F251"  # Enclosed alphanumeric / ideographic supplements
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
    "\U00002600-\U000026FF"  # Miscellaneous Symbols
    "\U00002B05-\U00002B07"  # Arrows
    "\U00002B1B-\U00002B1C"  # Large squares
    "\U00002B50\U00002B55"   # Star, heavy circle
    "\U00002934-\U00002935"  # Arrows
    "\U00003297-\U00003299"  # Enclosed Characters
    "\U00003030"             # Wavy Dash
    "\U000000A9\U000000AE"   # Copyright, Registered Sign
    "\U00002122"             # TM
    "\U0000231A-\U0000231B"  # Watch, Hourglass
    "\U0000200D"             # Zero Width Joiner (part of sequences)
    "\U0000FE0F"             # Variation Selector-16 (emoji presentation)
    "]+", flags=re.UNICODE)


def is_emoji_only(text: str) -> bool:
    """Return True when ``text`` consists solely of emoji (whitespace ignored).

    Non-string input and empty / whitespace-only strings return False.

    Detection is attempted in three stages, the first positive one wins:

    1. the text, run through ``emoji.demojize``, is nothing but ``:name:``
       shortcodes;
    2. ``emoji.emoji_count`` is positive and ``emoji.replace_emoji`` leaves
       nothing behind (falling back to a per-character ``emoji.is_emoji``
       check when those functions are missing in the installed version);
    3. every character falls into ``_EMOJI_FALLBACK_PATTERN``.
    """
    if not isinstance(text, str):
        return False
    compact = "".join(text.split())  # drop all whitespace
    if not compact:
        return False

    demojized_text = emoji.demojize(compact)
    if re.fullmatch(r"(:[a-zA-Z0-9_+\-]+(?:_face_with_[a-zA-Z0-9_+\-]+)?(?:_type_[1-6])?:)+", demojized_text):
        return True

    try:
        if emoji.emoji_count(compact) > 0 and emoji.replace_emoji(compact, replace='') == '':
            return True
    except AttributeError:
        # emoji_count / replace_emoji are unavailable in this version of the package.
        try:
            if all(emoji.is_emoji(char_in_text) for char_in_text in compact):
                return True
        except AttributeError:
            # No usable helper in the package at all; rely on the regex below.
            pass

    return _EMOJI_FALLBACK_PATTERN.fullmatch(compact) is not None


def is_generic_or_emoji_message(text_column_entry):
    """Return True for messages that carry no real content.

    A message is considered generic when, after extracting its plain text,

    * it is empty or whitespace-only, or
    * it consists only of emoji (``is_emoji_only``), or
    * it is a pure "+" / "-" reaction such as ``+``, ``++``, ``+ +``, ``---``, or
    * its lower-cased text, with emoji removed and punctuation collapsed to
      single spaces, equals one of ``NORMALIZED_GENERIC_PHRASES``.

    A message that becomes empty only after stripping trailing parentheses
    and emoji is NOT reported as generic (returns False).
    """
    plain_text = get_plain_text_from_message(text_column_entry)

    # 1. Empty or whitespace-only messages.
    if not plain_text.strip():
        return True

    # 2. Emoji-only messages.
    if is_emoji_only(plain_text):
        return True

    # 3. Normalisation for the generic phrase check.
    text_to_check = plain_text.lower()

    # 3a. Remove trailing parentheses (used as smileys in Russian chats).
    text_to_check = re.sub(r"[()]+$", "", text_to_check).strip()

    # 3b. Remove all emoji from the text.
    try:
        text_to_check = emoji.replace_emoji(text_to_check, replace='')
    except AttributeError: # Fallback if replace_emoji is not available
        text_to_check = emoji.demojize(text_to_check)
        text_to_check = re.sub(r":[^:\s]+:", "", text_to_check) # Remove demojized emojis
        text_to_check = emoji.emojize(text_to_check) # Convert back if any non-emoji text remains

    # 3c. Emoji removal may leave surrounding spaces or nothing at all.
    text_to_check = text_to_check.strip()
    if not text_to_check:
        return False # Not a generic *phrase* if what's left is empty

    # 3d. "+" / "-" reactions have to be recognised BEFORE punctuation is
    # normalised: step 3e turns every run of punctuation into a space, so
    # "+", "+++" or "- -" would otherwise collapse to an empty string and
    # never match anything.
    if re.fullmatch(r"\++|-+", "".join(text_to_check.split())):
        return True

    # 3e. Collapse every run of punctuation and/or whitespace into one space.
    text_to_check = re.sub(r"""[\s!"#$%&'()*+,.\-/:;<=>?@[\\\]^_`{|}~]+""", " ", text_to_check).strip()

    # 4. Compare with the known generic phrases.
    return text_to_check in NORMALIZED_GENERIC_PHRASES

# --- Main Preprocessing Function ---

# Values of the 'from' column whose messages are dropped entirely by
# preprocess_messages_dataframe. Matching is exact (Series.isin), so every
# character — including whitespace — matters.
BANNED_FROM_NAMES = [
    'travelask_moderator_bot',
    'NoName👽NotAbot',
    '@travelask_help_bot',
    'Trusted_recommendation_bot',
    'Анна Богомолова Чат-бот в ТГ, Getcourse, Salebot       27' # Note the run of spaces before "27"
]

def preprocess_messages_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Filter a message table down to substantive user messages.

    Works on a copy of ``df`` and applies four filters in order, printing a
    progress line for each:

    1. drop rows whose ``type`` is ``"service"``;
    2. drop rows whose ``from`` value is listed in ``BANNED_FROM_NAMES``;
    3. drop rows whose ``text`` is classified by ``is_craigslist_like_message``;
    4. drop rows whose ``text`` is classified by ``is_generic_or_emoji_message``.

    A step is skipped (with a message) when its column is missing or when no
    rows are left.

    Args:
        df: Message table; the columns ``type``, ``from`` and ``text`` are
            used when present.

    Returns:
        The filtered copy. The input frame is not modified.
    """
    working = df.copy()
    initial_rows = len(working)

    if not initial_rows:
        print("Initial DataFrame is empty. No preprocessing to perform.")
        return working

    print(f"Initial number of messages: {initial_rows}")

    # Step 1: service messages.
    if 'type' not in working.columns:
        print("Warning: 'type' column not found. Skipping Step 1 (remove service messages).")
    else:
        count_before = len(working)
        working = working.loc[working['type'] != 'service'].copy()
        print(f"Step 1: Removed {count_before - len(working)} service messages. Rows remaining: {len(working)}")

    # Step 2: banned senders. NaN senders are never in the banned list, so they are kept.
    if working.empty:
        print("DataFrame is empty before 'from' name removal. Skipping Step 2.")
    elif 'from' not in working.columns:
        print("Warning: 'from' column not found. Skipping Step 2 (remove messages from banned 'from' names).")
    else:
        count_before = len(working)
        sender_is_banned = working['from'].isin(BANNED_FROM_NAMES)
        working = working.loc[~sender_is_banned].copy()
        print(f"Step 2: Removed {count_before - len(working)} messages from banned 'from' names. Rows remaining: {len(working)}")

    # Step 3: classified-ad style messages.
    if working.empty:
        print("DataFrame is empty before Craigslist-like message removal. Skipping Step 3.")
    elif 'text' not in working.columns:
        print("Warning: 'text' column not found. Skipping Craigslist-like message removal (Step 3).")
    else:
        count_before = len(working)
        print("Step 3: Identifying and removing Craigslist-like messages...")
        looks_like_ad = working['text'].progress_apply(is_craigslist_like_message)
        working = working.loc[~looks_like_ad].copy()
        print(f"Step 3: Removed {count_before - len(working)} Craigslist-like messages. Rows remaining: {len(working)}")

    # Step 4: generic greetings/thanks, emoji-only and empty messages.
    if working.empty:
        print("DataFrame is empty before generic/emoji/empty message removal. Skipping Step 4.")
    elif 'text' not in working.columns:
        print("Warning: 'text' column not found. Skipping generic/emoji/empty message removal (Step 4).")
    else:
        count_before = len(working)
        print("Step 4: Identifying and removing generic/emoji-only/empty messages...")
        is_noise = working['text'].progress_apply(is_generic_or_emoji_message)
        working = working.loc[~is_noise].copy()
        print(f"Step 4: Removed {count_before - len(working)} generic/emoji-only/empty messages. Rows remaining: {len(working)}")

    final_rows = len(working)
    print(f"Preprocessing complete. Total messages removed: {initial_rows - final_rows}. Final number of messages: {final_rows}")
    return working

In [None]:
# Run the four-step filter on all messages and compare sizes before/after.
df_cleaned = preprocess_messages_dataframe(df_all_messages)
print(f"Original DataFrame shape: {df_all_messages.shape}")
print(f"Cleaned DataFrame shape: {df_cleaned.shape}")

In [None]:
import pandas as pd
import json
import collections
from typing import List, Dict, Any, Set, Tuple, Optional

# Import tqdm for progress bars
from tqdm.auto import tqdm
# tqdm.pandas() is used to add progress_apply to pandas Series/DataFrame
# It needs to be initialized once.

# Alias for one message record: a mapping from column name to value,
# as produced by DataFrame.to_dict(orient='records').
MessageType = Dict[str, Any]

# Keys copied from each message record into the per-thread JSON output.
# A key missing from a record is emitted with the value None.
SELECTED_JSON_FIELDS = ['id', 'date_unixtime', 'text', 'reply_to_message_id']

def _default_json_serializer(obj: Any) -> str:
    """
    Default JSON serializer for objects not handled by standard json.dumps.
    This primarily handles datetime objects by converting them to ISO format strings.
    """
    if hasattr(obj, 'isoformat'):  # Covers datetime.datetime, datetime.date
        return obj.isoformat()
    # For other non-serializable types, convert to string.
    return str(obj)

def _validate_input_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Validates the input DataFrame, checks for required columns for threading logic,
    and converts columns to expected types.
    Returns a processed copy of the DataFrame if validation passes.
    """
    df_processed = df.copy()

    # Required columns for the threading logic itself
    required_columns_for_logic = ['id', 'reply_to_message_id', 'date_unixtime']
    for col in required_columns_for_logic:
        if col not in df_processed.columns:
            raise ValueError(f"Missing required column for threading logic: {col}")

    # Check for columns needed for the final JSON output.
    # These are not strictly required for the threading algorithm to run,
    # but if they are missing, they will be None in the output JSON.
    # It's good practice to inform the user if they are missing.
    for col in SELECTED_JSON_FIELDS:
        if col not in df_processed.columns:
            print(f"Warning: Column '{col}' (selected for JSON output) not found in input DataFrame. "
                  f"It will be missing or None in the output JSON for messages.")


    try:
        if df_processed['id'].isnull().any():
            raise ValueError("Column 'id' contains NaN/null values.")
        temp_ids = df_processed['id'].astype('float').astype('Int64')
        if temp_ids.isnull().any():
            raise ValueError("Column 'id' contains values that cannot be robustly converted to integers.")
        df_processed['id'] = temp_ids.astype(int)
    except (ValueError, TypeError, AttributeError) as e:
        raise ValueError(f"Column 'id' must contain unique, non-null values convertible to integers. Error: {e}") from e

    if not df_processed['id'].is_unique:
        raise ValueError("Column 'id' must contain unique values.")

    try:
        df_processed['reply_to_message_id'] = df_processed['reply_to_message_id'].astype('float').astype('Int64')
    except (ValueError, TypeError) as e:
        raise ValueError(f"Column 'reply_to_message_id' contains values not convertible to nullable integers. Error: {e}") from e

    if not pd.api.types.is_numeric_dtype(df_processed['date_unixtime']):
        try:
            df_processed['date_unixtime'] = pd.to_numeric(df_processed['date_unixtime'])
        except (ValueError, TypeError) as e:
            raise ValueError(f"Column 'date_unixtime' must be numeric or convertible to numeric. Error: {e}") from e
    
    if df_processed['date_unixtime'].isnull().any():
        print("Info: 'date_unixtime' column contains NaN values. Messages with NaN timestamps will be sorted accordingly.")
        
    return df_processed


def group_messages_into_threads(df: pd.DataFrame, min_messages_per_thread: int = 2) -> pd.DataFrame:
    """
    Groups messages from a DataFrame into conversational threads.

    Each thread starts with a root message (one with no 'reply_to_message_id')
    and includes all messages that directly or indirectly reply to it. Messages
    within each thread are sorted chronologically by 'date_unixtime', then by 'id'
    (messages with a missing timestamp sort last).
    Threads with fewer messages than `min_messages_per_thread` are excluded.
    The output JSON for messages only contains the fields listed in
    `SELECTED_JSON_FIELDS`; missing values (None / pd.NA / pd.NaT) are written
    as JSON null. Progress bars are displayed using tqdm.

    Messages whose reply chain never reaches a root — because an ancestor is
    not present in `df`, or because the replies form a cycle — cannot be
    attached to any thread. They are left out and reported in an info line.

    Args:
        df: Pandas DataFrame with message data. Expected columns include:
            'id': Unique integer identifier for the message.
            'reply_to_message_id': Integer ID of the message this message
                                   replies to (NaN, None, or pd.NA if not a reply).
            'date_unixtime': Numeric timestamp used for sorting and for output JSON.
            'text': Text content of the message (for output JSON).
        min_messages_per_thread (int): The minimum number of messages a thread
                                       must contain to be included in the output.
                                       Defaults to 2 (i.e., single-message threads
                                       are filtered out).

    Returns:
        A pandas DataFrame where each row represents a conversational thread.
        Columns:
            'root_message_id': The integer ID of the thread's root message.
            'messages_json': A JSON string representing the sorted list of
                             messages (dictionaries with the selected fields).
        An empty frame with these columns is returned when the input is empty
        or no thread qualifies.

    Raises:
        ValueError: If input DataFrame is malformed (e.g., missing columns,
                    ID issues, type conversion problems) or if JSON serialization fails.
    """
    output_columns = ['root_message_id', 'messages_json']
    if df.empty:
        return pd.DataFrame(columns=output_columns)

    df_processed = _validate_input_dataframe(df)

    # id -> full message record
    messages_by_id: Dict[int, MessageType] = {
        record['id']: record
        for record in tqdm(df_processed.to_dict(orient='records'), desc="Indexing messages", unit="msg")
    }

    # parent id -> ids of direct replies; messages without a parent are roots.
    children_of: Dict[int, List[int]] = collections.defaultdict(list)
    root_message_ids: List[int] = []
    for msg_id, msg_data in tqdm(messages_by_id.items(), desc="Building reply graph", unit="msg"):
        parent_id_val = msg_data.get('reply_to_message_id')
        parent_id: Optional[int] = None
        if pd.notna(parent_id_val):
            try:
                parent_id = int(parent_id_val)
            except (ValueError, TypeError):
                print(f"Warning: Message ID {msg_id} has a non-integer 'reply_to_message_id' "
                      f"value '{parent_id_val}'. This reply link will be ignored.")
        if parent_id is None:
            root_message_ids.append(msg_id)
        else:
            children_of[parent_id].append(msg_id)

    def sort_key_func(msg: Dict[str, Any]) -> Tuple[Any, Any]:
        # Missing timestamps / ids sort after all valid ones.
        ts_val = msg.get('date_unixtime')
        id_val = msg.get('id')
        ts = ts_val if pd.notna(ts_val) else float('inf')
        mid = id_val if pd.notna(id_val) else float('inf')
        return (ts, mid)

    threads_data: List[Dict[str, Any]] = []
    reached_ids: Set[int] = set()

    for root_id in tqdm(sorted(root_message_ids), desc="Constructing threads", unit="thread"):
        # Breadth-first walk over the reply tree below this root. Every message
        # has at most one parent, so each one is reached from at most one root;
        # the `seen` guard is only a cheap safety net.
        thread_messages: List[MessageType] = []
        seen_in_thread: Set[int] = set()
        queue = collections.deque([root_id])
        while queue:
            current_id = queue.popleft()
            if current_id in seen_in_thread:
                continue
            seen_in_thread.add(current_id)
            thread_messages.append(messages_by_id[current_id])
            queue.extend(sorted(children_of.get(current_id, [])))

        reached_ids |= seen_in_thread

        if len(thread_messages) < min_messages_per_thread:
            continue

        try:
            thread_messages.sort(key=sort_key_func)
        except TypeError as e:
            print(f"Warning: Could not sort messages for thread rooted at {root_id} due to "
                  f"type errors in sort keys ({e}). Messages may be in BFS discovery order or partially sorted.")

        threads_data.append({'root_message_id': root_id, 'messages': thread_messages})

    # Everything reachable from a root has been visited above (including roots
    # of threads that were too small), so what remains are replies whose chain
    # of parents never ends in a root message of this DataFrame.
    unreached_ids = set(messages_by_id) - reached_ids
    if unreached_ids:
        print(f"Info: {len(unreached_ids)} message(s) could not be attached to any root message "
              f"(an ancestor in their reply chain is missing from the input, or the replies form a cycle) "
              f"and were left out of all threads. First 10 such IDs: {sorted(unreached_ids)[:10]}...")

    if not threads_data:
        return pd.DataFrame(columns=output_columns)

    def _json_ready(value: Any) -> Any:
        # pandas missing markers are not JSON-serialisable; emit them as null.
        return None if (value is pd.NA or value is pd.NaT) else value

    def create_json_with_selected_fields(msg_list: List[Dict[str, Any]]) -> str:
        # .get(key) yields None for absent keys, which is written as null.
        selected = [
            {key: _json_ready(msg_dict.get(key)) for key in SELECTED_JSON_FIELDS}
            for msg_dict in msg_list
        ]
        return json.dumps(selected, ensure_ascii=False, default=_default_json_serializer)

    try:
        # A plain tqdm loop is used instead of tqdm.pandas(desc=...) so that the
        # globally registered `progress_apply` is not reconfigured as a side effect.
        messages_json = [
            create_json_with_selected_fields(thread['messages'])
            for thread in tqdm(threads_data, desc="Serializing threads to JSON with selected fields", unit="thread")
        ]
    except Exception as e:
        raise ValueError(f"Failed to serialize messages to JSON. Error: {e}") from e

    return pd.DataFrame({
        'root_message_id': [thread['root_message_id'] for thread in threads_data],
        'messages_json': messages_json,
    })


# Build reply threads from the cleaned messages and preview the result.
df_message_threads = group_messages_into_threads(df_cleaned)
print(df_message_threads.head())
if not df_message_threads.empty:
    print("\nExample JSON output for the first thread:")
    first_thread_messages = json.loads(df_message_threads.iloc[0]['messages_json'])
    # ensure_ascii=False keeps non-ASCII message text (e.g. Cyrillic) readable
    # instead of printing it as \uXXXX escape sequences.
    print(json.dumps(first_thread_messages, indent=2, ensure_ascii=False))

In [None]:
import json
import pandas as pd

def _thread_start_is_valid(message) -> bool:
    """Return True if `message` can open a thread: a dict carrying 'id' and 'date_unixtime'."""
    return isinstance(message, dict) and 'id' in message and 'date_unixtime' in message


def _message_unixtime(message):
    """Return the message's 'date_unixtime' as a float, or None if it is missing or non-numeric."""
    if not isinstance(message, dict):
        return None
    try:
        # float() accepts both numbers and numeric strings such as "1716280000".
        return float(message['date_unixtime'])
    except (KeyError, TypeError, ValueError, OverflowError):
        return None


def _build_thread_record(messages):
    """Return the output row for an accumulated thread, or None if it has no usable root message."""
    if not messages or not isinstance(messages[0], dict) or 'id' not in messages[0]:
        return None
    return {
        "root_message_id": messages[0]['id'],
        "messages_json": json.dumps(messages, ensure_ascii=False),
    }


def group_messages_into_threads(df_input_messages: pd.DataFrame, time_threshold_seconds: int) -> pd.DataFrame:
    """
    Merge consecutive rows of messages into logical threads using a time heuristic.

    NOTE: this definition shadows the earlier reply-based function of the same name
    in this notebook; after this cell runs, the one-argument call from the earlier
    cell no longer works because `time_threshold_seconds` is required here.

    Args:
        df_input_messages: DataFrame with a 'messages_json' column. Each entry is a
                           JSON string encoding a non-empty list of message objects.
                           The first message of a row must be a dict with 'id' and
                           'date_unixtime' to open a thread; 'reply_to_message_id'
                           is optional. Rows that are not strings, are malformed
                           JSON, or do not decode to a non-empty list are skipped.
        time_threshold_seconds: A row is appended to the thread being accumulated
                                when its first message has no 'reply_to_message_id'
                                and its timestamp is at or after the last accumulated
                                message and less than this many seconds later.
                                Timestamps may be numbers or numeric strings.

    Returns:
        A DataFrame with columns "root_message_id" (the 'id' of the first message of
        the logical thread) and "messages_json" (JSON string of all its messages).
        Empty (with these columns) if the input is empty or yields no valid thread.
    """
    output_columns = ["root_message_id", "messages_json"]

    if df_input_messages.empty or 'messages_json' not in df_input_messages.columns:
        return pd.DataFrame(columns=output_columns)

    processed_threads_data = []
    current_accumulated_messages = []

    for json_string in df_input_messages['messages_json']:
        if not isinstance(json_string, str):
            # Skip rows whose 'messages_json' is not a string (e.g. NaN/None)
            continue

        try:
            messages_in_row = json.loads(json_string)
        except json.JSONDecodeError:
            # Skip rows with malformed JSON
            continue

        if not isinstance(messages_in_row, list) or not messages_in_row:
            # Skip rows that do not decode to a non-empty list
            continue

        first_message_this_row = messages_in_row[0]

        if not current_accumulated_messages:
            # Nothing accumulated yet: this row opens a thread only if its first message is valid
            if _thread_start_is_valid(first_message_this_row):
                current_accumulated_messages = messages_in_row
            continue

        first_timestamp = _message_unixtime(first_message_this_row)
        last_timestamp = _message_unixtime(current_accumulated_messages[-1])

        continues_current_thread = False
        if first_timestamp is not None and last_timestamp is not None:
            time_difference = first_timestamp - last_timestamp
            is_starter_message = first_message_this_row.get("reply_to_message_id") is None
            continues_current_thread = (
                is_starter_message and 0 <= time_difference < time_threshold_seconds
            )
        # If either timestamp is unusable the rows cannot be compared, so the
        # accumulated thread is closed and this row is treated as a fresh start.

        if continues_current_thread:
            current_accumulated_messages.extend(messages_in_row)
            continue

        # Finalize the previous logical thread ...
        finished_thread = _build_thread_record(current_accumulated_messages)
        if finished_thread is not None:
            processed_threads_data.append(finished_thread)

        # ... and start a new one if this row's first message is valid
        if _thread_start_is_valid(first_message_this_row):
            current_accumulated_messages = messages_in_row
        else:
            current_accumulated_messages = []

    # Add the last accumulated thread
    last_thread = _build_thread_record(current_accumulated_messages)
    if last_thread is not None:
        processed_threads_data.append(last_thread)

    return pd.DataFrame(processed_threads_data, columns=output_columns)

# Merge consecutive threads whose next root message follows within 2 * 60 seconds.
df_message_threads = group_messages_into_threads(
    df_input_messages=df_message_threads,
    time_threshold_seconds=2 * 60,
)

In [None]:
import ast
import json
import pandas as pd
import re

# Interrogative words and request phrases (lower-case) that typically open a question.
# Matching is a plain prefix test (str.startswith), so inflected forms such as
# "какую" are caught through "как" -- at the price of false positives like "кактус".
QUESTION_STARTERS_RU = [
    "кто", "что", "где", "когда", "как", "почему", "зачем", "сколько", "какой",
    "какая", "какое", "какие", "чей", "чья", "чьё", "чьи", "расскажите",
    "подскажите", "посоветуйте", "знает ли", "знаете ли", "кто-нибудь знает",
    "не подскажете", "не знаете ли", "можно ли", "есть ли", "верно ли",
    "правильно ли", "нужен ли", "нужна ли", "нужно ли", "нужны ли",
    "а где", "а как", "а кто", "а что", "а когда", "а сколько", "во сколько",
    "за сколько", "каков", "какова", "каково", "каковы"
]

# Salutations and forms of address stripped from the start of a (lower-cased) message.
# "доброе\s*утро" is listed explicitly: the "добрый ... утро" alternative alone never
# matches the actual phrase "доброе утро".
GREETING_PATTERN_STR = r"^(?:привет\s*всем|всем\s*привет|привет|здравствуйте|добрый\s*(?:день|вечер|утро)|доброе\s*утро|доброго\s*времени\s*суток|ребят(?:а)?|девочки|парни|друзья|господа|народ|пацаны|мужики|коллеги|товарищи|всем\s*доброго\s*вечера|всем\s*добрый)\s*[,!)\s]*"
GREETING_PATTERN = re.compile(GREETING_PATTERN_STR)

# Single words that are not treated as a question even when followed by "?".
_NON_QUESTION_SINGLE_WORDS = ("ого", "а", "ну", "да", "нет", "ок")
# First words that disqualify a multi-word message ending in "?".
_NON_QUESTION_OPENERS = ("ого", "ок", "да", "нет", "ладно", "ага", "ну", "привет", "здравствуйте")


def is_meaningful_question(text_content: object) -> bool:
    """
    Heuristically decide whether a message text is a meaningful question.

    The text is lower-cased and stripped, a leading greeting is removed, and then:
      1. if the remainder starts with one of QUESTION_STARTERS_RU it is a question
         (except for the idiom "как говорится");
      2. otherwise it is a question only if the original text ends with "?" and does
         not open with an interjection/greeting word.

    Args:
        text_content: The message text. Anything that is not a `str` is rejected.

    Returns:
        True if the text looks like a genuine question, False otherwise (including
        messages that consist only of a greeting and/or punctuation).
    """
    if not isinstance(text_content, str):
        return False

    normalized = text_content.lower().strip()

    # Texts shorter than three characters are rejected unless they are themselves
    # a (hypothetical, very short) question starter.
    if len(normalized) < 3 and normalized.rstrip("?!.,") not in QUESTION_STARTERS_RU:
        return False

    text_after_greeting = GREETING_PATTERN.sub("", normalized, count=1).strip()

    # Only a greeting and/or punctuation ("Привет!", "Привет?", "???"): nothing is asked.
    if not any(char.isalnum() for char in text_after_greeting):
        return False

    # Check 1: the text (after the greeting) opens with a question starter.
    if text_after_greeting.startswith(tuple(QUESTION_STARTERS_RU)):
        # "как говорится" ("as they say") is an idiom, not a question.
        return not text_after_greeting.startswith("как говорится")

    # Check 2: the original text ends with a question mark.
    if not normalized.endswith("?"):
        return False

    # `normalized` contains at least one alphanumeric character here, so `words` is non-empty.
    words = normalized.split()
    first_word = words[0].rstrip('?!.,')

    if len(words) == 1:
        # Single interrogative words ("Когда?") count as questions ...
        if first_word in QUESTION_STARTERS_RU:
            return True
        # ... as do other reasonably long single words ("Помощь?"), but not short
        # interjections such as "Ого?" or "Ок?".
        return len(first_word) > 3 and first_word not in _NON_QUESTION_SINGLE_WORDS

    # Multi-word text ending in "?": reject interjection/greeting openers and
    # very short, ambiguous phrases (5 characters or fewer).
    return first_word not in _NON_QUESTION_OPENERS and len(normalized) > 5

def filter_dataframe_for_meaningful_question_threads(df_input: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only the threads whose first message is a meaningful question.

    Args:
        df_input: DataFrame with a 'messages_json' column holding, per row, a JSON
                  string that encodes a list of message dicts. The 'text' value of
                  the first message is passed to `is_meaningful_question`.

    Returns:
        A copy of the rows of `df_input` whose first message qualifies. Rows whose
        'messages_json' is missing (None/NaN), is malformed JSON, is not a non-empty
        list, or whose first element is not a non-empty dict are dropped.

    Raises:
        KeyError: if a non-empty `df_input` has no 'messages_json' column.
    """
    if df_input.empty:
        return df_input.copy()

    def _thread_starts_with_question(messages_json) -> bool:
        """Return True if the serialized thread opens with a meaningful question."""
        try:
            thread_messages = json.loads(messages_json)
        except (json.JSONDecodeError, TypeError):
            # Malformed JSON, or a non-string cell such as None/NaN
            return False

        if not isinstance(thread_messages, list) or not thread_messages:
            return False

        # The first message in the list is considered the start of the thread
        starting_message = thread_messages[0]
        if not isinstance(starting_message, dict) or not starting_message:
            return False

        return is_meaningful_question(starting_message.get('text'))

    # A positional boolean mask: unlike label-based assignment it stays correct
    # when the DataFrame index contains duplicate labels.
    rows_to_keep_mask = [
        _thread_starts_with_question(messages_json)
        for messages_json in df_input['messages_json']
    ]

    return df_input.loc[rows_to_keep_mask].copy()

# Keep only the threads that open with a meaningful question.
df_question_threads = filter_dataframe_for_meaningful_question_threads(
    df_input=df_message_threads,
)

In [None]:
# Peek at the serialized messages of the first question thread
# (iloc[0] raises IndexError if no thread passed the filter).
df_question_threads.iloc[0].messages_json

In [None]:
# Persist the filtered threads to a CSV in the current working directory; with
# pandas' default index=True the DataFrame index is written as the first column.
df_question_threads.to_csv("chat_threads.csv")