# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and use the Spark Python API to process data and train a text classifier.

## 2. The Spark Cluster

### 2.1. Connection

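To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the `SPARK_WORKER_MEMORY` setting in the Docker Compose configuration;
+ **spark.driver.memory** and **spark.driver.maxResultSize:** driver heap size and the limit on the total size of results collected to the driver (`"0"` means unlimited), which matter when results are collected with `toPandas()`.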
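As a quick sanity check before starting the session, the memory rule above can be expressed in a few lines of plain Python. This is a minimal sketch: `to_mib` and `executor_fits_worker` are hypothetical helpers (not part of PySpark), and the worker value `"1g"` is a placeholder for whatever `SPARK_WORKER_MEMORY` is set to in your Docker Compose file.

```python
# Hypothetical helpers (not part of PySpark) for comparing Spark memory strings.
UNIT_IN_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size: str) -> float:
    """Convert strings such as "512m" or "16G" to mebibytes."""
    size = size.strip().lower()
    if size[-1] in UNIT_IN_MIB:
        return float(size[:-1]) * UNIT_IN_MIB[size[-1]]
    # Assumption: a bare number means MiB, as for spark.executor.memory.
    return float(size)


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    """True if the executor memory does not exceed the worker's memory."""
    return to_mib(executor_memory) <= to_mib(worker_memory)


# "1g" is a placeholder for the SPARK_WORKER_MEMORY value in docker-compose.
print(executor_fits_worker("512m", "1g"))  # True
print(executor_fits_worker("2g", "1g"))    # False
```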

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    # Executor heap; must fit within SPARK_WORKER_MEMORY
    .config("spark.executor.memory", "512m")
    # Driver heap and collected-result limit ("0" = unlimited)
    .config("spark.driver.memory", "16G")
    .config("spark.driver.maxResultSize", "0")
    .getOrCreate()
)
spark

22/01/05 21:22:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [10]:
!pip install spark-nlp



In [None]:
!pip install wget
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

More configuration options for the SparkSession object in standalone mode can be added with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

## 3. The Data

### 3.1. Introduction

We will use the Spark Python API to read, process, and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/index.html).

### 3.2. Read

Let's read a preprocessed text dataset from the cluster's simulated **Hadoop distributed file system (HDFS)** into a Spark DataFrame. Each row holds a word-segmented Vietnamese text (column `inputs`) and its topic category (column `outputs`).

In [35]:
# Alternative source (JSON):
# dataset = spark.read.json("data/Prime_Pantry.json")
dataset = (
    spark.read
    .option("header", True)       # first line holds the column names
    .option("inferSchema", True)  # detect column types instead of reading all as strings
    .option("quote", '"')
    .option("escape", '"')        # "" inside a quoted field is a literal quote
    .csv("hdfs://namenode:9000/data/preprocessed_data.csv")
)
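Setting both `quote` and `escape` to `"` tells Spark's CSV reader that a doubled quote inside a quoted field stands for one literal quote character (Spark's default escape character is a backslash). Python's built-in `csv` module follows the same convention by default, so it can illustrate the effect on a hypothetical row; the sample text below is made up and not taken from `preprocessed_data.csv`.

```python
import csv
import io

# Hypothetical CSV content: the second field contains a comma and doubled quotes,
# the RFC 4180 way of escaping a quote inside a quoted field.
raw = 'id,inputs,outputs\n1,"sách ""hay"", dễ đọc",the-thao\n'

# csv.reader treats "" inside a quoted field as one literal quote by default
# (doublequote=True), matching what quote='"' plus escape='"' asks of Spark.
rows = list(csv.reader(io.StringIO(raw)))
print(rows[1])
```

Without the `escape` option, fields written this way may be split or mangled by Spark's reader, which is why the notebook sets it explicitly.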

Next, let's shuffle the rows and preview the first five of them.

In [36]:
from pyspark.sql.functions import rand 
dataset = dataset.orderBy(rand()) # Shuffle

In [37]:
dataset.show(5)

+----+--------------------+------------------+
| _c0|              inputs|           outputs|
+----+--------------------+------------------+
|  23|cổ_điển nổi_tiếng...|         am-nhac-1|
|3872|ươm mầm trí_tuệ t...|my-thuat-kien-truc|
|4451|phương_pháp nuôi ...|   nong-lam-nghiep|
|6445|bóng_đá 12 vì tin...|          the-thao|
|6862|phim đời vượt lên...|van-hoa-nghe-thuat|
+----+--------------------+------------------+
only showing top 5 rows



In [38]:
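from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Regular-expression tokenizer: split the text wherever the pattern matches.
# In Java regex, \W is ASCII-only, so Vietnamese letters such as "ổ" also act as
# separators and words are broken into fragments (see the `words` column below).
regexTokenizer = RegexTokenizer(inputCol="inputs", outputCol="words", pattern="\\W")
# Custom stop-word list (replaces Spark's default English list)
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# Bag-of-words counts: at most 10,000 terms, each present in at least 5 documents
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
# The default metric is the weighted F1 score
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")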
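The `\\W` pattern deserves a closer look. In Java regular expressions, `\W` matches any character outside `[a-zA-Z_0-9]`, so accented Vietnamese letters count as separators too. Python's `re` module with the `re.ASCII` flag uses the same definition, which lets us reproduce the fragments visible in the `words` column of the next output. This is a pure-Python approximation of `RegexTokenizer`, not Spark itself; splitting on whitespace (`\\s+`, the tokenizer's default pattern) is shown for comparison.

```python
import re
import unicodedata

# Use the precomposed (NFC) form, where "ổ" is a single character.
text = unicodedata.normalize("NFC", "cổ_điển nổi_tiếng")

# re.ASCII gives \W the Java meaning [^a-zA-Z_0-9]; empty pieces are dropped,
# as RegexTokenizer does with its default minTokenLength=1.
java_like = [t for t in re.split(r"\W", text, flags=re.ASCII) if t]
print(java_like)   # ['c', '_', 'i', 'n', 'n', 'i_ti', 'ng']

# Splitting on whitespace keeps the underscore-joined words intact.
whitespace = [t for t in re.split(r"\s+", text) if t]
print(whitespace)  # ['cổ_điển', 'nổi_tiếng']
```

The fragment `c` produced by the first split is also why `"c"` in the stop-word list removes pieces of Vietnamese words in the `filtered` column.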

In [39]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Map each category string in `outputs` to a numeric `label` (most frequent category -> 0.0)
label_stringIdx = StringIndexer(inputCol="outputs", outputCol="label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the feature pipeline on the whole dataset (before the train/test split),
# so the vocabulary and the label mapping also see the test documents.
pipelineFit = pipeline.fit(dataset)
dataset = pipelineFit.transform(dataset)
dataset.show(5)

                                                                                

+----+--------------------+------------------+--------------------+--------------------+--------------------+-----+
| _c0|              inputs|           outputs|               words|            filtered|            features|label|
+----+--------------------+------------------+--------------------+--------------------+--------------------+-----+
|  23|cổ_điển nổi_tiếng...|         am-nhac-1|[c, _, i, n, n, i...|[_, i, n, n, i_ti...|(3929,[0,1,2,3,4,...| 11.0|
|3872|ươm mầm trí_tuệ t...|my-thuat-kien-truc|[m, m, m, tr, _tu...|[m, m, m, tr, _tu...|(3929,[0,1,3,4,5,...|  8.0|
|4451|phương_pháp nuôi ...|   nong-lam-nghiep|[ph, ng_ph, p, nu...|[ph, ng_ph, p, nu...|(3929,[0,1,2,3,4,...|  7.0|
|6445|bóng_đá 12 vì tin...|          the-thao|[b, ng_, 12, v, t...|[b, ng_, 12, v, t...|(3929,[0,1,2,3,4,...|  9.0|
|6862|phim đời vượt lên...|van-hoa-nghe-thuat|[phim, i, v, t, l...|[phim, i, v, l, n...|(3929,[0,1,2,3,4,...| 12.0|
+----+--------------------+------------------+--------------------+-----
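The `features` vectors above have size 3929, well below `vocabSize=10000`, which suggests that only 3929 tokens occur in at least `minDF=5` documents. The sketch below approximates how `CountVectorizer` builds its vocabulary (keep terms meeting the document-frequency threshold, order them by total count, truncate to `vocabSize`) and turns a token list into a sparse count vector. The function names, the toy corpus, and the alphabetical tie-break are assumptions for illustration, not Spark's code.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size, min_df):
    """Sketch of CountVectorizer's vocabulary selection (assumed semantics)."""
    doc_freq = Counter()   # number of documents containing each term
    term_freq = Counter()  # total occurrences of each term in the corpus
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    # Keep terms present in at least min_df documents, most frequent first.
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-term_freq[t], t))  # alphabetical tie-break is an assumption
    return kept[:vocab_size]


def transform(tokens, vocab):
    """Return a sparse bag-of-words as (size, {index: count})."""
    index = {t: i for i, t in enumerate(vocab)}
    counts = Counter(index[t] for t in tokens if t in index)
    return len(vocab), dict(sorted(counts.items()))


docs = [["sách", "hay", "sách"], ["sách", "mới"], ["hay", "mới", "sách"]]
vocab = fit_vocabulary(docs, vocab_size=10, min_df=2)
print(vocab)                      # ['sách', 'hay', 'mới']
print(transform(docs[0], vocab))  # (3, {0: 2, 1: 1})
```

Spark prints the same kind of pair as `(3929,[0,1,2,...],[...])`: the vector size, the indices present, and their counts.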

In [40]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
trainingData.show(10)
testData.show(10)

+----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
| _c0|              inputs|             outputs|               words|            filtered|            features|label|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|  23|cổ_điển nổi_tiếng...|           am-nhac-1|[c, _, i, n, n, i...|[_, i, n, n, i_ti...|(3929,[0,1,2,3,4,...| 11.0|
| 571|lão tử đạo_đức ki...| chinh-tri-triet-hoc|[l, o, t, o_, c, ...|[l, o, o_, kinh, ...|(3929,[0,1,2,3,4,...|  5.0|
| 770|nha trang_điểm hẹ...|           du-lich-1|[nha, trang_, i, ...|[nha, trang_, i, ...|(3929,[0,1,2,3,4,...| 13.0|
| 936|hỗn_độn hài_hòa t...|     khoa-hoc-co-ban|[h, n_, n, h, i_h...|[h, n_, n, h, i_h...|(3929,[0,1,2,3,4,...| 10.0|
| 979|sổ_tay kiến_thức_...|     khoa-hoc-co-ban|[s, _tay, ki, n_t...|[s, _tay, ki, n_t...|(3929,[0,1,2,3,6,...| 10.0|
|1419|thiết_kế vi_mạch ...|   khoa-hoc-ky-thuat|[thi, t_



+----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
| _c0|              inputs|             outputs|               words|            filtered|            features|label|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
| 808|500 danh_lam 500 ...|           du-lich-1|[500, danh_lam, 5...|[500, danh_lam, 5...|(3929,[0,1,2,3,4,...| 13.0|
|1951|trắc_nghiệm thông...|khoa-hoc-tu-nhien...|[tr, c_nghi, m, t...|[tr, c_nghi, m, t...|(3929,[0,1,2,3,4,...|  3.0|
|2545|hoàng lê nhất_thố...|      lich-su-dia-ly|[ho, ng, l, nh, t...|[ho, ng, l, nh, t...|(3929,[0,1,2,3,4,...|  0.0|
|3961|1000 gương_mặt th...|  my-thuat-kien-truc|[1000, g, ng_m, t...|[1000, g, ng_m, t...|(3929,[0,1,2,3,4,...|  8.0|
|4746|luật thuế thu_nhậ...|         phap-luat-1|[lu, t, thu, thu_...|[lu, thu, thu_nh,...|(3929,[0,1,2,3,4,...|  4.0|
|5361|thiền_sư em bé 5 ...|       sach-ton-giao|[thi, n_
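`randomSplit` normalizes the weights and places each row into exactly one split at random, so the 80/20 proportions are approximate rather than exact. The following pure-Python sketch mimics that idea with a seeded generator; `random_split` is a hypothetical helper and does not reproduce Spark's random number stream, so it will not select the same rows.

```python
import random


def random_split(rows, weights, seed):
    """Hypothetical helper: put each row into exactly one split, chosen by a
    uniform draw compared against the cumulative normalized weights."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one draw per row, in [0, 1)
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point remainder.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [0.8, 0.2], seed=100)
print(len(train), len(test))  # roughly 800 and 200, not exactly
```

Because the weights are normalized, `[4, 1]` describes the same split as `[0.8, 0.2]`.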

                                                                                

In [41]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.1]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

22/01/05 21:41:32 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/01/05 21:41:32 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/01/05 21:43:55 WARN TaskSetManager: Lost task 73.0 in stage 343.0 (TID 18205, 172.31.0.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 12042556 bytes of memory, got 10678061
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:118)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:381)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:405)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterat
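`CrossValidator` splits the training data into `numFolds` folds, fits the estimator on all folds but one for every parameter combination, scores the held-out fold with the evaluator, and averages the scores; the best combination is then refit on the whole training set. With the single-point grid above, that means five fold models plus one final model. Below is a minimal sketch of the averaging loop (the final refit is omitted), with a hypothetical `fit_and_score` callback standing in for the Spark estimator and evaluator; Spark assigns rows to folds with its own random sampling, not this shuffle.

```python
import random
from statistics import mean


def k_fold_indices(n, k, seed):
    """Shuffle row indices 0..n-1 and deal them into k folds of near-equal size."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def cross_validate(n, k, param_grid, fit_and_score, seed=0):
    """Return (best_params, avg_metrics), where avg_metrics[i] is the mean
    held-out score of param_grid[i]; larger scores are treated as better."""
    folds = k_fold_indices(n, k, seed)
    avg_metrics = []
    for params in param_grid:
        scores = []
        for i, held_out in enumerate(folds):
            # Train on every fold except the i-th, validate on the i-th.
            train = [j for f, fold in enumerate(folds) if f != i for j in fold]
            scores.append(fit_and_score(params, train, held_out))
        avg_metrics.append(mean(scores))
    best = max(range(len(param_grid)), key=lambda i: avg_metrics[i])
    return param_grid[best], avg_metrics


# Toy usage: a made-up scorer that prefers weaker regularization.
def toy_fit_and_score(params, train_idx, test_idx):
    return 1.0 - params["regParam"]


grid = [{"regParam": 0.1}, {"regParam": 0.3}]
best_params, avg_metrics = cross_validate(100, 5, grid, toy_fit_and_score)
print(best_params)  # {'regParam': 0.1}
```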

### 3.3. Process

In this example, we apply the cross-validated model to the test set and inspect the test rows predicted as class `0.0`, selecting the input text, the true category, the class probabilities, the label, and the prediction.

In [53]:
predictions = cvModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("inputs","outputs","probability","label","prediction") \
    .show(n = 20, truncate = 30)

+------------------------------+------------------------+------------------------------+-----+----------+
|                        inputs|                 outputs|                   probability|label|prediction|
+------------------------------+------------------------+------------------------------+-----+----------+
|hoàng lê nhất_thống chí diễ...|          lich-su-dia-ly|[0.4643103572852561,0.09053...|  0.0|       0.0|
|1000 gương_mặt thượng_đế ng...|      my-thuat-kien-truc|[0.15277156658172505,0.0922...|  8.0|       0.0|
|nhà_tù côn_đảo 1862 1975 nh...|          lich-su-dia-ly|[0.5679016755962756,0.03807...|  0.0|       0.0|
|dân_ca làn_điệu dân_ca phổ_...|               am-nhac-1|[0.21366076468261685,0.0953...| 11.0|       0.0|
|mandala thiên_nhiên tô_màu ...|      my-thuat-kien-truc|[0.147508900349656,0.113460...|  8.0|       0.0|
|tìm_hiểu kiến_trúc xây_dựng...|      my-thuat-kien-truc|[0.1489358689412086,0.11637...|  8.0|       0.0|
|vương_quốc đàng ngoài tái_b...|          lich

In [46]:
evaluator.evaluate(predictions)


                                                                                

0.6420433316892963
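`MulticlassClassificationEvaluator` uses `metricName="f1"` by default: the F1 score of each class, weighted by how often that class occurs among the true labels. The sketch below computes the same quantity in plain Python on a toy example; `weighted_f1` is a hypothetical helper, not part of PySpark.

```python
from collections import Counter


def weighted_f1(y_true, y_pred):
    """F1 of each true class, weighted by that class's share of y_true."""
    n = len(y_true)
    true_counts = Counter(y_true)
    pred_counts = Counter(y_pred)
    total = 0.0
    for label, support in true_counts.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == p == label)
        if tp == 0:
            continue  # precision and recall are both 0, so F1 is 0
        precision = tp / pred_counts[label]
        recall = tp / support
        f1 = 2 * precision * recall / (precision + recall)
        total += f1 * support / n
    return total


print(weighted_f1([0, 0, 1, 1], [0, 1, 1, 1]))  # about 0.7333 (= 11/15)
```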

The evaluator's default metric is the weighted F1 score, so the model reaches an F1 score of about 0.64 on the test set. To get a per-class breakdown, let's install scikit-learn.

In [49]:
!pip install scikit-learn

Collecting sklearn
  Downloading sklearn-0.0.tar.gz (1.1 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting scikit-learn
  Downloading scikit_learn-1.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (26.4 MB)
     |████████████████████████████████| 26.4 MB 874 kB/s            
[?25hCollecting joblib>=0.11
  Downloading joblib-1.1.0-py2.py3-none-any.whl (306 kB)
     |████████████████████████████████| 306 kB 2.5 MB/s            
Collecting scipy>=1.1.0
  Downloading scipy-1.7.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (39.8 MB)
     |████████████████████████████████| 39.8 MB 676 kB/s            
[?25hCollecting threadpoolctl>=2.0.0
  Downloading threadpoolctl-3.0.0-py3-none-any.whl (14 kB)
Building wheels for collected packages: sklearn
  Building wheel for sklearn (setup.py) ... [?25ldone
[?25h  Created wheel for sklearn: filename=sklearn-0.0-py2.py3-none-any.whl size=1315 sha256=6496816f2187fd14411ca5d8ef75238edc3e9e2003517d3d01071b41a4

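scikit-learn works on local data, so let's collect the predictions into a pandas DataFrame with `toPandas()`. This brings every test row to the driver, which is where the `spark.driver.memory` and `spark.driver.maxResultSize` settings from the session configuration come into play.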

In [56]:
!pip install pandas
predictions = predictions.toPandas()

Collecting pandas
  Downloading pandas-1.3.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.5 MB)
     |████████████████████████████████| 11.5 MB 2.0 MB/s            
Installing collected packages: pandas
Successfully installed pandas-1.3.5


                                                                                

In [57]:

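# We use scikit-learn to evaluate the results on the test dataset.
# classification_report expects (y_true, y_pred). The output below was generated
# with the two arguments reversed, so its precision and recall columns are swapped
# and "support" counts predictions instead of true labels.
from sklearn.metrics import classification_report
print(classification_report(predictions['label'], predictions['prediction']))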

              precision    recall  f1-score   support

         0.0       0.84      0.45      0.58       445
         1.0       0.88      0.79      0.83       247
         2.0       0.85      0.74      0.79       201
         3.0       0.63      0.45      0.53       256
         4.0       0.92      0.95      0.93       148
         5.0       0.36      0.78      0.49        49
         6.0       0.66      0.79      0.72        75
         7.0       0.72      0.91      0.81        68
         8.0       0.33      0.79      0.46        33
         9.0       0.39      0.85      0.54        39
        10.0       0.28      1.00      0.44        15
        11.0       0.47      0.95      0.63        20
        12.0       0.02      0.33      0.04         3
        13.0       0.21      1.00      0.34         5
        14.0       0.17      1.00      0.29         2
        15.0       0.00      0.00      0.00         0

    accuracy                           0.66      1606
   macro avg       0.48   

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
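`classification_report` takes the true labels first and the predictions second. The report shown above was generated with the predictions passed first, and swapping the arguments turns each class's precision into its recall and vice versa, while the support column ends up counting predictions. That is also why class 15.0 shows a support of 0: no test row was predicted as 15.0. A small plain-Python check of this symmetry, with hypothetical `precision`/`recall` helpers and toy labels:

```python
def precision(label, y_true, y_pred):
    """Share of rows predicted as `label` whose true label is `label`."""
    hits = [t == label for t, p in zip(y_true, y_pred) if p == label]
    return sum(hits) / len(hits) if hits else 0.0


def recall(label, y_true, y_pred):
    """Share of rows truly labelled `label` that were predicted as `label`."""
    hits = [p == label for t, p in zip(y_true, y_pred) if t == label]
    return sum(hits) / len(hits) if hits else 0.0


y_true = [0, 0, 0, 1, 1, 2]
y_pred = [0, 1, 1, 1, 1, 0]
for label in (0, 1, 2):
    # The "swapped" value is the same call with the arguments reversed.
    print(label,
          "precision:", precision(label, y_true, y_pred),
          "recall:", recall(label, y_true, y_pred),
          "swapped precision:", precision(label, y_pred, y_true))
```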


### 3.4. Write

Lastly, we persist the results into the cluster's simulated **HDFS**.