# Starting a Spark session

In [None]:
import org.apache.spark.sql.SparkSession

In [None]:
// Read with the default schema: no header or schema is given, so Spark names
// the columns itself (_c0, _c1, ...) and reads every value as a string.
val orders=spark.
read.
csv("/public/retail_db/orders")

In [None]:
// Read with option("inferSchema", "true"): Spark scans the data to infer the
// type of each column (column names are still generated: _c0, _c1, ...).
val orders=spark.
read.
option("inferSchema","true").
csv("/public/retail_db/orders")

In [None]:
// Read with an explicit DDL schema: the column names and types come from the
// schema string instead of being generated or inferred.
val orders=spark.
read.
schema(""" order_id INT, order_date TIMESTAMP,
        order_customer_id INT, order_status STRING
        """).
csv("/public/retail_db/orders")

In [None]:
// Read with an explicit schema plus the "sep" option.
// The comma separator is spelled out here even though it is the CSV default.
val orders=spark.
read.
schema(""" order_id INT, order_date TIMESTAMP,
        order_customer_id INT, order_status STRING
        """).
option("sep",",").
csv("/public/retail_db/orders")

In [None]:
// Read with an explicit schema, the separator option (comma in this case) and
// the generic format("csv").load(...) form instead of the .csv(...) shortcut.
val orders=spark.
read.
schema(""" order_id INT, order_date TIMESTAMP,
        order_customer_id INT, order_status STRING
        """).
option("sep",",").
format("csv").
load("/public/retail_db/orders")

##### We can build the same with a JSON file

## 83 - Overview of functions

In [None]:
// Using col and lit to build a full_name column inside select.
// import spark.implicits._   // only needed for the $"column" syntax
import org.apache.spark.sql.functions.{col, lit, concat}
// NOTE(review): employeesDF is defined outside this excerpt.
employeesDF.
     select(col("employee_id"), concat(col("first_name"), lit(" "),
            col("last_name")).alias("full_name"),
            col("salary"), col("nationality")
            ).show

In [None]:
// withColumn adds full_name and keeps first_name / last_name (nothing is dropped).
employeesDF.
      withColumn("full_name",concat(col("first_name"), lit(" "),
            col("last_name"))
            ).show

In [None]:
// withColumn followed by drop: full_name is added (comma-separated here) and
// the source columns first_name / last_name are removed.
employeesDF.
      withColumn("full_name",concat(col("first_name"), lit(","),
            col("last_name"))).
            drop("first_name", "last_name").show

In [None]:
// Same as above using the $"column" syntax, which requires spark.implicits._ in scope.
import spark.implicits._
employeesDF.
      withColumn("full_name",concat($"first_name", lit(","),
            $"last_name")).
            drop("first_name", "last_name").show

In [None]:
// List the available SQL functions (up to 300 rows, untruncated).
spark.sql("SHOW functions").show(300, false)
// Show the description of the concat function.
spark.sql("DESCRIBE FUNCTION concat").show(false)

In [None]:
// Same projection written as SQL expressions with selectExpr.
employeesDF.
      selectExpr("employee_id", "concat(first_name, ' ', last_name) AS full_name", 
                 "salary","nationality").show

In [None]:
// Get the compression codec Spark uses when writing Parquet files.
spark.conf.get("spark.sql.parquet.compression.codec")
// res30: String = snappy  -> Parquet output is compressed with Snappy

In [None]:
// Write orders as a single, uncompressed Parquet output.
// coalesce(1) is a Dataset method, so it must come before .write
// (DataFrameWriter has no coalesce).
// The target is a separate directory: overwriting the CSV directory that
// `orders` is read from would delete the input it still needs to read.
// NOTE(review): pick an output path you are allowed to write to.
orders.
    coalesce(1).
    write.
    option("compression", "none").
    mode("overwrite").
    format("parquet").
    save("/public/retail_db_parquet/orders")

# SECTION 7: Spark2 processing data

In [None]:
// Initialisation: import SparkSession so the session can be built explicitly.
import org.apache.spark.sql.SparkSession

In [None]:
// Build (or reuse) the SparkSession on YARN.
// spark.ui.port = 0 asks Spark to bind the web UI to any free port.
val spark = SparkSession.
               builder.
            config("spark.ui.port", "0").
            appName(" rigobert init spark").
            master("yarn").
            getOrCreate

In [None]:
// Evaluate the session value so the notebook displays it.
spark

In [None]:
// Brings $"column" syntax and Seq/List.toDF into scope.
import spark.implicits._

# Perform case conversion

In [None]:
import org.apache.spark.sql.functions.{col, concat, initcap, length, lit, lower, substring, upper}

In [None]:
// Case conversion and length of nationality, showing three equivalent ways to
// reference a column: $"name", col("name") and employeesDF("name").
import spark.implicits._
import org.apache.spark.sql.functions.{col, initcap, length, lower, upper}
employeesDF.
    select("employee_id","nationality").
    withColumn("nationality_upper", upper($"nationality")).
    withColumn("nationality_lower", lower(col("nationality"))).
    withColumn("nationality_initcap", initcap(employeesDF("nationality"))).
    withColumn("nationality_length", length(col("nationality"))).show

In [None]:
// Extract trailing digits with substring (1-based start; a negative start
// counts from the end of the string).
// NOTE(review): assumes ssn looks like "123 45 6789", so position 8 starts its
// last 4 digits; the column is named ssn_last4 because substring takes 4 chars.
import spark.implicits._
import org.apache.spark.sql.functions.substring
employeesDF.
    select("employee_id","phone_number","ssn").
    withColumn("phone_last4",substring($"phone_number",-4,4).cast("int")).
    withColumn("ssn_last4", substring($"ssn",8,4).cast("int")).show

In [None]:
// Same extraction as the previous cell, done inside a single select.
// NOTE(review): assumes ssn looks like "123 45 6789" (last 4 digits start at position 8).
import spark.implicits._
import org.apache.spark.sql.functions.substring
employeesDF.
    select($"employee_id",$"phone_number",$"ssn", substring($"phone_number",-4,4).cast("int").alias("phone_last4"),
    substring($"ssn",8,4).cast("int").alias("ssn_last4")).show

In [None]:
### String manipulation

In [None]:
import org.apache.spark.sql.functions.{split, lit}

In [None]:
// One-row, one-column DataFrame used as a scratchpad for literal expressions.
val l = "X" :: Nil
val df = l.toDF("dummy")

In [None]:
// split yields an array column; (0) extracts its first element.
df.select(
  split(lit("hello every one this is rigo tsouapi"), " ")(0)
).show(false)

In [None]:
// Split the space-separated phone_number / ssn values and pick tokens by
// 0-based index, casting each to int.
employeesDF.
    select("employee_id", "phone_number", "ssn").
    withColumn("area_code", split($"phone_number", " ").getItem(1).cast("int")).
    withColumn("phone_last4", split($"phone_number", " ").getItem(3).cast("int")).
    withColumn("ssn_last4", split($"ssn", " ").getItem(2).cast("int")).
    show

In [None]:
// Same extraction as the previous cell inside one select. Token 3 of
// phone_number and token 2 of ssn are the last-4 digit groups, matching the
// indices used for phone_last4 / ssn_last4 in the previous cell.
import spark.implicits._
import org.apache.spark.sql.functions.split
employeesDF.
    select($"employee_id",$"phone_number",$"ssn",
           split($"phone_number"," ")(3).cast("int").alias("phone_last4"),
           split($"ssn"," ")(2).cast("int").alias("ssn_last4")).show

### Concatenate strings


In [None]:
import org.apache.spark.sql.functions.{concat, lit}

In [None]:
import spark.implicits._

In [None]:
// concat with no separator: first and last names are joined back to back.
employeesDF.withColumn(
  "full_name",
  concat($"first_name", $"last_name")
).show

In [None]:
// concat with a literal "," between first and last name.
import spark.implicits._
import org.apache.spark.sql.functions.{concat, lit}
employeesDF.
  withColumn("full_name", concat($"first_name", lit(","),$"last_name")).show

## padding 

In [None]:
// lpad, rpad: import both padding functions and rebuild the one-row scratch DataFrame.
import spark.implicits._
import org.apache.spark.sql.functions.{lit, lpad, rpad}
val l = List("X")
val df = l.toDF("dummy")

In [None]:
// Left-pad to length 10 with "-". "hello world" is 11 characters, and lpad
// shortens longer input to the target length, so the result is truncated.
df.select(
  lpad(lit("hello world"), 10, "-").as("dummy")
).show

In [None]:
// Left-pad the number 2 to two characters with "0".
df.select(
  lpad(lit(2), 2, "0").as("dummy")
).show

#### Assignment: padding the columns

In [None]:
// Assignment: build one fixed-width string per employee.
// Numeric-looking fields are left-padded with "0"; text fields are right-padded
// with "-". lpad/rpad truncate values longer than the target width.
import spark.implicits._
import org.apache.spark.sql.functions.{concat, lpad, rpad}
val empFixedDF = employeesDF.select(
    concat(
        lpad($"employee_id", 5, "0"),
        rpad($"first_name", 10, "-"),
        rpad($"last_name", 10, "-"),
        lpad($"salary", 10, "0"),
        rpad($"nationality", 15, "-"),
        rpad($"phone_number", 17, "-"),
        $"ssn"
    ).alias("employee")
)

### Trimming functions

In [None]:
import org.apache.spark.sql.functions.{concat, ltrim, rtrim, trim}

In [None]:
// One-row sample whose value has leading/trailing spaces and a trailing dot.
val l = "   Hello.    " :: Nil
val df = l.toDF("dummy")
df.show()

In [None]:
// Compare ltrim / rtrim / trim on the sample value.
import org.apache.spark.sql.functions.{col, ltrim, rtrim, trim}
df.withColumn("ltrim", ltrim(col("dummy"))).
    withColumn("rtrim", rtrim(col("dummy"))).
    withColumn("trim", trim(col("dummy"))).
    // remove dot: trim spaces first, then trim "." characters from both ends
    withColumn("rmv_dot_trim", trim(trim(col("dummy")),".")).
    show

### 97 - Date and time

In [None]:
import org.apache.spark.sql.functions.{current_date, current_timestamp}

In [None]:
// One-row DataFrame used only as a source row for select expressions.
val l = "   Hello.    " :: Nil
val df = l.toDF("dummy")
df.show()

In [None]:
// Show the current date, then the current timestamp, each as a single column.
df.select(current_date().as("current_date")).show()
df.select(current_timestamp().as("current_timestamp")).show()

### Date and time - Arithmetic

In [None]:
// Sample (date, timestamp) string pairs for the date arithmetic exercises.
val datetimes = List(
  "2014-02-28" -> "2014-02-28 10:00:00.123",
  "2016-02-29" -> "2016-02-29 08:08:08.999",
  "2017-10-31" -> "2017-12-31 11:59:59.123",
  "2019-11-30" -> "2019-08-31 00:00:00.000"
)

In [None]:
// Name the two tuple fields "date" and "time" (both remain string columns).
val datetimesDF = datetimes.toDF(Seq("date", "time"): _*)

In [None]:
// Display the sample rows.
datetimesDF.show()

In [None]:
import org.apache.spark.sql.functions.{date_add,date_sub, col}

In [None]:
import spark.implicits._

In [None]:
# Assignment 
# Add 10 days to date 

In [None]:
// Assignment: add 10 days to, and subtract 10 days from, both columns.
// date_add / date_sub return DATE values, so the time of day in "time" is dropped.
// Each result has its own column name; reusing a name would overwrite it.
import spark.implicits._
import org.apache.spark.sql.functions.{date_add, date_sub}
datetimesDF.
    withColumn("add_date_date",date_add($"date",10)).
    withColumn("add_date_time",date_add($"time",10)).
    // equivalent to date_add($"date", -10)
    withColumn("sub_date_date",date_sub($"date", 10)).
    withColumn("sub_date_time",date_sub($"time", 10)).
    show(false)

In [None]:
import org.apache.spark.sql.functions.{current_date, current_timestamp, datediff}

In [None]:
// Number of days between today and each date / timestamp value.
datetimesDF.
    withColumn("datediff_date", datediff(current_date(), $"date")).
    withColumn("datediff_timestamp", datediff(current_timestamp(), $"time")).
    show(truncate = false)

In [None]:
import org.apache.spark.sql.functions.{months_between, add_months, round}

In [None]:
// Months between today and each value (rounded to 2 decimals), and each value
// shifted forward by 3 months.
datetimesDF.
    withColumn("months_between_date", round(months_between(current_date(), $"date"), 2)).
    withColumn("months_between_timestamp", round(months_between(current_timestamp(), $"time"), 2)).
    withColumn("add_months_date", add_months($"date", 3)).
    withColumn("add_months_time", add_months($"time", 3)).
    show(truncate = false)