In [1]:
mainfile = """\
from __future__ import print_function
import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

logging.basicConfig(level="INFO")
# When run as a script __name__ is '__main__'; records propagate to the root handler.
logger = logging.getLogger(__name__)

# Maven coordinates resolved when the session starts. The Spark property takes ONE
# comma-separated value, so every package must go into a single .config() call:
# calling .config() twice with the same key keeps only the last value.
SPARK_PACKAGES = ",".join([
    "io.delta:delta-core_2.12:1.1.0",
    # abfss:// URLs are served by the ABFS driver in hadoop-azure
    # (hadoop-azure-datalake is the Gen1 adl:// connector).
    "org.apache.hadoop:hadoop-azure:3.3.1",
])


def build_spark_session(storage_account_name, account_access_key):
    # Create a local SparkSession with Delta Lake enabled and shared-key access
    # to the given ADLS Gen2 storage account.
    builder = (
        SparkSession.builder
        .appName("DocumentAI")
        .master("local")
        .config("spark.jars.packages", SPARK_PACKAGES)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config(f"spark.hadoop.fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "SharedKey")
        .config(f"spark.hadoop.fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", account_access_key)
    )
    return builder.getOrCreate()


if __name__ == "__main__":
    # The placeholders can be overridden from the environment, so the storage key
    # does not have to be edited into (or baked along with) this file.
    table_name = os.environ.get("DELTA_TABLE_NAME", "delta-table")
    container_name = os.environ.get("AZURE_CONTAINER_NAME", "update-me")
    storage_account_name = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME", "update-me")
    account_access_key = os.environ.get("AZURE_ACCOUNT_ACCESS_KEY", "update-me")
    table_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{table_name}"

    try:
        spark = build_spark_session(storage_account_name, account_access_key)
        spark.sparkContext.setLogLevel('INFO')
    except Exception as error:
        # Nothing below can run without a session: log the cause and re-raise,
        # rather than failing later with a NameError on `spark`.
        logger.exception("Spark builder connection prompted out due to : %s", error)
        raise

    try:
        # Demo data. The dates column is kept as StringType; note that the values
        # mix YYYY-MM-DD and YYYY/MM/DD formats.
        schema = StructType([
            StructField("city", StringType(), True),
            StructField("dates", StringType(), True),
            StructField("population", IntegerType(), True)])

        dates = ["1991-02-25", "1998-05-10", "1993/03/15", "1992/07/17", "1992-05-23", "2022-06-23"]
        cities = ['Caracas', 'Ccs', 'São Paulo', 'Madrid', "San Francisco", "New Hampshire"]
        population = [37800000, 19795791, 12341418, 6489162, 8483993, 76234589]

        data = spark.createDataFrame(list(zip(cities, dates, population)), schema=schema)

        # Append the new data to the Delta table (created on the first write).
        data.write.format("delta").mode("append").save(table_url)

        # Read the table back and print it.
        new_df = spark.read.format("delta").load(table_url)
        new_df.show()
    finally:
        spark.stop()
"""

In [2]:
# Write the generated script to ./main.py (the Dockerfile COPYs it into the image).
# The script contains non-ASCII city names, so the encoding is set explicitly:
# Python decodes source files as UTF-8 (PEP 3120), while open()'s default
# encoding follows the platform locale (e.g. cp1252 on Windows).
mainpy_path = './main.py'
with open(mainpy_path, 'w', encoding='utf-8') as f:
    f.write(mainfile)

In [None]:
sparkjob = """\
from __future__ import print_function
import configparser
import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, LongType, StringType, StructField, StructType

logging.basicConfig(level="INFO")
# When run as a script __name__ is '__main__'; records propagate to the root handler.
logger = logging.getLogger(__name__)

# Maven coordinates resolved when the session starts. The Spark property takes ONE
# comma-separated value, so every package must go into a single .config() call:
# calling .config() twice with the same key keeps only the last value.
SPARK_PACKAGES = ",".join([
    "io.delta:delta-core_2.12:1.1.0",
    # abfss:// URLs are served by the ABFS driver in hadoop-azure
    # (hadoop-azure-datalake is the Gen1 adl:// connector).
    "org.apache.hadoop:hadoop-azure:3.3.1",
])

# INI file with [AZURE] and [STORAGE] sections; override the path with DOCAI_CONFIG.
CONFIG_PATH = os.environ.get("DOCAI_CONFIG", "config.cfg.template")

logger.info("Get configuration files to create a connection to Azure Storage Gen2")
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open; fail fast with a clear
# message instead of a bare KeyError on config['AZURE'] below.
if not config.read(CONFIG_PATH, encoding='utf-8-sig'):
    raise FileNotFoundError(f"configuration file not found: {CONFIG_PATH}")
container_name       =  config['AZURE']['CONTAINER_NAME']
storage_account_name =  config['AZURE']['STORAGE_ACCOUNT_NAME']
account_access_key   =  config['AZURE']['ACCOUNT_ACCESS_KEY']

logger.info("Get storage name for table and raw data")
table_name             = config['STORAGE']['TABLE_NAME']
Azure_10k_filings_data = config['STORAGE']['AZURE_10K_FILINGS_DATA']

logger.info("Delta table and storage data url")
AZURE_10K_CSV_DATA = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{Azure_10k_filings_data}"
DELTA_TABLE        = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{table_name}"


def session(spark):
    '''Load the 10-K filings CSV from ADLS Gen2 and append it to the Delta table.'''
    logger.info("predefined schema of StrucType")
    docai_schema = StructType([
        StructField("cik_number", LongType(), True),
        StructField("company_name", StringType(), True),
        StructField("form_id", StringType(), True),
        StructField("date", DateType(), True),
        StructField("file_url", StringType(), True),
    ])

    logger.info("read csv dataframe from Azure")
    logger.info(f"reading {Azure_10k_filings_data} filings data from azure storage : {storage_account_name}")
    # A single read with the explicit schema; inferSchema would cost an extra
    # pass over the CSV.
    df_filings = (
        spark.read
        .format("csv")
        .option("header", "true")
        .schema(docai_schema)
        .option("nullValue", "null")
        .load(AZURE_10K_CSV_DATA)
    )
    df_filings.show(15)

    # Every run appends to the table. To start from an empty table, write with
    # .mode("overwrite") instead; local-filesystem helpers such as shutil cannot
    # delete abfss:// paths.
    logger.info("Atomically append new data to an existing Delta table")
    logger.info(f"Updating/Appending data to the delta table : {table_name}")
    df_filings.write.format("delta").mode("append").save(DELTA_TABLE)

    logger.info(f"Data stored/appended to the Delta table {table_name}")


def readTable(spark):
    '''Print the first 10 rows of the Delta table; returns None, like DataFrame.show().'''
    logger.info("sample data from the delta-table")
    # CSV options such as header/inferSchema do not apply to the Delta source.
    table_data = spark.read.format("delta").load(DELTA_TABLE)
    return table_data.show(10)


def build_spark_session():
    '''Create a local SparkSession with Delta Lake and shared-key access to the storage account.'''
    builder = (
        SparkSession.builder
        .appName("Documentai")
        .master("local[*]")
        .config("spark.jars.packages", SPARK_PACKAGES)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config(f"spark.hadoop.fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "SharedKey")
        .config(f"spark.hadoop.fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", account_access_key)
    )
    return builder.getOrCreate()


if __name__ == "__main__":
    try:
        spark = build_spark_session()
        spark.sparkContext.setLogLevel('INFO')
    except Exception as error:
        # Nothing below can run without a session: log the cause and re-raise,
        # rather than failing later with a NameError on `spark`.
        logger.exception("Spark builder connection prompted out due to : %s", error)
        raise

    try:
        logger.info("instantiating the session to read data from Azure and write to delta-lake")
        session(spark)

        logger.info("Read and display few rows of data from delta-table")
        readTable(spark)
    finally:
        logger.info("Stopping Spark session...")
        spark.stop()
"""

In [None]:
# Persist the generated Spark job so the Dockerfile's `COPY sparkjob.py` can find it.
spark_job_path = './sparkjob.py'
with open(spark_job_path, mode='w') as job_file:
    print(sparkjob, end='', file=job_file)

In [3]:
dockerfile = """\
FROM gcr.io/datamechanics/spark:3.2.0-hadoop-3.3.1-java-11-scala-2.12-python-3.8-dm16

# Switch to root before creating the venv under /opt; the base image's default
# user is not guaranteed to have write access there.
USER root

RUN python -m venv /opt/spark-env

# Put the venv first on PATH and point PySpark at its interpreter, so the packages
# installed below are the ones main.py / sparkjob.py actually import at runtime.
ENV PATH="/opt/spark-env/bin:$PATH"
ENV PYSPARK_PYTHON=/opt/spark-env/bin/python

WORKDIR /app

RUN pip install --upgrade pip

COPY requirements.txt /app/
RUN pip install -r requirements.txt

COPY main.py /app
COPY sparkjob.py /app
# sparkjob.py reads ./config.cfg.template at runtime; mount it into /app (docker run -v)
# rather than COPY it, so the storage account key is not baked into the image.
"""

In [4]:
# Materialize the Dockerfile in the build context used by `docker build ... .`.
docker_path = './Dockerfile'
with open(docker_path, mode='w') as dockerfile_handle:
    print(dockerfile, end='', file=dockerfile_handle)

In [6]:
requirements = """\
# Pinned to the Spark 3.2.0 runtime of the base image. delta-spark 1.1.0 is the Delta
# Lake release for Spark 3.2 and matches the io.delta:delta-core_2.12:1.1.0 jar;
# unpinned, pip pulls the newest pyspark/delta-spark, which do not match that runtime.
pyspark==3.2.0
delta-spark==1.1.0
click
"""

In [7]:
# Write the pip requirements consumed by `RUN pip install -r requirements.txt`.
text_path = './requirements.txt'
with open(text_path, mode='w') as requirements_file:
    print(requirements, end='', file=requirements_file)

In [None]:
# Build the image from ./Dockerfile. The build context is the current directory, which
# must contain the requirements.txt, main.py and sparkjob.py files the Dockerfile COPYs.
# The resulting local image is tagged `sparkrun`.

!docker build -t sparkrun .

In [None]:
# List local Docker images to confirm that `sparkrun` was built.

!docker images

In [None]:
# Azure deployment targets: replace each placeholder before running the cells below.
# These are resource identifiers rather than secrets; the shell cells below read
# them through IPython's $name expansion (e.g. $acrName).
resourceGroupName, location, acrName, tenant_id = (
    '<rg name>',
    '<rg location>',
    '<acr name>',
    '<tenant_id>',
)

In [None]:
# Echo the values IPython substitutes for $name in the shell cells, as a sanity
# check that the placeholders above were filled in.
!echo $resourceGroupName
!echo $location
!echo $acrName

In [None]:
# Sign in with the Azure CLI, scoped to `tenant_id` from the settings cell
# (without --tenant, `az login` uses the account's default tenant).

!az login --tenant $tenant_id

In [None]:
# Log the local Docker client in to the registry named by $acrName
# (<acrName>.azurecr.io), which the push below targets.

!az acr login --name $acrName

In [None]:
# Give the local `sparkrun` image its fully qualified ACR name so it can be pushed.
# IPython evaluates {acrName} before the command reaches the shell.
!docker tag sparkrun {acrName}.azurecr.io/sparkrun:v1

In [None]:
# The ACR-qualified tag should now be listed next to `sparkrun`.
!docker images

In [None]:
# Upload the ACR-tagged image (tag created in the previous step) to the registry.
# IPython evaluates {acrName} before the command reaches the shell.
!docker push {acrName}.azurecr.io/sparkrun:v1

In [None]:
# List the repositories in the registry to confirm that `sparkrun` was pushed.

!az acr repository list --name $acrName --output table