# A basic .NET for Apache Spark example

## Preparation

### Start the Backend in Debug mode

**_Important_**: Before you run any cells in this example, make sure you have [started the .NET for Apache Spark DotnetBackend in Debug mode](01-start-spark-debug.ipynb).

### Install the Microsoft.Spark NuGet package

In [1]:
#r "nuget: Microsoft.Spark,2.1.1"

Installed package Microsoft.Spark version 2.1.1

Loading extensions from `Microsoft.Data.Analysis.Interactive.dll`

---

## Coding

### Create a new SparkSession
The entry point to all .NET for Apache Spark functionality is a `SparkSession`. To create one, call `SparkSession.Builder().GetOrCreate()`:

In [2]:
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

var spark = SparkSession.Builder().GetOrCreate();

[2023-04-18T08:47:59.5780415Z] [0a7f366303c6] [Info] [ConfigurationService] Using port 5567 for connection.
[2023-04-18T08:47:59.5806448Z] [0a7f366303c6] [Info] [JvmBridge] JvMBridge port is 5567
[2023-04-18T08:47:59.5815110Z] [0a7f366303c6] [Info] [JvmBridge] The number of JVM backend thread is set to 10. The max number of concurrent sockets in JvmBridge is set to 7.


Reference,SparkContext,Catalog
{ 5: Jvm: Microsoft.Spark.Interop.Ipc.JvmBridge },"{ Microsoft.Spark.SparkContext: Reference: { 6: Jvm: Microsoft.Spark.Interop.Ipc.JvmBridge }, DefaultParallelism: 1 }",{ Microsoft.Spark.Sql.Catalog.Catalog: Reference: { 8: Jvm: Microsoft.Spark.Interop.Ipc.JvmBridge } }


### Create a new DataFrame
There are several ways to create a DataFrame. Most of the time you will read data from an external source; for this basic example, we build the DataFrame in code from generated rows.

In [17]:
spark

Reference,SparkContext,Catalog
{ 5: Jvm: Microsoft.Spark.Interop.Ipc.JvmBridge },"{ Microsoft.Spark.SparkContext: Reference: { 6: Jvm: Microsoft.Spark.Interop.Ipc.JvmBridge }, DefaultParallelism: 1 }",{ Microsoft.Spark.Sql.Catalog.Catalog: Reference: { 8: Jvm: Microsoft.Spark.Interop.Ipc.JvmBridge } }


In [None]:
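// Shape of one row in the DataFrame built below. The Spark code in this
// notebook does not use this class; the schema is declared separately
// and stores PriceInUSD as a double.
public class CurrencyDimension
{
    public int Id { get; set; }
    public decimal PriceInUSD { get; set; }
    public DateTime? FromDate { get; set; } = DateTime.Now;
    public DateTime? EndDate { get; set; }
    public bool IsCurrent { get; set; }

    public int CurrencyId { get; set; }
    // Navigation property; the Currency type is not defined in this notebook.
    public Currency? Currency { get; set; }
}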

#### Filling data

In [74]:
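// Appends 10,001 rows for currency `id` to `data`: 10,000 closed price
// intervals followed by one open interval that marks the current price.
// Row Ids run from i to i + 10000; prices are drawn from [leftRange, rightRange).
static void FillData(int i, int id, Random random, List<GenericRow> data, int leftRange, int rightRange)
{
    DateTime fromDt = DateTime.Now;
    DateTime endDt = fromDt.AddMinutes(random.Next(1, 59));

    int m = i + 10000;

    for (; i < m; i++)
    {
        data.Add(
            new GenericRow(new object[] {
                i,
                (double)random.Next(leftRange, rightRange),
                new Timestamp(fromDt),
                new Timestamp(endDt),
                false,
                id
            }));

        // The next interval starts where this one ended and lasts 1 to 58 minutes.
        fromDt = endDt;
        endDt = fromDt.AddMinutes(random.Next(1, 59));
    }

    // Final row: no EndDate, flagged as the current price.
    data.Add(
        new GenericRow(new object[] {
            m,
            (double)random.Next(leftRange, rightRange),
            new Timestamp(fromDt),
            null,
            true,
            id
        }));
}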

In [95]:
var data = new List<GenericRow>();
Random random = new Random();

FillData(1, 1, random, data, 20, 500);
FillData(10002, 2, random, data, 150, 770);
FillData(20003, 3, random, data, 50, 320);
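
The three start values passed to `FillData` are chosen so that the row Ids form one unbroken sequence. The following Spark-free sketch reproduces only the Id bookkeeping of `FillData` to make that visible; the helper name `IdsFor` is hypothetical and not part of the notebook.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: mirrors only how FillData assigns row Ids.
static List<int> IdsFor(int start)
{
    var ids = new List<int>();
    int m = start + 10000;
    for (int i = start; i < m; i++)
    {
        ids.Add(i);   // closed interval row (IsCurrent = false)
    }
    ids.Add(m);       // open row (EndDate = null, IsCurrent = true)
    return ids;
}

var allIds = new List<int>();
foreach (int start in new[] { 1, 10002, 20003 })
{
    allIds.AddRange(IdsFor(start));
}

Console.WriteLine(allIds.Count);              // 30003
Console.WriteLine(allIds[allIds.Count - 1]);  // 30003
```

Each call contributes 10,001 rows, so three calls give 30,003 rows, which is the `count` that `Describe()` reports later in this notebook.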

    

In [96]:
var schema = new StructType(new List<StructField>()
    {
        new StructField("Id", new IntegerType()),
        new StructField("PriceInUSD", new DoubleType()),
        new StructField("FromDate", new TimestampType()),
        new StructField("EndDate", new TimestampType()),
        new StructField("IsCurrent", new BooleanType()),
        new StructField("CurrencyId", new IntegerType())
    });

DataFrame df = spark.CreateDataFrame(data, schema);

df.Show();

+---+----------+--------------------+--------------------+---------+----------+
| Id|PriceInUSD|            FromDate|             EndDate|IsCurrent|CurrencyId|
+---+----------+--------------------+--------------------+---------+----------+
|  1|      64.0|2023-04-18 10:16:...|2023-04-18 10:45:...|    false|         1|
|  2|     215.0|2023-04-18 10:45:...|2023-04-18 11:31:...|    false|         1|
|  3|     134.0|2023-04-18 11:31:...|2023-04-18 12:20:...|    false|         1|
|  4|      79.0|2023-04-18 12:20:...|2023-04-18 12:54:...|    false|         1|
|  5|      90.0|2023-04-18 12:54:...|2023-04-18 13:34:...|    false|         1|
|  6|      44.0|2023-04-18 13:34:...|2023-04-18 14:25:...|    false|         1|
|  7|     364.0|2023-04-18 14:25:...|2023-04-18 15:06:...|    false|         1|
|  8|     450.0|2023-04-18 15:06:...|2023-04-18 15:34:...|    false|         1|
|  9|     238.0|2023-04-18 15:34:...|2023-04-18 15:54:...|    false|         1|
| 10|     114.0|2023-04-18 15:54:...|202

#### Show tail

In [45]:
var tail = df.OrderBy(Col("Id").Desc());
tail.Show();

+-----+----------+--------------------+--------------------+---------+----------+
|   Id|PriceInUSD|            FromDate|             EndDate|IsCurrent|CurrencyId|
+-----+----------+--------------------+--------------------+---------+----------+
|30003|     200.0|2023-11-10 06:27:...|                null|     true|     30003|
|30002|     139.0|2023-11-10 06:04:...|2023-11-10 06:27:...|    false|         3|
|30001|     304.0|2023-11-10 05:36:...|2023-11-10 06:04:...|    false|         3|
|30000|      87.0|2023-11-10 04:57:...|2023-11-10 05:36:...|    false|         3|
|29999|     204.0|2023-11-10 04:39:...|2023-11-10 04:57:...|    false|         3|
|29998|     237.0|2023-11-10 04:20:...|2023-11-10 04:39:...|    false|         3|
|29997|      89.0|2023-11-10 03:25:...|2023-11-10 04:20:...|    false|         3|
|29996|      86.0|2023-11-10 02:30:...|2023-11-10 03:25:...|    false|         3|
|29995|     289.0|2023-11-10 01:36:...|2023-11-10 02:30:...|    false|         3|
|29994|     243.

#### Describe df

In [46]:
df.Describe().Show();

+-------+-----------------+------------------+------------------+
|summary|               Id|        PriceInUSD|        CurrencyId|
+-------+-----------------+------------------+------------------+
|  count|            30003|             30003|             30003|
|   mean|          15002.0|  301.988401159884|    3.999800019998|
| stddev|8661.264399612795|181.17385702051124|216.01286156815587|
|    min|                1|              20.0|                 1|
|    max|            30003|             769.0|             30003|
+-------+-----------------+------------------+------------------+



#### Function for calculating the difference in $ and %

In [48]:
// Returns the absolute and percentage change in price between the earliest
// and latest rows (ordered by FromDate). Expects PriceInUSD at column index 1.
static (double, double) CalculateDifferent(DataFrame price)
{
    var firstPrice = price.OrderBy(Col("FromDate")).First();
    var lastPrice = price.OrderBy(Col("FromDate").Desc()).First();

    var different = Math.Round(((double)lastPrice[1] - (double)firstPrice[1]), 5);
    var percents = Math.Round((different / (double)firstPrice[1] * 100), 5);
    return (different, percents);
}
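
To see what `CalculateDifferent` computes without a Spark session, here is a minimal sketch that applies the same arithmetic to two plain doubles. The function name `Change` and the two prices are hypothetical and are not taken from the notebook's data.

```csharp
using System;

// Same arithmetic as CalculateDifferent, applied to two plain doubles.
static (double Difference, double Percent) Change(double first, double last)
{
    double difference = Math.Round(last - first, 5);
    double percent = Math.Round(difference / first * 100, 5);
    return (difference, percent);
}

// Hypothetical prices chosen for illustration only.
var (difference, percent) = Change(125.0, 141.0);
Console.WriteLine($"{difference} $ or {percent} %");
```

With these inputs the difference is 16 and the percentage is 12.8. A first price of 0 would make the percentage infinite or NaN; that case does not arise here because the generated prices start at 20.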

#### Calculate the difference between the first and last dates

In [49]:
int currencyId = 1;

var price = df
    .Select(Col("CurrencyId"), Col("PriceInUSD"))
    .Filter(Col("CurrencyId") == currencyId);

var (different, percents) =  CalculateDifferent(price);

Console.WriteLine($"Currency {currencyId} changed by " +
                  $"{different} $ or {percents} %");

Currency 1 changed by 16 $ or 12.8 %


#### Calculate the difference within a date range

In [50]:
int currencyId = 1;

var fromDt = DateTime.Now;
var endDt = DateTime.Now.AddDays(9);

var price = df
    .Select(Col("CurrencyId"), Col("PriceInUSD"), Col("FromDate"), Col("EndDate"))
    .Filter((Col("CurrencyId") == currencyId) & 
            (Col("FromDate") >= new Timestamp(fromDt) & 
             Col("FromDate") <= new Timestamp(endDt)));

var (different, percents) =  CalculateDifferent(price);

Console.WriteLine($"Currency {currencyId} changed by " +
                  $"{different} $ or {percents} %");

Currency 1 changed by -317 $ or -81.70103 %


#### Show max price

In [52]:
var max = df.Agg(Max(df.Col("PriceInUSD"))).Head().Get(0);

Console.WriteLine(max);

769


#### Show aggregations by date with one currency

In [110]:
var currencyId = 1;

In [81]:
df
    .Filter(Col("CurrencyId") == currencyId)
    .GroupBy(
        Year(Col("FromDate")).Alias("Year"))
    .Agg(
        Avg(Col("PriceInUSD")).Alias("Average_price"),
        Min(Col("PriceInUSD")).Alias("Min_price"),
        Max(Col("PriceInUSD")).Alias("Max_price")
    )
    .WithColumn("CurrencyId", Lit(currencyId))
    .OrderBy(Col("Year"))
    .Show();


+----+-------------+---------+---------+----------+
|Year|Average_price|Min_price|Max_price|CurrencyId|
+----+-------------+---------+---------+----------+
|2023|     259.3328|     20.0|    499.0|         1|
+----+-------------+---------+---------+----------+



In [76]:
df
    .Filter(Col("CurrencyId") == currencyId)
    .GroupBy(
        Year(Col("FromDate")).Alias("Year"),
        Month(Col("FromDate")).Alias("Month"))
    .Agg(
        Avg(Col("PriceInUSD")).Alias("Average_price"),
        Min(Col("PriceInUSD")).Alias("Min_price"),
        Max(Col("PriceInUSD")).Alias("Max_price")
    )
    .WithColumn("CurrencyId", Lit(currencyId))
    .OrderBy(Col("Year"), Col("Month"))
    .Show();

+----+-----+------------------+---------+---------+----------+
|Year|Month|     Average_price|Min_price|Max_price|CurrencyId|
+----+-----+------------------+---------+---------+----------+
|2023|    4| 455.2464454976303|    150.0|    766.0|         2|
|2023|    5|465.27450980392155|    150.0|    769.0|         2|
|2023|    6| 464.8255261371351|    150.0|    769.0|         2|
|2023|    7|455.38625750500336|    151.0|    769.0|         2|
|2023|    8|462.57849031396125|    150.0|    769.0|         2|
|2023|    9| 467.7481073640743|    150.0|    769.0|         2|
|2023|   10|459.53698811096433|    150.0|    769.0|         2|
|2023|   11|456.68827930174564|    151.0|    766.0|         2|
+----+-----+------------------+---------+---------+----------+
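
The grouped queries in this section all follow the same pattern: group by parts of `FromDate`, then compute the average, minimum, and maximum of `PriceInUSD`. As a rough in-memory analogue, the sketch below does the same with LINQ on a handful of hypothetical rows. It does not use Spark and the sample values are made up for illustration.

```csharp
using System;
using System.Linq;

// Hypothetical sample rows: (FromDate, PriceInUSD) for a single currency.
var rows = new (DateTime FromDate, double PriceInUSD)[]
{
    (new DateTime(2023, 4, 18), 64.0),
    (new DateTime(2023, 4, 19), 215.0),
    (new DateTime(2023, 5, 2), 134.0),
    (new DateTime(2023, 5, 3), 79.0),
};

// Group by (Year, Month), then compute the same three aggregates.
var monthly = rows
    .GroupBy(r => (r.FromDate.Year, r.FromDate.Month))
    .OrderBy(g => g.Key)
    .Select(g => (
        Year: g.Key.Year,
        Month: g.Key.Month,
        Average: g.Average(r => r.PriceInUSD),
        Min: g.Min(r => r.PriceInUSD),
        Max: g.Max(r => r.PriceInUSD)))
    .ToList();

foreach (var m in monthly)
{
    Console.WriteLine($"{m.Year}-{m.Month}: avg={m.Average}, min={m.Min}, max={m.Max}");
}
```

Adding the day of the month to the grouping key gives the daily breakdown, just as adding `DayOfMonth(Col("FromDate"))` does in the Spark version.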



In [92]:
df
    .Filter(Col("CurrencyId") == currencyId)
    .GroupBy(
        Year(Col("FromDate")).Alias("Year"),
        Month(Col("FromDate")).Alias("Month"),
        DayOfMonth(Col("FromDate")).Alias("Days"))
    .Agg(
        Avg(Col("PriceInUSD")).Alias("Average_price"),
        Min(Col("PriceInUSD")).Alias("Min_price"),
        Max(Col("PriceInUSD")).Alias("Max_price")
    )
    .WithColumn("CurrencyId", Lit(currencyId))
    .OrderBy(Col("Year"), Col("Month"), Col("Days"))
    .Show();

+----+-----+----+------------------+---------+---------+----------+
|Year|Month|Days|     Average_price|Min_price|Max_price|CurrencyId|
+----+-----+----+------------------+---------+---------+----------+
|2023|    4|  18| 443.7647058823529|    187.0|    764.0|         2|
|2023|    4|  19| 451.8333333333333|    162.0|    766.0|         2|
|2023|    4|  20|           490.125|    154.0|    763.0|         2|
|2023|    4|  21|            425.28|    152.0|    760.0|         2|
|2023|    4|  22|458.72549019607845|    160.0|    760.0|         2|
|2023|    4|  23| 458.9166666666667|    166.0|    766.0|         2|
|2023|    4|  24| 424.6666666666667|    155.0|    758.0|         2|
|2023|    4|  25| 484.7962962962963|    150.0|    760.0|         2|
|2023|    4|  26|457.19565217391306|    157.0|    737.0|         2|
|2023|    4|  27|428.05172413793105|    151.0|    749.0|         2|
|2023|    4|  28|          441.0625|    151.0|    752.0|         2|
|2023|    4|  29|468.45283018867923|    150.0|  

#### Show aggregations by currency and date

In [102]:
df
    .GroupBy(
        Col("CurrencyId"))
    .Agg(
        Avg(Col("PriceInUSD")).Alias("Average_price"),
        Min(Col("PriceInUSD")).Alias("Min_price"),
        Max(Col("PriceInUSD")).Alias("Max_price")
    )
    .OrderBy(Col("CurrencyId"))
    .Show();

+----------+-----------------+---------+---------+
|CurrencyId|    Average_price|Min_price|Max_price|
+----------+-----------------+---------+---------+
|         1|260.2693730626937|     20.0|    499.0|
|         2|459.7851214878512|    150.0|    769.0|
|         3| 185.012798720128|     50.0|    319.0|
+----------+-----------------+---------+---------+



In [103]:
df
    .GroupBy(
        Col("CurrencyId"),
        Year(Col("FromDate")).Alias("Year"))
    .Agg(
        Avg(Col("PriceInUSD")).Alias("Average_price"),
        Min(Col("PriceInUSD")).Alias("Min_price"),
        Max(Col("PriceInUSD")).Alias("Max_price")
    )
    .OrderBy(Col("CurrencyId"), Col("Year"))
    .Show();

+----------+----+-----------------+---------+---------+
|CurrencyId|Year|    Average_price|Min_price|Max_price|
+----------+----+-----------------+---------+---------+
|         1|2023|260.2693730626937|     20.0|    499.0|
|         2|2023|459.7851214878512|    150.0|    769.0|
|         3|2023| 185.012798720128|     50.0|    319.0|
+----------+----+-----------------+---------+---------+



In [107]:
df
    .GroupBy(
        Col("CurrencyId"),
        Year(Col("FromDate")).Alias("Year"),
        Month(Col("FromDate")).Alias("Month"))
    .Agg(
        Avg(Col("PriceInUSD")).Alias("Average_price"),
        Min(Col("PriceInUSD")).Alias("Min_price"),
        Max(Col("PriceInUSD")).Alias("Max_price")
    )
    .OrderBy(Col("CurrencyId"), Col("Year"), Col("Month"))
    .Show();

+----------+----+-----+------------------+---------+---------+
|CurrencyId|Year|Month|     Average_price|Min_price|Max_price|
+----------+----+-----+------------------+---------+---------+
|         1|2023|    4| 259.2920065252855|     20.0|    499.0|
|         1|2023|    5|260.88778220451525|     20.0|    499.0|
|         1|2023|    6|255.27546138072455|     20.0|    499.0|
|         1|2023|    7|271.46948356807513|     20.0|    499.0|
|         1|2023|    8|254.55984042553192|     20.0|    499.0|
|         1|2023|    9| 258.6885934219734|     20.0|    499.0|
|         1|2023|   10| 260.5767230169051|     20.0|    499.0|
|         1|2023|   11| 261.6870897155361|     21.0|    496.0|
|         2|2023|    4|451.85342019543975|    150.0|    769.0|
|         2|2023|    5| 463.8925566343042|    150.0|    769.0|
|         2|2023|    6| 462.6224489795918|    150.0|    769.0|
|         2|2023|    7| 454.8009049773756|    150.0|    769.0|
|         2|2023|    8| 460.9437830687831|    150.0|   

In [108]:
df
    .GroupBy(
        Col("CurrencyId"),
        Year(Col("FromDate")).Alias("Year"),
        Month(Col("FromDate")).Alias("Month"),
        DayOfMonth(Col("FromDate")).Alias("Days"))
    .Agg(
        Avg(Col("PriceInUSD")).Alias("Average_price"),
        Min(Col("PriceInUSD")).Alias("Min_price"),
        Max(Col("PriceInUSD")).Alias("Max_price")
    )
    .OrderBy(Col("CurrencyId"), Col("Year"), Col("Month"), Col("Days"))
    .Show();

+----------+----+-----+----+------------------+---------+---------+
|CurrencyId|Year|Month|Days|     Average_price|Min_price|Max_price|
+----------+----+-----+----+------------------+---------+---------+
|         1|2023|    4|  18|236.46153846153845|     21.0|    454.0|
|         1|2023|    4|  19|265.40425531914894|     40.0|    499.0|
|         1|2023|    4|  20| 277.6938775510204|     20.0|    487.0|
|         1|2023|    4|  21|245.46808510638297|     41.0|    494.0|
|         1|2023|    4|  22|258.29787234042556|     20.0|    491.0|
|         1|2023|    4|  23| 285.0192307692308|     24.0|    489.0|
|         1|2023|    4|  24|277.45283018867923|     31.0|    494.0|
|         1|2023|    4|  25|230.93333333333334|     25.0|    471.0|
|         1|2023|    4|  26| 253.2888888888889|     38.0|    499.0|
|         1|2023|    4|  27|254.85185185185185|     53.0|    452.0|
|         1|2023|    4|  28| 263.0192307692308|     33.0|    493.0|
|         1|2023|    4|  29|           258.425| 

### Cleanup
Stop your Spark session once you are done.

In [None]:
spark.Stop();