# SCALA 

In [8]:
/** One blog-author record.
  *
  * The Spark schema used to read the blogs JSON file is derived from this class,
  * so field names and types here drive how the file is parsed.
  * NOTE(review): `campaigns` is an Array, so two instances compare by array
  * reference, not contents.
  */
case class Bloggers(
  id: Int,
  first: String,
  last: String,
  url: String,
  date: String,
  hits: Int,
  campaigns: Array[String]
)

defined class Bloggers


In [7]:
import org.apache.spark.sql.Encoders

import org.apache.spark.sql.Encoders


In [10]:
// Spark schema derived from the Bloggers case class. Column order, types and
// nullability (Int -> non-nullable, String/Array -> nullable) come from the class.
val esquema = {
  val bloggersEncoder = Encoders.product[Bloggers]
  bloggersEncoder.schema
}

esquema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(first,StringType,true), StructField(last,StringType,true), StructField(url,StringType,true), StructField(date,StringType,true), StructField(hits,IntegerType,false), StructField(campaigns,ArrayType(StringType,true),true))


In [2]:
// Local (Windows) path to the Learning Spark v2 sample blogs file.
val bloggers: String = "C:/LearningSparkV2-master/databricks-datasets/learning-spark-v2/blogs.json"

bloggers: String = C:/LearningSparkV2-master/databricks-datasets/learning-spark-v2/blogs.json


In [12]:
// Read the blogs JSON file into a typed Dataset[Bloggers].
// The explicit schema (esquema) is applied instead of letting Spark infer one.
// No "header" option: that is a CSV-only setting and is ignored by the JSON
// source. `.json(path)` already selects the JSON format.
val bloggersDS = spark.read
                .schema(esquema)
                .json(bloggers)
                .as[Bloggers]

bloggersDS: org.apache.spark.sql.Dataset[Bloggers] = [id: int, first: string ... 5 more fields]


###### Creating Sample Data

In [13]:
import scala.util.Random._

import scala.util.Random._


In [14]:
/** One synthetic usage row: numeric user id, generated user name and a usage value. */
case class Usage(
  uid: Int,
  uname: String,
  usage: Int
)

defined class Usage


In [15]:
// Fixed seed so the generated sample data is identical on every run.
val r: scala.util.Random = new scala.util.Random(42)

r: scala.util.Random = scala.util.Random@660320c4


In [16]:
// Synthetic Usage rows for uids 0 to 1000 inclusive (1001 rows).
// For each row, the 5-character name suffix is drawn from `r` before the usage
// value, so the sequence is reproducible for the fixed seed.
val data = (0 to 1000).map { uid =>
  val suffix = r.alphanumeric.take(5).mkString("")
  Usage(uid, "user-" + suffix, r.nextInt(1000))
}

data: scala.collection.immutable.IndexedSeq[Usage] = Vector(Usage(0,user-Gpi2C,525), Usage(1,user-DgXDi,502), Usage(2,user-M66yO,170), Usage(3,user-xTOn6,913), Usage(4,user-3xGSz,246), Usage(5,user-2aWRN,727), Usage(6,user-EzZY1,65), Usage(7,user-ZlZMZ,935), Usage(8,user-VjxeG,756), Usage(9,user-iqf1P,3), Usage(10,user-91S1q,794), Usage(11,user-qHNj0,501), Usage(12,user-7hb94,460), Usage(13,user-bz0WF,142), Usage(14,user-71nwy,479), Usage(15,user-7GZz1,823), Usage(16,user-1CSk6,140), Usage(17,user-WPzlL,246), Usage(18,user-VaEit,451), Usage(19,user-PSaRq,679), Usage(20,user-0Kkzu,332), Usage(21,user-UN3MG,172), Usage(22,user-KwwER,442), Usage(23,user-ZnltJ,923), Usage(24,user-IRA17,741), Usage(25,user-yNHRT,299), Usage(26,user-CJY3C,996), Usage(27,user-Yq9WW,529), Usage(28,user-RFWw1,30...


In [17]:
// Turn the local sample collection into a typed Dataset[Usage].
// NOTE(review): relies on an implicit Encoder[Usage] already being in scope
// (presumably spark.implicits._ imported by the kernel); confirm.
val UsageDS = spark.createDataset[Usage](data)

UsageDS: org.apache.spark.sql.Dataset[Usage] = [uid: int, uname: string ... 1 more field]


In [18]:
// Print the first 10 rows without truncating cell contents.
UsageDS.show(numRows = 10, truncate = false)

+---+----------+-----+
|uid|uname     |usage|
+---+----------+-----+
|0  |user-Gpi2C|525  |
|1  |user-DgXDi|502  |
|2  |user-M66yO|170  |
|3  |user-xTOn6|913  |
|4  |user-3xGSz|246  |
|5  |user-2aWRN|727  |
|6  |user-EzZY1|65   |
|7  |user-ZlZMZ|935  |
|8  |user-VjxeG|756  |
|9  |user-iqf1P|3    |
+---+----------+-----+
only showing top 10 rows



###### Higher-order functions and functional programming

In [19]:
import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._


In [24]:
// Heaviest users first: keep rows with usage above 900, sort by usage
// descending, and print 10 rows untruncated.
UsageDS
  .filter(col("usage") > 900)
  .orderBy(col("usage").desc)
  .show(numRows = 10, truncate = false)

+---+----------+-----+
|uid|uname     |usage|
+---+----------+-----+
|634|user-L0wci|999  |
|561|user-5n2xY|999  |
|113|user-nnAXr|999  |
|605|user-NL6c4|999  |
|26 |user-CJY3C|996  |
|345|user-QKrVb|996  |
|805|user-LX27o|996  |
|49 |user-xPBrB|993  |
|142|user-waRT9|992  |
|681|user-QwV36|992  |
+---+----------+-----+
only showing top 10 rows



Otra manera de hacerlo es declarar previamente una función que actúe como predicado de filtrado:

In [26]:
/** Predicate for heavy users.
  *
  * @param u a usage row
  * @return true when `u.usage` is strictly greater than 900
  */
def filterWithUsage(u: Usage): Boolean =
  u.usage > 900

filterWithUsage: (u: Usage)Boolean


In [28]:
// Typed-filter variant using the named predicate filterWithUsage.
// The Dataset is `UsageDS`; "dsUsage" is only the SQL temp-view name, so
// `dsUsage.filter(...)` would not compile.
// Left commented out because typed lambdas did not run in this notebook environment.
// UsageDS.filter(filterWithUsage(_)).orderBy(desc("usage")).show(5)

In [30]:
// Same heavy-user filter written as an inline lambda instead of a named predicate.
// Left commented out because typed lambdas did not run in this notebook environment.
// UsageDS.filter(d => d.usage > 900).orderBy(desc("usage")).show(5, false)

Usando if/else sobre un Dataset

In [33]:
// Typed map with if/else: usage above 750 is multiplied by 0.15, anything else
// by 0.50. The result is a Dataset of Double values, not of Usage rows.
// Left commented out because typed lambdas did not run in this notebook environment.
// UsageDS.map(u => {if (u.usage > 750) u.usage * .15 else u.usage * .50 }).show(5, false)

In [43]:
// Column-based if/else that needs no lambdas.
// Dataset has no `when` method, so the conditional is built with
// functions.when(...).otherwise(...) and attached as a new column.
// Usage above 750 is multiplied by 0.15. Everything else, including exactly 750
// (which a `< 750` second branch would leave null), is multiplied by 0.50,
// matching the typed map version.
UsageDS
  .withColumn("usage_mult",
    when(col("usage") > 750, col("usage") * 0.15)
      .otherwise(col("usage") * 0.50))
  .show(10, false)

Como las funciones lambda tipadas no nos funcionaron en este entorno de Jupyter Notebook, hemos creado una vista temporal y hemos usado `spark.sql`.

In [39]:
// Expose UsageDS to Spark SQL as the temporary view "dsUsage",
// replacing any view previously registered under that name.
UsageDS.createOrReplaceTempView(viewName = "dsUsage")

In [42]:
// SQL version of the if/else: usage above 750 is multiplied by 0.15,
// everything else (including exactly 750) by 0.50.
// Previously both WHEN branches used 0.15, and the ELSE returned the constant
// 750, which disagreed with the Dataset versions.
spark.sql("""SELECT uid, uname, usage,
  CASE
    WHEN usage > 750 THEN usage * 0.15
    ELSE usage * 0.50
  END AS usage_mult
FROM dsUsage
ORDER BY usage DESC""").show(10)

+---+----------+-----+----------+
|uid|     uname|usage|usage_mult|
+---+----------+-----+----------+
|561|user-5n2xY|  999|    149.85|
|113|user-nnAXr|  999|    149.85|
|605|user-NL6c4|  999|    149.85|
|634|user-L0wci|  999|    149.85|
|345|user-QKrVb|  996|    149.40|
|805|user-LX27o|  996|    149.40|
| 26|user-CJY3C|  996|    149.40|
| 49|user-xPBrB|  993|    148.95|
|255|user-Ckzof|  992|    148.80|
|681|user-QwV36|  992|    148.80|
+---+----------+-----+----------+
only showing top 10 rows

