# Hello Spark
Demonstration based on the [Spark Quick Start](https://spark.apache.org/docs/latest/quick-start.html)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import log10, explode
import sys

In [2]:
ls  /opt/data

1682639678.json  1682639688.json  1682639718.json  1682639934.json
1682639681.json  1682639691.json  1682639745.json  1682640068.json
1682639684.json  1682639696.json  1682639797.json  as-you-like-it.txt
1682639686.json  1682639704.json  1682639858.json


In [None]:
pip

In [3]:
# Initialize SparkSession
spark = SparkSession.builder.appName("TF-IDF").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/01 00:35:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/01 00:35:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Load data from JSON file
data = spark.read.json("../../../opt/data/1682639678.json")

                                                                                

In [5]:
data.printSchema()

root
 |-- account: struct (nullable = true)
 |    |-- acct: string (nullable = true)
 |    |-- avatar: string (nullable = true)
 |    |-- avatar_static: string (nullable = true)
 |    |-- bot: boolean (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- discoverable: boolean (nullable = true)
 |    |-- display_name: string (nullable = true)
 |    |-- emojis: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- shortcode: string (nullable = true)
 |    |    |    |-- static_url: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- visible_in_picker: boolean (nullable = true)
 |    |-- fields: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- verified_at: string (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    

In [6]:
from pyspark.sql.functions import log10, concat_ws, flatten,collect_list

# Use concat_ws() to combine the array of strings into a single column
data = data.withColumn("content", concat_ws(" ", "content"))

# Use groupBy() and concat_ws() to combine the strings for rows with the same ID
data = data.groupBy("id").agg(concat_ws(" ", collect_list("content")).alias("combined_content"))

data.show()

+------------------+----------------------------------+
|                id|                  combined_content|
+------------------+----------------------------------+
|110273473674788626|              <p>Meta is going ...|
|110273473488790997|<p>ÈáëÂçÅ‰∏∫‰ªÄ‰πàÁªôÊàë‰∏ÄÁßçÊºÇÊµÅÊïôÂÆ§ÁöÑ...|
|110273473835154582|              <p>Musk claims he...|
|110273473375300613|              <p>April 27 7:52p...|
|110273473319144564|              <p>$4 Pick Up. <a...|
|110273473819204072|              <p>They‚Äôre ‚Äòskeet...|
|110273473430816942|              <p>Gluten-free, s...|
|110273473378589701|              <p>My energy is W...|
|110273473636530708|<p>„Åæ„Åò„ÇÅ„Å´„Ç≥„ÉÑ„Ç≥„ÉÑ„ÇÑ„Å£„Å¶„Åç„Åü„Åã...|
|110273473383805187|              <p>La sensaci√≥n d...|
|110273473595453936|<p>ÊØèÁîªÂÆå‰∏ÄÂº†Á≠æÁªòÂ∞±Ë¢´‰∏ëÂæó„ÄÇ„ÄÇ„ÄÇ...|
|110273473727702772|              <p>Los jueves son...|
|110273473344515693|                 <p>„Åä„ÅØ„Åß„ÅôÔΩû</p>|
|110273473299954485|      <p><span>„ÉØ„Ç§„ÇíÂ∞äÊï¨Ôº

In [7]:
!pip install numpy
!pip install pandas

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
# from pyspark.ml.functions import to_dense_udf
from pyspark.ml.linalg import SparseVector, VectorUDT, DenseVector
from pyspark.sql.functions import concat_ws, collect_list, udf

import numpy
# Tokenize content column
tokenizer = Tokenizer(inputCol="combined_content", outputCol="words")
data = tokenizer.transform(data)

# Compute Term Frequencies
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
data = hashingTF.transform(data)

# Compute Inverse Document Frequencies
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(data)
data = idfModel.transform(data)

# Convert sparse vectors to dense vectors
to_dense = lambda v: DenseVector(v.toArray()) if isinstance(v, SparseVector) else v
to_dense_udf = udf(to_dense, VectorUDT())
data = data.withColumn("features", to_dense_udf("features"))

                                                                                

In [9]:
# Save the result to a Parquet file
data.write.parquet("/opt/warehouse/tf_idf3.parquet")



23/05/01 00:35:51 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
                                                                                

In [11]:
tf_idf = spark.read.parquet('/opt/warehouse/tf_idf3.parquet/')


                                                                                

+------------------+----------------------------------+---------------------------------+--------------------+--------------------+
|                id|                  combined_content|                            words|         rawFeatures|            features|
+------------------+----------------------------------+---------------------------------+--------------------+--------------------+
|110273473674788626|              <p>Meta is going ...|             [<p>meta, is, goi...|(262144,[2348,706...|[0.0,0.0,0.0,0.0,...|
|110273473488790997|<p>ÈáëÂçÅ‰∏∫‰ªÄ‰πàÁªôÊàë‰∏ÄÁßçÊºÇÊµÅÊïôÂÆ§ÁöÑ...|[<p>ÈáëÂçÅ‰∏∫‰ªÄ‰πàÁªôÊàë‰∏ÄÁßçÊºÇÊµÅÊïôÂÆ§...|(262144,[251637],...|[0.0,0.0,0.0,0.0,...|
|110273473835154582|              <p>Musk claims he...|             [<p>musk, claims,...|(262144,[2348,568...|[0.0,0.0,0.0,0.0,...|
|110273473375300613|              <p>April 27 7:52p...|             [<p>april, 27, 7:...|(262144,[2348,114...|[0.0,0.0,0.0,0.0,...|
|110273473319144564|              <p>$4 Pick Up. 

In [20]:
tf_idf.select("features").show(1,)

+--------------------+
|            features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
+--------------------+
only showing top 1 row



In [23]:
first_element = tf_idf.select("features").first()[0]
print(first_element[:50])

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0.]


In [22]:
first_element = tf_idf.select("words").first()[0]
print(first_element)

['<p>meta', 'is', 'going', 'to', 'let', 'you', 'update', 'quest', 'apps', 'before', 'the', 'headset', 'shuts', 'down', '<a', 'href="https://www.theverge.com/2023/4/27/23701286/meta-quest-v53-update-apps-headset-shuts-down"', 'rel="nofollow', 'noopener', 'noreferrer"', 'target="_blank"><span', 'class="invisible">https://www.</span><span', 'class="ellipsis">theverge.com/2023/4/27/2370128</span><span', 'class="invisible">6/meta-quest-v53-update-apps-headset-shuts-down</span></a></p>']


In [None]:
# Stop the SparkSession
spark.stop()

In [None]:
# tokens = data.selectExpr("id", "split(combined_content, ' ') as words") \
#     .withColumn("word", explode("words")) \
#     .select("id", "word")

# tf = tokens.rdd.map(lambda x: ((x[0], x[1]), 1)) \
#     .reduceByKey(lambda x, y: x + y) \
#     .map(lambda x: (x[0][0], (x[0][1], x[1]))) \
#     .groupByKey() \
#     .mapValues(dict)


In [25]:
# tf.take(10)

In [26]:
# tf_df = tf.toDF()

In [27]:
# print(type(tf_df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [28]:
# tokens.show()

+------------------+--------------------+
|                id|                word|
+------------------+--------------------+
|110273473674788626|             <p>Meta|
|110273473674788626|                  is|
|110273473674788626|               going|
|110273473674788626|                  to|
|110273473674788626|                 let|
|110273473674788626|                 you|
|110273473674788626|              update|
|110273473674788626|               Quest|
|110273473674788626|                apps|
|110273473674788626|              before|
|110273473674788626|                 the|
|110273473674788626|             headset|
|110273473674788626|               shuts|
|110273473674788626|                down|
|110273473674788626|                  <a|
|110273473674788626|href="https://www...|
|110273473674788626|       rel="nofollow|
|110273473674788626|            noopener|
|110273473674788626|         noreferrer"|
|110273473674788626|target="_blank"><...|
+------------------+--------------

In [38]:
# # Calculate inverse document frequency (IDF) for each word
# idf = tokens.rdd.map(lambda x: (x[1], x[0])) \
#     .groupByKey() \
#     .map(lambda x: (x[0], log10(data.count()/len(x[1]))))


In [39]:
# print(type(idf))

<class 'pyspark.rdd.PipelinedRDD'>


In [40]:
# idf.take(10)

Traceback (most recent call last):
  File "/opt/spark/python/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "/opt/spark/python/pyspark/context.py", line 462, in __getnewargs__
    raise RuntimeError(
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

In [41]:
idf_df = idf.toDF()

Traceback (most recent call last):
  File "/opt/spark/python/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "/opt/spark/python/pyspark/context.py", line 462, in __getnewargs__
    raise RuntimeError(
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

In [None]:
print(type(idf_df))

In [13]:
## Multiply 
# Multiply TF by IDF to get TF-IDF score
tf_idf = tf_df.join(idf) \
    .map(lambda x: (x[0], {k: v*x[1][1] for k, v in x[1][0].items()}))



AttributeError: 'PipelinedRDD' object has no attribute '_jdf'

In [48]:
# Convert TF-IDF scores to a DataFrame and store as Parquet file
tf_idf_df = tf_idf.toDF(["id", "tf_idf"])
tf_idf_df.write.mode("overwrite").parquet("tf_idf")

NameError: name 'tf_idf' is not defined

In [None]:

# Stop SparkSession
spark.stop()

# Create a Spark Session
The SparkSession object is our connection to the Spark Context Manager running on the spark-master host.

There are a few important details in the setting up of the SparkSession:
1. The `appName` is what shows up in the "Running Apps" section of http://localhost:8080/ -- It'll move to "Completed Apps" once we call `.stop()` on this session.
2. The `master` tells it where to our Spark config-manager so we can launch spark-applications from this session.
3. The `spark.sql.warehouse.dir` tells it where to find our Hive tables.


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark_session = SparkSession.builder\
    .appName("hello-pyspark")\
    .master("spark://spark-master:7077")\
    .config("spark.executor.instances", 1)\
    .config("spark.cores.max", 2)\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/30 23:05:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Word Count
This is a very basic hello-world to make sure the we can run a little PySpark:

### Get Some Sample Data
We pull Shakespeare's "As You Like It" from Project Gutenberg, and write it to `/opt/data`.  This is mounted to our `fileshare` volume which is mounted on this docker container as well as all of the spark-containers (master and worker(s)).  

In [4]:
import requests
resp = requests.get('https://www.gutenberg.org/cache/epub/1121/pg1121.txt')
with open('/opt/data/as-you-like-it.txt','w')as fp:
    fp.write(resp.text)


In [5]:
ls /opt/data

1682639678.json  1682639688.json  1682639718.json  1682639934.json
1682639681.json  1682639691.json  1682639745.json  1682640068.json
1682639684.json  1682639696.json  1682639797.json  as-you-like-it.txt
1682639686.json  1682639704.json  1682639858.json


### Perform word-count on Spark

In [5]:
ayli = spark_session.read.text('/opt/data/as-you-like-it.txt')
ans = ayli.count()
print(ans)

[Stage 0:>                                                          (0 + 1) / 1]

4215


                                                                                

In [6]:
ayli

DataFrame[value: string]

23/04/16 21:37:43 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
23/04/16 21:37:43 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:978)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

# Spark grep

In [23]:
orlandos_lines = ayli.filter(ayli.value.contains("ORLANDO"))

In [24]:
orlandos_lines.show(n=10)

+--------------------+
|               value|
+--------------------+
|  ORLANDO,  "   "...|
|Enter ORLANDO and...|
|  ORLANDO. As I r...|
|  ORLANDO. Go apa...|
|  ORLANDO. Nothin...|
|  ORLANDO. Marry,...|
|  ORLANDO. Shall ...|
|  ORLANDO. O, sir...|
|  ORLANDO. Ay, be...|
|  ORLANDO. Come, ...|
+--------------------+
only showing top 10 rows



# Term Frequency

In [25]:
from pyspark.sql.functions import explode, split
wordCounts = ayli.select(explode(split(ayli.value, "\s+")).alias("word")).groupBy("word").count()
_coll = wordCounts.collect()

                                                                                

In [26]:
wordCounts.show()

+-----------+-----+
|       word|count|
+-----------+-----+
|     online|    4|
|PERMISSION.|    7|
|       some|   26|
|  disgrace,|    1|
|       hope|    8|
|      still|    7|
|         By|   24|
| misplaced;|    1|
|      those|    8|
|    knight,|    1|
| FREDERICK.|   20|
|  wrestler?|    1|
|    embrace|    1|
|        art|   21|
|      burs,|    1|
| likelihood|    1|
|     travel|    3|
|assailants.|    1|
|      cold,|    1|
|    blossom|    1|
+-----------+-----+
only showing top 20 rows



## Save as Parquet File
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

In [27]:
wordCounts.write.mode('overwrite').parquet('/opt/warehouse/wordcounts.parquet')

                                                                                

## Read back Parquet data

In [3]:
wc2 = spark_session.read.parquet('/opt/warehouse/wordcounts.parquet/')
wc2.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----------+-----+
|       word|count|
+-----------+-----+
|     online|    4|
|PERMISSION.|    7|
|       some|   26|
|  disgrace,|    1|
|       hope|    8|
|      still|    7|
|         By|   24|
| misplaced;|    1|
|      those|    8|
|    knight,|    1|
| FREDERICK.|   20|
|  wrestler?|    1|
|    embrace|    1|
|        art|   21|
|      burs,|    1|
| likelihood|    1|
|     travel|    3|
|assailants.|    1|
|      cold,|    1|
|    blossom|    1|
+-----------+-----+
only showing top 20 rows



                                                                                

### Enable SQL-querying
Create a temp-view from wc2 with name "wordcounts" so we can reference that as a table name in subsequent SQL queries.

In [5]:
wc2.createOrReplaceTempView("wordcounts")

ans = spark_session.sql("SELECT * FROM wordcounts WHERE LEN(word) > 4 ORDER BY count DESC")
ans.show()

+------------+-----+
|        word|count|
+------------+-----+
|   ROSALIND.|  201|
|    ORLANDO.|  120|
|      CELIA.|  109|
|     Project|   78|
| TOUCHSTONE.|   74|
|       would|   68|
|       shall|   61|
|     JAQUES.|   57|
|Gutenberg-tm|   53|
|       Enter|   51|
|       which|   50|
|     OLIVER.|   37|
|      should|   35|
|       there|   35|
|       these|   32|
|     SENIOR.|   32|
|       their|   31|
|  electronic|   27|
|      cannot|   27|
|      Exeunt|   27|
+------------+-----+
only showing top 20 rows



23/04/16 21:27:39 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
23/04/16 21:27:39 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:978)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

In [55]:
ans.limit(10).write.json('/opt/warehouse/answer.json')

In [60]:
list_of_dicts = ans.limit(10).rdd.map(lambda row: row.asDict()).collect()


23/04/16 21:00:50 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
23/04/16 21:00:50 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:978)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

# Close Session
This shuts down the executors running on the workers and relinquishes cluster resources associated with this app.

In [15]:
spark_session.stop()