# Part 1: RDD Review

## Transformations

In [None]:

// Sample brand names (with repeats) turned into an RDD.
val elements = List(
  "claro", "corona", "telcel", "cemex", "claro", "bimbo",
  "oxxo", "bigdata", "bbva", "banamex", "claro"
)
val elementsRDD = sc.parallelize(elements)
// glom() groups the elements of each partition into one array; look at the first two partitions.
elementsRDD.glom().take(2)

### map()

In [None]:
// map() produces exactly one output element per input element: here the upper-cased word.
elementsRDD.map(_.toUpperCase).collect()

### flatMap()

In [None]:
// An RDD whose elements are themselves lists of different lengths.
val collectionElements = sc.parallelize(Seq(List(1, 2, 3), List(1), List(3, 4, 5, 6)))
collectionElements.collect()

In [None]:
// flatMap() with the identity function flattens the nested lists into a single RDD of Int.
collectionElements.flatMap(inner => inner).collect()

### filter()

In [None]:
val listToFilter = sc.parallelize(
  List("paranoid android", "nude", "no surprises", "house of cards", "airbag", "creep")
)
// Keep only the titles that start with the letter "n".
listToFilter.filter(_.startsWith("n")).collect()

### groupBy()

In [None]:
val listToGroup = sc.parallelize(
  List("Rodrigo", "Lucas", "Mauricio", "Marcos", "Carlos", "Camilo", "Romina", "Ruben")
)
// groupBy() derives the key from each element: here the first character of the name.
val listGrouped = listToGroup.groupBy(_.charAt(0))
listGrouped.collect()

### groupByKey()

In [None]:
// Pair RDD of (Char, Int); groupByKey() gathers all values that share a key.
val pairRDD = sc.parallelize(List('a' -> 10, 'b' -> 9, 'a' -> 5, 'a' -> 9, 'b' -> 8, 'b' -> 8))
val pairRDDGrouped = pairRDD.groupByKey()
pairRDDGrouped.collect()

### reduceByKey()

In [None]:
// reduceByKey() folds the values of each key with the given function: here a sum per key.
val pairRDDRBK = pairRDD.reduceByKey(_ + _)
pairRDDRBK.collect()

### join()

In [None]:
// Two pair RDDs keyed by String. "a" occurs twice in y; "c" occurs only in x.
val x = sc.parallelize(List("a" -> 1, "b" -> 2, "c" -> 6))
val y = sc.parallelize(List("a" -> 3, "a" -> 4, "b" -> 5))

// join() pairs up the values of keys present in both RDDs.
x.join(y).collect()

### distinct()

In [None]:
// ("a", 1) appears twice; distinct() compares whole tuples, so only that repeat is removed.
val x = sc.parallelize(List("a" -> 1, "a" -> 2, "a" -> 1, "a" -> 3))
x.distinct().collect()

### coalesce()

In [None]:
// The numbers 1..50 as an RDD. Uses an explicit `.toList` call instead of the postfix form
// `1 until 51 toList`, which requires `import scala.language.postfixOps` to compile cleanly.
val numbersRDD = sc.parallelize((1 to 50).toList)
// Number of partitions chosen by default for this RDD.
numbersRDD.getNumPartitions

In [None]:
// coalesce() returns a new RDD reduced to the requested number of partitions.
numbersRDD.coalesce(numPartitions = 2).getNumPartitions

### zip()

In [None]:
// zip() pairs the i-th element of x with the i-th element of y;
// both RDDs must have the same number of partitions and elements per partition.
val x = sc.parallelize((1 to 5).toList)
val y = sc.parallelize(List("a", "b", "c", "d", "e"))

x.zip(y).collect()

## Actions

### reduce()

In [None]:
// reduce() is an action: it folds every element into one value, here by string concatenation.
// NOTE(review): concatenation is not commutative, so the order of the pieces may depend on
// how partition results are combined.
elementsRDD.reduce(_ + _)

# Lab 1
## Word count

In [None]:
// Build an RDD from a local Scala list of words ("boat" appears twice).
val wordList = List("boat", "cat", "house", "river", "boat", "rat", "elephant")
val wordsRDD = sc.parallelize(wordList)
// Show the concrete RDD class behind wordsRDD.
wordsRDD.getClass

In [None]:
/** Naively pluralizes a word by appending "s".
  *
  * @param word the singular word
  * @return `word` followed by "s"
  */
def makePlural(word: String): String = s"${word}s"

// Quick check of makePlural on a single word.
print(makePlural("cat"))

In [None]:
// Pluralize every word of the RDD with the named function makePlural.
val pluralRDD = wordsRDD.map(word => makePlural(word))
pluralRDD.collect()

In [None]:
// Same pluralization, written inline as an anonymous function instead of calling makePlural.
val pluralLambdaRDD = wordsRDD.map(word => s"${word}s")
pluralLambdaRDD.collect()

In [None]:
// Length, in characters, of each pluralized word.
val pluralLengths = pluralRDD.map(_.length)
pluralLengths.collect()

In [None]:
// Word count, step 1:
// turn wordsRDD into a pair RDD whose elements are (<word>, 1) tuples, using map().

val wordPairs = wordsRDD.map((_, 1))
wordPairs.collect()

In [None]:
// groupByKey approach, step 1: collect all the 1s that belong to the same word.

val wordsGrouped = wordPairs.groupByKey()
wordsGrouped.collect()

In [None]:
// groupByKey approach, step 2: the count of a word is the size of its group of values.
val wordCountsGrouped = wordsGrouped.map { case (word, ones) => (word, ones.size) }
wordCountsGrouped.collect()

In [None]:
// countByKey approach: an action that returns the per-key counts as a local Map;
// take(10) is then applied to that Map, not to an RDD.
wordPairs.countByKey().take(10)

In [None]:
// reduceByKey approach: add up the 1s of each word.
val wordCounts = wordPairs.reduceByKey((left, right) => left + right)
wordCounts.take(10)

In [None]:
// The whole word count in one chain: pair each word with 1, then sum per word.

val wordCountsCollected = wordsRDD.map((_, 1)).reduceByKey((a, b) => a + b)
wordCountsCollected.take(10)

In [None]:
// Number of unique words: one (word, count) element exists per distinct word.
val uniqueWords = wordCountsCollected.count()
uniqueWords

In [None]:
// Mean number of occurrences per unique word.
// Both the total and the divisor come from wordCountsCollected, and the count action
// runs only once instead of being recomputed for the print below.
val totalCount = wordCountsCollected.map(_._2).reduce(_ + _)
val uniqueWordCount = wordCountsCollected.count()
val average = totalCount / uniqueWordCount.toDouble
println(totalCount)
print(uniqueWordCount.toDouble)
"%.2f".format(average)

In [None]:
// wordCount function

import org.apache.spark.rdd.RDD
/** Counts how many times each distinct string occurs in `rdd`.
  *
  * @param rdd the words to count
  * @return a pair RDD of (word, number of occurrences)
  */
def wordCount(rdd: RDD[String]): RDD[(String, Int)] = {
    val ones = rdd.map(word => word -> 1)
    ones.reduceByKey(_ + _)
}
// Try the function on the small word list.
wordCount(wordsRDD).collect()

In [None]:
// RemovePunctuation function

/** Normalizes a line of text for word counting.
  *
  * Removes every character that is not an ASCII letter, an ASCII digit or a space,
  * then trims leading/trailing whitespace and lower-cases the result.
  * Interior runs of spaces are left untouched.
  *
  * @param text the raw line
  * @return the cleaned, lower-cased line
  */
def removePunctuation(text: String): String = {
    // `0-9` rather than `1-9` so that the digit zero is kept ("2018" must not become "218");
    // `+` rather than `*` so the pattern never matches the empty string.
    val unwanted = "[^A-Za-z0-9 ]+".r
    unwanted.replaceAllIn(text, "").trim.toLowerCase
}

// Print the cleaned form of three sample strings, one per line.
Seq(
  "Hi, you!",
  " No under_score!",
  " *      Remove punctuation then spaces  * "
).foreach(sample => println(removePunctuation(sample)))

In [None]:
// Opening lines of the Project Gutenberg edition of Shakespeare's complete works,
// kept inline as a multi-line string to serve as sample text for the word count.
val shakespeareString = """
Project Gutenberg’s The Complete Works of William Shakespeare, by William
Shakespeare

This eBook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever.  You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this eBook or online at
www.gutenberg.org.  If you are not located in the United States, you’ll
have to check the laws of the country where you are located before using
this ebook.

See at the end of this file: * CONTENT NOTE (added in 2017) *


Title: The Complete Works of William Shakespeare

Author: William Shakespeare

Release Date: January 1994 [EBook #100]
Last Updated: February 19, 2018

Language: English

Character set encoding: UTF-8

*** START OF THIS PROJECT GUTENBERG EBOOK THE COMPLETE WORKS OF WILLIAM SHAKESPEARE ***



The Complete Works of William Shakespeare



by William Shakespeare
"""

In [None]:
// Create an RDD with one element per line of the sample text.
val shakespeareRDD = sc.parallelize(shakespeareString.split('\n'))

shakespeareRDD.collect()

In [None]:
// Drop empty lines, then strip punctuation from the remaining ones.
val shakespeareRDDclean = shakespeareRDD.filter(_.nonEmpty).map(line => removePunctuation(line))
shakespeareRDDclean.take(10)

In [None]:
// Split every cleaned line on single spaces and flatten the pieces into one RDD of words.
val shakespeareWordsRDD = shakespeareRDDclean.flatMap(_.split(" "))
shakespeareWordsRDD.collect()

In [None]:
// Total number of pieces produced by the split (empty strings included).
shakespeareWordsRDD.count()

In [None]:
// Count the empty strings left behind by the split.
shakespeareWordsRDD.filter(_.isEmpty).count()

In [None]:
// Remove the empty strings so that only real words remain.
val shakeWordsRDD = shakespeareWordsRDD.filter(word => word.nonEmpty)
shakeWordsRDD.count()

In [None]:
// Apply the wordCount function to the cleaned words and peek at ten (word, count) pairs.
val shakeWordCount = wordCount(shakeWordsRDD)
shakeWordCount.take(10)

In [None]:
// The 15 most frequent words: swap to (count, word), sort by count descending, take the head.
shakeWordCount.map(_.swap).sortByKey(ascending = false).take(15)

In [None]:
// Number of partitions of the word-count result.
shakeWordCount.getNumPartitions

# Part 2: DataFrame review

## DataFrame creation

In [None]:
// Case class approach: toDF() derives the DataFrame schema from the case class fields.

// Local alias for the session; the implicits import below goes through it.
val spark2 = spark

case class Emp(name: String, age: Int, sex: String, emp: String)
case class Company(emp: String, emp_type: String)

// Brings toDF() into scope. SparkSession.implicits replaces the legacy sqlContext.implicits.
import spark2.implicits._

// Case classes are built through their companion apply(), so `new` is not needed.
// The "Ruben" row is listed twice, so the data contains one exact duplicate.
val peopleList = Seq(
  Emp("Mauro", 46, "Hombre", "BBVA"),
  Emp("Maribel", 30, "Mujer", "Banamex"),
  Emp("Marcos", 40, "Hombre", "Banamex"),
  Emp("Ruben", 42, "Hombre", "BBVA"),
  Emp("Ruben", 42, "Hombre", "BBVA"),
  Emp("Pepe", 42, "Hombre", "Banamex"),
  Emp("Sarai", 27, "Mujer", "Banamex")
)

val peopleRDD = sc.parallelize(peopleList)

val peopleDF = peopleRDD.toDF()

// printSchema() prints to the console, hence the explicit parentheses.
peopleDF.printSchema()
//peopleDF.show()

In [None]:
// StructType approach: describe the schema explicitly and apply it to an RDD of Row.
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.Row

// Four nullable columns, in the same order as the fields of Emp.
val schema = (new StructType()
  .add("name", StringType, nullable = true)
  .add("age", IntegerType, nullable = true)
  .add("sex", StringType, nullable = true)
  .add("emp", StringType, nullable = true))

// Convert each Emp into a generic Row whose values follow the schema's column order.
val peopleRowRDD = peopleRDD.map(person => Row(person.name, person.age, person.sex, person.emp))

val peopleDFSchema = spark.createDataFrame(peopleRowRDD, schema)
peopleDFSchema.show()

In [None]:
// select: keep only the "age" and "name" columns, in that order
peopleDFSchema.select("age", "name").show()

In [None]:
// select distinct: the different values found in the "emp" column

peopleDFSchema.select("emp").distinct().show()

In [None]:
// describe: summary statistics (count, mean, stddev, min, max) for the DataFrame's columns
peopleDFSchema.describe().show()

In [None]:
// pivot 1: one row per name, one column per distinct "emp" value;
// each cell holds the number of rows for that (name, emp) combination

peopleDFSchema.groupBy("name").pivot("emp").count().orderBy("name").show()

In [None]:
// pivot 2: one row per age, one column per distinct "sex" value, cells are row counts

peopleDFSchema.groupBy("age").pivot("sex").count().show()

In [None]:
// Grouping and pivoting on the same column ("sex"): rows and columns share the same values
peopleDFSchema.groupBy("sex").pivot("sex").count().show()

In [None]:
// dropDuplicates: remove fully identical rows before repeating the age/sex pivot
peopleDFSchema.dropDuplicates().groupBy("age").pivot("sex").count().show()

In [None]:
// groupBy: per "emp", compute the mean of "age" and the count of "sex"
// (each pair maps a column name to the name of the aggregate function to apply)
peopleDFSchema.groupBy("emp").agg("age" -> "mean", "sex" -> "count").show()

In [None]:
// orderBy: sort the rows by "name" in ascending order
import org.apache.spark.sql.functions._
peopleDFSchema.orderBy(col("name").asc).show()

In [None]:
// Add a new column computed from an existing one: the first character of "sex".
// Spark SQL substring positions are 1-based; a start of 0 is merely tolerated and treated
// like 1, so the explicit 1 states the intent without changing the result.

peopleDFSchema.withColumn("sex_type", col("sex").substr(1, 1)).show()

In [None]:
// Add a new column holding the same literal value (10) in every row
peopleDFSchema.withColumn("literal", lit(10)).show()

In [None]:
// Apply SQL queries to a DataFrame: register it as a temporary view, then query the view by name
peopleDFSchema.createOrReplaceTempView("people_view")
spark.sql("select age, count(emp) from people_view group by age").show()

In [None]:
// Tuple access: _1 is the first component of a tuple (here "BBVA")
("BBVA", "Banamex")._1

In [None]:
// join, part 1: build a small company lookup DataFrame keyed by "emp"
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.Row

case class Company(emp: String, emp_type: String)

import spark.sqlContext.implicits._

// Two nullable string columns: emp and emp_type.
// NOTE(review): this re-binds the name `schema` used earlier for the people DataFrame.
val schema = (new StructType()
  .add("emp", StringType, nullable = true)
  .add("emp_type", StringType, nullable = true))

val coSeq = Seq(("BBVA", "bank"), ("Banamex", "otherBank"))
val coRDD = sc.parallelize(coSeq)

// Convert each (emp, emp_type) tuple into a Row matching the schema above.
val coRowRDD = coRDD.map { case (emp, empType) => Row(emp, empType) }
val coDFSchema = spark.createDataFrame(coRowRDD, schema)
coDFSchema.show()

In [None]:
// Join people with companies on the shared "emp" column
peopleDFSchema.join(coDFSchema, "emp").show()

In [None]:
// Join with an explicit condition: the aliases "a" and "b" disambiguate the two "emp" columns,
// and the extra predicate keeps only the rows whose sex is "Mujer".
peopleDFSchema.alias("a").join(
  coDFSchema.alias("b"),
  $"a.emp" === $"b.emp" && $"a.sex" === "Mujer"
).show()