In [1]:
# need to split everything by sales rep, probably straight after raw_df import

In [28]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import col, when, min as min_
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.window import Window
from functools import reduce
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import delta
import numpy as np
import statsmodels.api as sm
from itertools import combinations
from scipy import integrate
import matplotlib.pyplot as plt
from scipy.stats import poisson
import math
import json

In [29]:
def create_spark_session():
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("YourAppName") \
        .getOrCreate()
    return spark

# Initialize SparkSession and make it globally available
spark = create_spark_session()

from pyspark.sql.types import StructField, StructType, FloatType

def load_and_prepare_data(file_location, file_type="csv", infer_schema="true", first_row_is_header="true", delimiter=",", temp_table_name="raw_data"):
    df = spark.read.format(file_type) \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load(file_location)  # This line completes the DataFrame reading process

    df.createOrReplaceTempView(temp_table_name)
    return df


# Correct usage:
file_location = "/Users/mennahjafar/Desktop/scalr/backend/Final_Corrected_Complex_datasetv2_v2.csv"
df = load_and_prepare_data(file_location)

#modifying the opportunity stage column to replace Closed with Closed Won or Closed Lost depending on how it's ended
df = df.withColumn("Opportunity Stage", 
                            when((col("Opportunity Stage") == "Closed") & (col("Closed Won") == 1), "Closed Won")
                            .when((col("Opportunity Stage") == "Closed") & (col("Closed Lost") == 1), "Closed Lost")
                            .otherwise(col("Opportunity Stage")))


df.show()
df_json = df.toJSON().collect()

['{"PERIOD":"2021-11-01","OwnerId":1,"DealID":1,"Opportunity Stage":"Pitching","Closed won":0,"Closed lost":0,"Inbound Calls":1,"Inbound Email":3,"Outbound Email":1,"Meetings Scheduled":1,"Lead Response Time":8,"Average Contract Value":4307.5,"Sales Cycle Length":0,"Linkedin Messaging":7,"Daily opportunity movements":2,"Outbound leads created":0,"Contacts per/in company":1,"Sales by territory":1,"Sales by industry":2,"Sales by channel":1,"Number of Demos":3}', '{"PERIOD":"2021-01-17","OwnerId":1,"DealID":1,"Opportunity Stage":"Negotiation","Closed won":0,"Closed lost":0,"Inbound Calls":6,"Inbound Email":2,"Outbound Email":3,"Meetings Scheduled":0,"Lead Response Time":36,"Average Contract Value":4307.5,"Sales Cycle Length":6,"Linkedin Messaging":1,"Daily opportunity movements":2,"Outbound leads created":3,"Contacts per/in company":6,"Sales by territory":1,"Sales by industry":2,"Sales by channel":1,"Number of Demos":1}', '{"PERIOD":"2021-02-02","OwnerId":1,"DealID":1,"Opportunity Stage":

In [6]:
%run "/Users/mennahjafar/Desktop/scalr/backend/scalr_v2_functions.ipynb"

Note: you may need to restart the kernel to use updated packages.


In [7]:
raw_df = spark.read.table('raw_data')

display(raw_df)

DataFrame[PERIOD: date, OwnerId: int, DealID: int, Opportunity stage: string, Closed won: int, Closed lost: int, Inbound Calls: int, Inbound Email: int, Outbound Email: int, Meetings Scheduled: int, Lead Response Time: int, Average Contract Value: double, Sales Cycle Length: int, Linkedin Messaging: int, Daily opportunity movements: int, Outbound leads created: int, Contacts per/in company: int, Sales by territory: int, Sales by industry: int, Sales by channel: int, Number of Demos: int]

In [8]:
def get_data_by_owner_and_deal_pyspark(owner_id, deal_id=None):
    """
    Filters the dataset for a specific OwnerId in PySpark, optionally filters by DealID,
    and returns datasets for each DealID (if DealID is not specified) sorted chronologically.
    
    Parameters:
    - owner_id: The OwnerId to filter by.
    - deal_id: Optional. The specific DealID to filter by. If None, returns data for all DealIDs for the owner.
    
    Returns:
    - A dictionary of DataFrames, with DealID as keys, each DataFrame sorted chronologically.
      If a specific DealID is provided, returns a single DataFrame for that DealID.
    """
    # Filter by OwnerId
    owner_data = df.filter(df['OwnerId'] == owner_id)
    
    if deal_id is not None:
        # If a specific DealID is provided, further filter by DealID
        deal_data = owner_data.filter(owner_data['DealID'] == deal_id).orderBy('PERIOD')
        return {deal_id: deal_data}
    else:
        # If no specific DealID is provided, separate by DealID and sort each subset
        deal_ids = owner_data.select('DealID').distinct().collect()
        sorted_deal_datasets = {}
        
        for row in deal_ids:
            deal_id = row['DealID']
            deal_data = owner_data.filter(owner_data['DealID'] == deal_id).orderBy('PERIOD')
            sorted_deal_datasets[deal_id] = deal_data
        
        return sorted_deal_datasets




def print_data_for_owner(deal_datasets, deal_id=None):
    """
    Prints the datasets for a specific OwnerId. If a DealID is specified, it prints only that dataset;
    otherwise, it prints all datasets for the OwnerId.
    
    Parameters:
    - deal_datasets: A dictionary of DataFrames with DealID as keys, each DataFrame sorted chronologically.
    - deal_id: Optional. The specific DealID to print data for. If None, prints data for all DealIDs.
    """
    if deal_id is not None:
        # Print data for the specified DealID
        if deal_id in deal_datasets:
            print(f"Data for DealID={deal_id}:\n")
            deal_datasets[deal_id].show()
        else:
            print(f"No data found for DealID={deal_id}.")
    else:
        # Print data for all DealIDs
        for did, data in deal_datasets.items():
            print(f"Data for DealID={did}:\n")
            data.show()

# Example usage:

# Retrieve data for OwnerId = 2 for all DealIDs
data_for_owner = get_data_by_owner_and_deal_pyspark(owner_id=2)

# Print data for all DealIDs for OwnerId = 2
print_data_for_owner(data_for_owner)

# Print data for a specific DealID (e.g., DealID = 3) for OwnerId = 2
print_data_for_owner(data_for_owner, deal_id=3)


Data for DealID=255:

+----------+-------+------+-----------------+----------+-----------+-------------+-------------+--------------+------------------+------------------+----------------------+------------------+------------------+---------------------------+----------------------+-----------------------+------------------+-----------------+----------------+---------------+
|    PERIOD|OwnerId|DealID|Opportunity Stage|Closed won|Closed lost|Inbound Calls|Inbound Email|Outbound Email|Meetings Scheduled|Lead Response Time|Average Contract Value|Sales Cycle Length|Linkedin Messaging|Daily opportunity movements|Outbound leads created|Contacts per/in company|Sales by territory|Sales by industry|Sales by channel|Number of Demos|
+----------+-------+------+-----------------+----------+-----------+-------------+-------------+--------------+------------------+------------------+----------------------+------------------+------------------+---------------------------+----------------------+-----



+----------+-------+------+-----------------+----------+-----------+-------------+-------------+--------------+------------------+------------------+----------------------+------------------+------------------+---------------------------+----------------------+-----------------------+------------------+-----------------+----------------+---------------+
|    PERIOD|OwnerId|DealID|Opportunity Stage|Closed won|Closed lost|Inbound Calls|Inbound Email|Outbound Email|Meetings Scheduled|Lead Response Time|Average Contract Value|Sales Cycle Length|Linkedin Messaging|Daily opportunity movements|Outbound leads created|Contacts per/in company|Sales by territory|Sales by industry|Sales by channel|Number of Demos|
+----------+-------+------+-----------------+----------+-----------+-------------+-------------+--------------+------------------+------------------+----------------------+------------------+------------------+---------------------------+----------------------+-----------------------+---

In [9]:
import pandas as pd

# Load the dataset


def get_data_by_owner_and_deal(owner_id, deal_id=None):
    """
    Filters the dataset for a specific OwnerId, optionally filters by DealID,
    and returns datasets for each DealID (if DealID is not specified) sorted chronologically.
    
    Parameters:
    - owner_id: The OwnerId to filter by.
    - deal_id: Optional. The specific DealID to filter by. If None, returns data for all DealIDs for the owner.
    
    Returns:
    - A dictionary of DataFrames, with DealID as keys, each DataFrame sorted chronologically.
      If a specific DealID is provided, returns a single DataFrame for that DealID.
    """
    # Filter by OwnerId
    owner_data = df[df['OwnerId'] == owner_id]
    
    # If a specific DealID is provided, further filter by DealID
    if deal_id is not None:
        deal_data = owner_data[owner_data['DealID'] == deal_id].sort_values(by='PERIOD')
        return deal_data
    else:
        # If no specific DealID is provided, separate by DealID and sort each subset
        deal_groups = owner_data.groupby('DealID')
        sorted_deal_datasets = {deal_id: group.sort_values(by='PERIOD') for deal_id, group in deal_groups}
        return sorted_deal_datasets

# Example usage:
# To get data for OwnerId = 1, for all DealIDs:
# data_for_owner_1 = get_data_by_owner_and_deal(owner_id=1)

# To get data for OwnerId = 1 and DealID = 2:
# data_for_owner_1_deal_2 = get_data_by_owner_and_deal(owner_id=1, deal_id=2)
    


def print_data_by_owner_and_deal_pyspark(owner_id):
    """
    Filters the dataset for a specific OwnerId in PySpark, separates the dataset by DealID,
    sorts each subset chronologically, and prints the subsets.
    
    Parameters:
    - owner_id: The OwnerId to filter and print data for.
    """
    # Filter by OwnerId
    owner_data = df.filter(df['OwnerId'] == owner_id)
    
    # Get distinct DealIDs for the filtered data
    deal_ids = owner_data.select('DealID').distinct().collect()
    
    # Iterate over each DealID, filter, sort, and print
    for row in deal_ids:
        deal_id = row['DealID']
        deal_data = owner_data.filter(owner_data['DealID'] == deal_id).orderBy('PERIOD')
        print(f"Data for OwnerId={owner_id}, DealID={deal_id}:\n")
        deal_data.show()

# Assuming spark is your SparkSession and raw_df is already loaded as a Spark DataFrame
# Example usage: To print data for OwnerId = 2
print_data_by_owner_and_deal_pyspark(2)




Data for OwnerId=2, DealID=255:

+----------+-------+------+-----------------+----------+-----------+-------------+-------------+--------------+------------------+------------------+----------------------+------------------+------------------+---------------------------+----------------------+-----------------------+------------------+-----------------+----------------+---------------+
|    PERIOD|OwnerId|DealID|Opportunity Stage|Closed won|Closed lost|Inbound Calls|Inbound Email|Outbound Email|Meetings Scheduled|Lead Response Time|Average Contract Value|Sales Cycle Length|Linkedin Messaging|Daily opportunity movements|Outbound leads created|Contacts per/in company|Sales by territory|Sales by industry|Sales by channel|Number of Demos|
+----------+-------+------+-----------------+----------+-----------+-------------+-------------+--------------+------------------+------------------+----------------------+------------------+------------------+---------------------------+-----------------

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np

def calculate_all_point_biserial_correlations(df, metrics, owner_id=None, binary_col='Closed won'):
    """
    Calculates the point-biserial correlation coefficient for a list of metrics against a binary variable.
    Can filter by OwnerId or use the entire dataset. The correlation coefficients are converted to percentages.

    Parameters:
    - df: PySpark DataFrame containing the dataset.
    - metrics: List of metric names (strings) to calculate correlations for.
    - owner_id: (Optional) The OwnerId to filter the dataset by. If None, uses the entire dataset.
    - binary_col: Name of the binary column to calculate correlation against.

    Returns:
    - A dictionary with each metric as keys and their point-biserial correlation with 'Closed won' as percentage values.
    """
    if owner_id is not None:
        df_filtered = df.filter(F.col('OwnerId') == owner_id)  # Filter by OwnerId if specified
    else:
        df_filtered = df  # Use the entire dataset if no OwnerId is specified

    correlations = {}
    for metric in metrics:
        mean_1 = df_filtered.filter(F.col(binary_col) == 1).agg({metric: 'mean'}).collect()[0][0]
        mean_0 = df_filtered.filter(F.col(binary_col) == 0).agg({metric: 'mean'}).collect()[0][0]
        std_dev = df_filtered.agg({metric: 'stddev'}).collect()[0][0]
        n_1 = df_filtered.filter(F.col(binary_col) == 1).count()
        n_0 = df_filtered.filter(F.col(binary_col) == 0).count()
        n = df_filtered.count()
        
        if std_dev is not None and std_dev > 0 and n_1 > 0 and n_0 > 0:  # Ensure valid calculation
            r_pb = ((mean_1 - mean_0) / std_dev) * np.sqrt((n_1 * n_0) / float(n)**2)
            correlations[metric] = r_pb * 100  # Convert correlation to percentage
        else:
            correlations[metric] = None  # Handle cases where calculation cannot be performed
    
    return correlations

# Define your metrics list and use the function as needed
metrics = [
    "Inbound Calls", "Inbound Email", "Outbound Email", "Meetings Scheduled",
    "Lead Response Time", "Average Contract Value", "Sales Cycle Length",
    "Linkedin Messaging", "Daily opportunity movements", "Outbound leads created",
    "Contacts per/in company", "Sales by territory", "Sales by industry", "Sales by channel",
    "Number of Demos"
]

def rank_correlations(correlations):
    """
    Sorts the correlations dictionary from highest to lowest correlation values.

    Parameters:
    - correlations: Dictionary with metrics as keys and correlation values as values.

    Returns:
    - A sorted list of tuples (metric, correlation) from highest to lowest correlation.
    """
    # Filter out None values, sort by correlation value in descending order
    sorted_correlations = sorted([(metric, corr) for metric, corr in correlations.items() if corr is not None], key=lambda x: x[1], reverse=True)
    return sorted_correlations

# Example usage
correlations_entire_dataset = calculate_all_point_biserial_correlations(df, metrics)
ranked_correlations_dataset = rank_correlations(correlations_entire_dataset)
print(ranked_correlations_dataset)


[('Sales Cycle Length', 16.747742177370903), ('Inbound Calls', 5.104723293950217), ('Outbound leads created', 3.906225223000387), ('Contacts per/in company', 3.797144663170826), ('Meetings Scheduled', 3.023959801872689), ('Linkedin Messaging', 2.1138727780122943), ('Daily opportunity movements', 1.0582514362620583), ('Sales by channel', 1.0388267273719696), ('Sales by industry', 0.7276481832942652), ('Average Contract Value', 0.5220574573207232), ('Sales by territory', 0.26393443816242806), ('Inbound Email', 0.1186096630638303), ('Lead Response Time', -1.6915284671445645), ('Outbound Email', -4.076348041515255), ('Number of Demos', -4.255219828143324)]


In [13]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np

#calculating causal metrics for entire dataset and for a specific ownerid by performing point biserial coefficient calculations

def calculate_all_point_biserial_correlations(df, metrics, owner_id=None, binary_col='Closed won'):
    """
    Calculates the point-biserial correlation coefficient for a list of metrics against a binary variable.
    Can filter by OwnerId or use the entire dataset.

    Parameters:
    - df: PySpark DataFrame containing the dataset.
    - metrics: List of metric names (strings) to calculate correlations for.
    - owner_id: (Optional) The OwnerId to filter the dataset by. If None, uses the entire dataset.
    - binary_col: Name of the binary column to calculate correlation against.

    Returns:
    - A dictionary with each metric as keys and their point-biserial correlation with 'Closed won' as values.
    """
    if owner_id is not None:
        df_filtered = df.filter(F.col('OwnerId') == owner_id)  # Filter by OwnerId if specified
    else:
        df_filtered = df  # Use the entire dataset if no OwnerId is specified

    correlations = {}
    for metric in metrics:
        mean_1 = df_filtered.filter(F.col(binary_col) == 1).agg({metric: 'mean'}).collect()[0][0]
        mean_0 = df_filtered.filter(F.col(binary_col) == 0).agg({metric: 'mean'}).collect()[0][0]
        std_dev = df_filtered.agg({metric: 'stddev'}).collect()[0][0]
        n_1 = df_filtered.filter(F.col(binary_col) == 1).count()
        n_0 = df_filtered.filter(F.col(binary_col) == 0).count()
        n = df_filtered.count()
        
        if std_dev is not None and std_dev > 0 and n_1 > 0 and n_0 > 0:  # Ensure valid calculation
            r_pb = (mean_1 - mean_0) / std_dev * np.sqrt((n_1 * n_0) / float(n)**2)
            correlations[metric] = r_pb
        else:
            correlations[metric] = None  # Handle cases where calculation cannot be performed
    
    return correlations

# Define your metrics list and use the function as needed
metrics = [
    "Inbound Calls", "Inbound Email", "Outbound Email", "Meetings Scheduled",
    "Lead Response Time", "Average Contract Value", "Sales Cycle Length",
    "Linkedin Messaging", "Daily opportunity movements", "Outbound leads created",
    "Contacts per/in company", "Sales by territory", "Sales by industry", "Sales by channel",
    "Number of Demos"
]



# For the entire dataset




def rank_correlations(correlations):
    """
    Sorts the correlations dictionary from highest to lowest correlation values.

    Parameters:
    - correlations: Dictionary with metrics as keys and correlation values as values.

    Returns:
    - A sorted list of tuples (metric, correlation) from highest to lowest correlation.
    """
    # Filter out None values, sort by correlation value in descending order
    sorted_correlations = sorted([(metric, corr) for metric, corr in correlations.items() if corr is not None], key=lambda x: x[1], reverse=True)
    return sorted_correlations


correlations_entire_dataset = calculate_all_point_biserial_correlations(df, metrics)
ranked_correlations_dataset = rank_correlations(correlations_entire_dataset)
print("Ranked correlations for company", ranked_correlations_dataset)


Ranked correlations for company [('Sales Cycle Length', 0.16747742177370903), ('Inbound Calls', 0.05104723293950217), ('Outbound leads created', 0.03906225223000387), ('Contacts per/in company', 0.03797144663170826), ('Meetings Scheduled', 0.03023959801872689), ('Linkedin Messaging', 0.021138727780122945), ('Daily opportunity movements', 0.010582514362620582), ('Sales by channel', 0.010388267273719696), ('Sales by industry', 0.007276481832942652), ('Average Contract Value', 0.005220574573207232), ('Sales by territory', 0.002639344381624281), ('Inbound Email', 0.001186096630638303), ('Lead Response Time', -0.016915284671445645), ('Outbound Email', -0.04076348041515254), ('Number of Demos', -0.042552198281433244)]


In [14]:
# Example usage
specific_owner_id = 1  # To analyze data for a specific OwnerId
correlations_specific_owner = calculate_all_point_biserial_correlations(df, metrics, specific_owner_id)
ranked_correlations_specific_owner = rank_correlations(correlations_specific_owner)

print('Ranked correltions for Owner ID',specific_owner_id,ranked_correlations_specific_owner)

Ranked correltions for Owner ID 1 [('Sales Cycle Length', 0.1767785540858695), ('Inbound Calls', 0.14119886542349377), ('Average Contract Value', 0.08706118150676469), ('Contacts per/in company', 0.057821893180009), ('Outbound leads created', 0.05518440426294179), ('Sales by territory', 0.05160464437735598), ('Linkedin Messaging', 0.04081293255820049), ('Outbound Email', 0.03638115925522479), ('Daily opportunity movements', 0.029766522691343184), ('Sales by industry', 0.018969582209685424), ('Meetings Scheduled', 0.0011837180080740593), ('Sales by channel', 0.00048193183830747027), ('Inbound Email', -0.018307232417347424), ('Number of Demos', -0.07643338600443007), ('Lead Response Time', -0.07766978444819662)]


In [16]:
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# calculation of the signals function by performing a binary logistic regression. Can be done both dataset/company wide and OwnerID wide

def analyze_data_and_train_model(raw_df, owner_id=None):
    # Filter the DataFrame by OwnerId if specified
    if owner_id is not None:
        df_filtered = raw_df.filter(raw_df['OwnerId'] == owner_id)
    else:
        df_filtered = raw_df

    # Define the window specification for cumulative sum
    windowSpec = Window.partitionBy("DealID").orderBy("PERIOD").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    # List of metrics to include in cumulative sum
    metrics = [
        "Inbound Calls", "Inbound Email", "Outbound Email", "Meetings Scheduled",
        "Lead Response Time", "Average Contract Value", "Sales Cycle Length",
        "Linkedin Messaging", "Daily opportunity movements", "Outbound leads created",
        "Contacts per/in company", "Sales by territory", "Sales by industry", "Sales by channel",
        "Number of Demos", "Closed won"
    ]

    # Calculate cumulative sum for each metric within each DealID
    for metric in metrics:
        df_filtered = df_filtered.withColumn(f"{metric}_cumulative", F.sum(F.col(metric)).over(windowSpec))

    # Select only the last row for each DealID after cumulative calculation
    windowSpecLastRow = Window.partitionBy("DealID").orderBy(F.desc("PERIOD"))
    df_cumulative = df_filtered.withColumn("row", F.row_number().over(windowSpecLastRow)) \
                               .filter(F.col("row") == 1) \
                               .drop("row")

    # Prepare data for logistic regression
    featureCols = [col for col in df_cumulative.columns if col.endswith("_cumulative") and col != "Closed won_cumulative"]
    assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
    df_assembled = assembler.transform(df_cumulative)
    df_model = df_assembled.select(F.col("features"), F.col("Closed won_cumulative").alias("label"))

    # Train the logistic regression model
    lr = LogisticRegression(featuresCol="features", labelCol="label")
    model = lr.fit(df_model)

    # Make predictions on the dataset
    predictions = model.transform(df_model)

    # Evaluate the model
    evaluator = BinaryClassificationEvaluator(labelCol="label")
    accuracy = evaluator.evaluate(predictions)
    print(f"Model Accuracy: {accuracy}")

    # Calculate and display odds ratios
    coefficients = model.coefficients.toArray()
    odds_ratios = (np.exp(coefficients) * 100)-100
    print("Odds Ratios for Each Metric (as percentages):")
    for name, odds_ratio in zip(featureCols, odds_ratios):
        print(f"{name}: {odds_ratio:.2f}%")

    return model, accuracy


model, accuracy = analyze_data_and_train_model(raw_df)
# Example usage
  # Use this to analyze data for a specific OwnerId
# specific_owner_id = None  # Uncomment this and comment out the above line to analyze the entire dataset

# Assuming spark is your SparkSession and raw_df is loaded






24/04/12 20:28:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Model Accuracy: 0.6334163238796275
Odds Ratios for Each Metric (as percentages):
Inbound Calls_cumulative: 0.95%
Inbound Email_cumulative: 4.69%
Outbound Email_cumulative: -1.21%
Meetings Scheduled_cumulative: 6.02%
Lead Response Time_cumulative: 0.09%
Average Contract Value_cumulative: -0.01%
Sales Cycle Length_cumulative: -0.00%
Linkedin Messaging_cumulative: 2.52%
Daily opportunity movements_cumulative: -5.08%
Outbound leads created_cumulative: 3.47%
Contacts per/in company_cumulative: -0.48%
Sales by territory_cumulative: 0.77%
Sales by industry_cumulative: 1.77%
Sales by channel_cumulative: 1.43%
Number of Demos_cumulative: -2.16%


In [17]:
owner_id=3
model_for_owner, accuracy_for_owner = analyze_data_and_train_model(raw_df)

Model Accuracy: 0.6334163238796275
Odds Ratios for Each Metric (as percentages):
Inbound Calls_cumulative: 0.95%
Inbound Email_cumulative: 4.69%
Outbound Email_cumulative: -1.21%
Meetings Scheduled_cumulative: 6.02%
Lead Response Time_cumulative: 0.09%
Average Contract Value_cumulative: -0.01%
Sales Cycle Length_cumulative: -0.00%
Linkedin Messaging_cumulative: 2.52%
Daily opportunity movements_cumulative: -5.08%
Outbound leads created_cumulative: 3.47%
Contacts per/in company_cumulative: -0.48%
Sales by territory_cumulative: 0.77%
Sales by industry_cumulative: 1.77%
Sales by channel_cumulative: 1.43%
Number of Demos_cumulative: -2.16%


In [18]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from itertools import chain
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from scipy.stats import skew, kurtosis


#function that calculates mean, skewness, kurtosis, maximum and minimum recorded value for each integer variable for ownerID and company wide
def calculate_metrics_stats(df, metrics):
    owner_window = Window.partitionBy("OwnerId").orderBy("Period")
    deal_window = Window.partitionBy("OwnerId", "DealID").orderBy("Period")
    
    for metric in metrics:
        df = df.withColumn(f"Cumulative_{metric}", F.sum(metric).over(deal_window))
    
    results = {}
    
    for metric in metrics:
        cum_metric = f"Cumulative_{metric}"
        pd_df = df.select("OwnerId", cum_metric).toPandas()
        
        max_val = pd_df[cum_metric].max()
        min_val = pd_df[cum_metric].min()
        avg_val = pd_df[cum_metric].mean()
        skew_val = skew(pd_df[cum_metric], bias=False)
        kurt_val = kurtosis(pd_df[cum_metric], bias=False)
        
        pmf_val = pd_df[cum_metric].value_counts().sort_index() / len(pd_df)
        
        results[metric] = {
            "Max": max_val,
            "Min": min_val,
            "Average": avg_val,
            "Skewness": skew_val,
            "Kurtosis": kurt_val,
            "PMF": pmf_val.to_dict()
        }
    
    return results

# Assuming df is your Spark DataFrame and it's already been loaded and processed
metrics = [
    "Inbound Calls", "Inbound Email", "Outbound Email", "Meetings Scheduled",
    "Lead Response Time", "Average Contract Value", "Sales Cycle Length",
    "Linkedin Messaging", "Daily opportunity movements", "Outbound leads created",
    "Contacts per/in company", "Sales by territory", "Sales by industry", "Sales by channel",
    "Number of Demos"
]

results = calculate_metrics_stats(df, metrics)

for metric, stats in results.items():
    print(f"---{metric}---")
    print(f"Maximum: {stats['Max']}")
    print(f"Minimum: {stats['Min']}")
    print(f"Average: {stats['Average']}")
    print(f"Skewness: {stats['Skewness']}")
    print(f"Kurtosis: {stats['Kurtosis']}")
    print("PMF:")
    for value, probability in stats['PMF'].items():
        print(f"  Value: {value}, Probability: {probability}")
    print("\n")


data_for_owner = df.filter(df["OwnerId"] == '3')  # Make sure '3' matches an existing OwnerId in your DataFrame


results_for_owner= calculate_metrics_stats(data_for_owner,metrics)



for metric, stats in results_for_owner.items():
    print(f"---{metric}---")
    print(f"Maximum: {stats['Max']}")
    print(f"Minimum: {stats['Min']}")
    print(f"Average: {stats['Average']}")
    print(f"Skewness: {stats['Skewness']}")
    print(f"Kurtosis: {stats['Kurtosis']}")
    print("PMF:")
    for value, probability in stats['PMF'].items():
        print(f"  Value: {value}, Probability: {probability}")
    print("\n")


---Inbound Calls---
Maximum: 45
Minimum: 0
Average: 13.826666666666666
Skewness: 0.5705885318205103
Kurtosis: -0.22180480378475043
PMF:
  Value: 0, Probability: 0.017222222222222222
  Value: 1, Probability: 0.025555555555555557
  Value: 2, Probability: 0.03611111111111111
  Value: 3, Probability: 0.02388888888888889
  Value: 4, Probability: 0.03277777777777778
  Value: 5, Probability: 0.04388888888888889
  Value: 6, Probability: 0.04833333333333333
  Value: 7, Probability: 0.03722222222222222
  Value: 8, Probability: 0.051111111111111114
  Value: 9, Probability: 0.06166666666666667
  Value: 10, Probability: 0.03777777777777778
  Value: 11, Probability: 0.04055555555555555
  Value: 12, Probability: 0.03888888888888889
  Value: 13, Probability: 0.035
  Value: 14, Probability: 0.042777777777777776
  Value: 15, Probability: 0.035
  Value: 16, Probability: 0.03388888888888889
  Value: 17, Probability: 0.035555555555555556
  Value: 18, Probability: 0.03666666666666667
  Value: 19, Probabilit

In [31]:
from pyspark.sql.functions import min, max, datediff

# function which calculates mean, skewness, kurtosis, maximum and minimum recorded values and probability mass function of time elapsed
#in days across dealIDs both dataset wide and company wide

# Calculate the start and end dates for each DealID and this can be done for ownerID or for the company just by replacing df with the datframe
# of choice
deal_dates = df.groupBy("DealID").agg(
    min("PERIOD").alias("Start_Date"),
    max("PERIOD").alias("End_Date")
)

# Calculate the time elapsed in days for each DealID
deal_dates = deal_dates.withColumn("Days_Elapsed", datediff("End_Date", "Start_Date"))

# Show the results
deal_dates.show(30)

from pyspark.sql.functions import min, max, datediff, col
from scipy.stats import skew, kurtosis
import pandas as pd

# Assuming the previous steps have been executed and deal_dates DataFrame is available

# Convert the deal_dates Spark DataFrame to a Pandas DataFrame
deal_dates_pd = deal_dates.select("Days_Elapsed").toPandas()

# Calculate basic statistics
max_days = deal_dates_pd["Days_Elapsed"].max()
min_days = deal_dates_pd["Days_Elapsed"].min()
avg_days = deal_dates_pd["Days_Elapsed"].mean()

# Calculate skewness and kurtosis
skew_days = skew(deal_dates_pd["Days_Elapsed"], bias=False)
kurt_days = kurtosis(deal_dates_pd["Days_Elapsed"], bias=False)

# Calculate PMF
pmf_days = deal_dates_pd["Days_Elapsed"].value_counts().sort_index() / len(deal_dates_pd)

pmf_days_array = []
# Display the results
print(f"Maximum Elapsed Days: {max_days}")
print(f"Minimum Elapsed Days: {min_days}")
print(f"Average Elapsed Days: {avg_days}")
print(f"Skewness of Elapsed Days: {skew_days}")
print(f"Kurtosis of Elapsed Days: {kurt_days}")
print("PMF of Elapsed Days:")
for value, prob in pmf_days.items():
    pmf_days_array.append({"Days": value, "Probability": prob})
    # print(f"  Days: {value}, Probability: {prob}")

print(pmf_days_array)

+------+----------+----------+------------+
|DealID|Start_Date|  End_Date|Days_Elapsed|
+------+----------+----------+------------+
|   148|2021-01-06|2021-10-31|         298|
|   243|2021-01-03|2021-08-31|         240|
|    31|2021-01-05|2021-08-31|         238|
|    85|2021-01-07|2021-07-27|         201|
|   137|2021-01-06|2021-10-31|         298|
|   251|2021-01-04|2021-06-30|         177|
|    65|2021-01-04|2021-07-06|         183|
|    53|2021-01-08|2021-03-12|          63|
|   255|2021-01-02|2021-05-09|         127|
|   133|2021-01-05|2021-08-31|         238|
|   296|2021-01-12|2021-11-30|         322|
|    78|2021-01-07|2021-06-30|         174|
|   322|2021-01-07|2021-12-24|         351|
|   321|2021-01-07|2021-06-30|         174|
|   108|2021-01-04|2021-06-30|         177|
|   155|2021-01-11|2021-12-19|         342|
|    34|2021-01-03|2021-08-31|         240|
|   193|2021-01-06|2021-10-31|         298|
|   211|2021-01-12|2021-11-30|         322|
|   101|2021-01-03|2021-09-01|  

24/04/12 21:45:41 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1026768 ms exceeds timeout 120000 ms
24/04/12 21:45:41 WARN SparkContext: Killing executors is not supported by current scheduler.
24/04/12 21:47:09 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [20]:

#function which calculates mean,skewness,kurtosis, and probability mass function
# for the number of elapsed time steps for a deal both OwnerID specific and company wide specific



def calculate_deal_transitions_statistics_debug(df):
    transitions_count = df.groupBy("DealID").count().withColumnRenamed("count", "Transitions")
    transitions_pd = transitions_count.select("Transitions").toPandas()
    
    max_transitions = transitions_pd["Transitions"].max()
    min_transitions = transitions_pd["Transitions"].min()
    avg_transitions = transitions_pd["Transitions"].mean()
    
    # Check variance before calculating skewness and kurtosis
    if transitions_pd["Transitions"].var() == 0:  # No variance
        skew_transitions = None
        kurt_transitions = None
    else:
        skew_transitions = skew(transitions_pd["Transitions"], bias=False)
        kurt_transitions = kurtosis(transitions_pd["Transitions"], bias=False)
    
    pmf_transitions = transitions_pd["Transitions"].value_counts().sort_index() / len(transitions_pd)
    
    statistics = {
        "Maximum Transitions": max_transitions,
        "Minimum Transitions": min_transitions,
        "Average Transitions": avg_transitions,
        "Skewness of Transitions": skew_transitions,
        "Kurtosis of Transitions": kurt_transitions,
        "PMF of Transitions": pmf_transitions.to_dict()
    }
    
    return statistics

# Example usage remains the same



# Call the function and store the results
stats = calculate_deal_transitions_statistics_debug(df)


# Access individual statistics from the results
maximum_transitions = stats["Maximum Transitions"]
minimum_transitions = stats["Minimum Transitions"]
average_transitions = stats["Average Transitions"]
kurtosis_transitions = stats["Kurtosis of Transitions"]
# PMF is a dictionary itself, you can access it directly or iterate over its items
pmf_transitions = stats["PMF of Transitions"]

# Example: printing the maximum transitions
print("Maximum Transitions:", maximum_transitions)

# Example: iterating over PMF to print each value and its probability
print("PMF of Transitions:")
for value, probability in pmf_transitions.items():
    print(f"  Transitions: {value}, Probability: {probability}")






Maximum Transitions: 5
PMF of Transitions:
  Transitions: 5, Probability: 1.0


In [21]:
# Filter the original Spark DataFrame for a specific OwnerId
data_for_owner = df.filter(df["OwnerId"] == '3')  # Make sure '3' matches an existing OwnerId in your DataFrame

# Now data_for_owner is still a Spark DataFrame and should work with your function
stats_for_owner = calculate_deal_transitions_statistics_debug(data_for_owner)

# Access and print statistics specifically for this OwnerId
print("Statistics for OwnerId 3:")
print("Maximum Transitions:", stats_for_owner["Maximum Transitions"])
print("Minimum Transitions:", stats_for_owner["Minimum Transitions"])
print("Average Transitions:", stats_for_owner["Average Transitions"])
print("Skewness of Transitions:", stats_for_owner.get("Skewness of Transitions", "N/A"))  # Using .get() for safety
print("Kurtosis of Transitions:", stats_for_owner.get("Kurtosis of Transitions", "N/A"))  # in case of None values
print("PMF of Transitions:")
for value, probability in stats_for_owner["PMF of Transitions"].items():
    print(f"  Transitions: {value}, Probability: {probability}")

Statistics for OwnerId 3:
Maximum Transitions: 5
Minimum Transitions: 5
Average Transitions: 5.0
Skewness of Transitions: None
Kurtosis of Transitions: None
PMF of Transitions:
  Transitions: 5, Probability: 1.0


In [22]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from itertools import chain
from pyspark.sql import SparkSession

# This is the calculation of sales rep specific Transition probabilities

# Specify the OwnerId for the sales rep you're interested in
specific_owner_id = '3'  # Replace 'owner_id_value' with the actual OwnerId

# Filter the DataFrame for the specific OwnerId
df_owner_filtered = df.filter(df["OwnerId"] == specific_owner_id)

# Map the stages to numeric values directly within the DataFrame for the specific OwnerId
stage_to_num_owner = {
    'Interaction': 1,
    'Pitching': 2,
    'Negotiation': 3,
    'Commitment': 4,
    'Closed Won': 5,
    'Closed Lost': 6
}

# Apply mapping to convert stages to numeric values for the specific OwnerId
mapping_expr_owner = F.create_map([F.lit(x) for x in chain(*stage_to_num_owner.items())])
df_owner_filtered = df_owner_filtered.withColumn("NumericStage", mapping_expr_owner[df_owner_filtered['Opportunity Stage']])

# Define a window specification to partition by DealID, ordered by PERIOD, for the specific OwnerId
windowSpec_owner = Window.partitionBy("DealID").orderBy("PERIOD")

# Use the LAG function to find the previous stage for each record within the same DealID for the specific OwnerId
df_owner_filtered = df_owner_filtered.withColumn("PrevStage", F.lag("NumericStage").over(windowSpec_owner))

# Keep all rows, including those where the state remains the same (self-transitions), for the specific OwnerId
transitions_df_owner = df_owner_filtered.filter(df_owner_filtered["PrevStage"].isNotNull())

# Count the number of transitions from PrevStage to NumericStage across all DealIDs, including self-transitions, for the specific OwnerId
transition_counts_owner = transitions_df_owner.groupBy("PrevStage", "NumericStage").count()

# Sum all the transitions coming out of each specific state for the specific OwnerId
total_transitions_out_owner = transition_counts_owner.groupBy("PrevStage").agg(F.sum("count").alias("TotalOutgoingTransitions"))

# Calculate the transition probabilities with aliasing to avoid ambiguity, specifically for the OwnerId
transition_probabilities_owner = transition_counts_owner.alias("tc").join(
    total_transitions_out_owner.alias("tto"), 
    F.col("tc.PrevStage") == F.col("tto.PrevStage")
).withColumn(
    "TransitionProbability",
    F.col("tc.count") / F.col("tto.TotalOutgoingTransitions")
).select(
    F.col("tc.PrevStage").alias("PrevStage"), 
    F.col("tc.NumericStage").alias("NumericStage"), 
    F.col("tc.count").alias("count"), 
    F.col("tto.TotalOutgoingTransitions").alias("TotalOutgoingTransitions"), 
    "TransitionProbability"
)

# Show the calculated transition probabilities - for Spark, specific to the OwnerId
transition_probabilities_owner.show()

# Convert the Spark DataFrame to a Pandas DataFrame, specific to the OwnerId
pandas_df_owner = transition_probabilities_owner.toPandas()

# Pivot the DataFrame to create the transition matrix in Pandas, specific to the OwnerId
transition_matrix_owner = pandas_df_owner.pivot(index='PrevStage', columns='NumericStage', values='TransitionProbability').fillna(0)

# Ensure the matrix includes all stages from 1 to 6, specific to the OwnerId
all_stages_owner = [1, 2, 3, 4, 5, 6]
transition_matrix_owner = transition_matrix_owner.reindex(index=all_stages_owner, columns=all_stages_owner, fill_value=0)

# Optional: Convert to a NumPy array for further analysis, specific to the OwnerId
transition_matrix_np_owner = transition_matrix_owner.to_numpy()

# Visualization/Analysis, specific to the OwnerId
print("Transition Probability Matrix for Specific OwnerId as a DataFrame:")
print(transition_matrix_owner)
print("\nTransition Probability Matrix for Specific OwnerId as a NumPy array:")
print(transition_matrix_np_owner)


+---------+------------+-----+------------------------+---------------------+
|PrevStage|NumericStage|count|TotalOutgoingTransitions|TransitionProbability|
+---------+------------+-----+------------------------+---------------------+
|        6|           1|    4|                      39|  0.10256410256410256|
|        3|           1|   42|                      72|   0.5833333333333334|
|        2|           3|   30|                      72|   0.4166666666666667|
|        1|           2|    4|                      67|  0.05970149253731343|
|        3|           5|    1|                      72| 0.013888888888888888|
|        2|           5|    3|                      72| 0.041666666666666664|
|        1|           4|   59|                      67|   0.8805970149253731|
|        4|           3|    2|                      32|               0.0625|
|        2|           1|   17|                      72|   0.2361111111111111|
|        2|           4|    9|                      72|         

In [23]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from itertools import chain
from pyspark.sql import SparkSession

# This is the calculation of the company wide/entire dataset's transition probabilities

# Map the stages to numeric values directly within the DataFrame
stage_to_num = {
    'Interaction': 1,
    'Pitching': 2,
    'Negotiation': 3,
    'Commitment': 4,
    'Closed Won': 5,
    'Closed Lost': 6
}

# Apply mapping to convert stages to numeric values
mapping_expr = F.create_map([F.lit(x) for x in chain(*stage_to_num.items())])
df = df.withColumn("NumericStage", mapping_expr[df['Opportunity Stage']])

# Define a window specification to partition by DealID, ordered by PERIOD
windowSpec = Window.partitionBy("DealID").orderBy("PERIOD")

# Use the LAG function to find the previous stage for each record within the same DealID
df = df.withColumn("PrevStage", F.lag("NumericStage").over(windowSpec))

# Keep all rows, including those where the state remains the same (self-transitions)
transitions_df = df.filter(df["PrevStage"].isNotNull())

# Count the number of transitions from PrevStage to NumericStage across all DealIDs, including self-transitions
transition_counts = transitions_df.groupBy("PrevStage", "NumericStage").count()

# Sum all the transitions coming out of each specific state
total_transitions_out = transition_counts.groupBy("PrevStage").agg(F.sum("count").alias("TotalOutgoingTransitions"))

# Calculate the transition probabilities with aliasing to avoid ambiguity
transition_probabilities = transition_counts.alias("tc").join(
    total_transitions_out.alias("tto"), 
    F.col("tc.PrevStage") == F.col("tto.PrevStage")
).withColumn(
    "TransitionProbability",
    F.col("tc.count") / F.col("tto.TotalOutgoingTransitions")
).select(
    F.col("tc.PrevStage").alias("PrevStage"), 
    F.col("tc.NumericStage").alias("NumericStage"), 
    F.col("tc.count").alias("count"), 
    F.col("tto.TotalOutgoingTransitions").alias("TotalOutgoingTransitions"), 
    "TransitionProbability"
)

# Show the calculated transition probabilities - for Spark
transition_probabilities.show()

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = transition_probabilities.toPandas()

# Pivot the DataFrame to create the transition matrix in Pandas
transition_matrix = pandas_df.pivot(index='PrevStage', columns='NumericStage', values='TransitionProbability').fillna(0)

# Ensure the matrix includes all stages from 1 to 6
all_stages = [1, 2, 3, 4, 5, 6]
transition_matrix = transition_matrix.reindex(index=all_stages, columns=all_stages, fill_value=0)

# Optional: Convert to a NumPy array for further analysis
transition_matrix_np = transition_matrix.to_numpy()

# Visualization/Analysis
print("Transition Probability Matrix as a DataFrame:")
print(transition_matrix)
print("\nTransition Probability Matrix as a NumPy array:")
print(transition_matrix_np)


+---------+------------+-----+------------------------+---------------------+
|PrevStage|NumericStage|count|TotalOutgoingTransitions|TransitionProbability|
+---------+------------+-----+------------------------+---------------------+
|        6|           1|   23|                     204|  0.11274509803921569|
|        3|           1|  188|                     360|   0.5222222222222223|
|        2|           3|  137|                     359|   0.3816155988857939|
|        1|           2|   16|                     334|  0.04790419161676647|
|        1|           3|    2|                     334| 0.005988023952095809|
|        3|           5|    4|                     360| 0.011111111111111112|
|        2|           5|   11|                     359|  0.03064066852367688|
|        1|           4|  308|                     334|   0.9221556886227545|
|        4|           3|    4|                     139|  0.02877697841726619|
|        2|           1|  113|                     359|   0.3147

In [24]:
from pyspark.sql.functions import col, sum, avg, dense_rank, first, row_number
from pyspark.sql.window import Window

def calculate_elapsed_transitions_with_latest_stage(df, owner_id=None):
    if owner_id is not None:
        df = df.filter(col("OwnerId") == owner_id)
    
    windowSpecChrono = Window.partitionBy("DealID").orderBy("PERIOD")
    windowSpecChronoDesc = Window.partitionBy("DealID").orderBy(col("PERIOD").desc())
    
    df = df.withColumn("CumulativeClosedWon", sum(col("Closed won")).over(windowSpecChrono))\
           .withColumn("CumulativeClosedLost", sum(col("Closed lost")).over(windowSpecChrono))
    
    df_filtered = df.filter((col("CumulativeClosedWon") == 0) & (col("CumulativeClosedLost") == 0))

    # Aggregate to get the average "Average Contract Value" and count the number of transitions
    df_agg = df_filtered.groupBy("DealID")\
                        .agg(
                            avg(col("Average Contract Value")).alias("AvgContractValue"),
                            count("*").alias("NumTransitions")
                        )

    windowSpecRanking = Window.orderBy(col("AvgContractValue").desc())
    df_agg_ranked = df_agg.withColumn("Rank", dense_rank().over(windowSpecRanking)).filter(col("Rank") <= 5)

    # Use row_number to get the most recent record for each DealID in the filtered set
    df_latest_stage = df_filtered.withColumn("rn", row_number().over(windowSpecChronoDesc))\
                                 .filter(col("rn") == 1).select("DealID", "Opportunity Stage")

    # Join to combine the latest stage with the aggregate results
    result = df_agg_ranked.join(df_latest_stage, "DealID").select("DealID", "NumTransitions", "Opportunity Stage").collect()

    return result

# Assuming df is your DataFrame
elapsed_transitions_with_latest_stage = calculate_elapsed_transitions_with_latest_stage(df, owner_id=None)
for deal in elapsed_transitions_with_latest_stage:
    print(f"DealID: {deal['DealID']}, Elapsed Transitions: {deal['NumTransitions']}, Latest Stage: {deal['Opportunity Stage']}")




24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


DealID: 132, Elapsed Transitions: 4, Latest Stage: Commitment
DealID: 165, Elapsed Transitions: 4, Latest Stage: Commitment
DealID: 209, Elapsed Transitions: 2, Latest Stage: Negotiation
DealID: 271, Elapsed Transitions: 4, Latest Stage: Commitment
DealID: 344, Elapsed Transitions: 4, Latest Stage: Commitment


24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:29:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [25]:
owner_id = "1"

elapsed_transitions_with_latest_stage_for_owner = calculate_elapsed_transitions_with_latest_stage(df, owner_id)
for deal in elapsed_transitions_with_latest_stage:
    print("For Owner ID",owner_id,f"DealID: {deal['DealID']}, Elapsed Transitions: {deal['NumTransitions']}, Latest Stage: {deal['Opportunity Stage']}")

24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 2

For Owner ID 1 DealID: 132, Elapsed Transitions: 4, Latest Stage: Commitment
For Owner ID 1 DealID: 165, Elapsed Transitions: 4, Latest Stage: Commitment
For Owner ID 1 DealID: 209, Elapsed Transitions: 2, Latest Stage: Negotiation
For Owner ID 1 DealID: 271, Elapsed Transitions: 4, Latest Stage: Commitment
For Owner ID 1 DealID: 344, Elapsed Transitions: 4, Latest Stage: Commitment


24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [26]:
import numpy as np
from scipy.stats import poisson

# Assuming you've already defined or loaded owner-specific data and transition matrix
stats = calculate_deal_transitions_statistics_debug(df)
# Access individual statistics from the results
maximum_transitions = stats["Maximum Transitions"]
minimum_transitions = stats["Minimum Transitions"]
average_transitions = stats["Average Transitions"]
kurtosis_transitions = stats["Kurtosis of Transitions"]
# PMF is a dictionary itself, you can access it directly or iterate over its items
pmf_transitions = stats["PMF of Transitions"]

def calculate_adjusted_matrix_sum(matrix, stats, elapsed_transitions):
    pmf_transitions = stats["PMF of Transitions"]
    max_transitions = int(stats["Maximum Transitions"])
    probabilities = np.zeros(max_transitions + 1)
    for transition, probability in pmf_transitions.items():
        probabilities[transition] = probability

    print("Original Probabilities:", probabilities)
    
    adjusted_probabilities = probabilities[elapsed_transitions:]
    print("Adjusted Probabilities Before Normalization:", adjusted_probabilities)
    
    if adjusted_probabilities.sum() > 0:
        adjusted_probabilities /= adjusted_probabilities.sum()
    else:
        print("No probabilities to adjust; returning zero matrix.")
        return np.zeros_like(matrix)
    
    print("Normalized Adjusted Probabilities:", adjusted_probabilities)

    weighted_sum_matrix = np.zeros_like(matrix, dtype=float)
    for i, prob in enumerate(adjusted_probabilities, start=1):
        matrix_nth_power = np.linalg.matrix_power(matrix, i)
        print(f"Matrix to the {i}-th power * Probability {prob}:")
        print(matrix_nth_power * prob)
        weighted_sum_matrix += matrix_nth_power * prob
    
    return weighted_sum_matrix

elapsed_transitions_with_latest_stage = calculate_elapsed_transitions_with_latest_stage(df, owner_id=None)

# Define a mapping from stage names to indices
stage_to_index_mapping = {
    "Interaction": 0,
    "Pitching": 1,
    "Negotiation": 2,
    "Commitment": 3,
    "Closed Won": 4,
    "Closed Lost": 5
}

import numpy as np
from scipy.stats import poisson

# Assuming df is your PySpark DataFrame and it includes a column for "Average Contract Value"

def get_avg_contract_value(deal_id, df):
    # Hypothetical function to retrieve "Average Contract Value" for a given Deal ID
    # Implement according to how your data is structured
    avg_value = df.filter(df['DealID'] == deal_id).select('Average Contract Value').collect()[0][0]
    return avg_value

def get_owner_id(deal_id, df):
    # Hypothetical function to retrieve "OwnerId" for a given Deal ID
    # Implement according to how your data is structured
    owner_id = df.filter(df['DealID'] == deal_id).select('OwnerId').collect()[0][0]
    return owner_id

for deal in elapsed_transitions_with_latest_stage:
    deal_id = deal['DealID']
    num_transitions = deal['NumTransitions']
    latest_stage_name = deal['Opportunity Stage']
    
    # Fetch the "OwnerId" for the current Deal ID
    owner_id = get_owner_id(deal_id, df)
    
    # Fetch the "Average Contract Value" for the current Deal ID
    avg_contract_value = get_avg_contract_value(deal_id, df)
    
    latest_stage_index = stage_to_index_mapping.get(latest_stage_name, -1)
    
    if latest_stage_index != -1:
        result_matrix = calculate_adjusted_matrix_sum(transition_matrix_np, stats, num_transitions)
        
        if 0 <= latest_stage_index < result_matrix.shape[0]:
            probability_to_closed_won = result_matrix[latest_stage_index, 4]
            # Include "Average Contract Value" in the print statement
            print(f"Owner ID: {owner_id}, DealID: {deal_id}, Probability from Stage {latest_stage_name} to Closed Won: {probability_to_closed_won}, Avg Contract Value: {avg_contract_value}")
        else:
            print(f"Owner ID: {owner_id}, DealID: {deal_id}, latest stage index out of bounds.")
    else:
        print(f"Owner ID: {owner_id}, DealID: {deal_id}, invalid latest stage name.")



24/04/12 20:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 2

Original Probabilities: [0. 0. 0. 0. 0. 1.]
Adjusted Probabilities Before Normalization: [0. 1.]
Normalized Adjusted Probabilities: [0. 1.]
Matrix to the 1-th power * Probability 0.0:
[[0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]]
Matrix to the 2-th power * Probability 1.0:
[[1.93264837e-01 8.82982237e-03 6.58479734e-02 5.47240491e-03
  1.20950195e-01 6.05634767e-01]
 [2.40509077e-01 1.55745666e-01 1.80133330e-01 3.05252876e-01
  1.94714090e-02 9.88876422e-02]
 [1.29968234e-01 2.52364575e-02 2.14286373e-01 5.20722007e-01
  1.83188255e-02 9.14681029e-02]
 [1.02065285e-01 1.95120550e-02 6.83375366e-01 1.86806044e-01
  1.66024270e-03 6.58100777e-03]
 [4.74747475e-01 3.37688260e-01 5.44365814e-04 1.11610113e-01
  1.06453759e-02 6.47644106e-02]
 [4.56772229e-01 3.23679281e-01 1.23937483e-03 1.30479971e-01
  1.28547884e-02 7.49743548e-02]]
Owner ID: 3, DealID: 132, Probability from Stage Commitment to Closed Won:

In [27]:
import numpy as np
from scipy.stats import poisson

# Assuming you've already defined or loaded owner-specific data and transition matrix
stats_for_owner = calculate_deal_transitions_statistics_debug(data_for_owner)
# Define a mapping from stage names to indices
stage_to_index_mapping = {
    "Interaction": 0,
    "Pitching": 1,
    "Negotiation": 2,
    "Commitment": 3,
    "Closed Won": 4,
    "Closed Lost": 5
}

def calculate_adjusted_matrix_sum_owner(matrix_owner, stats_owner, elapsed_transitions):
    pmf_transitions_owner = stats_owner["PMF of Transitions"]
    max_transitions_owner = int(stats_owner["Maximum Transitions"])
    probabilities_owner = np.zeros(max_transitions_owner + 1)
    for transition, probability in pmf_transitions_owner.items():
        probabilities_owner[transition] = probability

    print("Original Probabilities:", probabilities_owner)
    adjusted_probabilities_owner = probabilities_owner[elapsed_transitions:]
    print("Adjusted Probabilities Before Normalization:", adjusted_probabilities_owner)
    if adjusted_probabilities_owner.sum() > 0:
        adjusted_probabilities_owner /= adjusted_probabilities_owner.sum()
    else:
        print("No probabilities to adjust; returning zero matrix.")
        return np.zeros_like(matrix_owner)
    
    print("Normalized Adjusted Probabilities:", adjusted_probabilities_owner)
    weighted_sum_matrix_owner = np.zeros_like(matrix_owner, dtype=float)
    for i, prob in enumerate(adjusted_probabilities_owner, start=1):
        matrix_nth_power_owner = np.linalg.matrix_power(matrix_owner, i)
        print(f"Matrix to the {i}-th power * Probability {prob}:")
        print(matrix_nth_power_owner * prob)
        weighted_sum_matrix_owner += matrix_nth_power_owner * prob
    
    return weighted_sum_matrix_owner



stage_to_index_mapping = {
    "Interaction": 0,
    "Pitching": 1,
    "Negotiation": 2,
    "Commitment": 3,
    "Closed Won": 4,
    "Closed Lost": 5
}
elapsed_transitions_with_latest_stage_and_value = calculate_elapsed_transitions_with_latest_stage(df, owner_id)



# Modify your existing for loop to include the "Average Contract Value"
# Modify your existing for loop to include the "Average Contract Value"
for deal in elapsed_transitions_with_latest_stage_for_owner:
    deal_id = deal['DealID']
    num_transitions = deal['NumTransitions']  # Extract the NumTransitions value
    latest_stage_name = deal['Opportunity Stage']
    # Ensure to use the correct key from your data to fetch the average contract value
    avg_contract_value = get_avg_contract_value(deal_id, df) # Make sure 'Average Contract Value' matches the field name in your data

    latest_stage_index = stage_to_index_mapping.get(latest_stage_name, -1)  # Convert stage name to index
    
    if latest_stage_index != -1:
        # Pass num_transitions instead of the whole deal dictionary
        result_matrix_owner = calculate_adjusted_matrix_sum_owner(transition_matrix_np_owner, stats_for_owner, num_transitions)
        
        if 0 <= latest_stage_index < result_matrix_owner.shape[0]:
            probability_to_closed_won = result_matrix_owner[latest_stage_index, 4]  # "Closed Won" at index 4
            # Include "Average Contract Value" in your print statement
            print(f"For Owner ID {owner_id}, DealID: {deal_id}, Stage: {latest_stage_name}, Probability to Closed Won: {probability_to_closed_won:.2%}, Avg Contract Value: {avg_contract_value}")
        else:
            print(f"For Owner ID {owner_id}, DealID: {deal_id}, latest stage index out of bounds.")
    else:
        print(f"For Owner ID {owner_id}, DealID: {deal_id}, invalid latest stage name.")



24/04/12 20:30:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 20:30:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/12 2

Original Probabilities: [0. 0. 0. 0. 0. 1.]
Adjusted Probabilities Before Normalization: [0. 1.]
Normalized Adjusted Probabilities: [0. 1.]
Matrix to the 1-th power * Probability 0.0:
[[0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]]
Matrix to the 2-th power * Probability 1.0:
[[0.1578127  0.         0.1334912  0.00746269 0.05752488 0.64370854]
 [0.28804977 0.13562396 0.20457176 0.23106689 0.01359954 0.12708808]
 [0.08555912 0.03482587 0.18643162 0.55013993 0.015625   0.12741846]
 [0.12059295 0.02755752 0.69711538 0.14106551 0.00086806 0.01280058]
 [0.48611111 0.2530058  0.         0.19306247 0.01157407 0.05624655]
 [0.52350427 0.26787537 0.         0.14017519 0.01246439 0.05598078]]
For Owner ID 3, DealID: 129, Stage: Interaction, Probability to Closed Won: 5.75%, Avg Contract Value: 4874.16
Original Probabilities: [0. 0. 0. 0. 0. 1.]
Adjusted Probabilities Before Normalization: [0. 1.]
Normalized Adjusted Pr