In [0]:
import pandas as pd

In [0]:
# Turn on Arrow for Spark <-> pandas conversion; this notebook later calls
# toPandas() and spark.createDataFrame() on pandas data.
# NOTE(review): Spark 3.x renamed this key to
# "spark.sql.execution.arrow.pyspark.enabled" - confirm against the cluster runtime.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [0]:
# Drop any existing /mnt/alv mount (presumably so it can be recreated with the
# current credentials). dbutils.fs.unmount raises when the path is not mounted,
# so only call it when the mount point is actually registered; this keeps
# "Run All" working on a workspace where the mount does not exist yet.
if any(mount.mountPoint == "/mnt/alv" for mount in dbutils.fs.mounts()):
  dbutils.fs.unmount("/mnt/alv")

In [0]:
def mount_blob(storage_account_name, access_key):
  """Mount the Azure Blob Storage container(s) listed below into DBFS.

  Each entry's ``folder_name`` is used both as the blob container name and as
  the name of the DBFS mount point (``/mnt/<folder_name>``). A mount point that
  can already be listed is left untouched; otherwise it is created.

  Args:
    storage_account_name: Azure storage account that owns the container(s).
    access_key: Access key for that account. It is handed to the mount under
      the ``fs.azure.account.key.<account>.blob.core.windows.net`` config key.

  Raises:
    Any exception raised by ``dbutils.fs.mount`` when creating the mount fails.
  """
  mount_list = [
    {
      "folder_name": "alv",
      "message" : "Mount Azure Blob Storage (alv) in DBFS..."
    }
  ]

  for item in mount_list:
    print(item['message'])
    folder_name = item['folder_name']
    mount_point = '/mnt/{}'.format(folder_name)
    blob_path = 'wasbs://{}@{}.blob.core.windows.net'.format(folder_name, storage_account_name)
    config = "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account_name)
    try:
      # Listing the mount point is used as the "is it mounted?" probe.
      dbutils.fs.ls(mount_point)
    except Exception:
      # Only ordinary errors mean "not mounted". A bare ``except`` would also
      # swallow KeyboardInterrupt / SystemExit and mount anyway.
      print('DBFS Mount does not exist, creating mount {}...'.format(mount_point))

      dbutils.fs.mount(
        source = blob_path,
        mount_point = mount_point,
        extra_configs = {
          config:access_key
        }
      )
    else:
      print('{} has been mounted.'.format(mount_point))

In [0]:
storageAccountName = 'bidseadlg2'
# The container name ('alv') is fixed inside mount_blob.
# Fetch the access key from a Databricks secret scope rather than keeping it in
# the notebook source, where it would leak through exports and version control.
# NOTE(review): assumes a secret scope named "storage" holding a secret keyed by
# the storage account name - confirm it exists in this workspace.
storageAccountAccessKey = dbutils.secrets.get(scope="storage", key=storageAccountName)
mount_blob(storageAccountName, storageAccountAccessKey)

In [0]:
def read_data_to_dataframe(file_location, file_type="csv", infer_schema="false",
                           first_row_is_header="true", delimiter=",",
                           encoding="utf-8"):
  """Load a file into a Spark DataFrame.

  The defaults reproduce the original hard-coded behaviour: a comma-separated,
  UTF-8 CSV with a header row and no schema inference.

  Args:
    file_location: Path of the file (or folder) to load.
    file_type: Spark data source format name.
    infer_schema: Value for the reader's "inferSchema" option.
    first_row_is_header: Value for the reader's "header" option.
    delimiter: Value for the reader's "sep" option.
    encoding: Value for the reader's "encoding" option.

  Returns:
    The DataFrame produced by ``spark.read...load(file_location)``.
  """
  # These options target the CSV source; per the original note, other file
  # types ignore them.
  reader = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .option("encoding", encoding)
  )
  return reader.load(file_location)

In [0]:
def output_csv_file(df, path):
  """Write a Spark DataFrame to ``path`` as comma-separated, UTF-8 CSV.

  The frame is coalesced to one partition first so that the output folder
  contains a single ``part-*`` data file. Existing data at ``path`` is
  overwritten.

  Args:
    df: Spark DataFrame to write.
    path: Destination folder for the CSV output.
  """
  (
    df.coalesce(1)  # one partition -> one part-* file in the output folder
    .write
    .option("header", "true")
    .option("sep", ",")
    .option("encoding", "utf-8")
    # The previous option name "ESCAPE quote" is not a CSV writer option and was
    # silently ignored; "escape" is the option that sets the character used to
    # escape quotes embedded in quoted values.
    .option("escape", '"')
    .mode("overwrite")  # replace any pre-existing data at the target path
    .csv(path)
  )

In [0]:
# Load every file under <folder_path>TEST into its own pandas DataFrame.
file_list = []
df_list = []
folder_path = '/mnt/alv/'
files = dbutils.fs.ls(folder_path + 'TEST')

for f in files:
  file_list.append(f.name)
  print(file_list)  # running list of the files picked up so far
  df = read_data_to_dataframe(f.path).toPandas()
  # Rename whatever the last column is called to 'SourceFile', giving every
  # frame the same name for that column.
  df = df.rename(columns={df.columns[-1]: 'SourceFile'})
  df_list.append(df)

# With an empty folder `df` would never be bound and the preview below would
# fail with a NameError; stop here with an explicit message instead.
if not df_list:
  raise ValueError('No input files found under {}TEST'.format(folder_path))

print(df.head())

In [0]:
# Stack the per-file frames, then discard every column that holds no values at all.
df_combine = pd.concat(df_list).dropna(axis=1, how='all')
df_combine.info()

In [0]:
# Convert the combined pandas DataFrame into a Spark DataFrame; a later cell
# passes df_spark to output_csv_file.
df_spark = spark.createDataFrame(df_combine)
# Uncomment to preview the converted frame:
# display(df_spark)

In [0]:
# Write the merged data into a temporary folder, then promote the single
# part-* file to <folder_path>/TEST.csv and delete the temporary folder.
base_path = folder_path.rstrip('/')  # folder_path ends with '/'; avoid '//' in the joined paths
write_path = "{}/TEST_merge".format(base_path)
output_csv_file(df_spark, write_path)

tmp_files = dbutils.fs.ls(write_path)
output_file = [x for x in tmp_files if x.name.startswith("part-")]
if not output_file:
  # Fail with a clear message rather than an IndexError on output_file[0].
  raise FileNotFoundError("No part-* file was written to {}".format(write_path))

# Move the data file out of the temporary folder, renaming it at the same time.
dbutils.fs.mv(output_file[0].path, "{}/TEST.csv".format(base_path))
dbutils.fs.rm(write_path, recurse = True)