In [1]:
%load_ext lab_black

In [2]:
import pandas as pd
import pandas_datareader.data as web

# bokeh
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource

In [3]:
def candlestick(df, plot_width=600, plot_height=300):
    df = df.reset_index()
    source = ColumnDataSource(df)
    inc = ColumnDataSource(df[df.Close >= df.Open])
    dec = ColumnDataSource(df[df.Open > df.Close])
    w = (df.index[1] - df.index[0]) / 2  # X軸の1メモリの半分

    fig = figure(plot_width=plot_width, plot_height=plot_height)
    fig.segment("index", "High", "index", "Low", source=source, color="black")
    fig.vbar(
        "index", w, "Open", "Close", source=inc, line_color="black", fill_color="white"
    )
    fig.vbar(
        "index", w, "Open", "Close", source=dec, line_color="black", fill_color="black"
    )
    fig.xaxis.major_label_overrides = {
        i: pd.to_datetime(date).strftime("%Y-%m-%d")
        for i, date in enumerate(source.data["Date"])
    }
    fig.xaxis.bounds = (0, df.index[-1])  # X軸の範囲を明示的に指定
    fig.outline_line_color = "black"
    return fig

In [4]:
# ソフトバンク
df = web.DataReader("9984.JP", "stooq").dropna().sort_index()

In [5]:
fig = candlestick(df["2020-8-1":])
output_notebook()
show(fig)

前後の7点（前に3点、後ろに3点)の範囲を取得するにはpandasのrolling(7, center=True)を使用

In [6]:
df.rolling(7, center=True)

Rolling [window=7,center=True,axis=0]

In [7]:
list(df[:10].rolling(7, center=True))

[               Open     High      Low    Close    Volume
 Date                                                    
 2016-09-12  3303.25  3323.22  3245.28  3259.89   9759952
 2016-09-13  3271.59  3272.55  3198.50  3209.22  11218682
 2016-09-14  3164.88  3189.73  3130.30  3167.31  11436453
 2016-09-15  3149.78  3153.68  3064.04  3091.32  13272436,
                Open     High      Low    Close    Volume
 Date                                                    
 2016-09-12  3303.25  3323.22  3245.28  3259.89   9759952
 2016-09-13  3271.59  3272.55  3198.50  3209.22  11218682
 2016-09-14  3164.88  3189.73  3130.30  3167.31  11436453
 2016-09-15  3149.78  3153.68  3064.04  3091.32  13272436
 2016-09-16  3140.05  3172.67  3115.68  3120.06  11927006,
                Open     High      Low    Close    Volume
 Date                                                    
 2016-09-12  3303.25  3323.22  3245.28  3259.89   9759952
 2016-09-13  3271.59  3272.55  3198.50  3209.22  11218682
 2016-09-14 

In [8]:
list(df[:10].rolling(7, center=True).High)

[Date
 2016-09-12    3323.22
 2016-09-13    3272.55
 2016-09-14    3189.73
 2016-09-15    3153.68
 Name: High, dtype: float64,
 Date
 2016-09-12    3323.22
 2016-09-13    3272.55
 2016-09-14    3189.73
 2016-09-15    3153.68
 2016-09-16    3172.67
 Name: High, dtype: float64,
 Date
 2016-09-12    3323.22
 2016-09-13    3272.55
 2016-09-14    3189.73
 2016-09-15    3153.68
 2016-09-16    3172.67
 2016-09-20    3162.93
 Name: High, dtype: float64,
 Date
 2016-09-12    3323.22
 2016-09-13    3272.55
 2016-09-14    3189.73
 2016-09-15    3153.68
 2016-09-16    3172.67
 2016-09-20    3162.93
 2016-09-21    3250.63
 Name: High, dtype: float64,
 Date
 2016-09-13    3272.55
 2016-09-14    3189.73
 2016-09-15    3153.68
 2016-09-16    3172.67
 2016-09-20    3162.93
 2016-09-21    3250.63
 2016-09-23    3285.23
 Name: High, dtype: float64,
 Date
 2016-09-14    3189.73
 2016-09-15    3153.68
 2016-09-16    3172.67
 2016-09-20    3162.93
 2016-09-21    3250.63
 2016-09-23    3285.23
 2016-09-26   

In [9]:
df[:20].rolling(7, center=True).High.max()

Date
2016-09-12        NaN
2016-09-13        NaN
2016-09-14        NaN
2016-09-15    3323.22
2016-09-16    3285.23
2016-09-20    3285.23
2016-09-21    3285.23
2016-09-23    3285.23
2016-09-26    3288.77
2016-09-27    3288.77
2016-09-28    3288.77
2016-09-29    3288.77
2016-09-30    3288.77
2016-10-03    3288.77
2016-10-04    3288.77
2016-10-05    3335.22
2016-10-06    3335.22
2016-10-07        NaN
2016-10-11        NaN
2016-10-12        NaN
Name: High, dtype: float64

In [10]:
df[:20].rolling(7, center=True).High.max() == df[:20].High

Date
2016-09-12    False
2016-09-13    False
2016-09-14    False
2016-09-15    False
2016-09-16    False
2016-09-20    False
2016-09-21    False
2016-09-23     True
2016-09-26    False
2016-09-27    False
2016-09-28    False
2016-09-29     True
2016-09-30    False
2016-10-03    False
2016-10-04    False
2016-10-05    False
2016-10-06    False
2016-10-07    False
2016-10-11    False
2016-10-12    False
Name: High, dtype: bool

In [11]:
def show_peaks(target):
    fig = candlestick(target)
    target = target.reset_index()
    upPeak = target[target.rolling(7, center=True).High.max() == target.High]  # (1)
    fig.circle(upPeak.index, upPeak.High, color="red")
    dnPeak = target[target.rolling(7, center=True).Low.min() == target.Low]  # (2)
    fig.circle(dnPeak.index, dnPeak.Low, color="blue")
    return fig, upPeak, dnPeak

In [12]:
fig, _, _ = show_peaks(df["2020-8-1":])
output_notebook()
show(fig)

In [13]:
from itertools import combinations
from scipy.stats import linregress

target = df["2020-8-1":]
fig, upPeak, _ = show_peaks(target)  # (3)
lgs = []
for select in combinations(upPeak.index, 3):  # (4)
    peaks = [upPeak.High[i] for i in upPeak.index if i in select]  # (5)
    lgs.append(linregress(select, peaks))  # (6)

for lg in [lg for lg in lgs if lg.pvalue < 0.05]:  # (7)
    fig.line(
        [0, len(target)],
        [lg.intercept, lg.intercept + len(target) * lg.slope],
        color="red",
    )
output_notebook()
show(fig)

In [14]:
from itertools import combinations
from scipy.stats import linregress

target = df["2020-8-1":]
fig, upPeak, _ = show_peaks(target)
lgs = []
for r in upPeak.rolling(5):  # (8)
    for select in combinations(r.index, 3):
        peaks = [upPeak.High[i] for i in upPeak.index if i in select]
        lgs.append(linregress(select, peaks))
for lg in [lg for lg in lgs if lg.pvalue < 0.05]:
    fig.line(
        [0, len(target)],
        [lg.intercept, lg.intercept + len(target) * lg.slope],
        color="red",
    )
output_notebook()
show(fig)

In [15]:
from itertools import combinations
from scipy.stats import linregress

target = df["2020-8-1":]
fig, _, dnPeak = show_peaks(target)
lgs = []
for r in dnPeak.rolling(5):
    for select in combinations(r.index, 3):
        peaks = [dnPeak.Low[i] for i in dnPeak.index if i in select]
        lgs.append(linregress(select, peaks))
for lg in [lg for lg in lgs if lg.stderr < 0.5]:
    fig.line(
        [0, len(target)],
        [lg.intercept, lg.intercept + len(target) * lg.slope],
        color="blue",
    )
output_notebook()
show(fig)

In [16]:
from itertools import combinations
from scipy.stats import linregress
from pandas import concat

target = df["2020-8-1":]
fig, upPeak, dnPeak = show_peaks(target)
lgs = []
peaks = concat(
    [
        upPeak.rename(columns={"High": "price"}).price,
        dnPeak.rename(columns={"Low": "price"}).price,
    ]
).sort_index()  # (9)
for r in peaks.rolling(7):
    for select in combinations(r.index, 3):
        prices = [peaks[i] for i in select]
        lgs.append(linregress(select, prices))
for lg in [lg for lg in lgs if lg.stderr < 0.5]:
    fig.line(
        [0, len(target)],
        [lg.intercept, lg.intercept + len(target) * lg.slope],
        color="green",
    )
output_notebook()
show(fig)

In [17]:
from itertools import combinations
from scipy.stats import linregress
from pandas import concat

target = df["2020-8-1":]
fig, upPeak, dnPeak = show_peaks(target)
peaks = concat(
    [
        upPeak.rename(columns={"High": "price"}).price,
        dnPeak.rename(columns={"Low": "price"}).price,
    ]
).sort_index()
lgs = []
for r in peaks.rolling(7):
    for select in combinations(r.index, 3):
        prices = [peaks[i] for i in select]
        lgs.append((select, linregress(select, prices)))
for lg in [lg for lg in lgs if lg[1].pvalue < 0.05]:
    select, lin = lg
    fig.line(
        [select[0], len(target)],
        [
            lin.intercept + select[0] * lin.slope,
            lin.intercept + len(target) * lin.slope,
        ],
        color="green",
    )
output_notebook()
show(fig)

In [18]:
from itertools import combinations
from scipy.stats import linregress
from pandas import concat
from math import atan, degrees


def trendlines(peaks, last_index, rolling=7) -> []:
    std = peaks.std()
    items = []
    for r in peaks.rolling(rolling):
        for select in combinations(r.index, 3):
            prices = [peaks[i] for i in select]
            items.append((sorted(select), linregress(select, prices)))

    lines = []
    for item in items:
        lin = item[1]
        if lin.pvalue < 0.05:
            lines.append(item)

    if len(lines) > 0:
        thAngle = 0.5
        thDistance = 5
        newLines = [lines[0]]
        for line1 in lines[1:]:
            for line2 in newLines:
                if (
                    abs(
                        degrees(atan(line1[1].slope / std))
                        - degrees(atan(line2[1].slope / std))
                    )
                    <= thAngle
                    and abs(
                        line1[1].intercept
                        + line1[1].slope * last_index
                        - line2[1].intercept
                        - line2[1].slope * last_index
                    )
                    <= thDistance * std
                ):  # (10)
                    break
            else:
                newLines.append(line1)
        lines = newLines
    return lines


target = df["2020-8-1":]
fig, upPeak, dnPeak = show_peaks(target)
peaks = concat(
    [
        upPeak.rename(columns={"High": "price"}).price,
        dnPeak.rename(columns={"Low": "price"}).price,
    ]
).sort_index()
lines = trendlines(peaks, len(target))
for line in lines:
    pos, lin = line
    fig.line(
        [pos[0], len(target)],
        [lin.intercept + pos[0] * lin.slope, lin.intercept + len(target) * lin.slope],
        color="green",
    )

output_notebook()  # 出力先をノートブックに設定
show(fig)

In [19]:
target = df["2020-8-1":]
fig, upPeak, dnPeak = show_peaks(target)
peaks = concat(
    [
        upPeak.rename(columns={"High": "price"}).price,
        dnPeak.rename(columns={"Low": "price"}).price,
    ]
).sort_index()
lines = trendlines(peaks, len(target))
pos = len(target) - 1
price = (df.High.iloc[pos] + df.Low.iloc[pos]) / 2
lines = [
    line
    for line in sorted(
        lines, key=lambda x: abs(x[1].intercept + x[1].slope * pos - price)
    )
][
    :4
]  # (11)
for line in lines:
    pos, lin = line
    fig.line(
        [pos[0], len(target)],
        [lin.intercept + pos[0] * lin.slope, lin.intercept + len(target) * lin.slope],
        color="green",
    )

output_notebook()  # 出力先をノートブックに設定
show(fig)

In [20]:
%time

from bokeh.plotting import figure, show, output_notebook, output_file
from bokeh.models import Slope, Span, CustomJS
from bokeh import events
from itertools import combinations
from scipy.stats import linregress
from math import atan, degrees


def trendlines(peaks, pos, offset, rolling=7) -> []:
    std = peaks.std()
    lines = []
    for r in peaks[(pos - offset < peaks.index) * (peaks.index <= pos)].rolling(
        rolling
    ):
        for select in combinations(r.index, 3):
            prices = [peaks[i] for i in select]
            lin = linregress(select, prices)
            if lin.pvalue < 0.05:
                lines.append(lin)

    if len(lines) > 0:
        thAngle = 0.5
        thDistance = 5
        newLines = [lines[0]]
        for line1 in lines[1:]:
            for line2 in newLines:
                if (
                    abs(
                        degrees(atan(line1.slope / std))
                        - degrees(atan(line2.slope / std))
                    )
                    <= thAngle
                    and abs(
                        line1.intercept
                        + line1.slope * pos
                        - line2.intercept
                        - line2.slope * pos
                    )
                    <= thDistance * std
                ):
                    break
            else:
                newLines.append(line1)
        lines = newLines
    return lines


target = df["2020-8-1":]
fig, upPeak, dnPeak = show_peaks(target)
peaks = concat(
    [
        upPeak.rename(columns={"High": "price"}).price,
        dnPeak.rename(columns={"Low": "price"}).price,
    ]
).sort_index()

offset = 50
rolling = 7
lines = [None] * offset
prev = None
for pos in range(offset, len(target)):
    if prev is None or pos - rolling // 2 - 1 in peaks.index:
        prev = trendlines(peaks, pos, offset, rolling=rolling)
    price = (target.High.iloc[pos] + target.Low.iloc[pos]) / 2
    lines.append(
        [
            line
            for line in sorted(
                prev, key=lambda x: abs(x.intercept + x.slope * pos - price)
            )
        ][:4]
    )
lines.append(
    [
        line
        for line in sorted(
            prev, key=lambda x: abs(x.intercept + x.slope * len(target) - price)
        )
    ][:4]
)

slopes = []
for line in lines[-1]:
    slope = Slope(gradient=line.slope, y_intercept=line.intercept, line_color="red")
    fig.add_layout(slope)
    slopes.append(slope)

span = Span(
    location=len(target),
    dimension="height",
    line_color="green",
    line_width=5,
    line_alpha=0.2,
)
fig.add_layout(span)


def pan_event(span, lines, slopes):
    return CustomJS(
        args=dict(span=span, lines=lines, slopes=slopes),
        code="""
        var loc = Math.round(parseFloat(cb_obj['x']))
        if (span.location != loc) {
            span.location = loc
            if (loc < lines.length && lines[loc]) {
                for (var i = 0; i < Math.min(lines[loc].length, slopes.length); ++i) {
                    slopes[i].visible = true
                    slopes[i].gradient = lines[loc][i][0]
                    slopes[i].y_intercept = lines[loc][i][1]
                }
                for (var i = lines[loc].length; i < slopes.length; ++i) {
                    slopes[i].visible = false
                }
            }
            else {
                for (var i = 0; i < slopes.length; ++i) {
                    slopes[i].visible = false
                }                    
            }
        }
    """,
    )


fig.js_on_event(events.Tap, pan_event(span=span, lines=lines, slopes=slopes))
fig.js_on_event(events.Press, pan_event(span=span, lines=lines, slopes=slopes))
fig.js_on_event(events.Pan, pan_event(span=span, lines=lines, slopes=slopes))
fig.toolbar.active_drag = None

output_notebook()
show(fig)

CPU times: user 9 µs, sys: 1e+03 ns, total: 10 µs
Wall time: 34.6 µs


In [21]:
from bokeh.models import ColumnDataSource, CrosshairTool, Label

xlabel = Label(
    x_units="data",
    y_units="screen",
    render_mode="css",
    border_line_width=1,
    border_line_color="black",
    border_line_alpha=1.0,
    background_fill_color="white",
    background_fill_alpha=1.0,
    visible=False,
)

ylabel = Label(
    x_units="screen",
    y_units="data",
    render_mode="css",
    border_line_width=1,
    border_line_color="black",
    border_line_alpha=1.0,
    background_fill_color="white",
    background_fill_alpha=1.0,
    visible=False,
)

fig.add_layout(xlabel)
fig.add_layout(ylabel)

# https://docs.bokeh.org/en/latest/docs/user_guide/interaction/callbacks.html


def display_event(source, xlabel, ylabel):
    return CustomJS(
        args=dict(source=source, xlabel=xlabel, ylabel=ylabel),
        code="""
        try {
            xlabel.visible = true
            xlabel.x = cb_obj['x']
            xlabel.y = 25             // 日付を下辺に表示
            xlabel.x_offset = -40
            xlabel.y_offset = -20
            var date = new Date(source.data['Date'][Number(cb_obj['x']).toFixed(0)])
            xlabel.text = date.toISOString().substr(0, 10)
        } catch (e) {}
        ylabel.visible = true
        ylabel.y = cb_obj['y']
        ylabel.x = cb_obj['sx'] - 20  // 価格を十字の右に表示
        ylabel.y_offset = -10
        ylabel.text = Number(cb_obj['y']).toFixed(0)
    """,
    )


def leave_event(xlabel, ylabel):
    return CustomJS(
        args=dict(xlabel=xlabel, ylabel=ylabel),
        code="""
        xlabel.visible = false
        ylabel.visible = false
    """,
    )


source = ColumnDataSource(target)

fig.js_on_event(
    events.MouseMove, display_event(source=source, xlabel=xlabel, ylabel=ylabel)
)
fig.js_on_event(events.MouseLeave, leave_event(xlabel=xlabel, ylabel=ylabel))

fig.add_tools(CrosshairTool())
fig.toolbar.active_drag = None

output_notebook()  # 出力先をノートブックに設定
output_file("trendline.html")  # 出力先にファイルも指定
show(fig)