# In [2]:
import os

import pandas as pd
import requests
from pyspark.sql import SparkSession

# --- Spark session and PostgreSQL (JDBC) connection settings ----------------
# The PostgreSQL JDBC driver jar is put on Spark's classpath via spark.jars so
# that the "org.postgresql.Driver" class named below can be loaded for the
# JDBC write.  Machine-specific settings can be overridden with environment
# variables; the fallbacks reproduce the original hard-coded values.
POSTGRES_JDBC_JAR = os.environ.get(
    "POSTGRES_JDBC_JAR",
    "/Users/matthewyip/bootcamp-python/bootcamp-python/postgresql-42.7.7.jar",
)

spark_session = (
    SparkSession.builder
    .appName("PySpark Stock Ohlcs")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

jdbc_url = os.environ.get(
    "POSTGRES_JDBC_URL",
    "jdbc:postgresql://localhost:5432/bootcamp_2504p",
)

# NOTE(review): credentials belong in the environment (or a secrets store),
# not in source control.  The fallback password is kept only so existing
# setups keep working -- remove it once PGPASSWORD is set everywhere.
connection_properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "admin1234"),
    "driver": "org.postgresql.Driver",
}

# --- Download daily OHLCV bars from Yahoo Finance's chart endpoint ----------
symbol = "MSFT"
start_period = "1657237004"  # 2022-07-07 23:36:44 UTC (Unix seconds)
end_period = "1751931404"    # 2025-07-07 23:36:44 UTC (Unix seconds)
url = (
    f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}"
    f"?period1={start_period}"
    f"&period2={end_period}"
    "&interval=1d&events=history"
)
print(url)

# Browser-like User-Agent; presumably Yahoo rejects the default
# python-requests agent (TODO confirm).  A full desktop-browser string such as
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..." can be substituted if needed.
headers = {
    "User-Agent": "Mozilla/5.0",
}

# Without a timeout, requests will wait indefinitely on a stalled connection.
response = requests.get(url, headers=headers, timeout=30)
# Surface HTTP failures (unknown symbol, rate limiting, ...) as an HTTPError
# instead of letting .json() fail on a non-JSON error page.
response.raise_for_status()
data = response.json()

# NOTE(review): on errors the endpoint is assumed to answer with
# {"chart": {"result": null, "error": {...}}} -- confirm.  Guard against a
# null/empty result so the failure message says what went wrong.
chart = data["chart"]
if not chart.get("result"):
    raise RuntimeError(
        f"Yahoo Finance returned no chart data for {symbol}: {chart.get('error')}"
    )
result = chart["result"][0]  # first (and only) requested symbol

# Parallel arrays: one entry per bar, aligned by position with `timestamp`.
timestamp = pd.Series(result["timestamp"])
quote = result["indicators"]["quote"][0]

lows = pd.Series(quote["low"])
highs = pd.Series(quote["high"])
opens = pd.Series(quote["open"])
closes = pd.Series(quote["close"])
volumes = pd.Series(quote["volume"])

# --- Validate the parsed series before loading anything ---------------------
# Failures raise ValueError instead of calling exit(): exit() is the
# interactive `site` helper, exits with status 0 in a plain script (so a
# failed run looks successful), and under IPython/Jupyter it does not
# reliably stop the remaining statements of the cell.
ohlcv_series = {
    "timestamp": timestamp,
    "low": lows,
    "high": highs,
    "open": opens,
    "close": closes,
    "volume": volumes,
}

# 1. count: every series must have exactly one value per timestamp.
series_lengths = {name: len(s) for name, s in ohlcv_series.items()}
isCountValid = len(set(series_lengths.values())) == 1
if not isCountValid:
    raise ValueError(f"Count invalid. Series lengths: {series_lengths}")

# 2. nulls: a None in the JSON arrays shows up as NaN/None in the Series;
#    reject the whole batch and name the affected columns.
missing_columns = [name for name, s in ohlcv_series.items() if s.isnull().any()]
if missing_columns:
    raise ValueError(
        f"Data Invalid: Missing data in columns: {', '.join(missing_columns)}"
    )

# --- Assemble one row per trading day ---------------------------------------
# Turn epoch seconds into calendar dates in the exchange's local timezone
# rather than UTC: a bar stamped late in the UTC day (e.g. an exchange whose
# session opens before 00:00 UTC) would otherwise land on the previous date.
# NOTE(review): assumes the chart `meta` carries `exchangeTimezoneName`
# (e.g. "America/New_York") -- confirm; falls back to UTC when it is absent,
# which matches the previous behaviour.
exchange_tz = (result.get("meta") or {}).get("exchangeTimezoneName") or "UTC"
converted_dates = (
    pd.to_datetime(timestamp, unit="s", utc=True)
    .dt.tz_convert(exchange_tz)
    .dt.date
)

# Many Series -> one pandas DataFrame; the scalar `symbol` is broadcast to
# every row.
df = pd.DataFrame({
    "symbol": symbol,
    "date": converted_dates,
    "low": lows,
    "high": highs,
    "open": opens,
    "close": closes,
    "volume": volumes,
})

# --- Load the rows into PostgreSQL through Spark's JDBC writer --------------
# No explicit schema is passed, so Spark infers column types from the pandas
# DataFrame.
spark_df = spark_session.createDataFrame(df)

# Append into the stock_ohlcs table.  NOTE(review): nothing here
# de-duplicates, so unless the table has a unique constraint (not visible
# here), re-running this cell for an overlapping period inserts the same
# (symbol, date) rows again.
spark_df.write.mode("append").jdbc(
    jdbc_url,
    "stock_ohlcs",
    properties=connection_properties,
)

# Output of print(url):
# https://query1.finance.yahoo.com/v8/finance/chart/MSFT?period1=1657237004&period2=1751931404&interval=1d&events=history


                                                                                