## Table Of Contents
* [Writing output to sink](#Writing-output-to-sink)
* [repartition](#repartition)
* [partitionBy](#partitionBy)
* [maxRecordsPerFile](#maxRecordsPerFile)
* [SparkSQL](#SparkSQL)
* [Saving the data as a table](#Saving-the-data-as-a-table)
* [bucketBy](#bucketBy)
* [How to refer a column in a dataframe?](#How-to-refer-a-column-in-a-dataframe?)
* [Column Expressions](#Column-Expressions)
* [User Defined Functions in structured APIs](#User-Defined-Functions-in-structured-APIs)
  * [Column Object expression UDF](#Column-Object-expression-UDF)
  * [sql/string expression UDF](#sql/string-expression-UDF)
  * [Listing the functions in spark catalog](#Listing-the-functions-in-spark-catalog)
* [Aggregations](#Aggregations)
  * [Simple Aggregations](#Simple-Aggregations)
  * [Window aggregations](#Window-aggregations)
* [Joins on dataframes](#Joins-on-dataframes)
* [Showcasing how your code can lead to ambiguous problem](#Showcasing-how-your-code-can-lead-to-ambiguous-problem)
* [How to deal with NULLs](#How-to-deal-with-NULLs)
* [Internals of a normal join operations](#Internals-of-a-normal-join-operations)
* [Broadcast Join](#Broadcast-Join)
* [Practical - grouping on loglevel and month](#Practical---grouping-on-loglevel-and-month)

### Writing output to sink

We have to specify a file format when writing. If none is given, Spark falls back to its default data source, which is Parquet.

Save modes:
* append - Adds new files to the existing output directory.

* overwrite - Deletes the output directory if it exists, then creates a new one.

* errorIfExists - Throws an error if the output directory already exists. This is the default mode.

* ignore - Does nothing if the output directory already exists.
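The four save modes can be tried side by side on a throwaway directory. This is only a minimal sketch: the sample rows, the temporary path and the local session are assumptions made for illustration, not part of the orders example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Try

// Local session purely for illustration
val spark = SparkSession.builder().master("local[1]").appName("save-modes-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data and a throwaway output directory
val df = Seq((1, "CLOSED"), (2, "COMPLETE")).toDF("order_id", "order_status")
val out = Files.createTempDirectory("save-modes").resolve("output").toString

df.write.format("csv").mode(SaveMode.Overwrite).save(out) // replaces anything already at `out`
df.write.format("csv").mode(SaveMode.Append).save(out)    // adds new files next to the existing ones
df.write.format("csv").mode(SaveMode.Ignore).save(out)    // `out` exists, so nothing is written

// ErrorIfExists fails here because `out` already exists
val failed = Try(df.write.format("csv").mode(SaveMode.ErrorIfExists).save(out)).isFailure

// 2 rows from Overwrite plus 2 rows from Append
val rowCount = spark.read.csv(out).count()
```

The `SaveMode` constants are equivalent to the mode strings; the strings are matched case-insensitively, which is why `"Overwrite"` works in the cells below.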

In [4]:
import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

ordersDf.write
.format("csv")
.mode("Overwrite")
.option("path","/user/itv002768/week11_practicals/output")
.save()
/*
[itv002768@g02 ~]$ hadoop fs -ls /user/itv002768/week11_practicals/output
Found 2 items
-rw-r--r--   3 itv002768 supergroup          0 2022-09-08 12:31 /user/itv002768/week11_practicals/output/_SUCCESS
-rw-r--r--   3 itv002768 supergroup    3551029 2022-09-08 12:31 /user/itv002768/week11_practicals/output/part-00000-0df75f10-3b4e-471c-93c5-7ba9f4470d14-c000.csv
*/

sparkConfig = org.apache.spark.SparkConf@1589c060
spark = org.apache.spark.sql.SparkSession@3a759967
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]

In [5]:
import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

// For partitioning
val ordersRep = ordersDf.repartition(4)

ordersRep.write
.format("csv")
.mode("Overwrite")
.option("path","/user/itv002768/week11_practicals/output")
.save()
/*
[itv002768@g02 ~]$ hadoop fs -ls /user/itv002768/week11_practicals/output
Found 5 items
-rw-r--r--   3 itv002768 supergroup          0 2022-09-08 12:39 /user/itv002768/week11_practicals/output/_SUCCESS
-rw-r--r--   3 itv002768 supergroup     887823 2022-09-08 12:39 /user/itv002768/week11_practicals/output/part-00000-5211d04d-b6b4-426f-bf32-a10dffeefde1-c000.csv
-rw-r--r--   3 itv002768 supergroup     887951 2022-09-08 12:39 /user/itv002768/week11_practicals/output/part-00001-5211d04d-b6b4-426f-bf32-a10dffeefde1-c000.csv
-rw-r--r--   3 itv002768 supergroup     887291 2022-09-08 12:39 /user/itv002768/week11_practicals/output/part-00002-5211d04d-b6b4-426f-bf32-a10dffeefde1-c000.csv
-rw-r--r--   3 itv002768 supergroup     887964 2022-09-08 12:39 /user/itv002768/week11_practicals/output/part-00003-5211d04d-b6b4-426f-bf32-a10dffeefde1-c000.csv
*/

sparkConfig = org.apache.spark.SparkConf@71408836
spark = org.apache.spark.sql.SparkSession@3a759967
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]
ordersRep = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]

A DataFrame has no `getNumPartitions` method of its own, so to get the number of partitions we first convert it to an RDD with `.rdd`.

`repartition` tries to split the data into partitions of roughly equal size, which requires a full shuffle.

In [8]:
ordersRep.rdd.getNumPartitions

4

When writing a DataFrame to an output directory, we have a few options to control the file layout:

* `repartition` - Controls how many files are written. Not a preferred choice because it requires a full shuffle and we do not know which data will land in which file.
* partitioning and bucketing - using `partitionBy` and `bucketBy`.
* sorted data - using `sortBy`.

Note: Without any of these options, the number of output files equals the number of partitions in the DataFrame.

### repartition
1. It can help with parallelism.
2. With a plain `repartition`, queries cannot skip partitions to improve performance; in other words, partition pruning is not possible.

### partitionBy
1. This is equivalent to partitioning in Hive.
2. Provides partition pruning.

Please go through the following code

In [9]:
import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

// For partitioning
val ordersRep = ordersDf.repartition(4)

ordersRep.write
.format("csv")
.partitionBy("order_status")
.mode("Overwrite")
.option("path","/user/itv002768/week11_practicals/output")
.save()
/*
[itv002768@g02 ~]$ hadoop fs -ls /user/itv002768/week11_practicals/output
Found 10 items
-rw-r--r--   3 itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/_SUCCESS
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=CANCELED
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=CLOSED
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=COMPLETE
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=ON_HOLD
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=PAYMENT_REVIEW
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=PENDING
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=PENDING_PAYMENT
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=PROCESSING
drwxr-xr-x   - itv002768 supergroup          0 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=SUSPECTED_FRAUD
[itv002768@g02 ~]$ hadoop fs -ls /user/itv002768/week11_practicals/output/order_status=CLOSED
Found 4 items
-rw-r--r--   3 itv002768 supergroup      78170 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=CLOSED/part-00000-934901fd-0af7-4b24-bd1f-92a2057ee9dc.c000.csv
-rw-r--r--   3 itv002768 supergroup      76486 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=CLOSED/part-00001-934901fd-0af7-4b24-bd1f-92a2057ee9dc.c000.csv
-rw-r--r--   3 itv002768 supergroup      76880 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=CLOSED/part-00002-934901fd-0af7-4b24-bd1f-92a2057ee9dc.c000.csv
-rw-r--r--   3 itv002768 supergroup      77850 2022-09-08 12:57 /user/itv002768/week11_practicals/output/order_status=CLOSED/part-00003-934901fd-0af7-4b24-bd1f-92a2057ee9dc.c000.csv

[itv002768@g02 ~]$ hadoop fs -head /user/itv002768/week11_practicals/output/order_status=CLOSED/part-00003-934901fd-0af7-4b24-bd1f-92a2057ee9dc.c000.csv
13151,2013-10-13T00:00:00.000-04:00,5736
2923,2013-08-10T00:00:00.000-04:00,6362
7765,2013-09-10T00:00:00.000-04:00,12248
33538,2014-02-16T00:00:00.000-05:00,10299
57250,2014-07-21T00:00:00.000-04:00,4775
68291,2014-03-30T00:00:00.000-04:00,4470
38665,2014-03-20T00:00:00.000-04:00,8719
42245,2014-04-11T00:00:00.000-04:00,8680
*/

sparkConfig = org.apache.spark.SparkConf@7ddf2c18
spark = org.apache.spark.sql.SparkSession@3a759967
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]
ordersRep = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]
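Partition pruning shows up when the partitioned output is read back with a filter on the partition column. The sketch below assumes a local session, a temporary directory and three made-up rows; only the `partitionBy("order_status")` layout is taken from the cell above. Note that the partition column is not stored inside the files (the `-head` output above has three fields) and is rebuilt from the directory names.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").appName("partition-pruning-sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the orders data
val orders = Seq((1, 11599, "CLOSED"), (2, 256, "PENDING_PAYMENT"), (3, 8827, "CLOSED"))
  .toDF("order_id", "order_customer_id", "order_status")
val out = Files.createTempDirectory("partition-by").resolve("output").toString

orders.write.format("csv").partitionBy("order_status").mode("overwrite").save(out)

// order_status is recovered from the directory names (partition discovery),
// so this filter only needs to read the order_status=CLOSED folder
val closed = spark.read.csv(out).filter(col("order_status") === "CLOSED")
closed.explain() // the physical plan should list the condition under PartitionFilters
val closedCount = closed.count()
```

A filter on a non-partition column, such as the customer id, would still have to scan every folder.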

### maxRecordsPerFile
* It caps the number of records per file; Spark starts a new file whenever the current one reaches the limit.


In [10]:
import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

ordersDf.write
.format("csv")
.option("maxRecordsPerFile", 2000)
.mode("Overwrite")
.option("path","/user/itv002768/week11_practicals/output")
.save()

/*
[itv002768@g02 ~]$ hadoop fs -ls /user/itv002768/week11_practicals/output | wc -l
37

[itv002768@g02 ~]$ hadoop fs -cat /user/itv002768/week11_practicals/output/part-00000-e9a52b95-41f0-4065-8e6f-e0feab43003b-c032.csv | wc -l
2000
*/

sparkConfig = org.apache.spark.SparkConf@405c9c83
spark = org.apache.spark.sql.SparkSession@3a759967
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]
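The number of files to expect follows from a ceiling division per partition. The helper names and row counts below are hypothetical, and the sketch assumes that no `partitionBy` is used and that empty partitions produce no file. In the listing above, `wc -l` reports 37 lines; assuming that count includes the `Found N items` header and the `_SUCCESS` marker, that leaves 35 data files.

```scala
// Files written for one partition: a new file is started every maxRecords rows
def filesForPartition(rows: Long, maxRecords: Long): Long =
  if (rows == 0) 0L else (rows + maxRecords - 1) / maxRecords // ceiling division

// Total across all partitions of the DataFrame
def expectedFiles(partitionRows: Seq[Long], maxRecords: Long): Long =
  partitionRows.map(filesForPartition(_, maxRecords)).sum

// Hypothetical row counts, not taken from the orders data
val single = expectedFiles(Seq(5000L), 2000L)        // 3 files: 2000 + 2000 + 1000
val split  = expectedFiles(Seq(2500L, 2500L), 2000L) // 4 files: each partition gives 2000 + 500
```

The same total row count can therefore produce a different number of files depending on how the rows are spread over partitions.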

Avro is an external format and, unlike CSV, JSON or Parquet, is not supported by default. You have to download the `spark-avro` jar that matches your Spark version and load it.

### SparkSQL

In [15]:
import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

// register ordersDf as a temporary view named orders
ordersDf.createOrReplaceTempView("orders")

val resultDf = spark.sql("select * from orders limit 10")
resultDf.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
+--------+-------------------+-----------------+---------------+



sparkConfig = org.apache.spark.SparkConf@3794482c
spark = org.apache.spark.sql.SparkSession@3a759967
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]
resultDf = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]

### Saving the data as a table
Sometimes the data needs to be stored as a persistent table so that external tools such as Tableau or Power BI can connect to it for reporting.

A table has two parts:
- Data - Stored in the Spark warehouse directory.

- Metadata/Schema - Stored in the catalog metastore. By default the catalog is kept in memory, so the metadata is gone once the application terminates. To persist it, we can use the Hive metastore by calling `enableHiveSupport` when building the session.

By default, the table is created in the `default` database.

In [3]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession


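val username = System.getProperty("user.name")
// The s prefix is required for ${username} to be interpolated. Without it the
// warehouse path is the literal "/user/${username}/warehouse", which is the likely
// cause of a permission error when Spark tries to create that directory under /user.
val spark = SparkSession.builder().config(sparkConfig)
                        .config("spark.ui.port", "0")
                        .config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").master("yarn")
                        .appName(s"${username} | spark_sql")
                        .enableHiveSupport().getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")
spark.sql("create database if not exists tushar_retail")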

ordersDf.write
.format("csv")
.mode(SaveMode.Overwrite)
.saveAsTable("tushar_retail.orders_11")

lastException = null


org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=itv002768, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:496)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:336)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermissionWithContext(FSPermissionChecker.java:360)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:239)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1909)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1893)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1852)
	at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3407)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1161)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:739)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:532)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1020)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:948)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2952)
);

### bucketBy

`bucketBy` works only when the data is saved as a table with `saveAsTable`; it is not supported by `save()`.



In [None]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession


val username = System.getProperty("user.name")
// The s prefix is needed so that ${username} is interpolated into the strings
val spark = SparkSession.builder().config(sparkConfig)
                        .config("spark.ui.port", "0")
                        .config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").master("yarn")
                        .appName(s"${username} | spark_sql")
                        .enableHiveSupport().getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")
spark.sql("create database if not exists tushar_retail")

// sortBy requires at least one column and sorts the rows within each bucket;
// order_id is used here as an assumption, since the original call named no column
ordersDf.write
.format("csv")
.mode(SaveMode.Overwrite)
.bucketBy(4, "order_id")
.sortBy("order_id")
.saveAsTable("tushar_retail.orders_11")

### Transformations

1. Low level transformations</BR>
**map, filter, groupByKey etc.**

We can perform these on raw RDDs; some of them are also possible with DataFrames and Datasets.

2. High level transformations</BR>
**select, where, groupBy etc.**

These are supported by DataFrames and Datasets.


```
1 2022-01-02   112122,CLOSED
2 2022-01-03   112422,START
3 2022-01-04   112222,CLOSED
4 2022-01-05   112622,CLOSED
```
In the case of an unstructured file like the one above, we load it as a raw RDD. Each row is a string, and we parse it with `map`, which is a low level transformation.

Here we can apply a regular expression inside `map` and use a case class to give the data a structure. Then we can convert the RDD to a Dataset using `.toDS`.



In [3]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val stringRegex = """^(\S+) (\S+)\t(\S+)\,(\S+)""".r

case class Orders(order_id: Int, customer_id: Int, order_status: String)

def parser(line: String) = {
    line match {
        case stringRegex(order_id, date, customer_id, order_status) => 
        Orders(order_id.toInt, customer_id.toInt, order_status)
    }
}

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")


val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

val input = spark.sparkContext.textFile("/user/itv002768/week12_practicals/regex_example.txt")

import spark.implicits._
val parserOutput = input.map(parser).toDS()
parserOutput.show()


+--------+-----------+------------+
|order_id|customer_id|order_status|
+--------+-----------+------------+
|       1|     112122|      CLOSED|
|       2|     112422|       START|
|       3|     112222|      CLOSED|
|       4|     112622|      CLOSED|
+--------+-----------+------------+



stringRegex = ^(\S+) (\S+)\t(\S+)\,(\S+)
defined class Orders
sparkConfig = org.apache.spark.SparkConf@df88ec3
spark = org.apache.spark.sql.SparkSession@1b767de0
input = /user/itv002768/week12_practicals/regex_example.txt MapPartitionsRDD[1] at textFile at <console>:36
parserOutput = [order_id: int, customer_id: int ... 1 more field]


parser: (line: String)Orders


[order_id: int, customer_id: int ... 1 more field]
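The `parser` function above throws a `MatchError` on any line that does not fit the pattern. A more forgiving variant, sketched here in plain Scala with the same regex and case class, returns an `Option` instead; the name `safeParser` and the sample lines are assumptions for illustration.

```scala
// Same pattern as the parser above: id, date, tab, customer id, comma, status
val stringRegex = """^(\S+) (\S+)\t(\S+)\,(\S+)""".r

case class Orders(order_id: Int, customer_id: Int, order_status: String)

// Returns None instead of throwing when a line does not match or an id is not numeric
def safeParser(line: String): Option[Orders] = line match {
  case stringRegex(orderId, _, customerId, status) =>
    scala.util.Try(Orders(orderId.toInt, customerId.toInt, status)).toOption
  case _ => None
}

val good = safeParser("1 2022-01-02\t112122,CLOSED") // Some(Orders(1,112122,CLOSED))
val bad  = safeParser("not an order line")            // None
```

On the RDD, something like `input.flatMap(line => safeParser(line))` should then drop the malformed lines before calling `.toDS()`.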

### How to refer a column in a dataframe?

1. Column String

This is the easiest way: you access the column by its name as a string. See the code below.


In [6]:
import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

ordersDf.show()
ordersDf.select("order_id", "order_status").show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

sparkConfig = org.apache.spark.SparkConf@1ade4393
spark = org.apache.spark.sql.SparkSession@1b767de0
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]

2. Column Object

You can refer to a column using the `column` or `col` function. These are generic and are also available in other Spark language APIs such as PySpark.

Scala specific - `$"order_id"` or `'order_id` (both need `import spark.implicits._`)

In [18]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.column

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

ordersDf.show()

import spark.implicits._
ordersDf.select(column("order_id"), col("order_date"), $"order_status", 'order_customer_id).show()



+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

sparkConfig = org.apache.spark.SparkConf@2d1a8389
spark = org.apache.spark.sql.SparkSession@1b767de0
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]

### Column Expressions

* We cannot mix column objects with column strings.</BR>
* We cannot mix column objects with column expressions.</BR>
* We cannot mix column strings with column expressions.</BR>
A column expression can be converted to a column object by passing it to the `expr` function.</BR>

With `selectExpr` we can mix column strings with column expressions, because every argument is parsed as a SQL expression.

In [25]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.column
import org.apache.spark.sql.functions.expr

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ordersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week11_practicals/orders-201019-002101.csv")

ordersDf.show()
// We cannot mix column strings with column expressions, so the line below fails
//ordersDf.select("order_id", "concat(order_status, '_STATUS')").show()
import spark.implicits._ // needed for the $"..." syntax
ordersDf.select($"order_id", expr("concat(order_status, '_STATUS')")).show(false)
ordersDf.selectExpr("order_id", "concat(order_status, '_STATUS')").show(false)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

sparkConfig = org.apache.spark.SparkConf@382f0585
spark = org.apache.spark.sql.SparkSession@1b767de0
ordersDf = [order_id: int, order_date: timestamp ... 2 more fields]


[order_id: int, order_date: timestamp ... 2 more fields]

### User Defined Functions in structured APIs

#### Column Object expression UDF

In the case of Datasets and DataFrames, we create a function and then register it as a UDF using the `udf` function. Whenever we want to add a new column, we can use `.withColumn`.

When we register a function, it is registered with the driver; the driver serializes the function and sends it to each executor.

In [36]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.column
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.SparkSession

case class AgeInfo(name: String, age: Int, city: String)

def ageCheck(age: Int)={
    if(age>18) "Y" else "N"
}

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ageDf = spark.read.option("header", false)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/agedataset")

ageDf.show()

val ageDf_ = ageDf.toDF("name", "age", "city")

import spark.implicits._

//converting it to a dataset (the result is not assigned, so ageDf_ itself stays a DataFrame)
ageDf_.as[AgeInfo]

//Registering the function as a column object UDF
val parseAgeFunction = udf(ageCheck(_: Int): String)

val final_ = ageDf_.withColumn("adult", parseAgeFunction(column("age")))

final_.show

+-------+---+---------+
|    _c0|_c1|      _c2|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+

+-------+---+---------+-----+
|   name|age|     city|adult|
+-------+---+---------+-----+
|  sumit| 30|bangalore|    Y|
|  kapil| 32|hyderabad|    Y|
|sathish| 16|  chennai|    N|
|   ravi| 39|bangalore|    Y|
| kavita| 12|hyderabad|    N|
|  kavya| 19|   mysore|    Y|
+-------+---+---------+-----+



defined class AgeInfo
sparkConfig = org.apache.spark.SparkConf@7751241d
spark = org.apache.spark.sql.SparkSession@1b767de0
ageDf = [_c0: string, _c1: int ... 1 more field]
ageDf_ = [name: string, age: int ... 1 more field]
parseAgeFunction = UserDefinedFunction(<function1>,StringType,Some(List(IntegerType)))


ageCheck: (age: Int)String
final_: o...


UserDefinedFunction(<function1>,StringType,Some(List(IntegerType)))

#### sql/string expression UDF

Here we register the function by name so that it can be used in SQL/string expressions.

`spark.udf.register("parseAgeFunction", ageCheck(_: Int): String)`

We can also use this function with `spark.sql` after creating a table or view.


In [38]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.column
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.SparkSession

case class AgeInfo(name: String, age: Int, city: String)

def ageCheck(age: Int)={
    if(age>18) "Y" else "N"
}

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// Avoid inferSchema in production: it needs an extra pass over the data and the inferred types may not be the ones you want
val ageDf = spark.read.option("header", false)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/agedataset")

ageDf.show()

val ageDf_ = ageDf.toDF("name", "age", "city")

import spark.implicits._

//converting it to a dataset
ageDf_.as[AgeInfo]

spark.udf.register("parseAgeFunction", ageCheck(_: Int): String)

val final_ = ageDf_.withColumn("adult", expr("parseAgeFunction(age)"))
final_.show

+-------+---+---------+
|    _c0|_c1|      _c2|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+

+-------+---+---------+-----+
|   name|age|     city|adult|
+-------+---+---------+-----+
|  sumit| 30|bangalore|    Y|
|  kapil| 32|hyderabad|    Y|
|sathish| 16|  chennai|    N|
|   ravi| 39|bangalore|    Y|
| kavita| 12|hyderabad|    N|
|  kavya| 19|   mysore|    Y|
+-------+---+---------+-----+



defined class AgeInfo
sparkConfig = org.apache.spark.SparkConf@69d1bd2
spark = org.apache.spark.sql.SparkSession@1b767de0
ageDf = [_c0: string, _c1: int ... 1 more field]
ageDf_ = [name: string, age: int ... 1 more field]
final_ = [name: string, age: int ... 2 more fields]


lastException: Throwable = null
ageCheck: (age: Int)String


[name: string, age: int ... 2 more fields]
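Because the function is registered by name, it can also be called from `spark.sql` once the data is exposed as a view. A minimal sketch, assuming a local session; the view name `people` is hypothetical and the three rows are copied from the output above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("sql-udf-sketch").getOrCreate()
import spark.implicits._

def ageCheck(age: Int): String = if (age > 18) "Y" else "N"

// Registering by name makes the function callable from SQL text
spark.udf.register("parseAgeFunction", ageCheck(_: Int): String)

// A few rows from the age dataset, exposed as a temporary view
val people = Seq(("sumit", 30, "bangalore"), ("sathish", 16, "chennai"), ("kavya", 19, "mysore"))
  .toDF("name", "age", "city")
people.createOrReplaceTempView("people")

val result = spark.sql("select name, parseAgeFunction(age) as adult from people")
result.show()

// Collect to the driver to inspect the flags: name -> adult
val flags = result.collect().map(r => r.getString(0) -> r.getString(1)).toMap
```

A UDF created only with `udf(...)`, as in the previous section, has no name in the session and cannot be referenced from SQL text this way.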

#### Listing the functions in spark catalog

Only functions registered with `spark.udf.register` appear in the catalog; a UDF created with `udf(...)` does not.

In [42]:
//to show all functions
spark.catalog.listFunctions().show()

spark.catalog.listFunctions().filter(x => x.name == "parseAgeFunction" ).show()

+----------+--------+-----------+--------------------+-----------+
|      name|database|description|           className|isTemporary|
+----------+--------+-----------+--------------------+-----------+
|         !|    null|       null|org.apache.spark....|       true|
|         %|    null|       null|org.apache.spark....|       true|
|         &|    null|       null|org.apache.spark....|       true|
|         *|    null|       null|org.apache.spark....|       true|
|         +|    null|       null|org.apache.spark....|       true|
|         -|    null|       null|org.apache.spark....|       true|
|         /|    null|       null|org.apache.spark....|       true|
|         <|    null|       null|org.apache.spark....|       true|
|        <=|    null|       null|org.apache.spark....|       true|
|       <=>|    null|       null|org.apache.spark....|       true|
|         =|    null|       null|org.apache.spark....|       true|
|        ==|    null|       null|org.apache.spark....|       t

### Practical on orders data
1. Create a Scala list from sample data.
2. From the Scala list, create a dataframe with columns orderid, orderdate, customerid, status.
3. Convert orderdate from a date string to a Unix timestamp.
4. Create a new column newid and make sure it has unique ids.
5. Drop duplicates based on the columns (orderdate, customerid).
6. Drop the orderid column.
7. Sort the result by orderdate.





In [52]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, unix_timestamp}
import org.apache.spark.sql.types.DateType


val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

// create a scala list
val myList = List(
    (1, "2022-07-11", 115, "CLOSED"),
    (2, "2022-07-11", 256, "PAYMENT_PENDING"),
    (3, "2022-07-11", 115, "COMPLETED"),
    (4, "2022-06-11", 8827, "CLOSED")
)

import spark.implicits._

// create a DF out of list
val ordersDf = spark.createDataFrame(myList)
            .toDF("orderid", "orderdate", "customerid", "status")


// Convert orderdate to a Unix timestamp
// Create a new column newid with unique (but not necessarily consecutive) ids
// Drop the duplicates
// Drop column orderid
// Sort based on orderdate
val ordersDfUnix = ordersDf.withColumn("orderdate", unix_timestamp(col("orderdate").cast(DateType)))
                .withColumn("newid", monotonically_increasing_id())
                .dropDuplicates("orderdate", "customerid")
                .drop("orderid")
                .sort("orderdate")
ordersDfUnix.show()

+----------+----------+---------------+-----+
| orderdate|customerid|         status|newid|
+----------+----------+---------------+-----+
|1654920000|      8827|         CLOSED|    3|
|1657512000|       256|PAYMENT_PENDING|    1|
|1657512000|       115|         CLOSED|    0|
+----------+----------+---------------+-----+



sparkConfig = org.apache.spark.SparkConf@3cb496a3
spark = org.apache.spark.sql.SparkSession@1b767de0
myList = List((1,2022-07-11,115,CLOSED), (2,2022-07-11,256,PAYMENT_PENDING), (3,2022-07-11,115,COMPLETED), (4,2022-06-11,8827,CLOSED))
ordersDf = [orderid: int, orderdate: string ... 2 more fields]
ordersDfUnix = [orderdate: bigint, customerid: int ... 2 more fields]


[orderdate: bigint, customerid: int ... 2 more fields]

### Aggregations
#### Simple Aggregations

There are three kinds of aggregations:

1. Simple aggregations

* The aggregation reduces the whole dataframe to a single record, e.g. the total number of records or the sum of all quantities.

In the code below we compute:
- totalRowCount
- totalQuantity
- avgUnitPrice
- numUniqueInvoices

We'll compute them with column expressions, string expressions and Spark SQL.

2. Grouped aggregations

* The aggregation returns one record per group, e.g. how many items there are in each invoice.
* In the code below we group the data by country and invoice number and compute the total quantity and the invoice value (sum of Quantity * UnitPrice) for each group.

3. Window aggregations

* The aggregation is computed over a window of rows, e.g. the sales in the last n days.

Each row of the data below is one line item of an invoice.


```
[itv002768@g02 ~]$ hadoop fs -head /user/itv002768/week12_practicals/order_data-201025-223502.csv
﻿InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536378,,PACK OF 60 DINOSAUR CAKE CASES,24,01-12-2010 9.37,0.55,14688,United Kingdom
536378,,PACK OF 60 PINK PAISLEY CAKE CASES,24,01-12-2010 9.37,0.55,14688,United Kingdom
536378,84991,60 TEATIME FAIRY CAKE CASES,24,01-12-2010 9.37,0.55,14688,United Kingdom
536378,84519A,TOMATO CHARLIE+LOLA COASTER SET,6,01-12-2010 9.37,2.95,14688,United Kingdom
536378,85183B,CHARLIE & LOLA WASTEPAPER BIN FLORA,48,01-12-2010 9.37,1.25,14688,United Kingdom
536378,85071B,RED CHARLIE+LOLA PERSONAL DOORSIGN,96,01-12-2010 9.37,0.38,14688,United Kingdom
536378,21931,JUMBO STORAGE BAG SUKI,10,01-12-2010 9.37,1.95,14688,United Kingdom
536378,21929,JUMBO BAG PINK VINTAGE PAISLEY,10,01-12-2010 9.37,1.95,14688,United Kingdom
536380,22961,JAM MAKING SET PRINTED,24,01-12-2010 9.41,1.45,17809,United Kingdom
536381,22139,RETROSPOT TEA SET CERAMIC 11 PC ,23,01-12-2010 9.41,4.25,15311,United Kingdom
```

In [65]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")

val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

val stockOrdersDf = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/order_data-201025-223502.csv")
// Col Expression
// Simple aggregations
stockOrdersDf.select(
    count("*").as("totalRowCount"),
    sum("Quantity").as("totalQuantity"),
    avg("UnitPrice").as("avgUnitPrice"),
    countDistinct("InvoiceNo").as("numUniqueInvoices")
).show()

// Grouped aggregations
stockOrdersDf.groupBy("Country", "InvoiceNo")
    .agg(sum("Quantity").as("TotalQuantity"),
         sum(expr("Quantity * UnitPrice")).as("InvoiceValue")
).show()

// string expressions
// Simple aggregations
stockOrdersDf.selectExpr(
    "count(*) as totalRowCount",
    "sum(Quantity) as totalQuantity",
    "avg(UnitPrice) as avgUnitPrice",
    "count(Distinct(InvoiceNo)) as numUniqueInvoices"
).show()

stockOrdersDf.groupBy("Country", "InvoiceNo").agg(
    expr("sum(Quantity) as totalQuantity"),
    expr("sum(Quantity * UnitPrice) as InvoiceValue")
).show()

// spark sql
// Simple aggregations
stockOrdersDf.createOrReplaceTempView("stock_sales")
spark.sql("select count(*), sum(Quantity), avg(UnitPrice), count(distinct(InvoiceNo)) from stock_sales").show()

// grouped aggregations
spark.sql("select Country, InvoiceNo, sum(Quantity), sum(Quantity * UnitPrice) from stock_sales group by 1,2").show()



+-------------+-------------+-----------------+-----------------+
|totalRowCount|totalQuantity|     avgUnitPrice|numUniqueInvoices|
+-------------+-------------+-----------------+-----------------+
|       541782|      5175855|4.611565323321927|            25858|
+-------------+-------------+-----------------+-----------------+

+--------------+---------+-------------+------------------+
|       Country|InvoiceNo|TotalQuantity|      InvoiceValue|
+--------------+---------+-------------+------------------+
|United Kingdom|   536446|          329|            440.89|
|United Kingdom|   536508|          216|            155.52|
|United Kingdom|   537811|           74|            268.86|
|United Kingdom|   538895|          370|            247.38|
|United Kingdom|   540453|          341|302.44999999999993|
|United Kingdom|   541291|          217|305.81000000000006|
|United Kingdom|   542551|           -1|               0.0|
|United Kingdom|   542576|           -1|               0.0|
|United K

sparkConfig = org.apache.spark.SparkConf@6dbbb929
spark = org.apache.spark.sql.SparkSession@1b767de0
stockOrdersDf = [InvoiceNo: string, StockCode: string ... 6 more fields]


[InvoiceNo: string, StockCode: string ... 6 more fields]

#### Window aggregations
```
[itv002768@g02 ~]$ hadoop fs -put windowdata-201025-223502.csv /user/itv002768/week12_practicals
[itv002768@g02 ~]$ hadoop fs -head /user/itv002768/week12_practicals/windowdata-201025-223502.csv
Spain,49,1,67,174.72
Germany,48,11,1795,3309.75
Lithuania,48,3,622,1598.06
Germany,49,12,1852,4521.39
Bahrain,51,1,54,205.74
Iceland,49,1,319,711.79
India,51,5,95,276.84
Australia,50,2,133,387.95
Italy,49,1,-2,-17.0
India,49,5,1280,3284.1
Spain,50,2,400,1049.01
United Kingdom,51,200,28782,75103.46
```

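To compute a running total of the invoice value per country, the window is defined by:</BR>
partition column - country</BR>
ordering column - weeknum</BR>
window size - from the first row of the partition up to the current row</BR>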
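
To make the three parts of the window definition concrete, here is a minimal plain-Scala sketch (no Spark involved) of what a running total over such a window computes. The `Invoice` case class is a hypothetical helper for this sketch, and the four rows are taken from the sample file shown above; it is only a model of the semantics, not how Spark executes a window.

```scala
// Hypothetical record type for this sketch; field names mirror the dataframe columns.
case class Invoice(country: String, weeknum: Int, invoicevalue: Double)

// A few rows from the sample data, deliberately out of order.
val rows = List(
  Invoice("Germany", 49, 4521.39),
  Invoice("Spain", 50, 1049.01),
  Invoice("Germany", 48, 3309.75),
  Invoice("Spain", 49, 174.72)
)

// partition column  -> groupBy(country)
// ordering column   -> sortBy(weeknum)
// first row till current row -> cumulative sum within each group
val runningTotals: Map[String, List[(Int, Double)]] =
  rows.groupBy(_.country).map { case (country, group) =>
    val ordered = group.sortBy(_.weeknum)
    // scanLeft emits the initial 0.0 first, so drop it with tail
    val cumulative = ordered.scanLeft(0.0)((acc, inv) => acc + inv.invoicevalue).tail
    country -> ordered.map(_.weeknum).zip(cumulative)
  }

runningTotals.foreach { case (country, totals) => println(s"$country: $totals") }
```

Each country keeps its own running total, which restarts at the first week of that country.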



In [9]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")


val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

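// The window data file has no header line. With header = true the first record
// (Spain,49,...) would be used as column names and dropped from the data.
val input = spark.read.option("header", false)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/windowdata-201025-223502.csv")
val invoiceDf = input.toDF("country", "weeknum", "numinvoices", "totalquantity", "invoicevalue")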


val mywindow = Window.partitionBy("country").orderBy("weeknum")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

val final_ = invoiceDf.withColumn("running_total", sum("invoicevalue").over(mywindow))
final_.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|     3309.75|           3309.75|
|Germany|     49|         12|         1852|     4521.39|           7831.14|
|Germany|     50|         15|         1973|     5065.79|          12896.93|
|Germany|     51|          5|         1103|     1665.91|          14562.84|
| France|     48|          4|         1299|     2808.16|           2808.16|
| France|     49|          9|         2303|     4527.01|           7335.17|
| France|     50|          6|          529|      537.32|           7872.49|
| France|     51|          5|          847|     1702.87|           9575.36|
|Belgium|     48|          1|          528|       346.1|             346.1|
|Belgium|   

sparkConfig = org.apache.spark.SparkConf@6ff31b1b
spark = org.apache.spark.sql.SparkSession@2166e24d
input = [Spain: string, 49: int ... 3 more fields]
invoiceDf = [country: string, weeknum: int ... 3 more fields]
mywindow = org.apache.spark.sql.expressions.WindowSpec@5332c238
final_ = [country: stri...


lastException: Throwable = null


[country: stri...

### Joins on dataframes
Spark can execute a join in two ways:
* Simple join or Shuffle sort merge join

* Broadcast join

Independently of that, the join type decides which records are returned:
* Inner join - matching records from both the tables
* Outer join - matching records + non-matching records from left table + non-matching records from right table
* Left join - matching records + non-matching records from left table
* Right join - matching records + non-matching records from right table
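
The four join types can be compared side by side on a tiny in-memory example. This is a sketch under assumptions: the rows are made up for illustration (only the column names follow the orders and customers files), and the app name is arbitrary. The join type strings `"inner"`, `"left"`, `"right"` and `"outer"` are the ones accepted by `join`.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("join-types-sketch") // arbitrary name for this sketch
  .master("local[2]")
  .getOrCreate()
import spark.implicits._

// Made-up rows: order 3 has no matching customer, customer 13 has no order
val orders = Seq((1, 11), (2, 12), (3, 99)).toDF("order_id", "order_customer_id")
val customers = Seq((11, "Mary"), (12, "Ann"), (13, "Robert")).toDF("customer_id", "customer_fname")

// The join condition can be kept in a variable and reused
val joinCondition = orders.col("order_customer_id") === customers.col("customer_id")

val innerDf = orders.join(customers, joinCondition, "inner") // only orders 1 and 2
val leftDf  = orders.join(customers, joinCondition, "left")  // all orders; order 3 has null customer columns
val rightDf = orders.join(customers, joinCondition, "right") // all customers; Robert has null order columns
val outerDf = orders.join(customers, joinCondition, "outer") // everything from both sides

outerDf.show()
```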

Data on which we've to perform the joins:
```
[itv002768@g02 ~]$ hadoop fs -head /user/itv002768/week12_practicals/orders-201025-223502.csv
order_id,order_date,order_customer_id,order_status
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW
12,2013-07-25 00:00:00.0,1837,CLOSED
13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT
14,2013-07-25 00:00:00.0,9842,PROCESSING
15,2013-07-25 00:00:00.0,2568,COMPLETE
16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT
17,2013-07-25 00:00:00.0,2667,COMPLETE
18,2013-07-25 00:00:00.0,1205,CLOSED
19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT
20,2013-07-25 00:00:00.0,9198,PROCESSING
21,2013-07-25 00:00:00.0,2711,PENDING
22,2013-07-25 00:00:00.0,333,COMPLETE
23,2013-07-25 00:00:00.0,4367,PENDING_PAYMENT
24,2013-07-25 00:00:00.0,11441,CL
[itv002768@g02 ~]$ hadoop fs -head /user/itv002768/week12_practicals/customers-201025-223502.csv
customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725
4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River Mall ,Caguas,PR,00725
6,Mary,Smith,XXXXXXXXX,XXXXXXXXX,3151 Sleepy Quail Promenade,Passaic,NJ,07055
7,Melissa,Wilcox,XXXXXXXXX,XXXXXXXXX,9453 High Concession,Caguas,PR,00725
8,Megan,Smith,XXXXXXXXX,XXXXXXXXX,3047 Foggy Forest Plaza,Lawrence,MA,01841
9,Mary,Perez,XXXXXXXXX,XXXXXXXXX,3616 Quaking Street,Caguas,PR,00725
10,Melissa,Smith,XXXXXXXXX,XXXXXXXXX,8598 Harvest Beacon Plaza,Stafford,VA,22554
11,Mary,Huffman,XXXXXXXXX,XXXXXXXXX,3169 Stony Woods,Caguas,PR,00725

```

In [14]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")


val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

val customerDetails = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/customers-201025-223502.csv")
customerDetails.show()

val orderInfo = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/orders-201025-223502.csv")
orderInfo.show()
// Inner join. The join condition is written inline here; it could also be stored in a variable and passed to join()
val innerJoinDf = orderInfo.join(customerDetails, orderInfo.col("order_customer_id")===customerDetails.col("customer_id"), "inner")
innerJoinDf.show()

//Left join
val leftJoinDf = customerDetails.join(orderInfo, customerDetails.col("customer_id")===orderInfo.col("order_customer_id"), "left").sort("order_id")
leftJoinDf.show()



+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|             725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

sparkConfig = org.apache.spark.SparkConf@79537bae
spark = org.apache.spark.sql.SparkSession@2166e24d
customerDetails = [customer_id: int, customer_fname: string ... 7 more fields]
orderInfo = [order_id: int, order_date: timestamp ... 2 more fields]
innerJoinDf = [order_id: int, order_date: timestamp ... 11 more fields]


leftJoinDf: org.apache.sp...


[order_id: int, order_date: timestamp ... 11 more fields]

### Showcasing how your code can lead to ambiguous problem

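If both dataframes contain a column with the same name (here `customer_id`), referring to that column by name after the join fails because Spark cannot tell which of the two columns is meant.

There are two ways to solve the ambiguity:
* Rename the ambiguous column in one dataframe before the join, so that the ambiguity never arises</BR>
`.withColumnRenamed("oldColumnName","newColumnName")`

* After the join, drop one of the two columns</BR>
`.drop`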
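
Both options can be tried on two small in-memory dataframes. This is a minimal sketch with made-up rows; the dataframe and column names follow the notebook, while the app name and the variable names `renamedJoin` and `droppedJoin` are only illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ambiguity-sketch") // arbitrary name for this sketch
  .master("local[2]")
  .getOrCreate()
import spark.implicits._

// Both dataframes have a customer_id column, which is what causes the ambiguity
val orders = Seq((1, 11), (2, 12)).toDF("order_id", "customer_id")
val customers = Seq((11, "Mary"), (12, "Ann")).toDF("customer_id", "customer_fname")

// Option 1: rename the column on one side before joining
val ordersRenamed = orders.withColumnRenamed("customer_id", "order_customer_id")
val renamedJoin = ordersRenamed
  .join(customers, ordersRenamed.col("order_customer_id") === customers.col("customer_id"), "inner")
  .select("order_id", "customer_id", "customer_fname")

// Option 2: join first, then drop one of the two customer_id columns
val droppedJoin = orders
  .join(customers, orders.col("customer_id") === customers.col("customer_id"), "inner")
  .drop(orders.col("customer_id"))
  .select("order_id", "customer_id", "customer_fname")

renamedJoin.show()
droppedJoin.show()
```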

In [21]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")


val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

val customerDetails = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/customers-201025-223502.csv")
customerDetails.show()

val orderInfo = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/orders-201025-223502.csv").withColumnRenamed("order_customer_id", "customer_id")
orderInfo.show()


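// Both dataframes now have a customer_id column, so one copy is dropped
// before customer_id is selected by name
val innerJoinDf = orderInfo.join(customerDetails, orderInfo.col("customer_id")===customerDetails.col("customer_id"), "inner")
    .drop(orderInfo.col("customer_id"))
    .select("order_id", "customer_id", "customer_fname")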
innerJoinDf.show()


+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|             725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

sparkConfig = org.apache.spark.SparkConf@3ad445d4
spark = org.apache.spark.sql.SparkSession@2166e24d
customerDetails = [customer_id: int, customer_fname: string ... 7 more fields]
orderInfo = [order_id: int, order_date: timestamp ... 2 more fields]
innerJoinDf = [order_id: int, customer_id: int ... 1 more field]


[order_id: int, customer_id: int ... 1 more field]

### How to deal with NULLs

After a left join, customers without any order have a null order_id. To show -1 instead of null we can use `coalesce`, which returns its first non-null argument.
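
The same replacement can also be written as a column object expression instead of a string expression. The sketch below assumes a small made-up dataframe in place of the real join result; `coalesce`, `col` and `lit` come from `org.apache.spark.sql.functions`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit}

val spark = SparkSession.builder()
  .appName("coalesce-sketch") // arbitrary name for this sketch
  .master("local[2]")
  .getOrCreate()
import spark.implicits._

// Stand-in for a left join result: the second customer has no order (None becomes null)
val joined = Seq[(Option[Int], Int)]((Some(1), 11), (None, 12)).toDF("order_id", "customer_id")

// coalesce returns the first non-null value, so null order ids become -1
val filled = joined.withColumn("order_id", coalesce(col("order_id"), lit(-1)))
filled.show()
```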

In [25]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")


val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

val customerDetails = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/customers-201025-223502.csv")
customerDetails.show()

val orderInfo = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/orders-201025-223502.csv").withColumnRenamed("order_customer_id", "customer_id")
orderInfo.show()


// Left join: customers without orders get a null order_id, which is replaced by -1 below
val innerJoinDf = customerDetails.join(orderInfo, customerDetails.col("customer_id")===orderInfo.col("customer_id"), "left")
    .drop(orderInfo.col("customer_id"))
    .select("order_id", "customer_id", "customer_fname")
    .sort("order_id")
    .withColumn("order_id", expr("coalesce(order_id, -1)"))
innerJoinDf.show()



+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|             725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

sparkConfig = org.apache.spark.SparkConf@5a7632b0
spark = org.apache.spark.sql.SparkSession@2166e24d
customerDetails = [customer_id: int, customer_fname: string ... 7 more fields]
orderInfo = [order_id: int, order_date: timestamp ... 2 more fields]
innerJoinDf = [order_id: int, customer_id: int ... 1 more field]


[order_id: int, customer_id: int ... 1 more field]

### Internals of a normal join operations

**Please refer to Dataframe Session-21**</BR>
Let's say we have a three-node cluster and we read two files, orders and customers, each divided into two parts. These parts are spread over three executors on the three nodes. If we now join the two dataframes, a row with a given join key in one dataframe may have its matching row on a different executor. In that case a shuffle is needed.

Once the join starts, each executor processes the data it holds locally and writes the output, keyed by the join column, to an exchange. An **exchange** is nothing but a buffer in an executor. Spark reads from this exchange and does the shuffle.

All the records with the same key go to the same reduce exchange, so matching rows from both dataframes end up on the same executor and can be joined there.
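
The rule that all records with the same key reach the same reduce exchange can be modelled in a few lines of plain Scala. This is a simplified sketch, not Spark's actual implementation: the `Record` case class, the `partitionFor` helper and the rows are hypothetical, and Spark uses its own hash function internally.

```scala
// Hypothetical record type: which dataframe the row came from, its join key and some payload
case class Record(source: String, key: Int, payload: String)

val numPartitions = 3 // stands in for the number of reduce exchanges

// Simplified partitioning rule: hash of the join key modulo the number of partitions
def partitionFor(key: Int): Int = java.lang.Math.floorMod(key.hashCode, numPartitions)

// Made-up rows from both sides of the join
val records = List(
  Record("orders", 256, "order 2"),
  Record("orders", 8827, "order 4"),
  Record("customers", 256, "customer 256"),
  Record("customers", 8827, "customer 8827"),
  Record("orders", 11599, "order 1")
)

// "Shuffle": every record is sent to the partition chosen by its key
val shuffled: Map[Int, List[Record]] = records.groupBy(r => partitionFor(r.key))

shuffled.toList.sortBy(_._1).foreach { case (partition, rs) =>
  println(s"partition $partition: ${rs.map(_.payload).mkString(", ")}")
}
```

Because the partition depends only on the key, an order and the customer it belongs to always land in the same partition, whichever executor they started on.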

### Broadcast Join

**Please refer to Dataframe Session-22**</BR>
This doesn't require a shuffle.

When to use which type of join?</BR>
* Whenever we join two large dataframes, Spark uses a simple join and a shuffle is required.
* If one dataframe is large and the other is small, we can go with a broadcast join. A full copy of the small dataframe is sent to all the executors.
* If both datasets are small, a plain programming language such as Python is enough. Spark is not required here.

The broadcast dataframe must be small enough to fit into each executor's memory.
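
Why a broadcast join needs no shuffle can be seen in a small plain-Scala model. This is only a sketch of the idea with made-up data: the small table is represented as a lookup `Map` that every partition can read, and each partition of the large table is joined locally. The names `broadcastCustomers` and `orderPartitions` are hypothetical.

```scala
// Small table: a full copy is available to every partition (the "broadcast")
val broadcastCustomers: Map[Int, String] = Map(
  256 -> "customer-256",
  8827 -> "customer-8827"
)

// Large table: (order_id, customer_id) rows split over two partitions
val orderPartitions: List[List[(Int, Int)]] = List(
  List((1, 11599), (2, 256)),
  List((4, 8827), (5, 256))
)

// Each partition is joined on its own; no rows move between partitions.
// Orders without a matching customer are dropped, as in an inner join.
val joined: List[List[(Int, Int, String)]] = orderPartitions.map { partition =>
  partition.flatMap { case (orderId, customerId) =>
    broadcastCustomers.get(customerId).map(name => (orderId, customerId, name)).toList
  }
}

joined.zipWithIndex.foreach { case (rows, i) => println(s"partition $i: $rows") }
```

The price is memory: every executor holds the whole small table, which is why it has to stay small.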

In [26]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")


val spark = SparkSession.builder().config(sparkConfig).getOrCreate()

val customerDetails = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/customers-201025-223502.csv")
customerDetails.show()

val orderInfo = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/orders-201025-223502.csv").withColumnRenamed("order_customer_id", "customer_id")
orderInfo.show()

// Doing a broadcast join
val innerJoinDf = orderInfo.join(broadcast(customerDetails), customerDetails.col("customer_id")===orderInfo.col("customer_id"), "inner")
innerJoinDf.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|             725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

sparkConfig = org.apache.spark.SparkConf@76942a71
spark = org.apache.spark.sql.SparkSession@2166e24d
customerDetails = [customer_id: int, customer_fname: string ... 7 more fields]
orderInfo = [order_id: int, order_date: timestamp ... 2 more fields]
innerJoinDf = [order_id: int, order_date: timestamp ... 11 more fields]


[order_id: int, order_date: timestamp ... 11 more fields]

### Practical - grouping on loglevel and month

The following file contains the log level and the timestamp of each log.

Our task is to count the log entries of each log level for each month.
```
[itv002768@g02 ~]$ hadoop fs -head /user/itv002768/week12_practicals/biglog-201105-152517.txt
level,datetime
DEBUG,2015-2-6 16:24:07
WARN,2016-7-26 18:54:43
INFO,2012-10-18 14:35:19
DEBUG,2012-4-26 14:26:50
DEBUG,2013-9-28 20:27:13
INFO,2017-8-20 13:17:27
INFO,2015-4-13 09:28:17
DEBUG,2015-7-17 00:49:27
DEBUG,2014-7-26 02:33:09
INFO,2016-1-13 09:51:57
DEBUG,2015-1-14 08:55:30
DEBUG,2016-1-20 03:47:06
DEBUG,2013-7-8 21:00:50
DEBUG,2012-5-22 11:43:57
DEBUG,2013-3-20 06:14:50
```

I'll first load a small hardcoded sample, and once I'm confident that the required operations work on it, I'll switch to the original file.

In [37]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


case class Logging(level: String, datetime: String)

// Parse one "level,datetime" line into a Logging record
def mapper(line: String): Logging = {
    val fields = line.split(",")
    Logging(fields(0), fields(1))
}

val sparkConfig = new SparkConf()
sparkConfig.set("spark.app.name", "My Application-1")
sparkConfig.set("spark.master", "local[2]")


val spark = SparkSession.builder().config(sparkConfig).getOrCreate()


/*
This is not required as we only did it for testing purpose

import spark.implicits._
val myList = List("DEBUG,2015-2-6 16:24:07",
"WARN,2016-7-26 18:54:43",
"INFO,2012-10-18 14:35:19",
"DEBUG,2012-4-26 14:26:50",
"DEBUG,2013-9-28 20:27:13",
"INFO,2017-8-20 13:17:27",
"INFO,2015-4-13 09:28:17")
val sampleDataRdd = spark.sparkContext.parallelize(myList)


val rdd2 = sampleDataRdd.map(mapper)

val logDetails = rdd2.toDF()
*/
val logDetails = spark.read.option("header", true)
               .option("inferSchema", true)
               .csv("/user/itv002768/week12_practicals/biglog-201105-152517.txt")



logDetails.createOrReplaceTempView("logging_table")

val result = spark.sql("""
select level,
       date_format(datetime, 'MMMM') as month,
       count(*) as total
from logging_table
group by level, month
""")

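// Passing the month names to pivot fixes the column order (calendar order)
// and spares Spark an extra pass to find the distinct values of the pivot column
val months = List("January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December")
val result1 = spark.sql("""
select level,
       date_format(datetime, 'MMMM') month
from logging_table
""").groupBy("level").pivot("month", months).count()
result1.show()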

+-----+-------+--------+-----+-----+-----+-----+-----+------+---------+-------+--------+--------+
|level|January|February|March|April|  May| June| July|August|September|October|November|December|
+-----+-------+--------+-----+-----+-----+-----+-----+------+---------+-------+--------+--------+
| INFO|  29119|   28983|29095|29302|28900|29143|29300| 28993|    29038|  29018|   23301|   28874|
|ERROR|   4054|    4013| 4122| 4107| 4086| 4059| 3976|  3987|     4161|   4040|    3389|    4106|
| WARN|   8217|    8266| 8165| 8277| 8403| 8191| 8222|  8381|     8352|   8226|    6616|    8328|
|FATAL|     94|      72|   70|   83|   60|   78|   98|    80|       81|     92|   16797|      94|
|DEBUG|  41961|   41734|41652|41869|41785|41774|42085| 42147|    41433|  41936|   33366|   41749|
+-----+-------+--------+-----+-----+-----+-----+-----+------+---------+-------+--------+--------+



defined class Logging
sparkConfig = org.apache.spark.SparkConf@3b5309dc
spark = org.apache.spark.sql.SparkSession@4b8f727c
logDetails = [level: string, datetime: timestamp]
result = [level: string, month: string ... 1 more field]
months = List(January, February, March, April, May, June, July, August, September, October, November, De...


lastException: Throwable = null
mapper: (line: String)Logging


List(January, February, March, April, May, June, July, August, September, October, November, De...