Imports

In [None]:
import httpx
import networkx as nx
import time
import sqlite3
import pandas as pd
import json
import persistqueue

Open Alex permette di accedere tramite API alle pubblicazioni e gli autori. Gli autori sono identificati da un ID

In [None]:
seed_author_id = 'A5035471624'

# Funzione per ottenere tutte le pubblicazioni di un autore
def getWorks(author_id):
    publications = []
    url = f"https://api.openalex.org/works?filter=author.id:{author_id}&per_page=100"
    request = httpx.get(url)
    works = request.json()
    maximum = int(works["meta"]["count"])
    count = 2
    while len(publications) < maximum:
        for work in works["results"]:
            publications.append(work)
        url = f"https://api.openalex.org/works?filter=author.id:{author_id}&page={count}&per_page=100"
        request = httpx.get(url)
        works = request.json()
        count += 1
    return publications



In [None]:
# funzione che dato un autore, restituisce i suoi collaboratori.
# ciclando su tutte le pubblicazioni dell'autore, si estraggono gli autori di ogni pubblicazione che hanno quindi collaborato con l'autore
def getCollaborators(author_id):
    collaborators = set()
    publications = getWorks(author_id)
    for p in publications:
        for author in p["authorships"]:
            id = author["author"]["id"].split("/")[-1]
            collaborators.add(id)
    return collaborators
             
#collaboratori = getCollaborators(seed_author_id)

Esplorazione della rete

In [None]:
# Funzione che dato un autore, restituisce tutti i suoi collaboratori e i collaboratori dei collaboratori e cosi via. Moooolto lenta, esplora tutto il grafo
def getAllCollaborators(author_id):
    frontiera = [author_id]
    visitati = set()
    collaboratori_diz = dict()
    while frontiera:
        author_id = frontiera.pop(0)
        collaboratori = getCollaborators(author_id)
        collaboratori_diz[author_id] = collaboratori
        for c in collaboratori:
            if c not in collaboratori_diz:
                frontiera.append(c)
    return collaboratori_diz

In [None]:
#dizionario =   getAllCollaborators(seed_author_id)

<h5>Un crawler o un Surfer permette di "navigare" tra gli autori. In pratica prende i parametri della rete (autore da cui partire, url a cui fare le richieste, un threshold (una profondità massima da non superare), la frontiera e l'insieme degli autori di cui conosco gia i collaboratori)</h5>

</h5>Con il metodo crawl (gattonare) permettiamo al surfer di muoversi nel "grafo" degli autori. La parola chiave yield è un return che però non aspetta la fine dell'esecuzione. Appena ha una tupla da restituire lo fa. L'assegnamento va fatto come se fosse una lista, che avrà però lunghezza dinamica</h5>

In [None]:
class OpenAlexCrawler:
    def __init__(self, author_id, BASE_URL, threshold = 2):
        self.author_id = author_id
        self.BASE_URL = BASE_URL
        self.threshold = threshold
        self.visited = set()
        self.frontiera = [(self.author_id, 0)]
        
    def crawl(self):
        while self.frontiera:
            author_id, depth = self.frontiera.pop(0)
            if depth < self.threshold:
                collaboratori = self._getCollaborators(author_id)
                for c in collaboratori:
                    if c not in self.visited:
                        self.frontiera.append((c, depth+1))
                        self.visited.add(c)
                    # restituisce una lista, ma un elemento per volta. Come se fosse uno streaming di dati
                    yield(author_id, c)
            
            
    
    def _getCollaborators(self, author_id):
        collaborators = set()
        publications = self._getWorks(author_id)
        for p in publications:
            for author in p["authorships"]:
                id = author["author"]["id"].split("/")[-1]
                if id != author_id:
                    collaborators.add(id)
        return collaborators
    
    def _getWorks(self, author_id):
        publications = []
        url = f"{self.BASE_URL}{author_id}"
        request = httpx.get(url)
        time.sleep(0.5)
        try:
            works = request.json()
            for work in works["results"]:
                    publications.append(work)
            
        except Exception as e:
            
            print("Errore", e)
        return publications
    
    
crawler = OpenAlexCrawler(seed_author_id, "https://api.openalex.org/works?filter=author.id:")
g = nx.Graph()
for source, destination in crawler.crawl():
    g.add_edge(source, destination)

In [None]:
g.order(), g.size()

In [None]:
nx.write_gexf(g,"collaborations.gexf")

Insieme (set) implementato però su un database. Cosi da conservare l'eseguzione della visita nel caso in cui crashi o si debba interrompere il processo

In [None]:
class SQLiteSet(object):
    _TABLE_NAME = 'member'
    _SQL_CREATE = (
        'CREATE TABLE IF NOT EXISTS {table_name} ('
        'element TEXT PRIMARY KEY)'
    )
    # Template per inserimento di un elemento nella tabella
    _SQL_INSERT = 'INSERT INTO {table_name} (element) VALUES (?)'
    # Template per la selezione di un elemento nella tabella
    _SQL_SELECT_ID = (
        'SELECT element FROM {table_name} WHERE'
        ' element = "{node_id}"'
    )
    # Template per la rimozione di tutti gli elementi nella tabella
    _SQL_DELETE = 'DELETE FROM {table_name}'

    def __init__(self,path):
        self.db_connect = sqlite3.connect(path)  # Apro la connessione con il db
        self.db_connect.execute(self._SQL_CREATE.format(table_name = self._TABLE_NAME)) 

    def clear(self):
        with self.db_connect:
            self.db_connect.execute(self._SQL_DELETE.format(table_name = self._TABLE_NAME))

    def add(self, node_id):
        with self.db_connect:
            try:
                self.db_connect.execute(self._SQL_INSERT.format(table_name = self._TABLE_NAME),(node_id,))
            except:
                return None

    def contains(self,node_id):
        with self.db_connect:
            res = list(self.db_connect.execute(self._SQL_SELECT_ID.format(table_name = self._TABLE_NAME, node_id = node_id)))
            return len(res) > 0

Surfer che utilizza DB. Salva quindi i nodi gli autori incontrati in una tabella del DB

In [None]:
import persistqueue.sqlqueue


class OpenAlexSurferPersistent():
    def __init__(self, author_id, BASE_URL="https://api.openalex.org/works?filter=author.id:", threshold = 2, reset = True):
        self.queue = persistqueue.sqlqueue.SQLiteQueue(".",db_file_name='surfer_2025.db',auto_commit=True)
        self.visited = SQLiteSet(path='visited.db')
        if reset:
            self.visited.clear()
            self._empty_queue()
            self.queue.put((author_id, 0))
        self.threshold = threshold
        self.BASE_URL = BASE_URL
        
    def _empty_queue(self):
        while not self.queue.empty():
            self.queue.get()
    
    def _get_collaborators(self, author_id):
        collaborators = set()
        publications = self._get_works(author_id)
        for p in publications:
            for author in p["authorships"]:
                id = author["author"]["id"].split("/")[-1]
                if id != author_id:
                    collaborators.add(id)
        return collaborators
    def _get_works(self, author_id):
        publications = []
        url = f"{self.BASE_URL}{author_id}"
        request = httpx.get(url)
        time.sleep(0.5)
        try:
            works = request.json()
            for work in works["results"]:
                    publications.append(work)
        except Exception as e:
            
            print("Errore", e)
        return publications
    def visit(self):
        while not self.queue.empty():
            author_id, depth = self.queue.get()
            if depth < self.threshold:
                collaborators = self._get_collaborators(author_id)
                for c in collaborators:
                    if not self.visited.contains(c):
                        self.queue.put((c, depth+1))
                        self.visited.add(c)
                    yield (author_id, c)

In [None]:
surfer = OpenAlexSurferPersistent(seed_author_id)
g = nx.Graph()
for a,b in surfer.visit():
    g.add_edge(a,b)
    
g.order(), g.size()

In [None]:
con = sqlite3.connect("visited.db")
df = pd.read_sql_query("SELECT * from member", con)
df