# Blob life cycle - Research notebook

This notebook provides the key steps to analyze your blob access patterns, with the goal of creating lifecycle policies that reduce storage costs.

## Prerequisites

Two data sources are required, and both must be available in the same storage account that serves as your data lake:

- Blob Inventory snapshot, in Parquet format
- Blob diagnostic logs (read operations)

## Connecting to your data lake

In [None]:
## Inventory storage

invent_storage_account_name = "<your data lake storage account>"
invent_storage_account_key = "< your key>"

spark.conf.set("fs.azure.account.key.{0}.dfs.core.windows.net".format(invent_storage_account_name), invent_storage_account_key)

## Reading diagnostic log (Parquet) files into a DataFrame

Read all the access logs copied via Data Factory into a DataFrame.

In [None]:
lake_container = 'inventorylake'
# Set the path to your Parquet log files. Note that 'y=2023/*/*/*/*/' reads all the logs of 2023
diag_log_path = 'diag_logs/resourceId=/subscriptions/11acf2e0-XXXX-XXXX-86ef-37c53a9XXXXX/resourceGroups/ext-lake/providers/Microsoft.Storage/storageAccounts/targetextsa/blobServices/default/y=2023/*/*/*/*/'
# Define the path to your parquet files so spark can read them
path = f"abfss://{lake_container}@{invent_storage_account_name}.dfs.core.windows.net/{diag_log_path}"

# Read the parquet files into a DataFrame
df = spark.read.parquet(path)
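The wildcards in `diag_log_path` cover every partition folder below `y=2023`. Assuming the usual Azure Monitor archive layout of `y=/m=/d=/h=/m=` folders (year, month, day, hour, minute), a hypothetical helper such as `month_glob` below narrows the read to a single month instead of the whole year. It only builds a path string; the folder names are an assumption to check against your own container.

```python
def month_glob(base_path: str, year: int, month: int) -> str:
    """Return a glob that matches one month of diagnostic logs.

    Hypothetical helper; assumes the partition layout
    y=YYYY/m=MM/d=DD/h=HH/m=MM/ below base_path.
    """
    # Fix the year and month folders, wildcard the day, hour and minute folders
    return f"{base_path.rstrip('/')}/y={year}/m={month:02d}/*/*/*/"


print(month_glob("diag_logs/default", 2023, 8))
# diag_logs/default/y=2023/m=08/*/*/*/
```

The result can be used in place of `diag_log_path` when building the `abfss://` path, which keeps Spark from listing and reading the other months.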


### Summary of activities for the entire period

The ```groupBy``` operation provides a quick overview of the different operations in your storage account.

Note that for access patterns we only need to focus on the 'GetBlob' and 'ReadFile' operations. Using ```truncate=False``` shows the full length of each field.

In [None]:
df.groupBy("operationName").count().show(truncate=False)

+---------------------------+-----+
|operationName              |count|
+---------------------------+-----+
|BlobPreflightRequest       |84   |
|ListFilesystemDir          |393  |
|GetBlobServiceProperties   |55   |
|GetBlobMetadata            |7    |
|GetBlob                    |664  |
|ListBlobs                  |3148 |
|GetPathAccessControl       |6    |
|GetContainerProperties     |1318 |
|ListContainers             |793  |
|GetPathStatus              |857  |
|GetBlockList               |150  |
|GetBlobProperties          |647  |
|GetContainerServiceMetadata|45   |
|ReadFile                   |682  |
|GetFilesystemProperties    |3    |
|ListFilesystems            |5    |
|GetContainerACL            |2    |
+---------------------------+-----+



### Focusing on time, specific operations and blob name

We ignore most of the operations and focus only on ```ReadFile``` and ```GetBlob```. We also filter out platform-related requests, such as those to ```$logs``` and ```$accountmetadata```.

The following cell creates final_df with three fields: time, operation name and blob name.

In [None]:
from pyspark.sql.functions import col, regexp_extract, from_unixtime, unix_timestamp

# Define the format of your time column
time_format = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"
# Filtering out platform operations
filtered_df = df.filter(~(col("uri").contains("$accountmetadata") | 
                          col("uri").contains("$logs") | 
                          col("uri").contains("%24logs")))
# Keep only the read operations that matter for access patterns
filtered_df = filtered_df.filter(col("operationName").isin("GetBlob", "ReadFile"))
# Regular expression pattern to extract desired blob_name
pattern = r'(?:https?:\/\/[^\/]+\/)([^?]+)'

# Extract blob_name using regexp_extract
df_with_blob_name = filtered_df.withColumn("blob_name", regexp_extract(col("uri"), pattern, 1))
# Parse the ISO 8601 time string and reformat it as 'yyyy-MM-dd HH:mm:ss' (a string with second precision)
df_with_datetime = df_with_blob_name.withColumn("time", from_unixtime(unix_timestamp(col("time"), time_format)))


final_df = df_with_datetime.select("time", "operationName", "blob_name")

final_df.show(truncate=False)
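To see what the platform filter and `pattern` do, the same logic can be checked in plain Python with the `re` module, which accepts this pattern unchanged. Like Spark's `regexp_extract`, the helper returns an empty string when there is no match. The sample URIs are made up for illustration.

```python
import re

# Same pattern as the Spark cell: skip scheme and host, capture the path up to '?'
pattern = r'(?:https?:\/\/[^\/]+\/)([^?]+)'
# Substrings that mark platform requests filtered out by the Spark cell
platform_markers = ("$accountmetadata", "$logs", "%24logs")


def blob_name_from_uri(uri: str) -> str:
    """Return the container/blob path of a request URI, or '' if no match."""
    match = re.search(pattern, uri)
    return match.group(1) if match else ""


def is_platform_request(uri: str) -> bool:
    """True if the URI targets a platform path such as $logs."""
    return any(marker in uri for marker in platform_markers)


# Hypothetical sample URIs
uris = [
    "https://targetextsa.blob.core.windows.net/data/2023/report.parquet?sv=2021-08-06",
    "https://targetextsa.blob.core.windows.net/%24logs/blob/2023/08/27/0700/000000.log",
]
for uri in uris:
    if not is_platform_request(uri):
        print(blob_name_from_uri(uri))
# data/2023/report.parquet
```

Note that the captured name starts with the container name, which is what allows it to be matched against the inventory `Name` column later.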

### Researching Access Patterns

#### Last access

Last access time tracking can be enabled on a storage account, but it is not enabled by default. The following cell derives the last access time of each blob from the diagnostic logs instead.

In [None]:
from pyspark.sql.functions import max

# Group by blob_name and aggregate to get the latest timestamp for each blob
result_df = final_df.groupBy("blob_name").agg(max("time").alias("last_access_timestamp"))

# Show the result
result_df.show(truncate=False)
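After `from_unixtime`, the `time` column is a string, so `max("time")` compares strings. That still yields the latest access because the fixed-width `yyyy-MM-dd HH:mm:ss` format sorts chronologically. A plain-Python sketch of the same per-blob aggregation, on hypothetical rows shaped like `final_df`:

```python
# Hypothetical rows shaped like final_df: (time, operationName, blob_name)
rows = [
    ("2023-07-01 10:15:00", "GetBlob", "data/a.parquet"),
    ("2023-08-20 08:00:00", "ReadFile", "data/a.parquet"),
    ("2023-06-11 23:59:59", "ReadFile", "data/b.csv"),
]

last_access = {}
for ts, _operation, blob_name in rows:
    # Fixed-width 'yyyy-MM-dd HH:mm:ss' strings compare chronologically
    if blob_name not in last_access or ts > last_access[blob_name]:
        last_access[blob_name] = ts

print(last_access)
# {'data/a.parquet': '2023-08-20 08:00:00', 'data/b.csv': '2023-06-11 23:59:59'}
```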


## Looking at the inventory 

The following cell performs several activities:

- Read the blob inventory Parquet file
- Remove platform-related rows
- Select only the required fields (ideally, configure the inventory rule so that other fields are not exported in the first place)
- Convert the times from ```long``` (epoch milliseconds) to a human-readable format

In [None]:
from pyspark.sql.functions import col,from_unixtime
# the inventory files are in this container (consider moving later to the lake)
container = 'inventory'
# Define the path to your parquet files
inventory_file = "parquet-all.parquet"
path = f"abfss://{container}@{invent_storage_account_name}.dfs.core.windows.net/2023/08/27/07-02-03/parquet-all/{inventory_file}"

# Read the parquet files into a DataFrame
_df = spark.read.parquet(path)
# First, filter out platform-related rows: drop rows whose Name starts with "$logs"
filtered_inventory = _df.filter(~col("Name").startswith("$logs"))
# Keep only the fields we need:
new_df = filtered_inventory.select("Name","Creation-Time", "Last-Modified", "Content-Length")
# Readable date-time format; minutes and seconds are fixed to 00, so times are truncated to the hour
date_format = "yyyy-MM-dd HH:00:00"

# Convert Creation-Time and Last-Modified to the desired format
inventory_df = new_df.withColumn("Creation-Time", from_unixtime(col("Creation-Time") / 1000, date_format))
inventory_df = inventory_df.withColumn("Last-Modified", from_unixtime(col("Last-Modified") / 1000, date_format))

# Show the result
inventory_df.show(truncate=False)
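The inventory stores `Creation-Time` and `Last-Modified` as epoch milliseconds, hence the division by 1000. Because `date_format` writes literal `00` for minutes and seconds, both columns are truncated to the hour, so a blob modified within the same hour as its creation ends up with identical values in the two columns. A plain-Python sketch of the conversion, assuming UTC (Spark's `from_unixtime` uses the session time zone instead):

```python
from datetime import datetime, timezone


def to_hour_string(epoch_ms: int) -> str:
    """Convert epoch milliseconds to 'YYYY-MM-DD HH:00:00' in UTC."""
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:00:00")


created = 1_000_000_000_000          # 2001-09-09 01:46:40 UTC
modified = created + 10 * 60 * 1000  # ten minutes later, within the same hour
print(to_hour_string(created))                              # 2001-09-09 01:00:00
print(to_hour_string(created) == to_hour_string(modified))  # True
```

This truncation is what the later "same time for creation and update" check relies on, so updates made within the creation hour are counted as creations.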

### Joining the two DataFrames

__Here's a step-by-step approach:__

- Prepare inventory_df for joining: for each blob, create two rows, one for the creation event and one for the update event, each with its timestamp. Then collapse them back into a single event per blob: "creation" when both timestamps are equal, otherwise "update" at the Last-Modified time.

- Join the two DataFrames on the Name in inventory_df and the blob_name in final_df.

- Relabel the events of accessed blobs with their operationName (GetBlob or ReadFile).

- Select the final columns and sort by blob name and event time.
In [None]:
from pyspark.sql.functions import col, lit, when, min, max


# Create a DataFrame for creation events
creation_df = inventory_df.select(
    col("Name"),
    col("Content-Length"),
    col("Creation-Time").alias("event_time"),
    lit("creation").alias("eventType")
)

# Create a DataFrame for update events
update_df = inventory_df.select(
    col("Name"),
    col("Content-Length"),
    col("Last-Modified").alias("event_time"),
    lit("update").alias("eventType")
)

# Combine the two DataFrames using union
inventory_expanded = creation_df.union(update_df)

# Remove rows that indicate same time for creation and update:
# Group by Name and aggregate to check if creation and update times are the same

agg_df = inventory_expanded.groupBy("Name", "Content-Length")\
                           .agg(min("event_time").alias("min_time"), 
                                max("event_time").alias("max_time"))\
                           .withColumn("eventType", 
                                       when(col("min_time") == col("max_time"), lit("creation")).otherwise(lit("update")))


# Select the appropriate columns based on the condition
_result = agg_df.withColumn("event_time", 
                            when(col("eventType") == "creation", col("min_time"))
                            .otherwise(col("max_time"))
)

_result = _result.select("Name", "event_time", "eventType", "Content-Length")

# Join with final_df (we created it in previous step from the logs). The join is done on the blob name.
joined_df = final_df.join(_result, final_df.blob_name == _result.Name, how="outer")

# If operationName is "ReadFile" or "GetBlob", update the eventType
final_result = joined_df.withColumn("eventType", 
                                    when(col("operationName").isin(["ReadFile", "GetBlob"]), col("operationName"))
                                    .otherwise(col("eventType"))
)

# Select the desired columns and order by the name of the blob and the event time
final_result = final_result.select("event_time", "eventType", "Name", "Content-Length").orderBy("Name", "event_time")

final_result.show(truncate=False)
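Per blob, the cell above keeps a single inventory event and, after the join, replaces its label with the operation name of every matching log row. Note that those access rows keep the inventory `event_time`; the log's own `time` column is not carried into `final_result`. A plain-Python sketch of this labelling on hypothetical data (log rows for blobs missing from the inventory, which the outer join keeps with a null `Name`, are left out for brevity):

```python
# Hypothetical inventory rows: Name -> (Creation-Time, Last-Modified)
inventory = {
    "data/a.parquet": ("2023-01-05 10:00:00", "2023-01-05 10:00:00"),
    "data/b.csv": ("2023-01-05 10:00:00", "2023-03-01 12:00:00"),
}
# Hypothetical access log rows: (operationName, blob_name)
accesses = [("GetBlob", "data/b.csv"), ("ReadFile", "data/b.csv")]


def inventory_event(creation, modified):
    """Collapse a blob's two inventory events into one (eventType, event_time)."""
    if creation == modified:
        return "creation", creation
    # Fixed-width timestamp strings compare chronologically
    return "update", max(creation, modified)


events = []
for name, (creation, modified) in inventory.items():
    event_type, event_time = inventory_event(creation, modified)
    operations = [op for op, blob in accesses if blob == name]
    # An accessed blob gets one row per access, labelled with the operation
    # name but still carrying the inventory event_time, as in final_result
    for label in operations or [event_type]:
        events.append((event_time, label, name))

for row in sorted(events, key=lambda r: (r[2], r[0])):
    print(row)
```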

In [None]:
from pyspark.sql.functions import countDistinct

# Calculate the total number of distinct blobs from inventory_expanded
total_blobs_inventory = inventory_expanded.agg(countDistinct("Name").alias("total_blobs_inventory")).collect()[0]["total_blobs_inventory"]

# Print the result
print("Total number of distinct blobs in inventory_df:", total_blobs_inventory)

Total number of distinct blobs in inventory_df: 83213


### Storage behaviour

Next, we examine the DataFrame and count the number of events of each type (creation, update, ReadFile, GetBlob):

In [None]:
from pyspark.sql.functions import count

# Group by eventType and count the occurrences
grouped_result = final_result.groupBy("eventType").agg(count("Name").alias("count"))

# Display the result
grouped_result.show(truncate=False)


The following table shows the number of events of each type on the storage account we were experimenting with:

|eventType|count|
|---------|-----|
|GetBlob  |7    |
|creation |83177|
|update   |27   |
|ReadFile |29   |

This clearly indicates that most events (99.92%, that is, 83,177 of 83,240) are blob creations.
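The 99.92% figure is the creation count divided by the total of all event counts in the table, as this short calculation shows:

```python
# Event counts from the table above
counts = {"GetBlob": 7, "creation": 83177, "update": 27, "ReadFile": 29}

total = sum(counts.values())  # 83240
creation_share = counts["creation"] / total * 100
print(f"{creation_share:.2f}%")  # 99.92%
```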

### How many files are accessed

The next task is to sort the access patterns into bins. One way of doing this is to use the logs to calculate the time from a blob's creation or last update to its last access; this tells us how many files are accessed in each period after they were created.


- Filter the DataFrame for rows with an eventType of either "GetBlob" or "ReadFile".
- Convert the event_time column from string to a timestamp type.
- Filter rows where the event_time is older than 30 days from the current date.
- Group by eventType and count the distinct Name (i.e., number of unique blobs).

The following cells address this. The first shows how many distinct blobs were accessed for each operation type. The second groups the accessed blobs into 5-day bins and counts the blobs in each bin, and the last adds each bin's percentage of the total number of blobs.

In [None]:
from pyspark.sql.functions import to_timestamp, current_date, datediff, countDistinct

# Convert event_time from string to timestamp
filtered_df = final_result.withColumn("event_time", to_timestamp("event_time", "yyyy-MM-dd HH:mm:ss"))

# Keep only the read operations (GetBlob and ReadFile)
filtered_df = filtered_df.filter(col("eventType").isin(["GetBlob", "ReadFile"]))

# Keep rows whose event_time is more than 30 days before the current date (the threshold could be a parameter)
filtered_df = filtered_df.filter(datediff(current_date(), col("event_time")) > 30)

# Group by eventType and count distinct blobs
grouped_result = filtered_df.groupBy("eventType").agg(countDistinct("Name").alias("num_blobs"))

# Display the result
grouped_result.show(truncate=False)


The results show minimal access to blobs. The following table shows the number of distinct blobs accessed per operation type on the storage account we were experimenting with:

|eventType|num_blobs|
|---------|---------|
|GetBlob  |4        |
|ReadFile |5        |

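`datediff(current_date(), col("event_time"))` counts whole calendar days between the two dates, ignoring the time of day. A plain-Python equivalent of the 30-day filter, using a fixed hypothetical "today" and hypothetical events so the result is reproducible:

```python
from datetime import date, datetime

THRESHOLD_DAYS = 30  # same threshold as the Spark cell; could be a parameter


def days_since(event_time: str, today: date) -> int:
    """Whole days between the event's date and today, like Spark's datediff."""
    event_date = datetime.strptime(event_time, "%Y-%m-%d %H:%M:%S").date()
    return (today - event_date).days


today = date(2023, 9, 1)  # hypothetical reference date
events = [("GetBlob", "2023-08-20 08:00:00"), ("ReadFile", "2023-06-11 23:59:59")]
older = [(op, t) for op, t in events if days_since(t, today) > THRESHOLD_DAYS]
print(older)  # [('ReadFile', '2023-06-11 23:59:59')]
```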

In [None]:
from pyspark.sql.functions import floor, countDistinct, datediff, current_date

# For simplicity, days_diff is measured from each blob's event_time (creation or last update) to the current date.
# To measure the days from creation to last access instead, replace current_date() with a last access time column.
_filtered_df = filtered_df.withColumn("days_diff", datediff(current_date(), col("event_time")))

# Bucket the blobs based on the time difference
# Using floor function to create buckets. Each bucket will have a range of 5 days (2 days on either side of a central value).
_filtered_df = _filtered_df.withColumn("bucket", floor(col("days_diff") / 5))

# Count the number of distinct blobs in each bucket
bucket_counts = _filtered_df.groupBy("bucket").agg(countDistinct("Name").alias("num_blobs"))

# Convert the bucket number to denote the access pattern in days
bucket_counts = bucket_counts.withColumn("days", (col("bucket") * 5) + 2) # Adding 2 to get the central value

# Select the desired columns and order by days
bucket_counts = bucket_counts.select("days", "num_blobs").orderBy("days")

# Display the result
bucket_counts.show(truncate=False)

Here, ```days``` is the central value of each 5-day bin of ```days_diff```, that is, of the number of days between each accessed blob's creation (or last update) and the current date. The following table shows the number of distinct accessed blobs in each bin on the storage account we were experimenting with:

|days|num_blobs|
|----|---------|
|47  |3        |
|52  |1        |
|57  |1        |
|62  |2        |
|252 |2        |

In total, 9 blobs were accessed, and they fall into 5 distinct groups (bins), ranging from 47 to 252 days.
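The bins come from `floor(days_diff / 5)`, and each bin is labelled with its central value, `bucket * 5 + 2`; for example, 45 to 49 days all land in the 47-day bin. The sketch below applies the same binning in plain Python. Unlike the Spark cell, which measures up to the current date, it measures from creation to last access, the quantity this section set out to bin; the blob names and dates are hypothetical.

```python
from collections import Counter
from datetime import date

BIN_WIDTH = 5  # days per bin, as in the Spark cell


def bin_centre(days: int) -> int:
    """Central value of the 5-day bin containing `days` (floor(days / 5) * 5 + 2)."""
    return (days // BIN_WIDTH) * BIN_WIDTH + BIN_WIDTH // 2


# Hypothetical blobs: name -> (created, last accessed)
blobs = {
    "data/a.parquet": (date(2023, 1, 1), date(2023, 2, 17)),  # 47 days
    "data/b.csv": (date(2023, 1, 1), date(2023, 2, 15)),      # 45 days
    "data/c.json": (date(2023, 1, 1), date(2023, 2, 22)),     # 52 days
}
bins = Counter(bin_centre((accessed - created).days) for created, accessed in blobs.values())
print(sorted(bins.items()))  # [(47, 2), (52, 1)]
```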

In [None]:
from pyspark.sql.functions import sum, round

# Calculate the total number of distinct blobs from inventory_df
total_blobs_inventory = inventory_expanded.agg(countDistinct("Name").alias("total_blobs_inventory")).collect()[0]["total_blobs_inventory"]

# Add the total_blobs to the bucket_counts DataFrame
bucket_counts = bucket_counts.withColumn("total_blobs", lit(total_blobs_inventory))

# Calculate the percentage of blobs accessed for each bucket
bucket_counts = bucket_counts.withColumn("percentage_accessed", 
                                         round((col("num_blobs") / col("total_blobs")) * 100, 3).cast("double"))

# Display the result
bucket_counts.show(truncate=False)


Here is the same data, with each group's share of the total number of blobs (as a percentage):

|days|num_blobs|total_blobs|percentage_accessed|
|----|---------|-----------|-------------------|
|47  |3        |83213      |0.004              |
|52  |1        |83213      |0.001              |
|57  |1        |83213      |0.001              |
|62  |2        |83213      |0.002              |
|252 |2        |83213      |0.002              |

Although the storage account we were experimenting with offers only a shallow example of access patterns, we could still observe a few common ones.
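Each `percentage_accessed` value is `num_blobs / total_blobs * 100`, rounded to three decimals. Reproducing the table's figures in plain Python (Python's `round()` rounds halves to even while Spark's `round` rounds half up, but none of these values is a tie):

```python
TOTAL_BLOBS = 83213  # distinct blobs in the inventory (total_blobs above)

# num_blobs per bin (days -> num_blobs), from the table above
num_blobs_per_bin = {47: 3, 52: 1, 57: 1, 62: 2, 252: 2}

# Percentage of all blobs in each bin, rounded to 3 decimals like the Spark cell
percentages = {
    days: round(num / TOTAL_BLOBS * 100, 3) for days, num in num_blobs_per_bin.items()
}
print(percentages)  # {47: 0.004, 52: 0.001, 57: 0.001, 62: 0.002, 252: 0.002}

# Share of all blobs that were accessed at all (9 of 83,213)
accessed_share = round(sum(num_blobs_per_bin.values()) / TOTAL_BLOBS * 100, 3)
print(accessed_share)  # 0.011
```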

### Outcome

The results above group blobs with similar access patterns into a few buckets and show each bucket's percentage of the total number of blobs in the storage account. This should help when deciding which lifecycle policy to apply.

### Size calculations

In [None]:
from pyspark.sql.functions import sum

# Conversion factor from bytes to MB
bytes_to_mb = 1 / (2**20)

# Calculate the total size of all blobs in MB
total_size_mb = inventory_df.agg((sum("Content-Length") * bytes_to_mb).alias("total_size_mb")).collect()[0]["total_size_mb"]

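# Calculate the total size of accessed blobs in MB.
# Note: filtered_df has one row per access, so a blob that was read several times adds its
# Content-Length once per access; use filtered_df.dropDuplicates(["Name"]) to count each blob once.
accessed_size_mb = filtered_df.agg((sum("Content-Length") * bytes_to_mb).alias("accessed_size_mb")).collect()[0]["accessed_size_mb"]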

# Calculate the size of blobs that were not accessed in MB
not_accessed_size_mb = total_size_mb - accessed_size_mb

print(f"Total size of all blobs (MB): {total_size_mb:.2f}")
print(f"Total size of accessed blobs (MB): {accessed_size_mb:.2f}")
print(f"Size of blobs not accessed (MB): {not_accessed_size_mb:.2f}")


These are the results of the above cell:

```
Total size of all blobs (MB): 8863.65
Total size of accessed blobs (MB): 5019.23
Size of blobs not accessed (MB): 3844.42
```

So while 99.92% of the events are blob creations, the 9 blobs that were accessed account for 5019.23 MB of the 8863.65 MB total, so they are much larger than the average blob. This is a good indication that fine-tuned blob lifecycle policies need to take blob size into account as well.
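Putting the size results next to the blob counts makes the contrast explicit. Using the figures reported in this notebook (9 accessed blobs out of 83,213, and 5019.23 MB out of 8863.65 MB):

```python
# Figures reported in this notebook
total_blobs, accessed_blobs = 83213, 9
total_size_mb, accessed_size_mb = 8863.65, 5019.23

count_share = accessed_blobs / total_blobs * 100
size_share = accessed_size_mb / total_size_mb * 100
print(f"accessed blobs: {count_share:.3f}% of blobs, {size_share:.2f}% of size")
# accessed blobs: 0.011% of blobs, 56.63% of size
```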