This notebook is intended for Python 2 with Spark 2.0. It uses SparkSession to load a CSV file stored in Bluemix Object Storage into a DataFrame, filters that data, and then writes the filtered data to a previously created Cloudant database. The example CSV file lists child care providers in Massachusetts and was downloaded from https://data.mass.gov/Education/Program-list-for-Child-Care-Search-1-15-2015/cb6m-ccic

Cell 1: The first cell verifies the version of Spark you are using.

In [None]:
spark.version
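Because the notebook targets Spark 2.0, it can help to turn the version string into a number before running the remaining cells. The following is a minimal sketch; the helper name major_version is hypothetical, and the sample strings stand in for the value that spark.version returns in the cell above.

```python
def major_version(version_string):
    """Return the major version number from a string such as '2.0.0' (hypothetical helper)."""
    return int(version_string.split('.')[0])

# Sample strings; in the notebook you would pass spark.version instead.
print(major_version('2.0.0'))
```

In the notebook, a check such as major_version(spark.version) >= 2 would confirm that SparkSession is available.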

Cell 2: Replace the contents of the following cell by following these steps:
1. Display the Files slide-out panel.
2. Open the Insert to code menu for your file, and select Insert Credentials.
3. Rename the inserted dictionary to credentials_621, the name used in the rest of the code.

In [None]:
# @hidden_cell
credentials_621 = {
  'auth_url':'https://identity.open.softlayer.com',
  'project':'object_storage_xxxxxxxx',
  'project_id':'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'region':'dallas',
  'user_id':'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'domain_id':'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'domain_name':'xxxxxxxx',
  'username':'member_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'password':"""xxxxxxxxxxxxxxxx""",
  'container':'CloudantSparkIntegration',
  'tenantId':'undefined',
  'filename':'Program_list_for_Child_Care_Search_1-15-2015.csv'
}
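Before running the later cells, it may be worth checking that the inserted dictionary contains every key those cells read. The following is a minimal sketch in plain Python; the helper name missing_credential_keys and the REQUIRED_KEYS list are assumptions derived from the keys used later in this notebook, not part of the generated code.

```python
# Keys that the later cells of this notebook read from the credentials dictionary.
REQUIRED_KEYS = ['auth_url', 'project_id', 'region', 'user_id',
                 'password', 'container', 'filename']

def missing_credential_keys(credentials):
    """Return the required keys that are absent or empty (hypothetical helper)."""
    return [key for key in REQUIRED_KEYS if not credentials.get(key)]

# Example with a deliberately incomplete dictionary.
example = {'auth_url': 'https://identity.open.softlayer.com', 'region': 'dallas'}
print(missing_credential_keys(example))
```

In the notebook, calling missing_credential_keys(credentials_621) should return an empty list once the credentials have been inserted correctly.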

Cell 3: The following cell imports SparkSession from pyspark.sql. SparkSession is the entry point to programming Spark with the Dataset and DataFrame API.
Next, the code defines a function that sets the Hadoop configuration with the credentials needed to authenticate against Bluemix Object Storage, calls that function, and then creates the SparkSession.

In [None]:
from pyspark.sql import SparkSession

# @hidden_cell
# This function sets up Spark's access to your Object Storage. It reads your credentials from credentials_621.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_620ad16a(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', credentials_621['auth_url']+'/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', credentials_621['project_id'])
    hconf.set(prefix + '.username', credentials_621['user_id'])
    hconf.set(prefix + '.password', credentials_621['password'])
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', credentials_621['region'])
    hconf.setBoolean(prefix + '.public', False)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_620ad16a(name)

spark = SparkSession.builder.getOrCreate()
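To see which Hadoop properties the cell above sets without touching a Spark context, the same keys can be collected in an ordinary dictionary. This is only an illustrative sketch: the helper name swift_hadoop_properties and the placeholder credential values are assumptions, and nothing here connects to Object Storage.

```python
def swift_hadoop_properties(name, credentials):
    """Return the properties that the cell above passes to hconf (hypothetical helper)."""
    prefix = 'fs.swift.service.' + name
    return {
        prefix + '.auth.url': credentials['auth_url'] + '/v3/auth/tokens',
        prefix + '.auth.endpoint.prefix': 'endpoints',
        prefix + '.tenant': credentials['project_id'],
        prefix + '.username': credentials['user_id'],
        prefix + '.password': credentials['password'],
        prefix + '.http.port': 8080,
        prefix + '.region': credentials['region'],
        prefix + '.public': False,
    }

# Placeholder values only; no connection is made.
sample = {
    'auth_url': 'https://identity.open.softlayer.com',
    'project_id': 'PROJECT_ID',
    'user_id': 'USER_ID',
    'password': 'PASSWORD',
    'region': 'dallas',
}
props = swift_hadoop_properties('keystone', sample)
for key in sorted(props):
    # Skip the password so it is never printed.
    if not key.endswith('.password'):
        print(key + ' = ' + str(props[key]))
```

Note that the name passed in ('keystone' here) becomes part of every property key, so it has to match the name used later in the swift:// URL.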

Cell 4: The following cell reads the CSV file into a data frame, infers the schema, and then displays the first two entries.

In [None]:
massdata = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('timestampFormat', 'MM/dd/yyyy')\
  .option('inferSchema', 'true')\
  .load('swift://' + credentials_621['container'] + '.' + name + '/' + credentials_621['filename'])
massdata.take(2)
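The path passed to load in the cell above has the form swift://container.name/filename. A small sketch of that string construction follows; the helper name swift_url is hypothetical, and the arguments are the container, configuration name, and file name used in this notebook.

```python
def swift_url(container, name, filename):
    """Build a swift://<container>.<name>/<filename> path (hypothetical helper)."""
    return 'swift://' + container + '.' + name + '/' + filename

url = swift_url('CloudantSparkIntegration', 'keystone',
                'Program_list_for_Child_Care_Search_1-15-2015.csv')
print(url)
```

Printing the path before calling load is a cheap way to catch a wrong container or configuration name.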

Cell 5: The following cell prints the schema and a record count of the data frame contents.

In [None]:
massdata.printSchema()
massdata.count()

Cell 6: The following cell displays the first 30 values in the Session1Name field. Notice that there are null values.

In [None]:
massdata.select("Session1Name").show(30)

Cell 7: The following cell filters the data to keep only those facilities whose Session1Name is not null. Then it displays the first two entries and a count of the filtered data.

In [None]:
sessiondata = massdata.filter(massdata.Session1Name.isNotNull())
sessiondata.show(2)
sessiondata.count()
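The effect of the isNotNull filter in the cell above can be illustrated with a plain-Python analogue that needs no Spark. The rows below are made-up sample data, and the ProgramName column is a hypothetical stand-in; only Session1Name comes from the notebook.

```python
# Made-up rows; None plays the role of a null value in the DataFrame.
rows = [
    {'ProgramName': 'Example A', 'Session1Name': 'Full Day'},
    {'ProgramName': 'Example B', 'Session1Name': None},
    {'ProgramName': 'Example C', 'Session1Name': 'Half Day'},
]

# Keep only rows whose Session1Name is present, as isNotNull() does.
kept = [row for row in rows if row['Session1Name'] is not None]

print(str(len(rows)) + ' rows before, ' + str(len(kept)) + ' rows after')
```

As in the Spark version, the original collection is left unchanged and the filter produces a new, smaller one.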

Cell 8: The following cell displays the first 30 values in the Session1Name field of the filtered data. Notice that there are no null values.

In [None]:
sessiondata.select("Session1Name").show(30)

Cell 9: The following cell writes the contents of the sessiondata data frame to a Cloudant database called child_care. Replace the placeholder host, username, and password values with your own Cloudant credentials. Note: The Cloudant database MUST already exist.

In [None]:
sessiondata.write.format("com.cloudant.spark") \
  .option("cloudant.host","xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-bluemix.cloudant.com") \
  .option("cloudant.username","xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-bluemix") \
  .option("cloudant.password","xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") \
  .save("child_care")
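One way to keep the Cloudant settings out of the write call itself is to collect them in a dictionary first. The following is a sketch under stated assumptions: the helper name cloudant_options and the ACCOUNT and PASSWORD placeholders are hypothetical, while the option names are the ones used in the cell above.

```python
def cloudant_options(host, username, password):
    """Return the connector options used in the cell above as a dictionary (hypothetical helper)."""
    return {
        'cloudant.host': host,
        'cloudant.username': username,
        'cloudant.password': password,
    }

# Placeholder values only; substitute your own Cloudant account details.
opts = cloudant_options('ACCOUNT-bluemix.cloudant.com', 'ACCOUNT-bluemix', 'PASSWORD')
print(sorted(opts))

# In the notebook, assuming the com.cloudant.spark connector is available as above:
# sessiondata.write.format("com.cloudant.spark").options(**opts).save("child_care")
```

The commented line relies on DataFrameWriter.options, which accepts keyword arguments and so can take the unpacked dictionary.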