In an interactive notebook, the `spark` session object is already created.
The instructors tested this notebook with 1 driver and 6 small E4 executors (24 cores, 192 GB memory).

### Launch Spark environment

In [1]:
spark

In [2]:
%%configure -f
{"conf": {"spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.2"}}

Unrecognized options: 

### Set up data configuration

In [3]:
blob_account_name = "marckvnonprodblob"
blob_container_name = "bigdata"
# read only
blob_sas_token = "?sv=2021-10-04&st=2023-10-04T01%3A42%3A59Z&se=2024-01-02T02%3A42%3A00Z&sr=c&sp=rlf&sig=w3CH9MbCOpwO7DtHlrahc7AlRPxSZZb8MOgS6TaXLzI%3D"

wasbs_base_url = (
    f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/"
)
spark.conf.set(
    f"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net",
    blob_sas_token,
)
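The SAS token above carries its own validity window in its query parameters (`st` is the start time, `se` the expiry, `sp` the permissions). Once it expires, reads fail with authorization errors that are easy to misread, so a quick pre-flight check can help. Below is a minimal standard-library sketch; `sas_expiry` and `sas_is_expired` are hypothetical helper names (not part of any Azure SDK), and it assumes the full `YYYY-MM-DDTHH:MM:SSZ` timestamp form used in this token.

```python
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import parse_qs


def sas_expiry(sas_token: str) -> datetime:
    """Return the signed expiry (`se`) of a SAS token as a UTC datetime."""
    # parse_qs percent-decodes values, e.g. "%3A" -> ":"
    params = parse_qs(sas_token.lstrip("?"))
    expiry = params["se"][0]
    return datetime.strptime(expiry, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def sas_is_expired(sas_token: str, now: Optional[datetime] = None) -> bool:
    """True if `now` (default: current UTC time) is at or past the token's expiry."""
    now = now or datetime.now(timezone.utc)
    return now >= sas_expiry(sas_token)
```

In the notebook this would be called as `sas_is_expired(blob_sas_token)` before the `spark.read.parquet(...)` calls.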

#### Define the Parquet data paths

In [4]:
comments_path = "reddit-parquet/comments/"
submissions_path = "reddit-parquet/submissions/"

In [5]:
topic = ["Tetris", "pokemon", "SuperMario", "GTA", "CallOfDuty",
         "FIFA", "legostarwars", "assassinscreed", "thesims", "FinalFantasy"]

### Reading in all of the Reddit data

In [6]:
comments_df = spark.read.parquet(f"{wasbs_base_url}{comments_path}")
submissions_df = spark.read.parquet(f"{wasbs_base_url}{submissions_path}")

In [7]:
from pyspark.sql.functions import length, col

# Keep non-empty, non-deleted, non-removed posts from the selected subreddits
sub_filtered = submissions_df.filter(
    (length(col("selftext")) > 0) & (~col("selftext").isin("[deleted]", "[removed]"))
).filter(col("subreddit").isin(topic))
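The filter keeps a submission only if its body is non-empty, is not one of Reddit's `[deleted]`/`[removed]` placeholders, and its subreddit is in `topic`. Because Spark drops rows whose predicate evaluates to NULL, posts with a missing `selftext` or `subreddit` are excluded too, and `isin` matches names case-sensitively. The plain-Python function below restates that rule so it can be unit-tested on a few hand-made rows; `keep_post` is a hypothetical name and `None` stands in for SQL NULL.

```python
from typing import Iterable, Optional

PLACEHOLDERS = {"[deleted]", "[removed]"}


def keep_post(subreddit: Optional[str], selftext: Optional[str], topics: Iterable[str]) -> bool:
    """Mirror of the Spark filter above; None plays the role of SQL NULL."""
    if subreddit is None or selftext is None:
        # Any comparison with NULL is NULL in Spark, and filter() drops NULL rows
        return False
    # Exact, case-sensitive subreddit match, like Column.isin
    return len(selftext) > 0 and selftext not in PLACEHOLDERS and subreddit in set(topics)
```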

In [8]:
df_save = sub_filtered.select("subreddit", "title", "selftext","year","month").cache()
df_save.show()

+--------------+--------------------+--------------------+----+-----+
|     subreddit|               title|            selftext|year|month|
+--------------+--------------------+--------------------+----+-----+
|       pokemon|the PokemonTogeth...|So several days a...|2023|    2|
|       pokemon|Who's a non-villa...|For me, Tyme insp...|2023|    2|
|       pokemon|i have a realization|&amp;#x200B;\n\n[...|2023|    2|
|          FIFA|Is there any reas...|For the past 10 d...|2023|    2|
|           GTA|What should I buy...|I have around 5 m...|2023|    2|
|           GTA|what is the name ...|I know the Nero i...|2023|    2|
|       pokemon|Name any Bug type...|Ok now we’re doin...|2023|    2|
|       pokemon|My starters for e...|Gen 1: Charizard ...|2023|    2|
|       thesims|The Victoria Chal...|\n\nI made my own...|2023|    2|
|       pokemon|I really fucking ...|I feel like it's ...|2023|    2|
|       thesims|The sim 4 build m...|So whenever I pla...|2023|    2|
|          FIFA|  fl

## Using TF-IDF to identify key terms for each game

In [9]:
# Pin the Python package to the same version as the spark-nlp JAR configured above (5.1.2)
!pip install spark-nlp==5.1.2

In [10]:
import pandas as pd
import numpy as np
import json
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
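# pyspark's Tokenizer is aliased as `tot` so it does not shadow Spark NLP's Tokenizer (imported via *)
from pyspark.ml.feature import HashingTF, IDF, Tokenizer as tot, StopWordsRemover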

In [11]:
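# Define the Spark NLP components
# (tokenizer_nlp and stop_words are defined here but not used in pipeline1 below)
tokenizer_nlp = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("tokens_nlp")
)
stop_words = (
    StopWordsCleaner.pretrained("stopwords_iso", "en")
    .setInputCols(["tokens_nlp"])
    .setOutputCol("cleanTokens")
)

# Wrap the raw post body in a Spark NLP "document" annotation
documentAssembler = (
    DocumentAssembler()
    .setInputCol("selftext")
    .setOutputCol("document")
)

# Universal Sentence Encoder: one embedding per document annotation
use = (
    UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# Pretrained sentiment classifier (downloads sentimentdl_use_imdb) on the USE embeddings
sentimental = (
    SentimentDLModel.pretrained(lang="en")
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("sentiment")
)

# Create the sentiment pipeline
pipeline1 = Pipeline(stages=[documentAssembler, use, sentimental])

# Fit the pipeline (every stage is pretrained or a transformer, so nothing is trained here)
model = pipeline1.fit(df_save)

# Transform the data to add the document, sentence_embeddings and sentiment columns
result = model.transform(df_save)
result.cache()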

stopwords_iso download started this may take some time.
Approximate size to download 2.1 KB
[OK!]
tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
sentimentdl_use_imdb download started this may take some time.
Approximate size to download 12 MB
[OK!]


DataFrame[subreddit: string, title: string, selftext: string, year: int, month: int, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, sentence_embeddings: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, sentiment: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>]

In [15]:
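# pyspark.ml Tokenizer (aliased as tot): lowercases and splits on whitespace,
# so punctuation stays attached to words (e.g. "ago,")
tokenizer = tot(inputCol="selftext", outputCol="tokens")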

# StopWordsRemover
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# HashingTF and IDF
hashing_tf = HashingTF(inputCol="filtered_tokens", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Pipeline
pipeline2 = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf])

# Fit and transform the data
model = pipeline2.fit(result)
result = model.transform(result)
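For reference, Spark ML's `HashingTF` (with the default `binary=False`) stores raw term counts, and `IDF` (with the default `minDocFreq=0`) weights each term by `ln((m + 1) / (df(t) + 1))`, where `m` is the number of documents and `df(t)` is the number of documents containing term `t`; the `features` column is the product of the two. The sketch below computes the same quantities on plain token lists, leaving out the hashing step, so the numbers are easy to check by hand. `spark_style_tfidf` is a hypothetical helper, not a Spark API.

```python
import math
from collections import Counter
from typing import Dict, List, Tuple


def spark_style_tfidf(docs: List[List[str]]) -> Tuple[List[Dict[str, float]], Dict[str, float]]:
    """TF-IDF with Spark ML's defaults: tf = raw count, idf = ln((m + 1) / (df + 1))."""
    m = len(docs)
    tfs = [Counter(doc) for doc in docs]             # raw term counts per document
    df = Counter(term for tf in tfs for term in tf)  # number of documents containing each term
    idf = {t: math.log((m + 1) / (n + 1)) for t, n in df.items()}
    weights = [{t: count * idf[t] for t, count in tf.items()} for tf in tfs]
    return weights, idf


docs = [["mario", "kart", "mario"], ["mario", "zelda"], ["zelda", "link"]]
weights, idf = spark_style_tfidf(docs)
```

A term that appears in every post gets `ln(1) = 0`, so very common words contribute no weight to `features` even when the stop-word list misses them.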



In [19]:
result.cache().show()

+--------------+--------------------+--------------------+----+-----+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|     subreddit|               title|            selftext|year|month|            document| sentence_embeddings|sentiment|              tokens|     filtered_tokens|         rawFeatures|            features|
+--------------+--------------------+--------------------+----+-----+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|       pokemon|the PokemonTogeth...|So several days a...|2023|    2|[{document, 0, 13...|[{sentence_embedd...|      pos|[so, several, day...|[several, days, a...|(262144,[3888,840...|(262144,[3888,840...|
|       pokemon|Who's a non-villa...|For me, Tyme insp...|2023|    2|[{document, 0, 66...|[{sentence_embedd...|      neg|[for, me,, tyme, ...|[me,, tyme, inspi...|(262144,[3421

In [20]:

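import os
CSV_DIR = os.path.join("Users/yc1063/fall-2023-reddit-project-team-11/data", "csv")
os.makedirs(CSV_DIR, exist_ok=True)  # make sure the output folder exists
# ~40% random sample; toPandas() collects every sampled row (embeddings included) to the driver
sampled_df = result.sample(fraction=0.4, withReplacement=False, seed=42)
sampled_df.toPandas().to_csv(f"{CSV_DIR}/analysis.csv")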

StatementMeta(3c67b279-1d53-4b7a-b0d9-41cb8b4b6723, 30, 23, Submitted, Running)
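`DataFrame.sample(fraction=0.4, withReplacement=False)` does not return exactly 40% of the rows: Spark keeps each row independently with probability 0.4 (Bernoulli sampling), so the CSV size varies around that target, and a fixed `seed` makes it repeatable only for the same data and partitioning. The sketch below shows the same idea with Python's `random` module; `bernoulli_sample` is a hypothetical helper and its random stream is unrelated to Spark's.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def bernoulli_sample(rows: Sequence[T], fraction: float, seed: int) -> List[T]:
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(10_000))
sample = bernoulli_sample(rows, fraction=0.4, seed=42)
print(len(sample))  # close to, but generally not exactly, 4000
```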

In [63]:
from pyspark.sql import functions as f
from pyspark.sql.types import MapType, StringType, ArrayType, IntegerType
# One row per (post, filtered token); each row keeps its post's rawFeatures vector
ndf = (
    result
    .select("subreddit", f.explode("filtered_tokens").alias("expwords"),
            "rawFeatures", "year", "month", "sentiment")
    .withColumn("filtered_tokens", f.array("expwords"))
)
# Non-zero hash indices of the SparseVector. These belong to the whole post,
# not to the single word on the row, so every row of a post repeats them.
hashudf = f.udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))
wordtf = ndf.withColumn("wordhash", hashudf(f.col("rawFeatures"))).drop("rawFeatures")
wordtf.cache()
wordtf.show()

+---------+-----------+----+-----+---------+---------------+--------------------+
|subreddit|   expwords|year|month|sentiment|filtered_tokens|            wordhash|
+---------+-----------+----+-----+---------+---------------+--------------------+
|  pokemon|    several|2023|    2|      pos|      [several]|[3888, 8408, 9781...|
|  pokemon|       days|2023|    2|      pos|         [days]|[3888, 8408, 9781...|
|  pokemon|       ago,|2023|    2|      pos|         [ago,]|[3888, 8408, 9781...|
|  pokemon|    pokemon|2023|    2|      pos|      [pokemon]|[3888, 8408, 9781...|
|  pokemon|    company|2023|    2|      pos|      [company]|[3888, 8408, 9781...|
|  pokemon|        put|2023|    2|      pos|          [put]|[3888, 8408, 9781...|
|  pokemon|   campaign|2023|    2|      pos|     [campaign]|[3888, 8408, 9781...|
|  pokemon|celebration|2023|    2|      pos|  [celebration]|[3888, 8408, 9781...|
|  pokemon|    pokemon|2023|    2|      pos|      [pokemon]|[3888, 8408, 9781...|
|  pokemon|     
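The `wordhash` column repeats the same index list on every row of a post because `rawFeatures` is a per-post vector. `HashingTF` maps each term to a column index via a hash modulo `numFeatures` (262,144 here, the default) and keeps no vocabulary, so an index cannot be turned back into a word directly. One workaround is to hash the candidate words yourself and build a reverse lookup (PySpark 3.0+ exposes `HashingTF.indexOf(term)` for this on the driver). The sketch below illustrates the idea only: it uses `zlib.crc32` as a stand-in for Spark's MurmurHash3, so its indices will NOT match Spark's, and `bucket`/`reverse_index` are hypothetical names.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, Set

NUM_FEATURES = 1 << 18  # 262144, the size of the rawFeatures vectors shown above


def bucket(term: str, num_features: int = NUM_FEATURES) -> int:
    """Hashing-trick index for a term. Stand-in hash (CRC32), not Spark's MurmurHash3."""
    return zlib.crc32(term.encode("utf-8")) % num_features


def reverse_index(vocabulary: Iterable[str], num_features: int = NUM_FEATURES) -> Dict[int, Set[str]]:
    """Map each index back to every term that hashes to it (collisions share an index)."""
    lookup: Dict[int, Set[str]] = defaultdict(set)
    for term in vocabulary:
        lookup[bucket(term, num_features)].add(term)
    return dict(lookup)


lookup = reverse_index(["several", "days", "pokemon", "company"])
```

Because of collisions, one index can map to several words; a smaller `num_features` makes that more likely.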

In [68]:
import os
CSV_DIR = os.path.join("Users/yc1063/fall-2023-reddit-project-team-11/data", "csv")
# toPandas() collects the entire exploded table to the driver (and overwrites analysis.csv)
wordtf.toPandas().to_csv(f"{CSV_DIR}/analysis.csv")

Py4JJavaError: An error occurred while calling o7744.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1345 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize (4.0 GiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2464)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2412)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2412)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1168)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1168)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1168)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2652)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2594)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2583)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:944)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2326)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2421)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3661)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3665)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3642)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3719)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:104)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:91)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3717)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3642)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3641)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
	at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
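The failure above is `toPandas()` hitting the driver-side limit `spark.driver.maxResultSize` (4.0 GiB here). A likely contributor is the shape of `wordtf`: after `explode`, every token row carries its post's full `wordhash` list, so a post with n tokens and d distinct terms is stored as roughly n × d integers instead of d. The back-of-the-envelope count below makes that growth concrete; `exploded_payload` is a hypothetical helper that ignores hash collisions. Writing from the executors (for example `wordtf.write.parquet(...)`) or aggregating before collecting avoids pulling the table through the driver.

```python
from typing import Iterable, List, Tuple


def exploded_payload(posts: Iterable[List[str]]) -> Tuple[int, int]:
    """Integers stored for hash indices before vs. after exploding tokens,
    when each token row keeps a copy of its post's full index list."""
    before = after = 0
    for tokens in posts:
        distinct = len(set(tokens))       # ~ nonzero entries in rawFeatures
        before += distinct                # one index list per post
        after += len(tokens) * distinct   # one copy per token row
    return before, after


# A single 500-token post with 300 distinct terms
post = [f"w{i % 300}" for i in range(500)]
print(exploded_payload([post]))  # (300, 150000)
```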


In [60]:
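from pyspark.sql.window import Window

# Partition by subreddit only. With no orderBy the window frame is the whole partition,
# so collect_list attaches the full subreddit-wide list to every row (expensive).
window_spec = Window.partitionBy("subreddit")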

# Use explode and collect_list in a window specification
wordtf_result = (
    wordtf
    .withColumn("word_element", f.explode("wordhash"))
    .withColumn("wordhash", f.collect_list("word_element").over(window_spec))
    .distinct()
    .drop("word_element")
)

wordtf_result.show(truncate=False)

StatementMeta(3c67b279-1d53-4b7a-b0d9-41cb8b4b6723, 26, 63, Cancelled, Waiting)
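If the goal is one summary per subreddit, a `groupBy("subreddit")` aggregation is the usual shape rather than a window, since it yields one row per group instead of copying the group's list onto every row. As a hedged sketch of that aggregation applied to this section's goal (key terms per game), the plain-Python function below sums per-post TF-IDF weights by subreddit and keeps the k largest. `top_terms_per_group` and the sample rows are hypothetical, and ranking by summed TF-IDF is one choice among several (mean weight or document frequency are alternatives).

```python
import heapq
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def top_terms_per_group(
    rows: Iterable[Tuple[str, Dict[str, float]]], k: int = 3
) -> Dict[str, List[Tuple[str, float]]]:
    """rows: (subreddit, {term: tfidf_weight}) pairs, one per post.
    Returns the k terms with the largest summed weight for each subreddit."""
    totals: Dict[str, Dict[str, float]] = defaultdict(lambda: defaultdict(float))
    for group, weights in rows:
        for term, weight in weights.items():
            totals[group][term] += weight
    return {
        group: heapq.nlargest(k, terms.items(), key=lambda item: item[1])
        for group, terms in totals.items()
    }


rows = [
    ("pokemon", {"pikachu": 2.0, "trade": 0.5}),
    ("pokemon", {"pikachu": 1.0, "raid": 1.2}),
    ("GTA", {"heist": 3.0}),
]
print(top_terms_per_group(rows, k=2))
```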

In [77]:
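from pyspark.sql.types import DoubleType
# SparseVector -> {feature index: TF-IDF weight}; explode turns each map entry into a row
udf1 = f.udf(lambda vec: dict(zip(vec.indices.tolist(), vec.values.tolist())),
             MapType(IntegerType(), DoubleType()))
valuedf = result.select("subreddit", f.explode(udf1(f.col("features"))).alias("wordhash", "value"))
valuedf.show()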

+--------------+--------------------+--------------------+----+-----+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     subreddit|               title|            selftext|year|month|            document| sentence_embeddings|sentiment|              tokens|     filtered_tokens|         rawFeatures|            features|      feature_values|
+--------------+--------------------+--------------------+----+-----+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       pokemon|the PokemonTogeth...|So several days a...|2023|    2|[{document, 0, 13...|[{sentence_embedd...|      pos|[so, several, day...|[several, days, a...|(262144,[3888,840...|(262144,[3888,840...|[0.0, 0.0, 0.0, 0...|
|       pokemon|Who's a non-villa...|For me, Tyme insp...|2023|    2|[{document, 0, 66...|[{
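Because the keys produced by `udf1` are hash indices, they still cannot be read as words. If readable terms are needed, one option is to swap `HashingTF` for `pyspark.ml.feature.CountVectorizer`, whose fitted model exposes a `vocabulary` list where position i is the term for feature index i. With that list, pairing a sparse vector's `indices` and `values` with words is a simple lookup. The sketch below shows it in plain Python with a hypothetical vocabulary and helper name (`sparse_to_terms`).

```python
from typing import Dict, Sequence


def sparse_to_terms(indices: Sequence[int], values: Sequence[float],
                    vocabulary: Sequence[str]) -> Dict[str, float]:
    """Pair each nonzero entry of a sparse vector with its term.
    vocabulary[i] names feature i, as in CountVectorizerModel.vocabulary."""
    return {vocabulary[i]: float(v) for i, v in zip(indices, values)}


vocab = ["mario", "luigi", "peach"]  # hypothetical fitted vocabulary
print(sparse_to_terms([0, 2], [1.5, 0.2], vocab))
```

Inside Spark, the same lambda could be wrapped in a UDF over `vec.indices` and `vec.values`, with the vocabulary captured as a Python list (or broadcast if it is large).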

In [44]:
result_without_duplicates = joined_df.dropDuplicates()

# Show the resulting DataFrame without duplicates
result_without_duplicates.cache().show()

StatementMeta(3c67b279-1d53-4b7a-b0d9-41cb8b4b6723, 20, 48, Cancelled, Waiting)

### Saving intermediate data

Intermediate outputs go to the storage attached to the Azure ML workspace, using the URI `azureml://datastores/workspaceblobstore/paths/<PATH-TO_STORE>`; this URI is the same for all workspaces. To reload the data, read from the same URI.
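Because the datastore URI has a fixed prefix, a small helper can keep save and reload paths consistent. This is a sketch: `datastore_uri` is a hypothetical name and the sub-path in the example is illustrative. In the notebook, the result would be passed to calls such as `df.write.parquet(uri)` and `spark.read.parquet(uri)`.

```python
DATASTORE_PREFIX = "azureml://datastores/workspaceblobstore/paths/"


def datastore_uri(*parts: str) -> str:
    """Join path parts under the workspaceblobstore datastore prefix."""
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return DATASTORE_PREFIX + "/".join(cleaned)


uri = datastore_uri("data", "sentiment_tfidf.parquet")
```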

In [8]:
import os
CSV_DIR = os.path.join("Users/yc1063/fall-2023-reddit-project-team-11/data", "csv")
# Writes a Parquet dataset (a directory of part files) despite the ".csv" suffix in the name
joined_df.write.parquet(f"{CSV_DIR}/sentiment_tfidf.csv")
