**Blob Inventory Analytics**

In [None]:
# Storage account and container that hold the configuration JSON file
# (it is read as "<file_name>.json"); this storage account must be
# connected to the Synapse workspace.
storage_account = "reportanalysis"
container_name = "reportdata"
file_name = "configuration"

# name of the database in which the result tables will be stored
# (created by the notebook if it does not exist yet)
database_name = "reportdata"

In [None]:
# Replace whatever azure-storage-blob version is installed with 2.1.0.
# NOTE(review): presumably pinned because this notebook imports BlockBlobService
# from azure.storage.blob, which later major versions no longer provide - confirm.
!pip uninstall azure-storage-blob --yes
!pip install azure-storage-blob==2.1.0

In [None]:
# importing libraries
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import count as count
from pyspark.sql.functions import substring_index
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
from azure.storage.blob import BlockBlobService
import datetime

In [None]:
# storing distribution of data in different containers 
def store_data_distribution_in_containers_details(csvFile, database_name):
    """Store the total blob size per container and report date.

    Args:
        csvFile: inventory DataFrame with the columns Name, Content-Length
            and ReportGenerationDate.
        database_name: database that receives the ContainerInfo table.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.ContainerInfo already exists.
    """
    # the container name is everything before the first '/' of the blob name
    container_of_blob = substring_index(csvFile.Name, '/', 1)
    size_per_container = (
        csvFile.withColumn("ContainerName", container_of_blob)
        .groupBy(['ReportGenerationDate', 'ContainerName'])
        .sum('Content-Length')
        .withColumnRenamed("sum(Content-Length)", "Size")
    )
    target_table = "{0}.ContainerInfo".format(database_name)
    size_per_container.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_data_distribution_in_blob_details(csvFile, database_name):
    """Store the number of blobs of every BlobType per report date.

    Args:
        csvFile: inventory DataFrame with the columns BlobType and
            ReportGenerationDate.
        database_name: database that receives the BlobTypeInfo table.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.BlobTypeInfo already exists.
    """
    blobs_by_type = csvFile.groupBy(['ReportGenerationDate', 'BlobType'])
    # count('BlobType') only counts rows whose BlobType is not null
    type_counts = blobs_by_type.agg(count('BlobType').alias('BlobTypeCount'))
    target_table = "{0}.BlobTypeInfo".format(database_name)
    type_counts.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_data_distribution_in_access_tier_details(csvFile, database_name):
    """Store the number of block blobs in every AccessTier per report date.

    Args:
        csvFile: inventory DataFrame with the columns BlobType, AccessTier
            and ReportGenerationDate.
        database_name: database that receives the AccessTierInfo table.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.AccessTierInfo already exists.
    """
    # only rows whose BlobType is "BlockBlob" take part in the count
    block_blobs = csvFile.filter(csvFile.BlobType == "BlockBlob")
    tier_counts = (
        block_blobs
        .groupBy(['ReportGenerationDate', 'AccessTier'])
        .agg(count('AccessTier').alias('AccessTierCount'))
    )
    target_table = "{0}.AccessTierInfo".format(database_name)
    tier_counts.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_soft_deleted_data_size_details(csvFile, database_name):
    """Store the total size of soft deleted blobs per report date.

    Args:
        csvFile: inventory DataFrame; the Deleted column is optional.
        database_name: database that receives the SoftDeletedInfo table.

    When the Deleted column is missing, or no row has Deleted == True, a
    single placeholder row ("9999-12-31T00:00:00Z", 0) is stored instead.
    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.SoftDeletedInfo already exists.
    """
    def placeholder_frame():
        # one-row stand-in used when there is no soft deleted data to report
        return spark.createDataFrame([Row("9999-12-31T00:00:00Z",0)],["ReportGenerationDate","Size"])

    if "Deleted" not in csvFile.columns:
        deleted_size = placeholder_frame()
    else:
        deleted_rows = csvFile.filter(csvFile.Deleted == True)
        deleted_size = (
            deleted_rows
            .groupBy("ReportGenerationDate")
            .agg({'Content-Length': 'sum'})
            .withColumnRenamed("sum(Content-Length)", "Size")
        )

    # an empty aggregation result is replaced by the placeholder as well
    if not deleted_size.head(1):
        deleted_size = placeholder_frame()

    target_table = "{0}.SoftDeletedInfo".format(database_name)
    deleted_size.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_content_type_distribution_details(csvFile, database_name):
    """Store the total blob size per file type (extension) and report date.

    Args:
        csvFile: inventory DataFrame with the columns Name, Content-Length
            and ReportGenerationDate; the hdi_isfolder column is optional.
        database_name: database that receives the ContentTypeInfo table.

    FileType is the text after the last '.' of the last '/'-separated
    segment of Name. It is 'Unknown' when that segment contains no '.',
    and, if the hdi_isfolder column exists, for rows where it is True.
    The same rule is applied whether or not hdi_isfolder is present.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.ContentTypeInfo already exists.
    """
    # Look for the extension in the last path segment only, so that a '.'
    # inside a directory name is not mistaken for an extension separator.
    blob_base_name = substring_index(csvFile['Name'], '/', -1)
    file_type = when(
        blob_base_name.contains("."), substring_index(blob_base_name, '.', -1)
    ).otherwise('Unknown')

    if "hdi_isfolder" in csvFile.columns:
        # A null hdi_isfolder makes the condition null, which falls through
        # to the extension based value in otherwise().
        file_type = when(csvFile['hdi_isfolder'] == True, 'Unknown').otherwise(file_type)

    content_type_info_df = csvFile.withColumn("FileType", file_type)
    # `sum` is pyspark.sql.functions.sum here (star import at the top of the notebook)
    content_type_info_by_size_df = (
        content_type_info_df
        .groupBy(['ReportGenerationDate', 'FileType'])
        .agg(sum('Content-Length').alias('Sum'))
    )
    content_type_info_by_size_df.write.mode('ignore').saveAsTable("{0}.ContentTypeInfo".format(database_name))

In [None]:
def store_data_growth_in_account_details(csvFile, database_name):
    """Store the total data size of the account for every report date.

    Args:
        csvFile: inventory DataFrame with the columns Content-Length and
            ReportGenerationDate.
        database_name: database that receives the dailyDataSizeInfo table.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.dailyDataSizeInfo already exists.
    """
    size_per_report = csvFile.groupBy('ReportGenerationDate').sum('Content-Length')
    size_per_report = size_per_report.withColumnRenamed("sum(Content-Length)", "Size")
    # rows are ordered by date first and by size second before being stored
    ordered_sizes = size_per_report.orderBy("ReportGenerationDate", "Size")
    target_table = "{0}.dailyDataSizeInfo".format(database_name)
    ordered_sizes.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_data_creation_in_account_details(csvFile, database_name):
    """Store the total size of the blobs created on every calendar date.

    Args:
        csvFile: inventory DataFrame with the columns Creation-Time and
            Content-Length.
        database_name: database that receives the dataCreationWithTime table.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.dataCreationWithTime already exists.
    """
    # NOTE(review): confirm that the 'dd-MM-yyyy' pattern matches the way
    # Creation-Time is represented in the inventory reports.
    creation_day = to_date(csvFile['Creation-Time'], 'dd-MM-yyyy')
    size_by_creation_day = (
        csvFile.withColumn("Date", creation_day)
        .groupBy('Date')
        .sum('Content-Length')
        .withColumnRenamed("sum(Content-Length)", "Size")
    )
    target_table = "{0}.dataCreationWithTime".format(database_name)
    size_by_creation_day.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_last_access_time_details(csvFile, database_name):
    """Store, for the latest report, how long ago each hot block blob was accessed.

    Args:
        csvFile: inventory DataFrame with the columns BlobType, AccessTier,
            ReportGenerationDate, Name and Content-Length; the
            LastAccessTime column is optional.
        database_name: database that receives the lastAccessTime table.

    Nothing is stored when the LastAccessTime column is missing or when
    there is no row with BlobType "BlockBlob" and AccessTier "Hot".
    Otherwise the rows of the most recent ReportGenerationDate are stored
    with two extra columns:
      * DaysLastAccessed - whole days between LastAccessTime and the current
        date; for rows without a LastAccessTime, the days between the
        earliest ReportGenerationDate and today.
      * TotalSize - sum of Content-Length over the stored rows.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.lastAccessTime already exists.
    """
    if "LastAccessTime" not in csvFile.columns:
        return

    hot_block_blobs = csvFile.filter(csvFile.BlobType == "BlockBlob")
    hot_block_blobs = hot_block_blobs.filter(hot_block_blobs.AccessTier == "Hot")
    if not hot_block_blobs.head(1):
        return

    # fetch the latest and the earliest report date in a single Spark job
    max_ts, min_ts = hot_block_blobs.selectExpr(
        "max(ReportGenerationDate)", "min(ReportGenerationDate)"
    ).first()

    latest_report_df = hot_block_blobs.filter(hot_block_blobs.ReportGenerationDate == max_ts)

    # fallback for blobs without a last access time: days since the earliest report
    days_since_first_report = (datetime.datetime.now().date() - min_ts).days
    never_accessed = (
        latest_report_df["LastAccessTime"].isNull()
        | (latest_report_df["LastAccessTime"] == '')
    )
    # datediff counts calendar days directly; dividing a difference of epoch
    # seconds by 86400 and truncating can be off by one when a daylight
    # saving change lies between the two dates.
    days_since_access = when(never_accessed, days_since_first_report).otherwise(
        datediff(current_date(), latest_report_df["LastAccessTime"])
    )
    latest_report_df = latest_report_df.withColumn("DaysLastAccessed", days_since_access)

    total_size = latest_report_df.agg({"Content-Length": "sum"}).collect()[0][0]
    latest_report_df = latest_report_df.withColumn("TotalSize", lit(total_size))

    latest_report_df.select(
        "ReportGenerationDate", "Name", "Content-Length", "DaysLastAccessed", "TotalSize"
    ).write.mode('ignore').saveAsTable("{0}.lastAccessTime".format(database_name))

In [None]:
def store_data_occupied_by_snapshot_details(csvFile, database_name):
    """Store the total size of rows that carry a Snapshot value, per report date.

    Args:
        csvFile: inventory DataFrame; the Snapshot column is optional.
        database_name: database that receives the snapshotData table.

    Nothing is stored when the Snapshot column is missing. The table is
    saved with mode 'ignore', so nothing is written when
    <database_name>.snapshotData already exists.
    """
    if "Snapshot" not in csvFile.columns:
        return

    # keep only the rows whose Snapshot value is present
    rows_with_snapshot = csvFile.dropna(how='all', subset=['Snapshot'])
    size_per_report = rows_with_snapshot.groupBy("ReportGenerationDate").agg({'Content-Length': 'sum'})
    size_per_report = size_per_report.withColumnRenamed("sum(Content-Length)", "Size")
    target_table = "{0}.snapshotData".format(database_name)
    size_per_report.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_modifications_in_data_details(csvFile, database_name):
    """Store how many blobs were last modified on every calendar date.

    Args:
        csvFile: inventory DataFrame with the column Last-Modified.
        database_name: database that receives the lastModifiedCount table.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.lastModifiedCount already exists.
    """
    # NOTE(review): confirm that the 'dd-MM-yyyy' pattern matches the way
    # Last-Modified is represented in the inventory reports.
    modification_day = to_date(csvFile['Last-Modified'], 'dd-MM-yyyy')
    blobs_by_day = csvFile.withColumn("Date", modification_day).groupBy('Date')
    # count('Date') only counts rows whose Date is not null
    modification_counts = blobs_by_day.agg(count('Date').alias('NumberOfModifications'))
    target_table = "{0}.lastModifiedCount".format(database_name)
    modification_counts.write.mode('ignore').saveAsTable(target_table)

In [None]:
def store_reports_analysed_dates(csvFile, database_name):
    """Store the distinct report generation dates found in the data.

    Args:
        csvFile: inventory DataFrame with the column ReportGenerationDate.
        database_name: database that receives the reportsanalysed table.

    The table is saved with mode 'ignore', so nothing is written when
    <database_name>.reportsanalysed already exists.
    """
    report_dates = csvFile.select("ReportGenerationDate")
    unique_report_dates = report_dates.distinct()
    target_table = "{0}.reportsanalysed".format(database_name)
    unique_report_dates.write.mode('ignore').saveAsTable(target_table)

In [None]:
def get_json_link_of_reports(storage_account_name, access_key, destination_container, rule_name):
    """Return wasbs:// links to the manifest files of an inventory rule.

    Args:
        storage_account_name: account that contains the destination container.
        access_key: key used to create the BlockBlobService client.
        destination_container: container whose blobs are listed.
        rule_name: rule name; a blob is selected when its name contains
            "<rule_name>-manifest.json".

    Returns:
        A list of link strings, in the order the blobs are listed.
    """
    manifest_marker = rule_name + "-manifest.json"
    link_template = "wasbs://{0}@{1}.blob.core.windows.net/{2}"
    service = BlockBlobService(storage_account_name, access_key)
    return [
        link_template.format(destination_container, storage_account_name, blob.name)
        for blob in service.list_blobs(destination_container)
        if manifest_marker in blob.name
    ]

In [None]:
def populating_tables(csvFile, database_name):
    """Run every table writer of this notebook on the combined inventory data.

    Args:
        csvFile: inventory DataFrame handed unchanged to each writer.
        database_name: database in which the writers store their tables.
    """
    # the writers are invoked in exactly this order
    table_writers = (
        store_data_distribution_in_containers_details,
        store_data_distribution_in_blob_details,
        store_data_distribution_in_access_tier_details,
        store_soft_deleted_data_size_details,
        store_content_type_distribution_details,
        store_data_growth_in_account_details,
        store_data_creation_in_account_details,
        store_last_access_time_details,
        store_data_occupied_by_snapshot_details,
        store_modifications_in_data_details,
        store_reports_analysed_dates,
    )
    for write_table in table_writers:
        write_table(csvFile, database_name)

In [None]:
def setting_credentials(storage_account_name, access_key):
    """Register the storage account key in the Spark session configuration.

    Args:
        storage_account_name: account name inserted into the configuration key.
        access_key: value stored under that configuration key.
    """
    spark.conf.set(
        "fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name),
        access_key,
    )

In [None]:
def _load_inventory_manifest(manifest_link):
    """Read one inventory manifest JSON file and return its first row."""
    manifest_df = spark.read.option("multiLine", "true").json(manifest_link)
    return manifest_df.collect()[0]


def _read_inventory_files(manifest, report_generation_date):
    """Load all inventory files listed in a manifest, tagged with the report date."""
    link_template = "wasbs://{0}@{1}.blob.core.windows.net/{2}"
    # storage_account_name is the notebook-level variable, not a parameter
    file_links = [
        link_template.format(manifest["destinationContainer"], storage_account_name, inventory_file["blob"])
        for inventory_file in manifest["files"]
    ]
    if manifest["ruleDefinition"]["format"] == "csv":
        report_data = spark.read.csv(file_links, header=True, inferSchema=True)
    else:
        report_data = spark.read.parquet(*file_links)
    # appending inventory report generation date to the dataframe
    return report_data.withColumn('ReportGenerationDate', lit(report_generation_date))


def processing_reports(list_of_report_links_json_file, report_dates_analysed):
    """Combine the inventory reports behind the given manifests into one DataFrame.

    Args:
        list_of_report_links_json_file: links to the manifest JSON files.
        report_dates_analysed: report dates (strings such as "2021-05-26")
            that are already stored and should not be loaded again.

    Returns:
        A DataFrame with the rows of all loaded reports and an added
        ReportGenerationDate column (DateType). Content-Length is cast to
        LongType and, when present, LastAccessTime is converted to a date.

    Raises:
        ValueError: if no manifest refers to a completed report with files.

    Notes:
        * Manifests with status 'Pending' or without listed files are skipped.
        * The first usable report is always loaded, even when its date is in
          report_dates_analysed (behaviour kept from the original code, which
          seeded the result with the first manifest). Later reports are
          skipped when their date was already loaded or already analysed.
        * Every file listed in a manifest is read, not only the first one.
        * The module-level storage_account_name is used to build the links.
    """
    dates_already_stored = set(report_dates_analysed)
    dates_loaded = set()
    fileData = None

    for manifest_link in list_of_report_links_json_file:
        manifest = _load_inventory_manifest(manifest_link)

        if manifest["status"] == 'Pending' or not manifest["files"]:
            continue

        report_generation_date = manifest["inventoryCompletionTime"].split("T")[0]
        already_covered = (
            report_generation_date in dates_loaded
            or report_generation_date in dates_already_stored
        )
        if fileData is not None and already_covered:
            continue

        report_data = _read_inventory_files(manifest, report_generation_date)
        dates_loaded.add(report_generation_date)
        if fileData is None:
            fileData = report_data
        else:
            fileData = fileData.unionByName(report_data, allowMissingColumns=True)

    if fileData is None:
        raise ValueError("No completed inventory report was found to process")

    # Content-Length is a byte count: a 32-bit integer cannot hold sizes of
    # 2 GiB and more, so a 64-bit type is used.
    fileData = fileData.withColumn("Content-Length", fileData["Content-Length"].cast(LongType()))
    # changing datatype of column ReportGenerationDate to Date type
    fileData = fileData.withColumn("ReportGenerationDate", to_date('ReportGenerationDate'))
    # LastAccessTime is an optional inventory field; convert it only if it exists
    if "LastAccessTime" in fileData.columns:
        fileData = fileData.withColumn("LastAccessTime", to_date('LastAccessTime'))
    return fileData

In [None]:
def get_template_data(storage_account, container_name, file_name):
    """Read the configuration JSON file and return its first row.

    Args:
        storage_account: storage account that holds the file.
        container_name: container that holds the file.
        file_name: file name without the ".json" extension.

    Returns:
        The first row of the DataFrame read from the file.
    """
    config_location = "abfss://{0}@{1}.dfs.core.windows.net/{2}.json".format(container_name, storage_account, file_name)
    # the file is read in multi-line mode
    multiline_reader = spark.read.option("multiLine", "true")
    config_rows = multiline_reader.json(config_location).collect()
    return config_rows[0]

In [None]:
def get_report_dates(database_name):
    """Return the report dates already stored in <database_name>.reportsanalysed.

    The database is created if it does not exist.

    Args:
        database_name: database that is created if needed and then inspected.

    Returns:
        The ReportGenerationDate values of the reportsanalysed table cast to
        strings, or an empty list when that table does not exist.
    """
    # creating database if it does not exist
    spark.sql("CREATE DATABASE IF NOT EXISTS {0}".format(database_name))

    tables_in_database = spark.sql("SHOW TABLES IN {0}".format(database_name))
    matching_tables = tables_in_database.filter(tables_in_database.tableName == "reportsanalysed").collect()
    if not matching_tables:
        return []

    stored_reports = spark.sql("SELECT * FROM {0}.reportsanalysed".format(database_name))
    # the dates are handed back as strings, not as DateType values
    date_as_text = stored_reports["ReportGenerationDate"].cast(StringType())
    stored_reports = stored_reports.withColumn("ReportGenerationDate", date_as_text)
    return [row["ReportGenerationDate"] for row in stored_reports.select("ReportGenerationDate").collect()]

In [None]:
# Driver cell: loads the configuration, finds the inventory manifests,
# combines the reports into one DataFrame and stores the derived tables.

# reading the data from the configuration (template) file
json_file_data = get_template_data(storage_account, container_name, file_name)

# initializing with the data stored in the template file
# (storage_account_name is also read as a global inside processing_reports)
storage_account_name = json_file_data["storageAccountName"]
access_key = json_file_data["accessKey"]
destination_container = json_file_data["destinationContainer"]
rule_name = json_file_data["ruleName"]

# setting credentials for the spark session
setting_credentials(storage_account_name, access_key)

# list of the links to all manifest files of the inventory rule
list_of_report_links = get_json_link_of_reports(storage_account_name, access_key, destination_container, rule_name)
# print("No of reports - ",len(list_of_report_links))

# report dates that are already stored in the database
report_dates_analysed = get_report_dates(database_name)

# processing all the reports into a single dataframe
file_data = processing_reports(list_of_report_links, report_dates_analysed)

# storing all processed dataframes in respective tables
populating_tables(file_data, database_name)

In [None]:
# command to drop the database and the corresponding tables
# database_name = "temp1"
# spark.sql("DROP DATABASE IF EXISTS {0} CASCADE".format(database_name))