In [1]:
import os

# Maven coordinate of the MongoDB Spark connector requested via --packages.
MONGO_CONNECTOR_PACKAGE = 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.0'

# Submit arguments read from the environment by PySpark.
# NOTE(review): presumably this must run before the SparkSession is created — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(
    ['--packages', MONGO_CONNECTOR_PACKAGE, 'pyspark-shell']
)


In [2]:
from pyspark.sql import SparkSession

# Session-level settings, applied to the builder in the order listed.
SPARK_SETTINGS = {
    "spark.driver.memory": "4G",
    "spark.executor.memory": "7500m",
    # Default source collection for reads and default target database for writes.
    "spark.mongodb.input.uri": "mongodb://mongo_docker_mongodb-service_1:27017/twitter.rawTweets",
    "spark.mongodb.output.uri": "mongodb://mongo_docker_mongodb-service_1:27017/twitter",
    "spark.sql.shuffle.partitions": "1000",
}

session_builder = SparkSession.builder.appName("pyspark-notebook").master("spark://spark-master:7077")
for setting_key, setting_value in SPARK_SETTINGS.items():
    session_builder = session_builder.config(setting_key, setting_value)

# Reuses an already-running session if one exists, otherwise creates it.
spark = session_builder.getOrCreate()


In [3]:
# Read through the "mongo" data source; no collection is given here, so the
# source is whatever the session's spark.mongodb.input.uri points at.
mongo_reader = spark.read.format("mongo")
df_raw = mongo_reader.load()


In [5]:
# Optional down-sampling for quick experiments: set to a fraction such as
# 0.001 to work on a subset, or leave as None to process every document.
SAMPLE_FRACTION = None

tweets_source = (
    df_raw
    if SAMPLE_FRACTION is None
    else df_raw.sample(withReplacement=False, fraction=SAMPLE_FRACTION)
)

# Keep only the nested tweet body; the resulting column is named "text".
text = tweets_source.select("data.text")


In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, IntegerType, StringType
from langdetect import detect 

import re
from nltk.tokenize import word_tokenize
from string import punctuation 
from nltk.corpus import stopwords 

# Tokens dropped by process_tweet: NLTK's English stop words, every single
# ASCII punctuation character, and a few tweet-specific placeholder tokens.
STOP_WORDS = (
    set(stopwords.words("english"))
    | set(punctuation)
    | {'AT_USER', 'URL', 'rt'}
)
# Matches one or more consecutive emoji / pictographic symbols.
# The ranges are deliberately limited to symbol blocks: a single catch-all
# range from U+24C2 up to U+1F251 would also match CJK ideographs, kana,
# Hangul and other letters, silently deleting real text.
EMOJI_PATTERN = re.compile(
    "["
    "\U0001F1E0-\U0001F1FF"  # flags (iOS)
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F700-\U0001F77F"  # alchemical symbols
    "\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
    "\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U0001FA00-\U0001FA6F"  # Chess Symbols
    "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
    "\U00002702-\U000027B0"  # Dingbats
    "\U000024C2"             # circled latin capital letter M
    "\U00002600-\U000026FF"  # Miscellaneous Symbols
    "\U00002B00-\U00002BFF"  # Miscellaneous Symbols and Arrows
    "\U0000FE00-\U0000FE0F"  # variation selectors (emoji/text presentation)
    "\U0001F200-\U0001F251"  # Enclosed Ideographic Supplement
    "]+")
    
def process_tweet(tweet):
    """Normalise a raw tweet into a space-joined string of cleaned tokens.

    Steps, in order: lower-case the text; replace links with the placeholder
    ``URL``; drop @mentions; keep hashtag text without the ``#``; drop any
    run of non-whitespace that starts with ``:``; strip characters matched by
    ``EMOJI_PATTERN``; strip remaining non-word, non-space characters;
    tokenize with NLTK and discard tokens found in ``STOP_WORDS``.

    Args:
        tweet: Raw tweet text. Non-string values are converted with ``str``.

    Returns:
        The surviving tokens joined by single spaces, or ``""`` when
        ``tweet`` is ``None``.
    """
    if tweet is None:
        return ""
    tweet = str(tweet).lower()
    # Raw strings keep regex escapes such as \s and \. away from Python's own
    # string-escape processing (non-raw '\s' is an invalid escape sequence).
    # The upper-case placeholder is inserted after lower-casing, so it stays
    # distinguishable and is later removed via the 'URL' entry in STOP_WORDS.
    tweet = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', 'URL', tweet)
    tweet = re.sub(r'@[^\s]+', '', tweet)  # drop @mentions entirely
    tweet = re.sub(r'#([^\s]+)', r'\1', tweet)  # "#topic" -> "topic"
    # Drops ":)"-style emoticons; note this also removes any other text glued
    # to a colon, e.g. the ":30" in "10:30".
    tweet = re.sub(r':[^\s]+', r'', tweet)
    tweet = EMOJI_PATTERN.sub(r'', tweet)
    tweet = re.sub(r'[^\w\s]', '', tweet)  # strip leftover punctuation/symbols
    tokens = word_tokenize(tweet)  # split the cleaned text into word tokens
    return " ".join([word for word in tokens if word not in STOP_WORDS]).strip()

def lang_detect(text):
    """Best-effort language detection for a single piece of text.

    Args:
        text: Text to classify; may be ``None`` or empty.

    Returns:
        The language code returned by ``detect`` (e.g. ``"en"``), or ``""``
        when the text is empty/``None`` or detection fails.
    """
    # Nothing to detect on: same result the failure path would give.
    if not text:
        return ""
    # NOTE(review): langdetect is reported to be non-deterministic unless
    # DetectorFactory.seed is set — confirm whether reproducibility matters.
    try:
        return detect(text)
    except Exception:
        # Deliberately best-effort, but unlike a bare ``except:`` this lets
        # KeyboardInterrupt / SystemExit propagate.
        return ""

def sentence_len(text):
    """Count the whitespace-separated tokens in ``text``.

    Args:
        text: String to measure; ``None`` is treated as having no tokens.

    Returns:
        Number of tokens as an ``int`` (0 for ``None`` or blank text).
    """
    # A null column value reaches a Python UDF as None; avoid AttributeError.
    if text is None:
        return 0
    return len(text.split())

# Wrap the plain-Python helpers as Spark UDFs with explicit return types so
# they can be applied to DataFrame columns.
u_process_tweet = udf(process_tweet, returnType=StringType())
u_lang_detect = udf(lang_detect, returnType=StringType())
u_sentence_len = udf(sentence_len, returnType=IntegerType())




In [7]:
# Add three derived columns: the cleaned text, the language detected on the
# cleaned text, and the cleaned text's token count.
text_with_language = (
    text
    .withColumn('processed', u_process_tweet('text'))
    .withColumn('lang', u_lang_detect('processed'))
    .withColumn('length', u_sentence_len('processed'))
)

# Keep rows detected as English that have more than 8 tokens left.
text_filtered = (
    text_with_language
    .filter("lang  == 'en'")
    .filter("length > 8")
)


In [8]:
# Append the filtered rows to the "processedTweetsEN" collection through the
# "mongo" data source; existing documents in the collection are kept.
processed_writer = text_filtered.write.format("mongo").mode("append")
processed_writer.option("collection", "processedTweetsEN").save()