In [15]:
AB_KEY_HEX = "532B4631E4A7B9473E7CFB"
def decrypt_data(data: bytes, key_hex: str) -> bytes:
    """
    Decrypts data using a cyclic XOR with the provided key.
    Used for Asset Bundles (ABKey).
    """
    key = binascii.unhexlify(key_hex)
    key_len = len(key)
    
    # Fast XOR using bytearray
    output = bytearray(data)
    for i in range(len(output)):
        output[i] ^= key[i % key_len]
    return bytes(output)

In [29]:
import os
import binascii
import struct
import apsw
import UnityPy

# --- Constants from Config.json ---
AB_KEY_TEXT = "532B4631E4A7B9473E7CFB"  #
DB_BASE_KEY_HEX = "F170CEA4DFCEA3E1A5D8C70BD1000000" #
DB_KEY_JP_HEX = "6D5B65336336632554712D73505363386D34377B356370233734532973433633" #

def get_decrypted_meta_connection(db_path):
    """
    Connects to the encrypted meta database using apsw and ChaCha20.
    Provided by user.
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"Meta file not found: {db_path}")

    # 1. Generate Key
    base_key = binascii.unhexlify(DB_BASE_KEY_HEX)
    raw_key = binascii.unhexlify(DB_KEY_JP_HEX)
    key_list = list(raw_key)
    
    # XOR Logic: key[i] ^ baseKey[i % 13]
    # Note: 13 is likely len(base_key) if logic assumes that, but based on your snippet:
    for i in range(len(key_list)):
        key_list[i] = key_list[i] ^ base_key[i % len(base_key)] # Fixed modulus to be dynamic or strictly 13 if hardcoded
        
    final_key_hex = binascii.hexlify(bytes(key_list)).decode('utf-8')

    # 2. Connect
    conn = apsw.Connection(db_path)
    cursor = conn.cursor()
    
    # 3. Apply Encryption Config (ChaCha20)
    cursor.execute("PRAGMA cipher='chacha20'")
    cursor.execute(f"PRAGMA hexkey='{final_key_hex}'")
    cursor.execute("PRAGMA cipher_use_hmac=OFF")
    
    return cursor

def get_asset_key_from_db(cursor, asset_name):
    """
    Queries table 'a' for the asset 'e' (Key) using asset 'n' (Name).
    """
    # Usually column 'n' is the name/path (e.g., "storytimeline_041097001")
    # and column 'e' is the key (long integer).
    try:
        # We assume asset_name matches the 'n' column exactly. 
        # Sometimes it might need the full path "dat/..." or similar.
        # Trying exact match first.
        cursor.execute("SELECT e FROM a WHERE n = ?", (asset_name,))
        row = cursor.fetchone()
        
        if row:
            return row[0]
        else:
            # Fallback: Try searching as a substring if exact match fails
            cursor.execute("SELECT n, e FROM a WHERE n LIKE ?", (f"%{asset_name}%",))
            row = cursor.fetchone()
            if row:
                print(f"Found partial match: {row[0]}")
                return row[1]
            return None
            
    except Exception as e:
        print(f"Database query error: {e}")
        return None

def generate_fkey(file_key_long):
    """
    Ports the FKey generation logic from UmaDatabaseEntry.cs
    """
    # 1. Base Keys from Config
    base_keys = binascii.unhexlify(AB_KEY_TEXT)
    base_len = len(base_keys)
    
    # 2. Key Bytes (Little Endian from the DB long)
    # C#: BitConverter.GetBytes(Key)
    key_bytes = struct.pack('<q', file_key_long)
    
    # 3. Generate fKey (base_len * 8 bytes)
    f_key_size = base_len * 8
    f_key = bytearray(f_key_size)
    
    for i in range(base_len):
        b = base_keys[i]
        base_offset = i << 3 # i * 8
        for j in range(8):
            # C#: keys[baseOffset + j] = (byte)(b ^ keyBytes[j]);
            f_key[base_offset + j] = b ^ key_bytes[j]
            
    return bytes(f_key)

def decrypt_uma_data(data, f_key):
    """
    Decrypts the data using Cyclic XOR with the generated fKey.
    """
    f_key_len = len(f_key)
    decrypted = bytearray(len(data))
    
    # Simple cyclic XOR
    for i in range(len(data)):
        decrypted[i] = data[i] ^ f_key[i % f_key_len]
        
    return bytes(decrypted)

# ==========================================
# Main Execution
# ==========================================

# Paths
meta_db_path = meta_file_path # Path to your 'meta' file
asset_path = "storytimeline_041097001"
asset_name = "storytimeline_041097001" # This name is queried in the DB

try:
    # 1. Get the Key (long) from the DB
    print(f"Connecting to DB: {meta_db_path}...")
    db_cursor = get_decrypted_meta_connection(meta_db_path)
    
    print(f"Querying key for: {asset_name}...")
    asset_key_long = get_asset_key_from_db(db_cursor, asset_name)
    
    if asset_key_long is None:
        raise ValueError(f"Could not find key for {asset_name} in database table 'a'.")
        
    print(f"Found Key (e): {asset_key_long}")

    # 2. Generate the fKey
    f_key = generate_fkey(asset_key_long)
    print(f"Generated fKey (first 10 bytes): {f_key[:10].hex()}...")

    # 3. Read and Decrypt File
    if not os.path.exists(asset_path):
        raise FileNotFoundError(f"Asset file not found: {asset_path}")

    with open(asset_path, "rb") as f:
        encrypted_data = f.read()

    print(f"Decrypting {len(encrypted_data)} bytes...")
    decrypted_data = decrypt_uma_data(encrypted_data, f_key)

    # 4. Load into UnityPy
    print("Loading into UnityPy...")
    env = UnityPy.load(decrypted_data)
    
    # Success Check
    print(f"Success! Loaded {len(env.objects)} objects.")
    for obj in env.objects:
        if obj.type.name == "MonoBehaviour":
             # Just printing one to verify
             print(f" - Found MonoBehaviour: {obj.read().name}")
             break

except Exception as e:
    print(f"\nCRITICAL ERROR: {e}")
    import traceback
    traceback.print_exc()

Connecting to DB: C:\Users\Matt\Documents\Games\Umamusume\umamusume_Data\Persistent\meta...
Querying key for: storytimeline_041097001...
Database query error: file is not a database

CRITICAL ERROR: Could not find key for storytimeline_041097001 in database table 'a'.


Traceback (most recent call last):
  File "C:\Users\Matt\AppData\Local\Temp\ipykernel_28488\1351306231.py", line 127, in <module>
    raise ValueError(f"Could not find key for {asset_name} in database table 'a'.")
ValueError: Could not find key for storytimeline_041097001 in database table 'a'.


In [17]:
text_objects = []
for obj in env.objects:
    obj_read = obj.read()
    if hasattr(obj_read, 'Text'):
        text_objects.append(obj_read)

In [18]:
text_objects

[]

In [4]:
target_columns = ['Name','Text','VoiceSheetId','CharaId','CueId']

In [5]:
num_blocks = len(text_objects)
# last block has NextBlock of -1
# first block has NextBlock of 1
last_block_block_num = max([x.NextBlock for x in text_objects])

ValueError: max() iterable argument is empty

In [205]:
t_processed_dict = {}
for t in text_objects:
    t_processed = {}
    t_processed['RubyInfo'] = []
    if t.NextBlock == -1:
        t_processed['BlockNumber'] = last_block_block_num
    else:
        t_processed['BlockNumber'] = t.NextBlock - 1
    for c in target_columns:
        t_processed[c] = getattr(t, c)
    t_processed_dict[t_processed['BlockNumber']] = t_processed

In [206]:
t_processed_list = []
for i in range(num_blocks):
    t_processed_list.append(t_processed_dict[i])

In [207]:
ruby_path = "ast_ruby_041097001"
ruby_name = "ast_ruby_041097001"

In [208]:
env_ruby = UnityPy.load(ruby_path)

In [209]:
ruby_obj = None
for obj in env_ruby.objects:
    if obj.read().m_Name == ruby_name:
        print("Ruby found")
        ruby_obj = obj.read()
        break

Ruby found


In [210]:
ruby_object_columns = ['CharX','CharY','RubyText']

In [211]:
for da in ruby_obj.DataArray:
    ruby_info_list = []
    for rbl in da.RubyDataList:
        ruby_info_obj = {}
        for roc in ruby_object_columns:
            ruby_info_obj[roc] = getattr(rbl, roc)
        ruby_info_list.append(ruby_info_obj)
    t_processed_list[da.BlockIndex]['RubyInfo'] = ruby_info_list

In [212]:
t_processed_list

[{'RubyInfo': [],
  'BlockNumber': 0,
  'Name': '',
  'Text': '',
  'VoiceSheetId': '',
  'CharaId': -1,
  'CueId': -1},
 {'RubyInfo': [],
  'BlockNumber': 1,
  'Name': '',
  'Text': '――――――――',
  'VoiceSheetId': '041097001',
  'CharaId': 1097,
  'CueId': 0},
 {'RubyInfo': [],
  'BlockNumber': 2,
  'Name': '',
  'Text': '混濁する意識の中で、いつも……いつも、同じ問いが浮かぶ。\n“もしあの時、この結末を知っていたら……どうしただろうか？”',
  'VoiceSheetId': '',
  'CharaId': 90001,
  'CueId': -1},
 {'RubyInfo': [],
  'BlockNumber': 3,
  'Name': '',
  'Text': '…………考える。それが徒労に終わることを知りながら。\nなぜなら……答えもまた、いつも同じだからだ。',
  'VoiceSheetId': '',
  'CharaId': 90001,
  'CueId': -1},
 {'RubyInfo': [],
  'BlockNumber': 4,
  'Name': '',
  'Text': '――――――――',
  'VoiceSheetId': '041097001',
  'CharaId': 1097,
  'CueId': 1},
 {'RubyInfo': [],
  'BlockNumber': 5,
  'Name': '',
  'Text': 'そうして再び……意識が遠のいていく。全てが曖昧になる中でも……\nはっきりと、感じる。逃れがたいそれが、今も……まだ……――',
  'VoiceSheetId': '',
  'CharaId': 90001,
  'CueId': -1},
 {'RubyInfo': [],
  'BlockNumber': 6,
  'Name': '',
  'Te

In [105]:
import acb

In [188]:
acb_base_path = "C:\\Users\\Matt\\Desktop\\uma-voice-dataset-creator\\snd_voi_story_041097001"
acb_path = acb_base_path + ".acb"
awb_path = acb_base_path + ".awb"
uma_hca_key = "0x450D608C479F"
vgastream_cli_path = "C:\\Users\\Matt\\Desktop\\uma-voice-dataset-creator\\vgmstream\\vgmstream-cli.exe"

In [189]:
acbfile = acb.ACBFile(acb_path, awb_path, hca_keys = uma_hca_key)

In [199]:
save_dir = "C:\\Users\\Matt\\Desktop\\uma-voice-dataset-creator\\output"

In [202]:
import os
import subprocess
def save_track(acb_file, track, target_dir, name):
    os.makedirs(target_dir, exist_ok=True)
    name_hca = name + ".hca"
    name_wav = name + ".wav"
    with open(os.path.join(target_dir, name_hca), "wb") as out_file:
        out_file.write(acb_file.get_track_data(track, True))
    subprocess.run([vgastream_cli_path, '-o', os.path.join(target_dir, name_wav), os.path.join(target_dir, name_hca)])
    os.remove(os.path.join(target_dir, name_hca))

In [183]:
acbfile.track_list.tracks

[track_t(cue_id=0, name='snd_voi_story_041097001_00', memory_wav_id=65535, external_wav_id=0, enc_type=2, is_stream=1),
 track_t(cue_id=1, name='snd_voi_story_041097001_01', memory_wav_id=65535, external_wav_id=1, enc_type=2, is_stream=1),
 track_t(cue_id=2, name='snd_voi_story_041097001_02', memory_wav_id=65535, external_wav_id=2, enc_type=2, is_stream=1),
 track_t(cue_id=3, name='snd_voi_story_041097001_03', memory_wav_id=65535, external_wav_id=3, enc_type=2, is_stream=1),
 track_t(cue_id=4, name='snd_voi_story_041097001_04', memory_wav_id=65535, external_wav_id=4, enc_type=2, is_stream=1),
 track_t(cue_id=5, name='snd_voi_story_041097001_05', memory_wav_id=65535, external_wav_id=5, enc_type=2, is_stream=1),
 track_t(cue_id=6, name='snd_voi_story_041097001_06', memory_wav_id=65535, external_wav_id=6, enc_type=2, is_stream=1),
 track_t(cue_id=7, name='snd_voi_story_041097001_07', memory_wav_id=65535, external_wav_id=7, enc_type=2, is_stream=1),
 track_t(cue_id=8, name='snd_voi_story_0

In [185]:
acbfile.track_list.tracks[0].name

'snd_voi_story_041097001_00'

In [203]:
for track in acbfile.track_list.tracks:
    save_track(acbfile, track, save_dir, track.name)

In [215]:
t_processed_list[1]['CueId']

0

In [220]:
# invariant: cue_id = index for tracks
for t in t_processed_list:
    if t['CueId'] == -1:
        continue
    curr_track = acbfile.track_list.tracks[t['CueId']]
    save_track(acbfile, curr_track, save_dir, curr_track.name)
    t["AudioFileLocation"] = os.path.join(save_dir, curr_track.name) + ".wav"
    
    

In [221]:
t_processed_list

[{'RubyInfo': [],
  'BlockNumber': 0,
  'Name': '',
  'Text': '',
  'VoiceSheetId': '',
  'CharaId': -1,
  'CueId': -1},
 {'RubyInfo': [],
  'BlockNumber': 1,
  'Name': '',
  'Text': '――――――――',
  'VoiceSheetId': '041097001',
  'CharaId': 1097,
  'CueId': 0,
  'AudioFileLocation': 'C:\\Users\\Matt\\Desktop\\uma-voice-dataset-creator\\output\\snd_voi_story_041097001_00.wav'},
 {'RubyInfo': [],
  'BlockNumber': 2,
  'Name': '',
  'Text': '混濁する意識の中で、いつも……いつも、同じ問いが浮かぶ。\n“もしあの時、この結末を知っていたら……どうしただろうか？”',
  'VoiceSheetId': '',
  'CharaId': 90001,
  'CueId': -1},
 {'RubyInfo': [],
  'BlockNumber': 3,
  'Name': '',
  'Text': '…………考える。それが徒労に終わることを知りながら。\nなぜなら……答えもまた、いつも同じだからだ。',
  'VoiceSheetId': '',
  'CharaId': 90001,
  'CueId': -1},
 {'RubyInfo': [],
  'BlockNumber': 4,
  'Name': '',
  'Text': '――――――――',
  'VoiceSheetId': '041097001',
  'CharaId': 1097,
  'CueId': 1,
  'AudioFileLocation': 'C:\\Users\\Matt\\Desktop\\uma-voice-dataset-creator\\output\\snd_voi_story_041097001_01.wav'},
 {'Ruby

In [223]:
meta_file_path = "C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\meta"
master_file_path = "C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\master\\master.mdb"

In [225]:
import sqlite3
meta_con = sqlite3.connect(meta_file_path)
master_con = sqlite3.connect(master_file_path)
meta_cur = meta_con.cursor()
master_cur = master_con.cursor()

In [232]:
story_ids = master_cur.execute("SELECT story_id FROM chara_story_data WHERE chara_id = 1097").fetchall()

In [234]:
story_ids = [x[0] for x in story_ids]

In [235]:
story_ids

[41097001, 41097002, 41097003, 41097004, 41097005, 41097006, 41097007]

In [267]:
def get_hash_given_name_filter(name_filter, meta_cursor):
    res = meta_cursor.execute(f"SELECT n, h FROM a WHERE n LIKE '%{name_filter}%'").fetchall()
    return res

In [304]:
file_packets = []
for story_id in story_ids:
    story_id_str = str(story_id)
    res = get_hash_given_name_filter(f"%{story_id_str}", meta_cur)
    curr_list = []
    for r in res:
        if r[0][0] != 's' or 'resource' in r[0]:
            continue
        curr_item = {}
        curr_item['name'] = r[0]
        curr_item['hash'] = r[1]
        curr_list.append(curr_item)
    #res = [r for r in res]
    #res = list(filter(lambda s: s[0] == 's' and 'resource' not in s and 'dot' not in s, res))
    file_packets.append(curr_list)

In [306]:
file_packet

[{'name': 'story/data/04/1097/ast_dot_041097001',
  'hash': 'IYPY46AWJXDTBACX6MZGUZ3LV27OTOZM'},
 {'name': 'story/data/04/1097/ast_ruby_041097001',
  'hash': 'PA2WM2BDNQ6Y26SHRJPW3QFLBZ44GFNV'},
 {'name': 'story/data/04/1097/storytimeline_041097001',
  'hash': 'XCTZGQRIBGK24V5CTWR2F52VBNIAFFCT'},
 {'name': 'sound/c/snd_voi_story_041097001.acb',
  'hash': 'UIP3TZ34FSPQQKZIOUAGXENLNAOT5O2F'},
 {'name': 'sound/c/snd_voi_story_041097001.awb',
  'hash': 'BIUHGZRF5SNDUHSMX4BRU7SHDGF7RTZ3'}]

In [290]:
hash_files_base_path = "C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\dat"

In [292]:
curr_hash = file_packet[0]['hash']

In [293]:
curr_hash

'IYPY46AWJXDTBACX6MZGUZ3LV27OTOZM'

In [296]:
def get_filepath_from_hash(h, base_path):
    return os.path.join(base_path, h[:2], h)

In [297]:
get_filepath_from_hash(curr_hash, hash_files_base_path)

'C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\dat\\IY\\IYPY46AWJXDTBACX6MZGUZ3LV27OTOZM'

In [310]:
for file_packet in file_packets:
    for item in file_packet:
        item["path"] = get_filepath_from_hash(item["hash"], hash_files_base_path)

In [312]:
file_packet = file_packets[0]

In [313]:
file_packet

[{'name': 'story/data/04/1097/ast_dot_041097001',
  'hash': 'IYPY46AWJXDTBACX6MZGUZ3LV27OTOZM',
  'path': 'C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\dat\\IY\\IYPY46AWJXDTBACX6MZGUZ3LV27OTOZM'},
 {'name': 'story/data/04/1097/ast_ruby_041097001',
  'hash': 'PA2WM2BDNQ6Y26SHRJPW3QFLBZ44GFNV',
  'path': 'C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\dat\\PA\\PA2WM2BDNQ6Y26SHRJPW3QFLBZ44GFNV'},
 {'name': 'story/data/04/1097/storytimeline_041097001',
  'hash': 'XCTZGQRIBGK24V5CTWR2F52VBNIAFFCT',
  'path': 'C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\dat\\XC\\XCTZGQRIBGK24V5CTWR2F52VBNIAFFCT'},
 {'name': 'sound/c/snd_voi_story_041097001.acb',
  'hash': 'UIP3TZ34FSPQQKZIOUAGXENLNAOT5O2F',
  'path': 'C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\dat\\UI\\UIP3TZ34FSPQQKZIOUAGXENLNAOT5O2F'},
 {'name': 'sound/c/snd_voi_story_041097001.awb',
  'hash': 'BIUHGZRF5SNDUHSMX4BRU7SHDGF7RTZ3',
  'path': 'C:\\Users\\Matt\\AppData\\LocalLow\\Cygames\\umamusume\\

In [13]:
DB_BASE_KEY_HEX = "F170CEA4DFCEA3E1A5D8C70BD1000000"
DB_KEY_JP_HEX = "6D5B65336336632554712D73505363386D34377B356370233734532973433633"

def get_decrypted_meta_connection(db_path):
    """
    Connects to the encrypted meta database using apsw and ChaCha20.
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"Meta file not found: {db_path}")

    # 1. Generate Key
    base_key = binascii.unhexlify(DB_BASE_KEY_HEX)
    raw_key = binascii.unhexlify(DB_KEY_JP_HEX)
    key_list = list(raw_key)
    
    # XOR Logic: key[i] ^ baseKey[i % 13]
    for i in range(len(key_list)):
        key_list[i] = key_list[i] ^ base_key[i % 13]
    final_key_hex = binascii.hexlify(bytes(key_list)).decode('utf-8')

    # 2. Connect
    conn = apsw.Connection(db_path)
    cursor = conn.cursor()
    
    # 3. Apply Encryption Config (ChaCha20)
    cursor.execute("PRAGMA cipher='chacha20'")
    cursor.execute(f"PRAGMA hexkey='{final_key_hex}'")
    cursor.execute("PRAGMA cipher_use_hmac=OFF") # Often needed for Unity implementations
    
    return conn

In [14]:
meta_file_path = "C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\meta"
master_file_path = "C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\master\\master.mdb"
meta_con = get_decrypted_meta_connection(meta_file_path)
master_con = sqlite3.connect(master_file_path)
meta_cur = meta_con.cursor()
master_cur = master_con.cursor()

In [15]:
meta_cur.execute("SELECT * FROM a").fetchall()

[(1,
  '//root',
  None,
  0,
  77,
  -5211244075679446501,
  '4JC6BY3T4ZUB6ECQETFEVNRE7HWSY6FY',
  'manifest3',
  3,
  1,
  0,
  0),
 (2,
  '//Windows',
  None,
  0,
  778,
  -596844287944332799,
  'DLXIEEK4PUWKZDLX4D2TSLEHDTDGF4JK',
  'manifest2',
  2,
  1,
  0,
  0),
 (3,
  '//story',
  None,
  0,
  1369470,
  417778745188025394,
  '5N5LYID26QZ745WG4NO76FICFWHUERER',
  'manifest',
  1,
  1,
  0,
  0),
 (4,
  '//campaign',
  None,
  0,
  31,
  6822856583349484187,
  'NY5OMXOCXPEKVJUEDAME2JGB2JIUBZH2',
  'manifest',
  1,
  1,
  0,
  0),
 (5,
  '//ratingrace',
  None,
  0,
  1206,
  6721254023768407715,
  'CH3UZBOKEVQXGFDNA36WCZNBI5GNRAWJ',
  'manifest',
  1,
  1,
  0,
  0),
 (6,
  '//transferevent',
  None,
  0,
  1201,
  13198277341049896,
  '4PBJ6MFIQ45IO3M5VA2N5LT5BOLCGQGT',
  'manifest',
  1,
  1,
  0,
  0),
 (7,
  '//gacha',
  None,
  0,
  15730,
  5045523432203197028,
  'TKXLCNP572O54ONL4FQ6GFUJLKV2S45A',
  'manifest',
  1,
  1,
  0,
  0),
 (8,
  '//storyevent',
  None,
  0,
  8

In [16]:
master_cur.execute("SELECT * FROM chara_data").fetchall()

[(1001,
  1995,
  5,
  2,
  1999,
  1,
  'FCA7FF',
  'FF58D9',
  'EE6DCB',
  'FFDEF9',
  'FF7FDD',
  'F759CD',
  'F759CD',
  'FF7FDD',
  'FA50CD',
  'FA50CD',
  'FCCAEE',
  'FCE3F5',
  'FDEDFE',
  'FF7FDD',
  'FF7FDD',
  'FF7CDC',
  'FEB4EA',
  1,
  2,
  158,
  1,
  0,
  3,
  0,
  1,
  1,
  110,
  220,
  30,
  90,
  275,
  425,
  130,
  250,
  -1,
  0,
  1483196400,
  0,
  12),
 (1002,
  1994,
  5,
  1,
  1998,
  1,
  '8FE78D',
  'FBFF3B',
  '29BD70',
  'FFCE48',
  '4BD18C',
  '27B36B',
  '27B36B',
  '4BD18C',
  '27B36B',
  '27B36B',
  'BDF2D6',
  'D4F2E2',
  'FFF5CC',
  '4BD18C',
  '4BD18C',
  '43C883',
  '82ECB5',
  1,
  0,
  161,
  1,
  1,
  6,
  0,
  1,
  1,
  170,
  270,
  180,
  310,
  275,
  425,
  180,
  310,
  -1,
  0,
  1483196400,
  0,
  12),
 (1003,
  1988,
  4,
  20,
  1993,
  1,
  'A8C6FD',
  '5277D6',
  '3376D2',
  'FF99D0',
  '4C91F1',
  '2B75DD',
  '2B75DD',
  '71ADFF',
  '1667D9',
  '1667D9',
  'BCD5F5',
  'D6E4F6',
  'FFEBF6',
  '4C91F1',
  '4C91F1',
  '4C91F1',
  '8

In [14]:
%uv pip install pysqlcipher3

Note: you may need to restart the kernel to use updated packages.


[2mUsing Python 3.14.2 environment at: C:\Users\Matt\.pyenv\pyenv-win\versions\3.14.2[0m
[2mResolved [1m1 package[0m [2min 1.49s[0m[0m
  [31mÃ—[0m Failed to build `pysqlcipher3==1.2.0`
[31m  â”œâ”€â–¶ [0mThe build backend returned an error
[31m  â•°â”€â–¶ [0mCall to `setuptools.build_meta:__legacy__.build_wheel` failed (exit
[31m      [0mcode: 1)

[31m      [0m[31m[stdout][39m
[31m      [0mrunning bdist_wheel
[31m      [0mrunning build
[31m      [0mrunning build_py
[31m      [0mcreating build\lib.win-amd64-cpython-314\pysqlcipher3
[31m      [0mcopying lib\dbapi2.py -> build\lib.win-amd64-cpython-314\pysqlcipher3
[31m      [0mcopying lib\dump.py -> build\lib.win-amd64-cpython-314\pysqlcipher3
[31m      [0mcopying lib\__init__.py -> build\lib.win-amd64-cpython-314\pysqlcipher3
[31m      [0mcreating build\lib.win-amd64-cpython-314\pysqlcipher3\test
[31m      [0mcopying lib\test\__init__.py ->
[31m      [0mbuild\lib.win-amd64-cpython-314\pysqlcipher3

In [28]:
import apsw
import binascii
import os
import sys
from dataclasses import dataclass
from typing import Dict, Optional

# Adapted from UmaViewer

# --- Configuration (From Config.cs) ---
DB_BASE_KEY_HEX = "F170CEA4DFCEA3E1A5D8C70BD1000000"
DB_KEY_JP_HEX = "6D5B65336336632554712D73505363386D34377B356370233734532973433633"
DB_KEY_GL_HEX = "56636B634272377665704162"

@dataclass
class UmaDatabaseEntry:
    type: str
    name: str
    url: str
    checksum: str
    prerequisites: str
    key: int

class UmaMetaReader:
    def __init__(self, db_path: str, region: str = "Jp"):
        self.db_path = db_path
        
        self.base_key = binascii.unhexlify(DB_BASE_KEY_HEX)
        if region.lower() == "global":
            self.raw_key = binascii.unhexlify(DB_KEY_GL_HEX)
        else:
            self.raw_key = binascii.unhexlify(DB_KEY_JP_HEX)

    def _gen_final_key(self, key: bytes) -> str:
        """
        Replicates C# GenFinalKey logic: key[i] ^ baseKey[i % 13]
        """
        key_list = list(key)
        base_len = 13 # C# explicitly uses % 13
        
        for i in range(len(key_list)):
            key_list[i] = key_list[i] ^ self.base_key[i % base_len]
            
        return binascii.hexlify(bytes(key_list)).decode('utf-8')

    def read_entries(self) -> Dict[str, UmaDatabaseEntry]:
        if not os.path.exists(self.db_path):
            raise FileNotFoundError(f"Database file not found at: {self.db_path}")

        # Generate the specific hex key for the region
        final_key_hex = self._gen_final_key(self.raw_key)
        entries = {}

        try:
            conn = apsw.Connection(self.db_path)
            cursor = conn.cursor()

            # --- CRITICAL FIX: Use ChaCha20 (Cipher ID 3) ---
            # The C# code calls Sqlite3MC.MC_Config(db, "cipher", 3).
            # In sqlite3mc, Cipher 3 is ChaCha20.
            cursor.execute("PRAGMA cipher='chacha20'")
            
            # Set the key
            cursor.execute(f"PRAGMA hexkey='{final_key_hex}'")
            
            # Attempt to read data
            query = "SELECT m, n, h, c, d, e FROM a"
            
            count = 0
            for row in cursor.execute(query):
                m_type, n_name, h_url, c_chk, d_pre, e_key = row
                
                # Basic validation: 'm' (type) and 'n' (name) must exist
                if not m_type or not n_name:
                    continue

                entry = UmaDatabaseEntry(
                    type=m_type, 
                    name=n_name, 
                    url=h_url, 
                    checksum=c_chk, 
                    prerequisites=d_pre, 
                    key=e_key
                )
                entries[n_name] = entry
                count += 1

            conn.close()
            print(f"Success! Decrypted and loaded {count} entries.")
            return entries

        except apsw.SQLError as e:
            print(f"SQLite Error: {e}")
            print(f"Failed. Ensure '{self.db_path}' matches the region '{'Global' if len(self.raw_key) == 12 else 'JP'}'.")
            return {}

# --- Usage ---
if __name__ == "__main__":
    # 1. Point this to your meta file
    META_FILE_PATH = meta_file_path
    
    # 2. SELECT REGION: "Jp" or "Global"
    # IMPORTANT: You cannot open a JP file with Global config or vice versa.
    # Global DB Key length is 12 bytes; JP is 32 bytes.
    SELECTED_REGION = "Jp" 

    print(f"Reading '{META_FILE_PATH}' as {SELECTED_REGION}...")
    
    reader = UmaMetaReader(META_FILE_PATH, region=SELECTED_REGION)
    
    try:
        data = reader.read_entries()
        if data:
            # Print a sample to verify
            sample_key = next(iter(data))
            print(f"Sample Entry: {data[sample_key]}")
    except Exception as ex:
        print(f"Fatal Error: {ex}")

Reading 'C:\Users\Matt\Documents\Games\Umamusume\umamusume_Data\Persistent\meta' as Jp...
Success! Decrypted and loaded 329367 entries.
Sample Entry: UmaDatabaseEntry(type='manifest3', name='//root', url='4JC6BY3T4ZUB6ECQETFEVNRE7HWSY6FY', checksum=-5211244075679446501, prerequisites=None, key=0)


In [12]:
import apsw
import binascii
import shutil
import os
import sys

# --- Configuration ---
# JP Keys (from Config.cs context)
DB_BASE_KEY_HEX = "F170CEA4DFCEA3E1A5D8C70BD1000000"
DB_KEY_JP_HEX = "6D5B65336336632554712D73505363386D34377B356370233734532973433633"

INPUT_FILE = meta_file_path
OUTPUT_FILE = meta_file_path + "_decrypted_3.db"

def decrypt_database():
    # 1. Generate the Hex Key
    # Logic matches C# GenFinalKey: key[i] ^ baseKey[i % 13]
    base_key = binascii.unhexlify(DB_BASE_KEY_HEX)
    raw_key = binascii.unhexlify(DB_KEY_JP_HEX)
    key_list = list(raw_key)
    
    for i in range(len(key_list)):
        key_list[i] = key_list[i] ^ base_key[i % 13]
        
    final_key_hex = binascii.hexlify(bytes(key_list)).decode('utf-8')
    print(f"Generated Key: {final_key_hex}")

    # 2. Create a working copy
    # We DO NOT want to modify the original game file.
    if not os.path.exists(INPUT_FILE):
        print(f"Error: Input file '{INPUT_FILE}' not found.")
        return

    print(f"Copying '{INPUT_FILE}' to '{OUTPUT_FILE}'...")
    shutil.copy(INPUT_FILE, OUTPUT_FILE)

    # 3. Open the COPY and remove encryption
    try:
        conn = apsw.Connection(OUTPUT_FILE)
        c = conn.cursor()

        # Set current encryption (ChaCha20)
        c.execute("PRAGMA cipher='chacha20'")
        c.execute(f"PRAGMA hexkey='{final_key_hex}'")
        
        # Verify we can actually read it before re-keying
        # If this fails, the key is wrong or the file is corrupted.
        c.execute("SELECT count(*) FROM sqlite_master")
        
        print("Decryption successful. Removing encryption now...")

        # 4. REKEY to empty string
        # This tells SQLite to re-encrypt the database with NO password (plaintext)
        c.execute("PRAGMA rekey=''")
        
        # 5. VACUUM
        # Forces a rewrite of the database file, ensuring it is fully clean and standard.
        c.execute("VACUUM")
        
        conn.close()
        print(f"Success! You can now open '{OUTPUT_FILE}' in DB Browser for SQLite.")
        
    except Exception as e:
        print(f"Error during decryption: {e}")
        # Clean up the failed copy so you don't think it worked
        if os.path.exists(OUTPUT_FILE):
            os.remove(OUTPUT_FILE)

if __name__ == "__main__":
    decrypt_database()

Generated Key: 9c2bab97bcf8c0c4f1a9ea7881a213f6c9ebf9d8d4c6a8e43ce5a259bde7e9fd
Copying 'C:\Users\Matt\Documents\Games\Umamusume\umamusume_Data\Persistent\meta' to 'C:\Users\Matt\Documents\Games\Umamusume\umamusume_Data\Persistent\meta_decrypted_3.db'...
Decryption successful. Removing encryption now...
Success! You can now open 'C:\Users\Matt\Documents\Games\Umamusume\umamusume_Data\Persistent\meta_decrypted_3.db' in DB Browser for SQLite.


In [9]:
import sqlite3
import os

# --- Configuration ---
# Update these paths to your actual decrypted files
DB_BEFORE = "meta_decrypted_before.db"
DB_AFTER = "meta_decrypted_after.db"

def get_table_data(db_path):
    if not os.path.exists(db_path):
        print(f"Error: File '{db_path}' not found.")
        return {}
        
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    
    # We select all columns. 
    # Based on previous context: n=Name (Key), m=Type, h=Url/Hash, g=State?
    # Adjust 'n' if the primary key is different.
    try:
        c.execute("SELECT * FROM a")
        columns = [description[0] for description in c.description]
        
        data_map = {}
        for row in c.fetchall():
            # Create a dictionary for the row: {col_name: value}
            row_dict = dict(zip(columns, row))
            
            # Use column 'n' (Name) as the unique identifier
            key = row_dict.get('n', None)
            if key:
                data_map[key] = row_dict
                
        conn.close()
        return data_map
    except sqlite3.OperationalError as e:
        print(f"Error reading {db_path}: {e}")
        return {}

def compare_databases():
    print(f"Loading '{DB_BEFORE}'...")
    data_before = get_table_data(DB_BEFORE)
    
    print(f"Loading '{DB_AFTER}'...")
    data_after = get_table_data(DB_AFTER)
    
    if not data_before or not data_after:
        print("Failed to load one or both databases.")
        return

    # Sets of keys
    keys_before = set(data_before.keys())
    keys_after = set(data_after.keys())

    # 1. New Entries
    added_keys = keys_after - keys_before
    print(f"\n--- [ADDED] New Entries ({len(added_keys)}) ---")
    for k in sorted(added_keys):
        print(f"+ {k}")

    # 2. Removed Entries
    removed_keys = keys_before - keys_after
    print(f"\n--- [REMOVED] Deleted Entries ({len(removed_keys)}) ---")
    for k in sorted(removed_keys):
        print(f"- {k}")

    # 3. Modified Entries
    common_keys = keys_before & keys_after
    modified_count = 0
    print(f"\n--- [MODIFIED] Changed Entries ---")
    
    for k in sorted(common_keys):
        row_b = data_before[k]
        row_a = data_after[k]
        
        # Find differences in columns
        diffs = []
        for col in row_b:
            val_b = row_b[col]
            val_a = row_a.get(col)
            
            if val_b != val_a:
                diffs.append(f"{col}: {val_b} -> {val_a}")
        
        if diffs:
            modified_count += 1
            print(f"~ {k}")
            for d in diffs:
                print(f"    {d}")

    if modified_count == 0:
        print("No existing entries were modified.")

if __name__ == "__main__":
    compare_databases()

Loading 'meta_decrypted_before.db'...
Loading 'meta_decrypted_after.db'...

--- [ADDED] New Entries (0) ---

--- [REMOVED] Deleted Entries (0) ---

--- [MODIFIED] Changed Entries ---
~ sound/v/snd_voi_home_102400.acb
    s: 0 -> 1
~ sound/v/snd_voi_home_102400.awb
    s: 0 -> 1
~ sound/v/snd_voi_home_102401.acb
    s: 0 -> 1
~ sound/v/snd_voi_home_102401.awb
    s: 0 -> 1
~ sound/v/snd_voi_home_102402.acb
    s: 0 -> 1
~ sound/v/snd_voi_home_102402.awb
    s: 0 -> 1
~ sound/v/snd_voi_home_102403.acb
    s: 0 -> 1
~ sound/v/snd_voi_home_102403.awb
    s: 0 -> 1
~ sound/v/snd_voi_home_107400.acb
    s: 0 -> 1
~ sound/v/snd_voi_home_107400.awb
    s: 0 -> 1
~ sound/v/snd_voi_home_107401.acb
    s: 0 -> 1
~ sound/v/snd_voi_home_107401.awb
    s: 0 -> 1
~ sound/v/snd_voi_home_107402.acb
    s: 0 -> 1
~ sound/v/snd_voi_home_107402.awb
    s: 0 -> 1
~ sound/v/snd_voi_title_102400.acb
    s: 0 -> 1
~ sound/v/snd_voi_title_102400.awb
    s: 0 -> 1
~ sound/v/snd_voi_title_107400.acb
    s: 0 -> 

In [10]:
import apsw
import binascii
import shutil
import os

# --- Configuration (From Config.cs) ---
DB_BASE_KEY_HEX = "F170CEA4DFCEA3E1A5D8C70BD1000000"
DB_KEY_JP_HEX = "6D5B65336336632554712D73505363386D34377B356370233734532973433633"

# File Paths
TARGET_FILE = meta_file_path
BACKUP_FILE = meta_file_path + ".bak"

def update_sound_entries():
    # 1. Safety Backup
    if not os.path.exists(TARGET_FILE):
        print(f"Error: Target file '{TARGET_FILE}' not found.")
        return

    print(f"Creating backup at '{BACKUP_FILE}'...")
    shutil.copy(TARGET_FILE, BACKUP_FILE)

    # 2. Generate Key (ChaCha20 / JP Region)
    base_key = binascii.unhexlify(DB_BASE_KEY_HEX)
    raw_key = binascii.unhexlify(DB_KEY_JP_HEX)
    key_list = list(raw_key)
    
    # XOR Logic: key[i] ^ baseKey[i % 13]
    for i in range(len(key_list)):
        key_list[i] = key_list[i] ^ base_key[i % 13]
        
    final_key_hex = binascii.hexlify(bytes(key_list)).decode('utf-8')

    # 3. Connect and Update
    try:
        conn = apsw.Connection(TARGET_FILE)
        cursor = conn.cursor()

        # Configure ChaCha20 Encryption
        cursor.execute("PRAGMA cipher='chacha20'")
        cursor.execute(f"PRAGMA hexkey='{final_key_hex}'")

        # Verify decryption by reading a single row first
        cursor.execute("SELECT count(*) FROM a LIMIT 1")

        print("Decryption successful. Executing update...")

        # 4. Perform the Update
        # "For all rows where m is 'sound', change g to the integer 0"
        query = "UPDATE a SET g=2 WHERE m='sound'"
        cursor.execute(query)
        
        # Get the number of affected rows
        affected_rows = conn.changes()
        
        print(f"Success! Updated {affected_rows} rows.")
        print("Changes have been saved to the encrypted file.")
        
        conn.close()

    except apsw.SQLError as e:
        print(f"SQLite Error: {e}")
        print("Restoring backup due to error...")
        shutil.copy(BACKUP_FILE, TARGET_FILE)
    except Exception as ex:
        print(f"Fatal Error: {ex}")

if __name__ == "__main__":
    update_sound_entries()

Creating backup at 'C:\Users\Matt\Documents\Games\Umamusume\umamusume_Data\Persistent\meta.bak'...
Decryption successful. Executing update...
Success! Updated 30851 rows.
Changes have been saved to the encrypted file.


In [33]:
import sqlite3
import UnityPy
from UnityPy.enums import ClassIDType
import acb
import os
import subprocess
import csv, json
from pathlib import Path
import torch
from transformers import pipeline
import binascii
import apsw
import struct

target_columns = ['Name','Text','VoiceSheetId','CharaId','CueId']
ruby_object_columns = ['CharX','CharY','RubyText']
uma_hca_key = "0x450D608C479F"

meta_file_path = "C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\meta"
master_file_path = "C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\master\\master.mdb"
hash_files_base_path = "C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\dat"

vgastream_cli_path = "C:\\Users\\Matt\\Documents\\notebooks\\uma-voice-dataset-creator-master\\vgmstream\\vgmstream-cli.exe"

save_dir_base_path = "C:\\Users\\Matt\\Documents\\notebooks\\uma-voice-dataset-creator-master\\output"

generate_kwargs = {
    "language": "Japanese",
    "no_repeat_ngram_size": 0,
    "repetition_penalty": 1.0,
}
pipe = pipeline(
    "automatic-speech-recognition",
    model="litagin/anime-whisper",
    device="cuda",
    torch_dtype=torch.float16,
    chunk_length_s=30.0,
    batch_size=64,
)

DB_BASE_KEY_HEX = "F170CEA4DFCEA3E1A5D8C70BD1000000"
DB_KEY_JP_HEX = "6D5B65336336632554712D73505363386D34377B356370233734532973433633"

# Derived from UmaViewer

# Configuration from Config.json
AB_KEY_HEX = "532B4631E4A7B9473E7CFB" #
# The fixed header size defined in UmaAssetBundleStream.cs
HEADER_SIZE = 256 #

def get_decrypted_meta_connection(db_path):
    """
    Connects to the encrypted meta database using apsw and ChaCha20.
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"Meta file not found: {db_path}")

    # 1. Generate Key
    base_key = binascii.unhexlify(DB_BASE_KEY_HEX)
    raw_key = binascii.unhexlify(DB_KEY_JP_HEX)
    key_list = list(raw_key)
    
    # XOR Logic: key[i] ^ baseKey[i % 13]
    for i in range(len(key_list)):
        key_list[i] = key_list[i] ^ base_key[i % 13]
    final_key_hex = binascii.hexlify(bytes(key_list)).decode('utf-8')

    # 2. Connect
    conn = apsw.Connection(db_path)
    cursor = conn.cursor()
    
    # 3. Apply Encryption Config (ChaCha20)
    cursor.execute("PRAGMA cipher='chacha20'")
    cursor.execute(f"PRAGMA hexkey='{final_key_hex}'")
    cursor.execute("PRAGMA cipher_use_hmac=OFF") # Often needed for Unity implementations
    
    return conn

def get_file_packets(chara_id: int):
    def get_hash_given_name_filter(name_filter, meta_cursor):
        res = meta_cursor.execute(f"SELECT n, h, e FROM a WHERE n LIKE '%{name_filter}%'").fetchall()
        return res
    def get_filepath_from_hash(h, base_path):
        return os.path.join(base_path, h[:2], h)
    meta_con = get_decrypted_meta_connection(meta_file_path)
    master_con = sqlite3.connect(master_file_path)
    meta_cur = meta_con.cursor()
    master_cur = master_con.cursor()
    story_ids = master_cur.execute(f"SELECT story_id FROM chara_story_data WHERE chara_id = {str(chara_id)}").fetchall()
    story_ids = [x[0] for x in story_ids]
    file_packets = []
    for story_id in story_ids:
        story_id_str = str(story_id)
        res = get_hash_given_name_filter(f"%{story_id_str}", meta_cur)
        curr_list = []
        for r in res:
            if r[0][0] != 's' or 'resource' in r[0]:
                continue
            curr_item = {}
            curr_item['name'] = r[0]
            curr_item['hash'] = r[1]
            curr_item['encryption_key'] = r[2]
            curr_list.append(curr_item)
        #res = [r for r in res]
        #res = list(filter(lambda s: s[0] == 's' and 'resource' not in s and 'dot' not in s, res))
        file_packets.append(curr_list)
    for file_packet in file_packets:
        for item in file_packet:
            item["path"] = get_filepath_from_hash(item["hash"], hash_files_base_path)
    return file_packets
    
def save_track(acb_file, track, target_dir, name):
    os.makedirs(target_dir, exist_ok=True)
    name_hca = name + ".hca"
    name_wav = name + ".wav"
    with open(os.path.join(target_dir, name_hca), "wb") as out_file:
        out_file.write(acb_file.get_track_data(track, True))
    subprocess.run([vgastream_cli_path, '-o', os.path.join(target_dir, name_wav), os.path.join(target_dir, name_hca)])
    os.remove(os.path.join(target_dir, name_hca))

class UmaAssetLoader:
    @staticmethod
    def get_fkey(file_key_long: int) -> bytes:
        """Ported from UmaDatabaseEntry.cs: FKey getter"""
        base_keys = bytes.fromhex(AB_KEY_HEX) #
        base_len = len(base_keys)
        
        # BitConverter.GetBytes(long) in C# is Little Endian
        key_bytes = struct.pack('<q', file_key_long)
        
        f_key = bytearray(base_len * 8)
        for i in range(base_len):
            b = base_keys[i]
            base_offset = i << 3
            for j in range(8):
                # keys[baseOffset + j] = (byte)(b ^ keyBytes[j]);
                f_key[base_offset + j] = b ^ key_bytes[j]
        return bytes(f_key)

    @staticmethod
    def load_decrypted(item_dict):
        #print(item_dict)
        file_path = item_dict['path']
        file_key = item_dict['encryption_key']
        file_name = item_dict['name']
        
        with open(file_path, "rb") as f:
            data = bytearray(f.read())

        # If key is 0, the file isn't encrypted (standard for audio)
        if file_key == 0:
            return UnityPy.load(bytes(data))

        f_key = UmaAssetLoader.get_fkey(file_key)
        f_key_len = len(f_key)

        # Apply decryption starting at HEADER_SIZE (256)
        # The XOR key index is aligned to the absolute position (i)
        for i in range(HEADER_SIZE, len(data)):
            data[i] ^= f_key[i % f_key_len]
        
        try:
            # UnityPy will now see the first 256 bytes of cleartext plus 
            # the successfully decrypted blocks following it.
            env = UnityPy.load(bytes(data))
            return env
        except Exception as e:
            print(f"  -> UnityPy Error: {e}")
            return None

def process_file_packet(file_packet, chara_id: int):
    # --- 1. Identify Timeline & Load Data ---
    timeline_item = next((x for x in file_packet if "timeline" in x["name"]), None)
    if not timeline_item:
        print("  -> Error: No timeline found in packet.")
        return []

    storyline_name = timeline_item["name"].split("/")[-1]
    env = UmaAssetLoader.load_decrypted(timeline_item)
    
    text_objects = [obj.read() for obj in env.objects if hasattr(obj.read(), 'Text')]
    if not text_objects:
        return []

    # Sort blocks by sequence
    last_block_num = max(t.NextBlock for t in text_objects)
    t_processed_dict = {}
    for t in text_objects:
        block_num = last_block_num if t.NextBlock == -1 else t.NextBlock - 1
        t_data = {c: getattr(t, c) for c in target_columns}
        t_data['BlockNumber'] = block_num
        t_data['RubyInfo'] = []
        t_processed_dict[block_num] = t_data

    t_processed_list = [t_processed_dict[i] for i in range(len(text_objects))]

    # --- 2. Handle Ruby (Standard Logic) ---
    ruby_item = next((x for x in file_packet if "ruby" in x["name"]), None)
    if ruby_item:
        ruby_name = ruby_item["name"].split("/")[-1]
        env_ruby = UmaAssetLoader.load_decrypted(ruby_item)
        if env_ruby:
            for obj in env_ruby.objects:
                read_obj = obj.read()
                if obj.type.name == "MonoBehaviour" and read_obj.m_Name == ruby_name:
                    for da in getattr(read_obj, 'DataArray', []):
                        if da.BlockIndex < len(t_processed_list):
                            t_processed_list[da.BlockIndex]['RubyInfo'] = [
                                {roc: getattr(rbl, roc) for roc in ruby_object_columns}
                                for rbl in da.RubyDataList
                            ]

    # --- 3. Dynamic Audio Resolution ---
    # Cache for loaded ACB files to prevent re-opening the same file for every block
    loaded_acbs = {}

    meta_con = get_decrypted_meta_connection(meta_file_path)
    meta_cur = meta_con.cursor()

    for t in t_processed_list:
        vs_id = t.get('VoiceSheetId')
        if not vs_id or t['CueId'] == -1:
            t["AudioFileLocation"], t["TextAnimeWhisper"] = None, ""
            continue

        # Get or Load the correct ACB for this specific block
        if vs_id not in loaded_acbs:
            # First, check if the ACB is already in our packet
            acb_entry = next((x for x in file_packet if f"{vs_id}.acb" in x["name"]), None)
            awb_entry = next((x for x in file_packet if f"{vs_id}.awb" in x["name"]), None)

            # If not in packet (e.g. redirected to 06), find it in meta now
            if not acb_entry:
                audio_res = meta_cur.execute(
                    f"SELECT n, h, e FROM a WHERE n LIKE 'sound/c/snd_voi_story_{vs_id}.%'"
                ).fetchall()
                for an, ah, ae in audio_res:
                    entry = {'name': an, 'hash': ah, 'encryption_key': ae,
                             'path': os.path.join(hash_files_base_path, ah[:2], ah)}
                    if ".acb" in an: acb_entry = entry
                    if ".awb" in an: awb_entry = entry

            if acb_entry and awb_entry:
                try:
                    loaded_acbs[vs_id] = acb.ACBFile(acb_entry["path"], awb_entry["path"], hca_keys=uma_hca_key)
                except Exception as e:
                    print(f"  -> ACB Load Error ({vs_id}): {e}")
                    loaded_acbs[vs_id] = None
            else:
                loaded_acbs[vs_id] = None

        acbfile = loaded_acbs.get(vs_id)
        if not acbfile:
            t["AudioFileLocation"], t["TextAnimeWhisper"] = None, ""
            continue

        # --- 4. Extraction & Transcription ---
        track_map = {track.name: track for track in acbfile.track_list.tracks}
        expected_track_name = f"{vs_id}_{t['CueId']+1:02d}"
        curr_track = track_map.get(expected_track_name)

        # Fallback to index if name fails
        if not curr_track and t['CueId'] < len(acbfile.track_list.tracks):
            curr_track = acbfile.track_list.tracks[t['CueId']]

        if curr_track:
            save_track(acbfile, curr_track, save_dir_base_path, curr_track.name)
            audio_path = os.path.join(save_dir_base_path, curr_track.name) + ".wav"
            t["AudioFileLocation"] = audio_path
            
            try:
                result = pipe(audio_path, generate_kwargs=generate_kwargs)
                t["TextAnimeWhisper"] = result["text"]
            except Exception as e:
                t["TextAnimeWhisper"] = f"WHISPER_ERROR: {e}"
        else:
            t["AudioFileLocation"], t["TextAnimeWhisper"] = None, ""

    print(f"{storyline_name} done")
    return t_processed_list

def write_simple(data, out):
    keys = sorted({k for d in data for k in d})
    with Path(out).open('w', encoding='utf-8', newline='') as f:
        w = csv.DictWriter(f, fieldnames=keys, quoting=csv.QUOTE_ALL)
        w.writeheader()
        for d in data:
            w.writerow({k: json.dumps(v, ensure_ascii=False) if isinstance(v, (list, dict)) else v
                        for k, v in d.items()})

def process_for_char(chara_id: int):
    file_packets = get_file_packets(chara_id)
    #print(file_packets)
    final_list = []
    for file_packet in file_packets:
        final_list.extend(process_file_packet(file_packet, chara_id))
    write_simple(final_list, os.path.join(save_dir_base_path, "info.csv"))
    return final_list



Loading weights:   0%|          | 0/539 [00:00<?, ?it/s]



In [34]:
final_list = process_for_char(1017)

storytimeline_041017001 done
storytimeline_041017002 done
storytimeline_041017003 done
storytimeline_041017004 done
storytimeline_041017005 done
storytimeline_041017006 done
storytimeline_041017007 done


In [9]:
# Requires transformers>=4.51.0
# Requires sentence-transformers>=2.7.0

from sentence_transformers import SentenceTransformer

# Load the model
model = SentenceTransformer("Qwen/Qwen3-Embedding-4B")

# We recommend enabling flash_attention_2 for better acceleration and memory saving,
# together with setting `padding_side` to "left":
# model = SentenceTransformer(
#     "Qwen/Qwen3-Embedding-4B",
#     model_kwargs={"attn_implementation": "flash_attention_2", "device_map": "auto"},
#     tokenizer_kwargs={"padding_side": "left"},
# )

# The queries and documents to embed
queries = [
    "What is the capital of China?",
    "Explain gravity",
]
documents = [
    "The capital of China is Beijing.",
    "Gravity is a force that attracts two bodies towards each other. It gives weight to physical objects and is responsible for the movement of planets around the sun.",
]

# Encode the queries and documents. Note that queries benefit from using a prompt
# Here we use the prompt called "query" stored under `model.prompts`, but you can
# also pass your own prompt via the `prompt` argument
query_embeddings = model.encode(queries, prompt_name="query")
document_embeddings = model.encode(documents)

# Compute the (cosine) similarity between the query and document embeddings
similarity = model.similarity(query_embeddings, document_embeddings)
print(similarity)
# tensor([[0.7534, 0.1147],
#         [0.0320, 0.6258]])

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config_sentence_transformers.json:   0%|          | 0.00/215 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/727 [00:00<?, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model-00001-of-00002.safetensors:   0%|          | 0.00/4.97G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/3.08G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

OSError: The paging file is too small for this operation to complete. (os error 1455)

In [9]:
def get_decrypted_meta_connection(db_path):
    """
    Connects to the encrypted meta database using apsw and ChaCha20.
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"Meta file not found: {db_path}")

    # 1. Generate Key
    base_key = binascii.unhexlify(DB_BASE_KEY_HEX)
    raw_key = binascii.unhexlify(DB_KEY_JP_HEX)
    key_list = list(raw_key)
    
    # XOR Logic: key[i] ^ baseKey[i % 13]
    for i in range(len(key_list)):
        key_list[i] = key_list[i] ^ base_key[i % 13]
    final_key_hex = binascii.hexlify(bytes(key_list)).decode('utf-8')

    # 2. Connect
    conn = apsw.Connection(db_path)
    cursor = conn.cursor()
    
    # 3. Apply Encryption Config (ChaCha20)
    cursor.execute("PRAGMA cipher='chacha20'")
    cursor.execute(f"PRAGMA hexkey='{final_key_hex}'")
    cursor.execute("PRAGMA cipher_use_hmac=OFF") # Often needed for Unity implementations
    
    return conn
def get_file_packets(chara_id: int):
    def get_hash_given_name_filter(name_filter, meta_cursor):
        res = meta_cursor.execute(f"SELECT n, h, e FROM a WHERE n LIKE '%{name_filter}%'").fetchall()
        return res
    def get_filepath_from_hash(h, base_path):
        return os.path.join(base_path, h[:2], h)
    meta_con = get_decrypted_meta_connection(meta_file_path)
    master_con = sqlite3.connect(master_file_path)
    meta_cur = meta_con.cursor()
    master_cur = master_con.cursor()
    story_ids = master_cur.execute(f"SELECT story_id FROM chara_story_data WHERE chara_id = {str(chara_id)}").fetchall()
    story_ids = [x[0] for x in story_ids]
    file_packets = []
    for story_id in story_ids:
        story_id_str = str(story_id)
        res = get_hash_given_name_filter(f"%{story_id_str}", meta_cur)
        curr_list = []
        for r in res:
            print(r)
            if r[0][0] != 's' or 'resource' in r[0]:
                continue
            curr_item = {}
            curr_item['name'] = r[0]
            curr_item['hash'] = r[1]
            curr_item['encryption_key'] = r[2]
            curr_list.append(curr_item)
        #res = [r for r in res]
        #res = list(filter(lambda s: s[0] == 's' and 'resource' not in s and 'dot' not in s, res))
        file_packets.append(curr_list)
    for file_packet in file_packets:
        for item in file_packet:
            item["path"] = get_filepath_from_hash(item["hash"], hash_files_base_path)
    return file_packets

In [17]:
f = get_file_packets(1017)

('story/data/04/1017/resourcelist/storytimeline_041017001_resources', 'D6HJ5WXZR2LNM2YUQWTCK2Q4UBMV7NC3', 3458653226021754007)
('story/data/04/1017/storytimeline_041017001', 'ZBLDS5WHKZB22RUZNRKXB4PVGTFU6JMA', 1859620257465300378)
('lipsync/story/04/1017/story_041017001', 'L2JQVQZDJIV64MHRATYLTPEAFCRD6SJ2', 1796588984780366541)
('sound/c/snd_voi_story_041017001.acb', 'TCHULMENZJA3N6OEJYZ7QZLLKOX5L6WJ', 0)
('sound/c/snd_voi_story_041017001.awb', 'AJPOYMMWYXO5KYQ5SMOND6QUJXFF5TDC', 0)
('story/data/04/1017/resourcelist/storytimeline_041017002_resources', 'Y5SP72BSJGR2EPW6J3VBSCNG4MEEV5EX', -7047093971992636273)
('story/data/04/1017/storytimeline_041017002', '2IWKOUP3SIYWAU5DDFERLM6PAGHW6LAW', -5230941139625283989)
('lipsync/story/04/1017/story_041017002', 'L3ZQKLZPUN3A3KYCZTZLHNTQ7ITVFUU6', 8488686503124713285)
('sound/c/snd_voi_story_041017002.acb', 'JUSFV2ZHYEKPMIV66KP27HGBSOUCJO3S', 0)
('sound/c/snd_voi_story_041017002.awb', 'KCNCHKD5NT4XMJYJCWMYJNJUEHMYQPL3', 0)
('story/data/04/1017/r

In [18]:
file_packet = ('story/data/04/1017/storytimeline_041017007', 'JQBIEYYTCN37NGKM4BY4Z5CHWTFFT2IG', -5754565692494613402)
chara_id = 1017

In [23]:
def get_hash_given_name_filter(name_filter, meta_cursor):
    res = meta_cursor.execute(f"SELECT n, h, e FROM a WHERE n LIKE '%{name_filter}%'").fetchall()
    return res
def get_filepath_from_hash(h, base_path):
    return os.path.join(base_path, h[:2], h)
# handle timeline
timeline_item = {}
timeline_item['name'] = file_packet[0]
timeline_item['hash'] = file_packet[1]
timeline_item['encryption_key'] = file_packet[2]
timeline_item['path'] = get_filepath_from_hash(timeline_item["hash"], hash_files_base_path)
#print(timeline_item)
timeline_path = timeline_item["path"]
storyline_name = timeline_item["name"]

last_slash_index = storyline_name.rfind("/")
storyline_name = storyline_name[last_slash_index+1:]
env = UmaAssetLoader.load_decrypted(timeline_item)
text_objects = []
for obj in env.objects:
    print(obj.read())
# for obj in env.objects:
#     obj_read = obj.read()
#     if hasattr(obj_read, 'Text'):
#         text_objects.append(obj_read)
# num_blocks = len(text_objects)
# # last block has NextBlock of -1
# # first block has NextBlock of 1
# last_block_block_num = max([x.NextBlock for x in text_objects])
# t_processed_dict = {}
# for t in text_objects:
#     t_processed = {}
#     t_processed['RubyInfo'] = []
#     if t.NextBlock == -1:
#         t_processed['BlockNumber'] = last_block_block_num
#     else:
#         t_processed['BlockNumber'] = t.NextBlock - 1
#     for c in target_columns:
#         t_processed[c] = getattr(t, c)
#     t_processed_dict[t_processed['BlockNumber']] = t_processed
# t_processed_list = []
# for i in range(num_blocks):
#     t_processed_list.append(t_processed_dict[i])

In [28]:
for obj in env.objects:
    print(obj.read())

MonoBehaviour(m_Enabled=1, m_GameObject=PPtr(m_FileID=0, m_PathID=0, assetsfile=<SerializedFile>), m_Name='', m_Script=PPtr(m_FileID=0, m_PathID=313364995533270846, assetsfile=<SerializedFile>))
MonoBehaviour(m_Enabled=1, m_GameObject=PPtr(m_FileID=0, m_PathID=0, assetsfile=<SerializedFile>), m_Name='', m_Script=PPtr(m_FileID=0, m_PathID=4312107810357412935, assetsfile=<SerializedFile>))
MonoBehaviour(m_Enabled=1, m_GameObject=PPtr(m_FileID=0, m_PathID=0, assetsfile=<SerializedFile>), m_Name='', m_Script=PPtr(m_FileID=0, m_PathID=3978898428453910436, assetsfile=<SerializedFile>))
MonoBehaviour(m_Enabled=1, m_GameObject=PPtr(m_FileID=0, m_PathID=0, assetsfile=<SerializedFile>), m_Name='', m_Script=PPtr(m_FileID=0, m_PathID=5600456094340690386, assetsfile=<SerializedFile>))
MonoBehaviour(m_Enabled=1, m_GameObject=PPtr(m_FileID=0, m_PathID=0, assetsfile=<SerializedFile>), m_Name='', m_Script=PPtr(m_FileID=0, m_PathID=5035528732311303024, assetsfile=<SerializedFile>))
MonoBehaviour(m_Enabl

In [32]:
def get_file_packets_by_pointer(chara_id: int):
    meta_con = get_decrypted_meta_connection(meta_file_path)
    master_con = sqlite3.connect(master_file_path)
    meta_cur = meta_con.cursor()
    master_cur = master_con.cursor()

    # Get story IDs (e.g., 41017001)
    story_ids = [x[0] for x in master_cur.execute(
        f"SELECT story_id FROM chara_story_data WHERE chara_id = {chara_id}"
    ).fetchall()]

    all_packets = []

    for s_id in story_ids:
        # Convert 41017001 -> '041017001'
        s_id_str = f"{s_id:09d}" 
        
        # 1. Find Timeline & Ruby (Search for the ID string anywhere in the path)
        res = meta_cur.execute(
            f"SELECT n, h, e FROM a WHERE n LIKE '%{s_id_str}%'"
        ).fetchall()
        
        packet = []
        voice_sheet_ids = set()

        for n, h, e in res:
            # Skip resources/dots, keep the core assets
            if "resource" in n or "dot" in n: continue
            
            item = {'name': n, 'hash': h, 'encryption_key': e, 
                    'path': os.path.join(hash_files_base_path, h[:2], h)}
            packet.append(item)

            # 2. If it's the timeline, parse it to find the REAL audio sheet
            if "storytimeline_" in n:
                try:
                    env = UmaAssetLoader.load_decrypted(item)
                    for obj in env.objects:
                        obj_read = obj.read()
                        # Extract VoiceSheetId (e.g., '041017001')
                        vs_id = getattr(obj_read, 'VoiceSheetId', None)
                        if vs_id:
                            voice_sheet_ids.add(vs_id)
                except Exception as ex:
                    print(f"Error parsing timeline {s_id_str}: {ex}")

        # 3. Explicitly find the audio files requested by the timeline
        for vs_id in voice_sheet_ids:
            # This will find snd_voi_story_041017001 even if we are in story 041017005
            audio_res = meta_cur.execute(
                f"SELECT n, h, e FROM a WHERE n LIKE 'sound/c/snd_voi_story_{vs_id}.%'"
            ).fetchall()
            
            for an, ah, ae in audio_res:
                packet.append({
                    'name': an, 'hash': ah, 'encryption_key': ae,
                    'path': os.path.join(hash_files_base_path, ah[:2], ah)
                })
        
        if packet:
            all_packets.append(packet)
            print(f"Processed {s_id_str}: Found {len(voice_sheet_ids)} audio sheet(s).")

    return all_packets

get_file_packets_by_pointer(1017)

Processed 041017001: Found 1 audio sheet(s).
Processed 041017002: Found 1 audio sheet(s).
Processed 041017003: Found 1 audio sheet(s).
Processed 041017004: Found 1 audio sheet(s).
Processed 041017005: Found 1 audio sheet(s).
Processed 041017006: Found 1 audio sheet(s).
Processed 041017007: Found 1 audio sheet(s).


[[{'name': 'story/data/04/1017/storytimeline_041017001',
   'hash': 'ZBLDS5WHKZB22RUZNRKXB4PVGTFU6JMA',
   'encryption_key': 1859620257465300378,
   'path': 'C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\dat\\ZB\\ZBLDS5WHKZB22RUZNRKXB4PVGTFU6JMA'},
  {'name': 'lipsync/story/04/1017/story_041017001',
   'hash': 'L2JQVQZDJIV64MHRATYLTPEAFCRD6SJ2',
   'encryption_key': 1796588984780366541,
   'path': 'C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\dat\\L2\\L2JQVQZDJIV64MHRATYLTPEAFCRD6SJ2'},
  {'name': 'sound/c/snd_voi_story_041017001.acb',
   'hash': 'TCHULMENZJA3N6OEJYZ7QZLLKOX5L6WJ',
   'encryption_key': 0,
   'path': 'C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Persistent\\dat\\TC\\TCHULMENZJA3N6OEJYZ7QZLLKOX5L6WJ'},
  {'name': 'sound/c/snd_voi_story_041017001.awb',
   'hash': 'AJPOYMMWYXO5KYQ5SMOND6QUJXFF5TDC',
   'encryption_key': 0,
   'path': 'C:\\Users\\Matt\\Documents\\Games\\Umamusume\\umamusume_Data\\Pers