In [1]:
from decimal import ROUND_HALF_UP, Decimal

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame as spark_DataFrame
from pyspark.sql.functions import avg, col, count, round, when

In [2]:
# Reuse the active SparkSession if there is one; otherwise build one named 'LeetCode'.
spark = (
    SparkSession.builder
    .appName('LeetCode')
    .getOrCreate()
)

25/01/01 21:38:36 WARN Utils: Your hostname, madiv resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlo1)
25/01/01 21:38:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/01 21:38:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/01 21:39:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Spark side: first line is the header; no schema inference is requested, so columns load as strings.
spark_Signups = spark.read.csv('Signups.csv', header=True, sep=',')
spark_Confirmations = spark.read.csv('Confirmations.csv', header=True, sep=',')

# pandas side: same files, with pandas' default dtype inference.
pandas_Signups = pd.read_csv('Signups.csv')
pandas_Confirmations = pd.read_csv('Confirmations.csv')

                                                                                

In [4]:
spark_Signups.show(n=100)  # print up to 100 rows of the raw Spark signups table

                                                                                

+-------+-------------------+
|user_id|         time_stamp|
+-------+-------------------+
|      3|2020-03-21 10:16:13|
|      7|2020-01-04 13:57:59|
|      2|2020-07-29 23:09:44|
|      6|2020-12-09 10:39:37|
+-------+-------------------+



In [5]:
spark_Confirmations.show(n=100)  # print up to 100 rows of the raw Spark confirmations table

                                                                                

+-------+-------------------+---------+
|user_id|         time_stamp|   action|
+-------+-------------------+---------+
|      3| 2021-01-06 3:30:46|  timeout|
|      3|2021-07-14 14:00:00|  timeout|
|      7|2021-06-12 11:57:29|confirmed|
|      7|2021-06-13 12:58:28|confirmed|
|      7|2021-06-14 13:59:27|confirmed|
|      2| 2021-01-22 0:00:00|confirmed|
|      2|2021-02-28 23:59:59|  timeout|
+-------+-------------------+---------+



In [6]:
pandas_Signups.head(n=100)  # display up to 100 rows of the pandas signups frame

Unnamed: 0,user_id,time_stamp
0,3,2020-03-21 10:16:13
1,7,2020-01-04 13:57:59
2,2,2020-07-29 23:09:44
3,6,2020-12-09 10:39:37


In [7]:
pandas_Confirmations.head(n=100)  # display up to 100 rows of the pandas confirmations frame

Unnamed: 0,user_id,time_stamp,action
0,3,2021-01-06 3:30:46,timeout
1,3,2021-07-14 14:00:00,timeout
2,7,2021-06-12 11:57:29,confirmed
3,7,2021-06-13 12:58:28,confirmed
4,7,2021-06-14 13:59:27,confirmed
5,2,2021-01-22 0:00:00,confirmed
6,2,2021-02-28 23:59:59,timeout


### SQL

In [8]:
def confirmation_rate(signups: spark_DataFrame, confirmations: spark_DataFrame) -> spark_DataFrame:
    """Confirmation rate per signed-up user (Spark SQL version).

    Registers ``signups`` and ``confirmations`` as the temp views ``Signups`` and
    ``Confirmations`` (replacing any existing views with those names), then runs a
    single aggregation over their LEFT join using the notebook-level ``spark``
    session.

    Args:
        signups: Spark DataFrame with a ``user_id`` column.
        confirmations: Spark DataFrame with ``user_id`` and ``action`` columns.

    Returns:
        Spark DataFrame with columns ``user_id`` and ``confirmation_rate``:
        confirmed requests / all requests, rounded to 2 decimals with Spark's
        ``ROUND`` (HALF_UP); 0.0 for users with no requests. Row order is not
        guaranteed.
    """
    signups.createOrReplaceTempView('Signups')
    confirmations.createOrReplaceTempView('Confirmations')

    sql_query = '''
    SELECT
        s.user_id,
        -- AVG of a 0/1 "confirmed" flag equals confirmed / total requests.
        -- A user with no requests gets one LEFT JOIN row whose action is NULL;
        -- IF treats the NULL condition as false, so that row scores 0 and the
        -- rate is 0 without any division-by-zero risk.
        ROUND(AVG(IF(c.action = 'confirmed', 1, 0)), 2) AS confirmation_rate
    FROM Signups s
    LEFT JOIN Confirmations c
        ON s.user_id = c.user_id
    GROUP BY s.user_id
    '''

    return spark.sql(sql_query)

# Run the Spark SQL implementation on the loaded Spark frames and print the result.
output = confirmation_rate(
    signups=spark_Signups,
    confirmations=spark_Confirmations,
)
output.show(n=100)

[Stage 11:>                                                         (0 + 1) / 1]

+-------+-----------------+
|user_id|confirmation_rate|
+-------+-----------------+
|      7|              1.0|
|      3|              0.0|
|      6|              0.0|
|      2|              0.5|
+-------+-----------------+



                                                                                

### PySpark

In [9]:
def confirmation_rate(signups: spark_DataFrame, confirmations: spark_DataFrame) -> spark_DataFrame:
    """Confirmation rate per signed-up user (PySpark DataFrame API version).

    For every ``user_id`` in ``signups`` the rate is the number of confirmed
    requests divided by the number of all requests in ``confirmations``,
    rounded to 2 decimals with Spark's ``round`` (HALF_UP). Users with no
    requests get 0.0.

    Args:
        signups: Spark DataFrame with a ``user_id`` column.
        confirmations: Spark DataFrame with ``user_id`` and ``action`` columns.

    Returns:
        Spark DataFrame with columns ``user_id`` and ``confirmation_rate``.
        Row order is not guaranteed.
    """
    # 1 for a confirmed request, 0 otherwise. A user with no requests gets one
    # LEFT-join row whose ``action`` is NULL; the NULL comparison is not true,
    # so ``when`` falls through to ``otherwise(0)``.
    is_confirmed = when(col('action') == 'confirmed', 1).otherwise(0)

    return (
        signups.select('user_id')
        .join(confirmations.select('user_id', 'action'), on='user_id', how='left')
        .groupBy('user_id')
        # AVG of the 0/1 flag == confirmed / total, in one aggregation pass.
        # ``round`` here is pyspark.sql.functions.round (imported at the top).
        .agg(round(avg(is_confirmed), 2).alias('confirmation_rate'))
    )

# Run the PySpark DataFrame-API implementation and print the result.
output = confirmation_rate(
    signups=spark_Signups,
    confirmations=spark_Confirmations,
)
output.show(n=100)

[Stage 19:>                                                         (0 + 1) / 1]

+-------+-----------------+
|user_id|confirmation_rate|
+-------+-----------------+
|      7|              1.0|
|      3|              0.0|
|      6|              0.0|
|      2|              0.5|
+-------+-----------------+



                                                                                

### Pandas

In [10]:
def confirmation_rate(signups: pd.DataFrame, confirmations: pd.DataFrame) -> pd.DataFrame:
    # Filter the 'confirmations' DataFrame for rows where 'action' is 'confirmed'
    confirmed_confirmations = confirmations[confirmations['action'] == 'confirmed']
    
    # Merge the signups DataFrame with confirmed confirmations to get users with confirmed actions
    signups_with_confirmations = signups.merge(confirmed_confirmations, on='user_id', how='left')
    
    # Extract only the 'user_id' and 'action' columns for processing
    action_data = signups_with_confirmations[['user_id', 'action']]
    
    # Initialize a dictionary to store the confirmation counts for each user
    confirmation_counts_dict = {}
    
    # Iterate over each row in the action_data to count confirmations per user
    for row in action_data.values:
        user_id = row[0]  # Get the user_id
        action = row[1]   # Get the action (either 'confirmed' or NaN)
        
        # Initialize the user ID in the dictionary if not already present
        confirmation_counts_dict[user_id] = confirmation_counts_dict.get(user_id, 0)
        
        # If the action is 'confirmed', increment the confirmation count for the user
        if action == 'confirmed':
            confirmation_counts_dict[user_id] += 1

    # Convert the dictionary of user confirmation counts into a DataFrame
    confirmation_counts = pd.DataFrame(confirmation_counts_dict.items(), columns=['user_id', 'confirmation_count'])

    # Create the TotalRequests DataFrame which counts total confirmation requests per user
    total_requests = signups.merge(confirmations, on='user_id', how='left')\
                            .groupby(['user_id']).size().reset_index(name='total_count')

    # Merge the confirmation counts with the total requests to get confirmation data for each user
    merged_confirmation_data = total_requests.merge(confirmation_counts, on='user_id', how='inner')

    # Extract the relevant columns: 'confirmation_count' and 'total_count'
    relevant_data = merged_confirmation_data[['confirmation_count', 'total_count']]
    
    # Initialize a list to store the calculated confirmation rates
    confirmation_rate_list = []
    
    # Iterate through each row and calculate the confirmation rate as (confirmed count / total count)
    for row in relevant_data.values:
        # Calculate the confirmation rate and round to 2 decimal places
        rate = np.round((row[0] / row[1]), 2) if row[1] > 0 else 0.0  # Prevent division by zero
        confirmation_rate_list.append(rate)

    # Add the calculated confirmation rates to the DataFrame
    merged_confirmation_data['confirmation_rate'] = confirmation_rate_list
    
    # Select only the 'user_id' and 'confirmation_rate' columns for the final result
    result = merged_confirmation_data[['user_id', 'confirmation_rate']]
    
    return result

# Run the pandas implementation on the pandas frames and display the result.
output = confirmation_rate(
    signups=pandas_Signups,
    confirmations=pandas_Confirmations,
)
output.head(n=100)

Unnamed: 0,user_id,confirmation_rate
0,2,0.5
1,3,0.0
2,6,0.0
3,7,1.0
