In [1]:
!pip install -qq tiktoken

In [2]:
import pandas as pd
import os
from pydrive.auth import GoogleAuth

from pydrive.drive import GoogleDrive

import json
import gdown
import re
import tiktoken

In [3]:
gauth = GoogleAuth()
gauth.LocalWebserverAuth()

drive = GoogleDrive(gauth)

Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?client_id=911515843534-tovhamtddg2t9f43fk19k21t0kj7cnon.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8080%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive&access_type=offline&response_type=code
Authentication successful.


In [4]:
!mkdir data

A subdirectory or file data already exists.


In [5]:
data = pd.read_csv('results_txt/results.csv')
data['flag'] = data['flag'].apply(lambda a: 'Speaker ' + a)
data['название'] = data['название'].apply(lambda s: re.sub(r'[.,"\'-?:!;]', '', s).
                                          strip().replace(" ", "_"))
data = data.drop(columns=['автор', 'берем', 'проблема', 'VIDEO'])

In [6]:
data.head()

Unnamed: 0,название,TEXT,flag
0,mental_health_counseling_conversations,https://drive.google.com/file/d/1D6mmyud1pL1v-...,Speaker A
1,mental_health_chatbot_dataset,https://drive.google.com/file/d/1605EyVkjyGqHD...,Speaker A
2,Открытая_консультация_психолога__хочу_отношени...,https://drive.google.com/open?id=1Fows9_H6R_k8...,Speaker B
3,Открытая_консультация_психолога__хочу_отношени...,https://drive.google.com/open?id=1yCcQqfBDxWpF...,Speaker A
4,Открытая_консультация_психолога__Алкоголизм__У...,https://drive.google.com/open?id=1q-FflfkGdHdK...,Speaker B


In [7]:
def download_data(data, output_dir='data'):
    '''
    The input dataframe must contain links to folders in Google Drive
    '''
    path_to_file = []

    for title, url in zip(data['название'], data['TEXT']):
        output = f'{output_dir}/{title}.txt'
        path_to_file.append(output)
        if not os.path.exists(output):
            gdown.download(url=url, output=output, quiet=True, 
                           fuzzy=True, use_cookies=True)

    data['path_to_file'] = path_to_file

    return data

In [8]:
data = download_data(data)
data.head()

Unnamed: 0,название,TEXT,flag,path_to_file
0,mental_health_counseling_conversations,https://drive.google.com/file/d/1D6mmyud1pL1v-...,Speaker A,data/mental_health_counseling_conversations.txt
1,mental_health_chatbot_dataset,https://drive.google.com/file/d/1605EyVkjyGqHD...,Speaker A,data/mental_health_chatbot_dataset.txt
2,Открытая_консультация_психолога__хочу_отношени...,https://drive.google.com/open?id=1Fows9_H6R_k8...,Speaker B,data/Открытая_консультация_психолога__хочу_отн...
3,Открытая_консультация_психолога__хочу_отношени...,https://drive.google.com/open?id=1yCcQqfBDxWpF...,Speaker A,data/Открытая_консультация_психолога__хочу_отн...
4,Открытая_консультация_психолога__Алкоголизм__У...,https://drive.google.com/open?id=1q-FflfkGdHdK...,Speaker B,data/Открытая_консультация_психолога__Алкоголи...


In [9]:
def clean(string: str) -> str:
    while '(' in string or ')' in string:
        left = string.index(' (') if '(' in string else 0
        right = string.index(')') if ')' in string else -2
        string = string[:left] + string[right + 1:]

    while '[' in string:
        left = string.index('[')
        right = string.index(']')
        string = string[:left] + string[left + 1: right] + string[right + 1:]

    return string


print(clean("Who said this [to you] (ph)?"))

Who said this to you?


In [10]:
def to_speakers(text: str):
    f = False
    text = text.split('\n')
    output = ''

    for line in text:
        if not f:
            if line.startswith('BEGIN TRANSCRIPT'):
                f = 1
            continue
        if line.startswith('END TRANSCRIPT'):
            break

        if line.startswith('CLIENT'):
            output += 'Speaker A' + clean(line[len('CLIENT'):]) + '\n'

        elif line.startswith('THERAPIST'):
            output += 'Speaker B' + clean(line[len('THERAPIST'):]) + '\n'

        else:
            output += clean(line) + '\n'

    return (output, 'Speaker A')

In [11]:
def convert_to_prompt_completion_pairs(text: str, flag=None) -> list[dict]:
    '''
    Convert pairs from dialogue to prompt completion pairs. 
    If your file is different from the Speaker A, Speaker B format, then do not pass anything to the flag variable
    :param flag: {Speaker A, Speaker B, None}
    :return: prompt completion pairs
    '''
    pairs = []
    lines = text.strip().replace('\xa0', '').split('\n')

    if flag == "Speaker A":
        completion = current_prompt = ''
        for line in lines:
            if line.startswith("Speaker A"):
                if completion and current_prompt:
                    pairs.append({"prompt": current_prompt, "completion": completion})
                current_prompt = line[len("Speaker A:") + 1:].strip()
                completion = ''
                
            elif line.startswith("Speaker B:"):
                if current_prompt:
                    completion = line[len("Speaker B:") + 1:].strip()

            elif line == '\n':
                continue

            else:
                if completion:
                    completion += '\n' + line.strip()
                else:
                    current_prompt += line.strip()

    elif flag == "Speaker B":
        completion = current_prompt = ''
        for line in lines:
            if line.startswith("Speaker B:"):
                if completion and current_prompt:
                    pairs.append({"prompt": current_prompt, "completion": completion})
                current_prompt = line[len("Speaker A:") + 1:].strip()

            elif line.startswith("Speaker A:"):
                if current_prompt:
                    completion = line[len("Speaker A:") + 1:].strip()
            
            elif line == '\n':
                continue

            else:
                if completion:
                    completion += '\n' + line.strip()
                else:
                    current_prompt += line.strip()

    else:
        return convert_to_prompt_completion_pairs(*to_speakers(text))

    return pairs

In [12]:
def get_pairs(data):
    pairs = []
    for file_name, flag in zip(data['path_to_file'], data['flag']):
        with open(file_name, 'r', encoding='utf8', errors='ignore') as file:
            try:
                text = file.read()
                pairs.append(convert_to_prompt_completion_pairs(text, flag))
            except UnicodeDecodeError:
                print(file_name)
            
    return pairs

In [13]:
pairs = get_pairs(data=data)
print(pairs[0][0])

{'prompt': 'How can I get to a place where I can be content from day to day?', 'completion': "It's important to take a look inside and see what's going on with you to cause you to have these feelings. Please contact us in whatever way is most comfortable for you and we can get you set up with someone who will help you figure out this space in your life."}


In [14]:
def num_tokens_from_string(input_element, encoding_name: str = 'p50k_base') -> int:
    '''Returns the number of tokens in a text string.'''
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(input_element))
    return num_tokens

In [15]:
def convert_to_json(pairs, output_file='prompt_completion.jsonl'):
    with open(output_file, 'w', encoding='utf8') as output_file:
        for file_pairs in pairs:
            for pair in file_pairs:
                if num_tokens_from_string(pair['prompt']) < 2048 and \
                        num_tokens_from_string(pair['completion']) < 2048:
                    json.dump({'prompt': pair['prompt'] + ' ->',
                               'completion': ' ' + pair['completion'] + '\n'},
                              output_file)
                    output_file.write('\n')


convert_to_json(pairs, 'prompt_completion.jsonl')

In [16]:
# Initialize the tokenizer with the Davinci-003 model's maximum token limit (4096 tokens)
encoding = tiktoken.encoding_for_model("text-davinci-003")
# Your text to count tokens in
text = "Hello, World!"

# Tokenize the text and count tokens
token_count = len(encoding.encode(text))

print(f'Tokenizer: {encoding}')
print(f"Token count: {token_count}")
print(encoding.encode(text))

Tokenizer: <Encoding 'p50k_base'>
Token count: 4
[15496, 11, 2159, 0]


In [17]:
def num_tokens_from_json(file_name: str, encoding_name: str = 'p50k_base') -> int:
    num_tokens = 0
    encoding = tiktoken.get_encoding(encoding_name)
    full_text = ''
    with open(file_name, 'r', encoding='utf8') as f:
        for pair in f:
            pair = json.loads(pair)
            full_text += pair['prompt'] + ' ' + pair['completion'] + ' '
        num_tokens += len(encoding.encode(full_text.strip()))

    return num_tokens

In [18]:
num_tokens_from_json('prompt_completion.jsonl')

242677