In [1]:
# Before running, either set the matching secrets (https://docs.azuredatabricks.net/user-guide/secrets/secrets.html)
# or edit the variables below to contain valid connection details.

secrets_scope = "KEYS"

# Azure Data Lake Store: service-principal credentials and account name
data_lake_app_id = dbutils.secrets.get(secrets_scope, "DATA_LAKE_APP_ID")
data_lake_app_key = dbutils.secrets.get(secrets_scope, "DATA_LAKE_APP_KEY")
data_lake_app_tenant = dbutils.secrets.get(secrets_scope, "DATA_LAKE_APP_TENANT")
data_lake_account = dbutils.secrets.get(secrets_scope, "DATA_LAKE_ACCOUNT")

# Blob Storage: account access key and account name
storage_account_key = dbutils.secrets.get(secrets_scope, "STORAGE_ACCOUNT_KEY")
storage_account = dbutils.secrets.get(secrets_scope, "STORAGE_ACCOUNT")

mount_folder = "test"
output_folder = "data"
data_lake_mount_point = "/mnt/lake"
storage_mount_point = "/mnt/blob"

# Data Lake connection information and credentials: the service principal
# (client id + key) authenticates against the tenant's OAuth2 token endpoint.
data_lake_configs = {"dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
                     "dfs.adls.oauth2.client.id": data_lake_app_id,
                     "dfs.adls.oauth2.credential": data_lake_app_key,
                     "dfs.adls.oauth2.refresh.url": "https://login.microsoftonline.com/%s/oauth2/token" % data_lake_app_tenant}

# Blob Storage is accessed with the storage account key.
storage_configs = {"fs.azure.account.key.%s.blob.core.windows.net" % storage_account: storage_account_key}


def mount_if_missing(source, mount_point, extra_configs, existing_mounts):
  """Mount ``source`` at ``mount_point`` unless that mount point is already in use.

  Args:
    source: storage URI to mount (an adl:// or wasbs:// URI in this notebook).
    mount_point: DBFS path to mount at, e.g. "/mnt/lake".
    extra_configs: connection/credential settings passed to dbutils.fs.mount.
    existing_mounts: the list returned by dbutils.fs.mounts().

  Returns:
    True if a new mount was created, False if the mount point already existed.
  """
  # Skipping mount points that already exist keeps this cell safe to re-run.
  if any(mount.mountPoint == mount_point for mount in existing_mounts):
    return False
  dbutils.fs.mount(source=source, mount_point=mount_point, extra_configs=extra_configs)
  return True


mounts = dbutils.fs.mounts()

mount_if_missing(
  source="adl://%s.azuredatalakestore.net/%s" % (data_lake_account, mount_folder),
  mount_point=data_lake_mount_point,
  extra_configs=data_lake_configs,
  existing_mounts=mounts)

# NOTE(review): mount_folder is used both as the container name and as the
# folder inside that container, so this mounts folder <mount_folder> within
# container <mount_folder>. Confirm this is intended rather than the container root.
mount_if_missing(
  source="wasbs://%s@%s.blob.core.windows.net/%s" % (mount_folder, storage_account, mount_folder),
  mount_point=storage_mount_point,
  extra_configs=storage_configs,
  existing_mounts=mounts)

# Display the resulting mount table.
dbutils.fs.mounts()

In [2]:
import json
from pyspark.sql.functions import from_json
from pyspark.sql.functions import col

# Hadoop-level Avro option: with the value "false", the Avro input format does
# not skip input files that lack a ".avro" extension.
# NOTE(review): the load glob in combine_into_csv only matches "*.avro" files,
# so this setting may have no effect there; confirm whether it is still needed.
# (`_jsc` is a private PySpark handle to the JVM SparkContext.)
spark.sparkContext._jsc.hadoopConfiguration().set(
  "avro.mapred.ignore.inputs.without.extension",
  "false",
)

def combine_into_csv(root_folder, output_name="test.csv", mode="append", num_files=1, header=False):
  """Flatten the JSON payloads of captured Avro messages into CSV.

  Loads every ``*.avro`` file exactly three directory levels below
  ``root_folder``, decodes the ``body`` column to a string, infers a single
  JSON schema from all bodies, expands the parsed JSON into top-level columns
  and writes the result as CSV to ``<root_folder>/<output_name>``.

  Args:
    root_folder: folder (e.g. a DBFS mount path) that holds the Avro files.
    output_name: name of the CSV output directory created under root_folder.
      Defaults to "test.csv", the previously hard-coded name.
    mode: Spark save mode. The default "append" adds new part files on every
      call, so re-running over the same input duplicates rows; pass
      "overwrite" to make re-runs idempotent.
    num_files: number of partitions to coalesce to before writing (1 gives a
      single CSV part file). Pass None to keep Spark's own partitioning.
    header: write a header row with the column names. Defaults to False,
      matching the original output.

  Returns:
    The path of the CSV output directory that was written.
  """
  output_path = "%s/%s" % (root_folder, output_name)

  messages = spark.read.format("com.databricks.spark.avro").load("%s/*/*/*/*.avro" % root_folder)
  # Cast the payload column to a string so it can be parsed as JSON text.
  bodies = messages.select(messages['body'].cast('string').alias('body'))

  # Infer one schema that covers every message, then parse each body with it
  # and promote the parsed fields to top-level columns.
  json_schema = spark.read.json(bodies.rdd.map(lambda row: row.body)).schema
  data = bodies.withColumn('body', from_json(col('body'), json_schema)).select('body.*')

  if num_files is not None:
    data = data.coalesce(num_files)
  data.write.save(path=output_path, format='csv', mode=mode, sep=',', header=header)
  return output_path

In [3]:
# Flatten the Avro files under the Blob Storage mount's output folder into CSV.
read_folder = "/".join((storage_mount_point, output_folder))
combine_into_csv(read_folder)

In [4]:
# Flatten the Avro files under the Data Lake mount's output folder into CSV.
read_folder = "/".join((data_lake_mount_point, output_folder))
combine_into_csv(read_folder)

In [5]:
# Release both mounts created by the setup cell. Only mount points that are
# currently mounted are unmounted, so re-running this cell (or running it after
# a partial setup) does not try to unmount something that is not mounted.
mounted_points = {mount.mountPoint for mount in dbutils.fs.mounts()}
for mount_point in (data_lake_mount_point, storage_mount_point):
  if mount_point in mounted_points:
    dbutils.fs.unmount(mount_point)