In [1]:
import pandas
import math
import re

from multiprocessing import Pool
from functools import partial
from itertools import chain
from neo4j import GraphDatabase

In [19]:
SUBREDDIT = 'investing'
GRAPH_PASSWORD = 'test1234'

SUBMISSIONS_LOC = 'data/investing/investing_2020_1_1_2021_2_28.csv'

In [2]:
THREADS = 48

POSTS_LOC = f'data/{SUBREDDIT}/posts.csv'
USERS_LOC = f'data/{SUBREDDIT}/users.csv'
SYMBOLS_LOC = f'data/{SUBREDDIT}/symbols.csv'
AUTHORS_TO_POSTS_LOC = f'data/{SUBREDDIT}/authors_to_posts.csv'
POSTS_TO_SYMBOLS_LOC = f'data/{SUBREDDIT}/posts_to_symbols.csv'
AUTHORS_TO_AUTHORS_LOC = f'data/{SUBREDDIT}/author_to_authors.csv'
AUTHORS_TO_SYMBOLS_LOC = f'data/{SUBREDDIT}/author_to_symbols.csv'

In [None]:
driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'test1234'))

In [3]:
def load_symbols():
    exchanges = ['amex', 'nasdaq', 'nyse']

    # create a dict of symbols to exchanges
    symbols = dict()
    for exchange in exchanges:
        exchange_symbols = pandas.read_csv(f'data/{exchange}_symbols.tsv', sep='\t')
        for index, row in exchange_symbols.iterrows():
            symbols[row['Symbol']] = exchange
            
    return symbols

symbols = load_symbols()
len(symbols)

9738

In [20]:
submissions = pandas.read_csv(
    SUBMISSIONS_LOC,
    names=['type', 'depth', 'parent_id', 'submission_id', 'content', 'author_id', 'timestamp', 'up_votes', 'ratio'],
    dtype={
        'type': 'string',
        'depth': int,
        'parent_id': 'string',
        'submission_id': 'string',
        'content': 'string',
        'author_id': 'string', 
        'timestamp': int, 
        'up_votes': int,
        'ratio': float
    }
)

print(f'Number of submissions: {len(submissions[submissions.type == "submission_title"].index)}')
submissions

Number of submissions: 8411


Unnamed: 0,type,depth,parent_id,submission_id,content,author_id,timestamp,up_votes,ratio
0,submission_title,0,,ltky2b,Daily Advice Thread - All basic help or advice...,AutoModerator,1614420015,22,0.97
1,submission_body,0,,ltky2b,"If your question is ""I have $10,000, what do I...",AutoModerator,1614420015,22,0.97
2,comment,1,ltky2b,goytzuy,"**Hi, welcome to /r/investing. Please note tha...",AutoModerator,1614420016,1,0.00
3,comment,1,ltky2b,gp4h7pb,whats a reasonable portion of a portfolio to a...,S7EFEN,1614488027,3,0.00
4,comment,2,gp4h7pb,gp4ouvn,"I'm a millennial, half of my IRA is in safe ET...",kfuzion,1614491062,1,0.00
...,...,...,...,...,...,...,...,...,...
794333,comment,2,fcniw6m,fco14w8,I'd like to have a steady stream. Monthly if p...,killthebatman15,1577822203,1,0.00
794334,comment,1,ei44tp,fcnnqgw,I absolutely wouldn't recommend investing on F...,dvdmovie1,1577814970,1,0.00
794335,comment,2,fcnnqgw,fd0emr6,They advertise to us on Facebook... that is th...,peepee21,1578117725,1,0.00
794336,comment,1,ei44tp,fco9y2c,What I don't see mentioned here is that if you...,mercury187,1577827119,0,0.00


In [25]:
submissions.groupby(['type'])['author_id'].nunique()

type
comment             104058
submission_body       4705
submission_title      4705
Name: author_id, dtype: int64

In [128]:
def chunk_submissions(submissions, num_chunks):
    # figure out the aprox size of each chunk, this is aprox since we want to chunk by full submissions
    approx_size = math.floor(len(submissions.index) / num_chunks)

    splits = [0]
    for i in range(1, num_chunks):
        start_row = i * approx_size
        
        # search for the start of the first submission up until the last chunk, if we reach the last chunk, skip this
        for j in range(start_row, splits[-1], -1):
            if submissions.iloc[j]['type'] == 'submission_title':
                splits.append(j)
                break
                
    splits.append(len(submissions.index))

    return list(submissions[splits[i - 1]:splits[i]] for i in range(1, len(splits)))

submission_chunks = chunk_submissions(submissions, THREADS)
len(submission_chunks)

48

In [64]:
# extract symbols from text 
def extract_symbols(content, symbols, strict=True):
    if pandas.isna(content):
        return []
    
    if not strict:
        tokens = re.split(r'\W+', content.upper())
    else:
        tokens = content.split(' ')
    
    found = set()
    for token in tokens:
        if len(token) > 1:
            if token[0] == '$' or token[0] == ':':
                token = token[1:].upper()
            
            if token in symbols:
                found.add(token)
        
    return list(found)

(
    extract_symbols('$amc and GME to the www.moon.com.', symbols), 
    extract_symbols('$amc and GME to the www.moon.com.', symbols, False)
)

(['GME', 'AMC'], ['GME', 'COM', 'MOON', 'WWW', 'AMC'])

In [129]:
def process_chunk(symbols, chunk):
    # id of the root post
    post_row = None
    
    # seen symbols in ancestors
    ancestor_symbols = []
    # seen authors in ancestors
    ancestor_authors = []
    
    # accumulate top level post to symbols in all its descendents
    post_to_symbols = []
    # accumulate authors to interaction with posts
    author_to_posts = []
    # accumulate author to author interactions per branch
    author_to_authors = []
    # accumulate author to symbol interactions per branch
    author_to_symbols = []
    
    # top level details about each post
    posts = []
    # dataframes of the aggregates of the post level accumulators for the entire chunk
    post_to_symbols_dfs = []
    author_to_posts_dfs = []
    author_to_authors_dfs = []
    author_to_symbols_dfs = []
    
    def aggregate_post():
        aggregates = {
            'max_depth_score': pandas.NamedAgg(column='depth_score', aggfunc='max'),
            'mean_depth_score': pandas.NamedAgg(column='depth_score', aggfunc='mean'),
            'depth_score': pandas.NamedAgg(column='depth_score', aggfunc='sum'),
            'up_votes': pandas.NamedAgg(column='up_votes', aggfunc='sum'),
            'up_votes_with_depth': pandas.NamedAgg(column='up_votes_with_depth', aggfunc='sum'),
            'count': pandas.NamedAgg(column='depth_score', aggfunc='count')
        }
        
        post_to_symbols_df = None
        try:
            post_to_symbols_df = pandas.DataFrame(post_to_symbols, columns=['post_id', 'symbol', 'depth_score', 'up_votes', 'up_votes_with_depth']) \
                .groupby([
                    pandas.Grouper('post_id'),
                    pandas.Grouper('symbol')
                ]).agg(**aggregates)
            post_to_symbols_dfs.append(post_to_symbols_df)
        except:
            display(f'Error with post_to_symbols {post_row.submission_id}.', post_to_symbols)
            
        author_to_posts_df = pandas.DataFrame(author_to_posts, columns=['author_id', 'post_id', 'depth_score', 'up_votes', 'up_votes_with_depth']) \
            .groupby([
                pandas.Grouper('author_id'),
                pandas.Grouper('post_id')
            ]).agg(**aggregates)
        author_to_posts_dfs.append(author_to_posts_df)
            
        author_to_authors_df = pandas.DataFrame(author_to_authors, columns=['from_id', 'to_id', 'depth_score', 'up_votes', 'up_votes_with_depth']) \
            .groupby([
                pandas.Grouper('from_id'),
                pandas.Grouper('to_id')
            ]).agg(**aggregates)
        author_to_authors_dfs.append(author_to_authors_df)
            
        try:
            author_to_symbols_df = pandas.DataFrame(author_to_symbols, columns=['author_id', 'symbol', 'depth_score', 'up_votes', 'up_votes_with_depth']) \
                .groupby([
                    pandas.Grouper('author_id'),
                    pandas.Grouper('symbol')
                ]).agg(**aggregates)
            author_to_symbols_dfs.append(author_to_symbols_df)
        except:
            display(f'Error with author_to_symbols {post_row.submission_id}.', author_to_symbols)
            
        posts.append([
            # copy some stuff from the main post row
            post_row.submission_id, post_row.content, post_row.author_id, post_row.timestamp, post_row.up_votes, post_row.ratio,
            # some aggregate stats
            len(author_to_posts),
            len(author_to_posts_df.index),
            len(post_to_symbols_df.index) if post_to_symbols_df is not None else 0,
            author_to_posts_df['up_votes'].sum()
        ])
    
    rows_processed = 0
    for (i, row) in chunk.iterrows():
        row_symbols = extract_symbols(row.content, symbols)
        
        if row.type == 'submission_title':
            # if not the first, process current accumulators
            if post_row is not None:
                aggregate_post()
            
            # new submission, reset the tree accumulators
            post_row = row
            
            ancestor_symbols = [row_symbols]
            ancestor_authors = [row.author_id]
            
            post_to_symbols = []
            author_to_posts = []
            author_to_authors = []
            author_to_symbols = []
        elif row.type == 'submission_body':
            # just an extension of the root, just extend with any symbols in the body
            row_symbols = list(set(row_symbols + ancestor_symbols[0]))
            ancestor_symbols[0] = row_symbols
        elif row.depth <= previous_depth:
            # sibling or cousin, remove some context from the accumulators
            pops = previous_depth - row.depth + 1
            ancestor_symbols = ancestor_symbols[:-pops]
            ancestor_authors = ancestor_authors[:-pops]

        # adjust upvotes to 0, stored as -1 if not retrieved in original dataset
        up_votes = max(row.up_votes, 0)

        # if not the first row of a post, start building the relationships
        if row.type != 'submission_title':
            # interactions are inversely propertional to the depth
            depth_score = round(1 / (row.depth + 1), 2) 

            # relationships to symbols in the current row
            for symbol in row_symbols:
                # we want to weight symbols in the main post and those closer to it heigher so use the depth score
                post_to_symbols.append([post_row.submission_id, symbol, depth_score, up_votes, round(depth_score * up_votes, 2)])
                # author to symbols they posted about so depth score of 1
                author_to_symbols.append([row.author_id, symbol, 1, up_votes, up_votes])

            # author to current post
            author_to_posts.append([row.author_id, post_row.submission_id, depth_score, up_votes, round(depth_score * up_votes, 2)])

        # for comments we also build relationships to authors and symbols in ancestors
        if row.type == 'comment':
            # for each previously seen author, create a relationship with a weight thats the inverse of their distance
            for cur_depth, cur_author_id in enumerate(reversed(ancestor_authors)):
                # ignore yourself in the ancestors
                if cur_author_id != row.author_id:
                    cur_depth_score = round(1 / (cur_depth + 1), 2) 
                    author_to_authors.append([row.author_id, cur_author_id, cur_depth_score, up_votes, round(cur_depth_score * up_votes, 2)])

            # similarly for previously seen symbols
            for cur_depth, cur_symbols in enumerate(reversed(ancestor_symbols)):
                cur_depth_score = round(1 / (cur_depth + 1), 2) 
                for symbol in cur_symbols:
                    author_to_symbols.append([row.author_id, symbol, cur_depth_score, up_votes, round(cur_depth_score * up_votes, 2)])

            # update ancestor accumulators
            ancestor_symbols.append(row_symbols)
            ancestor_authors.append(row.author_id)

        # track depth so we know when we popped up from a leaf
        previous_depth = row.depth
            
        rows_processed += 1
        if rows_processed % 25000 == 0:
            print(f'Processed {rows_processed} rows.')
            
    # aggregate the last post
    aggregate_post()
    
    return [
        pandas.DataFrame(posts, columns=['post_id', 'title', 'author_id', 'timestamp', 'up_votes', 'ratio', 'replies', 'authors', 'symbols', 'sum_up_votes']),
        pandas.concat(post_to_symbols_dfs),
        pandas.concat(author_to_posts_dfs),
        pandas.concat(author_to_authors_dfs),
        pandas.concat(author_to_symbols_dfs)
    ]

In [None]:
with Pool(THREADS) as pool:
    processed_posts = pool.map(partial(process_chunk, symbols), submission_chunks)
    
# concat the dataframes from each chunk together and write to file
pandas.concat([s[0] for s in processed_posts]).to_csv(POSTS_LOC, index=False, header=True)
pandas.concat([s[1] for s in processed_posts]).reset_index().to_csv(POSTS_TO_SYMBOLS_LOC, index=False, header=True)
pandas.concat([s[2] for s in processed_posts]).reset_index().to_csv(AUTHORS_TO_POSTS_LOC, index=False, header=True)

# these are not indexed by posts so actually need to be reaggregated
aggregates = {
    'max_depth_score': pandas.NamedAgg(column='depth_score', aggfunc='max'),
    'depth_score': pandas.NamedAgg(column='depth_score', aggfunc='sum'),
    'up_votes': pandas.NamedAgg(column='up_votes', aggfunc='sum'),
    'up_votes_with_depth': pandas.NamedAgg(column='up_votes_with_depth', aggfunc='sum'),
    'count': pandas.NamedAgg(column='depth_score', aggfunc='count')
}

authors_to_authors = pandas.concat([s[3] for s in processed_posts]).reset_index()\
    .groupby([
        pandas.Grouper('from_id'),
        pandas.Grouper('to_id')
    ]).agg(**aggregates)
authors_to_authors['mean_depth_score'] = authors_to_authors['depth_score'] / authors_to_authors['count']
authors_to_authors.reset_index().to_csv(AUTHORS_TO_AUTHORS_LOC, index=False, header=True)

authors_to_symbols = pandas.concat([s[4] for s in processed_posts]).reset_index()\
    .groupby([
        pandas.Grouper('author_id'),
        pandas.Grouper('symbol')
    ]).agg(**aggregates)
authors_to_symbols['mean_depth_score'] = authors_to_symbols['depth_score'] / authors_to_symbols['count']
authors_to_symbols.reset_index().to_csv(AUTHORS_TO_SYMBOLS_LOC, index=False, header=True)

In [131]:
authors_to_posts = pandas.read_csv(AUTHORS_TO_POSTS_LOC)
authors_to_authors = pandas.read_csv(AUTHORS_TO_AUTHORS_LOC)
authors_to_symbols = pandas.read_csv(AUTHORS_TO_SYMBOLS_LOC)
posts_to_symbols = pandas.read_csv(POSTS_TO_SYMBOLS_LOC)

authors_to_posts_agg = authors_to_posts.groupby([
    pandas.Grouper('author_id')
]).agg(
    posts = pandas.NamedAgg(column='post_id', aggfunc='count'),
    posts_depth_score = pandas.NamedAgg(column='depth_score', aggfunc='sum'),
    posts_up_votes = pandas.NamedAgg(column='up_votes', aggfunc='sum'),
    posts_up_votes_with_depth = pandas.NamedAgg(column='up_votes_with_depth', aggfunc='sum')
).sort_values(by=['posts'])

authors_to_authors_agg = authors_to_authors.groupby([
    pandas.Grouper('from_id')
]).agg(
    replied_to = pandas.NamedAgg(column='to_id', aggfunc='count'),
    replied_depth_score = pandas.NamedAgg(column='depth_score', aggfunc='sum'),
    replied_up_votes = pandas.NamedAgg(column='up_votes', aggfunc='sum'),
    replied_up_votes_with_depth = pandas.NamedAgg(column='up_votes_with_depth', aggfunc='sum')
).rename(columns={'from_id': 'author_id'})

authors_to_symbols_agg = authors_to_symbols.groupby([
    pandas.Grouper('author_id')
]).agg(
    symbols = pandas.NamedAgg(column='symbol', aggfunc='count'),
    symbols_depth_score = pandas.NamedAgg(column='depth_score', aggfunc='sum'),
    symbols_up_votes = pandas.NamedAgg(column='up_votes', aggfunc='sum'),
    symbols_up_votes_with_depth = pandas.NamedAgg(column='up_votes_with_depth', aggfunc='sum')
)

# join all the user metrics into a single csv
authors_to_posts_agg.join(authors_to_authors_agg).join(authors_to_symbols_agg).reset_index().to_csv(USERS_LOC, index=False, header=True)

# aggregate some symbols metrics as well
symbols_to_authors_agg = authors_to_symbols.groupby([
    pandas.Grouper('symbol')
]).agg(
    authors = pandas.NamedAgg(column='author_id', aggfunc='count'),
    authors_depth_score = pandas.NamedAgg(column='depth_score', aggfunc='sum'),
    authors_up_votes = pandas.NamedAgg(column='up_votes', aggfunc='sum'),
    authors_up_votes_with_depth = pandas.NamedAgg(column='up_votes_with_depth', aggfunc='sum')
).sort_values(by=['authors'])

symbols_to_posts_agg = posts_to_symbols.groupby([
    pandas.Grouper('symbol')
]).agg(
    posts = pandas.NamedAgg(column='post_id', aggfunc='count'),
    posts_depth_score = pandas.NamedAgg(column='depth_score', aggfunc='sum'),
    posts_up_votes = pandas.NamedAgg(column='up_votes', aggfunc='sum'),
    posts_up_votes_with_depth = pandas.NamedAgg(column='up_votes_with_depth', aggfunc='sum')
)

# join some symbol metrics into a since csv
symbols_to_authors_agg.join(symbols_to_posts_agg).reset_index().to_csv(SYMBOLS_LOC, index=False, header=True)

In [3]:
def load_users(file, tx):
    tx.run(f"LOAD CSV WITH HEADERS FROM 'file:///{file}' " +
        """
            AS line
            CREATE (:User {
                id: line.author_id,
                posts: toInteger(line.posts),
                posts_up_votes: toInteger(line.posts_up_votes),
                posts_up_votes_with_depth: toFloat(line.posts_up_votes_with_depth),
                replied_to: toInteger(line.replied_to),
                symbols: toInteger(line.symbols)
            })
        """,
        file=file
    )
    
def load_posts(file, tx):
    tx.run(f"LOAD CSV WITH HEADERS FROM 'file:///{file}' " +
        """
            AS line
            CREATE (:Post {
                id: line.post_id,
                title: line.title,
                author_id: line.author_id,
                timestamp: datetime({epochSeconds:toInteger(line.timestamp)}),
                up_votes: toInteger(line.up_votes),
                ratio: toInteger(line.ratio),
                replies: toInteger(line.replies),
                authors: toInteger(line.authors),
                symbols: toInteger(line.symbols), 
                sum_up_votes: toInteger(line.sum_up_votes)
            })
        """
    )
    
def load_symbols(file, tx):
    tx.run(f"LOAD CSV WITH HEADERS FROM 'file:///{file}' " +
        """ 
            AS line
            CREATE (:Symbol {
                id: line.symbol,
                authors: toInteger(line.authors),
                posts: toInteger(line.posts),
                posts_up_votes_with_depth: toFloat(line.posts_up_votes_with_depth)
            })
        """
    )
    
def create_indices(tx):
    # create indices for things were going to query by
    for node_label in ['User', 'Submission', 'Symbol']:
        for node_property in ['id', 'communityUpVotes']:
            tx.run(f"CREATE INDEX {node_label}_{node_property} IF NOT EXISTS FOR (n:{node_label}) ON (n.{node_property})")
    
def load_posts_to_symbols(file, tx):
    tx.run('CALL apoc.periodic.iterate(' +
        f"""'LOAD CSV WITH HEADERS FROM "file:///{file}" AS line RETURN line', """ +  
        """
            'MATCH (post:Post {id: line.post_id}), (symbol:Symbol {id: line.symbol})
            CREATE (post)-[:Mentioned {
                count: toInteger(line.count),
                depth_score: toInteger(line.depth_score),
                up_votes: toInteger(line.up_votes),
                up_votes_with_depth: toInteger(line.up_votes_with_depth)
            }]->(symbol)'
        , {batchSize:100000, iterateList:true, parallel:false})"""
    )
    
def load_authors_to_authors(file, tx):
    tx.run('CALL apoc.periodic.iterate(' +
        f"""'LOAD CSV WITH HEADERS FROM "file:///{file}" AS line RETURN line', """ +  
        """
            'MATCH (from:User {id: line.from_id}), (to:User {id: line.to_id})
            CREATE (from)-[:Replied {
                count: toInteger(line.count),
                depth_score: toInteger(line.depth_score),
                up_votes: toInteger(line.up_votes),
                up_votes_with_depth: toInteger(line.up_votes_with_depth)
            }]->(to)'
        , {batchSize:100000, iterateList:true, parallel:false})"""
     )
    
def load_authors_to_symbols(file, tx):
    tx.run('CALL apoc.periodic.iterate(' +
        f"""'LOAD CSV WITH HEADERS FROM "file:///{file}" AS line RETURN line', """ +  
        """
            'MATCH (user:User {id: line.author_id}), (symbol:Symbol {id: line.symbol})
            CREATE (user)-[:Mentioned {
                count: toInteger(line.count),
                depth_score: toInteger(line.depth_score),
                up_votes: toInteger(line.up_votes),
                up_votes_with_depth: toInteger(line.up_votes_with_depth)
            }]->(symbol)'
        , {batchSize:100000, iterateList:true, parallel:false})"""
     )
    
def load_authors_to_posts(file, tx):
    tx.run('CALL apoc.periodic.iterate(' +
        f"""'LOAD CSV WITH HEADERS FROM "file:///{file}" AS line RETURN line', """ +  
        """
            'MATCH (user:User {id: line.author_id}), (post:Post {id: line.post_id})
            CREATE (user)-[:Post {
                count: toInteger(line.count),
                depth_score: toInteger(line.depth_score),
                up_votes: toInteger(line.up_votes),
                up_votes_with_depth: toInteger(line.up_votes_with_depth)
            }]->(post)'
        , {batchSize:100000, iterateList:true, parallel:false})"""
     )

In [None]:
# create the vertices and indices
with driver.session() as session:
    session.write_transaction(partial(load_users, USERS_LOC))
    session.write_transaction(partial(load_posts, POSTS_LOC))
    session.write_transaction(partial(load_symbols, SYMBOLS_LOC))
    
    session.write_transaction(create_indices)

In [5]:
# create the edges
with driver.session() as session:
    session.write_transaction(partial(load_authors_to_posts, AUTHORS_TO_POSTS_LOC))
    session.write_transaction(partial(load_posts_to_symbols, POSTS_TO_SYMBOLS_LOC))
    session.write_transaction(partial(load_authors_to_authors, AUTHORS_TO_AUTHORS_LOC))
    session.write_transaction(partial(load_authors_to_symbols, AUTHORS_TO_SYMBOLS_LOC))

In [None]:
driver.close()