# Data Science Examples
This repo contains example code that demonstrates the data science software available on the Great Lakes and Cavium ThunderX clusters. The examples use Spark along with NumPy, SciPy, NLTK, and other Python libraries commonly used for data science.

## Select Cluster
Run the cell below to create a dropdown menu, then select the cluster you are running on: Great Lakes or ThunderX.

In [None]:
import ipywidgets as widgets

# Prompt user to select cluster for Great Lakes or Cavium ThunderX
cluster = widgets.Dropdown(options = ['greatlakes', 'thunderx'], description = "Cluster")
cluster
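Later cells repeat the same if/elif branch to turn the dropdown value into a file path. As a sketch only, a helper like the hypothetical `data_path` below could centralize that lookup and stop with an error on an unknown cluster. The two base directories are the ones used in this notebook; the function and dictionary names are illustrative and not part of the repo.

```python
import posixpath

# Hypothetical mapping (not in the original notebook): cluster name -> data directory
DATA_DIRS = {
    "greatlakes": "/nfs/turbo/arcts-data-hadoop-stage/data",
    "thunderx": "/data",
}


def data_path(cluster_name, filename):
    """Return the full path of `filename` on the selected cluster."""
    try:
        base = DATA_DIRS[cluster_name]
    except KeyError:
        # Fail loudly instead of continuing with an undefined path
        raise ValueError(
            "cluster should be 'greatlakes' or 'thunderx', got {!r}".format(cluster_name)
        ) from None
    return posixpath.join(base, filename)


# Example: the dropdown value (cluster.value) is a plain string
print(data_path("thunderx", "Gutenberg.txt"))
```

A dictionary lookup keeps the cluster-specific paths in one place, so adding a cluster means editing one entry instead of every cell.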

## Approximate Pi
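Estimates Pi in parallel with a Monte Carlo method, using example code included with Spark: it samples random points in the square [-1, 1] x [-1, 1] and counts the fraction that land inside the unit circle, which approaches Pi/4.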

Spark creates 1,500 partitions, creates a task per partition, and divides the tasks between the available executors.

In [None]:
import time

from random import random
from operator import add

from pyspark.sql import SparkSession

partitions = 1500
n = 100000 * partitions

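# Sample a random point in the square [-1, 1] x [-1, 1];
# return 1 if it falls inside the unit circle, else 0
def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

# Reuse the notebook's SparkSession (getOrCreate returns the active session if one exists)
spark = SparkSession.builder.getOrCreate()

start = time.time()
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)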
end = time.time()

print("Pi was computed using Spark in {} seconds. Pi is roughly {}."
    .format(round(end - start, 2), round(4.0 * count / n, 3)))
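The same estimate can be reproduced without Spark to see what each task does. This is a minimal sketch, not the Spark example itself: each "partition" draws its own samples from a seeded `random.Random` generator (a choice made here for reproducibility), counts the hits inside the unit circle, and the partial counts are combined with `reduce(add)`. Because the circle covers Pi/4 of the square's area, multiplying the hit fraction by 4 estimates Pi. The names `count_inside` and `estimate_pi` are illustrative.

```python
import random
from functools import reduce
from operator import add


def count_inside(num_samples, seed):
    """Count random points of [-1, 1] x [-1, 1] that land inside the unit circle."""
    rng = random.Random(seed)  # separate, seeded generator per partition
    inside = 0
    for _ in range(num_samples):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        if x * x + y * y <= 1:
            inside += 1
    return inside


def estimate_pi(partitions=8, samples_per_partition=50_000):
    # "map": one partial count per partition, like one Spark task per partition
    counts = [count_inside(samples_per_partition, seed) for seed in range(partitions)]
    # "reduce": add the partial counts, as reduce(add) does in the Spark cell
    total_inside = reduce(add, counts)
    n = partitions * samples_per_partition
    # area(circle) / area(square) = Pi / 4
    return 4.0 * total_inside / n


print(round(estimate_pi(), 3))
```

The statistical error shrinks roughly with the square root of the number of samples, which is why the Spark version spreads 150 million samples over 1,500 partitions.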

## Word Count
Counts word occurrences in a text file with a Spark RDD.

Spark reads the dataset (1.2 GB), automatically splits it into 9 partitions based on file size, creates one task per partition, and assigns the tasks to the available executors.

In [None]:
import time

# Create an RDD in PySpark from large text file
start = time.time()
if cluster.value == "greatlakes":
    input_text_file = "/nfs/turbo/arcts-data-hadoop-stage/data/Gutenberg.txt"
elif cluster.value == "thunderx":
    input_text_file = "/data/Gutenberg.txt"
else:
    # Stop here: without a valid cluster there is no input path to read
    raise ValueError("The var 'cluster' should be set to 'greatlakes' or 'thunderx'.")
rdd = sc.textFile(input_text_file)

# Lower-case each line and split it into words. flatMap flattens the
# per-line word lists into a new RDD whose elements are single words.
def split_words(line):
    return line.lower().split()

rdd_flat = rdd.flatMap(split_words)

# Do a word count using a map-reduce like function.
# Map each word with a count of 1 like a key-value pair where the value is 1.
rdd_mapped = rdd_flat.map(lambda x: (x,1))
# Then group each count by key.
rdd_grouped = rdd_mapped.groupByKey()
# Sum the counts for each word, swap each pair to (count, word),
# then sort by count in descending order.
rdd_frequency = rdd_grouped.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)
# Get the 10 most frequent words.
top_ten = rdd_frequency.take(10)
end = time.time()

print("Top 10 most frequent words were found with Spark in {} seconds.".format(round(end - start, 2)))
print("Most frequent words: {}".format(top_ten))
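To make each step of the pipeline concrete, here is a plain-Python sketch that mirrors it on a small list of lines (no Spark involved; the function name `word_count_top` is hypothetical). The stable sort on the count alone mimics `sortByKey(False)`, which orders only by count; how Spark orders words with equal counts is not something to rely on.

```python
from collections import defaultdict


def word_count_top(lines, k=10):
    """Return the k most frequent words as (count, word) pairs."""
    # flatMap: lower-case each line and split it into words
    words = [word for line in lines for word in line.lower().split()]
    # map: pair every word with a count of 1
    pairs = [(word, 1) for word in words]
    # groupByKey: collect the 1s for each word
    grouped = defaultdict(list)
    for word, one in pairs:
        grouped[word].append(one)
    # mapValues(sum), then swap each pair to (count, word)
    frequency = [(sum(ones), word) for word, ones in grouped.items()]
    # sortByKey(False): order by count only, largest first (stable for ties)
    frequency.sort(key=lambda pair: pair[0], reverse=True)
    # take(k)
    return frequency[:k]


lines = ["The cat and the hat", "the Cat sat"]
print(word_count_top(lines, k=3))  # [(3, 'the'), (2, 'cat'), (1, 'and')]
```

In Spark itself, `rdd_mapped.reduceByKey(add)` (with `add` from `operator`) yields the same per-word totals as `groupByKey().mapValues(sum)` and is usually preferred, because it combines counts within each partition before shuffling data between executors.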

## Test NumPy
Applies NumPy's mod() function to every element of a large dataset distributed across Spark partitions.

In [None]:
import time

def mod(x):
    import numpy as np
    return (x, np.mod(x, 2))

# Create an RDD of a series of integers with 30 partitions
start = time.time()
rdd = sc.parallelize(range(10000000), 30)
print("Created RDD of {0} partitions.".format(rdd.getNumPartitions()))

# Apply the mod function to each integer and get the largest (x, x mod 2) pair
result = rdd.map(mod).max()
print("Largest (x, x mod 2) pair: {0}".format(result))
end = time.time()
print("Test completed in {0} seconds.".format(round(end - start, 2)))
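In the cell above, `mod()` runs once per integer. The sketch below does the same computation partition by partition instead: one vectorized `np.mod` call per partition, followed by a reduce that keeps the largest `(x, x mod 2)` pair. Splitting the range into 30 contiguous slices is an assumption that approximates how Spark slices a parallelized range, and the helper names are hypothetical.

```python
import numpy as np


def split_range(n, num_partitions):
    """Split range(n) into contiguous, nearly equal NumPy slices (assumed layout)."""
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [np.arange(bounds[i], bounds[i + 1]) for i in range(num_partitions)]


def partition_max(values):
    """Largest (x, x mod 2) pair in one partition, using one vectorized np.mod call."""
    if values.size == 0:
        return None
    remainders = np.mod(values, 2)
    # x values are distinct, so the largest pair is the one with the largest x
    i = int(np.argmax(values))
    return (int(values[i]), int(remainders[i]))


def max_mod_pair(n, num_partitions=30):
    # Per-partition results, then a final max over them (skipping empty partitions)
    partial = [partition_max(p) for p in split_range(n, num_partitions)]
    return max(pair for pair in partial if pair is not None)


print(max_mod_pair(10_000_000))  # (9999999, 1)
```

On Spark, the analogous approach would pass a per-partition function to `rdd.mapPartitions`, which receives an iterator over a partition's elements rather than one element at a time.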

## Numeric Integration
Performs numerical integration with SciPy's quad() and compares the result with a trapezoid-rule approximation computed with NumPy.

In [None]:
# Example Python Program with NumPy and SciPy
# Basic Numerical Integration: the Trapezoid Rule
# https://nbviewer.jupyter.org/github/ipython/ipython/blob/master/examples/IPython%20Kernel/Trapezoid%20Rule.ipynb

import time
import numpy as np
from scipy.integrate import quad
from scipy.version import version

# Define a simple cubic function and use NumPy to sample it between 0 and 10 at 200 points
def f(x):
    return (x-3)*(x-5)*(x-7)+85

start = time.time()
x = np.linspace(0, 10, 200)
y = f(x)

# Use NumPy to choose a region to integrate over and take only a few points in that region
a, b = 1, 8 # the left and right boundaries
N = 5 # the number of points
xint = np.linspace(a, b, N)
yint = f(xint)

# Compute the integral both at high accuracy and with the trapezoid approximation

# Use SciPy to calculate the integral
integral, error = quad(f, a, b)
print("The integral is:", integral, "+/-", error)

# Use NumPy to calculate the area with the trapezoid approximation
integral_trapezoid = np.sum((xint[1:] - xint[:-1]) * (yint[1:] + yint[:-1])) / 2
print("The trapezoid approximation with", len(xint),
    "points is:", integral_trapezoid)
end = time.time()
print("Test completed in {0} seconds.".format(round(end - start, 2)))
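The integrand is a cubic, so the exact integral can be checked by hand: expanding gives f(x) = x^3 - 15x^2 + 71x - 20, with antiderivative F(x) = x^4/4 - 5x^3 + 71x^2/2 - 20x, and F(8) - F(1) = 576 - 10.75 = 565.25. The plain-Python sketch below evaluates that antiderivative and a composite trapezoid rule on the same 5 equally spaced points (the helper names `F` and `trapezoid` are illustrative). SciPy's `quad` value should agree closely with 565.25, and the NumPy trapezoid value should match the 5-point result computed here.

```python
def f(x):
    return (x - 3) * (x - 5) * (x - 7) + 85


def F(x):
    # Antiderivative of f, using the expansion f(x) = x^3 - 15x^2 + 71x - 20
    return x**4 / 4 - 5 * x**3 + 71 * x**2 / 2 - 20 * x


def trapezoid(func, a, b, n_points):
    """Composite trapezoid rule on n_points equally spaced points from a to b."""
    h = (b - a) / (n_points - 1)
    xs = [a + i * h for i in range(n_points)]
    ys = [func(x) for x in xs]
    # Interior points get weight h, the two endpoints get weight h/2
    return h * (sum(ys) - (ys[0] + ys[-1]) / 2)


exact = F(8) - F(1)
approx = trapezoid(f, 1, 8, 5)
print("exact:", exact, "trapezoid (5 points):", approx)
```

With only 5 points the trapezoid rule underestimates this integral by about 5.4; increasing `n_points` shrinks the error roughly with the square of the spacing.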

## Machine Learning
Trains a logistic regression model with Spark MLlib's L-BFGS solver, using example code included with Spark, and reports the error on the training data.

In [None]:
import sys
import time

from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

start = time.time()
if cluster.value == "greatlakes":
    input_svm_text_file = "/nfs/turbo/arcts-data-hadoop-stage/data/sample_svm_data.txt" # Great Lakes
elif cluster.value == "thunderx":
    input_svm_text_file = "/data/sample_svm_data.txt"                                   # ThunderX
else:
    # Stop here: without a valid cluster there is no input path to read
    raise ValueError("The var 'cluster' should be set to 'greatlakes' or 'thunderx'.")
data = sc.textFile(input_svm_text_file)
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluate the model on the training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
end = time.time()

print("Logistic regression was completed in {0} seconds.".format(round(end - start, 2)))
print("Training Error = " + str(trainErr))
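For a binary model, MLlib's `LogisticRegressionModel.predict` turns the margin w·x + b into a probability with the logistic (sigmoid) function and returns 1 when that probability exceeds a threshold, 0.5 by default. The sketch below reproduces that rule and the training-error calculation in plain Python. The weights are made up for illustration rather than learned with L-BFGS, the data lines only imitate the "label feature1 feature2 ..." layout that `parsePoint` expects, and the function names are hypothetical.

```python
import math


def parse_point(line):
    """Split 'label f1 f2 ...' into (label, [f1, f2, ...]), like parsePoint."""
    values = [float(v) for v in line.split(" ")]
    return values[0], values[1:]


def sigmoid(z):
    """Numerically stable logistic function (avoids overflow in math.exp)."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict(weights, features, intercept=0.0, threshold=0.5):
    """Return 1.0 if sigmoid(w . x + b) > threshold, else 0.0."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1.0 if sigmoid(margin) > threshold else 0.0


def training_error(lines, weights, intercept=0.0):
    """Fraction of training points whose prediction differs from the label."""
    points = [parse_point(line) for line in lines]
    labels_and_preds = [(label, predict(weights, feats, intercept)) for label, feats in points]
    wrong = sum(1 for label, pred in labels_and_preds if label != pred)
    return wrong / float(len(points))


# Made-up weights and data, for illustration only
sample_lines = ["1 2 1", "0 1 3", "1 0 1", "0 0 2"]
print(training_error(sample_lines, weights=[1.0, -1.0]))  # 0.25
```

Training error measures fit to the data the model has already seen; it is usually optimistic compared with error on held-out data.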


## Natural Language Toolkit
Performs word tokenization and part-of-speech tagging with NLTK.

In [None]:
import os
import sys
import time

partitions = 12
nltk_data_location = "/nfs/turbo/arcts-data-hadoop-stage/data/nltk_data/"

start = time.time()

if cluster.value == "greatlakes":
    input_text_file = "/nfs/turbo/arcts-data-hadoop-stage/data/complete-works-of-shakespeare.txt"
elif cluster.value == "thunderx":
    input_text_file = "/data/complete-works-of-shakespeare.txt"
else:
    # Stop here: without a valid cluster there is no input path to read
    raise ValueError("The var 'cluster' should be set to 'greatlakes' or 'thunderx'.")

data = spark.sparkContext.textFile(input_text_file, partitions)

def word_tokenize(x):
    import nltk
    nltk.data.path.append(nltk_data_location)
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    nltk.data.path.append(nltk_data_location)
    return nltk.pos_tag([x])

# Tokenize
words = data.flatMap(word_tokenize)

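# Label parts of speech (pos_tag tags each word on its own)
pos_word = words.map(pos_tag)
# take() does not process the whole dataset: Spark evaluates only as many
# partitions as it needs to return the first 20 elements
print(pos_word.take(20))
end = time.time()

print("Tokenized and tagged the first 20 words in {0} seconds."
      .format(round(end - start, 2)))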
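The difference between `flatMap` and `map` determines the shape of the output: `words` holds individual tokens, while each element of `pos_word` is the one-item list returned by `pos_tag([x])`. The sketch below shows those shapes with plain Python lists; the tokenizer and tagger are stand-ins (a whitespace split and a constant "NN" tag), not NLTK.

```python
def word_tokenize(line):
    # Stand-in for nltk.word_tokenize (assumption): plain whitespace split
    return line.split()


def pos_tag(tokens):
    # Hypothetical stand-in for nltk.pos_tag: tags every token as "NN"
    return [(token, "NN") for token in tokens]


lines = ["To be, or not to be", "that is the question"]

# flatMap: each line yields many tokens, flattened into one sequence
words = [token for line in lines for token in word_tokenize(line)]

# map: exactly one output per input word; pos_tag([word]) returns a one-item list
pos_word = [pos_tag([word]) for word in words]

print(words[:3])
print(pos_word[:2])
```

Because the notebook tags one word at a time, the tagger never sees neighbouring words; calling `nltk.pos_tag` on each line's full token list would likely give more accurate tags, at the cost of a different output layout.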