<img src="./arch.png"/>

# Getting Started with Snowpark

In this notebook, we use the [Snowpark](https://docs.snowflake.com/en/developer-guide/snowpark/index.html) Scala API. We work with a DEMO database and its TELCO schema to demonstrate some new features of Snowflake.

## Configure the Jupyter Notebook

In this step, we write some boilerplate code to add a Maven repository needed for the required Snowpark libraries, configure the compiler for the Scala REPL, and set up a directory for the classes generated by the Scala REPL.

In [16]:
import sys.process._

val osgeoRepo = coursierapi.MavenRepository.of("https://repo.osgeo.org/repository/release")
interp.repositories() ++= Seq(osgeoRepo)

import sys.process._
val replClassPath = "/home/jovyan/work/repl"
s"mkdir -p $replClassPath" !

interp.configureCompiler(_.settings.outputDirs.setSingleOutput(replClassPath))
interp.configureCompiler(_.settings.Yreplclassbased)
interp.load.cp(os.Path(replClassPath))

import sys.process._

osgeoRepo: coursierapi.MavenRepository = MavenRepository(https://repo.osgeo.org/repository/release)
import sys.process._

replClassPath: String = "/home/jovyan/work/repl"
res15_5: Int = 0

## Import Snowpark Libraries and Open a Session

In this step, we import the Snowpark libraries and open a new session. To open a session, we add the connection parameters to a text file called telco_connection.txt.

In **/home/jovyan/work** there is a file called telco_connection.txt that you can customize with your Snowflake account name, your credentials, and the name of the virtual warehouse you want to use for this tutorial.
We will demonstrate Snowpark's ability to perform ELT operations.
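
Before opening the session, it can help to check that the connection file defines the settings you expect. The sketch below is only an illustration: the sample content uses placeholder values, the key names (URL, USER, PASSWORD, ROLE, WAREHOUSE) are assumptions to verify against your own file, and `missingKeys` is a hypothetical helper that is not part of Snowpark.

```scala
import java.io.StringReader
import java.util.Properties
import scala.collection.JavaConverters._

// Placeholder content in Java properties format; replace with your own values.
// The key names are assumptions, not taken from this notebook.
val sampleConfig =
  """URL = https://<account_identifier>.snowflakecomputing.com
    |USER = <user_name>
    |PASSWORD = <password>
    |ROLE = <role_name>
    |WAREHOUSE = <warehouse_name>
    |""".stripMargin

// Hypothetical helper: return the required keys that are missing or blank.
def missingKeys(text: String, required: Seq[String]): Seq[String] = {
  val props = new Properties()
  props.load(new StringReader(text))
  val present = props.stringPropertyNames().asScala
    .filter(key => props.getProperty(key).trim.nonEmpty)
    .toSet
  required.filterNot(present.contains)
}

val missing = missingKeys(sampleConfig, Seq("URL", "USER", "PASSWORD", "WAREHOUSE"))
println(if (missing.isEmpty) "All required keys are present" else s"Missing: $missing")
```

To check the real file, the text could be read with `scala.io.Source.fromFile(...)` and passed to the same helper.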

In [17]:
// Import the Snowpark library from Maven.
import $ivy.`com.snowflake:snowpark:0.8.0`

import com.snowflake.snowpark._
import com.snowflake.snowpark.types._
import com.snowflake.snowpark.functions._

val session = Session.builder.configFile("/home/jovyan/telecom/telco_connection.txt").create

// Add the directory for REPL classes that you created earlier.
session.addDependency(replClassPath)

// Add the classes for Ammonite as dependencies.
// Required for UDF creation
def addClass(session: Session, className: String): String = {
  val cls1 = Class.forName(className)
  val resourceName = "/" + cls1.getName().replace(".", "/") + ".class"
  val url = cls1.getResource(resourceName)
  val path = url.getPath().split(":").last.split("!").head
  session.addDependency(path)
  path
}

addClass(session, "pprint.TPrintColors")

import $ivy.$

import com.snowflake.snowpark._

import com.snowflake.snowpark.types._

import com.snowflake.snowpark.functions._

session: Session = com.snowflake.snowpark.Session@689b621c
defined function addClass
res16_7: String = "/home/jovyan/.cache/coursier/v1/https/repo1.maven.org/maven2/com/lihaoyi/pprint_2.12/0.5.9/pprint_2.12-0.5.9.jar"
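
The string handling inside addClass can be hard to read at first glance. The sketch below isolates it, assuming the resource path has the form `file:<jar path>!<entry inside the jar>`; the example path and the `jarPathOf` name are made up for illustration.

```scala
// Same string handling as in addClass, applied to a made-up resource path.
// split(":").last drops the "file:" prefix; split("!").head drops the entry inside the jar.
def jarPathOf(resourcePath: String): String =
  resourcePath.split(":").last.split("!").head

val example = "file:/tmp/libs/pprint_2.12-0.5.9.jar!/pprint/TPrintColors.class"
val jarPath = jarPathOf(example)
println(jarPath) // /tmp/libs/pprint_2.12-0.5.9.jar
```

Because the code keeps only the text after the last colon, a path that contains further colons would be cut short.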

In [18]:
// Parameters for run:
val trainDb="DE_DEMO_TKO_HOL"
val trainSchema="TELCO"

trainDb: String = "DE_DEMO_TKO_HOL"
trainSchema: String = "TELCO"

In [19]:
val rawData = session.table("RAW_PARQUET")
//rawData.show(5)
val setContext = "USE SCHEMA " + trainDb + "." + trainSchema + " ;"
session.sql(setContext).collect()

rawData: Updatable = com.snowflake.snowpark.Updatable@49f52f0f
setContext: String = "USE SCHEMA DE_DEMO_TKO_HOL.TELCO ;"
res18_2: Array[Row] = Array(Row[Statement executed successfully.])

# STEP 5 - Snowpark Transformations

The Snowpark API provides programming-language constructs for building SQL statements. It is a new developer experience that lets us build code with:

<li><b>The language of our choice</b></li>
<li><b>The tool of our choice</b></li>
<li><b>Lazy execution, which avoids multiple network round trips to the server</b></li>

Once the customer data is available in the RAW schema, we can use Snowpark to create dimension and fact tables. We will use the RAW_PARQUET table to create the following tables:
    
<li> DEMOGRAPHICS </li>
<li> LOCATION </li>
<li> STATUS </li>
<li> SERVICES </li>

We will also transform and clean the data using the Snowpark DataFrame API.
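
The transformation cells that follow name each output column after its raw column with the spaces removed (for example, SENIOR CITIZEN becomes SENIORCITIZEN). As a minimal sketch of that rule, the hypothetical helper `cleanName` below is not part of the notebook or of Snowpark; it only shows the naming convention in plain Scala.

```scala
// Hypothetical helper: derive the target column name from a raw column name
// by removing whitespace and upper-casing the result.
def cleanName(raw: String): String = raw.filterNot(_.isWhitespace).toUpperCase

// A few raw column names taken from the RAW_PARQUET table used in this notebook.
val rawColumns = Seq("CUSTOMERID", "SENIOR CITIZEN", "ZIP CODE", "LAT LONG")

// Pair each raw name with its cleaned counterpart.
val renames: Seq[(String, String)] = rawColumns.map(c => c -> cleanName(c))
renames.foreach { case (from, to) => println(s"$from -> $to") }
```

With an open session, the cleaned name could be passed to `.name(...)` in place of the hand-written string.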


In [20]:
val dfDemographics = session.table("RAW_PARQUET")
   .select(col("CUSTOMERID"),
           replace(col("COUNT"),lit("NULL"),lit(1)).name("COUNT"),
           replace(col("GENDER"),lit("NULL"),lit("Male")).name("GENDER"),
           replace(col("SENIOR CITIZEN"),lit("NULL"),lit('N')).name("SENIORCITIZEN"),
           replace(col("PARTNER"),lit("NULL"),lit('N')).name("PARTNER"),
           replace(col("DEPENDENTS"),lit("NULL"),lit('N')).name("DEPENDENTS")          
          )

//dfDemographics.schema
//dfDemographics.show


dfDemographics.write
   .mode(SaveMode.Append)
   .saveAsTable("DEMOGRAPHICS")

dfDemographics: DataFrame = com.snowflake.snowpark.DataFrame@7a90a9f8

In [21]:
val dfLocation = session.table("RAW_PARQUET")
   .select(col("CUSTOMERID"),
           replace(col("COUNTRY"),lit("NULL"),lit("None")).name("COUNTRY"),
           replace(col("STATE"),lit("NULL"),lit("None")).name("STATE"),
           replace(col("CITY"),lit("NULL"),lit("None")).name("CITY"),
           replace(col("ZIP CODE"),lit("NULL"),lit(0)).name("ZIPCODE"),
           replace(col("LAT LONG"),lit("NULL"),lit("0.0,0.0")).name("LATLONG"),
           replace(col("LATITUDE"),lit("NULL"),lit("0.0")).name("LATITUDE"),
           replace(col("LONGITUDE"),lit("NULL"),lit("0.0")).name("LONGITUDE")          
          )


dfLocation.write
   .mode(SaveMode.Append)
   .saveAsTable("LOCATION")

dfLocation: DataFrame = com.snowflake.snowpark.DataFrame@406682e5

In [22]:
val dfServices = session.table("RAW_PARQUET")
           .select(col("CUSTOMERID"),
           col("TENURE MONTHS").as("TENUREMONTHS"),
           replace(col("PHONE SERVICE"),lit("NULL"),lit('N')).name("PHONESERVICE"),
           replace(col("MULTIPLE LINES"),lit("NULL"),lit("No")).name("MULTIPLELINES"),
           replace(col("INTERNET SERVICE"),lit("NULL"),lit("No")).name("INTERNETSERVICE"),
           replace(col("ONLINE SECURITY"),lit("NULL"),lit("No")).name("ONLINESECURITY"),
           replace(col("ONLINE BACKUP"),lit("NULL"),lit("No")).name("ONLINEBACKUP"),
           replace(col("DEVICE PROTECTION"),lit("NULL"),lit("No")).name("DEVICEPROTECTION"),
           replace(col("TECH SUPPORT"),lit("NULL"),lit('N')).name("TECHSUPPORT"),
           replace(col("STREAMING TV"),lit("NULL"),lit("No")).name("STREAMINGTV"),
           replace(col("STREAMING MOVIES"),lit("NULL"),lit("No")).name("STREAMINGMOVIES"),
           replace(col("CONTRACT"),lit("NULL"),lit("Month-to-month")).name("CONTRACT"),
           replace(col("PAPERLESS BILLING"),lit("NULL"),lit('Y')).name("PAPERLESSBILLING"),
           replace(col("PAYMENT METHOD"),lit("NULL"),lit("Mailed check")).name("PAYMENTMETHOD"),
           replace(col("MONTHLY CHARGES"),lit(" "),lit(20)).name("MONTHLYCHARGES"),
           replace(col("TOTAL CHARGES"),lit(" "),lit(0)).name("TOTALCHARGES"),
           replace(col("CHURN VALUE"),lit(" "),lit(0)).name("CHURNVALUE")        
              
          )


dfServices.write
   .mode(SaveMode.Append)
   .saveAsTable("SERVICES")

dfServices: DataFrame = com.snowflake.snowpark.DataFrame@6613ee9c

In [23]:
val dfStatus = session.table("RAW_PARQUET")
   .select(col("CUSTOMERID"),
           replace(col("CHURN LABEL"),lit("NULL"),lit('N')).name("CHURNLABEL"),
           replace(col("CHURN VALUE"),lit("NULL"),lit(0)).name("CHURNVALUE"),
           replace(col("CHURN SCORE"),lit("NULL"),lit(50)).name("CHURNSCORE"),
           replace(col("CLTV"),lit("NULL"),lit(0)).name("CLTV"),
           replace(col("CHURN REASON"),lit("NULL"),lit("do not know")).name("CHURNREASON")          
          )

//dfStatus.schema
//dfStatus.show

dfStatus.write
   .mode(SaveMode.Append)
   .saveAsTable("STATUS")

dfStatus: DataFrame = com.snowflake.snowpark.DataFrame@7008ee70

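# Let's look at an example query

This shows one of many uses of Snowpark. DataFrames are evaluated lazily: the join and aggregation in this example are sent to Snowflake only when an action such as show() is called.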
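
As a toy illustration of lazy evaluation (a simplified model in plain Scala, not Snowpark's implementation), the sketch below records transformations as a query plan and contacts a simulated server only when an action is called. The `LazyQuery` class and the `fakeServer` function are hypothetical names introduced for this example.

```scala
// Toy model of a lazily evaluated query: transformations only build up a plan,
// and the simulated round trip to the server happens in collect().
final case class LazyQuery(sql: String, runner: String => Seq[String]) {
  // Transformations return a new plan and never call the runner.
  def filter(condition: String): LazyQuery =
    copy(sql = s"SELECT * FROM ($sql) WHERE $condition")
  def select(columns: String*): LazyQuery =
    copy(sql = s"SELECT ${columns.mkString(", ")} FROM ($sql)")
  // Action: the only place where the runner is invoked.
  def collect(): Seq[String] = runner(sql)
}

// Simulated server that counts round trips and echoes the SQL it receives.
var roundTrips = 0
val fakeServer: String => Seq[String] = { sql => roundTrips += 1; Seq(sql) }

val plan = LazyQuery("SELECT * FROM LOCATION", fakeServer)
  .filter("CITY = 'Pasadena'")
  .select("CUSTOMERID", "CITY")

val tripsBeforeAction = roundTrips // no round trip has happened yet
val result = plan.collect()        // the single simulated round trip
println(s"Round trips before action: $tripsBeforeAction, after action: $roundTrips")
```

Chaining two transformations costs nothing until collect() runs, which is the same reason a chain of Snowpark DataFrame calls does not trigger a query per step.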

In [24]:
val dfLoc = session.table("LOCATION")
val dfServ = session.table("SERVICES")

val dfJoin = dfLoc.join(dfServ,dfLoc.col("CUSTOMERID") === dfServ.col("CUSTOMERID"))

val dfResult = dfJoin.select(
                            col("CITY"),col("CONTRACT"),col("TOTALCHARGES"))
                            .groupBy(col("CITY"),col("CONTRACT"))
                            .sum(col("TOTALCHARGES"))

dfResult.show()

---------------------------------------------------------
|"CITY"          |"CONTRACT"      |"SUM(TOTALCHARGES)"  |
---------------------------------------------------------
|Beverly Hills   |Month-to-month  |54131.15             |
|Lynwood         |Month-to-month  |19307.8              |
|Marina Del Rey  |Month-to-month  |102309.05            |
|La Habra        |Month-to-month  |6828.35              |
|Pico Rivera     |Month-to-month  |55887.25             |
|Los Alamitos    |Month-to-month  |1549.9               |
|San Pedro       |Month-to-month  |60210.75             |
|Carson          |Month-to-month  |79442.85             |
|Long Beach      |Month-to-month  |770477.45            |
|Pasadena        |One year        |304853.8             |
---------------------------------------------------------



dfLoc: Updatable = com.snowflake.snowpark.Updatable@6e172abd
dfServ: Updatable = com.snowflake.snowpark.Updatable@41f271a1
dfJoin: DataFrame = com.snowflake.snowpark.DataFrame@7f9f3566
dfResult: DataFrame = com.snowflake.snowpark.DataFrame@523df2c0

# STEP 6: Building features using serverless tasks

Snowflake now supports serverless tasks for scheduling and chaining multiple tasks. The advantage of this option is that you don't have to specify a warehouse to execute the statements; compute resources ramp up and down based on the workload.

We can use this feature to build features and store them in our feature store. The data science team can then use the final output of this end-to-end pipeline to identify customer churn.
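
As a rough sketch of what such a task could look like, the snippet below builds a CREATE TASK statement that omits the WAREHOUSE clause, which is what makes a task serverless. The task name BUILD_CUSTOMER_FEATURES, the CUSTOMER_FEATURES table, the schedule, and the `serverlessTaskSql` helper are all assumptions made for illustration; only the SERVICES columns come from this notebook.

```scala
// Build a CREATE TASK statement without a WAREHOUSE clause (a serverless task).
// All names passed in below are hypothetical.
def serverlessTaskSql(taskName: String, schedule: String, body: String): String =
  s"""CREATE OR REPLACE TASK $taskName
     |  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
     |  SCHEDULE = '$schedule'
     |AS
     |$body""".stripMargin

val featureTaskSql = serverlessTaskSql(
  taskName = "BUILD_CUSTOMER_FEATURES",
  schedule = "60 MINUTE",
  body = "CREATE OR REPLACE TABLE CUSTOMER_FEATURES AS " +
    "SELECT CUSTOMERID, TENUREMONTHS, MONTHLYCHARGES, TOTALCHARGES FROM SERVICES"
)
println(featureTaskSql)
```

With an open session, the statement could be submitted with `session.sql(featureTaskSql).collect()`. Newly created tasks start suspended, so an `ALTER TASK BUILD_CUSTOMER_FEATURES RESUME` statement would be needed before the schedule takes effect.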

In [25]:
print("End of Lab for snowpark transformation")

End of Lab for snowpark transformation