In [1]:
from typing import Optional, Sequence, List, Dict, Any, Type, Union
import os
import json
from copy import deepcopy
import random


def convert_to_list(x: Union[Any, Sequence[Any]]) -> List[Any]:
    """Normalize ``x`` into a list.

    A ``list`` or ``tuple`` is shallow-copied into a new list; every other
    value (strings included) is wrapped as a single-element list.
    """
    return list(x) if isinstance(x, (list, tuple)) else [x]

def load_json(path: Union[str, List[str]]) -> Union[dict, List[dict]]:
    """Load one or more JSON files and merge their contents.

    Args:
        path: a single file path, or a list/tuple of file paths.

    Returns:
        The content of the first file, updated (dicts) or extended (lists)
        with the content of every following file. ``None`` when ``path`` is
        an empty list.

    Raises:
        ValueError: if a path does not exist, or a file's top-level JSON value
            is neither an object nor an array.
        AssertionError: if the files mix top-level objects and arrays.
    """
    merged = None
    for json_path in convert_to_list(path):
        if not os.path.exists(json_path):
            raise ValueError(f"{json_path} does not exist")

        with open(json_path, "r", encoding="utf-8") as fp:
            content = json.load(fp)

        if not isinstance(content, (dict, list)):
            raise ValueError(f"{json_path} is not a valid json file")

        # The first file decides the container type of the merged result.
        if merged is None:
            merged = content
            continue

        if isinstance(content, dict):
            assert isinstance(merged, dict), f"Each previous json file contains a list of json dicts, while {json_path} contains only a json dict"
            merged.update(content)
        else:
            assert isinstance(merged, list), f"Each previous json file contains a json dict, while {json_path} contains only a list of json dicts"
            merged.extend(content)

    return merged

def load_jsonl(path: Union[str, List[str]]) -> List[dict]:
    """Load one or more JSON Lines files into a single list of records.

    Args:
        path: a single file path, or a list/tuple of file paths.

    Returns:
        The parsed records of all files, in file order then line order.

    Raises:
        ValueError: if a path does not exist.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    records = []
    for jsonl_path in convert_to_list(path):
        if not os.path.exists(jsonl_path):
            raise ValueError(f"{jsonl_path} does not exist")

        with open(jsonl_path, "r", encoding="utf-8") as f:
            for line in f:
                # Blank lines (e.g. a trailing empty line) carry no record and
                # would otherwise make json.loads raise.
                if not line.strip():
                    continue
                records.append(json.loads(line))

    return records

def save_json(data: Union[dict, List[dict]], path: str) -> None:
    """Write ``data`` to ``path`` as pretty-printed UTF-8 JSON.

    Missing parent directories are created.

    Args:
        data: JSON-serializable object to dump.
        path: destination file path; must end with ``.json``.

    Raises:
        ValueError: if ``path`` does not end with ``.json``.
    """
    if not path.endswith(".json"):
        raise ValueError(f"{path} is not a json file")

    # dirname is "" for a bare filename, and os.makedirs("") raises; exist_ok
    # also avoids the check-then-create race of a separate exists() test.
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

def save_jsonl(data: List[dict], path: str) -> None:
    """Write ``data`` to ``path`` in JSON Lines format (one record per line).

    Missing parent directories are created.

    Args:
        data: iterable of JSON-serializable records.
        path: destination file path; must end with ``.jsonl``.

    Raises:
        ValueError: if ``path`` does not end with ``.jsonl``.
    """
    if not path.endswith(".jsonl"):
        raise ValueError(f"{path} is not a jsonl file")

    # dirname is "" for a bare filename, and os.makedirs("") raises; exist_ok
    # also avoids the check-then-create race of a separate exists() test.
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(path, "w", encoding="utf-8") as f:
        for record in data:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")


In [2]:
# Locations of the raw instruction/output datasets (relative to the notebook).
train_data_path, test_data_path = (
    "./original/claude_multi_instruct_30k.json",
    "./original/claude_multi_instruct_1k.json",
)

In [3]:
# Read both raw splits; the train file is loaded first, then the test file.
train_data, test_data = load_json(train_data_path), load_json(test_data_path)

In [4]:
# Sanity check: split sizes and the field names of the first record of each split.
(
    len(train_data),
    len(test_data),
    test_data[0].keys(),
    train_data[0].keys(),
)

(32170,
 1020,
 dict_keys(['instruction', 'output']),
 dict_keys(['instruction', 'output']))

In [5]:
# Show one raw test record; the lone "\n" item reproduces the two blank lines
# that separate the instruction from the output.
print(
    f"Instruction: {test_data[2]['instruction']}",
    "\n",
    f"Output: {test_data[2]['output']}",
    sep="\n",
)

Instruction: Compose a comprehensive summary explaining the main principles behind gravity, how it works, and its role in shaping the known universe. Include the following:

- A definition of gravity in simple terms and its relationship to mass and acceleration. 
- An overview of Isaac Newton's theory of universal gravitation and how it explains the attraction between objects with mass. Discuss both Newton's law of gravitation and universal law of gravitation.
- Albert Einstein's theory of general relativity and how it revolutionized the understanding of gravity as the curvature of spacetime caused by massive objects. Compare and contrast it with Newton's theory.    
- The latest scientific theories and research regarding the nature of gravity, including gravitational waves, extra dimensions, quantum gravity, and efforts to unite gravity with other fundamental forces.
- Examples of gravity in action at different scales, from apples falling on Earth to gravitational lensing by black hol

In [6]:
# Whitespace-split the instruction to see how many "words" it contains.
l = test_data[2]['instruction'].split()
print(len(l), l)

166 ['Compose', 'a', 'comprehensive', 'summary', 'explaining', 'the', 'main', 'principles', 'behind', 'gravity,', 'how', 'it', 'works,', 'and', 'its', 'role', 'in', 'shaping', 'the', 'known', 'universe.', 'Include', 'the', 'following:', '-', 'A', 'definition', 'of', 'gravity', 'in', 'simple', 'terms', 'and', 'its', 'relationship', 'to', 'mass', 'and', 'acceleration.', '-', 'An', 'overview', 'of', 'Isaac', "Newton's", 'theory', 'of', 'universal', 'gravitation', 'and', 'how', 'it', 'explains', 'the', 'attraction', 'between', 'objects', 'with', 'mass.', 'Discuss', 'both', "Newton's", 'law', 'of', 'gravitation', 'and', 'universal', 'law', 'of', 'gravitation.', '-', 'Albert', "Einstein's", 'theory', 'of', 'general', 'relativity', 'and', 'how', 'it', 'revolutionized', 'the', 'understanding', 'of', 'gravity', 'as', 'the', 'curvature', 'of', 'spacetime', 'caused', 'by', 'massive', 'objects.', 'Compare', 'and', 'contrast', 'it', 'with', "Newton's", 'theory.', '-', 'The', 'latest', 'scientific',

In [7]:
# Field names used in the converted QA records.
question_key, answer_key = "question", "answer"

In [8]:
# Length budget is expressed in whitespace-separated words: 500 * 0.75 = 375.
test_max_seq_len = int(500 * 0.75)
test_max_num_samples = 300
test_data_ = []

for test_sample in test_data:
    # Keep only samples whose question + answer fit within the word budget.
    if len(test_sample["instruction"].split()) + len(test_sample["output"].split()) <= test_max_seq_len:
        test_data_.append({
            question_key: test_sample["instruction"],
            answer_key: test_sample["output"],
        })
print(len(test_data_))

# Down-sample reproducibly (fixed seed) when more samples survive than wanted.
if len(test_data_) > test_max_num_samples:
    random.seed(42)
    test_data_ = random.sample(test_data_, test_max_num_samples)

len(test_data_)

303


300

In [9]:
# Show the first converted test record; the lone "\n" item reproduces the two
# blank lines between question and answer.
print(
    f"Question: {test_data_[0][question_key]}",
    "\n",
    f"Answer: {test_data_[0][answer_key]}",
    sep="\n",
)

Question: Compose a one-minute summary of an interesting historical trivia fact that covers the following aspects in a concise and comprehensive manner: who was involved, when and where it happened, what happened, why it is considered trivial yet fascinating, and how it shaped our modern world today. Ensure all conveyed facts are accurate and properly sourced, with no irrelevant or exaggerated details included.


Answer: In 1718, British naval officer James Puckle invented the world's first rapid-fire machine gun, called the Puckle gun. He designed it to fire rectangular bullets at Catholic enemies and round bullets at Protestant allies. Though never widely used, the Puckle gun demonstrated that rapid-fire weapons were technologically feasible over 200 years before Gatling's popularization of the concept, showcasing Puckle's inventiveness and the bizarre mindset of tribal religious conflicts at the time. His ideas foreshadowed how rapid advances in weapon technology would characterize 

In [10]:
# Combined whitespace word count of the first test question and answer.
sum(len(test_data_[0][key].split()) for key in (question_key, answer_key))

158

In [11]:
# Length budget is expressed in whitespace-separated words: 400 * 0.75 = 300.
train_max_seq_len = int(400 * 0.75)
train_max_num_samples = 10000
eval_ratio = 0.02

train_data_ = []
for train_sample in train_data:
    # Drop samples whose question + answer exceed the word budget.
    if len(train_sample["instruction"].split() + train_sample["output"].split()) > train_max_seq_len:
        continue
    train_data_.append({
        question_key: train_sample["instruction"],
        answer_key: train_sample["output"],
    })
print(len(train_data_))

# Down-sample reproducibly (fixed seed) when more samples survive than wanted.
if len(train_data_) > train_max_num_samples:
    random.seed(42)
    train_data_ = random.sample(train_data_, train_max_num_samples)

num_train_samples = len(train_data_)
num_eval_samples = int(num_train_samples * eval_ratio)

# Carve the eval split off the tail BEFORE truncating the train split, so the
# two are disjoint. Slicing eval from the already-truncated train list would
# make eval a subset of train. A positive split index (instead of `[:-n]`)
# also keeps the train split intact when num_eval_samples is 0.
split_index = num_train_samples - num_eval_samples
eval_data_ = train_data_[split_index:]
train_data_ = train_data_[:split_index]

assert len(train_data_) + len(eval_data_) == num_train_samples

len(train_data_), len(eval_data_)

13606


(9800, 200)

In [12]:
# Show the first converted train record; the lone "\n" item reproduces the two
# blank lines between question and answer.
print(
    f"Question: {train_data_[0][question_key]}",
    "\n",
    f"Answer: {train_data_[0][answer_key]}",
    sep="\n",
)

Question: Develop an algorithm in pseudocode to perform recursive sorting  of a list containing integer values. Include comments explaining each step of the algorithm.


Answer: recursiveSort(list)
  // Base case - if list has one element or less, return the list 
  if length(list) <= 1:  
    return list

  // Otherwise, remove the first element to be the pivot  
  pivot  list[0]  
  list.remove(pivot)  
  
  // Initialize left and right lists   
  left = []  
  right = []  
 
  // Iterate through remaining list elements  
  for each element in list:
  
    // If less than pivot, add to left list 
    if element < pivot:   
      left.append(element)

    // Otherwise, add to right list
    else:  
      right.append(element)   

  // Sort left and right recursively   
  left = recursiveSort(left)
  right = recursiveSort(right)

  // Concatenate sorted left, pivot, and right     
  return left + [pivot] + right

In summary, this algorithm recursively calls itself to split the list int

In [13]:
# Combined whitespace word count of the first train question and answer.
sum(len(train_data_[0][key].split()) for key in (question_key, answer_key))

191

In [14]:
# Output locations of the converted QA splits (JSON Lines).
test_save_path, train_save_path, eval_save_path = (
    "./qa_test.jsonl",
    "./qa_train.jsonl",
    "./qa_eval.jsonl",
)

In [None]:
# Persist the three converted splits as JSON Lines files.
# The length limits are whitespace word counts: int(500 * 0.75) = 375 words for
# test and int(400 * 0.75) = 300 words for train/eval; the "token length"
# figures in the comments below are the budgets those word counts stand in for.
save_jsonl(test_data_, test_save_path) # 0.3k samples, token length < 500
save_jsonl(train_data_, train_save_path) # 10k - 0.2k samples, token length < 400
save_jsonl(eval_data_, eval_save_path) # 0.2k samples, token length < 400

In [1]:
from typing import Optional, Sequence, List, Dict, Any, Type, Union
import os
import json
import re
import shutil

from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps

import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
def convert_to_list(x: Union[Any, Sequence[Any]]) -> List[Any]:
    """Return ``x`` as a list.

    Lists and tuples are shallow-copied into a new list; anything else
    (strings included) becomes a one-element list.
    """
    if not isinstance(x, (list, tuple)):
        return [x]
    return list(x)

def check_valid_path(
    path: str,
    ext: Optional[str] = None,
    should_exist: bool = True,
    is_dir: bool = False,
    create_if_not_exist: bool = False,
    empty_if_exist: bool = False,
) -> None:
    """Validate a file or directory path, optionally creating or clearing it.

    Args:
        path: the path to check.
        ext: required file extension without the leading dot (files only).
        should_exist: raise if ``path`` does not exist. This check runs first,
            so pass ``False`` together with ``create_if_not_exist=True``.
        is_dir: treat ``path`` as a directory instead of a file.
        create_if_not_exist: create the directory, or an empty file (plus any
            missing parent directories), when ``path`` does not exist.
        empty_if_exist: for a directory, delete and recreate it empty; for a
            file, delete it.

    Raises:
        ValueError: if ``path`` is missing while ``should_exist`` is set, is of
            the wrong kind (file vs. directory), or has the wrong extension.
    """
    if should_exist and not os.path.exists(path):
        raise ValueError(f"{path} does not exist")

    if is_dir:
        if os.path.exists(path) and not os.path.isdir(path):
            raise ValueError(f"{path} is not a directory")

        if create_if_not_exist and not os.path.exists(path):
            os.makedirs(path, exist_ok=True)

        if empty_if_exist and os.path.exists(path):
            shutil.rmtree(path)
            os.makedirs(path, exist_ok=True)
    else:
        if os.path.exists(path) and not os.path.isfile(path):
            raise ValueError(f"{path} is not a file")

        if ext is not None and not path.endswith(f".{ext}"):
            raise ValueError(f"{path} is not a {ext} file")

        if create_if_not_exist and not os.path.exists(path):
            # The parent usually exists already (FileExistsError without
            # exist_ok) and is "" for a bare filename (os.makedirs("") raises).
            parent_dir = os.path.dirname(path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(path, "w"):
                pass

        if empty_if_exist and os.path.exists(path):
            os.remove(path)

In [3]:
def multithreaded(max_workers=5):
    """Multithread Decorator

    Wraps ``func`` so that it is called once per element of its list/tuple/set
    positional arguments (zipped together), each call running in a thread pool
    of ``max_workers`` threads. The wrapper returns the list of results in the
    order the work items were submitted.

    A call that raises is reported with ``print`` and contributes no entry to
    the result list (best-effort behavior). When no positional argument is a
    list/tuple/set, ``func`` is called once with the arguments unchanged and
    the result is returned as a one-element list.

    NOTE: this decorator assumes that:
        1. the iterable arguments are ONLY in the *args, thus **kwargs are always the non-iterable shared ones
        2. there's NO mutable argument that requires to be modified in-place, i.e. all of them are read-only
        3. ``func`` takes its per-item arguments first: each call receives the
           zipped items followed by the non-iterable positional arguments,
           regardless of how they were interleaved at the call site
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            iterable_args = []
            non_iterable_args = []

            for arg in args:
                if isinstance(arg, (list, tuple, set)):
                    iterable_args.append(arg)
                else:
                    non_iterable_args.append(arg)

            # Nothing to fan out over: plain single call. (A zip object is
            # always truthy, so this has to be decided before zipping.)
            if not iterable_args:
                return [func(*args, **kwargs)]

            work_items = list(zip(*iterable_args))

            results = []
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [
                    executor.submit(func, *items, *non_iterable_args, **kwargs)
                    for items in work_items
                ]

                # Collect in submission order (not completion order) so that
                # results line up with the inputs and `i` is the item index.
                for i, future in enumerate(futures):
                    try:
                        result = future.result()
                    except Exception as exc:
                        print(f'The {i}-th result generated an exception: {exc}')
                    else:
                        results.append(result)

            return results
        return wrapper
    return decorator

In [4]:
def load_jsonl(path: Union[str, List[str]]) -> List[dict]:
    """Load one or more JSON Lines files into a single list of records.

    Args:
        path: a single ``.jsonl`` file path, or a list/tuple of such paths.

    Returns:
        The parsed records of all files, in file order then line order.

    Raises:
        ValueError: propagated from ``check_valid_path`` for an invalid path.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    records = []
    for jsonl_path in convert_to_list(path):
        check_valid_path(jsonl_path, ext="jsonl")
        with open(jsonl_path, "r", encoding="utf-8") as f:
            # Stream the file instead of readlines(), and skip blank lines
            # (e.g. a trailing empty line), which json.loads cannot parse.
            records.extend(json.loads(line) for line in f if line.strip())

    return records

In [5]:
def load_jsonl_mt(
    path: Union[str, List[str]],
    max_workers: int = 5,
) -> List[dict]:
    """Load JSON Lines files, parsing the lines through the ``multithreaded`` decorator.

    Records are returned in file order then line order, regardless of the
    order in which the worker threads finish. Handling of lines that fail to
    parse is whatever ``multithreaded`` does with a raising call.

    NOTE: the timings recorded in this notebook show this variant running
    slower than the plain ``load_jsonl`` on ``./qa_train.jsonl``.

    Args:
        path: a single ``.jsonl`` file path, or a list/tuple of such paths.
        max_workers: thread-pool size passed to ``multithreaded``.

    Returns:
        The list of parsed records.
    """
    paths = convert_to_list(path)

    @multithreaded(max_workers=max_workers)
    def _load_line(index: int, line: str):
        # Carry the line index along so the original order can be restored.
        return index, json.loads(line)

    data = []
    for jsonl_path in paths:
        check_valid_path(jsonl_path, ext="jsonl")
        with open(jsonl_path, "r", encoding="utf-8") as f:
            # Blank lines (e.g. a trailing empty line) hold no record.
            lines = [line for line in f if line.strip()]

        indexed_records = _load_line(list(range(len(lines))), lines)
        indexed_records.sort(key=lambda pair: pair[0])
        data.extend(record for _, record in indexed_records)

    return data

In [6]:
import time
# JSONL file used as input for the loader timing cells that follow.
path = "./qa_train.jsonl"

In [10]:
# Time the single-threaded loader. perf_counter() is a monotonic,
# high-resolution clock meant for measuring short durations, unlike the
# wall-clock time.time().
start_time = time.perf_counter()

load_jsonl(path)

end_time = time.perf_counter()

print("Time taken:", end_time - start_time, "seconds")

Time taken: 0.053804636001586914 seconds


In [9]:
# Time the multi-threaded loader. perf_counter() is a monotonic,
# high-resolution clock meant for measuring short durations, unlike the
# wall-clock time.time().
start_time = time.perf_counter()

load_jsonl_mt(path)

end_time = time.perf_counter()

print("Time taken:", end_time - start_time, "seconds")

Time taken: 0.32451462745666504 seconds
