In [1]:
import numpy as np
import string
import time
import json
import shutil
import os
import pyspark
from datetime import timedelta
from datetime import datetime as dt
from delta import configure_spark_with_delta_pip
from faker import Faker

In [2]:
# Spark session builder with the Delta Lake SQL extension and catalog enabled.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.default.parallelism", "24")
)

# Pass the builder through delta's helper, then start (or reuse) the session.
delta_ready_builder = configure_spark_with_delta_pip(builder)
spark = delta_ready_builder.getOrCreate()
sc = spark.sparkContext

In [3]:
# Shared Faker instance: Company and CompanyRegister.generate_companies draw
# their fake company details (name, address, phone, ...) from it.
fake = Faker()

In [4]:
class Environment:
    """Directory layout used by the simulated stock exchange.

    Every instance attribute whose name starts with ``DIR_`` is treated as a
    directory that has to exist; all of them are created on construction.

    Attributes:
        DIR_ROOT: ``<current working directory>/sx``.
        DIR_COMPANIES: ``<DIR_ROOT>/companies``.
    """

    def __init__(self) -> None:
        # os.path.join instead of appending "\sx": the backslash form is an
        # invalid escape sequence and only produces a sub-directory on
        # Windows (elsewhere it names a sibling directory "<cwd>\sx").
        self.DIR_ROOT = os.path.join(os.getcwd(), "sx")
        self.DIR_COMPANIES = os.path.join(self.DIR_ROOT, "companies")
        self.generate_root_env()

    def generate_root_env(self):
        """Create every ``DIR_*`` directory that does not exist yet."""
        for key, value in self.__dict__.items():
            if key.startswith("DIR_"):
                # exist_ok replaces the separate os.path.exists check
                os.makedirs(value, exist_ok=True)


class Company():
    """A company record that doubles as a singly linked list node.

    Args:
        name, address, city, state, zip, phone, email: Company details.
            Any argument left as ``None`` is filled with a fresh value from
            the module-level ``fake`` generator at call time.

    Attributes:
        ticker: Starts as ``None``; assigned by ``CompanyRegister``.
        next: Starts as ``None``; link to the next company in the register.
    """

    def __init__(self, name = None, address = None, city = None, state = None, zip = None, phone = None, email = None):
        # Defaults are resolved here rather than in the signature: a default
        # such as ``name=fake.company()`` is evaluated only once, when the
        # class is defined, so every default-built Company would be identical.
        self.name: str = fake.company() if name is None else name
        self.address: str = fake.street_address() if address is None else address
        self.city: str = fake.city() if city is None else city
        self.state: str = fake.state() if state is None else state
        self.zip: str = fake.zipcode() if zip is None else zip
        self.phone: str = fake.phone_number() if phone is None else phone
        self.email: str = fake.email() if email is None else email
        self.ticker: "str | None" = None
        self.next: "Company | None" = None

    def __repr__(self):
        """Return the company name."""
        return str(self.name)

    def __str__(self):
        """Return the company name."""
        return str(self.name)


class CompanyRegister:
    """Singly linked list of companies, mirrored by one folder per ticker.

    Each company lives under ``ENV.DIR_COMPANIES/<ticker>/`` with a
    ``details/0.json`` file and a ``trades`` folder. Because every path is
    built from the ticker, a ticker must never change once its folder has
    been created.

    Args:
        ENV: Object exposing ``DIR_COMPANIES``; that directory must exist.
    """

    def __init__(self, ENV: "Environment"):
        self.head = None
        self.ENV = ENV
        self._get_existing_companies()

    def generate_companies(self, num_companies=10):
        """Create ``num_companies`` fake companies on disk and register them."""
        for _ in range(num_companies):
            c = Company(name = fake.company(),address = fake.street_address(),city = fake.city(),state = fake.state(),zip = fake.zipcode(),phone = fake.phone_number(),email = fake.email())
            # The ticker is fixed before the folders are created; _add_node
            # keeps it as-is so the folder name and the node stay in sync.
            self._make_ticker(c)
            self.generate_company_env(c)
            self._add_node(c)

    def generate_company_env(self, company: "Company"):
        """Create the company's folders and write its ``details/0.json``."""
        company_dir = os.path.join(self.ENV.DIR_COMPANIES, company.ticker)
        os.makedirs(os.path.join(company_dir, "trades"), exist_ok=True)
        os.makedirs(os.path.join(company_dir, "details"), exist_ok=True)

        # write company details to json in details folder
        details = {
            "name": company.name,
            "ticker": company.ticker,
            "address": company.address,
            "city": company.city,
            "state": company.state,
            "zip": company.zip,
            "phone": company.phone,
            "email": company.email
        }
        with open(os.path.join(company_dir, "details", "0.json"), "w") as f:
            json.dump(details, f)

        # NOTE(review): fixed 2 s pause per company kept from the original;
        # its purpose is not visible in this file - confirm before removing.
        time.sleep(2)

    def _get_existing_companies(self):
        """Load every company already stored under ``ENV.DIR_COMPANIES``.

        The folder name is used as the ticker. Recomputing it from the name
        could pick a different random collision suffix and point the node at
        a folder that does not exist. Entries without a ``details/0.json``
        file (stray files, half-created folders) are skipped.
        """
        root = self.ENV.DIR_COMPANIES
        for entry in sorted(os.listdir(root)):
            details_path = os.path.join(root, entry, "details", "0.json")
            if not os.path.isfile(details_path):
                continue
            # load the company details from the json file
            with open(details_path, "r") as f:
                details = json.load(f)
            # create a company object
            c = Company(name = details["name"], address = details["address"], city = details["city"], state = details["state"], zip = details["zip"], phone = details["phone"], email = details["email"])
            c.ticker = entry
            self._add_node(c)

    def _make_ticker(self, company):
        """Assign ``company.ticker``: a non-empty ticker unused in the register.

        The base is the A-Z letters among the first four characters of the
        upper-cased name. While the candidate is empty or already taken, two
        random letters are appended and the whole register is checked again.
        """
        if company is None:
            return
        az = string.ascii_uppercase
        # keep only alphabetical characters (this also drops spaces)
        ticker = ''.join(ch for ch in company.name[0:4].upper() if ch in az)
        taken = set(self._gen_tickers())
        while not ticker or ticker in taken:
            # randint's upper bound is exclusive, so len(az) lets 'Z' be drawn
            ticker += az[np.random.randint(0, len(az))] + az[np.random.randint(0, len(az))]
        company.ticker = ticker
        return

    def _add_node(self, company):
        """Push ``company`` onto the front of the list.

        A ticker is generated only when the company does not have one yet.
        """
        if company is None:
            return
        if company.ticker is None:
            self._make_ticker(company)
        company.next = self.head
        self.head = company
        return

    def _gen_tickers(self):
        """Yield the ticker of every registered company, head first."""
        node = self.head
        while node is not None:
            yield node.ticker
            node = node.next
        return

    def _gen_companies(self):
        """Yield every registered company, head first."""
        node = self.head
        while node is not None:
            yield node
            node = node.next
        return


class StockExchange():
    """Random-walk trade generator that stores one JSON file per trade.

    Trades of a company live in ``ENV.DIR_COMPANIES/<ticker>/trades/`` and are
    named ``0.json``, ``1.json``, ...; the number of files in that folder is
    used as the index of the next trade.

    Args:
        CompanyRegister: Register providing ``_gen_tickers()`` and
            ``_gen_companies()``.

    Attributes:
        start_date, end_date: Window used by ``generate_historic_trades``.
            ``start_date`` is also the timestamp of seed prices.
    """

    def __init__(self, CompanyRegister: "CompanyRegister"):
        self.CR = CompanyRegister
        self.start_date = dt.strptime("2022-11-14 09:00:00", "%Y-%m-%d %H:%M:%S")
        self.end_date = dt.strptime("2022-11-20 17:00:00", "%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _trades_dir(ticker, ENV):
        """Return the folder holding the trade files of ``ticker``."""
        return os.path.join(ENV.DIR_COMPANIES, ticker, "trades")

    @staticmethod
    def _next_price(last_price):
        """Return the price following ``last_price``, never below 0."""
        new_price = round(last_price + np.random.normal(-0.1, 5), 2)
        # one time in ten, replace the normal step with a uniform one
        if np.random.randint(0, 10) == 0:
            new_price = round(last_price + np.random.uniform(-10, 10), 2)
        # if new price is negative, set to 0
        if new_price < 0:
            new_price = 0
        return new_price

    def _init_prices(self, ENV: "Environment"):
        """Write a random seed trade ``0.json`` for tickers that lack one.

        An existing ``0.json`` is left untouched, so seeding a newly added
        company does not overwrite the first trade of the others.
        """
        for tick in self.CR._gen_tickers():
            jsonpath = os.path.join(self._trades_dir(tick, ENV), "0.json")
            if os.path.exists(jsonpath):
                continue
            pricedict = {
                "ticker": tick,
                "price": round(np.random.uniform(1, 1000),2),
                "timestamp": str(self.start_date)
            }
            with open(jsonpath, "w") as f:
                json.dump(pricedict, f)

    def _get_last_trade(self, company: "Company", ENV: "Environment"):
        """Return the most recent trade dict of ``company``.

        If the company has no trade yet, seed prices are written first.
        """
        tickerpath = self._trades_dir(company.ticker, ENV)
        # get the last trade
        last_trade_n = len(os.listdir(tickerpath)) - 1
        if last_trade_n < 0:
            last_trade_n = 0
            self._init_prices(ENV)
        # os.path.join instead of a literal backslash, which is an invalid
        # escape sequence and only works as a separator on Windows
        with open(os.path.join(tickerpath, f"{last_trade_n}.json"), "r") as f:
            return json.load(f)

    def _write_trade(self, company: "Company", new_price, new_timestamp, ENV: "Environment"):
        """Append a trade file for ``company`` with the given price and time."""
        ticker = company.ticker
        tickerpath = self._trades_dir(ticker, ENV)
        # write the new trade to json
        new_trade = {
            "ticker": ticker,
            "price": new_price,
            "timestamp": str(new_timestamp)
        }
        next_index = len(os.listdir(tickerpath))
        with open(os.path.join(tickerpath, f"{next_index}.json"), "w") as f:
            json.dump(new_trade, f)

    def generate_historic_trades(self, ENV: "Environment"):
        """Write one trade per company for every hour of every day in the window.

        Days run from ``start_date`` to ``end_date``; on each day the hours
        run from the hour of ``start_date`` to the hour of ``end_date``,
        both inclusive.
        """
        total_days = (self.end_date - self.start_date).days + 1
        end_hour = self.end_date.time().hour
        start_hour = self.start_date.time().hour
        total_hours = end_hour - start_hour + 1

        for day in range(total_days):
            for hour in range(start_hour, start_hour+total_hours):
                # timestamp for this slot: start date shifted by `day`, at `hour`
                iter_date = (self.start_date + timedelta(days=day)).replace(hour=hour)
                # for each company generate a historic trade
                for company in self.CR._gen_companies():
                    last_trade = self._get_last_trade(company, ENV)
                    new_price = self._next_price(last_trade["price"])
                    self._write_trade(company, new_price, iter_date, ENV)

    def _generate_current_trade(self, ENV: "Environment", company: "Company"):
        """Write one trade for ``company`` stamped with the current time.

        Side effect: ``self.start_date`` is set to now, so a seed price
        written during this call carries the current time as well.
        """
        self.start_date = dt.now()
        last_trade = self._get_last_trade(company, ENV)
        new_price = self._next_price(last_trade["price"])
        # generate a new timestamp
        new_timestamp = dt.now()
        # write the new trade to json
        self._write_trade(company, new_price, new_timestamp, ENV)

    def generate_live_trades(self, ENV: "Environment", sleep_time=1, max_rounds=None):
        """Repeatedly write one new trade per company.

        Args:
            ENV: Environment providing ``DIR_COMPANIES``.
            sleep_time: Seconds to sleep after each round.
            max_rounds: Number of rounds to run; ``None`` (the default) keeps
                going until the caller interrupts the loop.
        """
        rounds_done = 0
        while max_rounds is None or rounds_done < max_rounds:
            for company in self.CR._gen_companies():
                self._generate_current_trade(ENV, company)
            rounds_done += 1
            time.sleep(sleep_time)

In [5]:
# Set up the directory layout, load the companies already stored on disk,
# then generate and register five new ones.
ENV = Environment()
CR = CompanyRegister(ENV=ENV)
CR.generate_companies(num_companies=5)

In [6]:
# Override the exchange's default window with 1-27 November 2022, 09:00-17:00,
# then back-fill one trade per company for every hour in that window.
SX = StockExchange(CompanyRegister=CR)
SX.start_date = dt(year=2022, month=11, day=1, hour=9)
SX.end_date = dt(year=2022, month=11, day=27, hour=17)
SX.generate_historic_trades(ENV=ENV)

In [7]:
# This call does not return on its own: generate_live_trades keeps writing one
# trade per company and sleeping sleep_time seconds until the kernel is
# interrupted, so cells below it only run after a manual interrupt.
SX.generate_live_trades(ENV, sleep_time=0.1)

In [None]:
# Number of trade files written so far, per registered ticker. Tickers come
# from randomly generated company names, so a hard-coded one such as "FISH"
# is not guaranteed to exist on a fresh run.
{
    ticker: len(os.listdir(os.path.join(ENV.DIR_COMPANIES, ticker, "trades")))
    for ticker in CR._gen_tickers()
}