In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit

# Build (or reuse) the notebook-wide Spark session.
# NOTE(review): with master("local") Spark is documented to run with a single
# worker thread, so the executor core/memory settings below may have no effect
# in this mode - confirm, and consider "local[4]" if parallelism is intended.
spark = (
    SparkSession
    .builder
    .master("local")
    .appName("team1_sp22_final_project")
    .config("spark.executor.memory", '8g')
    .config('spark.executor.cores', '4')
    .config('spark.cores.max', '4')
    .config("spark.driver.memory", '8g')
    .getOrCreate()
)

# Handle to the underlying SparkContext (assigned once; the original cell
# repeated this statement).
sc = spark.sparkContext


import os
import glob

In [2]:
# Simple pattern to install custom packages from Jupyter.
# `username` is only used below to build the path of the per-user
# site-packages directory that is appended to sys.path.
username = 'mhk9c'
# Install the pip packages with the interpreter that runs the current Jupyter
# kernel (sys.executable); the `!` prefix runs the line as a shell command.
import sys
!{sys.executable} -m pip install demoji
!{sys.executable} -m pip install tldextract

# Make the user-level install location importable.
# NOTE(review): the path hard-codes python3.7 and the username above - it must
# be adjusted when either changes.
sys.path.append(f'/home/{username}/.local/lib/python3.7/site-packages/')

# Then you can import them.
import demoji 
# presumably fetches the emoji code table demoji needs - TODO confirm against
# the installed demoji version.
demoji.download_codes()

import tldextract

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable




### I propose that we keep all of our data, downloaded or derived, in the common folder

In [3]:
# Root folder under which save_df / load_data / create_df_from_csv build their paths.
data_path = "/project/ds5559/team1_sp22/data/"
# When True the tweets are loaded with load_data(); when False they are
# rebuilt with create_df_from_csv() and then written with save_df().
load_par = True

In [4]:
def save_df(_df, name):
    """Write a Spark DataFrame as parquet under ``data_path`` and open its permissions.

    Args:
        _df: DataFrame to persist; it is written with ``mode("overwrite")``.
        name: Folder name, relative to the module-level ``data_path``, that
            receives the parquet files.

    Returns:
        None. Progress is reported with ``print``.
    """
    # Local import so this cell does not depend on another cell for subprocess.
    import subprocess

    # os.path.join yields the same path whether or not data_path ends in "/",
    # matching how load_data() resolves the same name.
    full_path = os.path.join(data_path, name)
    print(full_path)
    if not os.path.exists(full_path):
        # Create a new directory because it does not exist
        os.makedirs(full_path)
        print("The new directory is created!")

    _df.write.format("parquet").mode("overwrite").save(full_path)
    # Argument-list form: the path is never parsed by a shell, so names with
    # spaces or shell metacharacters are handled safely. A failing chmod is
    # ignored (check=False), as it was with os.system.
    subprocess.run(["chmod", "-R", "777", full_path], check=False)
    print(f'Saved as: {full_path}')

def load_data(name):
    """Read the parquet dataset stored under ``data_path``/``name``.

    Args:
        name: Folder name, relative to the module-level ``data_path``.

    Returns:
        The DataFrame produced by ``spark.read.parquet``.
    """
    # os.path.join avoids the doubled "/" that plain concatenation produced
    # when data_path already ends with a slash, and matches save_df().
    full_path = os.path.join(data_path, name)
    _df = spark.read.parquet(full_path)
    print(f'Done loading from {full_path}.')
    return _df
    
def create_df_from_csv(name):
    """Build one DataFrame from every ``*.csv`` file in ``data_path``/``name``.

    Each file is read with a header row, schema inference and
    ``mode="DROPMALFORMED"``, tagged with a ``source_file`` column holding the
    path it was read from, and appended to the result with ``union``.

    Args:
        name: Folder name, relative to the module-level ``data_path``.

    Returns:
        The union of the per-file DataFrames.

    Raises:
        FileNotFoundError: If no CSV file matches. Previously this case fell
            through to ``return _df`` with ``_df`` never assigned.
    """
    # The pattern is kept exactly as before so the paths recorded in
    # `source_file` do not change; sorting only makes the union order stable.
    pattern = f'{data_path}/{name}/*.csv'
    csv_files = sorted(glob.glob(pattern))
    if not csv_files:
        raise FileNotFoundError(f'No CSV files found matching {pattern}')

    _df = None
    for file in csv_files:
        print(file)
        new_df = spark.read.csv(file, header=True, inferSchema=True, mode="DROPMALFORMED")
        new_df = new_df.withColumn("source_file", lit(file))
        # union() matches columns by position, so all files are assumed to
        # share the same column order - TODO confirm for new datasets.
        _df = new_df if _df is None else _df.union(new_df)
    return _df
    


In [5]:
# Either rebuild the tweets DataFrame from the raw CSV files (and persist it
# as parquet for next time) or load the previously saved parquet copy.
if not load_par:
    df = create_df_from_csv("russian-troll-tweets-master")
    save_df(df, "russian-troll-tweets")
else:
    df = load_data("russian-troll-tweets")

# Row count of the full dataset; reused later to compute the English share.
total_tweets = df.count()
print(f'There are {total_tweets} tweets in this dataset')


Done loading from /project/ds5559/team1_sp22/data//russian-troll-tweets.
There are 2914254 tweets in this dataset


In [None]:
# df.printSchema()

In [6]:
# Register the full tweets DataFrame as the temp view "tweets" so the
# spark.sql(...) queries in later cells can reference it by name.
df.createOrReplaceTempView("tweets")

In [7]:
# Keep only the rows whose `language` column equals 'English'.
sqlDF = spark.sql("SELECT * FROM tweets where language = 'English' ")

# Count them and report their share of the whole dataset.
english_tweets = sqlDF.count()
english_share = english_tweets / total_tweets
print(f'There are {english_tweets:,} english tweets in this dataset. They account for {english_share:%} of the dataset.')

There are 2,096,049 english tweets in this dataset. They account for 71.924033% of the dataset.


### Add some additional columns to the data.

In [12]:
import pyspark.sql.functions as func
from pyspark.sql.types import StringType, ArrayType
import re
# Pattern for @-mentions, used by extract_handles(). Handles may contain
# letters, digits and underscores; without "_" in the character class a
# mention such as "@some_user" was cut short to "@some".
b = re.compile(r"@[a-zA-Z0-9_]+")

def convert_emojii(string):
    """Replace emoji in ``string`` with their textual descriptions.

    Delegates to ``demoji.replace_with_desc(string, ":")``.

    Args:
        string: Text to convert, or None.

    Returns:
        The converted text; None when ``string`` is None (a missing value stays
        missing instead of being replaced by the error sentinel); or the
        sentinel ``"COULD NOT CONVERT EMOJII"`` when the conversion raises.
    """
    if string is None:
        return None
    try:
        return demoji.replace_with_desc(string, ":")
    except Exception:
        # Best effort: one bad value must not fail the whole Spark job, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return "COULD NOT CONVERT EMOJII"
# Spark UDF wrapper producing a string column.
convert_emojii_UDF = func.udf(convert_emojii, StringType())
# test = convert_emojii("🐝🐝🐝")   
# print(test)


def extract_domain_information(url):
    """Return the registered domain of ``url`` as reported by tldextract.

    Args:
        url: URL string; may be None or empty.

    Returns:
        ``tldextract.extract(url).registered_domain``, or ``""`` when ``url``
        is falsy or the extraction raises.
    """
    if not url:
        return ""
    try:
        return tldextract.extract(url).registered_domain
    except Exception:
        # Best effort: an unparsable value maps to "", without also swallowing
        # KeyboardInterrupt/SystemExit as the bare `except:` did.
        return ""
# Spark UDF wrapper producing a string column.
extract_domain_information_UDF = func.udf(extract_domain_information, StringType())
# test = extract_domain_information("https://rivanna-portal.hpc.virginia.edu/node/udc-ba27-18/55477/lab?")
# print(test)


def extract_handles(content):
    """Return every match of the module-level pattern ``b`` found in ``content``.

    Args:
        content: Tweet text, or None.

    Returns:
        A list of matched strings in order of appearance; an empty list when
        ``content`` is None or the search raises.
    """
    if content is None:
        return []
    try:
        return re.findall(b, content)
    except Exception:
        # Best effort: unexpected input yields no handles, without also
        # swallowing KeyboardInterrupt/SystemExit as the bare `except:` did.
        return []
# Spark UDF wrapper producing an array-of-strings column.
extract_handles_UDF = func.udf(extract_handles, ArrayType(StringType(), True))
# test = extract_handles("Hi @MichelleObama , remember when you praised Harvey Weinstein as 'a wonderful human being, a good friend and a powerhouse.")
# print(test)


In [13]:
# Emoji in the tweet text replaced by their descriptions.
sqlDF = sqlDF.withColumn("curated_content", convert_emojii_UDF(col("content")))

# One "<column>_domain" column per link column, in the same order as before.
for link_col in ("tco1_step1", "tco2_step1", "tco3_step1"):
    sqlDF = sqlDF.withColumn(f"{link_col}_domain", extract_domain_information_UDF(col(link_col)))

# Handles mentioned in the tweet text, as an array column.
sqlDF = sqlDF.withColumn("handles", extract_handles_UDF(col("content")))

# Persist the enriched frame as parquet.
save_df(sqlDF, "russian-troll-tweets-enriched")

/project/ds5559/team1_sp22/data/russian-troll-tweets-enriched
Saved as: /project/ds5559/team1_sp22/data/russian-troll-tweets-enriched


In [None]:
# Register the current `sqlDF` as the temp view "english_tweets" for the
# spark.sql(...) queries in the following cells.
sqlDF.createOrReplaceTempView("english_tweets")

In [None]:
# Peek at the three link columns for a handful of rows, without truncating
# long values in the printed table.
_sqlDF = spark.sql(
    "SELECT tco1_step1, tco2_step1, tco3_step1  FROM english_tweets LIMIT 10"
)
_sqlDF.show(10, truncate=False)

In [None]:
# Select the text and source file of up to 100 rows from the "english_tweets" view.
# NOTE(review): this rebinds `sqlDF`, so the name no longer refers to the
# frame it held before this cell; nothing in this cell displays the result.
sqlDF = spark.sql("SELECT content,source_file FROM english_tweets LIMIT 100")

### Looking at bigrams

In [None]:
# Select only the `content` column of English-language tweets from the
# "tweets" view and expose it as an RDD for the RDD operations below.
sqlDF_content = spark.sql("SELECT content FROM tweets where language = 'English'")
content_RDD = sqlDF_content.rdd 
# Show the Python type of the resulting object as the cell output.
type(content_RDD)

In [None]:
# Number of rows in the content RDD, shown as the cell output.
content_RDD.count()

In [None]:
# Preview the first five rows of the content RDD. The original referenced
# `rdd_content_english`, a name that is not defined anywhere in the visible
# notebook; `content_RDD` is the RDD built above.
content_RDD.take(5)

In [None]:
# Bigram word count: split each tweet on whitespace, emit every pair of
# adjacent words with a count of 1, sum the counts per pair, then flip to
# (count, pair) so the result can be sorted by count in descending order.
bigrams = (
    content_RDD
    .map(lambda row: row['content'])
    .map(lambda text: text.split() if text is not None else [])
    .flatMap(lambda words: [(pair, 1) for pair in zip(words, words[1:])])
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair_count: (pair_count[1], pair_count[0]))
    .sortByKey(ascending=False)
)

bigrams.take(10)
# Not that exciting...

In [None]:
# Write the sorted (count, bigram) pairs as text files under <data_path>/bigrams.
# NOTE(review): saveAsTextFile is expected to fail if the target directory
# already exists - confirm before re-running this cell.
bigrams.saveAsTextFile(f'{data_path}/bigrams')

In [None]:
from pyspark.mllib.feature import HashingTF

hashingTF = HashingTF()

# One document per tweet: the whitespace-separated tokens of its content,
# or an empty token list when the content is missing.
documents = (
    content_RDD
    .map(lambda row: row['content'])
    .map(lambda text: text.split() if text is not None else [])
)

# Term-frequency vectors produced by the hashing transformer.
tf = hashingTF.transform(documents)

In [None]:
# IDF was used without being imported anywhere in the visible notebook; import
# it here so the cell runs on a fresh kernel.
from pyspark.mllib.feature import IDF

# Cache the term-frequency vectors: they are used both to fit the IDF model
# and to transform.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

In [None]:
# IDF was used without being imported anywhere in the visible notebook; import
# it here so the cell runs on a fresh kernel.
from pyspark.mllib.feature import IDF

# Same as the plain IDF fit, but constructed with minDocFreq=2.
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)