Build a pipeline to process stock loan data from several prime brokers, where the datasets consist of columnar data (date, stockid, broker, measure_one, measure_two). The zip file from the broker is called 'dc-test-file-ingestion.zip' and you can assume it is in the FILEPATH (confirm with os.listdir if you'd like). Design a data schema for storing this data, including any raw and derived tables you would like to use. Clean as many data errors as you can find so that the table is useful for machine learning.

Document your assumptions in the code. Your answer should be in the form of a Python function.

Finally, in words, describe a robust ingestion pipeline that can handle such messy data files, and the infrastructure you would need to run this pipeline. What are the possible "real world" scenarios that you need to handle which are outside of the "academic" scenario where clean data comes daily? How would you detect these cases and handle them? Does this list match the data issues you see in the provided sample data? Include this as a comment block in your code.

In [1]:
"""
NOTES
- Only basic data cleaning is performed here. More sophisticated measures, such as checking for missing data using metadata, looking at spikes, and confirming file structures
and data types, should be added once the nuances of the data sets are understood.
- Additional derived stats should include: standard deviation of the measures, a count or display of the distinct stockid values represented over time, and a count or display of the
distinct broker values represented over time.
- In response to describing a robust ingestion pipeline and how to handle different scenarios, I have added my answer to Q6 (data monitoring), as it covers the same aspects.

ASSUMPTIONS
- Apart from dropping NA rows and removing duplicates, we assume the data is clean, timely and correct.
- The zip file contains only one CSV file, and its name is unknown.
- We assume the CSV file is present in the zip archive.
- We assume the 'date' column can be parsed as a datetime, and the 'stockid' and 'broker' columns are strings.

"""

import pandas as pd
import os
from zipfile import ZipFile

def process_stock_loan_data(filepath='FILEPATH'):
    # List the file names in the zip archive (the CSV name is not known in advance)
    # and load the first one into a pandas DataFrame.
    zip_path = os.path.join(filepath, 'dc-test-file-ingestion.zip')
    with ZipFile(zip_path, 'r') as zip_ref:
        file_names = zip_ref.namelist()
        with zip_ref.open(file_names[0]) as file:
            df = pd.read_csv(file)

    # Data Cleaning and Transformation
    df['date'] = pd.to_datetime(df['date'])
    df['stockid'] = df['stockid'].astype(str) 
    df['broker'] = df['broker'].astype(str)

    # Handle missing values by dropping rows with any NaN values.
    df = df.dropna()

    # Handle duplicates by keeping the first occurrence.
    df = df.drop_duplicates()

    # Schema: store the cleaned row-level data in a table named 'raw_data'
    raw_data_table = df.copy()

    # Derived table: first and last date available for measure_one per (stockid, broker) pair
    # (assumed granularity). This can be duplicated for measure_two.
    derived_measure_one_dates = raw_data_table.groupby(['stockid', 'broker']).agg(
        measure_one_first_date=('date', 'min'),
        measure_one_last_date=('date', 'max')
    ).reset_index()

    # Derived table: basic stats for each measure per (date, stockid, broker)
    derived_measures_stats = raw_data_table.groupby(['date', 'stockid', 'broker']).agg(
        measure_one_mean=('measure_one', 'mean'),
        measure_one_min=('measure_one', 'min'),
        measure_one_max=('measure_one', 'max'),
        measure_two_mean=('measure_two', 'mean'),
        measure_two_min=('measure_two', 'min'),
        measure_two_max=('measure_two', 'max')
    ).reset_index()

    # Return the processed data
    return raw_data_table, derived_measures_stats, derived_measure_one_dates


"""
0. Use case

- What: Why is the data being procured?
- Action: Understand the use case, so you can work backward to satisfy it, at a minimum, in the data lifecycle / pipeline setup.

1. Sources

- What: Vendor/source-level qualitative nuances and a data summary.
- Actions: Understand the competitive advantages, use cases, proof-of-concept studies and limitations of the different data vendors from a qualitative
perspective (including choice of symbology, coverage, mapping, any third-party licensing requirements, DDQs, etc.).

2. Structure

- What: How is the data stored? Check for file and schema changes systematically. Is it relational? Would multiple tables be required to form a complete update cycle?
What is the format, e.g. parquet, json, csv?
- Actions: Bucket each inbound data product by the structures offered, and understand whether they are the best options offered per vendor (based on alternatives from
the vendor and any third parties). Aim for a small, optimized number of structures to deal with upstream. Are there any advantages or disadvantages
in taking one type of structure per data product vs the alternatives offered? Look at utilising auto schema detection libraries, or build one. Simple steps are crucial, like
checking for column count changes, column name changes, data type changes, and deviations in the size of updates coming through.
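
The simple structure checks above could be sketched roughly as follows. This is a minimal illustration, not a full schema-detection tool: the names check_schema and EXPECTED_CHECKS, the tolerance parameter and the expected types are all assumptions made for the example.

```python
import pandas as pd
from pandas.api import types as ptypes

# Hypothetical expected schema: column name -> dtype check (assumed for illustration).
EXPECTED_CHECKS = {
    'date': ptypes.is_datetime64_any_dtype,
    'stockid': ptypes.is_string_dtype,
    'broker': ptypes.is_string_dtype,
    'measure_one': ptypes.is_numeric_dtype,
    'measure_two': ptypes.is_numeric_dtype,
}

def check_schema(df, expected=EXPECTED_CHECKS, typical_rows=None, tolerance=0.5):
    # Return a list of human-readable issues; an empty list means no drift was found.
    issues = []
    missing = [c for c in expected if c not in df.columns]
    extra = [c for c in df.columns if c not in expected]
    if missing:
        issues.append(f'missing columns: {missing}')
    if extra:
        issues.append(f'unexpected columns: {extra}')
    # Data type changes on the columns that are present.
    for col, dtype_ok in expected.items():
        if col in df.columns and not dtype_ok(df[col]):
            issues.append(f'{col}: unexpected dtype {df[col].dtype}')
    # Deviation in update size versus a typical row count, if one is known.
    if typical_rows and abs(len(df) - typical_rows) > tolerance * typical_rows:
        issues.append(f'row count {len(df)} deviates from typical {typical_rows}')
    return issues
```

An empty list would let the load proceed; anything else could be routed to an alert before the file reaches storage.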

3. Delivery

- What: How is the data being distributed? E.g. files, SQL/Oracle updates, REST API, Snowflake, or another proprietary third party?
- Actions: Choose the delivery mechanism per data set that balances timeliness, downstream engineering resource requirements, the fewest IT/infrastructure challenges,
and security based on company standards. If only a subset of the data is required, is there a smart API that can be used instead of taking receipt of the entire data set every day?
This should be use-case based, unless you can afford to take the entire dataset and store it (which may make sense in some instances where a data vendor charges extra to revisit history).

4. Reference & Metadata

- What: What reference data, metadata, symbology (IDs) and any other complementary data does each dataset offer?
- Actions: Build the security master and mapping system to accommodate the collective reference data / symbology provided by each of the data sets, so that
upstream use cases across multiple datasets are possible and effective.

5. Storage

- What: How will the data be stored? Will there be multiple levels of storage required based on different use cases? Does it comply with internal data governance? 
- Actions: Balance the different stages of data storage from raw -> data warehouse -> data lake. Which users will be accessing each level? Do you need to implement a
less frequently updated mirror for disaster recovery purposes? Ensure processes and storage uphold data integrity, confidentiality, and compliance with data protection regulations.
I am happy to elaborate on how I have staged data in past data lake projects.

6. Updates

- What: How often does each data product update?
- Actions: Understand how often appends can happen, how often corrections can happen, and how to accommodate both in a timely manner.
Understand the optimal update frequency to satisfy the use case(s). Build a mechanism to take only new data rather than loading all the data every time. Importantly, to preserve point in time,
keep historical corrections of data.
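
One possible way to take only new data while keeping historical corrections is an append-only history table stamped with a load time. The sketch below is an illustration under assumptions: the names append_update, latest_view and loaded_at are hypothetical, the key is assumed to be (date, stockid, broker), and the measures are assumed to be non-null (NaN values would always compare as changed).

```python
import pandas as pd

KEYS = ['date', 'stockid', 'broker']

def latest_view(history, as_of=None):
    # Latest known version of each key, optionally as it was known at a given load time.
    if as_of is not None:
        history = history[history['loaded_at'] <= pd.Timestamp(as_of)]
    return history.sort_values('loaded_at').drop_duplicates(KEYS, keep='last')

def append_update(history, update, loaded_at):
    # Stamp incoming rows with the load time, then append only new or corrected rows.
    update = update.assign(loaded_at=pd.Timestamp(loaded_at))
    merged = update.merge(latest_view(history), on=KEYS, how='left',
                          suffixes=('', '_old'), indicator=True)
    is_new = merged['_merge'] == 'left_only'
    changed = ((merged['measure_one'] != merged['measure_one_old'])
               | (merged['measure_two'] != merged['measure_two_old']))
    delta = merged.loc[is_new | changed, history.columns]
    # Earlier versions are never overwritten, so point-in-time views stay reproducible.
    return pd.concat([history, delta], ignore_index=True)
```

Calling latest_view with an as_of timestamp reproduces what was known at that time, which is the point-in-time property a backtest would rely on.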

7. Sequencing

- What: From a set of data products, do some updates need to precede others? 
- Actions: Here we are looking to build pipelines with independent data sets at the front of the queue (e.g. security master, corporate actions, symbology,
instrument reference), followed by the data sets that attach to the security master system, ordered by priority according to whether the downstream
analytics need real-time or delayed reporting. You should also invest in data orchestration tools to help manage sequencing and alerting of updates (e.g. Apache Airflow).
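
As a small illustration of sequencing, the dependency ordering can be expressed as a graph and sorted topologically with the standard library. The dataset names and dependencies below are hypothetical; an orchestration tool would manage the same idea along with scheduling and alerting.

```python
from graphlib import TopologicalSorter

# Hypothetical data products, each mapped to the data products it depends on.
DEPENDENCIES = {
    'security_master': set(),
    'symbology': set(),
    'corporate_actions': set(),
    'stock_loan': {'security_master', 'symbology'},
    'derived_stats': {'stock_loan', 'corporate_actions'},
}

def load_order(dependencies=DEPENDENCIES):
    # Independent data sets come first; dependents follow once their inputs are loaded.
    return list(TopologicalSorter(dependencies).static_order())
```

A cycle in the dependencies raises graphlib.CycleError, which is a useful early warning that the sequencing has been misconfigured.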

8. Quality & Remediation

- What: Making sure the timeliness, completeness and technical aspects are checked on any new data update.
- Actions: Start with basic technical data checks (types, changes to schema, missing data etc.), then move to data-family-specific tests (e.g. spikes for time series data),
then more sophisticated tests based on the data set (specific nuances). The checks will take longer on the initial history build; after that, apply systematic alerts prioritized by
importance of upstream needs. Alerts can be in real time and/or email-like alerts. Following this, there should be tools and documentation to help remediation.
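
A spike test for time series data might look like the sketch below. It is one option among many: it uses a robust z-score (median and median absolute deviation) per (stockid, broker) pair, and the function name flag_spikes and the threshold of 5 are assumptions chosen for illustration.

```python
import pandas as pd

def flag_spikes(df, column='measure_one', threshold=5.0):
    # Boolean Series marking values far from their (stockid, broker) group median.
    keys = [df['stockid'], df['broker']]
    median = df[column].groupby(keys).transform('median')
    abs_dev = (df[column] - median).abs()
    mad = abs_dev.groupby(keys).transform('median')
    # A zero MAD (constant series) gives no usable scale, so those rows are not flagged.
    mad = mad.replace(0, float('nan'))
    # 1.4826 makes the MAD comparable to a standard deviation for normal data.
    robust_z = abs_dev / (1.4826 * mad)
    return robust_z > threshold
```

Flagged rows could be quarantined for review rather than dropped, so that genuine market moves are not silently removed.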

9. Upstream delivery

- What: What are the internal upstream delivery mechanisms (API vs direct analytics)? Are permissions in place?
- Action: Build out different APIs to allow multiple user types to discover, interact with and use the data. Make sure the right permissions are in place based on
internal regulation and the data usage licensing.

10. Consumer review

- What: Initial understanding and continual review of upstream use of the data.
- Actions: It is key to review the use cases your data and pipelines are feeding. Continually working with upstream users on improvements to timeliness, coverage and quality
can help optimize many of the facets discussed here.

11. Vendor review

- What: Initial understanding (independent and in conjunction with the data vendor's product team) and continual review of the source data.
- Actions: The independent review of data should be done first, without any biases, to understand how the data behaves and how it updates. You can then connect with
the data vendor's product team to understand their SLAs on the data and delivery. Continual, proactive review of the source data will help you keep on top
of changes to the data which may affect the pipeline process. Sign up to alerts for changes in the data published by the source, but also do your own systematic checks independently.

12. Statistics

- What: Understanding data update patterns and changes over time.
- Actions: Use tools and libraries to run periodic tests on data sets and update logs to understand SLAs on data. This is useful for audits and commercial negotiations during contract renewals. Logging
methods should be robust enough to capture errors or anomalies during the lifecycle of the data.

13. Documentation

- What: Notes on pipeline processes. 
- Actions: Create accessible, well-typed, standardized and clearly defined notes on pipelines and remediation. Make sure they are updated on a regular basis.
"""

