In [1]:
import pyspark.sql.functions as psf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import time
import requests
import pandas as pandas
from dotenv import load_dotenv
import os
from helpers import request_prices_polygon, round_cols
from pathlib import Path


spark = SparkSession.builder.getOrCreate()

# Load environment variables from the .env file, then read the Polygon API key.
# os.getenv returns None when API_KEY is not set.
load_dotenv()
api_key = os.getenv("API_KEY")

# Rename tickers that have changed since the input list was produced (FB -> META, ANTM -> ELV).
# DataFrame.replace matches whole cell values, so only a symbol that is exactly "FB"/"ANTM"
# is renamed; regexp_replace would also rewrite any symbol merely *containing* those
# letters (e.g. "FBHS" -> "METAHS").
df_csv = (
    spark.read.csv("./input_data/stocks.csv", header=True, sep=',')
    .replace({'FB': 'META', 'ANTM': 'ELV'}, subset=['symbol'])
    # Every stock gets the same notional starting investment.
    .withColumn('initial_investment', psf.lit(10000))
)



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/08 21:04:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [54]:
# Dynamic mode: overwriting a partitioned table replaces only the partitions being written.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Load the stored prices and display them ordered by company.
df = spark.read.parquet('./outputs/polygon_stock_data.parquet')
df.orderBy('company_name').show()

# To test that only new data gets pulled, move the latest date back one day and re-save:
# df_mod = df.withColumn("date", psf.expr("CASE when date=='2024-02-07' THEN to_date('2024-02-06') else date end"))
# df_mod.write.mode('overwrite').partitionBy('date').format('parquet').save('./outputs/polygon_stock_data.parquet')

+--------------------+------+------------------+------+----------+
|        company_name|symbol|initial_investment| price|      date|
+--------------------+------+------------------+------+----------+
|          3M Company|   MMM|           10000.0| 93.84|2024-02-06|
|          3M Company|   MMM|           10000.0|162.41|2022-02-08|
|           AT&T Inc.|     T|           10000.0| 17.33|2024-02-06|
|           AT&T Inc.|     T|           10000.0| 23.94|2022-02-08|
|         AbbVie Inc.|  ABBV|           10000.0|175.01|2024-02-06|
|         AbbVie Inc.|  ABBV|           10000.0|143.51|2022-02-08|
| Abbott Laboratories|   ABT|           10000.0|113.31|2024-02-06|
| Abbott Laboratories|   ABT|           10000.0|128.65|2022-02-08|
|Accenture Plc Cla...|   ACN|           10000.0|366.65|2024-02-06|
|Accenture Plc Cla...|   ACN|           10000.0|345.07|2022-02-08|
|  Adobe Incorporated|  ADBE|           10000.0|615.85|2024-02-06|
|  Adobe Incorporated|  ADBE|           10000.0|511.31|2022-02

In [41]:
def compare_share_prices(df):
    """Compare each stock's price on the earliest and the latest date present in ``df``.

    Prices on the minimum and maximum ``date`` are pivoted into columns named after
    those dates (one row per company_name/symbol/initial_investment). The following
    columns are derived:

    * ``no_shares``         - initial_investment / price on the oldest date
    * ``current_value``     - no_shares * price on the newest date
    * ``change_percentage`` - (newest - oldest) / oldest * 100
    * ``rank``              - 1 for the highest change_percentage within each
                              initial_investment value

    All columns except company_name, symbol, date and rank are then passed to
    ``round_cols`` (a project helper; exact rounding is defined there).

    Side effects:
        Writes ./outputs/polygon_stock_comparison.csv and ./outputs/results.txt,
        prints progress messages and shows the comparison table.

    Args:
        df: Spark DataFrame with company_name, symbol, initial_investment, price and
            date columns. Presumably one row per company and date; duplicate rows
            would be summed by the pivot.

    Raises:
        ValueError: if ``df`` contains no dates to compare.
    """
    print("comparing the share prices across dates")

    # The output directory must exist before any file is written into it.
    Path("./outputs").mkdir(parents=True, exist_ok=True)

    bounds = df.select(psf.min("date").alias("min_date"), psf.max("date").alias("max_date")).collect()[0]
    min_date, max_date = bounds["min_date"], bounds["max_date"]
    if min_date is None or max_date is None:
        raise ValueError("compare_share_prices: input DataFrame contains no dates to compare")

    # pivot("date") names the new columns after the date values, e.g. '2024-02-06'.
    min_col, max_col = f'{min_date}', f'{max_date}'
    # Highest change first, so rank 1 is the largest relative gain per investment size.
    change_window = Window.partitionBy("initial_investment").orderBy(psf.col('change_percentage').desc())

    df_compare = (
        df.filter(f"date ='{max_date}' or date ='{min_date}'")
        .groupBy(["company_name", "symbol", "initial_investment"])
        .pivot("date")
        .sum('price')
        .withColumn('no_shares', psf.col('initial_investment') / psf.col(min_col))
        .withColumn('current_value', psf.col('no_shares') * psf.col(max_col))
        .withColumn('change_percentage', psf.try_subtract(max_col, min_col) / psf.col(min_col) * 100)
        .withColumn("rank", psf.rank().over(change_window))
    )

    numeric_cols = [c for c in df_compare.columns if c not in ('company_name', 'symbol', 'date', 'rank')]
    df_compare = round_cols(df_compare, cols=numeric_cols)
    print(max_date, min_date)

    print("writing comparison outputs to: ./outputs/polygon_stock_comparison.csv")
    pdf = df_compare.toPandas()
    pdf.to_csv('./outputs/polygon_stock_comparison.csv', sep=',', encoding='utf-8', index=False)

    # groupBy() with no keys aggregates every numeric column over the whole table.
    totals = df_compare.groupBy().sum().collect()[0]
    initial_investment_total = totals['sum(initial_investment)']
    current_investment_total = totals['sum(current_value)']
    top_performer = df_compare.filter("rank==1").collect()[0]

    # Summary uses the dates actually compared, not the dates that were requested.
    print("writing comparison results to: ./outputs/results.txt")
    with open('./outputs/results.txt', 'w') as f:
        print('Stock price comparison\n', f'Current date: {max_date}\n', f'Start date: {min_date}\n', file=f)
        print("Greatest relative increase: ", top_performer['company_name'], f"({top_performer['symbol']})", file=f)
        print("Growth %: ", top_performer['change_percentage'], file=f)
        # Absolute gain on the initial investment (a currency amount, not a percentage).
        print("Gross profit: ", top_performer['current_value'] - top_performer['initial_investment'], '\n', file=f)
        print("Initial total investment: ", initial_investment_total, file=f)
        print("Current total investment (growth): ", current_investment_total, file=f)

    df_compare.show()
# df_compare.groupBy("company_name").pivot("date").count().show()

In [12]:
# Refresh the stored Polygon prices so they cover "yesterday" and the date two years ago.
# Dynamic mode: overwriting a partitioned table replaces only the partitions being written.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

current_date = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
old_date = (datetime.today() - relativedelta(years=2)).strftime('%Y-%m-%d')

select_expression = ["company_name", "symbol", "cast(initial_investment as double) as initial_investment", "CAST(date as date) as date", "CAST(price as double) as price"]


def _fetch_and_store_prices(date_period):
    """Request prices for ``date_period`` and write them into the parquet store.

    The store is partitioned by ``date`` (reading it back puts ``date`` last), so the
    write uses ``partitionBy('date')``. Under dynamic partition-overwrite mode, only
    the ``date_period`` partition is replaced. An unpartitioned ``overwrite`` would
    delete every previously stored date.
    """
    # NOTE(review): ``stocks_dict`` is not defined in any visible cell; it must be
    # created before this cell runs (presumably from ``df_csv``) - confirm.
    prices = request_prices_polygon(stocks_dict=stocks_dict, date_period=date_period, api_key=api_key)
    (
        spark.createDataFrame(prices)
        .selectExpr(select_expression)
        .write.mode('overwrite')
        .partitionBy('date')
        .format('parquet')
        .save('./outputs/polygon_stock_data.parquet')
    )


if (Path.cwd() / 'outputs' / 'polygon_stock_data.csv').exists():
    # Cast explicitly so ``date`` is a DateType that compares with ``datetime.date``
    # below. Schema inference may not produce a DateType here. Any unnamed index
    # column left by older exports is ignored by the selection.
    df_polygon_data = spark.read.csv("./outputs/polygon_stock_data.csv", header=True, sep=',').selectExpr(select_expression)
    max_date = df_polygon_data.select(psf.max("date")).collect()[0][0]
    min_date = df_polygon_data.select(psf.min("date")).collect()[0][0]

    if datetime.strptime(current_date, '%Y-%m-%d').date() > max_date:
        print("pulling new data available, requesting now...")
        _fetch_and_store_prices(current_date)

    if datetime.strptime(old_date, '%Y-%m-%d').date() < min_date:
        print("Older data available, requesting now...")
        _fetch_and_store_prices(old_date)

    df = spark.read.parquet('./outputs/polygon_stock_data.parquet')

    # index=False keeps only the data columns, matching the comparison CSV export.
    pdf = df.toPandas()
    pdf.to_csv('./outputs/polygon_stock_data.csv', sep=',', encoding='utf-8', index=False)
# NOTE(review): when the CSV cache does not exist, nothing is fetched here; the initial
# download presumably happens in another cell - confirm.

In [13]:
# Overwrites of a partitioned table replace only the partitions present in the new data.
spark.conf.set(
    "spark.sql.sources.partitionOverwriteMode",
    "dynamic",
)

In [20]:
# Column projection with explicit types, so the CSV matches the parquet schema.
select_expression = [
    "company_name",
    "symbol",
    "cast(initial_investment as double) as initial_investment",
    "CAST(date as date) as date",
    "CAST(price as double) as price",
]

df_polygon_data = (
    spark.read
    .csv("./outputs/polygon_stock_data.csv", header=True, sep=',')
    .selectExpr(select_expression)
)
print(df_polygon_data)
df_polygon_data.show()

DataFrame[company_name: string, symbol: string, initial_investment: double, date: date, price: double]
+--------------------+------+------------------+----------+------+
|        company_name|symbol|initial_investment|      date| price|
+--------------------+------+------------------+----------+------+
|          Apple Inc.|  AAPL|           10000.0|2024-02-07|189.41|
|Microsoft Corpora...|  MSFT|           10000.0|2024-02-07|414.05|
|     Amazon.com Inc.|  AMZN|           10000.0|2024-02-07|170.53|
|           Tesla Inc|  TSLA|           10000.0|2024-02-07|187.58|
|Alphabet Inc. Cla...| GOOGL|           10000.0|2024-02-07|145.54|
|Alphabet Inc. Cla...|  GOOG|           10000.0|2024-02-07|146.68|
|Berkshire Hathawa...| BRK.B|           10000.0|2024-02-07|397.66|
|   Johnson & Johnson|   JNJ|           10000.0|2024-02-07|157.98|
|UnitedHealth Grou...|   UNH|           10000.0|2024-02-07|519.39|
|  NVIDIA Corporation|  NVDA|           10000.0|2024-02-07|700.99|
|Meta Platforms In...|  ME

In [48]:
# Sanity check: the parquet store should hold at most one row per (date, company_name).
df_test = spark.read.parquet('./outputs/polygon_stock_data.parquet')
duplicate_groups = (
    df_test
    .groupBy(['date', 'company_name'])
    .count()
    .filter('count>1')
)
duplicate_groups.show()

duplicates = duplicate_groups.count()
print(duplicates == 0)

+----+------------+-----+
|date|company_name|count|
+----+------------+-----+
+----+------------+-----+

True
