# Pandas dataframe accessor: Levi graph
## Transform between different graph data representations via a Levi graph

https://jendobson.com/2020/06/12/writing-custom-accessors-to-avoid-subclassing-pandas-dataframes/



In [1]:
import numpy as np
import pandas as pd
from scipy import sparse
import networkx as nx
import matplotlib.pyplot as plt
import beartype as bt

plt.rcParams['figure.dpi'] = 60

### Create data to use in development:
- Token Co-occurrence matrix
- Document-term matrix (this is an example of a bipartite structure/incidence matrix)
- COO (format: multi-index pandas Series)


In [2]:
# Create sample dataset

texts = [
    "the quick brown fox jumped",
    "the fox jumped",
    "the dog jumped",
    "one brown fox",
    "ten brown dogs",
]

# One string id per document, derived from the corpus size so the two
# columns cannot drift out of sync when texts are added or removed.
doc_ids = [str(i) for i in range(len(texts))]

df = pd.DataFrame({
    'doc_id': doc_ids,
    'text': texts,
})

In [3]:
from sklearn.feature_extraction.text import CountVectorizer

cv = CountVectorizer(stop_words=['the'])
cv.fit(df['text'])
tokens = cv.get_feature_names_out()  #alphabetical

In [4]:
# Create co-occurrence matrix

results = cv.transform(df['text'])
# (terms x docs) . (docs x terms) -> term-by-term counts, as a dense array
coocc_counts = results.T.dot(results).toarray()
# Zero the diagonal on the NumPy array *before* wrapping it in a DataFrame:
# writing through ``DataFrame.values`` depends on it being a writable view,
# which is not the case when pandas Copy-on-Write is active.
np.fill_diagonal(coocc_counts, 0)
coocc = pd.DataFrame(coocc_counts, index=tokens, columns=tokens)
coocc

Unnamed: 0,brown,dog,dogs,fox,jumped,one,quick,ten
brown,0,0,1,2,1,1,1,1
dog,0,0,0,0,1,0,0,0
dogs,1,0,0,0,0,0,0,1
fox,2,0,0,0,2,1,1,0
jumped,1,1,0,2,0,0,1,0
one,1,0,0,1,0,0,0,0
quick,1,0,0,1,1,0,0,0
ten,1,0,1,0,0,0,0,0


In [5]:
# Create document term matrix (BIPARTITE STRUCTURE)

results = cv.transform(df['text'])
features = cv.get_feature_names_out()
# Terms appended as extra all-zero columns (they occur in no document)
extra_terms = ['cat', 'bear', 'tree']
doc_term = pd.DataFrame(results.toarray(), columns=features)
doc_term = doc_term.reindex(columns=[*doc_term.columns, *extra_terms], fill_value=0)
doc_term

Unnamed: 0,brown,dog,dogs,fox,jumped,one,quick,ten,cat,bear,tree
0,1,0,0,1,1,0,1,0,0,0,0
1,0,0,0,1,1,0,0,0,0,0,0
2,0,1,0,0,1,0,0,0,0,0,0
3,1,0,0,1,0,1,0,0,0,0,0
4,1,0,1,0,0,0,0,1,0,0,0


In [6]:
# Create COO Dataframe
# using scipy sparse module here, but we want to avoid it in production
coo_matrix = sparse.coo_matrix(doc_term.values)
coo_rows = coo_matrix.row
coo_cols = coo_matrix.col
coo_vals = coo_matrix.data

# Map column position -> term. ``enumerate`` follows the real number of
# columns instead of a hard-coded count that must be kept in sync by hand.
term_dict = dict(enumerate(doc_term.columns))
term_dict

{0: 'brown',
 1: 'dog',
 2: 'dogs',
 3: 'fox',
 4: 'jumped',
 5: 'one',
 6: 'quick',
 7: 'ten',
 8: 'cat',
 9: 'bear',
 10: 'tree'}

In [7]:
# Read the column positions from ``coo_matrix`` rather than from ``coo_cols``
# itself, so re-running this cell does not call ``.tolist()`` on a list.
coo_cols = coo_matrix.col.tolist()
# Translate each column position into its term label
coo_col_names = [term_dict[key] for key in coo_cols]

In [8]:
idx=pd.MultiIndex.from_arrays([coo_matrix.row, coo_col_names])
coo = pd.DataFrame(coo_matrix.data, index=idx, columns=["flag"])
coo

Unnamed: 0,Unnamed: 1,flag
0,brown,1
0,fox,1
0,jumped,1
0,quick,1
1,fox,1
1,jumped,1
2,dog,1
2,jumped,1
3,brown,1
3,fox,1


In [9]:
# Squeeze only the column axis: the single "flag" column becomes a Series,
# and the result stays a Series even if ``coo`` ever holds just one row
# (a bare ``squeeze()`` would collapse that case to a scalar).
levi_series = coo.squeeze(axis="columns")
levi_series

0  brown     1
   fox       1
   jumped    1
   quick     1
1  fox       1
   jumped    1
2  dog       1
   jumped    1
3  brown     1
   fox       1
   one       1
4  brown     1
   dogs      1
   ten       1
Name: flag, dtype: int64

In [10]:
from levi import dataframe, series

levi_series.levi.to_edgelist(level_0="doc", level_1="term")

*** TEST ***


  from tqdm.autonotebook import tqdm


Unnamed: 0,doc,term,flag
0,0,brown,1
1,0,fox,1
2,0,jumped,1
3,0,quick,1
4,1,fox,1
5,1,jumped,1
6,2,dog,1
7,2,jumped,1
8,3,brown,1
9,3,fox,1


In [11]:
series_empty = pd.Series([0,7,8])
series_empty

0    0
1    7
2    8
dtype: int64

In [12]:
series_empty.levi.to

BeartypeCallHintParamViolation: @beartyped levi.series.LeviAccessor._validate() parameter obj="0    0
1    7
2    8
dtype: int64" violates type hint typing.Annotated[pandas.core.series.Series, IsInstance[pandas.core.series.Series] & IsAttr['index', IsInstance[pandas.core.indexes.multi.MultiIndex]]], as <protocol "pandas.core.series.Series"> "0    0
1    7
2    8
dtype: int64" violates validator IsInstance[pandas.core.series.Series] & IsAttr['index', IsInstance[pandas.core.indexes.multi.MultiIndex]]:
    False == (
     True ==     IsInstance[pandas.core.series.Series] &
    False ==     IsAttr['index', IsInstance[pandas.core.indexes.multi.MultiIndex]]
             ).

In [None]:
from levi import series

levi_series.levi.to_adjacency

In [None]:
coo.squeeze()

In [None]:
print(coo_matrix)  #scipy sparse format

In [None]:
def edgelist_to_incidence(edgelist, node_colname, value_colname=None):
    """Build a sparse edge-by-node incidence array from an edgelist frame.

    Assumes ``edgelist`` is indexed by edge number, not by some set of edge
    names (for now): the index is used directly as the sparse row index.

    Parameters
    ----------
    edgelist : pandas.DataFrame
        One row per edge. ``edgelist[node_colname]`` must have categorical
        dtype; its category codes are used as the column indices.
    node_colname : str
        Name of the categorical column holding the node of each edge.
    value_colname : str, optional
        Column supplying the stored values. When ``None`` every stored
        entry is 1.

    Returns
    -------
    scipy.sparse.coo_array
        Array of shape ``(number of edgelist rows, number of categories)``.
    """
    node_cat = edgelist[node_colname].cat

    # ``is None`` is the identity test; ``== None`` defers to ``__eq__``.
    if value_colname is None:
        data = np.ones_like(edgelist.index.values)
    else:
        data = edgelist[value_colname].values

    row_idx = edgelist.index
    col_idx = node_cat.codes
    shape = (edgelist.shape[0], len(node_cat.categories))
    # coo_array((data, (row_idx, col_idx)))
    return sparse.coo_array((data, (row_idx, col_idx)), shape=shape)

In [None]:
edgelist = levi_series.levi.to_edgelist(level_0="doc", level_1="term")
# Make both node columns categorical so their ``.cat`` codes/categories are available
for node_col in ('doc', 'term'):
    edgelist[node_col] = pd.Categorical(edgelist[node_col])

In [None]:
edgelist

In [None]:
inc = edgelist_to_incidence(edgelist, node_colname='term')
print(inc)

In [None]:
# Label each stored incidence value with the edgelist row it was built from.
# A DataFrame cannot be passed as ``index`` directly, so its columns are
# turned into the levels of a MultiIndex first.
# NOTE(review): all edgelist columns become index levels here; restrict to
# the node columns if that is what was intended.
inc_frame = pd.DataFrame(inc.data, index=pd.MultiIndex.from_frame(edgelist))
inc_frame

In [None]:
# tuple_list = list(zip(coo_rows, coo_col_names, coo_matrix.data))
tuple_list = list(zip(coo_rows, coo_col_names, ))
tuple_list_w_data = [(*t, 1) for t in tuple_list]

In [None]:
import itertools
row_index = dict(zip(tuple_list_w_data, itertools.count()))
row_index

In [None]:
# array_x = nx.algorithms.bipartite.biadjacency_matrix(graph, row_order=tuple_list_w_data)
# array_x

In [None]:
# Make sure pandas categorical works with ints
#   - in nb, ss example has index as ints, not categoricals
#   - pandas_categoricals

In [None]:
from phantom import Phantom
from phantom.predicates import Predicate
import beartype.typing as bt
from beartype.door import is_bearable


In [None]:
def of_beartype(t: bt.Union[type, bt.Tuple[type, ...]]) -> Predicate[object]:
    """Build a predicate that reports whether its argument is bearable on ``t``.

    Adapted from ``phantom.predicates.generic.of_type``; the check itself is
    delegated to ``is_bearable``.
    """

    def check(a: object) -> bool:
        # ``t`` is captured from the enclosing call
        bearable = is_bearable(a, t)
        return bearable

    return check

In [None]:
# chck = of_beartype(coo)
# chck(coo)

In [None]:
coo.unstack(level=1, )

In [None]:
type(coocc)

In [None]:
import levi

coo.levi

For now, assume "input" is a doc-term matrix

In [None]:
# from affinity-to-edge

def affinity_to_edge(source_name, target_name, affinity, value_name=None):
    """DEPRECATED. Melt an affinity matrix into a long edge table.

    The index of ``affinity`` is reset into a column, every remaining column
    is melted into ``(target, value)`` pairs, and only rows whose value is
    strictly positive are kept.

    Parameters
    ----------
    source_name : str
        Name of the column produced by ``reset_index()`` that identifies the
        source node (used as the melt id column).
    target_name : str
        Name given to the column holding the former column labels.
    affinity : pandas.DataFrame
        Source-by-target matrix of values.
    value_name : str, optional
        Name given to the value column. Defaults to ``"weight"``.

    Returns
    -------
    pandas.DataFrame
        Columns ``[source_name, target_name, value_name]``, one row per
        positive entry of ``affinity``.
    """
    # The filter used to be hard-coded to a column called "weight", so the
    # default ``None`` (or any other name) failed at the filtering step.
    if value_name is None:
        value_name = "weight"

    edges = affinity.reset_index().melt(
        id_vars=source_name,
        value_name=value_name,
        var_name=target_name,
    )
    # Filter on whichever value column was actually created
    return edges[edges[value_name] > 0]


In [None]:
doc_term

## pandas-flavor

In [None]:
# example code using pandas-flavor

def add_method(key, val, fn_name=None):
    """Register a DataFrame method that keeps the rows where ``df[key] == val``.

    The method is named ``fn_name`` when given, otherwise ``f'{key}_{val}'``,
    and is registered through ``pf.register_dataframe_method``. The value
    returned by that registration call is passed back to the caller.
    """
    def fn(df):
        matches = df[key] == val
        return df.loc[matches]

    # The registered method takes its name from ``__name__``, so set it first
    fn.__name__ = f'{key}_{val}' if fn_name is None else fn_name
    return pf.register_dataframe_method(fn)

for name1 in ['john', 'lisa']:
    add_method('name1', name1)

for name2 in ['fay', 'meg', 'wil']:
    add_method('name2', name2)


# OR, another approach:

@pf.register_dataframe_method
def name1(df, val):
    """Return the rows of ``df`` whose ``'name1'`` column equals ``val``."""
    matches = df['name1'] == val
    return df.loc[matches]

@pf.register_dataframe_method
def name2(df, val):
    """Return the rows of ``df`` whose ``'name2'`` column equals ``val``."""
    matches = df['name2'] == val
    return df.loc[matches]

test.name1('lisa')
#   name1 name2  scoreA  scoreB
# 1  lisa   wil    9.67    8.87
# 2  lisa   fay    3.41    5.04
# 3  lisa   wil    0.58    6.12

test.name1('lisa').name2('wil')
#   name1 name2  scoreA  scoreB
# 1  lisa   wil    9.67    8.87
# 3  lisa   wil    0.58    6.12

### Type validation, `beartype`

In [None]:
from beartype import beartype
from beartype import typing as bt
from beartype.door import TypeHint as th

from typing_extensions import NamedTuple
import static_frame as sf

from dataclasses import dataclass, InitVar
import networkx as nx
# from typing import NamedTuple
from IPython.display import display
import numpy as np
from numpy.typing import DTypeLike

In [None]:
from beartype import beartype
from beartype.vale import IsAttr, IsEqual, IsSubclass, IsInstance
from typing import Annotated  

In [None]:
# Type hint matching only two-dimensional NumPy arrays of floats of
# arbitrary precision. This time, do it faster than anyone has ever
# type-checked NumPy arrays before. (Cue sonic boom, Chuck Yeager.)
import numpy as np
Numpy2DFloatArray = Annotated[np.ndarray,
    IsAttr['ndim', IsEqual[2]] &
    IsAttr['dtype', IsAttr['type', IsSubclass[np.floating]]]
]

In [None]:
levi_series.index

In [None]:
# Type hint for a pandas Series whose ``index`` is a ``pd.MultiIndex``.
# NOTE(review): the leading ``pd.Series`` may already enforce the instance
# check, which would make ``IsInstance[pd.Series]`` redundant — confirm
# against the beartype validator docs before removing it.
series_2Dindex = Annotated[pd.Series, # need this Series?
    IsInstance[pd.Series] &
    IsAttr['index', IsInstance[pd.MultiIndex]]
]

In [None]:
@beartype
def test_series_stuff(levi: series_2Dindex):
    """Smoke test for the ``series_2Dindex`` hint: print a marker line when called."""
    marker = "*** TEST ***"
    print(marker)

test_series_stuff(levi_series)

In [None]:
test_series_stuff(edgelist)

In [None]:
test_series_stuff(levi)

In [None]:
# Annotate @beartype-decorated callables with beartype validators.
@beartype
def polygon_area(polygon: Numpy2DFloatArray) -> float:
    '''
    Return the area of a two-dimensional polygon of floats.

    ``polygon`` is a set of counter-clockwise points, one point per row with
    the two coordinates in columns 0 and 1. The area is calculated via
    Green's theorem: half the absolute sum of the cross terms between each
    point and its successor.
    '''

    # Each row's successor, wrapping the last point around to the first
    successor = np.roll(polygon, -1, axis=0)

    first, second = polygon[:, 0], polygon[:, 1]
    next_first, next_second = successor[:, 0], successor[:, 1]

    cross_terms = first * next_second - next_first * second
    return np.abs(0.5 * np.sum(cross_terms))


In [None]:
from beartype.vale import IsInstance
class Token(str):
    """Marker ``str`` subclass that adds no behaviour of its own."""

# Experiment: derive a class from a beartype-validated ``str`` hint.
# NOTE(review): the base is an ``Annotated`` alias rather than ``Token``
# itself, so instances may not pass ``isinstance(..., Token)`` — confirm
# with the check that follows this class.
@beartype
class Tester(bt.Annotated[str, IsInstance[Token]]):
    ...

isinstance(Tester('abc'), Token)

In [None]:
# # Import the requisite machinery.
# from beartype.vale import Is
# from typing import Annotated   # <--------------- if Python ≥ 3.9.0

# # Type hint matching only strings with lengths ranging [4, 40].
# LengthyString = Annotated[str, Is[lambda text: 4 <= len(text) <= 40]]

# test_typing = Annotated[pd.Series, Is[pd.Series]]
# test_typing


In [None]:
# ids = ('id-student', 'id-project')

# testing = doc_term.reset_index().melt(0, value_name="brown", ).query('weight>0')   #var_name=target_name
# testing

In [None]:
# Column names for the melt below; they were previously undefined at
# notebook scope, so this cell raised NameError.
#   - 'index' is the column ``reset_index()`` creates for the unnamed index
#   - 'weight' matches the column name used by the query filter
# NOTE(review): 'term' is a guess at the intended target column name.
source_name, target_name, value_name = 'index', 'term', 'weight'

testing = (
    doc_term.reset_index()
    .melt(source_name, value_name=value_name, var_name=target_name)
    .query('weight>0')
)
testing

In [None]:

coo.levi.to_biadjacency

In [None]:
graph = nx.algorithms.bipartite.from_biadjacency_matrix(coo_matrix)
sets = nx.get_node_attributes(graph, name="bipartite")
colors = {0: 'gold', 1: 'lightskyblue'}

# One colour per node, chosen by the node's "bipartite" attribute
node_colors = [colors[attrs['bipartite']] for _, attrs in graph.nodes(data=True)]
nx.draw(graph, with_labels=True, node_color=node_colors)


In [None]:
graph.edges()

In [None]:
# Plotting with HyperNetx
import hypernetx as hnx

In [None]:
term_doc = doc_term.T

In [None]:
# *HNX* hypergraphs can be built from networkx bipartite graph objects using `from_bipartite`;
# here the hypergraph is built from the `term_doc` dataframe with `from_dataframe` instead.
H = hnx.Hypergraph.from_dataframe(term_doc)

In [None]:
hnx.drawing.draw(H)