# Sandbox

Notebook scratch pad.

In [None]:
# Print one SELECT row per (song, artist, year); every row ends with UNION ALL,
# so the trailing one has to be trimmed by hand before the query is used.
for song, artist, year in (
    ('So What', 'Miles Davis', 1959),
    ('Ruby My Dear', 'Thelonious Monk', 1951),
    ('Moritat', 'Sonny Rollins', 1956),
):
    print(f'SELECT "{song}" AS song, "{artist}" AS artist, {year} AS year UNION ALL')

"So What" AS song, "Miles Davis" AS artist, 1959 AS year UNION ALL
"Ruby My Dear" AS song, "Thelonious Monk" AS artist, 1951 AS year UNION ALL
"Moritat" AS song, "Sonny Rollins" AS artist, 1956 AS year UNION ALL


In [None]:
import jsonlines
from jazz_graph.extract_jazz import PERFORMER_ROLES

def has_jazz_instrumental(release_dict: dict) -> bool:
    """Return True if at least one credited artist has a performer role.

    A credit counts when its ``'role'`` value is a member of
    ``PERFORMER_ROLES``. The definition of a jazz instrument here is broad.
    Releases with a missing or empty ``'credits'`` entry return False.
    """
    credits = release_dict.get('credits')
    if not credits:
        return False
    return any(artist.get('role') in PERFORMER_ROLES for artist in credits)


# JSONL inputs, relative to the notebook: artists, filtered masters, releases.
files = [
    '../local_data/jazz_artists.jsonl',
    '../local_data/jazz_masters_filtered.jsonl',
    '../local_data/jazz_releases.jsonl',
]

# Running tallies shared by the scanning code in this cell.
counter = n_enties = 0
seen = set()

# for file_name in files:
#     with jsonlines.open(file_name, 'r') as f:
#         for line in f:
#             try:
#                 entry = f.read()
#             except Exception:
#                 print("EOF? ")
#             print(entry)
#             break
#             n_enties += 1

#             if len(entry['genres']) > 1:
#                 counter += 1
#                 seen.add(tuple(entry['genres']))
# entry

from jazz_graph.data_normalization import normalize_title

def release_titles(release: dict):
    """Yield the release's own title, then the title of each track.

    Titles that are missing (``None``) are skipped. A release without a
    ``'tracklist'`` entry yields at most its own title.
    """
    album_title = release.get('title')
    if album_title is not None:
        yield album_title

    tracks = release.get('tracklist')
    if tracks is not None:
        track_titles = (track.get('title') for track in tracks)
        yield from (name for name in track_titles if name is not None)

def iter_releases():
    """Scan the releases file and print entries of interest.

    Only single-genre releases that pass ``has_jazz_instrumental`` are
    inspected. For those, the function prints:
      * the release with id 12917715 (and then stops scanning),
      * titles starting with 'a love sup' or 'head ',
      * every 5000th inspected release,
      * any release whose own or track title normalizes to 'a love supreme'.

    Updates the module-level ``counter`` (inspected releases) and
    ``n_enties`` (records read). Returns None.
    """
    # Both tallies live at module level; without this declaration the
    # augmented assignments below raise UnboundLocalError.
    global counter, n_enties
    with jsonlines.open(files[2]) as f:
        # Iterating the reader already yields each decoded record. Calling
        # f.read() inside the loop as well would consume a second record per
        # iteration, skipping half the file and raising EOFError on an odd count.
        for entity in f:
            if has_jazz_instrumental(entity) and len(entity['genres']) < 2:
                if entity.get('id', -1) == 12917715:
                    print(entity)
                    break
                # A release may lack a title; treat that as an empty string
                # for the prefix checks instead of crashing on None.lower().
                lowered = (entity.get('title') or '').lower()
                if lowered.startswith('a love sup'):
                    print (f"Found it. {entity.get('title')}")
                if lowered.startswith('head '):
                    print(f"found another {entity.get('title')}")
                counter += 1
                if counter % 5000 == 0:
                    print(entity)
                for title in release_titles(entity):
                    norm_title = normalize_title(title)
                    if norm_title == 'a love supreme':
                        print(entity)

            n_enties += 1

def to_dict_entry(release: dict) -> dict:
    """Flatten a release record into a single row dictionary.

    Keys, in order: ``discog_id``, ``album_title``,
    ``album_title_normalized``, ``release_year``, ``artist_name``,
    ``discog_artist_id``.

    ``release_year`` is the text before the first '-' of ``'released'``, or
    None when the release has no ``'released'`` value. The artist fields come
    from the first entry of ``'artists'`` and are None when there is none.

    Raises:
        KeyError: if the release has no ``'title'``.
    """
    title = release['title']
    # 'released' is optional; calling .split on a missing value would raise.
    released = release.get('released')
    artists = release.get('artists')
    first_artist = artists[0] if artists else None

    out = {}
    out['discog_id'] = release.get('id')
    out['album_title'] = title
    out['album_title_normalized'] = normalize_title(title)
    out['release_year'] = released.split('-')[0] if released else None
    out['artist_name'] = first_artist['name'] if first_artist else None
    out['discog_artist_id'] = first_artist['id'] if first_artist else None
    return out

def releases_to_csv(release_path: str = 'jazz_releases.jsonl', csv_path: str = 'discogs_musicbrainz.csv', directory: str = ''):
    """Extract releases to a csv file for merging musicbrainz data.

    Args:
        release_path: JSONL file of releases, relative to ``directory``.
        csv_path: output CSV file name, relative to ``directory``.
        directory: folder that holds both the input and the output file.

    Every record is flattened with ``to_dict_entry``. Missing ``discog_id``
    and ``discog_artist_id`` values are written as -1 so both columns can be
    stored as 64-bit integers.
    """
    import os

    import numpy as np
    import pandas as pd

    full_path = os.path.join(directory, release_path)
    with jsonlines.open(full_path, 'r') as f:
        # Iterate the reader directly: mixing iteration with f.read() pulls
        # two records per pass, dropping every other release and raising
        # EOFError when the file holds an odd number of records.
        entries = [to_dict_entry(release) for release in f]
    data = (
        pd.DataFrame.from_records(entries)
        .fillna({'discog_id': -1, 'discog_artist_id': -1})
        .astype({'discog_id': np.int64, 'discog_artist_id': np.int64})
    )
    data.to_csv(os.path.join(directory, csv_path))

# Pick the first release tagged with exactly one genre as a sample record.
# The reader is iterated directly; calling f.read() inside the loop as well
# would consume a second record per pass and skip every other release.
with jsonlines.open('../local_data/jazz_releases.jsonl', 'r') as f:
    for release in f:
        if len(release['genres']) == 1:
            break

"/workspace/local_data/jazz_releases.jsonl"
# Show the flattened sample, then export the whole file to CSV.
print(to_dict_entry(release))
releases_to_csv(directory='../local_data')

{'discog_id': 1226, 'album_title': 'Tourist', 'album_title_normalized': 'tourist', 'release_year': '2000', 'artist_name': 'St Germain', 'discog_artist_id': 74}


In [57]:
import unicodedata

text = "Hôtel"

# The four Unicode normalization forms compared below:
#   NFD  - canonical decomposition
#   NFC  - canonical composition
#   NFKD - compatibility decomposition
#   NFKC - compatibility composition
nfd_text, nfc_text, nfkd_text, nfkc_text = (
    unicodedata.normalize(form, text) for form in ('NFD', 'NFC', 'NFKD', 'NFKC')
)
for form, normalized in zip(
    ('NFD', 'NFC', 'NFKD', 'NFKC'),
    (nfd_text, nfc_text, nfkd_text, nfkc_text),
):
    print(f"{form}: {normalized} (Length: {len(normalized)})")

# Example with a compatibility character (e.g., superscript 2)
superscript_two = "²"
nfkd_superscript = unicodedata.normalize('NFKD', superscript_two)
print(f"NFKD of '²': {nfkd_superscript}")

# Displayed as the cell result: is the literal already in decomposed form?
text == nfd_text

NFD: Hôtel (Length: 6)
NFC: Hôtel (Length: 5)
NFKD: Hôtel (Length: 6)
NFKC: Hôtel (Length: 5)
NFKD of '²': 2


False

In [69]:
import re
import unicodedata
# Matches a parenthesised "<digit>.<digit> mix" tag in a lower-cased title.
exp = r'\(\d\.\d\smix\)'
# Decompose and lower-case the title before matching.
title = unicodedata.normalize('NFD', 'So What (5.0 Mix)').lower()
re.search(exp, title)

<re.Match object; span=(8, 17), match='(5.0 mix)'>

In [55]:
# A string with no '-' splits into a single-element list holding the whole string.
'1567'.split('-')

['1567']

In [None]:
import numpy as np
# A batch of one 4x3 matrix.
samples = np.array([[
    [1, 1, 1],
    [2, 2, 2],
    [3, 4, 5],
    [0, 0.9, 4],
]])
# NOTE: reshaping the 2x3 literal to (3, 2) re-flows its six values in
# row-major order; it is not a transpose.
w = np.reshape(
    np.array([
        [0.1, 0.1, 0.2],
        [0.3, 4.0, 0.5],
    ]),
    (3, 2),
)
print(samples.shape, w.shape)
# Batched matrix product: (1, 4, 3) @ (3, 2) -> (1, 4, 2).
np.matmul(samples, w)

## Heterogeneous Graph

Verify the heterogeneous graph concept.

In [None]:
import torch
import numpy as np
from torch_geometric.data import HeteroData

# NOTE: this is probably reusable in tests.

def example_graph():
    """Build a small heterogeneous example graph as plain adjacency mappings.

    Returns a dict with:
      * ``'musician'``, ``'performance'``, ``'song'``: lists of node ids,
      * ``'composes'``: musician id -> list of song ids,
      * ``'performs'``: musician id -> list of performance ids,
      * ``'performing'``: performance id -> list of song ids.

    Every adjacency value is a list.
    """
    # write out a heterogeneous graph in an adjacency list.
    # this is probably not the best structure, but it's easy to read.
    graph = {}
    graph['musician'] = [0, 1, 2, 3, 4]
    graph['performance'] = list(range(10))
    graph['song'] = list(range(0, 8))

    # Both values are materialised as lists so they match the
    # dict[int, list] shape that the other adjacency mappings use.
    graph['composes'] = {0: list(range(0, 4)), 1: list(range(4, 8))}
    graph['performs'] = {
        0: [0, 1, 2, 4, 8, 9],
        1: [3, 5, 6, 7, 9],
        2: [1, 2, 3, 4],
        3: [5, 6, 7, 8],
        4: [9]
    }
    # performance to song
    # Performances 0-7 start out mapped to the same-numbered song; the right
    # operand of `|` wins on duplicate keys, so performance 7 ends up as [2, 3].
    graph['performing'] = (
        {idx: [idx] for idx in graph['song']}
        | {7: [2, 3], 8: [0], 9: [1]}
    )
    return graph

def edge_map_to_array(mapping: dict[int, list], invert=False):
    """Turn an adjacency mapping into a ``(2, num_edges)`` long tensor of edges.

    Each ``node_i -> [node_j, ...]`` entry contributes one column per
    neighbour, in mapping order. Row 0 holds ``node_i`` and row 1 holds
    ``node_j``. With ``invert=True`` the two rows are swapped, which reverses
    the direction of every edge.

    Assumes directed edges.
    """
    sources = []
    targets = []
    for node_i, neighbours in mapping.items():
        for node_j in neighbours:
            sources.append(node_i)
            targets.append(node_j)
    rows = [targets, sources] if invert else [sources, targets]
    return torch.tensor(rows, dtype=torch.long)

def test_edge_map_to_array():
    """Check ``edge_map_to_array`` on the example graph's 'performs' edges."""
    graph = example_graph()
    forward = edge_map_to_array(graph['performs'])
    backward = edge_map_to_array(graph['performs'], invert=True)

    # Inverting must swap the source and target rows.
    assert torch.equal(forward[0], backward[1])
    assert torch.equal(forward[1], backward[0])

    assert forward.dtype == torch.long
    expected = torch.tensor([
        [0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4],
        [0, 1, 2, 4, 8, 9, 3, 5, 6, 7, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9]])
    np.testing.assert_equal(forward.numpy(), expected.numpy())


# Run the check immediately so the cell fails loudly on a regression.
test_edge_map_to_array()


In [14]:
def example_hetrogenous_data() -> HeteroData:
    """Wrap the example graph in a ``HeteroData`` object.

    Every node type gets one random feature per node. Each relation is stored
    twice, once per direction, so the edge set covers both directions.
    """
    graph = example_graph()
    data = HeteroData()
    for node_type in ('musician', 'performance', 'song'):
        data[node_type].x = torch.rand((len(graph[node_type]), 1))

    # (source type, relation, target type, key of the adjacency mapping)
    relations = (
        ('musician', 'performs', 'performance', 'performs'),
        ('musician', 'composes', 'song', 'composes'),
        ('performance', 'performs', 'song', 'performing'),
    )
    for source, relation, target, key in relations:
        data[source, relation, target].edge_index = edge_map_to_array(graph[key])
    # create the reverse edges, since edges pass messages in both directions.
    # OPTION 2: Use PyG.transforms.ToUndirected()(data) to get the same result as this loop.
    for source, relation, target, key in relations:
        data[target, relation, source].edge_index = edge_map_to_array(graph[key], invert=True)
    return data

# Build the example graph once; `data` is reused by the model cell further down.
data = example_hetrogenous_data()

# tests the data and shows some properties.
# Every node must take part in at least one edge, and no edge may loop back
# onto its own node.
assert data.has_isolated_nodes() is False
assert data.has_self_loops() is False
# Holds only because every relation was also added in the reverse direction.
assert data.is_undirected(), "This works, as along as you wrote the edges correctly."
# Displayed as the cell result: (node types, edge types).
data.metadata()

(['musician', 'performance', 'song'],
 [('musician', 'performs', 'performance'),
  ('musician', 'composes', 'song'),
  ('performance', 'performs', 'song'),
  ('performance', 'performs', 'musician'),
  ('song', 'composes', 'musician'),
  ('song', 'performs', 'performance')])

In [27]:
# example model.

from torch_geometric.nn import SAGEConv, to_hetero

# See https://pytorch-geometric.readthedocs.io/en/2.6.0/cheatsheet/gnn_cheatsheet.html
# for a list of Conv operators that support bipartite graphs.

class GNN(torch.nn.Module):
    """Two stacked ``SAGEConv`` layers with a ReLU in between.

    Both layers are created with ``(-1, -1)`` as the input size.
    NOTE(review): presumably this lets PyG infer the input sizes on the first
    forward pass - confirm against the SAGEConv documentation.
    """

    def __init__(self, hidden_channels, out_chanels):
        """Create the layers; sizes are the hidden and the output widths."""
        super().__init__()
        self.conv1 = SAGEConv((-1, -1), hidden_channels)
        self.conv2 = SAGEConv((-1, -1), out_chanels)

    def forward(self, x, edge_index):
        """Apply conv1, ReLU, then conv2 and return the result."""
        hidden = self.conv1(x, edge_index).relu()
        return self.conv2(hidden, edge_index)

# 64 hidden channels, 3 output channels.
model = GNN(64, 3)
# Convert the homogeneous model to one that runs on the node and edge types
# of `data`, summing messages across edge types.
model = to_hetero(model, data.metadata(), aggr='sum')

import time
with torch.no_grad():
    # perf_counter() is the clock intended for measuring short intervals:
    # it is monotonic and has the highest available resolution, whereas
    # time.time() can jump when the system clock is adjusted.
    start = time.perf_counter()
    y_hat = model(data.x_dict, data.edge_index_dict)
    elapsed = time.perf_counter() - start
    print(f"Took {elapsed} seconds. ")
# Displayed as the cell result: one output tensor per node type.
y_hat

Took 0.007594108581542969 seconds. 


{'musician': tensor([[ 0.0796, -0.1573, -0.2088],
         [ 0.2286, -0.2953,  0.0686],
         [-0.6347, -0.4148,  0.0371],
         [-0.9138, -0.5287,  0.0705],
         [-0.6052, -0.4917,  0.1785]]),
 'performance': tensor([[-0.0269, -1.2339, -0.7485],
         [ 0.0395, -1.4116, -0.6841],
         [-0.3795, -0.9934, -0.5707],
         [ 0.0157, -1.1449, -0.6401],
         [-0.3054, -0.9710, -0.6383],
         [-0.3410, -1.1379, -0.5487],
         [-0.3257, -0.7704, -0.6527],
         [-0.2721, -1.0125, -0.5731],
         [-0.2389, -0.9920, -0.6391],
         [ 0.0949, -1.3454, -0.6798]]),
 'song': tensor([[ 1.9740, -2.4685, -0.1463],
         [ 1.6659, -2.4504, -0.1978],
         [ 1.3012, -2.1212, -0.2909],
         [ 1.5592, -2.2539, -0.1619],
         [ 1.5506, -2.0969, -0.3227],
         [ 1.0122, -1.9640, -0.5050],
         [ 2.0204, -2.2588, -0.1939],
         [ 1.0168, -1.7627, -0.3483]])}

## Write some queries.

Scratch pad for generating queries to test/explore Neo4j structures, 
think about data modeling, etc.

In [None]:
# Track titles and personnel fed to the Cypher-generating loops in this section.
songs = ['So What?', "Freddie Freeloader", "All Blues", "Blue in Green", "Flamenco Sketches"]
musicians = ['Bill Evans', 'Wynton Kelly', 'Cannonball Adderley', 'John Coltrane', 'Miles Davis', 'Paul Chambers', 'Jimmy Cobb']
def hook(string):
    """Build a short identifier: the first word plus the second word's initial.

    For example ``'So What?'`` becomes ``'SoW'``. A string without a second
    space-separated word is returned as its first word alone, instead of
    raising IndexError.
    """
    split = string.split(' ')
    if len(split) < 2:
        return split[0]
    return split[0] + split[1][:1]

# One CREATE statement per song; hook(song) supplies the Cypher variable name
# and the full title becomes the node's `name` property.
for song in songs:
    print(f"CREATE ({hook(song)}:Song {{name: '{song}'}})")

In [None]:
# Instrument played by each musician; the lower-cased value is written into
# the `instrument` property of the generated PERFORMS relationships.
kind_of_blue_musicians = {
    'Bill Evans': 'Piano',
    'Wynton Kelly': 'Piano',
    'Cannonball Adderley': 'Alto Saxophone',
    'John Coltrane': 'Tenor Saxophone',
    'Miles Davis': 'Trumpet',
    'Paul Chambers': 'Bass',
    'Jimmy Cobb': 'Drums'
}

In [None]:

# Print one PERFORMS relationship per (musician, song) pair, leaving out the
# pairs excluded by the rules below.
for musician in musicians:
    for song in songs:
        # Songs containing "Fred" skip musicians containing "Bill"; every
        # other song skips musicians containing "Wyn".
        if "Fred" in song:
            sits_out = "Bill" in musician
        else:
            sits_out = "Wyn" in musician
        # Musicians containing "Cann" are also skipped on 'Blue in' songs.
        if sits_out or ('Blue in' in song and "Cann" in musician):
            continue

        instrument = kind_of_blue_musicians[musician].lower()
        cypher = f"CREATE ({hook(musician)}) -[:PERFORMS {{instrument: '{instrument}'}}]-> ({hook(song)})"
        print(cypher)


In [None]:
# Cypher statements creating the Musician nodes. The spelling
# 'Cannonball Adderley' matches the name used in the musician lists of this
# section, so the extracted names line up with them.
s = """
CREATE (BillE:Musician {name: 'Bill Evans'})
CREATE (WyntonK:Musician {name: 'Wynton Kelly'})
CREATE (CannonballA:Musician {name: 'Cannonball Adderley'})
CREATE (JohnC:Musician {name: 'John Coltrane'})
CREATE (MilesD:Musician {name: 'Miles Davis'})
CREATE (PaulC:Musician {name: 'Paul Chambers'})
CREATE (JimmyC:Musician {name: 'Jimmy Cobb'})
"""
import re
# Pull out every single-quoted name and print them as one comma-separated line.
for e in re.findall(r"'[\w\s]+'", s, flags=re.MULTILINE):
    print(e, ',', sep='', end=' ')

In [None]:
# Cypher variable names of the track nodes to attach to the album.
tracks = ["FreddieT", "BlueiT", "AllBT", "SketchesT"]
# Emit one LISTS_TRACK relationship from KindOfBlue to each track variable.
for track in tracks:
    print(f"CREATE (KindOfBlue) -[:LISTS_TRACK]-> ({track})")

In [None]:
def softmax(x):
    """Return the softmax of ``x`` taken over all of its elements.

    The maximum is subtracted before exponentiating. That leaves the result
    unchanged mathematically but keeps ``exp`` from overflowing to inf (and
    the ratio from becoming nan) for large inputs. An empty input yields an
    empty array.
    """
    x = np.asarray(x)
    if x.size == 0:
        return np.exp(x)
    exps = np.exp(x - np.max(x))
    return exps / np.sum(exps)

softmax(np.array([1, 4, -4, 5]))

In [None]:
pip install nbformat

In [None]:
import plotly.graph_objects as go

# ... (define node positions and edge connections) ...


# Coordinates of the three nodes.
node_x = [1, 2, 3]
node_y = [2, 3, 3]

# Each edge is a pair of points followed by None.
edge_x = [1, 2, None, 1, 3, None]
edge_y = [2, 3, None, 2, 3, None]

# Nodes are drawn as markers, edges as lines; both share one figure.
node_trace = go.Scatter(x=node_x, y=node_y, mode='markers')
edge_trace = go.Scatter(x=edge_x, y=edge_y, mode='lines')
fig = go.Figure(data=[node_trace, edge_trace])
fig.show()