## Referencing libraries
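`Microsoft.SqlServer.DacFx` has to be referenced in a `%%configure` cell. Because that cell uses the `-f` flag, running it forces the Spark session to restart, so it should be the first cell executed. The other packages are loaded into the .NET kernel with `#r` NuGet directives.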

In [92]:
%%configure -f
{ "conf": {"spark.dotnet.extraPackages": "nuget:Microsoft.SqlServer.DacFx,160.6161.0" }}

In [93]:
#r "nuget:Newtonsoft.Json"

In [94]:
#r "nuget:SynapseQueryParserKernel"

## Load QueryStoreQueryText into dataframe
QueryStoreQueryText is a Synapse SQL table that is loaded incrementally from the Query Store catalog views with the following query:

```sql
SELECT txt.query_sql_text, txt.statement_sql_handle,
       qry.query_id, qry.object_id, qry.is_internal_query, qry.last_execution_time,
       SUM(runstate.count_executions) AS count_executions
FROM sys.query_store_query_text txt
INNER JOIN sys.query_store_query qry
    ON qry.query_text_id = txt.query_text_id
INNER JOIN sys.query_store_plan pln
    ON qry.query_id = pln.query_id
INNER JOIN sys.query_store_runtime_stats runstate
    ON pln.plan_id = runstate.plan_id
GROUP BY txt.query_sql_text, txt.statement_sql_handle,
         qry.query_id, qry.object_id, qry.is_internal_query, qry.last_execution_time
```
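The joins and the `GROUP BY` yield one row per query, with `count_executions` summed over every plan of that query and every runtime-stats interval of each plan; the other grouped columns all depend on `query_id`. Because every join is an inner join, a query with no rows in `sys.query_store_runtime_stats` is left out entirely. The C# sketch below mirrors that shape over in-memory data to make the semantics concrete. The record types and their fields are hypothetical stand-ins for the catalog views, not part of the notebook or any library.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-ins for the Query Store catalog views.
public record QueryText(long QueryTextId, string QuerySqlText);
public record Query(long QueryId, long QueryTextId);
public record Plan(long PlanId, long QueryId);
public record RuntimeStats(long PlanId, long CountExecutions);

public static class QueryStoreAggregation
{
    // Same shape as the T-SQL: inner joins text -> query -> plan -> runtime stats,
    // then one execution total per query_id (the other GROUP BY columns depend on it).
    public static Dictionary<long, long> ExecutionsPerQuery(
        IEnumerable<QueryText> texts,
        IEnumerable<Query> queries,
        IEnumerable<Plan> plans,
        IEnumerable<RuntimeStats> stats)
    {
        return (from txt in texts
                join qry in queries on txt.QueryTextId equals qry.QueryTextId
                join pln in plans on qry.QueryId equals pln.QueryId
                join rs in stats on pln.PlanId equals rs.PlanId
                group rs.CountExecutions by qry.QueryId into g
                select new { QueryId = g.Key, Total = g.Sum() })
            .ToDictionary(x => x.QueryId, x => x.Total);
    }
}
```

If queries without recorded executions should also be loaded, the plan and runtime-stats joins would have to become `LEFT JOIN`s, with the sum wrapped in `COALESCE`.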

In [1]:
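%%spark
// Read the dedicated SQL pool table with the Synapse SQL connector (synapsesql) from a Scala cell
val df_scala = spark.read.synapsesql("retaildw.dbo.QueryStoreQueryText")
// Register a temp view so the .NET cells can read the same data with spark.Sql
df_scala.createOrReplaceTempView("vquerystorequerytext")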

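## Back to Spark .NET: data type adjustments

The temp view is read into a .NET DataFrame, then `query_sql_text` is renamed to `Command`, `last_execution_time` is cast to `date` (dropping the time of day), and an empty `sqlAnalytics` column is added to hold the parser output.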

In [96]:

var df=spark.Sql("select * from vquerystorequerytext");

In [97]:
df.PrintSchema();

In [98]:
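using Microsoft.Spark.Sql;
df = (
    df
    // Rename query_sql_text to Command: copy it here, drop the original below
    .WithColumn("Command", df["query_sql_text"])
    // Keep only the date part of the last execution time
    .WithColumn("last_execution_time", df["last_execution_time"].Cast("date"))
    .Drop(df["query_sql_text"])
    // Placeholder string column that will hold the parser's JSON output
    .WithColumn("sqlAnalytics", Lit(""))
);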

In [99]:
df.PrintSchema();

## Parse
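This is the most important step: creating a Spark UDF that calls the parsing library on each query text. The library's main class is *SynapseVisitor*, and its only public method is *ProcessVisitor*, which takes a SQL string and returns a *SynapseQueryModel*. The UDF serializes that model to a JSON string.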

In [100]:
using Newtonsoft.Json;
using QueryParserKernel;
using Microsoft.Spark.Sql;

// UDF: SQL text in, JSON string out. The JSON is expanded into columns later with FromJson.
Func<Column, Column> parseSQL = Udf<string, string>(
    strSQL =>
    {
        // Parse the statement with the SynapseQueryParserKernel library
        SynapseQueryModel model = new SynapseVisitor().ProcessVisitor(strSQL);
        // Serialize the model so it fits in a single string column
        return JsonConvert.SerializeObject(model);
    }
);
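As written, an exception thrown by `ProcessVisitor` inside the UDF fails the Spark task and, once retries are exhausted, the whole job. One way to contain this, sketched below, is to keep the parse-and-serialize logic in a plain function that catches exceptions and returns a JSON error object instead. The `parse` delegate stands in for `new SynapseVisitor().ProcessVisitor(sql)`, and `SafeParser`, `ParseToJson` and the `{"Errors": [...]}` fallback shape are assumptions: the real `SynapseQueryModel` may model errors differently, and the fallback should match its `Errors` field so that `FromJson` can read it later. The sketch uses System.Text.Json to stay self-contained, whereas the notebook uses Newtonsoft.Json.

```csharp
using System;
using System.Text.Json;

public static class SafeParser
{
    // Hypothetical wrapper: serializes whatever the parser delegate returns, or a
    // small {"Errors": [...]} object when the input is null/blank or the parser throws,
    // so a single bad statement does not fail the Spark task.
    public static string ParseToJson(Func<string, object> parse, string sql)
    {
        if (string.IsNullOrWhiteSpace(sql))
            return JsonSerializer.Serialize(new { Errors = new[] { "empty statement" } });

        try
        {
            // Declared type is object, so the runtime type of the model is serialized
            return JsonSerializer.Serialize(parse(sql));
        }
        catch (Exception ex)
        {
            // Assumed error shape; align it with the real model's Errors field
            return JsonSerializer.Serialize(new { Errors = new[] { ex.Message } });
        }
    }
}
```

In the notebook, the UDF body would then reduce to something like `strSQL => SafeParser.ParseToJson(s => new SynapseVisitor().ProcessVisitor(s), strSQL)`.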



In [101]:
df=df.WithColumn("sqlAnalytics",parseSQL(df["Command"]));


## Extract the schema and load the JSON into columns

In [102]:
df.PrintSchema();

In [103]:
// Fetch a single sample of the parser output instead of collecting the whole DataFrame to the driver
var json_data = df.Select("sqlAnalytics").First().GetAs<string>("sqlAnalytics");

In [104]:
// Save the sample JSON to storage and read it back so Spark can infer its schema

var workspaceName = Env.GetWorkspaceName();
var tmpDir = $"/synapse/workspaces/{workspaceName}/sparkpools/tmp/";
FS.Mkdirs(tmpDir);
// Overwrite any previous sample (last argument = overwrite)
FS.Put($"{tmpDir}json_data.json", json_data, true);

var df_schema = spark.Read().Json($"{tmpDir}json_data.json");
// Schema as a JSON string, a format FromJson accepts
var json_schema = df_schema.Schema().Json;
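This cell infers the schema from a single row. Spark's JSON schema inference cannot derive a type from a JSON `null` or an element type from an empty array, so such fields fall back to string types (an empty array becomes `array<string>`). If the sampled row happens to have, for example, no joined tables, the inferred type of `JoinedTables` will not match rows that do have them. A hedged mitigation is to pick, from a handful of rows, the sample with the fewest such fields before writing it out. The plain C# helper below sketches that heuristic; `SchemaSample`, `UninferableFields` and `MostInformative` are hypothetical names, and no single row is guaranteed to exercise every field.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public static class SchemaSample
{
    // Counts top-level properties whose type cannot be inferred from this sample:
    // JSON nulls and empty arrays.
    public static int UninferableFields(string json)
    {
        using var doc = JsonDocument.Parse(json);
        return doc.RootElement.EnumerateObject().Count(p =>
            p.Value.ValueKind == JsonValueKind.Null ||
            (p.Value.ValueKind == JsonValueKind.Array && p.Value.GetArrayLength() == 0));
    }

    // Hypothetical heuristic: return the sample with the fewest uninferable fields
    // (ties keep the earliest sample; throws if samples is empty).
    public static string MostInformative(IEnumerable<string> samples) =>
        samples.OrderBy(s => UninferableFields(s)).First();
}
```

The candidate samples could come from something like `df.Select("sqlAnalytics").Take(100)`, with the chosen string written out in place of the first row.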

## Extract Columns from JSON column sqlAnalytics

In [105]:
// Parse the JSON string into a struct, then expand its fields into top-level columns
df = df.WithColumn("sqlAnalytics", FromJson(Col("sqlAnalytics"), json_schema)).Select("*", "sqlAnalytics.*");
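`Select("*", "sqlAnalytics.*")` keeps every existing column, including the `sqlAnalytics` struct itself, and adds one column per top-level field of the parsed JSON. If a field name matches an existing column (Spark resolves names case-insensitively by default), the DataFrame ends up with two columns of the same name, later references to it are ambiguous, and `SaveAsTable` rejects the duplicate. The plain C# check below is a hypothetical helper that lists such collisions, given a JSON sample and the current column names (for example from `df.Columns()`).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public static class StructExpansion
{
    // Hypothetical helper: returns the top-level JSON property names that would clash
    // with existing DataFrame columns once "sqlAnalytics.*" is expanded. The comparison
    // ignores case because Spark resolves column names case-insensitively by default.
    public static List<string> CollidingColumns(IEnumerable<string> existingColumns, string json)
    {
        var existing = new HashSet<string>(existingColumns, StringComparer.OrdinalIgnoreCase);
        using var doc = JsonDocument.Parse(json);
        return doc.RootElement.EnumerateObject()
            .Select(p => p.Name)
            .Where(name => existing.Contains(name))
            .ToList();
    }
}
```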

In [113]:
Display(df);

In [107]:
df.PrintSchema();

In [114]:
// Replace empty arrays with null so that "no joins", "no errors", etc. are stored as NULL
using Microsoft.Spark.Sql;
string[] arrayColumns = {
    "JoinedTables", "JoinedColumns", "InsertStatementTargets", "DeleteStatementTargets",
    "Errors", "CopyStatementFrom", "CopyStatementInto"
};
foreach (var c in arrayColumns)
{
    // Size() of an empty array is 0; any other value, including NULL, is kept as-is
    df = df.WithColumn(c, When(Size(df[c]) == 0, Lit(null)).Otherwise(df[c]));
}

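## Saving

Statements the parser handled without errors are written to `SynapseSqlAnalytics`; statements with errors are written to `SynapseSqlAnalytics_Errors`.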

In [118]:
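// Cache so the second write can reuse the parsed rows instead of re-running the UDF
df = df.Cache();
// Rows parsed without errors
df.Filter("Errors IS NULL").Write().Mode("overwrite").SaveAsTable("SynapseSqlAnalytics");
// Rows with parser errors, kept separately for review
df.Filter("Errors IS NOT NULL").Write().Mode("overwrite").SaveAsTable("SynapseSqlAnalytics_Errors");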