In [2]:
import org.apache.spark.sql.functions.{ col, expr, udf, _ }
import org.apache.spark.sql.{ SparkSession, _ }

import org.apache.spark.sql.functions.{col, expr, udf, _}
import org.apache.spark.sql.{SparkSession, _}
load: (multiline: String, delimiter: String, header: String, encoding: String, srcPath: String)org.apache.spark.sql.DataFrame


In [48]:
//load data
def load(multiline: String, delimiter: String, header: String, encoding: String, srcPath: String): DataFrame = {
    spark.read
      .option("multiLine", multiline)
      .option("delimiter", delimiter)
      .option("header", header)
      .option("escape", "\"")
      .option("encoding", encoding)
      .csv(srcPath)
}

val srcPath = "s3a://telenor-se-dataplatform-prod-sthlm-dswbexport/test/IN/ingestion_date=2021-04-10/*"
val InputDf = load("false", ",", "true", "utf-8", srcPath)

load: (multiline: String, delimiter: String, header: String, encoding: String, srcPath: String)org.apache.spark.sql.DataFrame
srcPath: String = s3a://telenor-se-dataplatform-prod-sthlm-dswbexport/test/IN/ingestion_date=2021-04-10/*
InputDf: org.apache.spark.sql.DataFrame = [VendorID: string, tpep_pickup_datetime: string ... 16 more fields]


In [49]:
// structure the data with right schema  
def getTimestampformat(col: String, timestampFormat: String) = {

    if (!timestampFormat.isEmpty && timestampFormat.contains("#")) {
      val pairs = timestampFormat.split("#").map(t => t.split("->")).map {
        case Array(k, v) => k -> v
      }.toMap
      pairs(col)
    } else if (!timestampFormat.isEmpty) {
      timestampFormat.split("->")(1)
    } else timestampFormat
  }  

def convertTypes(df: DataFrame, conversions: String, timestampFormat: String): DataFrame = {
    if (!conversions.isEmpty) {
      conversions.toLowerCase.split("#").map(t => t.split("->")).foldLeft(df) {
        (cdf, toType) =>
          val c = toType(0).trim()
          val ctype = toType(1).trim()
          if (ctype.toLowerCase() == "timestamp") {
            val ts = unix_timestamp(col(c), getTimestampformat(c, timestampFormat).toString).cast("timestamp")
            cdf.withColumn(c, ts)
          } else {
            cdf.withColumn(c, col(c).cast(ctype))
          }
      }
    } else {
      df
    }
  }

val conv = "tpep_pickup_datetime->timestamp#tpep_dropoff_datetime->timestamp#passenger_count->double#trip_distance->double#fare_amount->double#mta_tax->double#tip_amount->double#tolls_amount->double#improvement_surcharge->double#total_amount->double#congestion_surcharge->double"
val tsFormat = "tpep_pickup_datetime->yyyy-MM-dd HH:mm:ss#tpep_dropoff_datetime->yyyy-MM-dd HH:mm:ss"
val withDataTypesDf = convertTypes(InputDf, conv, tsFormat)

getTimestampformat: (col: String, timestampFormat: String)String
convertTypes: (df: org.apache.spark.sql.DataFrame, conversions: String, timestampFormat: String)org.apache.spark.sql.DataFrame
conv: String = tpep_pickup_datetime->timestamp#tpep_dropoff_datetime->timestamp#passenger_count->double#trip_distance->double#fare_amount->double#mta_tax->double#tip_amount->double#tolls_amount->double#improvement_surcharge->double#total_amount->double#congestion_surcharge->double
tsFormat: String = tpep_pickup_datetime->yyyy-MM-dd HH:mm:ss#tpep_dropoff_datetime->yyyy-MM-dd HH:mm:ss
withDataTypesDf: org.apache.spark.sql.DataFrame = [VendorID: string, tpep_pickup_datetime: timestamp ... 16 more fields]


In [50]:
// create partition column
def createPartitionsIngestionDate(srcPath: String): String = {
    val filePath = srcPath
    val partitionValue = "ingestion_date=[0-9]{4}-[0-9]{2}-[0-9]{2}".r.findFirstMatchIn(filePath).getOrElse("error-in-pathname").toString
    if (partitionValue.contains("=")) partitionValue.split("=")(1) else partitionValue
  }

def addPartitionColumnToDataframe(df: DataFrame, srcPath: String, parrtitionColumnSql: String, partitionColumnName: String): DataFrame = {
    if (parrtitionColumnSql.toLowerCase() == "ingestion_date") {
      df.withColumn(
        partitionColumnName,
        lit(createPartitionsIngestionDate(srcPath)))
    } else {
      df.withColumn(
        partitionColumnName,
        expr(parrtitionColumnSql))
    }
  }

val parrtitionColumnSql = "date_format(cast(tpep_pickup_datetime as timestamp),'yyyy-MM')"
val partitionColumnName = "ds"
val withPartitionColDf = addPartitionColumnToDataframe(withDataTypesDf, srcPath, parrtitionColumnSql, partitionColumnName).orderBy(col(partitionColumnName))

createPartitionsIngestionDate: (srcPath: String)String
addPartitionColumnToDataframe: (df: org.apache.spark.sql.DataFrame, srcPath: String, parrtitionColumnSql: String, partitionColumnName: String)org.apache.spark.sql.DataFrame
parrtitionColumnSql: String = date_format(cast(tpep_pickup_datetime as timestamp),'yyyy-MM')
partitionColumnName: String = ds
withPartitionColDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [VendorID: string, tpep_pickup_datetime: timestamp ... 17 more fields]


In [51]:
// create hive table if not exists 

def createHiveTableFromDataframe(df: DataFrame, partitionColumnName: String, fullyQualifiedDestinationTableName: String, destinationPath: String) {
    val fields = df.schema.fields.filterNot(
      _.name
        .trim()
        .equalsIgnoreCase(partitionColumnName)) //:+ partitionField
    val bdy =
      fields.map(f => s"${f.name}  ${f.dataType.catalogString}").mkString(",\n")

    val stmt =
      s"""
      CREATE EXTERNAL TABLE IF NOT EXISTS  ${fullyQualifiedDestinationTableName} (
       $bdy )
       PARTITIONED BY ( ${partitionColumnName} string)
       STORED AS PARQUET
       LOCATION '${destinationPath}'
      """
    println(stmt.trim())
    spark.sql(stmt)
  }

createHiveTableFromDataframe(withPartitionColDf, "ds", "default.nyc", "s3a://telenor-se-dataplatform-prod-sthlm-dswbexport/test/OUT/NYC/")

createHiveTableFromDataframe: (df: org.apache.spark.sql.DataFrame, partitionColumnName: String, fullyQualifiedDestinationTableName: String, destinationPath: String)Unit
CREATE EXTERNAL TABLE IF NOT EXISTS  default.nyc (
       VendorID  string,
tpep_pickup_datetime  timestamp,
tpep_dropoff_datetime  timestamp,
passenger_count  double,
trip_distance  double,
RatecodeID  string,
store_and_fwd_flag  string,
PULocationID  string,
DOLocationID  string,
payment_type  string,
fare_amount  double,
extra  string,
mta_tax  double,
tip_amount  double,
tolls_amount  double,
improvement_surcharge  double,
total_amount  double,
congestion_surcharge  double )
       PARTITIONED BY ( ds string)
       STORED AS PARQUET
       LOCATION 's3a://telenor-se-dataplatform-prod-sthlm-dswbexport/test/OUT/NYC/'


In [52]:
// write the parquet file

def writeParquetFile(df: DataFrame, fullyQualifiedDestinationTableName: String, partitionColumnName: String) = {
    val tmpTbl =  "tmp_tbl"
    df.createOrReplaceTempView(tmpTbl)

    val insertStmt =
      s"""INSERT OVERWRITE TABLE ${fullyQualifiedDestinationTableName}
        PARTITION(${partitionColumnName})
        SELECT *
        FROM $tmpTbl
        """.stripMargin
    spark.sql(insertStmt)
  }


spark.conf.set("hive.exec.dynamic.partition", true)
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
writeParquetFile(withPartitionColDf, "default.nyc", "ds")

writeParquetFile: (df: org.apache.spark.sql.DataFrame, fullyQualifiedDestinationTableName: String, partitionColumnName: String)org.apache.spark.sql.DataFrame
res106: org.apache.spark.sql.DataFrame = []


In [53]:
spark.sql("select * from default.nyc").show

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|     ds|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-------+
|       2| 2020-02-02 13:14:41|  2020-02-02 13:21:03|            2.0|         0.85|         1|                 N|          48|         246|           2|        6.0|  0.5|    0.5|       0.0|         0