### Exercise 3.1 + 3.2


In [17]:
import csv
from pprint import pprint

In [18]:
def read_portfolio(filename: str) -> list:
    records = []
    with open(filename, "rt") as file:
        rows = csv.reader(file)
        headers = next(rows)
        types = [str, int, float]
        records = [
            {name: func(val) for name, func, val in zip(headers, types, row)} for row in rows
        ]
    return records


def read_prices(filename: str) -> dict:
    prices = {}
    with open(filename, "rt") as file:
        rows = csv.reader(file)
        for row in rows:
            try:
                prices[row[0]] = float(row[1])
            except IndexError:
                pass
    return prices


def make_report_data(portfolio: list, prices: dict) -> list:
    rows = []
    for stock in portfolio:
        cur_prices = prices[stock["name"]]
        change = cur_prices - stock["price"]
        summary = (stock["name"], stock["shares"], cur_prices, change)
        rows.append(summary)
    return rows


def print_report(report_data: list):
    headers = ("Name", "Shares", "Price", "Change")
    print("%10s %10s %10s %10s" % headers)
    print(("-" * 10 + " ") * len(headers))
    for name, shares, price, change in report_data:
        price_str = "$" + str(price)
        print(f"{name:>10s} {shares:>10d} {price_str:>10s} {change:>10.2f}")


def portfolio_report(portfolio_data: str, prices_data: str):
    portfolio = read_portfolio(portfolio_data)
    prices = read_prices(prices_data)
    summary = make_report_data(portfolio, prices)
    print_report(summary)


portfolio_report("Data/portfolio.csv", "Data/prices.csv")

      Name     Shares      Price     Change
---------- ---------- ---------- ---------- 
        AA        100      $9.22     -22.98
       IBM         50    $106.28      15.18
       CAT        150     $35.46     -47.98
      MSFT        200     $20.89     -30.34
        GE         95     $13.48     -26.89
      MSFT         50     $20.89     -44.21
       IBM        100    $106.28      35.84


In [19]:
# portfolio_report("Data/portfolio2.csv", "Data/prices.csv")
files = ["Data/portfolio.csv", "Data/portfolio2.csv"]
for name in files:
    print(f"{name:-^43s}")
    portfolio_report(name, "Data/prices.csv")
    print()

------------Data/portfolio.csv-------------
      Name     Shares      Price     Change
---------- ---------- ---------- ---------- 
        AA        100      $9.22     -22.98
       IBM         50    $106.28      15.18
       CAT        150     $35.46     -47.98
      MSFT        200     $20.89     -30.34
        GE         95     $13.48     -26.89
      MSFT         50     $20.89     -44.21
       IBM        100    $106.28      35.84

------------Data/portfolio2.csv------------
      Name     Shares      Price     Change
---------- ---------- ---------- ---------- 
        AA         50      $9.22     -17.88
       HPQ        250     $34.35      -8.80
      MSFT         25     $20.89     -29.26
        GE        125     $13.48     -38.62



### Exercise 3.3


In [20]:
def parse_csv(filename):
    """
    Parse a CSV file into a list of records
    """
    with open(filename) as file:
        rows = csv.reader(file)
        headers = next(rows)
        records = []
        for row in rows:
            if not row:
                continue
            record = dict(zip(headers, row))
            records.append(record)

    return records

In [21]:
portfolio = parse_csv("Data/portfolio.csv")
print(portfolio)

[{'name': 'AA', 'shares': '100', 'price': '32.20'}, {'name': 'IBM', 'shares': '50', 'price': '91.10'}, {'name': 'CAT', 'shares': '150', 'price': '83.44'}, {'name': 'MSFT', 'shares': '200', 'price': '51.23'}, {'name': 'GE', 'shares': '95', 'price': '40.37'}, {'name': 'MSFT', 'shares': '50', 'price': '65.10'}, {'name': 'IBM', 'shares': '100', 'price': '70.44'}]


### Exercise 3.4


In [22]:
def parse_csv_with_select(filename, select=None):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    with open(filename) as file:
        rows = csv.reader(file)
        headers = next(rows)
        if select is not None:
            indices = [headers.index(colname) for colname in select]
            parse_headers = select
        else:
            indices = []
            parse_headers = headers
        records = []
        for row in rows:
            if not row:
                continue
            if indices:
                row = [row[index] for index in indices]
            record = dict(zip(parse_headers, row))
            records.append(record)

    return records

In [23]:
headers = ["name", "date", "time", "shares", "price"]
select = ["name", "shares"]
portfolio_select = parse_csv_with_select("Data/portfolio.csv", select=select)
print(portfolio_select)

[{'name': 'AA', 'shares': '100'}, {'name': 'IBM', 'shares': '50'}, {'name': 'CAT', 'shares': '150'}, {'name': 'MSFT', 'shares': '200'}, {'name': 'GE', 'shares': '95'}, {'name': 'MSFT', 'shares': '50'}, {'name': 'IBM', 'shares': '100'}]


### Exercise 3.5


In [24]:
def parse_csv_with_types(filename, types=None):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    with open(filename) as file:
        rows = csv.reader(file)
        headers = next(rows)
        records = []
        for row in rows:
            if not row:
                continue
            if types:
                row = [func(val) for func, val in zip(types, row)]
            record = dict(zip(headers, row))
            records.append(record)

    return records

In [25]:
types = [str, int]
portfolio_types = parse_csv_with_types("Data/portfolio.csv", types=types)
print(portfolio_types)

[{'name': 'AA', 'shares': 100}, {'name': 'IBM', 'shares': 50}, {'name': 'CAT', 'shares': 150}, {'name': 'MSFT', 'shares': 200}, {'name': 'GE', 'shares': 95}, {'name': 'MSFT', 'shares': 50}, {'name': 'IBM', 'shares': 100}]


### Exercise 3.6


In [26]:
def parse_csv_without_headers(filename, types=None, has_headers=True):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    with open(filename) as file:
        rows = csv.reader(file)
        headers = next(rows) if has_headers else []
        records = []
        for row in rows:
            if not row:
                continue
            if types:
                row = [func(val) for func, val in zip(types, row)]
            if has_headers:
                record = dict(zip(headers, row))
            else:
                record = tuple(row)
            records.append(record)

    return records

In [27]:
types = [str, float]
prices = parse_csv_without_headers("Data/prices.csv", types=types, has_headers=False)
print(prices)

[('AA', 9.22), ('AXP', 24.85), ('BA', 44.85), ('BAC', 11.27), ('C', 3.72), ('CAT', 35.46), ('CVX', 66.67), ('DD', 28.47), ('DIS', 24.22), ('GE', 13.48), ('GM', 0.75), ('HD', 23.16), ('HPQ', 34.35), ('IBM', 106.28), ('INTC', 15.72), ('JNJ', 55.16), ('JPM', 36.9), ('KFT', 26.11), ('KO', 49.16), ('MCD', 58.99), ('MMM', 57.1), ('MRK', 27.58), ('MSFT', 20.89), ('PFE', 15.19), ('PG', 51.94), ('T', 24.79), ('UTX', 52.61), ('VZ', 29.26), ('WMT', 49.74), ('XOM', 69.35)]


### Exercise 3.7


In [28]:
def parse_csv_with_delimiter(filename, types=None, delimiter=" "):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    with open(filename) as file:
        rows = csv.reader(file, delimiter=delimiter)
        headers = next(rows)
        records = []
        for row in rows:
            if not row:
                continue
            if types:
                row = [func(val) for func, val in zip(types, row)]
            record = dict(zip(headers, row))
            records.append(record)

    return records

In [29]:
portfolio = parse_csv_with_delimiter("Data/portfolio.dat", types=[str, int, float], delimiter=" ")
print(portfolio)

[{'name': 'AA', 'shares': 100, 'price': 32.2}, {'name': 'IBM', 'shares': 50, 'price': 91.1}, {'name': 'CAT', 'shares': 150, 'price': 83.44}, {'name': 'MSFT', 'shares': 200, 'price': 51.23}, {'name': 'GE', 'shares': 95, 'price': 40.37}, {'name': 'MSFT', 'shares': 50, 'price': 65.1}, {'name': 'IBM', 'shares': 100, 'price': 70.44}]


In [30]:
def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=","):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    with open(filename) as file:
        rows = csv.reader(file, delimiter=delimiter)
        headers = next(rows) if has_headers else []
        if select:
            indices = [headers.index(colname) for colname in select]
            parse_headers = select
        else:
            indices = []
            parse_headers = headers
        records = []
        for row in rows:
            if not row:
                continue
            if indices:
                row = [row[index] for index in indices]
            if types:
                row = [func(val) for func, val in zip(types, row)]
            if has_headers:
                record = dict(zip(parse_headers, row))
            else:
                record = tuple(row)
            records.append(record)

    return records

In [31]:
portfolio = parse_csv("Data/portfolio.csv")
print(portfolio)

select = ["name", "shares"]
portfolio = parse_csv("Data/portfolio.csv", select=select)
print(portfolio)

types = [str, int, float]
portfolio = parse_csv("Data/portfolio.csv", types=types)
print(portfolio)

prices = parse_csv("Data/prices.csv", types=[str, float], has_headers=False)
print(prices)

portfolio = parse_csv(
    "Data/portfolio.dat",
    types=[str, int, float],
    delimiter=" ",
)
print(portfolio)

[{'name': 'AA', 'shares': '100', 'price': '32.20'}, {'name': 'IBM', 'shares': '50', 'price': '91.10'}, {'name': 'CAT', 'shares': '150', 'price': '83.44'}, {'name': 'MSFT', 'shares': '200', 'price': '51.23'}, {'name': 'GE', 'shares': '95', 'price': '40.37'}, {'name': 'MSFT', 'shares': '50', 'price': '65.10'}, {'name': 'IBM', 'shares': '100', 'price': '70.44'}]
[{'name': 'AA', 'shares': '100'}, {'name': 'IBM', 'shares': '50'}, {'name': 'CAT', 'shares': '150'}, {'name': 'MSFT', 'shares': '200'}, {'name': 'GE', 'shares': '95'}, {'name': 'MSFT', 'shares': '50'}, {'name': 'IBM', 'shares': '100'}]
[{'name': 'AA', 'shares': 100, 'price': 32.2}, {'name': 'IBM', 'shares': 50, 'price': 91.1}, {'name': 'CAT', 'shares': 150, 'price': 83.44}, {'name': 'MSFT', 'shares': 200, 'price': 51.23}, {'name': 'GE', 'shares': 95, 'price': 40.37}, {'name': 'MSFT', 'shares': 50, 'price': 65.1}, {'name': 'IBM', 'shares': 100, 'price': 70.44}]
[('AA', 9.22), ('AXP', 24.85), ('BA', 44.85), ('BAC', 11.27), ('C', 3.7

### Exercise 3.8

In [35]:
def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=","):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    if select and not has_headers:
        raise RuntimeError("select argument requires column headers")
    with open(filename) as file:
        rows = csv.reader(file, delimiter=delimiter)
        headers = next(rows) if has_headers else []
        if select:
            indices = [headers.index(colname) for colname in select]
            parse_headers = select
        else:
            indices = []
            parse_headers = headers
        records = []
        for row in rows:
            if not row:
                continue
            if indices:
                row = [row[index] for index in indices]
            if types:
                row = [func(val) for func, val in zip(types, row)]
            if has_headers:
                record = dict(zip(parse_headers, row))
            else:
                record = tuple(row)
            records.append(record)

    return records


parse_csv("Data/prices.csv", select=["name", "price"], has_headers=False)

RuntimeError: select argument requires column headers

### Exercise 3.9


In [36]:
def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=","):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    if select and not has_headers:
        raise RuntimeError("select argument requires column headers")
    with open(filename) as file:
        rows = csv.reader(file, delimiter=delimiter)
        headers = next(rows) if has_headers else []
        if select:
            indices = [headers.index(colname) for colname in select]
            parse_headers = select
        else:
            indices = []
            parse_headers = headers
        records = []
        for rowno, row in enumerate(rows, 1):
            if not row:
                continue
            if indices:
                row = [row[index] for index in indices]
            if types:
                try:
                    row = [func(val) for func, val in zip(types, row)]
                except Exception as e:
                    print(f"Row {rowno}: Couldn't convert {row}")
                    print(f"Row {rowno}: Reason {e}")
            if has_headers:
                record = dict(zip(parse_headers, row))
            else:
                record = tuple(row)
            records.append(record)

    return records


portfolio = parse_csv("Data/missing.csv", types=[str, int, float])
print(portfolio)

Row 4: Couldn't convert ['MSFT', '', '51.23']
Row 4: Reason invalid literal for int() with base 10: ''
Row 7: Couldn't convert ['IBM', '', '70.44']
Row 7: Reason invalid literal for int() with base 10: ''
[{'name': 'AA', 'shares': 100, 'price': 32.2}, {'name': 'IBM', 'shares': 50, 'price': 91.1}, {'name': 'CAT', 'shares': 150, 'price': 83.44}, {'name': 'MSFT', 'shares': '', 'price': '51.23'}, {'name': 'GE', 'shares': 95, 'price': 40.37}, {'name': 'MSFT', 'shares': 50, 'price': 65.1}, {'name': 'IBM', 'shares': '', 'price': '70.44'}]


### Exercise 3.10

In [41]:
def parse_csv(
    filename,
    select=None,
    types=None,
    has_headers=True,
    delimiter=",",
    silence_errors=False,
):
    """
    Parse a CSV file with predefined cols into a list of records
    """
    if select and not has_headers:
        raise RuntimeError("select argument requires column headers")
    with open(filename) as file:
        rows = csv.reader(file, delimiter=delimiter)
        headers = next(rows) if has_headers else []
        if select:
            indices = [headers.index(colname) for colname in select]
            parse_headers = select
        else:
            indices = []
            parse_headers = headers
        records = []
        for rowno, row in enumerate(rows, 1):
            if not row:
                continue
            if indices:
                row = [row[index] for index in indices]
            if types:
                try:
                    row = [func(val) for func, val in zip(types, row)]
                except Exception as e:
                    if not silence_errors:
                        print(f"Row {rowno}: Couldn't convert {row}")
                        print(f"Row {rowno}: Reason {e}")
                    continue
            if has_headers:
                record = dict(zip(parse_headers, row))
            else:
                record = tuple(row)
            records.append(record)

    return records


portfolio = parse_csv("Data/missing.csv", types=[str, int, float], silence_errors=True)
print(portfolio)

[{'name': 'AA', 'shares': 100, 'price': 32.2}, {'name': 'IBM', 'shares': 50, 'price': 91.1}, {'name': 'CAT', 'shares': 150, 'price': 83.44}, {'name': 'GE', 'shares': 95, 'price': 40.37}, {'name': 'MSFT', 'shares': 50, 'price': 65.1}]
