# The Cavium Data Science cluster

The Cavium Data Science infrastructure allows for the distributed processing of large data sets across a cluster of computers using simple programming models. The infrastructure is designed to scale from a single server to over 30 machines, each offering local computation and storage. Rather than relying on hardware to deliver high availability, the software itself is designed to detect and handle failures at the application layer, delivering a highly available service.

The following software is currently installed on the cluster:
* Hadoop 2.8.3  http://hadoop.apache.org/docs/r2.8.3/index.html
* Hive 2.x.x  http://hive.apache.org/   (note: We recommend Hive on Tez for its improved performance, rather than Hive on Hadoop.  We do not support Hive on Spark.)
* Spark 2.x.x http://spark.apache.org/

## How to get an account


Using the Cavium Hadoop cluster requires an ARC-TS user account (acquired by filling out [this form](https://docs.google.com/a/umich.edu/forms/d/e/1FAIpQLSfLnLd7cu_19JeL2kPCfCraAGYiEKTlXKi4nGJLKrcSk26pVQ/viewform)) but does not require an active Flux allocation.

Note: When you create an account, you will automatically be added to our default queue. If you are using our Hadoop cluster for a class or you need a significant allocation, please contact us at <a href="mailto:hpc-support@umich.edu">hpc-support@umich.edu</a> so you can be added to the appropriate queue. Many of the examples in this user guide include a `--queue <your_queue>` flag. Please fill in the name of your queue when running these examples, or simply `default` if you do not have one.

## Accessing the cluster

To use the cluster, you will need a terminal.  Currently the cluster is only accessible via the command line.

If you are trying to access the cluster from off campus, or via the MGuest wireless network, you will need one of two things:

* [Install VPN software](http://www.itcom.itd.umich.edu/vpn/) on your computer
* First ssh to `login.itd.umich.edu`, then ssh to `cavium-thunderx-login.arc-ts.umich.edu` from there.




### On Windows

Windows does not come with a suitable terminal/command line prompt, so extra software is needed to access the cluster.  PuTTY is probably the simplest to use.  [Click here](http://www.chiark.greenend.org.uk/~sgtatham/putty/) to download it.  

Starting PuTTY for the first time will bring up a configuration window with a host name field, like so: ![putty login](https://arc-ts.umich.edu/wp-content/uploads/sites/4/2016/02/putty.png "putty login")

Replace flux-login.arc-ts.umich.edu with cavium-thunderx-login.arc-ts.umich.edu and click the Open button at the bottom of the window.  You will be prompted for your uniqname and Kerberos password, and then for your second authentication token from Duo.  Enter that and you will be logged into the system and receive a command line prompt.

### On Mac OS X

All OS X laptops and desktops come with a Terminal application in the Utilities folder.

![Utilities folder](Utilities.jpeg "Utilities folder")
![Terminal application](Terminal.jpeg "Terminal application")

Once you open a terminal window, you can simply type
```
ssh cavium-thunderx-login.arc-ts.umich.edu
```
Then enter your Kerberos password when prompted, followed by your Duo second-factor token.

### On Linux

All Linux workstations and servers come with a terminal application.  Start that terminal application, then type:

```
ssh cavium-thunderx-login.arc-ts.umich.edu
```
Then enter your Kerberos password when prompted, followed by your Duo second-factor token.

### Via a jupyter terminal

# Cavium Basics

Some preparation is still needed before you can start using the Cavium cluster:

* Move your data into HDFS to take full advantage of the cluster.
* Transfer your code to the cluster and adapt it to use our versions of the software.

## Moving Data into HDFS

Hadoop consists of two components: HDFS, a filesystem built for high read speeds, and YARN, a resource manager. HDFS is not a POSIX filesystem, so normal command line tools like `cp` and `mv` will not work. Most of the common tools have been reimplemented for HDFS and can be run using the `hdfs dfs` command. All data must be in HDFS for jobs to be able to read it.

We provide a staging volume for researchers to move data into the cluster.  We highly recommend using the umich#flux Globus endpoint to move data into and out of the cluster, especially when dealing with 1 TB or more of data.  The Globus endpoint allows access to the staging volume at 40 Gbps.

If you do not have access to Globus, you can also use scp to copy data into the staging volume with the following command:

```
scp datafiles <your_username>@flux-xfer.arc-ts.umich.edu:/nfs/cavium-staging/<your_username>/
```

Once in the staging volume, you can use the HDFS native commands to put the data into your HDFS home directory as listed below.  


### HDFS native commands

Below are some basic commands: 

List the contents of your HDFS home directory
```
hdfs dfs -ls
```
Copy local file data.csv to your HDFS home directory
```
hdfs dfs -put data.csv data.csv
```
Copy HDFS file data.csv back to your local home directory
```
hdfs dfs -get data.csv data2.csv
```
A complete reference of HDFS commands can be found on the [Apache website](http://hadoop.apache.org/docs/r2.8.3/hadoop-project-dist/hadoop-common/FileSystemShell.html).
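When many files need to be staged, the commands above can be scripted. The following is a minimal sketch, assuming Python is used as the driver on a login node. The helper names `hdfs_dfs_args` and `run_hdfs_dfs` are hypothetical and not part of Hadoop; only the `-ls`, `-put`, and `-get` subcommands shown above are used.

```python
import subprocess


def hdfs_dfs_args(action, *paths):
    """Build the argument list for an `hdfs dfs` subcommand (hypothetical helper)."""
    if not action.startswith("-"):
        action = "-" + action
    return ["hdfs", "dfs", action] + list(paths)


def run_hdfs_dfs(action, *paths):
    """Run the command on the cluster; raises CalledProcessError if it fails."""
    return subprocess.check_call(hdfs_dfs_args(action, *paths))


# Print the commands that would be run, without executing them.
for args in (
    hdfs_dfs_args("ls"),
    hdfs_dfs_args("put", "data.csv", "data.csv"),
    hdfs_dfs_args("get", "data.csv", "data2.csv"),
):
    print(" ".join(args))
```

Building the command as a list rather than a single string avoids shell quoting problems with file names that contain spaces.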


### Fuse HDFS

Fuse HDFS allows you to use standard POSIX system commands with HDFS. This may be useful if you have a program that needs to use data that is stored in HDFS. It is also useful for moving data into HDFS without relying on the staging volume.

The downside is that we currently do not offer the globus endpoint to the Fuse HDFS, so the only mechanism to copy data to it is via scp:

```
scp data <your_username>@cavium-thunderx-login.arc-ts.umich.edu:/hadoop-fuse/user/<your_username>/
```


### Using Fuse HDFS commands

Fuse HDFS can also be used to directly interact with your data via code like it's directly in the POSIX filesystem, rather than being limited to the HDFS commands.

To use Fuse HDFS, change directories to `/hadoop-fuse/user/<your_username>`:

```
cd /hadoop-fuse/user/<your_username>
```

Once in this directory, you can use commands on your HDFS files just as you would on any other files. For example, the ls command will list the contents of your HDFS home directory.

You could also run a Python or R program that uses a file in HDFS.

Save the file below and run it as you would any Python program. It reads an example data file that is available to all users in HDFS and counts how often each word occurs.

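```
#!/usr/bin/python
# Count how often each word occurs in an example file read through Fuse HDFS.
with open("/hadoop-fuse/var/examples/romeojuliet.txt", "r") as f:
    data = f.read()

# Split on any whitespace so that newlines also separate words.
counts = {}
for word in data.split():
    counts[word] = counts.get(word, 0) + 1

# A parenthesized single-argument print works in both Python 2 and Python 3.
for word, count in counts.items():
    print(word + " " + str(count))
```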
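The same kind of word count can be written with `collections.Counter` from the standard library, reading the file line by line so that a large file is never held in memory at once. This is a sketch only: the function name `count_words` is hypothetical, and the example path is the shared file mentioned above, which is only readable on the cluster.

```python
from collections import Counter
import os


def count_words(path):
    """Return a Counter of whitespace-separated words in the file at `path`."""
    counts = Counter()
    with open(path, "r") as f:
        for line in f:  # iterate line by line instead of reading the whole file
            counts.update(line.split())
    return counts


# Shared example file exposed through Fuse HDFS; skipped when not on the cluster.
EXAMPLE = "/hadoop-fuse/var/examples/romeojuliet.txt"

if os.path.exists(EXAMPLE):
    for word, count in count_words(EXAMPLE).most_common(10):
        print("%s %d" % (word, count))
```

Because Fuse HDFS presents the data as ordinary files, nothing in this script is specific to Hadoop; it works unchanged on a local file.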

# Introduction to Spark on Cavium

Spark and PySpark use a container called a Resilient Distributed Dataset (RDD) for storing and operating on data. The most important characteristic of Spark’s RDD is that it is immutable: once created, the data it contains cannot be updated. New RDDs can be created by transforming the data in another RDD, which is how analysis is done with Spark.

Using Spark’s native language, Scala, requires more setup than using PySpark. Some example Scala jobs, including the same example job in the PySpark documentation, can be found on this website. That Spark code has some trivial set up required to run a Spark job, and all of the actual logic is in the ‘run’ function.

## Using PySpark



Spark comes with an interactive Python console, which can be opened this way:
```
# Load the pyspark console 
pyspark --master yarn --queue <your_queue>
```
This interactive console can be used for prototyping or debugging, or just running simple jobs.

The following example runs a simple line count on a text file, prints its first line, and counts the number of lines that contain the string “words”. You can use any text file you have for this example:
```
>>> textFile = sc.textFile("test.txt")
>>> textFile.count()
>>> textFile.first()
>>> textFile.filter(lambda line: "words" in line).count()
```
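Note that `filter(...).count()` counts the lines in which the string appears, not the total number of occurrences. The difference can be seen in a plain-Python sketch that runs without Spark; the function names and sample lines are made up for illustration.

```python
def lines_containing(lines, word):
    """Number of lines in which `word` appears at least once."""
    return sum(1 for line in lines if word in line)


def total_occurrences(lines, word):
    """Total number of non-overlapping occurrences of `word` across all lines."""
    return sum(line.count(word) for line in lines)


sample = ["words words words", "no match here", "more words"]
print(lines_containing(sample, "words"))   # 2
print(total_occurrences(sample, "words"))  # 4
```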

You can also submit a job using PySpark without using the interactive console.

Save this file as job.py.
```
from pyspark import SparkConf, SparkContext
import sys

# This script takes two arguments, an input and output
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' <in> <out>')
  sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]

# Set up the configuration and job context
conf = SparkConf().setAppName('AnnualWordLength')
sc = SparkContext(conf=conf)


# Read in the dataset and immediately split each tab-separated line into a list of fields
data = sc.textFile(input).map(lambda line: line.split('\t'))

# Total word length per year: map each record to (year, word length * count), then sum per year into 'yearlyLength'.
yearlyLengthAll = data.map(
    lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
)
yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

# Total number of word occurrences per year.
yearlyCount = data.map(
    lambda arr: (int(arr[1]), float(arr[2]))
).reduceByKey(
    lambda a, b: a + b
)

# Average word length per year: total length divided by total occurrences.
yearlyAvg = yearlyLength.join(yearlyCount).map(
    lambda tup: (tup[0], tup[1][0] / tup[1][1])
)

# Save the results in the specified output directory.
yearlyAvg.saveAsTextFile(output)

# Finally, let Spark know that the job is done.
sc.stop()
```
The above script averages the lengths of words in the NGrams dataset by year. There are two main operations in the code: ‘map’ and ‘reduceByKey’. ‘map’ applies a function to each RDD element and returns a new RDD containing the results. ‘reduceByKey’ groups the values that share a key, applies a function to combine each group, and returns an RDD with one result per key.
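To make the two operations concrete, here is a plain-Python sketch of what they compute on a small, made-up sample. It is not how Spark implements them (Spark distributes the work across executors and does not guarantee output order); `map_rdd` and `reduce_by_key` are hypothetical stand-ins that only mirror the semantics.

```python
from functools import reduce
from itertools import groupby


def map_rdd(func, records):
    """Apply `func` to every element and return a new list; the input is unchanged."""
    return [func(r) for r in records]


def reduce_by_key(func, pairs):
    """Combine the values of each key with `func`, giving one (key, result) per key."""
    ordered = sorted(pairs, key=lambda kv: kv[0])
    return [
        (key, reduce(func, (value for _, value in group)))
        for key, group in groupby(ordered, key=lambda kv: kv[0])
    ]


# Made-up rows in the (word, year, count) layout that the script above reads.
rows = [("data", 2000, 3), ("science", 2000, 1), ("data", 2001, 2)]

lengths = reduce_by_key(lambda a, b: a + b,
                        map_rdd(lambda r: (r[1], float(len(r[0])) * r[2]), rows))
counts = reduce_by_key(lambda a, b: a + b,
                       map_rdd(lambda r: (r[1], float(r[2])), rows))

# Equivalent of the join: look up each year's count and divide.
count_by_year = dict(counts)
averages = [(year, total / count_by_year[year]) for year, total in lengths]
print(averages)  # [(2000, 4.75), (2001, 4.0)]
```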

The job can be submitted by running:
```
spark-submit \
 --master yarn \
 --num-executors 35 \
 --executor-memory 5g \
 --executor-cores 4 \
 job.py /var/ngrams/data ngrams-out


hdfs dfs -cat ngrams-out/*
```
 

The only required argument in the above job submission command is `--master yarn`. The values passed to the other arguments may be modified in order to get better performance or conform to the limits of your queue.
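When adjusting these values, it helps to know the total a submission asks for. The sketch below does that arithmetic for the flags used above; the function names are hypothetical, and the totals exclude the driver and any per-executor overhead that YARN adds, so treat them as a lower bound.

```python
def parse_memory_gb(value):
    """Convert a memory string such as '5g' or '512m' to gigabytes."""
    units = {"m": 1.0 / 1024, "g": 1.0, "t": 1024.0}
    return float(value[:-1]) * units[value[-1].lower()]


def requested_resources(num_executors, executor_cores, executor_memory):
    """Return (total cores, total executor memory in GB) for a submission."""
    return (num_executors * executor_cores,
            num_executors * parse_memory_gb(executor_memory))


# Values from the spark-submit example above.
cores, memory_gb = requested_resources(35, 4, "5g")
print("cores: %d, memory: %.0f GB" % (cores, memory_gb))  # cores: 140, memory: 175 GB
```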

Note: Our default Python is python x.x.x. We currently do not use the Conda distribution, as there is no version of Conda for the 64-bit ARM architecture at this time.

## Using Spark Shell

Spark has an easy-to-use interactive shell that can be used to learn the API and to analyze data interactively. Below is a simple example written in Scala. You can use any text file that you have:
```
spark-shell --master yarn --queue <your_queue>
scala> val textFile = spark.read.textFile("test.txt")
scala> textFile.count()
scala> textFile.first()
//Count how many lines contain the word "words"
//You can replace "words" with any word you'd like
scala> textFile.filter(line => line.contains("words")).count()
```

## Using Spark Submit

The following is a simple example of submitting a Spark job that uses an existing jar all users have access to. It estimates Pi, and the number at the end is the number of iterations it uses (more iterations = more accurate).
```
export SPARK_MAJOR_VERSION=2
cd /usr/hdp/current/spark2-client
spark-submit \
   --class org.apache.spark.examples.SparkPi \
   --master yarn \
   --queue <your_queue> \
examples/jars/spark-examples*.jar 10
```
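The Pi estimate is a random-sampling (Monte Carlo) calculation: points are drawn at random in a square, and the fraction that land inside the inscribed circle approximates Pi/4. The following plain-Python sketch illustrates the idea on a single machine. It is not the code in the examples jar, and `estimate_pi` is a hypothetical name.

```python
import random


def estimate_pi(samples, seed=None):
    """Estimate Pi by sampling random points in the unit square."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:  # the point falls inside the quarter circle
            inside += 1
    return 4.0 * inside / samples


# More samples generally give an estimate closer to Pi.
for n in (1000, 100000):
    print(n, estimate_pi(n, seed=42))
```

In the Spark version, the sampling is split across executors and the per-executor counts are summed, which is why it parallelizes well.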
Gradle is a popular build tool for Java and Scala. The following example is useful if you are getting code from Bitbucket, GitHub, etc. The code can be downloaded and built by logging on to cavium-thunderx-login.arc-ts.umich.edu and running:
```
git clone https://bitbucket.org/umarcts/spark-examples
cd spark-examples
./gradlew jar
```
The last command, “./gradlew jar”, will download all dependencies, compile the code, run tests, and package all of the code into a Java ARchive (JAR). This JAR is submitted to the cluster to run a job. For example, the AverageNGramLength job can be launched by running:
```
spark-submit \
   --class com.alectenharmsel.examples.spark.AverageNGramLength \
   --master yarn \
   --executor-memory 3g \
   --num-executors 35 \
 build/libs/spark-examples-*-all.jar /var/ngrams/data ngrams-out
```
The output will be located in your HDFS home directory in a directory called ‘ngrams-out’, and can be viewed by running:
```
hdfs dfs -cat ngrams-out/* | tail -5
```

The output should look like this:

![spark output](https://arc-ts.umich.edu/wp-content/uploads/sites/4/2016/03/spark-output.png)

## Using SparkR


SparkR allows users to utilize the ease of data analysis in R while using the speed and capacity of Spark on our Hadoop cluster. Those familiar with R should have no problem utilizing this feature. After opening the SparkR session, simply begin typing out your program in R.

To open a SparkR session, run this:
```
sparkR --master yarn --queue <your_queue> --num-executors 4 --executor-memory 1g --executor-cores 4
``` 

The following is an example you can run to get a feel for how SparkR works. This example was taken from the official SparkR documentation, which can be found here, along with other examples.
```
families <- c("gaussian", "poisson")
train <- function(family) {
 model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
 summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)
```

## Using Parquet

If you’re familiar with Spark, you know that a dataframe is essentially a data structure that contains “tabular” data in memory. That is, it consists of rows and columns of data that can, for example, store the results of an SQL-style query. Dataframes can be saved into HDFS as Parquet files. Parquet files not only preserve the schema information of the dataframe, but will also compress the data when it gets written into HDFS. This means that the saved file will take up less space in HDFS and it will load faster if you read the data again later. Therefore, it is a useful storage format for data you may want to analyze multiple times.

The PySpark example below uses Reddit data, which is available to all Cavium Hadoop users in HDFS at `/var/reddit`. This data consists of information about all posts made on the popular website Reddit, including their score, subreddit, text body, and author, all of which can make for interesting data analysis.

First, launch the PySpark shell and run the following commands:

```
pyspark --master yarn --queue <your_queue> --num-executors 35 --executor-cores 4 --executor-memory 5g

#Load the reddit data into a dataframe

>>> reddit = sqlContext.read.json("/var/reddit/RS_2016-0*")

#Set compression type to snappy

>>> sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

#Write data into a parquet file - this example puts it into your HDFS home directory as “reddit.parquet”

>>> reddit.write.parquet("reddit.parquet")

#Create a new dataframe from parquet file 

>>> parquetFile = sqlContext.read.parquet("reddit.parquet")

#Register dataframe as a SQL temporary table

>>> parquetFile.registerTempTable("reddit_table")

#Query the table

#Can really be any query, but this query will find some of the more highly rated posts

>>> ask = sqlContext.sql("SELECT title FROM reddit_table WHERE score > 1000 and subreddit = 'AskReddit'")

#Since we created the dataframe “ask” with the previous query, we can write it out to HDFS as a parquet file so it can be accessed again later

>>> ask.write.parquet("ask.parquet")

#Exit the pyspark console - you’ll view the contents of your parquet file after

>>> exit()
``` 

To view the contents of your Parquet file, use Parquet tools. Parquet tools is a command line tool that aids in the inspection of Parquet files, such as viewing its contents or its schema.
```
#view the output 

hadoop jar /sw/dsi/noarch/parquet-tools-1.7.0.jar cat ask.parquet

#view the schema; in this case, just the “title” of the askreddit thread 

hadoop jar /sw/dsi/noarch/parquet-tools-1.7.0.jar schema ask.parquet

#to get a full list of all of the options when using Parquet tools  
hadoop jar /sw/dsi/noarch/parquet-tools-1.7.0.jar -h
```

# Other Tools

This section will be for other tools.

# Policies

This section is for policies for using the Cavium Data Science Cluster.

## Requesting new tools


If you'd like to request new tools, feel free to email <a href="mailto:hpc-support@umich.edu">hpc-support@umich.edu</a>.  Because of the pilot nature of the Cavium Data Science environment, it may not be possible to install them immediately.

## Data Management


Because the staging volume in the Cavium Data Science cluster is shared, data stored there has a limited lifecycle.  Data may remain on the staging volume for 4 weeks, after which it must be deleted.  The staging volume currently has 50 TB of space, so we must be mindful of the usable space available to all.
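To check which of your staged files have passed the 4-week mark, something like the following sketch may help. It assumes that age is measured by file modification time, which this policy does not specify. The function name is hypothetical, and the staging path follows the pattern used in the scp example earlier in this guide.

```python
import os
import time

FOUR_WEEKS_SECONDS = 28 * 24 * 60 * 60


def files_older_than(root, max_age_seconds=FOUR_WEEKS_SECONDS, now=None):
    """Return paths under `root` whose modification time is older than the limit."""
    now = time.time() if now is None else now
    stale = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if now - os.path.getmtime(path) > max_age_seconds:
                stale.append(path)
    return sorted(stale)


# Replace <your_username> before running on the cluster; skipped if the path is absent.
staging_dir = "/nfs/cavium-staging/<your_username>"
if os.path.isdir(staging_dir):
    for path in files_older_than(staging_dir):
        print(path)
```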

Similarly, data within HDFS will also have a lifecycle, but exact details are yet to be determined.


## Keeping another copy of your data

We recommend that all researchers keep a backup copy of their data, especially for very large datasets. You may wish to do so locally, but we also offer the Data Den service for people who need to keep very large datasets cheaply.  Because Data Den is located close to the Cavium cluster, retrieval is relatively fast, although copying data back to the Data Science cluster will still take some time.