In [1]:
# This is a lab test from DataAnalisysWith PySpark
##<font size="12">Chapter 2</font>


In [2]:
# end-of-chapter.py############################################################
#
# Use this to get a free pass from Chapter 2 to Chapter 3.
#
# Remember, with great power comes great responsibility. Make sure you
# understand the code before running it! If necessary, refer to the text in
# Chapter 2.
#
###############################################################################

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract

spark = SparkSession.builder.getOrCreate()

book = spark.read.text("../data/gutenberg_books/1342-0.txt")

lines = book.select(split(book.value, " ").alias("line"))

words = lines.select(explode(col("line")).alias("word"))

words_lower = words.select(lower(col("word")).alias("word_lower"))

words_clean = words_lower.select(
    regexp_extract(col("word_lower"), "[a-z]*", 0).alias("word")
)

words_nonull = words_clean.where(col("word") != "")

In [3]:
#book = spark.read.text("../data/gutenberg_books/1342-0.txt")
#book
# DataFrame[value: string]

In [4]:
words_nonull.show(10)

+---------+
|     word|
+---------+
|    start|
|       of|
|      the|
|  project|
|gutenberg|
|    ebook|
|   george|
|    allen|
|publisher|
|  charing|
+---------+
only showing top 10 rows



In [20]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Import pathlib
from pathlib import Path

# Specify the file path
folder_path = Path("./results_single_partition.csv")

# Check if the file exists before deleting it
#if folder_path.exists():
    # Delete the file
#    folder_path.unlink()
#    print("File deleted successfully.")
#else:
#    print("File not found.")
    
# Removing a non-empty folder with shutil.rmtree()
import shutil

# Delete folder using shutil
try:
    shutil.rmtree(folder_path)
    print(f"The folder at {folder_path} has been successfully deleted!")
except OSError as e:
    print(f"Error: {folder_path} - {e.strerror}.")

spark = SparkSession.builder.appName(
    "Counting word occurences from a book."
).getOrCreate()

spark.sparkContext.setLogLevel("WARN")

#pyspark.sql.functions.explode
"""
    pyspark.sql.functions.explode(col: ColumnOrName) → pyspark.sql.column.Column[source]

    Returns a new row for each element in the given array or map. 
    Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.
    
"""

# If you need to read multiple text files, replace `1342-0` by `*`.
results = (
    spark.read.text("../data/gutenberg_books/1342-0.txt")
    .select(F.split(F.col("value"), " ").alias("line"))
    .select(F.explode(F.col("line")).alias("word"))
    .select(F.lower(F.col("word")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
    .where(F.col("word") != "")
    .groupby(F.col("word"))
    .count()
)

results.orderBy("count", ascending=False).show(10)
results.coalesce(1).write.csv("./results_single_partition.csv")

"""
When using toPandas(), remember that you lose the advantages of working with multiple machines, as the data will accumulate on the driver. 
Reserve this operation for an aggregated or manageable data set. While this is a crude formula, I usually take the number of rows times the number of columns; 
if this number is over 100,000 (for a 16 GB driver), I try to reduce it further. This simple trick helps me get a sense of the size of the data I am dealing with, 
as well as what’s possible given my driver size.

You do not want to move your data between a pandas and a PySpark data frame all the time. 
Reserve toPandas() for either discrete operations or for moving your data into a pandas data frame once and for all. 
Moving back and forth will yield a ton of unnecessary work in distributing and collecting the data for nothing.
"""

if (os.path.exists("./results_single_partition_sg.csv")):
    os.remove("./results_single_partition_sg.csv")
    print("File deleted successfully.")
else:
    results.coalesce(1).toPandas().to_csv("./results_single_partition_sg.csv")

results.columns

The folder at results_single_partition.csv has been successfully deleted!
+----+-----+
|word|count|
+----+-----+
| the| 4617|
|  to| 4292|
|  of| 3831|
| and| 3615|
| her| 2254|
|   a| 2007|
|  in| 1964|
| was| 1867|
|   i| 1778|
| she| 1703|
+----+-----+
only showing top 10 rows

File deleted successfully.


['word', 'count']

In [6]:
from pyspark.sql.functions import col, split
lines = book.select(split(col("value"), " "))
lines

# DataFrame[split(value,  , -1): array<string>]
lines.printSchema()
# root
#  |-- split(value,  , -1): array (nullable = true)
#  |    |-- element: string (containsNull = true)
lines.show(5)
# +--------------------+
# | split(value,  , -1)|
# +--------------------+
# |[The, Project, Gu...|
# |                  []|
# |[This, eBook, is,...|
# |[almost, no, rest...|
# |[re-use, it, unde...|
# +--------------------+
# only showing top 5 rows

root
 |-- split(value,  , -1): array (nullable = true)
 |    |-- element: string (containsNull = false)

+--------------------+
| split(value,  , -1)|
+--------------------+
|[***, START, OF, ...|
|                  []|
|                  []|
|                  []|
|                  []|
+--------------------+
only showing top 5 rows

