import pandas as pd
import numpy as np
import plotly.graph_objects as go
import math
# =========================================================
# SMART LOG SCALE
# =========================================================
def get_smart_log_range(values):
    """Return ``(lower_power, upper_power)`` base-10 exponents for a log axis.

    Only finite, strictly positive entries of *values* are considered.
    The smallest value is clamped to 1, so ``lower_power`` is never
    negative.

    Parameters
    ----------
    values : iterable of numbers
        Raw metric values; non-positive, NaN and infinite entries are
        ignored.

    Returns
    -------
    tuple of (int, int)
        Exponents such that ``10 ** lower_power`` .. ``10 ** upper_power``
        is the proposed axis span. ``upper_power`` is always strictly
        greater than ``lower_power``. ``(0, 1)`` is returned when there is
        no usable value.
    """
    # NaN fails ``v > 0`` on its own; ``isfinite`` additionally drops +inf,
    # which would otherwise make ``math.ceil`` raise OverflowError.
    usable = [v for v in values if v > 0 and math.isfinite(v)]
    if not usable:
        # Nothing to scale against: fall back to a single decade (1 .. 10).
        return 0, 1

    min_val = max(min(usable), 1)
    max_val = max(usable)

    lower_power = math.floor(math.log10(min_val))
    upper_power = math.ceil(math.log10(max_val))

    # When the minimum sits in the upper half of its decade, start the axis
    # at the next power of ten - but only if at least one full decade is
    # left afterwards, otherwise the range would collapse to zero width.
    # NOTE(review): when the bump is applied, values between min_val and
    # 10 ** lower_power fall below the axis start; confirm this is intended.
    if min_val > (10 ** lower_power) * 5 and lower_power + 1 < upper_power:
        lower_power += 1

    # Guard against a flat or inverted range (e.g. a single value that is an
    # exact power of ten, or every value below 1).
    if upper_power <= lower_power:
        upper_power = lower_power + 1

    return lower_power, upper_power
# =========================================================
# MAIN DYNAMIC FUNCTION
# =========================================================
def create_dynamic_distribution_chart(
    df,
    segment_col="SEGMENT",
    risk_col="RISK_LEVEL",
    metric_cols=None,
    title="Dynamic Population Distribution",
    width=1500,
    height=900,
    log_scale=True
):
    """
    Build a jittered scatter chart of metric values per segment and risk.

    Each row of *df* becomes one marker per plotted metric column. Markers
    are placed around their segment's x position with a fixed-seed random
    horizontal jitter, coloured by risk level and shaped by metric.

    PARAMETERS
    ----------
    df : pandas.DataFrame
        Input data; it is copied, never modified.
    segment_col : str
        Column holding the segment of each row.
    risk_col : str
        Column holding the risk level. Only "RR", "MR" and "HR" rows are
        drawn; rows with any other risk value still count towards the
        per-segment totals in the x-axis labels.
    metric_cols : list of str, optional
        Numeric columns to plot. Defaults to ["TXN_AMOUNT"]. Names that
        are not columns of *df* are skipped.
    title : str
        Figure title.
    width, height : int
        Figure size passed to the layout.
    log_scale : bool
        Use a log y axis with power-of-ten ticks when True, otherwise a
        linear axis with automatic ticks.

    RETURNS
    -------
    plotly.graph_objects.Figure

    EXAMPLE metric_cols
    -------------------
    ["TXN_AMOUNT"]
    ["TXN_AMOUNT", "TXN_COUNT"]
    ["AMOUNT_1", "AMOUNT_2"]
    ["AMOUNT_1", "AMOUNT_2", "COUNT"]
    """
    # =====================================================
    # DEFAULT METRICS
    # =====================================================
    if metric_cols is None:
        metric_cols = ["TXN_AMOUNT"]

    # =====================================================
    # COPY DATA
    # =====================================================
    plot_df = df.copy()

    # =====================================================
    # RISK STYLES
    # =====================================================
    risk_styles = {
        "RR": {"color": "#4472C4", "symbol": "circle"},
        "MR": {"color": "#ED7D31", "symbol": "diamond"},
        "HR": {"color": "#C00000", "symbol": "square"},
    }
    risks = list(risk_styles)

    # =====================================================
    # METRIC STYLES
    # =====================================================
    # Indexed by the metric's position in metric_cols (wrapping around),
    # so a metric keeps its symbol even if an earlier column is missing.
    metric_symbols = [
        "circle",
        "diamond",
        "square",
        "x",
        "triangle-up",
        "cross",
    ]
    metric_opacity = [0.80, 0.65, 0.55, 0.45, 0.75, 0.60]

    # =====================================================
    # SEGMENTS
    # =====================================================
    segments = list(plot_df[segment_col].unique())
    gap = 26
    x_map = {seg: idx * gap for idx, seg in enumerate(segments)}
    plot_df["BASE_X"] = plot_df[segment_col].map(x_map)

    # =====================================================
    # SCATTER JITTER
    # =====================================================
    # A private legacy generator seeded with 42 yields the same stream as
    # np.random.seed(42) + np.random.uniform, without resetting the global
    # NumPy random state of the caller.
    jitter_rng = np.random.RandomState(42)
    plot_df["X"] = plot_df["BASE_X"] + jitter_rng.uniform(
        -6,
        6,
        len(plot_df)
    )

    # =====================================================
    # Y AXIS (LOG SCALE VALUES)
    # =====================================================
    y_axis = dict(
        title="Metric Values",
        type="log" if log_scale else "linear",
        gridcolor="rgba(220,220,220,0.6)",
        zeroline=False
    )
    if log_scale:
        positive_values = [
            value
            for col in metric_cols
            if col in plot_df.columns
            for value in plot_df[col]
            if value > 0
        ]
        # Without any positive value there is nothing to derive a range
        # from; leave ticks and range to plotly instead of failing.
        if positive_values:
            lower_power, upper_power = get_smart_log_range(positive_values)
            decade_ticks = [
                10 ** power
                for power in range(lower_power, upper_power + 1)
            ]
            y_axis.update(
                tickvals=decade_ticks,
                ticktext=[f"{int(v):,}" for v in decade_ticks],
                # On a log axis plotly expects the range as log10 exponents.
                range=[lower_power, upper_power]
            )

    # =====================================================
    # FIGURE
    # =====================================================
    fig = go.Figure()

    # =====================================================
    # SEGMENT SHADING
    # =====================================================
    shades = [
        "rgba(180,180,180,0.05)",
        "rgba(120,120,120,0.03)",
    ]
    for idx, seg in enumerate(segments):
        center = x_map[seg]
        fig.add_vrect(
            x0=center - 10,
            x1=center + 10,
            fillcolor=shades[idx % len(shades)],
            line_width=0,
            layer="below"
        )

    # =====================================================
    # DYNAMIC METRICS
    # =====================================================
    for metric_idx, metric_col in enumerate(metric_cols):
        if metric_col not in plot_df.columns:
            continue
        has_value = plot_df[metric_col].notna()
        for risk in risks:
            # Rows with a missing metric value cannot be drawn and would
            # break the integer formatting of the hover text below.
            temp = plot_df[(plot_df[risk_col] == risk) & has_value]
            if temp.empty:
                continue

            # =============================================
            # HOVER TEXT
            # =============================================
            hover_text = (
                "Segment: "
                + temp[segment_col].astype(str)
                + "<br>"
                + "Risk: "
                + risk
                + "<br>"
                + metric_col
                + ": "
                + temp[metric_col].round(0).astype(int).astype(str)
            )

            # =============================================
            # SCATTER
            # =============================================
            fig.add_trace(
                go.Scatter(
                    x=temp["X"],
                    y=temp[metric_col],
                    mode="markers",
                    marker=dict(
                        size=8,
                        color=risk_styles[risk]["color"],
                        symbol=metric_symbols[
                            metric_idx % len(metric_symbols)
                        ],
                        opacity=metric_opacity[
                            metric_idx % len(metric_opacity)
                        ],
                        line=dict(width=0.5, color="black")
                    ),
                    text=hover_text,
                    hovertemplate="%{text}<extra></extra>",
                    # The legend is built from dummy traces further down.
                    showlegend=False
                )
            )

    # =====================================================
    # X AXIS LABELS
    # =====================================================
    tickvals_x = []
    ticktext_x = []
    cumulative = 0
    for seg in segments:
        tickvals_x.append(x_map[seg])
        in_segment = plot_df[segment_col] == seg
        count = int(in_segment.sum())
        # Running 1-based row range covered by this segment.
        start = cumulative + 1
        cumulative += count
        risk_counts = {
            risk: int((in_segment & (plot_df[risk_col] == risk)).sum())
            for risk in risks
        }
        ticktext_x.append(
            _segment_tick_label(seg, start, cumulative, count, risk_counts)
        )

    # =====================================================
    # VISUAL LEGEND - RISKS
    # =====================================================
    # Point-less traces whose only purpose is to create legend entries.
    for risk in risks:
        fig.add_trace(
            go.Scatter(
                x=[None],
                y=[None],
                mode="markers",
                marker=dict(
                    size=12,
                    color=risk_styles[risk]["color"],
                    symbol="circle",
                    line=dict(width=0.5, color="black")
                ),
                name=f"{risk} Risk"
            )
        )

    # =====================================================
    # VISUAL LEGEND - METRICS
    # =====================================================
    for idx, metric_col in enumerate(metric_cols):
        # No legend entry for a metric that was skipped above.
        if metric_col not in plot_df.columns:
            continue
        fig.add_trace(
            go.Scatter(
                x=[None],
                y=[None],
                mode="markers",
                marker=dict(
                    size=12,
                    color="gray",
                    symbol=metric_symbols[idx % len(metric_symbols)]
                ),
                name=metric_col
            )
        )

    # =====================================================
    # LAYOUT
    # =====================================================
    fig.update_layout(
        title=dict(
            text=title,
            x=0.5,
            font=dict(size=24)
        ),
        width=width,
        height=height,
        template="plotly_white",
        paper_bgcolor="white",
        plot_bgcolor="white",
        hovermode="closest",
        # Large bottom margin leaves room for the multi-line tick labels
        # and the legend placed below the plot.
        margin=dict(l=100, r=40, t=90, b=320),
        legend=dict(
            orientation="h",
            yanchor="top",
            y=-0.15,
            xanchor="center",
            x=0.5,
            font=dict(size=10)
        ),
        xaxis=dict(
            title="Segments",
            tickmode="array",
            tickvals=tickvals_x,
            ticktext=ticktext_x,
            tickfont=dict(size=10),
            showgrid=False,
            zeroline=False
        ),
        yaxis=y_axis
    )
    return fig


def _segment_tick_label(seg, start, end, total, risk_counts):
    """Return the multi-line HTML x-axis label for one segment."""
    # str() keeps non-string segment values (e.g. ints) from raising.
    risk_summary = " | ".join(
        f"{risk}: {risk_count}"
        for risk, risk_count in risk_counts.items()
    )
    return (
        f"<b>{str(seg).replace('_', ' ')}</b>"
        f"<br>"
        f"Population Range"
        f"<br>"
        f"{start:,} - {end:,}"
        f"<br>"
        f"Total: {total:,}"
        f"<br><br>"
        f"{risk_summary}"
    )
# =========================================================
# SAMPLE DATA
# =========================================================
# Fixed seed so the demo data set is the same on every run.
np.random.seed(200)

segments = [
    "BD_EXTERNAL_ENTITY",
    "BD_EXTERNAL_OTHERS",
    "BD_INTERNAL",
    "BD_HIGH_RISK",
    "BD_RETAIL",
    "BD_CORPORATE",
]
risks = ["RR", "MR", "HR"]

# =================================================
# BASE VALUES
# =================================================
# Per risk level: randint bounds for the AMOUNT_1 and AMOUNT_2 baselines.
amount_base_bounds = {
    "RR": ((100_000, 2_000_000), (300_000, 5_000_000)),
    "MR": ((5_000_000, 15_000_000), (10_000_000, 30_000_000)),
    "HR": ((20_000_000, 50_000_000), (40_000_000, 90_000_000)),
}

rows = []
for seg in segments:
    for risk in risks:
        # The order of the random draws below is kept fixed so the
        # generated values stay reproducible under the seed above.
        bounds_1, bounds_2 = amount_base_bounds[risk]
        amount_1_base = np.random.randint(*bounds_1)
        amount_2_base = np.random.randint(*bounds_2)

        # =================================================
        # RANDOM POPULATION
        # =================================================
        population_size = np.random.randint(40, 90)

        # =================================================
        # GENERATE DATA
        # =================================================
        log_mean_1 = np.log(amount_1_base)
        log_mean_2 = np.log(amount_2_base)
        for _ in range(population_size):
            amount_1 = np.random.lognormal(mean=log_mean_1, sigma=1.0)
            amount_2 = np.random.lognormal(mean=log_mean_2, sigma=1.0)
            txn_count = np.random.randint(1, 150)
            rows.append(
                {
                    "SEGMENT": seg,
                    "RISK_LEVEL": risk,
                    "AMOUNT_1": amount_1,
                    "AMOUNT_2": amount_2,
                    "TXN_COUNT": txn_count,
                }
            )

# =========================================================
# DATAFRAME
# =========================================================
df = pd.DataFrame(rows)

# =========================================================
# CREATE CHART
# =========================================================
demo_metrics = ["AMOUNT_1", "AMOUNT_2", "TXN_COUNT"]
fig = create_dynamic_distribution_chart(
    df=df,
    metric_cols=demo_metrics,
    title="Fully Dynamic Enterprise Distribution"
)
fig.show()