In questa pagina viene presentato il codice python che si è reso necessario per portare a termine la prima parte del task, vale a dire l'ottenimento dei dati di interesse (per ogni pagina wiki), realizzazione dei rispettivi documenti json e la loro immissione sulla coda Kafka ad opera di un Kafka Producer. Sotto sono riportati i vari passi della procedura.

#Installazione delle librerie necessarie

In [None]:
pip install pymongo[srv]

In [None]:
pip install aiohttp

In [None]:
pip install asyncio

In [None]:
pip install async_retrying

In [None]:
pip install kafka-python

# Collegamento a MongoDB

Nel chunk sottostante avviene  dapprima il collegamento alla piattaforma *mongodbatlas*; successivamente, si realizza l'accesso al database *wikitrend* ed infine alla collezione di interesse, *page_topic*, istanziata col nome *input*.

In [None]:
from pymongo import MongoClient
client = MongoClient("")
# DB selection
db = client.get_database('wikitrend')
# Selection of input collection
input = db.page_topic

# Presentazione di due funzioni utili

Nei successivi due chunk vengono presentate due funzioni che risultano particolarmente utili per il compimento del task d'interesse.

La funzione sottostante riceve in input (sottoforma di stringa) il nome di una pagina wiki, e restituisce in output una lista di nove urls relativi ad altrettanti endpoints a cui si dovrà fare richiesta successivamente. Come già accennato, sette di questi sono relativi all'API di *wikimedia* (accessibile al seguente [link](https://wikimedia.org/api/rest_v1/#/)), mentre i restanti fanno riferimento all'API di *xtools* (accessibile al seguente [link](https://xtools.readthedocs.io/en/3.1.41/api/index.html)).

In [None]:
import urllib.parse
def get_urls(page_name):
  l=[]
  years = [('20150701','20151231'),('20160101','20161231'),('20170101','20171231'),('20180101','20181231'),
          ('20190101','20191231'),('20200101','20201231')]
  for year in years:
    url = 'https://wikimedia.org/api/rest_v1/metrics/edits/per-page/en.wikipedia/'+urllib.parse.quote(page_name, safe='')+'/all-editor-types/monthly/'+year[0]+'/'+year[1]
    l.append(url)
  
  views = 'https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia.org/all-access/all-agents/'+urllib.parse.quote(page_name, safe='')+'/monthly/20150701/20201231'
  prose = 'https://xtools.wmflabs.org/api/page/prose/en.wikipedia.org/'+urllib.parse.quote(page_name, safe='')
  creation = 'https://xtools.wmflabs.org/api/page/articleinfo/en.wikipedia.org/'+urllib.parse.quote(page_name, safe='')

  l.append(views)
  l.append(prose)
  l.append(creation)
  return l #ouptut: list of 9 urls given a page name

La funzione sottostante (*get_dict*) riceve in input la lista di dizionari, li tratta opportunamente e restituisce in output il dizionario definitivo, contenente le info relative ad una specifica pagina wiki, il quale verrà poi immesso sulla coda Kafka da un Kafka Producer (vedi codice sotto). I dizionari di partenza sono assimilabili alle materie prime dalle quali, mediante un processo di lavorazione (la funzione *get_dict*), viene realizzato il prodotto finito (ovvero, il dizionario definitivo).

In [None]:
from datetime import datetime

def get_dict(data):
  n_views = 0
  n_edits = 0
  months_views = 0
  months_edits = 0
  diz = {}
  for d in data:
    if 'created_at' in d:
      date_of_creation = datetime.strptime(d['created_at'], '%Y-%m-%d') # catch the date of creation
  for d in data: 
    if 'items' in d: 
      if 'results' in d['items'][0]:
        for el in d['items'][0]['results']:
          date = datetime.strptime(el['timestamp'][:10], '%Y-%m-%d') 
          if date > date_of_creation:
            n_edits += el['edits']
            months_edits += 1
            avg_edits = int(round(n_edits/months_edits))
            diz['mean_edits'] = avg_edits
      else:
        for el in d['items']:
          n_views += el['views']
          months_views += 1
          avg_views = int(round(n_views/months_views))
          diz['mean_views'] = avg_views
    elif 'words' in d:
      diz['words'] = d['words']
    elif 'created_at' in d: 
      diz['y_of_creation'] = d['created_at'].split('-')[0]
    else:
      diz.update(d)
  return diz

In [None]:
for doc in input.find({'article':'Al_Pacino'}):
  d = {}
  d['article'] = doc['article']
  d['_id'] = doc['_id']
  urls = get_urls(doc['article'])
  l=[]
  headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'}
  import requests
  for url in urls:
    try:
      r = requests.get(url, headers = headers)
      r.raise_for_status()
      data = r.json()
      l.append(data)
    except requests.exceptions.HTTPError as errh:
      print ("Http Error:",errh)
    except requests.exceptions.ConnectionError as errc:
      print ("Error Connecting:",errc)
    except requests.exceptions.Timeout as errt:
      print ("Timeout Error:",errt)
    except requests.exceptions.RequestException as err:
      print ("OOps: Something Else",err)
  l.append(d)

diz = get_dict(l)
print(diz)


{'mean_edits': 21, 'mean_views': 323061, 'words': 4482, 'y_of_creation': '2002', 'article': 'Al_Pacino', '_id': 41906}


Sopra è riportato un esempio di dizionario restituito dalla funzione get_dict e riferito alla pagina wiki 'Al_Pacino', contenente le varie informazioni di interesse.

# Inizializzazione Kafka Producer

Nel breve codice sottostante viene inizializzato un Kafka Producer, che successivamente sarà adibito alla scrittura dei dati sulla coda Kafka. Il dato in un input viene convertito in un file *json* e codificato in utf-8.

In [None]:
producer = KafkaProducer(bootstrap_servers=['kafka:9092'], # This piece of code initializes a new Kafka Producer
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))

# Download dei dati di interesse e scrittura su Kafka

Nel codice sotto riportato viene realizzata una lista di liste che prende il nome di *tot_urls*. Ciascun elemento (lista) è associato ad una pagina wiki e contiene l'elenco dei nove urls in aggiunta al dizionario contenente il nome dell'articolo e il relativo id nella collezione mongo *page_topic*. Pertanto, la lunghezza della lista di liste è pari al numero di pagine. Un elemento della lista (riferito alla pagina wiki 'David_Bowie') è stampato in coda al codice.

In [None]:
tot_urls = []
for doc in input.find():
  d = {}
  d['article'] = doc['article']
  d['_id'] = doc['_id'] # the dict d contains informations about the name of the wiki page and its _id in the input collection 'page_topic'
  urls = get_urls(doc['article'])# the list of urls is created
  urls.append(d) #little trick: append d to the list of urls. This will turn to be useful later
  tot_urls.append(urls)
print(tot_urls[0])



['https://wikimedia.org/api/rest_v1/metrics/edits/per-page/en.wikipedia/David_Bowie/all-editor-types/monthly/20150701/20151231', 'https://wikimedia.org/api/rest_v1/metrics/edits/per-page/en.wikipedia/David_Bowie/all-editor-types/monthly/20160101/20161231', 'https://wikimedia.org/api/rest_v1/metrics/edits/per-page/en.wikipedia/David_Bowie/all-editor-types/monthly/20170101/20171231', 'https://wikimedia.org/api/rest_v1/metrics/edits/per-page/en.wikipedia/David_Bowie/all-editor-types/monthly/20180101/20181231', 'https://wikimedia.org/api/rest_v1/metrics/edits/per-page/en.wikipedia/David_Bowie/all-editor-types/monthly/20190101/20191231', 'https://wikimedia.org/api/rest_v1/metrics/edits/per-page/en.wikipedia/David_Bowie/all-editor-types/monthly/20200101/20201231', 'https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia.org/all-access/all-agents/David_Bowie/monthly/20150701/20201231', 'https://xtools.wmflabs.org/api/page/prose/en.wikipedia.org/David_Bowie', 'https://xtoo

Ora, il chunk sotto riportato costituisce il "cuore" del processo: la composizione delle tre funzioni *async* consente di effettuare le chiamate agli endpoints contenuti nella lista *tot_urls* in modalità completamente asincrona. Detto in brevi termini, ciò significa che le richieste (tasks) ai vari endpoints vengono effettuati simultaneamente e in maniera pressochè indipendente. In questo modo viene garantita una estrema efficienza riducendo notevolmente i tempi dell'operazione (si tenga conto che, in presenza di 9235 pagine di wiki, il totale delle richieste da effettuare ammonta a 92535 X 9 = 83115 urls). Si osserva che nella seconda funzione (innestata poi nella terza), una volta che i tasks relativi ad una pagina wiki sono completati, il producer si adopera nella scrittura del dizionario restituito dalla funzione get_dict sulla coda Kafka. Tale struttura dati, come visto in precedenza, viene dapprima convertita in un file json codificato in utf-8, il quale successivamente viene immesso nel topic *trial*.

In [None]:
import asyncio
import aiohttp
from async_retrying import retry
import nest_asyncio
nest_asyncio.apply()


# The composition of these three functions allows us to make a parallel request to the urls gained by the get_url function,
# in order to save a lot of time during the extracting data process

@retry(attempts=5000) #number of attempts to get the result from each endpoint, high number to be conservative...
async def get(url, session):
  async with session.get(url) as resp:
      resp.raise_for_status()
      return await resp.json() #This returns a json object of the result, basically a Python dictionary

async def fetch(urls, producer):
  async with aiohttp.ClientSession() as session:
      tasks = [get(url, session) for url in urls[:-1]]
      resp = await asyncio.gather(*tasks)
      resp.append(urls[-1])
      d = get_dict(resp) #once the tasks of a wiki page are completed, the dict is created through the function 'get_dict'
      producer.send('trial', value = d)  #producer sends the data to the topic trial

async def main(tot_urls):
  cors = [fetch(urls) for urls in tot_urls]
  await asyncio.gather(*cors)

if __name__ == "__main__":
  asyncio.run(main(tot_urls))