# Loading CSV Data Files with SparkR
This is the "Hello, World!" for loading data from CSV files into Spark.

This example is specific for the configuration of the Institute for Insight Hadoop/Spark cluster (as of Fall 2016). We're currently running Spark 1.6.1.



    $ hdfs dfs -ls /user/mgrace/red_cross/
    Found 3 items
    -rw-r--r--   3 mgrace hadoop   142603317 2016-09-02 10:24 /user/mgrace/red_cross/donor_deferral912016.csv
    -rw-r--r--   3 mgrace hadoop  4763358313 2016-09-02 10:24 /user/mgrace/red_cross/donor_summary912016.csv
    -rw-r--r--   3 mgrace hadoop 10551624919 2016-09-02 10:26 /user/mgrace/red_cross/donor_transactions912016.csv

## Initialize Spark
The following lines load the "SparkR" library, start a Spark session, and initialize the SQL context. If this step needs to be repeated a current Spark session should be shut-down first. These commands can also be exectued in RStudio.

**References**:
- https://github.com/jadianes/spark-r-notebooks/blob/master/notebooks/nb0-starting-up/nb0-starting-up.ipynb
- https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/functions.html

In [1]:
#
# Loading library
#
Sys.setenv(SPARK_HOME='/usr/hdp/2.4.2.0-258/spark')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
library(SparkR)
#
# Starting SparkR session (run `sparkR.stop()` before starting a new session)
#
#sparkR.stop()    ### this command will stop the running Spark session
sc <- sparkR.init(sparkPackages = "com.databricks:spark-csv_2.11:1.2.0")
sqlctx <- sparkRSQL.init(sc)


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var

The following objects are masked from ‘package:base’:

    colnames, colnames<-, endsWith, intersect, rank, rbind, sample,
    startsWith, subset, summary, table, transform



Launching java with spark-submit command /usr/hdp/2.4.2.0-258/spark/bin/spark-submit  --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/Rtmpy8cjYz/backend_port18905263a7f5a 


## Reading Data Files
Let's start with the smallest (and simplest) data file `/user/mgrace/red_cross/donor_deferral912016.csv`

In [108]:
df_deferral <- read.df(sqlctx, "/user/mgrace/red_cross/donor_deferral912016.csv", "com.databricks.spark.csv", header="true")
schema(df_deferral)

StructType
|-name = "arc_id", type = "StringType", nullable = TRUE
|-name = "deferral_start_date", type = "StringType", nullable = TRUE
|-name = "deferral_end_date", type = "StringType", nullable = TRUE

All fields are of type `StringType`. The 'spark-csv' package is able to infer the table schema from the data itself. However, this can be fairly time consuming, and may not always suffice.

In [131]:
df_deferral <- read.df(sqlctx, "/user/mgrace/red_cross/donor_deferral912016.csv", "com.databricks.spark.csv", header="true", inferSchema="true")
schema(df_deferral)

StructType
|-name = "arc_id", type = "IntegerType", nullable = TRUE
|-name = "deferral_start_date", type = "StringType", nullable = TRUE
|-name = "deferral_end_date", type = "StringType", nullable = TRUE

The columns `deferral_start_date` and `deferral_end_date` are still strings, even though we see that they hold dates. The alterntive is to define custom schema.

## Define a Custom Schema
The following example shows how to create a custom schema for a table and apply it to the loading process. We define the first column as "integer" and the other two as "date". In this step we may also change the names of the columns. The names in the custom schema override those headers in the CSV file.

In [3]:
deferralSchema <- structType(
    structField("arc_id", "integer", nullable = TRUE),
    structField("deferral_start_date", "date", nullable = TRUE),
    structField("deferral_end_date", "date", nullable = TRUE))

In [4]:
df_deferral <- read.df(sqlctx, "/user/mgrace/red_cross/donor_deferral912016.csv", "com.databricks.spark.csv", header="true", schema=deferralSchema)
schema(df_deferral)

StructType
|-name = "arc_id", type = "IntegerType", nullable = TRUE
|-name = "deferral_start_date", type = "DateType", nullable = TRUE
|-name = "deferral_end_date", type = "DateType", nullable = TRUE

Everything looks good. Unfortunately it's not. Because Spark uses a *lazy evaluation* sheme nothing has been executed, yet. Once we actually call on getting some data we will learn that our custom schema fails.

In [135]:
##
## Execute the command below
##
#head(df_deferral)

ERROR: Error in invokeJava(isStatic = TRUE, className, methodName, ...): org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 47.0 failed 1 times, most recent failure: ...


We have to take ***one step back***. Our date columns are not read properly because of to the format "yyyy/mm/dd" in which they were stored. We need to resort back to strings.

We give the two date columns diffent names by preceding them with `"tmp"`. How to use the renamed columns and add columns of the original table with the proper data type will be shown in the next section.

In [2]:
deferralSchema_2 <- structType(
    structField("arc_id", "integer", nullable = TRUE),
    structField("tmp_deferral_start_date", "string", nullable = TRUE),
    structField("tmp_deferral_end_date", "string", nullable = TRUE))

In [3]:
df_deferral <- read.df(sqlctx, "/user/mgrace/red_cross/donor_deferral912016.csv", "com.databricks.spark.csv", header="true",
                       schema=deferralSchema_2)
schema(df_deferral)

StructType
|-name = "arc_id", type = "IntegerType", nullable = TRUE
|-name = "tmp_deferral_start_date", type = "StringType", nullable = TRUE
|-name = "tmp_deferral_end_date", type = "StringType", nullable = TRUE

In [4]:
head(df_deferral)

Unnamed: 0,arc_id,tmp_deferral_start_date,tmp_deferral_end_date
1,60391835,2002/06/13,2003/06/14
2,77352072,2007/04/21,2008/04/22
3,59320300,2001/11/07,2003/06/01
4,70083734,2000/10/05,2000/10/18
5,50383290,1997/07/09,1997/07/15
6,67244463,2001/11/12,2222/02/02


Let's look at the next data file:

In [5]:
df_summary <- read.df(sqlctx, "/user/mgrace/red_cross/donor_summary912016.csv", "com.databricks.spark.csv", header="true", nullValue='')
schema(df_summary)

StructType
|-name = "bzd_assessedhomevalue", type = "StringType", nullable = TRUE
|-name = "bzd_avg_bank_credit6", type = "StringType", nullable = TRUE
|-name = "bzd_avg_inq_all12", type = "StringType", nullable = TRUE
|-name = "bzd_avg_maxcred_install6", type = "StringType", nullable = TRUE
|-name = "bzd_avg_mos_autopay", type = "StringType", nullable = TRUE
|-name = "bzd_avg_numauto12", type = "StringType", nullable = TRUE
|-name = "bzd_descretionary_spend", type = "StringType", nullable = TRUE
|-name = "bzd_income", type = "StringType", nullable = TRUE
|-name = "bzd_lengthofresidence", type = "StringType", nullable = TRUE
|-name = "bzd_mortg_equity", type = "StringType", nullable = TRUE
|-name = "bzd_numberadultsinhh", type = "StringType", nullable = TRUE
|-name = "bzd_numberofchildrenhh", type = "StringType", nullable = TRUE
|-name = "bzd_realty_mortgremain", type = "StringType", nullable = TRUE
|-name = "bzd_realty_mospay", type = "StringType", nullable = TRUE
|-name = "bzd_tt_all

The summary table has 93 columns. Instead of creating a custom schema for this table we may just use the default `StringType` and convert selected columns as we need them. This is demonstrated in the next section.

## Adding Columns, Fixing Dates
Our data files use the format `yyyy/mm/dd`. If we where to use the type `"date"` in the schema our load procedure would fail. Therefore, we read data fields as strings into temporary columns.

In [6]:
schema(df_deferral)

StructType
|-name = "arc_id", type = "IntegerType", nullable = TRUE
|-name = "tmp_deferral_start_date", type = "StringType", nullable = TRUE
|-name = "tmp_deferral_end_date", type = "StringType", nullable = TRUE

In [7]:
head(df_deferral)

Unnamed: 0,arc_id,tmp_deferral_start_date,tmp_deferral_end_date
1,60391835,2002/06/13,2003/06/14
2,77352072,2007/04/21,2008/04/22
3,59320300,2001/11/07,2003/06/01
4,70083734,2000/10/05,2000/10/18
5,50383290,1997/07/09,1997/07/15
6,67244463,2001/11/12,2222/02/02


We can, now, use the `selectExpr` method to create a new data frame with some of the original columns, and some that are calculated.
Expresssions are of the same form as the ones in "SELECT" statements in SparkSQL. A list of available functions can be found at https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/functions.html

The first argument of `selectExpr` is a (Spark) data frame. The remaining arguments are strings of expressions, whereby the simplest expression would be just a column name.

Transforming data frames into new once with computed columns is common practice in Spark. The transformation aren't actually performed until we aggregate or collect rows. Spark uses *lazy evaluation*.

In [8]:
df_deferral2 <- selectExpr(df_deferral, "arc_id"
                ,"unix_timestamp(tmp_deferral_start_date, 'yyyy/MM/dd') as deferral_start_ts"
                ,"from_unixtime(unix_timestamp(tmp_deferral_start_date, 'yyyy/MM/dd')) as deferral_start_string"
                ,"to_date(from_unixtime(unix_timestamp(tmp_deferral_start_date, 'yyyy/MM/dd'))) as deferral_start_date"
                ,"unix_timestamp(tmp_deferral_end_date, 'yyyy/MM/dd') as deferral_end_ts"
                ,"from_unixtime(unix_timestamp(tmp_deferral_end_date, 'yyyy/MM/dd')) as deferral_end_string"
                ,"to_date(from_unixtime(unix_timestamp(tmp_deferral_end_date, 'yyyy/MM/dd'))) as deferral_end_date"
               )
schema(df_deferral2)

StructType
|-name = "arc_id", type = "IntegerType", nullable = TRUE
|-name = "deferral_start_ts", type = "LongType", nullable = TRUE
|-name = "deferral_start_string", type = "StringType", nullable = TRUE
|-name = "deferral_start_date", type = "DateType", nullable = TRUE
|-name = "deferral_end_ts", type = "LongType", nullable = TRUE
|-name = "deferral_end_string", type = "StringType", nullable = TRUE
|-name = "deferral_end_date", type = "DateType", nullable = TRUE

In [9]:
head(df_deferral2)

Unnamed: 0,arc_id,deferral_start_ts,deferral_start_string,deferral_start_date,deferral_end_ts,deferral_end_string,deferral_end_date
1,60391835,1023940800,2002-06-13 00:00:00,11851,1055563200,2003-06-14 00:00:00,12217
2,77352072,1177128000,2007-04-21 00:00:00,13624,1208836800,2008-04-22 00:00:00,13991
3,59320300,1005109200,2001-11-07 00:00:00,11633,1054440000,2003-06-01 00:00:00,12204
4,70083734,970718400,2000-10-05 00:00:00,11235,971841600,2000-10-18 00:00:00,11248
5,50383290,868420800,1997-07-09 00:00:00,10051,868939200,1997-07-15 00:00:00,10057
6,67244463,1005541200,2001-11-12 00:00:00,11638,7955125200,2222-02-02 00:00:00,92073


The '\_ts' columns show the result of the `unix_timestamp` function. The UNIX time is represented as the number of elapsed seconds since midnight, January 1, 1970. This function allows for a format argument; in this case "yyyy/MM/dd".

Converting unix time back into text format with `from_unixtime` shows the official datetime format that Spark (and Java) use. As seen in the '\_string' columns.

The actual '\_date' columns also contain numbers, which are not that useful when looking at the data. However, this data format is essential for the many date functions that Spark offers, as the following example demonstrates.

In [10]:
df3 <- selectExpr(df_deferral2, "arc_id", "deferral_start_date"
                 ,"year(deferral_start_date) as start_year"
                 ,"month(deferral_start_date) as start_month"
                 ,"day(deferral_start_date) as start_day"
                 )

In [11]:
schema(df3)

StructType
|-name = "arc_id", type = "IntegerType", nullable = TRUE
|-name = "deferral_start_date", type = "DateType", nullable = TRUE
|-name = "start_year", type = "IntegerType", nullable = TRUE
|-name = "start_month", type = "IntegerType", nullable = TRUE
|-name = "start_day", type = "IntegerType", nullable = TRUE

In [12]:
head(df3)

Unnamed: 0,arc_id,deferral_start_date,start_year,start_month,start_day
1,60391835,11851,2002,6,13
2,77352072,13624,2007,4,21
3,59320300,11633,2001,11,7
4,70083734,11235,2000,10,5
5,50383290,10051,1997,7,9
6,67244463,11638,2001,11,12


## What's next?
The advantage of using Spark is to work on very large data sets that would not fit into the computers memory as regular R data.frame.
One should try to perform as much cleaning, filtering, grouping, and sorting in Spark and then pull some (`head`, `take`) or all (`collect`) rows into an R data.frame.
See https://spark.apache.org/docs/1.6.1/api/R/index.html for a list of SparkR functions.

## What can go wrong?
Yes, a lot of things can get wrong. The R process is 'talking' to the Spark environment, and some of the instructions and expressions that are send to Spark may cause some the background Spark processes to crash. Error messages can be very cryptic. Here are some steps:
- Try if you're still talking to Spark by looking at your DataFrame: `head(df_deferral)`
- If you lost the connecion to Spark you may have to reinitialize the Spark context. First kill the Spark session with `sparkR.stop()`. Then go through the initialization process
- If nothing works restart the kernel suing the "Kernel" drop-down in the menu bar. Outside of the Jupyter Notebook kill your R process.
- Check if there is a rouge process on the system by using `top` command the UNIX command line:
<pre>
Tasks: 986 total,   1 running, 985 sleeping,   0 stopped,   0 zombie
%Cpu(s):  0.1 us,  0.1 sy,  0.0 ni, 99.9 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 23083510+total, 83742512 free, 32968732 used, 11412386+buff/cache
KiB Swap:  1023996 total,   520828 free,   503168 used. 19664708+avail Mem 
</pre>
<pre>
   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
117451 molnar    20   0 12.054g 492288  25764 S   1.0  0.2   0:23.74 java
124997 molnar    20   0  147092   3024   1424 R   1.0  0.0   0:00.17 top
  1248 git       20   0 3345800 402508   2248 S   0.7  0.2 336:22.19 ruby
   155 root      20   0       0      0      0 S   0.3  0.0   1:24.71 rcuos/0
  2000 named     20   0 2496732  41816   1428 S   0.3  0.0  70:50.91 named
  3517 git       20   0  568496 274944   4748 S   0.3  0.1   0:12.59 ruby
100061 mgrace    20   0 17.339g 0.016t  20308 S   0.3  7.6   2:05.82 rsession
106136 rfu3      20   0  464880  52380   7128 S   0.3  0.0   1:04.64 jupyterhub-sing
138708 zxiao1    20   0  766980  55488   6776 S   0.3  0.0   0:38.54 python3.4
     1 root      20   0  190936   3644   1664 S   0.0  0.0   2:46.86 systemd
     2 root      20   0       0      0      0 S   0.0  0.0   0:00.20 kthreadd
     3 root      20   0       0      0      0 S   0.0  0.0   0:01.25 ksoftirqd/0
     5 root       0 -20       0      0      0 S   0.0  0.0   0:00.00 kworker/0:0H
     8 root      rt   0       0      0      0 S   0.0  0.0   0:03.03 migration/0
</pre>
If there's a process on top that uses a huge amount of CPU percentage, and it's yours, kill it with `kill -9 PID` where `PID` is the process ID show in the far left column.

In [13]:
### Good Bye!  --- It's good practice to close the Spark sessionm
sparkR.stop()