# WORDLE Sheets Loader

This notebook contains the code required to extract Wordle scores from an iMessage thread and load them into a Google Sheet. It was developed on macOS Ventura 13.5.2 on Apple Silicon (M1 chip) with Python 3.11.5.

## Required Setup
* (Optional): If you have conda installed, create a Conda environment from the `wordle-py-env.txt` file:
    * `conda env create --file wordle-py-env.txt`
* Grant Full Disk Access to the Terminal application so Python can read the iMessage database:
    * System Settings > Privacy & Security > Full Disk Access > enable Terminal (use `+` icon to add if needed)
    * Confirm you can access the `chat.db` file from Terminal by running `ls -l ~/Library/Messages/chat.db`
    * Add the full file path (`/Users/<username>/Library/Messages/chat.db`) to `config.yaml` under `message_db_path`
* Create a Google Sheet and copy and paste the `sharing link` into the `sheets_link` value in `config.yaml`
* Create a Service Account to allow access to your Google Sheet
    * The [Authentication section of the gspread docs](https://docs.gspread.org/en/latest/oauth2.html) has good instructions. I opted to create a Service Account and a Service Account key JSON file.
    * Save the JSON file to a safe location on your local system and add the path as the `service_account_key_path` value in `config.yaml`
* Follow the steps below to find one (or more) `chat_ids` that correspond to the iMessage threads containing your Wordle score messages
    * Fill in `chat_ids` in `config.yaml`
    * Create entries in `handle_dict` in `config.yaml` to translate phone numbers and/or iMessage email IDs into names
    * Put your own name in the `is_from_me_name` value in `config.yaml`
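
Because several later cells read these values, it can help to check the loaded config before running anything else. The following is a minimal sketch, not part of the notebook: `find_config_problems` and `REQUIRED_KEYS` are hypothetical names, and the key list is taken from the example `config` output shown further down (including `did_not_finish_score`, which the setup steps above do not mention).

```python
# Hypothetical helper, not part of the notebook: checks that the loaded
# config defines every key the later cells read, with the expected type.
REQUIRED_KEYS = {
    "message_db_path": str,
    "sheets_link": str,
    "service_account_key_path": str,
    "chat_ids": list,
    "handle_dict": dict,
    "is_from_me_name": str,
    "did_not_finish_score": int,
}


def find_config_problems(config: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for key, expected_type in REQUIRED_KEYS.items():
        if key not in config:
            problems.append(f"missing key: {key}")
        elif not isinstance(config[key], expected_type):
            problems.append(f"{key} should be of type {expected_type.__name__}")
    return problems


# Placeholder values copied from the example config shown in this notebook
example = {
    "message_db_path": "/Users/<username>/Library/Messages/chat.db",
    "sheets_link": "https://docs.google.com/spreadsheets/d/abcdefg/edit?usp=sharing",
    "service_account_key_path": "/Users/<username>/my_secret_key.json",
    "chat_ids": [4, 1406],
    "handle_dict": {"+13035551234": "Alice"},
    "is_from_me_name": "Baz",
    "did_not_finish_score": 7,
}
print(find_config_problems(example))
```

In the notebook, the same function could be called on the dictionary returned by `yaml.safe_load`.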


### References:
* Article: https://towardsdatascience.com/heres-how-you-can-access-your-entire-imessage-history-on-your-mac-f8878276c6e9
* GitHub code - iMessage DB analysis: https://github.com/yortos/imessage-analysis
* iMessage reader - incl. bytestring decoding: https://github.com/niftycode/imessage_reader


In [1]:
import os
import json

import gspread
import pandas as pd
import sqlite3

In [2]:
# pandas display options - tweak as needed
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 200)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.max_colwidth', None)

# prevents certain strings in displayed text fields from being interpreted as mathematical formulas
pd.set_option('display.html.use_mathjax', False)

## Extract Messages from Local iMessage DB

In [36]:
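import yaml

# use a context manager so the file handle is closed after loading
with open("./config.yaml") as config_file:
    config = yaml.safe_load(config_file)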

In [37]:
config

{'message_db_path': '/Users/<username>/Library/Messages/chat.db',
 'sheets_link': 'https://docs.google.com/spreadsheets/d/abcdefg/edit?usp=sharing',
 'service_account_key_path': '/Users/<username>/my_secret_key.json',
 'chat_ids': [4, 1406],
 'handle_dict': {'+13035551234': 'Alice',
  '+13035552345': 'Bob',
  '+13035553456': 'Foo',
  'fake_address@gmail.com': 'Foo'},
 'is_from_me_name': 'Baz',
 'did_not_finish_score': 7}

In [5]:
print('Accessing local iMessage chat.db...')

Accessing local iMessage chat.db...


In [6]:
# find local chat.db and establish a connection
conn = sqlite3.connect(config['message_db_path'])
cur = conn.cursor()

# query the database to get all the table names
cur.execute(" select name from sqlite_master where type = 'table' ")

for name in cur.fetchall():
    print(name)

('_SqliteDatabaseProperties',)
('chat_message_join',)
('deleted_messages',)
('sqlite_sequence',)
('chat_recoverable_message_join',)
('handle',)
('sync_deleted_chats',)
('kvtable',)
('sync_deleted_attachments',)
('sync_deleted_messages',)
('unsynced_removed_recoverable_messages',)
('recoverable_message_part',)
('chat_handle_join',)
('message_attachment_join',)
('message_processing_task',)
('message',)
('chat',)
('attachment',)
('sqlite_stat1',)


In [7]:
# find chat_id(s) for iMessage thread(s) from which wordle scores are to be extracted
# search for messages using `text` field - can't easily wildcard blob column in SQL due to data type

pd.read_sql_query("""
SELECT c.chat_id,
    c.message_id,
    m.text
FROM message m
    LEFT JOIN chat_message_join c
    ON m.ROWID = c.message_id
WHERE m.text LIKE '%Wordle 258%'
ORDER BY date DESC
LIMIT 100
""", conn)

Unnamed: 0,chat_id,message_id,text
0,1406,24646,Wordle 258 5/6\n\n⬛⬛🟨⬛🟨\n🟩🟨⬛🟨⬛\n🟩⬛⬛⬛⬛\n🟩⬛🟩⬛🟩\n🟩🟩🟩🟩🟩
1,1406,24633,Emphasized “Wordle 258 2/6\n\n🟩🟨⬛🟨⬛\n🟩🟩🟩🟩🟩”
2,1406,24619,Wordle 258 4/6\n\n🟨🟨🟨⬜⬜\n⬜🟩🟩🟩⬜\n⬜🟩🟩🟩⬜\n🟩🟩🟩🟩🟩
3,1406,24726,Wordle 258 2/6\n\n🟩🟨⬛🟨⬛\n🟩🟩🟩🟩🟩
4,1406,24670,Wordle 258 4/6\n\n⬛⬛🟨⬛🟨\n⬛⬛🟩🟩🟩\n⬛⬛🟩🟩🟩\n🟩🟩🟩🟩🟩
5,1406,24786,Wordle 258 4/6\n\n🟨⬜⬜⬜🟨\n⬜🟩🟩🟩⬜\n⬜🟩🟩🟩⬜\n🟩🟩🟩🟩🟩
6,1406,24730,Wordle 258 4/6\n\n🟩🟨⬛🟨⬛\n🟩⬛⬛🟨🟨\n🟩⬛🟩⬛🟩\n🟩🟩🟩🟩🟩
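
When the search returns many rows, counting matches per `chat_id` can make the right thread easier to spot. The sketch below runs against a made-up in-memory database, not the real `chat.db`: the two tables contain only the columns used here, and the rows and the `demo_conn` name are invented for illustration.

```python
import sqlite3

# Toy stand-ins for the `message` and `chat_message_join` tables; only the
# columns used by the query are created, and all rows are made up.
demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
CREATE TABLE message (ROWID INTEGER PRIMARY KEY, text TEXT);
CREATE TABLE chat_message_join (chat_id INTEGER, message_id INTEGER);
INSERT INTO message VALUES (1, 'Wordle 258 5/6'), (2, 'Wordle 258 4/6'),
                           (3, 'Wordle 258 2/6'), (4, 'lunch?');
INSERT INTO chat_message_join VALUES (1406, 1), (1406, 2), (4, 3), (4, 4);
""")

# Count matching messages per chat, busiest chat first
rows = demo_conn.execute("""
SELECT c.chat_id, COUNT(*) AS n_matches
FROM message m
    JOIN chat_message_join c ON m.ROWID = c.message_id
WHERE m.text LIKE '%Wordle 258%'
GROUP BY c.chat_id
ORDER BY n_matches DESC, c.chat_id
""").fetchall()
demo_conn.close()

print(rows)  # [(1406, 2), (4, 1)]
```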


In [None]:
# Main SQL query to load all non-reaction messages from specified iMessage threads
# NOTE: uncomment the `h.*`, `c.*`, and `m.*` lines to see what else is in the tables

chat_id_list = [f"c.chat_id = {x}" for x in config['chat_ids']]
chat_id_string = ' OR '.join(chat_id_list)

query = f"""
SELECT h.id AS handle
    , c.message_id
    , c.chat_id
    , m.is_from_me
    , m.date
    , m.text
    , m.attributedBody
    --, h.*
    --, c.*
    --, m.*
FROM message m
    LEFT JOIN chat_message_join c
        ON m.ROWID = c.message_id
    LEFT JOIN handle h
        ON m.handle_id = h.ROWID
WHERE m.associated_message_guid IS NULL --filters out reaction messages
    AND m.attributedBody IS NOT NULL --filters out messages with no body
    AND ({chat_id_string})
ORDER BY date DESC
"""

emojis_df = pd.read_sql_query(query, conn)

emojis_df.head(10)
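
The cell above builds the chat filter by interpolating the IDs from `config.yaml` into the SQL text. An alternative, sketched here under the assumption that the IDs are plain integers, is to generate `?` placeholders and pass the values as bound parameters. `chat_filter` is a hypothetical helper, and the table below is a made-up in-memory stand-in for `chat_message_join`.

```python
import sqlite3


def chat_filter(chat_ids: list[int]) -> tuple[str, list[int]]:
    """Build an `IN (?, ?, ...)` clause plus the parameters to bind to it."""
    if not chat_ids:
        raise ValueError("chat_ids must contain at least one id")
    placeholders = ", ".join("?" for _ in chat_ids)
    return f"c.chat_id IN ({placeholders})", list(chat_ids)


clause, params = chat_filter([4, 1406])
print(clause)  # c.chat_id IN (?, ?)

# Demonstration against a made-up, in-memory table
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE chat_message_join (chat_id INTEGER, message_id INTEGER)")
demo_conn.executemany(
    "INSERT INTO chat_message_join VALUES (?, ?)",
    [(4, 10), (7, 11), (1406, 12)],
)
matched = demo_conn.execute(
    f"SELECT message_id FROM chat_message_join c WHERE {clause} ORDER BY message_id",
    params,
).fetchall()
demo_conn.close()

print(matched)  # [(10,), (12,)]
```

`pd.read_sql_query` accepts the same values through its `params` argument, so the clause could replace `({chat_id_string})` in the main query.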

In [9]:
def decode_streamtyped_blob(blob: bytes) -> str:
    """Extract the message text from the binary blob stored in the attributedBody
    column of the message table in the iMessage chat.db.

    Parameters
    ----------
    blob : bytes
        Binary blob from the attributedBody column of the iMessage database.

    Returns
    -------
    str
        Decoded message string, or an error placeholder if decoding fails.

    """
    try:
        text = blob.split(b"NSString")[1]
        text = text[5:]  # stripping some preamble which generally looks like this: b'\x01\x94\x84\x01+'
        
        # 129 is the integer value of b'\x81'; indexing a bytes object returns an int,
        # so this check is equivalent to text[0:1] == b'\x81'
        if text[0] == 129:
            length = int.from_bytes(text[1:3], "little")
            text = text[3: length + 3]
        else:
            length = text[0]
            text = text[1: length + 1]
        return text.decode()

    except Exception as e:
        print(blob)
        print(e)
        return "ERROR: Can't decode message."
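
To see the two length-prefix branches in action, the sketch below builds synthetic blobs and decodes them with the same steps as `decode_streamtyped_blob`. The blobs are assumptions made for illustration: `make_blob` and `decode` are hypothetical helpers, the header and trailer bytes are filler, and the one-byte/two-byte cutoff at 128 is an assumption. Only the length prefix and the message follow the layout the function relies on.

```python
def make_blob(message: str) -> bytes:
    """Build a synthetic blob in the layout decode_streamtyped_blob expects.

    Everything except the length prefix and the message is placeholder
    filler, not a faithful copy of a real attributedBody value.
    """
    payload = message.encode("utf-8")
    if len(payload) < 128:  # assumed cutoff for the one-byte form
        prefix = bytes([len(payload)])  # one-byte length
    else:
        prefix = b"\x81" + len(payload).to_bytes(2, "little")  # 0x81 + two-byte length
    return b"header NSString" + b"\x01\x94\x84\x01+" + prefix + payload + b"\x86trailer"


def decode(blob: bytes) -> str:
    """Same steps as decode_streamtyped_blob, without the error handling."""
    text = blob.split(b"NSString")[1][5:]  # drop the 5-byte preamble
    if text[0] == 0x81:
        length = int.from_bytes(text[1:3], "little")
        return text[3:length + 3].decode()
    return text[1:text[0] + 1].decode()


short_message = "Wordle 258 2/6\n\n🟩🟨⬛🟨⬛\n🟩🟩🟩🟩🟩"
long_message = "x" * 300  # long enough to need the two-byte length

print(decode(make_blob(short_message)) == short_message)  # True
print(decode(make_blob(long_message)) == long_message)    # True
```

Note that the length counts UTF-8 bytes, not characters, which matters for messages made up mostly of emoji.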

In [10]:
emojis_df['decoded_message'] = emojis_df['attributedBody'].apply(decode_streamtyped_blob)

In [None]:
# text column is only sometimes populated, so we need to manually decode the attributedBody column
emojis_df[emojis_df['text'] != emojis_df['decoded_message']]

In [12]:
emojis_df.drop(['text', 'attributedBody'], axis=1, inplace=True)

# Keep only messages with 'Wordle ?/6' syntax
filtered_df = emojis_df[emojis_df['decoded_message'].str.contains(r'Wordle.*/6', regex=True, na=False)]

# Keep only messages with Wordle emoji squares in them
# .copy() makes filtered_df independent of emojis_df, so later column assignments don't raise SettingWithCopyWarning
filtered_df = filtered_df[filtered_df['decoded_message'].str.contains('(?:⬛|⬜|🟨|🟦|🟩|🟧)', regex=True, na=False)].copy()

In [13]:
# Create new column with just extracted scores and fill in X / did not finish scores
filtered_df['score'] = filtered_df['decoded_message'].str.extract(r'(?:Wordle \d+ )(\d)(?:/\d)', expand=True)
filtered_df['score'] = filtered_df['score'].fillna(config['did_not_finish_score']).astype(int)

In [14]:
# Create new column with game numbers
# NOTE: the pattern assumes three-digit game numbers
filtered_df['game_num'] = filtered_df['decoded_message'].str.extract(r'(?:Wordle )(\d\d\d)(?: ./6)', expand=True)
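
The two extraction steps can be tried on individual messages with the standard `re` module. This is a sketch rather than the notebook's exact logic: `parse_wordle` and `HEADER` are hypothetical names, and a single combined pattern stands in for the two separate `str.extract` patterns above.

```python
import re

# Raw string so the backslashes reach the regex engine unchanged.
# Captures the game number and the score ('X' marks an unfinished game).
HEADER = re.compile(r"Wordle (\d+) ([1-6X])/6")


def parse_wordle(message: str, did_not_finish_score: int = 7):
    """Return (game_num, score), or None if the message has no Wordle header."""
    match = HEADER.search(message)
    if match is None:
        return None
    game_num, raw_score = match.groups()
    score = did_not_finish_score if raw_score == "X" else int(raw_score)
    return int(game_num), score


print(parse_wordle("Wordle 258 5/6\n\n⬛⬛🟨⬛🟨\n🟩🟩🟩🟩🟩"))  # (258, 5)
print(parse_wordle("Wordle 300 X/6\n\n⬛⬛🟨⬛🟨"))            # (300, 7)
print(parse_wordle("lunch?"))                              # None
```

The default of 7 for an unfinished game mirrors the `did_not_finish_score` value in the example config.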

In [None]:
filtered_df.head()

In [17]:
print(f"Found {len(filtered_df.index)} scores in iMessage threads.")

Found 2725 scores in iMessage threads.


## Convert `handles` (phone numbers/emails) and `is_from_me` flags to names

In [None]:
# Spot check to see which handles exist and need to be added to `handle_dict` in config.yaml
print(filtered_df['handle'].unique())

In [19]:
# Use dictionary from config.yaml to translate numbers & emails to names
filtered_df['name'] = filtered_df['handle'].replace(config['handle_dict'])

In [20]:
# Use `is_from_me` flags to fill in name from config.yaml
filtered_df.loc[filtered_df['is_from_me'] == 1, 'name'] = config['is_from_me_name']

In [21]:
# make sure there are no unassigned scores
assert len(filtered_df[filtered_df['name'].isnull()]) == 0
assert len(filtered_df[filtered_df['name'] == '']) == 0
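
One caveat about these checks: `Series.replace` leaves values without a dictionary entry unchanged, so a handle missing from `handle_dict` stays in the `name` column as a raw phone number or email and passes both assertions. A small sketch of a stricter check follows; `unmapped_handles` is a hypothetical helper, and the sample handles are made up.

```python
def unmapped_handles(handles, handle_dict):
    """Return the handles that have no entry in handle_dict, ignoring None."""
    return sorted({h for h in handles if h is not None and h not in handle_dict})


handle_dict = {
    "+13035551234": "Alice",
    "+13035552345": "Bob",
    "fake_address@gmail.com": "Foo",
}
# Made-up handles, including one that is missing from handle_dict
seen = ["+13035551234", "+13035559999", None, "fake_address@gmail.com"]

print(unmapped_handles(seen, handle_dict))  # ['+13035559999']
```

In the notebook this could be called as `unmapped_handles(filtered_df['handle'].dropna().unique(), config['handle_dict'])`, with an assertion that the result is empty.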

In [None]:
# make sure all handles were translated correctly
print(filtered_df['name'].unique())

In [23]:
scores_df = filtered_df[['name','game_num','score']].copy()

In [24]:
# keep only one score per game per person; rows are sorted newest first, so head(1) keeps the most recent message
dedup_scores_df = scores_df.groupby(['name', 'game_num']).head(1)

In [None]:
dedup_scores_df.head(6)

In [26]:
game_idx_df = dedup_scores_df.set_index('game_num')

In [27]:
# Pivot table & clean up
df_format_asc = game_idx_df.pivot(columns='name')
df_format_desc = df_format_asc.sort_index(ascending=False)
df_format_desc.fillna('', inplace=True)

# Change game numbers from index to column
df_format_desc.reset_index(inplace=True)

# cast game numbers to int
df_format_desc.game_num = df_format_desc.game_num.astype(int)

In [None]:
# get list of names to upload them separately
names_list = df_format_asc.columns.levels[1].tolist()
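
To make the target layout concrete, here is a pandas-free sketch of the same reshaping: one row per game, one column per player, newest game first, and an empty string where a player has no score. `to_sheet_rows` is a hypothetical helper and the records are made up; it illustrates the shape of the data and is not a replacement for the pivot above.

```python
def to_sheet_rows(scores):
    """Reshape (name, game_num, score) records into one row per game.

    Returns the sorted player names and rows of
    [game_num, score_for_name_1, score_for_name_2, ...], newest game first,
    with '' where a player has no score for that game.
    """
    names = sorted({name for name, _, _ in scores})
    by_game = {}
    for name, game_num, score in scores:
        # setdefault keeps the first score seen per player and game,
        # similar to groupby(['name', 'game_num']).head(1)
        by_game.setdefault(game_num, {}).setdefault(name, score)
    rows = [
        [game_num] + [by_game[game_num].get(name, "") for name in names]
        for game_num in sorted(by_game, reverse=True)
    ]
    return names, rows


# Made-up records; Alice has a duplicate entry for game 258
records = [("Bob", 259, 3), ("Alice", 259, 4), ("Alice", 258, 5), ("Alice", 258, 6)]
names, rows = to_sheet_rows(records)

print(names)  # ['Alice', 'Bob']
print(rows)   # [[259, 4, 3], [258, 5, '']]
```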

## Google Sheets API

In [29]:
gc = gspread.service_account(filename=config['service_account_key_path'])
sh = gc.open_by_url(config['sheets_link'])
wks = sh.get_worksheet(0)

# Confirm we can read from worksheet
if isinstance(wks.acell('B1').value, str):
    print("Successfully accessed Google Sheet.")

Successfully accessed Google Sheet.


In [None]:
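# Update list of names in case order changed - edit cell range if needed
# keyword arguments avoid relying on positional order, which differs between gspread versions
wks.update(range_name='B1:G1', values=[names_list])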

In [31]:
game_nums = df_format_desc.game_num
print(f'Writing scores to Google Sheet. Game number range: {game_nums.min()} - {game_nums.max()}')

Writing scores to Google Sheet. Game number range: 224 - 831


In [None]:
# Extract values only as list
scores_list = df_format_desc.values.tolist()

# Write scores to Google Sheet - edit cell range if needed
scores_logs = wks.update(range_name=f'A5:G{len(scores_list) + 4}', values=scores_list)
print(f'Google Sheet update confirmation:\n{json.dumps(scores_logs, indent=4)}')
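
The cell ranges above hardcode column `G`, which only fits a game-number column plus six players. As a sketch, the range could instead be derived from the shape of the data. `column_letter` and `a1_range` are hypothetical helpers written for illustration; gspread also ships `gspread.utils.rowcol_to_a1`, which may be a better fit in practice.

```python
def column_letter(index: int) -> str:
    """Convert a 1-based column index to A1 letters (1 -> 'A', 27 -> 'AA')."""
    if index < 1:
        raise ValueError("column index must be >= 1")
    letters = ""
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        letters = chr(ord("A") + remainder) + letters
    return letters


def a1_range(first_row: int, n_rows: int, n_cols: int, first_col: int = 1) -> str:
    """Return the A1 range covering n_rows x n_cols cells from the given corner."""
    top_left = f"{column_letter(first_col)}{first_row}"
    bottom_right = f"{column_letter(first_col + n_cols - 1)}{first_row + n_rows - 1}"
    return f"{top_left}:{bottom_right}"


# 608 game rows starting at row 5, with 1 game-number column + 6 player columns
print(a1_range(first_row=5, n_rows=608, n_cols=7))           # A5:G612
# Header row with 6 names starting in column B
print(a1_range(first_row=1, n_rows=1, n_cols=6, first_col=2))  # B1:G1
```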

In [None]:
print(f"Google Sheet link:\n{config['sheets_link']}")

In [34]:
# close connections to local database
cur.close()
conn.close()

In [35]:
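# open Google Sheet in Safari
# shlex.quote keeps the shell from interpreting characters such as `?` and `&` in the URL
import shlex
os.system(f"open -a Safari {shlex.quote(config['sheets_link'])}")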

0