In [1]:
import pyspark as ps

spark = (
        ps.sql.SparkSession.builder 
        .master("local[4]") 
        .appName("lecture") 
        .getOrCreate()
        )

sc = spark.sparkContext

In [2]:
df = spark.read.json("sample.json")

In [3]:
df.show(5)

+---------------+--------------+----------------------+-----------------+--------------------+--------+----------------+-----------+-------------+------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------------+------------+
|         author|author_cakeday|author_flair_css_class|author_flair_text|                body|can_gild|controversiality|created_utc|distinguished|edited|gilded|     id|is_submitter|  link_id| parent_id|           permalink|retrieved_on|score|stickied|        subreddit|subreddit_id|
+---------------+--------------+----------------------+-----------------+--------------------+--------+----------------+-----------+-------------+------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------------+------------+
|       Dethcola|          null|                      |       Clairemont|            A quarry|    true|               0| 1506816000|         null| fals

In [4]:
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)



In [5]:
print("line count: {}".format(df.count()))

line count: 10000


In [6]:
df1 = df[['author','body','subreddit']]

In [7]:
from pyspark.sql import functions as F

In [8]:
df_users = df1.groupby('author').agg(F.count('body')).orderBy('count(body)', ascending=False)

In [9]:
df_users.show(5)

+------------------+-----------+
|            author|count(body)|
+------------------+-----------+
|         [deleted]|        779|
|ithinkisaidtoomuch|        258|
|   Concise_AMA_Bot|        147|
|         grrrrreat|        136|
|     AutoModerator|         92|
+------------------+-----------+
only showing top 5 rows



In [10]:
df[['body']].show()

+--------------------+
|                body|
+--------------------+
|            A quarry|
|[Salutations! I'm...|
|I got into baseba...|
|        FUCKING TORY|
|I see a water dra...|
|Wait. The Michiga...|
|              ye fam|
|143417804| &gt; U...|
|That is some chic...|
|Does he even know...|
|            Tequila.|
|your heart beats ...|
|&gt; Subscribe: /...|
|you're really ign...|
|lets see how deep...|
|You are arguing t...|
|I'm thinking abou...|
|[Original post](h...|
|I think that's a ...|
|Harp absolutelly....|
+--------------------+
only showing top 20 rows



In [11]:
comments = df.groupBy("author").agg(F.collect_list("body"))

In [12]:
comments.show(5, truncate = False)

+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author     |collect_list(body)                                                                                                                                                                                                                                                                                                                                                         |
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType, IntegerType

In [14]:
def join_comments(comments):
    return ' '.join(comments)
    

In [15]:
join_comments_udf = udf(lambda x: ' '.join(x), StringType())
df_join_comments = comments.withColumn('corpus', join_comments_udf(comments['collect_list(body)']))

In [16]:
df_join_comments.show(5, truncate = False)

+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author     |collect_list(body)                                                                                                                                                                                                                                      

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizer, Tokenizer

In [18]:
tokenizer = Tokenizer(inputCol="corpus", outputCol="words")
tokens = tokenizer.transform(df_join_comments)

In [19]:
tokens.show(5)

+-----------+--------------------+--------------------+--------------------+
|     author|  collect_list(body)|              corpus|               words|
+-----------+--------------------+--------------------+--------------------+
|  CanyonWrn|[Still waiting fo...|Still waiting for...|[still, waiting, ...|
|     Cogeno|[I can't say as t...|I can't say as th...|[i, can't, say, a...|
|   Daverost|[I used to have a...|I used to have a ...|[i, used, to, hav...|
|GordonByron|[Just the fact th...|Just the fact tha...|[just, the, fact,...|
|  Intrixina|[Your reality TV ...|Your reality TV g...|[your, reality, t...|
+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [20]:
from nltk.stem import WordNetLemmatizer

In [22]:
from pyspark.ml.feature import StopWordsRemover

In [23]:
stopwords = StopWordsRemover(inputCol="words", outputCol="true_words")
words = stopwords.transform(tokens)

In [25]:
words.show(5, truncate = False)

+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [2]:
def lemmatize(corpus):
    lem = nltk.WordNetLemmatizer()
    return [lem.lemmatize(words) for words in corpus]

In [3]:
lemm_udf = udf(lemmatize, ArrayType(StringType()))
df_lemm = words.withColumn('lemmas', lemm_udf(words['corpus']))

NameError: name 'udf' is not defined