This notebook shows how the transformations for the stock files could have been completed leveraging Spark

In [26]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf      # sf = spark functions
import pyspark.sql.types as st          # st = spark types
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud.exceptions import NotFound
import os

sparkql = pyspark.sql.SparkSession.builder.master('local').getOrCreate()

data_dir = '../data'

file_names = ['ADBE','AMZN', 'CRM', 'CSCO', 'GOOGL', 'IBM','INTC','META','MSFT','NFLX','NVDA','ORCL','TSLA'] #excluded AAPL to start df

columns = 'stock_name string, date string, open float, high float, low float, close float, adj_close float, volume int' #schema to use

df = sparkql.read.csv(os.path.join(data_dir,'AAPL.csv'), header=True)
df = df.toDF('date', 'open', 'high', 'low', 'close', 'adj_close', 'volume') #rename the columns
df = df.withColumn('stock_name', sf.lit('AAPL')) #add column with stock name

#create composite key
df.createOrReplaceTempView("key") 
df = sparkql.sql("SELECT CONCAT(stock_name, date) AS sd_id, stock_name, date, open, high, low, close, adj_close, volume FROM key")
df = df.select('sd_id','stock_name', 'date', 'open', 'high', 'low', 'close', 'adj_close', 'volume')

#create df for each csv, transform it and then consolidate into one dataframe
for csv in file_names:
    idf = sparkql.read.csv(os.path.join(data_dir,csv+'.csv'), header=True)
    idf = idf.toDF('date', 'open', 'high', 'low', 'close', 'adj_close', 'volume') #rename the columns
    idf = idf.withColumn('stock_name', sf.lit(csv)) #add column with stock name

    #create composite key
    idf.createOrReplaceTempView("key") 
    idf = sparkql.sql("SELECT CONCAT(stock_name, date) AS sd_id, stock_name, date, open, high, low, close, adj_close, volume FROM key")
    idf = idf.select('sd_id','stock_name', 'date', 'open', 'high', 'low', 'close', 'adj_close', 'volume')
    
    #concat to df
    df = df.union(idf)

# Write to parquet file. Used coalesce in order to have one parquet file
df.coalesce(1).write.format("parquet").save(os.path.join(data_dir,'all_tech_stocks.parquet'))

Below are sections where I used the Jupyter notebooks to test out the code

In [9]:
import pandas as pd

from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud.exceptions import NotFound
import os
from datetime import datetime

data_dir = '../data'

file_names = ['AAPL','ADBE','AMZN', 'CRM', 'CSCO', 'GOOGL', 'IBM','INTC','META','MSFT','NFLX','NVDA','ORCL','TSLA'] #excluded AAPL to start df
    
old_names = ['Date','Open','High','Low','Close','Adj Close','Volume']
new_names = ['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume']
    
rename_dict = {item[0]:item[1] for item in zip(old_names,new_names)}

li = []

for file in file_names:
    idf = pd.read_csv(os.path.join(data_dir,f'{file}.csv'),header=0)
    #rename the columns
    idf = idf.rename(columns=rename_dict)
    #insert column with the stock name
    idf['date'] = pd.to_datetime(idf['date'], format='%Y-%m-%d')

    idf.insert(0,'day', idf['date'].dt.day)
    idf.insert(0,'month', idf['date'].dt.month)
    idf.insert(0,'year', idf['date'].dt.year)
    
    idf.insert(0,'stock_name', file)
    #insert column with composite key
    idf.insert(0,'sd_id', idf['stock_name']+idf['date'].astype(str))

    print(idf)
    li.append(idf)

df.to_parquet(os.path.join(data_dir,'all_stocks.parquet'))



    



               sd_id stock_name  year  month  day       date        open  \
0     AAPL2010-01-04       AAPL  2010      1    4 2010-01-04    7.622500   
1     AAPL2010-01-05       AAPL  2010      1    5 2010-01-05    7.664286   
2     AAPL2010-01-06       AAPL  2010      1    6 2010-01-06    7.656429   
3     AAPL2010-01-07       AAPL  2010      1    7 2010-01-07    7.562500   
4     AAPL2010-01-08       AAPL  2010      1    8 2010-01-08    7.510714   
...              ...        ...   ...    ...  ...        ...         ...   
3266  AAPL2022-12-22       AAPL  2022     12   22 2022-12-22  134.350006   
3267  AAPL2022-12-23       AAPL  2022     12   23 2022-12-23  130.919998   
3268  AAPL2022-12-27       AAPL  2022     12   27 2022-12-27  131.380005   
3269  AAPL2022-12-28       AAPL  2022     12   28 2022-12-28  129.669998   
3270  AAPL2022-12-29       AAPL  2022     12   29 2022-12-29  127.989998   

            high         low       close   adj_close     volume  
0       7.660714    7

In [27]:
PROJECT_NAME = 'team-week-3'
DATASET_NAME = 'tech_stocks_world_events'

#create bigquery client
client = bigquery.Client()

#create dataset_id and table_ids
dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
table_id = f"{PROJECT_NAME}.{DATASET_NAME}.stocks"

data_dir = '../data/all_tech_stocks.parquet'

#rename the parquet file in order to load to BigQuery
parq = '.snappy.parquet'
for file_name in os.listdir(data_dir):
    source = data_dir + file_name
    if parq in source and '.crc' not in source:
        os.rename(os.path.join(data_dir,file_name),os.path.join(data_dir,'stocks.parquet'))
    
#filepath to get loaded to BigQuery
DATA_FILE = os.path.join(data_dir,'stocks.parquet')

TABLE_SCHEMA = [
    bigquery.SchemaField('sd_id', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('stock_name', 'STRING', mode='NULLABLE'),
    bigquery.SchemaField('date', 'DATE', mode='NULLABLE'),
    bigquery.SchemaField('open', 'FLOAT', mode='NULLABLE'),
    bigquery.SchemaField('high', 'FLOAT', mode='NULLABLE'),
    bigquery.SchemaField('low', 'FLOAT', mode='NULLABLE'),
    bigquery.SchemaField('close', 'FLOAT', mode='NULLABLE'),
    bigquery.SchemaField('adj_close', 'STRING', mode='NULLABLE'),
    bigquery.SchemaField('volume', 'INTEGER', mode='NULLABLE'),
    ]

def create_dataset():
    if client.get_dataset(dataset_id) == NotFound:
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"
        dataset = client.create_dataset(dataset, exists_ok=True)
    else:
        pass

def create_stocks_table():
    job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            autodetect=True,
            create_disposition='CREATE_NEVER',
            write_disposition='WRITE_TRUNCATE',
            ignore_unknown_values=True,
        )
    table = bigquery.Table(table_id, schema=TABLE_SCHEMA)
    table = client.create_table(table, exists_ok=True)

    with open(DATA_FILE, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)

    job.result()

create_dataset()
create_stocks_table()

In [188]:
data_dir = '../data/all_tech_stocks.parquet'
filepath = '*.snappy.parquet'
test = sparkql.read.parquet(os.path.join(data_dir,filepath))

test.show()

+--------------+----------+----------+--------+--------+--------+--------+---------+---------+
|         sd_id|stock_name|      date|    open|    high|     low|   close|adj_close|   volume|
+--------------+----------+----------+--------+--------+--------+--------+---------+---------+
|TSLA2010-06-30|      TSLA|2010-06-30|1.719333|2.028000|1.553333|1.588667| 1.588667|257806500|
|TSLA2010-07-01|      TSLA|2010-07-01|1.666667|1.728000|1.351333|1.464000| 1.464000|123282000|
|TSLA2010-07-02|      TSLA|2010-07-02|1.533333|1.540000|1.247333|1.280000| 1.280000| 77097000|
|TSLA2010-07-06|      TSLA|2010-07-06|1.333333|1.333333|1.055333|1.074000| 1.074000|103003500|
|TSLA2010-07-07|      TSLA|2010-07-07|1.093333|1.108667|0.998667|1.053333| 1.053333|103825500|
|TSLA2010-07-08|      TSLA|2010-07-08|1.076000|1.168000|1.038000|1.164000| 1.164000|115671000|
|TSLA2010-07-09|      TSLA|2010-07-09|1.172000|1.193333|1.103333|1.160000| 1.160000| 60759000|
|TSLA2010-07-12|      TSLA|2010-07-12|1.196667|1.2