# SparkR ve SparkSQL kullanımı

[R ile Apache Spark a giriş, Hakan Sarıbıyık](https://github.com/vezir/spark-r-notebooks)

Bu notebook ile SparkR ın kullanımına [SparkR dokümanlarında](http://spark.apache.org/docs/latest/sparkr.html) verildiği şekilde bakacağız. Veriyi SparkSQL dataFrame e aktaracağız, sonrasında schemaya bakacağız. 

Veri setimiz, *ABD - California da yol kazalarında yaralanmaları 2002-2010* [Road Traffic Injuries 2002-2010](http://www.healthdata.gov/dataset/road-traffic-injuries-2002-2010) ile ilgili verileri içeriyor.

Bu veri setinde Kaliforniya'da yaşayan kişi ve mil başına olan trafik kazalarının yaya, otomobil, motorsiklet gibi kategorilerdeki istatistikleri, Kaliforniya'nın alt bölgeleri bazında verilmektedir. Veriyi doğrudan incelemek isterseniz [analiz için hazırlanmış sayfadan](https://cdph.data.ca.gov/Environment/Road-Traffic-Injuries-2002-2010/xmwz-xvsf) faydalanabilirsiniz. Var olan alanların neler olduğu ile ilgili bir [excel doküman](https://cdph.data.ca.gov/api/views/xmwz-xvsf/files/vFZ2-VvAdPb_6aOkATlLb19r3PpHHYGEgns1EH3kAQs?download=true&filename=RoadTrafficInjuries_DD.xlsx) da mevcuttur.

## SparkSQL context i yaratmak

Bu ve sonraki notebooklarda veriyi dataFrame aktarmak için öncelikle bir SparkSQL context e ihtiyacımız olacak. Ayrıca, SPARK_HOME gibi temel değişkenlere uygun değerleri atamamız da gerekiyor.

In [2]:
# Spark ın kurulduğu dizin
Sys.setenv(SPARK_HOME="/usr/local/spark")
# SparkR ın kurulduğu dizinden yüklenmesi için gerekiyor
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

SparkR kütüphanesini yükleyelim.

In [3]:
library(SparkR)


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var

The following objects are masked from ‘package:base’:

    colnames, colnames<-, intersect, rank, rbind, sample, subset,
    summary, table, transform



Spark ı kullanabilmemiz için bir SparkContext te ihtiyacımız var. Bunu Spark ın [sayfasında](http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext) anlatıldığı şekilde yapacak olursak sparkR.init komutunu kullanmamız gerekiyor. Burada master olarak Spark ın bulundugu makinanın IP sini yada lokalde ise *local* kelimesini kullanıyoruz.

In [4]:
sc <- sparkR.init(master="local", sparkPackages="com.databricks:spark-csv_2.11:1.2.0")

Launching java with spark-submit command /usr/local/spark/bin/spark-submit  --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpVzZfyG/backend_port9534a1e1961 


Bu şekilde emrimizi bekleyen bir spark elde ettik. sparkPackages a koyduğumuz paket csv formatındaki dosyaları okumak için kullanılan bir paket. Artık dataFrame oluşturmak için gereken sparkSQL context i oluşturabiliriz. Çalıştırdığımız işlerin detay takibini standart olarak http://10.0.2.15:4040 adresinden browser yardımı ile yapabiliriz. Jupyter notebook un loglarından sizinkini görebilirisiniz. Artık sqlContext e geçebiliriz.

In [5]:
sqlContext <- sparkRSQL.init(sc)

## SparkSQL de data frame lerin yaratılması

## CSV dosyanın okunması
Databricks firmasının csv formatlı dosyalardan data frame oluşturmak için kullanıma sunduğu [paket](https://github.com/databricks/spark-csv) i kullanarak veriyi data frame e aktarıyoruz.

In [6]:
data_file_path <- '/home/dsuser/shared'

In [7]:
traffic_injuries_file_path <- file.path('','home','dsuser','shared','Road_Traffic_Injuries.txt')

In [8]:
print(traffic_injuries_file_path)

[1] "/home/dsuser/shared/Road_Traffic_Injuries.txt"


In [9]:
system.time(
    traffic_injuries_df <- read.df(sqlContext, 
                        paste('file:', traffic_injuries_file_path, sep=''), 
                        header='true', 
                        source = "com.databricks.spark.csv", 
                        inferSchema='true')
)

   user  system elapsed 
  0.000   0.000  19.541 

inferSchema='true' dediğimiz için dataFrame e aktarılırken veri yapısı otomatik olarak tespit edildi. Herhangi bir problem olup olmadığına bakalım. Bu noktada veri yapısını veren [excel dokümanını](https://cdph.data.ca.gov/api/views/xmwz-xvsf/files/vFZ2-VvAdPb_6aOkATlLb19r3PpHHYGEgns1EH3kAQs?download=true&filename=RoadTrafficInjuries_DD.xlsx)  kullanabiliriz. 

In [10]:
system.time(
    printSchema(traffic_injuries_df)
)

root
 |-- ind_id: integer (nullable = true)
 |-- ind_definition: string (nullable = true)
 |-- reportyear: string (nullable = true)
 |-- race_eth_code: integer (nullable = true)
 |-- race_eth_name: string (nullable = true)
 |-- geotype: string (nullable = true)
 |-- geotypevalue: long (nullable = true)
 |-- geoname: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- county_fips: integer (nullable = true)
 |-- region_name: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- mode: string (nullable = true)
 |-- severity: string (nullable = true)
 |-- injuries: double (nullable = true)
 |-- totalpop: double (nullable = true)
 |-- poprate: double (nullable = true)
 |-- LL95CI_poprate: double (nullable = true)
 |-- UL95CI_poprate: double (nullable = true)
 |-- poprate_se: double (nullable = true)
 |-- poprate_rse: double (nullable = true)
 |-- CA_decile_pop: string (nullable = true)
 |-- CA_RR_poprate: double (nullable = true)
 |-- avmttotal: doubl

   user  system elapsed 
    0.0     0.0     0.1 

Burada eğer tipi yanlış belirlenmiş bir alan var ise düzeltebiliriz. İncelediğimizde aşağıdaki alanların dokümandaki tiplerine uymadıklarını görüyoruz. Örnek region_code schema da integer yapılmış ama dokümana göre string olmalı. Burada örnek olması açısından bunu düzeltelim.

```
geotypevalue : string
county_fips : string
region_code : string
CA_decile_pop : numeric
CA_decile_avmt : numeric
version : datetime (10/10/2014 12:00:00 AM)
```

In [11]:
traffic_injuries_df$region_code <- cast(traffic_injuries_df$region_code, "string")

In [None]:
region_code un string olduğunu kontrol edelim.

In [12]:
system.time(
    printSchema(traffic_injuries_df)
)

root
 |-- ind_id: integer (nullable = true)
 |-- ind_definition: string (nullable = true)
 |-- reportyear: string (nullable = true)
 |-- race_eth_code: integer (nullable = true)
 |-- race_eth_name: string (nullable = true)
 |-- geotype: string (nullable = true)
 |-- geotypevalue: long (nullable = true)
 |-- geoname: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- county_fips: integer (nullable = true)
 |-- region_name: string (nullable = true)
 |-- region_code: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- severity: string (nullable = true)
 |-- injuries: double (nullable = true)
 |-- totalpop: double (nullable = true)
 |-- poprate: double (nullable = true)
 |-- LL95CI_poprate: double (nullable = true)
 |-- UL95CI_poprate: double (nullable = true)
 |-- poprate_se: double (nullable = true)
 |-- poprate_rse: double (nullable = true)
 |-- CA_decile_pop: string (nullable = true)
 |-- CA_RR_poprate: double (nullable = true)
 |-- avmttotal: double

   user  system elapsed 
  0.020   0.004   0.040 

In [13]:
head(traffic_injuries_df)

Unnamed: 0,ind_id,ind_definition,reportyear,race_eth_code,race_eth_name,geotype,geotypevalue,geoname,county_name,county_fips,ellip.h,avmttotal,avmtrate,LL95CI_avmtrate,UL95CI_avmtrate,avmtrate_se,avmtrate_rse,CA_decile_avmt,CA_RR_avmtrate,groupquarters,version
1,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002,9,Total,CA,6,California,,,⋯,326842416136.0,12.51062,12.12715,12.89408,0.1956456,1.563837,,1.0,823151.0,10/10/2014 12:00:00 AM
2,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002,9,Total,CA,6,California,,,⋯,326842416136.0,41.12991,40.43462,41.8252,0.3547396,0.8624857,,1.0,823151.0,10/10/2014 12:00:00 AM
3,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002,9,Total,CA,6,California,,,⋯,1214809885.0,69.9698,45.99164,93.94795,12.23375,17.48433,,1.0,,10/10/2014 12:00:00 AM
4,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002,9,Total,CA,6,California,,,⋯,1214809885.0,452.7457,325.3094,580.1821,65.01855,14.36094,,1.0,,10/10/2014 12:00:00 AM
5,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002,9,Total,CA,6,California,,,⋯,,,,,,,,,823151.0,10/10/2014 12:00:00 AM
6,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002,9,Total,CA,6,California,,,⋯,,,,,,,,,823151.0,10/10/2014 12:00:00 AM


In [None]:
Verinin satır sayısını öğrenelim.

In [14]:
nrow(traffic_injuries_df)

Bu noktada önemli bir konu *traffic_injuries_df* nin DataFrame nesnesi olarak SparkSQL de kullanılmasıdır.
Fakat R daki data.frame lerin her kullanıldığı yerde kullanılamaz. SparkR için geliştirilen komutlar
kapsamında DataFrame nesneleri kullanılabilir. Eğer normal R komutları kullanılacaksa R daki data.frame lere dönüştürülmelidir.
SparkSQL dışında spark ın komutları kullanılacaksa [Resilient Distributed Dataset (RDD)](http://spark.apache.org/docs/latest/quick-start) ye çevrilmesi gerekir. Yani, aynı veriyi üç ayrı şekilde
tutabiliyoruz. Bunu herzaman akılda tutmak gerekiyor.

Örnek olarak *traffic_injuries_df* e *str* komutu ile baktığımızda;

In [15]:
str(traffic_injuries_df)

Formal class 'DataFrame' [package "SparkR"] with 2 slots
  ..@ env:<environment: 0x3b45b60> 
  ..@ sdf:Class 'jobj' <environment: 0x2b36e60> 


Burada DataFrame nesnesi SparkSQL komutları ile işlem yapmamızı sağlıyor.

In [26]:
traffic_injuries_dfx <- filter(
    traffic_injuries_df, 
   traffic_injuries_df$injuries >  13400
)

In [28]:
nrows <- nrow(traffic_injuries_dfx)
nrows
head(traffic_injuries_dfx)

Unnamed: 0,ind_id,ind_definition,reportyear,race_eth_code,race_eth_name,geotype,geotypevalue,geoname,county_name,county_fips,ellip.h,avmttotal,avmtrate,LL95CI_avmtrate,UL95CI_avmtrate,avmtrate_se,avmtrate_rse,CA_decile_avmt,CA_RR_avmtrate,groupquarters,version
1,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002,9,Total,CA,6,California,,,⋯,326842416136,41.12991,40.43462,41.8252,0.3547396,0.8624857,,1,823151,10/10/2014 12:00:00 AM
2,753,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2004,9,Total,CA,6,California,,,⋯,333917717425,40.66271,39.97875,41.34668,0.3489622,0.8581873,,1,824661,10/10/2014 12:00:00 AM


Şimdide verinin genel istatistiklerine gözatalım. Önce komutun tanımına bakalım.

In [29]:
?summary

0,1
describe {SparkR},R Documentation

0,1
x,A DataFrame to be computed.
col,A string of name
...,Additional expressions
object,A fitted MLlib model


In [30]:
system.time(
    traffic_injuries_dfx_summary <- describe(traffic_injuries_dfx)
)

   user  system elapsed 
  0.132   0.016  18.193 

In [31]:
str(traffic_injuries_dfx_summary)

Formal class 'DataFrame' [package "SparkR"] with 2 slots
  ..@ env:<environment: 0x3823548> 
  ..@ sdf:Class 'jobj' <environment: 0x3830c88> 


Spark DataFrame tipini, R daki bildiğimiz data frame yapısına çevirmek istediğimizde *collect* i kullanıyoruz.

In [33]:
?collect

0,1
collect {SparkR},R Documentation

0,1
x,A SparkSQL DataFrame
stringsAsFactors,(Optional) A logical indicating whether or not string columns should be converted to factors. FALSE by default.


Bunu şöyle anlayabiliriz.

In [32]:
str(collect(traffic_injuries_dfx_summary))

'data.frame':	5 obs. of  34 variables:
 $ summary        : chr  "count" "mean" "stddev" "min" ...
 $ ind_id         : chr  "2" "753.0" "0.0" "753" ...
 $ ind_definition : chr  "2" NA NA "Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode" ...
 $ reportyear     : chr  "2" "2003.0" "1.4142135623730951" "2002" ...
 $ race_eth_code  : chr  "2" "9.0" "0.0" "9" ...
 $ race_eth_name  : chr  "2" NA NA "Total" ...
 $ geotype        : chr  "2" NA NA "CA" ...
 $ geotypevalue   : chr  "2" "6.0" "0.0" "6" ...
 $ geoname        : chr  "2" NA NA "California" ...
 $ county_name    : chr  "2" NA NA "" ...
 $ county_fips    : chr  "0" NA NA NA ...
 $ region_name    : chr  "2" NA NA "" ...
 $ region_code    : chr  "0" NA NA NA ...
 $ mode           : chr  "2" NA NA "All modes" ...
 $ severity       : chr  "2" NA NA "Severe Injury" ...
 $ injuries       : chr  "2" "13510.5" "95.45941546018392" "13443.0" ...
 $ totalpop       : chr  "2" "3.533228

Bu tanıdık geldi. R daki data.frame! Bu yapıda R komutlarını rahatlıkla kullanabiliriz. DataFrame nesnesinde olduğumuzda sadece SparkR ile bize verilen fonksiyonları kullanabiliyoruz. 

In [16]:
collect(traffic_injuries_dfx_summary)

Unnamed: 0,summary,ind_id,ind_definition,reportyear,race_eth_code,race_eth_name,geotype,geotypevalue,geoname,county_name,ellip.h,avmttotal,avmtrate,LL95CI_avmtrate,UL95CI_avmtrate,avmtrate_se,avmtrate_rse,CA_decile_avmt,CA_RR_avmtrate,groupquarters,version
1,count,494226.0,494226,494226.0,494226.0,494226,494226,494226.0,494226,494226,⋯,12320.0,11774.0,11774.0,11774.0,11774.0,11774.0,494226.0,11774.0,59418.0,494226
2,mean,753.0,,2005.8811921576623,9.0,,,4431001576.189009,1847.7611721738963,,⋯,3030276587.57078,63.90611302781159,20.57549144469093,122.50280012022058,29.896268924698862,52.30289196387992,5.50058207217695,2.658148755860837,9092.740300918913,
3,stddev,0.0,,2.5442331668765643,0.0,,,3256269125.340502,2376.7218177940317,,⋯,23100238529.79673,130.3678309097662,57.25034022249537,290.4571520031402,89.09777729450262,39.04897780494879,2.8690840032038887,5.41543275468215,49322.98790573563,
4,min,753.0,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2002.0,9.0,Total,CA,1.0,0001.00,,⋯,0.0,0.674050088276818,0.0,1.60823584930548,0.136645672215764,0.858187328755105,,0.091088010318861,0.0,10/10/2014 12:00:00 AM
5,max,753.0,Annual number of fatal and severe road traffic injuries per population and per miles traveled by transport mode,2010.0,9.0,Total,RE,99999999999.0,Zayante CDP,Yuba,⋯,336306148012.608,4949.82133549228,770.840342077224,11809.9237101896,3500.05223198844,223.606797749979,9.0,246.054811599176,834673.0,10/10/2014 12:00:00 AM


Tabii burada sadece istatistik olarak anlamlı olan kolonları dikkate almak gerekir. Örnek olarak 
- injuries : yaralanma sayısı
- poprate : Toplam nufüs içinde yaralanma oranı (Her 100,000 insan için)

alırsak

In [34]:
collect(select(traffic_injuries_dfx_summary,"summary","injuries", "poprate"))

Unnamed: 0,summary,injuries,poprate
1,count,2.0,2.0
2,mean,13510.5,38.2409056063083
3,stddev,95.45941546018392,0.3232867015540521
4,min,13443.0,38.012307387372
5,max,13578.0,38.4695038252446


Bu notebook un sonuna geldik. Neler yaptık, 
- CSV formatlı bir veriyi SparkR kullanarak SparkSQL dataFrame ine aktardık. 
- Veride bazı tip düzenlemelerin gerektiğini gördük, birini düzelttik. 
- Kolon bazında özet istatistiklerini aldık. 
- SparkSQL deki DataFrame ile R deki data.frame arasında operasyon açısından nasıl bir fark olduğuna değindik.

[Sonraki notebook](https://github.com/vezir/spark-r-notebooks/blob/master/notebooks/3-dataFrameOperations/dataFrameOperations.ipynb) da biraz daha detaylı işlemler yapacağız.