# PySpark training for data engineers
## 02. Data Ingestion

### Goal

* Create an XML file with the notebook and retrieve the content using PySpark. The output is written to a pickle to be used in the next notebook.
* Creating two CSV files with the notebook and retrieve the content using PySpark. Output is written to a single pickle.

### Highlights

* `spark.wholeTextFiles()` reads one file or directory at once.
* `rdd.saveAsPickleFile()` saves an RDD straight to a pickle that can be re-used.

### Implementation
For every Spark job, **always create a context**. This basically means you get one (or more) containers running a Spark job allocated for you.

In [1]:
from pyspark import SparkConf, SparkContext
config = SparkConf().setMaster('local')
spark = SparkContext.getOrCreate(conf=config)

Verify the Spark context has been created:

In [2]:
spark

#### XML ingestion

In [3]:
%%file test.xml
<teststructure>
    <info>testfile</info>
    <outerlist>
        <elt>
            <subelement>One</subelement>
        </elt>
        <elt>
            <subelement>Two</subelement>
        </elt>
    </outerlist>
</teststructure>

Overwriting test.xml


Create a data object by reading the XML file into an RDD. By specifying `file://` we indicate we want to read from the local file system. Often Spark is used with Hadoop and the file is read from the Hadoop filesystem and will have an URL like `hdfs://`.

Check where we are working now to get the absolute path to the XML file:

In [4]:
!pwd

/home/jovyan


In [9]:
rdd = spark.wholeTextFiles('file://///home/jovyan/test.xml')

We need to call `collect()` in order to see the content of the rdd, since it is a lazy execution.

In [10]:
for file in rdd.collect():
    filename, filecontent = file
    print('Found %s' % filename)
    print('Content:\n %s' % filecontent)

Found file:/home/jovyan/test.xml
Content:
 <teststructure>
    <info>testfile</info>
    <outerlist>
        <elt>
            <subelement>One</subelement>
        </elt>
        <elt>
            <subelement>Two</subelement>
        </elt>
    </outerlist>
</teststructure>


We can see that the content is indeed the same as we specified on top of this notebook. Lets use the method `saveAsPickleFile` to save the RDD to a variable on disk which we can use in the next notebook.

In [11]:
rdd.saveAsPickleFile('xml-pickle-file')

Py4JJavaError: An error occurred while calling o38.saveAsObjectFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/jovyan/xml-pickle-file already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:283)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.SequenceFileRDDFunctions$$anonfun$saveAsSequenceFile$1.apply$mcV$sp(SequenceFileRDDFunctions.scala:69)
	at org.apache.spark.rdd.SequenceFileRDDFunctions$$anonfun$saveAsSequenceFile$1.apply(SequenceFileRDDFunctions.scala:54)
	at org.apache.spark.rdd.SequenceFileRDDFunctions$$anonfun$saveAsSequenceFile$1.apply(SequenceFileRDDFunctions.scala:54)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.SequenceFileRDDFunctions.saveAsSequenceFile(SequenceFileRDDFunctions.scala:54)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsObjectFile$1.apply$mcV$sp(RDD.scala:1520)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsObjectFile$1.apply(RDD.scala:1520)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsObjectFile$1.apply(RDD.scala:1520)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsObjectFile(RDD.scala:1517)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsObjectFile(JavaRDDLike.scala:565)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsObjectFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


#### CSV ingestion

In [12]:
%%file csvfile01.csv
john,doe,male,32
jake,doe,male,16

Overwriting csvfile01.csv


In [13]:
%%file csvfile02.csv
jane,doe,female,31
janet,doe,female,13

Overwriting csvfile02.csv


We can use a wild card to read several files into the RDD at once.

In [14]:
csvrdd = spark.wholeTextFiles('file://///home/jovyan/*.csv')

Check the files that the reader picked up:

In [15]:
for filename, filecontent in csvrdd.collect():
    print(filename)

file:/home/jovyan/csvfile01.csv
file:/home/jovyan/csvfile02.csv


Save the RDD to disk:

In [16]:
csvrdd.saveAsPickleFile('csv-pickle-file')

Py4JJavaError: An error occurred while calling o49.saveAsObjectFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/jovyan/csv-pickle-file already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:283)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.SequenceFileRDDFunctions$$anonfun$saveAsSequenceFile$1.apply$mcV$sp(SequenceFileRDDFunctions.scala:69)
	at org.apache.spark.rdd.SequenceFileRDDFunctions$$anonfun$saveAsSequenceFile$1.apply(SequenceFileRDDFunctions.scala:54)
	at org.apache.spark.rdd.SequenceFileRDDFunctions$$anonfun$saveAsSequenceFile$1.apply(SequenceFileRDDFunctions.scala:54)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.SequenceFileRDDFunctions.saveAsSequenceFile(SequenceFileRDDFunctions.scala:54)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsObjectFile$1.apply$mcV$sp(RDD.scala:1520)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsObjectFile$1.apply(RDD.scala:1520)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsObjectFile$1.apply(RDD.scala:1520)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsObjectFile(RDD.scala:1517)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsObjectFile(JavaRDDLike.scala:565)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsObjectFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
