In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from azure.storage.blob import BlobServiceClient
import tempfile
import json
import re

In [51]:
# Blob-name prefix of the source data to process; the trailing '/' is part of the prefix.
folder_path = "cde.ucr.cjis.gov/LATEST/participation/state/"

Blob Connection

In [52]:
# Connection to the source storage account (container "bronze").
# The credential is fetched from the Synapse linked service at run time, so
# nothing secret is stored in the notebook.
container_name = "bronze"
BLOB_ACCOUNT_NAME = 'usafactsbronze'
LINKED_SERVICE_NAME = 'Bronze'
BLOB_SAS_TOKEN = mssparkutils.credentials.getConnectionStringOrCreds(LINKED_SERVICE_NAME)

bronze_account_url = f"https://{BLOB_ACCOUNT_NAME}.blob.core.windows.net"
blob_service_client = BlobServiceClient(account_url=bronze_account_url, credential=BLOB_SAS_TOKEN)

In [53]:
# Destination storage (silver layer): account, container and linked service.
blob_account_name = 'usafactssilver'
blob_container_name = 'silver'
linked_service_name = 'silver'

# Credential for the destination, resolved through the linked service.
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Root URI that every output path in this notebook is appended to.
wasbs_path = f'wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/'

# Register the credential with Spark so it can write to that container.
sas_conf_key = f'fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net'
spark.conf.set(sas_conf_key, blob_sas_token)
 

Extracting directory names from the blob names

In [54]:
# Collect the distinct parent "directories" of every blob under folder_path.
directories = set()
container_client = blob_service_client.get_container_client(container_name)

try:
    # Walk the listing lazily; a failure part-way through keeps what was
    # collected so far and is only reported, not raised.
    for blob in container_client.list_blobs(name_starts_with=folder_path):
        # Everything before the last '/' is the blob's directory.
        parent_directory = blob.name.rpartition('/')[0]
        directories.add(parent_directory)
except Exception as exc:
    print(f"Error: {exc}")

Converting JSON to Delta tables

In [56]:
def _sanitize_column_name(raw_key):
    """Turn a JSON key into a column name: spaces and commas become '_', parentheses are dropped."""
    return raw_key.replace(" ", "_").replace(',', '_').replace('(', '').replace(')', '')


def _union_with_missing_columns(left_df, right_df):
    """Union two DataFrames by column *name*, null-filling columns only one side has.

    DataFrame.union matches columns by position, so both sides are first
    brought to the same column set and the right side is re-selected in the
    left side's column order before the union.
    """
    left_names = set(left_df.columns)
    right_names = set(right_df.columns)

    for column_name in right_df.columns:
        if column_name not in left_names:
            left_df = left_df.withColumn(column_name, lit(None))

    for column_name in left_df.columns:
        if column_name not in right_names:
            right_df = right_df.withColumn(column_name, lit(None))

    return left_df.union(right_df.select(left_df.columns))


# One client is enough for the whole run.
container_client = blob_service_client.get_container_client(container_name)

# Collected across ALL folders (and defined even when there are none), so the
# error-log cell sees every failure instead of only the last folder's.
error_list = []

for folder_name in directories:

    # DataFrames built from list-shaped JSON files in this folder; merged below.
    dataframes = []

    # The trailing '/' stops the prefix from matching sibling folders that
    # merely share a name prefix (e.g. 'state/A' vs 'state/AL').
    blobs = container_client.list_blobs(name_starts_with=folder_name + '/')

    # Read each JSON file individually
    for blob in blobs:
        # Blobs in nested sub-folders are handled when the loop reaches their
        # own directory; skipping them here avoids processing them twice.
        if blob.name.rpartition('/')[0] != folder_name:
            continue

        try:
            blob_client = container_client.get_blob_client(blob.name)
            content = blob_client.download_blob().readall()
            json_data = json.loads(content)

            if isinstance(json_data, dict) and len(json_data) > 0:
                # Dict-shaped file: "keys" names the value columns and "data"
                # holds one record per row; each file becomes its own table.
                value_keys = json_data["keys"]
                columns = ["data_year"] + [_sanitize_column_name(key) for key in value_keys]

                # Every column is declared as a nullable integer.
                schema = StructType([StructField(column_name, IntegerType(), True) for column_name in columns])

                data = [(row["data_year"], *[row[key] for key in value_keys]) for row in json_data["data"]]

                df = spark.createDataFrame(data, schema).orderBy('data_year').coalesce(1)

                # Written to the silver layer under the same relative path as the source blob.
                wasbs_path1 = wasbs_path + blob.name
                df.write.format('delta').mode('overwrite').option("overwriteSchema", True).option("path", wasbs_path1).save()
                print('file_uploaded 1:', blob.name)

            elif isinstance(json_data, list) and len(json_data) > 0:
                # List-shaped file: let Spark infer the schema. The reader
                # expects JSON *text*, so re-serialize the parsed value rather
                # than passing the Python list (whose str() is not valid JSON
                # once it contains None/True/False).
                json_text = json.dumps(json_data)
                df = spark.read.option("inferSchema", "true").json(spark.sparkContext.parallelize([json_text]))
                dataframes.append(df)

            elif isinstance(json_data, list) and len(json_data) == 0:
                print('No_json_data_found:', blob.name)

            else:
                print('Not_a_JSON_file:', blob.name)

        except Exception as e:
            # Best effort: record which source blob failed and why, then carry on.
            error_list.append((blob.name, str(e)))
            print('error_found', blob.name)

    if len(dataframes) > 0:
        # Merge all list-shaped files of this folder into a single table.
        result_df = dataframes[0]
        for df in dataframes[1:]:
            result_df = _union_with_missing_columns(result_df, df)

        result_df = result_df.orderBy('data_year').coalesce(1)

        # Writing in Silver Layer, one table per source folder
        wasbs_path2 = wasbs_path + folder_name
        result_df.write.format('delta').mode('overwrite').option("overwriteSchema", True).option("path", wasbs_path2).save()
        print('file_uploaded 2:', folder_name)




Uploading error log to blob

In [291]:
# Upload the collected (URL, Reason) error rows as a CSV next to the source data.
if error_list:
    pandas_df = pd.DataFrame(error_list, columns=["URL", "Reason"])

    # Build the location from the configured prefix rather than from the
    # conversion loop's leftover `folder_name`: that variable is whichever
    # folder the set iteration happened to visit last, is undefined when no
    # folders were found, and has no trailing '/', so the old path came out as
    # '<folder>error_files/error.csv'. folder_path already ends with '/'.
    filelocation = folder_path + 'error_files/error.csv'

    blob_client = blob_service_client.get_blob_client(container_name, filelocation)
    csv_file = pandas_df.to_csv(index=False)
    blob_client.upload_blob(csv_file, overwrite=True)
    print('error_file_uploaded', filelocation)