### Import libraries

In [1]:
import pandas as pd
import ipywidgets as widgets
from IPython.display import display

import requests

import datetime

### Get an accessible list of stock options from _AlphaVantage_

In [2]:
dropdown_df = pd.read_csv('listing_status.csv')

In [3]:
# Define a dropdown widget
dropdown_widget = widgets.Dropdown(options=dropdown_df['symbol'].unique().tolist())

# Define a function to update data based on selection
def dropdown_update(change):
    display(dropdown_df[dropdown_df['symbol'] == change])

# Connect the function to the dropdown widget
dropdown_widget.observe(dropdown_update, names='value')
# Display the dropdown widget and its corresponding row from the dataframe
display(dropdown_widget)

Dropdown(options=('A', 'AA', 'AAA', 'AAAU', 'AAC', 'AAC-U', 'AAC-WS', 'AACG', 'AACI', 'AACIU', 'AACIW', 'AACT'…

In [4]:
# Display the corresponding row from the dataframe
display(dropdown_df[dropdown_df['symbol'] == dropdown_widget.value])

Unnamed: 0,symbol,name,exchange,assetType,ipoDate,delistingDate,status
0,A,Agilent Technologies Inc,NYSE,Stock,1999-11-18,,Active


### Extract the data from the API to a csv file

In [5]:
demoKey = f"YSX1MOJ9YMQUJ6TI"
date    = datetime.datetime.now().strftime("%Y-%m-%d")
url     = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={dropdown_widget.value}&outputsize=full&apikey={demoKey}&datatype=csv"
fname   = f"{dropdown_widget.value}_Daily_{date}.csv"

In [6]:
# Get the json data
response = requests.get(url)
#data = response.json()
with open(f"Downloads/{fname}", 'wb') as f:
    f.write(response.content)
print(f"Downloaded {fname}: {response.status_code}")

Downloaded A_Daily_2023-07-21.csv: 200


### Clean and process the extracted data using Apache Spark 
- filter out irrelevant data 
- perform transformations and store the resultant data in a data frame.

#### Use apache spark to read the csv file and remove irrelevant columns


In [7]:
# Entry point
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETL").getOrCreate()

# Read the file from the dropdown_widget_value in the Downloads folder
df = spark.read.csv(f"Downloads/{fname}", header=True)
df.show(4)

23/07/21 20:01:53 WARN Utils: Your hostname, QTs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.139 instead (on interface en0)
23/07/21 20:01:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/21 20:01:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+--------------------+
|                   {|
+--------------------+
|    "Information"...|
|                   }|
+--------------------+



In [8]:
# Select the columns of interest from the dataframe df
df_interest = df.select("timestamp", "adjusted_close", "volume")
df_interest.show(8)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `timestamp` cannot be resolved. Did you mean one of the following? [`{`].;
'Project ['timestamp, 'adjusted_close, 'volume]
+- Relation [{#17] csv


23/07/21 20:02:10 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Explaining each column in the data
1. Open: The opening price of the stock for the day
2. High: The highest price of the stock for the day
3. Low: The lowest price of the stock for the day
4. Close: The closing price of the stock for the day
5. Adjusted Close: The closing price of the stock for the day, adjusted for corporate actions
6. Volume: The number of shares traded for the day
7. Dividend Amount: The dividend paid by the company for the day
8. Split Coefficient: The ratio of the stock split. 2 means 2 shares for the cost of 1

Currently, I'm interested in the Adjusted Close and the Volume which can be used to draw a fast conclusion on which stocks are profitable.

I don't feel the need for any transformations of the data. The only "transformations" necessary were filtering the dataframe to get the relevant columns, which has already been done.

However, after looking at the data, I realised spark saves this data as strings so I'll cast them to date, float and interger respectively.

In my minds eye, I see :
- The timestamp to be the x-axis timescale progression
- Adjusted_close can be the y-axis
- The volume can be the color scale so I'll need to trasform this column into a range in which to group these values

Seems like a plan!

In [None]:
from pyspark.sql.functions import when

# Modify the data types of the columns
df_interest = df_interest.withColumn("adjusted_close", df_interest["adjusted_close"].cast("float"))
df_interest = df_interest.withColumn("volume", df_interest["volume"].cast("integer"))
df_interest = df_interest.withColumn("timestamp", df_interest["timestamp"].cast("date"))
# Create a new column from the volume column by splitting the values into 5 equal bins
df_interest = df_interest.withColumn("volume_bins", when(df_interest["volume"] < 1000000, "< 1M")\
                                        .when((df_interest["volume"] >= 1000000) & (df_interest["volume"] < 2000000), "1M - 2M")\
                                        .when((df_interest["volume"] >= 2000000) & (df_interest["volume"] < 3000000), "2M - 3M")\
                                        .when((df_interest["volume"] >= 3000000) & (df_interest["volume"] < 4000000), "3M - 4M")\
                                        .otherwise(">= 4M"))

df_interest.printSchema()

# Convert the spark dataframe to a pandas dataframe
df_pandas = df_interest.toPandas()
df_pandas.head()

root
 |-- timestamp: date (nullable = true)
 |-- adjusted_close: float (nullable = true)
 |-- volume: integer (nullable = true)
 |-- volume_bins: string (nullable = false)



Unnamed: 0,timestamp,adjusted_close,volume,volume_bins
0,2023-07-14,119.75,1225454,1M - 2M
1,2023-07-13,119.349998,1229777,1M - 2M
2,2023-07-12,120.0,1595607,1M - 2M
3,2023-07-11,118.480003,1096749,1M - 2M
4,2023-07-10,118.360001,1889164,1M - 2M


### Create a connection to access Google BigQuery database

In [None]:
from google.cloud import bigquery
from google.oauth2 import service_account
# Create a connection to google bigquery

# Path to your service account key JSON file
service_account_key_path = "etl-1-393004-fd0ba98bf9de.json"

# Create credentials from the service account key
credentials = service_account.Credentials.from_service_account_file(
    service_account_key_path
)

# Create a BigQuery client
client = bigquery.Client(credentials=credentials)


In [None]:
#Perform operations on the client object
# Check for dataset existence
dataset_id  = "stock_market"
dataset_ref = client.dataset(dataset_id)
project_id  = "etl-1-393004"
table_id    = f"{dropdown_widget.value}_daily_{date}"
table_ref   = client.dataset(dataset_id).table(table_id)

try:
    client.get_dataset(dataset_ref)
    print("Dataset {} already exists".format(dataset_id))
except:
    print("Dataset {} is not found".format(dataset_id))
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "US"
    dataset = client.create_dataset(dataset)
    print("Dataset {} created.".format(dataset_id))

Dataset stock_market already exists


In [None]:
# Check for table existence
table_exists = False
try:
    client.get_table(table_ref)
    table_exists = True
    print("Table {} already exists".format(table_id))
except:
    print("Table {} is not found".format(table_id))

# Create or recreate the table if it doesn't exist or if it exists but needs to be recreated
if not table_exists:
    table = bigquery.Table(table_ref)
    table = client.create_table(table)
    print("Table {} created.".format(table_id))
else:
    client.delete_table(table_ref)
    table = bigquery.Table(table_ref)
    table = client.create_table(table)
    print("Table {} recreated.".format(table_id))

Table A_daily_2023-07-17 already exists
Table A_daily_2023-07-17 recreated.


#### Load the processed data into target database table

In [None]:
# Print the table schema
table = client.get_table(table_ref)
# Write the df_pandas dataframe to this BigQuery table
client.load_table_from_dataframe(df_pandas, table_ref).result()

# Check table for data and print the table schema
print("Loaded {} rows and {} columns to {}".format(table.num_rows, len(table.schema), table_ref.path))

# Show the top 5 rows of the table
client.list_rows(table, max_results=5).to_dataframe()

Loaded 0 rows and 0 columns to /projects/etl-1-393004/datasets/stock_market/tables/A_daily_2023-07-17


Unnamed: 0,timestamp,adjusted_close,volume,volume_bins
0,2023-07-03,119.32,996224,< 1M
1,2023-04-24,138.221375,856654,< 1M
2,2023-04-14,138.940033,954261,< 1M
3,2023-03-28,132.166595,942618,< 1M
4,2023-02-15,151.219788,982411,< 1M


In [None]:
# Query the table to extract the data
query = "select * from `etl-1-393004.stock_market.{}`".format(table_id)

# Run the query
query_job = client.query(query)

# print("The query data:\nDate\tAdjusted Close\tVolume")
# for row in query_job:
#     print(f"{row[0]}\t{row[1]:.2f}\t{row[2]}\t{row[3]}")



The query data:
Date	Adjusted Close	Volume


23/07/17 13:36:29 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


2023-07-03	119.32	996224	< 1M
2023-04-24	138.22	856654	< 1M
2023-04-14	138.94	954261	< 1M
2023-03-28	132.17	942618	< 1M
2023-02-15	151.22	982411	< 1M
2023-02-14	152.51	895375	< 1M
2023-02-13	154.06	925379	< 1M
2023-02-10	152.02	934821	< 1M
2023-02-09	151.85	852195	< 1M
2023-02-06	151.71	771456	< 1M
2023-02-03	154.01	992821	< 1M
2023-01-27	155.15	646223	< 1M
2023-01-26	155.68	776264	< 1M
2023-01-25	155.22	736164	< 1M
2023-01-23	158.50	974865	< 1M
2023-01-13	156.37	921756	< 1M
2023-01-12	155.94	809123	< 1M
2022-12-30	149.13	699759	< 1M
2022-12-29	150.34	854392	< 1M
2022-12-28	147.35	784312	< 1M
2022-12-27	148.80	879543	< 1M
2022-12-23	148.49	779426	< 1M
2022-11-29	150.19	954556	< 1M
2022-11-25	156.18	642657	< 1M
2022-11-09	136.32	971246	< 1M
2022-10-28	138.08	901955	< 1M
2022-10-27	135.81	930704	< 1M
2022-08-29	127.26	867312	< 1M
2022-08-25	135.11	969054	< 1M
2022-08-12	132.66	865778	< 1M
2022-08-09	128.02	894724	< 1M
2022-08-05	132.72	675823	< 1M
2022-08-04	132.88	895554	< 1M
2022-07-15