<a href="https://colab.research.google.com/github/quadrismegistus/character-networks/blob/main/GenerateFictionalSocialNetwork.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Generating character networks from novels and books

### 🏃‍♀️ Starting up

In [None]:
#@title 📚 Choose a book to analyze
#@markdown Enter the title of a book, along with a URL to a text file. To find a URL, search:
#@markdown * [Project Gutenberg](https://gutenberg.org) for out of copyright texts
#@markdown * [Z-lib](https://gutenberg.org) for in-copyright texts

# Human-readable book title; with spaces removed it also names the output files.
title = "Pamela" #@param {type:"string"}
# Location of the book's text; handed to brute() unless a file was uploaded.
# NOTE(review): the "Z-lib" link in the form text above points at gutenberg.org — confirm the intended URL.
url = "http://www.gutenberg.org/files/6124/6124-0.txt" #@param {type:"string"}



#### Initial imports and settings
import os,sys
from google.colab import drive,files
from ipywidgets import widgets
from IPython.display import Markdown, display
# nicer print func
def printm(x):
    """Render the Markdown source string `x` as rich notebook output."""
    rendered = Markdown(x)
    display(rendered)
# Elementary working directories, all below the Colab content root.
PATH_ROOT='/content'
PATH_LIB=os.path.join(PATH_ROOT,'lib')
PATH_TMP=os.path.join(PATH_ROOT,'tmp')
# Uploaded texts are stored as PATH_TMP/uploaded_text<original extension>.
PATH_TMP_UPLOAD_FN_PRE=os.path.join(PATH_TMP,'uploaded_text')
# exist_ok=True makes re-running the cell a no-op and avoids the
# check-then-create race of `if not exists: makedirs`.
for _dir in (PATH_ROOT,PATH_LIB,PATH_TMP):
    os.makedirs(_dir,exist_ok=True)
# add lib to python path (only once, so re-runs do not pile up duplicates)
if PATH_LIB not in sys.path:
    sys.path.insert(0,PATH_LIB)

# offer button as well
def upload_f(x,tmpfnpre=PATH_TMP_UPLOAD_FN_PRE):
    """Button callback: upload one file and store it as `tmpfnpre` + its extension.

    x -- the widget that fired the callback (unused).
    tmpfnpre -- destination path without extension; the uploaded file's own
        extension is appended to it.

    Does nothing when the upload dialog is cancelled.
    """
    import shutil
    from google.colab import files
    res = files.upload()
    # A cancelled dialog yields an empty dict; indexing its keys would raise.
    if not res: return
    fn=next(iter(res))
    fnext=os.path.splitext(fn)[1]
    tmpfn=tmpfnpre+fnext
    # Same effect as the former shell `mv`, without a subprocess and without
    # depending on shell quoting of the file name.
    shutil.move(fn,tmpfn)

# Show a heading, then a button whose click handler (upload_f) opens the upload dialog.
printm('### Alternatively to a URL, you can upload a file (txt, epub, pdf, docx, ...)')
ubutton = widgets.Button(description="Upload text")
ubutton.on_click(upload_f)
display(ubutton)

In [None]:
#@title 📂 Choose where to store data
#@markdown ##### Save text data to Google drive?
#@markdown <small>This will save all generated data to your Google Drive, making downloading/uploading annotations and other data unnecessary. If yes, specify a directory path relative to your root Drive folder.</small>
# Storage choice from the dropdown. Both options are non-empty strings.
save = "Yes, save in Google Drive" #@param ["Yes, save in Google Drive", "No, keep on temporary storage"]
# Directory, relative to the Drive root, that receives the data when saving to Drive.
path = "DigHum/CharacterNetworks" #@param {type:"string"}

In [None]:
#@title Once you are done, click on menu item **Runtime > Run all**

# Announce the validation step in the cell output.
printm('#### Checking input')
# Load widget code
# Valid input?
class InvalidInput(Exception):
    """Signal that the user-supplied input is unusable and the cell must stop."""

    def _render_traceback_(self):
        # The hook name follows IPython's custom-traceback convention; returning
        # nothing is presumably meant to hide the stack trace from the user.
        return None
NOVEL_URL=url
NOVEL_TITLE_NICE=title.strip()
# Build a path- and shell-safe identifier from the title: title-case it and
# keep only letters, digits, '-' and '_'.  Removing just the spaces (as
# before) let titles such as "Pamela; or, Virtue Rewarded" or "A/B" leak
# ';' or '/' into directory names and into shell commands built from them.
NOVEL_TITLE=''.join(ch for ch in NOVEL_TITLE_NICE.title() if ch.isalnum() or ch in '-_')
if not NOVEL_TITLE:
    print('Type a title in the box above')
    raise InvalidInput
printm(f'* Using title: **{NOVEL_TITLE_NICE}**')
printm(f'* Using filename: **{NOVEL_TITLE}**')




# --- Tool locations -------------------------------------------------------
PATH_TOOLS=os.path.join('/content','tools')
PATH_NOVELS=os.path.join(PATH_ROOT,'texts')
PATH_TO_BOOKNLP=os.path.join(PATH_TOOLS,'book-nlp')
# CoreNLP distribution archive and the models jar taken out of it for BookNLP.
URL_CORENLP='http://nlp.stanford.edu/software/stanford-corenlp-4.1.0.zip'
MODEL_FN='stanford-corenlp-4.1.0-models.jar'
PATH_TO_BOOKNLP_BINARY=os.path.abspath(os.path.join(PATH_TO_BOOKNLP,'runjava'))
# --- Per-book output files, all inside PATH_NOVELS/<NOVEL_TITLE> ----------
PATH_NOVEL=os.path.join(PATH_NOVELS,NOVEL_TITLE)
ofn_novel=os.path.join(PATH_NOVEL,f'{NOVEL_TITLE}.txt')
ofn=os.path.join(PATH_NOVEL,f'data.parses.{NOVEL_TITLE}.jsonl')
# Automatically generated character metadata, and its hand-annotated counterpart.
ofn_meta=os.path.join(PATH_NOVEL,f'data.charmeta.{NOVEL_TITLE}.csv')
ofn_meta_anno=os.path.join(PATH_NOVEL,f'data.charmeta.{NOVEL_TITLE}.anno.csv')
# BookNLP output directory (<title>.booknlp) and the token table inside it.
ofn_booknlp_out=os.path.splitext(ofn_novel)[0]+'.booknlp'
ofn_booknlp_toks=os.path.join(ofn_booknlp_out,'tokens.txt')
# Network frame images and the animations assembled from them.
ofn_fig_dir=os.path.join(PATH_NOVEL,'imgs')
ofn_gif=os.path.join(PATH_NOVEL,'anim.gif')
ofn_mp4=os.path.join(PATH_NOVEL,'anim.mp4')
printm(f'* Path to novel data set to: **{PATH_NOVEL}**')
####

# set up symlinks
# `save` holds the dropdown text, a non-empty (truthy) string for BOTH
# options, so the "Yes, ..." choice has to be tested explicitly.
if save.startswith('Yes'):
    # Mount Drive first: without it the "Drive" folder below is just a plain
    # directory on the temporary VM disk and nothing is persisted.
    if not os.path.ismount('/content/drive'):
        drive.mount('/content/drive')
    PATH_ROOT_DRIVE=os.path.join('/content/drive','My Drive',path)
    PATH_NOVELS_DRIVE=os.path.join(PATH_ROOT_DRIVE,'texts')
    # Create the link target itself (this also creates PATH_ROOT_DRIVE).
    # A symlink to a missing directory is dangling, and creating PATH_NOVEL
    # through it below would fail.
    os.makedirs(PATH_NOVELS_DRIVE,exist_ok=True)
    # lexists also sees a dangling link left by an earlier run, which
    # os.path.exists would miss (os.symlink would then raise).
    if not os.path.lexists(PATH_NOVELS):
        #@todo what if user changes mind about gdrive?
        os.symlink(PATH_NOVELS_DRIVE, PATH_NOVELS)
    # Only report a link when one is really in place.
    if os.path.islink(PATH_NOVELS):
        printm(f'* Linking: {PATH_NOVELS} --> **{PATH_NOVELS_DRIVE}**')
os.makedirs(PATH_NOVEL,exist_ok=True)
os.makedirs(PATH_TOOLS,exist_ok=True)


### getting text using code from a gist
# Install the packages for text extraction (shell command, run quietly).
!pip install bs4 kitchen wget fulltext epub-conversion pymupdf requests xml_cleaner html2text -q -q
import urllib.request
# The helper module is pinned to one gist revision (the hash is part of the URL).
gisturl='https://gist.githubusercontent.com/quadrismegistus/f76c2ffcccedc496a638ca430b6851ab/raw/ede9744a01adc324a6223b924789f1553853f91c/brute_txt.py'
# Save it into PATH_LIB (already on sys.path) so it can be imported right away.
# NOTE(review): this downloads and executes remote code — keep the URL pinned to a trusted revision.
urllib.request.urlretrieve(gisturl,os.path.join(PATH_LIB,'brute_txt.py'))
from brute_txt import brute

# is uploaded file?
# The upload button stores files in PATH_TMP under a fixed name prefix; such
# a file takes precedence over the URL.
tmp_fns=os.listdir(PATH_TMP)
upload_prefix=os.path.basename(PATH_TMP_UPLOAD_FN_PRE)
# Sorted so the choice is deterministic when several uploads are present
# (os.listdir returns entries in arbitrary order).
matches=sorted(tmp_fn for tmp_fn in tmp_fns if tmp_fn.startswith(upload_prefix))
if matches:
    match=matches[0]
    matchpath=os.path.join(PATH_TMP,match)
    printm(f'* Loading text from uploaded file: {match}')
    NOVEL_TXT=brute(matchpath)
    # The upload is consumed once, so a later run falls back to the URL.
    os.remove(matchpath)
elif url:
    printm(f'* Loading text from URL: {url}')
    NOVEL_TXT=brute(url)
else:
    printm('No uploaded file nor URL to file')
    raise InvalidInput
# save text
if not NOVEL_TXT:
    printm('Empty text')
    raise InvalidInput
printm(f'* Loaded book with **{len(NOVEL_TXT.strip().split())}** words')
# Explicit UTF-8: with the platform default encoding, non-ASCII characters
# in the book can make the write fail.
with open(ofn_novel,'w',encoding='utf-8') as of: of.write(NOVEL_TXT)

## 🔩 Installations

In [None]:
#@title Install dependencies
!pip install dynetx fa2 pandas colour dimcli numpy moviepy ffmpeg pyvis networkx gender-guesser imageio-ffmpeg imageio -q


#@title Import modules
# imports
import os,sys
import pandas as pd
import networkx as nx
import dynetx as dn
from collections import Counter
from shutil import which
from colour import Color
import numpy as np
from ipywidgets import interact, interactive, fixed, interact_manual, widgets
import warnings
warnings.filterwarnings('ignore')
import math,os
from tqdm import tqdm
from collections import defaultdict
import plotly.express as px
pd.options.display.max_rows=25

In [None]:
#@title Install Booknlp
# Work inside the tools directory so the clone and download land there.
os.chdir(PATH_TOOLS)
# Fetch BookNLP only once.
if not os.path.exists(PATH_TO_BOOKNLP):
    !git clone https://github.com/dbamman/book-nlp
# BookNLP is given the CoreNLP models jar inside its own lib/ folder.
PATH_BOOKNLP_MODELS=os.path.join(PATH_TO_BOOKNLP,'lib',MODEL_FN)
if not os.path.exists(PATH_BOOKNLP_MODELS):
    # Download the CoreNLP archive, take the models jar out of it, then
    # delete both the archive and the unpacked directory.
    !wget $URL_CORENLP
    corenlp_fn=URL_CORENLP.split('/')[-1]
    corenlp_dir=f'{corenlp_fn.split(".zip")[0]}'
    !unzip -q "$corenlp_fn"
    ifnfn=f'{corenlp_fn.split(".zip")[0]}/{MODEL_FN}'
    !mv "$ifnfn" "$PATH_BOOKNLP_MODELS"
    !rm "$corenlp_fn"
    !rm -rf "$corenlp_dir"
# Return to the content root for the remaining cells.
os.chdir(PATH_ROOT)

## 🔨 Parse

This will take ~15 minutes.

In [None]:
#@title Parse text using BookNLP
def parse_text(path_txt):
    if not path_txt: return    
    path_out=ofn_booknlp_out
    path_toks=ofn_booknlp_toks
    cmd=f'cd "{PATH_TO_BOOKNLP}" && ./runjava novels/BookNLP -doc {path_txt} -printHTML -p {path_out} -tok {path_toks} -f'
    # print('>>',cmd)
    !{cmd} #os.system(cmd)
    os.rename(os.path.join(path_out,'book.id.html'), os.path.join(path_out,'parsed.html'))
    os.rename(os.path.join(path_out,'book.id.book'), os.path.join(path_out,'parsed.json'))

#@title
# Parse! This will take 10-15 minutes for most novels... time to make coffee?
# The parse is skipped when the token table from an earlier run already exists.
if not os.path.exists(ofn_booknlp_toks): parse_text(ofn_novel)

## 🤔 Examine results

In [None]:
#@title Load generated character metadata
def read_parsed_json(path_parsed_json):
    """Yield one summary dict per named character in a BookNLP `parsed.json`.

    path_parsed_json -- path to the JSON file; it must contain a top-level
        'characters' list.

    Each yielded dict holds:
      name, names -- first listed name, and all names joined by ', '
      id          -- the character's id as stored in the file
      text_id     -- third-from-last '/'-separated component of the path
                     ('' when the path is shorter than that)
      num_<key>   -- number of entries under each of 'agent', 'patient',
                     'poss', 'mod' and 'speaking'
      num         -- sum of those five counts plus one

    Characters with an empty 'names' list are skipped.
    """
    import json
    # `with` closes the handle right after parsing; the former bare
    # open(...) left it to the garbage collector.
    with open(path_parsed_json,encoding='utf-8') as f:
        dat=json.load(f)
    path_parts=path_parsed_json.split('/')
    text_id=path_parts[-3] if len(path_parts)>=3 else ''
    event_keys=('agent','patient','poss','mod','speaking')
    for char in dat['characters']:
        if not char['names']: continue
        names=[x['n'] for x in char['names']]
        chardx={'name':names[0], 'id':char['id'], 'names':', '.join(names), 'text_id':text_id}
        for key in event_keys:
            chardx['num_'+key]=len(char[key])
        # One is added on top of the event counts, as in the original code
        # (presumably so that no character ends up with a zero total).
        chardx['num']=sum(chardx['num_'+key] for key in event_keys)+1
        yield chardx

# Load jsons into character metadata
char_jsons=list(read_parsed_json(os.path.join(ofn_booknlp_out,'parsed.json')))
char_df=pd.DataFrame(char_jsons)
# Lookup from character id to the character's first listed name.
id2name=dict(zip(char_df.id,char_df.name))
printm(f'Found **{len(char_jsons)}** characters, for a total of **{sum(char_df.num)}** mentions.')
topn=10
printm(f'#### Top {topn} characters')
# Last expression of the cell: shown as the cell's output table.
char_df.sort_values('num',ascending=False)[['name','num','names']].head(topn)

In [None]:
#@title Load word-by-word parse data
# load and gen
# The BookNLP token table is tab-separated.
# NOTE(review): tokens that are bare quote characters may confuse pandas'
# default quote handling — check a parsed file and consider quoting=csv.QUOTE_NONE.
tok_df=pd.read_csv(ofn_booknlp_toks,sep='\t')
# characterId is -1 on tokens that do not belong to a character mention.
tok_df['isChar']=(tok_df['characterId']!=-1).astype(int)
tok_df['isWord']=1

# show
# The first two labels used to read "Number of words N words" and
# "Number of sentences N words"; the unit now matches what is counted.
printm(f'* Number of words: {len(tok_df)}')
printm(f'* Number of sentences: {len(set(tok_df.sentenceID))}')
printm(f'* Number of paragraphs: {len(set(tok_df.paragraphId))}')
printm(f'* Number of character mentions: {sum(tok_df.isChar)}')
# tok_df

In [None]:
#@title Prepare metadata for manual annotation
if 1:# not os.path.exists(ofn_meta):
    # Guess gender of characters?
    import gender_guesser.detector as gender
    gd = gender.Detector()

    # Title prefixes that settle the gender without a name lookup.
    prefix_clues={
        'female':{'Ms.', 'Ms ','Mrs.','Mrs ','Miss ','Madame','Mme.','Mme ','Signorina','Maestra '},
        'male':{'Don ','Signor ','Mr.','Mr ','Maestro '}
    }

    def guess_gender(name,gd=gd):
        """Guess a gender label for the character name `name`.

        Title prefixes (see `prefix_clues`) win; otherwise the name is
        looked up with gender_guesser, and any 'mostly_' qualifier is
        removed from the answer.  Returns None for an empty name.
        """
        if not name: return None

        for gndr,prfxs in prefix_clues.items():
            # str.startswith accepts a tuple: one call per gender
            if name.startswith(tuple(prfxs)):
                return gndr

        gend=gd.get_gender(name)
        if gend=='unknown':
            # The detector matches given names, so a full name such as
            # "John Arnold" is not found as a whole; retry with its first word.
            words=name.split()
            if len(words)>1:
                gend=gd.get_gender(words[0])
        return gend.replace('mostly_','')
    char_df['gender']=char_df.name.apply(guess_gender)
    # NOTE: dfm is the same object as char_df, so the columns added below
    # appear on char_df as well.
    dfm=char_df#.groupby(['name','gender','id']).sum().reset_index()

    # create other fields
    dfm['name_real']=dfm['name']
    for blank_col in ('other','notes','race','class'):
        dfm[blank_col]=''
    dfm_save=dfm[['id','name','name_real','num','gender','race','class','other','notes']].sort_values('num',ascending=False)
    dfm_save.to_csv(ofn_meta,index=False)
    # if not os.path.exists(ofn_meta_anno):
        # dfm_save.to_csv(ofn_meta_anno,index=False)
    printm('#### Metadata generated automatically')
    display(dfm_save)

In [None]:
#@title Download metadata for manual annotation
from google.colab import files
def download_anno(x):
    # Button callback (x is the clicked widget, unused): send the
    # automatically generated metadata CSV to the browser.
    files.download(ofn_meta)
dbutton = widgets.Button(description="Download as CSV")
dbutton.on_click(download_anno)
# Instructions shown above the button.
printm('''Open in a spreadsheet editor (e.g. excel) and change the names
in the column 'name_real'to rename the character.
Delete the name there to declare that character name invalid.
The other columns allow you to set variables to color or size the networks by.
''')
display(dbutton)

In [None]:
#@title Upload manually refined character metadata 
# Instructions for editing the metadata sheet and uploading it again.
printm('''Open in a spreadsheet editor (e.g. excel) and change the names
in the column 'name_real'to rename the character.
Delete the name there to declare that character name invalid.
The other columns allow you to set variables to color or size the networks by.

When you're done, click "Upload" and upload the new CSV below.
If you saved your sheet as an excel file, export it as CSV before uploading.
''')




def upload_anno(x):
    """Button callback: upload the hand-edited character metadata CSV.

    x -- the widget that fired the callback (unused).

    The uploaded file is moved to `ofn_meta_anno`.  Does nothing when the
    dialog is cancelled; raises InvalidInput for a non-CSV file.
    """
    import shutil
    FILES = files.upload()
    if not FILES: return
    fn=next(iter(FILES))
    ext=os.path.splitext(fn)[1]
    # Case-insensitive, so "SHEET.CSV" is accepted as well.
    if ext.lower()!='.csv':
        print('File must be a CSV file. (e.g. In excel, export as CSV.)')
        raise InvalidInput
    # Same effect as the former shell `mv`, independent of shell quoting.
    shutil.move(fn,ofn_meta_anno)


# Button that starts the annotation upload (handled by upload_anno).
button = widgets.Button(description="Upload annotations")
button.on_click(upload_anno)
display(button)

In [None]:
#@title Reload and filter metadata
# load csv

# Slider for the smallest number of mentions a character needs to be kept
# (1 to 50, starting at 5).
min_num_mentions=widgets.IntSlider(min=1,max=50,value=5)
printm('### Filter by the minimum number of mentions')

@interact
def filter_metadata(min_num_mentions=min_num_mentions):
    """Load the character metadata and keep characters with enough mentions.

    Reads the annotated CSV (`ofn_meta_anno`) when it exists, otherwise the
    generated one (`ofn_meta`).  Rows whose `name_real` does not start with
    a letter are dropped, then rows with fewer than `min_num_mentions`
    mentions.

    Returns the remaining rows sorted by ascending `num_total`, with the
    `id` column renamed to `characterId`.
    """
    ifn=ofn_meta_anno if os.path.exists(ofn_meta_anno) else ofn_meta
    dfm=pd.read_csv(ifn).fillna('').rename(columns={'num':'num_total', 'count':'num_total', 'name_standardized':'name_real'})
    # Slicing ([:1]) instead of indexing ([0]): a blank name_real — which the
    # instructions give as the way to declare a character invalid — now just
    # fails the test instead of raising IndexError.
    has_real_name=dfm.name_real.apply(lambda x: str(x)[:1].isalpha())
    dfmr=dfm[has_real_name].rename(columns={'id':'characterId'})
    dfmr=dfmr[dfmr.num_total>=min_num_mentions]
    return dfmr.sort_values('num_total')#[dfmr.name.str.startswith('Don')] #.tail(10)

## Analyze results

In [None]:
#@title Distribution of character attention
# create
# Filtered character metadata at the slider's current value.
dfm=filter_metadata(min_num_mentions=min_num_mentions.value)
# Right merge: every token row is kept; metadata columns of tokens without a
# matching character are filled with ''.
all_df=dfm.merge(tok_df,on='characterId',how='right').fillna('')
# Columns offered for grouping: everything from 'gender' onward, plus 'none'.
color_by=list(dfm.columns)[list(dfm.columns).index('gender'):]+['none']
# Constant column, so that grouping by 'none' yields a single group.
dfm['none']='none'
# res=char_df.num.plot(kind='density',width=666)
widg_colorby=widgets.Dropdown(options=color_by,description='Group by')
num_top_chars=widgets.IntSlider(min=5,max=len(dfm)+5,step=5,value=25,description='# Top Chars')

@interact
def showtopn(num_top=num_top_chars):
    """Bar chart of mention counts for the `num_top` most-mentioned characters.

    Also prints the median number of mentions over all characters in `dfm`.
    Returns the plotly figure.
    """
    median_mentions=int(dfm.num_total.median())
    printm(f'#### Median number of mentions per character (all): {median_mentions}')
    # Ascending sort + tail: the most-mentioned character ends up last.
    top_rows=dfm.sort_values('num_total',ascending=True).iloc[-num_top:]
    chart_options=dict(
        y="name_real",
        x="num_total",
        hover_data=['name_real','num_total','gender'],
        title=f'Distribution of mentions over top {num_top} characters',
        text='num_total',
        width=666,
        # height grows with the number of bars (666 px per 25 characters)
        height=666/25 * num_top,
        orientation='h',
    )
    return px.bar(top_rows,**chart_options)

In [None]:
#@title Distribution of attention by group
@interact
def show_histogram(group_by=widg_colorby):
    """Summarise and plot mention counts per category of `group_by`.

    For every category (largest group first) prints its share of all
    mentions and its members with their counts and the group median, then
    returns a log-scaled box plot of `num_total` per category.
    """
    printm(f'### Distribution by {group_by}')
    # Loop-invariant: the grand total was recomputed for every group before.
    total_mentions=sum(dfm.num_total)
    for cat,catdf in sorted(dfm.groupby(group_by),key=lambda x: -len(x[1])):
        totalperc=round(sum(catdf.num_total)/total_mentions*100,1)
        printm(f'#### {len(catdf)} {cat} characters make up {totalperc}% of all mentions')
        ranked=catdf.sort_values('num_total',ascending=False)
        stats=[f'{name} ({num})' for name,num in zip(ranked.name_real,ranked.num_total)]
        printm(f'''* {', '.join(stats)} [median={int(catdf.num_total.median())}]''')

    fig=px.box(
        dfm.sort_values(group_by),
        y="num_total",
        # lower bound just under 1 so one-mention characters stay visible on the log axis
        range_y=(.9,max(dfm.num_total)),
        log_y=True,
        x=group_by,
        width=666,
        color=group_by,
        points="all",
        hover_data=[x for x in ['name_real','num_total'] + color_by if x!='none'],
        title=f'Number of mentions for characters by {group_by}'
    )

    return fig

In [None]:
#@title Show syntactic statistics
#@todo ...
# Placeholder cell: no syntactic statistics are computed yet.
printm('Todo')

## 🕸 Generate social network

In [None]:
#@title Generate dynamic network from interactions
# Node measures computed for the static network; each name is looked up on
# the networkx module.
NET_STATS=['degree','degree_centrality','betweenness_centrality','eigenvector_centrality','closeness_centrality']
slice_length=widgets.Dropdown(options=[1,5,10,50,100,250,500,1000,2000,5000,10000,25000],value=500,description='Length')
# The time sliders used to read their bounds from `ts`, a name nothing had
# defined at this point (NameError).  Time slices are `tokenId // slice
# length`, so the bounds are derived from the token table at the default
# slice length instead.
_last_slice=int(tok_df['tokenId'].max())//slice_length.value if len(tok_df) else 0
time_slider1=widgets.IntSlider(min=0, max=_last_slice, step=1, value=0)
time_slider2=widgets.IntSlider(min=0, max=_last_slice, step=1, value=_last_slice)
weight_slider=widgets.IntSlider(min=1, max=10, step=1, value=2)
mindegree_slider=widgets.IntSlider(min=0, max=10, step=1, value=1)
weight_factor=widgets.IntSlider(min=1, max=100, step=1, value=5)
# Unit of time shown to the user -> token-table column that measures it.
time_units={'words':'tokenId', 'sentences':'sentenceID', 'paragraphs':'paragraphId'}
time_type=widgets.Dropdown(options=list(time_units.keys()),description='Unit of time',value='words')

def make_dyn_charnet(name_key=fixed('name_real'),t_unit='words',slice_length=1000):
    """Build a dynamic co-occurrence network of characters over the text.

    The token table `all_df` is cut into consecutive slices of
    `slice_length` units of `t_unit` ('words', 'sentences' or
    'paragraphs'; unknown values fall back to words).  Every pair of
    characters named within the same slice gets one interaction stamped
    with that slice's index.

    name_key -- column of `all_df` holding the character label.

    Returns (dg, grp_df, edge_list):
      dg        -- dynetx DynGraph holding the interactions
      grp_df    -- one row per slice: 't', 'chars', 'num_chars'
      edge_list -- list of (t, a, b) tuples with a < b

    Side effect: (re)writes the 'slice' column of `all_df`.
    """
    import itertools
    # When called directly (not through `interactive`) the default is still
    # the `fixed` wrapper rather than the string inside it.
    if isinstance(name_key,fixed): name_key=name_key.value
    roundby=slice_length
    t_key=time_units.get(t_unit,'tokenId')
    printm(f'* Divide text every {roundby} {t_unit}')

    # init
    dg = dn.DynGraph(edge_removal=False)
    all_df['slice']=all_df[t_key]//roundby

    grp_ld=[]
    edge_list=[]
    # groupby yields the slices in ascending order
    for t,sldf in all_df.groupby('slice'):
        # '' marks tokens without a (retained) character, so it is skipped
        chars_in_slice=sorted({x for x in sldf[name_key] if x})
        # every unordered pair once, in sorted order (a < b)
        for a,b in itertools.combinations(chars_in_slice,2):
            dg.add_interaction(u=a,v=b,t=t)
            edge_list.append((t,a,b))
        grp_ld.append({
            't':t,
            'chars':chars_in_slice,
            'num_chars':len(chars_in_slice),
        })
    grp_df=pd.DataFrame(grp_ld)

    # show stats
    ts=dg.temporal_snapshots_ids()
    unique_pairs={(a,b) for _,a,b in edge_list}
    printm(f'* {len(ts)} time slices')
    printm(f'* {len(unique_pairs)} unique character-to-character edges')
    # This line referenced `charnet_dynamic_edgelist`, which does not exist
    # in this scope (NameError); the list built above is what was meant.
    printm(f'* {len(edge_list)} total interactions')
    display(grp_df)
    return dg,grp_df,edge_list

# show opts
# Wrap the builder in an interactive control driven by the two dropdowns;
# other cells read its latest return value from `charnet_dynamic_i.result`.
charnet_dynamic_i=interactive(
    make_dyn_charnet,
    t_unit=time_type,
    slice_length=slice_length
)
# Last expression of the cell: displays the control.
charnet_dynamic_i

In [None]:
#@title Show character density across length of text
# Other token stats
# The keyword was misspelled `desription`, so the dropdown never received
# its label.
yopts=widgets.Dropdown(options=[('# of mentions','num_mentions'),('# of unique','num_chars')], description='Y value')

@interact
def show_density(slice_length=slice_length,color_by=widg_colorby,y_value=yopts):
    """Plot character density along the text, one line per `color_by` category.

    The token table `all_df` is cut every `slice_length` tokens.  For each
    slice and category the number of character mentions ('num_mentions')
    and of distinct characters ('num_chars') is counted; `y_value` selects
    which of the two is plotted.  The first words of each slice are
    attached as hover preview.

    Side effect: (re)writes the 'slice' and 'none' columns of `all_df`.
    Returns the plotly figure.
    """
    num_words_in_preview=30
    # A slice is labelled by the tokenId of its first token.
    all_df['slice']=all_df.tokenId//slice_length*slice_length
    all_df['none']='none'
    slice_ld=[]
    for sl,sldf in all_df.groupby('slice'):
        # get preview
        preview=[]
        for word,ws_after in zip(sldf['originalWord'],sldf['whitespaceAfter']):
            word=str(word).strip()
            if not word: continue
            # `>=`: the former `>` let one word too many through (31, not 30)
            if len(preview)>=num_words_in_preview: break
            preview.append(word+(' ' if ws_after=='S' else ''))
        preview_txt=''.join(preview)+'...'
        # count by category
        for cat,catdf in sldf.groupby(color_by):
            # Distinct ids among real mentions only; otherwise the -1
            # "no character" id is counted as a character.
            char_ids=set(catdf.loc[catdf['isChar']==1,'characterId'])
            slice_ld.append({
                'slice':sl,
                'preview':preview_txt,
                'color_by':cat,
                'num_mentions':sum(catdf['isChar']),
                'num_chars':len(char_ids),
            })

    slicedf=pd.DataFrame(slice_ld)
    printm(f'Median number of unique characters = **{slicedf.num_chars.median()}** names per {slice_length} words')
    printm(f'Median number of character mentions = **{slicedf.num_mentions.median()}** names per {slice_length} words')


    import plotly.express as px
    return px.line(slicedf,x='slice',y=y_value,color='color_by',hover_data=['preview'],
            title=f'{y_value} per {slice_length} words across {NOVEL_TITLE}',
            height=444,
            line_shape='hv')

##  📐 Fine tune network parameters

In [None]:
#@title Convert to static network
# Convert to static


def to_static(edge_list,t_start=None,t_end=None,min_weight=2,name_key='name_real',stats=NET_STATS,min_degree=1):
    """Collapse timestamped interactions into one weighted, undirected graph.

    edge_list  -- iterable of (t, u, v) tuples
    t_start, t_end -- inclusive time window; None means unbounded
    min_weight -- edges seen fewer times than this are removed (0/None keeps all)
    name_key   -- column of the metadata table `dfm` that holds the node names
    stats      -- names of networkx functions (e.g. 'degree') whose per-node
                  results are stored as node attributes of the same name
    min_degree -- nodes with fewer neighbours than this are removed last
                  (0/None keeps all)

    Edge attributes: 'weight' (number of interactions) and 't' (list of
    their times).  Node attributes: the metadata of the node from `dfm`,
    'num_interactions', 'num_mentions' and the requested stats.

    Returns the networkx Graph.
    """
    import networkx as nx
    g=nx.Graph()

    num_interactions_d=Counter()
    for t,u,v in sorted(edge_list):
        # Compare with None explicitly: 0 is a valid slice id, and the former
        # truthiness test read `t_end=0` as "no upper bound".
        if t_start is not None and t<t_start: continue
        if t_end is not None and t>t_end: continue
        num_interactions_d[u]+=1
        num_interactions_d[v]+=1
        if not g.has_edge(u,v):
            g.add_edge(u,v,t=[t],weight=1)
        else:
            g[u][v]['weight']+=1
            g[u][v]['t']+=[t]

    if min_weight:
        light=[(a,b) for a,b,w in g.edges(data='weight') if w<min_weight]
        g.remove_edges_from(light)

    # add metadata
    for n in g.nodes():
        ndf=dfm[dfm[name_key]==n]
        # Several metadata rows may share one name; take the per-column mode.
        # A node missing from dfm just gets no metadata instead of raising.
        if len(ndf):
            for k,v in dict(ndf.mode().iloc[0]).items(): g.nodes[n][k]=v
        g.nodes[n]['num_interactions']=num_interactions_d[n]
        g.nodes[n]['num_mentions']=ndf['num_total'].sum() #.iloc[0]['num_total']

    # include node stats
    for stat in stats:
        func=getattr(nx,stat,None)
        if func is None: continue
        try:
            values=dict(func(g))
        except Exception as err:
            # Best effort, as before: a measure can fail (e.g. on an empty
            # graph or when an iterative one does not converge).  Report it
            # rather than hiding every error behind a bare `except`.
            print(f'(skipping {stat}: {err})')
            continue
        for n,v in values.items():
            g.nodes[n][stat]=v

    # drop weakly connected nodes
    if min_degree:
        # Degrees are read from the graph itself, so this also works when
        # 'degree' is not among `stats` (the stored attribute was required
        # before and raised KeyError otherwise).
        low=[n for n,deg in g.degree() if deg<min_degree]
        g.remove_nodes_from(low)

    return g


#@title
def layout(g):
    """Compute ForceAtlas2 positions for graph `g` (2000 iterations).

    Returns whatever `forceatlas2_networkx_layout` produces for the graph.
    """
    from fa2 import ForceAtlas2
    settings=dict(
        # Behavior alternatives
        outboundAttractionDistribution=True,  # Dissuade hubs
        linLogMode=False,  # NOT IMPLEMENTED
        adjustSizes=False,  # Prevent overlap (NOT IMPLEMENTED)
        edgeWeightInfluence=1.0,
        # Performance
        jitterTolerance=1.0,  # Tolerance
        barnesHutOptimize=True,
        barnesHutTheta=1.2,
        multiThreaded=False,  # NOT IMPLEMENTED
        # Tuning
        scalingRatio=2.0,
        strongGravityMode=False,
        gravity=1.0,
        # Log
        verbose=False,
    )
    engine=ForceAtlas2(**settings)
    return engine.forceatlas2_networkx_layout(g, pos=None, iterations=2000)


#@title Drawing static networks
def drawnet_nx(g,ofn='net.png',pos=None,weight_factor=1,size_by='degree',size_factor=1000,save=True,title=None,color_by=None,default_color='gray',color_start='red',color_end='blue',default_size=300):
    """Draw graph `g` with matplotlib; node and edge sizes are min-max scaled.

    g -- networkx graph whose edges carry a numeric 'weight' attribute
    ofn -- image path written when `save` is true
    pos -- node -> position mapping passed on to networkx
    weight_factor -- line width of the heaviest edge (the lightest gets 0)
    size_by -- node attribute that drives the node size
    size_factor -- size of the node with the largest `size_by` value
    save -- True: write `ofn`, close the figure, return None;
            False: leave the figure open and return the pyplot module
    title -- optional figure title
    color_by -- node attribute to color by; its sorted distinct values are
        spread over a gradient from `color_start` to `color_end`
    default_color -- color of all nodes when `color_by` is empty
    default_size -- node size used wherever no scaled size can be computed
    """
    from matplotlib import pyplot as plt

    def _minmax_scale(values,factor,fallback):
        """Scale `values` linearly onto [0, factor]; `fallback` where impossible.

        The scalar `fallback` is returned when nothing can be scaled (no
        values, non-numeric values, or all values equal); otherwise a list
        is returned in which missing (NaN) entries are `fallback`.
        """
        try:
            arr=np.asarray(values,dtype=float)
        except (TypeError,ValueError):
            return fallback
        if arr.size==0 or np.isnan(arr).all():
            return fallback
        lo,hi=np.nanmin(arr),np.nanmax(arr)
        # Equal values would give 0/0 = NaN for every element.
        if hi==lo:
            return fallback
        scaled=(arr-lo)/(hi-lo)*factor
        # np.isnan, not `is not np.nan`: array elements are never the
        # np.nan object itself, so the identity test never caught a NaN.
        return [fallback if np.isnan(s) else s for s in scaled]

    fig = plt.figure(figsize=(10,10))#,facecolor=(0, 0, 0))
    if title: fig.suptitle(title, fontsize=16)
    # Fix the node and edge order once; sizes and colors are built in that order.
    nodelist=list(g.nodes())
    labels={n:n for n in nodelist}
    node_size=_minmax_scale([g.nodes[n].get(size_by,np.nan) for n in nodelist],size_factor,default_size)

    edgelist=list(g.edges())
    edge_size=_minmax_scale([g[a][b]['weight'] for a,b in edgelist],weight_factor,1)

    if color_by:
        node_values=[g.nodes[n].get(color_by,'') for n in nodelist]
        try:
            color_types=sorted(set(node_values))
        except TypeError:
            # mixed value types (e.g. '' next to numbers) cannot be compared
            color_types=sorted(set(node_values),key=str)
        spectrum=list(Color(color_start).range_to(Color(color_end),len(color_types)))
        colormap=dict(zip(color_types, spectrum))
        node_color=[colormap[v].hex for v in node_values]
    else:
        node_color=default_color

    try:
        nx.draw_networkx(
            g,
            pos=pos,
            labels=labels,
            nodelist=nodelist,
            node_size=node_size,
            edgelist=edgelist,
            width=edge_size,
            font_color='black',
            font_weight='bold',
            font_size=12,
            node_color=node_color,
            edge_color='teal'
        )
    except Exception:
        # do not leave a half-drawn figure open when drawing fails
        plt.close(fig)
        raise
    if save:
        plt.savefig(ofn)
        plt.close()
    else:
        return plt




# Generate
# Heading for the minimum-weight control created further down.
printm('### Set minimum weight')
# display(weight_slider)

# @interact
def make_static(min_weight=weight_slider):
    """Build, tabulate and preview the static network at `min_weight`.

    Uses the edge list from the latest run of the dynamic-network control
    (`charnet_dynamic_i.result`).

    Returns (graph, node DataFrame, edge DataFrame), or None when the
    dynamic network has not been generated yet.
    """
    # get current data
    result=charnet_dynamic_i.result
    if result is None:
        # Unpacking None raised a TypeError that did not say what to do.
        printm('Generate the dynamic network (cell above) first')
        return None
    charnet_dynamic,charnet_dynamic_df,charnet_dynamic_edgelist=result

    charnet_static=g=to_static(charnet_dynamic_edgelist,min_weight=min_weight)
    printm(f'Graph generated with {g.order()} nodes and {g.size()} edges')
    charnet_static_df=pd.DataFrame([dict(g.nodes[n]) for n in g.nodes()])
    charnet_static_df_edges=pd.DataFrame([{'source':a, 'target':b, **d} for a,b,d in g.edges(data=True)])

    if g.size()==0:
        # An edgeless graph has no 'weight' column to sort by and nothing to draw.
        printm('No edges at this minimum weight; lower it to see a network')
        return charnet_static, charnet_static_df, charnet_static_df_edges

    printm('### Edge data')
    display(charnet_static_df_edges.sort_values('weight',ascending=False))

    # printm('### Node data')
    # display(charnet_static_df.sort_values('num_total',ascending=False))
    printm('### Graph preview')
    pos=layout(g)
    title=f'{NOVEL_TITLE_NICE} (w>={min_weight})'
    try:
        drawnet_nx(
            g,
            save=False,
            pos=pos,
            title=title,
            weight_factor=10
        )
    except ValueError as err:
        # The preview stays optional, but the reason is now reported.
        print(f'Could not draw the graph preview: {err}')
    return charnet_static, charnet_static_df, charnet_static_df_edges

# Interactive control around make_static, driven by the minimum-weight slider.
charnet_static_i=interactive(make_static,min_weight=weight_slider)
# Last expression of the cell: displays the control.
charnet_static_i

In [None]:
#@title Fiddle with settings
# Dropdown for the node measure (one of NET_STATS) that sets the node size.
widg_sizeby=widgets.Dropdown(options=sorted(list(NET_STATS)))
# charnet_static,charnet_static_df=charnet_static_i.result
# node_features=set(charnet_static_df.select_dtypes('number').columns) - {'id'}


def show_graph(t_start=time_slider1,
               t_end=time_slider2,
               min_weight=weight_slider,
               color_by=widg_colorby,
               size_by=widg_sizeby,
               weight_factor=weight_factor):
    """Draw the network of the interactions between slices `t_start` and `t_end`.

    Node positions come from the network of the whole text, so a character
    stays in place while the time window changes.  The interactions are
    taken from the latest run of the dynamic-network control
    (`charnet_dynamic_i.result`).

    min_weight -- minimum number of interactions for an edge to be kept
    color_by, size_by -- node attributes that set node color and size
    weight_factor -- line width of the heaviest edge
    """
    # The body used to read `charnet_dynamic` and `dg`, globals that no cell
    # defines, and passed a DynGraph where to_static expects (t, u, v) tuples.
    result=charnet_dynamic_i.result
    if result is None:
        printm('Generate the dynamic network first')
        return
    edge_list=result[2]
    g=to_static(edge_list,min_weight=min_weight)
    if g.order()==0:
        printm('No edges at this minimum weight')
        return
    pos=layout(g)
    # Select the time window here, so the result does not depend on how
    # to_static interprets a bound of 0.
    in_window=[(t,u,v) for t,u,v in edge_list if t_start<=t<=t_end]
    g_sofar=to_static(in_window,min_weight=min_weight)
    title=f'{NOVEL_TITLE} (t={t_start}-{t_end}) (w>={min_weight})'
    try:
        drawnet_nx(
            g_sofar,
            save=False,
            pos=pos,
            size_by=size_by,
            color_by=color_by,
            weight_factor=weight_factor,
            title=title
        )
    except ValueError as err:
        # The drawing stays optional, but the reason is now reported.
        print(f'Could not draw the graph: {err}')


# Interactive control built from show_graph's widget defaults.
graph_configurator = interactive(show_graph)
# Last expression of the cell: displays the control.
graph_configurator

## 🎥 Generating dynamic network visualizations

In [None]:
#@title Generate underlying images
def drawnets(dg=None,odir=ofn_fig_dir):
    """Write one PNG per time slice showing the network accumulated so far.

    dg -- unused; kept so existing calls that pass it keep working.  Its
        former default (`dg=dg`) named a global that no cell defines, so
        merely defining this function raised NameError.
    odir -- directory for the images, named net-<slice, zero-padded>.png

    The interactions are taken from the latest run of the dynamic-network
    control (`charnet_dynamic_i.result`); minimum weight, size, color and
    edge-width settings are read from their widgets.
    """
    result=charnet_dynamic_i.result
    if result is None:
        printm('Generate the dynamic network first')
        return
    edge_list=result[2]
    os.makedirs(odir,exist_ok=True)
    min_weight=weight_slider.value
    # Layout graph built with the same minimum weight as the frames, so that
    # every node a frame can contain has a position.
    g=to_static(edge_list,min_weight=min_weight)
    #pos=nx.spring_layout(g,k=5/math.sqrt(g.order()))
    pos=layout(g)
    for t in tqdm(sorted({t for t,_,_ in edge_list})):
        # Everything up to and including slice t, selected here because
        # to_static expects a list of (t, u, v) tuples.
        sofar=[e for e in edge_list if e[0]<=t]
        g_sofar=to_static(sofar,min_weight=min_weight)
        #pos=nx.spring_layout(g_sofar)
        ofn_img=os.path.join(odir,f'net-{str(t).zfill(4)}.png')
        title=f'{NOVEL_TITLE_NICE} (t={str(t).zfill(4)})'
        try:
            drawnet_nx(
                g_sofar,
                save=True,
                ofn=ofn_img,
                pos=pos,
                size_by=widg_sizeby.value,
                color_by=widg_colorby.value,
                weight_factor=weight_factor.value,
                title=title
            )
        except ValueError:
            # a frame that cannot be drawn is skipped
            pass

def do_drawnets(*x,**y):
    """Button/startup entry point: redraw all frames into `ofn_fig_dir`.

    Arguments (such as the clicked button) are ignored.
    """
    drawnets(odir=ofn_fig_dir)

# Draw the frames on first run; afterwards only on request via the button.
if not os.path.exists(ofn_fig_dir) or not os.listdir(ofn_fig_dir):
    res=do_drawnets()

button2=widgets.Button(description='Regenerate images')
button2.on_click(do_drawnets)
button2

In [None]:
#@title Generate mp4 video from images
def make_vid_from_folder(*x,image_folder=ofn_fig_dir,ofn=ofn_mp4,fps=15):
    """Assemble the PNG files of `image_folder` (in name order) into a video.

    *x -- ignored positional arguments (e.g. the button of a click callback)
    image_folder -- directory holding the frame images
    ofn -- path of the video file to write
    fps -- frames per second
    """
    from moviepy.video.io.ImageSequenceClip import ImageSequenceClip

    frame_paths = []
    for img in sorted(os.listdir(image_folder)):
        if img.endswith(".png"):
            frame_paths.append(os.path.join(image_folder,img))
    ImageSequenceClip(frame_paths, fps=fps).write_videofile(ofn)

# Build the video on first run; afterwards only on request via the button.
if not os.path.exists(ofn_mp4): make_vid_from_folder()
button3=widgets.Button(description='Regenerate Video')
button3.on_click(make_vid_from_folder)
# display(button3)

# show video
from IPython.display import HTML
from IPython.display import display
from base64 import b64encode
# Read the video bytes for the player cell below.  `with` closes the file
# handle, which the bare open(...).read() left open.
with open(ofn_mp4,'rb') as f:
    mp4 = f.read()
display(button3)

In [None]:
#@title Show video
# Embed the video bytes (`mp4`) in the page as a base64 data URL.
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
display(HTML("""
<video width=666 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url))

In [None]:
#@title Generate gif from images
def make_gif_from_folder(folder=ofn_fig_dir,ofn=ofn_gif):
    """Combine the PNG files of `folder` (in name order) into the GIF `ofn`."""
    import imageio
    png_names = [fn for fn in sorted(os.listdir(folder)) if fn.endswith('.png')]
    frames = []
    for png_name in png_names:
        with open(os.path.join(folder,png_name),'rb') as f:
            frames.append(imageio.imread(f))
    imageio.mimsave(ofn, frames)


# Build the GIF on first run; afterwards only on request via the button.
if not os.path.exists(ofn_gif): make_gif_from_folder()

button2=widgets.Button(description='Regenerate GIF')
# on_click calls its handler with the button as first argument, which
# make_gif_from_folder would take for `folder`; the lambda drops it.
button2.on_click(lambda _btn: make_gif_from_folder())
display(button2)

# show gif?
from IPython.core.interactiveshell import InteractiveShell
# `from IPython import display` used to be imported here.  It rebinds the
# notebook-wide name `display` from the function to the module, after which
# every later `display(...)` call fails — including those inside printm,
# which the interactive widgets keep calling.  Import the class instead.
from IPython.display import Image
from pathlib import Path
gifPath = Path(ofn_gif)
# Display GIF in Jupyter, CoLab, IPython
# An explicit display() shows the image without switching the shell to
# ast_node_interactivity="all", a setting that affected every later cell.
# NOTE(review): format='png' is kept from the original although the data is a GIF — confirm it renders as intended.
with open(gifPath,'rb') as f:
    display(Image(data=f.read(), format='png'))

## Download data

In [None]:
#@title Zip data
PATH_ZIP=os.path.abspath(os.path.join(PATH_NOVEL,'..',NOVEL_TITLE+'.zip'))
cmd=f'cd {PATH_NOVEL}/.. && zip -r9 {PATH_ZIP} {NOVEL_TITLE}'
!{cmd}

In [None]:
##@title Download zip file
def dlzip(*_):
    """Start a browser download of the archive at PATH_ZIP.

    Positional arguments (such as the button passed by a click callback)
    are ignored, so the function can serve directly as a click handler.
    """
    files.download(PATH_ZIP)
# Size of the archive itself.  The original measured `path`, which is the
# Drive folder name from the storage form, not the zip file.
dlsize=os.path.getsize(PATH_ZIP)
def human_size(bytes, units=[' bytes','KB','MB','GB','TB', 'PB', 'EB']):
    """ Returns a human readable string representation of bytes """
    remaining = bytes
    unit_index = 0
    # Each step up the unit list divides by 1024 (right shift by 10 bits),
    # discarding the remainder.
    while remaining >= 1024:
        remaining >>= 10
        unit_index += 1
    return str(remaining) + units[unit_index]
dlbutton=widgets.Button(description=f'Download zip file ({human_size(dlsize)})')
# The button was shown without any click handler, so pressing it did nothing.
# The lambda drops the button argument that on_click passes along.
dlbutton.on_click(lambda _btn: dlzip())
# Last expression of the cell: displays the button.
dlbutton