In [1]:
import pandas as pd
from matplotlib import pyplot as plt
import warnings
warnings.filterwarnings('ignore')
import calendar

In [2]:
ALL_AUCTION_DATA = 'psa_card/data/allauctionprices.csv'
TRANSACTION_DATA = 'psa_card/data/transaction.csv'
POPULATION_DATA = 'psa_card/data/population_report.csv'
CERTIFICATE_DATA = 'psa_card/data/certificate.csv'

### Step 1 : Parse all available auction data and store it in file path: ALL_AUCTION_DATA

In [3]:
all_auction_data = pd.read_csv(ALL_AUCTION_DATA, dtype=str)
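# Strip surrounding whitespace from every (string) column
all_auction_data[all_auction_data.columns] = all_auction_data.apply(lambda x: x.str.strip())
# Generate the year column from the first four characters of the name
all_auction_data['year'] = all_auction_data['name'].str[:4].astype('int')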

FileNotFoundError: [Errno 2] No such file or directory: 'psa_card/data/allauctionprices.csv'

In [None]:
# Visualizing auction data - head
all_auction_data.head(10)

In [None]:
# Visualizing auction data - tail
all_auction_data.tail(10)

In [None]:
# Record Aggregation across years
all_auction_data_grouped_year = all_auction_data.groupby('year').size().reset_index(name='count')

plt.figure(figsize=(15, 5))
plt.bar(all_auction_data_grouped_year['year'], all_auction_data_grouped_year['count'])
plt.xlabel('year')
plt.ylabel('Total Auction Listings')
plt.title('Auction listings across years')
plt.show()

### Discussion:
The auction listings include a few older collections from 1948 and 1971, while the majority of listings fall between 1990 and 1994.

### Step 2 : Collect prices and population data for each listing
Using the parsed URL (e.g. https://www.psacard.com/auctionprices/basketball-cards/1991-little-basketball-big-leaguers/larry-bird/summary/3232640), our downstream task collects the dollar amounts across grades for a collection identified by year, league, and player. In the example URL above, the year is 1991, the league is LITTLE BASKETBALL BIG LEAGUERS, and the athlete is Larry Bird. Two specific scenarios arise for each row of card grades:

(A) The dollar value of the card is reported through metrics such as Most Recent Price ($), Average Price ($), and SMR Price ($), alongside 'Population' and 'Pop Higher'. Each dollar-value metric (marked with a dollar sign) may be empty, or may hold a value with or without an outward link. For those with outward links, we can retrieve fine-grained details for a given basketball player across grades.

(B) For the population, we can retrieve the available cards across leagues and sets. The card number from this page is important for our downstream tasks.

I have modeled A and B as two RDBMS tables: the transaction table for (A) and the population_report table for (B).
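The way a listing URL encodes year, league, and player can be sketched as below. This is a minimal illustration, not the crawler's actual code: the function name `parse_summary_url` and the returned field names are hypothetical, and the path layout is assumed from the single example URL above.

```python
from urllib.parse import urlparse


def parse_summary_url(url):
    """Split an auction-price summary URL into its labelled parts.

    Assumed path layout (inferred from one example URL only):
    /auctionprices/<category>/<year>-<league-slug>/<player-slug>/summary/<id>
    """
    parts = urlparse(url).path.strip('/').split('/')
    _, category, year_and_league, player, _, item_id = parts
    # The first hyphen separates the year from the league slug.
    year, _, league_slug = year_and_league.partition('-')
    return {
        'category': category,
        'year': int(year),
        'league': league_slug.replace('-', ' ').upper(),
        'player': player.replace('-', ' ').title(),
        'item_id': item_id,
    }


example = parse_summary_url(
    'https://www.psacard.com/auctionprices/basketball-cards/'
    '1991-little-basketball-big-leaguers/larry-bird/summary/3232640'
)
print(example)
```

URLs that do not follow this six-segment layout would raise a `ValueError` on unpacking, which is arguably the safer behavior for a crawler that should notice page-structure changes.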

In [None]:
population_report = pd.read_csv(POPULATION_DATA, dtype=str)
population_report[population_report.columns] = population_report.apply(lambda x: x.str.strip())

In [None]:
# Visualizing head of the Population report table
population_report.head(10)

In [None]:
# Visualizing tail of the Population report table
population_report.tail(10)

### Discussions
The population report table above gathers every piece of information available from the outward link (e.g. https://www.psacard.com/pop/basketball-cards/1991/little-basketball-big-leaguers/177415) on the summary page. The foreign key columns `{fk_name, fk_url, fk_count, fk_category}` are inherited from the parent table and can be used to join back to the `all_auction_data` table. Furthermore, `population_id` in this table is sometimes shared among multiple players, i.e., two or more rows can have the same `population_id`, mirroring the state of the page they were crawled from. The quantitative columns {1, ..., 10, Total} carry the numeric score attached to the Grade, +, and Q variables. They can be spread out into separate columns for easier analytical computation in exchange for increased space, or they can be split out into a different table. The reasoning behind this choice is covered in the summary discussion, which reflects my thinking on distributed system design and denormalized data.
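The join back to the parent table could look like the sketch below. It uses toy rows rather than the crawled data, and it assumes the parent columns are named `name`, `url`, `count`, and `category`; only `name` is confirmed by the notebook, the other three are inferred from the `fk_` prefixes.

```python
import pandas as pd

# Toy parent rows. Column names other than `name` are assumptions.
auctions = pd.DataFrame({
    'name': ['1991 Little Basketball Big Leaguers', '1990 Fleer'],
    'url': ['/auctionprices/a', '/auctionprices/b'],
    'count': ['12', '30'],
    'category': ['basketball-cards', 'basketball-cards'],
})

# Toy child rows: two players sharing one population_id.
population = pd.DataFrame({
    'population_id': ['177415', '177415'],
    'fk_name': ['1991 Little Basketball Big Leaguers'] * 2,
    'fk_url': ['/auctionprices/a'] * 2,
    'fk_count': ['12'] * 2,
    'fk_category': ['basketball-cards'] * 2,
})

fk_cols = ['fk_name', 'fk_url', 'fk_count', 'fk_category']
parent_cols = ['name', 'url', 'count', 'category']

joined = population.merge(
    auctions,
    left_on=fk_cols,
    right_on=parent_cols,
    how='left',               # keep every population row
    validate='many_to_one',   # fail loudly if the parent key is not unique
    indicator=True,           # adds a `_merge` column for orphan checks
)
orphans = joined[joined['_merge'] == 'left_only']
print(len(joined), len(orphans))
```

`validate='many_to_one'` is a cheap guard here: if a re-crawl ever duplicated a parent row, the merge would raise instead of silently multiplying population rows.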

In [None]:
transaction = pd.read_csv(TRANSACTION_DATA, dtype=str)
transaction[transaction.columns] = transaction.apply(lambda x: x.str.strip())

In [None]:
# Visualizing head of the Transaction table
transaction.head(10)

In [None]:
# Visualizing tail of the Transaction table
transaction.tail(10)

### Discussions
The transaction table captures every bit of information from the outward link describing the fine-grained details of transactions across grades. The page associates a given transaction with multiple certificates (each with its own certificate id) across grades. The columns `{fk_name, fk_url, fk_count, fk_category}` are treated as a foreign key, which makes it possible to join back to both `population_report` and `all_auction_data`. The `cert` column carries the unique certificate id, which is in turn crawled downstream.

In [None]:
certificate = pd.read_csv(CERTIFICATE_DATA, dtype=str)
certificate[certificate.columns] = certificate.apply(lambda x: x.str.strip())

In [None]:
# Visualizing head of the certificate table 
certificate.head(10)

In [None]:
# Visualizing tail of the certificate table 
certificate.tail(10)

### Discussions: 
The certificate table contains fine-grained certificate information and can be tied back to the upstream transaction table using the `cert` column. Quantitative columns such as `price` can be converted to dollars using the dataframe transform below or later, in the analytical storage layer. The data model has been kept highly flexible and can be re-modeled for any of the tables during ETL or migration to analytical data warehouses.
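Tying certificates back to transactions through `cert` might look like the following sketch. The rows are toy data, the `grade` column is an assumption, and the price strings (including the thousands separator) are made up for illustration; the real `price` format may differ.

```python
import pandas as pd

# Toy rows; `cert`, `player`, and `price` appear in the notebook,
# `grade` and all values are illustrative assumptions.
transaction = pd.DataFrame({
    'cert': ['001', '002', '003'],
    'grade': ['9', '10', '8'],
})
certificate = pd.DataFrame({
    'cert': ['001', '002'],
    'player': ['Larry Bird', 'Larry Bird'],
    'price': ['$1,250.00', '$80.50'],
})

# Left join keeps transactions whose certificate has not been crawled yet.
merged = transaction.merge(certificate, on='cert', how='left')

# Remove '$' and ',' with an explicit regex character class, then coerce
# anything unparseable (including missing prices) to NaN.
merged['price_in_dollars'] = pd.to_numeric(
    merged['price'].str.replace(r'[$,]', '', regex=True),
    errors='coerce',
)
missing = int(merged['price_in_dollars'].isna().sum())
print(merged[['cert', 'player', 'price_in_dollars']], missing)
```

Counting the rows with a missing price after the join gives a quick measure of how many certificates the downstream crawl has yet to cover.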

In [None]:
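# Remove the dollar sign as a literal character (regex=False); under regex
# matching, '$' is an end-of-string anchor and the sign would stay in place.
certificate['price_in_dollars'] = certificate['price'].str.replace('$', '', regex=False).astype('float')
# Total dollar amount per player, largest first
certificate_player_contribution = (
    certificate[['player', 'price_in_dollars']]
    .groupby('player').sum()
    .reset_index()
    .sort_values('price_in_dollars', ascending=False)
)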
plt.figure(figsize=(15, 5))
plt.bar(certificate_player_contribution['player'], certificate_player_contribution['price_in_dollars'])
plt.xlabel('Team/Player Name')
plt.ylabel('Total Contribution')
plt.title('Total U.S. Dollar contribution of team/player in PSACARD marketplace')
plt.show()

# Pipelining Code

The zip attached to this report contains the scripts used to pipeline the jobs. There are two ways of running them. <br/>
<b>(A) From the terminal, using the entry point that ties the code together. </b>
![Demo of the terminal screen](res/Terminal_Demo.PNG)


<b>(B) Using a scheduler - Airflow </b> <br/>
![Demo of the Airflow (Part - 1)](res/Airflow_Demo1.PNG)

### Using Schedulers (Part-2)
![Demo of the Airflow (Part - 2)](res/Airflow_Demo2.PNG)

### Runtime complexity discussion
![Runtime Complexity](res/Airflow_Demo3.PNG)

N.B.: All of these images are in the res folder in the attached zip, should you find any of them hazy. I have also attached the Airflow DAG configuration, which can be used to directly run the complete pipeline on another system.  

## Architecture Complexity
The above image reports the fraction of the pipeline's total runtime used by each individual task. While `refresh_auction_listing` took around 8.5 seconds, around 10 minutes were spent on the immediate downstream task `populate_transaction_tables`, and a little over 5 minutes on the final task in the pipeline. The pipeline in its current design uses only a single processor core, and `populate_transaction_tables` carries most of the heavy lifting. An immediate performance optimization (from an engineering standpoint) is to use multiprocessing or map-reduce over sharded splits.

In the first strategy, given 'N' available cores, the code would crawl N tasks in parallel rather than sequentially, making more efficient use of CPU time. Assuming an average time of 'y' per crawl and 'n' independent crawls in total, the sequential runtime is about `yn`, and with N-way parallelism we might approach an ideal runtime of `yn/N`.

In the second strategy, we rely on the fact that the downstream task `populate_transaction_tables` does not start until `refresh_auction_listing` has fetched all available listings. Because the full set of listings is known up front, we can map the problem to a distributed map-reduce strategy: for a given 'N' task nodes (splits/available worker nodes), we assign roughly equal loads to the worker nodes. One nearly fair scheme is sharding, where the value of an ordinary hash function modulo N assigns each listing to its worker node. Each worker node then independently crawls its quota of data rows for the two tables, and the results are assembled back on the file system.
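A minimal sketch of hash-mod sharding combined with a worker pool is shown below. It is not the pipeline's code: `crawl` is a hypothetical stand-in that fetches nothing, the example URLs are made up, and a thread pool is used in place of processes because crawling is mostly I/O-bound (`ProcessPoolExecutor` offers the same `map` interface if CPU-bound parsing dominates).

```python
import zlib
from concurrent.futures import ThreadPoolExecutor


def shard_of(key, num_shards):
    """Stable shard index: CRC32 of the key modulo the shard count.

    CRC32 is used instead of the built-in hash(), which is salted per
    process for strings and so would not be reproducible across runs.
    """
    return zlib.crc32(key.encode('utf-8')) % num_shards


def crawl(url):
    """Hypothetical stand-in for one crawl; a real version would fetch the page."""
    return {'url': url, 'status': 'ok'}


def crawl_shard(urls):
    """Crawl every URL assigned to one worker, sequentially."""
    return [crawl(u) for u in urls]


def crawl_all(urls, num_workers):
    # Map step: assign each URL to exactly one shard.
    shards = [[] for _ in range(num_workers)]
    for url in urls:
        shards[shard_of(url, num_workers)].append(url)
    # Each worker handles one shard independently of the others.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        per_shard = list(pool.map(crawl_shard, shards))
    # Reduce step: assemble the per-shard rows into one result list.
    return [row for rows in per_shard for row in rows]


urls = [f'https://example.com/listing/{i}' for i in range(20)]
rows = crawl_all(urls, num_workers=4)
print(len(rows))
```

Hashing does not guarantee perfectly equal shard sizes, which is why the scheme is only nearly fair; the slowest shard bounds the total runtime, so the ideal `yn/N` is an upper limit on the gain.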

One strong motivation for choosing denormalized data over normalization across multiple tables is that the present architecture crawls data from the internet into the raw layer, the first landing point of the data lake. In Big Data ecosystems, we generally want the data to be as denormalized as is efficient, as that can lower our costs of read replication, sharding, and data storage. Further, the treatment of such raw layers varies widely across teams and business structures. Some applications avoid expensive table joins by maintaining a denormalized dataset, accepting added storage complexity and cost in exchange for cheaper, faster compute and faster access across data fields. In contrast, other applications prioritize consistency and may not need many joins or a wide view of the data. Such concerns are handled well by migrating the data to a purpose-built data warehouse or lake, where application- or domain-specific treatment of the data model and schema further helps in engineering sophisticated data pipelines.

## Advanced Treatment
1) To ensure the system's high availability and recovery checkpoints for disaster recovery, the data should be migrated from the raw layer to the respective data marts/warehouses and fast/slow OLAP layers that support read replication. Having read replicas at the raw layer is also an important consideration, so that faulty ETL or data-sync pipelines cannot corrupt or delete the only copy of the raw data. A recommended solution is to keep at least 3 copies of the transformed data in the mature layers and at least 2 copies in the raw or less mature data layers. 
2) Fault tolerance: In the context of the present design, we should partition the file system underneath the raw layer by ingestion timestamp (year=2020/month=09/day=08) and the exposed schema (like schema_name/schema_file_name). For highly materialized raw ingestion, I would recommend additional partitions based on the cluster the data most closely aligns with (such as the business units that would be the first beneficiaries: /finance, /hr, /marketing, etc.), or on use cases for highly agile and fluid teams. If the data brought in is highly disjoint and atomic, less partitioning is an equally compelling choice. Another real-world concern in partitioning is data-access levels. With few partitions, more data pipelines have visibility, control, and access within the same folder structures. Privacy and throughput concerns motivate one aspect of such a design; another is the risk of a faulty engineering pipeline accidentally deleting an entire folder and impacting several teams. 
3) Sophisticated data encryption and anonymization algorithms are salient characteristics of the higher, mature data layers, and it is also very common to find some encryption and anonymization in the raw layers. Such recommendations are subjective, motivated by how security is understood on the 'data-at-rest' platforms and by the frequency and distance of data movement, with the aim of adding confidence against data breaches and corruption at rest or in transit. For connected Big Data ecosystems, this can become even more crucial, especially when the ecosystem hosts many third-party platforms such as data visualization software.  
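The timestamp-based partitioning from point 2 can be sketched as a small path builder. The function name `raw_partition_path`, the root folder `raw`, the schema name `psa`, and the ordering of date partitions before the schema folders are all assumptions made for illustration; month and day are zero-padded so that paths sort lexicographically.

```python
from datetime import date
from pathlib import PurePosixPath


def raw_partition_path(root, schema_name, file_name, ingested_on):
    """Build a raw-layer path partitioned by ingestion date, then schema.

    Layout (assumed): <root>/year=YYYY/month=MM/day=DD/<schema>/<file>
    """
    return (
        PurePosixPath(root)
        / f'year={ingested_on:%Y}'
        / f'month={ingested_on:%m}'
        / f'day={ingested_on:%d}'
        / schema_name
        / file_name
    )


path = raw_partition_path('raw', 'psa', 'certificate.csv', date(2020, 9, 8))
print(path)
```

Keeping each day in its own folder limits the blast radius of a faulty pipeline: a mistaken delete or overwrite touches one day's partition rather than the whole table.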

## Choice of Technology: 
<b>1) Programming Language:</b> Python - efficient and fast for prototyping pipelines, with excellent support from data-wrangling libraries such as pandas (used in this notebook) and for supporting code tasks. Java would also be an efficient choice for building an ETL pipeline, as it shares the same advantages. <br/> <br/>
<b>2) Scheduler:</b> Oozie/Airflow - equally good scheduling tools for book-keeping and advanced pipelining (multiple flow dependencies, etc.). An excellent UI and a powerful built-in set of utilities were my primary criteria when choosing between them. Lightweight Azkaban is my favorite, but I have good exposure to all three of the mentioned tools. <br/> <br/>
<b>3) Database:</b> RDBMS/NoSQL, depending on the scope of the development. If the development is expected to run longer, we, the engineering team, are constantly threatened by even small updates to the webpage, which may break the established pipeline in production. In that case, it is a better choice to use NoSQL and ingest the data into the lowest layer of our data lake. Conversely, if the lifetime of the pipeline is shorter, we would concentrate more on robustness, data compliance, and data integrity. From there, with the help of ETL pipelines, we can distribute the data into both RDBMS and NoSQL data warehouses, depending on the end application supported (REST, visualization tools, machine learning pipelines, etc.) and our expectations of the data lake in terms of latency and throughput (fast access layers, slow access layers, etc.). Depending on the software stack, Redshift/DynamoDB and HBase (with or without a SQL skin like Phoenix) on AWS, BigQuery/BigTable on Google Cloud, Hive and SQL Cloud/CosmosDB on Azure, or Spark stacks on Databricks are all well-respected solutions. I have used all the technologies mentioned in the database stack at different stages of my employment, and I bring experience in working with them efficiently. The choice of database stack also depends heavily on the types of applications where the data would be immediately employed. For example, machine learning teams would stress capturing the diversity of the data, not discarding any fraction of it even if its relative volume is small (for example, a large set of data fields that has not been present on the webpage throughout), whereas some software application engineering teams would want to keep only stable data fields (provided the excluded fields are not deemed crucial at that point in time). 
However, most teams lie somewhere between these two extremes, and such decisions are more often driven by engineering management practices, such as whether to commit extra resources to engineering or maintenance, or whether to restrict the technological landscape the team owns to limit growing technical debt.  <br/>