# Danske Bank account exploratory analysis

In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`

In [None]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

In [None]:
import org.apache.spark.sql._

implicit val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

In [None]:
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf
                                       
val updateTimestamp = udf((timestamp: String, index: Long) => {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val dateTime = LocalDateTime.parse(timestamp, formatter)
    dateTime.plusSeconds(index).toString
})

In [None]:
def withCleanColumnNames(df: DataFrame)(implicit spark: SparkSession): DataFrame =
    df.columns.foldLeft(df)((df, col) =>
            df.withColumnRenamed(
                existingName = col,
                newName = col.replaceAll("ø", "o")
            )
    )

In [None]:
def withFloat(columnName: String, df: DataFrame)(implicit spark: SparkSession): DataFrame =
    df.withColumn(columnName, regexp_replace(col(columnName), "\\.", ""))
      .withColumn(columnName, regexp_replace(col(columnName), ",", "."))
      .withColumn(columnName, col(columnName).cast("double"))

In [None]:
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

def withColumnIndex(df: DataFrame)(implicit spark: SparkSession): DataFrame = {
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }

In [None]:
import org.apache.spark.sql.functions._

val dankortDF = spark.read
      .option("header", "true")
      .option("encoding", "iso-8859-1")
      .csv("dankort.csv")

val withIndex = dankortDF.transform(withCleanColumnNames)
    .transform(withColumnIndex)

val transformedDF = withIndex
    .withColumn("Dato", to_timestamp(col("Dato"), "dd.MM.yyyy"))
    .withColumn("Dato", updateTimestamp(col("Dato"), col("index")))
    .filter(col("Dato").gt("2019-05-26"))
    .transform(df => withFloat("Saldo", df))
    .transform(df => withFloat("Belob", df))
    .sort("Dato")

In [None]:
transformedDF.show(100)

In [None]:
import $ivy.`org.plotly-scala::plotly-almond:0.7.1`
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._

// if you want to have the plots available without an internet connection:
init(offline=true)

// restrict the output height to avoid scrolling in output cells
repl.pprinter() = repl.pprinter().copy(defaultHeight = 3)

# What is the trend for the given data?

In [None]:
val (x, y) = transformedDF.select("Dato", "Saldo")
    .collect.map(r=>(r(0).toString, r(1).toString.toDouble)).toList.unzip
Bar(x, y).plot()

# Which is the most expensive transaction for each month of each year?

In [None]:
val dankortDateDF = transformedDF.withColumn("Dato", to_date(col("Dato")))

In [None]:
dankortDateDF.createOrReplaceTempView("dankort")

In [None]:
spark.sql("""
    select Dato, Belob, Tekst from (
    select *, row_number() OVER (PARTITION BY (year(Dato), month(Dato)) ORDER BY Belob asc) as rn from dankort
    ) tmp where rn = 1
 """).show(100, false)