# Storage account

## Transient

In [None]:
# Location of the demo data: storage account, container and folder.
# These names are reused by the cells that follow.
storage_account = "stnetcoreconf"
container = "netcoreconf"
directory = "daily_stock"

# NOTE(review): placeholder value. A real key should not be committed in a
# notebook; consider reading it from a secret scope instead.
storage_account_access_key = "YOUR_STORAGE_ACCOUNT_ACCESS_KEY"

# Register the account key in the Spark configuration so abfss:// paths on
# this storage account can be read without mounting.
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_account_access_key,
)

In [None]:
# Confirm the account key is registered in the Spark configuration without
# echoing the secret itself into the notebook output. spark.conf.get still
# raises if the key was never set; otherwise the cell shows True for a
# non-empty value.
bool(spark.conf.get(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"))

In [None]:
# Read the Parquet folder directly through its abfss:// URI, built from the
# storage account, container and directory variables, and render it.
df = spark.read.format("parquet").load(
    f"abfss://{container}@{storage_account}.dfs.core.windows.net/{directory}"
)
display(df)

## Mounted

In [None]:
# Mount the storage container under DBFS using a service principal (OAuth
# client-credentials flow).
#
# NOTE(review): the three values below are placeholders. Real credentials
# should not be committed in a notebook; consider a secret scope instead.
service_principal_application_id = "YOUR_SERVICE_PRINCIPAL_APPLICATION_ID"
service_principal_tenant_id = "YOUR_SERVICE_PRINCIPAL_TENANT_ID"
service_principal_secret = "YOUR_SERVICE_PRINCIPAL_SECRET"

# Service principal needs storage blob data contributor role
extra_configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": service_principal_application_id,
    "fs.azure.account.oauth2.client.secret": service_principal_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{service_principal_tenant_id}/oauth2/token",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

storage_account = "stnetcoreconf"
container = "netcoreconf"
source = f"abfss://{container}@{storage_account}.dfs.core.windows.net/"

mount_point = "/mnt/netcoreconf"

# dbutils.fs.mount raises when the mount point is already in use, which makes
# the cell fail on every re-run. Only mount when the mount point is absent.
already_mounted = any(
    mount.mountPoint == mount_point for mount in dbutils.fs.mounts()
)
if not already_mounted:
    dbutils.fs.mount(
        source=source,
        mount_point=mount_point,
        extra_configs=extra_configs,
    )

In [None]:
# Read the same Parquet folder through the DBFS mount point and render it.
df = spark.read.format("parquet").load("/mnt/netcoreconf/daily_stock")
display(df)

# Database

In [None]:
%sql
-- Create the demo database; IF NOT EXISTS keeps the cell safe to re-run.
CREATE DATABASE IF NOT EXISTS netcoreconf;

In [None]:
%sql
-- Make netcoreconf the current database so unqualified table names resolve to it.
USE netcoreconf;

In [None]:
%sql
-- Register the Parquet files at the mounted location as a table in the
-- current database; IF NOT EXISTS keeps the cell safe to re-run.
CREATE TABLE IF NOT EXISTS daily_stock
    USING PARQUET
    LOCATION 'dbfs:/mnt/netcoreconf/daily_stock';

In [None]:
%sql
-- Preview every column of the daily_stock table in the current database.
SELECT *
FROM daily_stock;

# Python

In [None]:
from pyspark.sql import functions as F

# Per point of sale: number of rows plus the earliest and latest Date,
# ordered by row count (largest first), using the DataFrame API.
df = (
    spark.read.format("parquet")
    .load("/mnt/netcoreconf/daily_stock")
    .groupBy("PointOfSaleId")
    .agg(
        F.count("*").alias("count"),
        F.min("Date").alias("min_date"),
        F.max("Date").alias("max_date"),
    )
    .orderBy(F.desc("count"))
    # Rename last so the grouping and ordering above use the source column name.
    .withColumnRenamed("PointOfSaleId", "point_of_sale_id")
)
display(df)

# The same per-point-of-sale aggregation written in Spark SQL against the
# netcoreconf.daily_stock table, for comparison with the DataFrame version.
sql_df = spark.sql("""
SELECT 
    PointOfSaleId AS point_of_sale_id,
    COUNT(*) AS count,
    MIN(Date) AS min_date,
    MAX(Date) AS max_date
FROM
    netcoreconf.daily_stock
GROUP BY
    PointOfSaleId
ORDER BY
    count DESC
""")
display(sql_df)