In [1]:
import os
import random
import json
import datetime
from typing import List, Dict, Any, Iterator
from IPython.display import JSON

# Directory that holds the exported chats; the load cell reads
# <source_root>/<user_name>/inbox/<partner>/message_1.json from it.
source_root = 'source_conversations'
# Directory that receives the generated <user_name>.jsonl training file.
target_root = 'training_data'

In [2]:
# `user_name` selects the export folder to read, is compared with each message's
# 'sender_name' to decide which turns get the 'assistant' role, and names the
# output .jsonl file.
user_name           = 'sample_subject'   # Required: name of the user in the conversations
# `test_partner_name` is the inbox sub-folder read by the single-partner load cell.
test_partner_name   = 'sample_partner'   # Required only when loading the conversation with a single partner

In [3]:
####################################
#           HELPER Functions
####################################

def ms_to_date(timestamp_ms: int) -> datetime.datetime:
    """
    Turn an epoch timestamp expressed in milliseconds into a datetime.

    :param timestamp_ms: Milliseconds elapsed since the Unix epoch.
    :return: Datetime built by ``datetime.fromtimestamp`` for that instant
             (naive, expressed in the machine's local time).
    """
    epoch_seconds = timestamp_ms / 1000.0
    return datetime.datetime.fromtimestamp(epoch_seconds)

def fix_text(text: str) -> str:
    """
    Repairs mojibake produced when UTF-8 bytes were decoded as Latin-1.

    The text is re-encoded as Latin-1 to recover the original bytes, which are
    then decoded as UTF-8. Text that cannot make that round trip (it contains
    characters outside Latin-1, or the recovered bytes are not valid UTF-8) is
    treated as already correct and returned unchanged instead of raising.

    :param text: Text string potentially with encoding issues.
    :return: The repaired text, or the original text when no repair applies.
    """
    try:
        return text.encode('latin1').decode('utf8')
    except UnicodeError:
        # Covers both UnicodeEncodeError and UnicodeDecodeError: the input is
        # not Latin-1-decoded UTF-8, so there is nothing to repair.
        return text

def show_message(message: Dict[str, Any]) -> None:
    """
    Prints the details of a message, including sender, timestamp, and content.

    The output is a header line ``<sender_name> ---> <datetime>``, followed by
    the message content (or its placeholder) and a blank separator line.

    :param message: Dictionary containing message details; it must provide the
                    'sender_name' and 'timestamp_ms' keys.
    :raises KeyError: If 'sender_name' or 'timestamp_ms' is missing.
    """
    print(f"{message['sender_name']} ---> {ms_to_date(message['timestamp_ms'])}")
    print(get_content(message))
    # Blank line so consecutive messages stay visually separated.
    print()

def get_content(message: Dict[str, Any]) -> str:
    """
    Extracts the content from a message dictionary, handling encoding issues.

    :param message: Message dictionary.
    :return: The content of the message with its encoding repaired when
             possible, or the placeholder '[gif]' when the message has no
             'content' key. A non-string content value is returned unchanged.
    """
    if 'content' not in message:
        # No 'content' key: the message carries no text.
        return '[gif]'

    content = message['content']
    if not isinstance(content, str):
        # Nothing to re-encode; hand the value back untouched.
        return content

    try:
        return fix_text(content)
    except UnicodeError:
        # Only encoding failures are expected here. Anything else (for example
        # a helper that was never defined) should surface instead of being
        # silently ignored.
        return content

In [4]:
def split2conversations(messages: List[Dict[str, Any]], limit: int = 60, reverse: bool = True) -> List[List[Dict[str, Any]]]:
    """
    Groups messages into conversations using the silence between neighbours.

    A message joins the current conversation when the absolute difference
    between its 'timestamp_ms' and that of the previous message is strictly
    smaller than the limit; otherwise it opens a new conversation.

    :param messages: List of message dictionaries, each containing a 'timestamp_ms' key.
    :param limit: Largest gap, in minutes, still counted as the same conversation.
    :param reverse: When True the messages are walked in reversed order.
    :return: List of conversations, each a list of message dictionaries.
    """
    max_gap_ms = limit * 60 * 1000  # minutes -> milliseconds
    ordered = messages[::-1] if reverse else messages

    groups = []
    for msg in ordered:
        # The very first message always opens a group; later ones are compared
        # with the last message of the group that is currently open.
        continues_group = bool(groups) and (
            abs(msg['timestamp_ms'] - groups[-1][-1]['timestamp_ms']) < max_gap_ms
        )
        if continues_group:
            groups[-1].append(msg)
        else:
            groups.append([msg])

    return groups


def merge_messages(conversation: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Merges consecutive messages from the same sender into single entries.

    Each merged entry keeps the sender name and the timestamp of the first
    message of the run. The contents of the run are concatenated, each one
    followed by a newline, so merged content always ends with a newline.

    :param conversation: List of message dictionaries with 'sender_name', 'timestamp_ms', and 'content'.
    :return: List of merged message dictionaries with the keys 'sender_name',
             'timestamp_ms' and 'content'.
    """
    merged_messages = []
    current = None  # entry that is still receiving messages

    for message in conversation:
        sender = message.get('sender_name')
        content = get_content(message)

        # Compare against the open entry itself. Tracking the previous sender
        # in a separate variable initialised to None made a first message
        # without 'sender_name' look like a continuation of a non-existent
        # entry and crash with a TypeError.
        if current is not None and current['sender_name'] == sender:
            current['content'] += content + "\n"
        else:
            current = {
                'sender_name': sender,
                'timestamp_ms': message.get('timestamp_ms'),
                'content': content + "\n"
            }
            # The dict is extended in place afterwards, so it can be
            # registered as soon as it is created.
            merged_messages.append(current)

    return merged_messages

In [5]:
# LOAD CONVERSATIONS WITH TEST USER
# The context manager closes the export file as soon as it has been parsed, and
# the explicit encoding keeps the read independent of the platform default.
with open(os.path.join(source_root,user_name,'inbox', test_partner_name, 'message_1.json'), 'r', encoding='utf-8') as f:
    data = json.load(f)['messages']
conversations = split2conversations(data)


# LOAD ALL CONVERSATIONS
# conversations = []
# for partner in os.listdir(os.path.join(source_root,user_name,'inbox')):
#     with open(os.path.join(source_root,user_name,'inbox', partner, 'message_1.json'), 'r', encoding='utf-8') as f:
#         data = json.load(f)['messages']
#     conversations += split2conversations(data)


# Number of conversations found (shown as the cell output).
len(conversations)


2

In [6]:
# Preview the first conversation, one raw message at a time.
first_conversation = conversations[0]
for m in first_conversation:
    show_message(m)

sample_user ---> 2021-12-02 17:36:40
Hello!

sample_user ---> 2021-12-02 17:53:20
What's up man?

sample_user ---> 2021-12-02 18:10:00
:)

sample_subject ---> 2021-12-02 18:26:40
All good here, how about you?



In [7]:
# Collapse each sender's consecutive messages, conversation by conversation.
merged_conversations = list(map(merge_messages, conversations))

In [8]:
# Print every merged conversation followed by a horizontal rule.
# Iterating the merged list directly (instead of indexing it with
# range(len(conversations))) keeps this cell correct even if `conversations`
# was reloaded without re-running the merge cell.
for merged_conversation in merged_conversations:
    for m in merged_conversation:
        show_message(m)
    print("-"*100)

sample_user ---> 2021-12-02 17:36:40
Hello!
What's up man?
:)


sample_subject ---> 2021-12-02 18:26:40
All good here, how about you?


----------------------------------------------------------------------------------------------------
sample_partner ---> 2021-12-14 08:13:20
Hi again


sample_subject ---> 2021-12-14 08:30:00
Still good


----------------------------------------------------------------------------------------------------


In [9]:
def conversation_to_prompt(conversation: List[Dict[str, Any]], me: str) -> Iterator[Dict[str, List[Dict[str, str]]]]:
    """
    Converts a merged conversation into chat-format prompt dictionaries.

    Messages are accumulated as ``{'role': ..., 'content': ...}`` entries. Each
    time an 'assistant' message arrives while at least one earlier entry is
    pending, the pending entries are yielded as ``{"messages": [...]}`` and the
    accumulator is reset. Entries still pending when the conversation ends
    (no assistant reply after them) are never yielded.

    :param conversation: List of messages from the 'merge_messages' function output.
    :param me: Sender name that is mapped to the 'assistant' role; every other
               sender becomes 'user'.
    :return: Iterator of prompt dictionaries of the form {"messages": [...]}.
    """
    prompt_dict = []
    for message in conversation:
        role = 'assistant' if message['sender_name'] == me else 'user'
        # Merged messages already carry repaired text. Running the Latin-1 ->
        # UTF-8 repair a second time could corrupt text that is legitimately
        # made of Latin-1 characters, so the content is read as-is. The
        # placeholder mirrors the one used for messages without text.
        content = message.get('content', '[gif]')
        prompt_dict.append({'role': role, 'content': content.strip()})

        if role == 'assistant' and len(prompt_dict) > 1:
            yield {"messages": prompt_dict}
            prompt_dict = []

def collect_prompts(merged_conversations: List[List[Dict[str, Any]]], me: str) -> List[Dict[str, List[Dict[str, Any]]]]:
    """
    Builds one training example per conversation.

    For every conversation, the chunks produced by 'conversation_to_prompt' are
    flattened, in order, into a single message list. Conversations that
    produce no chunk are skipped.

    :param merged_conversations: List of conversations, each a list of merged message dictionaries.
    :param me: Identifier for the 'Assistant' in the conversations.
    :return: A list of {"messages": [...]} dictionaries, one per conversation
             that produced at least one prompt.
    """
    all_prompts = []

    for conversation in merged_conversations:
        turns = [
            turn
            for prompt in conversation_to_prompt(conversation, me)
            for turn in prompt["messages"]
        ]
        if turns:
            all_prompts.append({"messages": turns})

    return all_prompts



In [10]:
# Build the training examples; turns sent by `user_name` get the assistant role.
training_data = collect_prompts(
    merged_conversations,
    me=user_name,
)

In [11]:
# Display the first training example with IPython's JSON renderer.
first_example = training_data[0]
JSON(first_example)

<IPython.core.display.JSON object>

In [12]:
# Write the examples as JSON Lines: one JSON object per line.
# The output folder is created first so that a fresh checkout without it does
# not fail with FileNotFoundError when the file is opened.
os.makedirs(target_root, exist_ok=True)
file_path = os.path.join(target_root, f"{user_name}.jsonl")
with open(file_path, 'w', encoding='utf-8') as file:
    for conversation in training_data:
        print(json.dumps(conversation), file=file)