In [None]:
%%spark
//Imports [Collapse me]
    import org.apache.spark.sql.delta.DeltaLog
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.metadata.ParquetMetadata
    import org.apache.parquet.hadoop.metadata.FileMetaData
    import org.apache.parquet.hadoop.metadata.BlockMetaData
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType,TimestampType}
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.sql.DataFrame
    import java.time.LocalDateTime
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.functions.{coalesce, col, max, min, sum}
    import io.delta.tables._

/*****************************************************************************************************
    LAST UPDATED: 2024-02-14

    Instructions:
    - Connect Notebook to Lakehouse (does not have to be in same Workspace)
    - Update the deltaTable parameter immediately below these instructions with the name of the table to analyze (and deltaTableSchema if the Lakehouse uses schemas)
    - Review the other parameters (Append vs Overwrite, printToScreen, skipColumnCardinality, skipWriteToTable)
    - Run
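    - Review the four output tables whose names contain "DeltaAnalyzerOutput" (prefixed "zz_", or placed in schema "zz" when a schema is used); they are only written when skipWriteToTable = false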

    zz_1_DeltaAnalyzerOutput_parquetFiles
        This table has one row per Parquet file
        Ideally, there should not be thousands of these
        This table only uses parquet file metadata and should be quick to populate

    zz_2_DeltaAnalyzerOutput_rowGroups
        This table has one row per rowgroup and shows rowgroups for every parquet file
        Look at the number of rows per rowgroup.  Ideally this should be 1M to 16M rows (higher is better)
        This table only uses parquet file metadata and should be quick to populate

    zz_3_DeltaAnalyzerOutput_columnChunks
        One row per column/chunk within rowgroups
        Produces many more rows, with much more detail about dictionaries and compression
        This table only uses parquet file metadata and should be quick to populate
    
    zz_4_DeltaAnalyzerOutput_columns
        One row per column of the table
        Shows how many unique values each column has.  For floating-point columns, consider changing the column type to DECIMAL(17,4)
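        This table runs compute queries against the Delta table, so it may take time depending on the size of the table (for tables over 1,000,000 rows the distinct counts are skipped when skipColumnCardinality = true)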

    To compact and V-Order the table afterwards, run:
    %%sql
    OPTIMIZE tablename vorder 


    Footnote:
        Useful doc
        https://www.javadoc.io/doc/org.apache.parquet/parquet-hadoop/latest/index.html

*****************************************************************************************************/

// ---- Main parameters ----
// Schema that owns the table (e.g. "dbo"); leave as "" when the Lakehouse has no schemas.
var deltaTableSchema: String = ""
// Name of the Delta table to analyze.
val deltaTable: String = "YOUR_DELTA_TABLENAME_HERE"

// ---- Secondary parameters ----
// Run timestamp written onto every output row (LocalDateTime's ISO-8601 text form).
val timeStamp: String = LocalDateTime.now.toString
// Save mode for the output tables: "Append" or "Overwrite".
val overwriteOrAppend: String = "Overwrite"
// true renders the result DataFrames in the notebook.
val printToScreen: Boolean = true
// true skips per-column distinct counts for tables with more than 1,000,000 rows.
val skipColumnCardinality: Boolean = true
// true skips saving the output tables.
val skipWriteToTable: Boolean = true
// Locale used for number grouping in the summary (US English).
val locale: java.util.Locale = java.util.Locale.US

//Code [Collapse me]
    // Name/path fragments that differ depending on whether the table lives in a schema.
    val HasSchema: Boolean = deltaTableSchema.nonEmpty
    val deltaTableWithSchema = if (HasSchema) s"${deltaTableSchema}.${deltaTable}" else deltaTable
    val deltaTableSchemaPath = if (HasSchema) s"${deltaTableSchema}/" else ""      // folder under Tables/
    val deltaTableSchemaFile = if (HasSchema) "zz/" else "zz_"                     // prefix for output tables
    val deltaTableSchemaSQL  = if (HasSchema) s"${deltaTableSchema}." else ""      // prefix for SQL table names

    // Resolve the OneLake location of the table from the Lakehouse attached to the notebook.
    val LakehouseID: String = sc.hadoopConfiguration.get("trident.lakehouse.id")
    val defaultFS: String = sc.hadoopConfiguration.get("fs.defaultFS")
    // Configuration.get returns null for a missing key; fail clearly instead of building a ".../null/..." path.
    require(LakehouseID != null, "No Lakehouse ID found (trident.lakehouse.id) - connect the notebook to a Lakehouse first")
    // Tables in a schema live under Tables/<schema>/<table>, so the schema folder must be part of the path.
    val deltaTablePath: String = s"${defaultFS}/${LakehouseID}/Tables/${deltaTableSchemaPath}${deltaTable}/"

    val deltaLog: DeltaLog = DeltaLog.forTable(spark, deltaTablePath)

    // Refresh to the latest snapshot before reading the file list.
    deltaLog.update()

    // Operation parameters of the latest commit, kept for z-order detection (detection is currently disabled).
    val fullHistoryDF = DeltaTable.forPath(spark, deltaTablePath).history(1)
    val operationParametersMapping : Map[String,Any] = fullHistoryDF.select("operationParameters").head()(0).asInstanceOf[Map[String,Any]]
    val zOrderBy = ""//operationParametersMapping.get("zOrderBy")

    println(s"Files: ${deltaLog.snapshot.allFiles.count}")

    // Metadata gathered from every Parquet footer, one tuple per output row.
    var parquetFiles: ListBuffer[(String,String,String,String,String,Long,Long,String)] = ListBuffer()
    var rowGroups: ListBuffer[(String,String,String,Int,Int,Long,Long,Long,Float)] = ListBuffer()
    var columnChunks: ListBuffer[(String,String,String,Int,String,String,String,String,Long,Long,Long,String,Long,String)] = ListBuffer()

    // A single Hadoop configuration serves every footer read; creating one per file re-loads the config resources.
    val parquetReadConf = new Configuration()

    // For every Parquet file in the current Delta snapshot, read its footer metadata and record
    // file-, row-group- and column-chunk-level rows. The value is the total number of row groups.
    val totalNumberRowGroups: Int = deltaLog.snapshot.allFiles.collect().map{ file =>

        val path: String = file.path
        // AddFile.path is URI-encoded and relative to the table root unless it is absolute,
        // so decode it instead of concatenating the raw string onto the table path.
        val relativeOrAbsolute = new Path(new java.net.URI(path))
        val fileLocation = if (relativeOrAbsolute.isAbsolute) relativeOrAbsolute else new Path(deltaTablePath, relativeOrAbsolute)

        val parquetFileReader: ParquetFileReader = ParquetFileReader.open(parquetReadConf, fileLocation)
        try {
            val rowGroupMeta = parquetFileReader.getRowGroups()                            // java.util.List of BlockMetaData
            val parquetFileMetaData: FileMetaData = parquetFileReader.getFileMetaData()
            val parquetFileMetaDataKeyValues = parquetFileMetaData.getKeyValueMetaData()    // java.util.Map; get() is null for absent keys

            val parquetRecordCount: Long = parquetFileReader.getRecordCount()
            val rowGroupSize: Int = rowGroupMeta.size                                       // number of row groups in this file

            parquetFiles.append(
                (
                    deltaTable,
                    timeStamp,
                    path,
                    parquetFileMetaDataKeyValues.get("com.microsoft.parquet.vorder.enabled"),
                    parquetFileMetaDataKeyValues.get("com.microsoft.parquet.vorder.level"),
                    parquetRecordCount,
                    rowGroupSize,
                    parquetFileMetaData.getCreatedBy
                )
            )

            for (rowGroupNumber <- 0 until rowGroupSize) {
                print(".")   // progress indicator: one dot per row group
                val rowGroup: BlockMetaData = rowGroupMeta.get(rowGroupNumber)
                val rowGroupColumns = rowGroup.getColumns

                for (columnChunkID <- 0 until rowGroupColumns.size) {
                    val columnStat = rowGroupColumns.get(columnChunkID)

                    columnChunks.append(
                        (
                            deltaTable,
                            timeStamp,
                            path,
                            // NOTE(review): this field is stored as "ColumnChunkID" but holds the row-group number - confirm intent.
                            rowGroupNumber,
                            columnStat.getPath().toString,
                            columnStat.getCodec().toString,
                            columnStat.getPrimitiveType().toString,
                            columnStat.getStatistics().toString().replace("\"","'"), // need to convert double quotes to single quotes for Power BI
                            columnStat.getTotalSize(),
                            columnStat.getTotalUncompressedSize(),
                            columnStat.getValueCount(),
                            columnStat.hasDictionaryPage().toString,
                            columnStat.getDictionaryPageOffset(),
                            columnStat.getEncodings().toString
                        )
                    )
                }

                rowGroups.append(
                    (
                        deltaTable,
                        timeStamp,
                        path,
                        rowGroupNumber,
                        rowGroupSize,
                        rowGroup.getRowCount,
                        rowGroup.getCompressedSize,
                        rowGroup.getTotalByteSize,
                        // compressed / uncompressed bytes as a fraction (not a percentage)
                        rowGroup.getCompressedSize.toFloat / rowGroup.getTotalByteSize
                    )
                )
            }
            rowGroupSize
        } finally {
            // Release the file handle even if reading the footer fails.
            parquetFileReader.close()
        }
    }.sum

    println("")
    //println("Total number of row groups: " + totalNumberRowGroups)



    // Column Names cannot have spaces!!
    var parquetFilesDF: DataFrame = parquetFiles.toDF("TableName","Timestamp","Filename","vorder_enabled","vorder_level","Rowcount","RowGroups","CreatedBy")
    var rowGroupsDF: DataFrame = rowGroups.toDF("TableName","Timestamp","Filename","RowGroupID","TotalFileRowGroups","Rowcount","CompressedSize","UncompressedSize","CompressionRatio")
    // NOTE(review): the 4th tuple field landing in "ColumnChunkID" is the row-group number, not a chunk index - confirm the intended name.
    var columnChunksDF: DataFrame = columnChunks.toDF("TableName","Timestamp","Filename","ColumnChunkID","Path","Codec","PrimativeType","Statistics","TotalSize","TotalUncompressedSize","ValueCount","HasDict","DictOffset","Encodings")

    /********************************************************************************************************************
        Create DF for one row per column
    ********************************************************************************************************************/

    var columns: ListBuffer[(String,String,String,Long,String,Long,Long,Long,Long,Double,Double)] = ListBuffer()
    val columnList: Array[String] = spark.table(deltaTableWithSchema).columns

    // Table-level totals. A sum over zero rows is null (table with no data files), so default to 0
    // instead of letting getLong throw; row count and size share one aggregation job.
    val rowGroupTotals = rowGroupsDF.agg(coalesce(sum("Rowcount"), lit(0L)), coalesce(sum("CompressedSize"), lit(0L))).first()
    val totalRows: Long = rowGroupTotals.getLong(0)
    val tableSize: Long = rowGroupTotals.getLong(1)
    val totalRowGroups: Long = parquetFilesDF.agg(coalesce(sum("RowGroups"), lit(0L))).first().getLong(0)

    // Quote a column name as a Spark SQL identifier so names with spaces, dots or reserved words still work.
    def quoteIdentifier(name: String): String = "`" + name.replace("`", "``") + "`"

    // (primitive type of the first matching chunk, total compressed bytes, total uncompressed bytes) over all
    // column chunks of a top-level column. None when no chunk matches, e.g. partition columns, which have no
    // chunks in the Parquet files. Reads the in-memory buffer instead of running several Spark jobs per column.
    def chunkStatsFor(column: String): Option[(String, Long, Long)] = {
        val columnPath = s"[${column}]"   // ColumnPath.toString form stored in the Path field
        val matching = columnChunks.collect {
            case (_, _, _, _, chunkPath, _, primitiveType, _, totalSize, totalUncompressedSize, _, _, _, _) if chunkPath == columnPath =>
                (primitiveType, totalSize, totalUncompressedSize)
        }
        if (matching.isEmpty) None else Some((matching.head._1, matching.map(_._2).sum, matching.map(_._3).sum))
    }

    // Append the output row for one column; columns without chunks get "None detected" and zero sizes.
    def appendColumnRow(column: String, distinctCount: Long): Unit = {
        val (primitiveType, columnSize, columnSizeUncompressed) = chunkStatsFor(column).getOrElse(("None detected", 0L, 0L))
        columns.append(
            (
                deltaTable,
                timeStamp,
                column,
                distinctCount,
                primitiveType,
                columnSize,
                columnSizeUncompressed,
                totalRows,
                tableSize,
                distinctCount.toDouble/totalRows*100.0,
                columnSize.toDouble/tableSize*100.0
            )
        )
    }

    // Tables up to this many rows get exact distinct counts for every column from a single query.
    val singleQueryRowLimit: Long = 1000000L
    // Larger tables below this row count get exact per-column counts; at or above it, approximate counts.
    val preciseCountRowLimit: Long = 10000000L
    val tableNameSQL = s"${deltaTableSchemaSQL}${deltaTable}"

    if (totalRows <= singleQueryRowLimit) {
        // Small table: one scan computes every column's exact distinct count.
        // NOTE: skipColumnCardinality is not consulted on this path; these counts are always computed.
        val distinctExprs = columnList.map(c => s"count(distinct ${quoteIdentifier(c)})")
        val sql = ("1 as dummy" +: distinctExprs).mkString("select ", ", ", s" from ${tableNameSQL}")
        // Collect the single result row once; selecting from the uncached DataFrame per column re-runs the whole scan.
        val distinctRow: Row = spark.sql(sql).first()
        columnList.zipWithIndex.foreach { case (column, i) =>
            appendColumnRow(column, distinctRow.getLong(i + 1))   // index 0 is the dummy column
        }
    } else {
        // Columns that are never distinct-counted; they still get an output row with a count of 0.
        val NotTheseColumns = List("requestPathUri","clientRequestId","fileSystemID")
        for (column <- columnList) {
            val distinctCount: Long = if (NotTheseColumns.contains(column)) {
                println("Skipping " + column)
                0L
            } else if (skipColumnCardinality) {
                println(s"Skipping column cardinality checks [${column}]")
                0L
            } else {
                val countExpr = if (totalRows < preciseCountRowLimit) {
                    println("Running Precise DCOUNT on " + column)
                    s"count(distinct ${quoteIdentifier(column)})"
                } else {
                    println("Running Approx DCOUNT on " + column)
                    s"approx_count_distinct(${quoteIdentifier(column)})"
                }
                spark.sql(s"select ${countExpr} from ${tableNameSQL}").first().getLong(0)
            }
            appendColumnRow(column, distinctCount)
        }
    }
    
    // One output row per table column, built from the tuples gathered above.
    val columnsColumnNames = Seq("TableName","Timestamp","ColumnName","DistinctCount","PrimitiveType","ColumnSize","ColumnSizeUncompressed","TotalRows","TableSize","CardinalityOfTotalRows","SizePercentOfTable")
    var columnsDF: DataFrame = columns.toDF(columnsColumnNames: _*)

    // Table-wide totals repeated on every row so each output can be read on its own.
    val totalRowsCol = lit(totalRows)
    val totalRowGroupsCol = lit(totalRowGroups)
    parquetFilesDF = (parquetFilesDF
        .withColumn("TotalTableRows", totalRowsCol)
        .withColumn("TotalTableRowGroups", totalRowGroupsCol))
    rowGroupsDF = (rowGroupsDF
        .withColumn("TotalTableRows", totalRowsCol)
        .withColumn("RatioOfTotalTableRows", col("Rowcount") * 100.0 / totalRowsCol)
        .withColumn("TotalTableRowGroups", totalRowGroupsCol))

    // The run timestamp was captured as LocalDateTime text; store it as a real timestamp in every output.
    def withTimestampType(df: DataFrame): DataFrame = df.withColumn("Timestamp", col("Timestamp").cast(TimestampType))
    parquetFilesDF = withTimestampType(parquetFilesDF)
    rowGroupsDF = withTimestampType(rowGroupsDF)
    columnChunksDF = withTimestampType(columnChunksDF)
    columnsDF = withTimestampType(columnsDF)

    // Display Super Summary
        val ParquetFiles = parquetFilesDF.count()
        // Both bounds in one job; coalesce keeps a table with no row groups from failing on a null aggregate.
        val rowCountBounds = rowGroupsDF.agg(coalesce(max("Rowcount"), lit(0L)), coalesce(min("Rowcount"), lit(0L))).first()
        val MaxRowgroupRowCount = rowCountBounds.getLong(0)
        val MinRowgroupRowCount = rowCountBounds.getLong(1)
        // Row holding the smallest vorder_enabled value across files; its field is null when no file records the flag.
        val HasVOrder = parquetFilesDF.agg(min("vorder_enabled")).first()
        // Show the flag value itself rather than the Row's "[...]" rendering.
        val hasVOrderText: String = Option(HasVOrder.get(0)).map(_.toString).getOrElse("Unknown")
        val formatter = java.text.NumberFormat.getIntegerInstance(locale)
        //val TableSizeFromColumnChunks = columnChunksDF.agg(sum("TotalSize")).first().getLong(0)
        // Same quantity as tableSize (sum of CompressedSize over the row groups), so reuse it instead of re-aggregating.
        val tableSizeFromRowGroups: Long = tableSize
        // Guard against division by zero when the table has no row groups.
        val avgRowsPerRowGroup: Long = if (totalRowGroups > 0) totalRows / totalRowGroups else 0L

        displayHTML(s"""
            <table>
                <tr>
                    <td colspan="2"><h1>Super Summary</h1></td>
                </tr>
                <tr>
                    <td><b>Total Rows</b></td>
                    <td>${formatter.format(totalRows)}</td>
                </tr>
                <tr>
                    <td><b>Has V-ORDER</b></td>
                    <td>${hasVOrderText}</td>
                </tr>
                <tr>
                    <td><b>zOrderBy</b></td>
                    <td>$zOrderBy</td>
                </tr>
                <tr>
                    <td><b>Total Size in Bytes (RowGroups)</b></td>
                    <td>${formatter.format(tableSizeFromRowGroups)}</td>
                </tr>
                <tr>
                    <td><b>Total Parquet Files</b></td>
                    <td>${formatter.format(ParquetFiles)}</td>
                </tr>
                <tr>
                    <td><b>Total RowGroups</b></td>
                    <td>${formatter.format(totalRowGroups)}</td>
                </tr>
                <tr>
                    <td><b>Max rows per RowGroups</b></td>
                    <td>${formatter.format(MaxRowgroupRowCount)}</td>
                </tr>
                <tr>
                    <td><b>Min rows per RowGroups</b></td>
                    <td>${formatter.format(MinRowgroupRowCount)}</td>
                </tr>
                <tr>
                    <td><b>Avg rows per RowGroups</b></td>
                    <td>${formatter.format(avgRowsPerRowGroup)}</td>
                </tr>
            </table>
            """)


    // Display Dataframes to screen
        // Render each result set under its HTML heading, in a fixed order.
        if (printToScreen) {
            val sections: Seq[(String, DataFrame)] = Seq(
                "<h1>Parquet Files</h1>" -> parquetFilesDF,
                "<h1>Rowgroups</h1>Focus on rowcount per rowgroup.  One rowgroup = one segment in Fabric semantic model" -> rowGroupsDF,
                "<h1>Column Chunks</h1> Blocks of columns inside each Rowgroup across all Parquet files" -> columnChunksDF,
                "<h1>Delta table Columns</h1>" -> columnsDF
            )
            for ((heading, resultDF) <- sections) {
                displayHTML(heading)
                display(resultDF)
            }
        }

    // Save Dataframe to Lakehouse files
        // Each result DataFrame is saved as a Delta table under the relative ./Tables path.
        // NOTE(review): overwriteSchema presumably only takes effect in Overwrite mode (per Delta docs) - confirm for Append.
        if (!skipWriteToTable) {
            val outputTables: Seq[(DataFrame, String)] = Seq(
                parquetFilesDF -> s"./Tables/${deltaTableSchemaFile}1_DeltaAnalyzerOutput_parquetFiles",
                rowGroupsDF    -> s"./Tables/${deltaTableSchemaFile}2_DeltaAnalyzerOutput_rowGroups",
                columnChunksDF -> s"./Tables/${deltaTableSchemaFile}3_DeltaAnalyzerOutput_columnChunks",
                columnsDF      -> s"./Tables/${deltaTableSchemaFile}4_DeltaAnalyzerOutput_columns"
            )
            outputTables.foreach { case (resultDF, targetPath) =>
                resultDF.write.format("delta").mode(overwriteOrAppend).option("overwriteSchema", "true").save(targetPath)
            }
        }