# Loading data into RDD, DataFrame


We will focus on structured data here for the sake of time, since our focus should be on machine learning.

Loading unstructured data can be challanging, the lowest level API here is `textFile`, `wholeTextFiles` and `binaryRecords`. 

In some cases we would have to deal with data formats specific for a scientific collaboration. Example: CMS collaboration relies on root formats. Good news is, there are tools available for reading from root into Spark dataframes or RDDs.

For now, let us assume that someone has done the work of preparing and structring the data for us.

## Loading CSV

First, we are going to learn how to load data into structured CSV format. There is at least two ways to do that:

1) Read the files line by line with `textFiles()` method, splitting on a delimiter

Similarly to Python, there is a data structured designed to be used when working with structured data (I mean Pandas Dataframes), it is also called the Dataframe (a concept closely linked to Spark SQL). There is a way to read CSV directly into Spark dataframe 

2) csv Dataframe reader

Note: it used to be a third party spark-csv library developed and maintained by Databricks
https://github.com/databricks/spark-csv

In [1]:
def output_cleaner():
    import os
    os.system("rm -rf ./output*")
    print "Output folders removed!"

In [2]:
import csv
import sys
import StringIO
import os

prepath = "file://"+os.environ.get("HOME")+"/codas-ml"

#this one is use when you use textFile
def loadRecord(line,header,delimiter):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, delimiter=delimiter, fieldnames=header)
    return next(reader)

In [4]:
#Load into a regular RDD using textFile and parsing the CSV file line by line

delimiter = ","

# Try the record-per-line-input
input = sc.textFile(prepath+"/data/diamonds.csv")
header = input.first().split(delimiter)
data = input.filter(lambda x: header[0] not in x).map(lambda x: loadRecord(x,header,delimiter))

In [5]:
for item in data.take(3):
    print item

{u'"color"': 'E', u'""': '1', u'"carat"': '0.23', u'"cut"': 'Ideal', u'"table"': '55', u'"price"': '326', u'"clarity"': 'SI2', u'"depth"': '61.5', u'"x"': '3.95', u'"y"': '3.98', u'"z"': '2.43'}
{u'"color"': 'E', u'""': '2', u'"carat"': '0.21', u'"cut"': 'Premium', u'"table"': '61', u'"price"': '326', u'"clarity"': 'SI1', u'"depth"': '59.8', u'"x"': '3.89', u'"y"': '3.84', u'"z"': '2.31'}
{u'"color"': 'E', u'""': '3', u'"carat"': '0.23', u'"cut"': 'Good', u'"table"': '65', u'"price"': '327', u'"clarity"': 'VS1', u'"depth"': '56.9', u'"x"': '4.05', u'"y"': '4.07', u'"z"': '2.31'}


In [8]:
#this can be later converted to the dataframe

### CSV files to Dataframes (directly)

Spark CSV dataframe reader can handle delimiters, escaping, and skipping header lines for CSV files.

In [6]:
# Read csv data as DataFrame using spark csv dataframe reader
diamonds = spark.read.options(header='true', inferSchema='true').csv(prepath+"/data/diamonds.csv")

In [7]:
diamonds.show()

+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
|  8| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
|  9| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 10| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
| 11|  0.3|     Good|    J|    SI1| 64.0| 55.0|  339|4.25|4.28|2.73|
| 12| 0.23|    Ideal|    J|    VS1

In [8]:
diamonds.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



### Analyzing CSV files in Python as DataFrames

Let's try doing some basic queries to understand the dataset better.

In [9]:
diamonds.count()

53940

In [10]:
diamonds.select('color').distinct().show()

+-----+
|color|
+-----+
|    F|
|    E|
|    D|
|    J|
|    G|
|    I|
|    H|
+-----+



In [11]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import *

# Convert Price column from integer to type DoubleType - we are going to do some arithmetics on it
diamondsdf = diamonds.withColumn("price", diamonds["price"].cast(DoubleType()))

# Calculate average price per carat value
carat_avgPrice = (diamondsdf
                  .groupBy("carat")
                  .avg("price")
                  .withColumnRenamed("avg(price)", "avgPrice")
                  .orderBy(desc("avgPrice")))

# View top10 highest average prices and corresponding carat value
carat_avgPrice.show(10)

+-----+------------------+
|carat|          avgPrice|
+-----+------------------+
| 3.51|           18701.0|
| 2.67|           18686.0|
|  4.5|           18531.0|
| 5.01|           18018.0|
| 2.57|17841.666666666668|
|  2.6|           17535.0|
| 2.64|           17407.0|
| 4.13|           17329.0|
| 2.39|17182.428571428572|
| 2.71|           17146.0|
+-----+------------------+
only showing top 10 rows



In [12]:
# We can convert the DataFrame directly into an RDD
diamonds_rdd = diamonds.rdd

In [13]:
# View first 3 rows of the diamonds RDD of rows
diamonds_rdd.take(3)

[Row(_c0=1, carat=0.23, cut=u'Ideal', color=u'E', clarity=u'SI2', depth=61.5, table=55.0, price=326, x=3.95, y=3.98, z=2.43),
 Row(_c0=2, carat=0.21, cut=u'Premium', color=u'E', clarity=u'SI1', depth=59.8, table=61.0, price=326, x=3.89, y=3.84, z=2.31),
 Row(_c0=3, carat=0.23, cut=u'Good', color=u'E', clarity=u'VS1', depth=56.9, table=65.0, price=327, x=4.05, y=4.07, z=2.31)]

You can now use RDD operations to analyze the data.

In [14]:
# Diamond counts by cuts
countByGroup = diamonds_rdd.map(lambda x: (x.cut, 1)).reduceByKey(lambda x,y: x+y)
countByGroup.collect()

[(u'Ideal', 21551),
 (u'Good', 4906),
 (u'Premium', 13791),
 (u'Very Good', 12082),
 (u'Fair', 1610)]

In [15]:
# Distinct diamond clarities in dataset
distinctClarity = diamonds_rdd.map(lambda x: x.clarity).distinct()
distinctClarity.collect()

[u'SI2', u'SI1', u'VS1', u'I1', u'VS2', u'VVS1', u'VVS2', u'IF']

In [16]:
# Average price per diamond cut
avgPrice = diamonds_rdd.map(lambda x: (x.cut, float(x.price))).reduceByKey(lambda x,y: (x+y)/2)
avgPrice.collect()

[(u'Ideal', 2756.7240663718817),
 (u'Good', 2755.647409027791),
 (u'Premium', 2756.654813661215),
 (u'Very Good', 2756.7183661747795),
 (u'Fair', 2743.567771968392)]

### JSON files with Python

This notebook shows an example of how to load JSON data in Python notebooks and best practices for working with JSON data.

#### Loading JSON data with Spark SQL into a DataFrame

Spark SQL has built in support for reading in JSON files which contain a separate, self-contained JSON object per line. Multi-line JSON files are currently not compatible with Spark SQL.

In [17]:
testJsonData = spark.read.json(prepath+"/data/test.json")

In [18]:
testJsonData.printSchema()

root
 |-- array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- dict: struct (nullable = true)
 |    |-- extra_key: string (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)



In [19]:
testJsonData.show()

+---------+--------------------+---+-------+
|    array|                dict|int| string|
+---------+--------------------+---+-------+
|[1, 2, 3]|       [null,value1]|  1|string1|
|[2, 4, 6]|       [null,value2]|  2|string2|
|[3, 6, 9]|[extra_value3,val...|  3|string3|
+---------+--------------------+---+-------+



Spark SQL can infer the schema automatically from your JSON data. To view the schema, use printSchema.

### Parquet Files in Python

This notebook describes how to register a table in Spark SQL from parquet files.
Parquet Files are a great format for storing large tables in SparkSQL.
Consider converting text files with a schema into parquet files for more efficient storage.
Parquet provides a lot of optimizations under the hood to speed up your queries.
Just call ```bash .write.parquet``` on a DataFrame to encode in into Parquet.

In [32]:
from pyspark.sql import Row

array = [Row(key="a", group="vowels", value=1, someints=[1], map = {"a" : 1}),
         Row(key="b", group="consonants", value=2, someints=[2, 2], map = {"b" : 2}),
         Row(key="c", group="consonants", value=3, someints=[3, 3, 3], map = {"c" : 3}),
         Row(key="d", group="consonants", value=4, someints=[4, 4, 4, 4], map = {"d" : 4}),
         Row(key="e", group="vowels", value=5, someints=[5, 5, 5, 5, 5], map = {"3" : 5})]
dataframe = spark.createDataFrame(sc.parallelize(array))
dataframe.show()
# now that it's created, let's write it to disk
dataframe.write.parquet(prepath+"/output_parquet/testParquetFiles")

+----------+---+-----------+---------------+-----+
|     group|key|        map|       someints|value|
+----------+---+-----------+---------------+-----+
|    vowels|  a|Map(a -> 1)|            [1]|    1|
|consonants|  b|Map(b -> 2)|         [2, 2]|    2|
|consonants|  c|Map(c -> 3)|      [3, 3, 3]|    3|
|consonants|  d|Map(d -> 4)|   [4, 4, 4, 4]|    4|
|    vowels|  e|Map(3 -> 5)|[5, 5, 5, 5, 5]|    5|
+----------+---+-----------+---------------+-----+



<p>Continue on to the next exercise: [2_DataFrames.ipynb](2_DataFrames.ipynb).</p>