# Classification and Spark

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

# Classification example (6 points)
First, install scikit-learn (imported as `sklearn`), a package of simple and efficient tools for data mining and data analysis. Then load a data set from `sklearn.datasets` to use as a classification example.

In [2]:
!pip install scikit-learn



In [3]:
import sklearn.datasets as mldata
data_dict = mldata.load_breast_cancer()  # load the data

# Convert data_dict to a DataFrame
cancer = pd.DataFrame(data_dict['data'], columns=data_dict['feature_names'])

# Constant column that acts as the intercept (the model below uses fit_intercept=False)
cancer['bias'] = 1.0

# In data_dict['target'], 0 is malignant and 1 is benign; flip so that malignant = 1
cancer['malignant'] = 1 - data_dict['target']
cancer.iloc[0]

mean radius                  17.990000
mean texture                 10.380000
mean perimeter              122.800000
mean area                  1001.000000
mean smoothness               0.118400
mean compactness              0.277600
mean concavity                0.300100
mean concave points           0.147100
mean symmetry                 0.241900
mean fractal dimension        0.078710
radius error                  1.095000
texture error                 0.905300
perimeter error               8.589000
area error                  153.400000
smoothness error              0.006399
compactness error             0.049040
concavity error               0.053730
concave points error          0.015870
symmetry error                0.030030
fractal dimension error       0.006193
worst radius                 25.380000
worst texture                17.330000
worst perimeter             184.600000
worst area                 2019.000000
worst smoothness              0.162200
worst compactness        

Now we can conduct one train-test split.

In [4]:
from sklearn.model_selection import train_test_split 

# train_test_split in sklearn splits the data into training and test sets
train, test = train_test_split(cancer, test_size=0.25, random_state=100)
x_train = train.drop('malignant', axis=1).values
y_train = train['malignant'].values
x_test = test.drop('malignant', axis=1).values
y_test = test['malignant'].values

print("Training Data Size: ", len(train))
print("Test Data Size: ", len(test))

Training Data Size:  426
Test Data Size:  143
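The sizes printed above follow from the arithmetic of the split: 426 + 143 = 569 rows in total, and 25% of 569 is 142.25. The sketch below is a simplified stand-in for what a shuffled split does, assuming the test count is rounded up (which matches the printed sizes). The helper `split_indices` is hypothetical and does not reproduce the exact permutation that scikit-learn draws for `random_state=100`.

```python
import math
import random

def split_indices(n_samples, test_size=0.25, seed=100):
    """Shuffle row indices and cut them into a train part and a test part."""
    n_test = math.ceil(test_size * n_samples)   # assumption: test count rounds up
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)        # seeded shuffle, so the split is repeatable
    return indices[n_test:], indices[:n_test]

train_idx, test_idx = split_indices(569)
print(len(train_idx), len(test_idx))  # 426 143
```

Every row lands in exactly one of the two parts, so the model is never evaluated on a row it was trained on.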


The training data can be used to fit a LogisticRegression model as follows.

In [5]:
from sklearn.linear_model import LogisticRegression

model = LogisticRegression(fit_intercept=False, C=1e-5, solver='lbfgs')
model.fit(x_train, y_train)

LogisticRegression(C=1e-05, class_weight=None, dual=False,
          fit_intercept=False, intercept_scaling=1, max_iter=100,
          multi_class='warn', n_jobs=None, penalty='l2', random_state=None,
          solver='lbfgs', tol=0.0001, verbose=0, warm_start=False)
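Two settings in this cell are worth unpacking. With `fit_intercept=False`, the constant `bias` column added earlier plays the role of the intercept, and `C` is the inverse of the regularization strength, so `C=1e-5` applies a very strong L2 penalty. The following is a minimal NumPy sketch on toy data, not the library's implementation; `predict_proba` and `penalized_loss` are illustrative names, and the loss is written in one common form of the L2-penalized objective.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def predict_proba(X, w):
    """P(label = 1 | x) for a model with no separate intercept term."""
    return sigmoid(X @ w)

def penalized_loss(X, y, w, C):
    """0.5 * ||w||^2 + C * (sum of per-example log-losses), with y in {0, 1}."""
    signs = 2 * y - 1                               # map {0, 1} to {-1, +1}
    log_loss = np.log1p(np.exp(-signs * (X @ w)))
    return 0.5 * w @ w + C * log_loss.sum()

# Toy data: one feature plus a constant column of ones (the "bias" column).
X = np.array([[0.5, 1.0], [2.0, 1.0], [3.5, 1.0]])
y = np.array([0, 1, 1])
w = np.array([1.0, -1.5])                           # the last weight acts as the intercept

print(predict_proba(X, w))
print(penalized_loss(X, y, w, C=1e-5))              # dominated by the penalty on w
print(penalized_loss(X, y, w, C=1.0))               # the data term now matters
```

One consequence of this setup is that the weight on the `bias` column is penalized like every other weight, which would not happen with `fit_intercept=True`.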

In [6]:
# Show the average train accuracy of the model
correct_train = model.predict(x_train) == y_train
np.mean(correct_train)

0.9014084507042254

In [7]:
# Show the average test accuracy of the model
correct_test = model.predict(x_test) == y_test
np.mean(correct_test)

0.9300699300699301
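Taking `np.mean` of a boolean array works because `True` counts as 1 and `False` as 0, so the mean is the fraction of correct predictions. Accuracy alone does not say which kind of mistake the model makes, which matters when one class is "malignant". A small sketch on made-up labels (the arrays below are toy values, not the notebook's data):

```python
import numpy as np

y_true = np.array([1, 0, 0, 1, 0, 1, 0, 0])   # toy labels: 1 = malignant
y_pred = np.array([1, 0, 0, 0, 0, 1, 1, 0])   # toy predictions

accuracy = np.mean(y_pred == y_true)           # fraction of matching entries

tp = np.sum((y_pred == 1) & (y_true == 1))     # malignant and predicted malignant
fn = np.sum((y_pred == 0) & (y_true == 1))     # malignant but predicted benign
fp = np.sum((y_pred == 1) & (y_true == 0))     # benign but predicted malignant
tn = np.sum((y_pred == 0) & (y_true == 0))     # benign and predicted benign

recall = tp / (tp + fn)                        # share of malignant cases caught
precision = tp / (tp + fp)                     # share of malignant predictions that are right
print(accuracy, recall, precision)
```

The same counts can be computed for the notebook's model by substituting `y_test` and `model.predict(x_test)` for the toy arrays.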

# Spark example (6 points)

# Installing PySpark Locally

Run the following commands to download and unpack Spark locally, in the same folder as this notebook:

In [10]:
!curl -O http://mirror.metrocast.net/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz 
!tar -xvf spark-2.4.2-bin-hadoop2.7.tgz


The following Python library, findspark, configures your Python environment so that the local Spark installation can be imported:

In [11]:
!pip install findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/b1/c8/e6e1f6a303ae5122dc28d131b5a67c5eb87cbf8f7ac5b9f87764ea1b1e1e/findspark-1.3.0-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.3.0


If you would like to try Spark on a cluster for free, without any setup, check out [Databricks Community Edition](https://databricks.com/try-databricks).

# Launching PySpark

Set up the PySpark environment.

In [12]:
import os
import findspark
os.environ["PYSPARK_PYTHON"] = "python3"
findspark.init("spark-2.4.2-bin-hadoop2.7/")
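If this cell fails, a common cause is that the Spark folder is not where the notebook expects it. The check below is a small sketch under that assumption; `looks_like_spark_home` is a hypothetical helper, and it only tests for the `python/pyspark` directory that the unpacked archive contains.

```python
import os

def looks_like_spark_home(path):
    """Hypothetical helper: True if `path` contains a bundled PySpark package."""
    return os.path.isdir(os.path.join(path, "python", "pyspark"))

spark_home = "spark-2.4.2-bin-hadoop2.7/"
if looks_like_spark_home(spark_home):
    print("Spark found at", os.path.abspath(spark_home))
else:
    print("No Spark distribution at", os.path.abspath(spark_home),
          "(run the download cell first)")
```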

Initialize the SparkSQL session, which contains a basic Spark context. Launching the local cluster may take a few moments (it typically starts 4 to 8 Python jobs in the background). In a real Spark deployment, you would change `.master("local[*]")` to point to the cluster's resource manager, such as YARN. To learn more about deploying Spark on a cluster of machines, read [this tutorial](https://spark.apache.org/docs/latest/cluster-overview.html).

Note: You must have Java installed on your computer for this to work!
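A quick pre-flight check can save a confusing error message later. The sketch below looks for a `java` executable on `PATH` and reports the number of logical CPUs, which is how many worker threads `local[*]` asks for. It checks `PATH` only, so a Java installation that is reachable solely through `JAVA_HOME` will be reported as not found; `java_on_path` is an illustrative name.

```python
import os
import shutil

def java_on_path():
    """Return the path of a `java` executable on PATH, or None if there is none."""
    return shutil.which("java")

java = java_on_path()
print("java executable:", java if java else "not found on PATH")
print("logical CPUs available to local[*]:", os.cpu_count())
```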

In [13]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .master("local[*]")
        .appName("LectureExample")
        .getOrCreate()
)
sc = spark.sparkContext

## Word Count Example

As a quick example of what Spark can do, the following code computes the word counts of `pg100.txt` in parallel. If your computer has multiple processor cores, they are all put to use computing the word counts.

Beneath the layer of abstraction that we can see, Spark is running a MapReduce-style computation.
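To make the pattern concrete, here is the same sequence of steps in plain, single-process Python on two toy lines. It is only a sketch of the idea: Spark distributes each step across partitions, and the explicit grouping step stands in for the shuffle that `reduceByKey` performs internally.

```python
import re
from collections import defaultdict
from functools import reduce

lines = ["to be or not to be", "that is the question"]

# flatMap: each line yields several words, flattened into one list
words = [word for line in lines for word in re.split(r'[^\w]+', line)]

# map: emit one (key, value) pair per word
pairs = [(word, 1) for word in words]

# shuffle: gather the values that share a key
groups = defaultdict(list)
for word, n in pairs:
    groups[word].append(n)

# reduce: combine each key's values with the same function passed to reduceByKey
counts = {word: reduce(lambda n1, n2: n1 + n2, ns) for word, ns in groups.items()}
print(counts)
```

Because addition is associative and commutative, the per-key values can be combined in any order, which is what lets Spark reduce partial results on each partition before merging them.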

In [14]:
import re #regular expression used to split lines of text into words

lines = sc.textFile("./pg100.txt")  # download pg100.txt from Canvas (Spark folder) into this notebook's directory

# Split each line into words on runs of non-word characters (anything other than letters, digits, and underscore)
words = lines.flatMap(lambda line: re.split(r'[^\w]+', line))

#Mapper
pairs = words.map(lambda word: (word, 1))

#Reducer
counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)

#Result
counts.toDF().toPandas()

| | _1 | _2 |
|---|---|---|
| 0 | The | 4041 |
| 1 | Project | 93 |
| 2 | EBook | 3 |
| 3 | of | 15830 |
| 4 | Shakespeare | 47 |
| 5 | | 197060 |
| 6 | is | 8437 |
| 7 | use | 319 |
| 8 | anyone | 6 |
| 9 | anywhere | 8 |
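Row 5 of the output has an empty string as its key, with by far the largest count. That is a side effect of `re.split`: whenever a line is blank, or starts or ends with a non-word character, the split leaves an empty token. A short demonstration on made-up strings:

```python
import re

pattern = r'[^\w]+'

print(re.split(pattern, "Hello, world!"))  # ['Hello', 'world', '']  (trailing '!')
print(re.split(pattern, ""))               # ['']                    (blank line)
print(re.split(pattern, "  indented"))     # ['', 'indented']        (leading spaces)
```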


What if you want to remove words that meet a specific condition, such as the empty strings produced by the split? Use `filter`:

In [15]:
words = words.filter(lambda word: word != '')

counts = words.map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

counts.toDF().toPandas()

| | _1 | _2 |
|---|---|---|
| 0 | The | 4041 |
| 1 | Project | 93 |
| 2 | EBook | 3 |
| 3 | of | 15830 |
| 4 | Shakespeare | 47 |
| 5 | is | 8437 |
| 6 | use | 319 |
| 7 | anyone | 6 |
| 8 | anywhere | 8 |
| 9 | at | 2299 |
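The condition passed to `filter` can be any function that returns `True` for the words to keep. As an illustration, the sketch below drops empty tokens and a few very common words; `STOP_WORDS` and `keep` are hypothetical names, and the word list is made up for the example rather than taken from a standard source. It runs on a plain Python list so that it can be tried without Spark.

```python
STOP_WORDS = {"the", "of", "is", "at"}   # illustrative list only

def keep(word):
    """Predicate: drop empty tokens and stop words, ignoring case."""
    return word != '' and word.lower() not in STOP_WORDS

tokens = ["The", "Project", "", "of", "Shakespeare", "is", "use"]
print([w for w in tokens if keep(w)])    # ['Project', 'Shakespeare', 'use']
```

In the notebook, the same predicate could be passed directly as `words.filter(keep)`. Note that the counts above are case-sensitive, so "The" and "the" are tallied as different words unless the tokens are lowercased first.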
