In [1]:
import db_utilities as db_utl
import pandas as pd
from tqdm import tqdm
import os
import numpy as np
import math
from multiprocessing import Pool
import csv
import links_utilities as lk_utl

In [2]:
def fix_type_node_df(node_df):
    """Return a copy of ``node_df`` whose ``creation_date`` column is cast to ``int``.

    The frame passed in is not modified; all other columns are carried over
    as they are.
    """
    creation_date_as_int = node_df['creation_date'].astype(int)
    return node_df.assign(creation_date=creation_date_as_int)

In [3]:
def fix_type_edge_df(edge_df):
    """Return a copy of ``edge_df`` with ``date``, ``src`` and ``dst`` cast to ``int``.

    The frame passed in is not modified; column order and every other column
    are preserved.
    """
    int_columns = ('date', 'src', 'dst')
    casted = {column: edge_df[column].astype(int) for column in int_columns}
    return edge_df.assign(**casted)

In [4]:
def get_edges_by_forward(ch_msgs_df, db_channels_df):
    """Build the 'forward' edges found in one channel's messages.

    A message becomes an edge when it is flagged as forwarded, its
    ``forwarded_from_id`` differs from its own ``ch_id`` (no self-loops) and
    that id appears in ``db_channels_df['ch_id']``.

    Parameters
    ----------
    ch_msgs_df : pandas.DataFrame
        Messages; must provide the columns ``is_forwarded``, ``ch_id``,
        ``forwarded_from_id`` and ``message``.
    db_channels_df : pandas.DataFrame
        Known channels; only its ``ch_id`` column is read.

    Returns
    -------
    pandas.DataFrame
        The selected rows without ``message`` and with the extra columns
        ``edge_type`` ('forward'), ``src`` (= ``ch_id``), ``dst``
        (= ``forwarded_from_id``) and ``note`` (NaN). ``ch_msgs_df`` itself is
        left untouched.
    """
    # Explicit comparison (instead of using the column as a mask) so that
    # values other than True simply compare unequal.
    is_forward = ch_msgs_df['is_forwarded'] == True  # noqa: E712
    from_other_channel = ch_msgs_df['ch_id'] != ch_msgs_df['forwarded_from_id']
    from_known_channel = ch_msgs_df['forwarded_from_id'].isin(db_channels_df['ch_id'])

    selected = ch_msgs_df.loc[is_forward & from_other_channel & from_known_channel]

    # drop()/assign() return new frames, so nothing is written into a
    # filtered slice (which is what triggers SettingWithCopyWarning).
    return (
        selected
        .drop(columns=['message'])
        .assign(
            edge_type='forward',
            src=lambda df: df['ch_id'],
            dst=lambda df: df['forwarded_from_id'],
            note=np.nan,
        )
    )

In [5]:
def get_edges_by_link(ch_msgs_df, db_channels_df, tme_df, ch_id):
    """Build the 'link' edges of one channel from the pre-extracted link table.

    The channel's messages are inner-joined with the rows of ``tme_df`` that
    belong to ``ch_id`` (on ``ch_id`` and ``msg_id``). The ``src`` and ``dst``
    values coming from ``tme_df`` are then matched against
    ``db_channels_df['username']`` and replaced by the corresponding
    ``ch_id``; rows whose ``src`` or ``dst`` has no match are dropped by the
    inner joins.

    Parameters
    ----------
    ch_msgs_df : pandas.DataFrame
        Messages of the channel; must contain ``ch_id``, ``msg_id`` and
        ``message``. It is NOT modified (the previous implementation dropped
        ``message`` from the caller's frame in place).
    db_channels_df : pandas.DataFrame
        Known channels with at least ``ch_id`` and ``username``.
    tme_df : pandas.DataFrame
        Link table with at least ``ch_id``, ``msg_id``, ``src`` and ``dst``.
    ch_id
        Id of the channel whose links are extracted.

    Returns
    -------
    pandas.DataFrame
        One row per resolved link, with ``edge_type`` set to 'link' and
        ``src``/``dst`` holding channel ids.
    """
    # Non in-place drop: the caller keeps its 'message' column.
    msgs_without_text = ch_msgs_df.drop(columns=['message'])
    channel_links = tme_df[tme_df['ch_id'] == ch_id]
    edges = pd.merge(msgs_without_text, channel_links, on=['ch_id', 'msg_id']).assign(edge_type='link')

    username_to_id = db_channels_df.filter(['ch_id', 'username'], axis=1)
    # Resolve first 'src', then 'dst': join on the username, discard it and
    # keep the channel id under the same column name.
    for endpoint in ('src', 'dst'):
        id_column = endpoint + '_id'
        lookup = username_to_id.rename(columns={'ch_id': id_column, 'username': endpoint})
        edges = (
            pd.merge(edges, lookup, on=endpoint)
            .drop(columns=[endpoint])
            .rename(columns={id_column: endpoint})
        )
    return edges

In [6]:
def get_edges_by_at(ch_msgs_df, db_channels_df):
    """Build the 'at' edges (mentions) found in one channel's messages.

    Every message with a non-null ``message`` is passed to
    ``lk_utl.get_at_links(message, db_channels_df)``; each element of the
    returned collection becomes one edge whose ``dst`` is that element.
    Edges pointing back to the message's own ``ch_id`` are discarded.

    Parameters
    ----------
    ch_msgs_df : pandas.DataFrame
        Messages; must contain ``message`` and ``ch_id``. Not modified.
    db_channels_df : pandas.DataFrame
        Forwarded as-is to ``lk_utl.get_at_links``.

    Returns
    -------
    pandas.DataFrame
        The matching rows (one per mention) without ``message`` and with the
        extra columns ``dst``, ``edge_type`` ('at') and ``src`` (= ``ch_id``).
    """
    with_text = ch_msgs_df.dropna(subset=['message'])

    # Apply on the 'message' Series rather than row-wise on the frame: on an
    # empty frame DataFrame.apply(axis=1) probes the helper with a synthetic
    # all-NaN row and may hand back a DataFrame instead of a Series.
    mentions = with_text['message'].apply(
        lambda message: lk_utl.get_at_links(message, db_channels_df)
    )
    # astype(bool) keeps the mask boolean even when there are no rows at all.
    has_mentions = mentions.apply(lambda found: len(found) > 0).astype(bool)

    edges = (
        with_text
        .assign(ats=mentions)
        .loc[has_mentions]
        .explode('ats')
        .rename(columns={'ats': 'dst'})
    )
    # Drop self-mentions.
    edges = edges[edges['ch_id'] != edges['dst']]
    return (
        edges
        .drop(columns=['message'])
        .assign(edge_type='at', src=lambda df: df['ch_id'])
    )

In [7]:
def foo(args):
    """Pool worker: extract the edges of a chunk of channels and append them to the edge CSV.

    Parameters
    ----------
    args : tuple
        ``(ch_ids, db_name, edge_file, db_channels_df, tme_df)`` packed in a
        single argument so the function can be used with ``Pool.imap``:
        ``ch_ids`` is the iterable of channel ids to process, ``db_name`` is
        forwarded to ``db_utl.get_msgs_by_ch_id``, ``edge_file`` is the CSV
        path the edges are appended to, ``db_channels_df`` and ``tme_df`` are
        forwarded to the edge builders.

    Returns
    -------
    pandas.Series
        The de-duplicated ``dst`` values of all edges found in this chunk
        (empty ``int64`` Series when no channel could be processed).

    Notes
    -----
    Rows are appended to ``edge_file`` without a header, with the columns in
    sorted-name order. Only 'forward' and 'link' edges are produced; the
    '@'-mention edges (``get_edges_by_at``) are currently not part of the
    pipeline.
    NOTE(review): every worker of the pool appends to the same ``edge_file``;
    nothing here serialises those writes - confirm this is safe for the
    target filesystem.
    """
    ch_ids, db_name, edge_file, db_channels_df, tme_df = args

    # Collect the per-channel pieces and concatenate once at the end instead
    # of re-concatenating the growing result on every iteration.
    dst_parts = [pd.Series(dtype='int64')]
    edge_frames = []

    for ch_id in ch_ids:
        try:
            ch_msgs_df = db_utl.get_msgs_by_ch_id(int(ch_id), db_name=db_name)
        except Exception:
            # Best effort: a channel whose messages cannot be loaded is
            # skipped. Unlike a bare `except:`, this lets KeyboardInterrupt
            # and SystemExit propagate.
            continue

        ch_fwmsgs_df = get_edges_by_forward(ch_msgs_df, db_channels_df)
        ch_lkmsgs_df = get_edges_by_link(ch_msgs_df, db_channels_df, tme_df, ch_id)

        dst_parts.extend([ch_fwmsgs_df['dst'], ch_lkmsgs_df['dst']])
        edge_frames.extend([ch_fwmsgs_df, ch_lkmsgs_df])

    sub_newNodes = pd.concat(dst_parts).drop_duplicates(keep='first')

    # Append this chunk's edges to the edge file (nothing to write when no
    # channel was processed).
    if edge_frames:
        sub_edge_df = fix_type_edge_df(pd.concat(edge_frames))
        sub_edge_df = sub_edge_df.reindex(sorted(sub_edge_df.columns), axis=1)
        sub_edge_df.to_csv(edge_file, mode='a', header=False, index=False)
    return sub_newNodes

In [8]:
def get_queue_df(db_channels_df, subsetNodes, lvl):
    """Select the channels to visit at a given level.

    Parameters
    ----------
    db_channels_df : pandas.DataFrame
        Known channels; must contain ``ch_id`` (and whatever
        ``fix_type_node_df`` requires).
    subsetNodes : list-like
        Channel ids to keep; ids missing from ``db_channels_df`` are ignored.
    lvl
        Level stored (as ``int``) in the new ``level`` column.

    Returns
    -------
    pandas.DataFrame
        The matching rows with the extra ``level`` column, passed through
        ``fix_type_node_df`` and with the columns in sorted-name order.
        ``db_channels_df`` is not modified.
    """
    in_subset = db_channels_df['ch_id'].isin(subsetNodes)
    # assign()/astype() work on a fresh frame, so no column is written into a
    # filtered slice (the source of SettingWithCopyWarning).
    queue_df = (
        db_channels_df.loc[in_subset]
        .assign(level=lvl)
        .astype({'level': int})
    )
    queue_df = fix_type_node_df(queue_df)
    return queue_df.reindex(sorted(queue_df.columns), axis=1)

In [9]:
def generateTGcsvFromTGchannels(startingNodes, store_path, levels, name, core, chunksize, tme_path, db_name='Telegram_test'):
    """Explore the channel network breadth-first and dump nodes and edges to CSV.

    Two files are (re)created in ``store_path``: ``<name>_nodes.csv`` and
    ``<name>_edges.csv``. Starting from ``startingNodes``, each iteration
    writes the queued channels to the node file, lets a process pool extract
    their edges (``foo`` appends them to the edge file) and queues the
    not-yet-seen ``dst`` channels for the next level.

    Parameters
    ----------
    startingNodes : iterable
        Channel ids to start from (duplicates are removed).
    store_path : str
        Directory the two CSV files are written to.
    levels : int
        Maximum number of iterations; the loop also stops as soon as the
        queue is empty.
    name : str
        Prefix of the output file names.
    core : int
        Number of worker processes given to ``multiprocessing.Pool``.
    chunksize : int
        Number of queued channels handed to a worker in one task (used to
        compute the number of tasks).
    tme_path : str
        CSV file loaded into the link table passed to every worker.
    db_name : str, optional
        Database name forwarded to the ``db_utl`` helpers.

    Returns
    -------
    None

    Notes
    -----
    The queue computed after the last iteration is never written to the node
    file, so the edge file can reference channels that are not listed there.
    The header rows are the sorted ``node_cols`` / ``edge_cols`` below; data
    rows are written in sorted-column order without checking that the frames
    carry exactly those columns.
    """
    # NOTE(review): the recorded run shows a warning pointing at this call -
    # presumably a mixed-dtype warning; consider passing explicit dtypes.
    tme_df = pd.read_csv(tme_path)
    startingNodes = pd.Series(list(set(startingNodes)))
    db_channels_df = db_utl.get_chs_info_by_ids(db_utl.get_channel_ids(db_name), db_name=db_name)

    # Create node_file and edge_file, each holding only its header row.
    node_file = os.path.join(store_path, name + '_nodes.csv')
    edge_file = os.path.join(store_path, name + '_edges.csv')
    node_cols = ['ch_id', 'username', 'creation_date', 'title', 'description', 'scam', 'n_subscribers', 'verified', 'level']
    edge_cols = ['src', 'dst', 'ch_id', 'msg_id', 'media_id', 'date', 'author', 'is_forwarded', 'forwarded_from_id', 'forwarded_message_date', 'title', 'extension', 'edge_type', 'note']
    for csv_path, columns in ((node_file, node_cols), (edge_file, edge_cols)):
        # newline='' is required by the csv module; without it the writer's
        # own line terminator is translated again on some platforms.
        with open(csv_path, 'w', newline='') as f:
            csv.writer(f).writerow(sorted(columns))

    # Initial observation level.
    lvl = 0

    # Queue of the nodes to visit at the current level.
    queue_df = get_queue_df(db_channels_df, startingNodes, lvl)

    # Ids of every node that has been queued so far.
    knownNodes = pd.Series(dtype='int64')

    # Breadth-first visit for the requested number of iterations, discovering
    # new nodes and edges along the way.
    while lvl < levels and not queue_df.empty:

        # Remember the queued nodes as known.
        knownNodes = pd.concat([knownNodes, queue_df['ch_id']]).drop_duplicates(keep='first')

        # Append the queued nodes to node_file.
        queue_df.to_csv(node_file, mode='a', header=False, index=False)

        # Read the messages of the queued channels in parallel, write the new
        # edges and collect the candidates for the next queue.
        n_tasks = math.ceil(queue_df.shape[0] / chunksize)
        # Split a plain NumPy array: np.array_split on a pandas object goes
        # through swapaxes, which recent pandas versions deprecate.
        tasks = [
            (ch_ids, db_name, edge_file, db_channels_df, tme_df)
            for ch_ids in np.array_split(queue_df['ch_id'].to_numpy(), n_tasks)
        ]
        discovered = [pd.Series(dtype='int64')]
        with Pool(core) as p:
            for sub_newNodes in tqdm(p.imap(foo, tasks), total=len(tasks)):
                discovered.append(sub_newNodes)
        newNodes = pd.concat(discovered).drop_duplicates(keep='first')
        newNodes = newNodes[~newNodes.isin(knownNodes)]
        lvl += 1
        queue_df = get_queue_df(db_channels_df, newNodes, lvl)

In [10]:
if __name__ == "__main__":
    # --- Run configuration (consumed by the call in the next cell) ---

    # Database to read from; the alternative is:
    #   db_name = 'TGDataset'
    db_name = 'Telegram_test'

    # Output directory and input link table.
    store_path, tme_path = '.', '../../00_Data/tme.csv'

    # Seed channels: every channel of the database. To start from the
    # sabmyk network instead:
    #   sabmykNetwork = pd.read_csv('data/sabmyk_network.csv')
    #   startingNodes = sabmykNetwork['ch_ID'].to_list()
    #   name = 'sabmyk'
    startingNodes = db_utl.get_channel_ids(db_name)
    name = 'TG'

    # Number of BFS levels, worker processes and channels per task.
    levels, core, chunksize = 1, 32, 100

In [11]:
# Guarded like the configuration it relies on: the arguments only exist when
# this code runs as the main module, and a process pool must not be started
# again when worker processes re-import the module.
if __name__ == "__main__":
    generateTGcsvFromTGchannels(startingNodes, store_path, levels, name, core, chunksize, tme_path, db_name=db_name)

  tme_df = pd.read_csv(tme_path)
100%|██████████| 1210/1210 [1:22:14<00:00,  4.08s/it]
