<a href="https://colab.research.google.com/github/sjtalkar/DP-203-Azure-Data-Engineering-Notes/blob/main/SparkAndDeltaLake.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Mount Google Drive at /content/gdrive so the relative 'gdrive/My Drive/...' paths used by
# later cells resolve from the /content working directory.
import google.colab.drive as drive

drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [7]:
# %%shell
# SCALA_VERSION=2.12.8 ALMOND_VERSION=0.3.0+16-548dc10f-SNAPSHOT
# curl -Lo coursier https://git.io/coursier-cli
# chmod +x coursier
# ./coursier bootstrap \
#     -r jitpack -r sonatype:snapshots \
#     -i user -I user:sh.almond:scala-kernel-api_$SCALA_VERSION:$ALMOND_VERSION \
#     sh.almond:scala-kernel_$SCALA_VERSION:$ALMOND_VERSION \
#     --sources --default=true \
#     -o almond-snapshot --embed-files=false
# rm coursier
# ./almond-snapshot --install --global --force
# rm almond-snapshot

In [8]:
# %%shell
# echo "{
#   \"language\" : \"scala\",
#   \"display_name\" : \"Scala\",
#   \"argv\" : [
#     \"bash\",
#     \"-c\",
#     \"env LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libpython3.6m.so:\$LD_PRELOAD java -jar /usr/local/share/jupyter/kernels/scala/launcher.jar --connection-file {connection_file}\"
#   ]
# }" > /usr/local/share/jupyter/kernels/scala/kernel.json

In [9]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!tar xf 'gdrive/My Drive/Databricks/spark-3.0.0-bin-hadoop3.2.tgz' 
!pip -q install findspark
!pip install ipython-sql



In [10]:
# Sanity check: relative paths such as 'gdrive/My Drive/...' assume the working directory is /content.
!pwd

/content


In [11]:
# Confirm the Spark tarball is reachable on the mounted Drive.
%ls 'gdrive/My Drive/Databricks/spark-3.0.0-bin-hadoop3.2.tgz'

'gdrive/My Drive/Databricks/spark-3.0.0-bin-hadoop3.2.tgz'


In [12]:
# Confirm the tarball was extracted: spark-3.0.0-bin-hadoop3.2/ should be listed under /content.
%ls /content

[0m[01;34mgdrive[0m/  [01;34msample_data[0m/  [01;34mspark-3.0.0-bin-hadoop3.2[0m/


In [13]:
# Point PySpark at the JDK and the extracted Spark distribution, and make every session
# pull in the Delta Lake package and register its SQL extension and catalog.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
    "PYSPARK_SUBMIT_ARGS": " ".join([
        "--packages io.delta:delta-core_2.12:0.7.0",
        "--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
        "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "pyspark-shell",
    ]),
})


In [14]:
# Make the extracted Spark distribution importable as `pyspark`.
# With no argument, findspark locates Spark via the SPARK_HOME environment variable (per findspark's docs).
import findspark
findspark.init()

In [15]:
# Start (or reuse) the Spark session. Delta support is not configured here; it comes from the
# packages/confs placed in PYSPARK_SUBMIT_ARGS before Spark was initialised.
# NOTE(review): this star import shadows Python builtins such as sum, min, max and abs with
# Spark column functions; `import pyspark.sql.functions as F` avoids that in new code.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .appName('delta_session')
    .getOrCreate()
)

In [16]:
# Star import supplies the schema types used by the CSV readers (StructType, StructField,
# StringType, IntegerType, DateType, FloatType).
# NOTE(review): an explicit import list would make these dependencies visible.
from pyspark.sql.types import *

In [17]:
# Create the database that houses the KKBox SQL tables; IF NOT EXISTS makes re-runs a no-op.
_ = spark.sql("CREATE DATABASE IF NOT EXISTS kkbox")

In [18]:
# Schema for transactions.csv; both date columns are stored as yyyyMMdd text (see dateFormat below).
transaction_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ('msno', StringType()),
        ('payment_method_id', IntegerType()),
        ('payment_plan_days', IntegerType()),
        ('plan_list_price', IntegerType()),
        ('actual_amount_paid', IntegerType()),
        ('is_auto_renew', IntegerType()),
        ('transaction_date', DateType()),
        ('membership_expire_date', DateType()),
        ('is_cancel', IntegerType()),
    )
])

# Read the raw CSV with the explicit schema; the header row is skipped.
transactions = (
    spark.read
    .schema(transaction_schema)
    .option('header', True)
    .option('dateFormat', 'yyyyMMdd')
    .csv('gdrive/My Drive/Databricks/transactions.csv')
)

In [19]:
# Work with the data using DELTA

In [20]:
# Persist in Delta Lake format: Parquet data files plus a transaction log, which adds
# features such as history and versioning on top of plain Parquet.
# partitionBy creates one sub-folder per transaction date, e.g.
#   gdrive/My Drive/Databricks/kkbox/transactions/transaction_date=2015-01-01/
# mode('overwrite') replaces the table's current contents (written as a new Delta version).
(
    transactions
    .write
    .format('delta')
    .partitionBy('transaction_date')
    .mode('overwrite')
    .save('gdrive/My Drive/Databricks/kkbox/transactions')
)

# New Section

In [21]:
# Drop any existing kkbox.transactions registration so the plain CREATE TABLE that follows
# does not fail when the notebook is re-run.
spark.sql("\n    DROP  TABLE  IF EXISTS kkbox.transactions\n")

DataFrame[]

In [22]:
# Register the Delta files written above as kkbox.transactions so they can be queried with SQL.
spark.sql(
    "\n"
    "    CREATE TABLE  kkbox.transactions \n"
    "    USING DELTA\n"
    "    LOCATION 'gdrive/My Drive/Databricks/kkbox/transactions'\n"
)

DataFrame[]

In [23]:
# Build a 10-row sample query over the registered table (evaluated lazily, when show() runs).
capture = spark.sql("\nSELECT * FROM  kkbox.transactions LIMIT 10\n")

In [24]:
# Print the sample as a text table; truncate=True shortens long values such as msno.
capture.show(n=20, truncate=True)

+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|                msno|payment_method_id|payment_plan_days|plan_list_price|actual_amount_paid|is_auto_renew|transaction_date|membership_expire_date|is_cancel|
+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|+2HVGotjiE2ofWgVp...|               41|               30|             99|                99|            1|      2017-03-31|            2017-04-30|        0|
|+5Yo2rxxC+x+kIYl4...|               34|               30|            149|               149|            1|      2017-03-31|            2017-04-30|        0|
|+BForXQeVUWKHbTM/...|               39|               30|            149|               149|            1|      2017-03-31|            2017-05-16|        0|
|+L/XrtIg0DD9ku+ik...|               33|            

In [25]:

# Schema for members.csv; registration_init_time is stored as yyyyMMdd text (see dateFormat below).
member_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ('msno', StringType()),
        ('city', IntegerType()),
        ('bd', IntegerType()),
        ('gender', StringType()),
        ('registered_via', IntegerType()),
        ('registration_init_time', DateType()),
    )
])

# Read the raw CSV with the explicit schema; the header row is skipped.
members = (
    spark.read
    .schema(member_schema)
    .option('header', True)
    .option('dateFormat', 'yyyyMMdd')
    .csv('gdrive/My Drive/Databricks/members.csv')
)



In [26]:
# Persist members as an unpartitioned Delta table, replacing any previous contents.
(members.write
        .mode('overwrite')
        .format('delta')
        .save('gdrive/My Drive/Databricks/kkbox/members'))


In [27]:

# Create a table object to make the Delta Lake files queryable.
# Register the members Delta files as kkbox.members so they can be queried with SQL.
# Drop any existing registration first: a bare CREATE TABLE fails because the table already
# exists when the notebook is re-run (the transactions table uses the same drop-then-create pattern).
_ = spark.sql('DROP TABLE IF EXISTS kkbox.members')
_ = spark.sql('''
    CREATE TABLE kkbox.members 
    USING DELTA 
    LOCATION 'gdrive/My Drive/Databricks/kkbox/members'
    ''')

In [28]:

# Load the user logs table.

# Drop any existing kkbox.user_logs registration before the Delta files are rewritten.
_ = spark.sql('DROP TABLE IF EXISTS kkbox.user_logs')

# Schema for user_logs.csv: member id, a yyyyMMdd date, six integer count columns, and total_secs.
user_logs_schema = StructType(
    [StructField('msno', StringType()), StructField('date', DateType())]
    + [
        StructField(count_column, IntegerType())
        for count_column in ('num_25', 'num_50', 'num_75', 'num_985', 'num_100', 'num_uniq')
    ]
    + [StructField('total_secs', FloatType())]
)

# Read the raw CSV with the explicit schema; the header row is skipped.
user_logs = (
    spark.read
    .schema(user_logs_schema)
    .option('header', True)
    .option('dateFormat', 'yyyyMMdd')
    .csv('gdrive/My Drive/Databricks/user_logs.csv')
)


In [29]:

# Persist user logs as Delta with one sub-folder per `date` value; overwrite replaces earlier contents.
(
    user_logs.write
    .format('delta')
    .mode('overwrite')
    .partitionBy('date')
    .save('gdrive/My Drive/Databricks/kkbox/user_logs')
)

In [30]:


# Register the user-log Delta files as kkbox.user_logs so they can be queried with SQL.
_ = spark.sql(
    "\n"
    "  CREATE TABLE IF NOT EXISTS kkbox.user_logs\n"
    "  USING DELTA \n"
    "  LOCATION 'gdrive/My Drive/Databricks/kkbox/user_logs'\n"
    "  "
)

In [31]:
# Build a 10-row sample query over kkbox.members (evaluated lazily, when show() runs).
capture = spark.sql("\nSELECT * FROM  kkbox.members LIMIT 10\n")

In [32]:
# Print the members sample as a text table; truncate=True shortens long values such as msno.
capture.show(n=20, truncate=True)

+--------------------+----+---+------+--------------+----------------------+
|                msno|city| bd|gender|registered_via|registration_init_time|
+--------------------+----+---+------+--------------+----------------------+
|Rb9UwLQTrxzBVwCB6...|   1|  0|  null|            11|            2011-09-11|
|+tJonkh+O1CA796Fm...|   1|  0|  null|             7|            2011-09-14|
|cV358ssn7a0f7jZOw...|   1|  0|  null|            11|            2011-09-15|
|9bzDeJP6sQodK73K5...|   1|  0|  null|            11|            2011-09-15|
|WFLY3s7z4EZsieHCt...|   6| 32|female|             9|            2011-09-15|
|yLkV2gbZ4GLFwqTOX...|   4| 30|  male|             9|            2011-09-16|
|jNCGK78YkTyId3H3w...|   1|  0|  null|             7|            2011-09-16|
|WH5Jq4mgtfUFXh2yz...|   5| 34|  male|             9|            2011-09-16|
|tKmbR4X5VXjHmxERr...|   5| 19|  male|             9|            2011-09-17|
|I0yFvqMoNkM8ZNHb6...|  13| 63|  male|             9|            2011-09-18|

In [33]:
# Step 2: Acquire Churn Labels
# To build our model, we will need to identify which customers have churned within two periods of interest.
# These periods are February 2017 and March 2017. We will train our model to predict churn in February 2017 and then evaluate our model's ability 
# to predict churn in March 2017, making these our training and testing datasets, respectively.

# Per the instructions provided in the Kaggle competition, a KKBox subscriber is not identified as churned until they fail to renew their
# subscription within 30 days of its expiration. Most subscriptions are themselves on a 30-day renewal schedule (though some subscriptions renew on significantly longer cycles). This means that identifying churn involves a sequential walk through each customer's transactions, looking for renewal gaps that would indicate the customer churned on a prior expiration date.

# While the competition makes pre-labeled training and testing datasets available (train.csv and train_v2.csv, respectively), several past
# participants have noted that these datasets should be regenerated. KKBox provides a Scala script for doing so. Modifying the script for this environment, we might regenerate our training and test datasets as follows:

In [None]:
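# NOTE: reference only. This whole cell is commented out and appears to be written for Databricks
# (`%scala` cell magic; input and output under /mnt/kkbox). It reads the raw transactions CSV without a
# schema, so dates are compared and parsed as yyyyMMdd strings, not as the DateType columns used above.
# %scala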

# import java.time.{LocalDate}
# import java.time.format.DateTimeFormatter
# import java.time.temporal.ChronoUnit

# import org.apache.spark.sql.{Row, SparkSession}
# import org.apache.spark.sql.functions._
# import scala.collection.mutable

# def calculateLastday(wrappedArray: mutable.WrappedArray[Row]) :String ={
#   val orderedList = wrappedArray.sortWith((x:Row, y:Row) => {
#     if(x.getAs[String]("transaction_date") != y.getAs[String]("transaction_date")) {
#       x.getAs[String]("transaction_date") < y.getAs[String]("transaction_date")
#     } else {
      
#       val x_sig = x.getAs[String]("plan_list_price") +
#         x.getAs[String]("payment_plan_days") +
#         x.getAs[String]("payment_method_id")

#       val y_sig = y.getAs[String]("plan_list_price") +
#         y.getAs[String]("payment_plan_days") +
#         y.getAs[String]("payment_method_id")

#       //same plan, always subscribe then unsubscribe
#       if(x_sig != y_sig) {
#         x_sig > y_sig
#       } else {
#         if(x.getAs[String]("is_cancel")== "1" && y.getAs[String]("is_cancel") == "1") {
#           //multiple cancel, consecutive cancels should only put the expiration date earlier
#           x.getAs[String]("membership_expire_date") > y.getAs[String]("membership_expire_date")
#         } else if(x.getAs[String]("is_cancel")== "0" && y.getAs[String]("is_cancel") == "0") {
#           //multiple renewal, expiration date keeps extending
#           x.getAs[String]("membership_expire_date") < y.getAs[String]("membership_expire_date")
#         } else {
#           //same day same plan transaction: subscription precedes cancellation
#           x.getAs[String]("is_cancel") < y.getAs[String]("is_cancel")
#         }
#       }
#     }
#   })
#   orderedList.last.getAs[String]("membership_expire_date")
# }

# def calculateRenewalGap(log:mutable.WrappedArray[Row], lastExpiration: String): Int = {
#   val orderedDates = log.sortWith((x:Row, y:Row) => {
#     if(x.getAs[String]("transaction_date") != y.getAs[String]("transaction_date")) {
#       x.getAs[String]("transaction_date") < y.getAs[String]("transaction_date")
#     } else {
      
#       val x_sig = x.getAs[String]("plan_list_price") +
#         x.getAs[String]("payment_plan_days") +
#         x.getAs[String]("payment_method_id")

#       val y_sig = y.getAs[String]("plan_list_price") +
#         y.getAs[String]("payment_plan_days") +
#         y.getAs[String]("payment_method_id")

#       //same data same plan transaction, assumption: subscribe then unsubscribe
#       if(x_sig != y_sig) {
#         x_sig > y_sig
#       } else {
#         if(x.getAs[String]("is_cancel")== "1" && y.getAs[String]("is_cancel") == "1") {
#           //multiple cancel of same plan, consecutive cancels should only put the expiration date earlier
#           x.getAs[String]("membership_expire_date") > y.getAs[String]("membership_expire_date")
#         } else if(x.getAs[String]("is_cancel")== "0" && y.getAs[String]("is_cancel") == "0") {
#           //multiple renewal, expire date keep extending
#           x.getAs[String]("membership_expire_date") < y.getAs[String]("membership_expire_date")
#         } else {
#           //same date cancel should follow subscription
#           x.getAs[String]("is_cancel") < y.getAs[String]("is_cancel")
#         }
#       }
#     }
#   })

#   //Search for the first subscription after expiration
#   //If active cancel is the first action, find the gap between the cancellation and renewal
#   val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
#   var lastExpireDate = LocalDate.parse(s"${lastExpiration.substring(0,4)}-${lastExpiration.substring(4,6)}-${lastExpiration.substring(6,8)}", formatter)
#   var gap = 9999
#   for(
#     date <- orderedDates
#     if gap == 9999
#   ) {
#     val transString = date.getAs[String]("transaction_date")
#     val transDate = LocalDate.parse(s"${transString.substring(0,4)}-${transString.substring(4,6)}-${transString.substring(6,8)}", formatter)
#     val expireString = date.getAs[String]("membership_expire_date")
#     val expireDate = LocalDate.parse(s"${expireString.substring(0,4)}-${expireString.substring(4,6)}-${expireString.substring(6,8)}", formatter)
#     val isCancel = date.getAs[String]("is_cancel")

#     if(isCancel == "1") {
#       if(expireDate.isBefore(lastExpireDate)) {
#         lastExpireDate = expireDate
#       }
#     } else {
#       gap = ChronoUnit.DAYS.between(lastExpireDate, transDate).toInt
#     }
#   }
#   gap
# }

# val data = spark
#   .read
#   .option("header", value = true)
#   .csv("/mnt/kkbox/transactions/")

# val historyCutoff = "20170131"

# val historyData = data.filter(col("transaction_date")>="20170101" and col("transaction_date")<=lit(historyCutoff))
# val futureData = data.filter(col("transaction_date") > lit(historyCutoff))

# val calculateLastdayUDF = udf(calculateLastday _)
# val userExpire = historyData
#   .groupBy("msno")
#   .agg(
#     calculateLastdayUDF(
#       collect_list(
#         struct(
#           col("payment_method_id"),
#           col("payment_plan_days"),
#           col("plan_list_price"),
#           col("transaction_date"),
#           col("membership_expire_date"),
#           col("is_cancel")
#         )
#       )
#     ).alias("last_expire")
#   )

# val predictionCandidates = userExpire
#   .filter(
#     col("last_expire") >= "20170201" and col("last_expire") <= "20170228"
#   )
#   .select("msno", "last_expire")


# val joinedData = predictionCandidates
#   .join(futureData,Seq("msno"), "left_outer")

# val noActivity = joinedData
#   .filter(col("payment_method_id").isNull)
#   .withColumn("is_churn", lit(1))


# val calculateRenewalGapUDF = udf(calculateRenewalGap _)
# val renewals = joinedData
#   .filter(col("payment_method_id").isNotNull)
#   .groupBy("msno", "last_expire")
#   .agg(
#     calculateRenewalGapUDF(
#       collect_list(
#         struct(
#           col("payment_method_id"),
#           col("payment_plan_days"),
#           col("plan_list_price"),
#           col("transaction_date"),
#           col("membership_expire_date"),
#           col("is_cancel")
#         )
#       ),
#       col("last_expire")
#     ).alias("gap")
#   )

# val validRenewals = renewals.filter(col("gap") < 30)
#   .withColumn("is_churn", lit(0))
# val lateRenewals = renewals.filter(col("gap") >= 30)
#   .withColumn("is_churn", lit(1))

# val resultSet = validRenewals
#   .select("msno","is_churn")
#   .union(
#     lateRenewals
#       .select("msno","is_churn")
#       .union(
#         noActivity.select("msno","is_churn")
#       )
#   )

# resultSet.write.format("delta").mode("overwrite").save("/mnt/kkbox/silver/train/")