## FakeNewsNet

### Imports

In [None]:
# python imports
import os.path
import urllib.request
import shutil
import zipfile
import json

# external library imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame
from pyspark.sql import Row
from pyspark.rdd import RDD

# project imports


### PySpark contexts

In [None]:
# Create (or reuse, via getOrCreate) the Spark session for this notebook.
spark = SparkSession.builder.appName('FakeNewsNet').getOrCreate()
# Low-level SparkContext that belongs to the session above.
sparkcontext = spark.sparkContext
# SQLContext wrapper around the same SparkContext; the loaders below call
# sqlcontext.read.json(...).
# NOTE(review): SQLContext is a legacy entry point; spark.read should be
# equivalent -- confirm before replacing, other cells may use this name.
sqlcontext   = SQLContext(sparkcontext)

### Dataset

In [None]:
# ----------------------------------------
# download or reference the dataset
# ----------------------------------------
def get_or_download_dataset():
    """Return the local tweets directory, downloading the archive on first use.

    When ``./dataset`` does not exist yet, ``./dataset/tweets`` is created,
    the archive is downloaded to ``./dataset/tweets/results.zip`` and
    extracted in place. When ``./dataset`` already exists nothing is
    downloaded and the path is returned unchanged, so a manually placed
    dataset is picked up as-is.

    Returns:
        str: the relative path ``"./dataset/tweets"``.
    """
    path = "./dataset/tweets"
    if not os.path.exists("./dataset"):
        print("downloading tweets...")
        # Create the target *directory* (and its parent). Previously only
        # ./dataset was created and the download was written to `path` itself,
        # which made `path` a file and broke the extraction step.
        os.makedirs(path)
        # lionel: no public access to download the dataset directly. manually download and place in the stated directory
        url = "https://nusu-my.sharepoint.com/:u:/g/personal/e0809358_u_nus_edu/ETPkp1-0GbBHgB__wyeCS_QBE7_SFluzSCtocU0mUr3Jng"
        zip_path = path + "/results.zip"
        with urllib.request.urlopen(url) as response, open(zip_path, 'wb') as out_file:
            shutil.copyfileobj(response, out_file)
        # Extract only after the archive has been closed, so every byte is
        # flushed to disk before zipfile reads it back.
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(path)
    return path
# ----------------------------------------
# convert to pandas dataframe
# ----------------------------------------
def to_dataframe_pandas(path, head_len = 10000):
    """Load tweet JSON files under ``path`` into one flattened pandas DataFrame.

    The expected layout is ``<path>/<directory>/tweets/<file>``, where every
    file holds a single JSON document. Directories and files are visited in
    ``os.listdir`` order and reading stops as soon as ``head_len`` documents
    have been collected.

    Args:
        path: directory whose sub-directories each contain a ``tweets`` folder.
        head_len: maximum number of JSON files to load; ``None`` loads all.

    Returns:
        pandas.DataFrame: one row per loaded file, produced by
        ``pd.json_normalize``.
    """
    directories = os.listdir(path)
    json_all = []
    print("reading first " + str(head_len) + " json files in " + str(len(directories)) + " directories in " + path + "...")
    for directory in directories:
        path_prefix = os.path.join(path, directory, "tweets")
        for file in os.listdir(path_prefix):
            # Stop before parsing files that would be discarded anyway.
            if head_len is not None and len(json_all) >= head_len:
                break
            # Read as UTF-8 explicitly rather than relying on the platform
            # default encoding.
            with open(os.path.join(path_prefix, file), 'r', encoding="utf-8") as json_file:
                json_all.append(json.load(json_file))
        if head_len is not None and len(json_all) >= head_len:
            break
    # lionel: fixed schema with columns we want must be known beforehand as columns are mismatched among json files
    # Normalising all documents in one call lets pandas build the union of the
    # columns, so no per-directory frames have to be concatenated.
    return pd.json_normalize(json_all)
# ----------------------------------------
# convert to spark dataframe
# ----------------------------------------
def to_dataframe_spark(path, head_len = 10000):
    """Load tweet JSON files under ``path`` into one Spark DataFrame.

    The expected layout is ``<path>/<directory>/tweets/<file>``. File paths
    are collected in ``os.listdir`` order, truncated to ``head_len`` entries
    and handed to ``sqlcontext.read.json`` in a single call.

    Args:
        path: directory whose sub-directories each contain a ``tweets`` folder.
        head_len: maximum number of JSON files to load; ``None`` loads all.

    Returns:
        pyspark.sql.DataFrame: the rows read from the selected files, limited
        to ``head_len`` rows when ``head_len`` is not ``None``.
    """
    # lionel: out-of-memory
    # df_all = sqlcontext.read.json(path + "/*/tweets/*.json")
    # return df_all
    directories = os.listdir(path)
    json_files = []
    print("reading first " + str(head_len) + " json files in " + str(len(directories)) + " directories in " + path + "...")
    for directory in directories:
        path_prefix = path + "/" + directory + "/tweets"
        json_files.extend(path_prefix + "/" + file for file in os.listdir(path_prefix))
        # lionel: fixed schema with columns we want must be known beforehand as columns are mismatched among json files
        # df_current = sqlcontext.read.json(path_prefix + "/*.json")
        # assert len(files) == df_current.count()
        # df_all = df_all.unionByName(df_current, allowMissingColumns=True)
        if head_len is not None and len(json_files) >= head_len:
            break
    # The loop collects whole directories, so the list may overshoot head_len.
    # Trim it before reading so Spark neither parses nor infers schema from
    # files whose rows would be dropped by the limit below. A falsy head_len
    # (None or 0) leaves the list untouched, as before.
    if head_len:
        json_files = json_files[:head_len]
    df_all = sqlcontext.read.json(json_files)
    # Sanity check kept from the original: one row is expected per file.
    assert len(json_files) == df_all.count()
    if head_len is not None:
        df_all = df_all.limit(head_len)
    return df_all
# ----------------------------------------
# print helper function
# ----------------------------------------
def print_df(dfs):
    """Print a ``(rows, columns)`` summary for every dataframe in ``dfs``.

    Each entry is preceded by a separator line and one closing separator is
    printed at the end. pandas frames are summarised with their ``shape``
    tuple; Spark frames with ``count()`` and the number of columns. Entries
    of any other type only produce the separator.
    """
    separator = "=========="
    for frame in dfs:
        print(separator)
        if isinstance(frame, pd.DataFrame):
            # pandas: shape is already the (rows, columns) tuple
            print(frame.shape)
        if isinstance(frame, DataFrame):
            # spark: the row count has to be computed with count()
            n_rows = frame.count()
            n_cols = len(frame.columns)
            print(f"({n_rows},{n_cols})")
    print(separator)
# ----------------------------------------
# call sites
# ----------------------------------------
# IPython shell magics: show the working directory and its contents, since the
# dataset path used below is relative ("./dataset/tweets").
!pwd
!ls -l
path_dataset = get_or_download_dataset()
# Load up to the default head_len tweets per (source, label) pair with the
# Spark loader.
df_fake_1 = to_dataframe_spark(path_dataset + "/gossipcop" + "/fake")
df_real_1 = to_dataframe_spark(path_dataset + "/gossipcop" + "/real")
df_fake_2 = to_dataframe_spark(path_dataset + "/politifact" + "/fake")
df_real_2 = to_dataframe_spark(path_dataset + "/politifact" + "/real")
# Convert each Spark DataFrame to pandas, rebinding the same names.
df_fake_1 = df_fake_1.toPandas()
df_real_1 = df_real_1.toPandas()
df_fake_2 = df_fake_2.toPandas()
df_real_2 = df_real_2.toPandas()
print_df([df_fake_1, df_real_1, df_fake_2, df_real_2])
# lionel: weirdly, pandas' and spark's json APIs yield different number of columns!
# NOTE(review): presumably because pd.json_normalize flattens nested objects
# into dotted column names while Spark keeps them as single struct columns --
# confirm by comparing the column lists.

### EDA

In [None]:
def clean_dataset():
    """Placeholder for the dataset cleaning step; currently does nothing."""
    return None