## Instructions and Description

This Python notebook analyzes order book snapshot data for the ETH/USDT trading pair on the Binance exchange, read from Amberdata's S3 buckets, and estimates the slippage a large market order would incur against each snapshot. Below is an overview of the structure and functionality of the code.

### 1. Importing Essential Libraries
The code kicks off by importing the necessary Python and PySpark libraries:

- `delta.tables`: To work with Delta tables in PySpark.
- `pyspark.sql.window`: To apply window functions in Spark SQL.
- `pyspark.sql.functions`: To use a wide range of functions in Spark SQL.
- `pyspark.sql.types`: To define specific data types.

### 2. Data Ingestion and Basic Filtering
The code reads order book snapshots from a Delta Lake table in an S3 bucket and filters them down to a single day, exchange, and pair (feel free to change the filters):

- Year: 2023
- Month: August
- Day: 29
- Exchange: Binance
- Trading Pair: ETH/USDT
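
If you change the filters often, the predicate string can be built from parameters. The helper below is a minimal sketch and not part of the notebook: `build_partition_filter` is a hypothetical name, and it assumes that `month` and `day` are stored as zero-padded strings, as the notebook's own filter suggests.

```python
def build_partition_filter(year, month, day, exchange, pair):
    """Return a Spark SQL predicate string for one day of one pair (hypothetical helper)."""
    # Assumption: month and day are zero-padded strings in the table ("08", "29").
    return (
        f'year = "{year}" AND month = "{month:02d}" AND day = "{day:02d}" '
        f'AND exchange = "{exchange}" AND pair = "{pair}"'
    )


# Same values as the filter used in this notebook.
predicate = build_partition_filter(2023, 8, 29, "binance", "eth_usdt")
print(predicate)
```

The resulting string can be passed to `.where(...)` in place of the hand-written one.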

### 3. Data Transformation
After reading the data, it undergoes several transformations to prepare for analysis:

- Type casting: Converting `exchangeTimestamp` to the `long` data type.
- Column selection: Extracting `price` and `quantity` from each `[price, quantity]` level and keeping only the relevant columns.

#### Additional Data Filtering:

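- `bid_book`: Rows where `isBid` is true (bid orders), with columns renamed to `bid_price` and `bid_quantity`.
- `ask_book`: Rows where `isBid` is false (ask orders), with columns renamed to `ask_price` and `ask_quantity`.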
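
The flattening and splitting can be pictured in plain Python. This is only a sketch of the idea, not the Spark code: the field layout is inferred from the columns the notebook selects, `flatten_snapshot` is a hypothetical name, and the bid values are made up for illustration.

```python
def flatten_snapshot(snapshot):
    """Turn one nested snapshot into flat rows, one per price level."""
    rows = []
    for side in snapshot["orderBookSides"]:   # like the first explode: one item per book side
        for level in side["data"]:            # like the second explode: one item per level
            rows.append({
                "price": level[0],
                "quantity": level[1],
                "exchangeTimestamp": int(side["exchangeTimestamp"]),  # cast to an integer
                "isBid": side["isBid"],
            })
    return rows


# Illustrative snapshot (assumed layout; bid values are invented).
snapshot = {
    "orderBookSides": [
        {"exchangeTimestamp": 1693347960000, "isBid": True,
         "data": [[1729.31, 2.0], [1729.30, 1.5]]},
        {"exchangeTimestamp": 1693347960000, "isBid": False,
         "data": [[1729.32, 84.7922], [1729.34, 0.1498]]},
    ]
}

rows = flatten_snapshot(snapshot)
bid_rows = [r for r in rows if r["isBid"]]
ask_rows = [r for r in rows if not r["isBid"]]
```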

### 4. Calculating Slippage and Other Metrics
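For each order book snapshot, the code estimates how much a market order of size `order_amount` (set by `cost_basis`) would fill and how far that falls from a fill at the best price. The following columns are computed:

- `cum_quantity`: Cumulative sum of quantities, from the best price outward.
- `cum_cost`: Cumulative sum of cost (price × quantity), from the best price outward.
- `slippage`: Despite its name, the base-currency quantity the order actually fills, computed once `cum_cost` exceeds `order_amount` (0 otherwise).
- `buy_currency_slippage` / `sell_currency_slippage`: Difference between the desired quantity (`order_amount` divided by the best price) and the filled quantity, in base currency (sign flipped on the sell side).
- `buy_dollar_slippage` / `sell_dollar_slippage`: The same difference valued at the best price, in quote currency.
- `buy_percent_slippage` / `sell_percent_slippage`: The desired quantity divided by the filled quantity, minus 1 (sign flipped on the sell side); this is a fraction, so multiply by 100 for a percentage.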
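
To make the arithmetic concrete, here is a plain-Python sketch of the buy side for a single snapshot. It mirrors the formulas described above but is not the notebook's Spark implementation; `walk_ask_book` is a hypothetical name and the two-level book is invented.

```python
def walk_ask_book(asks, order_amount):
    """Fill a buy order of `order_amount` (quote currency) against (price, quantity) levels.

    Returns a dict of metrics, or None if the book is too thin to absorb the order.
    """
    asks = sorted(asks)                      # best (lowest) ask first
    best_price = asks[0][0]
    desired_amount = order_amount / best_price
    cum_quantity = 0.0
    cum_cost = 0.0
    for price, quantity in asks:
        level_cost = price * quantity
        if cum_cost + level_cost > order_amount:
            # Partial fill on this level: spend what is left of the order here.
            filled = cum_quantity + (order_amount - cum_cost) / price
            return {
                "filled": filled,
                "currency_slippage": desired_amount - filled,
                "dollar_slippage": order_amount - filled * best_price,
                "percent_slippage": desired_amount / filled - 1,
            }
        cum_quantity += quantity
        cum_cost += level_cost
    return None


# Invented book: 5 units at 100, then 10 units at 110; order of 1,050.
metrics = walk_ask_book([(100.0, 5.0), (110.0, 10.0)], 1050.0)
print(metrics)
```

In this example the order would have bought 10.5 units at the best price but fills only 10, so the currency slippage is 0.5 units, the dollar slippage is 50, and the percent slippage is about 0.05 (5%).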

### 5. Combining Bid and Ask Books
The `bid_book` and `ask_book` results are then combined on the `exchangeTimestamp` field, giving one row of aggregate metrics per snapshot.

### 6. Resultant Order Book Analysis
Finally, the code groups the data by `exchangeTimestamp` to calculate first non-null instances of various slippage metrics for both buying and selling.
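
The "first non-null per group" idea can be sketched in plain Python as follows. This only illustrates the semantics; `first_non_null_by_key` is a hypothetical name and the rows are invented. Note that in Spark the result of `first` depends on row order, which is not guaranteed after a shuffle, so it is safest when each group holds at most one non-null value.

```python
def first_non_null_by_key(rows, key, column):
    """For each distinct `key` value, return the first non-None value of `column`."""
    result = {}
    for row in rows:
        group = row[key]
        result.setdefault(group, None)           # make sure every group appears
        if result[group] is None and row[column] is not None:
            result[group] = row[column]
    return result


# Invented rows: only one level per timestamp carries a slippage value.
rows = [
    {"exchangeTimestamp": 1, "buy_dollar_slippage": None},
    {"exchangeTimestamp": 1, "buy_dollar_slippage": 50.0},
    {"exchangeTimestamp": 2, "buy_dollar_slippage": None},
]
per_timestamp = first_non_null_by_key(rows, "exchangeTimestamp", "buy_dollar_slippage")
```

A timestamp whose book is too thin to absorb the order has no non-null value and stays `None`.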

#### Additional Features:

- The code uses the Spark DataFrame's `.cache()` method to cache intermediate DataFrames for improved performance.
- Window functions are applied for partitioning and ordering data.
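
As a rough picture of what a partitioned, ordered window with a running frame computes, the sketch below builds running totals per timestamp in plain Python. It stands in for the ask side only (ascending price); `running_totals` is a hypothetical name and the rows are invented.

```python
from itertools import accumulate, groupby
from operator import itemgetter


def running_totals(rows):
    """rows: (timestamp, price, quantity) tuples.

    Returns (timestamp, price, cum_quantity, cum_cost) tuples, with totals
    restarting for every timestamp and accumulating in ascending price order.
    """
    out = []
    ordered = sorted(rows, key=itemgetter(0, 1))            # partition key, then price
    for ts, group in groupby(ordered, key=itemgetter(0)):   # one partition per timestamp
        levels = list(group)
        cum_qty = accumulate(q for _, _, q in levels)
        cum_cost = accumulate(p * q for _, p, q in levels)
        out.extend((ts, p, cq, cc) for (_, p, _), cq, cc in zip(levels, cum_qty, cum_cost))
    return out


totals = running_totals([(1, 101.0, 2.0), (1, 100.0, 1.0), (2, 50.0, 4.0)])
```

The bid side works the same way with prices sorted in descending order.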

### Sample Code Variables
You can set your cost basis like this:

```python
cost_basis = 1_000_000
```

This sets your cost basis (the order size, `order_amount` in the code) to 1 million in the quote currency, which is USDT for this pair, and all slippage calculations are based on it.

*Note: Because a full day of order book snapshots is large and the calculations are heavy, this notebook might take a considerable amount of time to run.*

In [None]:
# 1. Importing Essential Libraries
from delta.tables import DeltaTable
from pyspark.sql.window import Window
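# Explicit names instead of a wildcard import. Note that max, min, and sum below
# are the Spark column functions and shadow Python's built-ins in this notebook.
from pyspark.sql.functions import col, first, lit, max, min, sum, when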
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType

In [None]:
# 2. Data Ingestion and Basic Filtering
trades = (
        DeltaTable
        .forPath(spark, 's3://amberdata-marketdata-deltalake/spot/order-book-snapshots/').toDF()
        .where('year = "2023" \
                AND month = "08"\
                AND day = "29"\
                AND exchange = "binance" \
                AND pair = "eth_usdt"')
        # Flatten each snapshot: one row per book side, then one row per [price, quantity] level
        .select(f.explode('orderBookSides'))
        .select("col.exchangeTimestamp", "col.isBid", f.explode('col.data'))
         ).cache()

In [None]:
# 3.1 Data Transformation: Type Casting
trades = trades.withColumn('exchangeTimestamp', f.col('exchangeTimestamp').cast('long'))

# 3.2 Data Transformation: Column Selection
trades = trades\
          .select(f.col("col")\
          .getItem(0).alias("price"), f.col("col")\
          .getItem(1).alias("quantity"), "*")\
          .drop("col", 'timestamp_window')

# 3.3 Additional Data Filtering: Bid and Ask Books
bid_book = trades.filter(f.col("isBid"))
ask_book = trades.filter(~f.col("isBid"))

# 3.4 Renaming Columns
bid_book = bid_book.drop("isBid")
bid_book = bid_book.withColumnRenamed('price', 'bid_price').withColumnRenamed('quantity', 'bid_quantity')
ask_book = ask_book.drop("isBid")
ask_book = ask_book.withColumnRenamed('price', 'ask_price').withColumnRenamed('quantity', 'ask_quantity')

In [None]:
# 4.1 Preview of Bid and Ask Books
ask_book.limit(5).display()
bid_book.limit(5).display()

ask_price,ask_quantity,exchangeTimestamp
1729.32,84.7922,1693347960000
1729.34,0.1498,1693347960000
1729.35,17.3918,1693347960000
1729.43,0.577,1693347960000
1729.45,3.0753,1693347960000


In [None]:
# 5.1 Window Functions Initialization
timestampWindow = Window.partitionBy("exchangeTimestamp")

# 5.2 Defining Windows for Bid and Ask Books
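# Bids: best (highest) price first, running frame from the first row to the current row
bid_window = Window.partitionBy('exchangeTimestamp')\
        .orderBy(f.desc('bid_price'))\
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Asks: best (lowest) price first, same running frame
ask_window = Window.partitionBy('exchangeTimestamp')\
        .orderBy(f.asc('ask_price'))\
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)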

In [None]:
# 6.1 Slippage and Metrics Calculation
cost_basis = 1_000_000

# 6.2 Calculations for Bid Book
# "slippage" holds the quantity sold to raise order_amount. It is set only on the level
# where the cumulative cost first exceeds order_amount, so deeper levels (whose values
# would be wrong) cannot be picked up by the later first(..., ignorenulls=True).
bid_book = bid_book\
        .withColumn("order_amount", lit(cost_basis))\
        .withColumn("sell_order_price_1", max("bid_price").over(timestampWindow))\
        .withColumn("desired_amount", col("order_amount") / col("sell_order_price_1"))\
        .withColumn("cum_quantity", sum("bid_quantity").over(bid_window))\
        .withColumn("cum_cost", sum(col("bid_quantity") * col("bid_price")).over(bid_window))\
        .withColumn("prev_cum_cost", col("cum_cost") - (col("bid_quantity") * col("bid_price")))\
        .withColumn("slippage", when((col("prev_cum_cost") <= col("order_amount")) & (col("order_amount") < col("cum_cost")),
                ((col("order_amount") - col("prev_cum_cost")) / col("bid_price")) + (col("cum_quantity") - col("bid_quantity")))\
            .otherwise(0))\
        .withColumn("sell_currency_slippage", when(col('slippage') != 0, (col("desired_amount") - col("slippage")) * -1))\
        .withColumn("sell_dollar_slippage", when(col('slippage') != 0, (col("order_amount") - (col("slippage") * col("sell_order_price_1"))) * -1))\
        .withColumn("sell_percent_slippage", when(col('slippage') != 0, ((col("desired_amount") / col("slippage")) - 1) * -1))

# 6.3 Calculations for Ask Book
# "slippage" holds the quantity bought with order_amount. It is set only on the level
# where the cumulative cost first exceeds order_amount, so deeper levels (whose values
# would be wrong) cannot be picked up by the later first(..., ignorenulls=True).
ask_book = ask_book\
        .withColumn("order_amount", lit(cost_basis))\
        .withColumn("buy_order_price_1", min("ask_price").over(timestampWindow))\
        .withColumn("desired_amount", col("order_amount") / col("buy_order_price_1"))\
        .withColumn("cum_quantity", sum("ask_quantity").over(ask_window))\
        .withColumn("cum_cost", sum(col("ask_quantity") * col("ask_price")).over(ask_window))\
        .withColumn("prev_cum_cost", col("cum_cost") - (col("ask_quantity") * col("ask_price")))\
        .withColumn("slippage", when((col("prev_cum_cost") <= col("order_amount")) & (col("order_amount") < col("cum_cost")),
                ((col("order_amount") - col("prev_cum_cost")) / col("ask_price")) + (col("cum_quantity") - col("ask_quantity")))\
            .otherwise(0))\
        .withColumn("buy_currency_slippage", when(col('slippage') != 0, col("desired_amount") - col("slippage")))\
        .withColumn("buy_dollar_slippage", when(col('slippage') != 0, col("order_amount") - (col("slippage") * col("buy_order_price_1"))))\
        .withColumn("buy_percent_slippage", when(col('slippage') != 0, (col("desired_amount") / col("slippage")) - 1))

# 6.4 Group each book by timestamp, then join Bid and Ask results
# Aggregating before the join keeps it at one row per timestamp per side,
# instead of pairing every bid level with every ask level of a snapshot.
buy_side = ask_book.groupby("exchangeTimestamp")\
            .agg(first(col("buy_currency_slippage"), ignorenulls=True).alias("buy_currency_slippage"),
                 first(col("buy_dollar_slippage"), ignorenulls=True).alias("buy_dollar_slippage"),
                 first(col("buy_percent_slippage"), ignorenulls=True).alias("buy_percent_slippage"))

sell_side = bid_book.groupby("exchangeTimestamp")\
            .agg(first(col("sell_currency_slippage"), ignorenulls=True).alias("sell_currency_slippage"),
                 first(col("sell_dollar_slippage"), ignorenulls=True).alias("sell_dollar_slippage"),
                 first(col("sell_percent_slippage"), ignorenulls=True).alias("sell_percent_slippage"))

orderbook = buy_side.join(sell_side, on='exchangeTimestamp', how='inner')

In [None]:
# 7. Display Results
orderbook.limit(5).display()