In [8]:
import polars as pl
from polars import col
import requests
import pulp
from concurrent.futures import ThreadPoolExecutor
from itertools import combinations 
import json
import re 
from datetime import datetime
from datetime import timezone
import os

In [15]:
# get the latest items.json from github
def update_items():
    response = requests.get("https://github.com/ao-data/ao-bin-dumps/raw/refs/heads/master/items.json")
    
    if not os.path.exists("./data"): 
        os.makedirs("./data") 
    
    with open("./data/items.json", mode="wb") as file:
        file.write(response.content)

In [10]:
# use dumped json file to get item stats
with open('./data/items.json') as f:
    data: dict = json.load(f)


# recursively gets all combinations of @uniquename and @weight to populate item data
def extract_unique_weight_tuples(data: dict):
    tuples: list = []
    if isinstance(data, dict):
        for key, value in data.items():
            if key == "@uniquename" and "@weight" in data:
                if re.search("LEVEL[1-8]", data["@uniquename"]) == None:
                    tuples.append((data["@uniquename"], data["@weight"]))
                else:
                    # for some reason albion data projects expects items with quality to have 
                    # "LEVEL1@1", "LEVEL2@2", etc on the name and the game files does not have that
                    tuples.append((data["@uniquename"] + f"@{data["@uniquename"][data["@uniquename"].find("LEVEL") + 5]}", data["@weight"]))
            else:
                tuples.extend(extract_unique_weight_tuples(value))
    elif isinstance(data, list):
        for item in data:
            tuples.extend(extract_unique_weight_tuples(item))
    return tuples

def format_tuples_as_dict(tuples):
    return {tuple[0]: tuple[1] for tuple in tuples}

weight_dicts: dict = format_tuples_as_dict(extract_unique_weight_tuples(data))

list_of_items = list(weight_dicts.keys())



In [11]:

lista_tabelas: list = []
locations: str = "Lymhurst,Bridgewatch,Fort Sterling,Martlock,Thetford"

items_string: str = ""
index: int = 0
links: list = []
# The api url has a limit of 4096 chars, so we need to subdivide the call to have only so many items
item_string_size_limit = 3900
while index < len(list_of_items):
    while len(items_string) <= item_string_size_limit:
        items_string += f"{list_of_items[index]},"
        index += 1
        if index == len(list_of_items): 
            break
    
    link = f"https://west.albion-online-data.com/api/v2/stats/prices/{items_string}?locations={locations}&qualities=1"
    links.append(link)
    items_string = ""
    
    
# defines the function to get the request content, useful for parallelize
# uses gzip to not tax the server
my_headers: dict = {'accept-encoding':'gzip'}
def get_data(url: str):
    content = requests.get(url, json=True, headers=my_headers).content
    return content

# get all json results in paralel
with ThreadPoolExecutor() as executor:
    conteudos =  list(executor.map(get_data, links))

In [None]:

# ignores rows with 0 price, as those mean none is getting sold
for conteudo in conteudos:
    tabela = pl.read_json(conteudo)

    lista_tabelas.append(tabela)

resorce_list: list = ["_WOOD_", "_STONE_", "_HIDE_", "_ORE_", "_FIBER_", "_PLANKS_", "_BRICK_", "_LEATHER_", "_METAL_", "_CLOTH_", "_SOUL_", "_RUNE_"]
time_now: datetime = datetime.now(tz=timezone.utc)#.replace(tzinfo=None)

# The polars lazy api is generally faster
final = pl.concat(lista_tabelas).lazy().select(
    col("city"),
    col("sell_price_min"),
    col("sell_price_max"),
    col("buy_price_min"),
    col("buy_price_max"),
    col("sell_price_min_date").str.to_datetime(),
    # each quality of an item is basically a new item for this purpose, 
    # so it's better to concat item and quality
    item_name = pl.concat_str([
        col("item_id"),
        col("quality"),
    ], separator = "_"),
    buy_price_mean = ((col("sell_price_min") + col("sell_price_max")) /2) ,
    sell_price_mean = ((col("buy_price_min") + col("buy_price_max")) /2),
    buy_price_max_age =  (time_now - col("buy_price_max_date").str.to_datetime().dt.replace_time_zone("UTC")).dt.total_minutes(),
    sell_price_min_age =  (time_now - col("buy_price_min_date").str.to_datetime().dt.replace_time_zone("UTC")).dt.total_minutes(),
).with_columns(
    resource_tag = pl.when(col("item_name").str.contains_any(resorce_list)).then(True).otherwise(False)
)


teste = final.collect()
final.collect()

In [None]:

cities: list = ["Bridgewatch", "Martlock", "Thetford", "Fort Sterling", "Lymhurst", "Bridgewatch"]

# will be a configurable value for how old the information can be
max_age: int = 120

# crete the combinations 2 by 2
city_combinations: list = list(combinations(cities, 2))
per_comb: list= []

products = final.collect().get_column("item_name").unique().to_list()

# This ugly loop would probably be better with .map() sintax and
# possibly tuples or structs, at least its not that slow
for comb in city_combinations:
    orig, dest = comb
    products_list: list = []
    buy_prices: list = []
    profits: list = []
    weights: list = []
    tags: list = []
    ages: list = []
    filtered_by_cities = final.filter(col("city").is_in([orig, dest]))
    # massaging the data into dicts gives a huge speed up
    buy = dict(filtered_by_cities.filter(col("city") == orig).select(["item_name", "sell_price_min"]).collect().iter_rows())
    buy_ages = dict(filtered_by_cities.filter(col("city") == orig).select(["item_name", "sell_price_min_age"]).collect().iter_rows())
    sell = dict(filtered_by_cities.filter(col("city") == dest).select(["item_name", "buy_price_max"]).collect().iter_rows())
    sell_ages = dict(filtered_by_cities.filter(col("city") == dest).select(["item_name", "buy_price_max_age"]).collect().iter_rows())
    tag = dict(filtered_by_cities.filter(col("city") == dest).select(["item_name", "resource_tag"]).collect().iter_rows())

    for p in products:
        buy_price: int = buy[p]
        buy_age: int = buy_ages[p]
        sell_price: int = sell[p]
        sell_age: int = sell[p]
        if buy_price > 0 and sell_price > 0 and buy_age <= max_age and sell_age <= max_age:
            products_list.append(f"{orig}_{dest}_{p}")
            buy_prices.append(buy_price)
            profits.append(sell_price-buy_price)
            weights.append(float(weight_dicts[p[0:-2]]))
            tags.append(tag[p])
            ages.append((buy_age, sell_age))

    per_comb.append((products_list, buy_prices, profits, weights, tags, ages))


per_comb

In [None]:
# these 4 will be configurable
max_weight = 4116 # default weight of t8 ox 
silver_limit = 500_000
resource_limit = 1000
non_resource_limit = 10 # non resources are riskier

for comb in per_comb:
    # sometimes there will not be any items that are not too old, in this case we skip the iteration
    if len(comb[0]) == 0: continue 

    products_list, buy_prices, profits, weights, tags, ages = comb


    # Create a PuLP problem instance as an Integer Linear Program (ILP)
    prob = pulp.LpProblem("MaximizeProfit", pulp.LpMaximize)

    # Create decision variables for each product 
    x = pulp.LpVariable.dicts("Product", products_list, cat=pulp.LpInteger, lowBound=0)
    y = pulp.LpVariable.dicts("PurchaseLimit", products_list, cat=pulp.LpInteger, lowBound=0)  # New decision variable for purchase limit


    # Set the objective function (maximize profit)
    prob += pulp.lpSum([profits[i] * x[products_list[i]] for i in range(len(products_list))])

    # Add the budget constraint
    prob += pulp.lpSum([buy_prices[i] * x[products_list[i]] for i in range(len(products_list))]) <= silver_limit

    # Add the weight constraint
    prob += pulp.lpSum([weights[i] * x[products_list[i]] for i in range(len(products_list))]) <= max_weight

    # user should be able the max to buy of resources and non resources,
    # this limit is added here
    for i in range(len(products_list)):
        if tags[i]:
            y[products_list[i]].upBound = resource_limit
        else:
            y[products_list[i]].upBound = non_resource_limit

    for i in range(len(products_list)):
        prob += y[products_list[i]] >= x[products_list[i]]

    # Solve the problem
    #solver_list = pulp.listSolvers(onlyAvailable=True)
    #print(solver_list)
    pulp.GLPK_CMD(msg=False).solve(prob)


    # Print the solution
    print("Solution:")
    for i in range(len(products_list)):
        if x[products_list[i]].value() > 0:
            print(f"Product {products_list[i]} is selected: {x[products_list[i]].value()} units and {int(profits[i])} profit per")
            print(f"ages: {ages[i]} minutes")
    print(f"Total profit: {int(pulp.value(prob.objective)):,d}")