Skip to content

Commit

Permalink
Merge pull request #17 from alexclusive/main
Browse files Browse the repository at this point in the history
Made it do the train game, and some cleanup so sonarlint no longer yells (as much)
  • Loading branch information
jackmoore7 committed Jul 8, 2024
2 parents be05eeb + 57149d1 commit cc866bd
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 144 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ Get updates when a product you're tracking changes. This can include price, prom

### Search
Search for an item. Uses a Discord embed with pagination. Doesn't work very well because the way Lego's search function works is absolutely wack.

## Miscellaneous

### Train Game
Find all solutions to the 'train game'. Given a four digit number (0000-9999), get to the number 10 using addition, subtraction, multiplication, division, and exponentiation.
14 changes: 8 additions & 6 deletions coles.py → imports/coles.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import requests
import sqlite3 as sl
from bs4 import BeautifulSoup
import json
import urllib.parse

from bs4 import BeautifulSoup

con = sl.connect('fortnite.db', isolation_level=None)
cursor = con.cursor()
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'}
select_coles_version = "SELECT version FROM coles_version"

def update_build_number():
build_version = cursor.execute("SELECT version FROM coles_version").fetchone()[0]
build_version = cursor.execute(select_coles_version).fetchone()[0]
url = "https://www.coles.com.au/_next/data/"
r = requests.get(url, headers=headers, verify=False)
soup = BeautifulSoup(r.content, 'html.parser')
Expand All @@ -23,12 +25,12 @@ def update_build_number():

def search_item(query):
query = urllib.parse.quote(query)
build_version = cursor.execute("SELECT version FROM coles_version").fetchone()[0]
build_version = cursor.execute(select_coles_version).fetchone()[0]
url = "https://www.coles.com.au/_next/data/"
r = requests.get(url + build_version + "/en/search.json?q=" + query, headers=headers, verify=False)
if r.status_code == 404:
update_build_number()
build_version = cursor.execute("SELECT version FROM coles_version").fetchone()[0]
build_version = cursor.execute(select_coles_version).fetchone()[0]
r = requests.get(url + build_version + "/en/search.json?q=" + query, headers=headers, verify=False)
if r.status_code == 404:
return "Your search returned a 404"
Expand All @@ -44,7 +46,7 @@ def get_items(id_list):
r = requests.post(url, headers=headers, json=payload, verify=False)
r = r.json()
for item in r['results']:
id = item['id']
item_id = item['id']
name = item['name']
brand = item['brand']
description = item['description']
Expand All @@ -68,7 +70,7 @@ def get_items(id_list):
multibuy_unit_price = item['pricing']['multiBuyPromotion']['reward']
else:
multibuy_unit_price = ""
item_list.append([id, name, brand, description, current_price, on_sale, available, offer_description, multibuy_unit_price, image_url, promotion_type])
item_list.append([item_id, name, brand, description, current_price, on_sale, available, offer_description, multibuy_unit_price, image_url, promotion_type])
data = {
"invalid_ids": r['invalidProducts'],
"items": item_list
Expand Down
14 changes: 8 additions & 6 deletions ephemeral_port.py → imports/ephemeral_port.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
import subprocess

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
Expand All @@ -6,10 +9,9 @@
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from pyvirtualdisplay import Display
import os
from dotenv import load_dotenv
from transmission_rpc import Client
import subprocess

load_dotenv()

def test_port():
Expand Down Expand Up @@ -43,7 +45,7 @@ def test_port():

def get_new_port():
try:
vars = {}
variables = {}
display = Display(visible=0, size=(800, 600))
display.start()
service = Service(executable_path='/usr/bin/chromedriver')
Expand All @@ -60,12 +62,12 @@ def get_new_port():
driver.find_element(By.ID, "login_button").click()
try:
driver.find_element(By.XPATH, "//button[contains(.,\'Delete Port\')]").click()
except:
except Exception:
pass
driver.find_element(By.XPATH, "//button[contains(.,\'Request Matching Port\')]").click()
WebDriverWait(driver, 5).until(expected_conditions.visibility_of_element_located((By.XPATH, "//div[@id=\'epf-port-info\']/span")))
vars["new_port"] = driver.find_element(By.XPATH, "//div[@id=\'epf-port-info\']/span").text
new_port = "{}".format(vars["new_port"])
variables["new_port"] = driver.find_element(By.XPATH, "//div[@id=\'epf-port-info\']/span").text
new_port = "{}".format(variables["new_port"])
return int(new_port)
except NoSuchElementException as e:
print(f"Element not found: {e}.")
Expand Down
68 changes: 40 additions & 28 deletions epic_api.py → imports/epic_api.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import os
import requests
import sqlite3 as sl

from time import sleep
import os
from datetime import datetime as dt

from key_handling import *
from imports.key_handling import *

con = sl.connect('fortnite.db', isolation_level=None)
cursor = con.cursor()

content_type = "application/x-www-form-urlencoded"
select_switch = "SELECT switch FROM keys"
bearer = "Bearer "

def get_fortnite_status(): #need to use the fortnite client token
url = "https://lightswitch-public-service-prod.ol.epicgames.com/lightswitch/api/service/fortnite/status"
key = cursor.execute("SELECT fortnite FROM keys").fetchall()[0][0]
r = requests.get(url, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"Bearer " + key})
auth = bearer + key
r = requests.get(url, headers={"Content-Type":content_type, "Authorization":auth})
if r.status_code == 401: #key expired, generate a new one. expires every 4 hours.
x = dt.now().isoformat()
get_account_key_fortnitePCGameClient()
# x = dt.now().isoformat()
get_account_key_fortnite_pc_game_client()
sleep(2)
return get_fortnite_status()
r = r.json()
Expand All @@ -27,10 +33,11 @@ def get_fortnite_status(): #need to use the fortnite client token
def get_fortnite_update_manifest(): #need to use the launcher client token
url = "https://launcher-public-service-prod06.ol.epicgames.com/launcher/api/public/assets/v2/platform/Windows/namespace/fn/catalogItem/4fe75bbc5a674f4f9b356b5c90567da5/app/Fortnite/label/Live"
key = cursor.execute("SELECT launcher FROM keys").fetchall()[0][0]
r = requests.get(url, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"Bearer " + key})
auth = bearer + key
r = requests.get(url, headers={"Content-Type":content_type, "Authorization":auth})
if r.status_code == 401: #key expired, generate a new one. expires every 4 hours.
x = dt.now().isoformat()
get_account_key_launcherAppClient2()
# x = dt.now().isoformat()
get_account_key_launcher_app_client_2()
sleep(2)
return get_fortnite_update_manifest()
r = r.json()
Expand All @@ -41,72 +48,77 @@ def get_fortnite_update_manifest(): #need to use the launcher client token

def get_fortnite_shop_item_details(id):
url = "https://catalog-public-service-prod06.ol.epicgames.com/catalog/api/shared/bulk/offers?returnItemDetails=True"
key = cursor.execute("SELECT switch FROM keys").fetchone()[0]
key = cursor.execute(select_switch).fetchone()[0]
auth = bearer + key
params = {
'id': id,
'country': "AU",
'locale': "EN"
}
r = requests.get(url, params=params, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"Bearer " + key})
r = requests.get(url, params=params, headers={"Content-Type":content_type, "Authorization":auth})
if r.status_code == 401:
x = dt.now().isoformat()
# x = dt.now().isoformat()
get_device_auth_2()
sleep(2)
return get_fortnite_shop_item_details(id)
return r.json()

def add_friend(user_id):
client_id = os.getenv('CLIENT_ID')
key = cursor.execute("SELECT switch FROM keys").fetchone()[0]
url = f"https://friends-public-service-prod.ol.epicgames.com/friends/api/v1/{client_id}/friends/{user_id}"
r = requests.post(url, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"Bearer " + key})
key = cursor.execute(select_switch).fetchone()[0]
auth = bearer + key
r = requests.post(url, headers={"Content-Type":content_type, "Authorization":auth})
if r.status_code == 401:
x = dt.now().isoformat()
# x = dt.now().isoformat()
get_device_auth_2()
sleep(2)
return add_friend(user_id)
return r.json()

def get_all_friends(include_pending: bool = False):
client_id = os.getenv('CLIENT_ID')
url = f"https://friends-public-service-prod.ol.epicgames.com/friends/api/public/friends/{client_id}"
key = cursor.execute(select_switch).fetchone()[0]
auth = bearer + key
params = {
'includePending': include_pending
}
client_id = os.getenv('CLIENT_ID')
key = cursor.execute("SELECT switch FROM keys").fetchone()[0]
url = f"https://friends-public-service-prod.ol.epicgames.com/friends/api/public/friends/{client_id}"
r = requests.get(url, params=params, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"Bearer " + key})
r = requests.get(url, params=params, headers={"Content-Type":content_type, "Authorization":auth})
if r.status_code == 401:
x = dt.now().isoformat()
# x = dt.now().isoformat()
get_device_auth_2()
sleep(2)
return get_all_friends()
return get_all_friends(include_pending)
return r.json()

def get_user_by_id(user_id):
key = cursor.execute("SELECT switch FROM keys").fetchone()[0]
url = f"https://account-public-service-prod.ol.epicgames.com/account/api/public/account/{user_id}"
r = requests.get(url, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"Bearer " + key})
key = cursor.execute(select_switch).fetchone()[0]
auth = bearer + key
r = requests.get(url, headers={"Content-Type":content_type, "Authorization":auth})
if r.status_code == 401:
x = dt.now().isoformat()
# x = dt.now().isoformat()
get_device_auth_2()
sleep(2)
return get_user_by_id(user_id)
return r.json()

def get_user_presence(user_id):
key = cursor.execute("SELECT switch FROM keys").fetchone()[0]
client_id = os.getenv('CLIENT_ID')
url = f"https://presence-public-service-prod.ol.epicgames.com/presence/api/v1/_/{client_id}/last-online"
r = requests.get(url, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"Bearer " + key})
key = cursor.execute(select_switch).fetchone()[0]
auth = bearer + key
r = requests.get(url, headers={"Content-Type":content_type, "Authorization":auth})
if r.status_code == 401:
x = dt.now().isoformat()
# x = dt.now().isoformat()
get_device_auth_2()
sleep(2)
return get_user_presence()
return get_user_presence(user_id)
try:
if r.json()[user_id]:
return r.json()[user_id][0]['last_online']
except Exception as e:
except Exception:
return None

def get_free_games():
Expand Down
5 changes: 3 additions & 2 deletions gcloud.py → imports/gcloud.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from google.cloud import vision, language_v1, videointelligence
import io
import requests
import os

from google.cloud import vision, language_v1, videointelligence

def classify_text(text):
"""
Classifying Content in a String
Expand Down Expand Up @@ -47,7 +48,7 @@ def classify_text(text):
# is that this category represents the provided text.
print("Confidence: {}".format(category.confidence))

def CNL(text):
def cnl(text):
client = language_v1.LanguageServiceClient()
document = language_v1.Document(
content=text, type_=language_v1.Document.Type.PLAIN_TEXT
Expand Down
22 changes: 11 additions & 11 deletions key_handling.py → imports/key_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,46 @@
con = sl.connect('fortnite.db', isolation_level=None)
cursor = con.cursor()

def get_account_key_fortnitePCGameClient():
url = "https://account-public-service-prod.ol.epicgames.com/account/api/oauth/token"
content_type = "application/x-www-form-urlencoded"
url = "https://account-public-service-prod.ol.epicgames.com/account/api/oauth/token"

def get_account_key_fortnite_pc_game_client():
username = os.getenv('GAMECLIENT_USERNAME')
password = os.getenv('GAMECLIENT_PASSWORD')
r = requests.post(url, headers={"Content-Type":"application/x-www-form-urlencoded"}, data={"grant_type":"client_credentials"}, auth=(username, password))
r = requests.post(url, headers={"Content-Type":content_type}, data={"grant_type":"client_credentials"}, auth=(username, password))
r = r.json()
try:
if r['access_token']:
print("Fortnite client token generated successfully.")
cursor.execute("UPDATE keys SET fortnite = ?", (r['access_token'],))
return r['access_token']
except:
except Exception:
print("Failed to generate a new Fortnite client token.")
return None


def get_account_key_launcherAppClient2():
url = "https://account-public-service-prod.ol.epicgames.com/account/api/oauth/token"
def get_account_key_launcher_app_client_2():
username = os.getenv('LAUNCHERCLIENT_USERNAME')
password = os.getenv('LAUNCHERCLIENT_PASSWORD')
r = requests.post(url, headers={"Content-Type":"application/x-www-form-urlencoded"}, data={"grant_type":"client_credentials"}, auth=(username, password))
r = requests.post(url, headers={"Content-Type":content_type}, data={"grant_type":"client_credentials"}, auth=(username, password))
r = r.json()
try:
if r['access_token']:
print("Launcher client token generated successfully.")
cursor.execute("UPDATE keys SET launcher = ?", (r['access_token'],))
return r['access_token']
except:
except Exception:
print("Failed to generate a new launcher client token.")
return None

def get_device_auth_2():
url = "https://account-public-service-prod.ol.epicgames.com/account/api/oauth/token"
r = requests.post(url, headers={"Content-Type":"application/x-www-form-urlencoded", "Authorization":"basic " + os.getenv('DEVICE_AUTH')}, data={"grant_type":"device_auth", "account_id":os.getenv('DA_ACCOUNT_ID'), "device_id":os.getenv('DA_DEVICE_ID'), "secret":os.getenv('DA_SECRET')})
r = requests.post(url, headers={"Content-Type":content_type, "Authorization":"basic " + os.getenv('DEVICE_AUTH')}, data={"grant_type":"device_auth", "account_id":os.getenv('DA_ACCOUNT_ID'), "device_id":os.getenv('DA_DEVICE_ID'), "secret":os.getenv('DA_SECRET')})
r = r.json()
try:
if r['access_token']:
print("Device auth token generated successfully.")
cursor.execute("UPDATE keys SET switch = ?", (r['access_token'],))
return r['access_token']
except:
except Exception:
print("Failed to generate a new device auth token.")
return None
File renamed without changes.
2 changes: 2 additions & 0 deletions openai_api.py → imports/openai_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import os
import requests
import openai

from openai import OpenAI

from third_party_api import *

client = OpenAI(
Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions third_party_api.py → imports/third_party_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def fortnite_br_stats(username):
r = requests.get(url, params={"name": username}, headers={"Authorization": key})
return r

def getAccountID(username):
def get_account_id(username):
url = "https://fortniteapi.io/v1/lookup"
key = os.getenv('FNAPI_IO_KEY')
r = requests.get(url, params={"username": username}, headers={"Authorization": key})
Expand All @@ -25,10 +25,10 @@ def fish_loot_pool():
def fish_stats(username):
url = "https://fortniteapi.io/v1/stats/fish"
key = os.getenv('FNAPI_IO_KEY')
accountId = getAccountID(username)
if accountId == "no":
account_id = get_account_id(username)
if account_id == "no":
return "no"
r = requests.get(url, params={"accountId": accountId}, headers={"Authorization": key})
r = requests.get(url, params={"accountId": account_id}, headers={"Authorization": key})
if r.status_code != 200:
print("Failed to fetch user fish data")
return 404
Expand All @@ -52,5 +52,5 @@ def fortnite_images(name):
key = os.getenv('FNBR_API_KEY')
r = requests.get(url, headers={"x-api-key": key})
return r.json()['data'][0]['images']['icon']
except:
except Exception:
return None
File renamed without changes.
Loading

0 comments on commit cc866bd

Please sign in to comment.