In [2]:
# Make the local Spark installation importable from this Jupyter kernel.
import findspark

findspark.init()
findspark.find()

# Start a Spark session (or reuse the one this kernel already has).
import pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [3]:
spark  # display the SparkSession created above; run this cell to reach the Spark UI if needed

In [17]:
# Pipeline parameters - set these before running the rest of the notebook.
import os

server = 'database-1.choyskek6t2g.eu-north-1.rds.amazonaws.com'  # SQL Server (RDS) host; data from the files and the aggregations are stored here
port = '1433'  # SQL Server port
database = 'arqivaTechTest'  # store data in this database
workingTablePrefix = 'wrk_'  # database working (staging) tables get this prefix
dbUser = 'rdsAccess'  # the user used to read from and write to the SQL Server
# Keep real credentials out of the notebook: read the password from the environment,
# falling back to the placeholder so the notebook behaves as before when it is unset.
# future work: pull this directly from AWS Secrets Manager
dbPassword = os.environ.get('ARQIVA_DB_PASSWORD', 'xxxxxx')
localStorage = 'C:\\temp\\arqiva\\'  # files from S3 are downloaded here; must end with a path separator (file names are appended by concatenation)
storageBucket = 'jameslester78-files'  # files land here and are archived here, in separate folders


# future work: remove the need to escape backslashes in the file path
# (a raw string literal cannot end with a single backslash, hence the escaping above)

In [59]:
#functions required for project

#future work: add in error handling and logging

import posixpath
import re
from datetime import datetime

import boto3
import pyodbc
from pyspark.sql.functions import col
from pyspark.sql.functions import current_date
from pyspark.sql.functions import datediff
from pyspark.sql.functions import floor
from pyspark.sql.functions import months_between
from pyspark.sql.functions import sum  # NOTE: shadows the builtin sum() for the rest of the notebook
from pyspark.sql.functions import to_timestamp

def getSqlTable(server, database, table, user, password, port=None):
    """Read a SQL Server (RDS) table into a Spark DataFrame over JDBC.

    Parameters
    ----------
    server : str
        Host name of the SQL Server instance.
    database : str
        Database to read from.
    table : str
        Table name, passed to the JDBC ``dbtable`` option.
    user, password : str
        SQL Server credentials.
        future work: the password could be pulled from AWS Secrets Manager.
    port : str or int, optional
        SQL Server port. When omitted, the notebook-level ``port`` setting is
        used - which is what this function always did before the parameter
        existed, so existing callers are unaffected.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    if port is None:
        # fall back to the notebook config value (the parameter shadows the global name)
        port = globals()['port']

    return spark.read.format("jdbc").options(
        url=f"jdbc:sqlserver://{server}:{port};database={database};",
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
        dbtable=f'{table}',
        user=f"{user}",
        password=f"{password}",
    ).load()



def writeWorkingTableToDb(server,port,database,workingTablePrefix,table,dbUser,dbPassword,df):
    """Write ``df`` to a SQL Server table over JDBC, overwriting that table.

    Parameters
    ----------
    server : str
        Host name of the SQL Server instance.
    port : str or int
        SQL Server port.
    database : str
        Target database.
    workingTablePrefix : str
        Prefix for the table name, e.g. ``'wrk_'`` for staging tables; pass
        ``''`` to write to ``table`` itself.
    table : str
        Table name; the JDBC ``dbtable`` is ``workingTablePrefix + table``.
    dbUser, dbPassword : str
        SQL Server credentials.
    df : pyspark.sql.DataFrame
        Data to write.

    Returns
    -------
    None
    """
    # use the caller's port - this URL previously hardcoded 1433 and ignored `port`
    jdbcUrl = f"jdbc:sqlserver://{server}:{port};database={database};"

    df.write.format("jdbc") \
          .mode('overwrite') \
          .option("url", jdbcUrl) \
          .option("dbtable", f'{workingTablePrefix}{table}') \
          .option("user", f'{dbUser}') \
          .option("password", f'{dbPassword}') \
          .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
          .save()

    return

def createAggregationTable():
    """Rebuild the ``transSum`` table: total current transaction amount per user.

    Reads ``transactions`` and ``users`` from the RDS, sums ``amount`` per
    ``userFK`` over transactions whose ``EndDate`` is NULL (the current SCD2
    rows), joins the totals to users on ``users.id == userFK`` (inner join, so
    users without current transactions are omitted), and writes
    ``id, name, transactionSum`` back to the RDS as ``transSum`` (no prefix).

    Pulling everything into Spark, aggregating and pushing it back may not be
    optimal - the RDS could aggregate itself - but this technical test is meant
    to demonstrate Spark code. Uses the notebook-level connection settings.
    """
    # read both source tables (transactions first, then users)
    txnDf = getSqlTable(server, database, 'transactions', dbUser, dbPassword)
    userDf = getSqlTable(server, database, 'users', dbUser, dbPassword)

    # `sum` here is pyspark.sql.functions.sum (imported with the other helpers), not the builtin
    currentTotals = (
        txnDf
        .filter("EndDate is NULL")
        .groupBy("userFK")
        .agg(sum("amount").alias("transactionSum"))
    )

    userTotals = (
        userDf
        .join(currentTotals, userDf.id == currentTotals.userFK, 'inner')
        .select(userDf.id, userDf.name, currentTotals.transactionSum)
    )

    # debugging aid: print the result into the notebook output
    userTotals.show()

    writeWorkingTableToDb(server, port, database, '', 'transSum', dbUser, dbPassword, userTotals)

def findFilesToProcess(bucketName,localStorage):
    """Download the pending users/transactions CSVs from S3 to local storage.

    Looks for ``unprocessed/users.csv`` and ``unprocessed/transactions.csv`` in
    ``bucketName`` and saves each one found as ``users.csv`` /
    ``transactions.csv`` under ``localStorage``. ``localStorage`` must end with
    a path separator, because the local path is built by string concatenation.

    future work: process several files at once if more are found in the bucket.

    Parameters
    ----------
    bucketName : str
        S3 bucket to read from.
    localStorage : str
        Local folder to download into.

    Returns
    -------
    tuple of (list of str, list of str)
        ``(userFiles, transactionsFiles)``: the S3 keys that were downloaded.
        Each list holds at most one key.
    """
    userFiles = []
    transactionsFiles = []

    # S3 key -> (list it is reported in, local file name)
    wanted = {
        'unprocessed/users.csv': (userFiles, 'users.csv'),
        'unprocessed/transactions.csv': (transactionsFiles, 'transactions.csv'),
    }

    bucket = boto3.resource('s3').Bucket(bucketName)

    # List only the unprocessed/ prefix and download only the objects we use; the old
    # loop read the body of every object in the bucket, including the processed/ archive.
    for obj in bucket.objects.filter(Prefix='unprocessed/'):
        if obj.key not in wanted:
            continue

        foundKeys, localName = wanted[obj.key]
        foundKeys.append(obj.key)

        text = obj.get()['Body'].read().decode("utf-8")
        # Explicit encoding/newline: the platform default codec (e.g. cp1252 on Windows)
        # can fail on non-ASCII text, and text mode would otherwise rewrite '\n' as '\r\n'.
        # `with` guarantees the file is closed even if the write fails.
        with open(localStorage + localName, 'w', encoding='utf-8', newline='') as localFile:
            localFile.write(text)

    return userFiles, transactionsFiles

def createDataframeFromLocalFiles(fileList):
    """Load each downloaded CSV into a Spark DataFrame.

    Parameters
    ----------
    fileList : iterable of iterables of str
        S3 keys grouped per file type, e.g. the ``(userFiles, transactionsFiles)``
        tuple returned by ``findFilesToProcess``.

    Returns
    -------
    list of tuple
        One ``(s3Key, localPath, dataframe)`` per key, in input order.
        ``localPath`` is the notebook-level ``localStorage`` plus the key with
        ``'unprocessed/'`` removed. The CSV is read with its first row as the header.
    """
    # flatten the per-type groups into a single ordered list of keys
    s3Keys = [key for keyGroup in fileList for key in keyGroup]

    loaded = []
    for s3Key in s3Keys:
        localPath = localStorage + s3Key.replace('unprocessed/', '')
        frame = (
            spark.read.format("csv")
            .option("header", "true")
            .load(localPath)
        )
        loaded.append((s3Key, localPath, frame))

    return loaded

def transformFiles(createDataframeFromLocalFilesReturned):
    """Clean and type the raw CSV DataFrames.

    Parameters
    ----------
    createDataframeFromLocalFilesReturned : list of tuple
        ``(s3Key, localPath, rawDf)`` tuples, as returned by
        ``createDataframeFromLocalFiles``.

    Returns
    -------
    list of tuple
        ``(s3Key, localPath, rawDf, transformedDf)`` for each users or
        transactions entry, in input order. Entries with any other key are dropped.

    The transformations are:

    - users.csv: ``date_of_birth`` is parsed as ``dd/MM/yyyy``, users under 18
      are removed, and ``user_id`` is cast to integer.
    - transactions.csv: ``transaction_date`` is parsed as ``dd/MM/yyyy``, and
      ``amount``, ``transaction_id`` and ``user_id`` are cast to integer.
    """
    transformedData = []

    for s3Key, localPath, rawDf in createDataframeFromLocalFilesReturned:

        if s3Key == 'unprocessed/users.csv':
            df = rawDf.withColumn("date_of_birth", to_timestamp(col("date_of_birth"), 'dd/MM/yyyy'))  # parse dob
            # Age in whole years. months_between is calendar-aware; the previous
            # datediff/365 ignored leap days and could count someone as 18 a few days early.
            df = df.withColumn("age", floor(months_between(current_date(), col("date_of_birth")) / 12))
            # "Remove users under 18" means 18-year-olds stay (the old `> 18` dropped them).
            # A null/unparseable date_of_birth gives a null age, and the filter drops those rows.
            df = df.filter(col("age") >= 18)
            df = df.drop("age")  # working column only
            df = df.withColumn("user_id", col("user_id").cast("integer"))
            transformedData.append((s3Key, localPath, rawDf, df))

        elif s3Key == 'unprocessed/transactions.csv':
            df = rawDf.withColumn("transaction_date", to_timestamp(col("transaction_date"), 'dd/MM/yyyy'))  # parse txn date
            # NOTE(review): an integer cast loses any fractional part of `amount`; the
            # aggregated output is shown with two decimals - confirm amounts are whole numbers.
            df = df.withColumn("amount", col("amount").cast("integer"))
            df = df.withColumn("transaction_id", col("transaction_id").cast("integer"))
            df = df.withColumn("user_id", col("user_id").cast("integer"))
            transformedData.append((s3Key, localPath, rawDf, df))

    return transformedData


def updateSDC2Tables():
    """Apply the staged changes in wrk_users / wrk_transactions to the SCD2 tables.

    ``users`` and ``transactions`` are kept as type-2 slowly changing dimensions:

    - A row is current while its EndDate is NULL.
    - A change is recorded by setting EndDate on the current row and inserting
      a new row (new surrogate ``id``).
    - ``transactions.userFK`` points at the surrogate ``users.id``.
    - Each staging row carries a ``type`` of 'i' (insert), 'u' (update) or
      'd' (delete).

    Statements run in this order, inside one database transaction. It is
    committed at the end and rolled back if any statement fails:

    1. insert new users
    2. end deleted users
    3. end and re-insert updated users
    4. end the current transactions of deleted users
    5. move transactions of updated users onto the users' new surrogate ids
    6. end deleted transactions
    7. insert new transactions
    8. end and re-insert updated transactions

    Uses the notebook-level ``server``, ``port``, ``database``, ``dbUser`` and
    ``dbPassword`` settings.
    """
    deleteUsers = (
        "update users "
        "set endDate = getdate() "
        "from users a "
        "inner join wrk_users b on a.user_id = b.user_id "
        "where b.type = 'd' and a.endDate IS NULL"
    )

    insertUsers = (
        "insert into users (user_id,name,email,date_of_birth) "
        "select user_id,name,email,date_of_birth "
        "from wrk_users "
        "where type = 'i'"
    )

    updateUser = (
        "update users "
        "set endDate = getdate() "
        "from users a "
        "inner join wrk_users b on a.user_id = b.user_id  "
        "where b.type = 'u'  and a.endDate IS NULL "
        "insert into users ( user_id,name,email,date_of_birth) select user_id,name,email,date_of_birth  "
        "from wrk_users "
        "where type = 'u'  "
    )

    # End the *current* transactions of deleted users. The EndDate IS NULL guard stops
    # this from overwriting the end date of rows that were already historical.
    endtxns = (
        "update transactions "
        "set EndDate = getdate() "
        "from transactions a "
        "inner join wrk_users b on a.user_id = b.user_id "
        "where b.type = 'd' and a.EndDate IS NULL"
    )

    # Re-point transactions of updated users at the users' new surrogate ids: end the
    # latest version of each affected transaction, then re-insert it with the new userFK.
    # NOTE(review): neither statement checks EndDate, so a transaction whose latest
    # version was already ended (e.g. deleted) would be re-inserted as current - confirm.
    updateTxnUserFK = (
        "update transactions "
        "set EndDate = getdate() "
        "from transactions a "
        "inner join wrk_users b on a.user_id = b.user_id "
        "inner join (select transaction_id,max(id) id from transactions group by transaction_id) c on c.id = a.id "
        "where type = 'u' "
        "\n"
        "insert into transactions (userFk,transaction_id,user_id,amount,transaction_date) "
        "select c.id,a.transaction_id,a.user_id,a.amount,a.transaction_date "
        "from transactions a "
        "inner join wrk_users b on a.user_id = b.user_id "
        "inner join (select user_id,max(id) id from users group by user_id) c on b.user_id = c.user_id "
        "inner join (select transaction_id,max(id) id from transactions group by transaction_id) d on d.id = a.id "
        "where type = 'u' "
    )

    # The previous version of this statement did not reference `transactions` in its
    # FROM clause. T-SQL then cross-joins the target, so any 'd' row ended EVERY
    # transaction. The target is now joined (and aliased) explicitly.
    deleteTransactions = (
        "update t "
        "set endDate = getdate() "
        "from transactions t "
        "inner join wrk_transactions a on a.transaction_id = t.transaction_id "
        "where a.type = 'd' and t.endDate IS NULL"
    )

    insertTransaction = (
        "insert into transactions(transaction_id,user_id,amount,transaction_date,userFK)  "
        "select transaction_id,a.user_id,amount,transaction_date,b.id "
        "from wrk_transactions  a "
        "inner join (select user_id,max(id) id from users group by user_id) b on a.user_id = b.user_id "
        "where type = 'i' "
    )

    # End only the current version of each updated transaction (previously every version
    # was stamped, overwriting history), then insert the new version against the user's
    # latest surrogate id.
    updateTransaction = (
        "update d "
        "set endDate = getdate() "
        "from transactions d "
        "inner join wrk_transactions b on d.transaction_id = b.transaction_id "
        "where b.type = 'u' and d.endDate IS NULL "
        "\n  "
        "insert into transactions (transaction_id,user_id,amount,transaction_date,userFK) "
        "select transaction_id,a.user_id,amount,transaction_date,b.id "
        "from wrk_transactions a "
        "inner join (select user_id,max(id) id from users group by user_id) b on a.user_id = b.user_id "
        "where type = 'u'  "
    )

    # include the configured port (ODBC "host,port" syntax) instead of relying on the default
    connectString = (
        'Driver={ODBC Driver 17 for SQL Server};'
        f'Server={server},{port};Database={database};Uid={dbUser};Pwd={dbPassword};'
    )

    cnxn = pyodbc.connect(connectString, autocommit=False)
    try:
        crsr = cnxn.cursor()
        for statement in (insertUsers, deleteUsers, updateUser, endtxns,
                          updateTxnUserFK, deleteTransactions, insertTransaction,
                          updateTransaction):
            crsr.execute(statement)
        cnxn.commit()
    except Exception:
        # leave the SCD2 tables untouched if any step fails
        cnxn.rollback()
        raise
    finally:
        cnxn.close()

    return


def moveS3File (source,destination,sourceBucket):
    """Archive an S3 object: copy it to a timestamped key, then delete the original.

    Parameters
    ----------
    source : str
        Key of the object to move, e.g. ``'unprocessed/users.csv'``.
    destination : str
        Target key. ``_YYYYmmddHHMMSS`` (local time) is inserted before the
        file extension, e.g. ``'processed/users.csv'`` ->
        ``'processed/users_20240703101500.csv'``. A key without an extension
        gets the timestamp appended, so repeated archives do not overwrite each other.
    sourceBucket : str
        Bucket holding both the source and the destination.

    Returns
    -------
    The boto3 ``Bucket`` resource for ``sourceBucket``.
    """
    # datetime for the archive suffix
    now = datetime.now().strftime("%Y%m%d%H%M%S")

    # Timestamp only the final extension of the key. The old regex inserted it before
    # *every* '.', including dots in folder names or multi-dot file names.
    root, ext = posixpath.splitext(destination)
    destination = f'{root}_{now}{ext}'

    copy_source = {
        'Bucket': f'{sourceBucket}',
        'Key': f'{source}'
    }

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(sourceBucket)

    bucket.copy(copy_source, destination)

    # delete only after the copy call has returned
    s3.Object(sourceBucket, source).delete()

    return bucket

In [31]:
# Execute the pipeline for one pair of users/transactions files.

# download files from S3, load them into Spark, then transform them
outputFiles = findFilesToProcess(storageBucket, localStorage)
createDataframeFromLocalFilesReturned = createDataframeFromLocalFiles(outputFiles)
transformedFilesReturned = transformFiles(createDataframeFromLocalFilesReturned)

# Pick the transformed frames by S3 key rather than by list position, and stop with a
# clear message (instead of an IndexError or a mismatched frame) if a file was missing.
transformedByKey = {entry[0]: entry[-1] for entry in transformedFilesReturned}
missingKeys = [key for key in ('unprocessed/users.csv', 'unprocessed/transactions.csv')
               if key not in transformedByKey]
if missingKeys:
    raise FileNotFoundError(f'not found in s3://{storageBucket}: {missingKeys}')

users = transformedByKey['unprocessed/users.csv']
transactions = transformedByKey['unprocessed/transactions.csv']

# view dataframes
users.show()
transactions.show()

# write dataframes to the RDS working tables
writeWorkingTableToDb(server, port, database, workingTablePrefix, 'users', dbUser, dbPassword, users)
writeWorkingTableToDb(server, port, database, workingTablePrefix, 'transactions', dbUser, dbPassword, transactions)

# update the slowly changing dimension tables
updateSDC2Tables()

# refresh the aggregation table
createAggregationTable()

# archive the files (move them to the processed folder and timestamp them),
# using the configured bucket rather than a hardcoded copy of its name
moveS3File('unprocessed/users.csv', 'processed/users.csv', storageBucket)
moveS3File('unprocessed/transactions.csv', 'processed/transactions.csv', storageBucket)


# future work - data quality checks - e.g. each user id has only one userFK in the transaction table

+-------+-----+---------------+-------------------+----+
|user_id| name|          email|      date_of_birth|type|
+-------+-----+---------------+-------------------+----+
|      1|James|james@gmail.com|1978-09-13 00:00:00|   i|
|      2|Craig|  craig@aol.com|1972-06-02 00:00:00|   i|
+-------+-----+---------------+-------------------+----+

+--------------+-------+------+-------------------+----+
|transaction_id|user_id|amount|   transaction_date|type|
+--------------+-------+------+-------------------+----+
|             4|      1|   170|2024-07-02 00:00:00|   u|
|             5|      3|    10|2024-07-03 00:00:00|   i|
|             6|      4|     6|2024-07-03 00:00:00|   i|
+--------------+-------+------+-------------------+----+

+---+-----+--------------+
| id| name|transactionSum|
+---+-----+--------------+
|126|James|         80.00|
|125|  Jay|         24.00|
|131|James|        170.00|
|124|Lydia|         40.00|
+---+-----+--------------+

