## Pipeline Demo: Daily Stock Market Data Analytics *(Buildout and documentation in progress)*

#### End-to-end pipeline demonstrating the implementation of the following steps:
1. **Collect data from API and store in data lake**: Python, AWS CLI, AWS S3, AWS EC2 *(In progress)*
    * Download list of <a href="https://www.nasdaq.com/screening/companies-by-industry.aspx" target="blank">NASDAQ companies</a>.
    * Select the companies for which to collect market data.
    * Query Daily Adjusted Stock Market Time Series <a href="https://www.alphavantage.co/documentation" target="blank">web service</a> for all specified companies.<br><br>
2. **Transform data in data lake**: Python, PySpark, AWS EMR (Spark) *(Coming soon)*
3. **Import data into analytics columnar database**: AWS Redshift, SQL *(Coming soon)*
4. **Build pipeline orchestration & scheduling engine**: Python, Apache Airflow *(Coming soon)*
5. **Surface data to public using RESTful web API**: Python, Django *(Coming soon)*

### Get configurations.

Create a function that returns a configuration value from a JSON configuration file. The file is expected in the current working directory and must be named `configuration.json`.

In [1]:
def getConfigurationValue(configurationKey):

    # Import packages and functions.
    import json, os

    # Get current working directory.
    currentWorkingDirectory = os.getcwd()

    # Load configuration file.
    with open(os.path.join(currentWorkingDirectory, 'configuration.json'), 'r') as configurationFile:
        dictConfigurations = json.load(configurationFile)

    return(dictConfigurations[configurationKey])

**Test:** Print sample configuration values.

In [2]:
print('NASDAQCompaniesSourceURL:', getConfigurationValue('NASDAQCompaniesSourceURL'))
print('NASDAQCompaniesDestinationPathPart:', getConfigurationValue('NASDAQCompaniesDestinationPathPart'))

NASDAQCompaniesSourceURL: https://www.nasdaq.com/screening/companies-by-industry.aspx?exchange=NASDAQ&render=download
NASDAQCompaniesDestinationPathPart: Data/NASDAQCompanies.csv
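
Because `getConfigurationValue` reopens and reparses the file on every call, a cached loader may be worth considering once many keys are read. The following is a minimal sketch rather than the notebook's implementation: `loadConfigurations`, `getConfigurationValueCached`, and the optional `configurationPath` parameter are hypothetical names introduced here for illustration.

```python
import functools
import json
import os


@functools.lru_cache(maxsize=None)
def loadConfigurations(configurationPath):
    """Parse the JSON configuration file once per path and cache the result."""
    with open(configurationPath, 'r') as configurationFile:
        return json.load(configurationFile)


def getConfigurationValueCached(configurationKey, configurationPath=None):
    """Return one configuration value; the path defaults to ./configuration.json."""
    if configurationPath is None:
        configurationPath = os.path.join(os.getcwd(), 'configuration.json')
    try:
        return loadConfigurations(configurationPath)[configurationKey]
    except KeyError:
        # Re-raise with a message that names the missing key explicitly.
        raise KeyError('Missing configuration key: ' + configurationKey) from None
```

One trade-off of this sketch: edits to the file are not picked up until `loadConfigurations.cache_clear()` is called or the kernel is restarted.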


### Programmatically download list of <a href="https://www.nasdaq.com/screening/companies-by-industry.aspx" target="blank">NASDAQ companies</a>.

Create a function that loads the companies CSV into a Pandas data frame, optionally downloading a fresh copy of the file first.

In [3]:
def getNASDAQCompanies(downloadFile = False):
    # Import package(s)/function(s).
    import os, urllib.request
    import pandas as pd
    
    # Declare variables.

    # Get current working directory.
    currentWorkingDirectory = os.getcwd()

    # Get source URL for NASDAQ companies file.
    NASDAQCompaniesSourceURL = getConfigurationValue('NASDAQCompaniesSourceURL')
    
    # Get destination path part for NASDAQ companies file.
    NASDAQCompaniesDestinationPathPart = getConfigurationValue('NASDAQCompaniesDestinationPathPart')
    
    # Get destination path for NASDAQ companies file.
    NASDAQCompaniesDestinationPath = os.path.abspath(os.path.join(currentWorkingDirectory, NASDAQCompaniesDestinationPathPart))

    # Download data file, if so specified.
    if (downloadFile):
        # Remove destination file, if it exists.
        if(os.path.isfile(NASDAQCompaniesDestinationPath)):
            os.remove(NASDAQCompaniesDestinationPath)

        # Download file to the absolute destination path.
        downloadedFile, downloadedFileHeaders = urllib.request.urlretrieve(NASDAQCompaniesSourceURL,
                                                                           NASDAQCompaniesDestinationPath)

    # Create dfCompanies data frame from NASDAQ companies file.
    dfCompanies = pd.read_csv(NASDAQCompaniesDestinationPath)
    
    # Drop empty last column created as a result of trailing comma.
    dfCompanies.drop(dfCompanies.columns[-1], axis=1, inplace=True)

    return(dfCompanies)

**Test:** Download companies CSV into Pandas data frame.

In [4]:
# dfCompanies = getNASDAQCompanies(downloadFile=True)
dfCompanies = getNASDAQCompanies()

print('dfCompanies count:', len(dfCompanies))

dfCompanies.head(10)

dfCompanies count: 3298


Unnamed: 0,Symbol,Name,LastSale,MarketCap,ADR TSO,IPOyear,Sector,Industry,Summary Quote
0,PIH,"1347 Property Insurance Holdings, Inc.",7.15,42791080.0,,2014.0,Finance,Property-Casualty Insurers,https://www.nasdaq.com/symbol/pih
1,PIHPP,"1347 Property Insurance Holdings, Inc.",24.2,0.0,,,,,https://www.nasdaq.com/symbol/pihpp
2,TURN,180 Degree Capital Corp.,1.9,59130970.0,,,Finance,Finance/Investors Services,https://www.nasdaq.com/symbol/turn
3,FLWS,"1-800 FLOWERS.COM, Inc.",12.05,777670600.0,,1999.0,Consumer Services,Other Specialty Stores,https://www.nasdaq.com/symbol/flws
4,FCCY,1st Constitution Bancorp (NJ),20.2,163868300.0,,,Finance,Savings Institutions,https://www.nasdaq.com/symbol/fccy
5,SRCE,1st Source Corporation,49.46,1329959000.0,,,Finance,Major Banks,https://www.nasdaq.com/symbol/srce
6,VNET,"21Vianet Group, Inc.",7.69,473742100.0,61604954.0,2011.0,Technology,"Computer Software: Programming, Data Processing",https://www.nasdaq.com/symbol/vnet
7,TWOU,"2U, Inc.",85.23,4492967000.0,,2014.0,Technology,Computer Software: Prepackaged Software,https://www.nasdaq.com/symbol/twou
8,JOBS,"51job, Inc.",85.44,3113106000.0,36436171.0,2004.0,Technology,Diversified Commercial Services,https://www.nasdaq.com/symbol/jobs
9,CAFD,8point3 Energy Partners LP,11.72,926919200.0,,2015.0,Public Utilities,Electric Utilities: Central,https://www.nasdaq.com/symbol/cafd
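
Dropping the last column unconditionally assumes the source file always ends each line with a trailing comma. A slightly more defensive variant, sketched below on made-up sample text, removes only columns that pandas auto-named `Unnamed: N` and that contain no values. The helper name `dropEmptyUnnamedColumns` and the quoting of the sample CSV are assumptions for illustration, not details taken from the real download.

```python
import io

import pandas as pd

# Made-up sample mimicking a CSV whose header and rows end with a trailing comma.
sampleCSV = (
    '"Symbol","Name","LastSale",\n'
    '"PIH","1347 Property Insurance Holdings, Inc.","7.15",\n'
    '"TURN","180 Degree Capital Corp.","1.9",\n'
)


def dropEmptyUnnamedColumns(df):
    """Drop columns that pandas auto-named 'Unnamed: N' and that hold no values."""
    emptyUnnamedColumns = [column for column in df.columns
                           if str(column).startswith('Unnamed:') and df[column].isna().all()]
    return df.drop(columns=emptyUnnamedColumns)


dfSample = dropEmptyUnnamedColumns(pd.read_csv(io.StringIO(sampleCSV)))
print(list(dfSample.columns))
```

If the file format ever changes so that the trailing comma disappears, this version leaves the real last column in place.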


Create function to filter companies by Symbol or Name.

In [5]:
def getCompaniesByKeyword(dfCompanies, companyKeyword):
    companyKeyword = companyKeyword.lower()
    # Filter dfCompanies by Symbol or Name based on search argument.
    # Use vectorized string methods: lower() and contains()
    dfCompanies = dfCompanies[dfCompanies.Symbol.str.lower().str.contains(companyKeyword)
                              | dfCompanies.Name.str.lower().str.contains(companyKeyword)]
    
    return(dfCompanies)

**Test:** Filter companies by Symbol or Name.

In [6]:
# Filter by 'technologies'.
dfCompaniesFiltered = getCompaniesByKeyword(dfCompanies, 'technologies')

print('dfCompaniesFiltered count:', len(dfCompaniesFiltered))

dfCompaniesFiltered

dfCompaniesFiltered count: 67


Unnamed: 0,Symbol,Name,LastSale,MarketCap,ADR TSO,IPOyear,Sector,Industry,Summary Quote
50,AEY,"ADDvantage Technologies Group, Inc.",1.34,1.370283e+07,,,Consumer Services,Office Equipment/Supplies/Services,https://www.nasdaq.com/symbol/aey
51,IOTS,Adesto Technologies Corporation,7.25,1.551049e+08,,2015,Technology,Semiconductors,https://www.nasdaq.com/symbol/iots
62,ADVM,"Adverum Biotechnologies, Inc.",6.4,3.979090e+08,,2014,Health Care,Biotechnology: Biological Products (No Diagnos...,https://www.nasdaq.com/symbol/advm
89,AKAM,"Akamai Technologies, Inc.",70.19,1.193452e+10,,1999,Miscellaneous,Business Services,https://www.nasdaq.com/symbol/akam
95,AKTS,"Akoustis Technologies, Inc.",6.64,1.482094e+08,,,Public Utilities,Telecommunications Equipment,https://www.nasdaq.com/symbol/akts
115,AMOT,"Allied Motion Technologies, Inc.",39.08,3.684065e+08,,,Capital Goods,Electrical Products,https://www.nasdaq.com/symbol/amot
153,AETI,"American Electric Technologies, Inc.",1.0213,8.854314e+06,,,Energy,Industrial Machinery/Components,https://www.nasdaq.com/symbol/aeti
200,AGTC,Applied Genetic Technologies Corporation,4.2,7.604058e+07,,2014,Health Care,Biotechnology: Biological Products (No Diagnos...,https://www.nasdaq.com/symbol/agtc
209,AQB,"AquaBounty Technologies, Inc.",3.1,3.905551e+07,,,,,https://www.nasdaq.com/symbol/aqb
306,ACLS,"Axcelis Technologies, Inc.",25.8,8.289235e+08,,2000,Technology,Industrial Machinery/Components,https://www.nasdaq.com/symbol/acls
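
By default, `str.contains` interprets its argument as a regular expression and propagates missing values into the result, so a keyword such as `.com` or a row with an empty Name may behave unexpectedly. The sketch below shows a literal, case-insensitive variant on a made-up data frame; `getCompaniesByLiteralKeyword` is a hypothetical name, and whether literal matching is preferable depends on how the keyword is meant to be used.

```python
import pandas as pd

# Made-up rows; the missing Name stands in for gaps in real data.
dfSample = pd.DataFrame({
    'Symbol': ['FLWS', 'AKAM', 'XYZ'],
    'Name': ['1-800 FLOWERS.COM, Inc.', 'Akamai Technologies, Inc.', None],
})


def getCompaniesByLiteralKeyword(dfCompanies, companyKeyword):
    """Case-insensitive literal substring match on Symbol or Name."""
    # regex=False treats the keyword literally; na=False maps missing values to False.
    matchOptions = dict(case=False, regex=False, na=False)
    mask = (dfCompanies.Symbol.str.contains(companyKeyword, **matchOptions)
            | dfCompanies.Name.str.contains(companyKeyword, **matchOptions))
    return dfCompanies[mask]


print(getCompaniesByLiteralKeyword(dfSample, '.com').Symbol.tolist())
```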


### Query Alpha Vantage's Daily Adjusted Stock Market Time Series <a href="https://www.alphavantage.co/documentation/#dailyadj" target="blank">web service</a> for all specified companies.

Create a function that queries the web service and loads the daily adjusted time series for the specified stock symbol into a Pandas data frame. Optionally, save the service response as a JSON file.

In [7]:
def getAlphaAdvantageDailyAdjustedStockMarketTimeSeries(ticker, downloadFile = False):
    # Import package(s)/function(s).
    import os, json, requests
    import pandas as pd
    
    # Declare variables.

    # Get current working directory.
    currentWorkingDirectory = os.getcwd()

    # Get API Key.
    APIKey = getConfigurationValue('AlphaVantageAPIKey')
    
    # Get service request URL template.
    serviceRequestURLTemplate = getConfigurationValue('AlphaVantageTimeSeriesDailyAdjustedServiceRequestURLTemplate')
    
    # Get service request URL.
    serviceRequestURL = serviceRequestURLTemplate.replace('$symbol$', ticker).replace('$apiKey$', APIKey)
    
    # Get destination path part.
    destinationPathPart = getConfigurationValue('AlphaAdvantageTimeSeriesDailyAdjustedDestinationPathPart')

    # Get destination path.
    destinationPath = os.path.abspath(os.path.join(currentWorkingDirectory, destinationPathPart, ticker + '.json'))

    # Issue request to service. Get response.
    response = requests.get(serviceRequestURL)
    
    # Convert response to JSON format.
    JSONResponse = response.json()

    # Get time series dictionary from JSON.
    dictResponse = JSONResponse['Time Series (Daily)']
    
    # Specify column order to use for data frame.
    columns = ['open', 'high', 'low', 'close', 'adjustedClose', 'volume', 'dividend', 'split']
    
    # Construct data frame from dictionary.
    # Rename columns.
    # Use specified column order.
    dfMarketData = (pd.DataFrame
                    .from_dict(dictResponse, orient='index')
                    .rename(columns={'1. open':'open',
                                     '2. high':'high',
                                     '3. low':'low',
                                     '4. close':'close',
                                     '5. adjusted close':'adjustedClose',
                                     '6. volume':'volume',
                                     '7. dividend amount':'dividend',
                                     '8. split coefficient':'split'})
                   [columns])
    
    # Download data file, if so specified.
    if (downloadFile):
        # Remove destination file, if it exists.
        if(os.path.isfile(destinationPath)):
            os.remove(destinationPath)
            
        with open(destinationPath, 'w') as fileDestinationPath:
            json.dump(JSONResponse, fileDestinationPath, ensure_ascii=False, indent=0, sort_keys=True)

    # Return Pandas data frame.
    return(dfMarketData)
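
The request URL is built by replacing the `$symbol$` and `$apiKey$` placeholders in a template read from `configuration.json`. The template itself is not shown in this notebook, so the value below is an assumption based on the public Alpha Vantage query format, and `buildServiceRequestURL` is a hypothetical helper. The sketch also percent-encodes both values, which the function above does not do.

```python
from urllib.parse import quote

# Assumed template: the real value lives in configuration.json and is not shown here.
assumedTemplate = ('https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED'
                   '&symbol=$symbol$&apikey=$apiKey$')


def buildServiceRequestURL(template, ticker, apiKey):
    """Fill the $symbol$ and $apiKey$ placeholders, percent-encoding both values."""
    return (template
            .replace('$symbol$', quote(ticker, safe=''))
            .replace('$apiKey$', quote(apiKey, safe='')))


print(buildServiceRequestURL(assumedTemplate, 'AMZN', 'demo'))
```

Encoding matters only for values that contain reserved characters such as `&` or spaces; plain tickers like `AMZN` pass through unchanged.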

**Test:** Query web service to get daily adjusted time series for Amazon (`AMZN`).

In [8]:
#getAlphaAdvantageDailyAdjustedStockMarketTimeSeries('AMZN', downloadFile=True)
getAlphaAdvantageDailyAdjustedStockMarketTimeSeries('AMZN').head(10)

Unnamed: 0,open,high,low,close,adjustedClose,volume,dividend,split
2000-01-03,81.5,89.56,80.0,89.38,89.38,16117600,0.0,1.0
2000-01-04,85.38,91.5,81.75,81.94,81.94,17487400,0.0,1.0
2000-01-05,70.75,75.13,69.63,71.75,71.75,38457400,0.0,1.0
2000-01-06,71.31,72.69,64.0,65.56,65.56,18752000,0.0,1.0
2000-01-07,67.0,70.5,66.19,69.56,69.56,10505400,0.0,1.0
2000-01-10,72.56,72.63,65.56,69.19,69.19,14757900,0.0,1.0
2000-01-11,66.88,70.0,65.0,66.75,66.75,10532700,0.0,1.0
2000-01-12,67.88,68.0,63.0,63.56,63.56,10804500,0.0,1.0
2000-01-13,64.94,67.19,63.13,65.94,65.94,10448100,0.0,1.0
2000-01-14,66.75,68.5,64.0,64.25,64.25,6853600,0.0,1.0
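
The lookup `JSONResponse['Time Series (Daily)']` raises a bare `KeyError` whenever the service answers with something other than a time series, for example for an unknown symbol or a throttled request. A minimal sketch of a more descriptive check follows. `getDailyTimeSeries` is a hypothetical helper, and the diagnostic key names `Error Message`, `Note`, and `Information` are assumptions about the payload rather than something confirmed in this notebook.

```python
def getDailyTimeSeries(JSONResponse):
    """Return the daily time series dictionary or raise a descriptive error."""
    timeSeriesKey = 'Time Series (Daily)'
    if timeSeriesKey in JSONResponse:
        return JSONResponse[timeSeriesKey]

    # Assumed diagnostic keys; adjust to whatever the service actually returns.
    for diagnosticKey in ('Error Message', 'Note', 'Information'):
        if diagnosticKey in JSONResponse:
            raise ValueError(diagnosticKey + ': ' + str(JSONResponse[diagnosticKey]))

    # Fall back to listing the keys that were present, to aid debugging.
    raise ValueError('Unexpected response keys: ' + ', '.join(sorted(JSONResponse)))
```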


**Test:** Query web service to get daily adjusted time series for Google (`GOOG`).

In [9]:
#getAlphaAdvantageDailyAdjustedStockMarketTimeSeries('GOOG', downloadFile=True)
getAlphaAdvantageDailyAdjustedStockMarketTimeSeries('GOOG').head(10)

Unnamed: 0,open,high,low,close,adjustedClose,volume,dividend,split
2014-03-27,568.0,568.0,552.92,558.46,558.46,13100,0.0,1.0
2014-03-28,561.2,566.43,558.67,559.99,559.99,41100,0.0,1.0
2014-03-31,566.89,567.0,556.93,556.97,556.97,10800,0.0,1.0
2014-04-01,558.71,568.45,558.71,567.16,567.16,7900,0.0,1.0
2014-04-02,565.106,604.83,562.19,567.0,567.0,146700,0.0,1.0
2014-04-03,569.85,587.28,564.13,569.74,569.74,5085200,0.0,1.0
2014-04-04,574.65,577.77,543.0,543.14,543.14,6351900,0.0,1.0
2014-04-07,540.74,548.48,527.15,538.15,538.15,4389600,0.0,1.0
2014-04-08,542.6,555.0,541.61,554.9,554.9,3142600,0.0,1.0
2014-04-09,559.62,565.37,552.95,564.14,564.14,3321700,0.0,1.0
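
The tests above query one symbol at a time. Collecting data for all selected companies could be sketched as a loop that pauses between requests and records failures instead of stopping at the first one. `collectTimeSeries` is a hypothetical helper, and the default `delaySeconds` value is a placeholder assumption; the appropriate pause depends on the rate limit of the API key in use.

```python
import time


def collectTimeSeries(symbols, fetch, delaySeconds=12.0, sleep=time.sleep):
    """Call fetch(symbol) for each symbol, pausing between calls; collect failures."""
    results, failures = {}, {}
    for position, symbol in enumerate(symbols):
        if position > 0:
            sleep(delaySeconds)  # Pause between requests, not before the first one.
        try:
            results[symbol] = fetch(symbol)
        except Exception as error:  # Keep going; report failed symbols at the end.
            failures[symbol] = error
    return results, failures
```

With the functions defined in this notebook, a call might look like `results, failures = collectTimeSeries(dfCompaniesFiltered.Symbol, getAlphaAdvantageDailyAdjustedStockMarketTimeSeries)`. Passing `sleep` as a parameter keeps the loop easy to test without real waiting.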
