### IGNORE FOLLOWING CODE, JUST EXECTURE IT AND GO TO BLOCK BELOW

In [None]:
import requests
import pandas as pd
from datetime import datetime, timezone

class DownloadData(): 
    """
    Collection of methods that allows to donwload historical data trough Dukascopy API.
    """

    def __init__(self, ticker, start_date, end_date,timeframe):
        """
        Parameters
        ----------
        ticker : int
            Desired forex pair with format "XXX/YYY"
        start_date : str
            Start date with format "DD/MM/YYYY"
        end_date : str
            End date with format "DD/MM/YYYY"
        """
        timeframes = ['1day', '1hour', '10m','1min', '10sec', 'tick']
        if timeframe not in timeframes:
            raise SystemExit(f"Invalid timeframe, valid inputs are {timeframes}")
        else:
            self.timeframe = timeframe

        self.api_key = 'z3pstgimi8000000'
        self.ticker = ticker
        self.symbol = self.getTicker()
        
        # Date must be converted to UNIX with milliseconds
        self.start_date = datetime.timestamp(datetime.strptime(start_date, "%d-%m-%Y").replace(tzinfo=timezone.utc)) * 1000
        self.end_date = datetime.timestamp(datetime.strptime(end_date, "%d-%m-%Y").replace(tzinfo=timezone.utc)) * 1000

        if self.start_date > self.end_date:
            raise SystemExit(f"Start date can't be after end date.")

    def getTicker(self):
        url = 'https://freeserv.dukascopy.com/2.0/?path=api/instrumentList'
        params = {'key': self.api_key}
    
        response = requests.get(url, params=params)
        
        if response.status_code == 200:
            data = pd.DataFrame(response.json())
            filtered_data = data[data['name'] == self.ticker]
        else:
            print(f"Request failed with status code: {response.status_code}")

        if not filtered_data.empty:
            found_id = filtered_data['id'].values[0]
            del data, filtered_data, url, params, response
            return found_id
        else:
            raise SystemExit(f"No data found for name: {self.ticker}")

    def getData(self):

        url = 'https://freeserv.dukascopy.com/2.0/?path=api/historicalPrices'

        download_progress = None
        final_data = pd.DataFrame()
        attempt = 0
        '''
        I had to create a while loop because the API let me download max. only 5000 rows of data at once. 
        So I create a loop that will do multiple requests, basically it donwload 5k rows of data at time 
        until all data relative to the desired datetime range has been donwloaded.
        '''
        while(True):
            '''
            If I want data from 01/01/2023 to today, then I'll be given only the most recent 5k rows of data. 
            Then I check if I already downloaded data into the dataframe, if yes less_recent_value assumes 
            the value of the first row Timestamp, so that this cycle will download the 5k rows of data that 
            precedes the ones already downlaoded. 
            '''
            params = {
                'key': self.api_key,
                'instrument': self.symbol,
                'timeFrame': self.timeframe,
                'count' : 5000, # Max rows of data downloadable, can't insert a value higher than this. 
                'start' : self.start_date,
                'end' : self.end_date if download_progress is None else download_progress
            }

            attempt = attempt+1
            print(f'\r \t Downloading... {self.ticker} '
                 f'{datetime.utcfromtimestamp(params["end"] / 1000)} '
                 f'Call n. {attempt}', end='', flush=True)

            response = requests.get(url, params=params)

            if response.status_code == 200:
                data = pd.DataFrame(response.json())

                '''               
                There's a bug in the API: if you do rapidly many requests it can happen thtat
                you get data for an incorrect instrument. So this condition makes sure that
                the API provided the requested data, if not the code will continue to do
                requests until API deliever correct data. 
                '''

                if not data[data['id'] != self.symbol].empty:
                    continue 
                
                # Converting json in dataframe
                if params['timeFrame'] == 'tick':
                   data = pd.DataFrame(data['ticks'].apply(lambda x: {'Timestamp': x['timestamp'],'Bid': x['bid'],
                                                                       'Ask': x['ask']}).tolist())
                else:
                   data = pd.DataFrame(data['candles'].apply(lambda x: {'Timestamp': x['timestamp'],'Open': x['bid_open'], 
                                                                         'High': x['bid_high'], 'Low': x['bid_low'], 
                                                                         'Close': x['bid_close']}).tolist())

                # Appending data retrieved on this call to the main dataframe
                final_data = pd.concat([data, final_data], ignore_index=True)
            else:
                raise SystemExit(f"Request failed with status code: {response.status_code}")
            
            new_progress = final_data['Timestamp'].iloc[0]
            if not download_progress == new_progress: 
               download_progress = new_progress
            else:
               break
        
        # Clear print output
        print('\r' + ' ' * 80, end='\r', flush=True)

        # Check if there are duplicated rows
        if not final_data[final_data.duplicated()].empty:
            print("There are duplicated rows in the combined DataFrame.")

        # Convert timestamp to human-readable date and setting it as index
        final_data['Timestamp'] = pd.to_datetime(final_data['Timestamp'], unit='ms',utc=True)
        final_data.set_index('Timestamp',inplace=True)
        
        return final_data



# (start here) Classify price movements based on candlesticks statistics

This reseach comes from [this](https://www.forexfactory.com/thread/post/14707863#post14707863) post on ForexFactory. 

### Step 1: gather data and create a rolling TF

In [None]:
# This allow jupiter to upload in real time externally modified code
%load_ext autoreload
%autoreload 2 

import os
import pandas as pd
import numpy as np
from tabulate import tabulate
pd.options.mode.chained_assignment = None  # default='warn'

In [None]:
start_date = "01-12-2023"
end_date = "10-12-2023" #This code being executed on google servers make everything slow. 
                        #Do not download too much data or it will take much time
timeframe = 'tick'
price_frame = 1000

csv_file_path = f"{os.getcwd()}/x_DATA/{start_date}_{end_date}  {timeframe}.csv"

df = pd.DataFrame()


if os.path.exists(csv_file_path):
    df = pd.read_csv(csv_file_path)
else:
    df = DownloadData('GBP/USD', start_date,end_date,timeframe).getData()
    df.reset_index(inplace=True)    
    data_folder_path = f"{os.getcwd()}/x_DATA"
    if not os.path.exists(data_folder_path):
        os.makedirs(data_folder_path)

    df.to_csv(csv_file_path)

df

In [None]:
if "Ask" in df.columns:
    df = df.drop("Ask", axis=1)

df["Close"] = df["Bid"]
df["Open"] = df["Bid"].shift(price_frame)  # Shift the "Bid" values 1000 rows back
df["High"] = df["Bid"].rolling(window=price_frame).max()  # Calculate the rolling max over the last 1000 rows
df["Low"] = df["Bid"].rolling(window=price_frame).min()  # Calculate the rolling min over the last 1000 rows

df

#### Time of high and time of low
I'm adding a column that returns the high and low times.

In [None]:
# Calculate the index of the maximum value in the rolling window for "High time"
df['High time'] = price_frame - ( df.index.values - df['Bid'].rolling(window=price_frame).agg(lambda x: x.index.values[np.argmax(x.values)]) ) 
df["Low time"] = price_frame  - ( df.index.values - df['Bid'].rolling(window=price_frame).agg(lambda x: x.index.values[np.argmin(x.values)]) )
df['High first'] = df['High time'] > df["Low time"]

df = df.dropna() #Drop initial NaN values

df

#### How to interpret 'High time' and 'Low time' columns
e.g. 
- High time = 746 and Low time = 200
- That means: the high was made on tick number 746 of the current 1000 ticks candlestick

Here's a draw so that you can fully understand
![Drawing](https://i.imgur.com/ORcChmJ.png)

### Step 2: Directional bias

In [None]:
df['Open - Close'] = (df['Open'] - df['Close']).abs()
df['Upper Wick'] = (df['High'] - df[['Open', 'Close']].max(axis=1)).abs()
df['Lower Wick'] = (df[['Open', 'Close']].min(axis=1) - df['Low']).abs()

df['Bias'] = np.where(df['Open'] > df['Close'], 'Bearish', np.where(df['Close'] > df['Open'], 'Bullish', 'Doji'))

df


### Step 3: Strength of directional bias
It can be:
- weak
- medium ("normal")
- strong

I'll calculate the average candlestick like: $\frac{\text{Average Bullish candlestick OC}+\text{Average Bearish candlestick OC}}{2}$. 
Then I'll calculate the standard deviation of the bullish candles OC and bearish candles OC and then: $\frac{\text{St.dev Bullish candlestick OC}+\text{St.dev Bearish candlestick OC}}{2}$. 
And lastly I'll create a boundary around the average, meaning that:
- If a bullish candle OC is $>=\mu+\sigma$ then it's strong. 
- If a bullish candle OC is $>=\mu$ and $<\mu+\sigma$ then it's medium ("normal").
- If a bullish candle OC is $<=\mu-\sigma$ then it's weak.

In [None]:

# Filter rows where Bias is "Bullish"
bullish_rows = df[df['Bias'] == 'Bullish']
bearish_rows = df[df['Bias'] == 'Bearish']

# average bullish and bearish candle OC
average_bullish_diff = bullish_rows['Open - Close'].mean()
average_bearish_diff = bearish_rows['Open - Close'].mean()
mu = ( average_bullish_diff + average_bearish_diff ) / 2

# stdev bullish and bearish candle OC
stdev_bullish_diff = bullish_rows['Open - Close'].std()
stdev_bearish_diff = bearish_rows['Open - Close'].std()
sigma = ( stdev_bullish_diff + stdev_bearish_diff ) / 2


# assign strenght bias
df['Strength'] = np.where(df['Open - Close'] >= mu + sigma, 'Strong',
                          np.where((df['Open - Close'] > mu-sigma) & (df['Open - Close'] < mu + sigma), 'Medium', 'Weak'))

df

### Step 4: print results

In [None]:
# Possible values in each column
unique_biases = df['Bias'].unique()
unique_strengthes = df['Strength'].unique()
unique_high_first_values = df['High first'].unique()

# Create a list to store the results
result_data = []

pd.options.display.float_format = '{:.5f}'.format

# Iterate over combinations and calculate mean
for bias in unique_biases:
    for strength in unique_strengthes:
        for high_first in unique_high_first_values:
            subset = df[(df['Bias'] == bias) & (df['Strength'] == strength) & (df['High first'] == high_first)]
            mean_oc = subset['Open - Close'].mean()
            mean_oc = round(mean_oc,5)
            result_data.append({'Bias': bias, 'Strength': strength, 'High First': high_first, 'Avg. Open - Close': mean_oc})

# Convert the list of dictionaries to a DataFrame
result_df = pd.DataFrame(result_data)

# Print the formatted table
print(tabulate(result_df, headers='keys', tablefmt='simple_outline', showindex=False, floatfmt=".5f", numalign="center", stralign="center"))
