In [None]:
import findspark
findspark.init()

import webbrowser

from pyspark.sql import SparkSession
import nltk
from nltk.corpus import twitter_samples
import pandas as pd

from pyspark import StorageLevel
from pyspark.sql.functions import split, explode, regexp_replace, lower, regexp_extract
from pyspark.sql.functions import row_number, monotonically_increasing_id as identity, col, length
from pyspark.sql.functions import lag, lead, udf
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, StringType

# Creating Spark Session

In [None]:
# Start (or reuse) the Spark session that every later cell runs against.
spark = SparkSession.builder.appName('SparkNlp').getOrCreate()

# Open the Spark UI of *this* application in the browser.
# The UI is not always on port 4040: when that port is already taken (e.g. by another
# notebook) Spark binds the next free one, so ask the context for the actual address.
spark_ui_url = spark.sparkContext.uiWebUrl
if spark_ui_url:
    webbrowser.open(spark_ui_url)

# Downloading the Twitter Sample Data

In [None]:
# Fetch the NLTK 'twitter_samples' corpus that the following cells read through
# nltk.corpus.twitter_samples.
nltk.download('twitter_samples')

# Extracting Twitter Positive and Negative Data

In [None]:
# List the files shipped with the corpus; the next cell reads 'positive_tweets.json'
# and 'negative_tweets.json'.
twitter_samples.fileids()

In [None]:
# Label every tweet of the positive / negative corpus files as a (text, label) pair.
pos_tw = [(t, 'pos') for t in twitter_samples.strings('positive_tweets.json')]
neg_tw = [(t, 'neg') for t in twitter_samples.strings('negative_tweets.json')]

# Keep both labelled collections together: document[0] is positive, document[1] is negative.
document = [pos_tw, neg_tw]

# Build one pandas frame with columns 'text' and 'label' from both collections.
# DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, so the frames are
# stacked with pd.concat; ignore_index=True yields a unique 0..n-1 index instead of two
# overlapping ones.
df = pd.concat(
    [pd.DataFrame(tweets, columns=['text', 'label']) for tweets in document],
    ignore_index=True,
)

# Create Resilient Distributed Dataset

In [None]:
# Turn on Arrow for the pandas -> Spark conversion done by createDataFrame below.
# NOTE(review): Spark 3.x documents this key as deprecated in favour of
# 'spark.sql.execution.arrow.pyspark.enabled' — confirm against the Spark version in use.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Despite the '_rdd' suffix, this is the Spark DataFrame built from the pandas frame `df`.
df_rdd = spark.createDataFrame(df)

In [None]:
# Print the column names and types of the Spark DataFrame created from the pandas frame.
df_rdd.printSchema()

In [None]:
# show() is an action, so this creates a Spark job; print the first 5 rows with the
# full, untruncated tweet text.
df_rdd.show(5, truncate=False)

# Create Temp Table Reference

In [None]:
# Register the DataFrame under the name 'SqlNlp' so it can be referenced from spark.sql queries.
df_rdd.createOrReplaceTempView('SqlNlp')

In [None]:
# Inspect the session catalog; the 'SqlNlp' view registered in the previous cell should be listed.
spark.catalog.listTables()

In [None]:
# The same catalog check, expressed as a SQL statement.
spark.sql("SHOW TABLES").show()

# Exploring the Text Data

In [None]:
# Peek at 4 raw tweets, untruncated, before any cleaning is applied.
df_rdd.select('text').show(4, truncate=False)

# Cleaning the Text Data

In [None]:
# Emoticon strings, plus the bare tokens '(', ')' and 'via' at the end of the list.
# NOTE(review): no visible cell reads `smilies` — the cleaning below is done with a
# regex instead. Confirm whether this list is still needed.
smilies = [':-)', ':)', ';)', ':o)', ':]', ':3', ':c)', ':>', '=]', '8)', '=)', ':}',
    ':^)', ':-D', ':D', '8-D', '8D', 'x-D', 'xD', 'X-D', 'XD', '=-D', '=D',
    '=-3', '=3', ':-))', ":'-)", ":')", ':*', ':^*', '>:P', ':-P', ':P', 'X-P',
    'x-p', 'xp', 'XP', ':-p', ':p', '=p', ':-b', ':b', '>:)', '>;)', '>:-)',
    '<3', ':L', ':-/', '>:/', ':S', '>:[', ':@', ':-(', ':[', ':-||', '=L', ':<',
    ':-[', ':-<', '=\\', '=/', '>:(', ':(', '>.<', ":'-(", ":'(", ':\\', ':-c',
    ':c', ':{', '>:\\', ';(', '(', ')', 'via']

# Adding Sentence Identity

In [None]:
# Number the tweets 1..n inside each label, in order of their text.
# The number is therefore unique only together with 'label': both 'pos' and 'neg'
# have a sentence 1.
# NOTE: rows with identical text tie in the ordering, so which of them receives which
# number is not determined by this window.
w = Window.partitionBy('label').orderBy('text')
df_rdd = df_rdd.withColumn('sentence_id', row_number().over(w))

## Cleaning Invalid Characters

In [None]:
# Delete every character that is not an ASCII letter, '#', '@' or a space, then lowercase.
# Characters are removed, not replaced by a space, so text joined only by punctuation
# (a URL, "don't", ...) collapses into a single token.
df_clean = df_rdd.withColumn('clean_text', lower(regexp_replace('text', '[^a-zA-Z#@ ]', '')))
df_clean.show(5, truncate=False)

## Splitting the Words

In [None]:
# Tokenise the cleaned text on single spaces into an array column 'words', keeping
# label and sentence_id. Consecutive spaces yield empty-string tokens; those are
# filtered out after the explode step further down.
df_split = df_clean.select(split('clean_text', ' ').alias('words'), 'label', 'sentence_id')
df_split.show(4, truncate=False)

In [None]:
# Check the schema after tokenising: 'words' is the array produced by split().
df_split.printSchema()

# Removing Hash and User Tags

In [None]:
def _has_tag(words, markers):
    """Return True when any word in `words` contains any character of `markers`.

    `words` may be None (a null array coming from Spark); that counts as "no tags".
    """
    return any(marker in word for word in (words or []) for marker in markers)


def _without_tag(words, marker):
    """Return the entries of `words` that do not contain `marker` (None -> empty list)."""
    return [word for word in (words or []) if marker not in word]


# udf() defaults to a StringType result, which would store the two flags as the strings
# 'true'/'false'; declare them as booleans so the columns carry a real boolean type.
# 'contain_hash_only' is True whenever a hashtag is present — it does not check that
# user tags are absent.
udf_cotains_hash_user = udf(lambda row: _has_tag(row, '#@'), 'boolean')
udf_contains_hash_only = udf(lambda row: _has_tag(row, '#'), 'boolean')
udf_clear_hash = udf(lambda row: _without_tag(row, '#'), ArrayType(StringType(), True))
udf_clear_user = udf(lambda row: _without_tag(row, '@'), ArrayType(StringType(), True))

# Flag tweets that contain tags, and build 'words_clean' with hashtags and user
# mentions removed (hashtags first, then mentions).
df_split = (
    df_split
    .withColumn('contain_tags', udf_cotains_hash_user('words'))
    .withColumn('contain_hash_only', udf_contains_hash_only('words'))
    .withColumn('words_clean', udf_clear_user(udf_clear_hash('words')))
)

In [None]:
# Check the schema again now that the tag flags and 'words_clean' have been added.
df_split.printSchema()

In [None]:
# Preview the rows with the new flag columns and the tag-free word arrays.
df_split.show()

## Exploding the Words into Rows

In [None]:
# One output row per entry of 'words_clean'; label, the tag flags and sentence_id are
# repeated on each of those rows.
df_exp = df_split.select(explode('words_clean').alias('word'), 'label', 'contain_tags', 'contain_hash_only', 'sentence_id')
df_exp.show()

In [None]:
# Mark the exploded DataFrame for caching so later actions can reuse it instead of
# recomputing the cleaning / UDF / explode steps each time.
# cache() does not switch off lazy evaluation — it is lazy itself, and the data is only
# stored once an action runs on this DataFrame.
# The following cells rebind `df_exp` to frames derived from this one; the cache marker
# stays on this (unfiltered) DataFrame.
df_exp.cache()

## Filtering Blanks

In [None]:
# Drop the empty-string tokens that splitting on single spaces leaves behind.
df_exp = df_exp.filter(col('word')!='')
df_exp.show()

## Creating Word Identity

In [None]:
# Tag every word with an 'id'; the window queries below sort by it.
# `identity` is the import alias of monotonically_increasing_id.
# NOTE(review): those ids are increasing but not consecutive, and they depend on how
# the rows are partitioned. They follow word order only as long as the physical row
# order does — confirm this holds for the data / cluster in use.
df_exp = df_exp.withColumn('id', identity())
df_exp.show()

# Creating Sliding Window

In [None]:
# Register the word-level DataFrame as 'WindowTutorial' for the SQL window queries below.
df_exp.createOrReplaceTempView('WindowTutorial')

In [None]:
# For every word show the word before it (w1) and the word after it (w2) in id order.
# The window has no PARTITION BY, so it spans the whole table: a neighbour can belong
# to a different tweet, and Spark has to order all rows within a single partition.
spark.sql("""
SELECT
    id,
    LAG(word, 1) OVER(ORDER BY id) AS w1,
    word,
    LEAD(word, 1) OVER(ORDER BY id) AS w2
FROM WindowTutorial
""").show()

In [None]:
# DataFrame-API version of the previous SQL query: previous word (w1) and next word (w2).
# This rebinds `w`, which earlier held the per-label window used for sentence_id.
# Like the SQL version, the window is not partitioned, so it runs over the whole table.
w = Window.orderBy('id')
df_exp.select(
    'id',
    lag('word', 1).over(w).alias('w1'),
    'word',
    lead('word', 1).over(w).alias('w2')
).show()

# Sliding Window as Subquery: Most common 4-tuples

In [None]:
# Count every run of four consecutive words per label, most frequent first.
# The window is partitioned by (label, sentence_id) — sentence_id is only unique within
# a label — so a phrase never continues into the next tweet, and Spark can evaluate the
# windows per tweet instead of ordering the whole table in one partition.
# Windows that run past the end of a tweet have w4 = NULL and are not real 4-word
# phrases, so they are dropped before counting.
spark.sql("""
SELECT label, w1, w2, w3, w4, COUNT(1) AS phrase_count
FROM (
    SELECT
        label,
        word AS w1,
        LEAD(word, 1) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w2,
        LEAD(word, 2) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w3,
        LEAD(word, 3) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w4
    FROM WindowTutorial
)
WHERE w4 IS NOT NULL
GROUP BY label, w1, w2, w3, w4
ORDER BY phrase_count DESC
""").show()

In [None]:
# List the distinct runs of four consecutive words, sorted by first word (descending).
# As in the counting query, the window is partitioned by (label, sentence_id) so a
# phrase stays inside one tweet, and incomplete windows at the end of a tweet
# (w4 = NULL) are left out.
spark.sql("""
SELECT DISTINCT w1, w2, w3, w4
FROM (
    SELECT
        word AS w1,
        LEAD(word, 1) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w2,
        LEAD(word, 2) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w3,
        LEAD(word, 3) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w4
    FROM WindowTutorial
)
WHERE w4 IS NOT NULL
ORDER BY w1 DESC, w2, w3, w4
""").show()

In [None]:
# Most frequent 4-word phrase for each label.
#   phrases       - every run of four consecutive words inside one tweet; the window is
#                   partitioned by (label, sentence_id) so phrases do not cross tweets
#   phrase_counts - occurrences per (label, phrase); windows cut off by the end of a
#                   tweet (w4 = NULL) are excluded
#   ranked        - position of each phrase within its label; ties on the count are
#                   broken by the words themselves so the winner is deterministic
spark.sql("""
WITH phrases AS (
    SELECT
        label,
        word AS w1,
        LEAD(word, 1) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w2,
        LEAD(word, 2) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w3,
        LEAD(word, 3) OVER(PARTITION BY label, sentence_id ORDER BY id) AS w4
    FROM WindowTutorial
),
phrase_counts AS (
    SELECT label, w1, w2, w3, w4, COUNT(1) AS phrase_count
    FROM phrases
    WHERE w4 IS NOT NULL
    GROUP BY label, w1, w2, w3, w4
),
ranked AS (
    SELECT
        label, w1, w2, w3, w4, phrase_count,
        ROW_NUMBER() OVER(
            PARTITION BY label
            ORDER BY phrase_count DESC, w1, w2, w3, w4
        ) AS phrase_rank
    FROM phrase_counts
)
SELECT label, w1, w2, w3, w4, phrase_count
FROM ranked
WHERE phrase_rank = 1
ORDER BY label ASC
""").show()

In [None]:
# Mark the current df_exp (blank tokens removed, 'id' added) for caching. The earlier
# cache() call was made on the DataFrame this one was derived from, before the filter
# and id steps rebound the name.
df_exp.cache()

In [None]:
# Look up the rows for one specific token.
# NOTE(review): 'httptcorcvcyyoiq' is presumably a URL whose punctuation was removed by
# the cleaning regex — confirm against the raw text.
df_exp.filter(col('word')=='httptcorcvcyyoiq').show()

## Removing Stopwords

In [None]:
nltk.download('stopwords')
from nltk.corpus import stopwords

# The words in df_exp had every apostrophe removed by the cleaning regex
# ("don't" -> "dont"), while the NLTK list spells contractions with the apostrophe.
# Match on both spellings, otherwise contractions survive the stop-word filter.
# NOTE: a stripped contraction can coincide with an ordinary word (e.g. "we'll" -> "well"
# if the installed list contains it); the cleaned text cannot tell the two apart either.
english_stopwords = stopwords.words('english')
stopset = set(english_stopwords) | {word.replace("'", '') for word in english_stopwords}

# Pass a sorted list so the generated filter expression is the same on every run.
df_exp = df_exp.filter(~col('word').isin(sorted(stopset)))
df_exp.show()