In [3]:
import sqlite3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, date_format, to_timestamp
from pyspark.sql.types import DecimalType

# Default directory holding the input CSVs; the SQLite DB and Parquet output go there too.
DEFAULT_DATA_DIR = r"C:\Users\nikol\Desktop\Data_Engineer_Assessment"


def _clean_transactions(transactions_df):
    """Name, validate and type the raw (header-less) transactions DataFrame.

    Returns a DataFrame with columns ``timestamp`` (Spark timestamp),
    ``subscriber_id`` (6-digit string), ``amount`` (Decimal(10, 4)) and
    ``channel`` (letters only). Rows failing any check are dropped.
    """
    df = transactions_df.toDF("timestamp", "subscriber_id", "amount", "channel")

    # Round-trip subscriber_id through int so it ends up as a plain digit string,
    # comparable with the subscribers' ids once both are stored as TEXT in SQLite.
    df = df.withColumn("subscriber_id", col("subscriber_id").cast("int").cast("string"))

    df = (
        df
        # "YYYY-MM-DD HH:MM:SS+HH". NOTE(review): only positive UTC offsets match.
        .filter(col("timestamp").rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\+\d{2}$"))
        # Keep only amounts that contain a decimal point.
        .filter(col("amount").rlike(r"\."))
        # Channel must be letters only (e.g. SMS, USSD).
        .filter(col("channel").rlike(r"^[a-zA-Z]+$"))
        # Subscriber ids must be exactly six digits.
        .filter(col("subscriber_id").rlike(r"^\d{6}$"))
        .withColumn("timestamp", to_timestamp("timestamp"))
        .withColumn("amount", col("amount").cast(DecimalType(10, 4)))
    )
    # Drop NULLs *after* the casts: a value can pass the regex filters and still
    # fail to parse (impossible date, "1.2.3", Decimal overflow). Such NULLs used
    # to survive and crash strftime()/float() while loading SQLite.
    return df.na.drop()


def _create_tables(cursor):
    """Create the subscribers, transactions and non_common_subID tables if missing."""
    cursor.execute('''CREATE TABLE IF NOT EXISTS subscribers (
        row_key TEXT PRIMARY KEY,
        sub_id TEXT,
        activation_date TEXT
    )''')
    cursor.execute('''CREATE TABLE IF NOT EXISTS transactions (
        timestamp TEXT,
        sub_id TEXT,
        amount DECIMAL(10, 4),
        channel TEXT
    )''')
    cursor.execute('''CREATE TABLE IF NOT EXISTS non_common_subID (
        sub_id TEXT PRIMARY KEY
    )''')


def _load_subscribers(cursor, subscribers_df):
    """Upsert subscribers keyed by ``<subscriber_id>_<activation date as yyyyMMdd>``.

    The subscribers CSV is assumed to be clean already; it is only renamed here.
    """
    prepared = (
        subscribers_df.toDF("subscriber_id", "activation_date")
        .withColumn(
            "row_key",
            concat_ws("_", col("subscriber_id"), date_format("activation_date", "yyyyMMdd")),
        )
        .select(
            col("row_key"),
            col("subscriber_id").alias("sub_id"),
            col("activation_date").alias("act_dt"),
        )
    )
    # INSERT OR REPLACE on the row_key primary key makes re-runs idempotent.
    cursor.executemany(
        '''INSERT OR REPLACE INTO subscribers (row_key, sub_id, activation_date)
           VALUES (?, ?, ?)''',
        [(row['row_key'], row['sub_id'], row['act_dt']) for row in prepared.collect()],
    )


def _load_transactions(cursor, clean_transactions_df):
    """Replace the contents of the transactions table with the cleaned rows."""
    # NOTE(review): the "+HH" offset is not stored; the wall-clock time written
    # depends on the Spark session/driver time zone. Confirm this is intended.
    rows = [
        (row['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
         row['subscriber_id'],
         float(row['amount']),  # sqlite3 cannot bind Decimal without an adapter
         row['channel'])
        for row in clean_transactions_df.collect()
    ]
    # The table has no key, so appending on every run duplicated all rows (and
    # doubled the joined output). Reload it as a snapshot of the CSV instead.
    cursor.execute('DELETE FROM transactions')
    cursor.executemany(
        '''INSERT INTO transactions (timestamp, sub_id, amount, channel)
           VALUES (?, ?, ?, ?)''',
        rows,
    )


def _export_joined(cursor, parquet_path):
    """Inner-join subscribers with transactions on sub_id, print the rows and write them to Parquet."""
    import pandas as pd

    cursor.execute('''
        SELECT
            transactions.timestamp,
            subscribers.row_key,
            subscribers.sub_id AS sub_id,
            transactions.amount,
            transactions.channel,
            subscribers.activation_date AS act_dt
        FROM
            subscribers
        INNER JOIN
            transactions
        ON
            subscribers.sub_id = transactions.sub_id
    ''')
    result = cursor.fetchall()

    print("INNER JOIN Result:")
    for row in result:
        print(row)

    result_df = pd.DataFrame(result, columns=["timestamp", "row_key", "sub_id", "amount", "channel", "act_dt"])
    print(result_df)

    result_df.to_parquet(parquet_path, engine='pyarrow', index=False)
    print("Parquet file saved successfully from Pandas DataFrame!")


def _store_non_common(cursor):
    """Rebuild non_common_subID with the sub_ids present in only one of the two tables, then print them."""
    # Recompute from scratch. sub_id is the PRIMARY KEY, so re-inserting ids kept
    # from a previous run raised IntegrityError, and stale ids were never removed.
    cursor.execute('DELETE FROM non_common_subID')
    # SQLite has no FULL OUTER JOIN here: emulate the "unmatched on either side"
    # part with two anti-joins (LEFT JOIN ... IS NULL) combined by UNION.
    cursor.execute('''
        INSERT INTO non_common_subID (sub_id)
        SELECT sub_id FROM (
            SELECT subscribers.sub_id
            FROM subscribers
            LEFT JOIN transactions
            ON subscribers.sub_id = transactions.sub_id
            WHERE transactions.sub_id IS NULL
            UNION
            SELECT transactions.sub_id
            FROM transactions
            LEFT JOIN subscribers
            ON transactions.sub_id = subscribers.sub_id
            WHERE subscribers.sub_id IS NULL
        )
    ''')

    cursor.execute('SELECT * FROM non_common_subID')
    print("Non-Common sub_ids:")
    for row in cursor.fetchall():
        print(row)


def main(data_dir=DEFAULT_DATA_DIR):
    """Clean the subscribers/transactions CSVs, stage them in SQLite and export the joined result.

    Steps:
      1. Read ``subscribers.csv`` and ``data_transactions.csv`` (no header) with Spark.
      2. Clean and type the transactions.
      3. Upsert subscribers and reload transactions into ``nikosdb.db``.
      4. Inner-join both tables on sub_id, print the rows and write
         ``joined_results.parquet``.
      5. Rebuild ``non_common_subID`` with the ids found in only one table.

    All database changes are committed together at the end. On any error they
    are rolled back and the exception is re-raised. The Parquet file is written
    before that final commit, so it can exist even if a later step fails.

    Args:
        data_dir: Directory holding the input CSVs. The SQLite database and the
            Parquet output are written there as well.
    """
    import os

    subscribers_file = os.path.join(data_dir, "subscribers.csv")
    transactions_file = os.path.join(data_dir, "data_transactions.csv")

    spark = SparkSession.builder \
        .appName("Transactions with Subscribers App") \
        .getOrCreate()

    subscribers_df = spark.read.csv(subscribers_file, header=False, inferSchema=True)
    transactions_df = spark.read.csv(transactions_file, header=False, inferSchema=True)
    clean_transactions_df = _clean_transactions(transactions_df)

    conn = sqlite3.connect(os.path.join(data_dir, "nikosdb.db"))
    try:
        cursor = conn.cursor()
        _create_tables(cursor)
        _load_subscribers(cursor, subscribers_df)
        _load_transactions(cursor, clean_transactions_df)
        _export_joined(cursor, os.path.join(data_dir, "joined_results.parquet"))
        _store_non_common(cursor)
        conn.commit()
    except Exception:
        # Previously the error was only printed, so failures went unnoticed.
        # Undo the partial load and let the caller see the failure.
        conn.rollback()
        raise
    finally:
        conn.close()

# Entry point: run the pipeline when this code is executed directly (script or
# notebook cell), but not when it is imported as a module.
if __name__ == "__main__":
    main()


INNER JOIN Result:
('2021-08-16 02:14:01', '123456_20210810', '123456', 0.3, 'SMS', '2021-08-10')
('2021-08-16 02:54:43', '123451_20210810', '123451', 0.15, 'SMS', '2021-08-10')
('2021-08-16 02:04:29', '123452_20210810', '123452', 0.15, 'SMS', '2021-08-10')
('2021-08-16 02:39:05', '123453_20210810', '123453', 0.15, 'SMS', '2021-08-10')
('2021-08-16 02:14:19', '123454_20210810', '123454', 0.3, 'SMS', '2021-08-10')
('2021-08-16 02:14:02', '123455_20210810', '123455', 0.15, 'SMS', '2021-08-10')
('2021-08-16 02:09:14', '123457_20210810', '123457', 0.75, 'SMS', '2021-08-10')
('2021-08-16 02:06:21', '123459_20210810', '123459', 0.15, 'SMS', '2021-08-10')
('2021-08-16 02:57:22', '234591_20210810', '234591', 0.3, 'SMS', '2021-08-10')
('2021-08-16 04:02:10', '234592_20210810', '234592', 0.3, 'SMS', '2021-08-10')
('2021-08-16 02:00:51', '234593_20210810', '234593', 0.15, 'SMS', '2021-08-10')
('2021-08-16 02:41:21', '234594_20210810', '234594', 0.3, 'SMS', '2021-08-10')
('2021-08-16 02:06:27', '2