# Reading and Writing Data with Spark

This notebook contains the code from the previous screencast. The only difference is that instead of reading a data set from a remote cluster, the data set is read from a local file. You can see the file by clicking on the "jupyter" icon and opening the folder titled "data".

Run each code cell to see how everything works.

First, let's import SparkConf and SparkSession.

In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

Since we're using Spark locally, we already have both a SparkContext and a SparkSession running. We can update some of the parameters, such as our application's name. Let's just call it "Our first Python Spark SQL example".

In [2]:
spark = SparkSession \
    .builder \
    .appName("Our first Python Spark SQL example") \
    .getOrCreate()

Let's check whether the change went through.

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.app.name', 'Our first Python Spark SQL example'),
 ('spark.driver.port', '62533'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1612160490053'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.host', '192.168.146.183')]

In [4]:
spark

As you can see, the app name is exactly what we set.
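Scanning the full list for one key gets tedious as the configuration grows. Because `getAll()` returns a list of `(key, value)` tuples, one option is to turn it into a dictionary and look up a setting directly. The sketch below hard-codes a few of the pairs printed above so that it runs without a Spark session; in the notebook you would pass `spark.sparkContext.getConf().getAll()` instead.

```python
# A few of the (key, value) pairs printed by getAll() above, copied by hand
# so this sketch runs without a live Spark session.
conf_pairs = [
    ("spark.app.name", "Our first Python Spark SQL example"),
    ("spark.master", "local[*]"),
    ("spark.submit.deployMode", "client"),
]

# A list of 2-tuples converts directly into a dictionary.
conf = dict(conf_pairs)

# Direct lookup for a key that is present.
print(conf["spark.app.name"])

# .get() with a fallback for a key that may not be set at all.
print(conf.get("spark.driver.memory", "<not set>"))
```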

Let's create our first dataframe from a fairly small sample data set. Throughout the course we'll work with a log file data set that describes user interactions with a music streaming service. The records describe events such as logging in to the site, visiting a page, listening to the next song, or seeing an ad.

In [5]:
path = "/Users/ai.tran/src/pyviz/flights-viz/data/24.tsv"
daystream = spark.read.csv(path)

In [6]:
daystream.printSchema()

root
 |-- _c0: string (nullable = true)
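The schema has a single column, `_c0`, because `spark.read.csv` splits on commas by default and this file is tab-separated, so each line arrives as one string. Python's standard `csv` module has the same default, which makes the effect easy to see without Spark. The sample line below is a shortened string in the same shape as the rows in this file. With Spark, passing `sep="\t"` to `spark.read.csv` would be the analogous change; the rest of this notebook keeps the single-column read and splits the string manually.

```python
import csv

# A shortened, tab-separated line in the same shape as the rows in this file.
line = "_hc\t1611446400\t_hs\t0"

# Default delimiter is a comma: the line has no commas, so one field comes back.
comma_fields = next(csv.reader([line]))

# Telling the reader to split on tabs recovers the individual fields.
tab_fields = next(csv.reader([line], delimiter="\t"))

print(len(comma_fields), comma_fields)
print(len(tab_fields), tab_fields)
```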



In [8]:
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

In [9]:
daystream2 = daystream.withColumn("line_num", F.monotonically_increasing_id())
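`monotonically_increasing_id` produces IDs that are unique and increasing, but not necessarily consecutive. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The pure-Python sketch below mimics that layout to show why `line_num` can jump by a large amount when a new partition starts; `sketch_id` is a hypothetical helper written for this illustration, not a Spark function.

```python
def sketch_id(partition_id: int, row_in_partition: int) -> int:
    """Mimic the documented bit layout: partition ID above the low 33 bits."""
    return (partition_id << 33) + row_in_partition


# Two partitions with three rows each.
ids = [sketch_id(p, r) for p in range(2) for r in range(3)]
print(ids)
# [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

If strictly consecutive line numbers matter, this column should be treated as an ordering key rather than a row count.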

In [10]:
daystream2.take(1)

[Row(_c0='_hc\t1611446400\t_hs\t0\t_c\t1611446400\t_s\t0\ttype\tposition\tident\tAWQ320\tchildID\t197\t_t\tb\tadsb_version\t0\tairGround_src\tA\tairground\tA\talt\t367\talt_ft\t36700\talt_gnss_src\tA\talt_pressure\t36700\talt_src\tA\tbitmask\t0\tclock\t1611446391\tcombid\t1611446400-7\tdest\tWMKK\tfacility\tfAT-b68f52fd-d1f7-4043-94d4-beb0526d16e5\tfeed\tADEPT22\tfeed_c\t1611446399\tfeed_s\t4180\tflightlevel\t367\tflightlevel_pressure\t367\tfp\tAWQ320-1611267240-schedule-0289:0\tgSource\tfeed\tgs\t501\tgs_src\tA\thSource\tfeed\theading\t296\theading_magnetic\t297.1\theading_src\tA\thexid\t8A02CB\tlat\t1.83881\tlon\t103.30525\tmach\t0.760\tnac_p\t7\tnac_v\t0\tnav_alt\t30000\tnav_qnh\t1009.0\torig\tWARR\totherPedigrees\t{aireon sita_ups wide} {sita_ups wide} wide\tpedigree\taireon wide\tpos_nic\t7\tpos_rc\t371\tposition_src\tA\tpreferred\t1\tpressure\t220\tprovenance\twide\trecvd\t1611446400\treg\tPKAXU\troll\t0.0\tscore\t1000\tsil\t2\tsil_type\tunknown\tspeed_ias\t248\tspeed_tas\t442\ts
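The row above looks like alternating keys and values (`_hc`, `1611446400`, `_hs`, `0`, and so on). Assuming that pattern holds for the whole line, which the truncated output cannot confirm, a plain-Python sketch of turning one line into a dictionary could look like this. The sample string is the first twelve tokens of the row above.

```python
# First twelve tokens of the row shown above, copied by hand.
raw = "_hc\t1611446400\t_hs\t0\t_c\t1611446400\t_s\t0\ttype\tposition\tident\tAWQ320"

tokens = raw.split("\t")

# Assumption: even positions are keys, odd positions are their values.
record = dict(zip(tokens[0::2], tokens[1::2]))

print(record["ident"])
print(record["type"])
```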

In [12]:
daystream2.show(n=5)

+--------------------+--------+
|                 _c0|line_num|
+--------------------+--------+
|_hc	1611446400	_h...|       0|
|_hc	1611446400	_h...|       1|
|_hc	1611446400	_h...|       2|
|_hc	1611446400	_h...|       3|
|_hc	1611446400	_h...|       4|
+--------------------+--------+
only showing top 5 rows



In [15]:
daystream2.select(
    "line_num",
    F.split("_c0", '\t'),
    F.posexplode(F.split("_c0", '\t')).alias("pos", "val")
)\
.show(200)

+--------+--------------------+---+--------------------+
|line_num|   split(_c0, 	, -1)|pos|                 val|
+--------+--------------------+---+--------------------+
|       0|[_hc, 1611446400,...|  0|                 _hc|
|       0|[_hc, 1611446400,...|  1|          1611446400|
|       0|[_hc, 1611446400,...|  2|                 _hs|
|       0|[_hc, 1611446400,...|  3|                   0|
|       0|[_hc, 1611446400,...|  4|                  _c|
|       0|[_hc, 1611446400,...|  5|          1611446400|
|       0|[_hc, 1611446400,...|  6|                  _s|
|       0|[_hc, 1611446400,...|  7|                   0|
|       0|[_hc, 1611446400,...|  8|                type|
|       0|[_hc, 1611446400,...|  9|            position|
|       0|[_hc, 1611446400,...| 10|               ident|
|       0|[_hc, 1611446400,...| 11|              AWQ320|
|       0|[_hc, 1611446400,...| 12|             childID|
|       0|[_hc, 1611446400,...| 13|                 197|
|       0|[_hc, 1611446400,...|
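`posexplode` emits one output row per array element, together with the element's position, which is why `line_num` 0 repeats once per token in the table above. A rough pure-Python equivalent for a single line, with `enumerate` standing in for `posexplode`, might be (`posexplode_line` is a hypothetical helper for illustration):

```python
def posexplode_line(line_num: int, line: str, sep: str = "\t"):
    """Return (line_num, pos, val) tuples, one per token in the line."""
    return [(line_num, pos, val) for pos, val in enumerate(line.split(sep))]


# First four tokens of row 0, copied from the output above.
rows = posexplode_line(0, "_hc\t1611446400\t_hs\t0")
for row in rows:
    print(row)
```

Each input line therefore grows into as many rows as it has tokens, so the exploded dataframe can be far larger than the original.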

In [21]:
daystream2.select(
    "line_num",
    F.split("_c0", '\t').alias("line"),
    F.posexplode(F.split("_c0", '\t')).alias("pos", "val")
)\
.drop("val")\
.select(
    "line_num",
    F.concat(F.lit("col"),F.col("pos").cast("string")).alias("name"),
    F.expr("line[pos]").alias("val")
)\
.groupBy("line_num").pivot("name").agg(F.first("val"))\
.show(10)

Py4JJavaError: An error occurred while calling o219.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 14.0 failed 1 times, most recent failure: Lost task 7.0 in stage 14.0 (TID 693, 192.168.146.183, executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:473)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:172)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$2(SortAggregateExec.scala:80)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$2$adapted(SortAggregateExec.scala:77)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$Lambda$3507/1150688301.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:859)
	at org.apache.spark.rdd.RDD$$Lambda$2366/1594000826.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2004/644433456.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:473)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:172)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$2(SortAggregateExec.scala:80)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$2$adapted(SortAggregateExec.scala:77)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$Lambda$3507/1150688301.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:859)
	at org.apache.spark.rdd.RDD$$Lambda$2366/1594000826.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2004/644433456.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
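The trace shows the job running out of Java heap space while sorting rows for the aggregation (`SortAggregateExec`, `UnsafeExternalSorter`). Exploding every line into one row per token and then pivoting the rows back requires a shuffle for the `groupBy`. One alternative that may avoid this is to index into the split array directly, which keeps one row per line and needs no grouping. The sketch below has not been run against this data set; `column_names`, `split_to_columns`, and the field count are hypothetical names and values chosen for illustration, and out-of-range indexes are expected to yield nulls under Spark's default (non-ANSI) settings.

```python
def column_names(n_fields):
    """Names col0, col1, ... matching the naming used in the pivot attempt."""
    return [f"col{i}" for i in range(n_fields)]


def split_to_columns(df, n_fields, source_col="_c0", sep="\t"):
    """Select one column per token position, without exploding or pivoting."""
    # Imported here so column_names() stays usable without Spark installed.
    from pyspark.sql import functions as F

    parts = F.split(F.col(source_col), sep)
    token_cols = [
        parts.getItem(i).alias(name)
        for i, name in enumerate(column_names(n_fields))
    ]
    return df.select("line_num", *token_cols)


# Usage in this notebook (n_fields=14 is an arbitrary choice for illustration):
# split_to_columns(daystream2, 14).show(10)
```

Because this is a plain `select`, each task only needs to hold its own rows, and no sort or shuffle stage is added to the plan.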
