# Case Study 5: Activity Synchronization

``Pump \& dump" is a shady scheme where the price of a stock is inflated by simulating a surge in buyer interest through false statements (pump) to sell the cheaply purchased stock at a higher price (dump). Investors are vulnerable to this kind of manipulation because they want to act quickly when acquiring stocks that seem to promise high future profits. By exposing investors to information seemingly from different sources in a short period of time, fraudsters create a false sense of urgency that prompts victims to act.

Social media provides fertile grounds for this type of scam. We investigate the effectiveness of our approach in detecting coordinated cryptocurrency pump \& dump campaigns on Twitter. The data was collected using keywords and cashtags (e.g., \$BTC) associated with 25 vulnerable cryptocoins as query terms. We consider both original tweets and retweets because they all add to the stream of information considered by potential buyers. More details on the dataset are found in 

| **Parameter**       | Description       |
|-----------------------|------------------------------|
| **Support filter**    | Accounts with < 8 tweets    |
| **Trace**             | Tweet timestamp             |
| **Eng. trace**        | 30-minute time intervals    |
| **Bipartite weight**  | TF-IDF                      |
| **Proj. weight**      | Cosine similarity           |
| **Edge filter**       | Keep top 0.5% weights       |
| **Clustering**        | Connected components        |
| **Data source**       | DARPA SocialSim             |
| **Data period**       | Jan 2017--Jan 2019          |
| **No. accounts**      | 887,239                     |

*Table: Case study 5 summary*

## Detection

We hypothesize that coordinated pump \& dump campaigns use software to have multiple accounts post pump messages in close temporal proximity. Tweet timestamps are therefore used as the behavioral trace of the accounts. 
The shorter the time interval in which two tweets are posted, the less likely they are to be coincidental. However, short time intervals result in significantly fewer matches and increased computation time. On the other hand, longer (e.g., daily) intervals produce many false positive matches. To balance between these concerns, we use 30-minute time intervals.

Intuitively, it is likely that any two users would post one or two tweets that fall within any time interval; however, the same is not true for a set of more tweets. To focus on accounts with sufficient support for coordination, we only keep those that post at least eight messages. This specific support threshold value is chosen to minimize false positive matches

The tweets are then binned based on the time interval in which they are posted. These time features are used to construct the bipartite network of accounts and tweet times. Edges are weighted using TF-IDF. Similar to the previous case, the projected account coordination network is weighted by the cosine similarity between the TF-IDF vectors. Upon manual inspection, we found that many of the tweets being shared in this network are not related to cryptocurrencies, while only a small percentage of edges are about this topic. These edges also have high similarity and yield a strong signal of coordination. Thus, we only preserve the 0.5\% of edges with largest cosine similarity 

# Data
The sample data used in this notebook can be found at [this link](https://drive.google.com/drive/folders/12KmY-mZw_GdwXHoUKcIr1jJvmleGY794?usp=sharing).

To use it, download the file to `CS5/data` folder

# Helper functions

In [6]:
import gzip
import os
import numpy as np
import pandas as pd
from sparse_dot_topn import sp_matmul_topn
from tqdm import tqdm
from sklearn.feature_extraction.text import TfidfVectorizer
from collections import Counter, defaultdict
import json
import math


def counters2edges(counter_dict, p1_col, p2_col, w_col):
    # Count how many times each feature co-occurs with each user
    edge = pd.DataFrame.from_records(
        [
            (uid, feature, count)
            for uid, feature_counter in counter_dict.items()
            for feature, count in feature_counter.items()
        ],
        columns=[p1_col, p2_col, w_col],
    )
    return edge


def prepare_content(in_fp, tqdm_desc="parse raw content", tqdm_total=None):
    """
    Parse tweet timestamps from a raw tweet file.
    Tweet timestamps are converted to the nearest 30 minutes
    Note that the fields in here are different from the usual Twitter fields, because this is a anonymized dataset
    i.e., we access the attribute "id_h" instead of "id_str"
    """
    user_features = defaultdict(lambda: defaultdict(Counter))
    n = 0
    for line in tqdm(in_fp, desc=tqdm_desc, total=tqdm_total):
        twt = json.loads(line)

        if "timestamp_ms" in twt:
            try:
                user_id = twt["user"]["id_str_h"]
                # user_id = twt["user"]["id_str"] # Use these fields if run on normal Twitter dataset
                twt_time_str = twt["timestamp_ms"][:-3]
                created_at = int(twt_time_str)
            except:
                continue

            # find min time
            if n == 0:
                min_time = created_at
            # convert the time to the nearest 30 minutes
            if ((created_at - min_time) % 1800) == 0:
                user_features["tweet_time"][user_id][created_at] += 1
            else:
                if created_at < min_time:
                    time_bin = math.ceil((created_at - min_time) / 1800)
                if created_at > min_time:
                    time_bin = math.floor((created_at - min_time) / 1800)
                twt_time = min_time + time_bin * 1800
                user_features["tweet_time"][user_id][twt_time] += 1
            n += 1

    return user_features


def dummy_func(doc):
    return doc


def calculate_interaction(
    edge_df,
    p1_col,
    p2_col,
    w_col,
    node1_col,
    node2_col,
    sim_col,
    sup_col,
    tqdm_total=None,
    tqdm_desc="calculating interaction",
):
    """
    calculate interactions between nodes in partition 1

    input:
        edge_df : dataframe that contains (p1, p2, w)
        p1_col : column name that represent p1, i.e: uid
        p2_col : column name that represent p2 , e.g: time
        w_col : column name that represent weight, if None it's unweighted
        node1_col, node2_col: col after projection, both are uid
        supports: dictionary of userids - post count {p1_node: support}
    return:
        interactions : dict { (p1_node1, p1_node2) : interaction_weight }

    assumption:
        1. non-empty inputs: edge_df
        2. p1_node1 < p1_node2 holds for the tuples in interactions
    """
    user_ids, docs = zip(
        *(
            (p1_node, grp[p2_col].repeat(grp[w_col]).values)
            for p1_node, grp in edge_df.groupby(p1_col)
            if len(grp) > 1
        )
    )

    try:
        tfidf = TfidfVectorizer(
            analyzer="word",
            tokenizer=dummy_func,
            preprocessor=dummy_func,
            use_idf=True,
            token_pattern=None,
            lowercase=False,
            sublinear_tf=True,
            min_df=3,
        )
        docs_vec = tfidf.fit_transform(docs)
    except ValueError:
        tfidf = TfidfVectorizer(
            analyzer="word",
            tokenizer=dummy_func,
            preprocessor=dummy_func,
            use_idf=True,
            token_pattern=None,
            lowercase=False,
            sublinear_tf=True,
            min_df=1,
        )
        docs_vec = tfidf.fit_transform(docs)

    results = sp_matmul_topn(docs_vec, docs_vec.T, top_n=100)

    interactions = pd.DataFrame.from_records(
        [
            (
                u1,
                user_ids[u2_idx],
                sim,
                min(supports[user_ids[u1_idx]], supports[user_ids[u2_idx]]),
            )
            for u1_idx, u1 in enumerate(user_ids)
            for u2_idx, sim in zip(
                results.indices[results.indptr[u1_idx] : results.indptr[u1_idx + 1]],
                results.data[results.indptr[u1_idx] : results.indptr[u1_idx + 1]],
            )
            if u1_idx < u2_idx
        ],
        columns=[node1_col, node2_col, sim_col, sup_col],
    )

    return interactions

## Count the number of lines to use progress bar (optional)

In [7]:
# get number of tweets (this case study uses the same data as CS4)
raw_json_gz = f"data/sample_raw_tweets.json.gz"
# number of lines in the input file
numline_file = f"data/sample_raw_tweets.numline"

In [8]:
! cat {raw_json_gz} | wc -l > {numline_file}

In [9]:
numline = int(open(numline_file).read().strip())
numline

1850502

# Step 1. Parse json of tweets to a table of user - feature values 

Construct a uid - feature - count table from raw tweets 

Input: raw tweet.json.gz files 

Output: feature parquets
In this case study, the feature are the timestamps of tweets that a user posted.
For other case studies, the features can be different. E.g., in case study 4, the features are the tweet IDs that a user retweeted.

In [10]:
outdir = "results"
if not os.path.exists(outdir):
    os.makedirs(outdir)
# column name of nodes in partite 1 and 2; column name of edge weights
p1_col = "uid"
p2_col = "feature"
w_col = "cnt"

feature = "tweet_time"
# read input
with gzip.open(raw_json_gz, "rb") as in_fp:
    # do work
    user_features = prepare_content(in_fp, tqdm_total=numline)

# write output
edge_df_name = f"{outdir}/{feature}.edge.parquet"
for feature, counter_dict in user_features.items():
    edge_df = counters2edges(counter_dict, p1_col, p2_col, w_col)

    fname = "{}.edge.parquet".format(feature)
    edge_df.to_parquet(edge_df_name)

parse raw content:  32%|███▏      | 600000/1850502 [00:30<01:03, 19571.36it/s]


## Let's take a look at the feature dataframe

In [12]:
edge_df.head()

Unnamed: 0,uid,feature,cnt
0,Q0uH5xSU0vGCYSd4R_8gJg,1491410045,1
1,Q0uH5xSU0vGCYSd4R_8gJg,1491006845,1
2,Q0uH5xSU0vGCYSd4R_8gJg,1491237245,1
3,Q0uH5xSU0vGCYSd4R_8gJg,1493147045,1
4,Q0uH5xSU0vGCYSd4R_8gJg,1503542045,1


In [13]:
pd.read_parquet(edge_df_name).head()

Unnamed: 0,uid,feature,cnt
0,Q0uH5xSU0vGCYSd4R_8gJg,1491410045,1
1,Q0uH5xSU0vGCYSd4R_8gJg,1491006845,1
2,Q0uH5xSU0vGCYSd4R_8gJg,1491237245,1
3,Q0uH5xSU0vGCYSd4R_8gJg,1493147045,1
4,Q0uH5xSU0vGCYSd4R_8gJg,1503542045,1


# Step 2. Calculate the similarity between features 

We consider each user as a document, and "words" are the feature scores, i.e., time intervals. 
- We first filter out users with lack of support, i.e., those posting less than `sup_thresh = 8`

- We then create a TF-IDF vector for each user, where each dimension is a time interval, and the value is the number of tweets in that time interval.

We output a dataframe with the following columns: `user1, user2, similarity, support`; where each record represents an interaction between `user1` and `user2`; `similarity` is the strength of connections and `support` is the post count of `user1`

In [14]:
# column name of node1, node2, similarity, support
node1_col = "user1"
node2_col = "user2"
sim_col = "similarity"
sup_col = "support"
sup_thresh = 8
# takes a .parquet file of edge table
# save to output file of interactions with p-values
interaction_file = f"{outdir}/{feature}.interactions.parquet"
outfile = interaction_file
# read input
edge_df = pd.read_parquet(edge_df_name)

if not (len(edge_df) > 0 and np.any(edge_df.groupby(p1_col).size() > 1)):
    raise ValueError(
        "Empty input edge table. Run `prepare_content()` to create edge table first"
    )
else:
    # calculate the support, which is the post count for each user
    supports = edge_df.groupby(p1_col)[w_col].sum().to_dict()
    print("No users: ", len(supports))
    # filter nodes based on support
    supports = {k: v for k, v in supports.items() if v > sup_thresh}
    print(f"No users after filtering with threshold={sup_thresh}: {len(supports)}")
    filtered_nodes = list(supports.keys())
    # remove the nodes with low support from edge_df
    edge_df = edge_df[edge_df[p1_col].isin(filtered_nodes)]

    # calculate similirity between users
    interaction_df = calculate_interaction(
        edge_df, p1_col, p2_col, w_col, node1_col, node2_col, sim_col, sup_col
    )
    # write output
    interaction_df.to_parquet(outfile)

No users:  255999
No users after filtering with threshold=8: 7832


# Let's take a look at the interaction df

In [15]:
interaction_df = pd.read_parquet(interaction_file)
interaction_df.head()

Unnamed: 0,user1,user2,similarity,support
0,--ImalkjS1xalfu1Yctubg,rmkPcZpvrs2UwzjM2sClQw,0.079465,13
1,--ImalkjS1xalfu1Yctubg,re76DNK1AMl1M1tlGHNR-w,0.086645,10
2,--ImalkjS1xalfu1Yctubg,jYGAgiqc2IGYpwXDPIczaA,0.079286,14
3,--ImalkjS1xalfu1Yctubg,7JdRlQYUwQVr8e-tgM3nFQ,0.009486,22
4,--ImalkjS1xalfu1Yctubg,zWE8_nGEpdzm1xRboFSmeQ,0.088512,11


# Step 3. Filter low edge weight and create a network 
For case study 5, we apply an weight filter `sim_thresh = 0.995`, i.e., only accounts with similarity >0.995 percentile are considered

In [18]:
import networkx as nx

# path to input indexed text of tweets
twttext = f"{outdir}/table.by.user.pkl"

# path to output file of the graphml for Gephi visualization
outgraph = f"{outdir}/{feature}.filtered.coord.graphml"
# path to output file of context of each suspicious group
grouptext = f"{outdir}/{feature}filtered.coord.pkl"

# read input parquet file of interaction
interaction_df = pd.read_parquet(interaction_file)

# define the filters
sim_thresh = 0.995

In [19]:
# apply edge weight filter: 0.995 quantile cosine similarity of TFIDF vectors
interaction_df = interaction_df[
    interaction_df[sim_col] >= interaction_df[sim_col].quantile(sim_thresh)
]
# create a graph object from the interaction table
G = nx.Graph()
for idx, row in interaction_df.iterrows():
    try:
        G.add_edge(
            row[node1_col], row[node2_col], weight=row[sim_col], support=row[sup_col]
        )
    except:
        print(row)
        pass
#  write graph object to file
nx.write_gml(G, outgraph)

# Note

The above code matches the filtering procedure described in the paper: 
1. users with low support (few posts) are removed
2. tf-idf vectors are calculated
3. similarity between tf-idf vectors are calculated, and edges with low similarity are removed. 

The results of case study 5 in the paper were obtained by a slightly different ordering of these steps:
1. tf-idf vectors are calculated
2. similarity between tf-idf vectors are calculated, and edges with low similarity are removed
3. users with low support are removed. 

The resulting networks are very similar, and the result reported in the paper are not affected by this change.


# Optional final step: manual inspection

For optional close-reading of the case study, we parse the raw json of tweets to a table of user_id - tweet text and save to file

## 1. Save user_id and tweet texts 

In [34]:
#!/usr/bin/env python3
import os
import gzip
import pickle
import ujson as json
from tqdm import tqdm
import networkx as nx

# path to output directory for output randomized edge tables
outdir = "results"
if not os.path.exists(outdir):
    os.makedirs(outdir)


def prepare_content(in_fp, tqdm_desc="parse user tweet", tqdm_total=None):
    # create a dictionary of user_id -> list of tweets by that user
    user_tweets = defaultdict(list)
    for line in tqdm(in_fp, desc=tqdm_desc, total=tqdm_total):
        twt = json.loads(line)
        try:
            user_id = twt["user"]["id_str_h"]
            user_tweets[user_id].append(twt["text_m"])
            # Use these fields if run on normal Twitter dataset
            # user_id = twt["user"]["id_str"]
            # user_tweets[user_id].append(twt["text"])
        except:
            pass
    return user_tweets


# path to output pickle file of tweet table
outfile = f"{outdir}/table.by.user.pkl"


# read input to create a dictionary of user_id -> list of tweets by that user
with gzip.open(raw_json_gz, "rb") as in_fp:
    # do work
    user_tweets = prepare_content(in_fp, tqdm_total=numline)
with open(outfile, "wb") as out_fp:
    # write output
    pickle.dump(user_tweets, out_fp)

parse user tweet:  32%|███▏      | 600000/1850502 [00:24<00:50, 24529.52it/s]


### Let's take a look at a few records

In [39]:
idx = 0
for k, v in user_tweets.items():
    print(k, v)
    idx += 1
    if idx > 5:
        break

Q0uH5xSU0vGCYSd4R_8gJg [' url: https://t.co/9rJOUrrFjs56EKUC7Mx5sg  #3: K-Swiss Ultra-Express Omni N Zapatillas, Hombre  url: https://t.co/08XQ7_X4PM3ErAz3WXhh0g  #valencia #vigo #bilbao', ' url: https://t.co/9rJOUrrFjs56EKUC7Mx5sg  #3: K-Swiss Accomplish II Mesh Omni Fiery - Zapatillas paraâ\x80¦  url: https://t.co/VyDDbaAiRwAmM52jUuLtaw  #valencia #vigo #bilbao', ' url: https://t.co/9rJOUrrFjs56EKUC7Mx5sg  #3: K-Swiss Ultra-Express Omni N Zapatillas, Hombre  url: https://t.co/08XQ7_X4PM3ErAz3WXhh0g  #valencia #vigo #bilbao', ' url: https://t.co/9rJOUrrFjs56EKUC7Mx5sg  #7: Columbia Redmond Mid Leather Omni-Tech, Zapatos de High Riseâ\x80¦  url: https://t.co/udryRnBU7tCeE9EU57oR9g  #valencia #vigo #bilbao  url: https://t.co/-UodBxzGyl5gscXXykuaXw ', ' url: https://t.co/9rJOUrrFjs56EKUC7Mx5sg  |MÃ¡s vendido #1: K-Swiss Accomplish II Mesh Omni Fiery -â\x80¦  url: https://t.co/8p4srGb4IsI9-ZWBJJBY7Q  #valencia #vigo #bilbao  url: https://t.co/MO4-9TF15AkOFh2wzihIAA ', ' url: https://t.co/

In [36]:
user_to_tweets = f"{outdir}/table.by.user.pkl"

## 2. Get the post texts of users from the coordinated connected component

In [37]:
# get the tweet texts for each connected component for manual annotation
has_text = twttext is not None
if has_text:
    with open(twttext, "rb") as f:
        tweet_table = pickle.load(f)
# texts (list of dict): each element is a connected component
# represented by a dictionary where each key is a node_id and the value is the tweet text
texts = [
    {node_id: tweet_table[node_id] for node_id in component}
    for component in nx.connected_components(G)
]
texts.sort(key=len, reverse=True)
# save to file
with open(grouptext, "wb") as f:
    pickle.dump(texts, f)

In [38]:
texts

[{'OIOkXKf3p-rjjf5hMlyp6Q': ['RT @un: FV7wzePr5Cz_5f4gEjkZQw : @un: uZJAmvOkoJiV_zazPeCXDA  @un: gxZkAmskUwpCjznO-dr9Bw  @un: L-nbZF_-nQTS4FZTgLDhtw  @un: 6fxlMQWzQ0oRwDQ41Wp2tg _Oficial @un: SGMa5xvrMyXaVnpjPKj0LA  @un: wmly5A_oy7R3tQm_8xRfkA _PASION__ @un: 2p1u-4o4V6LlUs2QlgWYmg â\x80¦ ',
   'RT @un: SGMa5xvrMyXaVnpjPKj0LA : @un: wmly5A_oy7R3tQm_8xRfkA _PASION__ @un: FV7wzePr5Cz_5f4gEjkZQw  @un: uZJAmvOkoJiV_zazPeCXDA  @un: 2p1u-4o4V6LlUs2QlgWYmg  @un: 8edZUNkogcbvwltdGLvSjQ  @un: gxZkAmskUwpCjznO-dr9Bw  @un: 0k_cUCmSUKXltfnu8yq4Yw  @un: k1W8MRWz05RszT537A8rMQ â\x80¦ ',
   'RT @un: SGMa5xvrMyXaVnpjPKj0LA : @un: 0k_cUCmSUKXltfnu8yq4Yw  @un: gxZkAmskUwpCjznO-dr9Bw  @un: wmly5A_oy7R3tQm_8xRfkA _PASION__ @un: k1W8MRWz05RszT537A8rMQ  @un: DMhqX8FOmoJwdO5ibABtqQ  @un: hAbk8HqzJd1e6_RJtP5_eg  @un: 24Igax1JBC0acQ6ciMIdNg _fynny @un: FA0gXVRMzpwGVBobnk6P2w â\x80¦ ',
   'RT @un: 6fxlMQWzQ0oRwDQ41Wp2tg _Oficial: @un: YBJ7yw8J1qsl0VwHrbNIbg  @un: hAbk8HqzJd1e6_RJtP5_eg  @un: uZJAmvOkoJiV_zazPeCX