# Spark DataFrame

Working with Spark DataFrames involves three basics:
- Creation
- Manipulation
- User Defined Functions

Spark DataFrames are Spark Datasets organized into named columns. They are very similar to pandas DataFrames.
The entry point for working with DataFrames is a SparkSession. To start programming Spark with the DataFrame API, we first need to create a SparkSession.

# Create SparkSession:
A SparkSession is how we interact with Spark DataFrames. To create one, we use the builder pattern as shown below:

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/10/28 14:50:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


- DataFrames are created through the SparkSession.
- A DataFrame can be created from:
   - a list (of tuples or values),
   - an RDD (Resilient Distributed Dataset), or
   - a specially formatted JSON file.
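
"Specially formatted" here most likely means JSON Lines: by default `spark.read.json` expects one complete JSON object per line rather than a single large JSON array. The following is a minimal plain-Python sketch of that layout, assuming two invented records and a temporary file name (`sample_user.json.gz`) that are not part of the Yelp data:

```python
import gzip
import json
import os
import tempfile

# Hypothetical sample records, loosely modelled on the user dataset's columns
records = [
    {"user_id": "u1", "name": "Ann", "compliment_cute": 3},
    {"user_id": "u2", "name": "Bob", "compliment_cute": 250},
]

def write_json_lines(path, rows):
    """Write one JSON object per line into a gzip-compressed file."""
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(row) + "\n")

def read_json_lines(path):
    """Read the file back, parsing each non-empty line as its own JSON document."""
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]

# Hypothetical file name inside a fresh temporary directory
path = os.path.join(tempfile.mkdtemp(), "sample_user.json.gz")
write_json_lines(path, records)
loaded = read_json_lines(path)
print(loaded[1]["name"])
```

A file written this way has the same line-per-record shape as the `.json.gz` files read in the next section.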

# Let's read our JSON datasets:
- We have 3 large JSON datasets from the Yelp academic dataset, called user, review, and checkin.

In [4]:
user = spark.read.json('../../assets/data/yelp_academic/yelp_academic_dataset_user.json.gz')
review = spark.read.json('../../assets/data/yelp_academic/yelp_academic_dataset_review.json.gz')
checkin = spark.read.json('../../assets/data/yelp_academic/yelp_academic_dataset_checkin.json.gz')

                                                                                

# Data Manipulation
 In this section, we will manipulate the large datasets to extract the following information:
 - -- CUTE COMPLIMENTS --
 - -- TOP NEGATIVE REVIEWS --
 - -- CHECKINS --
 - -- TOP_50_WORD_REVIEW --

In [5]:
# Since the file is so big, we can grab a small part of it to get an idea about columns and...
user1=user.sample(0.1)

In [6]:
user1.columns # name of the columns
user1.dtypes # name of the columns with data type in each column

[('average_stars', 'double'),
 ('compliment_cool', 'bigint'),
 ('compliment_cute', 'bigint'),
 ('compliment_funny', 'bigint'),
 ('compliment_hot', 'bigint'),
 ('compliment_list', 'bigint'),
 ('compliment_more', 'bigint'),
 ('compliment_note', 'bigint'),
 ('compliment_photos', 'bigint'),
 ('compliment_plain', 'bigint'),
 ('compliment_profile', 'bigint'),
 ('compliment_writer', 'bigint'),
 ('cool', 'bigint'),
 ('elite', 'string'),
 ('fans', 'bigint'),
 ('friends', 'string'),
 ('funny', 'bigint'),
 ('name', 'string'),
 ('review_count', 'bigint'),
 ('useful', 'bigint'),
 ('user_id', 'string'),
 ('yelping_since', 'string')]

In [7]:
user1.show(2) # displays the first 2 rows of the dataset "user1"

+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+----+--------------+----+--------------------+-----+---------+------------+------+--------------------+-------------------+
|average_stars|compliment_cool|compliment_cute|compliment_funny|compliment_hot|compliment_list|compliment_more|compliment_note|compliment_photos|compliment_plain|compliment_profile|compliment_writer|cool|         elite|fans|             friends|funny|     name|review_count|useful|             user_id|      yelping_since|
+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+----+--------------+----+--------------------+-----+---------+------------+------+--------------------+-------------------+
|         4.03|              1|

### -- CUTE COMPLIMENTS --
- Count the number of users with fewer than 200 cute compliments (compliment_cute) and save it in a variable called: cute_count

In [60]:
# filter() works much like boolean indexing in pandas: it keeps only the rows that satisfy a condition.
# count() then returns the number of users with fewer than 200 cute compliments.
cute_count = user.filter(user['compliment_cute'] < 200).count()
cute_count

1636943

In [61]:
# Number of users with 200 or more cute compliments (compliment_cute)
cute_count2 = user.filter(user['compliment_cute'] >= 200).count()
cute_count2

195
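
The conditions `< 200` and `>= 200` are complementary, so the two counts should add up to the number of users whose compliment_cute is not null. In Spark a comparison against null is never true, so such rows satisfy neither filter. A plain-Python sketch of that bookkeeping, using invented values rather than the Yelp data:

```python
# Hypothetical column values; None stands in for a null cell
cute = [0, 3, 199, 200, 250, None, 1224]

# Mirror of filter(col < 200).count(): nulls never match
below = sum(1 for v in cute if v is not None and v < 200)

# Mirror of filter(col >= 200).count(): nulls never match here either
at_or_above = sum(1 for v in cute if v is not None and v >= 200)

nulls = sum(1 for v in cute if v is None)

# Every row falls into exactly one of the three groups
print(below, at_or_above, nulls)
```

If the two Spark counts do not sum to `user.count()`, the difference is the number of rows with a null in that column.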

In [10]:
# distinct() returns the distinct values of the column chosen by select(); show() displays the first 20 rows by default

user.select('compliment_cute').distinct().show()


+---------------+
|compliment_cute|
+---------------+
|             29|
|             26|
|            474|
|             65|
|            418|
|           1224|
|            270|
|            222|
|            293|
|            730|
|            243|
|            278|
|             19|
|            296|
|             54|
|              0|
|            287|
|            277|
|            348|
|            113|
+---------------+
only showing top 20 rows



                                                                                

### -- TOP NEGATIVE REVIEWS --

Let's find the top 10 negative reviews:
- Create a spark dataframe named: top_10_negative_reviews
- For our purpose, negative reviews are the ones with 1 or 2 stars (according to distinct(), the only star values are 1, 2, 3, 4 and 5)
- The DataFrame should be ordered by funny in descending order and contain only 10 rows
- The top_10_negative_reviews DataFrame should have the following columns: user_id, review_id, funny, stars
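
The requirements above amount to four steps: filter, sort, limit, and select. Here is a plain-Python analogy of those steps on a handful of invented records (the IDs, counts and the helper name `top_negative` are hypothetical); it is only meant to make the expected result concrete, not to replace the Spark code:

```python
# Hypothetical review records; all values are invented for illustration
reviews = [
    {"user_id": "u1", "review_id": "r1", "funny": 5, "stars": 1.0, "text": "a"},
    {"user_id": "u2", "review_id": "r2", "funny": 40, "stars": 5.0, "text": "b"},
    {"user_id": "u3", "review_id": "r3", "funny": 12, "stars": 2.0, "text": "c"},
    {"user_id": "u4", "review_id": "r4", "funny": 7, "stars": 3.0, "text": "d"},
    {"user_id": "u5", "review_id": "r5", "funny": 9, "stars": 1.0, "text": "e"},
]

def top_negative(rows, n):
    """Keep reviews with fewer than 3 stars, funniest first, first n rows only."""
    negative = [r for r in rows if r["stars"] < 3]          # filter
    negative.sort(key=lambda r: r["funny"], reverse=True)   # sort descending
    keep = ("user_id", "review_id", "funny", "stars")       # select
    return [{k: r[k] for k in keep} for r in negative[:n]]  # limit

result = top_negative(reviews, 2)
print([r["review_id"] for r in result])
```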

In [11]:
review.columns
review.dtypes

[('business_id', 'string'),
 ('cool', 'bigint'),
 ('date', 'string'),
 ('funny', 'bigint'),
 ('review_id', 'string'),
 ('stars', 'double'),
 ('text', 'string'),
 ('useful', 'bigint'),
 ('user_id', 'string')]

In [12]:
review.select('stars').distinct().show()


+-----+
|stars|
+-----+
|  1.0|
|  4.0|
|  3.0|
|  2.0|
|  5.0|
+-----+



                                                                                

In [13]:
review.show(2)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|ujmEBvifdJM6h6RLv...|   0|2013-05-07 04:34:36|    1|Q1sbwvVQXV2734tPg...|  1.0|Total bill for th...|     6|hG7b0MtEbXx5QzbzE...|
|NZnhc2sEQy3RmzKTZ...|   0|2017-01-14 21:30:33|    0|GJXCdrto3ASJOqKeV...|  5.0|I *adore* Travis ...|     0|yXQM5uF2jS6es16SJ...|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
only showing top 2 rows



In [14]:
# Negative reviews (stars < 3) sorted by the 'funny' column in descending order; the limit to 10 rows is applied in the next cell
top_10_negative_reviews=review.select(['user_id','review_id','funny','stars']).filter(review['stars']<3).sort('funny', ascending = False)
top_10_negative_reviews

DataFrame[user_id: string, review_id: string, funny: bigint, stars: double]

In [15]:
top_10_negative_reviews = top_10_negative_reviews.limit(10)
top_10_negative_reviews.show()


+--------------------+--------------------+-----+-----+
|             user_id|           review_id|funny|stars|
+--------------------+--------------------+-----+-----+
|qiTy11I-yp6foxIgh...|A8mLBytNM2zmjHgSp...|  628|  1.0|
|BbSWL3PjQBzY7uDWa...|ZmEtySx0W_RSv07aY...|  332|  1.0|
|H5dkC0OmZkRDTFjN4...|xophRrPX3yig5psGd...|  287|  1.0|
|AsUDg2wZZqkgZzl0k...|4ZN5ZWVoGd8er9giA...|  277|  2.0|
|p_2daiuEk774FFHsK...|c6lCQwW9oeq903z7Y...|  267|  1.0|
|XmIsBXpdBevTYCe8g...|69suGHxR3PVxicAgm...|  266|  1.0|
|TG3GcRx20tAxXxpqn...|EYbuFrEnVkVdavuRm...|  241|  1.0|
|-nlHAaKCQF5I0Gbbw...|NVeCBLhxOBQbSGLIe...|  237|  1.0|
|36kREh8Oib7RdVyaT...|PZVlLaH6SJoSSigLL...|  217|  1.0|
|Tyb_FnUv3L0LqsYyL...|i9LyzLWSozIe6fIiL...|  208|  2.0|
+--------------------+--------------------+-----+-----+



                                                                                

In [16]:
type(top_10_negative_reviews) # top_10_negative_reviews is a spark dataframe

pyspark.sql.dataframe.DataFrame

## -- CHECKINS --
Let's determine during which hours of the day the fewest checkins occur:
- Create a Spark DataFrame called checkin_hour_least
- Order the DataFrame by count in ascending order and keep only the top 20 rows
- The DataFrame should contain the following columns:
  -  "hour": the hour of the day as an integer, with the hour after midnight being 0
  -  "count": the number of checkins that occurred in that hour

In [17]:
checkin.first()

Row(business_id='--1UhMGODdWsrMastO9DZw', date='2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02')

- The date column in the checkin dataset holds multiple date-times in a single string. Therefore, we use a User Defined Function to split them before parsing.
- UDFs (User Defined Functions) wrap ordinary Python functions so that they can be applied to the values of a column, row by row.
- The reason we need to wrap them is that every DataFrame column has a declared type in Spark's schema. Spark is written in Scala and runs on the JVM, where types are static, so it cannot work out on its own what a plain Python function returns. When we wrap a Python function as a UDF, we therefore specify its return type (if it is omitted, StringType is assumed).
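
Because a UDF is only a wrapper, the function inside it can be written and checked in plain Python first. The sketch below is one possible variant, not the notebook's own lambda: the name `split_dates` is hypothetical, and it additionally strips the space that follows each comma and tolerates null cells (Spark passes `None` to a Python UDF for nulls).

```python
def split_dates(text):
    """Split a comma-separated string of timestamps into a list of clean strings."""
    if text is None:
        # A null cell arrives as None; return an empty list instead of failing
        return []
    # strip() removes the space left behind after each comma
    return [part.strip() for part in text.split(",") if part.strip()]

sample = "2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18"
print(split_dates(sample))
```

Wrapped as `udf(split_dates, ArrayType(StringType()))`, it could be used in the same way as the lambda-based UDF defined in this section.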

## Import Important Libraries

In [18]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import explode

In [19]:
# Let's define a UDF that applies Python's split() to the column we are interested in.
# It returns an array of strings, which is the return type declared below

split_date_udf = udf(lambda x: x.split(","), ArrayType(StringType()))

In [20]:
# Now we can apply udf to our date column in checkin dataset & get only first row for our information

checkin.select('date', split_date_udf('date').alias('Dates')).first()

Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 187, in manager
  File "/opt/conda/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/conda/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 730, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/conda/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError
                                                                                

Row(date='2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02', Dates=['2016-04-26 19:49:16', ' 2016-08-30 18:36:57', ' 2016-10-15 02:45:18', ' 2016-11-18 01:54:50', ' 2017-04-20 18:39:06', ' 2017-05-03 17:58:02'])

- Now we explode the 'Dates' array so that each date-time gets its own row, and save the result in a separate column called 'checkin_hours'.


In [21]:
dataframe = checkin.select('date', split_date_udf('date').alias('Dates')).withColumn('checkin_hours', explode('Dates'))
dataframe.show(2)    

+--------------------+--------------------+--------------------+
|                date|               Dates|       checkin_hours|
+--------------------+--------------------+--------------------+
|2016-04-26 19:49:...|[2016-04-26 19:49...| 2016-04-26 19:49:16|
|2016-04-26 19:49:...|[2016-04-26 19:49...| 2016-08-30 18:36:57|
+--------------------+--------------------+--------------------+
only showing top 2 rows



- Our new 'checkin_hours' column is of string type. We need to convert it to a timestamp.

In [22]:
dataframe.dtypes

[('date', 'string'), ('Dates', 'array<string>'), ('checkin_hours', 'string')]

In [23]:
# Let's change the checkin_hours data type from string to timestamp.
# Importing to_timestamp by name (rather than with *) avoids shadowing Python built-ins such as sum and max.
from pyspark.sql.functions import to_timestamp


dataframe.withColumn('timestamp_checkin_hour', to_timestamp('checkin_hours')).dtypes
# The 'timestamp_checkin_hour' column holds timestamp values

[('date', 'string'),
 ('Dates', 'array<string>'),
 ('checkin_hours', 'string'),
 ('timestamp_checkin_hour', 'timestamp')]

In [24]:
dataframe = dataframe.withColumn('timestamp_checkin_hour', to_timestamp('checkin_hours'))
dataframe.show(3)

+--------------------+--------------------+--------------------+----------------------+
|                date|               Dates|       checkin_hours|timestamp_checkin_hour|
+--------------------+--------------------+--------------------+----------------------+
|2016-04-26 19:49:...|[2016-04-26 19:49...| 2016-04-26 19:49:16|   2016-04-26 19:49:16|
|2016-04-26 19:49:...|[2016-04-26 19:49...| 2016-08-30 18:36:57|   2016-08-30 18:36:57|
|2016-04-26 19:49:...|[2016-04-26 19:49...| 2016-10-15 02:45:18|   2016-10-15 02:45:18|
+--------------------+--------------------+--------------------+----------------------+
only showing top 3 rows



In [25]:
# Now, let's extract the integer hour from our 'timestamp_checkin_hour' column and name it 'hour'.
# We use a UDF here; the built-in pyspark.sql.functions.hour should give the same result without the Python overhead.
hour_udf = udf(lambda x: x.hour, IntegerType())

#let's apply udf to our dataframe column:
dataframe = dataframe.select('timestamp_checkin_hour', hour_udf('timestamp_checkin_hour').alias('hour'))
dataframe

DataFrame[timestamp_checkin_hour: timestamp, hour: int]
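
A Python UDF receives each timestamp value as a `datetime.datetime` object, which is why `x.hour` works inside the lambda. A small plain-Python check of that logic follows; the helper name `hour_of` is hypothetical and, unlike the lambda, it returns `None` for null input instead of raising an error.

```python
from datetime import datetime

def hour_of(ts):
    """Return the hour (0-23) of a datetime, or None for a null value."""
    if ts is None:
        return None
    return ts.hour

# Parse one of the checkin strings into a datetime, as to_timestamp does in Spark
parsed = datetime.strptime("2016-10-15 02:45:18", "%Y-%m-%d %H:%M:%S")
print(hour_of(parsed))
```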

In [26]:
dataframe.show()


+----------------------+----+
|timestamp_checkin_hour|hour|
+----------------------+----+
|   2016-04-26 19:49:16|  19|
|   2016-08-30 18:36:57|  18|
|   2016-10-15 02:45:18|   2|
|   2016-11-18 01:54:50|   1|
|   2017-04-20 18:39:06|  18|
|   2017-05-03 17:58:02|  17|
|   2011-06-04 18:22:23|  18|
|   2011-07-23 23:51:33|  23|
|   2012-04-15 01:07:50|   1|
|   2012-05-06 23:08:42|  23|
|   2012-06-08 22:43:12|  22|
|   2012-08-06 23:20:52|  23|
|   2012-08-19 18:30:44|  18|
|   2013-01-27 23:49:51|  23|
|   2013-03-01 01:22:29|   1|
|   2013-03-23 21:53:47|  21|
|   2013-03-24 01:11:51|   1|
|   2013-05-20 00:12:25|   0|
|   2013-06-29 22:50:57|  22|
|   2013-07-01 15:58:04|  15|
+----------------------+----+
only showing top 20 rows



                                                                                

In [27]:
# Let's count the checkins in each hour of the day by grouping on the 'hour' column and applying count():

dataframe.groupBy('hour').count().show(5)


+----+-------+
|hour|  count|
+----+-------+
|  12| 178910|
|  22|1257437|
|   1|1561788|
|  13| 270145|
|  16| 852076|
+----+-------+
only showing top 5 rows



                                                                                

In [28]:
# First count the rows for each hour, then sort by the counts in ascending order:

counted_hours = dataframe.groupBy('hour').count().alias('counted_hours')
sorted_hours = counted_hours.sort('count', ascending=True)

In [29]:
sorted_hours.show()


+----+-------+
|hour|  count|
+----+-------+
|  10|  88486|
|   9| 100568|
|  11| 111769|
|   8| 151065|
|  12| 178910|
|   7| 231417|
|  13| 270145|
|   6| 321764|
|  14| 418340|
|   5| 485129|
|  15| 617830|
|   4| 747453|
|  16| 852076|
|  17|1006102|
|   3|1078939|
|  21|1238808|
|  22|1257437|
|  18|1272108|
|  23|1344117|
|  20|1350195|
+----+-------+
only showing top 20 rows



                                                                                

- The fewest checkins occur during hours 10, 9, 11, 8, 12 and 7, that is, from 7:00 through the noon hour.

In [30]:
type(sorted_hours)

pyspark.sql.dataframe.DataFrame

- sorted_hours is a Spark DataFrame containing each checkin hour and the number of checkins in that hour from our checkin dataset
- The 20 hours with the fewest checkins are the rows displayed by show() above
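
The group, count and sort steps can be mirrored in plain Python with `collections.Counter`. This sketch uses only the six hours of the first business shown earlier (19, 18, 2, 1, 18, 17); breaking ties by hour is an assumption made here for a deterministic result, since Spark does not guarantee an order among equal counts.

```python
from collections import Counter

# Hours of the first six checkin rows displayed earlier in this section
hours = [19, 18, 2, 1, 18, 17]

# Equivalent of groupBy('hour').count()
counts = Counter(hours)

# Equivalent of sort('count', ascending=True); ties are broken by hour (an assumption)
least_first = sorted(counts.items(), key=lambda kv: (kv[1], kv[0]))
print(least_first)
```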

## -- COMMON WORDS IN USEFUL REVIEWS --

We want to find the 50 most common words from *useful* reviews and their counts.

- A "useful review" has 10 or more "useful" ratings.
- We will focus on the 'text' column.
- Convert the 'text' to lower case.
- Use the provided `splitter()` function in a UDF to split the text into individual words.
- Exclude the words that are in the provided `STOP_WORDS` set.
- Final DataFrame should have the following columns:
    - `word`
    - `count`
- DataFrame should be sorted by `count` in descending order.
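
Before building this in Spark, the whole recipe can be sketched in plain Python on a few invented reviews. Everything in the block is illustrative: the sample texts, the reduced stop-word set `SAMPLE_STOP_WORDS` and the helper `top_words` are assumptions, and only the regular expression matches the one used by `splitter()`.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")  # same pattern as the splitter() function

# A deliberately tiny, hypothetical subset of the stop-word set
SAMPLE_STOP_WORDS = {"the", "was", "and", "i", "but", "it"}

# Invented reviews; only those with useful >= 10 count as "useful"
sample_reviews = [
    {"text": "The food was good and the tea was great", "useful": 12},
    {"text": "Good food, but I waited", "useful": 10},
    {"text": "It was fine", "useful": 2},
]

def top_words(reviews, n, min_useful=10):
    """Return the n most common non-stop words in useful reviews."""
    counts = Counter()
    for review in reviews:
        if review["useful"] < min_useful:
            continue  # skip reviews that are not "useful"
        words = WORD_RE.findall(review["text"].lower())
        counts.update(w for w in words if w not in SAMPLE_STOP_WORDS)
    return counts.most_common(n)

print(top_words(sample_reviews, 2))
```

The Spark version below follows the same order of operations: filter, lowercase, split, remove stop words, count, sort.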

In [31]:
# Let's see a small part of the review:
review.sample(0.01).show(3)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|sMzNLdhJZGzYirIWt...|   0|2015-06-21 00:59:14|    0|RLbWoIri29BcQ8yjz...|  5.0|This place epitom...|     0|_o740mSNRhMNYuPjS...|
|dZVMp70AuSa4dQPvx...|   1|2011-09-25 18:10:04|    0|IRwIFWgjJiMSBBuce...|  3.0|In need of a burg...|     2|PFNZVn73upq3oZDG2...|
|yNPh5SO-7wr8HPpVC...|   0|2017-04-30 05:32:05|    0|JYdhCDyR6lYfN2qnS...|  5.0|First off food is...|     0|6kEFHccntnYMF_7cd...|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
only showing top 3 rows



In [32]:
# splitter() splits the text into words (runs of word characters and apostrophes):
import re

WORD_RE = re.compile(r"[\w']+")  # compiled once rather than on every call

def splitter(text):
    return WORD_RE.findall(text)

In [33]:
# Our STOP_WORDS set
STOP_WORDS = {
    "a", "about", "above", "after", "again", "against", "aint", "all", "also", "although", "am", "an", "and", "any",
    "are", "as", "at", "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", "can",
    "check", "checked", "could", "did", "do", "does", "doing", "don", "down", "during", "each", "few", "for", "from",
    "further", "get", "go", "got", "had", "has", "have", "having", "he", "her", "here", "hers", "herself", "him",
    "himself", "his", "how", "however", "i", "i'd", "if", "i'm", "in", "into", "is", "it", "its", "it's", "itself",
    "i've", "just", "me", "more", "most", "my", "myself", "no", "nor", "not", "now", "of", "off", "on", "once", "one",
    "online", "only", "or", "other", "our", "ours", "ourselves", "out", "over", "own", "paid", "place", "s", "said", 
    "same", "service", "she", "should", "so", "some", "such", "t", "than", "that", "the", "their", "theirs", "them",
    "themselves", "then", "there", "these", "they", "this", "those", "through", "to", "too", "under", "until", "up",
    "us", "very", "was", "we", "went", "were", "we've", "what", "when", "where", "which", "while", "who", "whom",
    "why", "will", "with", "would", "you", "your", "yours", "yourself", "yourselves",
}

In [34]:
# First, we will filter the dataset on 'useful' column for the values of 10 or more
df = review
df.filter(df['useful']>=10).show(4)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|1HD5iUUfVJDbfEBIn...|  11|2017-03-15 02:02:13|    8|qdQIIf6xuyubxEG05...|  5.0|Yes... the Boba T...|    11|XPZVfP7DQCSL3Nb9t...|
|PycR_Mr5jA9jB4Xg3...|   1|2014-01-17 02:15:25|    1|h7Rmb3EiXjajVfGYN...|  1.0|They charged me t...|    14|oAOE4UAC5ZbAjEGBE...|
|dLxT3-EwXkrI9AXoW...|   0|2016-01-30 01:29:21|    0|LqShY--VVp_0lkgau...|  1.0|PLEASE. Read this...|    10|scitRtsLa4QP9S1LZ...|
|tqN30ZVHlmxbI-uQ4...|   9|2016-04-11 17:11:35|    6|jQxm5RANNaqF6AV7l...|  5.0|I so disagree wit...|    11|wzuxPP-d18Mu_IooK...|
+--------------------+----+-------------------+-----+--------------------+-----+----------

In [35]:
df = df.filter(df['useful']>=10)

In [36]:
# We should make the words in the 'text' column lowercase and check if they are in the StopWords
# To do that, first import lower function and apply it to the text
from pyspark.sql.functions import lower

df = df.withColumn('text', lower(df['text']))

In [37]:
df.show(2)  # 'text' was already lowercased in the previous cell

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|1HD5iUUfVJDbfEBIn...|  11|2017-03-15 02:02:13|    8|qdQIIf6xuyubxEG05...|  5.0|yes... the boba t...|    11|XPZVfP7DQCSL3Nb9t...|
|PycR_Mr5jA9jB4Xg3...|   1|2014-01-17 02:15:25|    1|h7Rmb3EiXjajVfGYN...|  1.0|they charged me t...|    14|oAOE4UAC5ZbAjEGBE...|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
only showing top 2 rows



In [38]:
udf_splitter = udf(splitter, ArrayType(StringType()))

In [39]:
# Now, we will apply the UDF to the 'text' column of the reviews with useful >= 10

df = df.select('user_id', udf_splitter('text').alias('split_words'))
# Each row of the 'split_words' column now holds a list of words

In [40]:
df.show(3)

+--------------------+--------------------+
|             user_id|         split_words|
+--------------------+--------------------+
|XPZVfP7DQCSL3Nb9t...|[yes, the, boba, ...|
|oAOE4UAC5ZbAjEGBE...|[they, charged, m...|
|scitRtsLa4QP9S1LZ...|[please, read, th...|
+--------------------+--------------------+
only showing top 3 rows




In [41]:
df.dtypes

[('user_id', 'string'), ('split_words', 'array<string>')]

## StopWordsRemover
In Spark, StopWordsRemover is a feature transformer in the pyspark.ml.feature module that removes stop words from a column of tokenized text (an array of strings). Stop words are words that are often filtered out in natural language processing tasks, such as "the," "is," "in," etc. By default the matching is case-insensitive.

In [42]:
from pyspark.ml.feature import StopWordsRemover

In [43]:
STOP_WORDS=list(STOP_WORDS)
remover = StopWordsRemover(stopWords=STOP_WORDS, inputCol="split_words", outputCol="filtered_words")
filtered_df = remover.transform(df)

In [44]:
filtered_df.show(4)

+--------------------+--------------------+--------------------+
|             user_id|         split_words|      filtered_words|
+--------------------+--------------------+--------------------+
|XPZVfP7DQCSL3Nb9t...|[yes, the, boba, ...|[yes, boba, tea, ...|
|oAOE4UAC5ZbAjEGBE...|[they, charged, m...|[charged, twice, ...|
|scitRtsLa4QP9S1LZ...|[please, read, th...|[please, read, bu...|
|wzuxPP-d18Mu_IooK...|[i, so, disagree,...|[disagree, couple...|
+--------------------+--------------------+--------------------+
only showing top 4 rows




In [45]:
# we will drop the 'split_words' column:

filtered_df=filtered_df.drop('split_words')

In [46]:
filtered_df.show(2)

+--------------------+--------------------+
|             user_id|      filtered_words|
+--------------------+--------------------+
|XPZVfP7DQCSL3Nb9t...|[yes, boba, tea, ...|
|oAOE4UAC5ZbAjEGBE...|[charged, twice, ...|
+--------------------+--------------------+
only showing top 2 rows




In [47]:
filtered_df.dtypes

[('user_id', 'string'), ('filtered_words', 'array<string>')]

In [48]:
# Now we need to explode the 'filtered_words' column to distribute each word into a different row in the same 'filtered_words' column:

filtered_df = filtered_df.withColumn('filtered_words', explode('filtered_words'))

In [49]:
filtered_df.show(3)

+--------------------+--------------+
|             user_id|filtered_words|
+--------------------+--------------+
|XPZVfP7DQCSL3Nb9t...|           yes|
|XPZVfP7DQCSL3Nb9t...|          boba|
|XPZVfP7DQCSL3Nb9t...|           tea|
+--------------------+--------------+
only showing top 3 rows



In [50]:
# Let's count the frequency of each word and sort by count in descending order:
filtered_df = filtered_df.groupBy('filtered_words').count().sort('count', ascending=False)

In [51]:
filtered_df.show()


+--------------+------+
|filtered_words| count|
+--------------+------+
|          like|101251|
|          time| 86124|
|          good| 83486|
|          back| 71308|
|          food| 65281|
|          even| 58499|
|        really| 57687|
|         don't| 56146|
|         great| 55402|
|          well| 48297|
|        didn't| 45751|
|         first| 43738|
|        people| 42768|
|          know| 40954|
|         never| 40741|
|             2| 39573|
|          told| 39350|
|           day| 38164|
|          came| 38098|
|          much| 37227|
+--------------+------+
only showing top 20 rows



                                                                                

## Write Top_50_word_review Function

- Now, we will write a function that takes the review DataFrame as its only parameter and returns a Spark DataFrame with the 50 most common words in the 'text' column of useful reviews.

In [52]:
type(review)

pyspark.sql.dataframe.DataFrame

In [53]:
df = review.select(review['useful'])
type(df)

pyspark.sql.dataframe.DataFrame

In [58]:
def Top_50_word_review(df):
    
    # Imports are kept inside the function so that it is self-contained
    from pyspark.sql.functions import udf, explode, lower
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import StopWordsRemover
    import re
    
    df = df.filter(df['useful'] >= 10)
    df = df.withColumn('text', lower(df['text']))
    
    def splitter(text):
        WORD_RE = re.compile(r"[\w']+")
        return WORD_RE.findall(text)
    
    split_udf = udf(lambda x: splitter(x), ArrayType(StringType()))
    df = df.select('user_id', split_udf('text').alias('split_words'))
    
    STOP_WORDS = {
    "a", "about", "above", "after", "again", "against", "aint", "all", "also", "although", "am", "an", "and", "any",
    "are", "as", "at", "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", "can",
    "check", "checked", "could", "did", "do", "does", "doing", "don", "down", "during", "each", "few", "for", "from",
    "further", "get", "go", "got", "had", "has", "have", "having", "he", "her", "here", "hers", "herself", "him",
    "himself", "his", "how", "however", "i", "i'd", "if", "i'm", "in", "into", "is", "it", "its", "it's", "itself",
    "i've", "just", "me", "more", "most", "my", "myself", "no", "nor", "not", "now", "of", "off", "on", "once", "one",
    "online", "only", "or", "other", "our", "ours", "ourselves", "out", "over", "own", "paid", "place", "s", "said", 
    "same", "service", "she", "should", "so", "some", "such", "t", "than", "that", "the", "their", "theirs", "them",
    "themselves", "then", "there", "these", "they", "this", "those", "through", "to", "too", "under", "until", "up",
    "us", "very", "was", "we", "went", "were", "we've", "what", "when", "where", "which", "while", "who", "whom",
    "why", "will", "with", "would", "you", "your", "yours", "yourself", "yourselves"}
    
    STOP_WORDS=list(STOP_WORDS)
    remover = StopWordsRemover(stopWords=STOP_WORDS, inputCol="split_words", outputCol="filtered_words")
    filtered_df = remover.transform(df)
    
    filtered_df = filtered_df.drop('split_words') 
    
    exploded_df = filtered_df.withColumn('filtered_words', explode('filtered_words'))
    
    # Count each word and sort by frequency, most frequent first
    sorted_df = exploded_df.groupBy('filtered_words').count().sort('count', ascending=False)
    # Keep only the 50 most common words
    most_common = sorted_df.limit(50)
    
    return most_common

In [59]:
Top_50_word_review(review).show()


+--------------+------+
|filtered_words| count|
+--------------+------+
|          like|101251|
|          time| 86124|
|          good| 83486|
|          back| 71308|
|          food| 65281|
|          even| 58499|
|        really| 57687|
|         don't| 56146|
|         great| 55402|
|          well| 48297|
|        didn't| 45751|
|         first| 43738|
|        people| 42768|
|          know| 40954|
|         never| 40741|
|             2| 39573|
|          told| 39350|
|           day| 38164|
|          came| 38098|
|          much| 37227|
+--------------+------+
only showing top 20 rows



                                                                                