Skip to content

Commit

Permalink
Ca rewards pipeline fixes (#74)
Browse files Browse the repository at this point in the history
* Add delimiter option to load data from csv

* Use proper csv writer on proposer_rewards script

* Fix buffer yielding

* Change approved proposals delimiter to ','

* Add internal id to results on proposers rewards

* Mask approved proposal row, proposal_id -> internal_id

* Implemented csv_merger tool

* Use pre python3.10 contexts

* Python fmt

* Matching adjusts

* Python fmt

* Fix typos

* Fix reading threshold approval column from csv

* Add warning about missed reviewed proposals

* Lint clean
  • Loading branch information
Daniel Sanchez committed Nov 19, 2021
1 parent e148b8a commit 31b4e8e
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 36 deletions.
62 changes: 62 additions & 0 deletions scripts/python/csv_merger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import csv
import glob
from pathlib import Path
import os
from typing import List, Generator, Iterator, TextIO, Tuple
import typer
from contextlib import contextmanager


@contextmanager
def open_files(files: Iterator[str]) -> Generator[Iterator[TextIO], None, None]:
    """Open every path in ``files`` for reading and yield an iterator over the handles.

    Each file is opened as UTF-8 text with ``newline=""`` so the handle can be
    passed directly to ``csv.reader``.

    Every handle opened so far is closed when the ``with`` block exits, including
    when the block raises or when opening one of the later files fails.
    """
    files_objs: List[TextIO] = []
    try:
        # Open one file at a time so that a failure half-way through still
        # reaches the ``finally`` clause with the earlier handles recorded.
        for file in files:
            files_objs.append(open(file, encoding="utf-8", newline=""))
        yield iter(files_objs)
    finally:
        for file_obj in files_objs:
            file_obj.close()


def search_file_pattern(pattern: str, base_path: Path) -> Iterator[str]:
yield from map(
lambda file: os.path.join(base_path, file),
glob.iglob(pathname=pattern, root_dir=base_path),
)


def file_as_csv(file: TextIO, delimiter: str) -> Tuple[List[str], Iterator[List[str]]]:
    """Split an open CSV text stream into its header row and its remaining rows.

    :param file: text stream positioned at the start of the CSV data.
    :param delimiter: single-character field separator used by the input.
    :return: ``(header, rows)`` where ``header`` is the first row and ``rows`` is
        the underlying ``csv.reader`` positioned just after the header.
    :raises ValueError: if the stream holds no rows at all, so there is no header.
    """
    reader = csv.reader(file, delimiter=delimiter)
    try:
        header = next(reader)
    except StopIteration:
        # A bare StopIteration escaping from here is easy to misread (or to have
        # swallowed by an enclosing generator); report the empty input explicitly.
        source = getattr(file, "name", "<stream>")
        raise ValueError(
            f"CSV input {source!r} is empty, no header row found"
        ) from None
    return header, reader


def merge_csv(
    pattern: str,
    base_path: Path,
    output_file: Path,
    input_delimiter: str,
    output_delimiter: str,
):
    """Concatenate every CSV matching ``pattern`` under ``base_path`` into ``output_file``.

    The header row of the first input (in sorted path order) is written once;
    the header row of every other input is dropped and only its data rows are
    appended. Headers of later files are not compared against the first one.

    :param pattern: glob pattern, evaluated relative to ``base_path``.
    :param base_path: directory in which the pattern is searched.
    :param output_file: destination path; it is created or truncated.
    :param input_delimiter: single-character field separator of the inputs.
    :param output_delimiter: single-character field separator of the output.
    :raises ValueError: if no input file matches, raised before the output file
        is touched.
    """
    print(pattern)
    output_path = Path(output_file).resolve()
    # Resolve the input list up front, before the output file is created:
    #  * the output must never be read back as one of its own inputs when it
    #    lives under base_path and matches the pattern;
    #  * sorting makes the header source and the row order deterministic, since
    #    glob returns matches in arbitrary directory order.
    files = sorted(
        file
        for file in search_file_pattern(pattern, base_path)
        if Path(file).resolve() != output_path
    )
    if not files:
        raise ValueError(
            f"No input files matching pattern {pattern!r} found in {str(base_path)!r}"
        )
    with open(output_file, "w", encoding="utf-8", newline="") as out_file:
        with open_files(iter(files)) as fs:
            writer = csv.writer(out_file, delimiter=output_delimiter)
            header, first_content = file_as_csv(next(fs), delimiter=input_delimiter)
            writer.writerow(header)
            writer.writerows(first_content)
            for file in fs:
                # skip headers and use just content
                _, content = file_as_csv(file, delimiter=input_delimiter)
                writer.writerows(content)


def merge_csv_files(
    output_file: Path = typer.Option(...),
    pattern: str = typer.Option(...),
    base_path: Path = typer.Option(default=Path("./")),
    input_delimiter: str = typer.Option(default=","),
    output_delimiter: str = typer.Option(default=","),
):
    # Command-line entry point: forwards the parsed options to merge_csv.
    # Kept as a comment rather than a docstring on purpose, because typer
    # would surface a docstring as the command's --help text.
    merge_csv(
        pattern=pattern,
        base_path=base_path,
        output_file=output_file,
        input_delimiter=input_delimiter,
        output_delimiter=output_delimiter,
    )


# Only start the CLI when this module is executed as a script.
if __name__ == "__main__":
    typer.run(merge_csv_files)
53 changes: 29 additions & 24 deletions scripts/python/proposers_rewards.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import sys
import asyncio
import json
import csv
import itertools
import enum
import os
from collections import namedtuple
from io import StringIO

import pydantic
import httpx
Expand Down Expand Up @@ -124,7 +126,7 @@ def get_proposals_from_file(proposals_file_path: str) -> Dict[str, Proposal]:
Proposal(**proposal_data)
for proposal_data in load_json_from_file(proposals_file_path)
)
proposals_dict = {proposal.proposal_id: proposal for proposal in proposals}
proposals_dict = {proposal.chain_proposal_id: proposal for proposal in proposals}
return proposals_dict


Expand Down Expand Up @@ -215,7 +217,7 @@ async def get_proposals_voteplans_and_challenges_from_api(
proposal.chain_proposal_id: proposal for proposal in await proposals_task
}
voteplans_proposals = {
proposal.proposal_id: proposal
proposal.chain_proposal_id: proposal
for proposal in itertools.chain.from_iterable(
voteplan.proposals for voteplan in await voteplans_task
)
Expand All @@ -240,9 +242,14 @@ class SanityException(Exception):
def sanity_check_data(
proposals: Dict[str, Proposal], voteplan_proposals: Dict[str, ProposalStatus]
):
if set(proposals.keys()) != set(voteplan_proposals.keys()):
proposals_set = set(proposals.keys())
voteplan_proposals_set = set(voteplan_proposals.keys())
if proposals_set != voteplan_proposals_set:
from pprint import pformat

diff = proposals_set.difference(voteplan_proposals_set)
raise SanityException(
"Extra proposals found, voteplan proposals do not match servicing station proposals"
f"Extra proposals found, voteplan proposals do not match servicing station proposals: \n{pformat(diff)}"
)
if any(proposal.tally is None for proposal in voteplan_proposals.values()):
raise SanityException("Some proposal do not have a valid tally available")
Expand Down Expand Up @@ -304,6 +311,7 @@ def calc_vote_difference_and_threshold_success(
Result = namedtuple(
"Result",
(
"internal_id",
"proposal_id",
"proposal",
"overall_score",
Expand Down Expand Up @@ -357,6 +365,7 @@ def calc_results(
depletion -= proposal.proposal_funds

result = Result(
internal_id=proposal.proposal_id,
proposal_id=proposal_id,
proposal=proposal.proposal_title,
overall_score=proposal.proposal_impact_score / 100,
Expand Down Expand Up @@ -414,21 +423,15 @@ def calculate_total_stake_from_block0_configuration(block0_config: Dict[str, Dic
# Output results


def output_csv(results: List[Result]) -> Generator[str, None, None]:
def output_csv(results: List[Result], f: TextIO):
fields = results[0]._fields
yield f"{';'.join(fields)}\n"
yield from (
f"{';'.join(str(getattr(result, field)) for field in fields)}\n"
for result in results
)

writer = csv.writer(f)
writer.writerow(fields)
writer.writerows(results)

def output_json(results: List[Result]) -> Generator[str, None, None]:
yield json.dumps(list(map(Result._asdict, results)))


def dump_to_file(stream: Generator[str, None, None], out: TextIO):
out.writelines(stream)
def output_json(results: List[Result], f: TextIO):
json.dump(list(map(Result._asdict, results)), f)


# CLI
Expand Down Expand Up @@ -510,16 +513,18 @@ def calculate_rewards(
approval_threshold,
total_stake_approval_threshold,
)
out_stream = (
output_json(results)
if output_format == OutputFormat.JSON
else output_csv(results)
)
chalenge_ouput_file_path = build_path_for_challenge(

challenge_output_file_path = build_path_for_challenge(
output_file, challenge.title.replace(" ", "_").replace(":", "_")
)
with open(chalenge_ouput_file_path, "w", encoding="utf-8") as out_file:
dump_to_file(out_stream, out_file)

with open(
challenge_output_file_path, "w", encoding="utf-8", newline=""
) as out_file:
if output_format == OutputFormat.JSON:
output_json(results, out_file)
elif output_format == OutputFormat.CSV:
output_csv(results, out_file)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/bin/cli/advisor_reviews/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ mod test {
};

export.exec().unwrap();
let reviews: Vec<AdvisorReview> = csv::load_data_from_csv(&tmp_file).unwrap();
let reviews: Vec<AdvisorReview> = csv::load_data_from_csv::<_, b','>(&tmp_file).unwrap();
assert_eq!(reviews.len(), 1);
}
}
22 changes: 19 additions & 3 deletions src/bin/cli/rewards/community_advisors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use chain_crypto::digest::DigestOf;
use serde::Serialize;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::str::FromStr;

Expand Down Expand Up @@ -87,6 +88,20 @@ impl CommunityAdvisors {
let proposal_reviews = read_proposal_reviews(&assessments_path)?;
let approved_proposals = read_approved_proposals(&approved_proposals_path)?;

let approved_set = approved_proposals.keys().cloned().collect::<BTreeSet<_>>();
let proposal_reviews_set = proposal_reviews.keys().cloned().collect::<BTreeSet<_>>();
let diff = approved_set
.difference(&proposal_reviews_set)
.collect::<BTreeSet<_>>();

if !diff.is_empty() {
println!(
"WARNING!, {} proposals without reviews: {:?}",
diff.len(),
diff,
);
}

let rewards = calculate_ca_rewards(
proposal_reviews,
&approved_proposals,
Expand All @@ -102,7 +117,7 @@ impl CommunityAdvisors {
}

fn read_proposal_reviews(path: &Path) -> Result<ProposalsReviews, Error> {
let reviews: Vec<AdvisorReviewRow> = utils::csv::load_data_from_csv(path)?;
let reviews: Vec<AdvisorReviewRow> = utils::csv::load_data_from_csv::<_, b','>(path)?;
let mut proposal_reviews = ProposalsReviews::new();

for review in reviews.into_iter() {
Expand All @@ -116,12 +131,13 @@ fn read_proposal_reviews(path: &Path) -> Result<ProposalsReviews, Error> {
}

fn read_approved_proposals(path: &Path) -> Result<ApprovedProposals, Error> {
let approved_proposals: Vec<ApprovedProposalRow> = utils::csv::load_data_from_csv(path)?;
let approved_proposals: Vec<ApprovedProposalRow> =
utils::csv::load_data_from_csv::<_, b','>(path)?;
approved_proposals
.into_iter()
.filter_map(|proposal| match proposal.status {
ProposalStatus::Approved => Some(
Funds::from_str(&proposal.requested_funds)
Funds::from_str(&proposal.requested_dollars)
.map(|funds| (proposal.proposal_id, funds)),
),
ProposalStatus::NotApproved => None,
Expand Down
3 changes: 2 additions & 1 deletion src/bin/cli/rewards/veterans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ impl VeteransRewards {
to,
total_rewards,
} = self;
let reviews: Vec<veterans::VeteranReviewsCount> = csv::load_data_from_csv(&from)?;
let reviews: Vec<veterans::VeteranReviewsCount> =
csv::load_data_from_csv::<_, b','>(&from)?;
let results = veterans::calculate_veteran_advisors_rewards(&reviews, total_rewards);
csv::dump_data_to_csv(&results, &to)?;

Expand Down
3 changes: 2 additions & 1 deletion src/community_advisors/models/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ mod tests {
#[test]
fn test_deserialize() {
let file_path = PathBuf::from("./resources/testing/valid_assessments.csv");
let data: Vec<AdvisorReviewRow> = csv_utils::load_data_from_csv(&file_path).unwrap();
let data: Vec<AdvisorReviewRow> =
csv_utils::load_data_from_csv::<_, b','>(&file_path).unwrap();
assert_eq!(data.len(), 1);
}

Expand Down
6 changes: 4 additions & 2 deletions src/community_advisors/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ pub enum ProposalStatus {

#[derive(Deserialize)]
pub struct ApprovedProposalRow {
#[serde(rename(deserialize = "internal_id"))]
pub proposal_id: String,
#[serde(rename(deserialize = "meets_approval_threshold"))]
pub status: ProposalStatus,
pub requested_funds: String,
pub requested_dollars: String,
}

impl<'de> Deserialize<'de> for ProposalStatus {
Expand All @@ -23,7 +25,7 @@ impl<'de> Deserialize<'de> for ProposalStatus {
{
let status: String = String::deserialize(deserializer)?;
Ok(match status.to_lowercase().as_ref() {
"approved" => ProposalStatus::Approved,
"yes" => ProposalStatus::Approved,
_ => ProposalStatus::NotApproved,
})
}
Expand Down
2 changes: 0 additions & 2 deletions src/rewards/community_advisors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ fn get_tickets_per_proposal(
ProposalTickets::Legacy { winning_tkts, .. } => {
// it would be a bit harder to track it otherwise, and we don't need this additional
// complexity now
println!("{}", id);
assert_eq!(
0,
rewards_slots.max_winning_tickets() % LEGACY_MAX_WINNING_TICKETS
Expand Down Expand Up @@ -88,7 +87,6 @@ fn calculate_rewards_per_proposal(
get_tickets_per_proposal(proposal_reviews, rewards_slots);

let base_ticket_reward = funding.proposal_funds() / Rewards::from(total_tickets);
println!("total tickets {} {}", total_tickets, base_ticket_reward);

proposals_tickets
.into_iter()
Expand Down
5 changes: 4 additions & 1 deletion src/utils/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ use serde::Serialize;

use std::path::Path;

pub fn load_data_from_csv<T: DeserializeOwned>(file_path: &Path) -> Result<Vec<T>, csv::Error> {
pub fn load_data_from_csv<T: DeserializeOwned, const DELIMITER: u8>(
file_path: &Path,
) -> Result<Vec<T>, csv::Error> {
let mut csv_reader = csv::ReaderBuilder::new()
.has_headers(true)
.delimiter(DELIMITER)
.from_path(file_path)?;
csv_reader.deserialize().collect::<Result<Vec<T>, _>>()
}
Expand Down
2 changes: 1 addition & 1 deletion src/vca_reviews/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl AdvisorReviewRow {

pub fn read_vca_reviews_aggregated_file(filepath: &Path) -> Result<Vec<AdvisorReview>, Error> {
Ok(
utils::csv::load_data_from_csv::<AdvisorReviewRow>(filepath)?
utils::csv::load_data_from_csv::<AdvisorReviewRow, b','>(filepath)?
.into_iter()
.map(|review| review.as_advisor_review())
.collect(),
Expand Down

0 comments on commit 31b4e8e

Please sign in to comment.