## Data Pipeline Overview

### Key Features:

To me, a successful data pipeline should do the following:

1) Automate data extraction and wrangling. The work is done upfront.  Once code is final, little work is required to maintain it, depending on the data sources.  

2) Produce reusable code so others can access it. Custom python data pull and data wrangle modules make a user friendly API.  Creating useful paramters makes working with data much easier for analysis.  This is my preferred method for databases as well.

3) Produce datasets that are optimized for aggregations. In other words, datasets that can be loaded and used for analysis in other applications. I prefer to work with single flat files or data frame objects if resources permit.  Once DataPull functions are written and automated, the focus is primarily on doing analysis.  For this exercise, I automated the extraction and wrangling, and produced flat files that were exported for use in Tableau.  In general, this workflow is used in most data pipelines.  





https://public.tableau.com/app/profile/paul.witt2290/viz/FREDDASHBOARD/M1TimeSeries

For this exercise, I built several custom python modules. These modules could be made available on gitlabs.  


### Documentation
DataPull:
class DataPull.pull_data()

Pulls data from FRED API endpoints.  Automatically loops through each endpoint, pulls all available data, appends and merges them together as appropriate.  The result is one raw dataset that can be used for more in-depth analysis, including aggregations, data quality checks, and model inputs.  Also useful for stakeholders who just want raw data.

returns dataframe object of raw data.    

DataWrangle:

Cleans, formats and does other custom aggregations to the data pulled from FRED.  Cleans column headings, creates year and month columns.  This module is created separate from DataPull because there will be a continual need to add new functions in the future.  It is also good to keep raw data in its original format to troubleshoot and understand potential data issues.  
functions:

#### clean_data(data)
accepts dataframe object.  Cleans column headings, creates year and month columns.  


#### DQ_H6Measure(data,frequency,season_adj_short,year)

Parameters:
data: DataFrame object.

frequency: string - frequency of data requested: 'Monthly', 'Yearly', etc.

season_adj_short: string- provide seasonal adjustment parameter i.e. "NSA"

year: calendar year, default is None. Will return all years.

# Automation

For automation, I use the python schedule module.  It allows for scheduling jobs at time intervals of the developers choosing. https://schedule.readthedocs.io/en/stable/

The automation script incorporates different checks to help troubleshoot potential data issues.  Larger pulls can be scheduled at night or early morning.  Runs from terminal on windows, mac or linux

![alt text](https://github.com/pbwitt/Federal-Reserve-Econiomic-Data-Pipeline/blob/main/Automation%20Terminal%20.png)





In [None]:
from IPython.display import Image

Image(url="money-supply.jpeg")

In [None]:
import xmltodict
import requests
#df = pd.DataFrame(dict_data, columns=dict_data.keys())

#observations=pd.DataFrame(dict_data.get('observations', {}).get('observation'))
#observations['series_id']=str(v)

#appended_data_2.append(observations)

In [None]:
series_id_money=['MDLNM','CURRNS','M1NS']
appended_data_2 = []
import pandas as pd
try:
    for x ,v  in enumerate(series_id_money):
                #print(len(unique_series_id_list))
        #print('https://api.stlouisfed.org/fred/series/observations?series_id='+v+'&api_key=d0640df392d50e841ab2e3c22bf258ed')
        obervations = requests.get('https://api.stlouisfed.org/fred/series/observations?series_id='+v+'&api_key=d0640df392d50e841ab2e3c22bf258ed')
        

        dict_data = xmltodict.parse(obervations.content)
        df = pd.DataFrame(dict_data, columns=dict_data.keys())

        observations=pd.DataFrame(dict_data.get('observations', {}).get('observation'))
        observations['series_id']=str(v)

        appended_data_2.append(observations)
        #print(appended_data_2)
        
except:
    print(str(v) + " this series id did not read")
    pass

observations_data = pd.concat(appended_data_2)       
        

In [None]:
observations_data.series_id.unique()

# Data Wrangling and Analysis



### H.6 Money Stock Measures Data Requirements

An Analysis presented to the Federal Reserve Board 

Tasks to be completed:

Using the St. Louis FRED API, pull data on monthly, non-seasonally adjusted M1 and its sub-components from January 2022 through August 2022, inclusive. 

M1 is one of two money stock measures MPOA publishes on the H.6 statistical release.
The sub-components of M1 include currency, demand deposits, and other liquid deposits.

Data on monthly, non-seasonally adjusted M1 and its sub-components can be referenced from FRED’s H.6 Money Stock Measure, Release Table 3 (https://fred.stlouisfed.org/release/tables?rid=21&eid=1217611). 

Calculate M1 for each month in the above range by summing, for a given month, the M1 sub-components. 
Create an output CSV with the following information:


Col 1:  Date of M1 observation in yyyy-mm format

Col 2:  Pulled monthly, non-seasonally adjusted M1 value in $billions from FRED

Col 3:  Calculated monthly, non-seasonally adjusted M1 value in $billions constructed from M1 sub-components pulled from FRED

Col 4:  Difference between column 2 and column 3 values in $billions

We are expecting an output CSV containing nine rows (one row of labels and eight rows of M1 data) and four columns.  An example of the expected output for December 2021 would look like: 

date, pulled_m1, calculated_m1, difference

In [1]:
import DataPull as dp
import pandas as pd

In [2]:
fetch=dp.DataPull()

DataPull Class Invoked. This is a draft demonstration. Author Paul Witt.  Original Work.


In [3]:
result_set=fetch.data_pull()

pulling all releases
pulling observations
                   observations
@count                      765
@file_type                  xml
@limit                   100000
@observation_end     9999-12-31
@observation_start   1600-01-01
                   observations
@count                      765
@file_type                  xml
@limit                   100000
@observation_end     9999-12-31
@observation_start   1600-01-01
                   observations
@count                       29
@file_type                  xml
@limit                   100000
@observation_end     9999-12-31
@observation_start   1600-01-01
                   observations
@count                      909
@file_type                  xml
@limit                   100000
@observation_end     9999-12-31
@observation_start   1600-01-01
finished observations...merging datasets


In [4]:
result_set.series_id.unique()

array(['M1NS', 'DEMDEPNS', 'MDLNM', 'CURRNS'], dtype=object)

In [5]:
result_set

Unnamed: 0,@realtime_start,@realtime_end,@date,@value,series_id,id_x,realtime_start,realtime_end,title,observation_start,...,units_short,seasonal_adjustment,seasonal_adjustment_short,last_updated,popularity,group_popularity,notes,release_id,id_y,name
0,2022-10-26,2022-10-26,1959-01-01,142.2,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
1,2022-10-26,2022-10-26,1959-02-01,139.3,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
2,2022-10-26,2022-10-26,1959-03-01,138.4,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
3,2022-10-26,2022-10-26,1959-04-01,139.7,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
4,2022-10-26,2022-10-26,1959-05-01,138.7,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2463,2022-10-26,2022-10-26,2022-05-01,2180.9,CURRNS,CURRNS,2022-10-26,2022-10-26,Currency Component of M1,1947-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:18-05,40,61,"The currency component of M1, sometimes called...",21,21,H.6 Money Stock Measures
2464,2022-10-26,2022-10-26,2022-06-01,2184.1,CURRNS,CURRNS,2022-10-26,2022-10-26,Currency Component of M1,1947-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:18-05,40,61,"The currency component of M1, sometimes called...",21,21,H.6 Money Stock Measures
2465,2022-10-26,2022-10-26,2022-07-01,2185.4,CURRNS,CURRNS,2022-10-26,2022-10-26,Currency Component of M1,1947-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:18-05,40,61,"The currency component of M1, sometimes called...",21,21,H.6 Money Stock Measures
2466,2022-10-26,2022-10-26,2022-08-01,2186.2,CURRNS,CURRNS,2022-10-26,2022-10-26,Currency Component of M1,1947-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:18-05,40,61,"The currency component of M1, sometimes called...",21,21,H.6 Money Stock Measures


In [6]:
result_set[result_set.series_id=='M1NS'].head()

Unnamed: 0,@realtime_start,@realtime_end,@date,@value,series_id,id_x,realtime_start,realtime_end,title,observation_start,...,units_short,seasonal_adjustment,seasonal_adjustment_short,last_updated,popularity,group_popularity,notes,release_id,id_y,name
0,2022-10-26,2022-10-26,1959-01-01,142.2,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
1,2022-10-26,2022-10-26,1959-02-01,139.3,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
2,2022-10-26,2022-10-26,1959-03-01,138.4,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
3,2022-10-26,2022-10-26,1959-04-01,139.7,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures
4,2022-10-26,2022-10-26,1959-05-01,138.7,M1NS,M1NS,2022-10-26,2022-10-26,M1,1959-01-01,...,Bil. of $,Not Seasonally Adjusted,NSA,2022-10-25 14:23:12-05,55,89,"Before May 2020, M1 consists of (1) currency o...",21,21,H.6 Money Stock Measures


In [7]:
series_id_money=['DEMDEPNS','MDLNM','CURRNS','M1NS']

In [None]:
#result_set.to_csv('../result_set.csv')

In [None]:
#result_set=pd.read_csv('../result_set.csv')

# Explore Data

Pull in the data wrangle module.  These modules are reuseable and can alway be applied to the data pulls.  

In [10]:
import importlib
import DataWrangle as DW

In [11]:
series=result_set['series_id'].unique().tolist()

In [12]:
#importlib.reload(DW)

Apply data clean module. Adds Month Year.  Should chage data types as well.  
There are records with non-integers in the vlaue column.  Requires more work to clean 

In [13]:
#before 
result_set.columns

Index(['@realtime_start', '@realtime_end', '@date', '@value', 'series_id',
       'id_x', 'realtime_start', 'realtime_end', 'title', 'observation_start',
       'observation_end', 'frequency', 'frequency_short', 'units',
       'units_short', 'seasonal_adjustment', 'seasonal_adjustment_short',
       'last_updated', 'popularity', 'group_popularity', 'notes', 'release_id',
       'id_y', 'name'],
      dtype='object')

In [14]:
#After
DW.DataWrangle.clean_data(result_set)

In [15]:
##apply data quality function. Takes in parameters that allows the logic to be applied to series from Fred. 
## Lots more could be done to customize this function.  # I call it a data quality function because I assume that is \
# what it is trying to achieve.  

In [16]:
## The function accepts parameters. very convient for others to use.  Very useful for filtering data.  
# Currently fuction only filters on NSA but that could be altered.   
results_2022=DW.DataWrangle.DQ_H6Measure(result_set,'Monthly','NSA',2022)

run DQ measure


In [17]:
results_2022

Unnamed: 0,date,Currency Component of M1,Demand Deposits,Other Liquid Deposits: Total,calculated_m1,pulled_m1,difference
0,2022-01-01,2134.1,4758.0,13657.3,20549.4,20549.3,0.1
1,2022-02-01,2142.3,4691.7,13694.0,20528.0,20528.1,-0.1
2,2022-03-01,2165.3,4811.9,13823.6,20800.8,20800.8,0.0
3,2022-04-01,2175.7,4878.8,13765.5,20820.0,20820.0,0.0
4,2022-05-01,2180.9,4934.5,13429.5,20544.9,20544.8,0.1
5,2022-06-01,2184.1,4949.4,13413.6,20547.1,20547.1,0.0
6,2022-07-01,2185.4,4965.8,13337.1,20488.3,20488.3,0.0
7,2022-08-01,2186.2,5189.7,13024.1,20400.0,20400.0,0.0
8,2022-09-01,2188.5,5119.5,12941.4,20249.4,20249.4,0.0


In [18]:
results_2022[['date','calculated_m1','pulled_m1','difference']]

Unnamed: 0,date,calculated_m1,pulled_m1,difference
0,2022-01-01,20549.4,20549.3,0.1
1,2022-02-01,20528.0,20528.1,-0.1
2,2022-03-01,20800.8,20800.8,0.0
3,2022-04-01,20820.0,20820.0,0.0
4,2022-05-01,20544.9,20544.8,0.1
5,2022-06-01,20547.1,20547.1,0.0
6,2022-07-01,20488.3,20488.3,0.0
7,2022-08-01,20400.0,20400.0,0.0
8,2022-09-01,20249.4,20249.4,0.0


In [None]:
##Pull all years.  This is a complete dataset.  
#The complete dataset can be sent to Tableau, models or other exporation software. 
DW.DataWrangle.DQ_H6Measure(result_set,'Monthly','NSA')

In [None]:
#DW.DataWrangle.DQ_H6Measure(result_set,'Monthly','NSA').to_csv("../complete_dataset.csv")


# Data Exploration Tableau

https://public.tableau.com/views/FREDDASHBOARDM1DataQualityMeasure/MoneyStockMeasureDash?:language=en-US&:display_count=n&:origin=viz_share_link

<div class='tableauPlaceholder' id='viz1666747305354' style='position: relative'><noscript><a href='#'><img alt='Federal Reserve Board Economic Data (FRED) ' src='https:&#47;&#47;public.tableau.com&#47;static&#47;images&#47;FR&#47;FREDDASHBOARDM1DataQualityMeasure&#47;MoneyStockMeasureDash&#47;1_rss.png' style='border: none' /></a></noscript><object class='tableauViz'  style='display:none;'><param name='host_url' value='https%3A%2F%2Fpublic.tableau.com%2F' /> <param name='embed_code_version' value='3' /> <param name='site_root' value='' /><param name='name' value='FREDDASHBOARDM1DataQualityMeasure&#47;MoneyStockMeasureDash' /><param name='tabs' value='no' /><param name='toolbar' value='yes' /><param name='static_image' value='https:&#47;&#47;public.tableau.com&#47;static&#47;images&#47;FR&#47;FREDDASHBOARDM1DataQualityMeasure&#47;MoneyStockMeasureDash&#47;1.png' /> <param name='animate_transition' value='yes' /><param name='display_static_image' value='yes' /><param name='display_spinner' value='yes' /><param name='display_overlay' value='yes' /><param name='display_count' value='yes' /><param name='language' value='en-US' /></object></div> 