# Reading input from Ceph Object Store with Apache Spark on OpenShift

In this demonstration we load textual data from [Ceph](http://ceph.com/) using the [S3 API](http://docs.ceph.com/docs/master/radosgw/s3/). There are two key steps to take away from it:

1. Loading the S3 client libraries (hadoop-aws).
2. Configuring the client with Ceph/S3 credentials.


### Important - load the S3 client libraries

This uses some Jupyter line magic to put **--packages** on the pyspark command line for the kernel.

In [None]:
%set_env PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell
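
Outside Jupyter, or when more than one package is needed, the same variable can be set from plain Python. The following is a minimal sketch, not part of the original notebook; the helper name `build_submit_args` is hypothetical. It relies on `--packages` accepting a comma-separated list of Maven coordinates.

```python
import os


def build_submit_args(packages):
    """Return a PYSPARK_SUBMIT_ARGS value that loads the given Maven coordinates."""
    # --packages takes a comma-separated list; "pyspark-shell" must come last.
    return "--packages={} pyspark-shell".format(",".join(packages))


# This has to run before the SparkContext is created, otherwise it has no effect.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(
    ["org.apache.hadoop:hadoop-aws:2.7.3"]
)
```

The resulting value is the same string that the line magic above sets.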

### Configure Notebook parameters

The following parameters must be filled in to match your environment. If your Ceph environment was deployed with **Ceph Nano**, you can get the access keys and the Ceph IP address with the `cn cluster status <cluster-name>` command.

In [None]:
access_key = '' # Add your S3 Access Key here
secret_key = '' # Add your S3 Secret Key here
bucket_name = "ceph-source"

ceph_host = '' # Add your rgw0 Ceph host here
ceph_port = 8000

spark_cluster = '' # add your Spark Cluster name here
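
If you would rather not paste keys into the notebook, one option is to read them from environment variables, which is a common way for a container platform to expose secrets. This is only a sketch under that assumption: the function `credentials_from_env` and the variable names `S3_ACCESS_KEY` and `S3_SECRET_KEY` are hypothetical and not defined by Ceph or Spark.

```python
import os


def credentials_from_env(env=None):
    """Return (access_key, secret_key) read from environment variables."""
    env = os.environ if env is None else env
    # Hypothetical variable names; use whatever your deployment exposes.
    access = env.get("S3_ACCESS_KEY", "")
    secret = env.get("S3_SECRET_KEY", "")
    if not access or not secret:
        raise ValueError("S3_ACCESS_KEY and S3_SECRET_KEY must both be set")
    return access, secret


# Demonstration with an explicit mapping instead of the real environment.
demo_keys = credentials_from_env(
    {"S3_ACCESS_KEY": "demo-access", "S3_SECRET_KEY": "demo-secret"}
)
```

In the notebook this could be used as `access_key, secret_key = credentials_from_env()`.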

## Set up a bucket

Before creating a job in our Spark cluster, let's create a bucket using the S3 API:

In [None]:
import boto
import boto.s3.connection

conn = boto.connect_s3(
        aws_access_key_id = access_key,
        aws_secret_access_key = secret_key,
        host = ceph_host,
        port = ceph_port,
        is_secure=False,
        calling_format = boto.s3.connection.OrdinaryCallingFormat()
        )

# Use the bucket name configured in the parameters cell.
bucket = conn.create_bucket(bucket_name)

print("Bucket {} created!".format(bucket_name))

## Put a file in the bucket

In [None]:
from boto.s3.key import Key

object_key = "spark-test"
object_value = "/opt/spark/README.md"  # local file to upload

bucket = conn.get_bucket(bucket_name)

k = Key(bucket)
k.key = object_key
k.set_contents_from_filename(object_value)

print("Object {} added in bucket {}!".format(object_key, bucket_name))

## List contents in the bucket

In [None]:
# Print name, size and modification time of every object in the bucket.
for key in bucket.list():
    print("{name}\t{size}\t{modified}".format(
        name = key.name,
        size = key.size,
        modified = key.last_modified,
    ))

Set up your SparkContext as you normally would.

In [None]:
import pyspark

conf=pyspark.SparkConf().setMaster('spark://{}:7077'.format(spark_cluster)) \
     .set('spark.driver.host', 'base-notebook') \
     .set('spark.driver.port', 42000) \
     .set('spark.driver.bindAddress', '0.0.0.0') \
     .set('spark.driver.blockManager.port', 42100)
spark=pyspark.SparkContext(conf=conf)

### Important - configure the S3 client with your credentials
Don't store your credentials in code; use [Secrets](https://kubernetes.io/docs/user-guide/secrets/) instead. Do use [AWS IAM](https://aws.amazon.com/iam/) and credentials with only the capabilities needed for your application.

In [None]:
hadoopConf=spark._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.access.key", access_key)
hadoopConf.set("fs.s3a.secret.key", secret_key)
# Use the same host and port as the boto connection.
hadoopConf.set("fs.s3a.endpoint", "{}:{}".format(ceph_host, ceph_port))
hadoopConf.set("fs.s3a.connection.ssl.enabled", "false")
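
The settings above can also be gathered into one mapping, which makes the endpoint easy to inspect before it is handed to Spark. This is a minimal sketch: the helper `s3a_settings` and the host `ceph.example.test` are hypothetical, while the property names are the ones used in the cell above. Deriving the endpoint from the host and port parameters keeps it in line with the values used for the boto connection.

```python
def s3a_settings(host, port, access_key, secret_key, use_ssl=False):
    """Build the Hadoop S3A properties for an S3-compatible endpoint."""
    return {
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        # The endpoint is "host:port", without a URL scheme.
        "fs.s3a.endpoint": "{}:{}".format(host, port),
        # Hadoop configuration values are strings, not booleans.
        "fs.s3a.connection.ssl.enabled": "true" if use_ssl else "false",
    }


# Placeholder values for illustration only.
settings = s3a_settings("ceph.example.test", 8000, "demo-access", "demo-secret")

# In the notebook, the mapping would be applied like this:
#   for name, value in settings.items():
#       hadoopConf.set(name, value)
```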

This is a simple check of how many executors have registered with your cluster. The count usually includes the driver as well.

In [None]:
spark._jsc.sc().getExecutorMemoryStatus().size()

## Read a simple text file from S3

In [None]:
# `spark` is a SparkContext, so textFile() returns an RDD with one string per line.
df0 = spark.textFile("s3a://{}/{}".format(bucket_name, object_key))

In [None]:
df0.count()

In [None]:
# take() returns a plain Python list holding the first 10 lines.
df0.take(10)

In [None]:
from operator import add
# Count how often each character occurs and show the five most frequent.
df0.flatMap(lambda line: list(line)).map(lambda x: (x, 1)).reduceByKey(add).sortBy(lambda x: x[1], ascending=False).take(5)
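
To see what the pipeline above computes, the same character count can be reproduced locally on a couple of lines. This is only an illustrative sketch in plain Python, not Spark code; the function `char_counts` and the sample lines are made up for the example.

```python
from collections import Counter


def char_counts(lines):
    """Count every character across all lines, like flatMap + reduceByKey."""
    counts = Counter()
    for line in lines:
        # Iterating over a string yields its characters one by one.
        counts.update(line)
    return counts


sample = ["spark", "ceph s3"]
# The two most frequent characters, comparable to sortBy(...).take(2).
top = char_counts(sample).most_common(2)
```

Spaces and punctuation are counted like any other character, which is also true of the Spark version.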

## Word count example

In [None]:
# Each RDD element is a whole line, so split the line itself into words.
counts = df0.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(add) \
             .take(10)
counts
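
Splitting on a single space produces empty strings wherever two spaces are adjacent, and those empty strings are then counted as words. The sketch below shows the effect locally in plain Python; the function `word_counts` and the sample lines are hypothetical. It uses `split()` without an argument, which splits on any run of whitespace.

```python
from collections import Counter


def word_counts(lines):
    """Count words across lines, ignoring runs of whitespace."""
    counts = Counter()
    for line in lines:
        # split() with no argument never yields empty strings.
        counts.update(line.split())
    return counts


sample = ["to be  or", "not to be"]
# Splitting on a single space keeps an empty token for the double space.
naive_tokens = sample[0].split(" ")
clean_counts = word_counts(sample)
```

If the empty tokens are unwanted in the Spark job, using `line.split()` in the `flatMap` step has the same effect.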