# Google Colab Exercise with PySpark

## Purpose

In this exercise, we are going to set up PySpark in Google Colab and test some of its functionality.

From a coding perspective, PySpark may look much like sklearn or pandas. Why the similarity, and what is the purpose of using Spark? Keep these questions in mind as you work through the exercise.

## Setup

Let's do the setup before using PySpark in Colab.

### Google Drive Access

Before we can install related packages for PySpark, we need to set up proper access to Google Drive from the Colab environment.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


After mounting Google Drive, open `Files` in Colab to check that you can access the relevant folders. Place a zip file in a directory and test the `unzip` command.
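If you don't have a zip file at hand, here is a minimal sketch that creates one with Python's standard `zipfile` module. The helper name `make_test_zip` and the file content are hypothetical. The folder `MyDrive/LEADING2021` is simply the path used throughout this exercise. The text is written straight into the archive with `writestr`, so no loose `test.txt` is left behind for `unzip` to ask about overwriting.

```python
import os
import zipfile

def make_test_zip(folder, name="test.txt", content="Python SQL Data\n"):
    """Hypothetical helper: create <folder>/<name>.zip containing one text file."""
    os.makedirs(folder, exist_ok=True)
    zip_path = os.path.join(folder, name + ".zip")
    with zipfile.ZipFile(zip_path, "w") as zf:
        # write the text directly into the archive; no intermediate file on disk
        zf.writestr(name, content)
    return zip_path

# In Colab, after mounting the drive:
# make_test_zip("/content/drive/MyDrive/LEADING2021")
```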

In [None]:
# Test google drive access
# Suppose you have uploaded a zip file, test.txt.zip, in the MyDrive/LEADING2021 folder
!unzip "/content/drive/MyDrive/LEADING2021/test.txt.zip" -d "/content/drive/MyDrive/LEADING2021/"

Archive:  /content/drive/MyDrive/LEADING2021/test.txt.zip
  inflating: /content/drive/MyDrive/LEADING2021/test.txt  
   creating: /content/drive/MyDrive/LEADING2021/__MACOSX/
  inflating: /content/drive/MyDrive/LEADING2021/__MACOSX/._test.txt  


### PySpark Setup in Colab

Now that you have access to Google Drive, let's set up PySpark.

Note that:

1. Each step may take some time, so be patient.
2. The latest Spark version may have changed by the time you read this tutorial. Visit https://downloads.apache.org/spark/ first to find the latest version number. Older releases, including the 3.1.2 used below, are kept at https://archive.apache.org/dist/spark/.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
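# add `-P /content/drive/MyDrive/LEADING2021/` if you want to download it to a specific directory
# older releases such as 3.1.2 are served from archive.apache.org rather than downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz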

In [None]:
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!pip install -q findspark

### Test PySpark

If you encountered no error in the previous steps, Spark-related libraries should be in place for Colab.

Now let's test the installation.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"   # make sure the version number matches
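The version number appears in the download URL, the archive name, and `SPARK_HOME`, and these three must agree. One way to keep them in sync is to derive all three from a single setting. The sketch below assumes hypothetical names `SPARK_VERSION`, `HADOOP_VERSION`, and `spark_paths`, and assumes the archive layout used above. Newer releases may name the Hadoop build differently (for example `hadoop3` instead of `hadoop3.2`), so check the listing first.

```python
# Hypothetical helper: derive every version-dependent string from one place,
# so the download URL, the tarball name and SPARK_HOME cannot drift apart.
SPARK_VERSION = "3.1.2"
HADOOP_VERSION = "3.2"

def spark_paths(spark_version=SPARK_VERSION, hadoop_version=HADOOP_VERSION,
                base_url="https://archive.apache.org/dist/spark"):
    name = f"spark-{spark_version}-bin-hadoop{hadoop_version}"
    return {
        "url": f"{base_url}/spark-{spark_version}/{name}.tgz",
        "tarball": f"{name}.tgz",
        "spark_home": f"/content/{name}",  # where `tar xf` unpacks in /content
    }

paths = spark_paths()
# e.g. os.environ["SPARK_HOME"] = paths["spark_home"]
```

In a Colab cell, IPython expands `{var}` inside `!` commands. So after `url = paths["url"]`, the command `!wget -q {url}` would download the matching archive.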

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2'

If `findspark.find()` outputs the path of the Spark version you installed, the setup is ready. We can start a new (local) Spark session:

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("LEADING")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
spark

## Spark Text Processing

Now let's use PySpark on text processing, a task that comes up in many real-world Big Data problems. For demonstration purposes, however, we will run it on a very small amount of data.

### Text Vectorization

The `pyspark.ml.feature` module provides tools to extract, transform, and select features. In the following example, we use `Tokenizer` to split some text into words, `CountVectorizer` to turn the words into term-frequency vectors, and `IDF` to reweight those vectors.

1. Text Tokenization

In [None]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF

sentenceData = spark.createDataFrame([
    (0.0, "Python SQL Data"),
    (0.0, "R SQL Science"),
    (1.0, "Data Sience and Data Engineering")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|     Python SQL Data| [python, sql, data]|
|  0.0|       R SQL Science|   [r, sql, science]|
|  1.0|Data Sience and D...|[data, sience, an...|
+-----+--------------------+--------------------+
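`Tokenizer` lower-cases each sentence and splits it on whitespace. The plain-Python approximation below reproduces the `words` column for this sample. It is a sketch, not Spark's code. Spark's `Tokenizer` may treat runs of several spaces differently from `str.split()`, which collapses them, and `RegexTokenizer` gives finer control.

```python
# Plain-Python approximation of Tokenizer: lower-case, then split on whitespace.
sentences = ["Python SQL Data", "R SQL Science", "Data Sience and Data Engineering"]

def tokenize(text):
    return text.lower().split()

tokenized = [tokenize(s) for s in sentences]
```

The table above truncates long cells such as `[data, sience, an...`. Calling `wordsData.show(truncate=False)` prints the full lists.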



2. Text Vectorization with Frequencies

In [None]:
# Alternatively, HashingTF (also in pyspark.ml.feature) produces term-frequency vectors:
# hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
# featurizedData = hashingTF.transform(wordsData)
# Here we use CountVectorizer, which also keeps the learned vocabulary.

cv = CountVectorizer(inputCol="words", outputCol="vectors")
model = cv.fit(wordsData)
vectorData = model.transform(wordsData)

In [None]:
model.vocabulary

['data', 'sql', 'sience', 'python', 'science', 'r', 'engineering', 'and']

In [None]:
vectorData.select("vectors").head()

Row(vectors=SparseVector(8, {0: 1.0, 1: 1.0, 3: 1.0}))
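The `SparseVector` lists the vocabulary index of each term together with its count. For the first sentence, those are `data` (0), `sql` (1) and `python` (3). The sketch below redoes the same bookkeeping in plain Python: a vocabulary ordered by total term count across the corpus, then one `{index: count}` dictionary per document. This is an illustration only. Ties between equally frequent terms are broken here by first appearance, and Spark may order tied terms differently, which is why `python` ends up at index 2 instead of 3.

```python
from collections import Counter

docs = [
    ["python", "sql", "data"],
    ["r", "sql", "science"],
    ["data", "sience", "and", "data", "engineering"],
]

# Vocabulary: terms ordered by total count across the corpus, most frequent first.
# most_common() keeps first-seen order among ties; Spark may differ there.
term_counts = Counter(t for doc in docs for t in doc)
vocabulary = [term for term, _ in term_counts.most_common()]
index = {term: i for i, term in enumerate(vocabulary)}

def to_sparse(doc):
    """Return {vocabulary index: count}, like the entries of a SparseVector."""
    counts = Counter(index[t] for t in doc)
    return {i: float(c) for i, c in sorted(counts.items())}

vectors = [to_sparse(doc) for doc in docs]
```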

3. IDF Transformation

In [None]:
idf = IDF(inputCol="vectors", outputCol="idfs")
idfModel = idf.fit(vectorData)
idfData = idfModel.transform(vectorData)

In [None]:
# idfData.select("idfs").show()
idfData.select("idfs").head()

Row(idfs=SparseVector(8, {0: 0.2877, 1: 0.2877, 3: 0.6931}))
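These numbers follow from the smoothed formula that Spark MLlib documents for `IDF`: idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing t. The transformed vector multiplies each term count by its idf. The check below recomputes the values in plain Python. With m = 3, `data` and `sql` each appear in 2 documents, giving log(4/3) ≈ 0.2877, and `python` appears in 1, giving log(2) ≈ 0.6931.

```python
import math
from collections import Counter

docs = [
    ["python", "sql", "data"],
    ["r", "sql", "science"],
    ["data", "sience", "and", "data", "engineering"],
]

m = len(docs)
# document frequency: count each term at most once per document
df = Counter(t for doc in docs for t in set(doc))
idf = {t: math.log((m + 1) / (df[t] + 1)) for t in df}

# TF-IDF of the first document: term count times idf
tfidf_doc0 = {t: c * idf[t] for t, c in Counter(docs[0]).items()}
```

Because `data` occurs twice in the third sentence, its weight there would be 2 × 0.2877 ≈ 0.5754.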

### Parallel Computing

We have used PySpark to process text in a local, in-memory session. The same can be done using other packages such as NLTK and/or sklearn. So why do we use Spark?

The benefit of Spark is that the same process can run on a computer cluster, where the work is distributed and parallelized across nodes. This is especially useful when we have a lot of data (volume and velocity) and the computation is intensive.

#### Continued "Words" Example in Memory

In the above example, we have a list of tokenized words in the `words` column. Let's take a quick look at how the data can be distributed using the Resilient Distributed Dataset (RDD) so they can be processed in parallel.  

In [None]:
words = wordsData.select("words")
# words.show()
words

DataFrame[words: array<string>]

In [None]:
sc = spark.sparkContext  # the SparkContext behind our session, used for RDD operations

In [None]:
#words2 = "to be or not to be".split()
words2 = list(words.toPandas()['words'])
words_rdd = sc.parallelize(words2)

In [None]:
words_rdd

ParallelCollectionRDD[28] at readRDDFromFile at PythonRDD.scala:274

In [None]:
word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd

PythonRDD[29] at RDD at PythonRDD.scala:53

In [None]:
word_tuples_rdd.collect()

[(['python', 'sql', 'data'], 1),
 (['r', 'sql', 'science'], 1),
 (['data', 'sience', 'and', 'data', 'engineering'], 1)]
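Each pair above holds a whole word list, because `map` produces exactly one output per input row. To get one `(word, 1)` pair per word, the rows must first be flattened. In PySpark that would be `words_rdd.flatMap(lambda ws: ws).map(lambda w: (w, 1))`. The plain-Python sketch below contrasts the two shapes using the same rows.

```python
rows = [
    ["python", "sql", "data"],
    ["r", "sql", "science"],
    ["data", "sience", "and", "data", "engineering"],
]

# map: one output per input row, so each pair holds a whole word list
mapped = [(row, 1) for row in rows]

# flatMap then map: flatten the rows first, giving one (word, 1) pair per word
flat_mapped = [(word, 1) for row in rows for word in row]
```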

#### Text Files

In practice, distributed data is more likely to live in data files that must be loaded before processing. We can then apply `map` and `reduce` steps to compute statistics such as word counts.

Let's create a simple data file, e.g. `words.txt`, on Google Drive and put some text in it. This example uses only one file, but remember that a real job may process many files across many nodes of a Spark cluster.
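One way to create the file is from Python. The sketch below makes two assumptions: the helper name `write_words_file` is hypothetical, and the content is one sentence per line, reusing the three sentences from the tokenizer example. Those sentences produce the same 11 words as the `collect()` output further down.

```python
import os

# Hypothetical content: one sentence per line, reusing the tokenizer example's sentences.
LINES = ["Python SQL Data", "R SQL Science", "Data Sience and Data Engineering"]

def write_words_file(path, lines=LINES):
    """Write `lines` to `path`, one per line, creating parent folders if needed."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
    return path

# In Colab: write_words_file("/content/drive/MyDrive/LEADING2021/words.txt")
```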

Now we can load the text data with the Spark context's `textFile()`, which yields one RDD element per line, and use `flatMap` to split each line into words.

In [None]:
words3 = sc.textFile("/content/drive/MyDrive/LEADING2021/words.txt")\
            .flatMap(lambda line: line.split(" "))

Next, `collect` pulls all the data back to the driver from every node. In this case, there is only one local Spark node.

In [None]:
words3.collect()

['Python',
 'SQL',
 'Data',
 'R',
 'SQL',
 'Science',
 'Data',
 'Sience',
 'and',
 'Data',
 'Engineering']

Now we use:

1. `map` to convert every word to lower case and emit `(word, 1)` key/value pairs.
2. `reduceByKey` to add up the counts for each word (key).

In [None]:
wordCounts = words3.map(lambda word: (word.lower(), 1))\
                    .reduceByKey(lambda a,b: a+b)

In the end, `collect` the aggregated data, i.e. `wordCounts`:

In [None]:
wordCounts.collect()

[('python', 1),
 ('sql', 2),
 ('data', 3),
 ('r', 1),
 ('science', 1),
 ('sience', 1),
 ('and', 1),
 ('engineering', 1)]
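To see what `map` and `reduceByKey` compute, here is a single-machine emulation in plain Python. It groups the values by key and folds each group with the reduce function. It is an illustration, not Spark's implementation. Spark also pre-combines values inside each partition before shuffling, which is why the function passed to `reduceByKey` should be associative and commutative, as addition is.

```python
from functools import reduce

words = ["Python", "SQL", "Data", "R", "SQL", "Science",
         "Data", "Sience", "and", "Data", "Engineering"]

# map: emit (lower-cased word, 1) pairs
pairs = [(w.lower(), 1) for w in words]

def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with `func` (like reduceByKey)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

word_counts = reduce_by_key(pairs, lambda a, b: a + b)
```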

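We save the output as text. Note that `saveAsTextFile` writes a directory (here `word_counts2`) containing one `part-NNNNN` file per partition, and the call fails if that directory already exists: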

In [None]:
wordCounts.saveAsTextFile("/content/drive/MyDrive/LEADING2021/word_counts2")
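Each line in the `part-*` files is the string form of one RDD element, for example `('data', 3)`. Within Spark, `sc.textFile(...)` can read the whole directory back. The sketch below does the same outside Spark with only the standard library. The function name `read_saved_counts` is hypothetical. The `part-*` pattern skips the `_SUCCESS` marker file that Spark also writes.

```python
import ast
import glob
import os

def read_saved_counts(output_dir):
    """Load (word, count) tuples written by saveAsTextFile into a dict.

    Each part-* file holds one str(element) per line, e.g. "('data', 3)".
    """
    counts = {}
    for part in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(part) as f:
            for line in f:
                if line.strip():
                    word, count = ast.literal_eval(line)
                    counts[word] = count
    return counts

# In Colab: read_saved_counts("/content/drive/MyDrive/LEADING2021/word_counts2")
```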

## Spark Structured Data Processing

Now let's take a look at structured data processing with Spark, using the classic `Iris` dataset in CSV.

In [None]:
iris = spark.read.csv("/content/drive/MyDrive/LEADING2021/iris.csv", header=True, inferSchema=True)
iris.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)



In [None]:
iris.show(5)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [None]:
iris.count()

150

In [None]:
iris.describe().show()
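For numeric columns, `describe()` reports the count, mean, standard deviation, minimum, and maximum. The standard deviation is the sample version. As a hand check of what those rows mean, the sketch below computes the same statistics in plain Python. It uses only the five `SepalLengthCm` values shown by `iris.show(5)` above, so the numbers will not match Spark's summary over all 150 rows.

```python
import statistics

# SepalLengthCm of the five rows shown by iris.show(5) above (not the full dataset)
sepal_length = [5.1, 4.9, 4.7, 4.6, 5.0]

summary = {
    "count": len(sepal_length),
    "mean": statistics.mean(sepal_length),
    "stddev": statistics.stdev(sepal_length),  # sample standard deviation (n - 1)
    "min": min(sepal_length),
    "max": max(sepal_length),
}
```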

### Spark SQL

PySpark includes functions to query structured data on Spark, using DataFrame APIs or raw SQL.

In [None]:
# compare to `select SepalLengthCm, SepalWidthCm from iris` in SQL
# without an action such as .show(5), select() only returns a lazy DataFrame
iris.select("SepalLengthCm", "SepalWidthCm")

DataFrame[SepalLengthCm: double, SepalWidthCm: double]

In [None]:
# compare to `select distinct Species from iris` in SQL
iris.select('Species').distinct().show(10)

+---------------+
|        Species|
+---------------+
| Iris-virginica|
|    Iris-setosa|
|Iris-versicolor|
+---------------+



In [None]:
iris2 = iris.filter(iris.PetalWidthCm>1)

In [None]:
iris2.count()

93

In [None]:
two_species = ['Iris-setosa', 'Iris-virginica']
iris.filter( (iris.Species.isin(two_species)) & (iris.PetalWidthCm>1) )\
    .count()

50

#### Group By

Compute aggregations based on groups:

In [None]:
from pyspark.sql import functions as F

# Compare to SQL like:
# select Species, count(Id) from iris group by Species
iris.groupBy("Species").agg(F.count("Id")).show()

+---------------+---------+
|        Species|count(Id)|
+---------------+---------+
| Iris-virginica|       50|
|    Iris-setosa|       50|
|Iris-versicolor|       50|
+---------------+---------+



Use filter to obtain a subset and then run group-by statistics:

In [None]:
iris.filter( (iris.Species.isin(two_species)) & (iris.PetalWidthCm>1) )\
  .groupBy("Species").agg(F.count("Id")).show()

+--------------+---------+
|       Species|count(Id)|
+--------------+---------+
|Iris-virginica|       50|
+--------------+---------+



#### Raw SQL

It is also possible to run raw SQL statements directly through the Spark session, after registering the DataFrame as a temporary view.

In [None]:
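# Register the iris DataFrame as a temporary view
# (registerTempTable is deprecated since Spark 2.0)
iris.createOrReplaceTempView("iris")

# Query the registered view `iris`; SQL runs on the SparkSession, not on the SparkContext
spark.sql("select Species from iris").show(10)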
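The group-by statement from the earlier comparison, `select Species, count(Id) from iris group by Species`, is ordinary SQL. Its logic can be tried offline before running it with `spark.sql(...)` on the registered view. The sketch below uses Python's built-in `sqlite3` as a stand-in. This is SQLite, not Spark. The table holds only a few of the columns from the five rows shown by `iris.show(5)`, so it returns a count of 5, not the 50 per species that Spark reports.

```python
import sqlite3

# In-memory SQLite database standing in for Spark's temporary view (not Spark itself)
conn = sqlite3.connect(":memory:")
conn.execute("create table iris (Id integer, PetalWidthCm real, Species text)")

# the five rows shown by iris.show(5), reduced to three columns
rows = [(i, 0.2, "Iris-setosa") for i in range(1, 6)]
conn.executemany("insert into iris values (?, ?, ?)", rows)

result = conn.execute(
    "select Species, count(Id) from iris group by Species"
).fetchall()
conn.close()
```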