In [1]:
from __future__ import print_function

import sys
from operator import add
from random import random

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import random


def get_random_int(_):
    return random.randint(0, 10)


def simulation(row):
    """
    get simulation result
    :return:
    """
    fixed_cost = 120000
    profit = row.volume * (row.price - row.cost) - fixed_cost
    return profit


# get simulation fraction
simulation_fraction = 100.0

# create sparkcontext
spark = SparkSession.builder.getOrCreate()

# load sales and cost dataframe
sales_df = spark.read.json("./sales_scenarios.json")
cost_df = spark.read.json("./sales_cost.json")

In [2]:
sales_df.show()
cost_df.show()

+-----+---------------+------+
|price|sales_scenarios|volume|
+-----+---------------+------+
|    8|     hot market|100000|
|   10|      ok market| 75000|
|   11|    slow market| 50000|
+-----+---------------+------+

+----+------+
|cost|  name|
+----+------+
| 7.5|  high|
| 6.5|medium|
| 5.5|   low|
+----+------+



In [3]:
# define a udf to create new column with random int
udf_get_random_int = udf(get_random_int, IntegerType())

# sample sales dataframe with random int
sampled_sales_df = sales_df.sample(withReplacement=True, fraction=simulation_fraction)
sampled_sales_df.show(300)

+-----+---------------+------+
|price|sales_scenarios|volume|
+-----+---------------+------+
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8|     hot market|100000|
|    8| 

In [4]:
sampled_sales_with_randint_df = sampled_sales_df.withColumn("randint", udf_get_random_int('sales_scenarios'))
sampled_sales_with_randint_df.show()

+-----+---------------+------+-------+
|price|sales_scenarios|volume|randint|
+-----+---------------+------+-------+
|    8|     hot market|100000|      2|
|    8|     hot market|100000|      3|
|    8|     hot market|100000|     10|
|    8|     hot market|100000|      1|
|    8|     hot market|100000|      8|
|    8|     hot market|100000|     10|
|    8|     hot market|100000|      2|
|    8|     hot market|100000|      8|
|    8|     hot market|100000|      9|
|    8|     hot market|100000|      3|
|    8|     hot market|100000|      2|
|    8|     hot market|100000|      6|
|    8|     hot market|100000|      7|
|    8|     hot market|100000|      3|
|    8|     hot market|100000|     10|
|    8|     hot market|100000|      5|
|    8|     hot market|100000|      1|
|    8|     hot market|100000|      3|
|    8|     hot market|100000|      7|
|    8|     hot market|100000|      4|
+-----+---------------+------+-------+
only showing top 20 rows



In [5]:
# sample cost dataframe with random int
sampled_cost_df = cost_df.sample(withReplacement=True, fraction=simulation_fraction)
sampled_cost_with_randint_df = sampled_cost_df.withColumn("randint", udf_get_random_int('name'))
sampled_cost_with_randint_df.show()

+----+----+-------+
|cost|name|randint|
+----+----+-------+
| 7.5|high|      6|
| 7.5|high|      2|
| 7.5|high|      9|
| 7.5|high|      4|
| 7.5|high|      3|
| 7.5|high|      3|
| 7.5|high|      5|
| 7.5|high|      6|
| 7.5|high|      9|
| 7.5|high|      7|
| 7.5|high|      5|
| 7.5|high|      0|
| 7.5|high|      3|
| 7.5|high|      2|
| 7.5|high|      6|
| 7.5|high|      1|
| 7.5|high|      2|
| 7.5|high|      7|
| 7.5|high|      1|
| 7.5|high|      6|
+----+----+-------+
only showing top 20 rows



In [7]:
# get joined dataframe
joined_df = sampled_sales_with_randint_df.join(sampled_cost_with_randint_df,
                                               sampled_sales_with_randint_df.randint == sampled_cost_with_randint_df.randint)
joined_df.show(300)

+-----+---------------+------+-------+----+------+-------+
|price|sales_scenarios|volume|randint|cost|  name|randint|
+-----+---------------+------+-------+----+------+-------+
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     10|
|    8|     hot market|100000|     10| 5.5|   low|     1

In [8]:
# get total profit by map reduce with profit simulation
# each simulation is to calculate net_profit of one scenario
total_profit = joined_df.rdd.map(simulation).reduce(add)
total_simulations = joined_df.count()

print('Profit is {}'.format(total_profit / total_simulations))

spark.stop()

Profit is 98511.76825588413
