In [1]:
from elasticsearch_dsl import connections, Index, Q
from json import loads
from pandas import DataFrame, NA
from pathlib import Path

from fediverse_analysis.instance_data.analyze import Analyzer

In [2]:
# --- Elasticsearch connection ---
ELASTIC_HOST = 'https://elasticsearch.srv.webis.de'
ELASTIC_PORT = 9200
ELASTIC_USER = 'wo84xel'
# To keep the password out of the notebook, point this to a file that contains
# nothing but the Elastic password.
ELASTIC_PASSWORD_FILE = Path('~/.local/share/passwords/webis-elasticsearch.txt').expanduser()
INDEX = 'corpus_mastodon_statuses*'

# --- Input files (all below the same thesis directory) ---
_thesis_dir = Path('/mnt/ceph/storage/data-in-progress/data-teaching/theses/wstud-thesis-ernst')
_sample_dir = _thesis_dir / 'sample' / '04'
INSTANCE_DATA_PATH = _thesis_dir / 'fedi_data' / '2024-01-30' / '05.jsonl'
INSTANCES_PATH = _sample_dir / 'instances.txt'
REMOVED_INSTANCES_PATH = _sample_dir / 'instances_removed_for_crawling_errors.json'

# --- Table layout ---
# Number of instances that get their own row in the result tables.
NUM_EXPLICIT_INSTANCES = 10
# Upper bound for the number of buckets in Elastic aggs.
# Set a bit higher than the number of instances crawled.
SEARCH_MAX = 1100

# --- Date range for the Elastic searches ---
# Crawling started on 2023-12-21.
DATE_AFTER = '2023-12-01T00:00:00'
# Ca. '2024-01-30T12:00:00' is the time when a new version of the fediverse data was gathered.
DATE_BEFORE = '2024-01-30T12:00:00'

In [3]:
# Connect to Elastic.
## The password is the first line of the password file, without its newline.
with ELASTIC_PASSWORD_FILE.open('r') as password_file:
    password = password_file.readline().strip('\n')

elastic = connections.create_connection(
    hosts=f'{ELASTIC_HOST}:{ELASTIC_PORT}',
    basic_auth=(ELASTIC_USER, password),
    timeout=300,
)

# Prepare the date query: only documents crawled strictly between DATE_AFTER and DATE_BEFORE.
crawl_window = {
    'gt': DATE_AFTER,
    'lt': DATE_BEFORE,
    'format': 'date_hour_minute_second',
}
date_query = Q('range', crawled_at=crawl_window)

In [4]:
# Search Elastic for crawled federated data.
## No hits are fetched (size=0); only the aggregations are of interest.
fed_data_search = Index(INDEX).search().params(size=0).query(date_query)
## One bucket per crawled instance ...
fed_instances_agg = fed_data_search.aggs.bucket(
    'instances',
    'terms',
    field='crawled_from_instance.keyword',
    size=SEARCH_MAX,
)
## ... and within each bucket the number of distinct account handles.
fed_instances_agg.metric('users', 'cardinality', field='account.handle.keyword')
fed_data_result = fed_data_search.execute()

In [5]:
# Search Elastic for crawled local data.
## Same search as for the federated data, restricted to documents with is_local=True.
local_only_query = Q('bool', filter=Q('term', is_local=True))
local_data_search = Index(INDEX).search().params(size=0).query(date_query).query(local_only_query)
## One bucket per crawled instance ...
local_instances_agg = local_data_search.aggs.bucket(
    'instances',
    'terms',
    field='crawled_from_instance.keyword',
    size=SEARCH_MAX,
)
## ... and within each bucket the number of distinct account handles.
local_instances_agg.metric('users', 'cardinality', field='account.handle.keyword')
local_data_result = local_data_search.execute()

In [6]:
# Count instances and put index together.
buckets = fed_data_result.aggs['instances']['buckets']
## Determine the top instances: the first NUM_EXPLICIT_INSTANCES buckets of the federated search.
## Slicing (instead of indexing 0..NUM_EXPLICIT_INSTANCES-1) keeps this working when Elastic
## returns fewer buckets than NUM_EXPLICIT_INSTANCES, e.g. for a very narrow date range.
instances = [bucket['key'] for bucket in list(buckets)[:NUM_EXPLICIT_INSTANCES]]
## Count 'others': every bucket that did not get its own row.
others_label = str(len(buckets) - len(instances)) + ' others'
## Count 'all crawled'.
crawled_label = str(len(buckets)) + ' crawled'

## Row labels of the result tables: the top instances followed by the two summary rows.
## `instances` stays a separate list because later cells iterate over the top instances only.
index = instances + [others_label, crawled_label]

In [7]:
# Evaluate data
## Each dict maps a row label (instance name, others_label, crawled_label) to a count.
fed_posts = {}
fed_users = {}
local_posts = {}
local_users = {}
## Federated and local search results look the same, so we can do the same stuff twice.
for search_result, posts, users in (
    (fed_data_result, fed_posts, fed_users),
    (local_data_result, local_posts, local_users)
):
    ## list → dict
    data_dict = {
        entry['key']: {
            'doc_count': entry['doc_count'],
            'users': entry['users']['value']
        }
        for entry in search_result.aggs['instances']['buckets']
    }
    ## top 10; each handled instance is removed so that only the 'others' remain in data_dict.
    for instance in instances:
        posts[instance] = data_dict[instance]['doc_count']
        users[instance] = data_dict[instance]['users']
        del data_dict[instance]
    ## Others / all crawled
    posts[others_label] = sum(v['doc_count'] for v in data_dict.values())
    posts[crawled_label] = sum(posts.values())

## Local users ~(top 10)
## Only buckets outside the top instances may be summed here. Summing every bucket would put
## the top instances into 'others' and count them twice in the 'all crawled' total below.
top_instances = set(instances)
local_users[others_label] = sum(
    bucket['users']['value']
    for bucket in local_data_result.aggs['instances']['buckets']
    if bucket['key'] not in top_instances
)
## Local users all crawled: top instances + others.
local_users[crawled_label] = sum(local_users.values())

In [8]:
# Federated users and deduplicated posts is not as easy – we need additional searches for these.
## Prepare Query for instances: ~(top 10)
## The instance names are bucket keys of 'crawled_from_instance.keyword', so they are matched
## against that same keyword sub-field; exact-value queries on the analyzed text field are
## discouraged by Elasticsearch. A single negated `terms` query excludes all top instances.
not_top10_query = ~Q('terms', **{'crawled_from_instance.keyword': list(instances)})

In [9]:
# Federated posts values contain duplicates. We want to display one of these values later,
# but the main DataFrame should contain deduplicated values as they are way more meaningful.
# NOTE: this has to run before fed_posts[crawled_label] is overwritten with the deduplicated
# count in a later cell; re-running it afterwards would store the deduplicated value instead.
fed_crawled_dup_posts = fed_posts[crawled_label]

In [10]:
# Federated deduplicated posts, ~(top 10).
## Count distinct post URIs over all documents that were not crawled from a top instance.
others_filter = Q('bool', filter=not_top10_query)
dedup_posts_other_search = Index(INDEX).search().params(size=0).query(date_query).query(others_filter)
dedup_posts_other_search.aggs.metric('posts', 'cardinality', field='uri.keyword')
dedup_posts_other_result = dedup_posts_other_search.execute()

## Replaces the duplicate-containing sum stored under the same label before.
fed_posts[others_label] = dedup_posts_other_result.aggs['posts']['value']

In [11]:
# Federated deduplicated posts, all crawled.
## Count distinct post URIs over every document in the date range.
dedup_posts_crawled_search = Index(INDEX).search().params(size=0).query(date_query)
dedup_posts_crawled_search.aggs.metric('posts', 'cardinality', field='uri.keyword')
dedup_posts_crawled_result = dedup_posts_crawled_search.execute()

## Replaces the duplicate-containing sum stored under the same label before.
fed_posts[crawled_label] = dedup_posts_crawled_result.aggs['posts']['value']

In [12]:
## Federated, ~(top 10)
## Count distinct account handles over all documents that were not crawled from a top instance.
others_users_filter = Q('bool', filter=not_top10_query)
others_fed_users_search = Index(INDEX).search().params(size=0).query(date_query).query(others_users_filter)
others_fed_users_search.aggs.metric('users', 'cardinality', field='account.handle.keyword')
others_fed_users_result = others_fed_users_search.execute()

fed_users[others_label] = others_fed_users_result.aggs['users']['value']

In [13]:
## Federated, all crawled
## Count distinct account handles over every document in the date range.
all_fed_users_search = Index(INDEX).search().params(size=0).query(date_query)
all_fed_users_search.aggs.metric('users', 'cardinality', field='account.handle.keyword')
all_fed_users_result = all_fed_users_search.execute()

fed_users[crawled_label] = all_fed_users_result.aggs['users']['value']

In [14]:
# 'Available' data: the entire Mastodon network (that we know of).
## Get all instances we ever crawled.
crawled_instances = set()
with INSTANCES_PATH.open('r') as f:
    for line in f:
        # Strip instead of cutting the last character: the final line of the file may lack a
        # trailing newline, in which case `line[:-1]` would chop off part of the instance name.
        instance = line.strip()
        # Skip blank lines instead of registering an empty instance name.
        if instance:
            crawled_instances.add(instance)
## The first line of this file is parsed as JSON; iterating over it yields instance names.
with REMOVED_INSTANCES_PATH.open('r') as f:
    crawled_instances.update(loads(f.readline()))
## File to dict: instance name → nodeinfo, one JSON document per line.
mastodon_data = {}
with INSTANCE_DATA_PATH.open('r') as file:
    for line in file:
        line_dict = loads(line)
        nodeinfo = line_dict['nodeinfo']
        # No nodeinfo means server unreachable or other software.
        if not nodeinfo:
            continue
        # We need either (nodeinfo & activity) or (nodeinfo & software==mastodon)
        if not line_dict['activity']:
            software = nodeinfo['software']
            if not software or software['name'] != 'mastodon':
                continue
        # We only use nodeinfo though.
        mastodon_data[line_dict['instance']] = nodeinfo

all_label = str(len(mastodon_data)) + ' discovered'
# Guard the append so that re-running this cell does not add the label a second time.
if all_label not in index:
    index.append(all_label)

In [15]:
# 'Available' local posts and users per table row, read from the nodeinfo usage statistics.
avail_local_posts = {
    others_label: 0
}
avail_local_users = {
    others_label: 0
}
## top 10
for instance in instances:
    usage = mastodon_data[instance]['usage']
    avail_local_posts[instance] = usage['localPosts']
    avail_local_users[instance] = usage['users']['total']
## Take the top instances out of the crawled set so that only the 'others' remain.
## difference_update leaves the same set as removing them one by one, but does not raise a
## KeyError when this cell is re-run and the instances are already gone.
crawled_instances.difference_update(instances)
## Others (crawled, but not top 10)
for instance in crawled_instances:
    # Some instances might not be present in more recent fediverse data which was obtained later.
    if instance in mastodon_data:
        usage = mastodon_data[instance]['usage']
        avail_local_posts[others_label] += usage['localPosts']
        avail_local_users[others_label] += usage['users']['total']
## All crawled: top instances + others (the dicts hold nothing else at this point).
avail_local_posts[crawled_label] = sum(avail_local_posts.values())
avail_local_users[crawled_label] = sum(avail_local_users.values())
## All: every instance in mastodon_data, crawled or not.
avail_local_posts[all_label] = sum(v['usage']['localPosts'] for v in mastodon_data.values())
avail_local_users[all_label] = sum(v['usage']['users']['total'] for v in mastodon_data.values())

In [16]:
# Append NA, otherwise it will be NaN and the whole column will be converted to float.
# The loop variable is deliberately not called `dict`: that would shadow the builtin for the
# rest of the kernel session.
# NOTE(review): the rendered table still shows the 'Crawled' columns with a trailing '.0';
# confirm which dtype these columns actually end up with.
for crawled_column in (fed_posts, fed_users, local_posts, local_users):
    crawled_column[all_label] = NA

# Two-level column header: (group, measure). Rows follow the order given by `index`.
df = DataFrame(
    {
        ('Crawled', 'Posts (fed.)'): fed_posts,
        ('Crawled', 'Users (fed.)'): fed_users,
        ('Crawled', 'Posts (loc.)'): local_posts,
        ('Crawled', 'Users (loc.)'): local_users,
        ('Available', 'Posts (loc.)'): avail_local_posts,
        ('Available', 'Users (loc.)'): avail_local_users
    },
    index=index
)
df.index.name = 'Instance'

In [17]:
# Calculate ratio of crawled posts and users
## Local data: row-wise crawled / available.
crawled_local_posts = df.loc[:, ('Crawled', 'Posts (loc.)')]
available_local_posts = df.loc[:, ('Available', 'Posts (loc.)')]
local_posts_ratio = crawled_local_posts / available_local_posts

crawled_local_users = df.loc[:, ('Crawled', 'Users (loc.)')]
available_local_users = df.loc[:, ('Available', 'Users (loc.)')]
local_users_ratio = crawled_local_users / available_local_users

## Federated data: only one ratio each, everything crawled against everything discovered.
fed_posts_ratio = {all_label: fed_posts[crawled_label] / avail_local_posts[all_label]}
fed_users_ratio = {all_label: fed_users[crawled_label] / avail_local_users[all_label]}

ratio_columns = {
    ('Crawled ratio', 'Posts (fed.)'): fed_posts_ratio,
    ('Crawled ratio', 'Users (fed.)'): fed_users_ratio,
    ('Crawled ratio', 'Posts (loc.)'): local_posts_ratio,
    ('Crawled ratio', 'Users (loc.)'): local_users_ratio
}
df_ratios = DataFrame(ratio_columns, index=index).fillna(NA, axis=1)
df_ratios.index.name = 'Instance'

In [18]:
# Compare the duplicate-containing federated post count with the deduplicated one.
dedup_posts_total = fed_posts[crawled_label]
duplicate_posts = fed_crawled_dup_posts - dedup_posts_total
original_posts_ratio = dedup_posts_total / fed_crawled_dup_posts

print('Posts crawled total:', fed_crawled_dup_posts)
print('Of which are duplicates:', duplicate_posts)
print('Ratio of original posts:', original_posts_ratio)

df

Posts crawled total: 455754128
Of which are duplicates: 434100428
Ratio of original posts: 0.04751180224087844


Unnamed: 0_level_0,Crawled,Crawled,Crawled,Crawled,Available,Available
Unnamed: 0_level_1,Posts (fed.),Users (fed.),Posts (loc.),Users (loc.),Posts (loc.),Users (loc.)
Instance,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
mastodon.social,9892251.0,221263.0,1995240.0,60393.0,81205325,1802370
mastodon.online,6227455.0,155371.0,187287.0,4668.0,7759995,194309
mstdn.social,6108231.0,118780.0,234125.0,5043.0,14989224,222358
ohai.social,5569867.0,115024.0,12819.0,633.0,1180837,39023
mastodon.world,5149782.0,108522.0,115832.0,3270.0,4912465,181577
mas.to,5147378.0,107973.0,111870.0,4018.0,7021649,169298
universeodon.com,4737603.0,122978.0,58670.0,1447.0,2852641,78258
social.vivaldi.net,4494845.0,61714.0,95703.0,1471.0,1709786,43582
techhub.social,4487083.0,113611.0,47437.0,1250.0,1404970,79527
toot.community,4175344.0,108589.0,18983.0,687.0,1303738,30807


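The values above are deduplicated, i.e. no single value counts the same post or user more than once. The values within one of the *(fed.)* columns do overlap, though: the same post or user can be counted for several instances, so adding up multiple values from such a column would count some entries more than once.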

The values in the *Available* columns above come from a single point in time. This means that the ratios below are inexact and, depending on the chosen time frame, more or less off.

In [19]:
# Display the table of crawled/available ratios.
df_ratios

Unnamed: 0_level_0,Crawled ratio,Crawled ratio,Crawled ratio,Crawled ratio
Unnamed: 0_level_1,Posts (fed.),Users (fed.),Posts (loc.),Users (loc.)
Instance,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
mastodon.social,,,0.02457,0.033508
mastodon.online,,,0.024135,0.024024
mstdn.social,,,0.01562,0.02268
ohai.social,,,0.010856,0.016221
mastodon.world,,,0.023579,0.018009
mas.to,,,0.015932,0.023733
universeodon.com,,,0.020567,0.01849
social.vivaldi.net,,,0.055974,0.033752
techhub.social,,,0.033764,0.015718
toot.community,,,0.01456,0.0223
