In [1]:
from __future__ import annotations
import matplotlib.pyplot as plt
import FinanceDataReader as fdr
from models import WavePattern
from models import WaveRules
from models import WaveAnalyzer
from models import WaveOptions
from models.helpers import plot_pattern
import pandas as pd
import numpy as np

import importlib
import pandas as pd

# Re-execute the project modules in the running kernel before they are used below.
for _module in (WaveRules, WaveAnalyzer, WavePattern):
    importlib.reload(_module)


def plot_graph(df, threshold=0.05):
    """Plot the close price over time and mark the detected zigzag pivots.

    :param df: Price frame with "Date" and "Close" columns, plus the columns
        read by ``detect_zigzag``.
    :param threshold: Relative move forwarded unchanged to ``detect_zigzag``.
    """
    # detect_zigzag returns a DataFrame with an "index" column holding labels
    # of df.index. Iterating a DataFrame yields its column names, so the pivots
    # are looked up by label instead of being unpacked as (index, price) pairs.
    pivots = detect_zigzag(df, threshold)
    pivot_rows = df.loc[pivots["index"].tolist()]

    # Line chart of 'Close' vs. 'Date'
    plt.figure(figsize=(10, 6))
    plt.plot(df["Date"], df["Close"], label="Close Price")

    # Mark every pivot at its close so the markers sit on the plotted line
    plt.scatter(pivot_rows["Date"], pivot_rows["Close"], color="red")

    # Enhancing the plot
    plt.xlabel("Date")
    plt.ylabel("Close Price")
    plt.title("Stock Price with Highlighted Points")
    plt.legend()
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()

    # Show the plot
    plt.show()


def detect_zigzag(df: pd.DataFrame, threshold: float) -> pd.DataFrame:
    """Collect alternating zigzag pivots from a frame of price bars.

    Rows are scanned in order starting at position 1; row 0 only serves as the
    initial reference and is never emitted itself. While looking for a rise,
    a row whose "Low" is at or below the reference low replaces the latest
    pivot, and a row whose "High" exceeds the reference low by at least
    ``threshold`` (relative) becomes a new pivot and flips the direction.
    While looking for a fall the mirrored rules apply to "High" / "Low".

    :param df: Frame with "Date", "Low" and "High" columns.
    :param threshold: Minimum relative move (e.g. 0.05) that confirms a reversal.
    :return: DataFrame with columns "index" (label from ``df.index``), "Date",
        "Low" and "High", one row per pivot in scan order.
    """
    lows = df["Low"]
    highs = df["High"]
    dates = df["Date"]

    def _pivot_row(pos):
        # Single place that defines the layout of an emitted pivot.
        return (df.index[pos], dates.iloc[pos], lows.iloc[pos], highs.iloc[pos])

    zigzag_points = []
    last_pivot = 0
    up_trend = True

    for i in range(1, len(df)):
        if up_trend:
            reference_low = lows.iloc[last_pivot]
            if lows.iloc[i] <= reference_low:
                # Lower low: the previous low candidate (if any) is superseded.
                if zigzag_points:
                    zigzag_points.pop()
            elif highs.iloc[i] / reference_low - 1 >= threshold:
                # Rise confirmed: from here on track the high.
                up_trend = False
            else:
                continue
        else:
            reference_high = highs.iloc[last_pivot]
            if highs.iloc[i] >= reference_high:
                # Higher high: the previous high candidate is superseded.
                if zigzag_points:
                    zigzag_points.pop()
            elif reference_high / lows.iloc[i] - 1 >= threshold:
                # Fall confirmed: from here on track the low.
                up_trend = True
            else:
                continue

        # Every branch that reaches this point records row i as the new pivot.
        zigzag_points.append(_pivot_row(i))
        last_pivot = i

    return pd.DataFrame(zigzag_points, columns=["index", "Date", "Low", "High"])

In [2]:
# OHLC rows for ticker "035420"; reset_index() turns the index into a regular column.
_ohlc_columns = ["Date", "Open", "High", "Low", "Close"]
df = fdr.DataReader("035420", "2022-10-09", "2023-03-10").reset_index()[_ohlc_columns]

# Position of the lowest "Low" in the loaded window.
idx_start = np.argmin(df["Low"].to_numpy())

In [3]:
# Zigzag pivots of df with a 5% relative reversal threshold.
zigzag_df = detect_zigzag(df, 0.05)

In [None]:
# Build an analyzer over df and call find_impulsive_wave from the row at idx_start.
wa = WaveAnalyzer.WaveAnalyzer(df=df, verbose=False)
waves_up = wa.find_impulsive_wave(idx_start=idx_start)

In [11]:
# Zigzag pivots of df with a 10% relative reversal threshold.
zig_zag_pattern = detect_zigzag(df, 0.1)

In [12]:
# Fresh analyzer over df; the pivot frame is handed to find_impulsive_wave_zigzag.
wa = WaveAnalyzer.WaveAnalyzer(df=df, verbose=False)
waves_up = wa.find_impulsive_wave_zigzag(zig_zag_pattern)

In [13]:
# Display the current value of waves_up.
waves_up

[<models.MonoWave.MonoWaveUp at 0x2a773d5b0>,
 <models.MonoWave.MonoWaveDown at 0x2a773d490>,
 <models.MonoWave.MonoWaveUp at 0x2a773d040>,
 <models.MonoWave.MonoWaveDown at 0x2a773d280>,
 <models.MonoWave.MonoWaveUp at 0x2a773d400>]

In [14]:
import models.helpers as helpers

# Wrap the waves in a WavePattern and draw it on top of df.
# NOTE(review): plot_pattern is already imported from models.helpers in the
# first cell, so the separate `helpers` import in this cell looks redundant.
wavepattern_up = WavePattern.WavePattern(waves_up, verbose=True)

helpers.plot_pattern(df, wavepattern_up)

In [None]:
# Close-price chart with pivots detected at a 1% threshold.
plot_graph(df, 0.01)

In [None]:
# Same analyzer construction and find_impulsive_wave call as in an earlier cell;
# re-running it resets `wa` and `waves_up` for the cells below.
wa = WaveAnalyzer.WaveAnalyzer(df=df, verbose=False)
waves_up = wa.find_impulsive_wave(idx_start=idx_start)

In [None]:
# Display the analyzer object.
wa

In [None]:
# WaveAnalyzer, WavePattern, WaveRules and WaveOptions are imported as modules
# (`from models import ...`), so their classes are reached through the module.
wa = WaveAnalyzer.WaveAnalyzer(df=df, verbose=False)

# NOTE(review): WaveOptionsGeneratorCustom5 was referenced without any import;
# it is assumed to live in models.WaveOptions. Confirm.
# NOTE(review): the original comment said this generates options up to
# [15, 15, 15, 15, 15], which does not match up_to=5. Confirm the real bound.
wave_options_impulse = WaveOptions.WaveOptionsGeneratorCustom5(up_to=5)

# Only the custom impulse rule is checked. The original cell also listed
# Impulse, LeadingDiagonal, Correction and TDWave as commented-out alternatives.
impulse_custom = WaveRules.ImpulseCustom("impulse_custom")
rules_to_check = [impulse_custom]

print(f"Start at idx: {idx_start}")
print(f"will run up to {wave_options_impulse.number / 1e6}M combinations.")

# Set of wave patterns that were already found. Two WaveOptions can lead to the
# same WavePattern, for example when an option tries to skip more maxima than
# there are: [1,2,3,4,5] and [1,2,3,4,10] then give the same sub-wave structure,
# begin / end and high / low. Such repeats are skipped and not plotted again.
wavepatterns_up = set()

# Loop over all wave-option combinations [i,j,k,l,m] for impulsive waves, sorted
# from small, e.g. [0,1,...], to large, e.g. [3,2,...].
for new_option_impulse in wave_options_impulse.options_sorted:
    waves_up = wa.find_impulsive_wave(
        idx_start=idx_start, wave_config=new_option_impulse.values
    )
    if not waves_up:
        continue

    wavepattern_up = WavePattern.WavePattern(waves_up, verbose=True)

    for rule in rules_to_check:
        if not wavepattern_up.check_rule(rule):
            continue

        if wavepattern_up in wavepatterns_up:
            print("SKIPPING")
            continue

        wavepatterns_up.add(wavepattern_up)
        print(f"{rule.name} found: {new_option_impulse.values}")
        fig = plot_pattern(
            df=df,
            wave_pattern=wavepattern_up,
            title=str(new_option_impulse),
        )
        if fig:
            fig.show()

In [None]:
# Hand-picked wave configurations to inspect one by one.
wave_configs = [
    [2, 0, 1, 0, 0],
    [2, 0, 2, 0, 0],
    [2, 0, 3, 0, 0],
]

for wave_config in wave_configs:
    waves_up = wa.find_impulsive_wave(idx_start=idx_start, wave_config=wave_config)
    if not waves_up:
        continue

    # WavePattern is imported as a module, so the class is WavePattern.WavePattern.
    wavepattern_up = WavePattern.WavePattern(waves_up, verbose=True)

    for rule in rules_to_check:
        if wavepattern_up.check_rule(rule):
            if wavepattern_up in wavepatterns_up:
                print("SKIPPING")
            else:
                wavepatterns_up.add(wavepattern_up)
                print(f"{rule.name} found: {wave_config}")

        # Every candidate is plotted here, whether or not the rule passes or the
        # pattern was seen before. The title is the configuration being drawn;
        # it used to be the stale `new_option_impulse` left over from the
        # exhaustive-search cell.
        plot_pattern(
            df=df,
            wave_pattern=wavepattern_up,
            title=str(wave_config),
        )

In [None]:
import math


def calculate_price_changes(prices):
    """Return the consecutive differences ``prices[i] - prices[i - 1]``.

    The result has one element fewer than ``prices`` and is empty when
    ``prices`` holds fewer than two values.
    """
    changes = []
    for idx in range(1, len(prices)):
        changes.append(prices[idx] - prices[idx - 1])
    return changes


def calculate_average_price_change(prices):
    """Return the arithmetic mean of the consecutive price changes.

    Raises ZeroDivisionError when ``prices`` holds fewer than two values,
    because there is no change to average.
    """
    deltas = calculate_price_changes(prices)
    total = sum(deltas)
    return total / len(deltas)


def normalize_prices(prices):
    """Normalize the prices by dividing each one by the average price change."""
    unit = calculate_average_price_change(prices)
    scaled = []
    for price in prices:
        scaled.append(price / unit)
    return scaled


def calculate_diagonal_distance(prices, start_day, end_day):
    """Return the diagonal distance between two days in (time, normalized price) space.

    Days are 1-based: day ``d`` refers to ``prices[d - 1]``. The horizontal
    component is ``end_day - start_day``; the vertical component is the
    difference of the normalized prices on those days.

    :param prices: Sequence of prices, one per day.
    :param start_day: 1-based day of the first point.
    :param end_day: 1-based day of the second point.
    :return: ``sqrt(time_difference**2 + price_difference**2)``.
    :raises IndexError: if either day is outside ``1..len(prices)``.
    """
    n_days = len(prices)
    for day in (start_day, end_day):
        # Without this check day 0 or a negative day would silently wrap around
        # to the end of the list through negative indexing.
        if not 1 <= day <= n_days:
            raise IndexError(f"day {day} is outside the valid range 1..{n_days}")

    normalized_prices = normalize_prices(prices)
    time_difference = end_day - start_day
    price_difference = normalized_prices[end_day - 1] - normalized_prices[start_day - 1]
    return math.sqrt(time_difference**2 + price_difference**2)


# Example price data
example_prices = [1000, 1100, 1050, 1200, 1150]

# Diagonal distance between day 1 and day 5
diagonal_distance_example = calculate_diagonal_distance(example_prices, 1, 5)
diagonal_distance_example

In [None]:
# Inspect a single hand-picked wave configuration.
wave_config = [2, 0, 2, 0, 0]
waves_up = wa.find_impulsive_wave(idx_start=idx_start, wave_config=wave_config)

if waves_up:
    # WavePattern is imported as a module, so the class is WavePattern.WavePattern.
    wavepattern_up = WavePattern.WavePattern(waves_up, verbose=True)

    for rule in rules_to_check:
        if wavepattern_up.check_rule(rule):
            if wavepattern_up in wavepatterns_up:
                print("SKIPPING")
            else:
                wavepatterns_up.add(wavepattern_up)
                print(f"{rule.name} found: {wave_config}")

        # The candidate is plotted in every case. The title is the configuration
        # being drawn; it used to be the stale `new_option_impulse` left over
        # from the exhaustive-search cell.
        plot_pattern(
            df=df,
            wave_pattern=wavepattern_up,
            title=str(wave_config),
        )

In [None]:
def calculate_fibonacci_level(low, high, fib_ratio, mode="low_to_high"):
    """
    Return the price level that lies ``fib_ratio`` of the way through a wave.

    :param low: The low point of the wave.
    :param high: The high point of the wave.
    :param fib_ratio: Fraction of the wave's height to apply.
    :param mode: 'low_to_high' measures upward from ``low``;
        'high_to_low' measures downward from ``high``.
    :return: The calculated Fibonacci level.
    :raises ValueError: If ``mode`` is neither of the two supported values.
    """
    if mode not in ("low_to_high", "high_to_low"):
        raise ValueError("Invalid mode. Use 'low_to_high' or 'high_to_low'.")

    if mode == "high_to_low":
        return high - (high - low) * fib_ratio
    return low + (high - low) * fib_ratio


# Example usage
low_point = 100
high_point = 200
fibonacci_ratio = 0.618  # example Fibonacci ratio

# Level measured from the low towards the high
fibonacci_level_low_to_high = calculate_fibonacci_level(
    low_point, high_point, fibonacci_ratio, "low_to_high"
)
print(
    f"Low to High Fibonacci level at ratio {fibonacci_ratio} is: {fibonacci_level_low_to_high}"
)

# Level measured from the high towards the low
fibonacci_level_high_to_low = calculate_fibonacci_level(
    low_point, high_point, fibonacci_ratio, "high_to_low"
)
print(
    f"High to Low Fibonacci level at ratio {fibonacci_ratio} is: {fibonacci_level_high_to_low}"
)

In [None]:
# Same high-to-low calculation with a ratio of 0.3; low_point and high_point
# come from the example cell above.
fibonacci_ratio = 0.3
fibonacci_level_high_to_low = calculate_fibonacci_level(
    low_point, high_point, fibonacci_ratio, "high_to_low"
)
print(
    f"High to Low Fibonacci level at ratio {fibonacci_ratio} is: {fibonacci_level_high_to_low}"
)

In [None]:
import FinanceDataReader as fdr

# Rebinds df to ticker "454910" for 2023-10-25..2023-12-23; every cell run
# after this one sees the new frame.
df = fdr.DataReader("454910", "2023-10-25", "2023-12-23").reset_index()[
    ["Date", "Open", "High", "Low", "Close"]
]

In [None]:
# Write the current frame to test.csv in the working directory.
df.to_csv("test.csv")

In [None]:
# Scratch notes for the two waves defined in the next cell, one wave per row:
# first point, second point, and their difference. The raw tab-separated
# numbers were a SyntaxError in a code cell, so they are kept as comments.
#   32150	48500	16350
#   76900	97900	21000

In [None]:
# Each wave is described by its duration and its (first, second) price points.
wave1 = dict(duration=9, points=(32150, 48500))

wave2 = dict(duration=3, points=(76900, 97900))

In [None]:
# Horizontal size of each wave: its duration.
width1 = wave1["duration"]
width2 = wave2["duration"]
print(width1, width2)

# Vertical size of each wave: absolute difference between its two points.
height1 = abs(wave1["points"][1] - wave1["points"][0])
height2 = abs(wave2["points"][1] - wave2["points"][0])
print(height1, height2)

# Smallest first point and largest second point across both waves.
low_y = min(wave1["points"][0], wave2["points"][0])
high_y = max(wave1["points"][1], wave2["points"][1])


# print(total_height)

In [None]:
import math

# Widths in multiples of the shorter wave; heights as a percentage of the
# combined price range of both waves.
min_x = min(width1, width2)
total_height = high_y - low_y

x1 = width1 / min_x
x2 = width2 / min_x
y1 = height1 / total_height * 100
y2 = height2 / total_height * 100

# max_x used to be defined only in a later cell, so this cell raised NameError
# on a fresh top-to-bottom run. The definition matches that later cell.
max_x = max(x1, x2)

# Rescale the heights so the smaller one equals the larger width.
y_to_x_ratio = min(y1, y2) / max_x

y1 /= y_to_x_ratio
y2 /= y_to_x_ratio

# Diagonal length of each wave in the rescaled (x, y) space.
len1 = math.sqrt(x1**2 + y1**2)
len2 = math.sqrt(x2**2 + y2**2)

len1, len2

In [None]:
# Ratio of the two diagonal lengths.
len1 / len2

In [None]:
# Step-by-step version: widths relative to the smaller width
# (min_x comes from an earlier cell).
x1 = width1 / min_x
x2 = width2 / min_x
print(x1, x2)
max_x = max(x1, x2)
max_x

In [None]:
# Heights as a percentage of total_height (defined in an earlier cell).
y1 = height1 / total_height * 100
y2 = height2 / total_height * 100
y1, y2

In [None]:
# Factor that maps the smaller height onto the larger width.
y_to_x_ratio = min(y1, y2) / max_x
y_to_x_ratio

In [None]:
# In-place rescale: running this cell again divides y1 and y2 a second time.
y1 /= y_to_x_ratio
y2 /= y_to_x_ratio

In [None]:
# Display the rescaled heights.
y1, y2

In [None]:
import math

# Diagonal length of each wave from its current x and y values.
len1 = math.sqrt(x1**2 + y1**2)
len2 = math.sqrt(x2**2 + y2**2)

In [None]:
# Ratio of the two diagonal lengths.
len1 / len2

## 수정된 로직


In [None]:
# Previous sample values, kept for reference:
# wave1 = {
#     "duration": 9,
#     "points": (32150, 48500),
# }

# wave2 = {
#     "duration": 3,
#     "points": (76900, 97900),
# }

# Sample values used by the revised calculation below.
wave1 = {
    "duration": 9,
    "points": (25850, 33150),
}

wave2 = {
    "duration": 3,
    "points": (34300, 45000),
}

In [None]:
# Factor applied to the normalized widths before the diagonals are computed.
x_to_y_ratio = 2.0

# Horizontal size of each wave: its duration.
width1 = wave1["duration"]
width2 = wave2["duration"]
print(width1, width2)

# Vertical size of each wave: absolute difference between its two points.
height1 = abs(wave1["points"][1] - wave1["points"][0])
height2 = abs(wave2["points"][1] - wave2["points"][0])
print(height1, height2)

max_x = max(width1, width2)
# NOTE(review): max_y and max_height hold the same value; only max_height is
# used below.
max_y = max(height1, height2)
max_height = max(height1, height2)
# Overall price range across all four points; only printed in this cell.
low_y = min(
    wave1["points"][0], wave1["points"][1], wave2["points"][0], wave2["points"][1]
)
high_y = max(
    wave1["points"][0], wave1["points"][1], wave2["points"][0], wave2["points"][1]
)
print(low_y, high_y)

# Normalize the widths so the longer wave has width 1.
width1 /= max_x
width2 /= max_x
print(width1, width2)

width1 *= x_to_y_ratio
width2 *= x_to_y_ratio

# Normalize the heights so the taller wave has height 1.
height1 /= max_height
height2 /= max_height
print(height1, height2)

# `math` is not imported in this cell; it relies on an earlier `import math`.
diagonal1 = math.sqrt(width1**2 + height1**2)
diagonal2 = math.sqrt(width2**2 + height2**2)
print(diagonal1, diagonal2)

In [None]:
import math

# Recomputes the diagonals from the current width/height values and prints
# them on separate lines.
diagonal1 = math.sqrt(width1**2 + height1**2)
diagonal2 = math.sqrt(width2**2 + height2**2)
print(diagonal1)
print(diagonal2)

In [None]:
# Ratio of the two diagonals.
diagonal1 / diagonal2

In [None]:
import matplotlib.pyplot as plt

# Draw each wave as a segment from x=0 to its current width, between its two
# price points. NOTE(review): if the normalization cell ran, width1/width2 are
# the scaled values while the y values are raw prices. Confirm this is intended.
plt.plot([0, width1], [wave1["points"][0], wave1["points"][1]], color="blue")
plt.plot([0, width2], [wave2["points"][0], wave2["points"][1]], color="red")

In [None]:
from __future__ import annotations
import FinanceDataReader as fdr

# df = pd.read_csv(r"data/btc-usd_1d.csv")
# df


# Load OHLC rows for ticker "454910" over 2023-10-25..2023-12-23 into df.
df = fdr.DataReader("454910", "2023-10-25", "2023-12-23").reset_index()[
    ["Date", "Open", "High", "Low", "Close"]
]

In [None]:
# Display the loaded frame.
df

In [None]:
import plotly.express as px
import pandas as pd

# Scatter of the "High" column against "Date", with weekends removed from the
# x axis. This cell was pasted from a plotly sample: it plotted a column named
# "AAPL.High" and pinned the x range and holiday list to 2015-2016, but df
# only has "High" and covers a 2023 window. The x range now follows the data.
fig = px.scatter(
    df,
    x="Date",
    y="High",
    title="Hide Gaps with rangebreaks",
)
fig.update_xaxes(
    rangebreaks=[
        dict(bounds=["sat", "mon"]),  # hide weekends
    ]
)

In [None]:
import plotly.graph_objects as go

# Candlestick trace built from the OHLC columns of df, wrapped in a figure
# titled "test". This rebinds `fig`.
data = go.Candlestick(
    x=df["Date"], open=df["Open"], high=df["High"], low=df["Low"], close=df["Close"]
)
layout = dict(title="test")
fig = go.Figure(data=[data], layout=layout)

In [None]:
import pandas as pd

# First and last date in df, formatted as YYYY-MM-DD strings.
start_date = df.loc[0, "Date"].date().strftime("%Y-%m-%d")
end_date = df.loc[len(df) - 1, "Date"].date().strftime("%Y-%m-%d")

# Every calendar day between the two, inclusive.
all_dates = pd.date_range(start_date, end_date, freq="D")

# Convert the df["Date"] column to Python date objects
df_dates = df["Date"].dt.date.values

# Find the dates in all_dates that are not present in df["Date"]
missing_dates = [d.date() for d in all_dates if d.date() not in df_dates]

# Hide those dates on the x axis of the current figure.
fig.update_xaxes(rangebreaks=[dict(values=missing_dates)])

In [None]:
# Turn off the x-axis range slider on the current figure.
fig.update(layout_xaxis_rangeslider_visible=False)

In [None]:
class WV:
    """Small holder for one value, readable via a property or ``get_values``."""

    def __init__(self, value):
        # Double underscore: stored name-mangled as ``_WV__value``.
        self.__value = value

    def get_values(self):
        """Return the stored value (method form of the ``value`` property)."""
        return self.__value

    @property
    def value(self):
        """The stored value."""
        return self.__value

    @value.setter
    def value(self, new_value):
        self.__value = new_value

In [None]:
# Create a holder with the initial value 1.
w = WV(1)

In [None]:
# Assignment goes through the property setter.
w.value = 100

In [None]:
# Read the value back through the property getter.
print(w.value)

In [None]:
# Same value via the explicit accessor method.
w.get_values()