# Hello Spark
Demonstration based on the [Spark Quick Start](https://spark.apache.org/docs/latest/quick-start.html)

# Create a Spark Session
The SparkSession object is our connection to the Spark cluster manager running on the spark-master host.

There are a few important details in the setting up of the SparkSession:
1. The `appName` is what shows up in the "Running Apps" section of http://localhost:8080/ -- It'll move to "Completed Apps" once we call `.stop()` on this session.
2. The `master` tells it where to find our Spark cluster manager so we can launch Spark applications from this session.
3. The `spark.sql.warehouse.dir` tells it where to find our Hive tables.
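
The three settings above can be kept in one place and applied to the builder in a single pass. This is only a minimal sketch: the helper names `session_settings` and `build_session` are hypothetical, and the values are the ones used later in this notebook.

```python
def session_settings(app_name, master_url, warehouse_dir):
    """Collect the three settings discussed above (hypothetical helper)."""
    return {
        "appName": app_name,
        "master": master_url,
        "config": {"spark.sql.warehouse.dir": warehouse_dir},
    }


def build_session(settings):
    """Apply the settings to a SparkSession builder (hypothetical helper)."""
    # Imported lazily so the settings helper works without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(settings["appName"]).master(settings["master"])
    for key, value in settings["config"].items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


settings = session_settings(
    "TF-IDF Calculator", "spark://spark-master:7077", "/opt/warehouse/"
)
```

Calling `build_session(settings)` would then return the session, assuming the master is reachable at that URL.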


In [1]:
!python3 -m pip install --upgrade pip


In [6]:
!pip install numpy

Collecting numpy
  Downloading numpy-1.26.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (62 kB)
Downloading numpy-1.26.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (14.2 MB)
Installing collected packages: numpy
Successfully installed numpy-1.26.2

In [1]:
import json
import os
import time

from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, NGram, StopWordsRemover, Tokenizer
from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, concat_ws, lit, lower, regexp_replace, udf
from pyspark.sql.types import ArrayType, DoubleType, Row, StringType, StructField, StructType

In [2]:
spark_session = SparkSession.builder \
        .appName("TF-IDF Calculator") \
        .master("spark://spark-master:7077") \
        .config("spark.executor.instances", 1) \
        .config("spark.cores.max", 2) \
        .config("spark.sql.warehouse.dir", "/opt/warehouse/") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/15 19:25:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark_session.stop()

# Word Count
This is a very basic hello-world to make sure that we can run a little PySpark:

### Get Some Sample Data
We pull Shakespeare's "As You Like It" from Project Gutenberg and write it to `/opt/data`. This path is on our `fileshare` volume, which is mounted on this Docker container as well as on all of the Spark containers (master and workers).

In [7]:
import requests
resp = requests.get('https://www.gutenberg.org/cache/epub/1121/pg1121.txt')
with open('/opt/data/as-you-like-it.txt', 'w') as fp:
    fp.write(resp.text)


FileNotFoundError: [Errno 2] No such file or directory: '/opt/data/as-you-like-it.txt'
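
The `FileNotFoundError` above most likely means that the `/opt/data` directory does not exist in this container, although that is an assumption. One way to guard against it is to create the parent directory before writing. The sketch below uses a hypothetical helper, `save_text`, and demonstrates it against a temporary directory rather than `/opt/data`.

```python
import os
import tempfile


def save_text(text, path):
    """Write text to path, creating the parent directory if it is missing."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as fp:
        fp.write(text)
    return path


# Demonstration in a temporary directory; the nested "data" folder does not exist yet.
demo_dir = tempfile.mkdtemp()
demo_path = save_text("All the world's a stage", os.path.join(demo_dir, "data", "sample.txt"))
with open(demo_path, encoding="utf-8") as fp:
    saved = fp.read()
```

Note that creating the directory locally only helps Spark if the path sits on the volume shared with the master and workers.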

In [13]:
ls /opt/datalake

timeline_20231215193817.json  timeline_20231215193918.json
timeline_20231215193818.json  timeline_20231215193949.json
timeline_20231215193848.json


In [21]:
# !ls /opt/warehouse
!ls /opt/warehouse/wordcounts.parquet

part-00000-4458dc11-8d43-412b-ab59-debe5b9a7707-c000.snappy.parquet  _SUCCESS


### Perform word-count on Spark

In [12]:
import numpy
# Define an empty DataFrame to hold all the data
aggregated_df = None


# Function to process and aggregate content of a file
# def process_and_aggregate(file_path):
#     # df = spark_session.read.json(file_path)
#     df = df.select(
#         col("account.id").alias("user_id"),
#         col("account.username").alias("username"),
#         lower(regexp_replace("content", "[^\\w\\s]", "")).alias("content")
#     )
#     df = df.groupBy("user_id", "username").agg(concat_ws(" ", collect_list("content")).alias("aggregated_content"))
#     return df
    
# while True:
#     # Loop through files in the directory
#     for filename in os.listdir('/opt/datalake'):
#         file_path = os.path.join('/opt/datalake', filename)
        
#         # Check if the current object is a file
#         if os.path.isfile(file_path):
#             schema = StructType([
#                 StructField("account", StructType(
#                     StructType([
#                         StructField("id", StringType(), True),
#                         StructField("username", StringType(), True)
#                     ])
#                 ), True),
#                 StructField("content", StringType(), True)
#             ])

#             # Create an empty DataFrame with the defined schema
#             df = spark_session.createDataFrame([], schema)
#             # Process and aggregate the file
#             # Read the JSON file as a text file
#             with open(file_path, "r", encoding="utf-8-sig") as file:
#                 file_content = file.read()
            
#             # Handle bad JSON files
#             try:
#                 json_array = json.loads(file_content)
#             except json.JSONDecodeError as e:
#                 print(f"Error parsing file {file_path}: {e}")
#                 print(f"File content: {file_content}")
#                 os.remove(file_path)
#                 continue

#             # Extract only the fields defined in the schema for each JSON object in the array
#             filtered_objects = [
#                 {field.name: json_object[field.name] for field in schema.fields}
#                 for json_object in json_array
#             ]

#             # Convert the filtered JSON objects to PySpark Rows
#             rows = [Row(**filtered_object) for filtered_object in filtered_objects]

#             # Create a DataFrame with the rows and schema
#             df = spark_session.createDataFrame(rows, schema)
#             # df = process_and_aggregate(file_path)
#             df = df.select(
#                 col("account.id").alias("user_id"),
#                 col("account.username").alias("username"),
#                 lower(regexp_replace("content", "[^\\w\\s]", "")).alias("content")
#             )
#             df = df.groupBy("user_id", "username").agg(concat_ws(" ", collect_list("content")).alias("aggregated_content"))
#             # Union the processed DataFrame with the aggregated DataFrame
#             if aggregated_df is None:
#                 aggregated_df = df
#             else:
#                 aggregated_df = aggregated_df.union(df)
#             os.remove(file_path)
    
#     # Now perform the final aggregation across all files
#     if aggregated_df is not None:
#         final_df = aggregated_df.groupBy("user_id", "username").agg(concat_ws(" ", collect_list("aggregated_content")).alias("final_aggregated_content"))
#         # final_df.show(2)
#         # Tokenize the content
#         tokenizer = Tokenizer(inputCol="final_aggregated_content", outputCol="words")
#         wordsData = tokenizer.transform(final_df)
#         # Remove stopwords
#         remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
#         wordsData = remover.transform(wordsData)
        
#         # Count the frequency of each word and filter out rare words
#         cv = CountVectorizer(inputCol="filtered_words", outputCol="cv_features", minDF=2)
#         cvModel = cv.fit(wordsData)
#         filteredWordsData = cvModel.transform(wordsData)
#         # filteredWordsData.show()
#         # Compute TF-IDF
#         idf = IDF(inputCol="cv_features", outputCol="features")
#         idfModel = idf.fit(filteredWordsData)
#         rescaledData = idfModel.transform(filteredWordsData)
#         # rescaledData.show()
#         # Select the required columns (user id, username, and features)
#         tfidf_matrix = rescaledData.select("user_id", "username", "features")
#         tfidf_matrix.show(truncate=False)
#         tfidf_matrix.write.mode("append").parquet("opt/warehouse")
#     time.sleep(300)

        
while True:
    for filename in os.listdir('/opt/datalake'):
        file_path = os.path.join('/opt/datalake', filename)
        
        # Check if the current object is a file
        if os.path.isfile(file_path):
            # print(f"Processing file: {file_path}")
            
            df = spark_session.read.json(file_path)
            df = df.select(col("account.id").alias("user_id"), 
                   col("account.username").alias("username"), 
                   lower(regexp_replace("content", "[^\\w\\s]", "")).alias("content"))
            # df = df.groupBy("user_id", "username").agg(concat_ws(" ", collect_list("content")).alias("aggregated_content"))
            # Drop missing values
            df = df.na.drop(subset=["user_id", "username", "content"])
            # Tokenize the content
            tokenizer = Tokenizer(inputCol="content", outputCol="words")
            wordsData = tokenizer.transform(df)
            
            # Remove stopwords
            remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
            wordsData = remover.transform(wordsData)
            
            # Count the frequency of each word and filter out rare words
            cv = CountVectorizer(inputCol="filtered_words", outputCol="cv_features", minDF=2)
            cvModel = cv.fit(wordsData)
            filteredWordsData = cvModel.transform(wordsData)
            # filteredWordsData.show()
            # Compute TF-IDF
            idf = IDF(inputCol="cv_features", outputCol="features")
            idfModel = idf.fit(filteredWordsData)
            rescaledData = idfModel.transform(filteredWordsData)
            # rescaledData.show()
            # Select the required columns (user id, username, and features)
            tfidf_matrix = rescaledData.select("user_id", "username", "filtered_words", "features")
            tfidf_matrix.show(truncate=False)
            tfidf_matrix.write.mode("append").parquet("/opt/warehouse")
            # Remove processed file
            os.remove(file_path)
    time.sleep(300)



KeyboardInterrupt: 
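
To make the TF-IDF step in the loop above more concrete, here is a small pure-Python sketch of the weighting. It assumes raw term counts as the term frequency (as `CountVectorizer` produces) and the smoothed inverse document frequency `log((m + 1) / (df + 1))` described in the Spark ML `IDF` documentation, where `m` is the number of documents and `df` is the number of documents containing the term. It does not apply the `minDF=2` vocabulary filter, and the sample documents are made up for illustration.

```python
import math
from collections import Counter


def tf_idf(docs):
    """Return one {term: weight} dict per tokenized document."""
    m = len(docs)
    # Document frequency: how many documents contain each term at least once.
    df = Counter(term for doc in docs for term in set(doc))
    idf = {term: math.log((m + 1) / (count + 1)) for term, count in df.items()}
    # Weight = raw count of the term in the document times its IDF.
    return [
        {term: tf * idf[term] for term, tf in Counter(doc).items()}
        for doc in docs
    ]


docs = [["spark", "word", "count"], ["spark", "tfidf"], ["spark", "spark", "word"]]
weights = tf_idf(docs)
```

A term that occurs in every document, such as `spark` here, gets a weight of zero, which is why very common words contribute nothing to the feature vector.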

In [12]:
ayli

DataFrame[value: string]
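
The cell that created `ayli` is not shown in this notebook. A `DataFrame[value: string]` is what `read.text` returns, so it was presumably loaded along the following lines. This is a sketch under that assumption; the helper name `load_lines` is hypothetical, and the path in the comment is the one the download cell tried to write to.

```python
def load_lines(session, path):
    """Read a text file into a DataFrame with one string column named `value`."""
    return session.read.text(path)


# Assumed usage with the session created earlier:
# ayli = load_lines(spark_session, '/opt/data/as-you-like-it.txt')
```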

# Spark grep

In [21]:
project_lines = ayli.filter(ayli.value.contains("Project"))

In [22]:
project_lines.show(n=10)

+--------------------+
|               value|
+--------------------+
|The Project Guten...|
|of the Project Gu...|
|of this license, ...|
|concept and trade...|
|of the Project Gu...|
|of derivative wor...|
|To protect the Pr...|
|(or any other wor...|
|Project Gutenberg...|
|Section 1. Genera...|
+--------------------+
only showing top 10 rows



# Term Frequency

In [23]:
from pyspark.sql.functions import explode, split
wordCounts = ayli.select(explode(split(ayli.value, r"\s+")).alias("word")).groupBy("word").count()
_coll = wordCounts.collect()

In [24]:
wordCounts.show()

+----------+-----+
|      word|count|
+----------+-----+
|    online|    4|
|      some|   25|
| disgrace,|    1|
|      hope|    8|
|     still|    5|
|     those|    9|
|        By|   19|
|       art|   22|
|   enough;|    1|
| Monsieur,|    2|
|    youth,|   11|
|Princesse:|    1|
|   embrace|    1|
|     burs,|    1|
|     cold,|    1|
|   strait.|    1|
|     1.Lo.|    1|
|     Sword|    2|
|    merrie|    1|
| Spheares:|    1|
+----------+-----+
only showing top 20 rows
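
The counts above treat `youth,` and `youth` as different words because the split is on whitespace only and the text is not lowercased. The pure-Python sketch below shows the effect of normalizing first, using the same `[^\w\s]` pattern as the TF-IDF cell earlier. The function name and the sample lines are made up for illustration; in Spark the equivalent would presumably be `lower` and `regexp_replace` applied before the split.

```python
import re
from collections import Counter


def count_words(lines):
    """Count words after lowercasing and stripping punctuation."""
    counts = Counter()
    for line in lines:
        cleaned = re.sub(r"[^\w\s]", "", line.lower())
        counts.update(cleaned.split())
    return counts


sample = ["Youth, youth, and hope;", "Hope still."]
counts = count_words(sample)
```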



## Save as Parquet File
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

In [25]:
wordCounts.write.mode('overwrite').parquet('/opt/warehouse/wordcounts.parquet')

## Read back Parquet data

In [26]:
wc2 = spark_session.read.parquet('/opt/warehouse/wordcounts.parquet/')
wc2.show()

+----------+-----+
|      word|count|
+----------+-----+
|    online|    4|
|      some|   25|
| disgrace,|    1|
|      hope|    8|
|     still|    5|
|     those|    9|
|        By|   19|
|       art|   22|
|   enough;|    1|
| Monsieur,|    2|
|    youth,|   11|
|Princesse:|    1|
|   embrace|    1|
|     burs,|    1|
|     cold,|    1|
|   strait.|    1|
|     1.Lo.|    1|
|     Sword|    2|
|    merrie|    1|
| Spheares:|    1|
+----------+-----+
only showing top 20 rows



### Enable SQL-querying
Create a temp-view from wc2 with name "wordcounts" so we can reference that as a table name in subsequent SQL queries.

In [29]:
wc2.createOrReplaceTempView("wordcounts")

ans = spark_session.sql("SELECT * FROM wordcounts WHERE LEN(word) > 4 ORDER BY count DESC")
ans.show()

+----------+-----+
|      word|count|
+----------+-----+
|   Project|   79|
|     would|   65|
|Gutenberg™|   53|
|     shall|   53|
|     Enter|   52|
|     which|   50|
|     there|   39|
|    should|   34|
|     these|   32|
|     their|   31|
|     other|   28|
|electronic|   27|
|    cannot|   27|
|    better|   26|
|  Rosalind|   25|
|     neuer|   24|
|     could|   23|
|     works|   23|
|    thinke|   23|
|     beare|   22|
+----------+-----+
only showing top 20 rows



In [30]:
ans.limit(10).write.json('/opt/warehouse/answer.json')

In [31]:
list_of_dicts = ans.limit(10).rdd.map(lambda row: row.asDict()).collect()
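
Once the rows are collected as plain dictionaries they no longer depend on Spark and can be serialized directly. The sketch below assumes each dictionary has the `word` and `count` keys shown in the query output; the sample rows are copied from the first three lines of that output.

```python
import json

# Sample rows copied from the first lines of the query output above.
list_of_dicts = [
    {"word": "Project", "count": 79},
    {"word": "would", "count": 65},
    {"word": "Gutenberg™", "count": 53},
]

# ensure_ascii=False keeps characters such as "™" readable in the output.
payload = json.dumps(list_of_dicts, ensure_ascii=False)
top_words = [row["word"] for row in json.loads(payload)]
```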


# Close Session
This shuts down the executors running on the workers and relinquishes cluster resources associated with this app.

In [32]:
spark_session.stop()