**Utilities**:
- Windows keystroke for \` character is ALT+96
- Windows keystroke for ~ character is ALT+126

**Notebook Initialization**: This cell collects all the `import` statements, global variables, and globally scoped functions used throughout the notebook.
**PySpark** is used for data processing to keep memory usage under control: the **RDD** (Resilient Distributed Dataset) abstraction makes it possible to work with huge files without explicitly loading their entire contents into memory.

Run the cell below once to initialize all the needed variables.

In [1]:
from IPython.display import HTML, display
import tabulate
import os
import sys
import operator
import json
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
%load_ext Cython
%matplotlib inline

# Initialize PySpark
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlContext = SQLContext(sc)

# Global Settings
dataset_dir = "../data/mmr_graph/"

# 1. Data Preparation
Keeping Twitter usernames in memory without any transformation would be very costly, because strings are a relatively expensive data type. This can be optimized with some *data encoding*: each username string is converted to an integer, keeping a 1:1 mapping between the string representation of the username and its integer representation. This greatly reduces memory usage when the graph is loaded into memory.
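
As a minimal sketch of the idea, the snippet below encodes a toy edge list and compares the in-memory size of one username string with the size of its integer ID. The function name `encode_edges` and the toy usernames are hypothetical and only serve the illustration, and the sizes reported by `sys.getsizeof` are specific to the Python implementation in use.

```python
import sys


def encode_edges(edges):
    """Map each username to a consecutive integer ID, starting from 0."""
    encoding = {}       # username -> integer ID
    encoded_edges = []  # same edges, expressed with integer IDs
    for a, b in edges:
        # setdefault assigns the next free ID only the first time a name is seen
        a_id = encoding.setdefault(a, len(encoding))
        b_id = encoding.setdefault(b, len(encoding))
        encoded_edges.append((a_id, b_id))
    return encoding, encoded_edges


# Hypothetical toy data, not taken from the MMR dataset
toy_edges = [("alice", "bob"), ("bob", "carol"), ("alice", "carol")]
encoding, encoded_edges = encode_edges(toy_edges)

# Rough per-object size comparison (implementation-specific)
str_size = sys.getsizeof("barackobama")
int_size = sys.getsizeof(4969601)
print(encoding, encoded_edges, str_size, int_size)
```

Each username is stored as a string only once, in the mapping, while every edge that mentions it carries just the integer.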

## 1.1 Twitter Usernames Encoding
Since I am dealing with huge files (19GB overall for the MMR dataset), I have to pay attention to what is actually loaded into memory, even though the current server configuration offers 128GB of RAM. The encoding process produces a CSV file, `usernames.csv`, that stores the 1:1 mapping between each (string) Twitter username and its integer encoding. The simplest scheme is to start counting from 0 and assign the next unused ID to each new username. The process also produces an encoded copy of the MMR dataset as a single CSV file, `mmr_encoded.csv`, which is easier to parse and more convenient to handle than the original multi-file layout.

In [2]:
# Global variables
usernames_filename = "../data/usernames.csv"
usernames_header_filename = "../data/usernames_header.csv"
mmr_encoded_filename = "../data/mmr_encoded.csv"
csv_header = ["username", "encoding:ID(User)"]

# Support functions declarations
def add_or_get_new_user_encoding(username):
    global current_user_id
    global usernames_encoding
    if username not in usernames_encoding:
        user_enc = current_user_id
        usernames_encoding[username] = user_enc
        current_user_id += 1
        if current_user_id%10000000 == 0:
            print("Currently processed {0} unique usernames".format(current_user_id))
        return user_enc
    return usernames_encoding[username]

def get_encoding(username):
    usernames_rdd = sc.textFile(usernames_filename)
    return usernames_rdd.map(lambda x: x.split(',')).filter(lambda x: x[0] == username).map(lambda x: int(x[1])).first()

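def process_line(line):
    # Lines are assumed to look like "(u'alice', u'bob')" (inferred from the slicing below).
    a,b = line.strip("()\n").split(', ')
    # Drop the leading "u'" and the trailing "'" around each username.
    a = a[2:-1]
    b = b[2:-1]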
    a_enc = add_or_get_new_user_encoding(a)
    b_enc = add_or_get_new_user_encoding(b)
    return a_enc, b_enc
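
To make the parsing step concrete, here is a small standalone sketch of what `process_line` does before encoding. It assumes each input line is a Python 2 style tuple representation such as `(u'alice', u'bob')`; that format is inferred from the string slicing, and the helper name `parse_edge` and the sample line are hypothetical.

```python
def parse_edge(line):
    """Extract the two usernames from a line such as "(u'alice', u'bob')"."""
    # Remove the surrounding parentheses and the newline, then split the pair
    a, b = line.strip("()\n").split(", ")
    # Drop the leading "u'" and the trailing "'" around each username
    return a[2:-1], b[2:-1]


# Hypothetical sample line in the assumed input format
sample_line = "(u'alice', u'bob')\n"
usernames = parse_edge(sample_line)
print(usernames)
```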

In [3]:
%%time

def start_encoding():
    print("Duplicating dataset in encoded format...")
    with open(mmr_encoded_filename, "w") as mmr_enc_f:
        for folder in sorted(os.listdir(dataset_dir)):
            folder_path = os.path.join(dataset_dir, folder)
            print("Processing files in folder {0}...".format(folder))
            for part in os.listdir(folder_path):
                part_path = os.path.join(folder_path, part)
                if os.path.isfile(part_path) and part.startswith("part"):
                    with open(part_path, encoding="utf-8") as part_f:
                        for line in part_f:
                            a_enc, b_enc = process_line(line)
                            mmr_enc_f.write(",".join([str(a_enc),str(b_enc)]) + "\n")

    print("Writing CSV header to separate file {0}...".format(usernames_header_filename))
    with open(usernames_header_filename, "w") as usernames_header_f:
        usernames_header_f.write(",".join(csv_header)+"\n")
    print("Dumping encoded usernames to {0}...".format(usernames_filename))
    with open(usernames_filename, "w") as usernames_out_f:
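        # Dicts preserve insertion order (Python 3.7+), so line N+1 holds the username with ID N.
        for username, user_enc in usernames_encoding.items():
            usernames_out_f.write(",".join([username, str(user_enc)])+"\n")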
    print("Processed all usernames. Total unique usernames: {0}".format(current_user_id))

# Start processing
current_user_id = 0 #Start with ID = 0
usernames_encoding = {}

# If any of the expected output files are missing, run the cell block.
if not os.path.exists(usernames_filename) or not os.path.exists(mmr_encoded_filename):
    start_encoding()
    del usernames_encoding

Duplicating dataset in encoded format...
Processing files in folder 2013-09...
Currently processed 10000000 unique usernames
Currently processed 20000000 unique usernames
Processing files in folder 2013-12...
Currently processed 30000000 unique usernames
Processing files in folder 2014-03...
Currently processed 40000000 unique usernames
Processing files in folder 2014-06...
Currently processed 50000000 unique usernames
Processing files in folder 2014-09...
Processing files in folder 2014-12...
Processing files in folder 2015-03...
Currently processed 60000000 unique usernames
Processing files in folder 2015-06...
Processing files in folder 2015-09...
Currently processed 70000000 unique usernames
Processing files in folder 2015-12...
Processing files in folder 2016-03...
Currently processed 80000000 unique usernames
Processing files in folder 2016-06...
Processing files in folder 2016-09...
Writing CSV header to separate file ../data/usernames_header.csv...
Dumping encoded usernames to 

The encoding process outlined above produces the two output files that are used from now on, with the following sizes:

| `usernames.csv` | `mmr_encoded.csv`   |
|---|---|
|   1.8GB         |                9.8GB|

## 1.2 Querying for Username-Encoding
For reference, and as the last step of this Jupyter Notebook, I show how to efficiently look up the encoding of a given username with **PySpark RDDs**, and the username of a given encoding with a plain **bash script**.

### 1.2.1 Examples: Query for encoding, given the username

In [4]:
%%time

get_encoding("barackobama")

CPU times: user 12 ms, sys: 12 ms, total: 24 ms
Wall time: 8.98 s


4969601

In [5]:
%%time

get_encoding("realdonaldtrump")

CPU times: user 12 ms, sys: 8 ms, total: 20 ms
Wall time: 7.46 s


3793089
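
For comparison, the same lookup can be sketched without Spark by streaming the CSV file one row at a time, so that the file is never fully loaded into memory. This is only an illustrative alternative, not the method used in this notebook; the function name `find_encoding` and the demo file are hypothetical, and the two-column `username,encoding` layout is the one written by the encoding step.

```python
import csv
import os
import tempfile


def find_encoding(csv_path, username):
    """Return the integer encoding of `username`, or None if it is absent."""
    with open(csv_path, newline="", encoding="utf-8") as f:
        # Rows are read lazily, one at a time
        for name, enc in csv.reader(f):
            if name == username:
                return int(enc)
    return None


# Demo on a tiny temporary file with the same two-column layout
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "usernames_demo.csv")
    with open(demo_path, "w", encoding="utf-8") as f:
        f.write("alice,0\nbob,1\ncarol,2\n")
    found = find_encoding(demo_path, "bob")
    missing = find_encoding(demo_path, "dave")

print(found, missing)
```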

### 1.2.2 Examples: Query for username, given the encoding
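Given a username encoding, retrieving the corresponding username is straightforward: IDs are assigned sequentially from 0 and `usernames.csv` is written in that same order, so the username with encoding `N` sits on line `N+1`. The following **bash script** prints that line: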

~~~bash
USERNAMES="../data/usernames.csv"
ENCODING=$1
# Encodings start at 0, while sed counts lines from 1.
LINE_NUMBER=$((ENCODING + 1))

sed -n "${LINE_NUMBER}p" "$USERNAMES"
~~~
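
The same line-number lookup can be sketched in Python with `itertools.islice`, which skips ahead to the requested line without keeping earlier lines in memory. This is an illustrative equivalent of the script above, not part of the original notebook; the function name `find_username` and the demo file are hypothetical, and the sketch relies on the same assumption that encoding `N` is stored on line `N+1`.

```python
import os
import tempfile
from itertools import islice


def find_username(csv_path, encoding):
    """Return the username stored on line `encoding + 1`, or None if out of range."""
    with open(csv_path, encoding="utf-8") as f:
        # islice is 0-based, so index `encoding` is line `encoding + 1`
        line = next(islice(f, encoding, encoding + 1), None)
    if line is None:
        return None
    name, enc = line.rstrip("\n").split(",")
    # Sanity check: the stored ID should match the requested one
    if int(enc) != encoding:
        raise ValueError("line order does not match the encoding IDs")
    return name


# Demo on a tiny temporary file with the same two-column layout
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "usernames_demo.csv")
    with open(demo_path, "w", encoding="utf-8") as f:
        f.write("alice,0\nbob,1\ncarol,2\n")
    second_user = find_username(demo_path, 1)
    out_of_range = find_username(demo_path, 10)

print(second_user, out_of_range)
```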

In [7]:
%%time
%%bash
../scripts/get_username.sh 46915512

ryofujii0311,46915512
CPU times: user 4 ms, sys: 8 ms, total: 12 ms
Wall time: 10.7 s


In [8]:
%%time
%%bash
../scripts/get_username.sh 49309989

ohigeforever,49309989
CPU times: user 4 ms, sys: 8 ms, total: 12 ms
Wall time: 10.8 s


### 1.2.3 Super-inefficient example using Spark DataFrames

In [12]:
%%time

# Using a DataFrame
usernames_rdd = sc.textFile(usernames_filename)
df_fields = [StructField(csv_header[0], StringType(), False), StructField(csv_header[1], IntegerType(), False)]
df_schema = StructType(df_fields)
header_rdd = usernames_rdd.filter(lambda l: ",".join(csv_header) in l)
usernames_noHeader_rdd = usernames_rdd.subtract(header_rdd)
usernames_df = usernames_noHeader_rdd.map(lambda l: l.split(',')).map(lambda p: (p[0], int(p[1]))).toDF(df_schema)
%timeit usernames_df.where(usernames_df.username == "barackobama").first()

3min 49s ± 398 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
CPU times: user 292 ms, sys: 56 ms, total: 348 ms
Wall time: 34min 33s


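The timings above suggest that, for this lookup, the PySpark DataFrame version (about 3 min 49 s per query) is much slower than the RDD version (under 10 s per query). The comparison is not strictly like-for-like, since the DataFrame pipeline also includes a `subtract` step and is not cached, but for the simple task of retrieving the username or encoding of a given input we can safely rely on the first option.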