## Reading Data

## Text Files 
*[`sc.textFile()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.textFile)*

In [7]:
lines = sc.textFile("data/iris.csv")
print lines.count()
print lines.take(5)

151
[u'"","Sepal.Length","Sepal.Width","Petal.Length","Petal.Width","Species"', u'"1",5.1,3.5,1.4,0.2,"setosa"', u'"2",4.9,3,1.4,0.2,"setosa"', u'"3",4.7,3.2,1.3,0.2,"setosa"', u'"4",4.6,3.1,1.5,0.2,"setosa"']


In [10]:
lines_zip = sc.textFile("data/zips/1.txt.gz")
print lines_zip.count()
print lines_zip.take(5)

20
[u'1', u'2', u'3', u'4', u'5']


In [11]:
lines_zip = sc.textFile("data/zips/*.txt.gz")
print lines_zip.count()
print lines_zip.take(5)

200
[u'101', u'102', u'103', u'104', u'105']


Take care of the parameter for `sc.textFile`. By default it uses the `fs.defaultFS` configuration in the hadoop `core-site.xml`.

This configuration values differs from one setup to another:
- Developement: It is the local file system, relative to the notebook location.
- Azure HDInsights: It defaults to an azure storage container, accessible through the [`wasb://` scheme](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-use-blob-storage/).
- Amazon EMR: It defaults to `hdfs://`. This means that the data folder needs to be copied to HDFS on your running cluster. On amazon, you can also read data from S3 using the `s3n://` scheme

In [2]:
# Note that this only works inside amazon EMR
lines_zip = sc.textFile("s3n://spark-talk-data/data/zips/*.txt.gz")
print lines_zip.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: s3n
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:374)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


### CSVs

In [13]:
lines = sc.textFile("data/mtcars.csv")
print lines.count()
print lines.map(lambda x: x.split(",")).take(3)

33
[[u'""', u'"mpg"', u'"cyl"', u'"disp"', u'"hp"', u'"drat"', u'"wt"', u'"qsec"', u'"vs"', u'"am"', u'"gear"', u'"carb"'], [u'"Mazda RX4"', u'21', u'6', u'160', u'110', u'3.9', u'2.62', u'16.46', u'0', u'1', u'4', u'4'], [u'"Mazda RX4 Wag"', u'21', u'6', u'160', u'110', u'3.9', u'2.875', u'17.02', u'0', u'1', u'4', u'4']]


Better to try spark data frames

In [7]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

In [51]:
col_names = lines.first().split(",")
casts = [float]*len(col_names)
casts[0] = str

types = [FloatType]*len(col_names)
types[0] = StringType
fields = [StructField(col, col_type(), True) for col, col_type in zip(col_names, types)]
schema = StructType(fields)

data = lines.map(lambda x: x.split(",")).filter(lambda x: x!=col_names)
data = data.map(lambda row: map(lambda (cast, cell): cast(cell), zip(casts, row)))

mtcars =  sqlContext.createDataFrame(data, schema)
mtcars.printSchema()
mtcars.describe().show()

root
 |-- "": string (nullable = true)
 |-- "mpg": float (nullable = true)
 |-- "cyl": float (nullable = true)
 |-- "disp": float (nullable = true)
 |-- "hp": float (nullable = true)
 |-- "drat": float (nullable = true)
 |-- "wt": float (nullable = true)
 |-- "qsec": float (nullable = true)
 |-- "vs": float (nullable = true)
 |-- "am": float (nullable = true)
 |-- "gear": float (nullable = true)
 |-- "carb": float (nullable = true)

summary "mpg"             "cyl"             "disp"             "hp"              "drat"            "wt"               "qsec"             "vs"                "am"               "gear"             "carb"           
count   32                32                32                 32                32                32                 32                 32                  32                 32                 32               
mean    20.09062498807907 6.1875            230.72187399864197 146.6875          3.59656248241663  3.2172499895095825 17.848749935626984 

### External Databases (SQL, Mongo, Elasticsearch, ...)

Hadoop input formats are very flexible. These are accessible in spark through [`sc.newAPIHadoopRDD()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.newAPIHadoopRDD)

Looking at the function, we are placed back into the Java side of spark.

Trying on [MongoDB with spark](https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage)

In [52]:
# for this to run, this requires running pyspark with:
# `SPARK_CLASSPATH=/vagrant/meetup/mongo-hadoop-core-1.4.0.jar:/vagrant/meetup/mongo-java-driver-3.0.3.jar pyspark`
# or to add the env variable to the processing starting the notebook
rdd = sc.newAPIHadoopRDD(
    inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
    keyClass='org.apache.hadoop.io.Text',
    valueClass='org.apache.hadoop.io.MapWritable',
    conf={
        'mongo.input.uri': 'mongodb://talk:talk@ds035693.mongolab.com:35693/spark-talk.minibars',
        'mongo.input.split.create_input_splits': 'false' # this is needed since: the mongodb above is not shared, and there is no access to a cluster manager account. 
    }
)
print rdd.take(1)
print rdd.count()

[({u'timeSecond': 1440791303, u'timestamp': 1440791303, u'counter': 2794334, u'__class__': u'org.bson.types.ObjectId', u'processIdentifier': 3375, u'time': 1440791303000L, u'date': datetime.datetime(2015, 8, 28, 19, 48, 23), u'machineIdentifier': 8324479}, {u'High': 24.33, u'Timestamp': u'2009-08-24 09:31', u'Symbol': u'MSFT', u'Volume': 207651, u'Low': 24.28, u'Close': 24.3, u'_id': {u'timeSecond': 1440791303, u'timestamp': 1440791303, u'counter': 2794334, u'__class__': u'org.bson.types.ObjectId', u'processIdentifier': 3375, u'time': 1440791303000L, u'date': datetime.datetime(2015, 8, 28, 19, 48, 23), u'machineIdentifier': 8324479}, u'Open': 24.32, u'Day': 24})]
97609


Another way would be to use the python mongo driver

In [1]:
# quick and dirty way to add python packages
pymongo_url = "https://pypi.python.org/packages/2.7/p/pymongo/pymongo-3.0.3-py2.7-macosx-10.10-intel.egg#md5=558e9021691bcf1051946d1f9e122d59"
pymongo_dest = "pymongo.egg"
import os
os.system("wget %s -O %s" %(pymongo_url, pymongo_dest))
sc.addPyFile(pymongo_dest) 
import pymongo

In [2]:
def get_collection():
    client = pymongo.MongoClient('mongodb://talk:talk@ds035693.mongolab.com:35693/spark-talk')
    db = client['spark-talk']
    return db.minibars

def get_split(skip, limit = 100):
    collection = get_collection()
    return collection.find().limit(limit).skip(skip)
    
count = get_collection().count()

splits_rdd = sc.parallelize(xrange(count/100))
data = splits_rdd.flatMap(get_split).cache()
print data.take(2)
print count, data.count()

[{u'Volume': 207651, u'Timestamp': u'2009-08-24 09:31', u'Symbol': u'MSFT', u'High': 24.33, u'Low': 24.28, u'Close': 24.3, u'_id': ObjectId('55e0bb077f057f0d2f2aa35e'), u'Open': 24.32, u'Day': 24}, {u'Volume': 683713, u'Timestamp': u'2009-08-24 09:30', u'Symbol': u'MSFT', u'High': 24.42, u'Low': 24.31, u'Close': 24.31, u'_id': ObjectId('55e0bb087f057f0d2f2aa35f'), u'Open': 24.41, u'Day': 24}]
97609 97600


In [15]:
def convert_id_to_string(row):
    row["_id"] = str(row["_id"])
    return row
data = data.map(convert_id_to_string)
minibars_df = sqlContext.createDataFrame(data.map(lambda row: pyspark.sql.Row(**row)))
minibars_df.printSchema()

root
 |-- Close: double (nullable = true)
 |-- Day: long (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Volume: long (nullable = true)
 |-- _id: string (nullable = true)



## Writing Data

- [`rdd.saveAsTextFile()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.saveAsTextFile) Here you also have to take care of the file system scheme used in the paramter.
- [`rdd.saveAsSequenceFile()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.saveAsSequenceFile) This serializes an RDD to a binary file.
- For external databases, the same concepts can be used as in reading from external databases.
- [`rdd.saveAsPickleFile()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.saveAsPickleFile) This is another way of serializing data to a binary file.