# SparkSQL basics with SparkR  

[**Introduction to Apache Spark with R by J. A. Dianes**](https://github.com/jadianes/spark-r-notebooks)

In this notebook we introduce basic SparkSQL concepts with R, as described in the [SparkR documentation](http://spark.apache.org/docs/latest/sparkr.html), and apply them to the [2013 American Community Survey dataset](http://www.census.gov/programs-surveys/acs/data/summary-file.html). We will do two things: read data into a SparkSQL data frame, and take a quick look at the schema and at what we have read.

## Creating a SparkSQL context

In later notebooks we will explore our data by loading them into SparkSQL data frames, but first we need to initialise a SparkSQL context. The first step is to set some environment variables and library paths as follows. Remember to replace the value assigned to `SPARK_HOME` with your Spark home folder.  

In [1]:
# Set Spark home and R libs
# (uncomment both lines if SPARK_HOME is not already set in your environment)
# Sys.setenv(SPARK_HOME='/home/cluster/spark-1.5.0-bin-hadoop2.6')
# .libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))

Now we can load the `SparkR` library as follows.

In [2]:
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    filter, na.omit

The following objects are masked from ‘package:base’:

    intersect, rbind, sample, subset, summary, table, transform



And now we can initialise the Spark context as [in the official documentation](http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext). In our case we use a standalone Spark cluster with one master and seven workers. If you are running Spark in local mode, use just `master='local'`. Additionally, we require a Spark package from Databricks to read CSV files (more on this in the next section).  

In [3]:
sc <- sparkR.init(master='spark://169.254.206.2:7077', sparkPackages="com.databricks:spark-csv_2.11:1.2.0")

Launching java with spark-submit command /home/cluster/spark-1.5.0-bin-hadoop2.6/bin/spark-submit  --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpyC8rUQ/backend_port3743359adb78 
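
If you do not have a cluster available, the context can be created in local mode instead of with the call above. The following is a minimal sketch, assuming Spark 1.5 and that `SPARK_HOME` is already set. The `local[*]` master string (use all local cores) is our choice for illustration and not part of the original setup.

```r
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

# Local mode: the driver and the executors run in a single JVM on this machine.
# "local[*]" uses all available cores; plain "local" uses a single thread.
sc <- sparkR.init(master = "local[*]",
                  sparkPackages = "com.databricks:spark-csv_2.11:1.2.0")
```

The rest of the notebook works the same way with either context, although timings will differ from the ones reported here.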


And finally we can start the SparkSQL context as follows.

In [4]:
sqlContext <- sparkRSQL.init(sc)

## Creating SparkSQL data frames

### Reading CSV data using Databricks csv extension

The easiest way to get our CSV data into a [SparkSQL data frame](http://spark.apache.org/docs/latest/sparkr.html#creating-dataframes) is to use the [Databricks CSV extension](https://github.com/databricks/spark-csv), which reads CSV files directly into SparkSQL data frames. In any case, remember to set the right paths for your data files in the next cell; ours are `/nfs/data/2013-acs/ss13husa.csv` and `/nfs/data/2013-acs/ss13husb.csv`.  

In [5]:
housing_a_file_path <- file.path('', 'nfs','data','2013-acs','ss13husa.csv')
housing_b_file_path <- file.path('', 'nfs','data','2013-acs','ss13husb.csv')

Now let's read the first file into a SparkSQL data frame. We need to pass four parameters in addition to the `sqlContext`:  

- The file path.  
- `header='true'`, since our CSV files have a header with the column names.  
- `inferSchema='true'`, to ask the library to infer the schema.  
- `source`, the source type (the Databricks package in this case).  

In [6]:
system.time(
    housing_a_df <- read.df(sqlContext, 
                        housing_a_file_path, 
                        header='true', 
                        source = "com.databricks.spark.csv", 
                        inferSchema='true')
)

   user  system elapsed 
  0.002   0.000  16.919 

Let's have a look at the inferred schema.

In [7]:
system.time(
    printSchema(housing_a_df)
)

root
 |-- RT: string (nullable = true)
 |-- SERIALNO: integer (nullable = true)
 |-- DIVISION: integer (nullable = true)
 |-- PUMA: integer (nullable = true)
 |-- REGION: integer (nullable = true)
 |-- ST: integer (nullable = true)
 |-- ADJHSG: integer (nullable = true)
 |-- ADJINC: integer (nullable = true)
 |-- WGTP: integer (nullable = true)
 |-- NP: integer (nullable = true)
 |-- TYPE: integer (nullable = true)
 |-- ACCESS: integer (nullable = true)
 |-- ACR: integer (nullable = true)
 |-- AGS: integer (nullable = true)
 |-- BATH: integer (nullable = true)
 |-- BDSP: integer (nullable = true)
 |-- BLD: integer (nullable = true)
 |-- BROADBND: integer (nullable = true)
 |-- BUS: integer (nullable = true)
 |-- COMPOTHX: integer (nullable = true)
 |-- CONP: integer (nullable = true)
 |-- DIALUP: integer (nullable = true)
 |-- DSL: integer (nullable = true)
 |-- ELEP: integer (nullable = true)
 |-- FIBEROP: integer (nullable = true)
 |-- FS: integer (nullable = true)
 |-- FULP: integer

   user  system elapsed 
  0.002   0.000   0.062 
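
The schema can also be inspected programmatically, which is handy with this many columns. The following is a small sketch, assuming `housing_a_df` was created as above. It relies on SparkR's `columns`, `dtypes` and `ncol`; the type string `"int"` is what we expect `dtypes` to report for integer columns, so treat that comparison as an assumption.

```r
# Assumes housing_a_df was created with read.df as shown above
col_names <- columns(housing_a_df)   # character vector of column names
col_types <- dtypes(housing_a_df)    # list of c(name, type) pairs

ncol(housing_a_df)                   # number of columns
head(col_names)                      # first few column names

# Names of the columns that were NOT inferred as integer
# (assumes integer columns are reported with the type string "int")
non_integer <- Filter(function(p) p[[2]] != "int", col_types)
sapply(non_integer, function(p) p[[1]])
```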

Looks good. Let's have a look at the first few rows.  

In [8]:
head(housing_a_df)

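| | RT | SERIALNO | DIVISION | PUMA | REGION | ST | ADJHSG | ADJINC | WGTP | NP | ⋯ | wgtp71 | wgtp72 | wgtp73 | wgtp74 | wgtp75 | wgtp76 | wgtp77 | wgtp78 | wgtp79 | wgtp80 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | H | 84 | 6 | 2600 | 3 | 1 | 1000000 | 1007549 | 0 | 1 | ⋯ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2 | H | 154 | 6 | 2500 | 3 | 1 | 1000000 | 1007549 | 51 | 4 | ⋯ | 86 | 53 | 59 | 84 | 49 | 15 | 15 | 20 | 50 | 16 |
| 3 | H | 156 | 6 | 1700 | 3 | 1 | 1000000 | 1007549 | 449 | 1 | ⋯ | 161 | 530 | 601 | 579 | 341 | 378 | 387 | 421 | 621 | 486 |
| 4 | H | 160 | 6 | 2200 | 3 | 1 | 1000000 | 1007549 | 16 | 3 | ⋯ | 31 | 24 | 33 | 7 | 7 | 13 | 18 | 23 | 23 | 5 |
| 5 | H | 231 | 6 | 2400 | 3 | 1 | 1000000 | 1007549 | 52 | 1 | ⋯ | 21 | 18 | 37 | 49 | 103 | 38 | 49 | 51 | 46 | 47 |
| 6 | H | 286 | 6 | 900 | 3 | 1 | 1000000 | 1007549 | 76 | 1 | ⋯ | 128 | 25 | 68 | 66 | 80 | 26 | 66 | 164 | 88 | 24 |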


Now let's count how many rows we have in the first dataset. For that we will use `nrow`, as we do with regular R data frames. Let's have a look at the documentation.  

In [9]:
?nrow

count {SparkR}    R Documentation

x    A SparkSQL DataFrame


The help page shown is that of `count`: the *SparkR* package defines `nrow` for its data frames as an equivalent of `count`.

In [10]:
nrow(housing_a_df)

Let's read the second housing data frame and count the number of rows.

In [11]:
system.time(
    housing_b_df <- read.df(sqlContext, 
                        housing_b_file_path, 
                        header='true', 
                        source = "com.databricks.spark.csv", 
                        inferSchema='true')
)

   user  system elapsed 
  0.128   0.016   9.666 

In [12]:
print(nrow(housing_b_df))

[1] 720248


### Merging data frames

Now we can use `rbind()` as we do with regular R data frames to put both of them together. Let's have a look at the documentation.  

In [13]:
?rbind

rbind {SparkR}    R Documentation


Again, SparkR redefines many of the common R functions to work with SparkSQL data frames. Let's actually use `rbind` as follows.

In [14]:
housing_df <- rbind(housing_a_df, housing_b_df)

And let's count how many rows we have in the complete data frame.

In [15]:
system.time(
    housing_samples <- nrow(housing_df)
)
print(housing_samples)

   user  system elapsed 
  0.001   0.000  17.206 

[1] 1476313
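
Since `rbind` simply stacks the rows of both data frames, it is worth checking that the two inputs share the same columns in the same order and that no rows were lost. A minimal sketch, assuming the variables defined in the previous cells (each `nrow` call triggers a Spark job, so this takes a while on the full dataset):

```r
# Both files should expose the same columns, in the same order
stopifnot(identical(columns(housing_a_df), columns(housing_b_df)))

# The combined row count should be the sum of the two parts
rows_a <- nrow(housing_a_df)
rows_b <- nrow(housing_b_df)
stopifnot(housing_samples == rows_a + rows_b)
```

If either check fails, `stopifnot` raises an error naming the condition that did not hold.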


Finally, let's get a feel for what it is like to explore data with SparkR by using the `summary` function on the data frame. Let's first have a look at the documentation.

In [16]:
?summary

describe {SparkR}    R Documentation

x      A DataFrame to be computed.
col    A string of name
...    Additional expressions


We can see that we are redirected to `describe`, which optionally accepts column names. Let's use it on the whole data frame, that is, on all 231 columns and 1476313 rows. *Note*: the result of `describe` is itself a `DataFrame` object, so we call `collect` on it in a separate cell to bring it into R and print it in the notebook.

In [19]:
system.time(
    housing_summary <- describe(housing_df)
)

   user  system elapsed 
  0.174   0.016 197.755 

In [20]:
collect(housing_summary)

| | summary | RT | SERIALNO | DIVISION | PUMA | REGION | ST | ADJHSG | ADJINC | WGTP | ⋯ | wgtp71 | wgtp72 | wgtp73 | wgtp74 | wgtp75 | wgtp76 | wgtp77 | wgtp78 | wgtp79 | wgtp80 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | count | 1476313 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | ⋯ | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 | 1476313.0 |
| 2 | mean | | 746447.7617598707 | 5.114928202894643 | 4458.457590632881 | 2.630594596132392 | 27.8048550679971 | 1000000.0 | 1007549.0 | 89.95933585899468 | ⋯ | 89.95847289836234 | 89.9587072660066 | 89.95868220357065 | 89.95941511048132 | 89.95922341671448 | 89.95888473514763 | 89.95931147392186 | 89.95866933367111 | 89.95865239959276 | 89.95870862073286 |
| 3 | stddev | | | 2.476670855019605 | | 1.0144176501154056 | 15.894404191598897 | | | 80.66731786485431 | ⋯ | 99.43323120400837 | 100.08749152422924 | 98.37291356061534 | 99.52607767089758 | 98.67766416960376 | 99.11341616484188 | 99.05644177173914 | 98.94559614502197 | 97.87387141885895 | 100.18396018153416 |
| 4 | min | H | 1.0 | 1.0 | 100.0 | 1.0 | 1.0 | 1000000.0 | 1007549.0 | 0.0 | ⋯ | -8.0 | -494.0 | -14.0 | -151.0 | -38.0 | -5.0 | -3.0 | -16.0 | -22.0 | -243.0 |
| 5 | max | H | 1492845.0 | 9.0 | 70301.0 | 4.0 | 56.0 | 1000000.0 | 1007549.0 | 1829.0 | ⋯ | 2282.0 | 2328.0 | 2393.0 | 2348.0 | 2263.0 | 2310.0 | 2131.0 | 2794.0 | 2710.0 | 2447.0 |


We can also select the summary of an individual column using `select`, as follows. Here `VALP` is the property value (see the [data dictionary](http://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMSDataDict13.txt) for the meaning of each column). The rows appear in the same order as in the full summary above: count, mean, stddev, min and max.  

In [25]:
collect(select(housing_summary,"VALP"))

| | VALP |
|---|---|
| 1 | 859691.0 |
| 2 | 247682.84302150423 |
| 3 | |
| 4 | 100.0 |
| 5 | 4775000.0 |
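
As the help page for `describe` shows, the function also accepts column names directly. When only a few columns matter, passing them should avoid computing statistics for all 231 columns. A short sketch, assuming `housing_df` from above; `NP` is simply another column taken from the schema, chosen here for illustration:

```r
# Summarise only the columns of interest instead of the whole data frame.
# VALP is the property value; NP is another column from the inferred schema.
valp_np_summary <- describe(housing_df, "VALP", "NP")

# The result is a DataFrame, so collect it to print it in the notebook
collect(valp_np_summary)
```

Unlike `select` on the full summary, this result keeps the `summary` column, so each row is labelled with its statistic.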


And that's it. In this notebook we have shown how to load a CSV file into a SparkSQL data frame using SparkR. We also had a look at the loaded data, mainly at the number of samples loaded and at the data summary.